1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_io::Timer;
10use async_std::task;
11use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
12use collections::{HashMap, HashSet};
13use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
14use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{
21 any::TypeId,
22 future::Future,
23 sync::Arc,
24 time::{Duration, Instant},
25};
26use store::{Store, Worktree};
27use surf::StatusCode;
28use tide::log;
29use tide::{
30 http::headers::{HeaderName, CONNECTION, UPGRADE},
31 Request, Response,
32};
33use time::OffsetDateTime;
34
/// Type-erased handler stored in `Server::handlers`.
///
/// It is called with a handle to the server and a boxed, type-erased envelope,
/// and returns a boxed future that resolves to `tide::Result<()>`.
type MessageHandler = Box<
    dyn Send
        + Sync
        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
>;
40
/// The RPC server: owns the peer used to talk to connections, the in-memory
/// store, the application state, and the table of message handlers.
pub struct Server {
    /// Peer through which responses and messages are sent to connections.
    peer: Arc<Peer>,
    /// In-memory state, accessed through `state()` and `state_mut()`.
    store: RwLock<Store>,
    /// Application state; handlers reach the database through its `db` field.
    app_state: Arc<AppState>,
    /// Handlers keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that is sent `()` each time an incoming message has
    /// finished being handled (see `handle_connection`).
    notifications: Option<mpsc::UnboundedSender<()>>,
}
48
/// Abstraction over task spawning and timers used while serving a connection.
pub trait Executor: Send + Clone {
    /// Future type returned by `timer`.
    type Timer: Send + Future;
    /// Starts `future` without returning any handle to it.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
    /// Creates a timer future for `duration`.
    // NOTE(review): presumably the future completes once `duration` has elapsed —
    // confirm against each implementation.
    fn timer(&self, duration: Duration) -> Self::Timer;
}
54
/// The `Executor` passed to `handle_connection` by `add_routes`. Its
/// implementation spawns with `async_std::task::spawn` and builds timers with
/// `async_io::Timer::after`.
#[derive(Clone)]
pub struct RealExecutor;
57
/// Number of channel messages requested from the database per page. A page
/// with fewer messages than this is reported to the client as `done`.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Upper bound on a trimmed chat message body, compared against `String::len`
/// (i.e. measured in bytes).
const MAX_MESSAGE_LEN: usize = 1024;
60
61impl Server {
62 pub fn new(
63 app_state: Arc<AppState>,
64 peer: Arc<Peer>,
65 notifications: Option<mpsc::UnboundedSender<()>>,
66 ) -> Arc<Self> {
67 let mut server = Self {
68 peer,
69 app_state,
70 store: Default::default(),
71 handlers: Default::default(),
72 notifications,
73 };
74
75 server
76 .add_request_handler(Server::ping)
77 .add_request_handler(Server::register_project)
78 .add_message_handler(Server::unregister_project)
79 .add_request_handler(Server::share_project)
80 .add_message_handler(Server::unshare_project)
81 .add_request_handler(Server::join_project)
82 .add_message_handler(Server::leave_project)
83 .add_request_handler(Server::register_worktree)
84 .add_message_handler(Server::unregister_worktree)
85 .add_request_handler(Server::update_worktree)
86 .add_message_handler(Server::update_diagnostic_summary)
87 .add_message_handler(Server::lsp_event)
88 .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
89 .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
90 .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
91 .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
92 .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
93 .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
94 .add_request_handler(Server::forward_project_request::<proto::OpenBuffer>)
95 .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
96 .add_request_handler(
97 Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
98 )
99 .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
100 .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
101 .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
102 .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
103 .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
104 .add_request_handler(Server::update_buffer)
105 .add_message_handler(Server::update_buffer_file)
106 .add_message_handler(Server::buffer_reloaded)
107 .add_message_handler(Server::buffer_saved)
108 .add_request_handler(Server::save_buffer)
109 .add_request_handler(Server::get_channels)
110 .add_request_handler(Server::get_users)
111 .add_request_handler(Server::join_channel)
112 .add_message_handler(Server::leave_channel)
113 .add_request_handler(Server::send_channel_message)
114 .add_request_handler(Server::get_channel_messages);
115
116 Arc::new(server)
117 }
118
119 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
120 where
121 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
122 Fut: 'static + Send + Future<Output = tide::Result<()>>,
123 M: EnvelopedMessage,
124 {
125 let prev_handler = self.handlers.insert(
126 TypeId::of::<M>(),
127 Box::new(move |server, envelope| {
128 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
129 (handler)(server, *envelope).boxed()
130 }),
131 );
132 if prev_handler.is_some() {
133 panic!("registered a handler for the same message twice");
134 }
135 self
136 }
137
138 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
139 where
140 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
141 Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
142 M: RequestMessage,
143 {
144 self.add_message_handler(move |server, envelope| {
145 let receipt = envelope.receipt();
146 let response = (handler)(server.clone(), envelope);
147 async move {
148 match response.await {
149 Ok(response) => {
150 server.peer.respond(receipt, response)?;
151 Ok(())
152 }
153 Err(error) => {
154 server.peer.respond_with_error(
155 receipt,
156 proto::Error {
157 message: error.to_string(),
158 },
159 )?;
160 Err(error)
161 }
162 }
163 }
164 })
165 }
166
    /// Serves a single client connection until it closes.
    ///
    /// The returned future registers `connection` with the peer, records the
    /// resulting connection id in the store under `user_id`, pushes a contacts
    /// update for that user, and then dispatches incoming messages to the
    /// registered handlers. It finishes, after calling `sign_out`, once the
    /// connection's I/O future completes or the incoming message stream ends.
    ///
    /// * `addr` is only used in log output.
    /// * `send_connection_id`, when present, is sent the id assigned to the
    ///   connection; a failure to send it is ignored.
    /// * `executor` supplies timers to the peer and runs background messages.
    pub fn handle_connection<E: Executor>(
        self: &Arc<Self>,
        connection: Connection,
        addr: String,
        user_id: UserId,
        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
        executor: E,
    ) -> impl Future<Output = ()> {
        // `state_mut` and `sign_out` take `&mut Arc<Self>`, hence the mutable clone.
        let mut this = self.clone();
        async move {
            let (connection_id, handle_io, mut incoming_rx) = this
                .peer
                .add_connection(connection, {
                    let executor = executor.clone();
                    move |duration| {
                        let timer = executor.timer(duration);
                        async move {
                            timer.await;
                        }
                    }
                })
                .await;

            if let Some(send_connection_id) = send_connection_id.as_mut() {
                let _ = send_connection_id.send(connection_id).await;
            }

            this.state_mut().add_connection(connection_id, user_id);
            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
                log::error!("error updating contacts for {:?}: {}", user_id, err);
            }

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.next().fuse();
                futures::pin_mut!(next_message);
                // `select_biased!` polls its branches in order, so the I/O future
                // is always checked before the next incoming message.
                futures::select_biased! {
                    result = handle_io => {
                        if let Err(err) = result {
                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
                        }
                        break;
                    }
                    message = next_message => {
                        if let Some(message) = message {
                            let start_time = Instant::now();
                            let type_name = message.payload_type_name();
                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                let notifications = this.notifications.clone();
                                let is_background = message.is_background();
                                let handle_message = (handler)(this.clone(), message);
                                let handle_message = async move {
                                    if let Err(err) = handle_message.await {
                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
                                    } else {
                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
                                    }
                                    // Signal observers that this message is done, whether or not it failed.
                                    if let Some(mut notifications) = notifications {
                                        let _ = notifications.send(()).await;
                                    }
                                };
                                // Background messages run detached; all others are awaited
                                // here, so the next message is not read until they finish.
                                if is_background {
                                    executor.spawn_detached(handle_message);
                                } else {
                                    handle_message.await;
                                }
                            } else {
                                log::warn!("unhandled message: {}", type_name);
                            }
                        } else {
                            log::info!("rpc connection closed {:?}", addr);
                            break;
                        }
                    }
                }
            }

            if let Err(err) = this.sign_out(connection_id).await {
                log::error!("error signing out connection {:?} - {:?}", addr, err);
            }
        }
    }
251
252 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
253 self.peer.disconnect(connection_id);
254 let removed_connection = self.state_mut().remove_connection(connection_id)?;
255
256 for (project_id, project) in removed_connection.hosted_projects {
257 if let Some(share) = project.share {
258 broadcast(
259 connection_id,
260 share.guests.keys().copied().collect(),
261 |conn_id| {
262 self.peer
263 .send(conn_id, proto::UnshareProject { project_id })
264 },
265 )?;
266 }
267 }
268
269 for (project_id, peer_ids) in removed_connection.guest_project_ids {
270 broadcast(connection_id, peer_ids, |conn_id| {
271 self.peer.send(
272 conn_id,
273 proto::RemoveProjectCollaborator {
274 project_id,
275 peer_id: connection_id.0,
276 },
277 )
278 })?;
279 }
280
281 self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
282 Ok(())
283 }
284
    /// Answers a ping with an empty acknowledgement.
    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
        Ok(proto::Ack {})
    }
288
289 async fn register_project(
290 mut self: Arc<Server>,
291 request: TypedEnvelope<proto::RegisterProject>,
292 ) -> tide::Result<proto::RegisterProjectResponse> {
293 let project_id = {
294 let mut state = self.state_mut();
295 let user_id = state.user_id_for_connection(request.sender_id)?;
296 state.register_project(request.sender_id, user_id)
297 };
298 Ok(proto::RegisterProjectResponse { project_id })
299 }
300
301 async fn unregister_project(
302 mut self: Arc<Server>,
303 request: TypedEnvelope<proto::UnregisterProject>,
304 ) -> tide::Result<()> {
305 let project = self
306 .state_mut()
307 .unregister_project(request.payload.project_id, request.sender_id)?;
308 self.update_contacts_for_users(project.authorized_user_ids().iter())?;
309 Ok(())
310 }
311
312 async fn share_project(
313 mut self: Arc<Server>,
314 request: TypedEnvelope<proto::ShareProject>,
315 ) -> tide::Result<proto::Ack> {
316 self.state_mut()
317 .share_project(request.payload.project_id, request.sender_id);
318 Ok(proto::Ack {})
319 }
320
321 async fn unshare_project(
322 mut self: Arc<Server>,
323 request: TypedEnvelope<proto::UnshareProject>,
324 ) -> tide::Result<()> {
325 let project_id = request.payload.project_id;
326 let project = self
327 .state_mut()
328 .unshare_project(project_id, request.sender_id)?;
329
330 broadcast(request.sender_id, project.connection_ids, |conn_id| {
331 self.peer
332 .send(conn_id, proto::UnshareProject { project_id })
333 })?;
334 self.update_contacts_for_users(&project.authorized_user_ids)?;
335 Ok(())
336 }
337
338 async fn join_project(
339 mut self: Arc<Server>,
340 request: TypedEnvelope<proto::JoinProject>,
341 ) -> tide::Result<proto::JoinProjectResponse> {
342 let project_id = request.payload.project_id;
343
344 let user_id = self.state().user_id_for_connection(request.sender_id)?;
345 let (response, connection_ids, contact_user_ids) = self
346 .state_mut()
347 .join_project(request.sender_id, user_id, project_id)
348 .and_then(|joined| {
349 let share = joined.project.share()?;
350 let peer_count = share.guests.len();
351 let mut collaborators = Vec::with_capacity(peer_count);
352 collaborators.push(proto::Collaborator {
353 peer_id: joined.project.host_connection_id.0,
354 replica_id: 0,
355 user_id: joined.project.host_user_id.to_proto(),
356 });
357 let worktrees = share
358 .worktrees
359 .iter()
360 .filter_map(|(id, shared_worktree)| {
361 let worktree = joined.project.worktrees.get(&id)?;
362 Some(proto::Worktree {
363 id: *id,
364 root_name: worktree.root_name.clone(),
365 entries: shared_worktree.entries.values().cloned().collect(),
366 diagnostic_summaries: shared_worktree
367 .diagnostic_summaries
368 .values()
369 .cloned()
370 .collect(),
371 visible: worktree.visible,
372 })
373 })
374 .collect();
375 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
376 if *peer_conn_id != request.sender_id {
377 collaborators.push(proto::Collaborator {
378 peer_id: peer_conn_id.0,
379 replica_id: *peer_replica_id as u32,
380 user_id: peer_user_id.to_proto(),
381 });
382 }
383 }
384 let response = proto::JoinProjectResponse {
385 worktrees,
386 replica_id: joined.replica_id as u32,
387 collaborators,
388 };
389 let connection_ids = joined.project.connection_ids();
390 let contact_user_ids = joined.project.authorized_user_ids();
391 Ok((response, connection_ids, contact_user_ids))
392 })?;
393
394 broadcast(request.sender_id, connection_ids, |conn_id| {
395 self.peer.send(
396 conn_id,
397 proto::AddProjectCollaborator {
398 project_id,
399 collaborator: Some(proto::Collaborator {
400 peer_id: request.sender_id.0,
401 replica_id: response.replica_id,
402 user_id: user_id.to_proto(),
403 }),
404 },
405 )
406 })?;
407 self.update_contacts_for_users(&contact_user_ids)?;
408 Ok(response)
409 }
410
411 async fn leave_project(
412 mut self: Arc<Server>,
413 request: TypedEnvelope<proto::LeaveProject>,
414 ) -> tide::Result<()> {
415 let sender_id = request.sender_id;
416 let project_id = request.payload.project_id;
417 let worktree = self.state_mut().leave_project(sender_id, project_id)?;
418
419 broadcast(sender_id, worktree.connection_ids, |conn_id| {
420 self.peer.send(
421 conn_id,
422 proto::RemoveProjectCollaborator {
423 project_id,
424 peer_id: sender_id.0,
425 },
426 )
427 })?;
428 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
429
430 Ok(())
431 }
432
433 async fn register_worktree(
434 mut self: Arc<Server>,
435 request: TypedEnvelope<proto::RegisterWorktree>,
436 ) -> tide::Result<proto::Ack> {
437 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
438
439 let mut contact_user_ids = HashSet::default();
440 contact_user_ids.insert(host_user_id);
441 for github_login in &request.payload.authorized_logins {
442 let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
443 contact_user_ids.insert(contact_user_id);
444 }
445
446 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
447 let guest_connection_ids;
448 {
449 let mut state = self.state_mut();
450 guest_connection_ids = state
451 .read_project(request.payload.project_id, request.sender_id)?
452 .guest_connection_ids();
453 state.register_worktree(
454 request.payload.project_id,
455 request.payload.worktree_id,
456 request.sender_id,
457 Worktree {
458 authorized_user_ids: contact_user_ids.clone(),
459 root_name: request.payload.root_name.clone(),
460 visible: request.payload.visible,
461 },
462 )?;
463 }
464 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
465 self.peer
466 .forward_send(request.sender_id, connection_id, request.payload.clone())
467 })?;
468 self.update_contacts_for_users(&contact_user_ids)?;
469 Ok(proto::Ack {})
470 }
471
472 async fn unregister_worktree(
473 mut self: Arc<Server>,
474 request: TypedEnvelope<proto::UnregisterWorktree>,
475 ) -> tide::Result<()> {
476 let project_id = request.payload.project_id;
477 let worktree_id = request.payload.worktree_id;
478 let (worktree, guest_connection_ids) =
479 self.state_mut()
480 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
481 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
482 self.peer.send(
483 conn_id,
484 proto::UnregisterWorktree {
485 project_id,
486 worktree_id,
487 },
488 )
489 })?;
490 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
491 Ok(())
492 }
493
494 async fn update_worktree(
495 mut self: Arc<Server>,
496 request: TypedEnvelope<proto::UpdateWorktree>,
497 ) -> tide::Result<proto::Ack> {
498 let connection_ids = self.state_mut().update_worktree(
499 request.sender_id,
500 request.payload.project_id,
501 request.payload.worktree_id,
502 &request.payload.removed_entries,
503 &request.payload.updated_entries,
504 )?;
505
506 broadcast(request.sender_id, connection_ids, |connection_id| {
507 self.peer
508 .forward_send(request.sender_id, connection_id, request.payload.clone())
509 })?;
510
511 Ok(proto::Ack {})
512 }
513
514 async fn update_diagnostic_summary(
515 mut self: Arc<Server>,
516 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
517 ) -> tide::Result<()> {
518 let summary = request
519 .payload
520 .summary
521 .clone()
522 .ok_or_else(|| anyhow!("invalid summary"))?;
523 let receiver_ids = self.state_mut().update_diagnostic_summary(
524 request.payload.project_id,
525 request.payload.worktree_id,
526 request.sender_id,
527 summary,
528 )?;
529
530 broadcast(request.sender_id, receiver_ids, |connection_id| {
531 self.peer
532 .forward_send(request.sender_id, connection_id, request.payload.clone())
533 })?;
534 Ok(())
535 }
536
537 async fn lsp_event(
538 self: Arc<Server>,
539 request: TypedEnvelope<proto::LspEvent>,
540 ) -> tide::Result<()> {
541 let receiver_ids = self
542 .state()
543 .project_connection_ids(request.payload.project_id, request.sender_id)?;
544 broadcast(request.sender_id, receiver_ids, |connection_id| {
545 self.peer
546 .forward_send(request.sender_id, connection_id, request.payload.clone())
547 })?;
548 Ok(())
549 }
550
551 async fn forward_project_request<T>(
552 self: Arc<Server>,
553 request: TypedEnvelope<T>,
554 ) -> tide::Result<T::Response>
555 where
556 T: EntityMessage + RequestMessage,
557 {
558 let host_connection_id = self
559 .state()
560 .read_project(request.payload.remote_entity_id(), request.sender_id)?
561 .host_connection_id;
562 Ok(self
563 .peer
564 .forward_request(request.sender_id, host_connection_id, request.payload)
565 .await?)
566 }
567
568 async fn save_buffer(
569 self: Arc<Server>,
570 request: TypedEnvelope<proto::SaveBuffer>,
571 ) -> tide::Result<proto::BufferSaved> {
572 let host;
573 let mut guests;
574 {
575 let state = self.state();
576 let project = state.read_project(request.payload.project_id, request.sender_id)?;
577 host = project.host_connection_id;
578 guests = project.guest_connection_ids()
579 }
580
581 let response = self
582 .peer
583 .forward_request(request.sender_id, host, request.payload.clone())
584 .await?;
585
586 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
587 broadcast(host, guests, |conn_id| {
588 self.peer.forward_send(host, conn_id, response.clone())
589 })?;
590
591 Ok(response)
592 }
593
594 async fn update_buffer(
595 self: Arc<Server>,
596 request: TypedEnvelope<proto::UpdateBuffer>,
597 ) -> tide::Result<proto::Ack> {
598 let receiver_ids = self
599 .state()
600 .project_connection_ids(request.payload.project_id, request.sender_id)?;
601 broadcast(request.sender_id, receiver_ids, |connection_id| {
602 self.peer
603 .forward_send(request.sender_id, connection_id, request.payload.clone())
604 })?;
605 Ok(proto::Ack {})
606 }
607
608 async fn update_buffer_file(
609 self: Arc<Server>,
610 request: TypedEnvelope<proto::UpdateBufferFile>,
611 ) -> tide::Result<()> {
612 let receiver_ids = self
613 .state()
614 .project_connection_ids(request.payload.project_id, request.sender_id)?;
615 broadcast(request.sender_id, receiver_ids, |connection_id| {
616 self.peer
617 .forward_send(request.sender_id, connection_id, request.payload.clone())
618 })?;
619 Ok(())
620 }
621
622 async fn buffer_reloaded(
623 self: Arc<Server>,
624 request: TypedEnvelope<proto::BufferReloaded>,
625 ) -> tide::Result<()> {
626 let receiver_ids = self
627 .state()
628 .project_connection_ids(request.payload.project_id, request.sender_id)?;
629 broadcast(request.sender_id, receiver_ids, |connection_id| {
630 self.peer
631 .forward_send(request.sender_id, connection_id, request.payload.clone())
632 })?;
633 Ok(())
634 }
635
636 async fn buffer_saved(
637 self: Arc<Server>,
638 request: TypedEnvelope<proto::BufferSaved>,
639 ) -> tide::Result<()> {
640 let receiver_ids = self
641 .state()
642 .project_connection_ids(request.payload.project_id, request.sender_id)?;
643 broadcast(request.sender_id, receiver_ids, |connection_id| {
644 self.peer
645 .forward_send(request.sender_id, connection_id, request.payload.clone())
646 })?;
647 Ok(())
648 }
649
650 async fn get_channels(
651 self: Arc<Server>,
652 request: TypedEnvelope<proto::GetChannels>,
653 ) -> tide::Result<proto::GetChannelsResponse> {
654 let user_id = self.state().user_id_for_connection(request.sender_id)?;
655 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
656 Ok(proto::GetChannelsResponse {
657 channels: channels
658 .into_iter()
659 .map(|chan| proto::Channel {
660 id: chan.id.to_proto(),
661 name: chan.name,
662 })
663 .collect(),
664 })
665 }
666
667 async fn get_users(
668 self: Arc<Server>,
669 request: TypedEnvelope<proto::GetUsers>,
670 ) -> tide::Result<proto::GetUsersResponse> {
671 let user_ids = request
672 .payload
673 .user_ids
674 .into_iter()
675 .map(UserId::from_proto)
676 .collect();
677 let users = self
678 .app_state
679 .db
680 .get_users_by_ids(user_ids)
681 .await?
682 .into_iter()
683 .map(|user| proto::User {
684 id: user.id.to_proto(),
685 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
686 github_login: user.github_login,
687 })
688 .collect();
689 Ok(proto::GetUsersResponse { users })
690 }
691
692 fn update_contacts_for_users<'a>(
693 self: &Arc<Server>,
694 user_ids: impl IntoIterator<Item = &'a UserId>,
695 ) -> anyhow::Result<()> {
696 let mut result = Ok(());
697 let state = self.state();
698 for user_id in user_ids {
699 let contacts = state.contacts_for_user(*user_id);
700 for connection_id in state.connection_ids_for_user(*user_id) {
701 if let Err(error) = self.peer.send(
702 connection_id,
703 proto::UpdateContacts {
704 contacts: contacts.clone(),
705 },
706 ) {
707 result = Err(error);
708 }
709 }
710 }
711 result
712 }
713
714 async fn join_channel(
715 mut self: Arc<Self>,
716 request: TypedEnvelope<proto::JoinChannel>,
717 ) -> tide::Result<proto::JoinChannelResponse> {
718 let user_id = self.state().user_id_for_connection(request.sender_id)?;
719 let channel_id = ChannelId::from_proto(request.payload.channel_id);
720 if !self
721 .app_state
722 .db
723 .can_user_access_channel(user_id, channel_id)
724 .await?
725 {
726 Err(anyhow!("access denied"))?;
727 }
728
729 self.state_mut().join_channel(request.sender_id, channel_id);
730 let messages = self
731 .app_state
732 .db
733 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
734 .await?
735 .into_iter()
736 .map(|msg| proto::ChannelMessage {
737 id: msg.id.to_proto(),
738 body: msg.body,
739 timestamp: msg.sent_at.unix_timestamp() as u64,
740 sender_id: msg.sender_id.to_proto(),
741 nonce: Some(msg.nonce.as_u128().into()),
742 })
743 .collect::<Vec<_>>();
744 Ok(proto::JoinChannelResponse {
745 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
746 messages,
747 })
748 }
749
750 async fn leave_channel(
751 mut self: Arc<Self>,
752 request: TypedEnvelope<proto::LeaveChannel>,
753 ) -> tide::Result<()> {
754 let user_id = self.state().user_id_for_connection(request.sender_id)?;
755 let channel_id = ChannelId::from_proto(request.payload.channel_id);
756 if !self
757 .app_state
758 .db
759 .can_user_access_channel(user_id, channel_id)
760 .await?
761 {
762 Err(anyhow!("access denied"))?;
763 }
764
765 self.state_mut()
766 .leave_channel(request.sender_id, channel_id);
767
768 Ok(())
769 }
770
771 async fn send_channel_message(
772 self: Arc<Self>,
773 request: TypedEnvelope<proto::SendChannelMessage>,
774 ) -> tide::Result<proto::SendChannelMessageResponse> {
775 let channel_id = ChannelId::from_proto(request.payload.channel_id);
776 let user_id;
777 let connection_ids;
778 {
779 let state = self.state();
780 user_id = state.user_id_for_connection(request.sender_id)?;
781 connection_ids = state.channel_connection_ids(channel_id)?;
782 }
783
784 // Validate the message body.
785 let body = request.payload.body.trim().to_string();
786 if body.len() > MAX_MESSAGE_LEN {
787 return Err(anyhow!("message is too long"))?;
788 }
789 if body.is_empty() {
790 return Err(anyhow!("message can't be blank"))?;
791 }
792
793 let timestamp = OffsetDateTime::now_utc();
794 let nonce = request
795 .payload
796 .nonce
797 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
798
799 let message_id = self
800 .app_state
801 .db
802 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
803 .await?
804 .to_proto();
805 let message = proto::ChannelMessage {
806 sender_id: user_id.to_proto(),
807 id: message_id,
808 body,
809 timestamp: timestamp.unix_timestamp() as u64,
810 nonce: Some(nonce),
811 };
812 broadcast(request.sender_id, connection_ids, |conn_id| {
813 self.peer.send(
814 conn_id,
815 proto::ChannelMessageSent {
816 channel_id: channel_id.to_proto(),
817 message: Some(message.clone()),
818 },
819 )
820 })?;
821 Ok(proto::SendChannelMessageResponse {
822 message: Some(message),
823 })
824 }
825
826 async fn get_channel_messages(
827 self: Arc<Self>,
828 request: TypedEnvelope<proto::GetChannelMessages>,
829 ) -> tide::Result<proto::GetChannelMessagesResponse> {
830 let user_id = self.state().user_id_for_connection(request.sender_id)?;
831 let channel_id = ChannelId::from_proto(request.payload.channel_id);
832 if !self
833 .app_state
834 .db
835 .can_user_access_channel(user_id, channel_id)
836 .await?
837 {
838 Err(anyhow!("access denied"))?;
839 }
840
841 let messages = self
842 .app_state
843 .db
844 .get_channel_messages(
845 channel_id,
846 MESSAGE_COUNT_PER_PAGE,
847 Some(MessageId::from_proto(request.payload.before_message_id)),
848 )
849 .await?
850 .into_iter()
851 .map(|msg| proto::ChannelMessage {
852 id: msg.id.to_proto(),
853 body: msg.body,
854 timestamp: msg.sent_at.unix_timestamp() as u64,
855 sender_id: msg.sender_id.to_proto(),
856 nonce: Some(msg.nonce.as_u128().into()),
857 })
858 .collect::<Vec<_>>();
859
860 Ok(proto::GetChannelMessagesResponse {
861 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
862 messages,
863 })
864 }
865
    /// Takes the read lock on the store and returns the guard.
    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
        self.store.read()
    }
869
    /// Takes the write lock on the store and returns the guard.
    ///
    /// The receiver is `&mut Arc<Self>` even though the lock itself only needs
    /// shared access to `self.store`.
    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
        self.store.write()
    }
873}
874
/// `Executor` implementation built on `async_std::task` and `async_io::Timer`.
impl Executor for RealExecutor {
    type Timer = Timer;

    /// Spawns `future` with `task::spawn`; the returned handle is dropped immediately.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
        task::spawn(future);
    }

    /// Returns `Timer::after(duration)`.
    fn timer(&self, duration: Duration) -> Self::Timer {
        Timer::after(duration)
    }
}
886
887fn broadcast<F>(
888 sender_id: ConnectionId,
889 receiver_ids: Vec<ConnectionId>,
890 mut f: F,
891) -> anyhow::Result<()>
892where
893 F: FnMut(ConnectionId) -> anyhow::Result<()>,
894{
895 let mut result = Ok(());
896 for receiver_id in receiver_ids {
897 if receiver_id != sender_id {
898 if let Err(error) = f(receiver_id) {
899 if result.is_ok() {
900 result = Err(error);
901 }
902 }
903 }
904 }
905 result
906}
907
908pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
909 let server = Server::new(app.state().clone(), rpc.clone(), None);
910 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
911 let server = server.clone();
912 async move {
913 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
914
915 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
916 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
917 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
918 let client_protocol_version: Option<u32> = request
919 .header("X-Zed-Protocol-Version")
920 .and_then(|v| v.as_str().parse().ok());
921
922 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
923 return Ok(Response::new(StatusCode::UpgradeRequired));
924 }
925
926 let header = match request.header("Sec-Websocket-Key") {
927 Some(h) => h.as_str(),
928 None => return Err(anyhow!("expected sec-websocket-key"))?,
929 };
930
931 let user_id = process_auth_header(&request).await?;
932
933 let mut response = Response::new(StatusCode::SwitchingProtocols);
934 response.insert_header(UPGRADE, "websocket");
935 response.insert_header(CONNECTION, "Upgrade");
936 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
937 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
938 response.insert_header("Sec-Websocket-Version", "13");
939
940 let http_res: &mut tide::http::Response = response.as_mut();
941 let upgrade_receiver = http_res.recv_upgrade().await;
942 let addr = request.remote().unwrap_or("unknown").to_string();
943 task::spawn(async move {
944 if let Some(stream) = upgrade_receiver.await {
945 server
946 .handle_connection(
947 Connection::new(
948 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
949 ),
950 addr,
951 user_id,
952 None,
953 RealExecutor,
954 )
955 .await;
956 }
957 });
958
959 Ok(response)
960 }
961 });
962}
963
964fn header_contains_ignore_case<T>(
965 request: &tide::Request<T>,
966 header_name: HeaderName,
967 value: &str,
968) -> bool {
969 request
970 .header(header_name)
971 .map(|h| {
972 h.as_str()
973 .split(',')
974 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
975 })
976 .unwrap_or(false)
977}
978
979#[cfg(test)]
980mod tests {
981 use super::*;
982 use crate::{
983 auth,
984 db::{tests::TestDb, UserId},
985 github, AppState, Config,
986 };
987 use ::rpc::Peer;
988 use client::{
989 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
990 EstablishConnectionError, UserStore,
991 };
992 use collections::BTreeMap;
993 use editor::{
994 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, MultiBuffer,
995 Redo, Rename, ToOffset, ToggleCodeActions, Undo,
996 };
997 use gpui::{executor, ModelHandle, TestAppContext};
998 use language::{
999 tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig, LanguageRegistry,
1000 LanguageServerConfig, OffsetRangeExt, Point, ToLspPosition,
1001 };
1002 use lsp;
1003 use parking_lot::Mutex;
1004 use postage::{barrier, watch};
1005 use project::{
1006 fs::{FakeFs, Fs as _},
1007 search::SearchQuery,
1008 worktree::WorktreeHandle,
1009 DiagnosticSummary, Project, ProjectPath,
1010 };
1011 use rand::prelude::*;
1012 use rpc::PeerId;
1013 use serde_json::json;
1014 use sqlx::types::time::OffsetDateTime;
1015 use std::{
1016 cell::Cell,
1017 env,
1018 ops::Deref,
1019 path::{Path, PathBuf},
1020 rc::Rc,
1021 sync::{
1022 atomic::{AtomicBool, Ordering::SeqCst},
1023 Arc,
1024 },
1025 time::Duration,
1026 };
1027 use workspace::{Settings, Workspace, WorkspaceParams};
1028
1029 #[cfg(test)]
1030 #[ctor::ctor]
1031 fn init_logger() {
1032 if std::env::var("RUST_LOG").is_ok() {
1033 env_logger::init();
1034 }
1035 }
1036
1037 #[gpui::test(iterations = 10)]
1038 async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1039 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1040 let lang_registry = Arc::new(LanguageRegistry::test());
1041 let fs = FakeFs::new(cx_a.background());
1042 cx_a.foreground().forbid_parking();
1043
1044 // Connect to a server as 2 clients.
1045 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1046 let client_a = server.create_client(cx_a, "user_a").await;
1047 let client_b = server.create_client(cx_b, "user_b").await;
1048
1049 // Share a project as client A
1050 fs.insert_tree(
1051 "/a",
1052 json!({
1053 ".zed.toml": r#"collaborators = ["user_b"]"#,
1054 "a.txt": "a-contents",
1055 "b.txt": "b-contents",
1056 }),
1057 )
1058 .await;
1059 let project_a = cx_a.update(|cx| {
1060 Project::local(
1061 client_a.clone(),
1062 client_a.user_store.clone(),
1063 lang_registry.clone(),
1064 fs.clone(),
1065 cx,
1066 )
1067 });
1068 let (worktree_a, _) = project_a
1069 .update(cx_a, |p, cx| {
1070 p.find_or_create_local_worktree("/a", true, cx)
1071 })
1072 .await
1073 .unwrap();
1074 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1075 worktree_a
1076 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1077 .await;
1078 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1079 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1080
1081 // Join that project as client B
1082 let project_b = Project::remote(
1083 project_id,
1084 client_b.clone(),
1085 client_b.user_store.clone(),
1086 lang_registry.clone(),
1087 fs.clone(),
1088 &mut cx_b.to_async(),
1089 )
1090 .await
1091 .unwrap();
1092
1093 let replica_id_b = project_b.read_with(cx_b, |project, _| {
1094 assert_eq!(
1095 project
1096 .collaborators()
1097 .get(&client_a.peer_id)
1098 .unwrap()
1099 .user
1100 .github_login,
1101 "user_a"
1102 );
1103 project.replica_id()
1104 });
1105 project_a
1106 .condition(&cx_a, |tree, _| {
1107 tree.collaborators()
1108 .get(&client_b.peer_id)
1109 .map_or(false, |collaborator| {
1110 collaborator.replica_id == replica_id_b
1111 && collaborator.user.github_login == "user_b"
1112 })
1113 })
1114 .await;
1115
1116 // Open the same file as client B and client A.
1117 let buffer_b = project_b
1118 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1119 .await
1120 .unwrap();
1121 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1122 buffer_b.read_with(cx_b, |buf, cx| {
1123 assert_eq!(buf.read(cx).text(), "b-contents")
1124 });
1125 project_a.read_with(cx_a, |project, cx| {
1126 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1127 });
1128 let buffer_a = project_a
1129 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1130 .await
1131 .unwrap();
1132
1133 let editor_b = cx_b.add_view(window_b, |cx| {
1134 Editor::for_buffer(
1135 buffer_b,
1136 None,
1137 watch::channel_with(Settings::test(cx)).1,
1138 cx,
1139 )
1140 });
1141
1142 // TODO
1143 // // Create a selection set as client B and see that selection set as client A.
1144 // buffer_a
1145 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1146 // .await;
1147
1148 // Edit the buffer as client B and see that edit as client A.
1149 editor_b.update(cx_b, |editor, cx| {
1150 editor.handle_input(&Input("ok, ".into()), cx)
1151 });
1152 buffer_a
1153 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1154 .await;
1155
1156 // TODO
1157 // // Remove the selection set as client B, see those selections disappear as client A.
1158 cx_b.update(move |_| drop(editor_b));
1159 // buffer_a
1160 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1161 // .await;
1162
1163 // Dropping the client B's project removes client B from client A's collaborators.
1164 cx_b.update(move |_| drop(project_b));
1165 project_a
1166 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1167 .await;
1168 }
1169
1170 #[gpui::test(iterations = 10)]
1171 async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1172 let lang_registry = Arc::new(LanguageRegistry::test());
1173 let fs = FakeFs::new(cx_a.background());
1174 cx_a.foreground().forbid_parking();
1175
1176 // Connect to a server as 2 clients.
1177 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1178 let client_a = server.create_client(cx_a, "user_a").await;
1179 let client_b = server.create_client(cx_b, "user_b").await;
1180
1181 // Share a project as client A
1182 fs.insert_tree(
1183 "/a",
1184 json!({
1185 ".zed.toml": r#"collaborators = ["user_b"]"#,
1186 "a.txt": "a-contents",
1187 "b.txt": "b-contents",
1188 }),
1189 )
1190 .await;
1191 let project_a = cx_a.update(|cx| {
1192 Project::local(
1193 client_a.clone(),
1194 client_a.user_store.clone(),
1195 lang_registry.clone(),
1196 fs.clone(),
1197 cx,
1198 )
1199 });
1200 let (worktree_a, _) = project_a
1201 .update(cx_a, |p, cx| {
1202 p.find_or_create_local_worktree("/a", true, cx)
1203 })
1204 .await
1205 .unwrap();
1206 worktree_a
1207 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1208 .await;
1209 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1210 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1211 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1212 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1213
1214 // Join that project as client B
1215 let project_b = Project::remote(
1216 project_id,
1217 client_b.clone(),
1218 client_b.user_store.clone(),
1219 lang_registry.clone(),
1220 fs.clone(),
1221 &mut cx_b.to_async(),
1222 )
1223 .await
1224 .unwrap();
1225 project_b
1226 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1227 .await
1228 .unwrap();
1229
1230 // Unshare the project as client A
1231 project_a
1232 .update(cx_a, |project, cx| project.unshare(cx))
1233 .await
1234 .unwrap();
1235 project_b
1236 .condition(cx_b, |project, _| project.is_read_only())
1237 .await;
1238 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1239 cx_b.update(|_| {
1240 drop(project_b);
1241 });
1242
1243 // Share the project again and ensure guests can still join.
1244 project_a
1245 .update(cx_a, |project, cx| project.share(cx))
1246 .await
1247 .unwrap();
1248 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1249
1250 let project_b2 = Project::remote(
1251 project_id,
1252 client_b.clone(),
1253 client_b.user_store.clone(),
1254 lang_registry.clone(),
1255 fs.clone(),
1256 &mut cx_b.to_async(),
1257 )
1258 .await
1259 .unwrap();
1260 project_b2
1261 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1262 .await
1263 .unwrap();
1264 }
1265
1266 #[gpui::test(iterations = 10)]
1267 async fn test_propagate_saves_and_fs_changes(
1268 cx_a: &mut TestAppContext,
1269 cx_b: &mut TestAppContext,
1270 cx_c: &mut TestAppContext,
1271 ) {
1272 let lang_registry = Arc::new(LanguageRegistry::test());
1273 let fs = FakeFs::new(cx_a.background());
1274 cx_a.foreground().forbid_parking();
1275
1276 // Connect to a server as 3 clients.
1277 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1278 let client_a = server.create_client(cx_a, "user_a").await;
1279 let client_b = server.create_client(cx_b, "user_b").await;
1280 let client_c = server.create_client(cx_c, "user_c").await;
1281
1282 // Share a worktree as client A.
1283 fs.insert_tree(
1284 "/a",
1285 json!({
1286 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1287 "file1": "",
1288 "file2": ""
1289 }),
1290 )
1291 .await;
1292 let project_a = cx_a.update(|cx| {
1293 Project::local(
1294 client_a.clone(),
1295 client_a.user_store.clone(),
1296 lang_registry.clone(),
1297 fs.clone(),
1298 cx,
1299 )
1300 });
1301 let (worktree_a, _) = project_a
1302 .update(cx_a, |p, cx| {
1303 p.find_or_create_local_worktree("/a", true, cx)
1304 })
1305 .await
1306 .unwrap();
1307 worktree_a
1308 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1309 .await;
1310 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1311 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1312 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1313
1314 // Join that worktree as clients B and C.
1315 let project_b = Project::remote(
1316 project_id,
1317 client_b.clone(),
1318 client_b.user_store.clone(),
1319 lang_registry.clone(),
1320 fs.clone(),
1321 &mut cx_b.to_async(),
1322 )
1323 .await
1324 .unwrap();
1325 let project_c = Project::remote(
1326 project_id,
1327 client_c.clone(),
1328 client_c.user_store.clone(),
1329 lang_registry.clone(),
1330 fs.clone(),
1331 &mut cx_c.to_async(),
1332 )
1333 .await
1334 .unwrap();
1335 let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1336 let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1337
1338 // Open and edit a buffer as both guests B and C.
1339 let buffer_b = project_b
1340 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1341 .await
1342 .unwrap();
1343 let buffer_c = project_c
1344 .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1345 .await
1346 .unwrap();
1347 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1348 buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1349
1350 // Open and edit that buffer as the host.
1351 let buffer_a = project_a
1352 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1353 .await
1354 .unwrap();
1355
1356 buffer_a
1357 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1358 .await;
1359 buffer_a.update(cx_a, |buf, cx| {
1360 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1361 });
1362
1363 // Wait for edits to propagate
1364 buffer_a
1365 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1366 .await;
1367 buffer_b
1368 .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1369 .await;
1370 buffer_c
1371 .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1372 .await;
1373
1374 // Edit the buffer as the host and concurrently save as guest B.
1375 let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1376 buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1377 save_b.await.unwrap();
1378 assert_eq!(
1379 fs.load("/a/file1".as_ref()).await.unwrap(),
1380 "hi-a, i-am-c, i-am-b, i-am-a"
1381 );
1382 buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1383 buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1384 buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1385
1386 worktree_a.flush_fs_events(cx_a).await;
1387
1388 // Make changes on host's file system, see those changes on guest worktrees.
1389 fs.rename(
1390 "/a/file1".as_ref(),
1391 "/a/file1-renamed".as_ref(),
1392 Default::default(),
1393 )
1394 .await
1395 .unwrap();
1396
1397 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1398 .await
1399 .unwrap();
1400 fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1401
1402 worktree_a
1403 .condition(&cx_a, |tree, _| {
1404 tree.paths()
1405 .map(|p| p.to_string_lossy())
1406 .collect::<Vec<_>>()
1407 == [".zed.toml", "file1-renamed", "file3", "file4"]
1408 })
1409 .await;
1410 worktree_b
1411 .condition(&cx_b, |tree, _| {
1412 tree.paths()
1413 .map(|p| p.to_string_lossy())
1414 .collect::<Vec<_>>()
1415 == [".zed.toml", "file1-renamed", "file3", "file4"]
1416 })
1417 .await;
1418 worktree_c
1419 .condition(&cx_c, |tree, _| {
1420 tree.paths()
1421 .map(|p| p.to_string_lossy())
1422 .collect::<Vec<_>>()
1423 == [".zed.toml", "file1-renamed", "file3", "file4"]
1424 })
1425 .await;
1426
1427 // Ensure buffer files are updated as well.
1428 buffer_a
1429 .condition(&cx_a, |buf, _| {
1430 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1431 })
1432 .await;
1433 buffer_b
1434 .condition(&cx_b, |buf, _| {
1435 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1436 })
1437 .await;
1438 buffer_c
1439 .condition(&cx_c, |buf, _| {
1440 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1441 })
1442 .await;
1443 }
1444
1445 #[gpui::test(iterations = 10)]
1446 async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1447 cx_a.foreground().forbid_parking();
1448 let lang_registry = Arc::new(LanguageRegistry::test());
1449 let fs = FakeFs::new(cx_a.background());
1450
1451 // Connect to a server as 2 clients.
1452 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1453 let client_a = server.create_client(cx_a, "user_a").await;
1454 let client_b = server.create_client(cx_b, "user_b").await;
1455
1456 // Share a project as client A
1457 fs.insert_tree(
1458 "/dir",
1459 json!({
1460 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1461 "a.txt": "a-contents",
1462 }),
1463 )
1464 .await;
1465
1466 let project_a = cx_a.update(|cx| {
1467 Project::local(
1468 client_a.clone(),
1469 client_a.user_store.clone(),
1470 lang_registry.clone(),
1471 fs.clone(),
1472 cx,
1473 )
1474 });
1475 let (worktree_a, _) = project_a
1476 .update(cx_a, |p, cx| {
1477 p.find_or_create_local_worktree("/dir", true, cx)
1478 })
1479 .await
1480 .unwrap();
1481 worktree_a
1482 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1483 .await;
1484 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1485 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1486 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1487
1488 // Join that project as client B
1489 let project_b = Project::remote(
1490 project_id,
1491 client_b.clone(),
1492 client_b.user_store.clone(),
1493 lang_registry.clone(),
1494 fs.clone(),
1495 &mut cx_b.to_async(),
1496 )
1497 .await
1498 .unwrap();
1499
1500 // Open a buffer as client B
1501 let buffer_b = project_b
1502 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1503 .await
1504 .unwrap();
1505
1506 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1507 buffer_b.read_with(cx_b, |buf, _| {
1508 assert!(buf.is_dirty());
1509 assert!(!buf.has_conflict());
1510 });
1511
1512 buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1513 buffer_b
1514 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1515 .await;
1516 buffer_b.read_with(cx_b, |buf, _| {
1517 assert!(!buf.has_conflict());
1518 });
1519
1520 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1521 buffer_b.read_with(cx_b, |buf, _| {
1522 assert!(buf.is_dirty());
1523 assert!(!buf.has_conflict());
1524 });
1525 }
1526
1527 #[gpui::test(iterations = 10)]
1528 async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1529 cx_a.foreground().forbid_parking();
1530 let lang_registry = Arc::new(LanguageRegistry::test());
1531 let fs = FakeFs::new(cx_a.background());
1532
1533 // Connect to a server as 2 clients.
1534 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1535 let client_a = server.create_client(cx_a, "user_a").await;
1536 let client_b = server.create_client(cx_b, "user_b").await;
1537
1538 // Share a project as client A
1539 fs.insert_tree(
1540 "/dir",
1541 json!({
1542 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1543 "a.txt": "a-contents",
1544 }),
1545 )
1546 .await;
1547
1548 let project_a = cx_a.update(|cx| {
1549 Project::local(
1550 client_a.clone(),
1551 client_a.user_store.clone(),
1552 lang_registry.clone(),
1553 fs.clone(),
1554 cx,
1555 )
1556 });
1557 let (worktree_a, _) = project_a
1558 .update(cx_a, |p, cx| {
1559 p.find_or_create_local_worktree("/dir", true, cx)
1560 })
1561 .await
1562 .unwrap();
1563 worktree_a
1564 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1565 .await;
1566 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1567 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1568 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1569
1570 // Join that project as client B
1571 let project_b = Project::remote(
1572 project_id,
1573 client_b.clone(),
1574 client_b.user_store.clone(),
1575 lang_registry.clone(),
1576 fs.clone(),
1577 &mut cx_b.to_async(),
1578 )
1579 .await
1580 .unwrap();
1581 let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1582
1583 // Open a buffer as client B
1584 let buffer_b = project_b
1585 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1586 .await
1587 .unwrap();
1588 buffer_b.read_with(cx_b, |buf, _| {
1589 assert!(!buf.is_dirty());
1590 assert!(!buf.has_conflict());
1591 });
1592
1593 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1594 .await
1595 .unwrap();
1596 buffer_b
1597 .condition(&cx_b, |buf, _| {
1598 buf.text() == "new contents" && !buf.is_dirty()
1599 })
1600 .await;
1601 buffer_b.read_with(cx_b, |buf, _| {
1602 assert!(!buf.has_conflict());
1603 });
1604 }
1605
1606 #[gpui::test(iterations = 10)]
1607 async fn test_editing_while_guest_opens_buffer(
1608 cx_a: &mut TestAppContext,
1609 cx_b: &mut TestAppContext,
1610 ) {
1611 cx_a.foreground().forbid_parking();
1612 let lang_registry = Arc::new(LanguageRegistry::test());
1613 let fs = FakeFs::new(cx_a.background());
1614
1615 // Connect to a server as 2 clients.
1616 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1617 let client_a = server.create_client(cx_a, "user_a").await;
1618 let client_b = server.create_client(cx_b, "user_b").await;
1619
1620 // Share a project as client A
1621 fs.insert_tree(
1622 "/dir",
1623 json!({
1624 ".zed.toml": r#"collaborators = ["user_b"]"#,
1625 "a.txt": "a-contents",
1626 }),
1627 )
1628 .await;
1629 let project_a = cx_a.update(|cx| {
1630 Project::local(
1631 client_a.clone(),
1632 client_a.user_store.clone(),
1633 lang_registry.clone(),
1634 fs.clone(),
1635 cx,
1636 )
1637 });
1638 let (worktree_a, _) = project_a
1639 .update(cx_a, |p, cx| {
1640 p.find_or_create_local_worktree("/dir", true, cx)
1641 })
1642 .await
1643 .unwrap();
1644 worktree_a
1645 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1646 .await;
1647 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1648 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1649 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1650
1651 // Join that project as client B
1652 let project_b = Project::remote(
1653 project_id,
1654 client_b.clone(),
1655 client_b.user_store.clone(),
1656 lang_registry.clone(),
1657 fs.clone(),
1658 &mut cx_b.to_async(),
1659 )
1660 .await
1661 .unwrap();
1662
1663 // Open a buffer as client A
1664 let buffer_a = project_a
1665 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1666 .await
1667 .unwrap();
1668
1669 // Start opening the same buffer as client B
1670 let buffer_b = cx_b
1671 .background()
1672 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1673
1674 // Edit the buffer as client A while client B is still opening it.
1675 cx_b.background().simulate_random_delay().await;
1676 buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1677 cx_b.background().simulate_random_delay().await;
1678 buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1679
1680 let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
1681 let buffer_b = buffer_b.await.unwrap();
1682 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1683 }
1684
1685 #[gpui::test(iterations = 10)]
1686 async fn test_leaving_worktree_while_opening_buffer(
1687 cx_a: &mut TestAppContext,
1688 cx_b: &mut TestAppContext,
1689 ) {
1690 cx_a.foreground().forbid_parking();
1691 let lang_registry = Arc::new(LanguageRegistry::test());
1692 let fs = FakeFs::new(cx_a.background());
1693
1694 // Connect to a server as 2 clients.
1695 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1696 let client_a = server.create_client(cx_a, "user_a").await;
1697 let client_b = server.create_client(cx_b, "user_b").await;
1698
1699 // Share a project as client A
1700 fs.insert_tree(
1701 "/dir",
1702 json!({
1703 ".zed.toml": r#"collaborators = ["user_b"]"#,
1704 "a.txt": "a-contents",
1705 }),
1706 )
1707 .await;
1708 let project_a = cx_a.update(|cx| {
1709 Project::local(
1710 client_a.clone(),
1711 client_a.user_store.clone(),
1712 lang_registry.clone(),
1713 fs.clone(),
1714 cx,
1715 )
1716 });
1717 let (worktree_a, _) = project_a
1718 .update(cx_a, |p, cx| {
1719 p.find_or_create_local_worktree("/dir", true, cx)
1720 })
1721 .await
1722 .unwrap();
1723 worktree_a
1724 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1725 .await;
1726 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1727 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1728 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1729
1730 // Join that project as client B
1731 let project_b = Project::remote(
1732 project_id,
1733 client_b.clone(),
1734 client_b.user_store.clone(),
1735 lang_registry.clone(),
1736 fs.clone(),
1737 &mut cx_b.to_async(),
1738 )
1739 .await
1740 .unwrap();
1741
1742 // See that a guest has joined as client A.
1743 project_a
1744 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1745 .await;
1746
1747 // Begin opening a buffer as client B, but leave the project before the open completes.
1748 let buffer_b = cx_b
1749 .background()
1750 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1751 cx_b.update(|_| drop(project_b));
1752 drop(buffer_b);
1753
1754 // See that the guest has left.
1755 project_a
1756 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1757 .await;
1758 }
1759
1760 #[gpui::test(iterations = 10)]
1761 async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1762 cx_a.foreground().forbid_parking();
1763 let lang_registry = Arc::new(LanguageRegistry::test());
1764 let fs = FakeFs::new(cx_a.background());
1765
1766 // Connect to a server as 2 clients.
1767 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1768 let client_a = server.create_client(cx_a, "user_a").await;
1769 let client_b = server.create_client(cx_b, "user_b").await;
1770
1771 // Share a project as client A
1772 fs.insert_tree(
1773 "/a",
1774 json!({
1775 ".zed.toml": r#"collaborators = ["user_b"]"#,
1776 "a.txt": "a-contents",
1777 "b.txt": "b-contents",
1778 }),
1779 )
1780 .await;
1781 let project_a = cx_a.update(|cx| {
1782 Project::local(
1783 client_a.clone(),
1784 client_a.user_store.clone(),
1785 lang_registry.clone(),
1786 fs.clone(),
1787 cx,
1788 )
1789 });
1790 let (worktree_a, _) = project_a
1791 .update(cx_a, |p, cx| {
1792 p.find_or_create_local_worktree("/a", true, cx)
1793 })
1794 .await
1795 .unwrap();
1796 worktree_a
1797 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1798 .await;
1799 let project_id = project_a
1800 .update(cx_a, |project, _| project.next_remote_id())
1801 .await;
1802 project_a
1803 .update(cx_a, |project, cx| project.share(cx))
1804 .await
1805 .unwrap();
1806
1807 // Join that project as client B
1808 let _project_b = Project::remote(
1809 project_id,
1810 client_b.clone(),
1811 client_b.user_store.clone(),
1812 lang_registry.clone(),
1813 fs.clone(),
1814 &mut cx_b.to_async(),
1815 )
1816 .await
1817 .unwrap();
1818
1819 // Client A sees that a guest has joined.
1820 project_a
1821 .condition(cx_a, |p, _| p.collaborators().len() == 1)
1822 .await;
1823
1824 // Drop client B's connection and ensure client A observes client B leaving the project.
1825 client_b.disconnect(&cx_b.to_async()).unwrap();
1826 project_a
1827 .condition(cx_a, |p, _| p.collaborators().len() == 0)
1828 .await;
1829
1830 // Rejoin the project as client B
1831 let _project_b = Project::remote(
1832 project_id,
1833 client_b.clone(),
1834 client_b.user_store.clone(),
1835 lang_registry.clone(),
1836 fs.clone(),
1837 &mut cx_b.to_async(),
1838 )
1839 .await
1840 .unwrap();
1841
1842 // Client A sees that a guest has re-joined.
1843 project_a
1844 .condition(cx_a, |p, _| p.collaborators().len() == 1)
1845 .await;
1846
1847 // Simulate connection loss for client B and ensure client A observes client B leaving the project.
1848 client_b.wait_for_current_user(cx_b).await;
1849 server.disconnect_client(client_b.current_user_id(cx_b));
1850 cx_a.foreground().advance_clock(Duration::from_secs(3));
1851 project_a
1852 .condition(cx_a, |p, _| p.collaborators().len() == 0)
1853 .await;
1854 }
1855
1856 #[gpui::test(iterations = 10)]
1857 async fn test_collaborating_with_diagnostics(
1858 cx_a: &mut TestAppContext,
1859 cx_b: &mut TestAppContext,
1860 ) {
1861 cx_a.foreground().forbid_parking();
1862 let mut lang_registry = Arc::new(LanguageRegistry::test());
1863 let fs = FakeFs::new(cx_a.background());
1864
1865 // Set up a fake language server.
1866 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1867 Arc::get_mut(&mut lang_registry)
1868 .unwrap()
1869 .add(Arc::new(Language::new(
1870 LanguageConfig {
1871 name: "Rust".into(),
1872 path_suffixes: vec!["rs".to_string()],
1873 language_server: Some(language_server_config),
1874 ..Default::default()
1875 },
1876 Some(tree_sitter_rust::language()),
1877 )));
1878
1879 // Connect to a server as 2 clients.
1880 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1881 let client_a = server.create_client(cx_a, "user_a").await;
1882 let client_b = server.create_client(cx_b, "user_b").await;
1883
1884 // Share a project as client A
1885 fs.insert_tree(
1886 "/a",
1887 json!({
1888 ".zed.toml": r#"collaborators = ["user_b"]"#,
1889 "a.rs": "let one = two",
1890 "other.rs": "",
1891 }),
1892 )
1893 .await;
1894 let project_a = cx_a.update(|cx| {
1895 Project::local(
1896 client_a.clone(),
1897 client_a.user_store.clone(),
1898 lang_registry.clone(),
1899 fs.clone(),
1900 cx,
1901 )
1902 });
1903 let (worktree_a, _) = project_a
1904 .update(cx_a, |p, cx| {
1905 p.find_or_create_local_worktree("/a", true, cx)
1906 })
1907 .await
1908 .unwrap();
1909 worktree_a
1910 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1911 .await;
1912 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1913 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1914 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1915
1916 // Cause the language server to start.
1917 let _ = cx_a
1918 .background()
1919 .spawn(project_a.update(cx_a, |project, cx| {
1920 project.open_buffer(
1921 ProjectPath {
1922 worktree_id,
1923 path: Path::new("other.rs").into(),
1924 },
1925 cx,
1926 )
1927 }))
1928 .await
1929 .unwrap();
1930
1931 // Simulate a language server reporting errors for a file.
1932 let mut fake_language_server = fake_language_servers.next().await.unwrap();
1933 fake_language_server
1934 .receive_notification::<lsp::notification::DidOpenTextDocument>()
1935 .await;
1936 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
1937 lsp::PublishDiagnosticsParams {
1938 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1939 version: None,
1940 diagnostics: vec![lsp::Diagnostic {
1941 severity: Some(lsp::DiagnosticSeverity::ERROR),
1942 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1943 message: "message 1".to_string(),
1944 ..Default::default()
1945 }],
1946 },
1947 );
1948
1949 // Wait for server to see the diagnostics update.
1950 server
1951 .condition(|store| {
1952 let worktree = store
1953 .project(project_id)
1954 .unwrap()
1955 .share
1956 .as_ref()
1957 .unwrap()
1958 .worktrees
1959 .get(&worktree_id.to_proto())
1960 .unwrap();
1961
1962 !worktree.diagnostic_summaries.is_empty()
1963 })
1964 .await;
1965
1966 // Join the worktree as client B.
1967 let project_b = Project::remote(
1968 project_id,
1969 client_b.clone(),
1970 client_b.user_store.clone(),
1971 lang_registry.clone(),
1972 fs.clone(),
1973 &mut cx_b.to_async(),
1974 )
1975 .await
1976 .unwrap();
1977
1978 project_b.read_with(cx_b, |project, cx| {
1979 assert_eq!(
1980 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
1981 &[(
1982 ProjectPath {
1983 worktree_id,
1984 path: Arc::from(Path::new("a.rs")),
1985 },
1986 DiagnosticSummary {
1987 error_count: 1,
1988 warning_count: 0,
1989 ..Default::default()
1990 },
1991 )]
1992 )
1993 });
1994
1995 // Simulate a language server reporting more errors for a file.
1996 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
1997 lsp::PublishDiagnosticsParams {
1998 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1999 version: None,
2000 diagnostics: vec![
2001 lsp::Diagnostic {
2002 severity: Some(lsp::DiagnosticSeverity::ERROR),
2003 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2004 message: "message 1".to_string(),
2005 ..Default::default()
2006 },
2007 lsp::Diagnostic {
2008 severity: Some(lsp::DiagnosticSeverity::WARNING),
2009 range: lsp::Range::new(
2010 lsp::Position::new(0, 10),
2011 lsp::Position::new(0, 13),
2012 ),
2013 message: "message 2".to_string(),
2014 ..Default::default()
2015 },
2016 ],
2017 },
2018 );
2019
2020 // Client b gets the updated summaries
2021 project_b
2022 .condition(&cx_b, |project, cx| {
2023 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2024 == &[(
2025 ProjectPath {
2026 worktree_id,
2027 path: Arc::from(Path::new("a.rs")),
2028 },
2029 DiagnosticSummary {
2030 error_count: 1,
2031 warning_count: 1,
2032 ..Default::default()
2033 },
2034 )]
2035 })
2036 .await;
2037
2038 // Open the file with the errors on client B. They should be present.
2039 let buffer_b = cx_b
2040 .background()
2041 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2042 .await
2043 .unwrap();
2044
2045 buffer_b.read_with(cx_b, |buffer, _| {
2046 assert_eq!(
2047 buffer
2048 .snapshot()
2049 .diagnostics_in_range::<_, Point>(0..buffer.len())
2050 .map(|entry| entry)
2051 .collect::<Vec<_>>(),
2052 &[
2053 DiagnosticEntry {
2054 range: Point::new(0, 4)..Point::new(0, 7),
2055 diagnostic: Diagnostic {
2056 group_id: 0,
2057 message: "message 1".to_string(),
2058 severity: lsp::DiagnosticSeverity::ERROR,
2059 is_primary: true,
2060 ..Default::default()
2061 }
2062 },
2063 DiagnosticEntry {
2064 range: Point::new(0, 10)..Point::new(0, 13),
2065 diagnostic: Diagnostic {
2066 group_id: 1,
2067 severity: lsp::DiagnosticSeverity::WARNING,
2068 message: "message 2".to_string(),
2069 is_primary: true,
2070 ..Default::default()
2071 }
2072 }
2073 ]
2074 );
2075 });
2076 }
2077
2078 #[gpui::test(iterations = 10)]
2079 async fn test_collaborating_with_completion(
2080 cx_a: &mut TestAppContext,
2081 cx_b: &mut TestAppContext,
2082 ) {
2083 cx_a.foreground().forbid_parking();
2084 let mut lang_registry = Arc::new(LanguageRegistry::test());
2085 let fs = FakeFs::new(cx_a.background());
2086
2087 // Set up a fake language server.
2088 let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2089 language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2090 completion_provider: Some(lsp::CompletionOptions {
2091 trigger_characters: Some(vec![".".to_string()]),
2092 ..Default::default()
2093 }),
2094 ..Default::default()
2095 });
2096 Arc::get_mut(&mut lang_registry)
2097 .unwrap()
2098 .add(Arc::new(Language::new(
2099 LanguageConfig {
2100 name: "Rust".into(),
2101 path_suffixes: vec!["rs".to_string()],
2102 language_server: Some(language_server_config),
2103 ..Default::default()
2104 },
2105 Some(tree_sitter_rust::language()),
2106 )));
2107
2108 // Connect to a server as 2 clients.
2109 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2110 let client_a = server.create_client(cx_a, "user_a").await;
2111 let client_b = server.create_client(cx_b, "user_b").await;
2112
2113 // Share a project as client A
2114 fs.insert_tree(
2115 "/a",
2116 json!({
2117 ".zed.toml": r#"collaborators = ["user_b"]"#,
2118 "main.rs": "fn main() { a }",
2119 "other.rs": "",
2120 }),
2121 )
2122 .await;
2123 let project_a = cx_a.update(|cx| {
2124 Project::local(
2125 client_a.clone(),
2126 client_a.user_store.clone(),
2127 lang_registry.clone(),
2128 fs.clone(),
2129 cx,
2130 )
2131 });
2132 let (worktree_a, _) = project_a
2133 .update(cx_a, |p, cx| {
2134 p.find_or_create_local_worktree("/a", true, cx)
2135 })
2136 .await
2137 .unwrap();
2138 worktree_a
2139 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2140 .await;
2141 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2142 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2143 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2144
2145 // Join the worktree as client B.
2146 let project_b = Project::remote(
2147 project_id,
2148 client_b.clone(),
2149 client_b.user_store.clone(),
2150 lang_registry.clone(),
2151 fs.clone(),
2152 &mut cx_b.to_async(),
2153 )
2154 .await
2155 .unwrap();
2156
2157 // Open a file in an editor as the guest.
2158 let buffer_b = project_b
2159 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2160 .await
2161 .unwrap();
2162 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2163 let editor_b = cx_b.add_view(window_b, |cx| {
2164 Editor::for_buffer(
2165 cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2166 Some(project_b.clone()),
2167 watch::channel_with(Settings::test(cx)).1,
2168 cx,
2169 )
2170 });
2171
2172 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2173 buffer_b
2174 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2175 .await;
2176
2177 // Type a completion trigger character as the guest.
2178 editor_b.update(cx_b, |editor, cx| {
2179 editor.select_ranges([13..13], None, cx);
2180 editor.handle_input(&Input(".".into()), cx);
2181 cx.focus(&editor_b);
2182 });
2183
2184 // Receive a completion request as the host's language server.
2185 // Return some completions from the host's language server.
2186 cx_a.foreground().start_waiting();
2187 fake_language_server
2188 .handle_request::<lsp::request::Completion, _>(|params, _| {
2189 assert_eq!(
2190 params.text_document_position.text_document.uri,
2191 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2192 );
2193 assert_eq!(
2194 params.text_document_position.position,
2195 lsp::Position::new(0, 14),
2196 );
2197
2198 Some(lsp::CompletionResponse::Array(vec![
2199 lsp::CompletionItem {
2200 label: "first_method(…)".into(),
2201 detail: Some("fn(&mut self, B) -> C".into()),
2202 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2203 new_text: "first_method($1)".to_string(),
2204 range: lsp::Range::new(
2205 lsp::Position::new(0, 14),
2206 lsp::Position::new(0, 14),
2207 ),
2208 })),
2209 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2210 ..Default::default()
2211 },
2212 lsp::CompletionItem {
2213 label: "second_method(…)".into(),
2214 detail: Some("fn(&mut self, C) -> D<E>".into()),
2215 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2216 new_text: "second_method()".to_string(),
2217 range: lsp::Range::new(
2218 lsp::Position::new(0, 14),
2219 lsp::Position::new(0, 14),
2220 ),
2221 })),
2222 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2223 ..Default::default()
2224 },
2225 ]))
2226 })
2227 .next()
2228 .await
2229 .unwrap();
2230 cx_a.foreground().finish_waiting();
2231
2232 // Open the buffer on the host.
2233 let buffer_a = project_a
2234 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2235 .await
2236 .unwrap();
2237 buffer_a
2238 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2239 .await;
2240
2241 // Confirm a completion on the guest.
2242 editor_b
2243 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2244 .await;
2245 editor_b.update(cx_b, |editor, cx| {
2246 editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2247 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2248 });
2249
2250 // Return a resolved completion from the host's language server.
2251 // The resolved completion has an additional text edit.
2252 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2253 |params, _| {
2254 assert_eq!(params.label, "first_method(…)");
2255 lsp::CompletionItem {
2256 label: "first_method(…)".into(),
2257 detail: Some("fn(&mut self, B) -> C".into()),
2258 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2259 new_text: "first_method($1)".to_string(),
2260 range: lsp::Range::new(
2261 lsp::Position::new(0, 14),
2262 lsp::Position::new(0, 14),
2263 ),
2264 })),
2265 additional_text_edits: Some(vec![lsp::TextEdit {
2266 new_text: "use d::SomeTrait;\n".to_string(),
2267 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2268 }]),
2269 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2270 ..Default::default()
2271 }
2272 },
2273 );
2274
2275 // The additional edit is applied.
2276 buffer_a
2277 .condition(&cx_a, |buffer, _| {
2278 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2279 })
2280 .await;
2281 buffer_b
2282 .condition(&cx_b, |buffer, _| {
2283 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2284 })
2285 .await;
2286 }
2287
2288 #[gpui::test(iterations = 10)]
2289 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2290 cx_a.foreground().forbid_parking();
2291 let mut lang_registry = Arc::new(LanguageRegistry::test());
2292 let fs = FakeFs::new(cx_a.background());
2293
2294 // Set up a fake language server.
2295 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2296 Arc::get_mut(&mut lang_registry)
2297 .unwrap()
2298 .add(Arc::new(Language::new(
2299 LanguageConfig {
2300 name: "Rust".into(),
2301 path_suffixes: vec!["rs".to_string()],
2302 language_server: Some(language_server_config),
2303 ..Default::default()
2304 },
2305 Some(tree_sitter_rust::language()),
2306 )));
2307
2308 // Connect to a server as 2 clients.
2309 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2310 let client_a = server.create_client(cx_a, "user_a").await;
2311 let client_b = server.create_client(cx_b, "user_b").await;
2312
2313 // Share a project as client A
2314 fs.insert_tree(
2315 "/a",
2316 json!({
2317 ".zed.toml": r#"collaborators = ["user_b"]"#,
2318 "a.rs": "let one = two",
2319 }),
2320 )
2321 .await;
2322 let project_a = cx_a.update(|cx| {
2323 Project::local(
2324 client_a.clone(),
2325 client_a.user_store.clone(),
2326 lang_registry.clone(),
2327 fs.clone(),
2328 cx,
2329 )
2330 });
2331 let (worktree_a, _) = project_a
2332 .update(cx_a, |p, cx| {
2333 p.find_or_create_local_worktree("/a", true, cx)
2334 })
2335 .await
2336 .unwrap();
2337 worktree_a
2338 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2339 .await;
2340 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2341 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2342 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2343
2344 // Join the worktree as client B.
2345 let project_b = Project::remote(
2346 project_id,
2347 client_b.clone(),
2348 client_b.user_store.clone(),
2349 lang_registry.clone(),
2350 fs.clone(),
2351 &mut cx_b.to_async(),
2352 )
2353 .await
2354 .unwrap();
2355
2356 let buffer_b = cx_b
2357 .background()
2358 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2359 .await
2360 .unwrap();
2361
2362 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2363 fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2364 Some(vec![
2365 lsp::TextEdit {
2366 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2367 new_text: "h".to_string(),
2368 },
2369 lsp::TextEdit {
2370 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2371 new_text: "y".to_string(),
2372 },
2373 ])
2374 });
2375
2376 project_b
2377 .update(cx_b, |project, cx| {
2378 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2379 })
2380 .await
2381 .unwrap();
2382 assert_eq!(
2383 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2384 "let honey = two"
2385 );
2386 }
2387
2388 #[gpui::test(iterations = 10)]
2389 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2390 cx_a.foreground().forbid_parking();
2391 let mut lang_registry = Arc::new(LanguageRegistry::test());
2392 let fs = FakeFs::new(cx_a.background());
2393 fs.insert_tree(
2394 "/root-1",
2395 json!({
2396 ".zed.toml": r#"collaborators = ["user_b"]"#,
2397 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2398 }),
2399 )
2400 .await;
2401 fs.insert_tree(
2402 "/root-2",
2403 json!({
2404 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2405 }),
2406 )
2407 .await;
2408
2409 // Set up a fake language server.
2410 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2411 Arc::get_mut(&mut lang_registry)
2412 .unwrap()
2413 .add(Arc::new(Language::new(
2414 LanguageConfig {
2415 name: "Rust".into(),
2416 path_suffixes: vec!["rs".to_string()],
2417 language_server: Some(language_server_config),
2418 ..Default::default()
2419 },
2420 Some(tree_sitter_rust::language()),
2421 )));
2422
2423 // Connect to a server as 2 clients.
2424 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2425 let client_a = server.create_client(cx_a, "user_a").await;
2426 let client_b = server.create_client(cx_b, "user_b").await;
2427
2428 // Share a project as client A
2429 let project_a = cx_a.update(|cx| {
2430 Project::local(
2431 client_a.clone(),
2432 client_a.user_store.clone(),
2433 lang_registry.clone(),
2434 fs.clone(),
2435 cx,
2436 )
2437 });
2438 let (worktree_a, _) = project_a
2439 .update(cx_a, |p, cx| {
2440 p.find_or_create_local_worktree("/root-1", true, cx)
2441 })
2442 .await
2443 .unwrap();
2444 worktree_a
2445 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2446 .await;
2447 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2448 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2449 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2450
2451 // Join the worktree as client B.
2452 let project_b = Project::remote(
2453 project_id,
2454 client_b.clone(),
2455 client_b.user_store.clone(),
2456 lang_registry.clone(),
2457 fs.clone(),
2458 &mut cx_b.to_async(),
2459 )
2460 .await
2461 .unwrap();
2462
2463 // Open the file on client B.
2464 let buffer_b = cx_b
2465 .background()
2466 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2467 .await
2468 .unwrap();
2469
2470 // Request the definition of a symbol as the guest.
2471 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2472 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2473 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2474 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2475 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2476 )))
2477 });
2478
2479 let definitions_1 = project_b
2480 .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
2481 .await
2482 .unwrap();
2483 cx_b.read(|cx| {
2484 assert_eq!(definitions_1.len(), 1);
2485 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2486 let target_buffer = definitions_1[0].buffer.read(cx);
2487 assert_eq!(
2488 target_buffer.text(),
2489 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2490 );
2491 assert_eq!(
2492 definitions_1[0].range.to_point(target_buffer),
2493 Point::new(0, 6)..Point::new(0, 9)
2494 );
2495 });
2496
2497 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2498 // the previous call to `definition`.
2499 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2500 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2501 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2502 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2503 )))
2504 });
2505
2506 let definitions_2 = project_b
2507 .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
2508 .await
2509 .unwrap();
2510 cx_b.read(|cx| {
2511 assert_eq!(definitions_2.len(), 1);
2512 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2513 let target_buffer = definitions_2[0].buffer.read(cx);
2514 assert_eq!(
2515 target_buffer.text(),
2516 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2517 );
2518 assert_eq!(
2519 definitions_2[0].range.to_point(target_buffer),
2520 Point::new(1, 6)..Point::new(1, 11)
2521 );
2522 });
2523 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2524 }
2525
2526 #[gpui::test(iterations = 10)]
2527 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2528 cx_a.foreground().forbid_parking();
2529 let mut lang_registry = Arc::new(LanguageRegistry::test());
2530 let fs = FakeFs::new(cx_a.background());
2531 fs.insert_tree(
2532 "/root-1",
2533 json!({
2534 ".zed.toml": r#"collaborators = ["user_b"]"#,
2535 "one.rs": "const ONE: usize = 1;",
2536 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2537 }),
2538 )
2539 .await;
2540 fs.insert_tree(
2541 "/root-2",
2542 json!({
2543 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2544 }),
2545 )
2546 .await;
2547
2548 // Set up a fake language server.
2549 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2550 Arc::get_mut(&mut lang_registry)
2551 .unwrap()
2552 .add(Arc::new(Language::new(
2553 LanguageConfig {
2554 name: "Rust".into(),
2555 path_suffixes: vec!["rs".to_string()],
2556 language_server: Some(language_server_config),
2557 ..Default::default()
2558 },
2559 Some(tree_sitter_rust::language()),
2560 )));
2561
2562 // Connect to a server as 2 clients.
2563 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2564 let client_a = server.create_client(cx_a, "user_a").await;
2565 let client_b = server.create_client(cx_b, "user_b").await;
2566
2567 // Share a project as client A
2568 let project_a = cx_a.update(|cx| {
2569 Project::local(
2570 client_a.clone(),
2571 client_a.user_store.clone(),
2572 lang_registry.clone(),
2573 fs.clone(),
2574 cx,
2575 )
2576 });
2577 let (worktree_a, _) = project_a
2578 .update(cx_a, |p, cx| {
2579 p.find_or_create_local_worktree("/root-1", true, cx)
2580 })
2581 .await
2582 .unwrap();
2583 worktree_a
2584 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2585 .await;
2586 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2587 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2588 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2589
2590 // Join the worktree as client B.
2591 let project_b = Project::remote(
2592 project_id,
2593 client_b.clone(),
2594 client_b.user_store.clone(),
2595 lang_registry.clone(),
2596 fs.clone(),
2597 &mut cx_b.to_async(),
2598 )
2599 .await
2600 .unwrap();
2601
2602 // Open the file on client B.
2603 let buffer_b = cx_b
2604 .background()
2605 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2606 .await
2607 .unwrap();
2608
2609 // Request references to a symbol as the guest.
2610 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2611 fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2612 assert_eq!(
2613 params.text_document_position.text_document.uri.as_str(),
2614 "file:///root-1/one.rs"
2615 );
2616 Some(vec![
2617 lsp::Location {
2618 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2619 range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2620 },
2621 lsp::Location {
2622 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2623 range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2624 },
2625 lsp::Location {
2626 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2627 range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2628 },
2629 ])
2630 });
2631
2632 let references = project_b
2633 .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
2634 .await
2635 .unwrap();
2636 cx_b.read(|cx| {
2637 assert_eq!(references.len(), 3);
2638 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2639
2640 let two_buffer = references[0].buffer.read(cx);
2641 let three_buffer = references[2].buffer.read(cx);
2642 assert_eq!(
2643 two_buffer.file().unwrap().path().as_ref(),
2644 Path::new("two.rs")
2645 );
2646 assert_eq!(references[1].buffer, references[0].buffer);
2647 assert_eq!(
2648 three_buffer.file().unwrap().full_path(cx),
2649 Path::new("three.rs")
2650 );
2651
2652 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2653 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2654 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2655 });
2656 }
2657
2658 #[gpui::test(iterations = 10)]
2659 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2660 cx_a.foreground().forbid_parking();
2661 let lang_registry = Arc::new(LanguageRegistry::test());
2662 let fs = FakeFs::new(cx_a.background());
2663 fs.insert_tree(
2664 "/root-1",
2665 json!({
2666 ".zed.toml": r#"collaborators = ["user_b"]"#,
2667 "a": "hello world",
2668 "b": "goodnight moon",
2669 "c": "a world of goo",
2670 "d": "world champion of clown world",
2671 }),
2672 )
2673 .await;
2674 fs.insert_tree(
2675 "/root-2",
2676 json!({
2677 "e": "disney world is fun",
2678 }),
2679 )
2680 .await;
2681
2682 // Connect to a server as 2 clients.
2683 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2684 let client_a = server.create_client(cx_a, "user_a").await;
2685 let client_b = server.create_client(cx_b, "user_b").await;
2686
2687 // Share a project as client A
2688 let project_a = cx_a.update(|cx| {
2689 Project::local(
2690 client_a.clone(),
2691 client_a.user_store.clone(),
2692 lang_registry.clone(),
2693 fs.clone(),
2694 cx,
2695 )
2696 });
2697 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2698
2699 let (worktree_1, _) = project_a
2700 .update(cx_a, |p, cx| {
2701 p.find_or_create_local_worktree("/root-1", true, cx)
2702 })
2703 .await
2704 .unwrap();
2705 worktree_1
2706 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2707 .await;
2708 let (worktree_2, _) = project_a
2709 .update(cx_a, |p, cx| {
2710 p.find_or_create_local_worktree("/root-2", true, cx)
2711 })
2712 .await
2713 .unwrap();
2714 worktree_2
2715 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2716 .await;
2717
2718 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2719
2720 // Join the worktree as client B.
2721 let project_b = Project::remote(
2722 project_id,
2723 client_b.clone(),
2724 client_b.user_store.clone(),
2725 lang_registry.clone(),
2726 fs.clone(),
2727 &mut cx_b.to_async(),
2728 )
2729 .await
2730 .unwrap();
2731
2732 let results = project_b
2733 .update(cx_b, |project, cx| {
2734 project.search(SearchQuery::text("world", false, false), cx)
2735 })
2736 .await
2737 .unwrap();
2738
2739 let mut ranges_by_path = results
2740 .into_iter()
2741 .map(|(buffer, ranges)| {
2742 buffer.read_with(cx_b, |buffer, cx| {
2743 let path = buffer.file().unwrap().full_path(cx);
2744 let offset_ranges = ranges
2745 .into_iter()
2746 .map(|range| range.to_offset(buffer))
2747 .collect::<Vec<_>>();
2748 (path, offset_ranges)
2749 })
2750 })
2751 .collect::<Vec<_>>();
2752 ranges_by_path.sort_by_key(|(path, _)| path.clone());
2753
2754 assert_eq!(
2755 ranges_by_path,
2756 &[
2757 (PathBuf::from("root-1/a"), vec![6..11]),
2758 (PathBuf::from("root-1/c"), vec![2..7]),
2759 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2760 (PathBuf::from("root-2/e"), vec![7..12]),
2761 ]
2762 );
2763 }
2764
2765 #[gpui::test(iterations = 10)]
2766 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2767 cx_a.foreground().forbid_parking();
2768 let lang_registry = Arc::new(LanguageRegistry::test());
2769 let fs = FakeFs::new(cx_a.background());
2770 fs.insert_tree(
2771 "/root-1",
2772 json!({
2773 ".zed.toml": r#"collaborators = ["user_b"]"#,
2774 "main.rs": "fn double(number: i32) -> i32 { number + number }",
2775 }),
2776 )
2777 .await;
2778
2779 // Set up a fake language server.
2780 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2781 lang_registry.add(Arc::new(Language::new(
2782 LanguageConfig {
2783 name: "Rust".into(),
2784 path_suffixes: vec!["rs".to_string()],
2785 language_server: Some(language_server_config),
2786 ..Default::default()
2787 },
2788 Some(tree_sitter_rust::language()),
2789 )));
2790
2791 // Connect to a server as 2 clients.
2792 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2793 let client_a = server.create_client(cx_a, "user_a").await;
2794 let client_b = server.create_client(cx_b, "user_b").await;
2795
2796 // Share a project as client A
2797 let project_a = cx_a.update(|cx| {
2798 Project::local(
2799 client_a.clone(),
2800 client_a.user_store.clone(),
2801 lang_registry.clone(),
2802 fs.clone(),
2803 cx,
2804 )
2805 });
2806 let (worktree_a, _) = project_a
2807 .update(cx_a, |p, cx| {
2808 p.find_or_create_local_worktree("/root-1", true, cx)
2809 })
2810 .await
2811 .unwrap();
2812 worktree_a
2813 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2814 .await;
2815 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2816 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2817 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2818
2819 // Join the worktree as client B.
2820 let project_b = Project::remote(
2821 project_id,
2822 client_b.clone(),
2823 client_b.user_store.clone(),
2824 lang_registry.clone(),
2825 fs.clone(),
2826 &mut cx_b.to_async(),
2827 )
2828 .await
2829 .unwrap();
2830
2831 // Open the file on client B.
2832 let buffer_b = cx_b
2833 .background()
2834 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
2835 .await
2836 .unwrap();
2837
2838 // Request document highlights as the guest.
2839 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2840 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2841 |params, _| {
2842 assert_eq!(
2843 params
2844 .text_document_position_params
2845 .text_document
2846 .uri
2847 .as_str(),
2848 "file:///root-1/main.rs"
2849 );
2850 assert_eq!(
2851 params.text_document_position_params.position,
2852 lsp::Position::new(0, 34)
2853 );
2854 Some(vec![
2855 lsp::DocumentHighlight {
2856 kind: Some(lsp::DocumentHighlightKind::WRITE),
2857 range: lsp::Range::new(
2858 lsp::Position::new(0, 10),
2859 lsp::Position::new(0, 16),
2860 ),
2861 },
2862 lsp::DocumentHighlight {
2863 kind: Some(lsp::DocumentHighlightKind::READ),
2864 range: lsp::Range::new(
2865 lsp::Position::new(0, 32),
2866 lsp::Position::new(0, 38),
2867 ),
2868 },
2869 lsp::DocumentHighlight {
2870 kind: Some(lsp::DocumentHighlightKind::READ),
2871 range: lsp::Range::new(
2872 lsp::Position::new(0, 41),
2873 lsp::Position::new(0, 47),
2874 ),
2875 },
2876 ])
2877 },
2878 );
2879
2880 let highlights = project_b
2881 .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
2882 .await
2883 .unwrap();
2884 buffer_b.read_with(cx_b, |buffer, _| {
2885 let snapshot = buffer.snapshot();
2886
2887 let highlights = highlights
2888 .into_iter()
2889 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2890 .collect::<Vec<_>>();
2891 assert_eq!(
2892 highlights,
2893 &[
2894 (lsp::DocumentHighlightKind::WRITE, 10..16),
2895 (lsp::DocumentHighlightKind::READ, 32..38),
2896 (lsp::DocumentHighlightKind::READ, 41..47)
2897 ]
2898 )
2899 });
2900 }
2901
2902 #[gpui::test(iterations = 10)]
2903 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2904 cx_a.foreground().forbid_parking();
2905 let mut lang_registry = Arc::new(LanguageRegistry::test());
2906 let fs = FakeFs::new(cx_a.background());
2907 fs.insert_tree(
2908 "/code",
2909 json!({
2910 "crate-1": {
2911 ".zed.toml": r#"collaborators = ["user_b"]"#,
2912 "one.rs": "const ONE: usize = 1;",
2913 },
2914 "crate-2": {
2915 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2916 },
2917 "private": {
2918 "passwords.txt": "the-password",
2919 }
2920 }),
2921 )
2922 .await;
2923
2924 // Set up a fake language server.
2925 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2926 Arc::get_mut(&mut lang_registry)
2927 .unwrap()
2928 .add(Arc::new(Language::new(
2929 LanguageConfig {
2930 name: "Rust".into(),
2931 path_suffixes: vec!["rs".to_string()],
2932 language_server: Some(language_server_config),
2933 ..Default::default()
2934 },
2935 Some(tree_sitter_rust::language()),
2936 )));
2937
2938 // Connect to a server as 2 clients.
2939 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2940 let client_a = server.create_client(cx_a, "user_a").await;
2941 let client_b = server.create_client(cx_b, "user_b").await;
2942
2943 // Share a project as client A
2944 let project_a = cx_a.update(|cx| {
2945 Project::local(
2946 client_a.clone(),
2947 client_a.user_store.clone(),
2948 lang_registry.clone(),
2949 fs.clone(),
2950 cx,
2951 )
2952 });
2953 let (worktree_a, _) = project_a
2954 .update(cx_a, |p, cx| {
2955 p.find_or_create_local_worktree("/code/crate-1", true, cx)
2956 })
2957 .await
2958 .unwrap();
2959 worktree_a
2960 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2961 .await;
2962 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2963 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2964 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2965
2966 // Join the worktree as client B.
2967 let project_b = Project::remote(
2968 project_id,
2969 client_b.clone(),
2970 client_b.user_store.clone(),
2971 lang_registry.clone(),
2972 fs.clone(),
2973 &mut cx_b.to_async(),
2974 )
2975 .await
2976 .unwrap();
2977
2978 // Cause the language server to start.
2979 let _buffer = cx_b
2980 .background()
2981 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2982 .await
2983 .unwrap();
2984
2985 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2986 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
2987 #[allow(deprecated)]
2988 Some(vec![lsp::SymbolInformation {
2989 name: "TWO".into(),
2990 location: lsp::Location {
2991 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
2992 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2993 },
2994 kind: lsp::SymbolKind::CONSTANT,
2995 tags: None,
2996 container_name: None,
2997 deprecated: None,
2998 }])
2999 });
3000
3001 // Request the definition of a symbol as the guest.
3002 let symbols = project_b
3003 .update(cx_b, |p, cx| p.symbols("two", cx))
3004 .await
3005 .unwrap();
3006 assert_eq!(symbols.len(), 1);
3007 assert_eq!(symbols[0].name, "TWO");
3008
3009 // Open one of the returned symbols.
3010 let buffer_b_2 = project_b
3011 .update(cx_b, |project, cx| {
3012 project.open_buffer_for_symbol(&symbols[0], cx)
3013 })
3014 .await
3015 .unwrap();
3016 buffer_b_2.read_with(cx_b, |buffer, _| {
3017 assert_eq!(
3018 buffer.file().unwrap().path().as_ref(),
3019 Path::new("../crate-2/two.rs")
3020 );
3021 });
3022
3023 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3024 let mut fake_symbol = symbols[0].clone();
3025 fake_symbol.path = Path::new("/code/secrets").into();
3026 let error = project_b
3027 .update(cx_b, |project, cx| {
3028 project.open_buffer_for_symbol(&fake_symbol, cx)
3029 })
3030 .await
3031 .unwrap_err();
3032 assert!(error.to_string().contains("invalid symbol signature"));
3033 }
3034
3035 #[gpui::test(iterations = 10)]
3036 async fn test_open_buffer_while_getting_definition_pointing_to_it(
3037 cx_a: &mut TestAppContext,
3038 cx_b: &mut TestAppContext,
3039 mut rng: StdRng,
3040 ) {
3041 cx_a.foreground().forbid_parking();
3042 let mut lang_registry = Arc::new(LanguageRegistry::test());
3043 let fs = FakeFs::new(cx_a.background());
3044 fs.insert_tree(
3045 "/root",
3046 json!({
3047 ".zed.toml": r#"collaborators = ["user_b"]"#,
3048 "a.rs": "const ONE: usize = b::TWO;",
3049 "b.rs": "const TWO: usize = 2",
3050 }),
3051 )
3052 .await;
3053
3054 // Set up a fake language server.
3055 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3056
3057 Arc::get_mut(&mut lang_registry)
3058 .unwrap()
3059 .add(Arc::new(Language::new(
3060 LanguageConfig {
3061 name: "Rust".into(),
3062 path_suffixes: vec!["rs".to_string()],
3063 language_server: Some(language_server_config),
3064 ..Default::default()
3065 },
3066 Some(tree_sitter_rust::language()),
3067 )));
3068
3069 // Connect to a server as 2 clients.
3070 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3071 let client_a = server.create_client(cx_a, "user_a").await;
3072 let client_b = server.create_client(cx_b, "user_b").await;
3073
3074 // Share a project as client A
3075 let project_a = cx_a.update(|cx| {
3076 Project::local(
3077 client_a.clone(),
3078 client_a.user_store.clone(),
3079 lang_registry.clone(),
3080 fs.clone(),
3081 cx,
3082 )
3083 });
3084
3085 let (worktree_a, _) = project_a
3086 .update(cx_a, |p, cx| {
3087 p.find_or_create_local_worktree("/root", true, cx)
3088 })
3089 .await
3090 .unwrap();
3091 worktree_a
3092 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3093 .await;
3094 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3095 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3096 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3097
3098 // Join the worktree as client B.
3099 let project_b = Project::remote(
3100 project_id,
3101 client_b.clone(),
3102 client_b.user_store.clone(),
3103 lang_registry.clone(),
3104 fs.clone(),
3105 &mut cx_b.to_async(),
3106 )
3107 .await
3108 .unwrap();
3109
3110 let buffer_b1 = cx_b
3111 .background()
3112 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3113 .await
3114 .unwrap();
3115
3116 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3117 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3118 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3119 lsp::Url::from_file_path("/root/b.rs").unwrap(),
3120 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3121 )))
3122 });
3123
3124 let definitions;
3125 let buffer_b2;
3126 if rng.gen() {
3127 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3128 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3129 } else {
3130 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3131 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3132 }
3133
3134 let buffer_b2 = buffer_b2.await.unwrap();
3135 let definitions = definitions.await.unwrap();
3136 assert_eq!(definitions.len(), 1);
3137 assert_eq!(definitions[0].buffer, buffer_b2);
3138 }
3139
3140 #[gpui::test(iterations = 10)]
3141 async fn test_collaborating_with_code_actions(
3142 cx_a: &mut TestAppContext,
3143 cx_b: &mut TestAppContext,
3144 ) {
3145 cx_a.foreground().forbid_parking();
3146 let mut lang_registry = Arc::new(LanguageRegistry::test());
3147 let fs = FakeFs::new(cx_a.background());
3148 let mut path_openers_b = Vec::new();
3149 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3150
3151 // Set up a fake language server.
3152 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3153 Arc::get_mut(&mut lang_registry)
3154 .unwrap()
3155 .add(Arc::new(Language::new(
3156 LanguageConfig {
3157 name: "Rust".into(),
3158 path_suffixes: vec!["rs".to_string()],
3159 language_server: Some(language_server_config),
3160 ..Default::default()
3161 },
3162 Some(tree_sitter_rust::language()),
3163 )));
3164
3165 // Connect to a server as 2 clients.
3166 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3167 let client_a = server.create_client(cx_a, "user_a").await;
3168 let client_b = server.create_client(cx_b, "user_b").await;
3169
3170 // Share a project as client A
3171 fs.insert_tree(
3172 "/a",
3173 json!({
3174 ".zed.toml": r#"collaborators = ["user_b"]"#,
3175 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3176 "other.rs": "pub fn foo() -> usize { 4 }",
3177 }),
3178 )
3179 .await;
3180 let project_a = cx_a.update(|cx| {
3181 Project::local(
3182 client_a.clone(),
3183 client_a.user_store.clone(),
3184 lang_registry.clone(),
3185 fs.clone(),
3186 cx,
3187 )
3188 });
3189 let (worktree_a, _) = project_a
3190 .update(cx_a, |p, cx| {
3191 p.find_or_create_local_worktree("/a", true, cx)
3192 })
3193 .await
3194 .unwrap();
3195 worktree_a
3196 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3197 .await;
3198 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3199 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3200 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3201
3202 // Join the worktree as client B.
3203 let project_b = Project::remote(
3204 project_id,
3205 client_b.clone(),
3206 client_b.user_store.clone(),
3207 lang_registry.clone(),
3208 fs.clone(),
3209 &mut cx_b.to_async(),
3210 )
3211 .await
3212 .unwrap();
3213 let mut params = cx_b.update(WorkspaceParams::test);
3214 params.languages = lang_registry.clone();
3215 params.client = client_b.client.clone();
3216 params.user_store = client_b.user_store.clone();
3217 params.project = project_b;
3218 params.path_openers = path_openers_b.into();
3219
3220 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3221 let editor_b = workspace_b
3222 .update(cx_b, |workspace, cx| {
3223 workspace.open_path((worktree_id, "main.rs").into(), cx)
3224 })
3225 .await
3226 .unwrap()
3227 .downcast::<Editor>()
3228 .unwrap();
3229
3230 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3231 fake_language_server
3232 .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3233 assert_eq!(
3234 params.text_document.uri,
3235 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3236 );
3237 assert_eq!(params.range.start, lsp::Position::new(0, 0));
3238 assert_eq!(params.range.end, lsp::Position::new(0, 0));
3239 None
3240 })
3241 .next()
3242 .await;
3243
3244 // Move cursor to a location that contains code actions.
3245 editor_b.update(cx_b, |editor, cx| {
3246 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3247 cx.focus(&editor_b);
3248 });
3249
3250 fake_language_server
3251 .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3252 assert_eq!(
3253 params.text_document.uri,
3254 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3255 );
3256 assert_eq!(params.range.start, lsp::Position::new(1, 31));
3257 assert_eq!(params.range.end, lsp::Position::new(1, 31));
3258
3259 Some(vec![lsp::CodeActionOrCommand::CodeAction(
3260 lsp::CodeAction {
3261 title: "Inline into all callers".to_string(),
3262 edit: Some(lsp::WorkspaceEdit {
3263 changes: Some(
3264 [
3265 (
3266 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3267 vec![lsp::TextEdit::new(
3268 lsp::Range::new(
3269 lsp::Position::new(1, 22),
3270 lsp::Position::new(1, 34),
3271 ),
3272 "4".to_string(),
3273 )],
3274 ),
3275 (
3276 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3277 vec![lsp::TextEdit::new(
3278 lsp::Range::new(
3279 lsp::Position::new(0, 0),
3280 lsp::Position::new(0, 27),
3281 ),
3282 "".to_string(),
3283 )],
3284 ),
3285 ]
3286 .into_iter()
3287 .collect(),
3288 ),
3289 ..Default::default()
3290 }),
3291 data: Some(json!({
3292 "codeActionParams": {
3293 "range": {
3294 "start": {"line": 1, "column": 31},
3295 "end": {"line": 1, "column": 31},
3296 }
3297 }
3298 })),
3299 ..Default::default()
3300 },
3301 )])
3302 })
3303 .next()
3304 .await;
3305
3306 // Toggle code actions and wait for them to display.
3307 editor_b.update(cx_b, |editor, cx| {
3308 editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3309 });
3310 editor_b
3311 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3312 .await;
3313
3314 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3315
3316 // Confirming the code action will trigger a resolve request.
3317 let confirm_action = workspace_b
3318 .update(cx_b, |workspace, cx| {
3319 Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3320 })
3321 .unwrap();
3322 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3323 lsp::CodeAction {
3324 title: "Inline into all callers".to_string(),
3325 edit: Some(lsp::WorkspaceEdit {
3326 changes: Some(
3327 [
3328 (
3329 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3330 vec![lsp::TextEdit::new(
3331 lsp::Range::new(
3332 lsp::Position::new(1, 22),
3333 lsp::Position::new(1, 34),
3334 ),
3335 "4".to_string(),
3336 )],
3337 ),
3338 (
3339 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3340 vec![lsp::TextEdit::new(
3341 lsp::Range::new(
3342 lsp::Position::new(0, 0),
3343 lsp::Position::new(0, 27),
3344 ),
3345 "".to_string(),
3346 )],
3347 ),
3348 ]
3349 .into_iter()
3350 .collect(),
3351 ),
3352 ..Default::default()
3353 }),
3354 ..Default::default()
3355 }
3356 });
3357
3358 // After the action is confirmed, an editor containing both modified files is opened.
3359 confirm_action.await.unwrap();
3360 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3361 workspace
3362 .active_item(cx)
3363 .unwrap()
3364 .downcast::<Editor>()
3365 .unwrap()
3366 });
3367 code_action_editor.update(cx_b, |editor, cx| {
3368 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3369 editor.undo(&Undo, cx);
3370 assert_eq!(
3371 editor.text(cx),
3372 "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3373 );
3374 editor.redo(&Redo, cx);
3375 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3376 });
3377 }
3378
3379 #[gpui::test(iterations = 10)]
3380 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3381 cx_a.foreground().forbid_parking();
3382 let mut lang_registry = Arc::new(LanguageRegistry::test());
3383 let fs = FakeFs::new(cx_a.background());
3384 let mut path_openers_b = Vec::new();
3385 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3386
3387 // Set up a fake language server.
3388 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3389 Arc::get_mut(&mut lang_registry)
3390 .unwrap()
3391 .add(Arc::new(Language::new(
3392 LanguageConfig {
3393 name: "Rust".into(),
3394 path_suffixes: vec!["rs".to_string()],
3395 language_server: Some(language_server_config),
3396 ..Default::default()
3397 },
3398 Some(tree_sitter_rust::language()),
3399 )));
3400
3401 // Connect to a server as 2 clients.
3402 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3403 let client_a = server.create_client(cx_a, "user_a").await;
3404 let client_b = server.create_client(cx_b, "user_b").await;
3405
3406 // Share a project as client A
3407 fs.insert_tree(
3408 "/dir",
3409 json!({
3410 ".zed.toml": r#"collaborators = ["user_b"]"#,
3411 "one.rs": "const ONE: usize = 1;",
3412 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3413 }),
3414 )
3415 .await;
3416 let project_a = cx_a.update(|cx| {
3417 Project::local(
3418 client_a.clone(),
3419 client_a.user_store.clone(),
3420 lang_registry.clone(),
3421 fs.clone(),
3422 cx,
3423 )
3424 });
3425 let (worktree_a, _) = project_a
3426 .update(cx_a, |p, cx| {
3427 p.find_or_create_local_worktree("/dir", true, cx)
3428 })
3429 .await
3430 .unwrap();
3431 worktree_a
3432 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3433 .await;
3434 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3435 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3436 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3437
3438 // Join the worktree as client B.
3439 let project_b = Project::remote(
3440 project_id,
3441 client_b.clone(),
3442 client_b.user_store.clone(),
3443 lang_registry.clone(),
3444 fs.clone(),
3445 &mut cx_b.to_async(),
3446 )
3447 .await
3448 .unwrap();
3449 let mut params = cx_b.update(WorkspaceParams::test);
3450 params.languages = lang_registry.clone();
3451 params.client = client_b.client.clone();
3452 params.user_store = client_b.user_store.clone();
3453 params.project = project_b;
3454 params.path_openers = path_openers_b.into();
3455
3456 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3457 let editor_b = workspace_b
3458 .update(cx_b, |workspace, cx| {
3459 workspace.open_path((worktree_id, "one.rs").into(), cx)
3460 })
3461 .await
3462 .unwrap()
3463 .downcast::<Editor>()
3464 .unwrap();
3465 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3466
3467 // Move cursor to a location that can be renamed.
3468 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3469 editor.select_ranges([7..7], None, cx);
3470 editor.rename(&Rename, cx).unwrap()
3471 });
3472
3473 fake_language_server
3474 .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3475 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3476 assert_eq!(params.position, lsp::Position::new(0, 7));
3477 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3478 lsp::Position::new(0, 6),
3479 lsp::Position::new(0, 9),
3480 )))
3481 })
3482 .next()
3483 .await
3484 .unwrap();
3485 prepare_rename.await.unwrap();
3486 editor_b.update(cx_b, |editor, cx| {
3487 let rename = editor.pending_rename().unwrap();
3488 let buffer = editor.buffer().read(cx).snapshot(cx);
3489 assert_eq!(
3490 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3491 6..9
3492 );
3493 rename.editor.update(cx, |rename_editor, cx| {
3494 rename_editor.buffer().update(cx, |rename_buffer, cx| {
3495 rename_buffer.edit([0..3], "THREE", cx);
3496 });
3497 });
3498 });
3499
3500 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3501 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3502 });
3503 fake_language_server
3504 .handle_request::<lsp::request::Rename, _>(|params, _| {
3505 assert_eq!(
3506 params.text_document_position.text_document.uri.as_str(),
3507 "file:///dir/one.rs"
3508 );
3509 assert_eq!(
3510 params.text_document_position.position,
3511 lsp::Position::new(0, 6)
3512 );
3513 assert_eq!(params.new_name, "THREE");
3514 Some(lsp::WorkspaceEdit {
3515 changes: Some(
3516 [
3517 (
3518 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3519 vec![lsp::TextEdit::new(
3520 lsp::Range::new(
3521 lsp::Position::new(0, 6),
3522 lsp::Position::new(0, 9),
3523 ),
3524 "THREE".to_string(),
3525 )],
3526 ),
3527 (
3528 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3529 vec![
3530 lsp::TextEdit::new(
3531 lsp::Range::new(
3532 lsp::Position::new(0, 24),
3533 lsp::Position::new(0, 27),
3534 ),
3535 "THREE".to_string(),
3536 ),
3537 lsp::TextEdit::new(
3538 lsp::Range::new(
3539 lsp::Position::new(0, 35),
3540 lsp::Position::new(0, 38),
3541 ),
3542 "THREE".to_string(),
3543 ),
3544 ],
3545 ),
3546 ]
3547 .into_iter()
3548 .collect(),
3549 ),
3550 ..Default::default()
3551 })
3552 })
3553 .next()
3554 .await
3555 .unwrap();
3556 confirm_rename.await.unwrap();
3557
3558 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3559 workspace
3560 .active_item(cx)
3561 .unwrap()
3562 .downcast::<Editor>()
3563 .unwrap()
3564 });
3565 rename_editor.update(cx_b, |editor, cx| {
3566 assert_eq!(
3567 editor.text(cx),
3568 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3569 );
3570 editor.undo(&Undo, cx);
3571 assert_eq!(
3572 editor.text(cx),
3573 "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3574 );
3575 editor.redo(&Redo, cx);
3576 assert_eq!(
3577 editor.text(cx),
3578 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3579 );
3580 });
3581
3582 // Ensure temporary rename edits cannot be undone/redone.
3583 editor_b.update(cx_b, |editor, cx| {
3584 editor.undo(&Undo, cx);
3585 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3586 editor.undo(&Undo, cx);
3587 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3588 editor.redo(&Redo, cx);
3589 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3590 })
3591 }
3592
3593 #[gpui::test(iterations = 10)]
3594 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3595 cx_a.foreground().forbid_parking();
3596
3597 // Connect to a server as 2 clients.
3598 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3599 let client_a = server.create_client(cx_a, "user_a").await;
3600 let client_b = server.create_client(cx_b, "user_b").await;
3601
3602 // Create an org that includes these 2 users.
3603 let db = &server.app_state.db;
3604 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3605 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3606 .await
3607 .unwrap();
3608 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3609 .await
3610 .unwrap();
3611
3612 // Create a channel that includes all the users.
3613 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3614 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3615 .await
3616 .unwrap();
3617 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3618 .await
3619 .unwrap();
3620 db.create_channel_message(
3621 channel_id,
3622 client_b.current_user_id(&cx_b),
3623 "hello A, it's B.",
3624 OffsetDateTime::now_utc(),
3625 1,
3626 )
3627 .await
3628 .unwrap();
3629
3630 let channels_a = cx_a
3631 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3632 channels_a
3633 .condition(cx_a, |list, _| list.available_channels().is_some())
3634 .await;
3635 channels_a.read_with(cx_a, |list, _| {
3636 assert_eq!(
3637 list.available_channels().unwrap(),
3638 &[ChannelDetails {
3639 id: channel_id.to_proto(),
3640 name: "test-channel".to_string()
3641 }]
3642 )
3643 });
3644 let channel_a = channels_a.update(cx_a, |this, cx| {
3645 this.get_channel(channel_id.to_proto(), cx).unwrap()
3646 });
3647 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3648 channel_a
3649 .condition(&cx_a, |channel, _| {
3650 channel_messages(channel)
3651 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3652 })
3653 .await;
3654
3655 let channels_b = cx_b
3656 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3657 channels_b
3658 .condition(cx_b, |list, _| list.available_channels().is_some())
3659 .await;
3660 channels_b.read_with(cx_b, |list, _| {
3661 assert_eq!(
3662 list.available_channels().unwrap(),
3663 &[ChannelDetails {
3664 id: channel_id.to_proto(),
3665 name: "test-channel".to_string()
3666 }]
3667 )
3668 });
3669
3670 let channel_b = channels_b.update(cx_b, |this, cx| {
3671 this.get_channel(channel_id.to_proto(), cx).unwrap()
3672 });
3673 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3674 channel_b
3675 .condition(&cx_b, |channel, _| {
3676 channel_messages(channel)
3677 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3678 })
3679 .await;
3680
3681 channel_a
3682 .update(cx_a, |channel, cx| {
3683 channel
3684 .send_message("oh, hi B.".to_string(), cx)
3685 .unwrap()
3686 .detach();
3687 let task = channel.send_message("sup".to_string(), cx).unwrap();
3688 assert_eq!(
3689 channel_messages(channel),
3690 &[
3691 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3692 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3693 ("user_a".to_string(), "sup".to_string(), true)
3694 ]
3695 );
3696 task
3697 })
3698 .await
3699 .unwrap();
3700
3701 channel_b
3702 .condition(&cx_b, |channel, _| {
3703 channel_messages(channel)
3704 == [
3705 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3706 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3707 ("user_a".to_string(), "sup".to_string(), false),
3708 ]
3709 })
3710 .await;
3711
3712 assert_eq!(
3713 server
3714 .state()
3715 .await
3716 .channel(channel_id)
3717 .unwrap()
3718 .connection_ids
3719 .len(),
3720 2
3721 );
3722 cx_b.update(|_| drop(channel_b));
3723 server
3724 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3725 .await;
3726
3727 cx_a.update(|_| drop(channel_a));
3728 server
3729 .condition(|state| state.channel(channel_id).is_none())
3730 .await;
3731 }
3732
3733 #[gpui::test(iterations = 10)]
3734 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
3735 cx_a.foreground().forbid_parking();
3736
3737 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3738 let client_a = server.create_client(cx_a, "user_a").await;
3739
3740 let db = &server.app_state.db;
3741 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3742 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3743 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3744 .await
3745 .unwrap();
3746 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3747 .await
3748 .unwrap();
3749
3750 let channels_a = cx_a
3751 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3752 channels_a
3753 .condition(cx_a, |list, _| list.available_channels().is_some())
3754 .await;
3755 let channel_a = channels_a.update(cx_a, |this, cx| {
3756 this.get_channel(channel_id.to_proto(), cx).unwrap()
3757 });
3758
3759 // Messages aren't allowed to be too long.
3760 channel_a
3761 .update(cx_a, |channel, cx| {
3762 let long_body = "this is long.\n".repeat(1024);
3763 channel.send_message(long_body, cx).unwrap()
3764 })
3765 .await
3766 .unwrap_err();
3767
3768 // Messages aren't allowed to be blank.
3769 channel_a.update(cx_a, |channel, cx| {
3770 channel.send_message(String::new(), cx).unwrap_err()
3771 });
3772
3773 // Leading and trailing whitespace are trimmed.
3774 channel_a
3775 .update(cx_a, |channel, cx| {
3776 channel
3777 .send_message("\n surrounded by whitespace \n".to_string(), cx)
3778 .unwrap()
3779 })
3780 .await
3781 .unwrap();
3782 assert_eq!(
3783 db.get_channel_messages(channel_id, 10, None)
3784 .await
3785 .unwrap()
3786 .iter()
3787 .map(|m| &m.body)
3788 .collect::<Vec<_>>(),
3789 &["surrounded by whitespace"]
3790 );
3791 }
3792
3793 #[gpui::test(iterations = 10)]
3794 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3795 cx_a.foreground().forbid_parking();
3796
3797 // Connect to a server as 2 clients.
3798 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3799 let client_a = server.create_client(cx_a, "user_a").await;
3800 let client_b = server.create_client(cx_b, "user_b").await;
3801 let mut status_b = client_b.status();
3802
3803 // Create an org that includes these 2 users.
3804 let db = &server.app_state.db;
3805 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3806 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3807 .await
3808 .unwrap();
3809 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3810 .await
3811 .unwrap();
3812
3813 // Create a channel that includes all the users.
3814 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3815 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3816 .await
3817 .unwrap();
3818 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3819 .await
3820 .unwrap();
3821 db.create_channel_message(
3822 channel_id,
3823 client_b.current_user_id(&cx_b),
3824 "hello A, it's B.",
3825 OffsetDateTime::now_utc(),
3826 2,
3827 )
3828 .await
3829 .unwrap();
3830
3831 let channels_a = cx_a
3832 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3833 channels_a
3834 .condition(cx_a, |list, _| list.available_channels().is_some())
3835 .await;
3836
3837 channels_a.read_with(cx_a, |list, _| {
3838 assert_eq!(
3839 list.available_channels().unwrap(),
3840 &[ChannelDetails {
3841 id: channel_id.to_proto(),
3842 name: "test-channel".to_string()
3843 }]
3844 )
3845 });
3846 let channel_a = channels_a.update(cx_a, |this, cx| {
3847 this.get_channel(channel_id.to_proto(), cx).unwrap()
3848 });
3849 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3850 channel_a
3851 .condition(&cx_a, |channel, _| {
3852 channel_messages(channel)
3853 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3854 })
3855 .await;
3856
3857 let channels_b = cx_b
3858 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3859 channels_b
3860 .condition(cx_b, |list, _| list.available_channels().is_some())
3861 .await;
3862 channels_b.read_with(cx_b, |list, _| {
3863 assert_eq!(
3864 list.available_channels().unwrap(),
3865 &[ChannelDetails {
3866 id: channel_id.to_proto(),
3867 name: "test-channel".to_string()
3868 }]
3869 )
3870 });
3871
3872 let channel_b = channels_b.update(cx_b, |this, cx| {
3873 this.get_channel(channel_id.to_proto(), cx).unwrap()
3874 });
3875 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3876 channel_b
3877 .condition(&cx_b, |channel, _| {
3878 channel_messages(channel)
3879 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3880 })
3881 .await;
3882
3883 // Disconnect client B, ensuring we can still access its cached channel data.
3884 server.forbid_connections();
3885 server.disconnect_client(client_b.current_user_id(&cx_b));
3886 cx_b.foreground().advance_clock(Duration::from_secs(3));
3887 while !matches!(
3888 status_b.next().await,
3889 Some(client::Status::ReconnectionError { .. })
3890 ) {}
3891
3892 channels_b.read_with(cx_b, |channels, _| {
3893 assert_eq!(
3894 channels.available_channels().unwrap(),
3895 [ChannelDetails {
3896 id: channel_id.to_proto(),
3897 name: "test-channel".to_string()
3898 }]
3899 )
3900 });
3901 channel_b.read_with(cx_b, |channel, _| {
3902 assert_eq!(
3903 channel_messages(channel),
3904 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3905 )
3906 });
3907
3908 // Send a message from client B while it is disconnected.
3909 channel_b
3910 .update(cx_b, |channel, cx| {
3911 let task = channel
3912 .send_message("can you see this?".to_string(), cx)
3913 .unwrap();
3914 assert_eq!(
3915 channel_messages(channel),
3916 &[
3917 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3918 ("user_b".to_string(), "can you see this?".to_string(), true)
3919 ]
3920 );
3921 task
3922 })
3923 .await
3924 .unwrap_err();
3925
3926 // Send a message from client A while B is disconnected.
3927 channel_a
3928 .update(cx_a, |channel, cx| {
3929 channel
3930 .send_message("oh, hi B.".to_string(), cx)
3931 .unwrap()
3932 .detach();
3933 let task = channel.send_message("sup".to_string(), cx).unwrap();
3934 assert_eq!(
3935 channel_messages(channel),
3936 &[
3937 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3938 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3939 ("user_a".to_string(), "sup".to_string(), true)
3940 ]
3941 );
3942 task
3943 })
3944 .await
3945 .unwrap();
3946
3947 // Give client B a chance to reconnect.
3948 server.allow_connections();
3949 cx_b.foreground().advance_clock(Duration::from_secs(10));
3950
3951 // Verify that B sees the new messages upon reconnection, as well as the message client B
3952 // sent while offline.
3953 channel_b
3954 .condition(&cx_b, |channel, _| {
3955 channel_messages(channel)
3956 == [
3957 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3958 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3959 ("user_a".to_string(), "sup".to_string(), false),
3960 ("user_b".to_string(), "can you see this?".to_string(), false),
3961 ]
3962 })
3963 .await;
3964
3965 // Ensure client A and B can communicate normally after reconnection.
3966 channel_a
3967 .update(cx_a, |channel, cx| {
3968 channel.send_message("you online?".to_string(), cx).unwrap()
3969 })
3970 .await
3971 .unwrap();
3972 channel_b
3973 .condition(&cx_b, |channel, _| {
3974 channel_messages(channel)
3975 == [
3976 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3977 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3978 ("user_a".to_string(), "sup".to_string(), false),
3979 ("user_b".to_string(), "can you see this?".to_string(), false),
3980 ("user_a".to_string(), "you online?".to_string(), false),
3981 ]
3982 })
3983 .await;
3984
3985 channel_b
3986 .update(cx_b, |channel, cx| {
3987 channel.send_message("yep".to_string(), cx).unwrap()
3988 })
3989 .await
3990 .unwrap();
3991 channel_a
3992 .condition(&cx_a, |channel, _| {
3993 channel_messages(channel)
3994 == [
3995 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3996 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3997 ("user_a".to_string(), "sup".to_string(), false),
3998 ("user_b".to_string(), "can you see this?".to_string(), false),
3999 ("user_a".to_string(), "you online?".to_string(), false),
4000 ("user_b".to_string(), "yep".to_string(), false),
4001 ]
4002 })
4003 .await;
4004 }
4005
4006 #[gpui::test(iterations = 10)]
4007 async fn test_contacts(
4008 cx_a: &mut TestAppContext,
4009 cx_b: &mut TestAppContext,
4010 cx_c: &mut TestAppContext,
4011 ) {
4012 cx_a.foreground().forbid_parking();
4013 let lang_registry = Arc::new(LanguageRegistry::test());
4014 let fs = FakeFs::new(cx_a.background());
4015
4016 // Connect to a server as 3 clients.
4017 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4018 let client_a = server.create_client(cx_a, "user_a").await;
4019 let client_b = server.create_client(cx_b, "user_b").await;
4020 let client_c = server.create_client(cx_c, "user_c").await;
4021
4022 // Share a worktree as client A.
4023 fs.insert_tree(
4024 "/a",
4025 json!({
4026 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4027 }),
4028 )
4029 .await;
4030
4031 let project_a = cx_a.update(|cx| {
4032 Project::local(
4033 client_a.clone(),
4034 client_a.user_store.clone(),
4035 lang_registry.clone(),
4036 fs.clone(),
4037 cx,
4038 )
4039 });
4040 let (worktree_a, _) = project_a
4041 .update(cx_a, |p, cx| {
4042 p.find_or_create_local_worktree("/a", true, cx)
4043 })
4044 .await
4045 .unwrap();
4046 worktree_a
4047 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4048 .await;
4049
4050 client_a
4051 .user_store
4052 .condition(&cx_a, |user_store, _| {
4053 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4054 })
4055 .await;
4056 client_b
4057 .user_store
4058 .condition(&cx_b, |user_store, _| {
4059 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4060 })
4061 .await;
4062 client_c
4063 .user_store
4064 .condition(&cx_c, |user_store, _| {
4065 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4066 })
4067 .await;
4068
4069 let project_id = project_a
4070 .update(cx_a, |project, _| project.next_remote_id())
4071 .await;
4072 project_a
4073 .update(cx_a, |project, cx| project.share(cx))
4074 .await
4075 .unwrap();
4076
4077 let _project_b = Project::remote(
4078 project_id,
4079 client_b.clone(),
4080 client_b.user_store.clone(),
4081 lang_registry.clone(),
4082 fs.clone(),
4083 &mut cx_b.to_async(),
4084 )
4085 .await
4086 .unwrap();
4087
4088 client_a
4089 .user_store
4090 .condition(&cx_a, |user_store, _| {
4091 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4092 })
4093 .await;
4094 client_b
4095 .user_store
4096 .condition(&cx_b, |user_store, _| {
4097 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4098 })
4099 .await;
4100 client_c
4101 .user_store
4102 .condition(&cx_c, |user_store, _| {
4103 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4104 })
4105 .await;
4106
4107 project_a
4108 .condition(&cx_a, |project, _| {
4109 project.collaborators().contains_key(&client_b.peer_id)
4110 })
4111 .await;
4112
4113 cx_a.update(move |_| drop(project_a));
4114 client_a
4115 .user_store
4116 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4117 .await;
4118 client_b
4119 .user_store
4120 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4121 .await;
4122 client_c
4123 .user_store
4124 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4125 .await;
4126
4127 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4128 user_store
4129 .contacts()
4130 .iter()
4131 .map(|contact| {
4132 let worktrees = contact
4133 .projects
4134 .iter()
4135 .map(|p| {
4136 (
4137 p.worktree_root_names[0].as_str(),
4138 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4139 )
4140 })
4141 .collect();
4142 (contact.user.github_login.as_str(), worktrees)
4143 })
4144 .collect()
4145 }
4146 }
4147
4148 #[gpui::test(iterations = 100)]
4149 async fn test_random_collaboration(cx: &mut TestAppContext, rng: StdRng) {
4150 cx.foreground().forbid_parking();
4151 let max_peers = env::var("MAX_PEERS")
4152 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4153 .unwrap_or(5);
4154 let max_operations = env::var("OPERATIONS")
4155 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4156 .unwrap_or(10);
4157
4158 let rng = Arc::new(Mutex::new(rng));
4159
4160 let guest_lang_registry = Arc::new(LanguageRegistry::test());
4161 let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4162
4163 let fs = FakeFs::new(cx.background());
4164 fs.insert_tree(
4165 "/_collab",
4166 json!({
4167 ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4168 }),
4169 )
4170 .await;
4171
4172 let operations = Rc::new(Cell::new(0));
4173 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4174 let mut clients = Vec::new();
4175
4176 let mut next_entity_id = 100000;
4177 let mut host_cx = TestAppContext::new(
4178 cx.foreground_platform(),
4179 cx.platform(),
4180 cx.foreground(),
4181 cx.background(),
4182 cx.font_cache(),
4183 cx.leak_detector(),
4184 next_entity_id,
4185 );
4186 let host = server.create_client(&mut host_cx, "host").await;
4187 let host_project = host_cx.update(|cx| {
4188 Project::local(
4189 host.client.clone(),
4190 host.user_store.clone(),
4191 Arc::new(LanguageRegistry::test()),
4192 fs.clone(),
4193 cx,
4194 )
4195 });
4196 let host_project_id = host_project
4197 .update(&mut host_cx, |p, _| p.next_remote_id())
4198 .await;
4199
4200 let (collab_worktree, _) = host_project
4201 .update(&mut host_cx, |project, cx| {
4202 project.find_or_create_local_worktree("/_collab", true, cx)
4203 })
4204 .await
4205 .unwrap();
4206 collab_worktree
4207 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4208 .await;
4209 host_project
4210 .update(&mut host_cx, |project, cx| project.share(cx))
4211 .await
4212 .unwrap();
4213
4214 clients.push(cx.foreground().spawn(host.simulate_host(
4215 host_project,
4216 language_server_config,
4217 operations.clone(),
4218 max_operations,
4219 rng.clone(),
4220 host_cx,
4221 )));
4222
4223 while operations.get() < max_operations {
4224 cx.background().simulate_random_delay().await;
4225 if clients.len() >= max_peers {
4226 break;
4227 } else if rng.lock().gen_bool(0.05) {
4228 operations.set(operations.get() + 1);
4229
4230 let guest_id = clients.len();
4231 log::info!("Adding guest {}", guest_id);
4232 next_entity_id += 100000;
4233 let mut guest_cx = TestAppContext::new(
4234 cx.foreground_platform(),
4235 cx.platform(),
4236 cx.foreground(),
4237 cx.background(),
4238 cx.font_cache(),
4239 cx.leak_detector(),
4240 next_entity_id,
4241 );
4242 let guest = server
4243 .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4244 .await;
4245 let guest_project = Project::remote(
4246 host_project_id,
4247 guest.client.clone(),
4248 guest.user_store.clone(),
4249 guest_lang_registry.clone(),
4250 FakeFs::new(cx.background()),
4251 &mut guest_cx.to_async(),
4252 )
4253 .await
4254 .unwrap();
4255 clients.push(cx.foreground().spawn(guest.simulate_guest(
4256 guest_id,
4257 guest_project,
4258 operations.clone(),
4259 max_operations,
4260 rng.clone(),
4261 guest_cx,
4262 )));
4263
4264 log::info!("Guest {} added", guest_id);
4265 }
4266 }
4267
4268 let mut clients = futures::future::join_all(clients).await;
4269 cx.foreground().run_until_parked();
4270
4271 let (host_client, mut host_cx) = clients.remove(0);
4272 let host_project = host_client.project.as_ref().unwrap();
4273 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4274 project
4275 .worktrees(cx)
4276 .map(|worktree| {
4277 let snapshot = worktree.read(cx).snapshot();
4278 (snapshot.id(), snapshot)
4279 })
4280 .collect::<BTreeMap<_, _>>()
4281 });
4282
4283 host_client
4284 .project
4285 .as_ref()
4286 .unwrap()
4287 .read_with(&host_cx, |project, cx| project.check_invariants(cx));
4288
4289 for (guest_client, mut guest_cx) in clients.into_iter() {
4290 let guest_id = guest_client.client.id();
4291 let worktree_snapshots =
4292 guest_client
4293 .project
4294 .as_ref()
4295 .unwrap()
4296 .read_with(&guest_cx, |project, cx| {
4297 project
4298 .worktrees(cx)
4299 .map(|worktree| {
4300 let worktree = worktree.read(cx);
4301 (worktree.id(), worktree.snapshot())
4302 })
4303 .collect::<BTreeMap<_, _>>()
4304 });
4305
4306 assert_eq!(
4307 worktree_snapshots.keys().collect::<Vec<_>>(),
4308 host_worktree_snapshots.keys().collect::<Vec<_>>(),
4309 "guest {} has different worktrees than the host",
4310 guest_id
4311 );
4312 for (id, host_snapshot) in &host_worktree_snapshots {
4313 let guest_snapshot = &worktree_snapshots[id];
4314 assert_eq!(
4315 guest_snapshot.root_name(),
4316 host_snapshot.root_name(),
4317 "guest {} has different root name than the host for worktree {}",
4318 guest_id,
4319 id
4320 );
4321 assert_eq!(
4322 guest_snapshot.entries(false).collect::<Vec<_>>(),
4323 host_snapshot.entries(false).collect::<Vec<_>>(),
4324 "guest {} has different snapshot than the host for worktree {}",
4325 guest_id,
4326 id
4327 );
4328 }
4329
4330 guest_client
4331 .project
4332 .as_ref()
4333 .unwrap()
4334 .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
4335
4336 for guest_buffer in &guest_client.buffers {
4337 let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
4338 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
4339 project.buffer_for_id(buffer_id, cx).expect(&format!(
4340 "host does not have buffer for guest:{}, peer:{}, id:{}",
4341 guest_id, guest_client.peer_id, buffer_id
4342 ))
4343 });
4344 let path = host_buffer
4345 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
4346
4347 assert_eq!(
4348 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
4349 0,
4350 "guest {}, buffer {}, path {:?} has deferred operations",
4351 guest_id,
4352 buffer_id,
4353 path,
4354 );
4355 assert_eq!(
4356 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
4357 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4358 "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4359 guest_id,
4360 buffer_id,
4361 path
4362 );
4363 }
4364
4365 guest_cx.update(|_| drop(guest_client));
4366 }
4367
4368 host_cx.update(|_| drop(host_client));
4369 }
4370
/// In-process server used by the tests in this module, bundled with the
/// handles the tests need to create, kill, and block client connections.
struct TestServer {
    // RPC peer passed to `Server::new`; `Drop` calls `reset` on it.
    peer: Arc<Peer>,
    // State produced by `build_app_state`; `create_client` creates users through its `db`.
    app_state: Arc<AppState>,
    // The server under test.
    server: Arc<Server>,
    // Foreground executor; `condition` brackets each wait with
    // `start_waiting` / `finish_waiting` on it.
    foreground: Rc<executor::Foreground>,
    // Receiving half of the channel whose sender is handed to `Server::new`.
    notifications: mpsc::UnboundedReceiver<()>,
    // Kill handle returned by `Connection::in_memory`, keyed by the connecting user.
    connection_killers: Arc<Mutex<HashMap<UserId, barrier::Sender>>>,
    // While `true`, the connection hook installed by `create_client` rejects new connections.
    forbid_connections: Arc<AtomicBool>,
    // Never read after construction.
    // NOTE(review): presumably held only to keep the test database alive for
    // the lifetime of the server — confirm against `TestDb`.
    _test_db: TestDb,
}
4381
4382 impl TestServer {
4383 async fn start(
4384 foreground: Rc<executor::Foreground>,
4385 background: Arc<executor::Background>,
4386 ) -> Self {
4387 let test_db = TestDb::fake(background);
4388 let app_state = Self::build_app_state(&test_db).await;
4389 let peer = Peer::new();
4390 let notifications = mpsc::unbounded();
4391 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4392 Self {
4393 peer,
4394 app_state,
4395 server,
4396 foreground,
4397 notifications: notifications.1,
4398 connection_killers: Default::default(),
4399 forbid_connections: Default::default(),
4400 _test_db: test_db,
4401 }
4402 }
4403
4404 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4405 let http = FakeHttpClient::with_404_response();
4406 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4407 let client_name = name.to_string();
4408 let mut client = Client::new(http.clone());
4409 let server = self.server.clone();
4410 let connection_killers = self.connection_killers.clone();
4411 let forbid_connections = self.forbid_connections.clone();
4412 let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4413
4414 Arc::get_mut(&mut client)
4415 .unwrap()
4416 .override_authenticate(move |cx| {
4417 cx.spawn(|_| async move {
4418 let access_token = "the-token".to_string();
4419 Ok(Credentials {
4420 user_id: user_id.0 as u64,
4421 access_token,
4422 })
4423 })
4424 })
4425 .override_establish_connection(move |credentials, cx| {
4426 assert_eq!(credentials.user_id, user_id.0 as u64);
4427 assert_eq!(credentials.access_token, "the-token");
4428
4429 let server = server.clone();
4430 let connection_killers = connection_killers.clone();
4431 let forbid_connections = forbid_connections.clone();
4432 let client_name = client_name.clone();
4433 let connection_id_tx = connection_id_tx.clone();
4434 cx.spawn(move |cx| async move {
4435 if forbid_connections.load(SeqCst) {
4436 Err(EstablishConnectionError::other(anyhow!(
4437 "server is forbidding connections"
4438 )))
4439 } else {
4440 let (client_conn, server_conn, kill_conn) =
4441 Connection::in_memory(cx.background());
4442 connection_killers.lock().insert(user_id, kill_conn);
4443 cx.background()
4444 .spawn(server.handle_connection(
4445 server_conn,
4446 client_name,
4447 user_id,
4448 Some(connection_id_tx),
4449 cx.background(),
4450 ))
4451 .detach();
4452 Ok(client_conn)
4453 }
4454 })
4455 });
4456
4457 client
4458 .authenticate_and_connect(&cx.to_async())
4459 .await
4460 .unwrap();
4461
4462 Channel::init(&client);
4463 Project::init(&client);
4464
4465 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4466 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4467
4468 let client = TestClient {
4469 client,
4470 peer_id,
4471 user_store,
4472 project: Default::default(),
4473 buffers: Default::default(),
4474 };
4475 client.wait_for_current_user(cx).await;
4476 client
4477 }
4478
/// Removes (and thereby drops) the kill handle registered for `user_id` in
/// `connection_killers`; does nothing if no handle is registered.
// NOTE(review): presumably dropping the `barrier::Sender` is what severs the
// in-memory connection — confirm against `Connection::in_memory`.
fn disconnect_client(&self, user_id: UserId) {
    self.connection_killers.lock().remove(&user_id);
}
4482
/// Sets the flag checked by the connection hook installed in `create_client`,
/// so that subsequent connection attempts fail with
/// "server is forbidding connections".
fn forbid_connections(&self) {
    self.forbid_connections.store(true, SeqCst);
}
4486
/// Clears the flag set by `forbid_connections`, so that the connection hook
/// installed in `create_client` accepts connections again.
fn allow_connections(&self) {
    self.forbid_connections.store(false, SeqCst);
}
4490
4491 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4492 let mut config = Config::default();
4493 config.session_secret = "a".repeat(32);
4494 config.database_url = test_db.url.clone();
4495 let github_client = github::AppClient::test();
4496 Arc::new(AppState {
4497 db: test_db.db().clone(),
4498 handlebars: Default::default(),
4499 auth_client: auth::build_client("", ""),
4500 repo_client: github::RepoClient::test(&github_client),
4501 github_client,
4502 config,
4503 })
4504 }
4505
/// Returns a read guard on the server's `Store`.
///
/// Declared `async` although the body never awaits: the guard is obtained
/// synchronously through `read()`.
async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
    self.server.store.read()
}
4509
4510 async fn condition<F>(&mut self, mut predicate: F)
4511 where
4512 F: FnMut(&Store) -> bool,
4513 {
4514 async_std::future::timeout(Duration::from_millis(500), async {
4515 while !(predicate)(&*self.server.store.read()) {
4516 self.foreground.start_waiting();
4517 self.notifications.next().await;
4518 self.foreground.finish_waiting();
4519 }
4520 })
4521 .await
4522 .expect("condition timed out");
4523 }
4524 }
4525
/// Calls `reset` on the server's RPC peer when the test server is dropped.
impl Drop for TestServer {
    fn drop(&mut self) {
        self.peer.reset();
    }
}
4531
/// A connected client plus the per-client state the randomized simulation
/// accumulates. Dereferences to the wrapped `Arc<Client>`.
struct TestClient {
    // The underlying client; `Drop` calls `tear_down` on it.
    client: Arc<Client>,
    // Built in `TestServer::create_client` from the connection id received from the server.
    pub peer_id: PeerId,
    pub user_store: ModelHandle<UserStore>,
    // `None` until `simulate_host` / `simulate_guest` finish and store their project here.
    project: Option<ModelHandle<Project>>,
    // Buffer handles this client currently holds open during simulation.
    buffers: HashSet<ModelHandle<language::Buffer>>,
}
4539
/// Lets a `TestClient` be used wherever the wrapped `Arc<Client>` is expected,
/// e.g. for calling `Client` methods directly on the test client.
impl Deref for TestClient {
    type Target = Arc<Client>;

    fn deref(&self) -> &Self::Target {
        &self.client
    }
}
4547
4548 impl TestClient {
4549 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4550 UserId::from_proto(
4551 self.user_store
4552 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4553 )
4554 }
4555
4556 async fn wait_for_current_user(&self, cx: &TestAppContext) {
4557 let mut authed_user = self
4558 .user_store
4559 .read_with(cx, |user_store, _| user_store.watch_current_user());
4560 while authed_user.next().await.unwrap().is_none() {}
4561 }
4562
4563 fn simulate_host(
4564 mut self,
4565 project: ModelHandle<Project>,
4566 mut language_server_config: LanguageServerConfig,
4567 operations: Rc<Cell<usize>>,
4568 max_operations: usize,
4569 rng: Arc<Mutex<StdRng>>,
4570 mut cx: TestAppContext,
4571 ) -> impl Future<Output = (Self, TestAppContext)> {
4572 let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4573
4574 // Set up a fake language server.
4575 language_server_config.set_fake_initializer({
4576 let rng = rng.clone();
4577 let files = files.clone();
4578 let project = project.downgrade();
4579 move |fake_server| {
4580 fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4581 Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4582 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4583 range: lsp::Range::new(
4584 lsp::Position::new(0, 0),
4585 lsp::Position::new(0, 0),
4586 ),
4587 new_text: "the-new-text".to_string(),
4588 })),
4589 ..Default::default()
4590 }]))
4591 });
4592
4593 fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4594 Some(vec![lsp::CodeActionOrCommand::CodeAction(
4595 lsp::CodeAction {
4596 title: "the-code-action".to_string(),
4597 ..Default::default()
4598 },
4599 )])
4600 });
4601
4602 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4603 |params, _| {
4604 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4605 params.position,
4606 params.position,
4607 )))
4608 },
4609 );
4610
4611 fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4612 let files = files.clone();
4613 let rng = rng.clone();
4614 move |_, _| {
4615 let files = files.lock();
4616 let mut rng = rng.lock();
4617 let count = rng.gen_range::<usize, _>(1..3);
4618 let files = (0..count)
4619 .map(|_| files.choose(&mut *rng).unwrap())
4620 .collect::<Vec<_>>();
4621 log::info!("LSP: Returning definitions in files {:?}", &files);
4622 Some(lsp::GotoDefinitionResponse::Array(
4623 files
4624 .into_iter()
4625 .map(|file| lsp::Location {
4626 uri: lsp::Url::from_file_path(file).unwrap(),
4627 range: Default::default(),
4628 })
4629 .collect(),
4630 ))
4631 }
4632 });
4633
4634 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4635 let rng = rng.clone();
4636 let project = project.clone();
4637 move |params, mut cx| {
4638 if let Some(project) = project.upgrade(&cx) {
4639 project.update(&mut cx, |project, cx| {
4640 let path = params
4641 .text_document_position_params
4642 .text_document
4643 .uri
4644 .to_file_path()
4645 .unwrap();
4646 let (worktree, relative_path) =
4647 project.find_local_worktree(&path, cx)?;
4648 let project_path =
4649 ProjectPath::from((worktree.read(cx).id(), relative_path));
4650 let buffer =
4651 project.get_open_buffer(&project_path, cx)?.read(cx);
4652
4653 let mut highlights = Vec::new();
4654 let highlight_count = rng.lock().gen_range(1..=5);
4655 let mut prev_end = 0;
4656 for _ in 0..highlight_count {
4657 let range =
4658 buffer.random_byte_range(prev_end, &mut *rng.lock());
4659 let start = buffer
4660 .offset_to_point_utf16(range.start)
4661 .to_lsp_position();
4662 let end = buffer
4663 .offset_to_point_utf16(range.end)
4664 .to_lsp_position();
4665 highlights.push(lsp::DocumentHighlight {
4666 range: lsp::Range::new(start, end),
4667 kind: Some(lsp::DocumentHighlightKind::READ),
4668 });
4669 prev_end = range.end;
4670 }
4671 Some(highlights)
4672 })
4673 } else {
4674 None
4675 }
4676 }
4677 });
4678 }
4679 });
4680
4681 project.update(&mut cx, |project, _| {
4682 project.languages().add(Arc::new(Language::new(
4683 LanguageConfig {
4684 name: "Rust".into(),
4685 path_suffixes: vec!["rs".to_string()],
4686 language_server: Some(language_server_config),
4687 ..Default::default()
4688 },
4689 None,
4690 )));
4691 });
4692
4693 async move {
4694 let fs = project.read_with(&cx, |project, _| project.fs().clone());
4695 while operations.get() < max_operations {
4696 operations.set(operations.get() + 1);
4697
4698 let distribution = rng.lock().gen_range::<usize, _>(0..100);
4699 match distribution {
4700 0..=20 if !files.lock().is_empty() => {
4701 let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4702 let mut path = path.as_path();
4703 while let Some(parent_path) = path.parent() {
4704 path = parent_path;
4705 if rng.lock().gen() {
4706 break;
4707 }
4708 }
4709
4710 log::info!("Host: find/create local worktree {:?}", path);
4711 let find_or_create_worktree = project.update(&mut cx, |project, cx| {
4712 project.find_or_create_local_worktree(path, true, cx)
4713 });
4714 let find_or_create_worktree = async move {
4715 find_or_create_worktree.await.unwrap();
4716 };
4717 if rng.lock().gen() {
4718 cx.background().spawn(find_or_create_worktree).detach();
4719 } else {
4720 find_or_create_worktree.await;
4721 }
4722 }
4723 10..=80 if !files.lock().is_empty() => {
4724 let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4725 let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4726 let (worktree, path) = project
4727 .update(&mut cx, |project, cx| {
4728 project.find_or_create_local_worktree(
4729 file.clone(),
4730 true,
4731 cx,
4732 )
4733 })
4734 .await
4735 .unwrap();
4736 let project_path =
4737 worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4738 log::info!(
4739 "Host: opening path {:?}, worktree {}, relative_path {:?}",
4740 file,
4741 project_path.0,
4742 project_path.1
4743 );
4744 let buffer = project
4745 .update(&mut cx, |project, cx| {
4746 project.open_buffer(project_path, cx)
4747 })
4748 .await
4749 .unwrap();
4750 self.buffers.insert(buffer.clone());
4751 buffer
4752 } else {
4753 self.buffers
4754 .iter()
4755 .choose(&mut *rng.lock())
4756 .unwrap()
4757 .clone()
4758 };
4759
4760 if rng.lock().gen_bool(0.1) {
4761 cx.update(|cx| {
4762 log::info!(
4763 "Host: dropping buffer {:?}",
4764 buffer.read(cx).file().unwrap().full_path(cx)
4765 );
4766 self.buffers.remove(&buffer);
4767 drop(buffer);
4768 });
4769 } else {
4770 buffer.update(&mut cx, |buffer, cx| {
4771 log::info!(
4772 "Host: updating buffer {:?} ({})",
4773 buffer.file().unwrap().full_path(cx),
4774 buffer.remote_id()
4775 );
4776 buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4777 });
4778 }
4779 }
4780 _ => loop {
4781 let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4782 let mut path = PathBuf::new();
4783 path.push("/");
4784 for _ in 0..path_component_count {
4785 let letter = rng.lock().gen_range(b'a'..=b'z');
4786 path.push(std::str::from_utf8(&[letter]).unwrap());
4787 }
4788 path.set_extension("rs");
4789 let parent_path = path.parent().unwrap();
4790
4791 log::info!("Host: creating file {:?}", path,);
4792
4793 if fs.create_dir(&parent_path).await.is_ok()
4794 && fs.create_file(&path, Default::default()).await.is_ok()
4795 {
4796 files.lock().push(path);
4797 break;
4798 } else {
4799 log::info!("Host: cannot create file");
4800 }
4801 },
4802 }
4803
4804 cx.background().simulate_random_delay().await;
4805 }
4806
4807 log::info!("Host done");
4808
4809 self.project = Some(project);
4810 (self, cx)
4811 }
4812 }
4813
/// Plays a guest's part in the randomized collaboration test.
///
/// Repeatedly picks a buffer of the remote `project` (opening a new one or
/// reusing an open one) and performs a random operation on it, until the
/// shared `operations` counter reaches `max_operations`. Returns the client
/// (with `project` stored in `self.project`) together with its app context.
///
/// `guest_id` is only used in log messages. All randomness comes from `rng`.
/// Panics if any awaited request fails.
pub async fn simulate_guest(
    mut self,
    guest_id: usize,
    project: ModelHandle<Project>,
    operations: Rc<Cell<usize>>,
    max_operations: usize,
    rng: Arc<Mutex<StdRng>>,
    mut cx: TestAppContext,
) -> (Self, TestAppContext) {
    while operations.get() < max_operations {
        // Choose the buffer to operate on: open a random file, or reuse a
        // buffer this guest already holds.
        let buffer = if self.buffers.is_empty() || rng.lock().gen() {
            // Only visible worktrees that contain at least one file qualify.
            let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
                project
                    .worktrees(&cx)
                    .filter(|worktree| {
                        let worktree = worktree.read(cx);
                        worktree.is_visible()
                            && worktree.entries(false).any(|e| e.is_file())
                    })
                    .choose(&mut *rng.lock())
            }) {
                worktree
            } else {
                // Nothing to open yet: wait and retry without counting an
                // operation.
                cx.background().simulate_random_delay().await;
                continue;
            };

            operations.set(operations.get() + 1);
            let (worktree_root_name, project_path) =
                worktree.read_with(&cx, |worktree, _| {
                    let entry = worktree
                        .entries(false)
                        .filter(|e| e.is_file())
                        .choose(&mut *rng.lock())
                        .unwrap();
                    (
                        worktree.root_name().to_string(),
                        (worktree.id(), entry.path.clone()),
                    )
                });
            log::info!(
                "Guest {}: opening path {:?} in worktree {} ({})",
                guest_id,
                project_path.1,
                project_path.0,
                worktree_root_name,
            );
            let buffer = project
                .update(&mut cx, |project, cx| {
                    project.open_buffer(project_path.clone(), cx)
                })
                .await
                .unwrap();
            log::info!(
                "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}",
                guest_id,
                project_path.1,
                project_path.0,
                worktree_root_name,
                buffer.read_with(&cx, |buffer, _| buffer.remote_id())
            );
            self.buffers.insert(buffer.clone());
            buffer
        } else {
            operations.set(operations.get() + 1);

            self.buffers
                .iter()
                .choose(&mut *rng.lock())
                .unwrap()
                .clone()
        };

        // Choose what to do with the buffer. Most request arms spawn the
        // request on the background executor and then, with probability 0.3,
        // detach it instead of awaiting it.
        let choice = rng.lock().gen_range(0..100);
        match choice {
            // Drop the buffer.
            0..=9 => {
                cx.update(|cx| {
                    log::info!(
                        "Guest {}: dropping buffer {:?}",
                        guest_id,
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    self.buffers.remove(&buffer);
                    drop(buffer);
                });
            }
            // Request completions at a random offset.
            10..=19 => {
                let completions = project.update(&mut cx, |project, cx| {
                    log::info!(
                        "Guest {}: requesting completions for buffer {} ({:?})",
                        guest_id,
                        buffer.read(cx).remote_id(),
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                    project.completions(&buffer, offset, cx)
                });
                let completions = cx.background().spawn(async move {
                    completions.await.expect("completions request failed");
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching completions request", guest_id);
                    completions.detach();
                } else {
                    completions.await;
                }
            }
            // Request code actions for a random byte range.
            20..=29 => {
                let code_actions = project.update(&mut cx, |project, cx| {
                    log::info!(
                        "Guest {}: requesting code actions for buffer {} ({:?})",
                        guest_id,
                        buffer.read(cx).remote_id(),
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
                    project.code_actions(&buffer, range, cx)
                });
                let code_actions = cx.background().spawn(async move {
                    code_actions.await.expect("code actions request failed");
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching code actions request", guest_id);
                    code_actions.detach();
                } else {
                    code_actions.await;
                }
            }
            // Save the buffer, but only if it is dirty; a clean buffer falls
            // through to the catch-all edit arm at the bottom.
            30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
                let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
                    log::info!(
                        "Guest {}: saving buffer {} ({:?})",
                        guest_id,
                        buffer.remote_id(),
                        buffer.file().unwrap().full_path(cx)
                    );
                    (buffer.version(), buffer.save(cx))
                });
                let save = cx.background().spawn(async move {
                    let (saved_version, _) = save.await.expect("save request failed");
                    // The saved version must include everything the guest had
                    // observed when it asked for the save.
                    assert!(saved_version.observed_all(&requested_version));
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching save request", guest_id);
                    save.detach();
                } else {
                    save.await;
                }
            }
            // Prepare a rename at a random offset. This arm moves `buffer`
            // into the request instead of borrowing it.
            40..=44 => {
                let prepare_rename = project.update(&mut cx, |project, cx| {
                    log::info!(
                        "Guest {}: preparing rename for buffer {} ({:?})",
                        guest_id,
                        buffer.read(cx).remote_id(),
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                    project.prepare_rename(buffer, offset, cx)
                });
                let prepare_rename = cx.background().spawn(async move {
                    prepare_rename.await.expect("prepare rename request failed");
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching prepare rename request", guest_id);
                    prepare_rename.detach();
                } else {
                    prepare_rename.await;
                }
            }
            // Request definitions at a random offset. When awaited, the
            // buffers of the returned locations are added to `self.buffers`.
            45..=49 => {
                let definitions = project.update(&mut cx, |project, cx| {
                    log::info!(
                        "Guest {}: requesting definitions for buffer {} ({:?})",
                        guest_id,
                        buffer.read(cx).remote_id(),
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                    project.definition(&buffer, offset, cx)
                });
                let definitions = cx.background().spawn(async move {
                    definitions.await.expect("definitions request failed")
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching definitions request", guest_id);
                    definitions.detach();
                } else {
                    self.buffers
                        .extend(definitions.await.into_iter().map(|loc| loc.buffer));
                }
            }
            // Request document highlights at a random offset.
            50..=54 => {
                let highlights = project.update(&mut cx, |project, cx| {
                    log::info!(
                        "Guest {}: requesting highlights for buffer {} ({:?})",
                        guest_id,
                        buffer.read(cx).remote_id(),
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                    project.document_highlights(&buffer, offset, cx)
                });
                let highlights = cx.background().spawn(async move {
                    highlights.await.expect("highlights request failed");
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching highlights request", guest_id);
                    highlights.detach();
                } else {
                    highlights.await;
                }
            }
            // Project-wide search for a random lowercase letter. When
            // awaited, the keys of the result are added to `self.buffers`.
            55..=59 => {
                let search = project.update(&mut cx, |project, cx| {
                    let query = rng.lock().gen_range('a'..='z');
                    log::info!("Guest {}: project-wide search {:?}", guest_id, query);
                    project.search(SearchQuery::text(query, false, false), cx)
                });
                let search = cx
                    .background()
                    .spawn(async move { search.await.expect("search request failed") });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching search request", guest_id);
                    search.detach();
                } else {
                    self.buffers.extend(search.await.into_keys());
                }
            }
            // Everything else (including a save draw on a clean buffer):
            // randomly edit the buffer.
            _ => {
                buffer.update(&mut cx, |buffer, cx| {
                    log::info!(
                        "Guest {}: updating buffer {} ({:?})",
                        guest_id,
                        buffer.remote_id(),
                        buffer.file().unwrap().full_path(cx)
                    );
                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
                });
            }
        }
        cx.background().simulate_random_delay().await;
    }

    log::info!("Guest {} done", guest_id);

    // Hand the project back to the caller through the client.
    self.project = Some(project);
    (self, cx)
}
5063 }
5064
/// Calls `tear_down` on the wrapped client when the test client is dropped.
impl Drop for TestClient {
    fn drop(&mut self) {
        self.client.tear_down();
    }
}
5070
5071 impl Executor for Arc<gpui::executor::Background> {
5072 type Timer = gpui::executor::Timer;
5073
5074 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5075 self.spawn(future).detach();
5076 }
5077
5078 fn timer(&self, duration: Duration) -> Self::Timer {
5079 self.as_ref().timer(duration)
5080 }
5081 }
5082
5083 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5084 channel
5085 .messages()
5086 .cursor::<()>()
5087 .map(|m| {
5088 (
5089 m.sender.github_login.clone(),
5090 m.body.clone(),
5091 m.is_pending(),
5092 )
5093 })
5094 .collect()
5095 }
5096
/// Stateless placeholder view; its `gpui::View` impl renders an empty element.
struct EmptyView;
5098
/// Declares `()` as the event type of `EmptyView`.
impl gpui::Entity for EmptyView {
    type Event = ();
}
5102
/// View implementation for `EmptyView`: named "empty view" and rendered as a
/// boxed `gpui::elements::Empty`.
impl gpui::View for EmptyView {
    fn ui_name() -> &'static str {
        "empty view"
    }

    fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
        gpui::Element::boxed(gpui::elements::Empty)
    }
}
5112}