1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_io::Timer;
10use async_std::task;
11use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
12use collections::{HashMap, HashSet};
13use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
14use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{
21 any::TypeId,
22 future::Future,
23 sync::Arc,
24 time::{Duration, Instant},
25};
26use store::{Store, Worktree};
27use surf::StatusCode;
28use tide::log;
29use tide::{
30 http::headers::{HeaderName, CONNECTION, UPGRADE},
31 Request, Response,
32};
33use time::OffsetDateTime;
34
35type MessageHandler = Box<
36 dyn Send
37 + Sync
38 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
39>;
40
/// RPC server: routes incoming messages to registered handlers and tracks
/// connection/project/channel state in a lock-guarded `Store`.
pub struct Server {
    /// RPC peer used to register connections and to send, forward and respond to messages.
    peer: Arc<Peer>,
    /// In-memory state, accessed through `state()` (read lock) and `state_mut()` (write lock).
    store: RwLock<Store>,
    /// Shared application state; handlers use it to reach the database (`app_state.db`).
    app_state: Arc<AppState>,
    /// Message handlers keyed by the `TypeId` of the message payload they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// When present, receives a `()` after each incoming message has been handled.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
48
/// Runtime services `Server::handle_connection` needs: spawning detached work and
/// creating timers.
pub trait Executor: Send + Clone {
    /// Future type returned by [`Executor::timer`].
    type Timer: Send + Future;
    /// Runs `future` without handing a handle for it back to the caller.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
    /// Creates a timer future for `duration`.
    /// NOTE(review): presumably it resolves once `duration` has elapsed — confirm per implementation.
    fn timer(&self, duration: Duration) -> Self::Timer;
}
54
/// [`Executor`] used for real connections (see `add_routes`); its implementation spawns
/// with `async_std::task` and creates timers with `async_io::Timer`.
#[derive(Clone)]
pub struct RealExecutor;
57
/// Number of channel messages requested from the database per page; a page shorter than
/// this marks the end of the history (`done`).
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Upper bound on the trimmed chat message body, measured with `String::len` (bytes).
const MAX_MESSAGE_LEN: usize = 1024;
60
61impl Server {
62 pub fn new(
63 app_state: Arc<AppState>,
64 peer: Arc<Peer>,
65 notifications: Option<mpsc::UnboundedSender<()>>,
66 ) -> Arc<Self> {
67 let mut server = Self {
68 peer,
69 app_state,
70 store: Default::default(),
71 handlers: Default::default(),
72 notifications,
73 };
74
75 server
76 .add_request_handler(Server::ping)
77 .add_request_handler(Server::register_project)
78 .add_message_handler(Server::unregister_project)
79 .add_request_handler(Server::share_project)
80 .add_message_handler(Server::unshare_project)
81 .add_request_handler(Server::join_project)
82 .add_message_handler(Server::leave_project)
83 .add_request_handler(Server::register_worktree)
84 .add_message_handler(Server::unregister_worktree)
85 .add_request_handler(Server::update_worktree)
86 .add_message_handler(Server::start_language_server)
87 .add_message_handler(Server::update_language_server)
88 .add_message_handler(Server::update_diagnostic_summary)
89 .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
90 .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
91 .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
92 .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
93 .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
94 .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
95 .add_request_handler(Server::forward_project_request::<proto::OpenBuffer>)
96 .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
97 .add_request_handler(
98 Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
99 )
100 .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
101 .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
102 .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
103 .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
104 .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
105 .add_request_handler(Server::update_buffer)
106 .add_message_handler(Server::update_buffer_file)
107 .add_message_handler(Server::buffer_reloaded)
108 .add_message_handler(Server::buffer_saved)
109 .add_request_handler(Server::save_buffer)
110 .add_request_handler(Server::get_channels)
111 .add_request_handler(Server::get_users)
112 .add_request_handler(Server::join_channel)
113 .add_message_handler(Server::leave_channel)
114 .add_request_handler(Server::send_channel_message)
115 .add_request_handler(Server::get_channel_messages);
116
117 Arc::new(server)
118 }
119
120 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
121 where
122 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
123 Fut: 'static + Send + Future<Output = tide::Result<()>>,
124 M: EnvelopedMessage,
125 {
126 let prev_handler = self.handlers.insert(
127 TypeId::of::<M>(),
128 Box::new(move |server, envelope| {
129 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
130 (handler)(server, *envelope).boxed()
131 }),
132 );
133 if prev_handler.is_some() {
134 panic!("registered a handler for the same message twice");
135 }
136 self
137 }
138
139 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
140 where
141 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
142 Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
143 M: RequestMessage,
144 {
145 self.add_message_handler(move |server, envelope| {
146 let receipt = envelope.receipt();
147 let response = (handler)(server.clone(), envelope);
148 async move {
149 match response.await {
150 Ok(response) => {
151 server.peer.respond(receipt, response)?;
152 Ok(())
153 }
154 Err(error) => {
155 server.peer.respond_with_error(
156 receipt,
157 proto::Error {
158 message: error.to_string(),
159 },
160 )?;
161 Err(error)
162 }
163 }
164 }
165 })
166 }
167
    /// Returns a future that drives one client connection from registration to sign-out.
    ///
    /// The future registers `connection` with the peer, optionally reports the assigned
    /// connection id through `send_connection_id`, records the connection for `user_id` in
    /// the store, and then dispatches incoming messages to the registered handlers until
    /// either the connection's I/O future finishes or the incoming stream ends. Finally it
    /// signs the connection out. Failures along the way are logged rather than returned.
    ///
    /// `addr` is only used in log messages. `executor` supplies the timers handed to the
    /// peer and runs handlers for background messages.
    pub fn handle_connection<E: Executor>(
        self: &Arc<Self>,
        connection: Connection,
        addr: String,
        user_id: UserId,
        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
        executor: E,
    ) -> impl Future<Output = ()> {
        let mut this = self.clone();
        async move {
            // Register the connection; the closure lets the peer create timers through
            // the executor.
            let (connection_id, handle_io, mut incoming_rx) = this
                .peer
                .add_connection(connection, {
                    let executor = executor.clone();
                    move |duration| {
                        let timer = executor.timer(duration);
                        async move {
                            timer.await;
                        }
                    }
                })
                .await;

            // Report the connection id if the caller asked for it; a send failure is ignored.
            if let Some(send_connection_id) = send_connection_id.as_mut() {
                let _ = send_connection_id.send(connection_id).await;
            }

            this.state_mut().add_connection(connection_id, user_id);
            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
                log::error!("error updating contacts for {:?}: {}", user_id, err);
            }

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.next().fuse();
                futures::pin_mut!(next_message);
                // `handle_io` is listed first, so the biased select checks it before
                // looking at the next incoming message.
                futures::select_biased! {
                    result = handle_io => {
                        if let Err(err) = result {
                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
                        }
                        break;
                    }
                    message = next_message => {
                        if let Some(message) = message {
                            let start_time = Instant::now();
                            let type_name = message.payload_type_name();
                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                let notifications = this.notifications.clone();
                                let is_background = message.is_background();
                                let handle_message = (handler)(this.clone(), message);
                                let handle_message = async move {
                                    if let Err(err) = handle_message.await {
                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
                                    } else {
                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
                                    }
                                    // Signal that one message has been fully processed.
                                    if let Some(mut notifications) = notifications {
                                        let _ = notifications.send(()).await;
                                    }
                                };
                                // Background messages run detached on the executor; all
                                // others are awaited here, so the next message is not read
                                // until the current one has been handled.
                                if is_background {
                                    executor.spawn_detached(handle_message);
                                } else {
                                    handle_message.await;
                                }
                            } else {
                                log::warn!("unhandled message: {}", type_name);
                            }
                        } else {
                            // The incoming message stream ended.
                            log::info!("rpc connection closed {:?}", addr);
                            break;
                        }
                    }
                }
            }

            if let Err(err) = this.sign_out(connection_id).await {
                log::error!("error signing out connection {:?} - {:?}", addr, err);
            }
        }
    }
252
253 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
254 self.peer.disconnect(connection_id);
255 let removed_connection = self.state_mut().remove_connection(connection_id)?;
256
257 for (project_id, project) in removed_connection.hosted_projects {
258 if let Some(share) = project.share {
259 broadcast(
260 connection_id,
261 share.guests.keys().copied().collect(),
262 |conn_id| {
263 self.peer
264 .send(conn_id, proto::UnshareProject { project_id })
265 },
266 )?;
267 }
268 }
269
270 for (project_id, peer_ids) in removed_connection.guest_project_ids {
271 broadcast(connection_id, peer_ids, |conn_id| {
272 self.peer.send(
273 conn_id,
274 proto::RemoveProjectCollaborator {
275 project_id,
276 peer_id: connection_id.0,
277 },
278 )
279 })?;
280 }
281
282 self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
283 Ok(())
284 }
285
    /// Answers a `Ping` request with an empty `Ack`.
    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
        Ok(proto::Ack {})
    }
289
290 async fn register_project(
291 mut self: Arc<Server>,
292 request: TypedEnvelope<proto::RegisterProject>,
293 ) -> tide::Result<proto::RegisterProjectResponse> {
294 let project_id = {
295 let mut state = self.state_mut();
296 let user_id = state.user_id_for_connection(request.sender_id)?;
297 state.register_project(request.sender_id, user_id)
298 };
299 Ok(proto::RegisterProjectResponse { project_id })
300 }
301
302 async fn unregister_project(
303 mut self: Arc<Server>,
304 request: TypedEnvelope<proto::UnregisterProject>,
305 ) -> tide::Result<()> {
306 let project = self
307 .state_mut()
308 .unregister_project(request.payload.project_id, request.sender_id)?;
309 self.update_contacts_for_users(project.authorized_user_ids().iter())?;
310 Ok(())
311 }
312
313 async fn share_project(
314 mut self: Arc<Server>,
315 request: TypedEnvelope<proto::ShareProject>,
316 ) -> tide::Result<proto::Ack> {
317 self.state_mut()
318 .share_project(request.payload.project_id, request.sender_id);
319 Ok(proto::Ack {})
320 }
321
322 async fn unshare_project(
323 mut self: Arc<Server>,
324 request: TypedEnvelope<proto::UnshareProject>,
325 ) -> tide::Result<()> {
326 let project_id = request.payload.project_id;
327 let project = self
328 .state_mut()
329 .unshare_project(project_id, request.sender_id)?;
330
331 broadcast(request.sender_id, project.connection_ids, |conn_id| {
332 self.peer
333 .send(conn_id, proto::UnshareProject { project_id })
334 })?;
335 self.update_contacts_for_users(&project.authorized_user_ids)?;
336 Ok(())
337 }
338
339 async fn join_project(
340 mut self: Arc<Server>,
341 request: TypedEnvelope<proto::JoinProject>,
342 ) -> tide::Result<proto::JoinProjectResponse> {
343 let project_id = request.payload.project_id;
344
345 let user_id = self.state().user_id_for_connection(request.sender_id)?;
346 let (response, connection_ids, contact_user_ids) = self
347 .state_mut()
348 .join_project(request.sender_id, user_id, project_id)
349 .and_then(|joined| {
350 let share = joined.project.share()?;
351 let peer_count = share.guests.len();
352 let mut collaborators = Vec::with_capacity(peer_count);
353 collaborators.push(proto::Collaborator {
354 peer_id: joined.project.host_connection_id.0,
355 replica_id: 0,
356 user_id: joined.project.host_user_id.to_proto(),
357 });
358 let worktrees = share
359 .worktrees
360 .iter()
361 .filter_map(|(id, shared_worktree)| {
362 let worktree = joined.project.worktrees.get(&id)?;
363 Some(proto::Worktree {
364 id: *id,
365 root_name: worktree.root_name.clone(),
366 entries: shared_worktree.entries.values().cloned().collect(),
367 diagnostic_summaries: shared_worktree
368 .diagnostic_summaries
369 .values()
370 .cloned()
371 .collect(),
372 visible: worktree.visible,
373 })
374 })
375 .collect();
376 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
377 if *peer_conn_id != request.sender_id {
378 collaborators.push(proto::Collaborator {
379 peer_id: peer_conn_id.0,
380 replica_id: *peer_replica_id as u32,
381 user_id: peer_user_id.to_proto(),
382 });
383 }
384 }
385 let response = proto::JoinProjectResponse {
386 worktrees,
387 replica_id: joined.replica_id as u32,
388 collaborators,
389 language_servers: joined.project.language_servers.clone(),
390 };
391 let connection_ids = joined.project.connection_ids();
392 let contact_user_ids = joined.project.authorized_user_ids();
393 Ok((response, connection_ids, contact_user_ids))
394 })?;
395
396 broadcast(request.sender_id, connection_ids, |conn_id| {
397 self.peer.send(
398 conn_id,
399 proto::AddProjectCollaborator {
400 project_id,
401 collaborator: Some(proto::Collaborator {
402 peer_id: request.sender_id.0,
403 replica_id: response.replica_id,
404 user_id: user_id.to_proto(),
405 }),
406 },
407 )
408 })?;
409 self.update_contacts_for_users(&contact_user_ids)?;
410 Ok(response)
411 }
412
413 async fn leave_project(
414 mut self: Arc<Server>,
415 request: TypedEnvelope<proto::LeaveProject>,
416 ) -> tide::Result<()> {
417 let sender_id = request.sender_id;
418 let project_id = request.payload.project_id;
419 let worktree = self.state_mut().leave_project(sender_id, project_id)?;
420
421 broadcast(sender_id, worktree.connection_ids, |conn_id| {
422 self.peer.send(
423 conn_id,
424 proto::RemoveProjectCollaborator {
425 project_id,
426 peer_id: sender_id.0,
427 },
428 )
429 })?;
430 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
431
432 Ok(())
433 }
434
435 async fn register_worktree(
436 mut self: Arc<Server>,
437 request: TypedEnvelope<proto::RegisterWorktree>,
438 ) -> tide::Result<proto::Ack> {
439 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
440
441 let mut contact_user_ids = HashSet::default();
442 contact_user_ids.insert(host_user_id);
443 for github_login in &request.payload.authorized_logins {
444 let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
445 contact_user_ids.insert(contact_user_id);
446 }
447
448 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
449 let guest_connection_ids;
450 {
451 let mut state = self.state_mut();
452 guest_connection_ids = state
453 .read_project(request.payload.project_id, request.sender_id)?
454 .guest_connection_ids();
455 state.register_worktree(
456 request.payload.project_id,
457 request.payload.worktree_id,
458 request.sender_id,
459 Worktree {
460 authorized_user_ids: contact_user_ids.clone(),
461 root_name: request.payload.root_name.clone(),
462 visible: request.payload.visible,
463 },
464 )?;
465 }
466 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
467 self.peer
468 .forward_send(request.sender_id, connection_id, request.payload.clone())
469 })?;
470 self.update_contacts_for_users(&contact_user_ids)?;
471 Ok(proto::Ack {})
472 }
473
474 async fn unregister_worktree(
475 mut self: Arc<Server>,
476 request: TypedEnvelope<proto::UnregisterWorktree>,
477 ) -> tide::Result<()> {
478 let project_id = request.payload.project_id;
479 let worktree_id = request.payload.worktree_id;
480 let (worktree, guest_connection_ids) =
481 self.state_mut()
482 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
483 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
484 self.peer.send(
485 conn_id,
486 proto::UnregisterWorktree {
487 project_id,
488 worktree_id,
489 },
490 )
491 })?;
492 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
493 Ok(())
494 }
495
496 async fn update_worktree(
497 mut self: Arc<Server>,
498 request: TypedEnvelope<proto::UpdateWorktree>,
499 ) -> tide::Result<proto::Ack> {
500 let connection_ids = self.state_mut().update_worktree(
501 request.sender_id,
502 request.payload.project_id,
503 request.payload.worktree_id,
504 &request.payload.removed_entries,
505 &request.payload.updated_entries,
506 )?;
507
508 broadcast(request.sender_id, connection_ids, |connection_id| {
509 self.peer
510 .forward_send(request.sender_id, connection_id, request.payload.clone())
511 })?;
512
513 Ok(proto::Ack {})
514 }
515
516 async fn update_diagnostic_summary(
517 mut self: Arc<Server>,
518 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
519 ) -> tide::Result<()> {
520 let summary = request
521 .payload
522 .summary
523 .clone()
524 .ok_or_else(|| anyhow!("invalid summary"))?;
525 let receiver_ids = self.state_mut().update_diagnostic_summary(
526 request.payload.project_id,
527 request.payload.worktree_id,
528 request.sender_id,
529 summary,
530 )?;
531
532 broadcast(request.sender_id, receiver_ids, |connection_id| {
533 self.peer
534 .forward_send(request.sender_id, connection_id, request.payload.clone())
535 })?;
536 Ok(())
537 }
538
539 async fn start_language_server(
540 mut self: Arc<Server>,
541 request: TypedEnvelope<proto::StartLanguageServer>,
542 ) -> tide::Result<()> {
543 let receiver_ids = self.state_mut().start_language_server(
544 request.payload.project_id,
545 request.sender_id,
546 request
547 .payload
548 .server
549 .clone()
550 .ok_or_else(|| anyhow!("invalid language server"))?,
551 )?;
552 broadcast(request.sender_id, receiver_ids, |connection_id| {
553 self.peer
554 .forward_send(request.sender_id, connection_id, request.payload.clone())
555 })?;
556 Ok(())
557 }
558
559 async fn update_language_server(
560 self: Arc<Server>,
561 request: TypedEnvelope<proto::UpdateLanguageServer>,
562 ) -> tide::Result<()> {
563 let receiver_ids = self
564 .state()
565 .project_connection_ids(request.payload.project_id, request.sender_id)?;
566 broadcast(request.sender_id, receiver_ids, |connection_id| {
567 self.peer
568 .forward_send(request.sender_id, connection_id, request.payload.clone())
569 })?;
570 Ok(())
571 }
572
573 async fn forward_project_request<T>(
574 self: Arc<Server>,
575 request: TypedEnvelope<T>,
576 ) -> tide::Result<T::Response>
577 where
578 T: EntityMessage + RequestMessage,
579 {
580 let host_connection_id = self
581 .state()
582 .read_project(request.payload.remote_entity_id(), request.sender_id)?
583 .host_connection_id;
584 Ok(self
585 .peer
586 .forward_request(request.sender_id, host_connection_id, request.payload)
587 .await?)
588 }
589
590 async fn save_buffer(
591 self: Arc<Server>,
592 request: TypedEnvelope<proto::SaveBuffer>,
593 ) -> tide::Result<proto::BufferSaved> {
594 let host;
595 let mut guests;
596 {
597 let state = self.state();
598 let project = state.read_project(request.payload.project_id, request.sender_id)?;
599 host = project.host_connection_id;
600 guests = project.guest_connection_ids()
601 }
602
603 let response = self
604 .peer
605 .forward_request(request.sender_id, host, request.payload.clone())
606 .await?;
607
608 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
609 broadcast(host, guests, |conn_id| {
610 self.peer.forward_send(host, conn_id, response.clone())
611 })?;
612
613 Ok(response)
614 }
615
616 async fn update_buffer(
617 self: Arc<Server>,
618 request: TypedEnvelope<proto::UpdateBuffer>,
619 ) -> tide::Result<proto::Ack> {
620 let receiver_ids = self
621 .state()
622 .project_connection_ids(request.payload.project_id, request.sender_id)?;
623 broadcast(request.sender_id, receiver_ids, |connection_id| {
624 self.peer
625 .forward_send(request.sender_id, connection_id, request.payload.clone())
626 })?;
627 Ok(proto::Ack {})
628 }
629
630 async fn update_buffer_file(
631 self: Arc<Server>,
632 request: TypedEnvelope<proto::UpdateBufferFile>,
633 ) -> tide::Result<()> {
634 let receiver_ids = self
635 .state()
636 .project_connection_ids(request.payload.project_id, request.sender_id)?;
637 broadcast(request.sender_id, receiver_ids, |connection_id| {
638 self.peer
639 .forward_send(request.sender_id, connection_id, request.payload.clone())
640 })?;
641 Ok(())
642 }
643
644 async fn buffer_reloaded(
645 self: Arc<Server>,
646 request: TypedEnvelope<proto::BufferReloaded>,
647 ) -> tide::Result<()> {
648 let receiver_ids = self
649 .state()
650 .project_connection_ids(request.payload.project_id, request.sender_id)?;
651 broadcast(request.sender_id, receiver_ids, |connection_id| {
652 self.peer
653 .forward_send(request.sender_id, connection_id, request.payload.clone())
654 })?;
655 Ok(())
656 }
657
658 async fn buffer_saved(
659 self: Arc<Server>,
660 request: TypedEnvelope<proto::BufferSaved>,
661 ) -> tide::Result<()> {
662 let receiver_ids = self
663 .state()
664 .project_connection_ids(request.payload.project_id, request.sender_id)?;
665 broadcast(request.sender_id, receiver_ids, |connection_id| {
666 self.peer
667 .forward_send(request.sender_id, connection_id, request.payload.clone())
668 })?;
669 Ok(())
670 }
671
672 async fn get_channels(
673 self: Arc<Server>,
674 request: TypedEnvelope<proto::GetChannels>,
675 ) -> tide::Result<proto::GetChannelsResponse> {
676 let user_id = self.state().user_id_for_connection(request.sender_id)?;
677 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
678 Ok(proto::GetChannelsResponse {
679 channels: channels
680 .into_iter()
681 .map(|chan| proto::Channel {
682 id: chan.id.to_proto(),
683 name: chan.name,
684 })
685 .collect(),
686 })
687 }
688
689 async fn get_users(
690 self: Arc<Server>,
691 request: TypedEnvelope<proto::GetUsers>,
692 ) -> tide::Result<proto::GetUsersResponse> {
693 let user_ids = request
694 .payload
695 .user_ids
696 .into_iter()
697 .map(UserId::from_proto)
698 .collect();
699 let users = self
700 .app_state
701 .db
702 .get_users_by_ids(user_ids)
703 .await?
704 .into_iter()
705 .map(|user| proto::User {
706 id: user.id.to_proto(),
707 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
708 github_login: user.github_login,
709 })
710 .collect();
711 Ok(proto::GetUsersResponse { users })
712 }
713
714 fn update_contacts_for_users<'a>(
715 self: &Arc<Server>,
716 user_ids: impl IntoIterator<Item = &'a UserId>,
717 ) -> anyhow::Result<()> {
718 let mut result = Ok(());
719 let state = self.state();
720 for user_id in user_ids {
721 let contacts = state.contacts_for_user(*user_id);
722 for connection_id in state.connection_ids_for_user(*user_id) {
723 if let Err(error) = self.peer.send(
724 connection_id,
725 proto::UpdateContacts {
726 contacts: contacts.clone(),
727 },
728 ) {
729 result = Err(error);
730 }
731 }
732 }
733 result
734 }
735
736 async fn join_channel(
737 mut self: Arc<Self>,
738 request: TypedEnvelope<proto::JoinChannel>,
739 ) -> tide::Result<proto::JoinChannelResponse> {
740 let user_id = self.state().user_id_for_connection(request.sender_id)?;
741 let channel_id = ChannelId::from_proto(request.payload.channel_id);
742 if !self
743 .app_state
744 .db
745 .can_user_access_channel(user_id, channel_id)
746 .await?
747 {
748 Err(anyhow!("access denied"))?;
749 }
750
751 self.state_mut().join_channel(request.sender_id, channel_id);
752 let messages = self
753 .app_state
754 .db
755 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
756 .await?
757 .into_iter()
758 .map(|msg| proto::ChannelMessage {
759 id: msg.id.to_proto(),
760 body: msg.body,
761 timestamp: msg.sent_at.unix_timestamp() as u64,
762 sender_id: msg.sender_id.to_proto(),
763 nonce: Some(msg.nonce.as_u128().into()),
764 })
765 .collect::<Vec<_>>();
766 Ok(proto::JoinChannelResponse {
767 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
768 messages,
769 })
770 }
771
772 async fn leave_channel(
773 mut self: Arc<Self>,
774 request: TypedEnvelope<proto::LeaveChannel>,
775 ) -> tide::Result<()> {
776 let user_id = self.state().user_id_for_connection(request.sender_id)?;
777 let channel_id = ChannelId::from_proto(request.payload.channel_id);
778 if !self
779 .app_state
780 .db
781 .can_user_access_channel(user_id, channel_id)
782 .await?
783 {
784 Err(anyhow!("access denied"))?;
785 }
786
787 self.state_mut()
788 .leave_channel(request.sender_id, channel_id);
789
790 Ok(())
791 }
792
793 async fn send_channel_message(
794 self: Arc<Self>,
795 request: TypedEnvelope<proto::SendChannelMessage>,
796 ) -> tide::Result<proto::SendChannelMessageResponse> {
797 let channel_id = ChannelId::from_proto(request.payload.channel_id);
798 let user_id;
799 let connection_ids;
800 {
801 let state = self.state();
802 user_id = state.user_id_for_connection(request.sender_id)?;
803 connection_ids = state.channel_connection_ids(channel_id)?;
804 }
805
806 // Validate the message body.
807 let body = request.payload.body.trim().to_string();
808 if body.len() > MAX_MESSAGE_LEN {
809 return Err(anyhow!("message is too long"))?;
810 }
811 if body.is_empty() {
812 return Err(anyhow!("message can't be blank"))?;
813 }
814
815 let timestamp = OffsetDateTime::now_utc();
816 let nonce = request
817 .payload
818 .nonce
819 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
820
821 let message_id = self
822 .app_state
823 .db
824 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
825 .await?
826 .to_proto();
827 let message = proto::ChannelMessage {
828 sender_id: user_id.to_proto(),
829 id: message_id,
830 body,
831 timestamp: timestamp.unix_timestamp() as u64,
832 nonce: Some(nonce),
833 };
834 broadcast(request.sender_id, connection_ids, |conn_id| {
835 self.peer.send(
836 conn_id,
837 proto::ChannelMessageSent {
838 channel_id: channel_id.to_proto(),
839 message: Some(message.clone()),
840 },
841 )
842 })?;
843 Ok(proto::SendChannelMessageResponse {
844 message: Some(message),
845 })
846 }
847
848 async fn get_channel_messages(
849 self: Arc<Self>,
850 request: TypedEnvelope<proto::GetChannelMessages>,
851 ) -> tide::Result<proto::GetChannelMessagesResponse> {
852 let user_id = self.state().user_id_for_connection(request.sender_id)?;
853 let channel_id = ChannelId::from_proto(request.payload.channel_id);
854 if !self
855 .app_state
856 .db
857 .can_user_access_channel(user_id, channel_id)
858 .await?
859 {
860 Err(anyhow!("access denied"))?;
861 }
862
863 let messages = self
864 .app_state
865 .db
866 .get_channel_messages(
867 channel_id,
868 MESSAGE_COUNT_PER_PAGE,
869 Some(MessageId::from_proto(request.payload.before_message_id)),
870 )
871 .await?
872 .into_iter()
873 .map(|msg| proto::ChannelMessage {
874 id: msg.id.to_proto(),
875 body: msg.body,
876 timestamp: msg.sent_at.unix_timestamp() as u64,
877 sender_id: msg.sender_id.to_proto(),
878 nonce: Some(msg.nonce.as_u128().into()),
879 })
880 .collect::<Vec<_>>();
881
882 Ok(proto::GetChannelMessagesResponse {
883 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
884 messages,
885 })
886 }
887
888 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
889 self.store.read()
890 }
891
892 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
893 self.store.write()
894 }
895}
896
897impl Executor for RealExecutor {
898 type Timer = Timer;
899
900 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
901 task::spawn(future);
902 }
903
904 fn timer(&self, duration: Duration) -> Self::Timer {
905 Timer::after(duration)
906 }
907}
908
909fn broadcast<F>(
910 sender_id: ConnectionId,
911 receiver_ids: Vec<ConnectionId>,
912 mut f: F,
913) -> anyhow::Result<()>
914where
915 F: FnMut(ConnectionId) -> anyhow::Result<()>,
916{
917 let mut result = Ok(());
918 for receiver_id in receiver_ids {
919 if receiver_id != sender_id {
920 if let Err(error) = f(receiver_id) {
921 if result.is_ok() {
922 result = Err(error);
923 }
924 }
925 }
926 }
927 result
928}
929
930pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
931 let server = Server::new(app.state().clone(), rpc.clone(), None);
932 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
933 let server = server.clone();
934 async move {
935 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
936
937 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
938 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
939 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
940 let client_protocol_version: Option<u32> = request
941 .header("X-Zed-Protocol-Version")
942 .and_then(|v| v.as_str().parse().ok());
943
944 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
945 return Ok(Response::new(StatusCode::UpgradeRequired));
946 }
947
948 let header = match request.header("Sec-Websocket-Key") {
949 Some(h) => h.as_str(),
950 None => return Err(anyhow!("expected sec-websocket-key"))?,
951 };
952
953 let user_id = process_auth_header(&request).await?;
954
955 let mut response = Response::new(StatusCode::SwitchingProtocols);
956 response.insert_header(UPGRADE, "websocket");
957 response.insert_header(CONNECTION, "Upgrade");
958 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
959 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
960 response.insert_header("Sec-Websocket-Version", "13");
961
962 let http_res: &mut tide::http::Response = response.as_mut();
963 let upgrade_receiver = http_res.recv_upgrade().await;
964 let addr = request.remote().unwrap_or("unknown").to_string();
965 task::spawn(async move {
966 if let Some(stream) = upgrade_receiver.await {
967 server
968 .handle_connection(
969 Connection::new(
970 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
971 ),
972 addr,
973 user_id,
974 None,
975 RealExecutor,
976 )
977 .await;
978 }
979 });
980
981 Ok(response)
982 }
983 });
984}
985
986fn header_contains_ignore_case<T>(
987 request: &tide::Request<T>,
988 header_name: HeaderName,
989 value: &str,
990) -> bool {
991 request
992 .header(header_name)
993 .map(|h| {
994 h.as_str()
995 .split(',')
996 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
997 })
998 .unwrap_or(false)
999}
1000
1001#[cfg(test)]
1002mod tests {
1003 use super::*;
1004 use crate::{
1005 auth,
1006 db::{tests::TestDb, UserId},
1007 github, AppState, Config,
1008 };
1009 use ::rpc::Peer;
1010 use client::{
1011 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1012 EstablishConnectionError, UserStore,
1013 };
1014 use collections::BTreeMap;
1015 use editor::{
1016 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, MultiBuffer,
1017 Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1018 };
1019 use gpui::{executor, ModelHandle, TestAppContext};
1020 use language::{
1021 tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig, LanguageRegistry,
1022 LanguageServerConfig, OffsetRangeExt, Point, ToLspPosition,
1023 };
1024 use lsp;
1025 use parking_lot::Mutex;
1026 use postage::barrier;
1027 use project::{
1028 fs::{FakeFs, Fs as _},
1029 search::SearchQuery,
1030 worktree::WorktreeHandle,
1031 DiagnosticSummary, Project, ProjectPath,
1032 };
1033 use rand::prelude::*;
1034 use rpc::PeerId;
1035 use serde_json::json;
1036 use sqlx::types::time::OffsetDateTime;
1037 use std::{
1038 cell::Cell,
1039 env,
1040 ops::Deref,
1041 path::{Path, PathBuf},
1042 rc::Rc,
1043 sync::{
1044 atomic::{AtomicBool, Ordering::SeqCst},
1045 Arc,
1046 },
1047 time::Duration,
1048 };
1049 use workspace::{Settings, Workspace, WorkspaceParams};
1050
1051 #[cfg(test)]
1052 #[ctor::ctor]
1053 fn init_logger() {
1054 if std::env::var("RUST_LOG").is_ok() {
1055 env_logger::init();
1056 }
1057 }
1058
1059 #[gpui::test(iterations = 10)]
1060 async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1061 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1062 let lang_registry = Arc::new(LanguageRegistry::test());
1063 let fs = FakeFs::new(cx_a.background());
1064 cx_a.foreground().forbid_parking();
1065
1066 // Connect to a server as 2 clients.
1067 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1068 let client_a = server.create_client(cx_a, "user_a").await;
1069 let client_b = server.create_client(cx_b, "user_b").await;
1070
1071 // Share a project as client A
1072 fs.insert_tree(
1073 "/a",
1074 json!({
1075 ".zed.toml": r#"collaborators = ["user_b"]"#,
1076 "a.txt": "a-contents",
1077 "b.txt": "b-contents",
1078 }),
1079 )
1080 .await;
1081 let project_a = cx_a.update(|cx| {
1082 Project::local(
1083 client_a.clone(),
1084 client_a.user_store.clone(),
1085 lang_registry.clone(),
1086 fs.clone(),
1087 cx,
1088 )
1089 });
1090 let (worktree_a, _) = project_a
1091 .update(cx_a, |p, cx| {
1092 p.find_or_create_local_worktree("/a", true, cx)
1093 })
1094 .await
1095 .unwrap();
1096 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1097 worktree_a
1098 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1099 .await;
1100 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1101 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1102
1103 // Join that project as client B
1104 let project_b = Project::remote(
1105 project_id,
1106 client_b.clone(),
1107 client_b.user_store.clone(),
1108 lang_registry.clone(),
1109 fs.clone(),
1110 &mut cx_b.to_async(),
1111 )
1112 .await
1113 .unwrap();
1114
1115 let replica_id_b = project_b.read_with(cx_b, |project, _| {
1116 assert_eq!(
1117 project
1118 .collaborators()
1119 .get(&client_a.peer_id)
1120 .unwrap()
1121 .user
1122 .github_login,
1123 "user_a"
1124 );
1125 project.replica_id()
1126 });
1127 project_a
1128 .condition(&cx_a, |tree, _| {
1129 tree.collaborators()
1130 .get(&client_b.peer_id)
1131 .map_or(false, |collaborator| {
1132 collaborator.replica_id == replica_id_b
1133 && collaborator.user.github_login == "user_b"
1134 })
1135 })
1136 .await;
1137
1138 // Open the same file as client B and client A.
1139 let buffer_b = project_b
1140 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1141 .await
1142 .unwrap();
1143 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1144 buffer_b.read_with(cx_b, |buf, cx| {
1145 assert_eq!(buf.read(cx).text(), "b-contents")
1146 });
1147 project_a.read_with(cx_a, |project, cx| {
1148 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1149 });
1150 let buffer_a = project_a
1151 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1152 .await
1153 .unwrap();
1154
1155 let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1156
1157 // TODO
1158 // // Create a selection set as client B and see that selection set as client A.
1159 // buffer_a
1160 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1161 // .await;
1162
1163 // Edit the buffer as client B and see that edit as client A.
1164 editor_b.update(cx_b, |editor, cx| {
1165 editor.handle_input(&Input("ok, ".into()), cx)
1166 });
1167 buffer_a
1168 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1169 .await;
1170
1171 // TODO
1172 // // Remove the selection set as client B, see those selections disappear as client A.
1173 cx_b.update(move |_| drop(editor_b));
1174 // buffer_a
1175 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1176 // .await;
1177
1178 // Dropping the client B's project removes client B from client A's collaborators.
1179 cx_b.update(move |_| drop(project_b));
1180 project_a
1181 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1182 .await;
1183 }
1184
1185 #[gpui::test(iterations = 10)]
1186 async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1187 let lang_registry = Arc::new(LanguageRegistry::test());
1188 let fs = FakeFs::new(cx_a.background());
1189 cx_a.foreground().forbid_parking();
1190
1191 // Connect to a server as 2 clients.
1192 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1193 let client_a = server.create_client(cx_a, "user_a").await;
1194 let client_b = server.create_client(cx_b, "user_b").await;
1195
1196 // Share a project as client A
1197 fs.insert_tree(
1198 "/a",
1199 json!({
1200 ".zed.toml": r#"collaborators = ["user_b"]"#,
1201 "a.txt": "a-contents",
1202 "b.txt": "b-contents",
1203 }),
1204 )
1205 .await;
1206 let project_a = cx_a.update(|cx| {
1207 Project::local(
1208 client_a.clone(),
1209 client_a.user_store.clone(),
1210 lang_registry.clone(),
1211 fs.clone(),
1212 cx,
1213 )
1214 });
1215 let (worktree_a, _) = project_a
1216 .update(cx_a, |p, cx| {
1217 p.find_or_create_local_worktree("/a", true, cx)
1218 })
1219 .await
1220 .unwrap();
1221 worktree_a
1222 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1223 .await;
1224 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1225 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1226 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1227 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1228
1229 // Join that project as client B
1230 let project_b = Project::remote(
1231 project_id,
1232 client_b.clone(),
1233 client_b.user_store.clone(),
1234 lang_registry.clone(),
1235 fs.clone(),
1236 &mut cx_b.to_async(),
1237 )
1238 .await
1239 .unwrap();
1240 project_b
1241 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1242 .await
1243 .unwrap();
1244
1245 // Unshare the project as client A
1246 project_a
1247 .update(cx_a, |project, cx| project.unshare(cx))
1248 .await
1249 .unwrap();
1250 project_b
1251 .condition(cx_b, |project, _| project.is_read_only())
1252 .await;
1253 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1254 cx_b.update(|_| {
1255 drop(project_b);
1256 });
1257
1258 // Share the project again and ensure guests can still join.
1259 project_a
1260 .update(cx_a, |project, cx| project.share(cx))
1261 .await
1262 .unwrap();
1263 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1264
1265 let project_b2 = Project::remote(
1266 project_id,
1267 client_b.clone(),
1268 client_b.user_store.clone(),
1269 lang_registry.clone(),
1270 fs.clone(),
1271 &mut cx_b.to_async(),
1272 )
1273 .await
1274 .unwrap();
1275 project_b2
1276 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1277 .await
1278 .unwrap();
1279 }
1280
1281 #[gpui::test(iterations = 10)]
1282 async fn test_propagate_saves_and_fs_changes(
1283 cx_a: &mut TestAppContext,
1284 cx_b: &mut TestAppContext,
1285 cx_c: &mut TestAppContext,
1286 ) {
1287 let lang_registry = Arc::new(LanguageRegistry::test());
1288 let fs = FakeFs::new(cx_a.background());
1289 cx_a.foreground().forbid_parking();
1290
1291 // Connect to a server as 3 clients.
1292 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1293 let client_a = server.create_client(cx_a, "user_a").await;
1294 let client_b = server.create_client(cx_b, "user_b").await;
1295 let client_c = server.create_client(cx_c, "user_c").await;
1296
1297 // Share a worktree as client A.
1298 fs.insert_tree(
1299 "/a",
1300 json!({
1301 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1302 "file1": "",
1303 "file2": ""
1304 }),
1305 )
1306 .await;
1307 let project_a = cx_a.update(|cx| {
1308 Project::local(
1309 client_a.clone(),
1310 client_a.user_store.clone(),
1311 lang_registry.clone(),
1312 fs.clone(),
1313 cx,
1314 )
1315 });
1316 let (worktree_a, _) = project_a
1317 .update(cx_a, |p, cx| {
1318 p.find_or_create_local_worktree("/a", true, cx)
1319 })
1320 .await
1321 .unwrap();
1322 worktree_a
1323 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1324 .await;
1325 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1326 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1327 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1328
1329 // Join that worktree as clients B and C.
1330 let project_b = Project::remote(
1331 project_id,
1332 client_b.clone(),
1333 client_b.user_store.clone(),
1334 lang_registry.clone(),
1335 fs.clone(),
1336 &mut cx_b.to_async(),
1337 )
1338 .await
1339 .unwrap();
1340 let project_c = Project::remote(
1341 project_id,
1342 client_c.clone(),
1343 client_c.user_store.clone(),
1344 lang_registry.clone(),
1345 fs.clone(),
1346 &mut cx_c.to_async(),
1347 )
1348 .await
1349 .unwrap();
1350 let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1351 let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1352
1353 // Open and edit a buffer as both guests B and C.
1354 let buffer_b = project_b
1355 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1356 .await
1357 .unwrap();
1358 let buffer_c = project_c
1359 .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1360 .await
1361 .unwrap();
1362 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1363 buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1364
1365 // Open and edit that buffer as the host.
1366 let buffer_a = project_a
1367 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1368 .await
1369 .unwrap();
1370
1371 buffer_a
1372 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1373 .await;
1374 buffer_a.update(cx_a, |buf, cx| {
1375 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1376 });
1377
1378 // Wait for edits to propagate
1379 buffer_a
1380 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1381 .await;
1382 buffer_b
1383 .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1384 .await;
1385 buffer_c
1386 .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1387 .await;
1388
1389 // Edit the buffer as the host and concurrently save as guest B.
1390 let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1391 buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1392 save_b.await.unwrap();
1393 assert_eq!(
1394 fs.load("/a/file1".as_ref()).await.unwrap(),
1395 "hi-a, i-am-c, i-am-b, i-am-a"
1396 );
1397 buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1398 buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1399 buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1400
1401 worktree_a.flush_fs_events(cx_a).await;
1402
1403 // Make changes on host's file system, see those changes on guest worktrees.
1404 fs.rename(
1405 "/a/file1".as_ref(),
1406 "/a/file1-renamed".as_ref(),
1407 Default::default(),
1408 )
1409 .await
1410 .unwrap();
1411
1412 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1413 .await
1414 .unwrap();
1415 fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1416
1417 worktree_a
1418 .condition(&cx_a, |tree, _| {
1419 tree.paths()
1420 .map(|p| p.to_string_lossy())
1421 .collect::<Vec<_>>()
1422 == [".zed.toml", "file1-renamed", "file3", "file4"]
1423 })
1424 .await;
1425 worktree_b
1426 .condition(&cx_b, |tree, _| {
1427 tree.paths()
1428 .map(|p| p.to_string_lossy())
1429 .collect::<Vec<_>>()
1430 == [".zed.toml", "file1-renamed", "file3", "file4"]
1431 })
1432 .await;
1433 worktree_c
1434 .condition(&cx_c, |tree, _| {
1435 tree.paths()
1436 .map(|p| p.to_string_lossy())
1437 .collect::<Vec<_>>()
1438 == [".zed.toml", "file1-renamed", "file3", "file4"]
1439 })
1440 .await;
1441
1442 // Ensure buffer files are updated as well.
1443 buffer_a
1444 .condition(&cx_a, |buf, _| {
1445 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1446 })
1447 .await;
1448 buffer_b
1449 .condition(&cx_b, |buf, _| {
1450 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1451 })
1452 .await;
1453 buffer_c
1454 .condition(&cx_c, |buf, _| {
1455 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1456 })
1457 .await;
1458 }
1459
1460 #[gpui::test(iterations = 10)]
1461 async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1462 cx_a.foreground().forbid_parking();
1463 let lang_registry = Arc::new(LanguageRegistry::test());
1464 let fs = FakeFs::new(cx_a.background());
1465
1466 // Connect to a server as 2 clients.
1467 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1468 let client_a = server.create_client(cx_a, "user_a").await;
1469 let client_b = server.create_client(cx_b, "user_b").await;
1470
1471 // Share a project as client A
1472 fs.insert_tree(
1473 "/dir",
1474 json!({
1475 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1476 "a.txt": "a-contents",
1477 }),
1478 )
1479 .await;
1480
1481 let project_a = cx_a.update(|cx| {
1482 Project::local(
1483 client_a.clone(),
1484 client_a.user_store.clone(),
1485 lang_registry.clone(),
1486 fs.clone(),
1487 cx,
1488 )
1489 });
1490 let (worktree_a, _) = project_a
1491 .update(cx_a, |p, cx| {
1492 p.find_or_create_local_worktree("/dir", true, cx)
1493 })
1494 .await
1495 .unwrap();
1496 worktree_a
1497 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1498 .await;
1499 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1500 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1501 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1502
1503 // Join that project as client B
1504 let project_b = Project::remote(
1505 project_id,
1506 client_b.clone(),
1507 client_b.user_store.clone(),
1508 lang_registry.clone(),
1509 fs.clone(),
1510 &mut cx_b.to_async(),
1511 )
1512 .await
1513 .unwrap();
1514
1515 // Open a buffer as client B
1516 let buffer_b = project_b
1517 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1518 .await
1519 .unwrap();
1520
1521 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1522 buffer_b.read_with(cx_b, |buf, _| {
1523 assert!(buf.is_dirty());
1524 assert!(!buf.has_conflict());
1525 });
1526
1527 buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1528 buffer_b
1529 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1530 .await;
1531 buffer_b.read_with(cx_b, |buf, _| {
1532 assert!(!buf.has_conflict());
1533 });
1534
1535 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1536 buffer_b.read_with(cx_b, |buf, _| {
1537 assert!(buf.is_dirty());
1538 assert!(!buf.has_conflict());
1539 });
1540 }
1541
1542 #[gpui::test(iterations = 10)]
1543 async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1544 cx_a.foreground().forbid_parking();
1545 let lang_registry = Arc::new(LanguageRegistry::test());
1546 let fs = FakeFs::new(cx_a.background());
1547
1548 // Connect to a server as 2 clients.
1549 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1550 let client_a = server.create_client(cx_a, "user_a").await;
1551 let client_b = server.create_client(cx_b, "user_b").await;
1552
1553 // Share a project as client A
1554 fs.insert_tree(
1555 "/dir",
1556 json!({
1557 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1558 "a.txt": "a-contents",
1559 }),
1560 )
1561 .await;
1562
1563 let project_a = cx_a.update(|cx| {
1564 Project::local(
1565 client_a.clone(),
1566 client_a.user_store.clone(),
1567 lang_registry.clone(),
1568 fs.clone(),
1569 cx,
1570 )
1571 });
1572 let (worktree_a, _) = project_a
1573 .update(cx_a, |p, cx| {
1574 p.find_or_create_local_worktree("/dir", true, cx)
1575 })
1576 .await
1577 .unwrap();
1578 worktree_a
1579 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1580 .await;
1581 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1582 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1583 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1584
1585 // Join that project as client B
1586 let project_b = Project::remote(
1587 project_id,
1588 client_b.clone(),
1589 client_b.user_store.clone(),
1590 lang_registry.clone(),
1591 fs.clone(),
1592 &mut cx_b.to_async(),
1593 )
1594 .await
1595 .unwrap();
1596 let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1597
1598 // Open a buffer as client B
1599 let buffer_b = project_b
1600 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1601 .await
1602 .unwrap();
1603 buffer_b.read_with(cx_b, |buf, _| {
1604 assert!(!buf.is_dirty());
1605 assert!(!buf.has_conflict());
1606 });
1607
1608 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1609 .await
1610 .unwrap();
1611 buffer_b
1612 .condition(&cx_b, |buf, _| {
1613 buf.text() == "new contents" && !buf.is_dirty()
1614 })
1615 .await;
1616 buffer_b.read_with(cx_b, |buf, _| {
1617 assert!(!buf.has_conflict());
1618 });
1619 }
1620
    /// Race test: the host edits a buffer while a guest's request to open that same
    /// buffer is still in flight, and the guest's buffer must end up with the host's text.
    ///
    /// The interleaving of the spawned open, the random delays and the two host edits is
    /// the point of the test, so statement order here matters.
    #[gpui::test(iterations = 10)]
    async fn test_editing_while_guest_opens_buffer(
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", true, cx)
            })
            .await
            .unwrap();
        // Wait for the local worktree's initial scan before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open a buffer as client A
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Start opening the same buffer as client B. The open is spawned on cx_b's
        // background executor and is deliberately not awaited until after the edits below.
        let buffer_b = cx_b
            .background()
            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));

        // Edit the buffer as client A while client B is still opening it.
        cx_b.background().simulate_random_delay().await;
        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
        cx_b.background().simulate_random_delay().await;
        buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));

        // Snapshot the host's text, then wait for the guest's buffer to match it.
        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
        let buffer_b = buffer_b.await.unwrap();
        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
    }
1699
1700 #[gpui::test(iterations = 10)]
1701 async fn test_leaving_worktree_while_opening_buffer(
1702 cx_a: &mut TestAppContext,
1703 cx_b: &mut TestAppContext,
1704 ) {
1705 cx_a.foreground().forbid_parking();
1706 let lang_registry = Arc::new(LanguageRegistry::test());
1707 let fs = FakeFs::new(cx_a.background());
1708
1709 // Connect to a server as 2 clients.
1710 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1711 let client_a = server.create_client(cx_a, "user_a").await;
1712 let client_b = server.create_client(cx_b, "user_b").await;
1713
1714 // Share a project as client A
1715 fs.insert_tree(
1716 "/dir",
1717 json!({
1718 ".zed.toml": r#"collaborators = ["user_b"]"#,
1719 "a.txt": "a-contents",
1720 }),
1721 )
1722 .await;
1723 let project_a = cx_a.update(|cx| {
1724 Project::local(
1725 client_a.clone(),
1726 client_a.user_store.clone(),
1727 lang_registry.clone(),
1728 fs.clone(),
1729 cx,
1730 )
1731 });
1732 let (worktree_a, _) = project_a
1733 .update(cx_a, |p, cx| {
1734 p.find_or_create_local_worktree("/dir", true, cx)
1735 })
1736 .await
1737 .unwrap();
1738 worktree_a
1739 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1740 .await;
1741 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1742 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1743 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1744
1745 // Join that project as client B
1746 let project_b = Project::remote(
1747 project_id,
1748 client_b.clone(),
1749 client_b.user_store.clone(),
1750 lang_registry.clone(),
1751 fs.clone(),
1752 &mut cx_b.to_async(),
1753 )
1754 .await
1755 .unwrap();
1756
1757 // See that a guest has joined as client A.
1758 project_a
1759 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1760 .await;
1761
1762 // Begin opening a buffer as client B, but leave the project before the open completes.
1763 let buffer_b = cx_b
1764 .background()
1765 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1766 cx_b.update(|_| drop(project_b));
1767 drop(buffer_b);
1768
1769 // See that the guest has left.
1770 project_a
1771 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1772 .await;
1773 }
1774
1775 #[gpui::test(iterations = 10)]
1776 async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1777 cx_a.foreground().forbid_parking();
1778 let lang_registry = Arc::new(LanguageRegistry::test());
1779 let fs = FakeFs::new(cx_a.background());
1780
1781 // Connect to a server as 2 clients.
1782 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1783 let client_a = server.create_client(cx_a, "user_a").await;
1784 let client_b = server.create_client(cx_b, "user_b").await;
1785
1786 // Share a project as client A
1787 fs.insert_tree(
1788 "/a",
1789 json!({
1790 ".zed.toml": r#"collaborators = ["user_b"]"#,
1791 "a.txt": "a-contents",
1792 "b.txt": "b-contents",
1793 }),
1794 )
1795 .await;
1796 let project_a = cx_a.update(|cx| {
1797 Project::local(
1798 client_a.clone(),
1799 client_a.user_store.clone(),
1800 lang_registry.clone(),
1801 fs.clone(),
1802 cx,
1803 )
1804 });
1805 let (worktree_a, _) = project_a
1806 .update(cx_a, |p, cx| {
1807 p.find_or_create_local_worktree("/a", true, cx)
1808 })
1809 .await
1810 .unwrap();
1811 worktree_a
1812 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1813 .await;
1814 let project_id = project_a
1815 .update(cx_a, |project, _| project.next_remote_id())
1816 .await;
1817 project_a
1818 .update(cx_a, |project, cx| project.share(cx))
1819 .await
1820 .unwrap();
1821
1822 // Join that project as client B
1823 let _project_b = Project::remote(
1824 project_id,
1825 client_b.clone(),
1826 client_b.user_store.clone(),
1827 lang_registry.clone(),
1828 fs.clone(),
1829 &mut cx_b.to_async(),
1830 )
1831 .await
1832 .unwrap();
1833
1834 // Client A sees that a guest has joined.
1835 project_a
1836 .condition(cx_a, |p, _| p.collaborators().len() == 1)
1837 .await;
1838
1839 // Drop client B's connection and ensure client A observes client B leaving the project.
1840 client_b.disconnect(&cx_b.to_async()).unwrap();
1841 project_a
1842 .condition(cx_a, |p, _| p.collaborators().len() == 0)
1843 .await;
1844
1845 // Rejoin the project as client B
1846 let _project_b = Project::remote(
1847 project_id,
1848 client_b.clone(),
1849 client_b.user_store.clone(),
1850 lang_registry.clone(),
1851 fs.clone(),
1852 &mut cx_b.to_async(),
1853 )
1854 .await
1855 .unwrap();
1856
1857 // Client A sees that a guest has re-joined.
1858 project_a
1859 .condition(cx_a, |p, _| p.collaborators().len() == 1)
1860 .await;
1861
1862 // Simulate connection loss for client B and ensure client A observes client B leaving the project.
1863 client_b.wait_for_current_user(cx_b).await;
1864 server.disconnect_client(client_b.current_user_id(cx_b));
1865 cx_a.foreground().advance_clock(Duration::from_secs(3));
1866 project_a
1867 .condition(cx_a, |p, _| p.collaborators().len() == 0)
1868 .await;
1869 }
1870
1871 #[gpui::test(iterations = 10)]
1872 async fn test_collaborating_with_diagnostics(
1873 cx_a: &mut TestAppContext,
1874 cx_b: &mut TestAppContext,
1875 ) {
1876 cx_a.foreground().forbid_parking();
1877 let mut lang_registry = Arc::new(LanguageRegistry::test());
1878 let fs = FakeFs::new(cx_a.background());
1879
1880 // Set up a fake language server.
1881 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1882 Arc::get_mut(&mut lang_registry)
1883 .unwrap()
1884 .add(Arc::new(Language::new(
1885 LanguageConfig {
1886 name: "Rust".into(),
1887 path_suffixes: vec!["rs".to_string()],
1888 language_server: Some(language_server_config),
1889 ..Default::default()
1890 },
1891 Some(tree_sitter_rust::language()),
1892 )));
1893
1894 // Connect to a server as 2 clients.
1895 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1896 let client_a = server.create_client(cx_a, "user_a").await;
1897 let client_b = server.create_client(cx_b, "user_b").await;
1898
1899 // Share a project as client A
1900 fs.insert_tree(
1901 "/a",
1902 json!({
1903 ".zed.toml": r#"collaborators = ["user_b"]"#,
1904 "a.rs": "let one = two",
1905 "other.rs": "",
1906 }),
1907 )
1908 .await;
1909 let project_a = cx_a.update(|cx| {
1910 Project::local(
1911 client_a.clone(),
1912 client_a.user_store.clone(),
1913 lang_registry.clone(),
1914 fs.clone(),
1915 cx,
1916 )
1917 });
1918 let (worktree_a, _) = project_a
1919 .update(cx_a, |p, cx| {
1920 p.find_or_create_local_worktree("/a", true, cx)
1921 })
1922 .await
1923 .unwrap();
1924 worktree_a
1925 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1926 .await;
1927 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1928 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1929 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1930
1931 // Cause the language server to start.
1932 let _ = cx_a
1933 .background()
1934 .spawn(project_a.update(cx_a, |project, cx| {
1935 project.open_buffer(
1936 ProjectPath {
1937 worktree_id,
1938 path: Path::new("other.rs").into(),
1939 },
1940 cx,
1941 )
1942 }))
1943 .await
1944 .unwrap();
1945
1946 // Simulate a language server reporting errors for a file.
1947 let mut fake_language_server = fake_language_servers.next().await.unwrap();
1948 fake_language_server
1949 .receive_notification::<lsp::notification::DidOpenTextDocument>()
1950 .await;
1951 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
1952 lsp::PublishDiagnosticsParams {
1953 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1954 version: None,
1955 diagnostics: vec![lsp::Diagnostic {
1956 severity: Some(lsp::DiagnosticSeverity::ERROR),
1957 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1958 message: "message 1".to_string(),
1959 ..Default::default()
1960 }],
1961 },
1962 );
1963
1964 // Wait for server to see the diagnostics update.
1965 server
1966 .condition(|store| {
1967 let worktree = store
1968 .project(project_id)
1969 .unwrap()
1970 .share
1971 .as_ref()
1972 .unwrap()
1973 .worktrees
1974 .get(&worktree_id.to_proto())
1975 .unwrap();
1976
1977 !worktree.diagnostic_summaries.is_empty()
1978 })
1979 .await;
1980
1981 // Join the worktree as client B.
1982 let project_b = Project::remote(
1983 project_id,
1984 client_b.clone(),
1985 client_b.user_store.clone(),
1986 lang_registry.clone(),
1987 fs.clone(),
1988 &mut cx_b.to_async(),
1989 )
1990 .await
1991 .unwrap();
1992
1993 project_b.read_with(cx_b, |project, cx| {
1994 assert_eq!(
1995 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
1996 &[(
1997 ProjectPath {
1998 worktree_id,
1999 path: Arc::from(Path::new("a.rs")),
2000 },
2001 DiagnosticSummary {
2002 error_count: 1,
2003 warning_count: 0,
2004 ..Default::default()
2005 },
2006 )]
2007 )
2008 });
2009
2010 // Simulate a language server reporting more errors for a file.
2011 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2012 lsp::PublishDiagnosticsParams {
2013 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2014 version: None,
2015 diagnostics: vec![
2016 lsp::Diagnostic {
2017 severity: Some(lsp::DiagnosticSeverity::ERROR),
2018 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2019 message: "message 1".to_string(),
2020 ..Default::default()
2021 },
2022 lsp::Diagnostic {
2023 severity: Some(lsp::DiagnosticSeverity::WARNING),
2024 range: lsp::Range::new(
2025 lsp::Position::new(0, 10),
2026 lsp::Position::new(0, 13),
2027 ),
2028 message: "message 2".to_string(),
2029 ..Default::default()
2030 },
2031 ],
2032 },
2033 );
2034
2035 // Client b gets the updated summaries
2036 project_b
2037 .condition(&cx_b, |project, cx| {
2038 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2039 == &[(
2040 ProjectPath {
2041 worktree_id,
2042 path: Arc::from(Path::new("a.rs")),
2043 },
2044 DiagnosticSummary {
2045 error_count: 1,
2046 warning_count: 1,
2047 ..Default::default()
2048 },
2049 )]
2050 })
2051 .await;
2052
2053 // Open the file with the errors on client B. They should be present.
2054 let buffer_b = cx_b
2055 .background()
2056 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2057 .await
2058 .unwrap();
2059
2060 buffer_b.read_with(cx_b, |buffer, _| {
2061 assert_eq!(
2062 buffer
2063 .snapshot()
2064 .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2065 .map(|entry| entry)
2066 .collect::<Vec<_>>(),
2067 &[
2068 DiagnosticEntry {
2069 range: Point::new(0, 4)..Point::new(0, 7),
2070 diagnostic: Diagnostic {
2071 group_id: 0,
2072 message: "message 1".to_string(),
2073 severity: lsp::DiagnosticSeverity::ERROR,
2074 is_primary: true,
2075 ..Default::default()
2076 }
2077 },
2078 DiagnosticEntry {
2079 range: Point::new(0, 10)..Point::new(0, 13),
2080 diagnostic: Diagnostic {
2081 group_id: 1,
2082 severity: lsp::DiagnosticSeverity::WARNING,
2083 message: "message 2".to_string(),
2084 is_primary: true,
2085 ..Default::default()
2086 }
2087 }
2088 ]
2089 );
2090 });
2091 }
2092
2093 #[gpui::test(iterations = 10)]
2094 async fn test_collaborating_with_completion(
2095 cx_a: &mut TestAppContext,
2096 cx_b: &mut TestAppContext,
2097 ) {
2098 cx_a.foreground().forbid_parking();
2099 let mut lang_registry = Arc::new(LanguageRegistry::test());
2100 let fs = FakeFs::new(cx_a.background());
2101
2102 // Set up a fake language server.
2103 let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2104 language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2105 completion_provider: Some(lsp::CompletionOptions {
2106 trigger_characters: Some(vec![".".to_string()]),
2107 ..Default::default()
2108 }),
2109 ..Default::default()
2110 });
2111 Arc::get_mut(&mut lang_registry)
2112 .unwrap()
2113 .add(Arc::new(Language::new(
2114 LanguageConfig {
2115 name: "Rust".into(),
2116 path_suffixes: vec!["rs".to_string()],
2117 language_server: Some(language_server_config),
2118 ..Default::default()
2119 },
2120 Some(tree_sitter_rust::language()),
2121 )));
2122
2123 // Connect to a server as 2 clients.
2124 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2125 let client_a = server.create_client(cx_a, "user_a").await;
2126 let client_b = server.create_client(cx_b, "user_b").await;
2127
2128 // Share a project as client A
2129 fs.insert_tree(
2130 "/a",
2131 json!({
2132 ".zed.toml": r#"collaborators = ["user_b"]"#,
2133 "main.rs": "fn main() { a }",
2134 "other.rs": "",
2135 }),
2136 )
2137 .await;
2138 let project_a = cx_a.update(|cx| {
2139 Project::local(
2140 client_a.clone(),
2141 client_a.user_store.clone(),
2142 lang_registry.clone(),
2143 fs.clone(),
2144 cx,
2145 )
2146 });
2147 let (worktree_a, _) = project_a
2148 .update(cx_a, |p, cx| {
2149 p.find_or_create_local_worktree("/a", true, cx)
2150 })
2151 .await
2152 .unwrap();
2153 worktree_a
2154 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2155 .await;
2156 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2157 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2158 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2159
2160 // Join the worktree as client B.
2161 let project_b = Project::remote(
2162 project_id,
2163 client_b.clone(),
2164 client_b.user_store.clone(),
2165 lang_registry.clone(),
2166 fs.clone(),
2167 &mut cx_b.to_async(),
2168 )
2169 .await
2170 .unwrap();
2171
2172 // Open a file in an editor as the guest.
2173 let buffer_b = project_b
2174 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2175 .await
2176 .unwrap();
2177 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2178 let editor_b = cx_b.add_view(window_b, |cx| {
2179 Editor::for_buffer(
2180 cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2181 Some(project_b.clone()),
2182 cx,
2183 )
2184 });
2185
2186 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2187 buffer_b
2188 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2189 .await;
2190
2191 // Type a completion trigger character as the guest.
2192 editor_b.update(cx_b, |editor, cx| {
2193 editor.select_ranges([13..13], None, cx);
2194 editor.handle_input(&Input(".".into()), cx);
2195 cx.focus(&editor_b);
2196 });
2197
2198 // Receive a completion request as the host's language server.
2199 // Return some completions from the host's language server.
2200 cx_a.foreground().start_waiting();
2201 fake_language_server
2202 .handle_request::<lsp::request::Completion, _>(|params, _| {
2203 assert_eq!(
2204 params.text_document_position.text_document.uri,
2205 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2206 );
2207 assert_eq!(
2208 params.text_document_position.position,
2209 lsp::Position::new(0, 14),
2210 );
2211
2212 Some(lsp::CompletionResponse::Array(vec![
2213 lsp::CompletionItem {
2214 label: "first_method(…)".into(),
2215 detail: Some("fn(&mut self, B) -> C".into()),
2216 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2217 new_text: "first_method($1)".to_string(),
2218 range: lsp::Range::new(
2219 lsp::Position::new(0, 14),
2220 lsp::Position::new(0, 14),
2221 ),
2222 })),
2223 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2224 ..Default::default()
2225 },
2226 lsp::CompletionItem {
2227 label: "second_method(…)".into(),
2228 detail: Some("fn(&mut self, C) -> D<E>".into()),
2229 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2230 new_text: "second_method()".to_string(),
2231 range: lsp::Range::new(
2232 lsp::Position::new(0, 14),
2233 lsp::Position::new(0, 14),
2234 ),
2235 })),
2236 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2237 ..Default::default()
2238 },
2239 ]))
2240 })
2241 .next()
2242 .await
2243 .unwrap();
2244 cx_a.foreground().finish_waiting();
2245
2246 // Open the buffer on the host.
2247 let buffer_a = project_a
2248 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2249 .await
2250 .unwrap();
2251 buffer_a
2252 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2253 .await;
2254
2255 // Confirm a completion on the guest.
2256 editor_b
2257 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2258 .await;
2259 editor_b.update(cx_b, |editor, cx| {
2260 editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2261 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2262 });
2263
2264 // Return a resolved completion from the host's language server.
2265 // The resolved completion has an additional text edit.
2266 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2267 |params, _| {
2268 assert_eq!(params.label, "first_method(…)");
2269 lsp::CompletionItem {
2270 label: "first_method(…)".into(),
2271 detail: Some("fn(&mut self, B) -> C".into()),
2272 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2273 new_text: "first_method($1)".to_string(),
2274 range: lsp::Range::new(
2275 lsp::Position::new(0, 14),
2276 lsp::Position::new(0, 14),
2277 ),
2278 })),
2279 additional_text_edits: Some(vec![lsp::TextEdit {
2280 new_text: "use d::SomeTrait;\n".to_string(),
2281 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2282 }]),
2283 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2284 ..Default::default()
2285 }
2286 },
2287 );
2288
2289 // The additional edit is applied.
2290 buffer_a
2291 .condition(&cx_a, |buffer, _| {
2292 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2293 })
2294 .await;
2295 buffer_b
2296 .condition(&cx_b, |buffer, _| {
2297 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2298 })
2299 .await;
2300 }
2301
2302 #[gpui::test(iterations = 10)]
2303 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2304 cx_a.foreground().forbid_parking();
2305 let mut lang_registry = Arc::new(LanguageRegistry::test());
2306 let fs = FakeFs::new(cx_a.background());
2307
2308 // Set up a fake language server.
2309 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2310 Arc::get_mut(&mut lang_registry)
2311 .unwrap()
2312 .add(Arc::new(Language::new(
2313 LanguageConfig {
2314 name: "Rust".into(),
2315 path_suffixes: vec!["rs".to_string()],
2316 language_server: Some(language_server_config),
2317 ..Default::default()
2318 },
2319 Some(tree_sitter_rust::language()),
2320 )));
2321
2322 // Connect to a server as 2 clients.
2323 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2324 let client_a = server.create_client(cx_a, "user_a").await;
2325 let client_b = server.create_client(cx_b, "user_b").await;
2326
2327 // Share a project as client A
2328 fs.insert_tree(
2329 "/a",
2330 json!({
2331 ".zed.toml": r#"collaborators = ["user_b"]"#,
2332 "a.rs": "let one = two",
2333 }),
2334 )
2335 .await;
2336 let project_a = cx_a.update(|cx| {
2337 Project::local(
2338 client_a.clone(),
2339 client_a.user_store.clone(),
2340 lang_registry.clone(),
2341 fs.clone(),
2342 cx,
2343 )
2344 });
2345 let (worktree_a, _) = project_a
2346 .update(cx_a, |p, cx| {
2347 p.find_or_create_local_worktree("/a", true, cx)
2348 })
2349 .await
2350 .unwrap();
2351 worktree_a
2352 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2353 .await;
2354 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2355 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2356 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2357
2358 // Join the worktree as client B.
2359 let project_b = Project::remote(
2360 project_id,
2361 client_b.clone(),
2362 client_b.user_store.clone(),
2363 lang_registry.clone(),
2364 fs.clone(),
2365 &mut cx_b.to_async(),
2366 )
2367 .await
2368 .unwrap();
2369
2370 let buffer_b = cx_b
2371 .background()
2372 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2373 .await
2374 .unwrap();
2375
2376 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2377 fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2378 Some(vec![
2379 lsp::TextEdit {
2380 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2381 new_text: "h".to_string(),
2382 },
2383 lsp::TextEdit {
2384 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2385 new_text: "y".to_string(),
2386 },
2387 ])
2388 });
2389
2390 project_b
2391 .update(cx_b, |project, cx| {
2392 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2393 })
2394 .await
2395 .unwrap();
2396 assert_eq!(
2397 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2398 "let honey = two"
2399 );
2400 }
2401
2402 #[gpui::test(iterations = 10)]
2403 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2404 cx_a.foreground().forbid_parking();
2405 let mut lang_registry = Arc::new(LanguageRegistry::test());
2406 let fs = FakeFs::new(cx_a.background());
2407 fs.insert_tree(
2408 "/root-1",
2409 json!({
2410 ".zed.toml": r#"collaborators = ["user_b"]"#,
2411 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2412 }),
2413 )
2414 .await;
2415 fs.insert_tree(
2416 "/root-2",
2417 json!({
2418 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2419 }),
2420 )
2421 .await;
2422
2423 // Set up a fake language server.
2424 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2425 Arc::get_mut(&mut lang_registry)
2426 .unwrap()
2427 .add(Arc::new(Language::new(
2428 LanguageConfig {
2429 name: "Rust".into(),
2430 path_suffixes: vec!["rs".to_string()],
2431 language_server: Some(language_server_config),
2432 ..Default::default()
2433 },
2434 Some(tree_sitter_rust::language()),
2435 )));
2436
2437 // Connect to a server as 2 clients.
2438 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2439 let client_a = server.create_client(cx_a, "user_a").await;
2440 let client_b = server.create_client(cx_b, "user_b").await;
2441
2442 // Share a project as client A
2443 let project_a = cx_a.update(|cx| {
2444 Project::local(
2445 client_a.clone(),
2446 client_a.user_store.clone(),
2447 lang_registry.clone(),
2448 fs.clone(),
2449 cx,
2450 )
2451 });
2452 let (worktree_a, _) = project_a
2453 .update(cx_a, |p, cx| {
2454 p.find_or_create_local_worktree("/root-1", true, cx)
2455 })
2456 .await
2457 .unwrap();
2458 worktree_a
2459 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2460 .await;
2461 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2462 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2463 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2464
2465 // Join the worktree as client B.
2466 let project_b = Project::remote(
2467 project_id,
2468 client_b.clone(),
2469 client_b.user_store.clone(),
2470 lang_registry.clone(),
2471 fs.clone(),
2472 &mut cx_b.to_async(),
2473 )
2474 .await
2475 .unwrap();
2476
2477 // Open the file on client B.
2478 let buffer_b = cx_b
2479 .background()
2480 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2481 .await
2482 .unwrap();
2483
2484 // Request the definition of a symbol as the guest.
2485 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2486 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2487 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2488 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2489 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2490 )))
2491 });
2492
2493 let definitions_1 = project_b
2494 .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
2495 .await
2496 .unwrap();
2497 cx_b.read(|cx| {
2498 assert_eq!(definitions_1.len(), 1);
2499 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2500 let target_buffer = definitions_1[0].buffer.read(cx);
2501 assert_eq!(
2502 target_buffer.text(),
2503 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2504 );
2505 assert_eq!(
2506 definitions_1[0].range.to_point(target_buffer),
2507 Point::new(0, 6)..Point::new(0, 9)
2508 );
2509 });
2510
2511 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2512 // the previous call to `definition`.
2513 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2514 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2515 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2516 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2517 )))
2518 });
2519
2520 let definitions_2 = project_b
2521 .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
2522 .await
2523 .unwrap();
2524 cx_b.read(|cx| {
2525 assert_eq!(definitions_2.len(), 1);
2526 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2527 let target_buffer = definitions_2[0].buffer.read(cx);
2528 assert_eq!(
2529 target_buffer.text(),
2530 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2531 );
2532 assert_eq!(
2533 definitions_2[0].range.to_point(target_buffer),
2534 Point::new(1, 6)..Point::new(1, 11)
2535 );
2536 });
2537 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2538 }
2539
2540 #[gpui::test(iterations = 10)]
2541 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2542 cx_a.foreground().forbid_parking();
2543 let mut lang_registry = Arc::new(LanguageRegistry::test());
2544 let fs = FakeFs::new(cx_a.background());
2545 fs.insert_tree(
2546 "/root-1",
2547 json!({
2548 ".zed.toml": r#"collaborators = ["user_b"]"#,
2549 "one.rs": "const ONE: usize = 1;",
2550 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2551 }),
2552 )
2553 .await;
2554 fs.insert_tree(
2555 "/root-2",
2556 json!({
2557 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2558 }),
2559 )
2560 .await;
2561
2562 // Set up a fake language server.
2563 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2564 Arc::get_mut(&mut lang_registry)
2565 .unwrap()
2566 .add(Arc::new(Language::new(
2567 LanguageConfig {
2568 name: "Rust".into(),
2569 path_suffixes: vec!["rs".to_string()],
2570 language_server: Some(language_server_config),
2571 ..Default::default()
2572 },
2573 Some(tree_sitter_rust::language()),
2574 )));
2575
2576 // Connect to a server as 2 clients.
2577 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2578 let client_a = server.create_client(cx_a, "user_a").await;
2579 let client_b = server.create_client(cx_b, "user_b").await;
2580
2581 // Share a project as client A
2582 let project_a = cx_a.update(|cx| {
2583 Project::local(
2584 client_a.clone(),
2585 client_a.user_store.clone(),
2586 lang_registry.clone(),
2587 fs.clone(),
2588 cx,
2589 )
2590 });
2591 let (worktree_a, _) = project_a
2592 .update(cx_a, |p, cx| {
2593 p.find_or_create_local_worktree("/root-1", true, cx)
2594 })
2595 .await
2596 .unwrap();
2597 worktree_a
2598 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2599 .await;
2600 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2601 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2602 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2603
2604 // Join the worktree as client B.
2605 let project_b = Project::remote(
2606 project_id,
2607 client_b.clone(),
2608 client_b.user_store.clone(),
2609 lang_registry.clone(),
2610 fs.clone(),
2611 &mut cx_b.to_async(),
2612 )
2613 .await
2614 .unwrap();
2615
2616 // Open the file on client B.
2617 let buffer_b = cx_b
2618 .background()
2619 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2620 .await
2621 .unwrap();
2622
2623 // Request references to a symbol as the guest.
2624 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2625 fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2626 assert_eq!(
2627 params.text_document_position.text_document.uri.as_str(),
2628 "file:///root-1/one.rs"
2629 );
2630 Some(vec![
2631 lsp::Location {
2632 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2633 range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2634 },
2635 lsp::Location {
2636 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2637 range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2638 },
2639 lsp::Location {
2640 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2641 range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2642 },
2643 ])
2644 });
2645
2646 let references = project_b
2647 .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
2648 .await
2649 .unwrap();
2650 cx_b.read(|cx| {
2651 assert_eq!(references.len(), 3);
2652 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2653
2654 let two_buffer = references[0].buffer.read(cx);
2655 let three_buffer = references[2].buffer.read(cx);
2656 assert_eq!(
2657 two_buffer.file().unwrap().path().as_ref(),
2658 Path::new("two.rs")
2659 );
2660 assert_eq!(references[1].buffer, references[0].buffer);
2661 assert_eq!(
2662 three_buffer.file().unwrap().full_path(cx),
2663 Path::new("three.rs")
2664 );
2665
2666 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2667 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2668 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2669 });
2670 }
2671
2672 #[gpui::test(iterations = 10)]
2673 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2674 cx_a.foreground().forbid_parking();
2675 let lang_registry = Arc::new(LanguageRegistry::test());
2676 let fs = FakeFs::new(cx_a.background());
2677 fs.insert_tree(
2678 "/root-1",
2679 json!({
2680 ".zed.toml": r#"collaborators = ["user_b"]"#,
2681 "a": "hello world",
2682 "b": "goodnight moon",
2683 "c": "a world of goo",
2684 "d": "world champion of clown world",
2685 }),
2686 )
2687 .await;
2688 fs.insert_tree(
2689 "/root-2",
2690 json!({
2691 "e": "disney world is fun",
2692 }),
2693 )
2694 .await;
2695
2696 // Connect to a server as 2 clients.
2697 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2698 let client_a = server.create_client(cx_a, "user_a").await;
2699 let client_b = server.create_client(cx_b, "user_b").await;
2700
2701 // Share a project as client A
2702 let project_a = cx_a.update(|cx| {
2703 Project::local(
2704 client_a.clone(),
2705 client_a.user_store.clone(),
2706 lang_registry.clone(),
2707 fs.clone(),
2708 cx,
2709 )
2710 });
2711 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2712
2713 let (worktree_1, _) = project_a
2714 .update(cx_a, |p, cx| {
2715 p.find_or_create_local_worktree("/root-1", true, cx)
2716 })
2717 .await
2718 .unwrap();
2719 worktree_1
2720 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2721 .await;
2722 let (worktree_2, _) = project_a
2723 .update(cx_a, |p, cx| {
2724 p.find_or_create_local_worktree("/root-2", true, cx)
2725 })
2726 .await
2727 .unwrap();
2728 worktree_2
2729 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2730 .await;
2731
2732 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2733
2734 // Join the worktree as client B.
2735 let project_b = Project::remote(
2736 project_id,
2737 client_b.clone(),
2738 client_b.user_store.clone(),
2739 lang_registry.clone(),
2740 fs.clone(),
2741 &mut cx_b.to_async(),
2742 )
2743 .await
2744 .unwrap();
2745
2746 let results = project_b
2747 .update(cx_b, |project, cx| {
2748 project.search(SearchQuery::text("world", false, false), cx)
2749 })
2750 .await
2751 .unwrap();
2752
2753 let mut ranges_by_path = results
2754 .into_iter()
2755 .map(|(buffer, ranges)| {
2756 buffer.read_with(cx_b, |buffer, cx| {
2757 let path = buffer.file().unwrap().full_path(cx);
2758 let offset_ranges = ranges
2759 .into_iter()
2760 .map(|range| range.to_offset(buffer))
2761 .collect::<Vec<_>>();
2762 (path, offset_ranges)
2763 })
2764 })
2765 .collect::<Vec<_>>();
2766 ranges_by_path.sort_by_key(|(path, _)| path.clone());
2767
2768 assert_eq!(
2769 ranges_by_path,
2770 &[
2771 (PathBuf::from("root-1/a"), vec![6..11]),
2772 (PathBuf::from("root-1/c"), vec![2..7]),
2773 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2774 (PathBuf::from("root-2/e"), vec![7..12]),
2775 ]
2776 );
2777 }
2778
2779 #[gpui::test(iterations = 10)]
2780 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2781 cx_a.foreground().forbid_parking();
2782 let lang_registry = Arc::new(LanguageRegistry::test());
2783 let fs = FakeFs::new(cx_a.background());
2784 fs.insert_tree(
2785 "/root-1",
2786 json!({
2787 ".zed.toml": r#"collaborators = ["user_b"]"#,
2788 "main.rs": "fn double(number: i32) -> i32 { number + number }",
2789 }),
2790 )
2791 .await;
2792
2793 // Set up a fake language server.
2794 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2795 lang_registry.add(Arc::new(Language::new(
2796 LanguageConfig {
2797 name: "Rust".into(),
2798 path_suffixes: vec!["rs".to_string()],
2799 language_server: Some(language_server_config),
2800 ..Default::default()
2801 },
2802 Some(tree_sitter_rust::language()),
2803 )));
2804
2805 // Connect to a server as 2 clients.
2806 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2807 let client_a = server.create_client(cx_a, "user_a").await;
2808 let client_b = server.create_client(cx_b, "user_b").await;
2809
2810 // Share a project as client A
2811 let project_a = cx_a.update(|cx| {
2812 Project::local(
2813 client_a.clone(),
2814 client_a.user_store.clone(),
2815 lang_registry.clone(),
2816 fs.clone(),
2817 cx,
2818 )
2819 });
2820 let (worktree_a, _) = project_a
2821 .update(cx_a, |p, cx| {
2822 p.find_or_create_local_worktree("/root-1", true, cx)
2823 })
2824 .await
2825 .unwrap();
2826 worktree_a
2827 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2828 .await;
2829 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2830 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2831 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2832
2833 // Join the worktree as client B.
2834 let project_b = Project::remote(
2835 project_id,
2836 client_b.clone(),
2837 client_b.user_store.clone(),
2838 lang_registry.clone(),
2839 fs.clone(),
2840 &mut cx_b.to_async(),
2841 )
2842 .await
2843 .unwrap();
2844
2845 // Open the file on client B.
2846 let buffer_b = cx_b
2847 .background()
2848 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
2849 .await
2850 .unwrap();
2851
2852 // Request document highlights as the guest.
2853 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2854 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2855 |params, _| {
2856 assert_eq!(
2857 params
2858 .text_document_position_params
2859 .text_document
2860 .uri
2861 .as_str(),
2862 "file:///root-1/main.rs"
2863 );
2864 assert_eq!(
2865 params.text_document_position_params.position,
2866 lsp::Position::new(0, 34)
2867 );
2868 Some(vec![
2869 lsp::DocumentHighlight {
2870 kind: Some(lsp::DocumentHighlightKind::WRITE),
2871 range: lsp::Range::new(
2872 lsp::Position::new(0, 10),
2873 lsp::Position::new(0, 16),
2874 ),
2875 },
2876 lsp::DocumentHighlight {
2877 kind: Some(lsp::DocumentHighlightKind::READ),
2878 range: lsp::Range::new(
2879 lsp::Position::new(0, 32),
2880 lsp::Position::new(0, 38),
2881 ),
2882 },
2883 lsp::DocumentHighlight {
2884 kind: Some(lsp::DocumentHighlightKind::READ),
2885 range: lsp::Range::new(
2886 lsp::Position::new(0, 41),
2887 lsp::Position::new(0, 47),
2888 ),
2889 },
2890 ])
2891 },
2892 );
2893
2894 let highlights = project_b
2895 .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
2896 .await
2897 .unwrap();
2898 buffer_b.read_with(cx_b, |buffer, _| {
2899 let snapshot = buffer.snapshot();
2900
2901 let highlights = highlights
2902 .into_iter()
2903 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2904 .collect::<Vec<_>>();
2905 assert_eq!(
2906 highlights,
2907 &[
2908 (lsp::DocumentHighlightKind::WRITE, 10..16),
2909 (lsp::DocumentHighlightKind::READ, 32..38),
2910 (lsp::DocumentHighlightKind::READ, 41..47)
2911 ]
2912 )
2913 });
2914 }
2915
2916 #[gpui::test(iterations = 10)]
2917 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2918 cx_a.foreground().forbid_parking();
2919 let mut lang_registry = Arc::new(LanguageRegistry::test());
2920 let fs = FakeFs::new(cx_a.background());
2921 fs.insert_tree(
2922 "/code",
2923 json!({
2924 "crate-1": {
2925 ".zed.toml": r#"collaborators = ["user_b"]"#,
2926 "one.rs": "const ONE: usize = 1;",
2927 },
2928 "crate-2": {
2929 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2930 },
2931 "private": {
2932 "passwords.txt": "the-password",
2933 }
2934 }),
2935 )
2936 .await;
2937
2938 // Set up a fake language server.
2939 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2940 Arc::get_mut(&mut lang_registry)
2941 .unwrap()
2942 .add(Arc::new(Language::new(
2943 LanguageConfig {
2944 name: "Rust".into(),
2945 path_suffixes: vec!["rs".to_string()],
2946 language_server: Some(language_server_config),
2947 ..Default::default()
2948 },
2949 Some(tree_sitter_rust::language()),
2950 )));
2951
2952 // Connect to a server as 2 clients.
2953 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2954 let client_a = server.create_client(cx_a, "user_a").await;
2955 let client_b = server.create_client(cx_b, "user_b").await;
2956
2957 // Share a project as client A
2958 let project_a = cx_a.update(|cx| {
2959 Project::local(
2960 client_a.clone(),
2961 client_a.user_store.clone(),
2962 lang_registry.clone(),
2963 fs.clone(),
2964 cx,
2965 )
2966 });
2967 let (worktree_a, _) = project_a
2968 .update(cx_a, |p, cx| {
2969 p.find_or_create_local_worktree("/code/crate-1", true, cx)
2970 })
2971 .await
2972 .unwrap();
2973 worktree_a
2974 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2975 .await;
2976 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2977 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2978 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2979
2980 // Join the worktree as client B.
2981 let project_b = Project::remote(
2982 project_id,
2983 client_b.clone(),
2984 client_b.user_store.clone(),
2985 lang_registry.clone(),
2986 fs.clone(),
2987 &mut cx_b.to_async(),
2988 )
2989 .await
2990 .unwrap();
2991
2992 // Cause the language server to start.
2993 let _buffer = cx_b
2994 .background()
2995 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2996 .await
2997 .unwrap();
2998
2999 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3000 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
3001 #[allow(deprecated)]
3002 Some(vec![lsp::SymbolInformation {
3003 name: "TWO".into(),
3004 location: lsp::Location {
3005 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3006 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3007 },
3008 kind: lsp::SymbolKind::CONSTANT,
3009 tags: None,
3010 container_name: None,
3011 deprecated: None,
3012 }])
3013 });
3014
3015 // Request the definition of a symbol as the guest.
3016 let symbols = project_b
3017 .update(cx_b, |p, cx| p.symbols("two", cx))
3018 .await
3019 .unwrap();
3020 assert_eq!(symbols.len(), 1);
3021 assert_eq!(symbols[0].name, "TWO");
3022
3023 // Open one of the returned symbols.
3024 let buffer_b_2 = project_b
3025 .update(cx_b, |project, cx| {
3026 project.open_buffer_for_symbol(&symbols[0], cx)
3027 })
3028 .await
3029 .unwrap();
3030 buffer_b_2.read_with(cx_b, |buffer, _| {
3031 assert_eq!(
3032 buffer.file().unwrap().path().as_ref(),
3033 Path::new("../crate-2/two.rs")
3034 );
3035 });
3036
3037 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3038 let mut fake_symbol = symbols[0].clone();
3039 fake_symbol.path = Path::new("/code/secrets").into();
3040 let error = project_b
3041 .update(cx_b, |project, cx| {
3042 project.open_buffer_for_symbol(&fake_symbol, cx)
3043 })
3044 .await
3045 .unwrap_err();
3046 assert!(error.to_string().contains("invalid symbol signature"));
3047 }
3048
3049 #[gpui::test(iterations = 10)]
3050 async fn test_open_buffer_while_getting_definition_pointing_to_it(
3051 cx_a: &mut TestAppContext,
3052 cx_b: &mut TestAppContext,
3053 mut rng: StdRng,
3054 ) {
3055 cx_a.foreground().forbid_parking();
3056 let mut lang_registry = Arc::new(LanguageRegistry::test());
3057 let fs = FakeFs::new(cx_a.background());
3058 fs.insert_tree(
3059 "/root",
3060 json!({
3061 ".zed.toml": r#"collaborators = ["user_b"]"#,
3062 "a.rs": "const ONE: usize = b::TWO;",
3063 "b.rs": "const TWO: usize = 2",
3064 }),
3065 )
3066 .await;
3067
3068 // Set up a fake language server.
3069 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3070
3071 Arc::get_mut(&mut lang_registry)
3072 .unwrap()
3073 .add(Arc::new(Language::new(
3074 LanguageConfig {
3075 name: "Rust".into(),
3076 path_suffixes: vec!["rs".to_string()],
3077 language_server: Some(language_server_config),
3078 ..Default::default()
3079 },
3080 Some(tree_sitter_rust::language()),
3081 )));
3082
3083 // Connect to a server as 2 clients.
3084 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3085 let client_a = server.create_client(cx_a, "user_a").await;
3086 let client_b = server.create_client(cx_b, "user_b").await;
3087
3088 // Share a project as client A
3089 let project_a = cx_a.update(|cx| {
3090 Project::local(
3091 client_a.clone(),
3092 client_a.user_store.clone(),
3093 lang_registry.clone(),
3094 fs.clone(),
3095 cx,
3096 )
3097 });
3098
3099 let (worktree_a, _) = project_a
3100 .update(cx_a, |p, cx| {
3101 p.find_or_create_local_worktree("/root", true, cx)
3102 })
3103 .await
3104 .unwrap();
3105 worktree_a
3106 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3107 .await;
3108 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3109 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3110 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3111
3112 // Join the worktree as client B.
3113 let project_b = Project::remote(
3114 project_id,
3115 client_b.clone(),
3116 client_b.user_store.clone(),
3117 lang_registry.clone(),
3118 fs.clone(),
3119 &mut cx_b.to_async(),
3120 )
3121 .await
3122 .unwrap();
3123
3124 let buffer_b1 = cx_b
3125 .background()
3126 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3127 .await
3128 .unwrap();
3129
3130 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3131 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3132 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3133 lsp::Url::from_file_path("/root/b.rs").unwrap(),
3134 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3135 )))
3136 });
3137
3138 let definitions;
3139 let buffer_b2;
3140 if rng.gen() {
3141 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3142 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3143 } else {
3144 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3145 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3146 }
3147
3148 let buffer_b2 = buffer_b2.await.unwrap();
3149 let definitions = definitions.await.unwrap();
3150 assert_eq!(definitions.len(), 1);
3151 assert_eq!(definitions[0].buffer, buffer_b2);
3152 }
3153
3154 #[gpui::test(iterations = 10)]
3155 async fn test_collaborating_with_code_actions(
3156 cx_a: &mut TestAppContext,
3157 cx_b: &mut TestAppContext,
3158 ) {
3159 cx_a.foreground().forbid_parking();
3160 let mut lang_registry = Arc::new(LanguageRegistry::test());
3161 let fs = FakeFs::new(cx_a.background());
3162 let mut path_openers_b = Vec::new();
3163 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3164
3165 // Set up a fake language server.
3166 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3167 Arc::get_mut(&mut lang_registry)
3168 .unwrap()
3169 .add(Arc::new(Language::new(
3170 LanguageConfig {
3171 name: "Rust".into(),
3172 path_suffixes: vec!["rs".to_string()],
3173 language_server: Some(language_server_config),
3174 ..Default::default()
3175 },
3176 Some(tree_sitter_rust::language()),
3177 )));
3178
3179 // Connect to a server as 2 clients.
3180 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3181 let client_a = server.create_client(cx_a, "user_a").await;
3182 let client_b = server.create_client(cx_b, "user_b").await;
3183
3184 // Share a project as client A
3185 fs.insert_tree(
3186 "/a",
3187 json!({
3188 ".zed.toml": r#"collaborators = ["user_b"]"#,
3189 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3190 "other.rs": "pub fn foo() -> usize { 4 }",
3191 }),
3192 )
3193 .await;
3194 let project_a = cx_a.update(|cx| {
3195 Project::local(
3196 client_a.clone(),
3197 client_a.user_store.clone(),
3198 lang_registry.clone(),
3199 fs.clone(),
3200 cx,
3201 )
3202 });
3203 let (worktree_a, _) = project_a
3204 .update(cx_a, |p, cx| {
3205 p.find_or_create_local_worktree("/a", true, cx)
3206 })
3207 .await
3208 .unwrap();
3209 worktree_a
3210 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3211 .await;
3212 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3213 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3214 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3215
3216 // Join the worktree as client B.
3217 let project_b = Project::remote(
3218 project_id,
3219 client_b.clone(),
3220 client_b.user_store.clone(),
3221 lang_registry.clone(),
3222 fs.clone(),
3223 &mut cx_b.to_async(),
3224 )
3225 .await
3226 .unwrap();
3227 let mut params = cx_b.update(WorkspaceParams::test);
3228 params.languages = lang_registry.clone();
3229 params.client = client_b.client.clone();
3230 params.user_store = client_b.user_store.clone();
3231 params.project = project_b;
3232 params.path_openers = path_openers_b.into();
3233
3234 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3235 let editor_b = workspace_b
3236 .update(cx_b, |workspace, cx| {
3237 workspace.open_path((worktree_id, "main.rs").into(), cx)
3238 })
3239 .await
3240 .unwrap()
3241 .downcast::<Editor>()
3242 .unwrap();
3243
3244 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3245 fake_language_server
3246 .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3247 assert_eq!(
3248 params.text_document.uri,
3249 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3250 );
3251 assert_eq!(params.range.start, lsp::Position::new(0, 0));
3252 assert_eq!(params.range.end, lsp::Position::new(0, 0));
3253 None
3254 })
3255 .next()
3256 .await;
3257
3258 // Move cursor to a location that contains code actions.
3259 editor_b.update(cx_b, |editor, cx| {
3260 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3261 cx.focus(&editor_b);
3262 });
3263
3264 fake_language_server
3265 .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3266 assert_eq!(
3267 params.text_document.uri,
3268 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3269 );
3270 assert_eq!(params.range.start, lsp::Position::new(1, 31));
3271 assert_eq!(params.range.end, lsp::Position::new(1, 31));
3272
3273 Some(vec![lsp::CodeActionOrCommand::CodeAction(
3274 lsp::CodeAction {
3275 title: "Inline into all callers".to_string(),
3276 edit: Some(lsp::WorkspaceEdit {
3277 changes: Some(
3278 [
3279 (
3280 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3281 vec![lsp::TextEdit::new(
3282 lsp::Range::new(
3283 lsp::Position::new(1, 22),
3284 lsp::Position::new(1, 34),
3285 ),
3286 "4".to_string(),
3287 )],
3288 ),
3289 (
3290 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3291 vec![lsp::TextEdit::new(
3292 lsp::Range::new(
3293 lsp::Position::new(0, 0),
3294 lsp::Position::new(0, 27),
3295 ),
3296 "".to_string(),
3297 )],
3298 ),
3299 ]
3300 .into_iter()
3301 .collect(),
3302 ),
3303 ..Default::default()
3304 }),
3305 data: Some(json!({
3306 "codeActionParams": {
3307 "range": {
3308 "start": {"line": 1, "column": 31},
3309 "end": {"line": 1, "column": 31},
3310 }
3311 }
3312 })),
3313 ..Default::default()
3314 },
3315 )])
3316 })
3317 .next()
3318 .await;
3319
3320 // Toggle code actions and wait for them to display.
3321 editor_b.update(cx_b, |editor, cx| {
3322 editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3323 });
3324 editor_b
3325 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3326 .await;
3327
3328 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3329
3330 // Confirming the code action will trigger a resolve request.
3331 let confirm_action = workspace_b
3332 .update(cx_b, |workspace, cx| {
3333 Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3334 })
3335 .unwrap();
3336 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3337 lsp::CodeAction {
3338 title: "Inline into all callers".to_string(),
3339 edit: Some(lsp::WorkspaceEdit {
3340 changes: Some(
3341 [
3342 (
3343 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3344 vec![lsp::TextEdit::new(
3345 lsp::Range::new(
3346 lsp::Position::new(1, 22),
3347 lsp::Position::new(1, 34),
3348 ),
3349 "4".to_string(),
3350 )],
3351 ),
3352 (
3353 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3354 vec![lsp::TextEdit::new(
3355 lsp::Range::new(
3356 lsp::Position::new(0, 0),
3357 lsp::Position::new(0, 27),
3358 ),
3359 "".to_string(),
3360 )],
3361 ),
3362 ]
3363 .into_iter()
3364 .collect(),
3365 ),
3366 ..Default::default()
3367 }),
3368 ..Default::default()
3369 }
3370 });
3371
3372 // After the action is confirmed, an editor containing both modified files is opened.
3373 confirm_action.await.unwrap();
3374 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3375 workspace
3376 .active_item(cx)
3377 .unwrap()
3378 .downcast::<Editor>()
3379 .unwrap()
3380 });
3381 code_action_editor.update(cx_b, |editor, cx| {
3382 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3383 editor.undo(&Undo, cx);
3384 assert_eq!(
3385 editor.text(cx),
3386 "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3387 );
3388 editor.redo(&Redo, cx);
3389 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3390 });
3391 }
3392
3393 #[gpui::test(iterations = 10)]
3394 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3395 cx_a.foreground().forbid_parking();
3396 let mut lang_registry = Arc::new(LanguageRegistry::test());
3397 let fs = FakeFs::new(cx_a.background());
3398 let mut path_openers_b = Vec::new();
3399 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3400
3401 // Set up a fake language server.
3402 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3403 Arc::get_mut(&mut lang_registry)
3404 .unwrap()
3405 .add(Arc::new(Language::new(
3406 LanguageConfig {
3407 name: "Rust".into(),
3408 path_suffixes: vec!["rs".to_string()],
3409 language_server: Some(language_server_config),
3410 ..Default::default()
3411 },
3412 Some(tree_sitter_rust::language()),
3413 )));
3414
3415 // Connect to a server as 2 clients.
3416 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3417 let client_a = server.create_client(cx_a, "user_a").await;
3418 let client_b = server.create_client(cx_b, "user_b").await;
3419
3420 // Share a project as client A
3421 fs.insert_tree(
3422 "/dir",
3423 json!({
3424 ".zed.toml": r#"collaborators = ["user_b"]"#,
3425 "one.rs": "const ONE: usize = 1;",
3426 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3427 }),
3428 )
3429 .await;
3430 let project_a = cx_a.update(|cx| {
3431 Project::local(
3432 client_a.clone(),
3433 client_a.user_store.clone(),
3434 lang_registry.clone(),
3435 fs.clone(),
3436 cx,
3437 )
3438 });
3439 let (worktree_a, _) = project_a
3440 .update(cx_a, |p, cx| {
3441 p.find_or_create_local_worktree("/dir", true, cx)
3442 })
3443 .await
3444 .unwrap();
3445 worktree_a
3446 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3447 .await;
3448 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3449 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3450 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3451
3452 // Join the worktree as client B.
3453 let project_b = Project::remote(
3454 project_id,
3455 client_b.clone(),
3456 client_b.user_store.clone(),
3457 lang_registry.clone(),
3458 fs.clone(),
3459 &mut cx_b.to_async(),
3460 )
3461 .await
3462 .unwrap();
3463 let mut params = cx_b.update(WorkspaceParams::test);
3464 params.languages = lang_registry.clone();
3465 params.client = client_b.client.clone();
3466 params.user_store = client_b.user_store.clone();
3467 params.project = project_b;
3468 params.path_openers = path_openers_b.into();
3469
3470 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3471 let editor_b = workspace_b
3472 .update(cx_b, |workspace, cx| {
3473 workspace.open_path((worktree_id, "one.rs").into(), cx)
3474 })
3475 .await
3476 .unwrap()
3477 .downcast::<Editor>()
3478 .unwrap();
3479 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3480
3481 // Move cursor to a location that can be renamed.
3482 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3483 editor.select_ranges([7..7], None, cx);
3484 editor.rename(&Rename, cx).unwrap()
3485 });
3486
3487 fake_language_server
3488 .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3489 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3490 assert_eq!(params.position, lsp::Position::new(0, 7));
3491 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3492 lsp::Position::new(0, 6),
3493 lsp::Position::new(0, 9),
3494 )))
3495 })
3496 .next()
3497 .await
3498 .unwrap();
3499 prepare_rename.await.unwrap();
3500 editor_b.update(cx_b, |editor, cx| {
3501 let rename = editor.pending_rename().unwrap();
3502 let buffer = editor.buffer().read(cx).snapshot(cx);
3503 assert_eq!(
3504 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3505 6..9
3506 );
3507 rename.editor.update(cx, |rename_editor, cx| {
3508 rename_editor.buffer().update(cx, |rename_buffer, cx| {
3509 rename_buffer.edit([0..3], "THREE", cx);
3510 });
3511 });
3512 });
3513
3514 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3515 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3516 });
3517 fake_language_server
3518 .handle_request::<lsp::request::Rename, _>(|params, _| {
3519 assert_eq!(
3520 params.text_document_position.text_document.uri.as_str(),
3521 "file:///dir/one.rs"
3522 );
3523 assert_eq!(
3524 params.text_document_position.position,
3525 lsp::Position::new(0, 6)
3526 );
3527 assert_eq!(params.new_name, "THREE");
3528 Some(lsp::WorkspaceEdit {
3529 changes: Some(
3530 [
3531 (
3532 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3533 vec![lsp::TextEdit::new(
3534 lsp::Range::new(
3535 lsp::Position::new(0, 6),
3536 lsp::Position::new(0, 9),
3537 ),
3538 "THREE".to_string(),
3539 )],
3540 ),
3541 (
3542 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3543 vec![
3544 lsp::TextEdit::new(
3545 lsp::Range::new(
3546 lsp::Position::new(0, 24),
3547 lsp::Position::new(0, 27),
3548 ),
3549 "THREE".to_string(),
3550 ),
3551 lsp::TextEdit::new(
3552 lsp::Range::new(
3553 lsp::Position::new(0, 35),
3554 lsp::Position::new(0, 38),
3555 ),
3556 "THREE".to_string(),
3557 ),
3558 ],
3559 ),
3560 ]
3561 .into_iter()
3562 .collect(),
3563 ),
3564 ..Default::default()
3565 })
3566 })
3567 .next()
3568 .await
3569 .unwrap();
3570 confirm_rename.await.unwrap();
3571
3572 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3573 workspace
3574 .active_item(cx)
3575 .unwrap()
3576 .downcast::<Editor>()
3577 .unwrap()
3578 });
3579 rename_editor.update(cx_b, |editor, cx| {
3580 assert_eq!(
3581 editor.text(cx),
3582 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3583 );
3584 editor.undo(&Undo, cx);
3585 assert_eq!(
3586 editor.text(cx),
3587 "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3588 );
3589 editor.redo(&Redo, cx);
3590 assert_eq!(
3591 editor.text(cx),
3592 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3593 );
3594 });
3595
3596 // Ensure temporary rename edits cannot be undone/redone.
3597 editor_b.update(cx_b, |editor, cx| {
3598 editor.undo(&Undo, cx);
3599 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3600 editor.undo(&Undo, cx);
3601 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3602 editor.redo(&Redo, cx);
3603 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3604 })
3605 }
3606
3607 #[gpui::test(iterations = 10)]
3608 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3609 cx_a.foreground().forbid_parking();
3610
3611 // Connect to a server as 2 clients.
3612 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3613 let client_a = server.create_client(cx_a, "user_a").await;
3614 let client_b = server.create_client(cx_b, "user_b").await;
3615
3616 // Create an org that includes these 2 users.
3617 let db = &server.app_state.db;
3618 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3619 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3620 .await
3621 .unwrap();
3622 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3623 .await
3624 .unwrap();
3625
3626 // Create a channel that includes all the users.
3627 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3628 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3629 .await
3630 .unwrap();
3631 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3632 .await
3633 .unwrap();
3634 db.create_channel_message(
3635 channel_id,
3636 client_b.current_user_id(&cx_b),
3637 "hello A, it's B.",
3638 OffsetDateTime::now_utc(),
3639 1,
3640 )
3641 .await
3642 .unwrap();
3643
3644 let channels_a = cx_a
3645 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3646 channels_a
3647 .condition(cx_a, |list, _| list.available_channels().is_some())
3648 .await;
3649 channels_a.read_with(cx_a, |list, _| {
3650 assert_eq!(
3651 list.available_channels().unwrap(),
3652 &[ChannelDetails {
3653 id: channel_id.to_proto(),
3654 name: "test-channel".to_string()
3655 }]
3656 )
3657 });
3658 let channel_a = channels_a.update(cx_a, |this, cx| {
3659 this.get_channel(channel_id.to_proto(), cx).unwrap()
3660 });
3661 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3662 channel_a
3663 .condition(&cx_a, |channel, _| {
3664 channel_messages(channel)
3665 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3666 })
3667 .await;
3668
3669 let channels_b = cx_b
3670 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3671 channels_b
3672 .condition(cx_b, |list, _| list.available_channels().is_some())
3673 .await;
3674 channels_b.read_with(cx_b, |list, _| {
3675 assert_eq!(
3676 list.available_channels().unwrap(),
3677 &[ChannelDetails {
3678 id: channel_id.to_proto(),
3679 name: "test-channel".to_string()
3680 }]
3681 )
3682 });
3683
3684 let channel_b = channels_b.update(cx_b, |this, cx| {
3685 this.get_channel(channel_id.to_proto(), cx).unwrap()
3686 });
3687 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3688 channel_b
3689 .condition(&cx_b, |channel, _| {
3690 channel_messages(channel)
3691 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3692 })
3693 .await;
3694
3695 channel_a
3696 .update(cx_a, |channel, cx| {
3697 channel
3698 .send_message("oh, hi B.".to_string(), cx)
3699 .unwrap()
3700 .detach();
3701 let task = channel.send_message("sup".to_string(), cx).unwrap();
3702 assert_eq!(
3703 channel_messages(channel),
3704 &[
3705 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3706 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3707 ("user_a".to_string(), "sup".to_string(), true)
3708 ]
3709 );
3710 task
3711 })
3712 .await
3713 .unwrap();
3714
3715 channel_b
3716 .condition(&cx_b, |channel, _| {
3717 channel_messages(channel)
3718 == [
3719 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3720 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3721 ("user_a".to_string(), "sup".to_string(), false),
3722 ]
3723 })
3724 .await;
3725
3726 assert_eq!(
3727 server
3728 .state()
3729 .await
3730 .channel(channel_id)
3731 .unwrap()
3732 .connection_ids
3733 .len(),
3734 2
3735 );
3736 cx_b.update(|_| drop(channel_b));
3737 server
3738 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3739 .await;
3740
3741 cx_a.update(|_| drop(channel_a));
3742 server
3743 .condition(|state| state.channel(channel_id).is_none())
3744 .await;
3745 }
3746
3747 #[gpui::test(iterations = 10)]
3748 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
3749 cx_a.foreground().forbid_parking();
3750
3751 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3752 let client_a = server.create_client(cx_a, "user_a").await;
3753
3754 let db = &server.app_state.db;
3755 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3756 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3757 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3758 .await
3759 .unwrap();
3760 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3761 .await
3762 .unwrap();
3763
3764 let channels_a = cx_a
3765 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3766 channels_a
3767 .condition(cx_a, |list, _| list.available_channels().is_some())
3768 .await;
3769 let channel_a = channels_a.update(cx_a, |this, cx| {
3770 this.get_channel(channel_id.to_proto(), cx).unwrap()
3771 });
3772
3773 // Messages aren't allowed to be too long.
3774 channel_a
3775 .update(cx_a, |channel, cx| {
3776 let long_body = "this is long.\n".repeat(1024);
3777 channel.send_message(long_body, cx).unwrap()
3778 })
3779 .await
3780 .unwrap_err();
3781
3782 // Messages aren't allowed to be blank.
3783 channel_a.update(cx_a, |channel, cx| {
3784 channel.send_message(String::new(), cx).unwrap_err()
3785 });
3786
3787 // Leading and trailing whitespace are trimmed.
3788 channel_a
3789 .update(cx_a, |channel, cx| {
3790 channel
3791 .send_message("\n surrounded by whitespace \n".to_string(), cx)
3792 .unwrap()
3793 })
3794 .await
3795 .unwrap();
3796 assert_eq!(
3797 db.get_channel_messages(channel_id, 10, None)
3798 .await
3799 .unwrap()
3800 .iter()
3801 .map(|m| &m.body)
3802 .collect::<Vec<_>>(),
3803 &["surrounded by whitespace"]
3804 );
3805 }
3806
3807 #[gpui::test(iterations = 10)]
3808 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3809 cx_a.foreground().forbid_parking();
3810
3811 // Connect to a server as 2 clients.
3812 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3813 let client_a = server.create_client(cx_a, "user_a").await;
3814 let client_b = server.create_client(cx_b, "user_b").await;
3815 let mut status_b = client_b.status();
3816
3817 // Create an org that includes these 2 users.
3818 let db = &server.app_state.db;
3819 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3820 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3821 .await
3822 .unwrap();
3823 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3824 .await
3825 .unwrap();
3826
3827 // Create a channel that includes all the users.
3828 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3829 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3830 .await
3831 .unwrap();
3832 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3833 .await
3834 .unwrap();
3835 db.create_channel_message(
3836 channel_id,
3837 client_b.current_user_id(&cx_b),
3838 "hello A, it's B.",
3839 OffsetDateTime::now_utc(),
3840 2,
3841 )
3842 .await
3843 .unwrap();
3844
3845 let channels_a = cx_a
3846 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3847 channels_a
3848 .condition(cx_a, |list, _| list.available_channels().is_some())
3849 .await;
3850
3851 channels_a.read_with(cx_a, |list, _| {
3852 assert_eq!(
3853 list.available_channels().unwrap(),
3854 &[ChannelDetails {
3855 id: channel_id.to_proto(),
3856 name: "test-channel".to_string()
3857 }]
3858 )
3859 });
3860 let channel_a = channels_a.update(cx_a, |this, cx| {
3861 this.get_channel(channel_id.to_proto(), cx).unwrap()
3862 });
3863 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3864 channel_a
3865 .condition(&cx_a, |channel, _| {
3866 channel_messages(channel)
3867 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3868 })
3869 .await;
3870
3871 let channels_b = cx_b
3872 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3873 channels_b
3874 .condition(cx_b, |list, _| list.available_channels().is_some())
3875 .await;
3876 channels_b.read_with(cx_b, |list, _| {
3877 assert_eq!(
3878 list.available_channels().unwrap(),
3879 &[ChannelDetails {
3880 id: channel_id.to_proto(),
3881 name: "test-channel".to_string()
3882 }]
3883 )
3884 });
3885
3886 let channel_b = channels_b.update(cx_b, |this, cx| {
3887 this.get_channel(channel_id.to_proto(), cx).unwrap()
3888 });
3889 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3890 channel_b
3891 .condition(&cx_b, |channel, _| {
3892 channel_messages(channel)
3893 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3894 })
3895 .await;
3896
3897 // Disconnect client B, ensuring we can still access its cached channel data.
3898 server.forbid_connections();
3899 server.disconnect_client(client_b.current_user_id(&cx_b));
3900 cx_b.foreground().advance_clock(Duration::from_secs(3));
3901 while !matches!(
3902 status_b.next().await,
3903 Some(client::Status::ReconnectionError { .. })
3904 ) {}
3905
3906 channels_b.read_with(cx_b, |channels, _| {
3907 assert_eq!(
3908 channels.available_channels().unwrap(),
3909 [ChannelDetails {
3910 id: channel_id.to_proto(),
3911 name: "test-channel".to_string()
3912 }]
3913 )
3914 });
3915 channel_b.read_with(cx_b, |channel, _| {
3916 assert_eq!(
3917 channel_messages(channel),
3918 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3919 )
3920 });
3921
3922 // Send a message from client B while it is disconnected.
3923 channel_b
3924 .update(cx_b, |channel, cx| {
3925 let task = channel
3926 .send_message("can you see this?".to_string(), cx)
3927 .unwrap();
3928 assert_eq!(
3929 channel_messages(channel),
3930 &[
3931 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3932 ("user_b".to_string(), "can you see this?".to_string(), true)
3933 ]
3934 );
3935 task
3936 })
3937 .await
3938 .unwrap_err();
3939
3940 // Send a message from client A while B is disconnected.
3941 channel_a
3942 .update(cx_a, |channel, cx| {
3943 channel
3944 .send_message("oh, hi B.".to_string(), cx)
3945 .unwrap()
3946 .detach();
3947 let task = channel.send_message("sup".to_string(), cx).unwrap();
3948 assert_eq!(
3949 channel_messages(channel),
3950 &[
3951 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3952 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3953 ("user_a".to_string(), "sup".to_string(), true)
3954 ]
3955 );
3956 task
3957 })
3958 .await
3959 .unwrap();
3960
3961 // Give client B a chance to reconnect.
3962 server.allow_connections();
3963 cx_b.foreground().advance_clock(Duration::from_secs(10));
3964
3965 // Verify that B sees the new messages upon reconnection, as well as the message client B
3966 // sent while offline.
3967 channel_b
3968 .condition(&cx_b, |channel, _| {
3969 channel_messages(channel)
3970 == [
3971 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3972 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3973 ("user_a".to_string(), "sup".to_string(), false),
3974 ("user_b".to_string(), "can you see this?".to_string(), false),
3975 ]
3976 })
3977 .await;
3978
3979 // Ensure client A and B can communicate normally after reconnection.
3980 channel_a
3981 .update(cx_a, |channel, cx| {
3982 channel.send_message("you online?".to_string(), cx).unwrap()
3983 })
3984 .await
3985 .unwrap();
3986 channel_b
3987 .condition(&cx_b, |channel, _| {
3988 channel_messages(channel)
3989 == [
3990 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3991 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3992 ("user_a".to_string(), "sup".to_string(), false),
3993 ("user_b".to_string(), "can you see this?".to_string(), false),
3994 ("user_a".to_string(), "you online?".to_string(), false),
3995 ]
3996 })
3997 .await;
3998
3999 channel_b
4000 .update(cx_b, |channel, cx| {
4001 channel.send_message("yep".to_string(), cx).unwrap()
4002 })
4003 .await
4004 .unwrap();
4005 channel_a
4006 .condition(&cx_a, |channel, _| {
4007 channel_messages(channel)
4008 == [
4009 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4010 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4011 ("user_a".to_string(), "sup".to_string(), false),
4012 ("user_b".to_string(), "can you see this?".to_string(), false),
4013 ("user_a".to_string(), "you online?".to_string(), false),
4014 ("user_b".to_string(), "yep".to_string(), false),
4015 ]
4016 })
4017 .await;
4018 }
4019
4020 #[gpui::test(iterations = 10)]
4021 async fn test_contacts(
4022 cx_a: &mut TestAppContext,
4023 cx_b: &mut TestAppContext,
4024 cx_c: &mut TestAppContext,
4025 ) {
4026 cx_a.foreground().forbid_parking();
4027 let lang_registry = Arc::new(LanguageRegistry::test());
4028 let fs = FakeFs::new(cx_a.background());
4029
4030 // Connect to a server as 3 clients.
4031 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4032 let client_a = server.create_client(cx_a, "user_a").await;
4033 let client_b = server.create_client(cx_b, "user_b").await;
4034 let client_c = server.create_client(cx_c, "user_c").await;
4035
4036 // Share a worktree as client A.
4037 fs.insert_tree(
4038 "/a",
4039 json!({
4040 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4041 }),
4042 )
4043 .await;
4044
4045 let project_a = cx_a.update(|cx| {
4046 Project::local(
4047 client_a.clone(),
4048 client_a.user_store.clone(),
4049 lang_registry.clone(),
4050 fs.clone(),
4051 cx,
4052 )
4053 });
4054 let (worktree_a, _) = project_a
4055 .update(cx_a, |p, cx| {
4056 p.find_or_create_local_worktree("/a", true, cx)
4057 })
4058 .await
4059 .unwrap();
4060 worktree_a
4061 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4062 .await;
4063
4064 client_a
4065 .user_store
4066 .condition(&cx_a, |user_store, _| {
4067 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4068 })
4069 .await;
4070 client_b
4071 .user_store
4072 .condition(&cx_b, |user_store, _| {
4073 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4074 })
4075 .await;
4076 client_c
4077 .user_store
4078 .condition(&cx_c, |user_store, _| {
4079 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4080 })
4081 .await;
4082
4083 let project_id = project_a
4084 .update(cx_a, |project, _| project.next_remote_id())
4085 .await;
4086 project_a
4087 .update(cx_a, |project, cx| project.share(cx))
4088 .await
4089 .unwrap();
4090
4091 let _project_b = Project::remote(
4092 project_id,
4093 client_b.clone(),
4094 client_b.user_store.clone(),
4095 lang_registry.clone(),
4096 fs.clone(),
4097 &mut cx_b.to_async(),
4098 )
4099 .await
4100 .unwrap();
4101
4102 client_a
4103 .user_store
4104 .condition(&cx_a, |user_store, _| {
4105 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4106 })
4107 .await;
4108 client_b
4109 .user_store
4110 .condition(&cx_b, |user_store, _| {
4111 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4112 })
4113 .await;
4114 client_c
4115 .user_store
4116 .condition(&cx_c, |user_store, _| {
4117 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4118 })
4119 .await;
4120
4121 project_a
4122 .condition(&cx_a, |project, _| {
4123 project.collaborators().contains_key(&client_b.peer_id)
4124 })
4125 .await;
4126
4127 cx_a.update(move |_| drop(project_a));
4128 client_a
4129 .user_store
4130 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4131 .await;
4132 client_b
4133 .user_store
4134 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4135 .await;
4136 client_c
4137 .user_store
4138 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4139 .await;
4140
4141 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4142 user_store
4143 .contacts()
4144 .iter()
4145 .map(|contact| {
4146 let worktrees = contact
4147 .projects
4148 .iter()
4149 .map(|p| {
4150 (
4151 p.worktree_root_names[0].as_str(),
4152 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4153 )
4154 })
4155 .collect();
4156 (contact.user.github_login.as_str(), worktrees)
4157 })
4158 .collect()
4159 }
4160 }
4161
4162 #[gpui::test(iterations = 100)]
4163 async fn test_random_collaboration(cx: &mut TestAppContext, rng: StdRng) {
4164 cx.foreground().forbid_parking();
4165 let max_peers = env::var("MAX_PEERS")
4166 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4167 .unwrap_or(5);
4168 let max_operations = env::var("OPERATIONS")
4169 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4170 .unwrap_or(10);
4171
4172 let rng = Arc::new(Mutex::new(rng));
4173
4174 let guest_lang_registry = Arc::new(LanguageRegistry::test());
4175 let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4176
4177 let fs = FakeFs::new(cx.background());
4178 fs.insert_tree(
4179 "/_collab",
4180 json!({
4181 ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4182 }),
4183 )
4184 .await;
4185
4186 let operations = Rc::new(Cell::new(0));
4187 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4188 let mut clients = Vec::new();
4189
4190 let mut next_entity_id = 100000;
4191 let mut host_cx = TestAppContext::new(
4192 cx.foreground_platform(),
4193 cx.platform(),
4194 cx.foreground(),
4195 cx.background(),
4196 cx.font_cache(),
4197 cx.leak_detector(),
4198 next_entity_id,
4199 );
4200 let host = server.create_client(&mut host_cx, "host").await;
4201 let host_project = host_cx.update(|cx| {
4202 Project::local(
4203 host.client.clone(),
4204 host.user_store.clone(),
4205 Arc::new(LanguageRegistry::test()),
4206 fs.clone(),
4207 cx,
4208 )
4209 });
4210 let host_project_id = host_project
4211 .update(&mut host_cx, |p, _| p.next_remote_id())
4212 .await;
4213
4214 let (collab_worktree, _) = host_project
4215 .update(&mut host_cx, |project, cx| {
4216 project.find_or_create_local_worktree("/_collab", true, cx)
4217 })
4218 .await
4219 .unwrap();
4220 collab_worktree
4221 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4222 .await;
4223 host_project
4224 .update(&mut host_cx, |project, cx| project.share(cx))
4225 .await
4226 .unwrap();
4227
4228 clients.push(cx.foreground().spawn(host.simulate_host(
4229 host_project,
4230 language_server_config,
4231 operations.clone(),
4232 max_operations,
4233 rng.clone(),
4234 host_cx,
4235 )));
4236
4237 while operations.get() < max_operations {
4238 cx.background().simulate_random_delay().await;
4239 if clients.len() >= max_peers {
4240 break;
4241 } else if rng.lock().gen_bool(0.05) {
4242 operations.set(operations.get() + 1);
4243
4244 let guest_id = clients.len();
4245 log::info!("Adding guest {}", guest_id);
4246 next_entity_id += 100000;
4247 let mut guest_cx = TestAppContext::new(
4248 cx.foreground_platform(),
4249 cx.platform(),
4250 cx.foreground(),
4251 cx.background(),
4252 cx.font_cache(),
4253 cx.leak_detector(),
4254 next_entity_id,
4255 );
4256 let guest = server
4257 .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4258 .await;
4259 let guest_project = Project::remote(
4260 host_project_id,
4261 guest.client.clone(),
4262 guest.user_store.clone(),
4263 guest_lang_registry.clone(),
4264 FakeFs::new(cx.background()),
4265 &mut guest_cx.to_async(),
4266 )
4267 .await
4268 .unwrap();
4269 clients.push(cx.foreground().spawn(guest.simulate_guest(
4270 guest_id,
4271 guest_project,
4272 operations.clone(),
4273 max_operations,
4274 rng.clone(),
4275 guest_cx,
4276 )));
4277
4278 log::info!("Guest {} added", guest_id);
4279 }
4280 }
4281
4282 let mut clients = futures::future::join_all(clients).await;
4283 cx.foreground().run_until_parked();
4284
4285 let (host_client, mut host_cx) = clients.remove(0);
4286 let host_project = host_client.project.as_ref().unwrap();
4287 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4288 project
4289 .worktrees(cx)
4290 .map(|worktree| {
4291 let snapshot = worktree.read(cx).snapshot();
4292 (snapshot.id(), snapshot)
4293 })
4294 .collect::<BTreeMap<_, _>>()
4295 });
4296
4297 host_client
4298 .project
4299 .as_ref()
4300 .unwrap()
4301 .read_with(&host_cx, |project, cx| project.check_invariants(cx));
4302
4303 for (guest_client, mut guest_cx) in clients.into_iter() {
4304 let guest_id = guest_client.client.id();
4305 let worktree_snapshots =
4306 guest_client
4307 .project
4308 .as_ref()
4309 .unwrap()
4310 .read_with(&guest_cx, |project, cx| {
4311 project
4312 .worktrees(cx)
4313 .map(|worktree| {
4314 let worktree = worktree.read(cx);
4315 (worktree.id(), worktree.snapshot())
4316 })
4317 .collect::<BTreeMap<_, _>>()
4318 });
4319
4320 assert_eq!(
4321 worktree_snapshots.keys().collect::<Vec<_>>(),
4322 host_worktree_snapshots.keys().collect::<Vec<_>>(),
4323 "guest {} has different worktrees than the host",
4324 guest_id
4325 );
4326 for (id, host_snapshot) in &host_worktree_snapshots {
4327 let guest_snapshot = &worktree_snapshots[id];
4328 assert_eq!(
4329 guest_snapshot.root_name(),
4330 host_snapshot.root_name(),
4331 "guest {} has different root name than the host for worktree {}",
4332 guest_id,
4333 id
4334 );
4335 assert_eq!(
4336 guest_snapshot.entries(false).collect::<Vec<_>>(),
4337 host_snapshot.entries(false).collect::<Vec<_>>(),
4338 "guest {} has different snapshot than the host for worktree {}",
4339 guest_id,
4340 id
4341 );
4342 }
4343
4344 guest_client
4345 .project
4346 .as_ref()
4347 .unwrap()
4348 .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
4349
4350 for guest_buffer in &guest_client.buffers {
4351 let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
4352 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
4353 project.buffer_for_id(buffer_id, cx).expect(&format!(
4354 "host does not have buffer for guest:{}, peer:{}, id:{}",
4355 guest_id, guest_client.peer_id, buffer_id
4356 ))
4357 });
4358 let path = host_buffer
4359 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
4360
4361 assert_eq!(
4362 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
4363 0,
4364 "guest {}, buffer {}, path {:?} has deferred operations",
4365 guest_id,
4366 buffer_id,
4367 path,
4368 );
4369 assert_eq!(
4370 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
4371 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4372 "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4373 guest_id,
4374 buffer_id,
4375 path
4376 );
4377 }
4378
4379 guest_cx.update(|_| drop(guest_client));
4380 }
4381
4382 host_cx.update(|_| drop(host_client));
4383 }
4384
/// In-process test harness that owns the `Server` under test together with the
/// shared state the tests use to drive and observe it.
struct TestServer {
    /// Peer handed to `Server::new`; it is reset when the harness is dropped.
    peer: Arc<Peer>,
    /// Application state produced by `build_app_state` from the test database.
    app_state: Arc<AppState>,
    /// The server under test.
    server: Arc<Server>,
    /// Foreground executor; `condition` brackets its waits with
    /// `start_waiting` / `finish_waiting` on it.
    foreground: Rc<executor::Foreground>,
    /// Receiving half of the channel whose sender is passed to `Server::new`.
    notifications: mpsc::UnboundedReceiver<()>,
    /// Kill handle of the most recent in-memory connection of each user.
    /// `disconnect_client` removes (and thereby drops) a user's entry.
    connection_killers: Arc<Mutex<HashMap<UserId, barrier::Sender>>>,
    /// While `true`, the connection override installed by `create_client`
    /// rejects new connection attempts.
    forbid_connections: Arc<AtomicBool>,
    /// Never read in this block; presumably held only to keep the test
    /// database alive for the lifetime of the harness (TODO confirm).
    _test_db: TestDb,
}
4395
4396 impl TestServer {
4397 async fn start(
4398 foreground: Rc<executor::Foreground>,
4399 background: Arc<executor::Background>,
4400 ) -> Self {
4401 let test_db = TestDb::fake(background);
4402 let app_state = Self::build_app_state(&test_db).await;
4403 let peer = Peer::new();
4404 let notifications = mpsc::unbounded();
4405 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4406 Self {
4407 peer,
4408 app_state,
4409 server,
4410 foreground,
4411 notifications: notifications.1,
4412 connection_killers: Default::default(),
4413 forbid_connections: Default::default(),
4414 _test_db: test_db,
4415 }
4416 }
4417
4418 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4419 cx.update(|cx| {
4420 let settings = Settings::test(cx);
4421 cx.add_app_state(settings);
4422 });
4423
4424 let http = FakeHttpClient::with_404_response();
4425 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4426 let client_name = name.to_string();
4427 let mut client = Client::new(http.clone());
4428 let server = self.server.clone();
4429 let connection_killers = self.connection_killers.clone();
4430 let forbid_connections = self.forbid_connections.clone();
4431 let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4432
4433 Arc::get_mut(&mut client)
4434 .unwrap()
4435 .override_authenticate(move |cx| {
4436 cx.spawn(|_| async move {
4437 let access_token = "the-token".to_string();
4438 Ok(Credentials {
4439 user_id: user_id.0 as u64,
4440 access_token,
4441 })
4442 })
4443 })
4444 .override_establish_connection(move |credentials, cx| {
4445 assert_eq!(credentials.user_id, user_id.0 as u64);
4446 assert_eq!(credentials.access_token, "the-token");
4447
4448 let server = server.clone();
4449 let connection_killers = connection_killers.clone();
4450 let forbid_connections = forbid_connections.clone();
4451 let client_name = client_name.clone();
4452 let connection_id_tx = connection_id_tx.clone();
4453 cx.spawn(move |cx| async move {
4454 if forbid_connections.load(SeqCst) {
4455 Err(EstablishConnectionError::other(anyhow!(
4456 "server is forbidding connections"
4457 )))
4458 } else {
4459 let (client_conn, server_conn, kill_conn) =
4460 Connection::in_memory(cx.background());
4461 connection_killers.lock().insert(user_id, kill_conn);
4462 cx.background()
4463 .spawn(server.handle_connection(
4464 server_conn,
4465 client_name,
4466 user_id,
4467 Some(connection_id_tx),
4468 cx.background(),
4469 ))
4470 .detach();
4471 Ok(client_conn)
4472 }
4473 })
4474 });
4475
4476 client
4477 .authenticate_and_connect(&cx.to_async())
4478 .await
4479 .unwrap();
4480
4481 Channel::init(&client);
4482 Project::init(&client);
4483
4484 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4485 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4486
4487 let client = TestClient {
4488 client,
4489 peer_id,
4490 user_store,
4491 project: Default::default(),
4492 buffers: Default::default(),
4493 };
4494 client.wait_for_current_user(cx).await;
4495 client
4496 }
4497
4498 fn disconnect_client(&self, user_id: UserId) {
4499 self.connection_killers.lock().remove(&user_id);
4500 }
4501
/// Sets the `forbid_connections` flag, which the connection override
/// installed by `create_client` checks before opening a connection.
fn forbid_connections(&self) {
    self.forbid_connections.store(true, SeqCst);
}
4505
/// Clears the `forbid_connections` flag so that new connection attempts are
/// accepted again.
fn allow_connections(&self) {
    self.forbid_connections.store(false, SeqCst);
}
4509
4510 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4511 let mut config = Config::default();
4512 config.session_secret = "a".repeat(32);
4513 config.database_url = test_db.url.clone();
4514 let github_client = github::AppClient::test();
4515 Arc::new(AppState {
4516 db: test_db.db().clone(),
4517 handlebars: Default::default(),
4518 auth_client: auth::build_client("", ""),
4519 repo_client: github::RepoClient::test(&github_client),
4520 github_client,
4521 config,
4522 })
4523 }
4524
4525 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4526 self.server.store.read()
4527 }
4528
4529 async fn condition<F>(&mut self, mut predicate: F)
4530 where
4531 F: FnMut(&Store) -> bool,
4532 {
4533 async_std::future::timeout(Duration::from_millis(500), async {
4534 while !(predicate)(&*self.server.store.read()) {
4535 self.foreground.start_waiting();
4536 self.notifications.next().await;
4537 self.foreground.finish_waiting();
4538 }
4539 })
4540 .await
4541 .expect("condition timed out");
4542 }
4543 }
4544
/// Resets the peer when the harness goes away.
impl Drop for TestServer {
    fn drop(&mut self) {
        // NOTE(review): presumably this tears down any connections still
        // registered on the peer — confirm against `Peer::reset`.
        self.peer.reset();
    }
}
4550
/// A connected client as seen by the tests, plus the per-client state that the
/// randomized simulations accumulate.
struct TestClient {
    /// The underlying client; also reachable through `Deref`.
    client: Arc<Client>,
    /// Peer id built from the connection id reported for this client's
    /// connection in `TestServer::create_client`.
    pub peer_id: PeerId,
    /// User store created for this client in `TestServer::create_client`.
    pub user_store: ModelHandle<UserStore>,
    /// Set by `simulate_host` / `simulate_guest` when the simulation finishes;
    /// `None` until then.
    project: Option<ModelHandle<Project>>,
    /// Buffers the simulation currently keeps open for this client.
    buffers: HashSet<ModelHandle<language::Buffer>>,
}
4558
/// Lets a `TestClient` be used wherever the wrapped `Arc<Client>` is expected.
impl Deref for TestClient {
    type Target = Arc<Client>;

    /// Returns a reference to the wrapped client.
    fn deref(&self) -> &Self::Target {
        &self.client
    }
}
4566
4567 impl TestClient {
4568 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4569 UserId::from_proto(
4570 self.user_store
4571 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4572 )
4573 }
4574
4575 async fn wait_for_current_user(&self, cx: &TestAppContext) {
4576 let mut authed_user = self
4577 .user_store
4578 .read_with(cx, |user_store, _| user_store.watch_current_user());
4579 while authed_user.next().await.unwrap().is_none() {}
4580 }
4581
4582 fn simulate_host(
4583 mut self,
4584 project: ModelHandle<Project>,
4585 mut language_server_config: LanguageServerConfig,
4586 operations: Rc<Cell<usize>>,
4587 max_operations: usize,
4588 rng: Arc<Mutex<StdRng>>,
4589 mut cx: TestAppContext,
4590 ) -> impl Future<Output = (Self, TestAppContext)> {
4591 let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4592
4593 // Set up a fake language server.
4594 language_server_config.set_fake_initializer({
4595 let rng = rng.clone();
4596 let files = files.clone();
4597 let project = project.downgrade();
4598 move |fake_server| {
4599 fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4600 Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4601 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4602 range: lsp::Range::new(
4603 lsp::Position::new(0, 0),
4604 lsp::Position::new(0, 0),
4605 ),
4606 new_text: "the-new-text".to_string(),
4607 })),
4608 ..Default::default()
4609 }]))
4610 });
4611
4612 fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4613 Some(vec![lsp::CodeActionOrCommand::CodeAction(
4614 lsp::CodeAction {
4615 title: "the-code-action".to_string(),
4616 ..Default::default()
4617 },
4618 )])
4619 });
4620
4621 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4622 |params, _| {
4623 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4624 params.position,
4625 params.position,
4626 )))
4627 },
4628 );
4629
4630 fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4631 let files = files.clone();
4632 let rng = rng.clone();
4633 move |_, _| {
4634 let files = files.lock();
4635 let mut rng = rng.lock();
4636 let count = rng.gen_range::<usize, _>(1..3);
4637 let files = (0..count)
4638 .map(|_| files.choose(&mut *rng).unwrap())
4639 .collect::<Vec<_>>();
4640 log::info!("LSP: Returning definitions in files {:?}", &files);
4641 Some(lsp::GotoDefinitionResponse::Array(
4642 files
4643 .into_iter()
4644 .map(|file| lsp::Location {
4645 uri: lsp::Url::from_file_path(file).unwrap(),
4646 range: Default::default(),
4647 })
4648 .collect(),
4649 ))
4650 }
4651 });
4652
4653 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4654 let rng = rng.clone();
4655 let project = project.clone();
4656 move |params, mut cx| {
4657 if let Some(project) = project.upgrade(&cx) {
4658 project.update(&mut cx, |project, cx| {
4659 let path = params
4660 .text_document_position_params
4661 .text_document
4662 .uri
4663 .to_file_path()
4664 .unwrap();
4665 let (worktree, relative_path) =
4666 project.find_local_worktree(&path, cx)?;
4667 let project_path =
4668 ProjectPath::from((worktree.read(cx).id(), relative_path));
4669 let buffer =
4670 project.get_open_buffer(&project_path, cx)?.read(cx);
4671
4672 let mut highlights = Vec::new();
4673 let highlight_count = rng.lock().gen_range(1..=5);
4674 let mut prev_end = 0;
4675 for _ in 0..highlight_count {
4676 let range =
4677 buffer.random_byte_range(prev_end, &mut *rng.lock());
4678 let start = buffer
4679 .offset_to_point_utf16(range.start)
4680 .to_lsp_position();
4681 let end = buffer
4682 .offset_to_point_utf16(range.end)
4683 .to_lsp_position();
4684 highlights.push(lsp::DocumentHighlight {
4685 range: lsp::Range::new(start, end),
4686 kind: Some(lsp::DocumentHighlightKind::READ),
4687 });
4688 prev_end = range.end;
4689 }
4690 Some(highlights)
4691 })
4692 } else {
4693 None
4694 }
4695 }
4696 });
4697 }
4698 });
4699
4700 project.update(&mut cx, |project, _| {
4701 project.languages().add(Arc::new(Language::new(
4702 LanguageConfig {
4703 name: "Rust".into(),
4704 path_suffixes: vec!["rs".to_string()],
4705 language_server: Some(language_server_config),
4706 ..Default::default()
4707 },
4708 None,
4709 )));
4710 });
4711
4712 async move {
4713 let fs = project.read_with(&cx, |project, _| project.fs().clone());
4714 while operations.get() < max_operations {
4715 operations.set(operations.get() + 1);
4716
4717 let distribution = rng.lock().gen_range::<usize, _>(0..100);
4718 match distribution {
4719 0..=20 if !files.lock().is_empty() => {
4720 let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4721 let mut path = path.as_path();
4722 while let Some(parent_path) = path.parent() {
4723 path = parent_path;
4724 if rng.lock().gen() {
4725 break;
4726 }
4727 }
4728
4729 log::info!("Host: find/create local worktree {:?}", path);
4730 let find_or_create_worktree = project.update(&mut cx, |project, cx| {
4731 project.find_or_create_local_worktree(path, true, cx)
4732 });
4733 let find_or_create_worktree = async move {
4734 find_or_create_worktree.await.unwrap();
4735 };
4736 if rng.lock().gen() {
4737 cx.background().spawn(find_or_create_worktree).detach();
4738 } else {
4739 find_or_create_worktree.await;
4740 }
4741 }
4742 10..=80 if !files.lock().is_empty() => {
4743 let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4744 let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4745 let (worktree, path) = project
4746 .update(&mut cx, |project, cx| {
4747 project.find_or_create_local_worktree(
4748 file.clone(),
4749 true,
4750 cx,
4751 )
4752 })
4753 .await
4754 .unwrap();
4755 let project_path =
4756 worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4757 log::info!(
4758 "Host: opening path {:?}, worktree {}, relative_path {:?}",
4759 file,
4760 project_path.0,
4761 project_path.1
4762 );
4763 let buffer = project
4764 .update(&mut cx, |project, cx| {
4765 project.open_buffer(project_path, cx)
4766 })
4767 .await
4768 .unwrap();
4769 self.buffers.insert(buffer.clone());
4770 buffer
4771 } else {
4772 self.buffers
4773 .iter()
4774 .choose(&mut *rng.lock())
4775 .unwrap()
4776 .clone()
4777 };
4778
4779 if rng.lock().gen_bool(0.1) {
4780 cx.update(|cx| {
4781 log::info!(
4782 "Host: dropping buffer {:?}",
4783 buffer.read(cx).file().unwrap().full_path(cx)
4784 );
4785 self.buffers.remove(&buffer);
4786 drop(buffer);
4787 });
4788 } else {
4789 buffer.update(&mut cx, |buffer, cx| {
4790 log::info!(
4791 "Host: updating buffer {:?} ({})",
4792 buffer.file().unwrap().full_path(cx),
4793 buffer.remote_id()
4794 );
4795 buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4796 });
4797 }
4798 }
4799 _ => loop {
4800 let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4801 let mut path = PathBuf::new();
4802 path.push("/");
4803 for _ in 0..path_component_count {
4804 let letter = rng.lock().gen_range(b'a'..=b'z');
4805 path.push(std::str::from_utf8(&[letter]).unwrap());
4806 }
4807 path.set_extension("rs");
4808 let parent_path = path.parent().unwrap();
4809
4810 log::info!("Host: creating file {:?}", path,);
4811
4812 if fs.create_dir(&parent_path).await.is_ok()
4813 && fs.create_file(&path, Default::default()).await.is_ok()
4814 {
4815 files.lock().push(path);
4816 break;
4817 } else {
4818 log::info!("Host: cannot create file");
4819 }
4820 },
4821 }
4822
4823 cx.background().simulate_random_delay().await;
4824 }
4825
4826 log::info!("Host done");
4827
4828 self.project = Some(project);
4829 (self, cx)
4830 }
4831 }
4832
/// Drives one guest in the randomized collaboration test.
///
/// Loops until the shared `operations` counter reaches `max_operations`.
/// Each iteration picks a buffer (opening a random file from a visible
/// worktree, or reusing an already open buffer), bumps the counter, and then
/// performs one randomly chosen action on it: drop, completions, code
/// actions, save, prepare rename, go to definition, document highlights,
/// project-wide search, or a random edit.
///
/// When the loop ends, `project` is stored in `self.project` and the client is
/// returned together with its `TestAppContext`.
pub async fn simulate_guest(
    mut self,
    guest_id: usize,
    project: ModelHandle<Project>,
    operations: Rc<Cell<usize>>,
    max_operations: usize,
    rng: Arc<Mutex<StdRng>>,
    mut cx: TestAppContext,
) -> (Self, TestAppContext) {
    while operations.get() < max_operations {
        let buffer = if self.buffers.is_empty() || rng.lock().gen() {
            // Pick a visible worktree that contains at least one file. If there
            // is none yet, wait a little and retry without counting an
            // operation.
            let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
                project
                    .worktrees(&cx)
                    .filter(|worktree| {
                        let worktree = worktree.read(cx);
                        worktree.is_visible()
                            && worktree.entries(false).any(|e| e.is_file())
                    })
                    .choose(&mut *rng.lock())
            }) {
                worktree
            } else {
                cx.background().simulate_random_delay().await;
                continue;
            };

            operations.set(operations.get() + 1);
            // Choose a random file entry in that worktree. The `unwrap` relies
            // on the worktree still containing a file, as checked above.
            let (worktree_root_name, project_path) =
                worktree.read_with(&cx, |worktree, _| {
                    let entry = worktree
                        .entries(false)
                        .filter(|e| e.is_file())
                        .choose(&mut *rng.lock())
                        .unwrap();
                    (
                        worktree.root_name().to_string(),
                        (worktree.id(), entry.path.clone()),
                    )
                });
            log::info!(
                "Guest {}: opening path {:?} in worktree {} ({})",
                guest_id,
                project_path.1,
                project_path.0,
                worktree_root_name,
            );
            let buffer = project
                .update(&mut cx, |project, cx| {
                    project.open_buffer(project_path.clone(), cx)
                })
                .await
                .unwrap();
            log::info!(
                "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}",
                guest_id,
                project_path.1,
                project_path.0,
                worktree_root_name,
                buffer.read_with(&cx, |buffer, _| buffer.remote_id())
            );
            self.buffers.insert(buffer.clone());
            buffer
        } else {
            operations.set(operations.get() + 1);

            // Reuse one of the buffers this guest already has open.
            self.buffers
                .iter()
                .choose(&mut *rng.lock())
                .unwrap()
                .clone()
        };

        let choice = rng.lock().gen_range(0..100);
        match choice {
            // Drop the buffer: release both the tracked handle and the local
            // one inside `cx.update`.
            0..=9 => {
                cx.update(|cx| {
                    log::info!(
                        "Guest {}: dropping buffer {:?}",
                        guest_id,
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    self.buffers.remove(&buffer);
                    drop(buffer);
                });
            }
            // Request completions at a random offset. As with the other
            // requests below, the request is spawned on the background executor
            // and then either detached (30% of the time) or awaited.
            10..=19 => {
                let completions = project.update(&mut cx, |project, cx| {
                    log::info!(
                        "Guest {}: requesting completions for buffer {} ({:?})",
                        guest_id,
                        buffer.read(cx).remote_id(),
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                    project.completions(&buffer, offset, cx)
                });
                let completions = cx.background().spawn(async move {
                    completions.await.expect("completions request failed");
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching completions request", guest_id);
                    completions.detach();
                } else {
                    completions.await;
                }
            }
            // Request code actions for a random byte range.
            20..=29 => {
                let code_actions = project.update(&mut cx, |project, cx| {
                    log::info!(
                        "Guest {}: requesting code actions for buffer {} ({:?})",
                        guest_id,
                        buffer.read(cx).remote_id(),
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
                    project.code_actions(&buffer, range, cx)
                });
                let code_actions = cx.background().spawn(async move {
                    code_actions.await.expect("code actions request failed");
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching code actions request", guest_id);
                    code_actions.detach();
                } else {
                    code_actions.await;
                }
            }
            // Save the buffer, but only if it is dirty; a clean buffer falls
            // through to the random-edit arm at the bottom. The saved version
            // must have observed everything in the version captured when the
            // save was requested.
            30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
                let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
                    log::info!(
                        "Guest {}: saving buffer {} ({:?})",
                        guest_id,
                        buffer.remote_id(),
                        buffer.file().unwrap().full_path(cx)
                    );
                    (buffer.version(), buffer.save(cx))
                });
                let save = cx.background().spawn(async move {
                    let (saved_version, _) = save.await.expect("save request failed");
                    assert!(saved_version.observed_all(&requested_version));
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching save request", guest_id);
                    save.detach();
                } else {
                    save.await;
                }
            }
            // Prepare a rename at a random offset. Note that `buffer` is moved
            // into the request here.
            40..=44 => {
                let prepare_rename = project.update(&mut cx, |project, cx| {
                    log::info!(
                        "Guest {}: preparing rename for buffer {} ({:?})",
                        guest_id,
                        buffer.read(cx).remote_id(),
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                    project.prepare_rename(buffer, offset, cx)
                });
                let prepare_rename = cx.background().spawn(async move {
                    prepare_rename.await.expect("prepare rename request failed");
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching prepare rename request", guest_id);
                    prepare_rename.detach();
                } else {
                    prepare_rename.await;
                }
            }
            // Go to definition at a random offset. When awaited, the buffers of
            // the returned locations are added to this guest's open buffers.
            45..=49 => {
                let definitions = project.update(&mut cx, |project, cx| {
                    log::info!(
                        "Guest {}: requesting definitions for buffer {} ({:?})",
                        guest_id,
                        buffer.read(cx).remote_id(),
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                    project.definition(&buffer, offset, cx)
                });
                let definitions = cx.background().spawn(async move {
                    definitions.await.expect("definitions request failed")
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching definitions request", guest_id);
                    definitions.detach();
                } else {
                    self.buffers
                        .extend(definitions.await.into_iter().map(|loc| loc.buffer));
                }
            }
            // Request document highlights at a random offset.
            50..=54 => {
                let highlights = project.update(&mut cx, |project, cx| {
                    log::info!(
                        "Guest {}: requesting highlights for buffer {} ({:?})",
                        guest_id,
                        buffer.read(cx).remote_id(),
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                    project.document_highlights(&buffer, offset, cx)
                });
                let highlights = cx.background().spawn(async move {
                    highlights.await.expect("highlights request failed");
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching highlights request", guest_id);
                    highlights.detach();
                } else {
                    highlights.await;
                }
            }
            // Project-wide search for a single random lowercase letter. When
            // awaited, the keys of the result map are added to this guest's
            // open buffers.
            55..=59 => {
                let search = project.update(&mut cx, |project, cx| {
                    let query = rng.lock().gen_range('a'..='z');
                    log::info!("Guest {}: project-wide search {:?}", guest_id, query);
                    project.search(SearchQuery::text(query, false, false), cx)
                });
                let search = cx
                    .background()
                    .spawn(async move { search.await.expect("search request failed") });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching search request", guest_id);
                    search.detach();
                } else {
                    self.buffers.extend(search.await.into_keys());
                }
            }
            // Everything else (including a skipped save): apply random edits.
            _ => {
                buffer.update(&mut cx, |buffer, cx| {
                    log::info!(
                        "Guest {}: updating buffer {} ({:?})",
                        guest_id,
                        buffer.remote_id(),
                        buffer.file().unwrap().full_path(cx)
                    );
                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
                });
            }
        }
        cx.background().simulate_random_delay().await;
    }

    log::info!("Guest {} done", guest_id);

    self.project = Some(project);
    (self, cx)
}
5082 }
5083
/// Tears down the wrapped client when the test client is dropped.
impl Drop for TestClient {
    fn drop(&mut self) {
        self.client.tear_down();
    }
}
5089
5090 impl Executor for Arc<gpui::executor::Background> {
5091 type Timer = gpui::executor::Timer;
5092
5093 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5094 self.spawn(future).detach();
5095 }
5096
5097 fn timer(&self, duration: Duration) -> Self::Timer {
5098 self.as_ref().timer(duration)
5099 }
5100 }
5101
5102 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5103 channel
5104 .messages()
5105 .cursor::<()>()
5106 .map(|m| {
5107 (
5108 m.sender.github_login.clone(),
5109 m.body.clone(),
5110 m.is_pending(),
5111 )
5112 })
5113 .collect()
5114 }
5115
/// Stateless placeholder view that renders nothing.
struct EmptyView;
5117
/// `EmptyView` carries no event payload, so its event type is `()`.
impl gpui::Entity for EmptyView {
    type Event = ();
}
5121
/// Minimal `View` implementation for `EmptyView`.
impl gpui::View for EmptyView {
    /// Name under which this view is identified.
    fn ui_name() -> &'static str {
        "empty view"
    }

    /// Renders a boxed `gpui::elements::Empty` element.
    fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
        gpui::Element::boxed(gpui::elements::Empty)
    }
}
5131}