1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_io::Timer;
10use async_std::task;
11use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
12use collections::{HashMap, HashSet};
13use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
14use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{
21 any::TypeId,
22 future::Future,
23 sync::Arc,
24 time::{Duration, Instant},
25};
26use store::{Store, Worktree};
27use surf::StatusCode;
28use tide::log;
29use tide::{
30 http::headers::{HeaderName, CONNECTION, UPGRADE},
31 Request, Response,
32};
33use time::OffsetDateTime;
34
35type MessageHandler = Box<
36 dyn Send
37 + Sync
38 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
39>;
40
41pub struct Server {
42 peer: Arc<Peer>,
43 store: RwLock<Store>,
44 app_state: Arc<AppState>,
45 handlers: HashMap<TypeId, MessageHandler>,
46 notifications: Option<mpsc::UnboundedSender<()>>,
47}
48
49pub trait Executor: Send + Clone {
50 type Timer: Send + Future;
51 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
52 fn timer(&self, duration: Duration) -> Self::Timer;
53}
54
55#[derive(Clone)]
56pub struct RealExecutor;
57
58const MESSAGE_COUNT_PER_PAGE: usize = 100;
59const MAX_MESSAGE_LEN: usize = 1024;
60
61impl Server {
62 pub fn new(
63 app_state: Arc<AppState>,
64 peer: Arc<Peer>,
65 notifications: Option<mpsc::UnboundedSender<()>>,
66 ) -> Arc<Self> {
67 let mut server = Self {
68 peer,
69 app_state,
70 store: Default::default(),
71 handlers: Default::default(),
72 notifications,
73 };
74
75 server
76 .add_request_handler(Server::ping)
77 .add_request_handler(Server::register_project)
78 .add_message_handler(Server::unregister_project)
79 .add_request_handler(Server::share_project)
80 .add_message_handler(Server::unshare_project)
81 .add_request_handler(Server::join_project)
82 .add_message_handler(Server::leave_project)
83 .add_request_handler(Server::register_worktree)
84 .add_message_handler(Server::unregister_worktree)
85 .add_request_handler(Server::update_worktree)
86 .add_message_handler(Server::start_language_server)
87 .add_message_handler(Server::update_language_server)
88 .add_message_handler(Server::update_diagnostic_summary)
89 .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
90 .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
91 .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
92 .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
93 .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
94 .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
95 .add_request_handler(Server::forward_project_request::<proto::OpenBuffer>)
96 .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
97 .add_request_handler(
98 Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
99 )
100 .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
101 .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
102 .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
103 .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
104 .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
105 .add_request_handler(Server::update_buffer)
106 .add_message_handler(Server::update_buffer_file)
107 .add_message_handler(Server::buffer_reloaded)
108 .add_message_handler(Server::buffer_saved)
109 .add_request_handler(Server::save_buffer)
110 .add_request_handler(Server::get_channels)
111 .add_request_handler(Server::get_users)
112 .add_request_handler(Server::join_channel)
113 .add_message_handler(Server::leave_channel)
114 .add_request_handler(Server::send_channel_message)
115 .add_request_handler(Server::get_channel_messages);
116
117 Arc::new(server)
118 }
119
120 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
121 where
122 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
123 Fut: 'static + Send + Future<Output = tide::Result<()>>,
124 M: EnvelopedMessage,
125 {
126 let prev_handler = self.handlers.insert(
127 TypeId::of::<M>(),
128 Box::new(move |server, envelope| {
129 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
130 (handler)(server, *envelope).boxed()
131 }),
132 );
133 if prev_handler.is_some() {
134 panic!("registered a handler for the same message twice");
135 }
136 self
137 }
138
139 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
140 where
141 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
142 Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
143 M: RequestMessage,
144 {
145 self.add_message_handler(move |server, envelope| {
146 let receipt = envelope.receipt();
147 let response = (handler)(server.clone(), envelope);
148 async move {
149 match response.await {
150 Ok(response) => {
151 server.peer.respond(receipt, response)?;
152 Ok(())
153 }
154 Err(error) => {
155 server.peer.respond_with_error(
156 receipt,
157 proto::Error {
158 message: error.to_string(),
159 },
160 )?;
161 Err(error)
162 }
163 }
164 }
165 })
166 }
167
168 pub fn handle_connection<E: Executor>(
169 self: &Arc<Self>,
170 connection: Connection,
171 addr: String,
172 user_id: UserId,
173 mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
174 executor: E,
175 ) -> impl Future<Output = ()> {
176 let mut this = self.clone();
177 async move {
178 let (connection_id, handle_io, mut incoming_rx) = this
179 .peer
180 .add_connection(connection, {
181 let executor = executor.clone();
182 move |duration| {
183 let timer = executor.timer(duration);
184 async move {
185 timer.await;
186 }
187 }
188 })
189 .await;
190
191 if let Some(send_connection_id) = send_connection_id.as_mut() {
192 let _ = send_connection_id.send(connection_id).await;
193 }
194
195 this.state_mut().add_connection(connection_id, user_id);
196 if let Err(err) = this.update_contacts_for_users(&[user_id]) {
197 log::error!("error updating contacts for {:?}: {}", user_id, err);
198 }
199
200 let handle_io = handle_io.fuse();
201 futures::pin_mut!(handle_io);
202 loop {
203 let next_message = incoming_rx.next().fuse();
204 futures::pin_mut!(next_message);
205 futures::select_biased! {
206 result = handle_io => {
207 if let Err(err) = result {
208 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
209 }
210 break;
211 }
212 message = next_message => {
213 if let Some(message) = message {
214 let start_time = Instant::now();
215 let type_name = message.payload_type_name();
216 log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
217 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
218 let notifications = this.notifications.clone();
219 let is_background = message.is_background();
220 let handle_message = (handler)(this.clone(), message);
221 let handle_message = async move {
222 if let Err(err) = handle_message.await {
223 log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
224 } else {
225 log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
226 }
227 if let Some(mut notifications) = notifications {
228 let _ = notifications.send(()).await;
229 }
230 };
231 if is_background {
232 executor.spawn_detached(handle_message);
233 } else {
234 handle_message.await;
235 }
236 } else {
237 log::warn!("unhandled message: {}", type_name);
238 }
239 } else {
240 log::info!("rpc connection closed {:?}", addr);
241 break;
242 }
243 }
244 }
245 }
246
247 if let Err(err) = this.sign_out(connection_id).await {
248 log::error!("error signing out connection {:?} - {:?}", addr, err);
249 }
250 }
251 }
252
253 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
254 self.peer.disconnect(connection_id);
255 let removed_connection = self.state_mut().remove_connection(connection_id)?;
256
257 for (project_id, project) in removed_connection.hosted_projects {
258 if let Some(share) = project.share {
259 broadcast(
260 connection_id,
261 share.guests.keys().copied().collect(),
262 |conn_id| {
263 self.peer
264 .send(conn_id, proto::UnshareProject { project_id })
265 },
266 )?;
267 }
268 }
269
270 for (project_id, peer_ids) in removed_connection.guest_project_ids {
271 broadcast(connection_id, peer_ids, |conn_id| {
272 self.peer.send(
273 conn_id,
274 proto::RemoveProjectCollaborator {
275 project_id,
276 peer_id: connection_id.0,
277 },
278 )
279 })?;
280 }
281
282 self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
283 Ok(())
284 }
285
286 async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
287 Ok(proto::Ack {})
288 }
289
290 async fn register_project(
291 mut self: Arc<Server>,
292 request: TypedEnvelope<proto::RegisterProject>,
293 ) -> tide::Result<proto::RegisterProjectResponse> {
294 let project_id = {
295 let mut state = self.state_mut();
296 let user_id = state.user_id_for_connection(request.sender_id)?;
297 state.register_project(request.sender_id, user_id)
298 };
299 Ok(proto::RegisterProjectResponse { project_id })
300 }
301
302 async fn unregister_project(
303 mut self: Arc<Server>,
304 request: TypedEnvelope<proto::UnregisterProject>,
305 ) -> tide::Result<()> {
306 let project = self
307 .state_mut()
308 .unregister_project(request.payload.project_id, request.sender_id)?;
309 self.update_contacts_for_users(project.authorized_user_ids().iter())?;
310 Ok(())
311 }
312
313 async fn share_project(
314 mut self: Arc<Server>,
315 request: TypedEnvelope<proto::ShareProject>,
316 ) -> tide::Result<proto::Ack> {
317 self.state_mut()
318 .share_project(request.payload.project_id, request.sender_id);
319 Ok(proto::Ack {})
320 }
321
322 async fn unshare_project(
323 mut self: Arc<Server>,
324 request: TypedEnvelope<proto::UnshareProject>,
325 ) -> tide::Result<()> {
326 let project_id = request.payload.project_id;
327 let project = self
328 .state_mut()
329 .unshare_project(project_id, request.sender_id)?;
330
331 broadcast(request.sender_id, project.connection_ids, |conn_id| {
332 self.peer
333 .send(conn_id, proto::UnshareProject { project_id })
334 })?;
335 self.update_contacts_for_users(&project.authorized_user_ids)?;
336 Ok(())
337 }
338
339 async fn join_project(
340 mut self: Arc<Server>,
341 request: TypedEnvelope<proto::JoinProject>,
342 ) -> tide::Result<proto::JoinProjectResponse> {
343 let project_id = request.payload.project_id;
344
345 let user_id = self.state().user_id_for_connection(request.sender_id)?;
346 let (response, connection_ids, contact_user_ids) = self
347 .state_mut()
348 .join_project(request.sender_id, user_id, project_id)
349 .and_then(|joined| {
350 let share = joined.project.share()?;
351 let peer_count = share.guests.len();
352 let mut collaborators = Vec::with_capacity(peer_count);
353 collaborators.push(proto::Collaborator {
354 peer_id: joined.project.host_connection_id.0,
355 replica_id: 0,
356 user_id: joined.project.host_user_id.to_proto(),
357 });
358 let worktrees = share
359 .worktrees
360 .iter()
361 .filter_map(|(id, shared_worktree)| {
362 let worktree = joined.project.worktrees.get(&id)?;
363 Some(proto::Worktree {
364 id: *id,
365 root_name: worktree.root_name.clone(),
366 entries: shared_worktree.entries.values().cloned().collect(),
367 diagnostic_summaries: shared_worktree
368 .diagnostic_summaries
369 .values()
370 .cloned()
371 .collect(),
372 visible: worktree.visible,
373 })
374 })
375 .collect();
376 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
377 if *peer_conn_id != request.sender_id {
378 collaborators.push(proto::Collaborator {
379 peer_id: peer_conn_id.0,
380 replica_id: *peer_replica_id as u32,
381 user_id: peer_user_id.to_proto(),
382 });
383 }
384 }
385 let response = proto::JoinProjectResponse {
386 worktrees,
387 replica_id: joined.replica_id as u32,
388 collaborators,
389 language_servers: joined.project.language_servers.clone(),
390 };
391 let connection_ids = joined.project.connection_ids();
392 let contact_user_ids = joined.project.authorized_user_ids();
393 Ok((response, connection_ids, contact_user_ids))
394 })?;
395
396 broadcast(request.sender_id, connection_ids, |conn_id| {
397 self.peer.send(
398 conn_id,
399 proto::AddProjectCollaborator {
400 project_id,
401 collaborator: Some(proto::Collaborator {
402 peer_id: request.sender_id.0,
403 replica_id: response.replica_id,
404 user_id: user_id.to_proto(),
405 }),
406 },
407 )
408 })?;
409 self.update_contacts_for_users(&contact_user_ids)?;
410 Ok(response)
411 }
412
413 async fn leave_project(
414 mut self: Arc<Server>,
415 request: TypedEnvelope<proto::LeaveProject>,
416 ) -> tide::Result<()> {
417 let sender_id = request.sender_id;
418 let project_id = request.payload.project_id;
419 let worktree = self.state_mut().leave_project(sender_id, project_id)?;
420
421 broadcast(sender_id, worktree.connection_ids, |conn_id| {
422 self.peer.send(
423 conn_id,
424 proto::RemoveProjectCollaborator {
425 project_id,
426 peer_id: sender_id.0,
427 },
428 )
429 })?;
430 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
431
432 Ok(())
433 }
434
435 async fn register_worktree(
436 mut self: Arc<Server>,
437 request: TypedEnvelope<proto::RegisterWorktree>,
438 ) -> tide::Result<proto::Ack> {
439 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
440
441 let mut contact_user_ids = HashSet::default();
442 contact_user_ids.insert(host_user_id);
443 for github_login in &request.payload.authorized_logins {
444 let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
445 contact_user_ids.insert(contact_user_id);
446 }
447
448 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
449 let guest_connection_ids;
450 {
451 let mut state = self.state_mut();
452 guest_connection_ids = state
453 .read_project(request.payload.project_id, request.sender_id)?
454 .guest_connection_ids();
455 state.register_worktree(
456 request.payload.project_id,
457 request.payload.worktree_id,
458 request.sender_id,
459 Worktree {
460 authorized_user_ids: contact_user_ids.clone(),
461 root_name: request.payload.root_name.clone(),
462 visible: request.payload.visible,
463 },
464 )?;
465 }
466 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
467 self.peer
468 .forward_send(request.sender_id, connection_id, request.payload.clone())
469 })?;
470 self.update_contacts_for_users(&contact_user_ids)?;
471 Ok(proto::Ack {})
472 }
473
474 async fn unregister_worktree(
475 mut self: Arc<Server>,
476 request: TypedEnvelope<proto::UnregisterWorktree>,
477 ) -> tide::Result<()> {
478 let project_id = request.payload.project_id;
479 let worktree_id = request.payload.worktree_id;
480 let (worktree, guest_connection_ids) =
481 self.state_mut()
482 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
483 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
484 self.peer.send(
485 conn_id,
486 proto::UnregisterWorktree {
487 project_id,
488 worktree_id,
489 },
490 )
491 })?;
492 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
493 Ok(())
494 }
495
496 async fn update_worktree(
497 mut self: Arc<Server>,
498 request: TypedEnvelope<proto::UpdateWorktree>,
499 ) -> tide::Result<proto::Ack> {
500 let connection_ids = self.state_mut().update_worktree(
501 request.sender_id,
502 request.payload.project_id,
503 request.payload.worktree_id,
504 &request.payload.removed_entries,
505 &request.payload.updated_entries,
506 )?;
507
508 broadcast(request.sender_id, connection_ids, |connection_id| {
509 self.peer
510 .forward_send(request.sender_id, connection_id, request.payload.clone())
511 })?;
512
513 Ok(proto::Ack {})
514 }
515
516 async fn update_diagnostic_summary(
517 mut self: Arc<Server>,
518 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
519 ) -> tide::Result<()> {
520 let summary = request
521 .payload
522 .summary
523 .clone()
524 .ok_or_else(|| anyhow!("invalid summary"))?;
525 let receiver_ids = self.state_mut().update_diagnostic_summary(
526 request.payload.project_id,
527 request.payload.worktree_id,
528 request.sender_id,
529 summary,
530 )?;
531
532 broadcast(request.sender_id, receiver_ids, |connection_id| {
533 self.peer
534 .forward_send(request.sender_id, connection_id, request.payload.clone())
535 })?;
536 Ok(())
537 }
538
539 async fn start_language_server(
540 mut self: Arc<Server>,
541 request: TypedEnvelope<proto::StartLanguageServer>,
542 ) -> tide::Result<()> {
543 let receiver_ids = self.state_mut().start_language_server(
544 request.payload.project_id,
545 request.sender_id,
546 request
547 .payload
548 .server
549 .clone()
550 .ok_or_else(|| anyhow!("invalid language server"))?,
551 )?;
552 broadcast(request.sender_id, receiver_ids, |connection_id| {
553 self.peer
554 .forward_send(request.sender_id, connection_id, request.payload.clone())
555 })?;
556 Ok(())
557 }
558
559 async fn update_language_server(
560 self: Arc<Server>,
561 request: TypedEnvelope<proto::UpdateLanguageServer>,
562 ) -> tide::Result<()> {
563 let receiver_ids = self
564 .state()
565 .project_connection_ids(request.payload.project_id, request.sender_id)?;
566 broadcast(request.sender_id, receiver_ids, |connection_id| {
567 self.peer
568 .forward_send(request.sender_id, connection_id, request.payload.clone())
569 })?;
570 Ok(())
571 }
572
573 async fn forward_project_request<T>(
574 self: Arc<Server>,
575 request: TypedEnvelope<T>,
576 ) -> tide::Result<T::Response>
577 where
578 T: EntityMessage + RequestMessage,
579 {
580 let host_connection_id = self
581 .state()
582 .read_project(request.payload.remote_entity_id(), request.sender_id)?
583 .host_connection_id;
584 Ok(self
585 .peer
586 .forward_request(request.sender_id, host_connection_id, request.payload)
587 .await?)
588 }
589
590 async fn save_buffer(
591 self: Arc<Server>,
592 request: TypedEnvelope<proto::SaveBuffer>,
593 ) -> tide::Result<proto::BufferSaved> {
594 let host;
595 let mut guests;
596 {
597 let state = self.state();
598 let project = state.read_project(request.payload.project_id, request.sender_id)?;
599 host = project.host_connection_id;
600 guests = project.guest_connection_ids()
601 }
602
603 let response = self
604 .peer
605 .forward_request(request.sender_id, host, request.payload.clone())
606 .await?;
607
608 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
609 broadcast(host, guests, |conn_id| {
610 self.peer.forward_send(host, conn_id, response.clone())
611 })?;
612
613 Ok(response)
614 }
615
616 async fn update_buffer(
617 self: Arc<Server>,
618 request: TypedEnvelope<proto::UpdateBuffer>,
619 ) -> tide::Result<proto::Ack> {
620 let receiver_ids = self
621 .state()
622 .project_connection_ids(request.payload.project_id, request.sender_id)?;
623 broadcast(request.sender_id, receiver_ids, |connection_id| {
624 self.peer
625 .forward_send(request.sender_id, connection_id, request.payload.clone())
626 })?;
627 Ok(proto::Ack {})
628 }
629
630 async fn update_buffer_file(
631 self: Arc<Server>,
632 request: TypedEnvelope<proto::UpdateBufferFile>,
633 ) -> tide::Result<()> {
634 let receiver_ids = self
635 .state()
636 .project_connection_ids(request.payload.project_id, request.sender_id)?;
637 broadcast(request.sender_id, receiver_ids, |connection_id| {
638 self.peer
639 .forward_send(request.sender_id, connection_id, request.payload.clone())
640 })?;
641 Ok(())
642 }
643
644 async fn buffer_reloaded(
645 self: Arc<Server>,
646 request: TypedEnvelope<proto::BufferReloaded>,
647 ) -> tide::Result<()> {
648 let receiver_ids = self
649 .state()
650 .project_connection_ids(request.payload.project_id, request.sender_id)?;
651 broadcast(request.sender_id, receiver_ids, |connection_id| {
652 self.peer
653 .forward_send(request.sender_id, connection_id, request.payload.clone())
654 })?;
655 Ok(())
656 }
657
658 async fn buffer_saved(
659 self: Arc<Server>,
660 request: TypedEnvelope<proto::BufferSaved>,
661 ) -> tide::Result<()> {
662 let receiver_ids = self
663 .state()
664 .project_connection_ids(request.payload.project_id, request.sender_id)?;
665 broadcast(request.sender_id, receiver_ids, |connection_id| {
666 self.peer
667 .forward_send(request.sender_id, connection_id, request.payload.clone())
668 })?;
669 Ok(())
670 }
671
672 async fn get_channels(
673 self: Arc<Server>,
674 request: TypedEnvelope<proto::GetChannels>,
675 ) -> tide::Result<proto::GetChannelsResponse> {
676 let user_id = self.state().user_id_for_connection(request.sender_id)?;
677 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
678 Ok(proto::GetChannelsResponse {
679 channels: channels
680 .into_iter()
681 .map(|chan| proto::Channel {
682 id: chan.id.to_proto(),
683 name: chan.name,
684 })
685 .collect(),
686 })
687 }
688
689 async fn get_users(
690 self: Arc<Server>,
691 request: TypedEnvelope<proto::GetUsers>,
692 ) -> tide::Result<proto::GetUsersResponse> {
693 let user_ids = request
694 .payload
695 .user_ids
696 .into_iter()
697 .map(UserId::from_proto)
698 .collect();
699 let users = self
700 .app_state
701 .db
702 .get_users_by_ids(user_ids)
703 .await?
704 .into_iter()
705 .map(|user| proto::User {
706 id: user.id.to_proto(),
707 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
708 github_login: user.github_login,
709 })
710 .collect();
711 Ok(proto::GetUsersResponse { users })
712 }
713
714 fn update_contacts_for_users<'a>(
715 self: &Arc<Server>,
716 user_ids: impl IntoIterator<Item = &'a UserId>,
717 ) -> anyhow::Result<()> {
718 let mut result = Ok(());
719 let state = self.state();
720 for user_id in user_ids {
721 let contacts = state.contacts_for_user(*user_id);
722 for connection_id in state.connection_ids_for_user(*user_id) {
723 if let Err(error) = self.peer.send(
724 connection_id,
725 proto::UpdateContacts {
726 contacts: contacts.clone(),
727 },
728 ) {
729 result = Err(error);
730 }
731 }
732 }
733 result
734 }
735
736 async fn join_channel(
737 mut self: Arc<Self>,
738 request: TypedEnvelope<proto::JoinChannel>,
739 ) -> tide::Result<proto::JoinChannelResponse> {
740 let user_id = self.state().user_id_for_connection(request.sender_id)?;
741 let channel_id = ChannelId::from_proto(request.payload.channel_id);
742 if !self
743 .app_state
744 .db
745 .can_user_access_channel(user_id, channel_id)
746 .await?
747 {
748 Err(anyhow!("access denied"))?;
749 }
750
751 self.state_mut().join_channel(request.sender_id, channel_id);
752 let messages = self
753 .app_state
754 .db
755 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
756 .await?
757 .into_iter()
758 .map(|msg| proto::ChannelMessage {
759 id: msg.id.to_proto(),
760 body: msg.body,
761 timestamp: msg.sent_at.unix_timestamp() as u64,
762 sender_id: msg.sender_id.to_proto(),
763 nonce: Some(msg.nonce.as_u128().into()),
764 })
765 .collect::<Vec<_>>();
766 Ok(proto::JoinChannelResponse {
767 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
768 messages,
769 })
770 }
771
772 async fn leave_channel(
773 mut self: Arc<Self>,
774 request: TypedEnvelope<proto::LeaveChannel>,
775 ) -> tide::Result<()> {
776 let user_id = self.state().user_id_for_connection(request.sender_id)?;
777 let channel_id = ChannelId::from_proto(request.payload.channel_id);
778 if !self
779 .app_state
780 .db
781 .can_user_access_channel(user_id, channel_id)
782 .await?
783 {
784 Err(anyhow!("access denied"))?;
785 }
786
787 self.state_mut()
788 .leave_channel(request.sender_id, channel_id);
789
790 Ok(())
791 }
792
793 async fn send_channel_message(
794 self: Arc<Self>,
795 request: TypedEnvelope<proto::SendChannelMessage>,
796 ) -> tide::Result<proto::SendChannelMessageResponse> {
797 let channel_id = ChannelId::from_proto(request.payload.channel_id);
798 let user_id;
799 let connection_ids;
800 {
801 let state = self.state();
802 user_id = state.user_id_for_connection(request.sender_id)?;
803 connection_ids = state.channel_connection_ids(channel_id)?;
804 }
805
806 // Validate the message body.
807 let body = request.payload.body.trim().to_string();
808 if body.len() > MAX_MESSAGE_LEN {
809 return Err(anyhow!("message is too long"))?;
810 }
811 if body.is_empty() {
812 return Err(anyhow!("message can't be blank"))?;
813 }
814
815 let timestamp = OffsetDateTime::now_utc();
816 let nonce = request
817 .payload
818 .nonce
819 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
820
821 let message_id = self
822 .app_state
823 .db
824 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
825 .await?
826 .to_proto();
827 let message = proto::ChannelMessage {
828 sender_id: user_id.to_proto(),
829 id: message_id,
830 body,
831 timestamp: timestamp.unix_timestamp() as u64,
832 nonce: Some(nonce),
833 };
834 broadcast(request.sender_id, connection_ids, |conn_id| {
835 self.peer.send(
836 conn_id,
837 proto::ChannelMessageSent {
838 channel_id: channel_id.to_proto(),
839 message: Some(message.clone()),
840 },
841 )
842 })?;
843 Ok(proto::SendChannelMessageResponse {
844 message: Some(message),
845 })
846 }
847
848 async fn get_channel_messages(
849 self: Arc<Self>,
850 request: TypedEnvelope<proto::GetChannelMessages>,
851 ) -> tide::Result<proto::GetChannelMessagesResponse> {
852 let user_id = self.state().user_id_for_connection(request.sender_id)?;
853 let channel_id = ChannelId::from_proto(request.payload.channel_id);
854 if !self
855 .app_state
856 .db
857 .can_user_access_channel(user_id, channel_id)
858 .await?
859 {
860 Err(anyhow!("access denied"))?;
861 }
862
863 let messages = self
864 .app_state
865 .db
866 .get_channel_messages(
867 channel_id,
868 MESSAGE_COUNT_PER_PAGE,
869 Some(MessageId::from_proto(request.payload.before_message_id)),
870 )
871 .await?
872 .into_iter()
873 .map(|msg| proto::ChannelMessage {
874 id: msg.id.to_proto(),
875 body: msg.body,
876 timestamp: msg.sent_at.unix_timestamp() as u64,
877 sender_id: msg.sender_id.to_proto(),
878 nonce: Some(msg.nonce.as_u128().into()),
879 })
880 .collect::<Vec<_>>();
881
882 Ok(proto::GetChannelMessagesResponse {
883 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
884 messages,
885 })
886 }
887
888 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
889 self.store.read()
890 }
891
892 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
893 self.store.write()
894 }
895}
896
897impl Executor for RealExecutor {
898 type Timer = Timer;
899
900 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
901 task::spawn(future);
902 }
903
904 fn timer(&self, duration: Duration) -> Self::Timer {
905 Timer::after(duration)
906 }
907}
908
909fn broadcast<F>(
910 sender_id: ConnectionId,
911 receiver_ids: Vec<ConnectionId>,
912 mut f: F,
913) -> anyhow::Result<()>
914where
915 F: FnMut(ConnectionId) -> anyhow::Result<()>,
916{
917 let mut result = Ok(());
918 for receiver_id in receiver_ids {
919 if receiver_id != sender_id {
920 if let Err(error) = f(receiver_id) {
921 if result.is_ok() {
922 result = Err(error);
923 }
924 }
925 }
926 }
927 result
928}
929
930pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
931 let server = Server::new(app.state().clone(), rpc.clone(), None);
932 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
933 let server = server.clone();
934 async move {
935 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
936
937 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
938 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
939 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
940 let client_protocol_version: Option<u32> = request
941 .header("X-Zed-Protocol-Version")
942 .and_then(|v| v.as_str().parse().ok());
943
944 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
945 return Ok(Response::new(StatusCode::UpgradeRequired));
946 }
947
948 let header = match request.header("Sec-Websocket-Key") {
949 Some(h) => h.as_str(),
950 None => return Err(anyhow!("expected sec-websocket-key"))?,
951 };
952
953 let user_id = process_auth_header(&request).await?;
954
955 let mut response = Response::new(StatusCode::SwitchingProtocols);
956 response.insert_header(UPGRADE, "websocket");
957 response.insert_header(CONNECTION, "Upgrade");
958 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
959 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
960 response.insert_header("Sec-Websocket-Version", "13");
961
962 let http_res: &mut tide::http::Response = response.as_mut();
963 let upgrade_receiver = http_res.recv_upgrade().await;
964 let addr = request.remote().unwrap_or("unknown").to_string();
965 task::spawn(async move {
966 if let Some(stream) = upgrade_receiver.await {
967 server
968 .handle_connection(
969 Connection::new(
970 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
971 ),
972 addr,
973 user_id,
974 None,
975 RealExecutor,
976 )
977 .await;
978 }
979 });
980
981 Ok(response)
982 }
983 });
984}
985
986fn header_contains_ignore_case<T>(
987 request: &tide::Request<T>,
988 header_name: HeaderName,
989 value: &str,
990) -> bool {
991 request
992 .header(header_name)
993 .map(|h| {
994 h.as_str()
995 .split(',')
996 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
997 })
998 .unwrap_or(false)
999}
1000
1001#[cfg(test)]
1002mod tests {
1003 use super::*;
1004 use crate::{
1005 auth,
1006 db::{tests::TestDb, UserId},
1007 github, AppState, Config,
1008 };
1009 use ::rpc::Peer;
1010 use client::{
1011 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1012 EstablishConnectionError, UserStore,
1013 };
1014 use collections::BTreeMap;
1015 use editor::{
1016 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, MultiBuffer,
1017 Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1018 };
1019 use gpui::{executor, ModelHandle, TestAppContext};
1020 use language::{
1021 tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig, LanguageRegistry,
1022 LanguageServerConfig, OffsetRangeExt, Point, ToLspPosition,
1023 };
1024 use lsp;
1025 use parking_lot::Mutex;
1026 use postage::barrier;
1027 use project::{
1028 fs::{FakeFs, Fs as _},
1029 search::SearchQuery,
1030 worktree::WorktreeHandle,
1031 DiagnosticSummary, Project, ProjectPath,
1032 };
1033 use rand::prelude::*;
1034 use rpc::PeerId;
1035 use serde_json::json;
1036 use sqlx::types::time::OffsetDateTime;
1037 use std::{
1038 cell::Cell,
1039 env,
1040 ops::Deref,
1041 path::{Path, PathBuf},
1042 rc::Rc,
1043 sync::{
1044 atomic::{AtomicBool, Ordering::SeqCst},
1045 Arc,
1046 },
1047 time::Duration,
1048 };
1049 use workspace::{Settings, Workspace, WorkspaceParams};
1050
1051 #[cfg(test)]
1052 #[ctor::ctor]
1053 fn init_logger() {
1054 if std::env::var("RUST_LOG").is_ok() {
1055 env_logger::init();
1056 }
1057 }
1058
1059 #[gpui::test(iterations = 10)]
1060 async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1061 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1062 let lang_registry = Arc::new(LanguageRegistry::test());
1063 let fs = FakeFs::new(cx_a.background());
1064 cx_a.foreground().forbid_parking();
1065
1066 // Connect to a server as 2 clients.
1067 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1068 let client_a = server.create_client(cx_a, "user_a").await;
1069 let client_b = server.create_client(cx_b, "user_b").await;
1070
1071 // Share a project as client A
1072 fs.insert_tree(
1073 "/a",
1074 json!({
1075 ".zed.toml": r#"collaborators = ["user_b"]"#,
1076 "a.txt": "a-contents",
1077 "b.txt": "b-contents",
1078 }),
1079 )
1080 .await;
1081 let project_a = cx_a.update(|cx| {
1082 Project::local(
1083 client_a.clone(),
1084 client_a.user_store.clone(),
1085 lang_registry.clone(),
1086 fs.clone(),
1087 cx,
1088 )
1089 });
1090 let (worktree_a, _) = project_a
1091 .update(cx_a, |p, cx| {
1092 p.find_or_create_local_worktree("/a", true, cx)
1093 })
1094 .await
1095 .unwrap();
1096 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1097 worktree_a
1098 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1099 .await;
1100 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1101 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1102
1103 // Join that project as client B
1104 let project_b = Project::remote(
1105 project_id,
1106 client_b.clone(),
1107 client_b.user_store.clone(),
1108 lang_registry.clone(),
1109 fs.clone(),
1110 &mut cx_b.to_async(),
1111 )
1112 .await
1113 .unwrap();
1114
1115 let replica_id_b = project_b.read_with(cx_b, |project, _| {
1116 assert_eq!(
1117 project
1118 .collaborators()
1119 .get(&client_a.peer_id)
1120 .unwrap()
1121 .user
1122 .github_login,
1123 "user_a"
1124 );
1125 project.replica_id()
1126 });
1127 project_a
1128 .condition(&cx_a, |tree, _| {
1129 tree.collaborators()
1130 .get(&client_b.peer_id)
1131 .map_or(false, |collaborator| {
1132 collaborator.replica_id == replica_id_b
1133 && collaborator.user.github_login == "user_b"
1134 })
1135 })
1136 .await;
1137
1138 // Open the same file as client B and client A.
1139 let buffer_b = project_b
1140 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1141 .await
1142 .unwrap();
1143 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1144 buffer_b.read_with(cx_b, |buf, cx| {
1145 assert_eq!(buf.read(cx).text(), "b-contents")
1146 });
1147 project_a.read_with(cx_a, |project, cx| {
1148 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1149 });
1150 let buffer_a = project_a
1151 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1152 .await
1153 .unwrap();
1154
1155 let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1156
1157 // TODO
1158 // // Create a selection set as client B and see that selection set as client A.
1159 // buffer_a
1160 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1161 // .await;
1162
1163 // Edit the buffer as client B and see that edit as client A.
1164 editor_b.update(cx_b, |editor, cx| {
1165 editor.handle_input(&Input("ok, ".into()), cx)
1166 });
1167 buffer_a
1168 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1169 .await;
1170
1171 // TODO
1172 // // Remove the selection set as client B, see those selections disappear as client A.
1173 cx_b.update(move |_| drop(editor_b));
1174 // buffer_a
1175 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1176 // .await;
1177
1178 // Dropping the client B's project removes client B from client A's collaborators.
1179 cx_b.update(move |_| drop(project_b));
1180 project_a
1181 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1182 .await;
1183 }
1184
1185 #[gpui::test(iterations = 10)]
1186 async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1187 let lang_registry = Arc::new(LanguageRegistry::test());
1188 let fs = FakeFs::new(cx_a.background());
1189 cx_a.foreground().forbid_parking();
1190
1191 // Connect to a server as 2 clients.
1192 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1193 let client_a = server.create_client(cx_a, "user_a").await;
1194 let client_b = server.create_client(cx_b, "user_b").await;
1195
1196 // Share a project as client A
1197 fs.insert_tree(
1198 "/a",
1199 json!({
1200 ".zed.toml": r#"collaborators = ["user_b"]"#,
1201 "a.txt": "a-contents",
1202 "b.txt": "b-contents",
1203 }),
1204 )
1205 .await;
1206 let project_a = cx_a.update(|cx| {
1207 Project::local(
1208 client_a.clone(),
1209 client_a.user_store.clone(),
1210 lang_registry.clone(),
1211 fs.clone(),
1212 cx,
1213 )
1214 });
1215 let (worktree_a, _) = project_a
1216 .update(cx_a, |p, cx| {
1217 p.find_or_create_local_worktree("/a", true, cx)
1218 })
1219 .await
1220 .unwrap();
1221 worktree_a
1222 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1223 .await;
1224 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1225 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1226 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1227 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1228
1229 // Join that project as client B
1230 let project_b = Project::remote(
1231 project_id,
1232 client_b.clone(),
1233 client_b.user_store.clone(),
1234 lang_registry.clone(),
1235 fs.clone(),
1236 &mut cx_b.to_async(),
1237 )
1238 .await
1239 .unwrap();
1240 project_b
1241 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1242 .await
1243 .unwrap();
1244
1245 // Unshare the project as client A
1246 project_a
1247 .update(cx_a, |project, cx| project.unshare(cx))
1248 .await
1249 .unwrap();
1250 project_b
1251 .condition(cx_b, |project, _| project.is_read_only())
1252 .await;
1253 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1254 cx_b.update(|_| {
1255 drop(project_b);
1256 });
1257
1258 // Share the project again and ensure guests can still join.
1259 project_a
1260 .update(cx_a, |project, cx| project.share(cx))
1261 .await
1262 .unwrap();
1263 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1264
1265 let project_b2 = Project::remote(
1266 project_id,
1267 client_b.clone(),
1268 client_b.user_store.clone(),
1269 lang_registry.clone(),
1270 fs.clone(),
1271 &mut cx_b.to_async(),
1272 )
1273 .await
1274 .unwrap();
1275 project_b2
1276 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1277 .await
1278 .unwrap();
1279 }
1280
1281 #[gpui::test(iterations = 10)]
1282 async fn test_propagate_saves_and_fs_changes(
1283 cx_a: &mut TestAppContext,
1284 cx_b: &mut TestAppContext,
1285 cx_c: &mut TestAppContext,
1286 ) {
1287 let lang_registry = Arc::new(LanguageRegistry::test());
1288 let fs = FakeFs::new(cx_a.background());
1289 cx_a.foreground().forbid_parking();
1290
1291 // Connect to a server as 3 clients.
1292 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1293 let client_a = server.create_client(cx_a, "user_a").await;
1294 let client_b = server.create_client(cx_b, "user_b").await;
1295 let client_c = server.create_client(cx_c, "user_c").await;
1296
1297 // Share a worktree as client A.
1298 fs.insert_tree(
1299 "/a",
1300 json!({
1301 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1302 "file1": "",
1303 "file2": ""
1304 }),
1305 )
1306 .await;
1307 let project_a = cx_a.update(|cx| {
1308 Project::local(
1309 client_a.clone(),
1310 client_a.user_store.clone(),
1311 lang_registry.clone(),
1312 fs.clone(),
1313 cx,
1314 )
1315 });
1316 let (worktree_a, _) = project_a
1317 .update(cx_a, |p, cx| {
1318 p.find_or_create_local_worktree("/a", true, cx)
1319 })
1320 .await
1321 .unwrap();
1322 worktree_a
1323 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1324 .await;
1325 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1326 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1327 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1328
1329 // Join that worktree as clients B and C.
1330 let project_b = Project::remote(
1331 project_id,
1332 client_b.clone(),
1333 client_b.user_store.clone(),
1334 lang_registry.clone(),
1335 fs.clone(),
1336 &mut cx_b.to_async(),
1337 )
1338 .await
1339 .unwrap();
1340 let project_c = Project::remote(
1341 project_id,
1342 client_c.clone(),
1343 client_c.user_store.clone(),
1344 lang_registry.clone(),
1345 fs.clone(),
1346 &mut cx_c.to_async(),
1347 )
1348 .await
1349 .unwrap();
1350 let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1351 let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1352
1353 // Open and edit a buffer as both guests B and C.
1354 let buffer_b = project_b
1355 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1356 .await
1357 .unwrap();
1358 let buffer_c = project_c
1359 .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1360 .await
1361 .unwrap();
1362 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1363 buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1364
1365 // Open and edit that buffer as the host.
1366 let buffer_a = project_a
1367 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1368 .await
1369 .unwrap();
1370
1371 buffer_a
1372 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1373 .await;
1374 buffer_a.update(cx_a, |buf, cx| {
1375 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1376 });
1377
1378 // Wait for edits to propagate
1379 buffer_a
1380 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1381 .await;
1382 buffer_b
1383 .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1384 .await;
1385 buffer_c
1386 .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1387 .await;
1388
1389 // Edit the buffer as the host and concurrently save as guest B.
1390 let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1391 buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1392 save_b.await.unwrap();
1393 assert_eq!(
1394 fs.load("/a/file1".as_ref()).await.unwrap(),
1395 "hi-a, i-am-c, i-am-b, i-am-a"
1396 );
1397 buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1398 buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1399 buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1400
1401 worktree_a.flush_fs_events(cx_a).await;
1402
1403 // Make changes on host's file system, see those changes on guest worktrees.
1404 fs.rename(
1405 "/a/file1".as_ref(),
1406 "/a/file1-renamed".as_ref(),
1407 Default::default(),
1408 )
1409 .await
1410 .unwrap();
1411
1412 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1413 .await
1414 .unwrap();
1415 fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1416
1417 worktree_a
1418 .condition(&cx_a, |tree, _| {
1419 tree.paths()
1420 .map(|p| p.to_string_lossy())
1421 .collect::<Vec<_>>()
1422 == [".zed.toml", "file1-renamed", "file3", "file4"]
1423 })
1424 .await;
1425 worktree_b
1426 .condition(&cx_b, |tree, _| {
1427 tree.paths()
1428 .map(|p| p.to_string_lossy())
1429 .collect::<Vec<_>>()
1430 == [".zed.toml", "file1-renamed", "file3", "file4"]
1431 })
1432 .await;
1433 worktree_c
1434 .condition(&cx_c, |tree, _| {
1435 tree.paths()
1436 .map(|p| p.to_string_lossy())
1437 .collect::<Vec<_>>()
1438 == [".zed.toml", "file1-renamed", "file3", "file4"]
1439 })
1440 .await;
1441
1442 // Ensure buffer files are updated as well.
1443 buffer_a
1444 .condition(&cx_a, |buf, _| {
1445 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1446 })
1447 .await;
1448 buffer_b
1449 .condition(&cx_b, |buf, _| {
1450 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1451 })
1452 .await;
1453 buffer_c
1454 .condition(&cx_c, |buf, _| {
1455 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1456 })
1457 .await;
1458 }
1459
1460 #[gpui::test(iterations = 10)]
1461 async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1462 cx_a.foreground().forbid_parking();
1463 let lang_registry = Arc::new(LanguageRegistry::test());
1464 let fs = FakeFs::new(cx_a.background());
1465
1466 // Connect to a server as 2 clients.
1467 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1468 let client_a = server.create_client(cx_a, "user_a").await;
1469 let client_b = server.create_client(cx_b, "user_b").await;
1470
1471 // Share a project as client A
1472 fs.insert_tree(
1473 "/dir",
1474 json!({
1475 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1476 "a.txt": "a-contents",
1477 }),
1478 )
1479 .await;
1480
1481 let project_a = cx_a.update(|cx| {
1482 Project::local(
1483 client_a.clone(),
1484 client_a.user_store.clone(),
1485 lang_registry.clone(),
1486 fs.clone(),
1487 cx,
1488 )
1489 });
1490 let (worktree_a, _) = project_a
1491 .update(cx_a, |p, cx| {
1492 p.find_or_create_local_worktree("/dir", true, cx)
1493 })
1494 .await
1495 .unwrap();
1496 worktree_a
1497 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1498 .await;
1499 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1500 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1501 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1502
1503 // Join that project as client B
1504 let project_b = Project::remote(
1505 project_id,
1506 client_b.clone(),
1507 client_b.user_store.clone(),
1508 lang_registry.clone(),
1509 fs.clone(),
1510 &mut cx_b.to_async(),
1511 )
1512 .await
1513 .unwrap();
1514
1515 // Open a buffer as client B
1516 let buffer_b = project_b
1517 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1518 .await
1519 .unwrap();
1520
1521 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1522 buffer_b.read_with(cx_b, |buf, _| {
1523 assert!(buf.is_dirty());
1524 assert!(!buf.has_conflict());
1525 });
1526
1527 buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1528 buffer_b
1529 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1530 .await;
1531 buffer_b.read_with(cx_b, |buf, _| {
1532 assert!(!buf.has_conflict());
1533 });
1534
1535 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1536 buffer_b.read_with(cx_b, |buf, _| {
1537 assert!(buf.is_dirty());
1538 assert!(!buf.has_conflict());
1539 });
1540 }
1541
1542 #[gpui::test(iterations = 10)]
1543 async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1544 cx_a.foreground().forbid_parking();
1545 let lang_registry = Arc::new(LanguageRegistry::test());
1546 let fs = FakeFs::new(cx_a.background());
1547
1548 // Connect to a server as 2 clients.
1549 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1550 let client_a = server.create_client(cx_a, "user_a").await;
1551 let client_b = server.create_client(cx_b, "user_b").await;
1552
1553 // Share a project as client A
1554 fs.insert_tree(
1555 "/dir",
1556 json!({
1557 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1558 "a.txt": "a-contents",
1559 }),
1560 )
1561 .await;
1562
1563 let project_a = cx_a.update(|cx| {
1564 Project::local(
1565 client_a.clone(),
1566 client_a.user_store.clone(),
1567 lang_registry.clone(),
1568 fs.clone(),
1569 cx,
1570 )
1571 });
1572 let (worktree_a, _) = project_a
1573 .update(cx_a, |p, cx| {
1574 p.find_or_create_local_worktree("/dir", true, cx)
1575 })
1576 .await
1577 .unwrap();
1578 worktree_a
1579 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1580 .await;
1581 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1582 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1583 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1584
1585 // Join that project as client B
1586 let project_b = Project::remote(
1587 project_id,
1588 client_b.clone(),
1589 client_b.user_store.clone(),
1590 lang_registry.clone(),
1591 fs.clone(),
1592 &mut cx_b.to_async(),
1593 )
1594 .await
1595 .unwrap();
1596 let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1597
1598 // Open a buffer as client B
1599 let buffer_b = project_b
1600 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1601 .await
1602 .unwrap();
1603 buffer_b.read_with(cx_b, |buf, _| {
1604 assert!(!buf.is_dirty());
1605 assert!(!buf.has_conflict());
1606 });
1607
1608 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1609 .await
1610 .unwrap();
1611 buffer_b
1612 .condition(&cx_b, |buf, _| {
1613 buf.text() == "new contents" && !buf.is_dirty()
1614 })
1615 .await;
1616 buffer_b.read_with(cx_b, |buf, _| {
1617 assert!(!buf.has_conflict());
1618 });
1619 }
1620
1621 #[gpui::test(iterations = 10)]
1622 async fn test_editing_while_guest_opens_buffer(
1623 cx_a: &mut TestAppContext,
1624 cx_b: &mut TestAppContext,
1625 ) {
1626 cx_a.foreground().forbid_parking();
1627 let lang_registry = Arc::new(LanguageRegistry::test());
1628 let fs = FakeFs::new(cx_a.background());
1629
1630 // Connect to a server as 2 clients.
1631 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1632 let client_a = server.create_client(cx_a, "user_a").await;
1633 let client_b = server.create_client(cx_b, "user_b").await;
1634
1635 // Share a project as client A
1636 fs.insert_tree(
1637 "/dir",
1638 json!({
1639 ".zed.toml": r#"collaborators = ["user_b"]"#,
1640 "a.txt": "a-contents",
1641 }),
1642 )
1643 .await;
1644 let project_a = cx_a.update(|cx| {
1645 Project::local(
1646 client_a.clone(),
1647 client_a.user_store.clone(),
1648 lang_registry.clone(),
1649 fs.clone(),
1650 cx,
1651 )
1652 });
1653 let (worktree_a, _) = project_a
1654 .update(cx_a, |p, cx| {
1655 p.find_or_create_local_worktree("/dir", true, cx)
1656 })
1657 .await
1658 .unwrap();
1659 worktree_a
1660 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1661 .await;
1662 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1663 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1664 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1665
1666 // Join that project as client B
1667 let project_b = Project::remote(
1668 project_id,
1669 client_b.clone(),
1670 client_b.user_store.clone(),
1671 lang_registry.clone(),
1672 fs.clone(),
1673 &mut cx_b.to_async(),
1674 )
1675 .await
1676 .unwrap();
1677
1678 // Open a buffer as client A
1679 let buffer_a = project_a
1680 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1681 .await
1682 .unwrap();
1683
1684 // Start opening the same buffer as client B
1685 let buffer_b = cx_b
1686 .background()
1687 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1688
1689 // Edit the buffer as client A while client B is still opening it.
1690 cx_b.background().simulate_random_delay().await;
1691 buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1692 cx_b.background().simulate_random_delay().await;
1693 buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1694
1695 let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
1696 let buffer_b = buffer_b.await.unwrap();
1697 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1698 }
1699
1700 #[gpui::test(iterations = 10)]
1701 async fn test_leaving_worktree_while_opening_buffer(
1702 cx_a: &mut TestAppContext,
1703 cx_b: &mut TestAppContext,
1704 ) {
1705 cx_a.foreground().forbid_parking();
1706 let lang_registry = Arc::new(LanguageRegistry::test());
1707 let fs = FakeFs::new(cx_a.background());
1708
1709 // Connect to a server as 2 clients.
1710 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1711 let client_a = server.create_client(cx_a, "user_a").await;
1712 let client_b = server.create_client(cx_b, "user_b").await;
1713
1714 // Share a project as client A
1715 fs.insert_tree(
1716 "/dir",
1717 json!({
1718 ".zed.toml": r#"collaborators = ["user_b"]"#,
1719 "a.txt": "a-contents",
1720 }),
1721 )
1722 .await;
1723 let project_a = cx_a.update(|cx| {
1724 Project::local(
1725 client_a.clone(),
1726 client_a.user_store.clone(),
1727 lang_registry.clone(),
1728 fs.clone(),
1729 cx,
1730 )
1731 });
1732 let (worktree_a, _) = project_a
1733 .update(cx_a, |p, cx| {
1734 p.find_or_create_local_worktree("/dir", true, cx)
1735 })
1736 .await
1737 .unwrap();
1738 worktree_a
1739 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1740 .await;
1741 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1742 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1743 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1744
1745 // Join that project as client B
1746 let project_b = Project::remote(
1747 project_id,
1748 client_b.clone(),
1749 client_b.user_store.clone(),
1750 lang_registry.clone(),
1751 fs.clone(),
1752 &mut cx_b.to_async(),
1753 )
1754 .await
1755 .unwrap();
1756
1757 // See that a guest has joined as client A.
1758 project_a
1759 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1760 .await;
1761
1762 // Begin opening a buffer as client B, but leave the project before the open completes.
1763 let buffer_b = cx_b
1764 .background()
1765 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1766 cx_b.update(|_| drop(project_b));
1767 drop(buffer_b);
1768
1769 // See that the guest has left.
1770 project_a
1771 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1772 .await;
1773 }
1774
1775 #[gpui::test(iterations = 10)]
1776 async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1777 cx_a.foreground().forbid_parking();
1778 let lang_registry = Arc::new(LanguageRegistry::test());
1779 let fs = FakeFs::new(cx_a.background());
1780
1781 // Connect to a server as 2 clients.
1782 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1783 let client_a = server.create_client(cx_a, "user_a").await;
1784 let client_b = server.create_client(cx_b, "user_b").await;
1785
1786 // Share a project as client A
1787 fs.insert_tree(
1788 "/a",
1789 json!({
1790 ".zed.toml": r#"collaborators = ["user_b"]"#,
1791 "a.txt": "a-contents",
1792 "b.txt": "b-contents",
1793 }),
1794 )
1795 .await;
1796 let project_a = cx_a.update(|cx| {
1797 Project::local(
1798 client_a.clone(),
1799 client_a.user_store.clone(),
1800 lang_registry.clone(),
1801 fs.clone(),
1802 cx,
1803 )
1804 });
1805 let (worktree_a, _) = project_a
1806 .update(cx_a, |p, cx| {
1807 p.find_or_create_local_worktree("/a", true, cx)
1808 })
1809 .await
1810 .unwrap();
1811 worktree_a
1812 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1813 .await;
1814 let project_id = project_a
1815 .update(cx_a, |project, _| project.next_remote_id())
1816 .await;
1817 project_a
1818 .update(cx_a, |project, cx| project.share(cx))
1819 .await
1820 .unwrap();
1821
1822 // Join that project as client B
1823 let _project_b = Project::remote(
1824 project_id,
1825 client_b.clone(),
1826 client_b.user_store.clone(),
1827 lang_registry.clone(),
1828 fs.clone(),
1829 &mut cx_b.to_async(),
1830 )
1831 .await
1832 .unwrap();
1833
1834 // Client A sees that a guest has joined.
1835 project_a
1836 .condition(cx_a, |p, _| p.collaborators().len() == 1)
1837 .await;
1838
1839 // Drop client B's connection and ensure client A observes client B leaving the project.
1840 client_b.disconnect(&cx_b.to_async()).unwrap();
1841 project_a
1842 .condition(cx_a, |p, _| p.collaborators().len() == 0)
1843 .await;
1844
1845 // Rejoin the project as client B
1846 let _project_b = Project::remote(
1847 project_id,
1848 client_b.clone(),
1849 client_b.user_store.clone(),
1850 lang_registry.clone(),
1851 fs.clone(),
1852 &mut cx_b.to_async(),
1853 )
1854 .await
1855 .unwrap();
1856
1857 // Client A sees that a guest has re-joined.
1858 project_a
1859 .condition(cx_a, |p, _| p.collaborators().len() == 1)
1860 .await;
1861
1862 // Simulate connection loss for client B and ensure client A observes client B leaving the project.
1863 client_b.wait_for_current_user(cx_b).await;
1864 server.disconnect_client(client_b.current_user_id(cx_b));
1865 cx_a.foreground().advance_clock(Duration::from_secs(3));
1866 project_a
1867 .condition(cx_a, |p, _| p.collaborators().len() == 0)
1868 .await;
1869 }
1870
1871 #[gpui::test(iterations = 10)]
1872 async fn test_collaborating_with_diagnostics(
1873 cx_a: &mut TestAppContext,
1874 cx_b: &mut TestAppContext,
1875 ) {
1876 cx_a.foreground().forbid_parking();
1877 let mut lang_registry = Arc::new(LanguageRegistry::test());
1878 let fs = FakeFs::new(cx_a.background());
1879
1880 // Set up a fake language server.
1881 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1882 Arc::get_mut(&mut lang_registry)
1883 .unwrap()
1884 .add(Arc::new(Language::new(
1885 LanguageConfig {
1886 name: "Rust".into(),
1887 path_suffixes: vec!["rs".to_string()],
1888 language_server: Some(language_server_config),
1889 ..Default::default()
1890 },
1891 Some(tree_sitter_rust::language()),
1892 )));
1893
1894 // Connect to a server as 2 clients.
1895 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1896 let client_a = server.create_client(cx_a, "user_a").await;
1897 let client_b = server.create_client(cx_b, "user_b").await;
1898
1899 // Share a project as client A
1900 fs.insert_tree(
1901 "/a",
1902 json!({
1903 ".zed.toml": r#"collaborators = ["user_b"]"#,
1904 "a.rs": "let one = two",
1905 "other.rs": "",
1906 }),
1907 )
1908 .await;
1909 let project_a = cx_a.update(|cx| {
1910 Project::local(
1911 client_a.clone(),
1912 client_a.user_store.clone(),
1913 lang_registry.clone(),
1914 fs.clone(),
1915 cx,
1916 )
1917 });
1918 let (worktree_a, _) = project_a
1919 .update(cx_a, |p, cx| {
1920 p.find_or_create_local_worktree("/a", true, cx)
1921 })
1922 .await
1923 .unwrap();
1924 worktree_a
1925 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1926 .await;
1927 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1928 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1929 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1930
1931 // Cause the language server to start.
1932 let _ = cx_a
1933 .background()
1934 .spawn(project_a.update(cx_a, |project, cx| {
1935 project.open_buffer(
1936 ProjectPath {
1937 worktree_id,
1938 path: Path::new("other.rs").into(),
1939 },
1940 cx,
1941 )
1942 }))
1943 .await
1944 .unwrap();
1945
1946 // Simulate a language server reporting errors for a file.
1947 let mut fake_language_server = fake_language_servers.next().await.unwrap();
1948 fake_language_server
1949 .receive_notification::<lsp::notification::DidOpenTextDocument>()
1950 .await;
1951 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
1952 lsp::PublishDiagnosticsParams {
1953 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1954 version: None,
1955 diagnostics: vec![lsp::Diagnostic {
1956 severity: Some(lsp::DiagnosticSeverity::ERROR),
1957 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1958 message: "message 1".to_string(),
1959 ..Default::default()
1960 }],
1961 },
1962 );
1963
1964 // Wait for server to see the diagnostics update.
1965 server
1966 .condition(|store| {
1967 let worktree = store
1968 .project(project_id)
1969 .unwrap()
1970 .share
1971 .as_ref()
1972 .unwrap()
1973 .worktrees
1974 .get(&worktree_id.to_proto())
1975 .unwrap();
1976
1977 !worktree.diagnostic_summaries.is_empty()
1978 })
1979 .await;
1980
1981 // Join the worktree as client B.
1982 let project_b = Project::remote(
1983 project_id,
1984 client_b.clone(),
1985 client_b.user_store.clone(),
1986 lang_registry.clone(),
1987 fs.clone(),
1988 &mut cx_b.to_async(),
1989 )
1990 .await
1991 .unwrap();
1992
1993 project_b.read_with(cx_b, |project, cx| {
1994 assert_eq!(
1995 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
1996 &[(
1997 ProjectPath {
1998 worktree_id,
1999 path: Arc::from(Path::new("a.rs")),
2000 },
2001 DiagnosticSummary {
2002 error_count: 1,
2003 warning_count: 0,
2004 ..Default::default()
2005 },
2006 )]
2007 )
2008 });
2009
2010 // Simulate a language server reporting more errors for a file.
2011 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2012 lsp::PublishDiagnosticsParams {
2013 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2014 version: None,
2015 diagnostics: vec![
2016 lsp::Diagnostic {
2017 severity: Some(lsp::DiagnosticSeverity::ERROR),
2018 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2019 message: "message 1".to_string(),
2020 ..Default::default()
2021 },
2022 lsp::Diagnostic {
2023 severity: Some(lsp::DiagnosticSeverity::WARNING),
2024 range: lsp::Range::new(
2025 lsp::Position::new(0, 10),
2026 lsp::Position::new(0, 13),
2027 ),
2028 message: "message 2".to_string(),
2029 ..Default::default()
2030 },
2031 ],
2032 },
2033 );
2034
2035 // Client b gets the updated summaries
2036 project_b
2037 .condition(&cx_b, |project, cx| {
2038 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2039 == &[(
2040 ProjectPath {
2041 worktree_id,
2042 path: Arc::from(Path::new("a.rs")),
2043 },
2044 DiagnosticSummary {
2045 error_count: 1,
2046 warning_count: 1,
2047 ..Default::default()
2048 },
2049 )]
2050 })
2051 .await;
2052
2053 // Open the file with the errors on client B. They should be present.
2054 let buffer_b = cx_b
2055 .background()
2056 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2057 .await
2058 .unwrap();
2059
2060 buffer_b.read_with(cx_b, |buffer, _| {
2061 assert_eq!(
2062 buffer
2063 .snapshot()
2064 .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2065 .map(|entry| entry)
2066 .collect::<Vec<_>>(),
2067 &[
2068 DiagnosticEntry {
2069 range: Point::new(0, 4)..Point::new(0, 7),
2070 diagnostic: Diagnostic {
2071 group_id: 0,
2072 message: "message 1".to_string(),
2073 severity: lsp::DiagnosticSeverity::ERROR,
2074 is_primary: true,
2075 ..Default::default()
2076 }
2077 },
2078 DiagnosticEntry {
2079 range: Point::new(0, 10)..Point::new(0, 13),
2080 diagnostic: Diagnostic {
2081 group_id: 1,
2082 severity: lsp::DiagnosticSeverity::WARNING,
2083 message: "message 2".to_string(),
2084 is_primary: true,
2085 ..Default::default()
2086 }
2087 }
2088 ]
2089 );
2090 });
2091 }
2092
2093 #[gpui::test(iterations = 10)]
2094 async fn test_collaborating_with_completion(
2095 cx_a: &mut TestAppContext,
2096 cx_b: &mut TestAppContext,
2097 ) {
2098 cx_a.foreground().forbid_parking();
2099 let mut lang_registry = Arc::new(LanguageRegistry::test());
2100 let fs = FakeFs::new(cx_a.background());
2101
2102 // Set up a fake language server.
2103 let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2104 language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2105 completion_provider: Some(lsp::CompletionOptions {
2106 trigger_characters: Some(vec![".".to_string()]),
2107 ..Default::default()
2108 }),
2109 ..Default::default()
2110 });
2111 Arc::get_mut(&mut lang_registry)
2112 .unwrap()
2113 .add(Arc::new(Language::new(
2114 LanguageConfig {
2115 name: "Rust".into(),
2116 path_suffixes: vec!["rs".to_string()],
2117 language_server: Some(language_server_config),
2118 ..Default::default()
2119 },
2120 Some(tree_sitter_rust::language()),
2121 )));
2122
2123 // Connect to a server as 2 clients.
2124 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2125 let client_a = server.create_client(cx_a, "user_a").await;
2126 let client_b = server.create_client(cx_b, "user_b").await;
2127
2128 // Share a project as client A
2129 fs.insert_tree(
2130 "/a",
2131 json!({
2132 ".zed.toml": r#"collaborators = ["user_b"]"#,
2133 "main.rs": "fn main() { a }",
2134 "other.rs": "",
2135 }),
2136 )
2137 .await;
2138 let project_a = cx_a.update(|cx| {
2139 Project::local(
2140 client_a.clone(),
2141 client_a.user_store.clone(),
2142 lang_registry.clone(),
2143 fs.clone(),
2144 cx,
2145 )
2146 });
2147 let (worktree_a, _) = project_a
2148 .update(cx_a, |p, cx| {
2149 p.find_or_create_local_worktree("/a", true, cx)
2150 })
2151 .await
2152 .unwrap();
2153 worktree_a
2154 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2155 .await;
2156 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2157 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2158 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2159
2160 // Join the worktree as client B.
2161 let project_b = Project::remote(
2162 project_id,
2163 client_b.clone(),
2164 client_b.user_store.clone(),
2165 lang_registry.clone(),
2166 fs.clone(),
2167 &mut cx_b.to_async(),
2168 )
2169 .await
2170 .unwrap();
2171
2172 // Open a file in an editor as the guest.
2173 let buffer_b = project_b
2174 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2175 .await
2176 .unwrap();
2177 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2178 let editor_b = cx_b.add_view(window_b, |cx| {
2179 Editor::for_buffer(
2180 cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2181 Some(project_b.clone()),
2182 cx,
2183 )
2184 });
2185
2186 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2187 buffer_b
2188 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2189 .await;
2190
2191 // Type a completion trigger character as the guest.
2192 editor_b.update(cx_b, |editor, cx| {
2193 editor.select_ranges([13..13], None, cx);
2194 editor.handle_input(&Input(".".into()), cx);
2195 cx.focus(&editor_b);
2196 });
2197
2198 // Receive a completion request as the host's language server.
2199 // Return some completions from the host's language server.
2200 cx_a.foreground().start_waiting();
2201 fake_language_server
2202 .handle_request::<lsp::request::Completion, _>(|params, _| {
2203 assert_eq!(
2204 params.text_document_position.text_document.uri,
2205 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2206 );
2207 assert_eq!(
2208 params.text_document_position.position,
2209 lsp::Position::new(0, 14),
2210 );
2211
2212 Some(lsp::CompletionResponse::Array(vec![
2213 lsp::CompletionItem {
2214 label: "first_method(…)".into(),
2215 detail: Some("fn(&mut self, B) -> C".into()),
2216 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2217 new_text: "first_method($1)".to_string(),
2218 range: lsp::Range::new(
2219 lsp::Position::new(0, 14),
2220 lsp::Position::new(0, 14),
2221 ),
2222 })),
2223 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2224 ..Default::default()
2225 },
2226 lsp::CompletionItem {
2227 label: "second_method(…)".into(),
2228 detail: Some("fn(&mut self, C) -> D<E>".into()),
2229 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2230 new_text: "second_method()".to_string(),
2231 range: lsp::Range::new(
2232 lsp::Position::new(0, 14),
2233 lsp::Position::new(0, 14),
2234 ),
2235 })),
2236 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2237 ..Default::default()
2238 },
2239 ]))
2240 })
2241 .next()
2242 .await
2243 .unwrap();
2244 cx_a.foreground().finish_waiting();
2245
2246 // Open the buffer on the host.
2247 let buffer_a = project_a
2248 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2249 .await
2250 .unwrap();
2251 buffer_a
2252 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2253 .await;
2254
2255 // Confirm a completion on the guest.
2256 editor_b
2257 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2258 .await;
2259 editor_b.update(cx_b, |editor, cx| {
2260 editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2261 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2262 });
2263
2264 // Return a resolved completion from the host's language server.
2265 // The resolved completion has an additional text edit.
2266 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2267 |params, _| {
2268 assert_eq!(params.label, "first_method(…)");
2269 lsp::CompletionItem {
2270 label: "first_method(…)".into(),
2271 detail: Some("fn(&mut self, B) -> C".into()),
2272 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2273 new_text: "first_method($1)".to_string(),
2274 range: lsp::Range::new(
2275 lsp::Position::new(0, 14),
2276 lsp::Position::new(0, 14),
2277 ),
2278 })),
2279 additional_text_edits: Some(vec![lsp::TextEdit {
2280 new_text: "use d::SomeTrait;\n".to_string(),
2281 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2282 }]),
2283 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2284 ..Default::default()
2285 }
2286 },
2287 );
2288
2289 // The additional edit is applied.
2290 buffer_a
2291 .condition(&cx_a, |buffer, _| {
2292 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2293 })
2294 .await;
2295 buffer_b
2296 .condition(&cx_b, |buffer, _| {
2297 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2298 })
2299 .await;
2300 }
2301
2302 #[gpui::test(iterations = 10)]
2303 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2304 cx_a.foreground().forbid_parking();
2305 let mut lang_registry = Arc::new(LanguageRegistry::test());
2306 let fs = FakeFs::new(cx_a.background());
2307
2308 // Set up a fake language server.
2309 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2310 Arc::get_mut(&mut lang_registry)
2311 .unwrap()
2312 .add(Arc::new(Language::new(
2313 LanguageConfig {
2314 name: "Rust".into(),
2315 path_suffixes: vec!["rs".to_string()],
2316 language_server: Some(language_server_config),
2317 ..Default::default()
2318 },
2319 Some(tree_sitter_rust::language()),
2320 )));
2321
2322 // Connect to a server as 2 clients.
2323 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2324 let client_a = server.create_client(cx_a, "user_a").await;
2325 let client_b = server.create_client(cx_b, "user_b").await;
2326
2327 // Share a project as client A
2328 fs.insert_tree(
2329 "/a",
2330 json!({
2331 ".zed.toml": r#"collaborators = ["user_b"]"#,
2332 "a.rs": "let one = two",
2333 }),
2334 )
2335 .await;
2336 let project_a = cx_a.update(|cx| {
2337 Project::local(
2338 client_a.clone(),
2339 client_a.user_store.clone(),
2340 lang_registry.clone(),
2341 fs.clone(),
2342 cx,
2343 )
2344 });
2345 let (worktree_a, _) = project_a
2346 .update(cx_a, |p, cx| {
2347 p.find_or_create_local_worktree("/a", true, cx)
2348 })
2349 .await
2350 .unwrap();
2351 worktree_a
2352 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2353 .await;
2354 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2355 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2356 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2357
2358 // Join the worktree as client B.
2359 let project_b = Project::remote(
2360 project_id,
2361 client_b.clone(),
2362 client_b.user_store.clone(),
2363 lang_registry.clone(),
2364 fs.clone(),
2365 &mut cx_b.to_async(),
2366 )
2367 .await
2368 .unwrap();
2369
2370 let buffer_b = cx_b
2371 .background()
2372 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2373 .await
2374 .unwrap();
2375
2376 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2377 fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2378 Some(vec![
2379 lsp::TextEdit {
2380 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2381 new_text: "h".to_string(),
2382 },
2383 lsp::TextEdit {
2384 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2385 new_text: "y".to_string(),
2386 },
2387 ])
2388 });
2389
2390 project_b
2391 .update(cx_b, |project, cx| {
2392 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2393 })
2394 .await
2395 .unwrap();
2396 assert_eq!(
2397 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2398 "let honey = two"
2399 );
2400 }
2401
2402 #[gpui::test(iterations = 10)]
2403 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2404 cx_a.foreground().forbid_parking();
2405 let mut lang_registry = Arc::new(LanguageRegistry::test());
2406 let fs = FakeFs::new(cx_a.background());
2407 fs.insert_tree(
2408 "/root-1",
2409 json!({
2410 ".zed.toml": r#"collaborators = ["user_b"]"#,
2411 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2412 }),
2413 )
2414 .await;
2415 fs.insert_tree(
2416 "/root-2",
2417 json!({
2418 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2419 }),
2420 )
2421 .await;
2422
2423 // Set up a fake language server.
2424 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2425 Arc::get_mut(&mut lang_registry)
2426 .unwrap()
2427 .add(Arc::new(Language::new(
2428 LanguageConfig {
2429 name: "Rust".into(),
2430 path_suffixes: vec!["rs".to_string()],
2431 language_server: Some(language_server_config),
2432 ..Default::default()
2433 },
2434 Some(tree_sitter_rust::language()),
2435 )));
2436
2437 // Connect to a server as 2 clients.
2438 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2439 let client_a = server.create_client(cx_a, "user_a").await;
2440 let client_b = server.create_client(cx_b, "user_b").await;
2441
2442 // Share a project as client A
2443 let project_a = cx_a.update(|cx| {
2444 Project::local(
2445 client_a.clone(),
2446 client_a.user_store.clone(),
2447 lang_registry.clone(),
2448 fs.clone(),
2449 cx,
2450 )
2451 });
2452 let (worktree_a, _) = project_a
2453 .update(cx_a, |p, cx| {
2454 p.find_or_create_local_worktree("/root-1", true, cx)
2455 })
2456 .await
2457 .unwrap();
2458 worktree_a
2459 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2460 .await;
2461 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2462 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2463 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2464
2465 // Join the worktree as client B.
2466 let project_b = Project::remote(
2467 project_id,
2468 client_b.clone(),
2469 client_b.user_store.clone(),
2470 lang_registry.clone(),
2471 fs.clone(),
2472 &mut cx_b.to_async(),
2473 )
2474 .await
2475 .unwrap();
2476
2477 // Open the file on client B.
2478 let buffer_b = cx_b
2479 .background()
2480 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2481 .await
2482 .unwrap();
2483
2484 // Request the definition of a symbol as the guest.
2485 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2486 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2487 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2488 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2489 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2490 )))
2491 });
2492
2493 let definitions_1 = project_b
2494 .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
2495 .await
2496 .unwrap();
2497 cx_b.read(|cx| {
2498 assert_eq!(definitions_1.len(), 1);
2499 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2500 let target_buffer = definitions_1[0].buffer.read(cx);
2501 assert_eq!(
2502 target_buffer.text(),
2503 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2504 );
2505 assert_eq!(
2506 definitions_1[0].range.to_point(target_buffer),
2507 Point::new(0, 6)..Point::new(0, 9)
2508 );
2509 });
2510
2511 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2512 // the previous call to `definition`.
2513 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2514 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2515 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2516 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2517 )))
2518 });
2519
2520 let definitions_2 = project_b
2521 .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
2522 .await
2523 .unwrap();
2524 cx_b.read(|cx| {
2525 assert_eq!(definitions_2.len(), 1);
2526 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2527 let target_buffer = definitions_2[0].buffer.read(cx);
2528 assert_eq!(
2529 target_buffer.text(),
2530 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2531 );
2532 assert_eq!(
2533 definitions_2[0].range.to_point(target_buffer),
2534 Point::new(1, 6)..Point::new(1, 11)
2535 );
2536 });
2537 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2538 }
2539
2540 #[gpui::test(iterations = 10)]
2541 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2542 cx_a.foreground().forbid_parking();
2543 let mut lang_registry = Arc::new(LanguageRegistry::test());
2544 let fs = FakeFs::new(cx_a.background());
2545 fs.insert_tree(
2546 "/root-1",
2547 json!({
2548 ".zed.toml": r#"collaborators = ["user_b"]"#,
2549 "one.rs": "const ONE: usize = 1;",
2550 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2551 }),
2552 )
2553 .await;
2554 fs.insert_tree(
2555 "/root-2",
2556 json!({
2557 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2558 }),
2559 )
2560 .await;
2561
2562 // Set up a fake language server.
2563 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2564 Arc::get_mut(&mut lang_registry)
2565 .unwrap()
2566 .add(Arc::new(Language::new(
2567 LanguageConfig {
2568 name: "Rust".into(),
2569 path_suffixes: vec!["rs".to_string()],
2570 language_server: Some(language_server_config),
2571 ..Default::default()
2572 },
2573 Some(tree_sitter_rust::language()),
2574 )));
2575
2576 // Connect to a server as 2 clients.
2577 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2578 let client_a = server.create_client(cx_a, "user_a").await;
2579 let client_b = server.create_client(cx_b, "user_b").await;
2580
2581 // Share a project as client A
2582 let project_a = cx_a.update(|cx| {
2583 Project::local(
2584 client_a.clone(),
2585 client_a.user_store.clone(),
2586 lang_registry.clone(),
2587 fs.clone(),
2588 cx,
2589 )
2590 });
2591 let (worktree_a, _) = project_a
2592 .update(cx_a, |p, cx| {
2593 p.find_or_create_local_worktree("/root-1", true, cx)
2594 })
2595 .await
2596 .unwrap();
2597 worktree_a
2598 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2599 .await;
2600 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2601 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2602 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2603
2604 // Join the worktree as client B.
2605 let project_b = Project::remote(
2606 project_id,
2607 client_b.clone(),
2608 client_b.user_store.clone(),
2609 lang_registry.clone(),
2610 fs.clone(),
2611 &mut cx_b.to_async(),
2612 )
2613 .await
2614 .unwrap();
2615
2616 // Open the file on client B.
2617 let buffer_b = cx_b
2618 .background()
2619 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2620 .await
2621 .unwrap();
2622
2623 // Request references to a symbol as the guest.
2624 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2625 fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2626 assert_eq!(
2627 params.text_document_position.text_document.uri.as_str(),
2628 "file:///root-1/one.rs"
2629 );
2630 Some(vec![
2631 lsp::Location {
2632 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2633 range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2634 },
2635 lsp::Location {
2636 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2637 range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2638 },
2639 lsp::Location {
2640 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2641 range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2642 },
2643 ])
2644 });
2645
2646 let references = project_b
2647 .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
2648 .await
2649 .unwrap();
2650 cx_b.read(|cx| {
2651 assert_eq!(references.len(), 3);
2652 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2653
2654 let two_buffer = references[0].buffer.read(cx);
2655 let three_buffer = references[2].buffer.read(cx);
2656 assert_eq!(
2657 two_buffer.file().unwrap().path().as_ref(),
2658 Path::new("two.rs")
2659 );
2660 assert_eq!(references[1].buffer, references[0].buffer);
2661 assert_eq!(
2662 three_buffer.file().unwrap().full_path(cx),
2663 Path::new("three.rs")
2664 );
2665
2666 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2667 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2668 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2669 });
2670 }
2671
2672 #[gpui::test(iterations = 10)]
2673 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2674 cx_a.foreground().forbid_parking();
2675 let lang_registry = Arc::new(LanguageRegistry::test());
2676 let fs = FakeFs::new(cx_a.background());
2677 fs.insert_tree(
2678 "/root-1",
2679 json!({
2680 ".zed.toml": r#"collaborators = ["user_b"]"#,
2681 "a": "hello world",
2682 "b": "goodnight moon",
2683 "c": "a world of goo",
2684 "d": "world champion of clown world",
2685 }),
2686 )
2687 .await;
2688 fs.insert_tree(
2689 "/root-2",
2690 json!({
2691 "e": "disney world is fun",
2692 }),
2693 )
2694 .await;
2695
2696 // Connect to a server as 2 clients.
2697 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2698 let client_a = server.create_client(cx_a, "user_a").await;
2699 let client_b = server.create_client(cx_b, "user_b").await;
2700
2701 // Share a project as client A
2702 let project_a = cx_a.update(|cx| {
2703 Project::local(
2704 client_a.clone(),
2705 client_a.user_store.clone(),
2706 lang_registry.clone(),
2707 fs.clone(),
2708 cx,
2709 )
2710 });
2711 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2712
2713 let (worktree_1, _) = project_a
2714 .update(cx_a, |p, cx| {
2715 p.find_or_create_local_worktree("/root-1", true, cx)
2716 })
2717 .await
2718 .unwrap();
2719 worktree_1
2720 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2721 .await;
2722 let (worktree_2, _) = project_a
2723 .update(cx_a, |p, cx| {
2724 p.find_or_create_local_worktree("/root-2", true, cx)
2725 })
2726 .await
2727 .unwrap();
2728 worktree_2
2729 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2730 .await;
2731
2732 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2733
2734 // Join the worktree as client B.
2735 let project_b = Project::remote(
2736 project_id,
2737 client_b.clone(),
2738 client_b.user_store.clone(),
2739 lang_registry.clone(),
2740 fs.clone(),
2741 &mut cx_b.to_async(),
2742 )
2743 .await
2744 .unwrap();
2745
2746 let results = project_b
2747 .update(cx_b, |project, cx| {
2748 project.search(SearchQuery::text("world", false, false), cx)
2749 })
2750 .await
2751 .unwrap();
2752
2753 let mut ranges_by_path = results
2754 .into_iter()
2755 .map(|(buffer, ranges)| {
2756 buffer.read_with(cx_b, |buffer, cx| {
2757 let path = buffer.file().unwrap().full_path(cx);
2758 let offset_ranges = ranges
2759 .into_iter()
2760 .map(|range| range.to_offset(buffer))
2761 .collect::<Vec<_>>();
2762 (path, offset_ranges)
2763 })
2764 })
2765 .collect::<Vec<_>>();
2766 ranges_by_path.sort_by_key(|(path, _)| path.clone());
2767
2768 assert_eq!(
2769 ranges_by_path,
2770 &[
2771 (PathBuf::from("root-1/a"), vec![6..11]),
2772 (PathBuf::from("root-1/c"), vec![2..7]),
2773 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2774 (PathBuf::from("root-2/e"), vec![7..12]),
2775 ]
2776 );
2777 }
2778
2779 #[gpui::test(iterations = 10)]
2780 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2781 cx_a.foreground().forbid_parking();
2782 let lang_registry = Arc::new(LanguageRegistry::test());
2783 let fs = FakeFs::new(cx_a.background());
2784 fs.insert_tree(
2785 "/root-1",
2786 json!({
2787 ".zed.toml": r#"collaborators = ["user_b"]"#,
2788 "main.rs": "fn double(number: i32) -> i32 { number + number }",
2789 }),
2790 )
2791 .await;
2792
2793 // Set up a fake language server.
2794 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2795 lang_registry.add(Arc::new(Language::new(
2796 LanguageConfig {
2797 name: "Rust".into(),
2798 path_suffixes: vec!["rs".to_string()],
2799 language_server: Some(language_server_config),
2800 ..Default::default()
2801 },
2802 Some(tree_sitter_rust::language()),
2803 )));
2804
2805 // Connect to a server as 2 clients.
2806 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2807 let client_a = server.create_client(cx_a, "user_a").await;
2808 let client_b = server.create_client(cx_b, "user_b").await;
2809
2810 // Share a project as client A
2811 let project_a = cx_a.update(|cx| {
2812 Project::local(
2813 client_a.clone(),
2814 client_a.user_store.clone(),
2815 lang_registry.clone(),
2816 fs.clone(),
2817 cx,
2818 )
2819 });
2820 let (worktree_a, _) = project_a
2821 .update(cx_a, |p, cx| {
2822 p.find_or_create_local_worktree("/root-1", true, cx)
2823 })
2824 .await
2825 .unwrap();
2826 worktree_a
2827 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2828 .await;
2829 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2830 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2831 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2832
2833 // Join the worktree as client B.
2834 let project_b = Project::remote(
2835 project_id,
2836 client_b.clone(),
2837 client_b.user_store.clone(),
2838 lang_registry.clone(),
2839 fs.clone(),
2840 &mut cx_b.to_async(),
2841 )
2842 .await
2843 .unwrap();
2844
2845 // Open the file on client B.
2846 let buffer_b = cx_b
2847 .background()
2848 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
2849 .await
2850 .unwrap();
2851
2852 // Request document highlights as the guest.
2853 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2854 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2855 |params, _| {
2856 assert_eq!(
2857 params
2858 .text_document_position_params
2859 .text_document
2860 .uri
2861 .as_str(),
2862 "file:///root-1/main.rs"
2863 );
2864 assert_eq!(
2865 params.text_document_position_params.position,
2866 lsp::Position::new(0, 34)
2867 );
2868 Some(vec![
2869 lsp::DocumentHighlight {
2870 kind: Some(lsp::DocumentHighlightKind::WRITE),
2871 range: lsp::Range::new(
2872 lsp::Position::new(0, 10),
2873 lsp::Position::new(0, 16),
2874 ),
2875 },
2876 lsp::DocumentHighlight {
2877 kind: Some(lsp::DocumentHighlightKind::READ),
2878 range: lsp::Range::new(
2879 lsp::Position::new(0, 32),
2880 lsp::Position::new(0, 38),
2881 ),
2882 },
2883 lsp::DocumentHighlight {
2884 kind: Some(lsp::DocumentHighlightKind::READ),
2885 range: lsp::Range::new(
2886 lsp::Position::new(0, 41),
2887 lsp::Position::new(0, 47),
2888 ),
2889 },
2890 ])
2891 },
2892 );
2893
2894 let highlights = project_b
2895 .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
2896 .await
2897 .unwrap();
2898 buffer_b.read_with(cx_b, |buffer, _| {
2899 let snapshot = buffer.snapshot();
2900
2901 let highlights = highlights
2902 .into_iter()
2903 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2904 .collect::<Vec<_>>();
2905 assert_eq!(
2906 highlights,
2907 &[
2908 (lsp::DocumentHighlightKind::WRITE, 10..16),
2909 (lsp::DocumentHighlightKind::READ, 32..38),
2910 (lsp::DocumentHighlightKind::READ, 41..47)
2911 ]
2912 )
2913 });
2914 }
2915
2916 #[gpui::test(iterations = 10)]
2917 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2918 cx_a.foreground().forbid_parking();
2919 let mut lang_registry = Arc::new(LanguageRegistry::test());
2920 let fs = FakeFs::new(cx_a.background());
2921 fs.insert_tree(
2922 "/code",
2923 json!({
2924 "crate-1": {
2925 ".zed.toml": r#"collaborators = ["user_b"]"#,
2926 "one.rs": "const ONE: usize = 1;",
2927 },
2928 "crate-2": {
2929 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2930 },
2931 "private": {
2932 "passwords.txt": "the-password",
2933 }
2934 }),
2935 )
2936 .await;
2937
2938 // Set up a fake language server.
2939 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2940 Arc::get_mut(&mut lang_registry)
2941 .unwrap()
2942 .add(Arc::new(Language::new(
2943 LanguageConfig {
2944 name: "Rust".into(),
2945 path_suffixes: vec!["rs".to_string()],
2946 language_server: Some(language_server_config),
2947 ..Default::default()
2948 },
2949 Some(tree_sitter_rust::language()),
2950 )));
2951
2952 // Connect to a server as 2 clients.
2953 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2954 let client_a = server.create_client(cx_a, "user_a").await;
2955 let client_b = server.create_client(cx_b, "user_b").await;
2956
2957 // Share a project as client A
2958 let project_a = cx_a.update(|cx| {
2959 Project::local(
2960 client_a.clone(),
2961 client_a.user_store.clone(),
2962 lang_registry.clone(),
2963 fs.clone(),
2964 cx,
2965 )
2966 });
2967 let (worktree_a, _) = project_a
2968 .update(cx_a, |p, cx| {
2969 p.find_or_create_local_worktree("/code/crate-1", true, cx)
2970 })
2971 .await
2972 .unwrap();
2973 worktree_a
2974 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2975 .await;
2976 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2977 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2978 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2979
2980 // Join the worktree as client B.
2981 let project_b = Project::remote(
2982 project_id,
2983 client_b.clone(),
2984 client_b.user_store.clone(),
2985 lang_registry.clone(),
2986 fs.clone(),
2987 &mut cx_b.to_async(),
2988 )
2989 .await
2990 .unwrap();
2991
2992 // Cause the language server to start.
2993 let _buffer = cx_b
2994 .background()
2995 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2996 .await
2997 .unwrap();
2998
2999 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3000 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
3001 #[allow(deprecated)]
3002 Some(vec![lsp::SymbolInformation {
3003 name: "TWO".into(),
3004 location: lsp::Location {
3005 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3006 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3007 },
3008 kind: lsp::SymbolKind::CONSTANT,
3009 tags: None,
3010 container_name: None,
3011 deprecated: None,
3012 }])
3013 });
3014
3015 // Request the definition of a symbol as the guest.
3016 let symbols = project_b
3017 .update(cx_b, |p, cx| p.symbols("two", cx))
3018 .await
3019 .unwrap();
3020 assert_eq!(symbols.len(), 1);
3021 assert_eq!(symbols[0].name, "TWO");
3022
3023 // Open one of the returned symbols.
3024 let buffer_b_2 = project_b
3025 .update(cx_b, |project, cx| {
3026 project.open_buffer_for_symbol(&symbols[0], cx)
3027 })
3028 .await
3029 .unwrap();
3030 buffer_b_2.read_with(cx_b, |buffer, _| {
3031 assert_eq!(
3032 buffer.file().unwrap().path().as_ref(),
3033 Path::new("../crate-2/two.rs")
3034 );
3035 });
3036
3037 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3038 let mut fake_symbol = symbols[0].clone();
3039 fake_symbol.path = Path::new("/code/secrets").into();
3040 let error = project_b
3041 .update(cx_b, |project, cx| {
3042 project.open_buffer_for_symbol(&fake_symbol, cx)
3043 })
3044 .await
3045 .unwrap_err();
3046 assert!(error.to_string().contains("invalid symbol signature"));
3047 }
3048
3049 #[gpui::test(iterations = 10)]
3050 async fn test_open_buffer_while_getting_definition_pointing_to_it(
3051 cx_a: &mut TestAppContext,
3052 cx_b: &mut TestAppContext,
3053 mut rng: StdRng,
3054 ) {
3055 cx_a.foreground().forbid_parking();
3056 let mut lang_registry = Arc::new(LanguageRegistry::test());
3057 let fs = FakeFs::new(cx_a.background());
3058 fs.insert_tree(
3059 "/root",
3060 json!({
3061 ".zed.toml": r#"collaborators = ["user_b"]"#,
3062 "a.rs": "const ONE: usize = b::TWO;",
3063 "b.rs": "const TWO: usize = 2",
3064 }),
3065 )
3066 .await;
3067
3068 // Set up a fake language server.
3069 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3070
3071 Arc::get_mut(&mut lang_registry)
3072 .unwrap()
3073 .add(Arc::new(Language::new(
3074 LanguageConfig {
3075 name: "Rust".into(),
3076 path_suffixes: vec!["rs".to_string()],
3077 language_server: Some(language_server_config),
3078 ..Default::default()
3079 },
3080 Some(tree_sitter_rust::language()),
3081 )));
3082
3083 // Connect to a server as 2 clients.
3084 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3085 let client_a = server.create_client(cx_a, "user_a").await;
3086 let client_b = server.create_client(cx_b, "user_b").await;
3087
3088 // Share a project as client A
3089 let project_a = cx_a.update(|cx| {
3090 Project::local(
3091 client_a.clone(),
3092 client_a.user_store.clone(),
3093 lang_registry.clone(),
3094 fs.clone(),
3095 cx,
3096 )
3097 });
3098
3099 let (worktree_a, _) = project_a
3100 .update(cx_a, |p, cx| {
3101 p.find_or_create_local_worktree("/root", true, cx)
3102 })
3103 .await
3104 .unwrap();
3105 worktree_a
3106 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3107 .await;
3108 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3109 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3110 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3111
3112 // Join the worktree as client B.
3113 let project_b = Project::remote(
3114 project_id,
3115 client_b.clone(),
3116 client_b.user_store.clone(),
3117 lang_registry.clone(),
3118 fs.clone(),
3119 &mut cx_b.to_async(),
3120 )
3121 .await
3122 .unwrap();
3123
3124 let buffer_b1 = cx_b
3125 .background()
3126 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3127 .await
3128 .unwrap();
3129
3130 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3131 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3132 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3133 lsp::Url::from_file_path("/root/b.rs").unwrap(),
3134 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3135 )))
3136 });
3137
3138 let definitions;
3139 let buffer_b2;
3140 if rng.gen() {
3141 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3142 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3143 } else {
3144 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3145 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3146 }
3147
3148 let buffer_b2 = buffer_b2.await.unwrap();
3149 let definitions = definitions.await.unwrap();
3150 assert_eq!(definitions.len(), 1);
3151 assert_eq!(definitions[0].buffer, buffer_b2);
3152 }
3153
3154 #[gpui::test(iterations = 10)]
3155 async fn test_collaborating_with_code_actions(
3156 cx_a: &mut TestAppContext,
3157 cx_b: &mut TestAppContext,
3158 ) {
3159 cx_a.foreground().forbid_parking();
3160 let mut lang_registry = Arc::new(LanguageRegistry::test());
3161 let fs = FakeFs::new(cx_a.background());
3162 cx_b.update(|cx| editor::init(cx));
3163
3164 // Set up a fake language server.
3165 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3166 Arc::get_mut(&mut lang_registry)
3167 .unwrap()
3168 .add(Arc::new(Language::new(
3169 LanguageConfig {
3170 name: "Rust".into(),
3171 path_suffixes: vec!["rs".to_string()],
3172 language_server: Some(language_server_config),
3173 ..Default::default()
3174 },
3175 Some(tree_sitter_rust::language()),
3176 )));
3177
3178 // Connect to a server as 2 clients.
3179 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3180 let client_a = server.create_client(cx_a, "user_a").await;
3181 let client_b = server.create_client(cx_b, "user_b").await;
3182
3183 // Share a project as client A
3184 fs.insert_tree(
3185 "/a",
3186 json!({
3187 ".zed.toml": r#"collaborators = ["user_b"]"#,
3188 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3189 "other.rs": "pub fn foo() -> usize { 4 }",
3190 }),
3191 )
3192 .await;
3193 let project_a = cx_a.update(|cx| {
3194 Project::local(
3195 client_a.clone(),
3196 client_a.user_store.clone(),
3197 lang_registry.clone(),
3198 fs.clone(),
3199 cx,
3200 )
3201 });
3202 let (worktree_a, _) = project_a
3203 .update(cx_a, |p, cx| {
3204 p.find_or_create_local_worktree("/a", true, cx)
3205 })
3206 .await
3207 .unwrap();
3208 worktree_a
3209 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3210 .await;
3211 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3212 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3213 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3214
3215 // Join the worktree as client B.
3216 let project_b = Project::remote(
3217 project_id,
3218 client_b.clone(),
3219 client_b.user_store.clone(),
3220 lang_registry.clone(),
3221 fs.clone(),
3222 &mut cx_b.to_async(),
3223 )
3224 .await
3225 .unwrap();
3226 let mut params = cx_b.update(WorkspaceParams::test);
3227 params.languages = lang_registry.clone();
3228 params.client = client_b.client.clone();
3229 params.user_store = client_b.user_store.clone();
3230 params.project = project_b;
3231
3232 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3233 let editor_b = workspace_b
3234 .update(cx_b, |workspace, cx| {
3235 workspace.open_path((worktree_id, "main.rs").into(), cx)
3236 })
3237 .await
3238 .unwrap()
3239 .downcast::<Editor>()
3240 .unwrap();
3241
3242 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3243 fake_language_server
3244 .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3245 assert_eq!(
3246 params.text_document.uri,
3247 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3248 );
3249 assert_eq!(params.range.start, lsp::Position::new(0, 0));
3250 assert_eq!(params.range.end, lsp::Position::new(0, 0));
3251 None
3252 })
3253 .next()
3254 .await;
3255
3256 // Move cursor to a location that contains code actions.
3257 editor_b.update(cx_b, |editor, cx| {
3258 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3259 cx.focus(&editor_b);
3260 });
3261
3262 fake_language_server
3263 .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3264 assert_eq!(
3265 params.text_document.uri,
3266 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3267 );
3268 assert_eq!(params.range.start, lsp::Position::new(1, 31));
3269 assert_eq!(params.range.end, lsp::Position::new(1, 31));
3270
3271 Some(vec![lsp::CodeActionOrCommand::CodeAction(
3272 lsp::CodeAction {
3273 title: "Inline into all callers".to_string(),
3274 edit: Some(lsp::WorkspaceEdit {
3275 changes: Some(
3276 [
3277 (
3278 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3279 vec![lsp::TextEdit::new(
3280 lsp::Range::new(
3281 lsp::Position::new(1, 22),
3282 lsp::Position::new(1, 34),
3283 ),
3284 "4".to_string(),
3285 )],
3286 ),
3287 (
3288 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3289 vec![lsp::TextEdit::new(
3290 lsp::Range::new(
3291 lsp::Position::new(0, 0),
3292 lsp::Position::new(0, 27),
3293 ),
3294 "".to_string(),
3295 )],
3296 ),
3297 ]
3298 .into_iter()
3299 .collect(),
3300 ),
3301 ..Default::default()
3302 }),
3303 data: Some(json!({
3304 "codeActionParams": {
3305 "range": {
3306 "start": {"line": 1, "column": 31},
3307 "end": {"line": 1, "column": 31},
3308 }
3309 }
3310 })),
3311 ..Default::default()
3312 },
3313 )])
3314 })
3315 .next()
3316 .await;
3317
3318 // Toggle code actions and wait for them to display.
3319 editor_b.update(cx_b, |editor, cx| {
3320 editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3321 });
3322 editor_b
3323 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3324 .await;
3325
3326 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3327
3328 // Confirming the code action will trigger a resolve request.
3329 let confirm_action = workspace_b
3330 .update(cx_b, |workspace, cx| {
3331 Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3332 })
3333 .unwrap();
3334 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3335 lsp::CodeAction {
3336 title: "Inline into all callers".to_string(),
3337 edit: Some(lsp::WorkspaceEdit {
3338 changes: Some(
3339 [
3340 (
3341 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3342 vec![lsp::TextEdit::new(
3343 lsp::Range::new(
3344 lsp::Position::new(1, 22),
3345 lsp::Position::new(1, 34),
3346 ),
3347 "4".to_string(),
3348 )],
3349 ),
3350 (
3351 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3352 vec![lsp::TextEdit::new(
3353 lsp::Range::new(
3354 lsp::Position::new(0, 0),
3355 lsp::Position::new(0, 27),
3356 ),
3357 "".to_string(),
3358 )],
3359 ),
3360 ]
3361 .into_iter()
3362 .collect(),
3363 ),
3364 ..Default::default()
3365 }),
3366 ..Default::default()
3367 }
3368 });
3369
3370 // After the action is confirmed, an editor containing both modified files is opened.
3371 confirm_action.await.unwrap();
3372 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3373 workspace
3374 .active_item(cx)
3375 .unwrap()
3376 .downcast::<Editor>()
3377 .unwrap()
3378 });
3379 code_action_editor.update(cx_b, |editor, cx| {
3380 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3381 editor.undo(&Undo, cx);
3382 assert_eq!(
3383 editor.text(cx),
3384 "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3385 );
3386 editor.redo(&Redo, cx);
3387 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3388 });
3389 }
3390
3391 #[gpui::test(iterations = 10)]
3392 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3393 cx_a.foreground().forbid_parking();
3394 let mut lang_registry = Arc::new(LanguageRegistry::test());
3395 let fs = FakeFs::new(cx_a.background());
3396 cx_b.update(|cx| editor::init(cx));
3397
3398 // Set up a fake language server.
3399 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3400 Arc::get_mut(&mut lang_registry)
3401 .unwrap()
3402 .add(Arc::new(Language::new(
3403 LanguageConfig {
3404 name: "Rust".into(),
3405 path_suffixes: vec!["rs".to_string()],
3406 language_server: Some(language_server_config),
3407 ..Default::default()
3408 },
3409 Some(tree_sitter_rust::language()),
3410 )));
3411
3412 // Connect to a server as 2 clients.
3413 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3414 let client_a = server.create_client(cx_a, "user_a").await;
3415 let client_b = server.create_client(cx_b, "user_b").await;
3416
3417 // Share a project as client A
3418 fs.insert_tree(
3419 "/dir",
3420 json!({
3421 ".zed.toml": r#"collaborators = ["user_b"]"#,
3422 "one.rs": "const ONE: usize = 1;",
3423 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3424 }),
3425 )
3426 .await;
3427 let project_a = cx_a.update(|cx| {
3428 Project::local(
3429 client_a.clone(),
3430 client_a.user_store.clone(),
3431 lang_registry.clone(),
3432 fs.clone(),
3433 cx,
3434 )
3435 });
3436 let (worktree_a, _) = project_a
3437 .update(cx_a, |p, cx| {
3438 p.find_or_create_local_worktree("/dir", true, cx)
3439 })
3440 .await
3441 .unwrap();
3442 worktree_a
3443 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3444 .await;
3445 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3446 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3447 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3448
3449 // Join the worktree as client B.
3450 let project_b = Project::remote(
3451 project_id,
3452 client_b.clone(),
3453 client_b.user_store.clone(),
3454 lang_registry.clone(),
3455 fs.clone(),
3456 &mut cx_b.to_async(),
3457 )
3458 .await
3459 .unwrap();
3460 let mut params = cx_b.update(WorkspaceParams::test);
3461 params.languages = lang_registry.clone();
3462 params.client = client_b.client.clone();
3463 params.user_store = client_b.user_store.clone();
3464 params.project = project_b;
3465
3466 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3467 let editor_b = workspace_b
3468 .update(cx_b, |workspace, cx| {
3469 workspace.open_path((worktree_id, "one.rs").into(), cx)
3470 })
3471 .await
3472 .unwrap()
3473 .downcast::<Editor>()
3474 .unwrap();
3475 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3476
3477 // Move cursor to a location that can be renamed.
3478 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3479 editor.select_ranges([7..7], None, cx);
3480 editor.rename(&Rename, cx).unwrap()
3481 });
3482
3483 fake_language_server
3484 .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3485 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3486 assert_eq!(params.position, lsp::Position::new(0, 7));
3487 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3488 lsp::Position::new(0, 6),
3489 lsp::Position::new(0, 9),
3490 )))
3491 })
3492 .next()
3493 .await
3494 .unwrap();
3495 prepare_rename.await.unwrap();
3496 editor_b.update(cx_b, |editor, cx| {
3497 let rename = editor.pending_rename().unwrap();
3498 let buffer = editor.buffer().read(cx).snapshot(cx);
3499 assert_eq!(
3500 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3501 6..9
3502 );
3503 rename.editor.update(cx, |rename_editor, cx| {
3504 rename_editor.buffer().update(cx, |rename_buffer, cx| {
3505 rename_buffer.edit([0..3], "THREE", cx);
3506 });
3507 });
3508 });
3509
3510 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3511 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3512 });
3513 fake_language_server
3514 .handle_request::<lsp::request::Rename, _>(|params, _| {
3515 assert_eq!(
3516 params.text_document_position.text_document.uri.as_str(),
3517 "file:///dir/one.rs"
3518 );
3519 assert_eq!(
3520 params.text_document_position.position,
3521 lsp::Position::new(0, 6)
3522 );
3523 assert_eq!(params.new_name, "THREE");
3524 Some(lsp::WorkspaceEdit {
3525 changes: Some(
3526 [
3527 (
3528 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3529 vec![lsp::TextEdit::new(
3530 lsp::Range::new(
3531 lsp::Position::new(0, 6),
3532 lsp::Position::new(0, 9),
3533 ),
3534 "THREE".to_string(),
3535 )],
3536 ),
3537 (
3538 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3539 vec![
3540 lsp::TextEdit::new(
3541 lsp::Range::new(
3542 lsp::Position::new(0, 24),
3543 lsp::Position::new(0, 27),
3544 ),
3545 "THREE".to_string(),
3546 ),
3547 lsp::TextEdit::new(
3548 lsp::Range::new(
3549 lsp::Position::new(0, 35),
3550 lsp::Position::new(0, 38),
3551 ),
3552 "THREE".to_string(),
3553 ),
3554 ],
3555 ),
3556 ]
3557 .into_iter()
3558 .collect(),
3559 ),
3560 ..Default::default()
3561 })
3562 })
3563 .next()
3564 .await
3565 .unwrap();
3566 confirm_rename.await.unwrap();
3567
3568 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3569 workspace
3570 .active_item(cx)
3571 .unwrap()
3572 .downcast::<Editor>()
3573 .unwrap()
3574 });
3575 rename_editor.update(cx_b, |editor, cx| {
3576 assert_eq!(
3577 editor.text(cx),
3578 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3579 );
3580 editor.undo(&Undo, cx);
3581 assert_eq!(
3582 editor.text(cx),
3583 "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3584 );
3585 editor.redo(&Redo, cx);
3586 assert_eq!(
3587 editor.text(cx),
3588 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3589 );
3590 });
3591
3592 // Ensure temporary rename edits cannot be undone/redone.
3593 editor_b.update(cx_b, |editor, cx| {
3594 editor.undo(&Undo, cx);
3595 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3596 editor.undo(&Undo, cx);
3597 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3598 editor.redo(&Redo, cx);
3599 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3600 })
3601 }
3602
3603 #[gpui::test(iterations = 10)]
3604 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3605 cx_a.foreground().forbid_parking();
3606
3607 // Connect to a server as 2 clients.
3608 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3609 let client_a = server.create_client(cx_a, "user_a").await;
3610 let client_b = server.create_client(cx_b, "user_b").await;
3611
3612 // Create an org that includes these 2 users.
3613 let db = &server.app_state.db;
3614 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3615 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3616 .await
3617 .unwrap();
3618 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3619 .await
3620 .unwrap();
3621
3622 // Create a channel that includes all the users.
3623 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3624 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3625 .await
3626 .unwrap();
3627 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3628 .await
3629 .unwrap();
3630 db.create_channel_message(
3631 channel_id,
3632 client_b.current_user_id(&cx_b),
3633 "hello A, it's B.",
3634 OffsetDateTime::now_utc(),
3635 1,
3636 )
3637 .await
3638 .unwrap();
3639
3640 let channels_a = cx_a
3641 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3642 channels_a
3643 .condition(cx_a, |list, _| list.available_channels().is_some())
3644 .await;
3645 channels_a.read_with(cx_a, |list, _| {
3646 assert_eq!(
3647 list.available_channels().unwrap(),
3648 &[ChannelDetails {
3649 id: channel_id.to_proto(),
3650 name: "test-channel".to_string()
3651 }]
3652 )
3653 });
3654 let channel_a = channels_a.update(cx_a, |this, cx| {
3655 this.get_channel(channel_id.to_proto(), cx).unwrap()
3656 });
3657 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3658 channel_a
3659 .condition(&cx_a, |channel, _| {
3660 channel_messages(channel)
3661 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3662 })
3663 .await;
3664
3665 let channels_b = cx_b
3666 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3667 channels_b
3668 .condition(cx_b, |list, _| list.available_channels().is_some())
3669 .await;
3670 channels_b.read_with(cx_b, |list, _| {
3671 assert_eq!(
3672 list.available_channels().unwrap(),
3673 &[ChannelDetails {
3674 id: channel_id.to_proto(),
3675 name: "test-channel".to_string()
3676 }]
3677 )
3678 });
3679
3680 let channel_b = channels_b.update(cx_b, |this, cx| {
3681 this.get_channel(channel_id.to_proto(), cx).unwrap()
3682 });
3683 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3684 channel_b
3685 .condition(&cx_b, |channel, _| {
3686 channel_messages(channel)
3687 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3688 })
3689 .await;
3690
3691 channel_a
3692 .update(cx_a, |channel, cx| {
3693 channel
3694 .send_message("oh, hi B.".to_string(), cx)
3695 .unwrap()
3696 .detach();
3697 let task = channel.send_message("sup".to_string(), cx).unwrap();
3698 assert_eq!(
3699 channel_messages(channel),
3700 &[
3701 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3702 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3703 ("user_a".to_string(), "sup".to_string(), true)
3704 ]
3705 );
3706 task
3707 })
3708 .await
3709 .unwrap();
3710
3711 channel_b
3712 .condition(&cx_b, |channel, _| {
3713 channel_messages(channel)
3714 == [
3715 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3716 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3717 ("user_a".to_string(), "sup".to_string(), false),
3718 ]
3719 })
3720 .await;
3721
3722 assert_eq!(
3723 server
3724 .state()
3725 .await
3726 .channel(channel_id)
3727 .unwrap()
3728 .connection_ids
3729 .len(),
3730 2
3731 );
3732 cx_b.update(|_| drop(channel_b));
3733 server
3734 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3735 .await;
3736
3737 cx_a.update(|_| drop(channel_a));
3738 server
3739 .condition(|state| state.channel(channel_id).is_none())
3740 .await;
3741 }
3742
3743 #[gpui::test(iterations = 10)]
3744 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
3745 cx_a.foreground().forbid_parking();
3746
3747 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3748 let client_a = server.create_client(cx_a, "user_a").await;
3749
3750 let db = &server.app_state.db;
3751 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3752 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3753 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3754 .await
3755 .unwrap();
3756 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3757 .await
3758 .unwrap();
3759
3760 let channels_a = cx_a
3761 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3762 channels_a
3763 .condition(cx_a, |list, _| list.available_channels().is_some())
3764 .await;
3765 let channel_a = channels_a.update(cx_a, |this, cx| {
3766 this.get_channel(channel_id.to_proto(), cx).unwrap()
3767 });
3768
3769 // Messages aren't allowed to be too long.
3770 channel_a
3771 .update(cx_a, |channel, cx| {
3772 let long_body = "this is long.\n".repeat(1024);
3773 channel.send_message(long_body, cx).unwrap()
3774 })
3775 .await
3776 .unwrap_err();
3777
3778 // Messages aren't allowed to be blank.
3779 channel_a.update(cx_a, |channel, cx| {
3780 channel.send_message(String::new(), cx).unwrap_err()
3781 });
3782
3783 // Leading and trailing whitespace are trimmed.
3784 channel_a
3785 .update(cx_a, |channel, cx| {
3786 channel
3787 .send_message("\n surrounded by whitespace \n".to_string(), cx)
3788 .unwrap()
3789 })
3790 .await
3791 .unwrap();
3792 assert_eq!(
3793 db.get_channel_messages(channel_id, 10, None)
3794 .await
3795 .unwrap()
3796 .iter()
3797 .map(|m| &m.body)
3798 .collect::<Vec<_>>(),
3799 &["surrounded by whitespace"]
3800 );
3801 }
3802
3803 #[gpui::test(iterations = 10)]
3804 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3805 cx_a.foreground().forbid_parking();
3806
3807 // Connect to a server as 2 clients.
3808 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3809 let client_a = server.create_client(cx_a, "user_a").await;
3810 let client_b = server.create_client(cx_b, "user_b").await;
3811 let mut status_b = client_b.status();
3812
3813 // Create an org that includes these 2 users.
3814 let db = &server.app_state.db;
3815 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3816 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3817 .await
3818 .unwrap();
3819 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3820 .await
3821 .unwrap();
3822
3823 // Create a channel that includes all the users.
3824 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3825 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3826 .await
3827 .unwrap();
3828 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3829 .await
3830 .unwrap();
3831 db.create_channel_message(
3832 channel_id,
3833 client_b.current_user_id(&cx_b),
3834 "hello A, it's B.",
3835 OffsetDateTime::now_utc(),
3836 2,
3837 )
3838 .await
3839 .unwrap();
3840
3841 let channels_a = cx_a
3842 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3843 channels_a
3844 .condition(cx_a, |list, _| list.available_channels().is_some())
3845 .await;
3846
3847 channels_a.read_with(cx_a, |list, _| {
3848 assert_eq!(
3849 list.available_channels().unwrap(),
3850 &[ChannelDetails {
3851 id: channel_id.to_proto(),
3852 name: "test-channel".to_string()
3853 }]
3854 )
3855 });
3856 let channel_a = channels_a.update(cx_a, |this, cx| {
3857 this.get_channel(channel_id.to_proto(), cx).unwrap()
3858 });
3859 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3860 channel_a
3861 .condition(&cx_a, |channel, _| {
3862 channel_messages(channel)
3863 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3864 })
3865 .await;
3866
3867 let channels_b = cx_b
3868 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3869 channels_b
3870 .condition(cx_b, |list, _| list.available_channels().is_some())
3871 .await;
3872 channels_b.read_with(cx_b, |list, _| {
3873 assert_eq!(
3874 list.available_channels().unwrap(),
3875 &[ChannelDetails {
3876 id: channel_id.to_proto(),
3877 name: "test-channel".to_string()
3878 }]
3879 )
3880 });
3881
3882 let channel_b = channels_b.update(cx_b, |this, cx| {
3883 this.get_channel(channel_id.to_proto(), cx).unwrap()
3884 });
3885 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3886 channel_b
3887 .condition(&cx_b, |channel, _| {
3888 channel_messages(channel)
3889 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3890 })
3891 .await;
3892
3893 // Disconnect client B, ensuring we can still access its cached channel data.
3894 server.forbid_connections();
3895 server.disconnect_client(client_b.current_user_id(&cx_b));
3896 cx_b.foreground().advance_clock(Duration::from_secs(3));
3897 while !matches!(
3898 status_b.next().await,
3899 Some(client::Status::ReconnectionError { .. })
3900 ) {}
3901
3902 channels_b.read_with(cx_b, |channels, _| {
3903 assert_eq!(
3904 channels.available_channels().unwrap(),
3905 [ChannelDetails {
3906 id: channel_id.to_proto(),
3907 name: "test-channel".to_string()
3908 }]
3909 )
3910 });
3911 channel_b.read_with(cx_b, |channel, _| {
3912 assert_eq!(
3913 channel_messages(channel),
3914 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3915 )
3916 });
3917
3918 // Send a message from client B while it is disconnected.
3919 channel_b
3920 .update(cx_b, |channel, cx| {
3921 let task = channel
3922 .send_message("can you see this?".to_string(), cx)
3923 .unwrap();
3924 assert_eq!(
3925 channel_messages(channel),
3926 &[
3927 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3928 ("user_b".to_string(), "can you see this?".to_string(), true)
3929 ]
3930 );
3931 task
3932 })
3933 .await
3934 .unwrap_err();
3935
3936 // Send a message from client A while B is disconnected.
3937 channel_a
3938 .update(cx_a, |channel, cx| {
3939 channel
3940 .send_message("oh, hi B.".to_string(), cx)
3941 .unwrap()
3942 .detach();
3943 let task = channel.send_message("sup".to_string(), cx).unwrap();
3944 assert_eq!(
3945 channel_messages(channel),
3946 &[
3947 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3948 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3949 ("user_a".to_string(), "sup".to_string(), true)
3950 ]
3951 );
3952 task
3953 })
3954 .await
3955 .unwrap();
3956
3957 // Give client B a chance to reconnect.
3958 server.allow_connections();
3959 cx_b.foreground().advance_clock(Duration::from_secs(10));
3960
3961 // Verify that B sees the new messages upon reconnection, as well as the message client B
3962 // sent while offline.
3963 channel_b
3964 .condition(&cx_b, |channel, _| {
3965 channel_messages(channel)
3966 == [
3967 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3968 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3969 ("user_a".to_string(), "sup".to_string(), false),
3970 ("user_b".to_string(), "can you see this?".to_string(), false),
3971 ]
3972 })
3973 .await;
3974
3975 // Ensure client A and B can communicate normally after reconnection.
3976 channel_a
3977 .update(cx_a, |channel, cx| {
3978 channel.send_message("you online?".to_string(), cx).unwrap()
3979 })
3980 .await
3981 .unwrap();
3982 channel_b
3983 .condition(&cx_b, |channel, _| {
3984 channel_messages(channel)
3985 == [
3986 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3987 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3988 ("user_a".to_string(), "sup".to_string(), false),
3989 ("user_b".to_string(), "can you see this?".to_string(), false),
3990 ("user_a".to_string(), "you online?".to_string(), false),
3991 ]
3992 })
3993 .await;
3994
3995 channel_b
3996 .update(cx_b, |channel, cx| {
3997 channel.send_message("yep".to_string(), cx).unwrap()
3998 })
3999 .await
4000 .unwrap();
4001 channel_a
4002 .condition(&cx_a, |channel, _| {
4003 channel_messages(channel)
4004 == [
4005 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4006 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4007 ("user_a".to_string(), "sup".to_string(), false),
4008 ("user_b".to_string(), "can you see this?".to_string(), false),
4009 ("user_a".to_string(), "you online?".to_string(), false),
4010 ("user_b".to_string(), "yep".to_string(), false),
4011 ]
4012 })
4013 .await;
4014 }
4015
4016 #[gpui::test(iterations = 10)]
4017 async fn test_contacts(
4018 cx_a: &mut TestAppContext,
4019 cx_b: &mut TestAppContext,
4020 cx_c: &mut TestAppContext,
4021 ) {
4022 cx_a.foreground().forbid_parking();
4023 let lang_registry = Arc::new(LanguageRegistry::test());
4024 let fs = FakeFs::new(cx_a.background());
4025
4026 // Connect to a server as 3 clients.
4027 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4028 let client_a = server.create_client(cx_a, "user_a").await;
4029 let client_b = server.create_client(cx_b, "user_b").await;
4030 let client_c = server.create_client(cx_c, "user_c").await;
4031
4032 // Share a worktree as client A.
4033 fs.insert_tree(
4034 "/a",
4035 json!({
4036 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4037 }),
4038 )
4039 .await;
4040
4041 let project_a = cx_a.update(|cx| {
4042 Project::local(
4043 client_a.clone(),
4044 client_a.user_store.clone(),
4045 lang_registry.clone(),
4046 fs.clone(),
4047 cx,
4048 )
4049 });
4050 let (worktree_a, _) = project_a
4051 .update(cx_a, |p, cx| {
4052 p.find_or_create_local_worktree("/a", true, cx)
4053 })
4054 .await
4055 .unwrap();
4056 worktree_a
4057 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4058 .await;
4059
4060 client_a
4061 .user_store
4062 .condition(&cx_a, |user_store, _| {
4063 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4064 })
4065 .await;
4066 client_b
4067 .user_store
4068 .condition(&cx_b, |user_store, _| {
4069 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4070 })
4071 .await;
4072 client_c
4073 .user_store
4074 .condition(&cx_c, |user_store, _| {
4075 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4076 })
4077 .await;
4078
4079 let project_id = project_a
4080 .update(cx_a, |project, _| project.next_remote_id())
4081 .await;
4082 project_a
4083 .update(cx_a, |project, cx| project.share(cx))
4084 .await
4085 .unwrap();
4086
4087 let _project_b = Project::remote(
4088 project_id,
4089 client_b.clone(),
4090 client_b.user_store.clone(),
4091 lang_registry.clone(),
4092 fs.clone(),
4093 &mut cx_b.to_async(),
4094 )
4095 .await
4096 .unwrap();
4097
4098 client_a
4099 .user_store
4100 .condition(&cx_a, |user_store, _| {
4101 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4102 })
4103 .await;
4104 client_b
4105 .user_store
4106 .condition(&cx_b, |user_store, _| {
4107 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4108 })
4109 .await;
4110 client_c
4111 .user_store
4112 .condition(&cx_c, |user_store, _| {
4113 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4114 })
4115 .await;
4116
4117 project_a
4118 .condition(&cx_a, |project, _| {
4119 project.collaborators().contains_key(&client_b.peer_id)
4120 })
4121 .await;
4122
4123 cx_a.update(move |_| drop(project_a));
4124 client_a
4125 .user_store
4126 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4127 .await;
4128 client_b
4129 .user_store
4130 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4131 .await;
4132 client_c
4133 .user_store
4134 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4135 .await;
4136
4137 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4138 user_store
4139 .contacts()
4140 .iter()
4141 .map(|contact| {
4142 let worktrees = contact
4143 .projects
4144 .iter()
4145 .map(|p| {
4146 (
4147 p.worktree_root_names[0].as_str(),
4148 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4149 )
4150 })
4151 .collect();
4152 (contact.user.github_login.as_str(), worktrees)
4153 })
4154 .collect()
4155 }
4156 }
4157
4158 #[gpui::test(iterations = 100)]
4159 async fn test_random_collaboration(cx: &mut TestAppContext, rng: StdRng) {
4160 cx.foreground().forbid_parking();
4161 let max_peers = env::var("MAX_PEERS")
4162 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4163 .unwrap_or(5);
4164 let max_operations = env::var("OPERATIONS")
4165 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4166 .unwrap_or(10);
4167
4168 let rng = Arc::new(Mutex::new(rng));
4169
4170 let guest_lang_registry = Arc::new(LanguageRegistry::test());
4171 let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4172
4173 let fs = FakeFs::new(cx.background());
4174 fs.insert_tree(
4175 "/_collab",
4176 json!({
4177 ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4178 }),
4179 )
4180 .await;
4181
4182 let operations = Rc::new(Cell::new(0));
4183 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4184 let mut clients = Vec::new();
4185
4186 let mut next_entity_id = 100000;
4187 let mut host_cx = TestAppContext::new(
4188 cx.foreground_platform(),
4189 cx.platform(),
4190 cx.foreground(),
4191 cx.background(),
4192 cx.font_cache(),
4193 cx.leak_detector(),
4194 next_entity_id,
4195 );
4196 let host = server.create_client(&mut host_cx, "host").await;
4197 let host_project = host_cx.update(|cx| {
4198 Project::local(
4199 host.client.clone(),
4200 host.user_store.clone(),
4201 Arc::new(LanguageRegistry::test()),
4202 fs.clone(),
4203 cx,
4204 )
4205 });
4206 let host_project_id = host_project
4207 .update(&mut host_cx, |p, _| p.next_remote_id())
4208 .await;
4209
4210 let (collab_worktree, _) = host_project
4211 .update(&mut host_cx, |project, cx| {
4212 project.find_or_create_local_worktree("/_collab", true, cx)
4213 })
4214 .await
4215 .unwrap();
4216 collab_worktree
4217 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4218 .await;
4219 host_project
4220 .update(&mut host_cx, |project, cx| project.share(cx))
4221 .await
4222 .unwrap();
4223
4224 clients.push(cx.foreground().spawn(host.simulate_host(
4225 host_project,
4226 language_server_config,
4227 operations.clone(),
4228 max_operations,
4229 rng.clone(),
4230 host_cx,
4231 )));
4232
4233 while operations.get() < max_operations {
4234 cx.background().simulate_random_delay().await;
4235 if clients.len() >= max_peers {
4236 break;
4237 } else if rng.lock().gen_bool(0.05) {
4238 operations.set(operations.get() + 1);
4239
4240 let guest_id = clients.len();
4241 log::info!("Adding guest {}", guest_id);
4242 next_entity_id += 100000;
4243 let mut guest_cx = TestAppContext::new(
4244 cx.foreground_platform(),
4245 cx.platform(),
4246 cx.foreground(),
4247 cx.background(),
4248 cx.font_cache(),
4249 cx.leak_detector(),
4250 next_entity_id,
4251 );
4252 let guest = server
4253 .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4254 .await;
4255 let guest_project = Project::remote(
4256 host_project_id,
4257 guest.client.clone(),
4258 guest.user_store.clone(),
4259 guest_lang_registry.clone(),
4260 FakeFs::new(cx.background()),
4261 &mut guest_cx.to_async(),
4262 )
4263 .await
4264 .unwrap();
4265 clients.push(cx.foreground().spawn(guest.simulate_guest(
4266 guest_id,
4267 guest_project,
4268 operations.clone(),
4269 max_operations,
4270 rng.clone(),
4271 guest_cx,
4272 )));
4273
4274 log::info!("Guest {} added", guest_id);
4275 }
4276 }
4277
4278 let mut clients = futures::future::join_all(clients).await;
4279 cx.foreground().run_until_parked();
4280
4281 let (host_client, mut host_cx) = clients.remove(0);
4282 let host_project = host_client.project.as_ref().unwrap();
4283 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4284 project
4285 .worktrees(cx)
4286 .map(|worktree| {
4287 let snapshot = worktree.read(cx).snapshot();
4288 (snapshot.id(), snapshot)
4289 })
4290 .collect::<BTreeMap<_, _>>()
4291 });
4292
4293 host_client
4294 .project
4295 .as_ref()
4296 .unwrap()
4297 .read_with(&host_cx, |project, cx| project.check_invariants(cx));
4298
4299 for (guest_client, mut guest_cx) in clients.into_iter() {
4300 let guest_id = guest_client.client.id();
4301 let worktree_snapshots =
4302 guest_client
4303 .project
4304 .as_ref()
4305 .unwrap()
4306 .read_with(&guest_cx, |project, cx| {
4307 project
4308 .worktrees(cx)
4309 .map(|worktree| {
4310 let worktree = worktree.read(cx);
4311 (worktree.id(), worktree.snapshot())
4312 })
4313 .collect::<BTreeMap<_, _>>()
4314 });
4315
4316 assert_eq!(
4317 worktree_snapshots.keys().collect::<Vec<_>>(),
4318 host_worktree_snapshots.keys().collect::<Vec<_>>(),
4319 "guest {} has different worktrees than the host",
4320 guest_id
4321 );
4322 for (id, host_snapshot) in &host_worktree_snapshots {
4323 let guest_snapshot = &worktree_snapshots[id];
4324 assert_eq!(
4325 guest_snapshot.root_name(),
4326 host_snapshot.root_name(),
4327 "guest {} has different root name than the host for worktree {}",
4328 guest_id,
4329 id
4330 );
4331 assert_eq!(
4332 guest_snapshot.entries(false).collect::<Vec<_>>(),
4333 host_snapshot.entries(false).collect::<Vec<_>>(),
4334 "guest {} has different snapshot than the host for worktree {}",
4335 guest_id,
4336 id
4337 );
4338 }
4339
4340 guest_client
4341 .project
4342 .as_ref()
4343 .unwrap()
4344 .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
4345
4346 for guest_buffer in &guest_client.buffers {
4347 let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
4348 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
4349 project.buffer_for_id(buffer_id, cx).expect(&format!(
4350 "host does not have buffer for guest:{}, peer:{}, id:{}",
4351 guest_id, guest_client.peer_id, buffer_id
4352 ))
4353 });
4354 let path = host_buffer
4355 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
4356
4357 assert_eq!(
4358 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
4359 0,
4360 "guest {}, buffer {}, path {:?} has deferred operations",
4361 guest_id,
4362 buffer_id,
4363 path,
4364 );
4365 assert_eq!(
4366 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
4367 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4368 "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4369 guest_id,
4370 buffer_id,
4371 path
4372 );
4373 }
4374
4375 guest_cx.update(|_| drop(guest_client));
4376 }
4377
4378 host_cx.update(|_| drop(host_client));
4379 }
4380
4381 struct TestServer {
4382 peer: Arc<Peer>,
4383 app_state: Arc<AppState>,
4384 server: Arc<Server>,
4385 foreground: Rc<executor::Foreground>,
4386 notifications: mpsc::UnboundedReceiver<()>,
4387 connection_killers: Arc<Mutex<HashMap<UserId, barrier::Sender>>>,
4388 forbid_connections: Arc<AtomicBool>,
4389 _test_db: TestDb,
4390 }
4391
4392 impl TestServer {
4393 async fn start(
4394 foreground: Rc<executor::Foreground>,
4395 background: Arc<executor::Background>,
4396 ) -> Self {
4397 let test_db = TestDb::fake(background);
4398 let app_state = Self::build_app_state(&test_db).await;
4399 let peer = Peer::new();
4400 let notifications = mpsc::unbounded();
4401 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4402 Self {
4403 peer,
4404 app_state,
4405 server,
4406 foreground,
4407 notifications: notifications.1,
4408 connection_killers: Default::default(),
4409 forbid_connections: Default::default(),
4410 _test_db: test_db,
4411 }
4412 }
4413
4414 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4415 cx.update(|cx| {
4416 let settings = Settings::test(cx);
4417 cx.add_app_state(settings);
4418 });
4419
4420 let http = FakeHttpClient::with_404_response();
4421 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4422 let client_name = name.to_string();
4423 let mut client = Client::new(http.clone());
4424 let server = self.server.clone();
4425 let connection_killers = self.connection_killers.clone();
4426 let forbid_connections = self.forbid_connections.clone();
4427 let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4428
4429 Arc::get_mut(&mut client)
4430 .unwrap()
4431 .override_authenticate(move |cx| {
4432 cx.spawn(|_| async move {
4433 let access_token = "the-token".to_string();
4434 Ok(Credentials {
4435 user_id: user_id.0 as u64,
4436 access_token,
4437 })
4438 })
4439 })
4440 .override_establish_connection(move |credentials, cx| {
4441 assert_eq!(credentials.user_id, user_id.0 as u64);
4442 assert_eq!(credentials.access_token, "the-token");
4443
4444 let server = server.clone();
4445 let connection_killers = connection_killers.clone();
4446 let forbid_connections = forbid_connections.clone();
4447 let client_name = client_name.clone();
4448 let connection_id_tx = connection_id_tx.clone();
4449 cx.spawn(move |cx| async move {
4450 if forbid_connections.load(SeqCst) {
4451 Err(EstablishConnectionError::other(anyhow!(
4452 "server is forbidding connections"
4453 )))
4454 } else {
4455 let (client_conn, server_conn, kill_conn) =
4456 Connection::in_memory(cx.background());
4457 connection_killers.lock().insert(user_id, kill_conn);
4458 cx.background()
4459 .spawn(server.handle_connection(
4460 server_conn,
4461 client_name,
4462 user_id,
4463 Some(connection_id_tx),
4464 cx.background(),
4465 ))
4466 .detach();
4467 Ok(client_conn)
4468 }
4469 })
4470 });
4471
4472 client
4473 .authenticate_and_connect(&cx.to_async())
4474 .await
4475 .unwrap();
4476
4477 Channel::init(&client);
4478 Project::init(&client);
4479
4480 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4481 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4482
4483 let client = TestClient {
4484 client,
4485 peer_id,
4486 user_store,
4487 project: Default::default(),
4488 buffers: Default::default(),
4489 };
4490 client.wait_for_current_user(cx).await;
4491 client
4492 }
4493
4494 fn disconnect_client(&self, user_id: UserId) {
4495 self.connection_killers.lock().remove(&user_id);
4496 }
4497
4498 fn forbid_connections(&self) {
4499 self.forbid_connections.store(true, SeqCst);
4500 }
4501
4502 fn allow_connections(&self) {
4503 self.forbid_connections.store(false, SeqCst);
4504 }
4505
4506 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4507 let mut config = Config::default();
4508 config.session_secret = "a".repeat(32);
4509 config.database_url = test_db.url.clone();
4510 let github_client = github::AppClient::test();
4511 Arc::new(AppState {
4512 db: test_db.db().clone(),
4513 handlebars: Default::default(),
4514 auth_client: auth::build_client("", ""),
4515 repo_client: github::RepoClient::test(&github_client),
4516 github_client,
4517 config,
4518 })
4519 }
4520
4521 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4522 self.server.store.read()
4523 }
4524
4525 async fn condition<F>(&mut self, mut predicate: F)
4526 where
4527 F: FnMut(&Store) -> bool,
4528 {
4529 async_std::future::timeout(Duration::from_millis(500), async {
4530 while !(predicate)(&*self.server.store.read()) {
4531 self.foreground.start_waiting();
4532 self.notifications.next().await;
4533 self.foreground.finish_waiting();
4534 }
4535 })
4536 .await
4537 .expect("condition timed out");
4538 }
4539 }
4540
4541 impl Drop for TestServer {
4542 fn drop(&mut self) {
4543 self.peer.reset();
4544 }
4545 }
4546
4547 struct TestClient {
4548 client: Arc<Client>,
4549 pub peer_id: PeerId,
4550 pub user_store: ModelHandle<UserStore>,
4551 project: Option<ModelHandle<Project>>,
4552 buffers: HashSet<ModelHandle<language::Buffer>>,
4553 }
4554
4555 impl Deref for TestClient {
4556 type Target = Arc<Client>;
4557
4558 fn deref(&self) -> &Self::Target {
4559 &self.client
4560 }
4561 }
4562
4563 impl TestClient {
4564 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4565 UserId::from_proto(
4566 self.user_store
4567 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4568 )
4569 }
4570
4571 async fn wait_for_current_user(&self, cx: &TestAppContext) {
4572 let mut authed_user = self
4573 .user_store
4574 .read_with(cx, |user_store, _| user_store.watch_current_user());
4575 while authed_user.next().await.unwrap().is_none() {}
4576 }
4577
4578 fn simulate_host(
4579 mut self,
4580 project: ModelHandle<Project>,
4581 mut language_server_config: LanguageServerConfig,
4582 operations: Rc<Cell<usize>>,
4583 max_operations: usize,
4584 rng: Arc<Mutex<StdRng>>,
4585 mut cx: TestAppContext,
4586 ) -> impl Future<Output = (Self, TestAppContext)> {
4587 let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4588
4589 // Set up a fake language server.
4590 language_server_config.set_fake_initializer({
4591 let rng = rng.clone();
4592 let files = files.clone();
4593 let project = project.downgrade();
4594 move |fake_server| {
4595 fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4596 Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4597 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4598 range: lsp::Range::new(
4599 lsp::Position::new(0, 0),
4600 lsp::Position::new(0, 0),
4601 ),
4602 new_text: "the-new-text".to_string(),
4603 })),
4604 ..Default::default()
4605 }]))
4606 });
4607
4608 fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4609 Some(vec![lsp::CodeActionOrCommand::CodeAction(
4610 lsp::CodeAction {
4611 title: "the-code-action".to_string(),
4612 ..Default::default()
4613 },
4614 )])
4615 });
4616
4617 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4618 |params, _| {
4619 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4620 params.position,
4621 params.position,
4622 )))
4623 },
4624 );
4625
4626 fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4627 let files = files.clone();
4628 let rng = rng.clone();
4629 move |_, _| {
4630 let files = files.lock();
4631 let mut rng = rng.lock();
4632 let count = rng.gen_range::<usize, _>(1..3);
4633 let files = (0..count)
4634 .map(|_| files.choose(&mut *rng).unwrap())
4635 .collect::<Vec<_>>();
4636 log::info!("LSP: Returning definitions in files {:?}", &files);
4637 Some(lsp::GotoDefinitionResponse::Array(
4638 files
4639 .into_iter()
4640 .map(|file| lsp::Location {
4641 uri: lsp::Url::from_file_path(file).unwrap(),
4642 range: Default::default(),
4643 })
4644 .collect(),
4645 ))
4646 }
4647 });
4648
4649 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4650 let rng = rng.clone();
4651 let project = project.clone();
4652 move |params, mut cx| {
4653 if let Some(project) = project.upgrade(&cx) {
4654 project.update(&mut cx, |project, cx| {
4655 let path = params
4656 .text_document_position_params
4657 .text_document
4658 .uri
4659 .to_file_path()
4660 .unwrap();
4661 let (worktree, relative_path) =
4662 project.find_local_worktree(&path, cx)?;
4663 let project_path =
4664 ProjectPath::from((worktree.read(cx).id(), relative_path));
4665 let buffer =
4666 project.get_open_buffer(&project_path, cx)?.read(cx);
4667
4668 let mut highlights = Vec::new();
4669 let highlight_count = rng.lock().gen_range(1..=5);
4670 let mut prev_end = 0;
4671 for _ in 0..highlight_count {
4672 let range =
4673 buffer.random_byte_range(prev_end, &mut *rng.lock());
4674 let start = buffer
4675 .offset_to_point_utf16(range.start)
4676 .to_lsp_position();
4677 let end = buffer
4678 .offset_to_point_utf16(range.end)
4679 .to_lsp_position();
4680 highlights.push(lsp::DocumentHighlight {
4681 range: lsp::Range::new(start, end),
4682 kind: Some(lsp::DocumentHighlightKind::READ),
4683 });
4684 prev_end = range.end;
4685 }
4686 Some(highlights)
4687 })
4688 } else {
4689 None
4690 }
4691 }
4692 });
4693 }
4694 });
4695
4696 project.update(&mut cx, |project, _| {
4697 project.languages().add(Arc::new(Language::new(
4698 LanguageConfig {
4699 name: "Rust".into(),
4700 path_suffixes: vec!["rs".to_string()],
4701 language_server: Some(language_server_config),
4702 ..Default::default()
4703 },
4704 None,
4705 )));
4706 });
4707
4708 async move {
4709 let fs = project.read_with(&cx, |project, _| project.fs().clone());
4710 while operations.get() < max_operations {
4711 operations.set(operations.get() + 1);
4712
4713 let distribution = rng.lock().gen_range::<usize, _>(0..100);
4714 match distribution {
4715 0..=20 if !files.lock().is_empty() => {
4716 let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4717 let mut path = path.as_path();
4718 while let Some(parent_path) = path.parent() {
4719 path = parent_path;
4720 if rng.lock().gen() {
4721 break;
4722 }
4723 }
4724
4725 log::info!("Host: find/create local worktree {:?}", path);
4726 let find_or_create_worktree = project.update(&mut cx, |project, cx| {
4727 project.find_or_create_local_worktree(path, true, cx)
4728 });
4729 let find_or_create_worktree = async move {
4730 find_or_create_worktree.await.unwrap();
4731 };
4732 if rng.lock().gen() {
4733 cx.background().spawn(find_or_create_worktree).detach();
4734 } else {
4735 find_or_create_worktree.await;
4736 }
4737 }
4738 10..=80 if !files.lock().is_empty() => {
4739 let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4740 let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4741 let (worktree, path) = project
4742 .update(&mut cx, |project, cx| {
4743 project.find_or_create_local_worktree(
4744 file.clone(),
4745 true,
4746 cx,
4747 )
4748 })
4749 .await
4750 .unwrap();
4751 let project_path =
4752 worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4753 log::info!(
4754 "Host: opening path {:?}, worktree {}, relative_path {:?}",
4755 file,
4756 project_path.0,
4757 project_path.1
4758 );
4759 let buffer = project
4760 .update(&mut cx, |project, cx| {
4761 project.open_buffer(project_path, cx)
4762 })
4763 .await
4764 .unwrap();
4765 self.buffers.insert(buffer.clone());
4766 buffer
4767 } else {
4768 self.buffers
4769 .iter()
4770 .choose(&mut *rng.lock())
4771 .unwrap()
4772 .clone()
4773 };
4774
4775 if rng.lock().gen_bool(0.1) {
4776 cx.update(|cx| {
4777 log::info!(
4778 "Host: dropping buffer {:?}",
4779 buffer.read(cx).file().unwrap().full_path(cx)
4780 );
4781 self.buffers.remove(&buffer);
4782 drop(buffer);
4783 });
4784 } else {
4785 buffer.update(&mut cx, |buffer, cx| {
4786 log::info!(
4787 "Host: updating buffer {:?} ({})",
4788 buffer.file().unwrap().full_path(cx),
4789 buffer.remote_id()
4790 );
4791 buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4792 });
4793 }
4794 }
4795 _ => loop {
4796 let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4797 let mut path = PathBuf::new();
4798 path.push("/");
4799 for _ in 0..path_component_count {
4800 let letter = rng.lock().gen_range(b'a'..=b'z');
4801 path.push(std::str::from_utf8(&[letter]).unwrap());
4802 }
4803 path.set_extension("rs");
4804 let parent_path = path.parent().unwrap();
4805
4806 log::info!("Host: creating file {:?}", path,);
4807
4808 if fs.create_dir(&parent_path).await.is_ok()
4809 && fs.create_file(&path, Default::default()).await.is_ok()
4810 {
4811 files.lock().push(path);
4812 break;
4813 } else {
4814 log::info!("Host: cannot create file");
4815 }
4816 },
4817 }
4818
4819 cx.background().simulate_random_delay().await;
4820 }
4821
4822 log::info!("Host done");
4823
4824 self.project = Some(project);
4825 (self, cx)
4826 }
4827 }
4828
4829 pub async fn simulate_guest(
4830 mut self,
4831 guest_id: usize,
4832 project: ModelHandle<Project>,
4833 operations: Rc<Cell<usize>>,
4834 max_operations: usize,
4835 rng: Arc<Mutex<StdRng>>,
4836 mut cx: TestAppContext,
4837 ) -> (Self, TestAppContext) {
4838 while operations.get() < max_operations {
4839 let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4840 let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4841 project
4842 .worktrees(&cx)
4843 .filter(|worktree| {
4844 let worktree = worktree.read(cx);
4845 worktree.is_visible()
4846 && worktree.entries(false).any(|e| e.is_file())
4847 })
4848 .choose(&mut *rng.lock())
4849 }) {
4850 worktree
4851 } else {
4852 cx.background().simulate_random_delay().await;
4853 continue;
4854 };
4855
4856 operations.set(operations.get() + 1);
4857 let (worktree_root_name, project_path) =
4858 worktree.read_with(&cx, |worktree, _| {
4859 let entry = worktree
4860 .entries(false)
4861 .filter(|e| e.is_file())
4862 .choose(&mut *rng.lock())
4863 .unwrap();
4864 (
4865 worktree.root_name().to_string(),
4866 (worktree.id(), entry.path.clone()),
4867 )
4868 });
4869 log::info!(
4870 "Guest {}: opening path {:?} in worktree {} ({})",
4871 guest_id,
4872 project_path.1,
4873 project_path.0,
4874 worktree_root_name,
4875 );
4876 let buffer = project
4877 .update(&mut cx, |project, cx| {
4878 project.open_buffer(project_path.clone(), cx)
4879 })
4880 .await
4881 .unwrap();
4882 log::info!(
4883 "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}",
4884 guest_id,
4885 project_path.1,
4886 project_path.0,
4887 worktree_root_name,
4888 buffer.read_with(&cx, |buffer, _| buffer.remote_id())
4889 );
4890 self.buffers.insert(buffer.clone());
4891 buffer
4892 } else {
4893 operations.set(operations.get() + 1);
4894
4895 self.buffers
4896 .iter()
4897 .choose(&mut *rng.lock())
4898 .unwrap()
4899 .clone()
4900 };
4901
4902 let choice = rng.lock().gen_range(0..100);
4903 match choice {
4904 0..=9 => {
4905 cx.update(|cx| {
4906 log::info!(
4907 "Guest {}: dropping buffer {:?}",
4908 guest_id,
4909 buffer.read(cx).file().unwrap().full_path(cx)
4910 );
4911 self.buffers.remove(&buffer);
4912 drop(buffer);
4913 });
4914 }
4915 10..=19 => {
4916 let completions = project.update(&mut cx, |project, cx| {
4917 log::info!(
4918 "Guest {}: requesting completions for buffer {} ({:?})",
4919 guest_id,
4920 buffer.read(cx).remote_id(),
4921 buffer.read(cx).file().unwrap().full_path(cx)
4922 );
4923 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4924 project.completions(&buffer, offset, cx)
4925 });
4926 let completions = cx.background().spawn(async move {
4927 completions.await.expect("completions request failed");
4928 });
4929 if rng.lock().gen_bool(0.3) {
4930 log::info!("Guest {}: detaching completions request", guest_id);
4931 completions.detach();
4932 } else {
4933 completions.await;
4934 }
4935 }
4936 20..=29 => {
4937 let code_actions = project.update(&mut cx, |project, cx| {
4938 log::info!(
4939 "Guest {}: requesting code actions for buffer {} ({:?})",
4940 guest_id,
4941 buffer.read(cx).remote_id(),
4942 buffer.read(cx).file().unwrap().full_path(cx)
4943 );
4944 let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4945 project.code_actions(&buffer, range, cx)
4946 });
4947 let code_actions = cx.background().spawn(async move {
4948 code_actions.await.expect("code actions request failed");
4949 });
4950 if rng.lock().gen_bool(0.3) {
4951 log::info!("Guest {}: detaching code actions request", guest_id);
4952 code_actions.detach();
4953 } else {
4954 code_actions.await;
4955 }
4956 }
4957 30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4958 let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4959 log::info!(
4960 "Guest {}: saving buffer {} ({:?})",
4961 guest_id,
4962 buffer.remote_id(),
4963 buffer.file().unwrap().full_path(cx)
4964 );
4965 (buffer.version(), buffer.save(cx))
4966 });
4967 let save = cx.background().spawn(async move {
4968 let (saved_version, _) = save.await.expect("save request failed");
4969 assert!(saved_version.observed_all(&requested_version));
4970 });
4971 if rng.lock().gen_bool(0.3) {
4972 log::info!("Guest {}: detaching save request", guest_id);
4973 save.detach();
4974 } else {
4975 save.await;
4976 }
4977 }
4978 40..=44 => {
4979 let prepare_rename = project.update(&mut cx, |project, cx| {
4980 log::info!(
4981 "Guest {}: preparing rename for buffer {} ({:?})",
4982 guest_id,
4983 buffer.read(cx).remote_id(),
4984 buffer.read(cx).file().unwrap().full_path(cx)
4985 );
4986 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4987 project.prepare_rename(buffer, offset, cx)
4988 });
4989 let prepare_rename = cx.background().spawn(async move {
4990 prepare_rename.await.expect("prepare rename request failed");
4991 });
4992 if rng.lock().gen_bool(0.3) {
4993 log::info!("Guest {}: detaching prepare rename request", guest_id);
4994 prepare_rename.detach();
4995 } else {
4996 prepare_rename.await;
4997 }
4998 }
4999 45..=49 => {
5000 let definitions = project.update(&mut cx, |project, cx| {
5001 log::info!(
5002 "Guest {}: requesting definitions for buffer {} ({:?})",
5003 guest_id,
5004 buffer.read(cx).remote_id(),
5005 buffer.read(cx).file().unwrap().full_path(cx)
5006 );
5007 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5008 project.definition(&buffer, offset, cx)
5009 });
5010 let definitions = cx.background().spawn(async move {
5011 definitions.await.expect("definitions request failed")
5012 });
5013 if rng.lock().gen_bool(0.3) {
5014 log::info!("Guest {}: detaching definitions request", guest_id);
5015 definitions.detach();
5016 } else {
5017 self.buffers
5018 .extend(definitions.await.into_iter().map(|loc| loc.buffer));
5019 }
5020 }
5021 50..=54 => {
5022 let highlights = project.update(&mut cx, |project, cx| {
5023 log::info!(
5024 "Guest {}: requesting highlights for buffer {} ({:?})",
5025 guest_id,
5026 buffer.read(cx).remote_id(),
5027 buffer.read(cx).file().unwrap().full_path(cx)
5028 );
5029 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5030 project.document_highlights(&buffer, offset, cx)
5031 });
5032 let highlights = cx.background().spawn(async move {
5033 highlights.await.expect("highlights request failed");
5034 });
5035 if rng.lock().gen_bool(0.3) {
5036 log::info!("Guest {}: detaching highlights request", guest_id);
5037 highlights.detach();
5038 } else {
5039 highlights.await;
5040 }
5041 }
5042 55..=59 => {
5043 let search = project.update(&mut cx, |project, cx| {
5044 let query = rng.lock().gen_range('a'..='z');
5045 log::info!("Guest {}: project-wide search {:?}", guest_id, query);
5046 project.search(SearchQuery::text(query, false, false), cx)
5047 });
5048 let search = cx
5049 .background()
5050 .spawn(async move { search.await.expect("search request failed") });
5051 if rng.lock().gen_bool(0.3) {
5052 log::info!("Guest {}: detaching search request", guest_id);
5053 search.detach();
5054 } else {
5055 self.buffers.extend(search.await.into_keys());
5056 }
5057 }
5058 _ => {
5059 buffer.update(&mut cx, |buffer, cx| {
5060 log::info!(
5061 "Guest {}: updating buffer {} ({:?})",
5062 guest_id,
5063 buffer.remote_id(),
5064 buffer.file().unwrap().full_path(cx)
5065 );
5066 buffer.randomly_edit(&mut *rng.lock(), 5, cx)
5067 });
5068 }
5069 }
5070 cx.background().simulate_random_delay().await;
5071 }
5072
5073 log::info!("Guest {} done", guest_id);
5074
5075 self.project = Some(project);
5076 (self, cx)
5077 }
5078 }
5079
5080 impl Drop for TestClient {
5081 fn drop(&mut self) {
5082 self.client.tear_down();
5083 }
5084 }
5085
5086 impl Executor for Arc<gpui::executor::Background> {
5087 type Timer = gpui::executor::Timer;
5088
5089 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5090 self.spawn(future).detach();
5091 }
5092
5093 fn timer(&self, duration: Duration) -> Self::Timer {
5094 self.as_ref().timer(duration)
5095 }
5096 }
5097
5098 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5099 channel
5100 .messages()
5101 .cursor::<()>()
5102 .map(|m| {
5103 (
5104 m.sender.github_login.clone(),
5105 m.body.clone(),
5106 m.is_pending(),
5107 )
5108 })
5109 .collect()
5110 }
5111
5112 struct EmptyView;
5113
5114 impl gpui::Entity for EmptyView {
5115 type Event = ();
5116 }
5117
5118 impl gpui::View for EmptyView {
5119 fn ui_name() -> &'static str {
5120 "empty view"
5121 }
5122
5123 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5124 gpui::Element::boxed(gpui::elements::Empty)
5125 }
5126 }
5127}