1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_io::Timer;
10use async_std::task;
11use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
12use collections::{HashMap, HashSet};
13use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
14use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{
21 any::TypeId,
22 future::Future,
23 sync::Arc,
24 time::{Duration, Instant},
25};
26use store::{Store, Worktree};
27use surf::StatusCode;
28use tide::log;
29use tide::{
30 http::headers::{HeaderName, CONNECTION, UPGRADE},
31 Request, Response,
32};
33use time::OffsetDateTime;
34
/// Type-erased message handler stored in `Server::handlers`.
///
/// It receives the server and a boxed, dynamically typed envelope, and returns a
/// boxed future that resolves to `tide::Result<()>`.
type MessageHandler = Box<
    dyn Send
        + Sync
        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
>;
40
/// RPC server state shared by every connection handled in this module.
pub struct Server {
    // Used to send, forward and respond to messages on individual connections.
    peer: Arc<Peer>,
    // In-memory store; accessed through `state()` (read lock) and `state_mut()` (write lock).
    store: RwLock<Store>,
    // Application state; the handlers in this file reach the database through `app_state.db`.
    app_state: Arc<AppState>,
    // Handlers keyed by the `TypeId` of the message type they were registered for.
    handlers: HashMap<TypeId, MessageHandler>,
    // When set, a `()` is sent on this channel after each incoming message has been handled.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
48
/// Abstraction over the task executor used while handling a connection.
pub trait Executor: Send + Clone {
    /// Future type produced by [`Executor::timer`].
    type Timer: Send + Future;
    /// Starts `future` without handing a join handle back to the caller.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
    /// Returns a timer future for the given `duration`.
    fn timer(&self, duration: Duration) -> Self::Timer;
}
54
/// [`Executor`] implementation used by the `/rpc` route; it spawns tasks with
/// `async_std::task::spawn` and builds timers with `async_io::Timer`.
#[derive(Clone)]
pub struct RealExecutor;
57
/// Number of channel messages requested from the database per page; a page that
/// comes back shorter than this is reported to the client as the last one.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Upper bound on the byte length (`str::len`) of a trimmed channel message body.
const MAX_MESSAGE_LEN: usize = 1024;
60
61impl Server {
62 pub fn new(
63 app_state: Arc<AppState>,
64 peer: Arc<Peer>,
65 notifications: Option<mpsc::UnboundedSender<()>>,
66 ) -> Arc<Self> {
67 let mut server = Self {
68 peer,
69 app_state,
70 store: Default::default(),
71 handlers: Default::default(),
72 notifications,
73 };
74
75 server
76 .add_request_handler(Server::ping)
77 .add_request_handler(Server::register_project)
78 .add_message_handler(Server::unregister_project)
79 .add_request_handler(Server::share_project)
80 .add_message_handler(Server::unshare_project)
81 .add_request_handler(Server::join_project)
82 .add_message_handler(Server::leave_project)
83 .add_request_handler(Server::register_worktree)
84 .add_message_handler(Server::unregister_worktree)
85 .add_request_handler(Server::update_worktree)
86 .add_message_handler(Server::update_diagnostic_summary)
87 .add_message_handler(Server::disk_based_diagnostics_updating)
88 .add_message_handler(Server::disk_based_diagnostics_updated)
89 .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
90 .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
91 .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
92 .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
93 .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
94 .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
95 .add_request_handler(Server::forward_project_request::<proto::OpenBuffer>)
96 .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
97 .add_request_handler(
98 Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
99 )
100 .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
101 .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
102 .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
103 .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
104 .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
105 .add_message_handler(Server::close_buffer)
106 .add_request_handler(Server::update_buffer)
107 .add_message_handler(Server::update_buffer_file)
108 .add_message_handler(Server::buffer_reloaded)
109 .add_message_handler(Server::buffer_saved)
110 .add_request_handler(Server::save_buffer)
111 .add_request_handler(Server::get_channels)
112 .add_request_handler(Server::get_users)
113 .add_request_handler(Server::join_channel)
114 .add_message_handler(Server::leave_channel)
115 .add_request_handler(Server::send_channel_message)
116 .add_request_handler(Server::get_channel_messages);
117
118 Arc::new(server)
119 }
120
121 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
122 where
123 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
124 Fut: 'static + Send + Future<Output = tide::Result<()>>,
125 M: EnvelopedMessage,
126 {
127 let prev_handler = self.handlers.insert(
128 TypeId::of::<M>(),
129 Box::new(move |server, envelope| {
130 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
131 (handler)(server, *envelope).boxed()
132 }),
133 );
134 if prev_handler.is_some() {
135 panic!("registered a handler for the same message twice");
136 }
137 self
138 }
139
140 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
141 where
142 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
143 Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
144 M: RequestMessage,
145 {
146 self.add_message_handler(move |server, envelope| {
147 let receipt = envelope.receipt();
148 let response = (handler)(server.clone(), envelope);
149 async move {
150 match response.await {
151 Ok(response) => {
152 server.peer.respond(receipt, response)?;
153 Ok(())
154 }
155 Err(error) => {
156 server.peer.respond_with_error(
157 receipt,
158 proto::Error {
159 message: error.to_string(),
160 },
161 )?;
162 Err(error)
163 }
164 }
165 }
166 })
167 }
168
    /// Drives a single client connection until it ends, then signs it out.
    ///
    /// The connection is registered with the peer, its id is optionally
    /// reported through `send_connection_id`, it is recorded in the store for
    /// `user_id`, and incoming messages are then dispatched to the registered
    /// handlers. `addr` is only used in log output.
    pub fn handle_connection<E: Executor>(
        self: &Arc<Self>,
        connection: Connection,
        addr: String,
        user_id: UserId,
        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
        executor: E,
    ) -> impl Future<Output = ()> {
        let mut this = self.clone();
        async move {
            // The closure handed to the peer builds timers from the supplied executor.
            let (connection_id, handle_io, mut incoming_rx) = this
                .peer
                .add_connection(connection, {
                    let executor = executor.clone();
                    move |duration| {
                        let timer = executor.timer(duration);
                        async move {
                            timer.await;
                        }
                    }
                })
                .await;

            // A failure to report the connection id is deliberately ignored.
            if let Some(send_connection_id) = send_connection_id.as_mut() {
                let _ = send_connection_id.send(connection_id).await;
            }

            this.state_mut().add_connection(connection_id, user_id);
            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
                log::error!("error updating contacts for {:?}: {}", user_id, err);
            }

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.next().fuse();
                futures::pin_mut!(next_message);
                // `select_biased!` polls its branches in the order written, so the
                // I/O future is checked before the next message is taken.
                futures::select_biased! {
                    result = handle_io => {
                        if let Err(err) = result {
                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
                        }
                        break;
                    }
                    message = next_message => {
                        if let Some(message) = message {
                            let start_time = Instant::now();
                            let type_name = message.payload_type_name();
                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                let notifications = this.notifications.clone();
                                let is_background = message.is_background();
                                let handle_message = (handler)(this.clone(), message);
                                let handle_message = async move {
                                    if let Err(err) = handle_message.await {
                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
                                    } else {
                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
                                    }
                                    // Signal that this message has been handled, whether or not it succeeded.
                                    if let Some(mut notifications) = notifications {
                                        let _ = notifications.send(()).await;
                                    }
                                };
                                // Background messages are spawned on the executor; all others are
                                // awaited here, before the loop takes the next message.
                                if is_background {
                                    executor.spawn_detached(handle_message);
                                } else {
                                    handle_message.await;
                                }
                            } else {
                                log::warn!("unhandled message: {}", type_name);
                            }
                        } else {
                            // The incoming stream ended.
                            log::info!("rpc connection closed {:?}", addr);
                            break;
                        }
                    }
                }
            }

            if let Err(err) = this.sign_out(connection_id).await {
                log::error!("error signing out connection {:?} - {:?}", addr, err);
            }
        }
    }
253
254 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
255 self.peer.disconnect(connection_id);
256 let removed_connection = self.state_mut().remove_connection(connection_id)?;
257
258 for (project_id, project) in removed_connection.hosted_projects {
259 if let Some(share) = project.share {
260 broadcast(
261 connection_id,
262 share.guests.keys().copied().collect(),
263 |conn_id| {
264 self.peer
265 .send(conn_id, proto::UnshareProject { project_id })
266 },
267 )?;
268 }
269 }
270
271 for (project_id, peer_ids) in removed_connection.guest_project_ids {
272 broadcast(connection_id, peer_ids, |conn_id| {
273 self.peer.send(
274 conn_id,
275 proto::RemoveProjectCollaborator {
276 project_id,
277 peer_id: connection_id.0,
278 },
279 )
280 })?;
281 }
282
283 self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
284 Ok(())
285 }
286
287 async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
288 Ok(proto::Ack {})
289 }
290
291 async fn register_project(
292 mut self: Arc<Server>,
293 request: TypedEnvelope<proto::RegisterProject>,
294 ) -> tide::Result<proto::RegisterProjectResponse> {
295 let project_id = {
296 let mut state = self.state_mut();
297 let user_id = state.user_id_for_connection(request.sender_id)?;
298 state.register_project(request.sender_id, user_id)
299 };
300 Ok(proto::RegisterProjectResponse { project_id })
301 }
302
303 async fn unregister_project(
304 mut self: Arc<Server>,
305 request: TypedEnvelope<proto::UnregisterProject>,
306 ) -> tide::Result<()> {
307 let project = self
308 .state_mut()
309 .unregister_project(request.payload.project_id, request.sender_id)?;
310 self.update_contacts_for_users(project.authorized_user_ids().iter())?;
311 Ok(())
312 }
313
314 async fn share_project(
315 mut self: Arc<Server>,
316 request: TypedEnvelope<proto::ShareProject>,
317 ) -> tide::Result<proto::Ack> {
318 self.state_mut()
319 .share_project(request.payload.project_id, request.sender_id);
320 Ok(proto::Ack {})
321 }
322
323 async fn unshare_project(
324 mut self: Arc<Server>,
325 request: TypedEnvelope<proto::UnshareProject>,
326 ) -> tide::Result<()> {
327 let project_id = request.payload.project_id;
328 let project = self
329 .state_mut()
330 .unshare_project(project_id, request.sender_id)?;
331
332 broadcast(request.sender_id, project.connection_ids, |conn_id| {
333 self.peer
334 .send(conn_id, proto::UnshareProject { project_id })
335 })?;
336 self.update_contacts_for_users(&project.authorized_user_ids)?;
337 Ok(())
338 }
339
340 async fn join_project(
341 mut self: Arc<Server>,
342 request: TypedEnvelope<proto::JoinProject>,
343 ) -> tide::Result<proto::JoinProjectResponse> {
344 let project_id = request.payload.project_id;
345
346 let user_id = self.state().user_id_for_connection(request.sender_id)?;
347 let (response, connection_ids, contact_user_ids) = self
348 .state_mut()
349 .join_project(request.sender_id, user_id, project_id)
350 .and_then(|joined| {
351 let share = joined.project.share()?;
352 let peer_count = share.guests.len();
353 let mut collaborators = Vec::with_capacity(peer_count);
354 collaborators.push(proto::Collaborator {
355 peer_id: joined.project.host_connection_id.0,
356 replica_id: 0,
357 user_id: joined.project.host_user_id.to_proto(),
358 });
359 let worktrees = share
360 .worktrees
361 .iter()
362 .filter_map(|(id, shared_worktree)| {
363 let worktree = joined.project.worktrees.get(&id)?;
364 Some(proto::Worktree {
365 id: *id,
366 root_name: worktree.root_name.clone(),
367 entries: shared_worktree.entries.values().cloned().collect(),
368 diagnostic_summaries: shared_worktree
369 .diagnostic_summaries
370 .values()
371 .cloned()
372 .collect(),
373 visible: worktree.visible,
374 })
375 })
376 .collect();
377 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
378 if *peer_conn_id != request.sender_id {
379 collaborators.push(proto::Collaborator {
380 peer_id: peer_conn_id.0,
381 replica_id: *peer_replica_id as u32,
382 user_id: peer_user_id.to_proto(),
383 });
384 }
385 }
386 let response = proto::JoinProjectResponse {
387 worktrees,
388 replica_id: joined.replica_id as u32,
389 collaborators,
390 };
391 let connection_ids = joined.project.connection_ids();
392 let contact_user_ids = joined.project.authorized_user_ids();
393 Ok((response, connection_ids, contact_user_ids))
394 })?;
395
396 broadcast(request.sender_id, connection_ids, |conn_id| {
397 self.peer.send(
398 conn_id,
399 proto::AddProjectCollaborator {
400 project_id,
401 collaborator: Some(proto::Collaborator {
402 peer_id: request.sender_id.0,
403 replica_id: response.replica_id,
404 user_id: user_id.to_proto(),
405 }),
406 },
407 )
408 })?;
409 self.update_contacts_for_users(&contact_user_ids)?;
410 Ok(response)
411 }
412
413 async fn leave_project(
414 mut self: Arc<Server>,
415 request: TypedEnvelope<proto::LeaveProject>,
416 ) -> tide::Result<()> {
417 let sender_id = request.sender_id;
418 let project_id = request.payload.project_id;
419 let worktree = self.state_mut().leave_project(sender_id, project_id)?;
420
421 broadcast(sender_id, worktree.connection_ids, |conn_id| {
422 self.peer.send(
423 conn_id,
424 proto::RemoveProjectCollaborator {
425 project_id,
426 peer_id: sender_id.0,
427 },
428 )
429 })?;
430 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
431
432 Ok(())
433 }
434
435 async fn register_worktree(
436 mut self: Arc<Server>,
437 request: TypedEnvelope<proto::RegisterWorktree>,
438 ) -> tide::Result<proto::Ack> {
439 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
440
441 let mut contact_user_ids = HashSet::default();
442 contact_user_ids.insert(host_user_id);
443 for github_login in &request.payload.authorized_logins {
444 let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
445 contact_user_ids.insert(contact_user_id);
446 }
447
448 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
449 let guest_connection_ids;
450 {
451 let mut state = self.state_mut();
452 guest_connection_ids = state
453 .read_project(request.payload.project_id, request.sender_id)?
454 .guest_connection_ids();
455 state.register_worktree(
456 request.payload.project_id,
457 request.payload.worktree_id,
458 request.sender_id,
459 Worktree {
460 authorized_user_ids: contact_user_ids.clone(),
461 root_name: request.payload.root_name.clone(),
462 visible: request.payload.visible,
463 },
464 )?;
465 }
466 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
467 self.peer
468 .forward_send(request.sender_id, connection_id, request.payload.clone())
469 })?;
470 self.update_contacts_for_users(&contact_user_ids)?;
471 Ok(proto::Ack {})
472 }
473
474 async fn unregister_worktree(
475 mut self: Arc<Server>,
476 request: TypedEnvelope<proto::UnregisterWorktree>,
477 ) -> tide::Result<()> {
478 let project_id = request.payload.project_id;
479 let worktree_id = request.payload.worktree_id;
480 let (worktree, guest_connection_ids) =
481 self.state_mut()
482 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
483 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
484 self.peer.send(
485 conn_id,
486 proto::UnregisterWorktree {
487 project_id,
488 worktree_id,
489 },
490 )
491 })?;
492 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
493 Ok(())
494 }
495
496 async fn update_worktree(
497 mut self: Arc<Server>,
498 request: TypedEnvelope<proto::UpdateWorktree>,
499 ) -> tide::Result<proto::Ack> {
500 let connection_ids = self.state_mut().update_worktree(
501 request.sender_id,
502 request.payload.project_id,
503 request.payload.worktree_id,
504 &request.payload.removed_entries,
505 &request.payload.updated_entries,
506 )?;
507
508 broadcast(request.sender_id, connection_ids, |connection_id| {
509 self.peer
510 .forward_send(request.sender_id, connection_id, request.payload.clone())
511 })?;
512
513 Ok(proto::Ack {})
514 }
515
516 async fn update_diagnostic_summary(
517 mut self: Arc<Server>,
518 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
519 ) -> tide::Result<()> {
520 let summary = request
521 .payload
522 .summary
523 .clone()
524 .ok_or_else(|| anyhow!("invalid summary"))?;
525 let receiver_ids = self.state_mut().update_diagnostic_summary(
526 request.payload.project_id,
527 request.payload.worktree_id,
528 request.sender_id,
529 summary,
530 )?;
531
532 broadcast(request.sender_id, receiver_ids, |connection_id| {
533 self.peer
534 .forward_send(request.sender_id, connection_id, request.payload.clone())
535 })?;
536 Ok(())
537 }
538
539 async fn disk_based_diagnostics_updating(
540 self: Arc<Server>,
541 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
542 ) -> tide::Result<()> {
543 let receiver_ids = self
544 .state()
545 .project_connection_ids(request.payload.project_id, request.sender_id)?;
546 broadcast(request.sender_id, receiver_ids, |connection_id| {
547 self.peer
548 .forward_send(request.sender_id, connection_id, request.payload.clone())
549 })?;
550 Ok(())
551 }
552
553 async fn disk_based_diagnostics_updated(
554 self: Arc<Server>,
555 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
556 ) -> tide::Result<()> {
557 let receiver_ids = self
558 .state()
559 .project_connection_ids(request.payload.project_id, request.sender_id)?;
560 broadcast(request.sender_id, receiver_ids, |connection_id| {
561 self.peer
562 .forward_send(request.sender_id, connection_id, request.payload.clone())
563 })?;
564 Ok(())
565 }
566
567 async fn forward_project_request<T>(
568 self: Arc<Server>,
569 request: TypedEnvelope<T>,
570 ) -> tide::Result<T::Response>
571 where
572 T: EntityMessage + RequestMessage,
573 {
574 let host_connection_id = self
575 .state()
576 .read_project(request.payload.remote_entity_id(), request.sender_id)?
577 .host_connection_id;
578 Ok(self
579 .peer
580 .forward_request(request.sender_id, host_connection_id, request.payload)
581 .await?)
582 }
583
584 async fn close_buffer(
585 self: Arc<Server>,
586 request: TypedEnvelope<proto::CloseBuffer>,
587 ) -> tide::Result<()> {
588 let host_connection_id = self
589 .state()
590 .read_project(request.payload.project_id, request.sender_id)?
591 .host_connection_id;
592 self.peer
593 .forward_send(request.sender_id, host_connection_id, request.payload)?;
594 Ok(())
595 }
596
597 async fn save_buffer(
598 self: Arc<Server>,
599 request: TypedEnvelope<proto::SaveBuffer>,
600 ) -> tide::Result<proto::BufferSaved> {
601 let host;
602 let mut guests;
603 {
604 let state = self.state();
605 let project = state.read_project(request.payload.project_id, request.sender_id)?;
606 host = project.host_connection_id;
607 guests = project.guest_connection_ids()
608 }
609
610 let response = self
611 .peer
612 .forward_request(request.sender_id, host, request.payload.clone())
613 .await?;
614
615 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
616 broadcast(host, guests, |conn_id| {
617 self.peer.forward_send(host, conn_id, response.clone())
618 })?;
619
620 Ok(response)
621 }
622
623 async fn update_buffer(
624 self: Arc<Server>,
625 request: TypedEnvelope<proto::UpdateBuffer>,
626 ) -> tide::Result<proto::Ack> {
627 let receiver_ids = self
628 .state()
629 .project_connection_ids(request.payload.project_id, request.sender_id)?;
630 broadcast(request.sender_id, receiver_ids, |connection_id| {
631 self.peer
632 .forward_send(request.sender_id, connection_id, request.payload.clone())
633 })?;
634 Ok(proto::Ack {})
635 }
636
637 async fn update_buffer_file(
638 self: Arc<Server>,
639 request: TypedEnvelope<proto::UpdateBufferFile>,
640 ) -> tide::Result<()> {
641 let receiver_ids = self
642 .state()
643 .project_connection_ids(request.payload.project_id, request.sender_id)?;
644 broadcast(request.sender_id, receiver_ids, |connection_id| {
645 self.peer
646 .forward_send(request.sender_id, connection_id, request.payload.clone())
647 })?;
648 Ok(())
649 }
650
651 async fn buffer_reloaded(
652 self: Arc<Server>,
653 request: TypedEnvelope<proto::BufferReloaded>,
654 ) -> tide::Result<()> {
655 let receiver_ids = self
656 .state()
657 .project_connection_ids(request.payload.project_id, request.sender_id)?;
658 broadcast(request.sender_id, receiver_ids, |connection_id| {
659 self.peer
660 .forward_send(request.sender_id, connection_id, request.payload.clone())
661 })?;
662 Ok(())
663 }
664
665 async fn buffer_saved(
666 self: Arc<Server>,
667 request: TypedEnvelope<proto::BufferSaved>,
668 ) -> tide::Result<()> {
669 let receiver_ids = self
670 .state()
671 .project_connection_ids(request.payload.project_id, request.sender_id)?;
672 broadcast(request.sender_id, receiver_ids, |connection_id| {
673 self.peer
674 .forward_send(request.sender_id, connection_id, request.payload.clone())
675 })?;
676 Ok(())
677 }
678
679 async fn get_channels(
680 self: Arc<Server>,
681 request: TypedEnvelope<proto::GetChannels>,
682 ) -> tide::Result<proto::GetChannelsResponse> {
683 let user_id = self.state().user_id_for_connection(request.sender_id)?;
684 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
685 Ok(proto::GetChannelsResponse {
686 channels: channels
687 .into_iter()
688 .map(|chan| proto::Channel {
689 id: chan.id.to_proto(),
690 name: chan.name,
691 })
692 .collect(),
693 })
694 }
695
696 async fn get_users(
697 self: Arc<Server>,
698 request: TypedEnvelope<proto::GetUsers>,
699 ) -> tide::Result<proto::GetUsersResponse> {
700 let user_ids = request
701 .payload
702 .user_ids
703 .into_iter()
704 .map(UserId::from_proto)
705 .collect();
706 let users = self
707 .app_state
708 .db
709 .get_users_by_ids(user_ids)
710 .await?
711 .into_iter()
712 .map(|user| proto::User {
713 id: user.id.to_proto(),
714 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
715 github_login: user.github_login,
716 })
717 .collect();
718 Ok(proto::GetUsersResponse { users })
719 }
720
721 fn update_contacts_for_users<'a>(
722 self: &Arc<Server>,
723 user_ids: impl IntoIterator<Item = &'a UserId>,
724 ) -> anyhow::Result<()> {
725 let mut result = Ok(());
726 let state = self.state();
727 for user_id in user_ids {
728 let contacts = state.contacts_for_user(*user_id);
729 for connection_id in state.connection_ids_for_user(*user_id) {
730 if let Err(error) = self.peer.send(
731 connection_id,
732 proto::UpdateContacts {
733 contacts: contacts.clone(),
734 },
735 ) {
736 result = Err(error);
737 }
738 }
739 }
740 result
741 }
742
743 async fn join_channel(
744 mut self: Arc<Self>,
745 request: TypedEnvelope<proto::JoinChannel>,
746 ) -> tide::Result<proto::JoinChannelResponse> {
747 let user_id = self.state().user_id_for_connection(request.sender_id)?;
748 let channel_id = ChannelId::from_proto(request.payload.channel_id);
749 if !self
750 .app_state
751 .db
752 .can_user_access_channel(user_id, channel_id)
753 .await?
754 {
755 Err(anyhow!("access denied"))?;
756 }
757
758 self.state_mut().join_channel(request.sender_id, channel_id);
759 let messages = self
760 .app_state
761 .db
762 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
763 .await?
764 .into_iter()
765 .map(|msg| proto::ChannelMessage {
766 id: msg.id.to_proto(),
767 body: msg.body,
768 timestamp: msg.sent_at.unix_timestamp() as u64,
769 sender_id: msg.sender_id.to_proto(),
770 nonce: Some(msg.nonce.as_u128().into()),
771 })
772 .collect::<Vec<_>>();
773 Ok(proto::JoinChannelResponse {
774 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
775 messages,
776 })
777 }
778
779 async fn leave_channel(
780 mut self: Arc<Self>,
781 request: TypedEnvelope<proto::LeaveChannel>,
782 ) -> tide::Result<()> {
783 let user_id = self.state().user_id_for_connection(request.sender_id)?;
784 let channel_id = ChannelId::from_proto(request.payload.channel_id);
785 if !self
786 .app_state
787 .db
788 .can_user_access_channel(user_id, channel_id)
789 .await?
790 {
791 Err(anyhow!("access denied"))?;
792 }
793
794 self.state_mut()
795 .leave_channel(request.sender_id, channel_id);
796
797 Ok(())
798 }
799
800 async fn send_channel_message(
801 self: Arc<Self>,
802 request: TypedEnvelope<proto::SendChannelMessage>,
803 ) -> tide::Result<proto::SendChannelMessageResponse> {
804 let channel_id = ChannelId::from_proto(request.payload.channel_id);
805 let user_id;
806 let connection_ids;
807 {
808 let state = self.state();
809 user_id = state.user_id_for_connection(request.sender_id)?;
810 connection_ids = state.channel_connection_ids(channel_id)?;
811 }
812
813 // Validate the message body.
814 let body = request.payload.body.trim().to_string();
815 if body.len() > MAX_MESSAGE_LEN {
816 return Err(anyhow!("message is too long"))?;
817 }
818 if body.is_empty() {
819 return Err(anyhow!("message can't be blank"))?;
820 }
821
822 let timestamp = OffsetDateTime::now_utc();
823 let nonce = request
824 .payload
825 .nonce
826 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
827
828 let message_id = self
829 .app_state
830 .db
831 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
832 .await?
833 .to_proto();
834 let message = proto::ChannelMessage {
835 sender_id: user_id.to_proto(),
836 id: message_id,
837 body,
838 timestamp: timestamp.unix_timestamp() as u64,
839 nonce: Some(nonce),
840 };
841 broadcast(request.sender_id, connection_ids, |conn_id| {
842 self.peer.send(
843 conn_id,
844 proto::ChannelMessageSent {
845 channel_id: channel_id.to_proto(),
846 message: Some(message.clone()),
847 },
848 )
849 })?;
850 Ok(proto::SendChannelMessageResponse {
851 message: Some(message),
852 })
853 }
854
855 async fn get_channel_messages(
856 self: Arc<Self>,
857 request: TypedEnvelope<proto::GetChannelMessages>,
858 ) -> tide::Result<proto::GetChannelMessagesResponse> {
859 let user_id = self.state().user_id_for_connection(request.sender_id)?;
860 let channel_id = ChannelId::from_proto(request.payload.channel_id);
861 if !self
862 .app_state
863 .db
864 .can_user_access_channel(user_id, channel_id)
865 .await?
866 {
867 Err(anyhow!("access denied"))?;
868 }
869
870 let messages = self
871 .app_state
872 .db
873 .get_channel_messages(
874 channel_id,
875 MESSAGE_COUNT_PER_PAGE,
876 Some(MessageId::from_proto(request.payload.before_message_id)),
877 )
878 .await?
879 .into_iter()
880 .map(|msg| proto::ChannelMessage {
881 id: msg.id.to_proto(),
882 body: msg.body,
883 timestamp: msg.sent_at.unix_timestamp() as u64,
884 sender_id: msg.sender_id.to_proto(),
885 nonce: Some(msg.nonce.as_u128().into()),
886 })
887 .collect::<Vec<_>>();
888
889 Ok(proto::GetChannelMessagesResponse {
890 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
891 messages,
892 })
893 }
894
895 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
896 self.store.read()
897 }
898
899 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
900 self.store.write()
901 }
902}
903
904impl Executor for RealExecutor {
905 type Timer = Timer;
906
907 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
908 task::spawn(future);
909 }
910
911 fn timer(&self, duration: Duration) -> Self::Timer {
912 Timer::after(duration)
913 }
914}
915
916fn broadcast<F>(
917 sender_id: ConnectionId,
918 receiver_ids: Vec<ConnectionId>,
919 mut f: F,
920) -> anyhow::Result<()>
921where
922 F: FnMut(ConnectionId) -> anyhow::Result<()>,
923{
924 let mut result = Ok(());
925 for receiver_id in receiver_ids {
926 if receiver_id != sender_id {
927 if let Err(error) = f(receiver_id) {
928 if result.is_ok() {
929 result = Err(error);
930 }
931 }
932 }
933 }
934 result
935}
936
/// Registers the `/rpc` GET route, which upgrades the request to a WebSocket
/// and hands the resulting stream to a [`Server`] built from the app's state
/// and `rpc`.
///
/// The route answers `UpgradeRequired` unless the request asks for a WebSocket
/// upgrade and its `X-Zed-Protocol-Version` header parses to
/// `rpc::PROTOCOL_VERSION`.
pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
    let server = Server::new(app.state().clone(), rpc.clone(), None);
    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
        let server = server.clone();
        async move {
            // Fixed GUID appended to the client's key when computing the accept
            // hash (WebSocket opening handshake).
            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
            // A missing or unparseable version header yields `None`, which is rejected below.
            let client_protocol_version: Option<u32> = request
                .header("X-Zed-Protocol-Version")
                .and_then(|v| v.as_str().parse().ok());

            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
                return Ok(Response::new(StatusCode::UpgradeRequired));
            }

            let header = match request.header("Sec-Websocket-Key") {
                Some(h) => h.as_str(),
                None => return Err(anyhow!("expected sec-websocket-key"))?,
            };

            let user_id = process_auth_header(&request).await?;

            let mut response = Response::new(StatusCode::SwitchingProtocols);
            response.insert_header(UPGRADE, "websocket");
            response.insert_header(CONNECTION, "Upgrade");
            // Accept value: base64 of SHA-1 over the client's key followed by the GUID.
            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
            response.insert_header("Sec-Websocket-Version", "13");

            let http_res: &mut tide::http::Response = response.as_mut();
            let upgrade_receiver = http_res.recv_upgrade().await;
            let addr = request.remote().unwrap_or("unknown").to_string();
            // The connection is served on its own task; the response is returned right away.
            task::spawn(async move {
                if let Some(stream) = upgrade_receiver.await {
                    server
                        .handle_connection(
                            Connection::new(
                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
                            ),
                            addr,
                            user_id,
                            None,
                            RealExecutor,
                        )
                        .await;
                }
            });

            Ok(response)
        }
    });
}
992
993fn header_contains_ignore_case<T>(
994 request: &tide::Request<T>,
995 header_name: HeaderName,
996 value: &str,
997) -> bool {
998 request
999 .header(header_name)
1000 .map(|h| {
1001 h.as_str()
1002 .split(',')
1003 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1004 })
1005 .unwrap_or(false)
1006}
1007
1008#[cfg(test)]
1009mod tests {
1010 use super::*;
1011 use crate::{
1012 auth,
1013 db::{tests::TestDb, UserId},
1014 github, AppState, Config,
1015 };
1016 use ::rpc::Peer;
1017 use client::{
1018 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1019 EstablishConnectionError, UserStore,
1020 };
1021 use collections::BTreeMap;
1022 use editor::{
1023 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, MultiBuffer,
1024 Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1025 };
1026 use gpui::{executor, ModelHandle, TestAppContext};
1027 use language::{
1028 tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
1029 LanguageRegistry, LanguageServerConfig, Point, ToLspPosition,
1030 };
1031 use lsp;
1032 use parking_lot::Mutex;
1033 use postage::{barrier, watch};
1034 use project::{
1035 fs::{FakeFs, Fs as _},
1036 search::SearchQuery,
1037 DiagnosticSummary, Project, ProjectPath,
1038 };
1039 use rand::prelude::*;
1040 use rpc::PeerId;
1041 use serde_json::json;
1042 use sqlx::types::time::OffsetDateTime;
1043 use std::{
1044 cell::Cell,
1045 env,
1046 ops::Deref,
1047 path::{Path, PathBuf},
1048 rc::Rc,
1049 sync::{
1050 atomic::{AtomicBool, Ordering::SeqCst},
1051 Arc,
1052 },
1053 time::Duration,
1054 };
1055 use workspace::{Settings, Workspace, WorkspaceParams};
1056
1057 #[cfg(test)]
1058 #[ctor::ctor]
1059 fn init_logger() {
1060 if std::env::var("RUST_LOG").is_ok() {
1061 env_logger::init();
1062 }
1063 }
1064
    /// End-to-end check of project sharing between two clients.
    ///
    /// Client A shares a local project rooted at `/a`, client B joins it, and the
    /// test verifies that:
    /// - each side sees the other in its collaborator map,
    /// - a buffer opened by B is reported as open on A's project,
    /// - text typed into B's editor shows up in A's buffer,
    /// - A's collaborator map becomes empty after B drops its project.
    #[gpui::test(iterations = 10)]
    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A. The `.zed.toml` file names `user_b` as a
        // collaborator.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        // Wait for the local worktree scan to finish before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // B must already know A as a collaborator; remember B's replica id so it
        // can be compared with what A observes.
        let replica_id_b = project_b.read_with(cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Wait until A lists B as a collaborator with the matching replica id and login.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        // Wrap B's buffer in a singleton `MultiBuffer` (shadowing `buffer_b`) so it
        // can be handed to an editor below.
        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
        buffer_b.read_with(cx_b, |buf, cx| {
            assert_eq!(buf.read(cx).text(), "b-contents")
        });
        // The host's project must report the buffer as open before A opens it explicitly.
        project_a.read_with(cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| {
            Editor::for_buffer(
                buffer_b,
                None,
                watch::channel_with(Settings::test(cx)).1,
                cx,
            )
        });

        // TODO: the selection-set assertion below is currently disabled.
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO: only the editor drop is active; the selection-set assertion that
        // should follow it is currently disabled.
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1197
    /// Verifies the share / unshare / re-share cycle of a project.
    ///
    /// Client A shares a project and client B joins and opens a buffer. After A
    /// unshares, B's project must become read-only and A's worktree must no longer
    /// be shared. After A shares again, B must be able to join once more under the
    /// same project id and open a buffer.
    #[gpui::test(iterations = 10)]
    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        // Wait for the local worktree scan to finish before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        // Opening a buffer proves the guest can use the shared project.
        project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Unshare the project as client A
        project_a
            .update(cx_a, |project, cx| project.unshare(cx))
            .await
            .unwrap();
        // The guest's project must turn read-only, and the host's worktree must
        // report that it is no longer shared.
        project_b
            .condition(cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        cx_b.update(|_| {
            drop(project_b);
        });

        // Share the project again and ensure guests can still join.
        project_a
            .update(cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join with the same `project_id` that was used before the unshare.
        let project_b2 = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b2
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1293
    /// Verifies that edits, saves and host file-system changes reach every
    /// participant of a shared project (host A plus guests B and C).
    ///
    /// The test covers, in order:
    /// 1. concurrent edits by both guests and a later edit by the host converging
    ///    to the same text on all three buffers,
    /// 2. a save issued by guest B while the host edits, after which the file on
    ///    disk contains the host's edit and no buffer is dirty,
    /// 3. renames and a new file on the host's file system showing up in all
    ///    three worktrees and in the path of the open buffers.
    #[gpui::test(iterations = 10)]
    async fn test_propagate_saves_and_fs_changes(
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
        cx_c: &mut TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;
        let client_c = server.create_client(cx_c, "user_c").await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        // Wait for the local worktree scan to finish before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that worktree as clients B and C.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let project_c = Project::remote(
            project_id,
            client_c.clone(),
            client_c.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_c.to_async(),
        )
        .await
        .unwrap();
        // Each guest project is expected to expose at least one worktree; take the first.
        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());

        // Open and edit a buffer as both guests B and C.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        let buffer_c = project_c
            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        // Both guests insert at offset 0 without waiting for each other.
        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
        buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));

        // Open and edit that buffer as the host.
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();

        // The host must first observe both guest edits; the expected text places
        // C's insertion ahead of B's.
        buffer_a
            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        // Append the host's own text at the end of the buffer.
        buffer_a.update(cx_a, |buf, cx| {
            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        // The save is started first and only awaited after the host's edit.
        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
        save_b.await.unwrap();
        // The file on disk is expected to include the host's concurrent edit.
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        // A and B are checked immediately; C is given time to learn about the save.
        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;

        // Make changes on host's file system, see those changes on guest worktrees.
        fs.rename(
            "/a/file1".as_ref(),
            "/a/file1-renamed".as_ref(),
            Default::default(),
        )
        .await
        .unwrap();

        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into()).await;

        // All three worktrees must converge on the same list of paths.
        worktree_a
            .condition(&cx_a, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_b
            .condition(&cx_b, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;

        // Ensure buffer files are updated as well.
        buffer_a
            .condition(&cx_a, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_c
            .condition(&cx_c, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
    }
1470
    /// Verifies that a guest's buffer never reports a conflict across an
    /// edit / save / edit sequence.
    ///
    /// Guest B edits `a.txt` (dirty, no conflict), saves it (clean, no conflict),
    /// then edits again (dirty, still no conflict).
    #[gpui::test(iterations = 10)]
    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        // NOTE: `.zed.toml` also lists "user_c", but this test only creates
        // clients for user_a and user_b.
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", true, cx)
            })
            .await
            .unwrap();
        // Wait for the local worktree scan to finish before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open a buffer as client B
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // First edit: the buffer becomes dirty but must not be in conflict.
        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Save, then wait until the buffer is reported clean again.
        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
        buffer_b
            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
            .await;
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(!buf.has_conflict());
        });

        // Second edit after the save: dirty again, still without a conflict.
        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });
    }
1552
    /// Verifies that a guest's clean buffer picks up a change written directly
    /// to the host's file system.
    ///
    /// After `/dir/a.txt` is overwritten through the fake file system, guest B's
    /// buffer must show the new text, stay non-dirty and report no conflict.
    #[gpui::test(iterations = 10)]
    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        // NOTE: `.zed.toml` also lists "user_c", but this test only creates
        // clients for user_a and user_b.
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", true, cx)
            })
            .await
            .unwrap();
        // Wait for the local worktree scan to finish before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        // Bind the guest's worktree handle for the rest of the test; it is not
        // otherwise used.
        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());

        // Open a buffer as client B
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(!buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Overwrite the file on the host's file system, bypassing any buffer.
        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
            .await
            .unwrap();
        // The guest's buffer must show the new text without becoming dirty.
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.text() == "new contents" && !buf.is_dirty()
            })
            .await;
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(!buf.has_conflict());
        });
    }
1631
    /// Verifies that host edits made while a guest is still opening the same
    /// buffer are not lost.
    ///
    /// Guest B's `open_buffer` call is spawned on the background executor, host A
    /// then applies two edits separated by simulated random delays, and finally
    /// B's buffer must reach the same text as A's.
    #[gpui::test(iterations = 10)]
    async fn test_editing_while_guest_opens_buffer(
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", true, cx)
            })
            .await
            .unwrap();
        // Wait for the local worktree scan to finish before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open a buffer as client A
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Start opening the same buffer as client B
        // The open is spawned, not awaited, so it is still in flight during the
        // host edits below.
        let buffer_b = cx_b
            .background()
            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));

        // Edit the buffer as client A while client B is still opening it.
        cx_b.background().simulate_random_delay().await;
        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
        cx_b.background().simulate_random_delay().await;
        buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));

        // Snapshot the host's text, finish the guest's open, and wait for the
        // guest's buffer to match.
        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
        let buffer_b = buffer_b.await.unwrap();
        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
    }
1710
1711 #[gpui::test(iterations = 10)]
1712 async fn test_leaving_worktree_while_opening_buffer(
1713 cx_a: &mut TestAppContext,
1714 cx_b: &mut TestAppContext,
1715 ) {
1716 cx_a.foreground().forbid_parking();
1717 let lang_registry = Arc::new(LanguageRegistry::new());
1718 let fs = FakeFs::new(cx_a.background());
1719
1720 // Connect to a server as 2 clients.
1721 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1722 let client_a = server.create_client(cx_a, "user_a").await;
1723 let client_b = server.create_client(cx_b, "user_b").await;
1724
1725 // Share a project as client A
1726 fs.insert_tree(
1727 "/dir",
1728 json!({
1729 ".zed.toml": r#"collaborators = ["user_b"]"#,
1730 "a.txt": "a-contents",
1731 }),
1732 )
1733 .await;
1734 let project_a = cx_a.update(|cx| {
1735 Project::local(
1736 client_a.clone(),
1737 client_a.user_store.clone(),
1738 lang_registry.clone(),
1739 fs.clone(),
1740 cx,
1741 )
1742 });
1743 let (worktree_a, _) = project_a
1744 .update(cx_a, |p, cx| {
1745 p.find_or_create_local_worktree("/dir", true, cx)
1746 })
1747 .await
1748 .unwrap();
1749 worktree_a
1750 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1751 .await;
1752 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1753 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1754 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1755
1756 // Join that project as client B
1757 let project_b = Project::remote(
1758 project_id,
1759 client_b.clone(),
1760 client_b.user_store.clone(),
1761 lang_registry.clone(),
1762 fs.clone(),
1763 &mut cx_b.to_async(),
1764 )
1765 .await
1766 .unwrap();
1767
1768 // See that a guest has joined as client A.
1769 project_a
1770 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1771 .await;
1772
1773 // Begin opening a buffer as client B, but leave the project before the open completes.
1774 let buffer_b = cx_b
1775 .background()
1776 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1777 cx_b.update(|_| drop(project_b));
1778 drop(buffer_b);
1779
1780 // See that the guest has left.
1781 project_a
1782 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1783 .await;
1784 }
1785
1786 #[gpui::test(iterations = 10)]
1787 async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1788 cx_a.foreground().forbid_parking();
1789 let lang_registry = Arc::new(LanguageRegistry::new());
1790 let fs = FakeFs::new(cx_a.background());
1791
1792 // Connect to a server as 2 clients.
1793 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1794 let client_a = server.create_client(cx_a, "user_a").await;
1795 let client_b = server.create_client(cx_b, "user_b").await;
1796
1797 // Share a project as client A
1798 fs.insert_tree(
1799 "/a",
1800 json!({
1801 ".zed.toml": r#"collaborators = ["user_b"]"#,
1802 "a.txt": "a-contents",
1803 "b.txt": "b-contents",
1804 }),
1805 )
1806 .await;
1807 let project_a = cx_a.update(|cx| {
1808 Project::local(
1809 client_a.clone(),
1810 client_a.user_store.clone(),
1811 lang_registry.clone(),
1812 fs.clone(),
1813 cx,
1814 )
1815 });
1816 let (worktree_a, _) = project_a
1817 .update(cx_a, |p, cx| {
1818 p.find_or_create_local_worktree("/a", true, cx)
1819 })
1820 .await
1821 .unwrap();
1822 worktree_a
1823 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1824 .await;
1825 let project_id = project_a
1826 .update(cx_a, |project, _| project.next_remote_id())
1827 .await;
1828 project_a
1829 .update(cx_a, |project, cx| project.share(cx))
1830 .await
1831 .unwrap();
1832
1833 // Join that project as client B
1834 let _project_b = Project::remote(
1835 project_id,
1836 client_b.clone(),
1837 client_b.user_store.clone(),
1838 lang_registry.clone(),
1839 fs.clone(),
1840 &mut cx_b.to_async(),
1841 )
1842 .await
1843 .unwrap();
1844
1845 // Client A sees that a guest has joined.
1846 project_a
1847 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1848 .await;
1849
1850 // Drop client B's connection and ensure client A observes client B leaving the project.
1851 client_b.disconnect(&cx_b.to_async()).unwrap();
1852 project_a
1853 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1854 .await;
1855
1856 // Rejoin the project as client B
1857 let _project_b = Project::remote(
1858 project_id,
1859 client_b.clone(),
1860 client_b.user_store.clone(),
1861 lang_registry.clone(),
1862 fs.clone(),
1863 &mut cx_b.to_async(),
1864 )
1865 .await
1866 .unwrap();
1867
1868 // Client A sees that a guest has re-joined.
1869 project_a
1870 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1871 .await;
1872
1873 // Simulate connection loss for client B and ensure client A observes client B leaving the project.
1874 server.disconnect_client(client_b.current_user_id(cx_b));
1875 cx_a.foreground().advance_clock(Duration::from_secs(3));
1876 project_a
1877 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1878 .await;
1879 }
1880
1881 #[gpui::test(iterations = 10)]
1882 async fn test_collaborating_with_diagnostics(
1883 cx_a: &mut TestAppContext,
1884 cx_b: &mut TestAppContext,
1885 ) {
1886 cx_a.foreground().forbid_parking();
1887 let mut lang_registry = Arc::new(LanguageRegistry::new());
1888 let fs = FakeFs::new(cx_a.background());
1889
1890 // Set up a fake language server.
1891 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1892 Arc::get_mut(&mut lang_registry)
1893 .unwrap()
1894 .add(Arc::new(Language::new(
1895 LanguageConfig {
1896 name: "Rust".into(),
1897 path_suffixes: vec!["rs".to_string()],
1898 language_server: Some(language_server_config),
1899 ..Default::default()
1900 },
1901 Some(tree_sitter_rust::language()),
1902 )));
1903
1904 // Connect to a server as 2 clients.
1905 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1906 let client_a = server.create_client(cx_a, "user_a").await;
1907 let client_b = server.create_client(cx_b, "user_b").await;
1908
1909 // Share a project as client A
1910 fs.insert_tree(
1911 "/a",
1912 json!({
1913 ".zed.toml": r#"collaborators = ["user_b"]"#,
1914 "a.rs": "let one = two",
1915 "other.rs": "",
1916 }),
1917 )
1918 .await;
1919 let project_a = cx_a.update(|cx| {
1920 Project::local(
1921 client_a.clone(),
1922 client_a.user_store.clone(),
1923 lang_registry.clone(),
1924 fs.clone(),
1925 cx,
1926 )
1927 });
1928 let (worktree_a, _) = project_a
1929 .update(cx_a, |p, cx| {
1930 p.find_or_create_local_worktree("/a", true, cx)
1931 })
1932 .await
1933 .unwrap();
1934 worktree_a
1935 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1936 .await;
1937 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1938 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1939 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1940
1941 // Cause the language server to start.
1942 let _ = cx_a
1943 .background()
1944 .spawn(project_a.update(cx_a, |project, cx| {
1945 project.open_buffer(
1946 ProjectPath {
1947 worktree_id,
1948 path: Path::new("other.rs").into(),
1949 },
1950 cx,
1951 )
1952 }))
1953 .await
1954 .unwrap();
1955
1956 // Simulate a language server reporting errors for a file.
1957 let mut fake_language_server = fake_language_servers.next().await.unwrap();
1958 fake_language_server
1959 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1960 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1961 version: None,
1962 diagnostics: vec![lsp::Diagnostic {
1963 severity: Some(lsp::DiagnosticSeverity::ERROR),
1964 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1965 message: "message 1".to_string(),
1966 ..Default::default()
1967 }],
1968 })
1969 .await;
1970
1971 // Wait for server to see the diagnostics update.
1972 server
1973 .condition(|store| {
1974 let worktree = store
1975 .project(project_id)
1976 .unwrap()
1977 .share
1978 .as_ref()
1979 .unwrap()
1980 .worktrees
1981 .get(&worktree_id.to_proto())
1982 .unwrap();
1983
1984 !worktree.diagnostic_summaries.is_empty()
1985 })
1986 .await;
1987
1988 // Join the worktree as client B.
1989 let project_b = Project::remote(
1990 project_id,
1991 client_b.clone(),
1992 client_b.user_store.clone(),
1993 lang_registry.clone(),
1994 fs.clone(),
1995 &mut cx_b.to_async(),
1996 )
1997 .await
1998 .unwrap();
1999
2000 project_b.read_with(cx_b, |project, cx| {
2001 assert_eq!(
2002 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2003 &[(
2004 ProjectPath {
2005 worktree_id,
2006 path: Arc::from(Path::new("a.rs")),
2007 },
2008 DiagnosticSummary {
2009 error_count: 1,
2010 warning_count: 0,
2011 ..Default::default()
2012 },
2013 )]
2014 )
2015 });
2016
2017 // Simulate a language server reporting more errors for a file.
2018 fake_language_server
2019 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2020 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2021 version: None,
2022 diagnostics: vec![
2023 lsp::Diagnostic {
2024 severity: Some(lsp::DiagnosticSeverity::ERROR),
2025 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2026 message: "message 1".to_string(),
2027 ..Default::default()
2028 },
2029 lsp::Diagnostic {
2030 severity: Some(lsp::DiagnosticSeverity::WARNING),
2031 range: lsp::Range::new(
2032 lsp::Position::new(0, 10),
2033 lsp::Position::new(0, 13),
2034 ),
2035 message: "message 2".to_string(),
2036 ..Default::default()
2037 },
2038 ],
2039 })
2040 .await;
2041
2042 // Client b gets the updated summaries
2043 project_b
2044 .condition(&cx_b, |project, cx| {
2045 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2046 == &[(
2047 ProjectPath {
2048 worktree_id,
2049 path: Arc::from(Path::new("a.rs")),
2050 },
2051 DiagnosticSummary {
2052 error_count: 1,
2053 warning_count: 1,
2054 ..Default::default()
2055 },
2056 )]
2057 })
2058 .await;
2059
2060 // Open the file with the errors on client B. They should be present.
2061 let buffer_b = cx_b
2062 .background()
2063 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2064 .await
2065 .unwrap();
2066
2067 buffer_b.read_with(cx_b, |buffer, _| {
2068 assert_eq!(
2069 buffer
2070 .snapshot()
2071 .diagnostics_in_range::<_, Point>(0..buffer.len())
2072 .map(|entry| entry)
2073 .collect::<Vec<_>>(),
2074 &[
2075 DiagnosticEntry {
2076 range: Point::new(0, 4)..Point::new(0, 7),
2077 diagnostic: Diagnostic {
2078 group_id: 0,
2079 message: "message 1".to_string(),
2080 severity: lsp::DiagnosticSeverity::ERROR,
2081 is_primary: true,
2082 ..Default::default()
2083 }
2084 },
2085 DiagnosticEntry {
2086 range: Point::new(0, 10)..Point::new(0, 13),
2087 diagnostic: Diagnostic {
2088 group_id: 1,
2089 severity: lsp::DiagnosticSeverity::WARNING,
2090 message: "message 2".to_string(),
2091 is_primary: true,
2092 ..Default::default()
2093 }
2094 }
2095 ]
2096 );
2097 });
2098 }
2099
2100 #[gpui::test(iterations = 10)]
2101 async fn test_collaborating_with_completion(
2102 cx_a: &mut TestAppContext,
2103 cx_b: &mut TestAppContext,
2104 ) {
2105 cx_a.foreground().forbid_parking();
2106 let mut lang_registry = Arc::new(LanguageRegistry::new());
2107 let fs = FakeFs::new(cx_a.background());
2108
2109 // Set up a fake language server.
2110 let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2111 language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2112 completion_provider: Some(lsp::CompletionOptions {
2113 trigger_characters: Some(vec![".".to_string()]),
2114 ..Default::default()
2115 }),
2116 ..Default::default()
2117 });
2118 Arc::get_mut(&mut lang_registry)
2119 .unwrap()
2120 .add(Arc::new(Language::new(
2121 LanguageConfig {
2122 name: "Rust".into(),
2123 path_suffixes: vec!["rs".to_string()],
2124 language_server: Some(language_server_config),
2125 ..Default::default()
2126 },
2127 Some(tree_sitter_rust::language()),
2128 )));
2129
2130 // Connect to a server as 2 clients.
2131 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2132 let client_a = server.create_client(cx_a, "user_a").await;
2133 let client_b = server.create_client(cx_b, "user_b").await;
2134
2135 // Share a project as client A
2136 fs.insert_tree(
2137 "/a",
2138 json!({
2139 ".zed.toml": r#"collaborators = ["user_b"]"#,
2140 "main.rs": "fn main() { a }",
2141 "other.rs": "",
2142 }),
2143 )
2144 .await;
2145 let project_a = cx_a.update(|cx| {
2146 Project::local(
2147 client_a.clone(),
2148 client_a.user_store.clone(),
2149 lang_registry.clone(),
2150 fs.clone(),
2151 cx,
2152 )
2153 });
2154 let (worktree_a, _) = project_a
2155 .update(cx_a, |p, cx| {
2156 p.find_or_create_local_worktree("/a", true, cx)
2157 })
2158 .await
2159 .unwrap();
2160 worktree_a
2161 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2162 .await;
2163 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2164 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2165 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2166
2167 // Join the worktree as client B.
2168 let project_b = Project::remote(
2169 project_id,
2170 client_b.clone(),
2171 client_b.user_store.clone(),
2172 lang_registry.clone(),
2173 fs.clone(),
2174 &mut cx_b.to_async(),
2175 )
2176 .await
2177 .unwrap();
2178
2179 // Open a file in an editor as the guest.
2180 let buffer_b = project_b
2181 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2182 .await
2183 .unwrap();
2184 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2185 let editor_b = cx_b.add_view(window_b, |cx| {
2186 Editor::for_buffer(
2187 cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2188 Some(project_b.clone()),
2189 watch::channel_with(Settings::test(cx)).1,
2190 cx,
2191 )
2192 });
2193
2194 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2195 buffer_b
2196 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2197 .await;
2198
2199 // Type a completion trigger character as the guest.
2200 editor_b.update(cx_b, |editor, cx| {
2201 editor.select_ranges([13..13], None, cx);
2202 editor.handle_input(&Input(".".into()), cx);
2203 cx.focus(&editor_b);
2204 });
2205
2206 // Receive a completion request as the host's language server.
2207 // Return some completions from the host's language server.
2208 cx_a.foreground().start_waiting();
2209 fake_language_server
2210 .handle_request::<lsp::request::Completion, _>(|params, _| {
2211 assert_eq!(
2212 params.text_document_position.text_document.uri,
2213 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2214 );
2215 assert_eq!(
2216 params.text_document_position.position,
2217 lsp::Position::new(0, 14),
2218 );
2219
2220 Some(lsp::CompletionResponse::Array(vec![
2221 lsp::CompletionItem {
2222 label: "first_method(…)".into(),
2223 detail: Some("fn(&mut self, B) -> C".into()),
2224 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2225 new_text: "first_method($1)".to_string(),
2226 range: lsp::Range::new(
2227 lsp::Position::new(0, 14),
2228 lsp::Position::new(0, 14),
2229 ),
2230 })),
2231 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2232 ..Default::default()
2233 },
2234 lsp::CompletionItem {
2235 label: "second_method(…)".into(),
2236 detail: Some("fn(&mut self, C) -> D<E>".into()),
2237 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2238 new_text: "second_method()".to_string(),
2239 range: lsp::Range::new(
2240 lsp::Position::new(0, 14),
2241 lsp::Position::new(0, 14),
2242 ),
2243 })),
2244 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2245 ..Default::default()
2246 },
2247 ]))
2248 })
2249 .next()
2250 .await
2251 .unwrap();
2252 cx_a.foreground().finish_waiting();
2253
2254 // Open the buffer on the host.
2255 let buffer_a = project_a
2256 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2257 .await
2258 .unwrap();
2259 buffer_a
2260 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2261 .await;
2262
2263 // Confirm a completion on the guest.
2264 editor_b
2265 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2266 .await;
2267 editor_b.update(cx_b, |editor, cx| {
2268 editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2269 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2270 });
2271
2272 // Return a resolved completion from the host's language server.
2273 // The resolved completion has an additional text edit.
2274 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2275 |params, _| {
2276 assert_eq!(params.label, "first_method(…)");
2277 lsp::CompletionItem {
2278 label: "first_method(…)".into(),
2279 detail: Some("fn(&mut self, B) -> C".into()),
2280 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2281 new_text: "first_method($1)".to_string(),
2282 range: lsp::Range::new(
2283 lsp::Position::new(0, 14),
2284 lsp::Position::new(0, 14),
2285 ),
2286 })),
2287 additional_text_edits: Some(vec![lsp::TextEdit {
2288 new_text: "use d::SomeTrait;\n".to_string(),
2289 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2290 }]),
2291 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2292 ..Default::default()
2293 }
2294 },
2295 );
2296
2297 // The additional edit is applied.
2298 buffer_a
2299 .condition(&cx_a, |buffer, _| {
2300 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2301 })
2302 .await;
2303 buffer_b
2304 .condition(&cx_b, |buffer, _| {
2305 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2306 })
2307 .await;
2308 }
2309
2310 #[gpui::test(iterations = 10)]
2311 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2312 cx_a.foreground().forbid_parking();
2313 let mut lang_registry = Arc::new(LanguageRegistry::new());
2314 let fs = FakeFs::new(cx_a.background());
2315
2316 // Set up a fake language server.
2317 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2318 Arc::get_mut(&mut lang_registry)
2319 .unwrap()
2320 .add(Arc::new(Language::new(
2321 LanguageConfig {
2322 name: "Rust".into(),
2323 path_suffixes: vec!["rs".to_string()],
2324 language_server: Some(language_server_config),
2325 ..Default::default()
2326 },
2327 Some(tree_sitter_rust::language()),
2328 )));
2329
2330 // Connect to a server as 2 clients.
2331 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2332 let client_a = server.create_client(cx_a, "user_a").await;
2333 let client_b = server.create_client(cx_b, "user_b").await;
2334
2335 // Share a project as client A
2336 fs.insert_tree(
2337 "/a",
2338 json!({
2339 ".zed.toml": r#"collaborators = ["user_b"]"#,
2340 "a.rs": "let one = two",
2341 }),
2342 )
2343 .await;
2344 let project_a = cx_a.update(|cx| {
2345 Project::local(
2346 client_a.clone(),
2347 client_a.user_store.clone(),
2348 lang_registry.clone(),
2349 fs.clone(),
2350 cx,
2351 )
2352 });
2353 let (worktree_a, _) = project_a
2354 .update(cx_a, |p, cx| {
2355 p.find_or_create_local_worktree("/a", true, cx)
2356 })
2357 .await
2358 .unwrap();
2359 worktree_a
2360 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2361 .await;
2362 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2363 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2364 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2365
2366 // Join the worktree as client B.
2367 let project_b = Project::remote(
2368 project_id,
2369 client_b.clone(),
2370 client_b.user_store.clone(),
2371 lang_registry.clone(),
2372 fs.clone(),
2373 &mut cx_b.to_async(),
2374 )
2375 .await
2376 .unwrap();
2377
2378 let buffer_b = cx_b
2379 .background()
2380 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2381 .await
2382 .unwrap();
2383
2384 let format = project_b.update(cx_b, |project, cx| {
2385 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2386 });
2387
2388 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2389 fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2390 Some(vec![
2391 lsp::TextEdit {
2392 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2393 new_text: "h".to_string(),
2394 },
2395 lsp::TextEdit {
2396 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2397 new_text: "y".to_string(),
2398 },
2399 ])
2400 });
2401
2402 format.await.unwrap();
2403 assert_eq!(
2404 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2405 "let honey = two"
2406 );
2407 }
2408
2409 #[gpui::test(iterations = 10)]
2410 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2411 cx_a.foreground().forbid_parking();
2412 let mut lang_registry = Arc::new(LanguageRegistry::new());
2413 let fs = FakeFs::new(cx_a.background());
2414 fs.insert_tree(
2415 "/root-1",
2416 json!({
2417 ".zed.toml": r#"collaborators = ["user_b"]"#,
2418 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2419 }),
2420 )
2421 .await;
2422 fs.insert_tree(
2423 "/root-2",
2424 json!({
2425 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2426 }),
2427 )
2428 .await;
2429
2430 // Set up a fake language server.
2431 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2432 Arc::get_mut(&mut lang_registry)
2433 .unwrap()
2434 .add(Arc::new(Language::new(
2435 LanguageConfig {
2436 name: "Rust".into(),
2437 path_suffixes: vec!["rs".to_string()],
2438 language_server: Some(language_server_config),
2439 ..Default::default()
2440 },
2441 Some(tree_sitter_rust::language()),
2442 )));
2443
2444 // Connect to a server as 2 clients.
2445 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2446 let client_a = server.create_client(cx_a, "user_a").await;
2447 let client_b = server.create_client(cx_b, "user_b").await;
2448
2449 // Share a project as client A
2450 let project_a = cx_a.update(|cx| {
2451 Project::local(
2452 client_a.clone(),
2453 client_a.user_store.clone(),
2454 lang_registry.clone(),
2455 fs.clone(),
2456 cx,
2457 )
2458 });
2459 let (worktree_a, _) = project_a
2460 .update(cx_a, |p, cx| {
2461 p.find_or_create_local_worktree("/root-1", true, cx)
2462 })
2463 .await
2464 .unwrap();
2465 worktree_a
2466 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2467 .await;
2468 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2469 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2470 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2471
2472 // Join the worktree as client B.
2473 let project_b = Project::remote(
2474 project_id,
2475 client_b.clone(),
2476 client_b.user_store.clone(),
2477 lang_registry.clone(),
2478 fs.clone(),
2479 &mut cx_b.to_async(),
2480 )
2481 .await
2482 .unwrap();
2483
2484 // Open the file on client B.
2485 let buffer_b = cx_b
2486 .background()
2487 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2488 .await
2489 .unwrap();
2490
2491 // Request the definition of a symbol as the guest.
2492 let definitions_1 = project_b.update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2493
2494 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2495 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2496 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2497 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2498 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2499 )))
2500 });
2501
2502 let definitions_1 = definitions_1.await.unwrap();
2503 cx_b.read(|cx| {
2504 assert_eq!(definitions_1.len(), 1);
2505 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2506 let target_buffer = definitions_1[0].buffer.read(cx);
2507 assert_eq!(
2508 target_buffer.text(),
2509 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2510 );
2511 assert_eq!(
2512 definitions_1[0].range.to_point(target_buffer),
2513 Point::new(0, 6)..Point::new(0, 9)
2514 );
2515 });
2516
2517 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2518 // the previous call to `definition`.
2519 let definitions_2 = project_b.update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2520 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2521 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2522 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2523 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2524 )))
2525 });
2526
2527 let definitions_2 = definitions_2.await.unwrap();
2528 cx_b.read(|cx| {
2529 assert_eq!(definitions_2.len(), 1);
2530 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2531 let target_buffer = definitions_2[0].buffer.read(cx);
2532 assert_eq!(
2533 target_buffer.text(),
2534 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2535 );
2536 assert_eq!(
2537 definitions_2[0].range.to_point(target_buffer),
2538 Point::new(1, 6)..Point::new(1, 11)
2539 );
2540 });
2541 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2542 }
2543
2544 #[gpui::test(iterations = 10)]
2545 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2546 cx_a.foreground().forbid_parking();
2547 let mut lang_registry = Arc::new(LanguageRegistry::new());
2548 let fs = FakeFs::new(cx_a.background());
2549 fs.insert_tree(
2550 "/root-1",
2551 json!({
2552 ".zed.toml": r#"collaborators = ["user_b"]"#,
2553 "one.rs": "const ONE: usize = 1;",
2554 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2555 }),
2556 )
2557 .await;
2558 fs.insert_tree(
2559 "/root-2",
2560 json!({
2561 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2562 }),
2563 )
2564 .await;
2565
2566 // Set up a fake language server.
2567 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2568 Arc::get_mut(&mut lang_registry)
2569 .unwrap()
2570 .add(Arc::new(Language::new(
2571 LanguageConfig {
2572 name: "Rust".into(),
2573 path_suffixes: vec!["rs".to_string()],
2574 language_server: Some(language_server_config),
2575 ..Default::default()
2576 },
2577 Some(tree_sitter_rust::language()),
2578 )));
2579
2580 // Connect to a server as 2 clients.
2581 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2582 let client_a = server.create_client(cx_a, "user_a").await;
2583 let client_b = server.create_client(cx_b, "user_b").await;
2584
2585 // Share a project as client A
2586 let project_a = cx_a.update(|cx| {
2587 Project::local(
2588 client_a.clone(),
2589 client_a.user_store.clone(),
2590 lang_registry.clone(),
2591 fs.clone(),
2592 cx,
2593 )
2594 });
2595 let (worktree_a, _) = project_a
2596 .update(cx_a, |p, cx| {
2597 p.find_or_create_local_worktree("/root-1", true, cx)
2598 })
2599 .await
2600 .unwrap();
2601 worktree_a
2602 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2603 .await;
2604 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2605 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2606 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2607
2608 // Join the worktree as client B.
2609 let project_b = Project::remote(
2610 project_id,
2611 client_b.clone(),
2612 client_b.user_store.clone(),
2613 lang_registry.clone(),
2614 fs.clone(),
2615 &mut cx_b.to_async(),
2616 )
2617 .await
2618 .unwrap();
2619
2620 // Open the file on client B.
2621 let buffer_b = cx_b
2622 .background()
2623 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2624 .await
2625 .unwrap();
2626
2627 // Request references to a symbol as the guest.
2628 let references = project_b.update(cx_b, |p, cx| p.references(&buffer_b, 7, cx));
2629
2630 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2631 fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2632 assert_eq!(
2633 params.text_document_position.text_document.uri.as_str(),
2634 "file:///root-1/one.rs"
2635 );
2636 Some(vec![
2637 lsp::Location {
2638 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2639 range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2640 },
2641 lsp::Location {
2642 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2643 range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2644 },
2645 lsp::Location {
2646 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2647 range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2648 },
2649 ])
2650 });
2651
2652 let references = references.await.unwrap();
2653 cx_b.read(|cx| {
2654 assert_eq!(references.len(), 3);
2655 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2656
2657 let two_buffer = references[0].buffer.read(cx);
2658 let three_buffer = references[2].buffer.read(cx);
2659 assert_eq!(
2660 two_buffer.file().unwrap().path().as_ref(),
2661 Path::new("two.rs")
2662 );
2663 assert_eq!(references[1].buffer, references[0].buffer);
2664 assert_eq!(
2665 three_buffer.file().unwrap().full_path(cx),
2666 Path::new("three.rs")
2667 );
2668
2669 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2670 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2671 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2672 });
2673 }
2674
2675 #[gpui::test(iterations = 10)]
2676 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2677 cx_a.foreground().forbid_parking();
2678 let lang_registry = Arc::new(LanguageRegistry::new());
2679 let fs = FakeFs::new(cx_a.background());
2680 fs.insert_tree(
2681 "/root-1",
2682 json!({
2683 ".zed.toml": r#"collaborators = ["user_b"]"#,
2684 "a": "hello world",
2685 "b": "goodnight moon",
2686 "c": "a world of goo",
2687 "d": "world champion of clown world",
2688 }),
2689 )
2690 .await;
2691 fs.insert_tree(
2692 "/root-2",
2693 json!({
2694 "e": "disney world is fun",
2695 }),
2696 )
2697 .await;
2698
2699 // Connect to a server as 2 clients.
2700 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2701 let client_a = server.create_client(cx_a, "user_a").await;
2702 let client_b = server.create_client(cx_b, "user_b").await;
2703
2704 // Share a project as client A
2705 let project_a = cx_a.update(|cx| {
2706 Project::local(
2707 client_a.clone(),
2708 client_a.user_store.clone(),
2709 lang_registry.clone(),
2710 fs.clone(),
2711 cx,
2712 )
2713 });
2714 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2715
2716 let (worktree_1, _) = project_a
2717 .update(cx_a, |p, cx| {
2718 p.find_or_create_local_worktree("/root-1", true, cx)
2719 })
2720 .await
2721 .unwrap();
2722 worktree_1
2723 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2724 .await;
2725 let (worktree_2, _) = project_a
2726 .update(cx_a, |p, cx| {
2727 p.find_or_create_local_worktree("/root-2", true, cx)
2728 })
2729 .await
2730 .unwrap();
2731 worktree_2
2732 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2733 .await;
2734
2735 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2736
2737 // Join the worktree as client B.
2738 let project_b = Project::remote(
2739 project_id,
2740 client_b.clone(),
2741 client_b.user_store.clone(),
2742 lang_registry.clone(),
2743 fs.clone(),
2744 &mut cx_b.to_async(),
2745 )
2746 .await
2747 .unwrap();
2748
2749 let results = project_b
2750 .update(cx_b, |project, cx| {
2751 project.search(SearchQuery::text("world", false, false), cx)
2752 })
2753 .await
2754 .unwrap();
2755
2756 let mut ranges_by_path = results
2757 .into_iter()
2758 .map(|(buffer, ranges)| {
2759 buffer.read_with(cx_b, |buffer, cx| {
2760 let path = buffer.file().unwrap().full_path(cx);
2761 let offset_ranges = ranges
2762 .into_iter()
2763 .map(|range| range.to_offset(buffer))
2764 .collect::<Vec<_>>();
2765 (path, offset_ranges)
2766 })
2767 })
2768 .collect::<Vec<_>>();
2769 ranges_by_path.sort_by_key(|(path, _)| path.clone());
2770
2771 assert_eq!(
2772 ranges_by_path,
2773 &[
2774 (PathBuf::from("root-1/a"), vec![6..11]),
2775 (PathBuf::from("root-1/c"), vec![2..7]),
2776 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2777 (PathBuf::from("root-2/e"), vec![7..12]),
2778 ]
2779 );
2780 }
2781
2782 #[gpui::test(iterations = 10)]
2783 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2784 cx_a.foreground().forbid_parking();
2785 let lang_registry = Arc::new(LanguageRegistry::new());
2786 let fs = FakeFs::new(cx_a.background());
2787 fs.insert_tree(
2788 "/root-1",
2789 json!({
2790 ".zed.toml": r#"collaborators = ["user_b"]"#,
2791 "main.rs": "fn double(number: i32) -> i32 { number + number }",
2792 }),
2793 )
2794 .await;
2795
2796 // Set up a fake language server.
2797 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2798 lang_registry.add(Arc::new(Language::new(
2799 LanguageConfig {
2800 name: "Rust".into(),
2801 path_suffixes: vec!["rs".to_string()],
2802 language_server: Some(language_server_config),
2803 ..Default::default()
2804 },
2805 Some(tree_sitter_rust::language()),
2806 )));
2807
2808 // Connect to a server as 2 clients.
2809 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2810 let client_a = server.create_client(cx_a, "user_a").await;
2811 let client_b = server.create_client(cx_b, "user_b").await;
2812
2813 // Share a project as client A
2814 let project_a = cx_a.update(|cx| {
2815 Project::local(
2816 client_a.clone(),
2817 client_a.user_store.clone(),
2818 lang_registry.clone(),
2819 fs.clone(),
2820 cx,
2821 )
2822 });
2823 let (worktree_a, _) = project_a
2824 .update(cx_a, |p, cx| {
2825 p.find_or_create_local_worktree("/root-1", true, cx)
2826 })
2827 .await
2828 .unwrap();
2829 worktree_a
2830 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2831 .await;
2832 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2833 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2834 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2835
2836 // Join the worktree as client B.
2837 let project_b = Project::remote(
2838 project_id,
2839 client_b.clone(),
2840 client_b.user_store.clone(),
2841 lang_registry.clone(),
2842 fs.clone(),
2843 &mut cx_b.to_async(),
2844 )
2845 .await
2846 .unwrap();
2847
2848 // Open the file on client B.
2849 let buffer_b = cx_b
2850 .background()
2851 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
2852 .await
2853 .unwrap();
2854
2855 // Request document highlights as the guest.
2856 let highlights = project_b.update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx));
2857
2858 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2859 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2860 |params, _| {
2861 assert_eq!(
2862 params
2863 .text_document_position_params
2864 .text_document
2865 .uri
2866 .as_str(),
2867 "file:///root-1/main.rs"
2868 );
2869 assert_eq!(
2870 params.text_document_position_params.position,
2871 lsp::Position::new(0, 34)
2872 );
2873 Some(vec![
2874 lsp::DocumentHighlight {
2875 kind: Some(lsp::DocumentHighlightKind::WRITE),
2876 range: lsp::Range::new(
2877 lsp::Position::new(0, 10),
2878 lsp::Position::new(0, 16),
2879 ),
2880 },
2881 lsp::DocumentHighlight {
2882 kind: Some(lsp::DocumentHighlightKind::READ),
2883 range: lsp::Range::new(
2884 lsp::Position::new(0, 32),
2885 lsp::Position::new(0, 38),
2886 ),
2887 },
2888 lsp::DocumentHighlight {
2889 kind: Some(lsp::DocumentHighlightKind::READ),
2890 range: lsp::Range::new(
2891 lsp::Position::new(0, 41),
2892 lsp::Position::new(0, 47),
2893 ),
2894 },
2895 ])
2896 },
2897 );
2898
2899 let highlights = highlights.await.unwrap();
2900 buffer_b.read_with(cx_b, |buffer, _| {
2901 let snapshot = buffer.snapshot();
2902
2903 let highlights = highlights
2904 .into_iter()
2905 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2906 .collect::<Vec<_>>();
2907 assert_eq!(
2908 highlights,
2909 &[
2910 (lsp::DocumentHighlightKind::WRITE, 10..16),
2911 (lsp::DocumentHighlightKind::READ, 32..38),
2912 (lsp::DocumentHighlightKind::READ, 41..47)
2913 ]
2914 )
2915 });
2916 }
2917
2918 #[gpui::test(iterations = 10)]
2919 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2920 cx_a.foreground().forbid_parking();
2921 let mut lang_registry = Arc::new(LanguageRegistry::new());
2922 let fs = FakeFs::new(cx_a.background());
2923 fs.insert_tree(
2924 "/code",
2925 json!({
2926 "crate-1": {
2927 ".zed.toml": r#"collaborators = ["user_b"]"#,
2928 "one.rs": "const ONE: usize = 1;",
2929 },
2930 "crate-2": {
2931 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2932 },
2933 "private": {
2934 "passwords.txt": "the-password",
2935 }
2936 }),
2937 )
2938 .await;
2939
2940 // Set up a fake language server.
2941 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2942 Arc::get_mut(&mut lang_registry)
2943 .unwrap()
2944 .add(Arc::new(Language::new(
2945 LanguageConfig {
2946 name: "Rust".into(),
2947 path_suffixes: vec!["rs".to_string()],
2948 language_server: Some(language_server_config),
2949 ..Default::default()
2950 },
2951 Some(tree_sitter_rust::language()),
2952 )));
2953
2954 // Connect to a server as 2 clients.
2955 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2956 let client_a = server.create_client(cx_a, "user_a").await;
2957 let client_b = server.create_client(cx_b, "user_b").await;
2958
2959 // Share a project as client A
2960 let project_a = cx_a.update(|cx| {
2961 Project::local(
2962 client_a.clone(),
2963 client_a.user_store.clone(),
2964 lang_registry.clone(),
2965 fs.clone(),
2966 cx,
2967 )
2968 });
2969 let (worktree_a, _) = project_a
2970 .update(cx_a, |p, cx| {
2971 p.find_or_create_local_worktree("/code/crate-1", true, cx)
2972 })
2973 .await
2974 .unwrap();
2975 worktree_a
2976 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2977 .await;
2978 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2979 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2980 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2981
2982 // Join the worktree as client B.
2983 let project_b = Project::remote(
2984 project_id,
2985 client_b.clone(),
2986 client_b.user_store.clone(),
2987 lang_registry.clone(),
2988 fs.clone(),
2989 &mut cx_b.to_async(),
2990 )
2991 .await
2992 .unwrap();
2993
2994 // Cause the language server to start.
2995 let _buffer = cx_b
2996 .background()
2997 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2998 .await
2999 .unwrap();
3000
3001 // Request the definition of a symbol as the guest.
3002 let symbols = project_b.update(cx_b, |p, cx| p.symbols("two", cx));
3003 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3004 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
3005 #[allow(deprecated)]
3006 Some(vec![lsp::SymbolInformation {
3007 name: "TWO".into(),
3008 location: lsp::Location {
3009 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3010 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3011 },
3012 kind: lsp::SymbolKind::CONSTANT,
3013 tags: None,
3014 container_name: None,
3015 deprecated: None,
3016 }])
3017 });
3018
3019 let symbols = symbols.await.unwrap();
3020 assert_eq!(symbols.len(), 1);
3021 assert_eq!(symbols[0].name, "TWO");
3022
3023 // Open one of the returned symbols.
3024 let buffer_b_2 = project_b
3025 .update(cx_b, |project, cx| {
3026 project.open_buffer_for_symbol(&symbols[0], cx)
3027 })
3028 .await
3029 .unwrap();
3030 buffer_b_2.read_with(cx_b, |buffer, _| {
3031 assert_eq!(
3032 buffer.file().unwrap().path().as_ref(),
3033 Path::new("../crate-2/two.rs")
3034 );
3035 });
3036
3037 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3038 let mut fake_symbol = symbols[0].clone();
3039 fake_symbol.path = Path::new("/code/secrets").into();
3040 let error = project_b
3041 .update(cx_b, |project, cx| {
3042 project.open_buffer_for_symbol(&fake_symbol, cx)
3043 })
3044 .await
3045 .unwrap_err();
3046 assert!(error.to_string().contains("invalid symbol signature"));
3047 }
3048
3049 #[gpui::test(iterations = 10)]
3050 async fn test_open_buffer_while_getting_definition_pointing_to_it(
3051 cx_a: &mut TestAppContext,
3052 cx_b: &mut TestAppContext,
3053 mut rng: StdRng,
3054 ) {
3055 cx_a.foreground().forbid_parking();
3056 let mut lang_registry = Arc::new(LanguageRegistry::new());
3057 let fs = FakeFs::new(cx_a.background());
3058 fs.insert_tree(
3059 "/root",
3060 json!({
3061 ".zed.toml": r#"collaborators = ["user_b"]"#,
3062 "a.rs": "const ONE: usize = b::TWO;",
3063 "b.rs": "const TWO: usize = 2",
3064 }),
3065 )
3066 .await;
3067
3068 // Set up a fake language server.
3069 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3070
3071 Arc::get_mut(&mut lang_registry)
3072 .unwrap()
3073 .add(Arc::new(Language::new(
3074 LanguageConfig {
3075 name: "Rust".into(),
3076 path_suffixes: vec!["rs".to_string()],
3077 language_server: Some(language_server_config),
3078 ..Default::default()
3079 },
3080 Some(tree_sitter_rust::language()),
3081 )));
3082
3083 // Connect to a server as 2 clients.
3084 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3085 let client_a = server.create_client(cx_a, "user_a").await;
3086 let client_b = server.create_client(cx_b, "user_b").await;
3087
3088 // Share a project as client A
3089 let project_a = cx_a.update(|cx| {
3090 Project::local(
3091 client_a.clone(),
3092 client_a.user_store.clone(),
3093 lang_registry.clone(),
3094 fs.clone(),
3095 cx,
3096 )
3097 });
3098
3099 let (worktree_a, _) = project_a
3100 .update(cx_a, |p, cx| {
3101 p.find_or_create_local_worktree("/root", true, cx)
3102 })
3103 .await
3104 .unwrap();
3105 worktree_a
3106 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3107 .await;
3108 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3109 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3110 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3111
3112 // Join the worktree as client B.
3113 let project_b = Project::remote(
3114 project_id,
3115 client_b.clone(),
3116 client_b.user_store.clone(),
3117 lang_registry.clone(),
3118 fs.clone(),
3119 &mut cx_b.to_async(),
3120 )
3121 .await
3122 .unwrap();
3123
3124 let buffer_b1 = cx_b
3125 .background()
3126 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3127 .await
3128 .unwrap();
3129
3130 let definitions;
3131 let buffer_b2;
3132 if rng.gen() {
3133 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3134 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3135 } else {
3136 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3137 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3138 }
3139
3140 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3141 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3142 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3143 lsp::Url::from_file_path("/root/b.rs").unwrap(),
3144 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3145 )))
3146 });
3147
3148 let buffer_b2 = buffer_b2.await.unwrap();
3149 let definitions = definitions.await.unwrap();
3150 assert_eq!(definitions.len(), 1);
3151 assert_eq!(definitions[0].buffer, buffer_b2);
3152 }
3153
3154 #[gpui::test(iterations = 10)]
3155 async fn test_collaborating_with_code_actions(
3156 cx_a: &mut TestAppContext,
3157 cx_b: &mut TestAppContext,
3158 ) {
3159 cx_a.foreground().forbid_parking();
3160 let mut lang_registry = Arc::new(LanguageRegistry::new());
3161 let fs = FakeFs::new(cx_a.background());
3162 let mut path_openers_b = Vec::new();
3163 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3164
3165 // Set up a fake language server.
3166 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3167 Arc::get_mut(&mut lang_registry)
3168 .unwrap()
3169 .add(Arc::new(Language::new(
3170 LanguageConfig {
3171 name: "Rust".into(),
3172 path_suffixes: vec!["rs".to_string()],
3173 language_server: Some(language_server_config),
3174 ..Default::default()
3175 },
3176 Some(tree_sitter_rust::language()),
3177 )));
3178
3179 // Connect to a server as 2 clients.
3180 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3181 let client_a = server.create_client(cx_a, "user_a").await;
3182 let client_b = server.create_client(cx_b, "user_b").await;
3183
3184 // Share a project as client A
3185 fs.insert_tree(
3186 "/a",
3187 json!({
3188 ".zed.toml": r#"collaborators = ["user_b"]"#,
3189 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3190 "other.rs": "pub fn foo() -> usize { 4 }",
3191 }),
3192 )
3193 .await;
3194 let project_a = cx_a.update(|cx| {
3195 Project::local(
3196 client_a.clone(),
3197 client_a.user_store.clone(),
3198 lang_registry.clone(),
3199 fs.clone(),
3200 cx,
3201 )
3202 });
3203 let (worktree_a, _) = project_a
3204 .update(cx_a, |p, cx| {
3205 p.find_or_create_local_worktree("/a", true, cx)
3206 })
3207 .await
3208 .unwrap();
3209 worktree_a
3210 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3211 .await;
3212 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3213 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3214 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3215
3216 // Join the worktree as client B.
3217 let project_b = Project::remote(
3218 project_id,
3219 client_b.clone(),
3220 client_b.user_store.clone(),
3221 lang_registry.clone(),
3222 fs.clone(),
3223 &mut cx_b.to_async(),
3224 )
3225 .await
3226 .unwrap();
3227 let mut params = cx_b.update(WorkspaceParams::test);
3228 params.languages = lang_registry.clone();
3229 params.client = client_b.client.clone();
3230 params.user_store = client_b.user_store.clone();
3231 params.project = project_b;
3232 params.path_openers = path_openers_b.into();
3233
3234 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3235 let editor_b = workspace_b
3236 .update(cx_b, |workspace, cx| {
3237 workspace.open_path((worktree_id, "main.rs").into(), cx)
3238 })
3239 .await
3240 .unwrap()
3241 .downcast::<Editor>()
3242 .unwrap();
3243
3244 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3245 fake_language_server
3246 .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3247 assert_eq!(
3248 params.text_document.uri,
3249 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3250 );
3251 assert_eq!(params.range.start, lsp::Position::new(0, 0));
3252 assert_eq!(params.range.end, lsp::Position::new(0, 0));
3253 None
3254 })
3255 .next()
3256 .await;
3257
3258 // Move cursor to a location that contains code actions.
3259 editor_b.update(cx_b, |editor, cx| {
3260 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3261 cx.focus(&editor_b);
3262 });
3263
3264 fake_language_server
3265 .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3266 assert_eq!(
3267 params.text_document.uri,
3268 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3269 );
3270 assert_eq!(params.range.start, lsp::Position::new(1, 31));
3271 assert_eq!(params.range.end, lsp::Position::new(1, 31));
3272
3273 Some(vec![lsp::CodeActionOrCommand::CodeAction(
3274 lsp::CodeAction {
3275 title: "Inline into all callers".to_string(),
3276 edit: Some(lsp::WorkspaceEdit {
3277 changes: Some(
3278 [
3279 (
3280 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3281 vec![lsp::TextEdit::new(
3282 lsp::Range::new(
3283 lsp::Position::new(1, 22),
3284 lsp::Position::new(1, 34),
3285 ),
3286 "4".to_string(),
3287 )],
3288 ),
3289 (
3290 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3291 vec![lsp::TextEdit::new(
3292 lsp::Range::new(
3293 lsp::Position::new(0, 0),
3294 lsp::Position::new(0, 27),
3295 ),
3296 "".to_string(),
3297 )],
3298 ),
3299 ]
3300 .into_iter()
3301 .collect(),
3302 ),
3303 ..Default::default()
3304 }),
3305 data: Some(json!({
3306 "codeActionParams": {
3307 "range": {
3308 "start": {"line": 1, "column": 31},
3309 "end": {"line": 1, "column": 31},
3310 }
3311 }
3312 })),
3313 ..Default::default()
3314 },
3315 )])
3316 })
3317 .next()
3318 .await;
3319
3320 // Toggle code actions and wait for them to display.
3321 editor_b.update(cx_b, |editor, cx| {
3322 editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3323 });
3324 editor_b
3325 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3326 .await;
3327
3328 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3329
3330 // Confirming the code action will trigger a resolve request.
3331 let confirm_action = workspace_b
3332 .update(cx_b, |workspace, cx| {
3333 Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3334 })
3335 .unwrap();
3336 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3337 lsp::CodeAction {
3338 title: "Inline into all callers".to_string(),
3339 edit: Some(lsp::WorkspaceEdit {
3340 changes: Some(
3341 [
3342 (
3343 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3344 vec![lsp::TextEdit::new(
3345 lsp::Range::new(
3346 lsp::Position::new(1, 22),
3347 lsp::Position::new(1, 34),
3348 ),
3349 "4".to_string(),
3350 )],
3351 ),
3352 (
3353 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3354 vec![lsp::TextEdit::new(
3355 lsp::Range::new(
3356 lsp::Position::new(0, 0),
3357 lsp::Position::new(0, 27),
3358 ),
3359 "".to_string(),
3360 )],
3361 ),
3362 ]
3363 .into_iter()
3364 .collect(),
3365 ),
3366 ..Default::default()
3367 }),
3368 ..Default::default()
3369 }
3370 });
3371
3372 // After the action is confirmed, an editor containing both modified files is opened.
3373 confirm_action.await.unwrap();
3374 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3375 workspace
3376 .active_item(cx)
3377 .unwrap()
3378 .downcast::<Editor>()
3379 .unwrap()
3380 });
3381 code_action_editor.update(cx_b, |editor, cx| {
3382 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3383 editor.undo(&Undo, cx);
3384 assert_eq!(
3385 editor.text(cx),
3386 "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3387 );
3388 editor.redo(&Redo, cx);
3389 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3390 });
3391 }
3392
3393 #[gpui::test(iterations = 10)]
3394 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3395 cx_a.foreground().forbid_parking();
3396 let mut lang_registry = Arc::new(LanguageRegistry::new());
3397 let fs = FakeFs::new(cx_a.background());
3398 let mut path_openers_b = Vec::new();
3399 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3400
3401 // Set up a fake language server.
3402 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3403 Arc::get_mut(&mut lang_registry)
3404 .unwrap()
3405 .add(Arc::new(Language::new(
3406 LanguageConfig {
3407 name: "Rust".into(),
3408 path_suffixes: vec!["rs".to_string()],
3409 language_server: Some(language_server_config),
3410 ..Default::default()
3411 },
3412 Some(tree_sitter_rust::language()),
3413 )));
3414
3415 // Connect to a server as 2 clients.
3416 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3417 let client_a = server.create_client(cx_a, "user_a").await;
3418 let client_b = server.create_client(cx_b, "user_b").await;
3419
3420 // Share a project as client A
3421 fs.insert_tree(
3422 "/dir",
3423 json!({
3424 ".zed.toml": r#"collaborators = ["user_b"]"#,
3425 "one.rs": "const ONE: usize = 1;",
3426 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3427 }),
3428 )
3429 .await;
3430 let project_a = cx_a.update(|cx| {
3431 Project::local(
3432 client_a.clone(),
3433 client_a.user_store.clone(),
3434 lang_registry.clone(),
3435 fs.clone(),
3436 cx,
3437 )
3438 });
3439 let (worktree_a, _) = project_a
3440 .update(cx_a, |p, cx| {
3441 p.find_or_create_local_worktree("/dir", true, cx)
3442 })
3443 .await
3444 .unwrap();
3445 worktree_a
3446 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3447 .await;
3448 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3449 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3450 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3451
3452 // Join the worktree as client B.
3453 let project_b = Project::remote(
3454 project_id,
3455 client_b.clone(),
3456 client_b.user_store.clone(),
3457 lang_registry.clone(),
3458 fs.clone(),
3459 &mut cx_b.to_async(),
3460 )
3461 .await
3462 .unwrap();
3463 let mut params = cx_b.update(WorkspaceParams::test);
3464 params.languages = lang_registry.clone();
3465 params.client = client_b.client.clone();
3466 params.user_store = client_b.user_store.clone();
3467 params.project = project_b;
3468 params.path_openers = path_openers_b.into();
3469
3470 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3471 let editor_b = workspace_b
3472 .update(cx_b, |workspace, cx| {
3473 workspace.open_path((worktree_id, "one.rs").into(), cx)
3474 })
3475 .await
3476 .unwrap()
3477 .downcast::<Editor>()
3478 .unwrap();
3479 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3480
3481 // Move cursor to a location that can be renamed.
3482 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3483 editor.select_ranges([7..7], None, cx);
3484 editor.rename(&Rename, cx).unwrap()
3485 });
3486
3487 fake_language_server
3488 .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3489 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3490 assert_eq!(params.position, lsp::Position::new(0, 7));
3491 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3492 lsp::Position::new(0, 6),
3493 lsp::Position::new(0, 9),
3494 )))
3495 })
3496 .next()
3497 .await
3498 .unwrap();
3499 prepare_rename.await.unwrap();
3500 editor_b.update(cx_b, |editor, cx| {
3501 let rename = editor.pending_rename().unwrap();
3502 let buffer = editor.buffer().read(cx).snapshot(cx);
3503 assert_eq!(
3504 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3505 6..9
3506 );
3507 rename.editor.update(cx, |rename_editor, cx| {
3508 rename_editor.buffer().update(cx, |rename_buffer, cx| {
3509 rename_buffer.edit([0..3], "THREE", cx);
3510 });
3511 });
3512 });
3513
3514 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3515 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3516 });
3517 fake_language_server
3518 .handle_request::<lsp::request::Rename, _>(|params, _| {
3519 assert_eq!(
3520 params.text_document_position.text_document.uri.as_str(),
3521 "file:///dir/one.rs"
3522 );
3523 assert_eq!(
3524 params.text_document_position.position,
3525 lsp::Position::new(0, 6)
3526 );
3527 assert_eq!(params.new_name, "THREE");
3528 Some(lsp::WorkspaceEdit {
3529 changes: Some(
3530 [
3531 (
3532 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3533 vec![lsp::TextEdit::new(
3534 lsp::Range::new(
3535 lsp::Position::new(0, 6),
3536 lsp::Position::new(0, 9),
3537 ),
3538 "THREE".to_string(),
3539 )],
3540 ),
3541 (
3542 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3543 vec![
3544 lsp::TextEdit::new(
3545 lsp::Range::new(
3546 lsp::Position::new(0, 24),
3547 lsp::Position::new(0, 27),
3548 ),
3549 "THREE".to_string(),
3550 ),
3551 lsp::TextEdit::new(
3552 lsp::Range::new(
3553 lsp::Position::new(0, 35),
3554 lsp::Position::new(0, 38),
3555 ),
3556 "THREE".to_string(),
3557 ),
3558 ],
3559 ),
3560 ]
3561 .into_iter()
3562 .collect(),
3563 ),
3564 ..Default::default()
3565 })
3566 })
3567 .next()
3568 .await
3569 .unwrap();
3570 confirm_rename.await.unwrap();
3571
3572 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3573 workspace
3574 .active_item(cx)
3575 .unwrap()
3576 .downcast::<Editor>()
3577 .unwrap()
3578 });
3579 rename_editor.update(cx_b, |editor, cx| {
3580 assert_eq!(
3581 editor.text(cx),
3582 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3583 );
3584 editor.undo(&Undo, cx);
3585 assert_eq!(
3586 editor.text(cx),
3587 "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3588 );
3589 editor.redo(&Redo, cx);
3590 assert_eq!(
3591 editor.text(cx),
3592 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3593 );
3594 });
3595
3596 // Ensure temporary rename edits cannot be undone/redone.
3597 editor_b.update(cx_b, |editor, cx| {
3598 editor.undo(&Undo, cx);
3599 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3600 editor.undo(&Undo, cx);
3601 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3602 editor.redo(&Redo, cx);
3603 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3604 })
3605 }
3606
3607 #[gpui::test(iterations = 10)]
3608 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3609 cx_a.foreground().forbid_parking();
3610
3611 // Connect to a server as 2 clients.
3612 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3613 let client_a = server.create_client(cx_a, "user_a").await;
3614 let client_b = server.create_client(cx_b, "user_b").await;
3615
3616 // Create an org that includes these 2 users.
3617 let db = &server.app_state.db;
3618 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3619 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3620 .await
3621 .unwrap();
3622 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3623 .await
3624 .unwrap();
3625
3626 // Create a channel that includes all the users.
3627 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3628 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3629 .await
3630 .unwrap();
3631 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3632 .await
3633 .unwrap();
3634 db.create_channel_message(
3635 channel_id,
3636 client_b.current_user_id(&cx_b),
3637 "hello A, it's B.",
3638 OffsetDateTime::now_utc(),
3639 1,
3640 )
3641 .await
3642 .unwrap();
3643
3644 let channels_a = cx_a
3645 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3646 channels_a
3647 .condition(cx_a, |list, _| list.available_channels().is_some())
3648 .await;
3649 channels_a.read_with(cx_a, |list, _| {
3650 assert_eq!(
3651 list.available_channels().unwrap(),
3652 &[ChannelDetails {
3653 id: channel_id.to_proto(),
3654 name: "test-channel".to_string()
3655 }]
3656 )
3657 });
3658 let channel_a = channels_a.update(cx_a, |this, cx| {
3659 this.get_channel(channel_id.to_proto(), cx).unwrap()
3660 });
3661 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3662 channel_a
3663 .condition(&cx_a, |channel, _| {
3664 channel_messages(channel)
3665 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3666 })
3667 .await;
3668
3669 let channels_b = cx_b
3670 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3671 channels_b
3672 .condition(cx_b, |list, _| list.available_channels().is_some())
3673 .await;
3674 channels_b.read_with(cx_b, |list, _| {
3675 assert_eq!(
3676 list.available_channels().unwrap(),
3677 &[ChannelDetails {
3678 id: channel_id.to_proto(),
3679 name: "test-channel".to_string()
3680 }]
3681 )
3682 });
3683
3684 let channel_b = channels_b.update(cx_b, |this, cx| {
3685 this.get_channel(channel_id.to_proto(), cx).unwrap()
3686 });
3687 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3688 channel_b
3689 .condition(&cx_b, |channel, _| {
3690 channel_messages(channel)
3691 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3692 })
3693 .await;
3694
3695 channel_a
3696 .update(cx_a, |channel, cx| {
3697 channel
3698 .send_message("oh, hi B.".to_string(), cx)
3699 .unwrap()
3700 .detach();
3701 let task = channel.send_message("sup".to_string(), cx).unwrap();
3702 assert_eq!(
3703 channel_messages(channel),
3704 &[
3705 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3706 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3707 ("user_a".to_string(), "sup".to_string(), true)
3708 ]
3709 );
3710 task
3711 })
3712 .await
3713 .unwrap();
3714
3715 channel_b
3716 .condition(&cx_b, |channel, _| {
3717 channel_messages(channel)
3718 == [
3719 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3720 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3721 ("user_a".to_string(), "sup".to_string(), false),
3722 ]
3723 })
3724 .await;
3725
3726 assert_eq!(
3727 server
3728 .state()
3729 .await
3730 .channel(channel_id)
3731 .unwrap()
3732 .connection_ids
3733 .len(),
3734 2
3735 );
3736 cx_b.update(|_| drop(channel_b));
3737 server
3738 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3739 .await;
3740
3741 cx_a.update(|_| drop(channel_a));
3742 server
3743 .condition(|state| state.channel(channel_id).is_none())
3744 .await;
3745 }
3746
3747 #[gpui::test(iterations = 10)]
3748 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
3749 cx_a.foreground().forbid_parking();
3750
3751 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3752 let client_a = server.create_client(cx_a, "user_a").await;
3753
3754 let db = &server.app_state.db;
3755 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3756 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3757 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3758 .await
3759 .unwrap();
3760 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3761 .await
3762 .unwrap();
3763
3764 let channels_a = cx_a
3765 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3766 channels_a
3767 .condition(cx_a, |list, _| list.available_channels().is_some())
3768 .await;
3769 let channel_a = channels_a.update(cx_a, |this, cx| {
3770 this.get_channel(channel_id.to_proto(), cx).unwrap()
3771 });
3772
3773 // Messages aren't allowed to be too long.
3774 channel_a
3775 .update(cx_a, |channel, cx| {
3776 let long_body = "this is long.\n".repeat(1024);
3777 channel.send_message(long_body, cx).unwrap()
3778 })
3779 .await
3780 .unwrap_err();
3781
3782 // Messages aren't allowed to be blank.
3783 channel_a.update(cx_a, |channel, cx| {
3784 channel.send_message(String::new(), cx).unwrap_err()
3785 });
3786
3787 // Leading and trailing whitespace are trimmed.
3788 channel_a
3789 .update(cx_a, |channel, cx| {
3790 channel
3791 .send_message("\n surrounded by whitespace \n".to_string(), cx)
3792 .unwrap()
3793 })
3794 .await
3795 .unwrap();
3796 assert_eq!(
3797 db.get_channel_messages(channel_id, 10, None)
3798 .await
3799 .unwrap()
3800 .iter()
3801 .map(|m| &m.body)
3802 .collect::<Vec<_>>(),
3803 &["surrounded by whitespace"]
3804 );
3805 }
3806
3807 #[gpui::test(iterations = 10)]
3808 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3809 cx_a.foreground().forbid_parking();
3810
3811 // Connect to a server as 2 clients.
3812 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3813 let client_a = server.create_client(cx_a, "user_a").await;
3814 let client_b = server.create_client(cx_b, "user_b").await;
3815 let mut status_b = client_b.status();
3816
3817 // Create an org that includes these 2 users.
3818 let db = &server.app_state.db;
3819 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3820 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3821 .await
3822 .unwrap();
3823 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3824 .await
3825 .unwrap();
3826
3827 // Create a channel that includes all the users.
3828 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3829 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3830 .await
3831 .unwrap();
3832 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3833 .await
3834 .unwrap();
3835 db.create_channel_message(
3836 channel_id,
3837 client_b.current_user_id(&cx_b),
3838 "hello A, it's B.",
3839 OffsetDateTime::now_utc(),
3840 2,
3841 )
3842 .await
3843 .unwrap();
3844
3845 let channels_a = cx_a
3846 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3847 channels_a
3848 .condition(cx_a, |list, _| list.available_channels().is_some())
3849 .await;
3850
3851 channels_a.read_with(cx_a, |list, _| {
3852 assert_eq!(
3853 list.available_channels().unwrap(),
3854 &[ChannelDetails {
3855 id: channel_id.to_proto(),
3856 name: "test-channel".to_string()
3857 }]
3858 )
3859 });
3860 let channel_a = channels_a.update(cx_a, |this, cx| {
3861 this.get_channel(channel_id.to_proto(), cx).unwrap()
3862 });
3863 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3864 channel_a
3865 .condition(&cx_a, |channel, _| {
3866 channel_messages(channel)
3867 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3868 })
3869 .await;
3870
3871 let channels_b = cx_b
3872 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3873 channels_b
3874 .condition(cx_b, |list, _| list.available_channels().is_some())
3875 .await;
3876 channels_b.read_with(cx_b, |list, _| {
3877 assert_eq!(
3878 list.available_channels().unwrap(),
3879 &[ChannelDetails {
3880 id: channel_id.to_proto(),
3881 name: "test-channel".to_string()
3882 }]
3883 )
3884 });
3885
3886 let channel_b = channels_b.update(cx_b, |this, cx| {
3887 this.get_channel(channel_id.to_proto(), cx).unwrap()
3888 });
3889 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3890 channel_b
3891 .condition(&cx_b, |channel, _| {
3892 channel_messages(channel)
3893 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3894 })
3895 .await;
3896
3897 // Disconnect client B, ensuring we can still access its cached channel data.
3898 server.forbid_connections();
3899 server.disconnect_client(client_b.current_user_id(&cx_b));
3900 cx_b.foreground().advance_clock(Duration::from_secs(3));
3901 while !matches!(
3902 status_b.next().await,
3903 Some(client::Status::ReconnectionError { .. })
3904 ) {}
3905
3906 channels_b.read_with(cx_b, |channels, _| {
3907 assert_eq!(
3908 channels.available_channels().unwrap(),
3909 [ChannelDetails {
3910 id: channel_id.to_proto(),
3911 name: "test-channel".to_string()
3912 }]
3913 )
3914 });
3915 channel_b.read_with(cx_b, |channel, _| {
3916 assert_eq!(
3917 channel_messages(channel),
3918 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3919 )
3920 });
3921
3922 // Send a message from client B while it is disconnected.
3923 channel_b
3924 .update(cx_b, |channel, cx| {
3925 let task = channel
3926 .send_message("can you see this?".to_string(), cx)
3927 .unwrap();
3928 assert_eq!(
3929 channel_messages(channel),
3930 &[
3931 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3932 ("user_b".to_string(), "can you see this?".to_string(), true)
3933 ]
3934 );
3935 task
3936 })
3937 .await
3938 .unwrap_err();
3939
3940 // Send a message from client A while B is disconnected.
3941 channel_a
3942 .update(cx_a, |channel, cx| {
3943 channel
3944 .send_message("oh, hi B.".to_string(), cx)
3945 .unwrap()
3946 .detach();
3947 let task = channel.send_message("sup".to_string(), cx).unwrap();
3948 assert_eq!(
3949 channel_messages(channel),
3950 &[
3951 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3952 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3953 ("user_a".to_string(), "sup".to_string(), true)
3954 ]
3955 );
3956 task
3957 })
3958 .await
3959 .unwrap();
3960
3961 // Give client B a chance to reconnect.
3962 server.allow_connections();
3963 cx_b.foreground().advance_clock(Duration::from_secs(10));
3964
3965 // Verify that B sees the new messages upon reconnection, as well as the message client B
3966 // sent while offline.
3967 channel_b
3968 .condition(&cx_b, |channel, _| {
3969 channel_messages(channel)
3970 == [
3971 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3972 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3973 ("user_a".to_string(), "sup".to_string(), false),
3974 ("user_b".to_string(), "can you see this?".to_string(), false),
3975 ]
3976 })
3977 .await;
3978
3979 // Ensure client A and B can communicate normally after reconnection.
3980 channel_a
3981 .update(cx_a, |channel, cx| {
3982 channel.send_message("you online?".to_string(), cx).unwrap()
3983 })
3984 .await
3985 .unwrap();
3986 channel_b
3987 .condition(&cx_b, |channel, _| {
3988 channel_messages(channel)
3989 == [
3990 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3991 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3992 ("user_a".to_string(), "sup".to_string(), false),
3993 ("user_b".to_string(), "can you see this?".to_string(), false),
3994 ("user_a".to_string(), "you online?".to_string(), false),
3995 ]
3996 })
3997 .await;
3998
3999 channel_b
4000 .update(cx_b, |channel, cx| {
4001 channel.send_message("yep".to_string(), cx).unwrap()
4002 })
4003 .await
4004 .unwrap();
4005 channel_a
4006 .condition(&cx_a, |channel, _| {
4007 channel_messages(channel)
4008 == [
4009 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4010 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4011 ("user_a".to_string(), "sup".to_string(), false),
4012 ("user_b".to_string(), "can you see this?".to_string(), false),
4013 ("user_a".to_string(), "you online?".to_string(), false),
4014 ("user_b".to_string(), "yep".to_string(), false),
4015 ]
4016 })
4017 .await;
4018 }
4019
4020 #[gpui::test(iterations = 10)]
4021 async fn test_contacts(
4022 cx_a: &mut TestAppContext,
4023 cx_b: &mut TestAppContext,
4024 cx_c: &mut TestAppContext,
4025 ) {
4026 cx_a.foreground().forbid_parking();
4027 let lang_registry = Arc::new(LanguageRegistry::new());
4028 let fs = FakeFs::new(cx_a.background());
4029
4030 // Connect to a server as 3 clients.
4031 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4032 let client_a = server.create_client(cx_a, "user_a").await;
4033 let client_b = server.create_client(cx_b, "user_b").await;
4034 let client_c = server.create_client(cx_c, "user_c").await;
4035
4036 // Share a worktree as client A.
4037 fs.insert_tree(
4038 "/a",
4039 json!({
4040 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4041 }),
4042 )
4043 .await;
4044
4045 let project_a = cx_a.update(|cx| {
4046 Project::local(
4047 client_a.clone(),
4048 client_a.user_store.clone(),
4049 lang_registry.clone(),
4050 fs.clone(),
4051 cx,
4052 )
4053 });
4054 let (worktree_a, _) = project_a
4055 .update(cx_a, |p, cx| {
4056 p.find_or_create_local_worktree("/a", true, cx)
4057 })
4058 .await
4059 .unwrap();
4060 worktree_a
4061 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4062 .await;
4063
4064 client_a
4065 .user_store
4066 .condition(&cx_a, |user_store, _| {
4067 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4068 })
4069 .await;
4070 client_b
4071 .user_store
4072 .condition(&cx_b, |user_store, _| {
4073 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4074 })
4075 .await;
4076 client_c
4077 .user_store
4078 .condition(&cx_c, |user_store, _| {
4079 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4080 })
4081 .await;
4082
4083 let project_id = project_a
4084 .update(cx_a, |project, _| project.next_remote_id())
4085 .await;
4086 project_a
4087 .update(cx_a, |project, cx| project.share(cx))
4088 .await
4089 .unwrap();
4090
4091 let _project_b = Project::remote(
4092 project_id,
4093 client_b.clone(),
4094 client_b.user_store.clone(),
4095 lang_registry.clone(),
4096 fs.clone(),
4097 &mut cx_b.to_async(),
4098 )
4099 .await
4100 .unwrap();
4101
4102 client_a
4103 .user_store
4104 .condition(&cx_a, |user_store, _| {
4105 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4106 })
4107 .await;
4108 client_b
4109 .user_store
4110 .condition(&cx_b, |user_store, _| {
4111 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4112 })
4113 .await;
4114 client_c
4115 .user_store
4116 .condition(&cx_c, |user_store, _| {
4117 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4118 })
4119 .await;
4120
4121 project_a
4122 .condition(&cx_a, |project, _| {
4123 project.collaborators().contains_key(&client_b.peer_id)
4124 })
4125 .await;
4126
4127 cx_a.update(move |_| drop(project_a));
4128 client_a
4129 .user_store
4130 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4131 .await;
4132 client_b
4133 .user_store
4134 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4135 .await;
4136 client_c
4137 .user_store
4138 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4139 .await;
4140
4141 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4142 user_store
4143 .contacts()
4144 .iter()
4145 .map(|contact| {
4146 let worktrees = contact
4147 .projects
4148 .iter()
4149 .map(|p| {
4150 (
4151 p.worktree_root_names[0].as_str(),
4152 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4153 )
4154 })
4155 .collect();
4156 (contact.user.github_login.as_str(), worktrees)
4157 })
4158 .collect()
4159 }
4160 }
4161
4162 #[gpui::test(iterations = 100)]
4163 async fn test_random_collaboration(cx: &mut TestAppContext, rng: StdRng) {
4164 cx.foreground().forbid_parking();
4165 let max_peers = env::var("MAX_PEERS")
4166 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4167 .unwrap_or(5);
4168 let max_operations = env::var("OPERATIONS")
4169 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4170 .unwrap_or(10);
4171
4172 let rng = Arc::new(Mutex::new(rng));
4173
4174 let guest_lang_registry = Arc::new(LanguageRegistry::new());
4175 let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4176
4177 let fs = FakeFs::new(cx.background());
4178 fs.insert_tree(
4179 "/_collab",
4180 json!({
4181 ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4182 }),
4183 )
4184 .await;
4185
4186 let operations = Rc::new(Cell::new(0));
4187 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4188 let mut clients = Vec::new();
4189
4190 let mut next_entity_id = 100000;
4191 let mut host_cx = TestAppContext::new(
4192 cx.foreground_platform(),
4193 cx.platform(),
4194 cx.foreground(),
4195 cx.background(),
4196 cx.font_cache(),
4197 cx.leak_detector(),
4198 next_entity_id,
4199 );
4200 let host = server.create_client(&mut host_cx, "host").await;
4201 let host_project = host_cx.update(|cx| {
4202 Project::local(
4203 host.client.clone(),
4204 host.user_store.clone(),
4205 Arc::new(LanguageRegistry::new()),
4206 fs.clone(),
4207 cx,
4208 )
4209 });
4210 let host_project_id = host_project
4211 .update(&mut host_cx, |p, _| p.next_remote_id())
4212 .await;
4213
4214 let (collab_worktree, _) = host_project
4215 .update(&mut host_cx, |project, cx| {
4216 project.find_or_create_local_worktree("/_collab", true, cx)
4217 })
4218 .await
4219 .unwrap();
4220 collab_worktree
4221 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4222 .await;
4223 host_project
4224 .update(&mut host_cx, |project, cx| project.share(cx))
4225 .await
4226 .unwrap();
4227
4228 clients.push(cx.foreground().spawn(host.simulate_host(
4229 host_project,
4230 language_server_config,
4231 operations.clone(),
4232 max_operations,
4233 rng.clone(),
4234 host_cx,
4235 )));
4236
4237 while operations.get() < max_operations {
4238 cx.background().simulate_random_delay().await;
4239 if clients.len() >= max_peers {
4240 break;
4241 } else if rng.lock().gen_bool(0.05) {
4242 operations.set(operations.get() + 1);
4243
4244 let guest_id = clients.len();
4245 log::info!("Adding guest {}", guest_id);
4246 next_entity_id += 100000;
4247 let mut guest_cx = TestAppContext::new(
4248 cx.foreground_platform(),
4249 cx.platform(),
4250 cx.foreground(),
4251 cx.background(),
4252 cx.font_cache(),
4253 cx.leak_detector(),
4254 next_entity_id,
4255 );
4256 let guest = server
4257 .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4258 .await;
4259 let guest_project = Project::remote(
4260 host_project_id,
4261 guest.client.clone(),
4262 guest.user_store.clone(),
4263 guest_lang_registry.clone(),
4264 FakeFs::new(cx.background()),
4265 &mut guest_cx.to_async(),
4266 )
4267 .await
4268 .unwrap();
4269 clients.push(cx.foreground().spawn(guest.simulate_guest(
4270 guest_id,
4271 guest_project,
4272 operations.clone(),
4273 max_operations,
4274 rng.clone(),
4275 guest_cx,
4276 )));
4277
4278 log::info!("Guest {} added", guest_id);
4279 }
4280 }
4281
4282 let mut clients = futures::future::join_all(clients).await;
4283 cx.foreground().run_until_parked();
4284
4285 let (host_client, mut host_cx) = clients.remove(0);
4286 let host_project = host_client.project.as_ref().unwrap();
4287 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4288 project
4289 .worktrees(cx)
4290 .map(|worktree| {
4291 let snapshot = worktree.read(cx).snapshot();
4292 (snapshot.id(), snapshot)
4293 })
4294 .collect::<BTreeMap<_, _>>()
4295 });
4296
4297 host_client
4298 .project
4299 .as_ref()
4300 .unwrap()
4301 .read_with(&host_cx, |project, cx| project.check_invariants(cx));
4302
4303 for (guest_client, mut guest_cx) in clients.into_iter() {
4304 let guest_id = guest_client.client.id();
4305 let worktree_snapshots =
4306 guest_client
4307 .project
4308 .as_ref()
4309 .unwrap()
4310 .read_with(&guest_cx, |project, cx| {
4311 project
4312 .worktrees(cx)
4313 .map(|worktree| {
4314 let worktree = worktree.read(cx);
4315 (worktree.id(), worktree.snapshot())
4316 })
4317 .collect::<BTreeMap<_, _>>()
4318 });
4319
4320 assert_eq!(
4321 worktree_snapshots.keys().collect::<Vec<_>>(),
4322 host_worktree_snapshots.keys().collect::<Vec<_>>(),
4323 "guest {} has different worktrees than the host",
4324 guest_id
4325 );
4326 for (id, host_snapshot) in &host_worktree_snapshots {
4327 let guest_snapshot = &worktree_snapshots[id];
4328 assert_eq!(
4329 guest_snapshot.root_name(),
4330 host_snapshot.root_name(),
4331 "guest {} has different root name than the host for worktree {}",
4332 guest_id,
4333 id
4334 );
4335 assert_eq!(
4336 guest_snapshot.entries(false).collect::<Vec<_>>(),
4337 host_snapshot.entries(false).collect::<Vec<_>>(),
4338 "guest {} has different snapshot than the host for worktree {}",
4339 guest_id,
4340 id
4341 );
4342 }
4343
4344 guest_client
4345 .project
4346 .as_ref()
4347 .unwrap()
4348 .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
4349
4350 for guest_buffer in &guest_client.buffers {
4351 let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
4352 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
4353 project.buffer_for_id(buffer_id, cx).expect(&format!(
4354 "host does not have buffer for guest:{}, peer:{}, id:{}",
4355 guest_id, guest_client.peer_id, buffer_id
4356 ))
4357 });
4358 let path = host_buffer
4359 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
4360
4361 assert_eq!(
4362 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
4363 0,
4364 "guest {}, buffer {}, path {:?} has deferred operations",
4365 guest_id,
4366 buffer_id,
4367 path,
4368 );
4369 assert_eq!(
4370 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
4371 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4372 "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4373 guest_id,
4374 buffer_id,
4375 path
4376 );
4377 }
4378
4379 guest_cx.update(|_| drop(guest_client));
4380 }
4381
4382 host_cx.update(|_| drop(host_client));
4383 }
4384
    /// Test harness that owns a `Server` together with the state needed to
    /// connect simulated clients to it.
    struct TestServer {
        /// Peer handed to the server on construction; reset when the harness
        /// is dropped.
        peer: Arc<Peer>,
        /// Application state produced by `build_app_state`.
        app_state: Arc<AppState>,
        /// The server under test.
        server: Arc<Server>,
        /// Foreground executor, used by `condition` to mark waiting periods.
        foreground: Rc<executor::Foreground>,
        /// Receiving half of the channel whose sender was given to the server.
        notifications: mpsc::UnboundedReceiver<()>,
        /// One barrier sender per connected user; `disconnect_client` removes
        /// the entry (presumably dropping it severs the connection — confirm
        /// against `Connection::in_memory`).
        connection_killers: Arc<Mutex<HashMap<UserId, barrier::Sender>>>,
        /// When set, new connection attempts from test clients are rejected.
        forbid_connections: Arc<AtomicBool>,
        /// Held only to keep the test database alive alongside the server.
        _test_db: TestDb,
    }
4395
4396 impl TestServer {
4397 async fn start(
4398 foreground: Rc<executor::Foreground>,
4399 background: Arc<executor::Background>,
4400 ) -> Self {
4401 let test_db = TestDb::fake(background);
4402 let app_state = Self::build_app_state(&test_db).await;
4403 let peer = Peer::new();
4404 let notifications = mpsc::unbounded();
4405 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4406 Self {
4407 peer,
4408 app_state,
4409 server,
4410 foreground,
4411 notifications: notifications.1,
4412 connection_killers: Default::default(),
4413 forbid_connections: Default::default(),
4414 _test_db: test_db,
4415 }
4416 }
4417
4418 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4419 let http = FakeHttpClient::with_404_response();
4420 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4421 let client_name = name.to_string();
4422 let mut client = Client::new(http.clone());
4423 let server = self.server.clone();
4424 let connection_killers = self.connection_killers.clone();
4425 let forbid_connections = self.forbid_connections.clone();
4426 let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4427
4428 Arc::get_mut(&mut client)
4429 .unwrap()
4430 .override_authenticate(move |cx| {
4431 cx.spawn(|_| async move {
4432 let access_token = "the-token".to_string();
4433 Ok(Credentials {
4434 user_id: user_id.0 as u64,
4435 access_token,
4436 })
4437 })
4438 })
4439 .override_establish_connection(move |credentials, cx| {
4440 assert_eq!(credentials.user_id, user_id.0 as u64);
4441 assert_eq!(credentials.access_token, "the-token");
4442
4443 let server = server.clone();
4444 let connection_killers = connection_killers.clone();
4445 let forbid_connections = forbid_connections.clone();
4446 let client_name = client_name.clone();
4447 let connection_id_tx = connection_id_tx.clone();
4448 cx.spawn(move |cx| async move {
4449 if forbid_connections.load(SeqCst) {
4450 Err(EstablishConnectionError::other(anyhow!(
4451 "server is forbidding connections"
4452 )))
4453 } else {
4454 let (client_conn, server_conn, kill_conn) =
4455 Connection::in_memory(cx.background());
4456 connection_killers.lock().insert(user_id, kill_conn);
4457 cx.background()
4458 .spawn(server.handle_connection(
4459 server_conn,
4460 client_name,
4461 user_id,
4462 Some(connection_id_tx),
4463 cx.background(),
4464 ))
4465 .detach();
4466 Ok(client_conn)
4467 }
4468 })
4469 });
4470
4471 client
4472 .authenticate_and_connect(&cx.to_async())
4473 .await
4474 .unwrap();
4475
4476 Channel::init(&client);
4477 Project::init(&client);
4478
4479 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4480 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4481 let mut authed_user =
4482 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
4483 while authed_user.next().await.unwrap().is_none() {}
4484
4485 TestClient {
4486 client,
4487 peer_id,
4488 user_store,
4489 project: Default::default(),
4490 buffers: Default::default(),
4491 }
4492 }
4493
4494 fn disconnect_client(&self, user_id: UserId) {
4495 self.connection_killers.lock().remove(&user_id);
4496 }
4497
        /// Makes subsequent connection attempts from test clients fail with
        /// "server is forbidding connections".
        fn forbid_connections(&self) {
            self.forbid_connections.store(true, SeqCst);
        }
4501
        /// Clears the flag set by `forbid_connections`, so test clients can
        /// connect again.
        fn allow_connections(&self) {
            self.forbid_connections.store(false, SeqCst);
        }
4505
4506 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4507 let mut config = Config::default();
4508 config.session_secret = "a".repeat(32);
4509 config.database_url = test_db.url.clone();
4510 let github_client = github::AppClient::test();
4511 Arc::new(AppState {
4512 db: test_db.db().clone(),
4513 handlebars: Default::default(),
4514 auth_client: auth::build_client("", ""),
4515 repo_client: github::RepoClient::test(&github_client),
4516 github_client,
4517 config,
4518 })
4519 }
4520
4521 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4522 self.server.store.read()
4523 }
4524
4525 async fn condition<F>(&mut self, mut predicate: F)
4526 where
4527 F: FnMut(&Store) -> bool,
4528 {
4529 async_std::future::timeout(Duration::from_millis(500), async {
4530 while !(predicate)(&*self.server.store.read()) {
4531 self.foreground.start_waiting();
4532 self.notifications.next().await;
4533 self.foreground.finish_waiting();
4534 }
4535 })
4536 .await
4537 .expect("condition timed out");
4538 }
4539 }
4540
    /// Resets the shared `Peer` when the test server goes away.
    impl Drop for TestServer {
        fn drop(&mut self) {
            self.peer.reset();
        }
    }
4546
    /// A simulated client connected to a `TestServer`.
    struct TestClient {
        /// The underlying client; also reachable through `Deref`.
        client: Arc<Client>,
        /// Peer id built from the connection id received while the client was
        /// being created.
        pub peer_id: PeerId,
        /// User store created for this client.
        pub user_store: ModelHandle<UserStore>,
        /// Project the client simulated against; set when `simulate_host` or
        /// `simulate_guest` finishes.
        project: Option<ModelHandle<Project>>,
        /// Buffers the simulation has opened and is still holding.
        buffers: HashSet<ModelHandle<language::Buffer>>,
    }
4554
    /// Lets a `TestClient` be used wherever an `Arc<Client>` is expected.
    impl Deref for TestClient {
        type Target = Arc<Client>;

        fn deref(&self) -> &Self::Target {
            &self.client
        }
    }
4562
4563 impl TestClient {
4564 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4565 UserId::from_proto(
4566 self.user_store
4567 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4568 )
4569 }
4570
4571 fn simulate_host(
4572 mut self,
4573 project: ModelHandle<Project>,
4574 mut language_server_config: LanguageServerConfig,
4575 operations: Rc<Cell<usize>>,
4576 max_operations: usize,
4577 rng: Arc<Mutex<StdRng>>,
4578 mut cx: TestAppContext,
4579 ) -> impl Future<Output = (Self, TestAppContext)> {
4580 let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4581
4582 // Set up a fake language server.
4583 language_server_config.set_fake_initializer({
4584 let rng = rng.clone();
4585 let files = files.clone();
4586 let project = project.downgrade();
4587 move |fake_server| {
4588 fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4589 Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4590 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4591 range: lsp::Range::new(
4592 lsp::Position::new(0, 0),
4593 lsp::Position::new(0, 0),
4594 ),
4595 new_text: "the-new-text".to_string(),
4596 })),
4597 ..Default::default()
4598 }]))
4599 });
4600
4601 fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4602 Some(vec![lsp::CodeActionOrCommand::CodeAction(
4603 lsp::CodeAction {
4604 title: "the-code-action".to_string(),
4605 ..Default::default()
4606 },
4607 )])
4608 });
4609
4610 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4611 |params, _| {
4612 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4613 params.position,
4614 params.position,
4615 )))
4616 },
4617 );
4618
4619 fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4620 let files = files.clone();
4621 let rng = rng.clone();
4622 move |_, _| {
4623 let files = files.lock();
4624 let mut rng = rng.lock();
4625 let count = rng.gen_range::<usize, _>(1..3);
4626 let files = (0..count)
4627 .map(|_| files.choose(&mut *rng).unwrap())
4628 .collect::<Vec<_>>();
4629 log::info!("LSP: Returning definitions in files {:?}", &files);
4630 Some(lsp::GotoDefinitionResponse::Array(
4631 files
4632 .into_iter()
4633 .map(|file| lsp::Location {
4634 uri: lsp::Url::from_file_path(file).unwrap(),
4635 range: Default::default(),
4636 })
4637 .collect(),
4638 ))
4639 }
4640 });
4641
4642 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4643 let rng = rng.clone();
4644 let project = project.clone();
4645 move |params, mut cx| {
4646 if let Some(project) = project.upgrade(&cx) {
4647 project.update(&mut cx, |project, cx| {
4648 let path = params
4649 .text_document_position_params
4650 .text_document
4651 .uri
4652 .to_file_path()
4653 .unwrap();
4654 let (worktree, relative_path) =
4655 project.find_local_worktree(&path, cx)?;
4656 let project_path =
4657 ProjectPath::from((worktree.read(cx).id(), relative_path));
4658 let buffer =
4659 project.get_open_buffer(&project_path, cx)?.read(cx);
4660
4661 let mut highlights = Vec::new();
4662 let highlight_count = rng.lock().gen_range(1..=5);
4663 let mut prev_end = 0;
4664 for _ in 0..highlight_count {
4665 let range =
4666 buffer.random_byte_range(prev_end, &mut *rng.lock());
4667 let start = buffer
4668 .offset_to_point_utf16(range.start)
4669 .to_lsp_position();
4670 let end = buffer
4671 .offset_to_point_utf16(range.end)
4672 .to_lsp_position();
4673 highlights.push(lsp::DocumentHighlight {
4674 range: lsp::Range::new(start, end),
4675 kind: Some(lsp::DocumentHighlightKind::READ),
4676 });
4677 prev_end = range.end;
4678 }
4679 Some(highlights)
4680 })
4681 } else {
4682 None
4683 }
4684 }
4685 });
4686 }
4687 });
4688
4689 project.update(&mut cx, |project, _| {
4690 project.languages().add(Arc::new(Language::new(
4691 LanguageConfig {
4692 name: "Rust".into(),
4693 path_suffixes: vec!["rs".to_string()],
4694 language_server: Some(language_server_config),
4695 ..Default::default()
4696 },
4697 None,
4698 )));
4699 });
4700
4701 async move {
4702 let fs = project.read_with(&cx, |project, _| project.fs().clone());
4703 while operations.get() < max_operations {
4704 operations.set(operations.get() + 1);
4705
4706 let distribution = rng.lock().gen_range::<usize, _>(0..100);
4707 match distribution {
4708 0..=20 if !files.lock().is_empty() => {
4709 let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4710 let mut path = path.as_path();
4711 while let Some(parent_path) = path.parent() {
4712 path = parent_path;
4713 if rng.lock().gen() {
4714 break;
4715 }
4716 }
4717
4718 log::info!("Host: find/create local worktree {:?}", path);
4719 let find_or_create_worktree = project.update(&mut cx, |project, cx| {
4720 project.find_or_create_local_worktree(path, true, cx)
4721 });
4722 let find_or_create_worktree = async move {
4723 find_or_create_worktree.await.unwrap();
4724 };
4725 if rng.lock().gen() {
4726 cx.background().spawn(find_or_create_worktree).detach();
4727 } else {
4728 find_or_create_worktree.await;
4729 }
4730 }
4731 10..=80 if !files.lock().is_empty() => {
4732 let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4733 let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4734 let (worktree, path) = project
4735 .update(&mut cx, |project, cx| {
4736 project.find_or_create_local_worktree(
4737 file.clone(),
4738 true,
4739 cx,
4740 )
4741 })
4742 .await
4743 .unwrap();
4744 let project_path =
4745 worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4746 log::info!(
4747 "Host: opening path {:?}, worktree {}, relative_path {:?}",
4748 file,
4749 project_path.0,
4750 project_path.1
4751 );
4752 let buffer = project
4753 .update(&mut cx, |project, cx| {
4754 project.open_buffer(project_path, cx)
4755 })
4756 .await
4757 .unwrap();
4758 self.buffers.insert(buffer.clone());
4759 buffer
4760 } else {
4761 self.buffers
4762 .iter()
4763 .choose(&mut *rng.lock())
4764 .unwrap()
4765 .clone()
4766 };
4767
4768 if rng.lock().gen_bool(0.1) {
4769 cx.update(|cx| {
4770 log::info!(
4771 "Host: dropping buffer {:?}",
4772 buffer.read(cx).file().unwrap().full_path(cx)
4773 );
4774 self.buffers.remove(&buffer);
4775 drop(buffer);
4776 });
4777 } else {
4778 buffer.update(&mut cx, |buffer, cx| {
4779 log::info!(
4780 "Host: updating buffer {:?} ({})",
4781 buffer.file().unwrap().full_path(cx),
4782 buffer.remote_id()
4783 );
4784 buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4785 });
4786 }
4787 }
4788 _ => loop {
4789 let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4790 let mut path = PathBuf::new();
4791 path.push("/");
4792 for _ in 0..path_component_count {
4793 let letter = rng.lock().gen_range(b'a'..=b'z');
4794 path.push(std::str::from_utf8(&[letter]).unwrap());
4795 }
4796 path.set_extension("rs");
4797 let parent_path = path.parent().unwrap();
4798
4799 log::info!("Host: creating file {:?}", path,);
4800
4801 if fs.create_dir(&parent_path).await.is_ok()
4802 && fs.create_file(&path, Default::default()).await.is_ok()
4803 {
4804 files.lock().push(path);
4805 break;
4806 } else {
4807 log::info!("Host: cannot create file");
4808 }
4809 },
4810 }
4811
4812 cx.background().simulate_random_delay().await;
4813 }
4814
4815 log::info!("Host done");
4816
4817 self.project = Some(project);
4818 (self, cx)
4819 }
4820 }
4821
        /// Drives one guest in the randomized collaboration test.
        ///
        /// Until `operations` reaches `max_operations`, each iteration picks a
        /// buffer (opening a random file from a visible worktree, or reusing
        /// one that is already open) and then performs one randomly chosen
        /// action on it: drop it, request completions, code actions, a save,
        /// a prepare-rename, definitions, highlights, a project-wide search,
        /// or a random edit. Most requests are randomly either awaited or
        /// detached.
        ///
        /// Returns the client (with `project` stored in it) and its app
        /// context.
        pub async fn simulate_guest(
            mut self,
            guest_id: usize,
            project: ModelHandle<Project>,
            operations: Rc<Cell<usize>>,
            max_operations: usize,
            rng: Arc<Mutex<StdRng>>,
            mut cx: TestAppContext,
        ) -> (Self, TestAppContext) {
            while operations.get() < max_operations {
                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
                    // Pick a visible worktree that contains at least one file.
                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
                        project
                            .worktrees(&cx)
                            .filter(|worktree| {
                                let worktree = worktree.read(cx);
                                worktree.is_visible()
                                    && worktree.entries(false).any(|e| e.is_file())
                            })
                            .choose(&mut *rng.lock())
                    }) {
                        worktree
                    } else {
                        // Nothing to open yet: wait and retry without counting
                        // an operation.
                        cx.background().simulate_random_delay().await;
                        continue;
                    };

                    operations.set(operations.get() + 1);
                    let (worktree_root_name, project_path) =
                        worktree.read_with(&cx, |worktree, _| {
                            let entry = worktree
                                .entries(false)
                                .filter(|e| e.is_file())
                                .choose(&mut *rng.lock())
                                .unwrap();
                            (
                                worktree.root_name().to_string(),
                                (worktree.id(), entry.path.clone()),
                            )
                        });
                    log::info!(
                        "Guest {}: opening path {:?} in worktree {} ({})",
                        guest_id,
                        project_path.1,
                        project_path.0,
                        worktree_root_name,
                    );
                    let buffer = project
                        .update(&mut cx, |project, cx| {
                            project.open_buffer(project_path.clone(), cx)
                        })
                        .await
                        .unwrap();
                    log::info!(
                        "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}",
                        guest_id,
                        project_path.1,
                        project_path.0,
                        worktree_root_name,
                        buffer.read_with(&cx, |buffer, _| buffer.remote_id())
                    );
                    self.buffers.insert(buffer.clone());
                    buffer
                } else {
                    operations.set(operations.get() + 1);

                    // Reuse a buffer that is already open.
                    self.buffers
                        .iter()
                        .choose(&mut *rng.lock())
                        .unwrap()
                        .clone()
                };

                let choice = rng.lock().gen_range(0..100);
                match choice {
                    // Drop the buffer handle from inside an app update.
                    0..=9 => {
                        cx.update(|cx| {
                            log::info!(
                                "Guest {}: dropping buffer {:?}",
                                guest_id,
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            self.buffers.remove(&buffer);
                            drop(buffer);
                        });
                    }
                    // Request completions at a random offset.
                    10..=19 => {
                        let completions = project.update(&mut cx, |project, cx| {
                            log::info!(
                                "Guest {}: requesting completions for buffer {} ({:?})",
                                guest_id,
                                buffer.read(cx).remote_id(),
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                            project.completions(&buffer, offset, cx)
                        });
                        let completions = cx.background().spawn(async move {
                            completions.await.expect("completions request failed");
                        });
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching completions request", guest_id);
                            completions.detach();
                        } else {
                            completions.await;
                        }
                    }
                    // Request code actions for a random byte range.
                    20..=29 => {
                        let code_actions = project.update(&mut cx, |project, cx| {
                            log::info!(
                                "Guest {}: requesting code actions for buffer {} ({:?})",
                                guest_id,
                                buffer.read(cx).remote_id(),
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
                            project.code_actions(&buffer, range, cx)
                        });
                        let code_actions = cx.background().spawn(async move {
                            code_actions.await.expect("code actions request failed");
                        });
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching code actions request", guest_id);
                            code_actions.detach();
                        } else {
                            code_actions.await;
                        }
                    }
                    // Save the buffer, but only if it is dirty; otherwise this
                    // choice falls through to the random edit at the bottom.
                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
                            log::info!(
                                "Guest {}: saving buffer {} ({:?})",
                                guest_id,
                                buffer.remote_id(),
                                buffer.file().unwrap().full_path(cx)
                            );
                            (buffer.version(), buffer.save(cx))
                        });
                        let save = cx.background().spawn(async move {
                            let (saved_version, _) = save.await.expect("save request failed");
                            // The saved version must include everything that
                            // was present when the save was requested.
                            assert!(saved_version.observed_all(&requested_version));
                        });
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching save request", guest_id);
                            save.detach();
                        } else {
                            save.await;
                        }
                    }
                    // Prepare a rename at a random offset.
                    40..=44 => {
                        let prepare_rename = project.update(&mut cx, |project, cx| {
                            log::info!(
                                "Guest {}: preparing rename for buffer {} ({:?})",
                                guest_id,
                                buffer.read(cx).remote_id(),
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                            project.prepare_rename(buffer, offset, cx)
                        });
                        let prepare_rename = cx.background().spawn(async move {
                            prepare_rename.await.expect("prepare rename request failed");
                        });
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching prepare rename request", guest_id);
                            prepare_rename.detach();
                        } else {
                            prepare_rename.await;
                        }
                    }
                    // Request definitions; when awaited, keep the buffers the
                    // returned locations point into.
                    45..=49 => {
                        let definitions = project.update(&mut cx, |project, cx| {
                            log::info!(
                                "Guest {}: requesting definitions for buffer {} ({:?})",
                                guest_id,
                                buffer.read(cx).remote_id(),
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                            project.definition(&buffer, offset, cx)
                        });
                        let definitions = cx.background().spawn(async move {
                            definitions.await.expect("definitions request failed")
                        });
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching definitions request", guest_id);
                            definitions.detach();
                        } else {
                            self.buffers
                                .extend(definitions.await.into_iter().map(|loc| loc.buffer));
                        }
                    }
                    // Request document highlights at a random offset.
                    50..=54 => {
                        let highlights = project.update(&mut cx, |project, cx| {
                            log::info!(
                                "Guest {}: requesting highlights for buffer {} ({:?})",
                                guest_id,
                                buffer.read(cx).remote_id(),
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                            project.document_highlights(&buffer, offset, cx)
                        });
                        let highlights = cx.background().spawn(async move {
                            highlights.await.expect("highlights request failed");
                        });
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching highlights request", guest_id);
                            highlights.detach();
                        } else {
                            highlights.await;
                        }
                    }
                    // Project-wide search for a random lowercase letter; when
                    // awaited, keep the buffers that matched.
                    55..=59 => {
                        let search = project.update(&mut cx, |project, cx| {
                            let query = rng.lock().gen_range('a'..='z');
                            log::info!("Guest {}: project-wide search {:?}", guest_id, query);
                            project.search(SearchQuery::text(query, false, false), cx)
                        });
                        let search = cx
                            .background()
                            .spawn(async move { search.await.expect("search request failed") });
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching search request", guest_id);
                            search.detach();
                        } else {
                            self.buffers.extend(search.await.into_keys());
                        }
                    }
                    // Everything else: apply random edits to the buffer.
                    _ => {
                        buffer.update(&mut cx, |buffer, cx| {
                            log::info!(
                                "Guest {}: updating buffer {} ({:?})",
                                guest_id,
                                buffer.remote_id(),
                                buffer.file().unwrap().full_path(cx)
                            );
                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
                        });
                    }
                }
                cx.background().simulate_random_delay().await;
            }

            log::info!("Guest {} done", guest_id);

            self.project = Some(project);
            (self, cx)
        }
5071 }
5072
    /// Tears down the underlying client when the test client is dropped.
    impl Drop for TestClient {
        fn drop(&mut self) {
            self.client.tear_down();
        }
    }
5078
5079 impl Executor for Arc<gpui::executor::Background> {
5080 type Timer = gpui::executor::Timer;
5081
5082 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5083 self.spawn(future).detach();
5084 }
5085
5086 fn timer(&self, duration: Duration) -> Self::Timer {
5087 self.as_ref().timer(duration)
5088 }
5089 }
5090
5091 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5092 channel
5093 .messages()
5094 .cursor::<()>()
5095 .map(|m| {
5096 (
5097 m.sender.github_login.clone(),
5098 m.body.clone(),
5099 m.is_pending(),
5100 )
5101 })
5102 .collect()
5103 }
5104
    /// A view with no state; its `render` produces an empty element.
    struct EmptyView;
5106
    /// `EmptyView` emits no meaningful events, so its event type is `()`.
    impl gpui::Entity for EmptyView {
        type Event = ();
    }
5110
    impl gpui::View for EmptyView {
        /// Name under which this view is identified.
        fn ui_name() -> &'static str {
            "empty view"
        }

        /// Renders nothing: always returns a boxed `Empty` element.
        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
            gpui::Element::boxed(gpui::elements::Empty)
        }
    }
5120}