1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_io::Timer;
10use async_std::task;
11use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
12use collections::{HashMap, HashSet};
13use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
14use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{
21 any::TypeId,
22 future::Future,
23 sync::Arc,
24 time::{Duration, Instant},
25};
26use store::{Store, Worktree};
27use surf::StatusCode;
28use tide::log;
29use tide::{
30 http::headers::{HeaderName, CONNECTION, UPGRADE},
31 Request, Response,
32};
33use time::OffsetDateTime;
34
35type MessageHandler = Box<
36 dyn Send
37 + Sync
38 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
39>;
40
41pub struct Server {
42 peer: Arc<Peer>,
43 store: RwLock<Store>,
44 app_state: Arc<AppState>,
45 handlers: HashMap<TypeId, MessageHandler>,
46 notifications: Option<mpsc::UnboundedSender<()>>,
47}
48
49pub trait Executor: Send + Clone {
50 type Timer: Send + Future;
51 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
52 fn timer(&self, duration: Duration) -> Self::Timer;
53}
54
55#[derive(Clone)]
56pub struct RealExecutor;
57
58const MESSAGE_COUNT_PER_PAGE: usize = 100;
59const MAX_MESSAGE_LEN: usize = 1024;
60
61impl Server {
62 pub fn new(
63 app_state: Arc<AppState>,
64 peer: Arc<Peer>,
65 notifications: Option<mpsc::UnboundedSender<()>>,
66 ) -> Arc<Self> {
67 let mut server = Self {
68 peer,
69 app_state,
70 store: Default::default(),
71 handlers: Default::default(),
72 notifications,
73 };
74
75 server
76 .add_request_handler(Server::ping)
77 .add_request_handler(Server::register_project)
78 .add_message_handler(Server::unregister_project)
79 .add_request_handler(Server::share_project)
80 .add_message_handler(Server::unshare_project)
81 .add_request_handler(Server::join_project)
82 .add_message_handler(Server::leave_project)
83 .add_request_handler(Server::register_worktree)
84 .add_message_handler(Server::unregister_worktree)
85 .add_request_handler(Server::update_worktree)
86 .add_message_handler(Server::update_diagnostic_summary)
87 .add_message_handler(Server::disk_based_diagnostics_updating)
88 .add_message_handler(Server::disk_based_diagnostics_updated)
89 .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
90 .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
91 .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
92 .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
93 .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
94 .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
95 .add_request_handler(Server::forward_project_request::<proto::OpenBuffer>)
96 .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
97 .add_request_handler(
98 Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
99 )
100 .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
101 .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
102 .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
103 .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
104 .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
105 .add_message_handler(Server::close_buffer)
106 .add_request_handler(Server::update_buffer)
107 .add_message_handler(Server::update_buffer_file)
108 .add_message_handler(Server::buffer_reloaded)
109 .add_message_handler(Server::buffer_saved)
110 .add_request_handler(Server::save_buffer)
111 .add_request_handler(Server::get_channels)
112 .add_request_handler(Server::get_users)
113 .add_request_handler(Server::join_channel)
114 .add_message_handler(Server::leave_channel)
115 .add_request_handler(Server::send_channel_message)
116 .add_request_handler(Server::get_channel_messages);
117
118 Arc::new(server)
119 }
120
121 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
122 where
123 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
124 Fut: 'static + Send + Future<Output = tide::Result<()>>,
125 M: EnvelopedMessage,
126 {
127 let prev_handler = self.handlers.insert(
128 TypeId::of::<M>(),
129 Box::new(move |server, envelope| {
130 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
131 (handler)(server, *envelope).boxed()
132 }),
133 );
134 if prev_handler.is_some() {
135 panic!("registered a handler for the same message twice");
136 }
137 self
138 }
139
140 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
141 where
142 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
143 Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
144 M: RequestMessage,
145 {
146 self.add_message_handler(move |server, envelope| {
147 let receipt = envelope.receipt();
148 let response = (handler)(server.clone(), envelope);
149 async move {
150 match response.await {
151 Ok(response) => {
152 server.peer.respond(receipt, response)?;
153 Ok(())
154 }
155 Err(error) => {
156 server.peer.respond_with_error(
157 receipt,
158 proto::Error {
159 message: error.to_string(),
160 },
161 )?;
162 Err(error)
163 }
164 }
165 }
166 })
167 }
168
169 pub fn handle_connection<E: Executor>(
170 self: &Arc<Self>,
171 connection: Connection,
172 addr: String,
173 user_id: UserId,
174 mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
175 executor: E,
176 ) -> impl Future<Output = ()> {
177 let mut this = self.clone();
178 async move {
179 let (connection_id, handle_io, mut incoming_rx) = this
180 .peer
181 .add_connection(connection, {
182 let executor = executor.clone();
183 move |duration| {
184 let timer = executor.timer(duration);
185 async move {
186 timer.await;
187 }
188 }
189 })
190 .await;
191
192 if let Some(send_connection_id) = send_connection_id.as_mut() {
193 let _ = send_connection_id.send(connection_id).await;
194 }
195
196 this.state_mut().add_connection(connection_id, user_id);
197 if let Err(err) = this.update_contacts_for_users(&[user_id]) {
198 log::error!("error updating contacts for {:?}: {}", user_id, err);
199 }
200
201 let handle_io = handle_io.fuse();
202 futures::pin_mut!(handle_io);
203 loop {
204 let next_message = incoming_rx.next().fuse();
205 futures::pin_mut!(next_message);
206 futures::select_biased! {
207 result = handle_io => {
208 if let Err(err) = result {
209 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
210 }
211 break;
212 }
213 message = next_message => {
214 if let Some(message) = message {
215 let start_time = Instant::now();
216 let type_name = message.payload_type_name();
217 log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
218 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
219 let notifications = this.notifications.clone();
220 let is_background = message.is_background();
221 let handle_message = (handler)(this.clone(), message);
222 let handle_message = async move {
223 if let Err(err) = handle_message.await {
224 log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
225 } else {
226 log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
227 }
228 if let Some(mut notifications) = notifications {
229 let _ = notifications.send(()).await;
230 }
231 };
232 if is_background {
233 executor.spawn_detached(handle_message);
234 } else {
235 handle_message.await;
236 }
237 } else {
238 log::warn!("unhandled message: {}", type_name);
239 }
240 } else {
241 log::info!("rpc connection closed {:?}", addr);
242 break;
243 }
244 }
245 }
246 }
247
248 if let Err(err) = this.sign_out(connection_id).await {
249 log::error!("error signing out connection {:?} - {:?}", addr, err);
250 }
251 }
252 }
253
254 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
255 self.peer.disconnect(connection_id);
256 let removed_connection = self.state_mut().remove_connection(connection_id)?;
257
258 for (project_id, project) in removed_connection.hosted_projects {
259 if let Some(share) = project.share {
260 broadcast(
261 connection_id,
262 share.guests.keys().copied().collect(),
263 |conn_id| {
264 self.peer
265 .send(conn_id, proto::UnshareProject { project_id })
266 },
267 )?;
268 }
269 }
270
271 for (project_id, peer_ids) in removed_connection.guest_project_ids {
272 broadcast(connection_id, peer_ids, |conn_id| {
273 self.peer.send(
274 conn_id,
275 proto::RemoveProjectCollaborator {
276 project_id,
277 peer_id: connection_id.0,
278 },
279 )
280 })?;
281 }
282
283 self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
284 Ok(())
285 }
286
287 async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
288 Ok(proto::Ack {})
289 }
290
291 async fn register_project(
292 mut self: Arc<Server>,
293 request: TypedEnvelope<proto::RegisterProject>,
294 ) -> tide::Result<proto::RegisterProjectResponse> {
295 let project_id = {
296 let mut state = self.state_mut();
297 let user_id = state.user_id_for_connection(request.sender_id)?;
298 state.register_project(request.sender_id, user_id)
299 };
300 Ok(proto::RegisterProjectResponse { project_id })
301 }
302
303 async fn unregister_project(
304 mut self: Arc<Server>,
305 request: TypedEnvelope<proto::UnregisterProject>,
306 ) -> tide::Result<()> {
307 let project = self
308 .state_mut()
309 .unregister_project(request.payload.project_id, request.sender_id)?;
310 self.update_contacts_for_users(project.authorized_user_ids().iter())?;
311 Ok(())
312 }
313
314 async fn share_project(
315 mut self: Arc<Server>,
316 request: TypedEnvelope<proto::ShareProject>,
317 ) -> tide::Result<proto::Ack> {
318 self.state_mut()
319 .share_project(request.payload.project_id, request.sender_id);
320 Ok(proto::Ack {})
321 }
322
323 async fn unshare_project(
324 mut self: Arc<Server>,
325 request: TypedEnvelope<proto::UnshareProject>,
326 ) -> tide::Result<()> {
327 let project_id = request.payload.project_id;
328 let project = self
329 .state_mut()
330 .unshare_project(project_id, request.sender_id)?;
331
332 broadcast(request.sender_id, project.connection_ids, |conn_id| {
333 self.peer
334 .send(conn_id, proto::UnshareProject { project_id })
335 })?;
336 self.update_contacts_for_users(&project.authorized_user_ids)?;
337 Ok(())
338 }
339
340 async fn join_project(
341 mut self: Arc<Server>,
342 request: TypedEnvelope<proto::JoinProject>,
343 ) -> tide::Result<proto::JoinProjectResponse> {
344 let project_id = request.payload.project_id;
345
346 let user_id = self.state().user_id_for_connection(request.sender_id)?;
347 let (response, connection_ids, contact_user_ids) = self
348 .state_mut()
349 .join_project(request.sender_id, user_id, project_id)
350 .and_then(|joined| {
351 let share = joined.project.share()?;
352 let peer_count = share.guests.len();
353 let mut collaborators = Vec::with_capacity(peer_count);
354 collaborators.push(proto::Collaborator {
355 peer_id: joined.project.host_connection_id.0,
356 replica_id: 0,
357 user_id: joined.project.host_user_id.to_proto(),
358 });
359 let worktrees = share
360 .worktrees
361 .iter()
362 .filter_map(|(id, shared_worktree)| {
363 let worktree = joined.project.worktrees.get(&id)?;
364 Some(proto::Worktree {
365 id: *id,
366 root_name: worktree.root_name.clone(),
367 entries: shared_worktree.entries.values().cloned().collect(),
368 diagnostic_summaries: shared_worktree
369 .diagnostic_summaries
370 .values()
371 .cloned()
372 .collect(),
373 visible: worktree.visible,
374 })
375 })
376 .collect();
377 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
378 if *peer_conn_id != request.sender_id {
379 collaborators.push(proto::Collaborator {
380 peer_id: peer_conn_id.0,
381 replica_id: *peer_replica_id as u32,
382 user_id: peer_user_id.to_proto(),
383 });
384 }
385 }
386 let response = proto::JoinProjectResponse {
387 worktrees,
388 replica_id: joined.replica_id as u32,
389 collaborators,
390 };
391 let connection_ids = joined.project.connection_ids();
392 let contact_user_ids = joined.project.authorized_user_ids();
393 Ok((response, connection_ids, contact_user_ids))
394 })?;
395
396 broadcast(request.sender_id, connection_ids, |conn_id| {
397 self.peer.send(
398 conn_id,
399 proto::AddProjectCollaborator {
400 project_id,
401 collaborator: Some(proto::Collaborator {
402 peer_id: request.sender_id.0,
403 replica_id: response.replica_id,
404 user_id: user_id.to_proto(),
405 }),
406 },
407 )
408 })?;
409 self.update_contacts_for_users(&contact_user_ids)?;
410 Ok(response)
411 }
412
413 async fn leave_project(
414 mut self: Arc<Server>,
415 request: TypedEnvelope<proto::LeaveProject>,
416 ) -> tide::Result<()> {
417 let sender_id = request.sender_id;
418 let project_id = request.payload.project_id;
419 let worktree = self.state_mut().leave_project(sender_id, project_id)?;
420
421 broadcast(sender_id, worktree.connection_ids, |conn_id| {
422 self.peer.send(
423 conn_id,
424 proto::RemoveProjectCollaborator {
425 project_id,
426 peer_id: sender_id.0,
427 },
428 )
429 })?;
430 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
431
432 Ok(())
433 }
434
435 async fn register_worktree(
436 mut self: Arc<Server>,
437 request: TypedEnvelope<proto::RegisterWorktree>,
438 ) -> tide::Result<proto::Ack> {
439 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
440
441 let mut contact_user_ids = HashSet::default();
442 contact_user_ids.insert(host_user_id);
443 for github_login in &request.payload.authorized_logins {
444 let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
445 contact_user_ids.insert(contact_user_id);
446 }
447
448 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
449 let guest_connection_ids;
450 {
451 let mut state = self.state_mut();
452 guest_connection_ids = state
453 .read_project(request.payload.project_id, request.sender_id)?
454 .guest_connection_ids();
455 state.register_worktree(
456 request.payload.project_id,
457 request.payload.worktree_id,
458 request.sender_id,
459 Worktree {
460 authorized_user_ids: contact_user_ids.clone(),
461 root_name: request.payload.root_name.clone(),
462 visible: request.payload.visible,
463 },
464 )?;
465 }
466 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
467 self.peer
468 .forward_send(request.sender_id, connection_id, request.payload.clone())
469 })?;
470 self.update_contacts_for_users(&contact_user_ids)?;
471 Ok(proto::Ack {})
472 }
473
474 async fn unregister_worktree(
475 mut self: Arc<Server>,
476 request: TypedEnvelope<proto::UnregisterWorktree>,
477 ) -> tide::Result<()> {
478 let project_id = request.payload.project_id;
479 let worktree_id = request.payload.worktree_id;
480 let (worktree, guest_connection_ids) =
481 self.state_mut()
482 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
483 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
484 self.peer.send(
485 conn_id,
486 proto::UnregisterWorktree {
487 project_id,
488 worktree_id,
489 },
490 )
491 })?;
492 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
493 Ok(())
494 }
495
496 async fn update_worktree(
497 mut self: Arc<Server>,
498 request: TypedEnvelope<proto::UpdateWorktree>,
499 ) -> tide::Result<proto::Ack> {
500 let connection_ids = self.state_mut().update_worktree(
501 request.sender_id,
502 request.payload.project_id,
503 request.payload.worktree_id,
504 &request.payload.removed_entries,
505 &request.payload.updated_entries,
506 )?;
507
508 broadcast(request.sender_id, connection_ids, |connection_id| {
509 self.peer
510 .forward_send(request.sender_id, connection_id, request.payload.clone())
511 })?;
512
513 Ok(proto::Ack {})
514 }
515
516 async fn update_diagnostic_summary(
517 mut self: Arc<Server>,
518 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
519 ) -> tide::Result<()> {
520 let summary = request
521 .payload
522 .summary
523 .clone()
524 .ok_or_else(|| anyhow!("invalid summary"))?;
525 let receiver_ids = self.state_mut().update_diagnostic_summary(
526 request.payload.project_id,
527 request.payload.worktree_id,
528 request.sender_id,
529 summary,
530 )?;
531
532 broadcast(request.sender_id, receiver_ids, |connection_id| {
533 self.peer
534 .forward_send(request.sender_id, connection_id, request.payload.clone())
535 })?;
536 Ok(())
537 }
538
539 async fn disk_based_diagnostics_updating(
540 self: Arc<Server>,
541 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
542 ) -> tide::Result<()> {
543 let receiver_ids = self
544 .state()
545 .project_connection_ids(request.payload.project_id, request.sender_id)?;
546 broadcast(request.sender_id, receiver_ids, |connection_id| {
547 self.peer
548 .forward_send(request.sender_id, connection_id, request.payload.clone())
549 })?;
550 Ok(())
551 }
552
553 async fn disk_based_diagnostics_updated(
554 self: Arc<Server>,
555 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
556 ) -> tide::Result<()> {
557 let receiver_ids = self
558 .state()
559 .project_connection_ids(request.payload.project_id, request.sender_id)?;
560 broadcast(request.sender_id, receiver_ids, |connection_id| {
561 self.peer
562 .forward_send(request.sender_id, connection_id, request.payload.clone())
563 })?;
564 Ok(())
565 }
566
567 async fn forward_project_request<T>(
568 self: Arc<Server>,
569 request: TypedEnvelope<T>,
570 ) -> tide::Result<T::Response>
571 where
572 T: EntityMessage + RequestMessage,
573 {
574 let host_connection_id = self
575 .state()
576 .read_project(request.payload.remote_entity_id(), request.sender_id)?
577 .host_connection_id;
578 Ok(self
579 .peer
580 .forward_request(request.sender_id, host_connection_id, request.payload)
581 .await?)
582 }
583
584 async fn close_buffer(
585 self: Arc<Server>,
586 request: TypedEnvelope<proto::CloseBuffer>,
587 ) -> tide::Result<()> {
588 let host_connection_id = self
589 .state()
590 .read_project(request.payload.project_id, request.sender_id)?
591 .host_connection_id;
592 self.peer
593 .forward_send(request.sender_id, host_connection_id, request.payload)?;
594 Ok(())
595 }
596
597 async fn save_buffer(
598 self: Arc<Server>,
599 request: TypedEnvelope<proto::SaveBuffer>,
600 ) -> tide::Result<proto::BufferSaved> {
601 let host;
602 let mut guests;
603 {
604 let state = self.state();
605 let project = state.read_project(request.payload.project_id, request.sender_id)?;
606 host = project.host_connection_id;
607 guests = project.guest_connection_ids()
608 }
609
610 let response = self
611 .peer
612 .forward_request(request.sender_id, host, request.payload.clone())
613 .await?;
614
615 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
616 broadcast(host, guests, |conn_id| {
617 self.peer.forward_send(host, conn_id, response.clone())
618 })?;
619
620 Ok(response)
621 }
622
623 async fn update_buffer(
624 self: Arc<Server>,
625 request: TypedEnvelope<proto::UpdateBuffer>,
626 ) -> tide::Result<proto::Ack> {
627 let receiver_ids = self
628 .state()
629 .project_connection_ids(request.payload.project_id, request.sender_id)?;
630 broadcast(request.sender_id, receiver_ids, |connection_id| {
631 self.peer
632 .forward_send(request.sender_id, connection_id, request.payload.clone())
633 })?;
634 Ok(proto::Ack {})
635 }
636
637 async fn update_buffer_file(
638 self: Arc<Server>,
639 request: TypedEnvelope<proto::UpdateBufferFile>,
640 ) -> tide::Result<()> {
641 let receiver_ids = self
642 .state()
643 .project_connection_ids(request.payload.project_id, request.sender_id)?;
644 broadcast(request.sender_id, receiver_ids, |connection_id| {
645 self.peer
646 .forward_send(request.sender_id, connection_id, request.payload.clone())
647 })?;
648 Ok(())
649 }
650
651 async fn buffer_reloaded(
652 self: Arc<Server>,
653 request: TypedEnvelope<proto::BufferReloaded>,
654 ) -> tide::Result<()> {
655 let receiver_ids = self
656 .state()
657 .project_connection_ids(request.payload.project_id, request.sender_id)?;
658 broadcast(request.sender_id, receiver_ids, |connection_id| {
659 self.peer
660 .forward_send(request.sender_id, connection_id, request.payload.clone())
661 })?;
662 Ok(())
663 }
664
665 async fn buffer_saved(
666 self: Arc<Server>,
667 request: TypedEnvelope<proto::BufferSaved>,
668 ) -> tide::Result<()> {
669 let receiver_ids = self
670 .state()
671 .project_connection_ids(request.payload.project_id, request.sender_id)?;
672 broadcast(request.sender_id, receiver_ids, |connection_id| {
673 self.peer
674 .forward_send(request.sender_id, connection_id, request.payload.clone())
675 })?;
676 Ok(())
677 }
678
679 async fn get_channels(
680 self: Arc<Server>,
681 request: TypedEnvelope<proto::GetChannels>,
682 ) -> tide::Result<proto::GetChannelsResponse> {
683 let user_id = self.state().user_id_for_connection(request.sender_id)?;
684 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
685 Ok(proto::GetChannelsResponse {
686 channels: channels
687 .into_iter()
688 .map(|chan| proto::Channel {
689 id: chan.id.to_proto(),
690 name: chan.name,
691 })
692 .collect(),
693 })
694 }
695
696 async fn get_users(
697 self: Arc<Server>,
698 request: TypedEnvelope<proto::GetUsers>,
699 ) -> tide::Result<proto::GetUsersResponse> {
700 let user_ids = request
701 .payload
702 .user_ids
703 .into_iter()
704 .map(UserId::from_proto)
705 .collect();
706 let users = self
707 .app_state
708 .db
709 .get_users_by_ids(user_ids)
710 .await?
711 .into_iter()
712 .map(|user| proto::User {
713 id: user.id.to_proto(),
714 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
715 github_login: user.github_login,
716 })
717 .collect();
718 Ok(proto::GetUsersResponse { users })
719 }
720
721 fn update_contacts_for_users<'a>(
722 self: &Arc<Server>,
723 user_ids: impl IntoIterator<Item = &'a UserId>,
724 ) -> anyhow::Result<()> {
725 let mut result = Ok(());
726 let state = self.state();
727 for user_id in user_ids {
728 let contacts = state.contacts_for_user(*user_id);
729 for connection_id in state.connection_ids_for_user(*user_id) {
730 if let Err(error) = self.peer.send(
731 connection_id,
732 proto::UpdateContacts {
733 contacts: contacts.clone(),
734 },
735 ) {
736 result = Err(error);
737 }
738 }
739 }
740 result
741 }
742
743 async fn join_channel(
744 mut self: Arc<Self>,
745 request: TypedEnvelope<proto::JoinChannel>,
746 ) -> tide::Result<proto::JoinChannelResponse> {
747 let user_id = self.state().user_id_for_connection(request.sender_id)?;
748 let channel_id = ChannelId::from_proto(request.payload.channel_id);
749 if !self
750 .app_state
751 .db
752 .can_user_access_channel(user_id, channel_id)
753 .await?
754 {
755 Err(anyhow!("access denied"))?;
756 }
757
758 self.state_mut().join_channel(request.sender_id, channel_id);
759 let messages = self
760 .app_state
761 .db
762 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
763 .await?
764 .into_iter()
765 .map(|msg| proto::ChannelMessage {
766 id: msg.id.to_proto(),
767 body: msg.body,
768 timestamp: msg.sent_at.unix_timestamp() as u64,
769 sender_id: msg.sender_id.to_proto(),
770 nonce: Some(msg.nonce.as_u128().into()),
771 })
772 .collect::<Vec<_>>();
773 Ok(proto::JoinChannelResponse {
774 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
775 messages,
776 })
777 }
778
779 async fn leave_channel(
780 mut self: Arc<Self>,
781 request: TypedEnvelope<proto::LeaveChannel>,
782 ) -> tide::Result<()> {
783 let user_id = self.state().user_id_for_connection(request.sender_id)?;
784 let channel_id = ChannelId::from_proto(request.payload.channel_id);
785 if !self
786 .app_state
787 .db
788 .can_user_access_channel(user_id, channel_id)
789 .await?
790 {
791 Err(anyhow!("access denied"))?;
792 }
793
794 self.state_mut()
795 .leave_channel(request.sender_id, channel_id);
796
797 Ok(())
798 }
799
800 async fn send_channel_message(
801 self: Arc<Self>,
802 request: TypedEnvelope<proto::SendChannelMessage>,
803 ) -> tide::Result<proto::SendChannelMessageResponse> {
804 let channel_id = ChannelId::from_proto(request.payload.channel_id);
805 let user_id;
806 let connection_ids;
807 {
808 let state = self.state();
809 user_id = state.user_id_for_connection(request.sender_id)?;
810 connection_ids = state.channel_connection_ids(channel_id)?;
811 }
812
813 // Validate the message body.
814 let body = request.payload.body.trim().to_string();
815 if body.len() > MAX_MESSAGE_LEN {
816 return Err(anyhow!("message is too long"))?;
817 }
818 if body.is_empty() {
819 return Err(anyhow!("message can't be blank"))?;
820 }
821
822 let timestamp = OffsetDateTime::now_utc();
823 let nonce = request
824 .payload
825 .nonce
826 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
827
828 let message_id = self
829 .app_state
830 .db
831 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
832 .await?
833 .to_proto();
834 let message = proto::ChannelMessage {
835 sender_id: user_id.to_proto(),
836 id: message_id,
837 body,
838 timestamp: timestamp.unix_timestamp() as u64,
839 nonce: Some(nonce),
840 };
841 broadcast(request.sender_id, connection_ids, |conn_id| {
842 self.peer.send(
843 conn_id,
844 proto::ChannelMessageSent {
845 channel_id: channel_id.to_proto(),
846 message: Some(message.clone()),
847 },
848 )
849 })?;
850 Ok(proto::SendChannelMessageResponse {
851 message: Some(message),
852 })
853 }
854
855 async fn get_channel_messages(
856 self: Arc<Self>,
857 request: TypedEnvelope<proto::GetChannelMessages>,
858 ) -> tide::Result<proto::GetChannelMessagesResponse> {
859 let user_id = self.state().user_id_for_connection(request.sender_id)?;
860 let channel_id = ChannelId::from_proto(request.payload.channel_id);
861 if !self
862 .app_state
863 .db
864 .can_user_access_channel(user_id, channel_id)
865 .await?
866 {
867 Err(anyhow!("access denied"))?;
868 }
869
870 let messages = self
871 .app_state
872 .db
873 .get_channel_messages(
874 channel_id,
875 MESSAGE_COUNT_PER_PAGE,
876 Some(MessageId::from_proto(request.payload.before_message_id)),
877 )
878 .await?
879 .into_iter()
880 .map(|msg| proto::ChannelMessage {
881 id: msg.id.to_proto(),
882 body: msg.body,
883 timestamp: msg.sent_at.unix_timestamp() as u64,
884 sender_id: msg.sender_id.to_proto(),
885 nonce: Some(msg.nonce.as_u128().into()),
886 })
887 .collect::<Vec<_>>();
888
889 Ok(proto::GetChannelMessagesResponse {
890 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
891 messages,
892 })
893 }
894
895 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
896 self.store.read()
897 }
898
899 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
900 self.store.write()
901 }
902}
903
904impl Executor for RealExecutor {
905 type Timer = Timer;
906
907 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
908 task::spawn(future);
909 }
910
911 fn timer(&self, duration: Duration) -> Self::Timer {
912 Timer::after(duration)
913 }
914}
915
916fn broadcast<F>(
917 sender_id: ConnectionId,
918 receiver_ids: Vec<ConnectionId>,
919 mut f: F,
920) -> anyhow::Result<()>
921where
922 F: FnMut(ConnectionId) -> anyhow::Result<()>,
923{
924 let mut result = Ok(());
925 for receiver_id in receiver_ids {
926 if receiver_id != sender_id {
927 if let Err(error) = f(receiver_id) {
928 if result.is_ok() {
929 result = Err(error);
930 }
931 }
932 }
933 }
934 result
935}
936
937pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
938 let server = Server::new(app.state().clone(), rpc.clone(), None);
939 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
940 let server = server.clone();
941 async move {
942 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
943
944 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
945 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
946 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
947 let client_protocol_version: Option<u32> = request
948 .header("X-Zed-Protocol-Version")
949 .and_then(|v| v.as_str().parse().ok());
950
951 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
952 return Ok(Response::new(StatusCode::UpgradeRequired));
953 }
954
955 let header = match request.header("Sec-Websocket-Key") {
956 Some(h) => h.as_str(),
957 None => return Err(anyhow!("expected sec-websocket-key"))?,
958 };
959
960 let user_id = process_auth_header(&request).await?;
961
962 let mut response = Response::new(StatusCode::SwitchingProtocols);
963 response.insert_header(UPGRADE, "websocket");
964 response.insert_header(CONNECTION, "Upgrade");
965 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
966 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
967 response.insert_header("Sec-Websocket-Version", "13");
968
969 let http_res: &mut tide::http::Response = response.as_mut();
970 let upgrade_receiver = http_res.recv_upgrade().await;
971 let addr = request.remote().unwrap_or("unknown").to_string();
972 task::spawn(async move {
973 if let Some(stream) = upgrade_receiver.await {
974 server
975 .handle_connection(
976 Connection::new(
977 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
978 ),
979 addr,
980 user_id,
981 None,
982 RealExecutor,
983 )
984 .await;
985 }
986 });
987
988 Ok(response)
989 }
990 });
991}
992
993fn header_contains_ignore_case<T>(
994 request: &tide::Request<T>,
995 header_name: HeaderName,
996 value: &str,
997) -> bool {
998 request
999 .header(header_name)
1000 .map(|h| {
1001 h.as_str()
1002 .split(',')
1003 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1004 })
1005 .unwrap_or(false)
1006}
1007
1008#[cfg(test)]
1009mod tests {
1010 use super::*;
1011 use crate::{
1012 auth,
1013 db::{tests::TestDb, UserId},
1014 github, AppState, Config,
1015 };
1016 use ::rpc::Peer;
1017 use client::{
1018 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1019 EstablishConnectionError, UserStore,
1020 };
1021 use collections::BTreeMap;
1022 use editor::{
1023 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, MultiBuffer,
1024 Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1025 };
1026 use gpui::{executor, ModelHandle, TestAppContext};
1027 use language::{
1028 tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
1029 LanguageRegistry, LanguageServerConfig, Point, ToLspPosition,
1030 };
1031 use lsp;
1032 use parking_lot::Mutex;
1033 use postage::{barrier, watch};
1034 use project::{
1035 fs::{FakeFs, Fs as _},
1036 search::SearchQuery,
1037 DiagnosticSummary, Project, ProjectPath,
1038 };
1039 use rand::prelude::*;
1040 use rpc::PeerId;
1041 use serde_json::json;
1042 use sqlx::types::time::OffsetDateTime;
1043 use std::{
1044 cell::Cell,
1045 env,
1046 ops::Deref,
1047 path::{Path, PathBuf},
1048 rc::Rc,
1049 sync::{
1050 atomic::{AtomicBool, Ordering::SeqCst},
1051 Arc,
1052 },
1053 time::Duration,
1054 };
1055 use workspace::{Settings, Workspace, WorkspaceParams};
1056
1057 #[cfg(test)]
1058 #[ctor::ctor]
1059 fn init_logger() {
1060 if std::env::var("RUST_LOG").is_ok() {
1061 env_logger::init();
1062 }
1063 }
1064
1065 #[gpui::test(iterations = 10)]
1066 async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1067 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1068 let lang_registry = Arc::new(LanguageRegistry::new());
1069 let fs = FakeFs::new(cx_a.background());
1070 cx_a.foreground().forbid_parking();
1071
1072 // Connect to a server as 2 clients.
1073 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1074 let client_a = server.create_client(cx_a, "user_a").await;
1075 let client_b = server.create_client(cx_b, "user_b").await;
1076
1077 // Share a project as client A
1078 fs.insert_tree(
1079 "/a",
1080 json!({
1081 ".zed.toml": r#"collaborators = ["user_b"]"#,
1082 "a.txt": "a-contents",
1083 "b.txt": "b-contents",
1084 }),
1085 )
1086 .await;
1087 let project_a = cx_a.update(|cx| {
1088 Project::local(
1089 client_a.clone(),
1090 client_a.user_store.clone(),
1091 lang_registry.clone(),
1092 fs.clone(),
1093 cx,
1094 )
1095 });
1096 let (worktree_a, _) = project_a
1097 .update(cx_a, |p, cx| {
1098 p.find_or_create_local_worktree("/a", true, cx)
1099 })
1100 .await
1101 .unwrap();
1102 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1103 worktree_a
1104 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1105 .await;
1106 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1107 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1108
1109 // Join that project as client B
1110 let project_b = Project::remote(
1111 project_id,
1112 client_b.clone(),
1113 client_b.user_store.clone(),
1114 lang_registry.clone(),
1115 fs.clone(),
1116 &mut cx_b.to_async(),
1117 )
1118 .await
1119 .unwrap();
1120
1121 let replica_id_b = project_b.read_with(cx_b, |project, _| {
1122 assert_eq!(
1123 project
1124 .collaborators()
1125 .get(&client_a.peer_id)
1126 .unwrap()
1127 .user
1128 .github_login,
1129 "user_a"
1130 );
1131 project.replica_id()
1132 });
1133 project_a
1134 .condition(&cx_a, |tree, _| {
1135 tree.collaborators()
1136 .get(&client_b.peer_id)
1137 .map_or(false, |collaborator| {
1138 collaborator.replica_id == replica_id_b
1139 && collaborator.user.github_login == "user_b"
1140 })
1141 })
1142 .await;
1143
1144 // Open the same file as client B and client A.
1145 let buffer_b = project_b
1146 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1147 .await
1148 .unwrap();
1149 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1150 buffer_b.read_with(cx_b, |buf, cx| {
1151 assert_eq!(buf.read(cx).text(), "b-contents")
1152 });
1153 project_a.read_with(cx_a, |project, cx| {
1154 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1155 });
1156 let buffer_a = project_a
1157 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1158 .await
1159 .unwrap();
1160
1161 let editor_b = cx_b.add_view(window_b, |cx| {
1162 Editor::for_buffer(
1163 buffer_b,
1164 None,
1165 watch::channel_with(Settings::test(cx)).1,
1166 cx,
1167 )
1168 });
1169
1170 // TODO
1171 // // Create a selection set as client B and see that selection set as client A.
1172 // buffer_a
1173 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1174 // .await;
1175
1176 // Edit the buffer as client B and see that edit as client A.
1177 editor_b.update(cx_b, |editor, cx| {
1178 editor.handle_input(&Input("ok, ".into()), cx)
1179 });
1180 buffer_a
1181 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1182 .await;
1183
1184 // TODO
1185 // // Remove the selection set as client B, see those selections disappear as client A.
1186 cx_b.update(move |_| drop(editor_b));
1187 // buffer_a
1188 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1189 // .await;
1190
1191 // Dropping the client B's project removes client B from client A's collaborators.
1192 cx_b.update(move |_| drop(project_b));
1193 project_a
1194 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1195 .await;
1196 }
1197
1198 #[gpui::test(iterations = 10)]
1199 async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1200 let lang_registry = Arc::new(LanguageRegistry::new());
1201 let fs = FakeFs::new(cx_a.background());
1202 cx_a.foreground().forbid_parking();
1203
1204 // Connect to a server as 2 clients.
1205 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1206 let client_a = server.create_client(cx_a, "user_a").await;
1207 let client_b = server.create_client(cx_b, "user_b").await;
1208
1209 // Share a project as client A
1210 fs.insert_tree(
1211 "/a",
1212 json!({
1213 ".zed.toml": r#"collaborators = ["user_b"]"#,
1214 "a.txt": "a-contents",
1215 "b.txt": "b-contents",
1216 }),
1217 )
1218 .await;
1219 let project_a = cx_a.update(|cx| {
1220 Project::local(
1221 client_a.clone(),
1222 client_a.user_store.clone(),
1223 lang_registry.clone(),
1224 fs.clone(),
1225 cx,
1226 )
1227 });
1228 let (worktree_a, _) = project_a
1229 .update(cx_a, |p, cx| {
1230 p.find_or_create_local_worktree("/a", true, cx)
1231 })
1232 .await
1233 .unwrap();
1234 worktree_a
1235 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1236 .await;
1237 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1238 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1239 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1240 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1241
1242 // Join that project as client B
1243 let project_b = Project::remote(
1244 project_id,
1245 client_b.clone(),
1246 client_b.user_store.clone(),
1247 lang_registry.clone(),
1248 fs.clone(),
1249 &mut cx_b.to_async(),
1250 )
1251 .await
1252 .unwrap();
1253 project_b
1254 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1255 .await
1256 .unwrap();
1257
1258 // Unshare the project as client A
1259 project_a
1260 .update(cx_a, |project, cx| project.unshare(cx))
1261 .await
1262 .unwrap();
1263 project_b
1264 .condition(cx_b, |project, _| project.is_read_only())
1265 .await;
1266 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1267 cx_b.update(|_| {
1268 drop(project_b);
1269 });
1270
1271 // Share the project again and ensure guests can still join.
1272 project_a
1273 .update(cx_a, |project, cx| project.share(cx))
1274 .await
1275 .unwrap();
1276 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1277
1278 let project_b2 = Project::remote(
1279 project_id,
1280 client_b.clone(),
1281 client_b.user_store.clone(),
1282 lang_registry.clone(),
1283 fs.clone(),
1284 &mut cx_b.to_async(),
1285 )
1286 .await
1287 .unwrap();
1288 project_b2
1289 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1290 .await
1291 .unwrap();
1292 }
1293
1294 #[gpui::test(iterations = 10)]
1295 async fn test_propagate_saves_and_fs_changes(
1296 cx_a: &mut TestAppContext,
1297 cx_b: &mut TestAppContext,
1298 cx_c: &mut TestAppContext,
1299 ) {
1300 let lang_registry = Arc::new(LanguageRegistry::new());
1301 let fs = FakeFs::new(cx_a.background());
1302 cx_a.foreground().forbid_parking();
1303
1304 // Connect to a server as 3 clients.
1305 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1306 let client_a = server.create_client(cx_a, "user_a").await;
1307 let client_b = server.create_client(cx_b, "user_b").await;
1308 let client_c = server.create_client(cx_c, "user_c").await;
1309
1310 // Share a worktree as client A.
1311 fs.insert_tree(
1312 "/a",
1313 json!({
1314 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1315 "file1": "",
1316 "file2": ""
1317 }),
1318 )
1319 .await;
1320 let project_a = cx_a.update(|cx| {
1321 Project::local(
1322 client_a.clone(),
1323 client_a.user_store.clone(),
1324 lang_registry.clone(),
1325 fs.clone(),
1326 cx,
1327 )
1328 });
1329 let (worktree_a, _) = project_a
1330 .update(cx_a, |p, cx| {
1331 p.find_or_create_local_worktree("/a", true, cx)
1332 })
1333 .await
1334 .unwrap();
1335 worktree_a
1336 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1337 .await;
1338 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1339 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1340 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1341
1342 // Join that worktree as clients B and C.
1343 let project_b = Project::remote(
1344 project_id,
1345 client_b.clone(),
1346 client_b.user_store.clone(),
1347 lang_registry.clone(),
1348 fs.clone(),
1349 &mut cx_b.to_async(),
1350 )
1351 .await
1352 .unwrap();
1353 let project_c = Project::remote(
1354 project_id,
1355 client_c.clone(),
1356 client_c.user_store.clone(),
1357 lang_registry.clone(),
1358 fs.clone(),
1359 &mut cx_c.to_async(),
1360 )
1361 .await
1362 .unwrap();
1363 let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1364 let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1365
1366 // Open and edit a buffer as both guests B and C.
1367 let buffer_b = project_b
1368 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1369 .await
1370 .unwrap();
1371 let buffer_c = project_c
1372 .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1373 .await
1374 .unwrap();
1375 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1376 buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1377
1378 // Open and edit that buffer as the host.
1379 let buffer_a = project_a
1380 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1381 .await
1382 .unwrap();
1383
1384 buffer_a
1385 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1386 .await;
1387 buffer_a.update(cx_a, |buf, cx| {
1388 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1389 });
1390
1391 // Wait for edits to propagate
1392 buffer_a
1393 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1394 .await;
1395 buffer_b
1396 .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1397 .await;
1398 buffer_c
1399 .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1400 .await;
1401
1402 // Edit the buffer as the host and concurrently save as guest B.
1403 let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1404 buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1405 save_b.await.unwrap();
1406 assert_eq!(
1407 fs.load("/a/file1".as_ref()).await.unwrap(),
1408 "hi-a, i-am-c, i-am-b, i-am-a"
1409 );
1410 buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1411 buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1412 buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1413
1414 // Make changes on host's file system, see those changes on guest worktrees.
1415 fs.rename(
1416 "/a/file1".as_ref(),
1417 "/a/file1-renamed".as_ref(),
1418 Default::default(),
1419 )
1420 .await
1421 .unwrap();
1422
1423 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1424 .await
1425 .unwrap();
1426 fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1427
1428 worktree_a
1429 .condition(&cx_a, |tree, _| {
1430 tree.paths()
1431 .map(|p| p.to_string_lossy())
1432 .collect::<Vec<_>>()
1433 == [".zed.toml", "file1-renamed", "file3", "file4"]
1434 })
1435 .await;
1436 worktree_b
1437 .condition(&cx_b, |tree, _| {
1438 tree.paths()
1439 .map(|p| p.to_string_lossy())
1440 .collect::<Vec<_>>()
1441 == [".zed.toml", "file1-renamed", "file3", "file4"]
1442 })
1443 .await;
1444 worktree_c
1445 .condition(&cx_c, |tree, _| {
1446 tree.paths()
1447 .map(|p| p.to_string_lossy())
1448 .collect::<Vec<_>>()
1449 == [".zed.toml", "file1-renamed", "file3", "file4"]
1450 })
1451 .await;
1452
1453 // Ensure buffer files are updated as well.
1454 buffer_a
1455 .condition(&cx_a, |buf, _| {
1456 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1457 })
1458 .await;
1459 buffer_b
1460 .condition(&cx_b, |buf, _| {
1461 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1462 })
1463 .await;
1464 buffer_c
1465 .condition(&cx_c, |buf, _| {
1466 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1467 })
1468 .await;
1469 }
1470
1471 #[gpui::test(iterations = 10)]
1472 async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1473 cx_a.foreground().forbid_parking();
1474 let lang_registry = Arc::new(LanguageRegistry::new());
1475 let fs = FakeFs::new(cx_a.background());
1476
1477 // Connect to a server as 2 clients.
1478 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1479 let client_a = server.create_client(cx_a, "user_a").await;
1480 let client_b = server.create_client(cx_b, "user_b").await;
1481
1482 // Share a project as client A
1483 fs.insert_tree(
1484 "/dir",
1485 json!({
1486 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1487 "a.txt": "a-contents",
1488 }),
1489 )
1490 .await;
1491
1492 let project_a = cx_a.update(|cx| {
1493 Project::local(
1494 client_a.clone(),
1495 client_a.user_store.clone(),
1496 lang_registry.clone(),
1497 fs.clone(),
1498 cx,
1499 )
1500 });
1501 let (worktree_a, _) = project_a
1502 .update(cx_a, |p, cx| {
1503 p.find_or_create_local_worktree("/dir", true, cx)
1504 })
1505 .await
1506 .unwrap();
1507 worktree_a
1508 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1509 .await;
1510 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1511 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1512 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1513
1514 // Join that project as client B
1515 let project_b = Project::remote(
1516 project_id,
1517 client_b.clone(),
1518 client_b.user_store.clone(),
1519 lang_registry.clone(),
1520 fs.clone(),
1521 &mut cx_b.to_async(),
1522 )
1523 .await
1524 .unwrap();
1525
1526 // Open a buffer as client B
1527 let buffer_b = project_b
1528 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1529 .await
1530 .unwrap();
1531
1532 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1533 buffer_b.read_with(cx_b, |buf, _| {
1534 assert!(buf.is_dirty());
1535 assert!(!buf.has_conflict());
1536 });
1537
1538 buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1539 buffer_b
1540 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1541 .await;
1542 buffer_b.read_with(cx_b, |buf, _| {
1543 assert!(!buf.has_conflict());
1544 });
1545
1546 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1547 buffer_b.read_with(cx_b, |buf, _| {
1548 assert!(buf.is_dirty());
1549 assert!(!buf.has_conflict());
1550 });
1551 }
1552
1553 #[gpui::test(iterations = 10)]
1554 async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1555 cx_a.foreground().forbid_parking();
1556 let lang_registry = Arc::new(LanguageRegistry::new());
1557 let fs = FakeFs::new(cx_a.background());
1558
1559 // Connect to a server as 2 clients.
1560 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1561 let client_a = server.create_client(cx_a, "user_a").await;
1562 let client_b = server.create_client(cx_b, "user_b").await;
1563
1564 // Share a project as client A
1565 fs.insert_tree(
1566 "/dir",
1567 json!({
1568 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1569 "a.txt": "a-contents",
1570 }),
1571 )
1572 .await;
1573
1574 let project_a = cx_a.update(|cx| {
1575 Project::local(
1576 client_a.clone(),
1577 client_a.user_store.clone(),
1578 lang_registry.clone(),
1579 fs.clone(),
1580 cx,
1581 )
1582 });
1583 let (worktree_a, _) = project_a
1584 .update(cx_a, |p, cx| {
1585 p.find_or_create_local_worktree("/dir", true, cx)
1586 })
1587 .await
1588 .unwrap();
1589 worktree_a
1590 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1591 .await;
1592 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1593 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1594 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1595
1596 // Join that project as client B
1597 let project_b = Project::remote(
1598 project_id,
1599 client_b.clone(),
1600 client_b.user_store.clone(),
1601 lang_registry.clone(),
1602 fs.clone(),
1603 &mut cx_b.to_async(),
1604 )
1605 .await
1606 .unwrap();
1607 let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1608
1609 // Open a buffer as client B
1610 let buffer_b = project_b
1611 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1612 .await
1613 .unwrap();
1614 buffer_b.read_with(cx_b, |buf, _| {
1615 assert!(!buf.is_dirty());
1616 assert!(!buf.has_conflict());
1617 });
1618
1619 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1620 .await
1621 .unwrap();
1622 buffer_b
1623 .condition(&cx_b, |buf, _| {
1624 buf.text() == "new contents" && !buf.is_dirty()
1625 })
1626 .await;
1627 buffer_b.read_with(cx_b, |buf, _| {
1628 assert!(!buf.has_conflict());
1629 });
1630 }
1631
1632 #[gpui::test(iterations = 10)]
1633 async fn test_editing_while_guest_opens_buffer(
1634 cx_a: &mut TestAppContext,
1635 cx_b: &mut TestAppContext,
1636 ) {
1637 cx_a.foreground().forbid_parking();
1638 let lang_registry = Arc::new(LanguageRegistry::new());
1639 let fs = FakeFs::new(cx_a.background());
1640
1641 // Connect to a server as 2 clients.
1642 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1643 let client_a = server.create_client(cx_a, "user_a").await;
1644 let client_b = server.create_client(cx_b, "user_b").await;
1645
1646 // Share a project as client A
1647 fs.insert_tree(
1648 "/dir",
1649 json!({
1650 ".zed.toml": r#"collaborators = ["user_b"]"#,
1651 "a.txt": "a-contents",
1652 }),
1653 )
1654 .await;
1655 let project_a = cx_a.update(|cx| {
1656 Project::local(
1657 client_a.clone(),
1658 client_a.user_store.clone(),
1659 lang_registry.clone(),
1660 fs.clone(),
1661 cx,
1662 )
1663 });
1664 let (worktree_a, _) = project_a
1665 .update(cx_a, |p, cx| {
1666 p.find_or_create_local_worktree("/dir", true, cx)
1667 })
1668 .await
1669 .unwrap();
1670 worktree_a
1671 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1672 .await;
1673 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1674 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1675 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1676
1677 // Join that project as client B
1678 let project_b = Project::remote(
1679 project_id,
1680 client_b.clone(),
1681 client_b.user_store.clone(),
1682 lang_registry.clone(),
1683 fs.clone(),
1684 &mut cx_b.to_async(),
1685 )
1686 .await
1687 .unwrap();
1688
1689 // Open a buffer as client A
1690 let buffer_a = project_a
1691 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1692 .await
1693 .unwrap();
1694
1695 // Start opening the same buffer as client B
1696 let buffer_b = cx_b
1697 .background()
1698 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1699
1700 // Edit the buffer as client A while client B is still opening it.
1701 cx_b.background().simulate_random_delay().await;
1702 buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1703 cx_b.background().simulate_random_delay().await;
1704 buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1705
1706 let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
1707 let buffer_b = buffer_b.await.unwrap();
1708 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1709 }
1710
1711 #[gpui::test(iterations = 10)]
1712 async fn test_leaving_worktree_while_opening_buffer(
1713 cx_a: &mut TestAppContext,
1714 cx_b: &mut TestAppContext,
1715 ) {
1716 cx_a.foreground().forbid_parking();
1717 let lang_registry = Arc::new(LanguageRegistry::new());
1718 let fs = FakeFs::new(cx_a.background());
1719
1720 // Connect to a server as 2 clients.
1721 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1722 let client_a = server.create_client(cx_a, "user_a").await;
1723 let client_b = server.create_client(cx_b, "user_b").await;
1724
1725 // Share a project as client A
1726 fs.insert_tree(
1727 "/dir",
1728 json!({
1729 ".zed.toml": r#"collaborators = ["user_b"]"#,
1730 "a.txt": "a-contents",
1731 }),
1732 )
1733 .await;
1734 let project_a = cx_a.update(|cx| {
1735 Project::local(
1736 client_a.clone(),
1737 client_a.user_store.clone(),
1738 lang_registry.clone(),
1739 fs.clone(),
1740 cx,
1741 )
1742 });
1743 let (worktree_a, _) = project_a
1744 .update(cx_a, |p, cx| {
1745 p.find_or_create_local_worktree("/dir", true, cx)
1746 })
1747 .await
1748 .unwrap();
1749 worktree_a
1750 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1751 .await;
1752 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1753 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1754 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1755
1756 // Join that project as client B
1757 let project_b = Project::remote(
1758 project_id,
1759 client_b.clone(),
1760 client_b.user_store.clone(),
1761 lang_registry.clone(),
1762 fs.clone(),
1763 &mut cx_b.to_async(),
1764 )
1765 .await
1766 .unwrap();
1767
1768 // See that a guest has joined as client A.
1769 project_a
1770 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1771 .await;
1772
1773 // Begin opening a buffer as client B, but leave the project before the open completes.
1774 let buffer_b = cx_b
1775 .background()
1776 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1777 cx_b.update(|_| drop(project_b));
1778 drop(buffer_b);
1779
1780 // See that the guest has left.
1781 project_a
1782 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1783 .await;
1784 }
1785
1786 #[gpui::test(iterations = 10)]
1787 async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1788 cx_a.foreground().forbid_parking();
1789 let lang_registry = Arc::new(LanguageRegistry::new());
1790 let fs = FakeFs::new(cx_a.background());
1791
1792 // Connect to a server as 2 clients.
1793 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1794 let client_a = server.create_client(cx_a, "user_a").await;
1795 let client_b = server.create_client(cx_b, "user_b").await;
1796
1797 // Share a project as client A
1798 fs.insert_tree(
1799 "/a",
1800 json!({
1801 ".zed.toml": r#"collaborators = ["user_b"]"#,
1802 "a.txt": "a-contents",
1803 "b.txt": "b-contents",
1804 }),
1805 )
1806 .await;
1807 let project_a = cx_a.update(|cx| {
1808 Project::local(
1809 client_a.clone(),
1810 client_a.user_store.clone(),
1811 lang_registry.clone(),
1812 fs.clone(),
1813 cx,
1814 )
1815 });
1816 let (worktree_a, _) = project_a
1817 .update(cx_a, |p, cx| {
1818 p.find_or_create_local_worktree("/a", true, cx)
1819 })
1820 .await
1821 .unwrap();
1822 worktree_a
1823 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1824 .await;
1825 let project_id = project_a
1826 .update(cx_a, |project, _| project.next_remote_id())
1827 .await;
1828 project_a
1829 .update(cx_a, |project, cx| project.share(cx))
1830 .await
1831 .unwrap();
1832
1833 // Join that project as client B
1834 let _project_b = Project::remote(
1835 project_id,
1836 client_b.clone(),
1837 client_b.user_store.clone(),
1838 lang_registry.clone(),
1839 fs.clone(),
1840 &mut cx_b.to_async(),
1841 )
1842 .await
1843 .unwrap();
1844
1845 // Client A sees that a guest has joined.
1846 project_a
1847 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1848 .await;
1849
1850 // Drop client B's connection and ensure client A observes client B leaving the project.
1851 client_b.disconnect(&cx_b.to_async()).unwrap();
1852 project_a
1853 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1854 .await;
1855
1856 // Rejoin the project as client B
1857 let _project_b = Project::remote(
1858 project_id,
1859 client_b.clone(),
1860 client_b.user_store.clone(),
1861 lang_registry.clone(),
1862 fs.clone(),
1863 &mut cx_b.to_async(),
1864 )
1865 .await
1866 .unwrap();
1867
1868 // Client A sees that a guest has re-joined.
1869 project_a
1870 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1871 .await;
1872
1873 // Simulate connection loss for client B and ensure client A observes client B leaving the project.
1874 server.disconnect_client(client_b.current_user_id(cx_b));
1875 cx_a.foreground().advance_clock(Duration::from_secs(3));
1876 project_a
1877 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1878 .await;
1879 }
1880
1881 #[gpui::test(iterations = 10)]
1882 async fn test_collaborating_with_diagnostics(
1883 cx_a: &mut TestAppContext,
1884 cx_b: &mut TestAppContext,
1885 ) {
1886 cx_a.foreground().forbid_parking();
1887 let mut lang_registry = Arc::new(LanguageRegistry::new());
1888 let fs = FakeFs::new(cx_a.background());
1889
1890 // Set up a fake language server.
1891 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1892 Arc::get_mut(&mut lang_registry)
1893 .unwrap()
1894 .add(Arc::new(Language::new(
1895 LanguageConfig {
1896 name: "Rust".into(),
1897 path_suffixes: vec!["rs".to_string()],
1898 language_server: Some(language_server_config),
1899 ..Default::default()
1900 },
1901 Some(tree_sitter_rust::language()),
1902 )));
1903
1904 // Connect to a server as 2 clients.
1905 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1906 let client_a = server.create_client(cx_a, "user_a").await;
1907 let client_b = server.create_client(cx_b, "user_b").await;
1908
1909 // Share a project as client A
1910 fs.insert_tree(
1911 "/a",
1912 json!({
1913 ".zed.toml": r#"collaborators = ["user_b"]"#,
1914 "a.rs": "let one = two",
1915 "other.rs": "",
1916 }),
1917 )
1918 .await;
1919 let project_a = cx_a.update(|cx| {
1920 Project::local(
1921 client_a.clone(),
1922 client_a.user_store.clone(),
1923 lang_registry.clone(),
1924 fs.clone(),
1925 cx,
1926 )
1927 });
1928 let (worktree_a, _) = project_a
1929 .update(cx_a, |p, cx| {
1930 p.find_or_create_local_worktree("/a", true, cx)
1931 })
1932 .await
1933 .unwrap();
1934 worktree_a
1935 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1936 .await;
1937 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1938 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1939 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1940
1941 // Cause the language server to start.
1942 let _ = cx_a
1943 .background()
1944 .spawn(project_a.update(cx_a, |project, cx| {
1945 project.open_buffer(
1946 ProjectPath {
1947 worktree_id,
1948 path: Path::new("other.rs").into(),
1949 },
1950 cx,
1951 )
1952 }))
1953 .await
1954 .unwrap();
1955
1956 // Simulate a language server reporting errors for a file.
1957 let mut fake_language_server = fake_language_servers.next().await.unwrap();
1958 fake_language_server
1959 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1960 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1961 version: None,
1962 diagnostics: vec![lsp::Diagnostic {
1963 severity: Some(lsp::DiagnosticSeverity::ERROR),
1964 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1965 message: "message 1".to_string(),
1966 ..Default::default()
1967 }],
1968 })
1969 .await;
1970
1971 // Wait for server to see the diagnostics update.
1972 server
1973 .condition(|store| {
1974 let worktree = store
1975 .project(project_id)
1976 .unwrap()
1977 .share
1978 .as_ref()
1979 .unwrap()
1980 .worktrees
1981 .get(&worktree_id.to_proto())
1982 .unwrap();
1983
1984 !worktree.diagnostic_summaries.is_empty()
1985 })
1986 .await;
1987
1988 // Join the worktree as client B.
1989 let project_b = Project::remote(
1990 project_id,
1991 client_b.clone(),
1992 client_b.user_store.clone(),
1993 lang_registry.clone(),
1994 fs.clone(),
1995 &mut cx_b.to_async(),
1996 )
1997 .await
1998 .unwrap();
1999
2000 project_b.read_with(cx_b, |project, cx| {
2001 assert_eq!(
2002 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2003 &[(
2004 ProjectPath {
2005 worktree_id,
2006 path: Arc::from(Path::new("a.rs")),
2007 },
2008 DiagnosticSummary {
2009 error_count: 1,
2010 warning_count: 0,
2011 ..Default::default()
2012 },
2013 )]
2014 )
2015 });
2016
2017 // Simulate a language server reporting more errors for a file.
2018 fake_language_server
2019 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2020 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2021 version: None,
2022 diagnostics: vec![
2023 lsp::Diagnostic {
2024 severity: Some(lsp::DiagnosticSeverity::ERROR),
2025 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2026 message: "message 1".to_string(),
2027 ..Default::default()
2028 },
2029 lsp::Diagnostic {
2030 severity: Some(lsp::DiagnosticSeverity::WARNING),
2031 range: lsp::Range::new(
2032 lsp::Position::new(0, 10),
2033 lsp::Position::new(0, 13),
2034 ),
2035 message: "message 2".to_string(),
2036 ..Default::default()
2037 },
2038 ],
2039 })
2040 .await;
2041
2042 // Client b gets the updated summaries
2043 project_b
2044 .condition(&cx_b, |project, cx| {
2045 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2046 == &[(
2047 ProjectPath {
2048 worktree_id,
2049 path: Arc::from(Path::new("a.rs")),
2050 },
2051 DiagnosticSummary {
2052 error_count: 1,
2053 warning_count: 1,
2054 ..Default::default()
2055 },
2056 )]
2057 })
2058 .await;
2059
2060 // Open the file with the errors on client B. They should be present.
2061 let buffer_b = cx_b
2062 .background()
2063 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2064 .await
2065 .unwrap();
2066
2067 buffer_b.read_with(cx_b, |buffer, _| {
2068 assert_eq!(
2069 buffer
2070 .snapshot()
2071 .diagnostics_in_range::<_, Point>(0..buffer.len())
2072 .map(|entry| entry)
2073 .collect::<Vec<_>>(),
2074 &[
2075 DiagnosticEntry {
2076 range: Point::new(0, 4)..Point::new(0, 7),
2077 diagnostic: Diagnostic {
2078 group_id: 0,
2079 message: "message 1".to_string(),
2080 severity: lsp::DiagnosticSeverity::ERROR,
2081 is_primary: true,
2082 ..Default::default()
2083 }
2084 },
2085 DiagnosticEntry {
2086 range: Point::new(0, 10)..Point::new(0, 13),
2087 diagnostic: Diagnostic {
2088 group_id: 1,
2089 severity: lsp::DiagnosticSeverity::WARNING,
2090 message: "message 2".to_string(),
2091 is_primary: true,
2092 ..Default::default()
2093 }
2094 }
2095 ]
2096 );
2097 });
2098 }
2099
2100 #[gpui::test(iterations = 10)]
2101 async fn test_collaborating_with_completion(
2102 cx_a: &mut TestAppContext,
2103 cx_b: &mut TestAppContext,
2104 ) {
2105 cx_a.foreground().forbid_parking();
2106 let mut lang_registry = Arc::new(LanguageRegistry::new());
2107 let fs = FakeFs::new(cx_a.background());
2108
2109 // Set up a fake language server.
2110 let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2111 language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2112 completion_provider: Some(lsp::CompletionOptions {
2113 trigger_characters: Some(vec![".".to_string()]),
2114 ..Default::default()
2115 }),
2116 ..Default::default()
2117 });
2118 Arc::get_mut(&mut lang_registry)
2119 .unwrap()
2120 .add(Arc::new(Language::new(
2121 LanguageConfig {
2122 name: "Rust".into(),
2123 path_suffixes: vec!["rs".to_string()],
2124 language_server: Some(language_server_config),
2125 ..Default::default()
2126 },
2127 Some(tree_sitter_rust::language()),
2128 )));
2129
2130 // Connect to a server as 2 clients.
2131 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2132 let client_a = server.create_client(cx_a, "user_a").await;
2133 let client_b = server.create_client(cx_b, "user_b").await;
2134
2135 // Share a project as client A
2136 fs.insert_tree(
2137 "/a",
2138 json!({
2139 ".zed.toml": r#"collaborators = ["user_b"]"#,
2140 "main.rs": "fn main() { a }",
2141 "other.rs": "",
2142 }),
2143 )
2144 .await;
2145 let project_a = cx_a.update(|cx| {
2146 Project::local(
2147 client_a.clone(),
2148 client_a.user_store.clone(),
2149 lang_registry.clone(),
2150 fs.clone(),
2151 cx,
2152 )
2153 });
2154 let (worktree_a, _) = project_a
2155 .update(cx_a, |p, cx| {
2156 p.find_or_create_local_worktree("/a", true, cx)
2157 })
2158 .await
2159 .unwrap();
2160 worktree_a
2161 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2162 .await;
2163 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2164 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2165 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2166
2167 // Join the worktree as client B.
2168 let project_b = Project::remote(
2169 project_id,
2170 client_b.clone(),
2171 client_b.user_store.clone(),
2172 lang_registry.clone(),
2173 fs.clone(),
2174 &mut cx_b.to_async(),
2175 )
2176 .await
2177 .unwrap();
2178
2179 // Open a file in an editor as the guest.
2180 let buffer_b = project_b
2181 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2182 .await
2183 .unwrap();
2184 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2185 let editor_b = cx_b.add_view(window_b, |cx| {
2186 Editor::for_buffer(
2187 cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2188 Some(project_b.clone()),
2189 watch::channel_with(Settings::test(cx)).1,
2190 cx,
2191 )
2192 });
2193
2194 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2195 buffer_b
2196 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2197 .await;
2198
2199 // Type a completion trigger character as the guest.
2200 editor_b.update(cx_b, |editor, cx| {
2201 editor.select_ranges([13..13], None, cx);
2202 editor.handle_input(&Input(".".into()), cx);
2203 cx.focus(&editor_b);
2204 });
2205
2206 // Receive a completion request as the host's language server.
2207 // Return some completions from the host's language server.
2208 cx_a.foreground().start_waiting();
2209 fake_language_server
2210 .handle_request::<lsp::request::Completion, _>(|params, _| {
2211 assert_eq!(
2212 params.text_document_position.text_document.uri,
2213 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2214 );
2215 assert_eq!(
2216 params.text_document_position.position,
2217 lsp::Position::new(0, 14),
2218 );
2219
2220 Some(lsp::CompletionResponse::Array(vec![
2221 lsp::CompletionItem {
2222 label: "first_method(…)".into(),
2223 detail: Some("fn(&mut self, B) -> C".into()),
2224 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2225 new_text: "first_method($1)".to_string(),
2226 range: lsp::Range::new(
2227 lsp::Position::new(0, 14),
2228 lsp::Position::new(0, 14),
2229 ),
2230 })),
2231 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2232 ..Default::default()
2233 },
2234 lsp::CompletionItem {
2235 label: "second_method(…)".into(),
2236 detail: Some("fn(&mut self, C) -> D<E>".into()),
2237 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2238 new_text: "second_method()".to_string(),
2239 range: lsp::Range::new(
2240 lsp::Position::new(0, 14),
2241 lsp::Position::new(0, 14),
2242 ),
2243 })),
2244 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2245 ..Default::default()
2246 },
2247 ]))
2248 })
2249 .next()
2250 .await
2251 .unwrap();
2252 cx_a.foreground().finish_waiting();
2253
2254 // Open the buffer on the host.
2255 let buffer_a = project_a
2256 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2257 .await
2258 .unwrap();
2259 buffer_a
2260 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2261 .await;
2262
2263 // Confirm a completion on the guest.
2264 editor_b
2265 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2266 .await;
2267 editor_b.update(cx_b, |editor, cx| {
2268 editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2269 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2270 });
2271
2272 // Return a resolved completion from the host's language server.
2273 // The resolved completion has an additional text edit.
2274 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2275 |params, _| {
2276 assert_eq!(params.label, "first_method(…)");
2277 lsp::CompletionItem {
2278 label: "first_method(…)".into(),
2279 detail: Some("fn(&mut self, B) -> C".into()),
2280 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2281 new_text: "first_method($1)".to_string(),
2282 range: lsp::Range::new(
2283 lsp::Position::new(0, 14),
2284 lsp::Position::new(0, 14),
2285 ),
2286 })),
2287 additional_text_edits: Some(vec![lsp::TextEdit {
2288 new_text: "use d::SomeTrait;\n".to_string(),
2289 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2290 }]),
2291 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2292 ..Default::default()
2293 }
2294 },
2295 );
2296
2297 // The additional edit is applied.
2298 buffer_a
2299 .condition(&cx_a, |buffer, _| {
2300 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2301 })
2302 .await;
2303 buffer_b
2304 .condition(&cx_b, |buffer, _| {
2305 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2306 })
2307 .await;
2308 }
2309
2310 #[gpui::test(iterations = 10)]
2311 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2312 cx_a.foreground().forbid_parking();
2313 let mut lang_registry = Arc::new(LanguageRegistry::new());
2314 let fs = FakeFs::new(cx_a.background());
2315
2316 // Set up a fake language server.
2317 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2318 Arc::get_mut(&mut lang_registry)
2319 .unwrap()
2320 .add(Arc::new(Language::new(
2321 LanguageConfig {
2322 name: "Rust".into(),
2323 path_suffixes: vec!["rs".to_string()],
2324 language_server: Some(language_server_config),
2325 ..Default::default()
2326 },
2327 Some(tree_sitter_rust::language()),
2328 )));
2329
2330 // Connect to a server as 2 clients.
2331 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2332 let client_a = server.create_client(cx_a, "user_a").await;
2333 let client_b = server.create_client(cx_b, "user_b").await;
2334
2335 // Share a project as client A
2336 fs.insert_tree(
2337 "/a",
2338 json!({
2339 ".zed.toml": r#"collaborators = ["user_b"]"#,
2340 "a.rs": "let one = two",
2341 }),
2342 )
2343 .await;
2344 let project_a = cx_a.update(|cx| {
2345 Project::local(
2346 client_a.clone(),
2347 client_a.user_store.clone(),
2348 lang_registry.clone(),
2349 fs.clone(),
2350 cx,
2351 )
2352 });
2353 let (worktree_a, _) = project_a
2354 .update(cx_a, |p, cx| {
2355 p.find_or_create_local_worktree("/a", true, cx)
2356 })
2357 .await
2358 .unwrap();
2359 worktree_a
2360 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2361 .await;
2362 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2363 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2364 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2365
2366 // Join the worktree as client B.
2367 let project_b = Project::remote(
2368 project_id,
2369 client_b.clone(),
2370 client_b.user_store.clone(),
2371 lang_registry.clone(),
2372 fs.clone(),
2373 &mut cx_b.to_async(),
2374 )
2375 .await
2376 .unwrap();
2377
2378 let buffer_b = cx_b
2379 .background()
2380 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2381 .await
2382 .unwrap();
2383
2384 let format = project_b.update(cx_b, |project, cx| {
2385 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2386 });
2387
2388 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2389 fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2390 Some(vec![
2391 lsp::TextEdit {
2392 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2393 new_text: "h".to_string(),
2394 },
2395 lsp::TextEdit {
2396 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2397 new_text: "y".to_string(),
2398 },
2399 ])
2400 });
2401
2402 format.await.unwrap();
2403 assert_eq!(
2404 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2405 "let honey = two"
2406 );
2407 }
2408
2409 #[gpui::test(iterations = 10)]
2410 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2411 cx_a.foreground().forbid_parking();
2412 let mut lang_registry = Arc::new(LanguageRegistry::new());
2413 let fs = FakeFs::new(cx_a.background());
2414 fs.insert_tree(
2415 "/root-1",
2416 json!({
2417 ".zed.toml": r#"collaborators = ["user_b"]"#,
2418 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2419 }),
2420 )
2421 .await;
2422 fs.insert_tree(
2423 "/root-2",
2424 json!({
2425 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2426 }),
2427 )
2428 .await;
2429
2430 // Set up a fake language server.
2431 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2432 Arc::get_mut(&mut lang_registry)
2433 .unwrap()
2434 .add(Arc::new(Language::new(
2435 LanguageConfig {
2436 name: "Rust".into(),
2437 path_suffixes: vec!["rs".to_string()],
2438 language_server: Some(language_server_config),
2439 ..Default::default()
2440 },
2441 Some(tree_sitter_rust::language()),
2442 )));
2443
2444 // Connect to a server as 2 clients.
2445 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2446 let client_a = server.create_client(cx_a, "user_a").await;
2447 let client_b = server.create_client(cx_b, "user_b").await;
2448
2449 // Share a project as client A
2450 let project_a = cx_a.update(|cx| {
2451 Project::local(
2452 client_a.clone(),
2453 client_a.user_store.clone(),
2454 lang_registry.clone(),
2455 fs.clone(),
2456 cx,
2457 )
2458 });
2459 let (worktree_a, _) = project_a
2460 .update(cx_a, |p, cx| {
2461 p.find_or_create_local_worktree("/root-1", true, cx)
2462 })
2463 .await
2464 .unwrap();
2465 worktree_a
2466 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2467 .await;
2468 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2469 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2470 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2471
2472 // Join the worktree as client B.
2473 let project_b = Project::remote(
2474 project_id,
2475 client_b.clone(),
2476 client_b.user_store.clone(),
2477 lang_registry.clone(),
2478 fs.clone(),
2479 &mut cx_b.to_async(),
2480 )
2481 .await
2482 .unwrap();
2483
2484 // Open the file on client B.
2485 let buffer_b = cx_b
2486 .background()
2487 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2488 .await
2489 .unwrap();
2490
2491 // Request the definition of a symbol as the guest.
2492 let definitions_1 = project_b.update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2493
2494 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2495 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2496 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2497 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2498 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2499 )))
2500 });
2501
2502 let definitions_1 = definitions_1.await.unwrap();
2503 cx_b.read(|cx| {
2504 assert_eq!(definitions_1.len(), 1);
2505 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2506 let target_buffer = definitions_1[0].buffer.read(cx);
2507 assert_eq!(
2508 target_buffer.text(),
2509 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2510 );
2511 assert_eq!(
2512 definitions_1[0].range.to_point(target_buffer),
2513 Point::new(0, 6)..Point::new(0, 9)
2514 );
2515 });
2516
2517 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2518 // the previous call to `definition`.
2519 let definitions_2 = project_b.update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2520 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2521 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2522 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2523 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2524 )))
2525 });
2526
2527 let definitions_2 = definitions_2.await.unwrap();
2528 cx_b.read(|cx| {
2529 assert_eq!(definitions_2.len(), 1);
2530 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2531 let target_buffer = definitions_2[0].buffer.read(cx);
2532 assert_eq!(
2533 target_buffer.text(),
2534 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2535 );
2536 assert_eq!(
2537 definitions_2[0].range.to_point(target_buffer),
2538 Point::new(1, 6)..Point::new(1, 11)
2539 );
2540 });
2541 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2542 }
2543
2544 #[gpui::test(iterations = 10)]
2545 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2546 cx_a.foreground().forbid_parking();
2547 let mut lang_registry = Arc::new(LanguageRegistry::new());
2548 let fs = FakeFs::new(cx_a.background());
2549 fs.insert_tree(
2550 "/root-1",
2551 json!({
2552 ".zed.toml": r#"collaborators = ["user_b"]"#,
2553 "one.rs": "const ONE: usize = 1;",
2554 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2555 }),
2556 )
2557 .await;
2558 fs.insert_tree(
2559 "/root-2",
2560 json!({
2561 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2562 }),
2563 )
2564 .await;
2565
2566 // Set up a fake language server.
2567 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2568 Arc::get_mut(&mut lang_registry)
2569 .unwrap()
2570 .add(Arc::new(Language::new(
2571 LanguageConfig {
2572 name: "Rust".into(),
2573 path_suffixes: vec!["rs".to_string()],
2574 language_server: Some(language_server_config),
2575 ..Default::default()
2576 },
2577 Some(tree_sitter_rust::language()),
2578 )));
2579
2580 // Connect to a server as 2 clients.
2581 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2582 let client_a = server.create_client(cx_a, "user_a").await;
2583 let client_b = server.create_client(cx_b, "user_b").await;
2584
2585 // Share a project as client A
2586 let project_a = cx_a.update(|cx| {
2587 Project::local(
2588 client_a.clone(),
2589 client_a.user_store.clone(),
2590 lang_registry.clone(),
2591 fs.clone(),
2592 cx,
2593 )
2594 });
2595 let (worktree_a, _) = project_a
2596 .update(cx_a, |p, cx| {
2597 p.find_or_create_local_worktree("/root-1", true, cx)
2598 })
2599 .await
2600 .unwrap();
2601 worktree_a
2602 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2603 .await;
2604 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2605 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2606 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2607
2608 // Join the worktree as client B.
2609 let project_b = Project::remote(
2610 project_id,
2611 client_b.clone(),
2612 client_b.user_store.clone(),
2613 lang_registry.clone(),
2614 fs.clone(),
2615 &mut cx_b.to_async(),
2616 )
2617 .await
2618 .unwrap();
2619
2620 // Open the file on client B.
2621 let buffer_b = cx_b
2622 .background()
2623 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2624 .await
2625 .unwrap();
2626
2627 // Request references to a symbol as the guest.
2628 let references = project_b.update(cx_b, |p, cx| p.references(&buffer_b, 7, cx));
2629
2630 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2631 fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2632 assert_eq!(
2633 params.text_document_position.text_document.uri.as_str(),
2634 "file:///root-1/one.rs"
2635 );
2636 Some(vec![
2637 lsp::Location {
2638 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2639 range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2640 },
2641 lsp::Location {
2642 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2643 range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2644 },
2645 lsp::Location {
2646 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2647 range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2648 },
2649 ])
2650 });
2651
2652 let references = references.await.unwrap();
2653 cx_b.read(|cx| {
2654 assert_eq!(references.len(), 3);
2655 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2656
2657 let two_buffer = references[0].buffer.read(cx);
2658 let three_buffer = references[2].buffer.read(cx);
2659 assert_eq!(
2660 two_buffer.file().unwrap().path().as_ref(),
2661 Path::new("two.rs")
2662 );
2663 assert_eq!(references[1].buffer, references[0].buffer);
2664 assert_eq!(
2665 three_buffer.file().unwrap().full_path(cx),
2666 Path::new("three.rs")
2667 );
2668
2669 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2670 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2671 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2672 });
2673 }
2674
2675 #[gpui::test(iterations = 10)]
2676 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2677 cx_a.foreground().forbid_parking();
2678 let lang_registry = Arc::new(LanguageRegistry::new());
2679 let fs = FakeFs::new(cx_a.background());
2680 fs.insert_tree(
2681 "/root-1",
2682 json!({
2683 ".zed.toml": r#"collaborators = ["user_b"]"#,
2684 "a": "hello world",
2685 "b": "goodnight moon",
2686 "c": "a world of goo",
2687 "d": "world champion of clown world",
2688 }),
2689 )
2690 .await;
2691 fs.insert_tree(
2692 "/root-2",
2693 json!({
2694 "e": "disney world is fun",
2695 }),
2696 )
2697 .await;
2698
2699 // Connect to a server as 2 clients.
2700 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2701 let client_a = server.create_client(cx_a, "user_a").await;
2702 let client_b = server.create_client(cx_b, "user_b").await;
2703
2704 // Share a project as client A
2705 let project_a = cx_a.update(|cx| {
2706 Project::local(
2707 client_a.clone(),
2708 client_a.user_store.clone(),
2709 lang_registry.clone(),
2710 fs.clone(),
2711 cx,
2712 )
2713 });
2714 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2715
2716 let (worktree_1, _) = project_a
2717 .update(cx_a, |p, cx| {
2718 p.find_or_create_local_worktree("/root-1", true, cx)
2719 })
2720 .await
2721 .unwrap();
2722 worktree_1
2723 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2724 .await;
2725 let (worktree_2, _) = project_a
2726 .update(cx_a, |p, cx| {
2727 p.find_or_create_local_worktree("/root-2", true, cx)
2728 })
2729 .await
2730 .unwrap();
2731 worktree_2
2732 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2733 .await;
2734
2735 eprintln!("sharing");
2736
2737 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2738
2739 // Join the worktree as client B.
2740 let project_b = Project::remote(
2741 project_id,
2742 client_b.clone(),
2743 client_b.user_store.clone(),
2744 lang_registry.clone(),
2745 fs.clone(),
2746 &mut cx_b.to_async(),
2747 )
2748 .await
2749 .unwrap();
2750
2751 let results = project_b
2752 .update(cx_b, |project, cx| {
2753 project.search(SearchQuery::text("world", false, false), cx)
2754 })
2755 .await
2756 .unwrap();
2757
2758 let mut ranges_by_path = results
2759 .into_iter()
2760 .map(|(buffer, ranges)| {
2761 buffer.read_with(cx_b, |buffer, cx| {
2762 let path = buffer.file().unwrap().full_path(cx);
2763 let offset_ranges = ranges
2764 .into_iter()
2765 .map(|range| range.to_offset(buffer))
2766 .collect::<Vec<_>>();
2767 (path, offset_ranges)
2768 })
2769 })
2770 .collect::<Vec<_>>();
2771 ranges_by_path.sort_by_key(|(path, _)| path.clone());
2772
2773 assert_eq!(
2774 ranges_by_path,
2775 &[
2776 (PathBuf::from("root-1/a"), vec![6..11]),
2777 (PathBuf::from("root-1/c"), vec![2..7]),
2778 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2779 (PathBuf::from("root-2/e"), vec![7..12]),
2780 ]
2781 );
2782 }
2783
2784 #[gpui::test(iterations = 10)]
2785 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2786 cx_a.foreground().forbid_parking();
2787 let lang_registry = Arc::new(LanguageRegistry::new());
2788 let fs = FakeFs::new(cx_a.background());
2789 fs.insert_tree(
2790 "/root-1",
2791 json!({
2792 ".zed.toml": r#"collaborators = ["user_b"]"#,
2793 "main.rs": "fn double(number: i32) -> i32 { number + number }",
2794 }),
2795 )
2796 .await;
2797
2798 // Set up a fake language server.
2799 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2800 lang_registry.add(Arc::new(Language::new(
2801 LanguageConfig {
2802 name: "Rust".into(),
2803 path_suffixes: vec!["rs".to_string()],
2804 language_server: Some(language_server_config),
2805 ..Default::default()
2806 },
2807 Some(tree_sitter_rust::language()),
2808 )));
2809
2810 // Connect to a server as 2 clients.
2811 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2812 let client_a = server.create_client(cx_a, "user_a").await;
2813 let client_b = server.create_client(cx_b, "user_b").await;
2814
2815 // Share a project as client A
2816 let project_a = cx_a.update(|cx| {
2817 Project::local(
2818 client_a.clone(),
2819 client_a.user_store.clone(),
2820 lang_registry.clone(),
2821 fs.clone(),
2822 cx,
2823 )
2824 });
2825 let (worktree_a, _) = project_a
2826 .update(cx_a, |p, cx| {
2827 p.find_or_create_local_worktree("/root-1", true, cx)
2828 })
2829 .await
2830 .unwrap();
2831 worktree_a
2832 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2833 .await;
2834 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2835 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2836 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2837
2838 // Join the worktree as client B.
2839 let project_b = Project::remote(
2840 project_id,
2841 client_b.clone(),
2842 client_b.user_store.clone(),
2843 lang_registry.clone(),
2844 fs.clone(),
2845 &mut cx_b.to_async(),
2846 )
2847 .await
2848 .unwrap();
2849
2850 // Open the file on client B.
2851 let buffer_b = cx_b
2852 .background()
2853 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
2854 .await
2855 .unwrap();
2856
2857 // Request document highlights as the guest.
2858 let highlights = project_b.update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx));
2859
2860 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2861 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2862 |params, _| {
2863 assert_eq!(
2864 params
2865 .text_document_position_params
2866 .text_document
2867 .uri
2868 .as_str(),
2869 "file:///root-1/main.rs"
2870 );
2871 assert_eq!(
2872 params.text_document_position_params.position,
2873 lsp::Position::new(0, 34)
2874 );
2875 Some(vec![
2876 lsp::DocumentHighlight {
2877 kind: Some(lsp::DocumentHighlightKind::WRITE),
2878 range: lsp::Range::new(
2879 lsp::Position::new(0, 10),
2880 lsp::Position::new(0, 16),
2881 ),
2882 },
2883 lsp::DocumentHighlight {
2884 kind: Some(lsp::DocumentHighlightKind::READ),
2885 range: lsp::Range::new(
2886 lsp::Position::new(0, 32),
2887 lsp::Position::new(0, 38),
2888 ),
2889 },
2890 lsp::DocumentHighlight {
2891 kind: Some(lsp::DocumentHighlightKind::READ),
2892 range: lsp::Range::new(
2893 lsp::Position::new(0, 41),
2894 lsp::Position::new(0, 47),
2895 ),
2896 },
2897 ])
2898 },
2899 );
2900
2901 let highlights = highlights.await.unwrap();
2902 buffer_b.read_with(cx_b, |buffer, _| {
2903 let snapshot = buffer.snapshot();
2904
2905 let highlights = highlights
2906 .into_iter()
2907 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2908 .collect::<Vec<_>>();
2909 assert_eq!(
2910 highlights,
2911 &[
2912 (lsp::DocumentHighlightKind::WRITE, 10..16),
2913 (lsp::DocumentHighlightKind::READ, 32..38),
2914 (lsp::DocumentHighlightKind::READ, 41..47)
2915 ]
2916 )
2917 });
2918 }
2919
2920 #[gpui::test(iterations = 10)]
2921 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2922 cx_a.foreground().forbid_parking();
2923 let mut lang_registry = Arc::new(LanguageRegistry::new());
2924 let fs = FakeFs::new(cx_a.background());
2925 fs.insert_tree(
2926 "/code",
2927 json!({
2928 "crate-1": {
2929 ".zed.toml": r#"collaborators = ["user_b"]"#,
2930 "one.rs": "const ONE: usize = 1;",
2931 },
2932 "crate-2": {
2933 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2934 },
2935 "private": {
2936 "passwords.txt": "the-password",
2937 }
2938 }),
2939 )
2940 .await;
2941
2942 // Set up a fake language server.
2943 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2944 Arc::get_mut(&mut lang_registry)
2945 .unwrap()
2946 .add(Arc::new(Language::new(
2947 LanguageConfig {
2948 name: "Rust".into(),
2949 path_suffixes: vec!["rs".to_string()],
2950 language_server: Some(language_server_config),
2951 ..Default::default()
2952 },
2953 Some(tree_sitter_rust::language()),
2954 )));
2955
2956 // Connect to a server as 2 clients.
2957 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2958 let client_a = server.create_client(cx_a, "user_a").await;
2959 let client_b = server.create_client(cx_b, "user_b").await;
2960
2961 // Share a project as client A
2962 let project_a = cx_a.update(|cx| {
2963 Project::local(
2964 client_a.clone(),
2965 client_a.user_store.clone(),
2966 lang_registry.clone(),
2967 fs.clone(),
2968 cx,
2969 )
2970 });
2971 let (worktree_a, _) = project_a
2972 .update(cx_a, |p, cx| {
2973 p.find_or_create_local_worktree("/code/crate-1", true, cx)
2974 })
2975 .await
2976 .unwrap();
2977 worktree_a
2978 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2979 .await;
2980 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2981 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2982 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2983
2984 // Join the worktree as client B.
2985 let project_b = Project::remote(
2986 project_id,
2987 client_b.clone(),
2988 client_b.user_store.clone(),
2989 lang_registry.clone(),
2990 fs.clone(),
2991 &mut cx_b.to_async(),
2992 )
2993 .await
2994 .unwrap();
2995
2996 // Cause the language server to start.
2997 let _buffer = cx_b
2998 .background()
2999 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3000 .await
3001 .unwrap();
3002
3003 // Request the definition of a symbol as the guest.
3004 let symbols = project_b.update(cx_b, |p, cx| p.symbols("two", cx));
3005 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3006 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
3007 #[allow(deprecated)]
3008 Some(vec![lsp::SymbolInformation {
3009 name: "TWO".into(),
3010 location: lsp::Location {
3011 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3012 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3013 },
3014 kind: lsp::SymbolKind::CONSTANT,
3015 tags: None,
3016 container_name: None,
3017 deprecated: None,
3018 }])
3019 });
3020
3021 let symbols = symbols.await.unwrap();
3022 assert_eq!(symbols.len(), 1);
3023 assert_eq!(symbols[0].name, "TWO");
3024
3025 // Open one of the returned symbols.
3026 let buffer_b_2 = project_b
3027 .update(cx_b, |project, cx| {
3028 project.open_buffer_for_symbol(&symbols[0], cx)
3029 })
3030 .await
3031 .unwrap();
3032 buffer_b_2.read_with(cx_b, |buffer, _| {
3033 assert_eq!(
3034 buffer.file().unwrap().path().as_ref(),
3035 Path::new("../crate-2/two.rs")
3036 );
3037 });
3038
3039 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3040 let mut fake_symbol = symbols[0].clone();
3041 fake_symbol.path = Path::new("/code/secrets").into();
3042 let error = project_b
3043 .update(cx_b, |project, cx| {
3044 project.open_buffer_for_symbol(&fake_symbol, cx)
3045 })
3046 .await
3047 .unwrap_err();
3048 assert!(error.to_string().contains("invalid symbol signature"));
3049 }
3050
3051 #[gpui::test(iterations = 10)]
3052 async fn test_open_buffer_while_getting_definition_pointing_to_it(
3053 cx_a: &mut TestAppContext,
3054 cx_b: &mut TestAppContext,
3055 mut rng: StdRng,
3056 ) {
3057 cx_a.foreground().forbid_parking();
3058 let mut lang_registry = Arc::new(LanguageRegistry::new());
3059 let fs = FakeFs::new(cx_a.background());
3060 fs.insert_tree(
3061 "/root",
3062 json!({
3063 ".zed.toml": r#"collaborators = ["user_b"]"#,
3064 "a.rs": "const ONE: usize = b::TWO;",
3065 "b.rs": "const TWO: usize = 2",
3066 }),
3067 )
3068 .await;
3069
3070 // Set up a fake language server.
3071 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3072
3073 Arc::get_mut(&mut lang_registry)
3074 .unwrap()
3075 .add(Arc::new(Language::new(
3076 LanguageConfig {
3077 name: "Rust".into(),
3078 path_suffixes: vec!["rs".to_string()],
3079 language_server: Some(language_server_config),
3080 ..Default::default()
3081 },
3082 Some(tree_sitter_rust::language()),
3083 )));
3084
3085 // Connect to a server as 2 clients.
3086 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3087 let client_a = server.create_client(cx_a, "user_a").await;
3088 let client_b = server.create_client(cx_b, "user_b").await;
3089
3090 // Share a project as client A
3091 let project_a = cx_a.update(|cx| {
3092 Project::local(
3093 client_a.clone(),
3094 client_a.user_store.clone(),
3095 lang_registry.clone(),
3096 fs.clone(),
3097 cx,
3098 )
3099 });
3100
3101 let (worktree_a, _) = project_a
3102 .update(cx_a, |p, cx| {
3103 p.find_or_create_local_worktree("/root", true, cx)
3104 })
3105 .await
3106 .unwrap();
3107 worktree_a
3108 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3109 .await;
3110 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3111 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3112 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3113
3114 // Join the worktree as client B.
3115 let project_b = Project::remote(
3116 project_id,
3117 client_b.clone(),
3118 client_b.user_store.clone(),
3119 lang_registry.clone(),
3120 fs.clone(),
3121 &mut cx_b.to_async(),
3122 )
3123 .await
3124 .unwrap();
3125
3126 let buffer_b1 = cx_b
3127 .background()
3128 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3129 .await
3130 .unwrap();
3131
3132 let definitions;
3133 let buffer_b2;
3134 if rng.gen() {
3135 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3136 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3137 } else {
3138 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3139 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3140 }
3141
3142 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3143 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3144 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3145 lsp::Url::from_file_path("/root/b.rs").unwrap(),
3146 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3147 )))
3148 });
3149
3150 let buffer_b2 = buffer_b2.await.unwrap();
3151 let definitions = definitions.await.unwrap();
3152 assert_eq!(definitions.len(), 1);
3153 assert_eq!(definitions[0].buffer, buffer_b2);
3154 }
3155
3156 #[gpui::test(iterations = 10)]
3157 async fn test_collaborating_with_code_actions(
3158 cx_a: &mut TestAppContext,
3159 cx_b: &mut TestAppContext,
3160 ) {
3161 cx_a.foreground().forbid_parking();
3162 let mut lang_registry = Arc::new(LanguageRegistry::new());
3163 let fs = FakeFs::new(cx_a.background());
3164 let mut path_openers_b = Vec::new();
3165 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3166
3167 // Set up a fake language server.
3168 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3169 Arc::get_mut(&mut lang_registry)
3170 .unwrap()
3171 .add(Arc::new(Language::new(
3172 LanguageConfig {
3173 name: "Rust".into(),
3174 path_suffixes: vec!["rs".to_string()],
3175 language_server: Some(language_server_config),
3176 ..Default::default()
3177 },
3178 Some(tree_sitter_rust::language()),
3179 )));
3180
3181 // Connect to a server as 2 clients.
3182 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3183 let client_a = server.create_client(cx_a, "user_a").await;
3184 let client_b = server.create_client(cx_b, "user_b").await;
3185
3186 // Share a project as client A
3187 fs.insert_tree(
3188 "/a",
3189 json!({
3190 ".zed.toml": r#"collaborators = ["user_b"]"#,
3191 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3192 "other.rs": "pub fn foo() -> usize { 4 }",
3193 }),
3194 )
3195 .await;
3196 let project_a = cx_a.update(|cx| {
3197 Project::local(
3198 client_a.clone(),
3199 client_a.user_store.clone(),
3200 lang_registry.clone(),
3201 fs.clone(),
3202 cx,
3203 )
3204 });
3205 let (worktree_a, _) = project_a
3206 .update(cx_a, |p, cx| {
3207 p.find_or_create_local_worktree("/a", true, cx)
3208 })
3209 .await
3210 .unwrap();
3211 worktree_a
3212 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3213 .await;
3214 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3215 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3216 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3217
3218 // Join the worktree as client B.
3219 let project_b = Project::remote(
3220 project_id,
3221 client_b.clone(),
3222 client_b.user_store.clone(),
3223 lang_registry.clone(),
3224 fs.clone(),
3225 &mut cx_b.to_async(),
3226 )
3227 .await
3228 .unwrap();
3229 let mut params = cx_b.update(WorkspaceParams::test);
3230 params.languages = lang_registry.clone();
3231 params.client = client_b.client.clone();
3232 params.user_store = client_b.user_store.clone();
3233 params.project = project_b;
3234 params.path_openers = path_openers_b.into();
3235
3236 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3237 let editor_b = workspace_b
3238 .update(cx_b, |workspace, cx| {
3239 workspace.open_path((worktree_id, "main.rs").into(), cx)
3240 })
3241 .await
3242 .unwrap()
3243 .downcast::<Editor>()
3244 .unwrap();
3245
3246 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3247 fake_language_server
3248 .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3249 assert_eq!(
3250 params.text_document.uri,
3251 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3252 );
3253 assert_eq!(params.range.start, lsp::Position::new(0, 0));
3254 assert_eq!(params.range.end, lsp::Position::new(0, 0));
3255 None
3256 })
3257 .next()
3258 .await;
3259
3260 // Move cursor to a location that contains code actions.
3261 editor_b.update(cx_b, |editor, cx| {
3262 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3263 cx.focus(&editor_b);
3264 });
3265
3266 fake_language_server
3267 .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3268 assert_eq!(
3269 params.text_document.uri,
3270 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3271 );
3272 assert_eq!(params.range.start, lsp::Position::new(1, 31));
3273 assert_eq!(params.range.end, lsp::Position::new(1, 31));
3274
3275 Some(vec![lsp::CodeActionOrCommand::CodeAction(
3276 lsp::CodeAction {
3277 title: "Inline into all callers".to_string(),
3278 edit: Some(lsp::WorkspaceEdit {
3279 changes: Some(
3280 [
3281 (
3282 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3283 vec![lsp::TextEdit::new(
3284 lsp::Range::new(
3285 lsp::Position::new(1, 22),
3286 lsp::Position::new(1, 34),
3287 ),
3288 "4".to_string(),
3289 )],
3290 ),
3291 (
3292 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3293 vec![lsp::TextEdit::new(
3294 lsp::Range::new(
3295 lsp::Position::new(0, 0),
3296 lsp::Position::new(0, 27),
3297 ),
3298 "".to_string(),
3299 )],
3300 ),
3301 ]
3302 .into_iter()
3303 .collect(),
3304 ),
3305 ..Default::default()
3306 }),
3307 data: Some(json!({
3308 "codeActionParams": {
3309 "range": {
3310 "start": {"line": 1, "column": 31},
3311 "end": {"line": 1, "column": 31},
3312 }
3313 }
3314 })),
3315 ..Default::default()
3316 },
3317 )])
3318 })
3319 .next()
3320 .await;
3321
3322 // Toggle code actions and wait for them to display.
3323 editor_b.update(cx_b, |editor, cx| {
3324 editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3325 });
3326 editor_b
3327 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3328 .await;
3329
3330 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3331
3332 // Confirming the code action will trigger a resolve request.
3333 let confirm_action = workspace_b
3334 .update(cx_b, |workspace, cx| {
3335 Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3336 })
3337 .unwrap();
3338 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3339 lsp::CodeAction {
3340 title: "Inline into all callers".to_string(),
3341 edit: Some(lsp::WorkspaceEdit {
3342 changes: Some(
3343 [
3344 (
3345 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3346 vec![lsp::TextEdit::new(
3347 lsp::Range::new(
3348 lsp::Position::new(1, 22),
3349 lsp::Position::new(1, 34),
3350 ),
3351 "4".to_string(),
3352 )],
3353 ),
3354 (
3355 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3356 vec![lsp::TextEdit::new(
3357 lsp::Range::new(
3358 lsp::Position::new(0, 0),
3359 lsp::Position::new(0, 27),
3360 ),
3361 "".to_string(),
3362 )],
3363 ),
3364 ]
3365 .into_iter()
3366 .collect(),
3367 ),
3368 ..Default::default()
3369 }),
3370 ..Default::default()
3371 }
3372 });
3373
3374 // After the action is confirmed, an editor containing both modified files is opened.
3375 confirm_action.await.unwrap();
3376 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3377 workspace
3378 .active_item(cx)
3379 .unwrap()
3380 .downcast::<Editor>()
3381 .unwrap()
3382 });
3383 code_action_editor.update(cx_b, |editor, cx| {
3384 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3385 editor.undo(&Undo, cx);
3386 assert_eq!(
3387 editor.text(cx),
3388 "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3389 );
3390 editor.redo(&Redo, cx);
3391 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3392 });
3393 }
3394
3395 #[gpui::test(iterations = 10)]
3396 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3397 cx_a.foreground().forbid_parking();
3398 let mut lang_registry = Arc::new(LanguageRegistry::new());
3399 let fs = FakeFs::new(cx_a.background());
3400 let mut path_openers_b = Vec::new();
3401 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3402
3403 // Set up a fake language server.
3404 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3405 Arc::get_mut(&mut lang_registry)
3406 .unwrap()
3407 .add(Arc::new(Language::new(
3408 LanguageConfig {
3409 name: "Rust".into(),
3410 path_suffixes: vec!["rs".to_string()],
3411 language_server: Some(language_server_config),
3412 ..Default::default()
3413 },
3414 Some(tree_sitter_rust::language()),
3415 )));
3416
3417 // Connect to a server as 2 clients.
3418 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3419 let client_a = server.create_client(cx_a, "user_a").await;
3420 let client_b = server.create_client(cx_b, "user_b").await;
3421
3422 // Share a project as client A
3423 fs.insert_tree(
3424 "/dir",
3425 json!({
3426 ".zed.toml": r#"collaborators = ["user_b"]"#,
3427 "one.rs": "const ONE: usize = 1;",
3428 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3429 }),
3430 )
3431 .await;
3432 let project_a = cx_a.update(|cx| {
3433 Project::local(
3434 client_a.clone(),
3435 client_a.user_store.clone(),
3436 lang_registry.clone(),
3437 fs.clone(),
3438 cx,
3439 )
3440 });
3441 let (worktree_a, _) = project_a
3442 .update(cx_a, |p, cx| {
3443 p.find_or_create_local_worktree("/dir", true, cx)
3444 })
3445 .await
3446 .unwrap();
3447 worktree_a
3448 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3449 .await;
3450 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3451 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3452 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3453
3454 // Join the worktree as client B.
3455 let project_b = Project::remote(
3456 project_id,
3457 client_b.clone(),
3458 client_b.user_store.clone(),
3459 lang_registry.clone(),
3460 fs.clone(),
3461 &mut cx_b.to_async(),
3462 )
3463 .await
3464 .unwrap();
3465 let mut params = cx_b.update(WorkspaceParams::test);
3466 params.languages = lang_registry.clone();
3467 params.client = client_b.client.clone();
3468 params.user_store = client_b.user_store.clone();
3469 params.project = project_b;
3470 params.path_openers = path_openers_b.into();
3471
3472 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3473 let editor_b = workspace_b
3474 .update(cx_b, |workspace, cx| {
3475 workspace.open_path((worktree_id, "one.rs").into(), cx)
3476 })
3477 .await
3478 .unwrap()
3479 .downcast::<Editor>()
3480 .unwrap();
3481 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3482
3483 // Move cursor to a location that can be renamed.
3484 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3485 editor.select_ranges([7..7], None, cx);
3486 editor.rename(&Rename, cx).unwrap()
3487 });
3488
3489 fake_language_server
3490 .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3491 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3492 assert_eq!(params.position, lsp::Position::new(0, 7));
3493 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3494 lsp::Position::new(0, 6),
3495 lsp::Position::new(0, 9),
3496 )))
3497 })
3498 .next()
3499 .await
3500 .unwrap();
3501 prepare_rename.await.unwrap();
3502 editor_b.update(cx_b, |editor, cx| {
3503 let rename = editor.pending_rename().unwrap();
3504 let buffer = editor.buffer().read(cx).snapshot(cx);
3505 assert_eq!(
3506 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3507 6..9
3508 );
3509 rename.editor.update(cx, |rename_editor, cx| {
3510 rename_editor.buffer().update(cx, |rename_buffer, cx| {
3511 rename_buffer.edit([0..3], "THREE", cx);
3512 });
3513 });
3514 });
3515
3516 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3517 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3518 });
3519 fake_language_server
3520 .handle_request::<lsp::request::Rename, _>(|params, _| {
3521 assert_eq!(
3522 params.text_document_position.text_document.uri.as_str(),
3523 "file:///dir/one.rs"
3524 );
3525 assert_eq!(
3526 params.text_document_position.position,
3527 lsp::Position::new(0, 6)
3528 );
3529 assert_eq!(params.new_name, "THREE");
3530 Some(lsp::WorkspaceEdit {
3531 changes: Some(
3532 [
3533 (
3534 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3535 vec![lsp::TextEdit::new(
3536 lsp::Range::new(
3537 lsp::Position::new(0, 6),
3538 lsp::Position::new(0, 9),
3539 ),
3540 "THREE".to_string(),
3541 )],
3542 ),
3543 (
3544 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3545 vec![
3546 lsp::TextEdit::new(
3547 lsp::Range::new(
3548 lsp::Position::new(0, 24),
3549 lsp::Position::new(0, 27),
3550 ),
3551 "THREE".to_string(),
3552 ),
3553 lsp::TextEdit::new(
3554 lsp::Range::new(
3555 lsp::Position::new(0, 35),
3556 lsp::Position::new(0, 38),
3557 ),
3558 "THREE".to_string(),
3559 ),
3560 ],
3561 ),
3562 ]
3563 .into_iter()
3564 .collect(),
3565 ),
3566 ..Default::default()
3567 })
3568 })
3569 .next()
3570 .await
3571 .unwrap();
3572 confirm_rename.await.unwrap();
3573
3574 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3575 workspace
3576 .active_item(cx)
3577 .unwrap()
3578 .downcast::<Editor>()
3579 .unwrap()
3580 });
3581 rename_editor.update(cx_b, |editor, cx| {
3582 assert_eq!(
3583 editor.text(cx),
3584 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3585 );
3586 editor.undo(&Undo, cx);
3587 assert_eq!(
3588 editor.text(cx),
3589 "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3590 );
3591 editor.redo(&Redo, cx);
3592 assert_eq!(
3593 editor.text(cx),
3594 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3595 );
3596 });
3597
3598 // Ensure temporary rename edits cannot be undone/redone.
3599 editor_b.update(cx_b, |editor, cx| {
3600 editor.undo(&Undo, cx);
3601 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3602 editor.undo(&Undo, cx);
3603 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3604 editor.redo(&Redo, cx);
3605 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3606 })
3607 }
3608
3609 #[gpui::test(iterations = 10)]
3610 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3611 cx_a.foreground().forbid_parking();
3612
3613 // Connect to a server as 2 clients.
3614 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3615 let client_a = server.create_client(cx_a, "user_a").await;
3616 let client_b = server.create_client(cx_b, "user_b").await;
3617
3618 // Create an org that includes these 2 users.
3619 let db = &server.app_state.db;
3620 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3621 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3622 .await
3623 .unwrap();
3624 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3625 .await
3626 .unwrap();
3627
3628 // Create a channel that includes all the users.
3629 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3630 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3631 .await
3632 .unwrap();
3633 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3634 .await
3635 .unwrap();
3636 db.create_channel_message(
3637 channel_id,
3638 client_b.current_user_id(&cx_b),
3639 "hello A, it's B.",
3640 OffsetDateTime::now_utc(),
3641 1,
3642 )
3643 .await
3644 .unwrap();
3645
3646 let channels_a = cx_a
3647 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3648 channels_a
3649 .condition(cx_a, |list, _| list.available_channels().is_some())
3650 .await;
3651 channels_a.read_with(cx_a, |list, _| {
3652 assert_eq!(
3653 list.available_channels().unwrap(),
3654 &[ChannelDetails {
3655 id: channel_id.to_proto(),
3656 name: "test-channel".to_string()
3657 }]
3658 )
3659 });
3660 let channel_a = channels_a.update(cx_a, |this, cx| {
3661 this.get_channel(channel_id.to_proto(), cx).unwrap()
3662 });
3663 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3664 channel_a
3665 .condition(&cx_a, |channel, _| {
3666 channel_messages(channel)
3667 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3668 })
3669 .await;
3670
3671 let channels_b = cx_b
3672 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3673 channels_b
3674 .condition(cx_b, |list, _| list.available_channels().is_some())
3675 .await;
3676 channels_b.read_with(cx_b, |list, _| {
3677 assert_eq!(
3678 list.available_channels().unwrap(),
3679 &[ChannelDetails {
3680 id: channel_id.to_proto(),
3681 name: "test-channel".to_string()
3682 }]
3683 )
3684 });
3685
3686 let channel_b = channels_b.update(cx_b, |this, cx| {
3687 this.get_channel(channel_id.to_proto(), cx).unwrap()
3688 });
3689 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3690 channel_b
3691 .condition(&cx_b, |channel, _| {
3692 channel_messages(channel)
3693 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3694 })
3695 .await;
3696
3697 channel_a
3698 .update(cx_a, |channel, cx| {
3699 channel
3700 .send_message("oh, hi B.".to_string(), cx)
3701 .unwrap()
3702 .detach();
3703 let task = channel.send_message("sup".to_string(), cx).unwrap();
3704 assert_eq!(
3705 channel_messages(channel),
3706 &[
3707 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3708 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3709 ("user_a".to_string(), "sup".to_string(), true)
3710 ]
3711 );
3712 task
3713 })
3714 .await
3715 .unwrap();
3716
3717 channel_b
3718 .condition(&cx_b, |channel, _| {
3719 channel_messages(channel)
3720 == [
3721 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3722 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3723 ("user_a".to_string(), "sup".to_string(), false),
3724 ]
3725 })
3726 .await;
3727
3728 assert_eq!(
3729 server
3730 .state()
3731 .await
3732 .channel(channel_id)
3733 .unwrap()
3734 .connection_ids
3735 .len(),
3736 2
3737 );
3738 cx_b.update(|_| drop(channel_b));
3739 server
3740 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3741 .await;
3742
3743 cx_a.update(|_| drop(channel_a));
3744 server
3745 .condition(|state| state.channel(channel_id).is_none())
3746 .await;
3747 }
3748
3749 #[gpui::test(iterations = 10)]
3750 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
3751 cx_a.foreground().forbid_parking();
3752
3753 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3754 let client_a = server.create_client(cx_a, "user_a").await;
3755
3756 let db = &server.app_state.db;
3757 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3758 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3759 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3760 .await
3761 .unwrap();
3762 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3763 .await
3764 .unwrap();
3765
3766 let channels_a = cx_a
3767 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3768 channels_a
3769 .condition(cx_a, |list, _| list.available_channels().is_some())
3770 .await;
3771 let channel_a = channels_a.update(cx_a, |this, cx| {
3772 this.get_channel(channel_id.to_proto(), cx).unwrap()
3773 });
3774
3775 // Messages aren't allowed to be too long.
3776 channel_a
3777 .update(cx_a, |channel, cx| {
3778 let long_body = "this is long.\n".repeat(1024);
3779 channel.send_message(long_body, cx).unwrap()
3780 })
3781 .await
3782 .unwrap_err();
3783
3784 // Messages aren't allowed to be blank.
3785 channel_a.update(cx_a, |channel, cx| {
3786 channel.send_message(String::new(), cx).unwrap_err()
3787 });
3788
3789 // Leading and trailing whitespace are trimmed.
3790 channel_a
3791 .update(cx_a, |channel, cx| {
3792 channel
3793 .send_message("\n surrounded by whitespace \n".to_string(), cx)
3794 .unwrap()
3795 })
3796 .await
3797 .unwrap();
3798 assert_eq!(
3799 db.get_channel_messages(channel_id, 10, None)
3800 .await
3801 .unwrap()
3802 .iter()
3803 .map(|m| &m.body)
3804 .collect::<Vec<_>>(),
3805 &["surrounded by whitespace"]
3806 );
3807 }
3808
3809 #[gpui::test(iterations = 10)]
3810 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3811 cx_a.foreground().forbid_parking();
3812
3813 // Connect to a server as 2 clients.
3814 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3815 let client_a = server.create_client(cx_a, "user_a").await;
3816 let client_b = server.create_client(cx_b, "user_b").await;
3817 let mut status_b = client_b.status();
3818
3819 // Create an org that includes these 2 users.
3820 let db = &server.app_state.db;
3821 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3822 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3823 .await
3824 .unwrap();
3825 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3826 .await
3827 .unwrap();
3828
3829 // Create a channel that includes all the users.
3830 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3831 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3832 .await
3833 .unwrap();
3834 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3835 .await
3836 .unwrap();
3837 db.create_channel_message(
3838 channel_id,
3839 client_b.current_user_id(&cx_b),
3840 "hello A, it's B.",
3841 OffsetDateTime::now_utc(),
3842 2,
3843 )
3844 .await
3845 .unwrap();
3846
3847 let channels_a = cx_a
3848 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3849 channels_a
3850 .condition(cx_a, |list, _| list.available_channels().is_some())
3851 .await;
3852
3853 channels_a.read_with(cx_a, |list, _| {
3854 assert_eq!(
3855 list.available_channels().unwrap(),
3856 &[ChannelDetails {
3857 id: channel_id.to_proto(),
3858 name: "test-channel".to_string()
3859 }]
3860 )
3861 });
3862 let channel_a = channels_a.update(cx_a, |this, cx| {
3863 this.get_channel(channel_id.to_proto(), cx).unwrap()
3864 });
3865 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3866 channel_a
3867 .condition(&cx_a, |channel, _| {
3868 channel_messages(channel)
3869 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3870 })
3871 .await;
3872
3873 let channels_b = cx_b
3874 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3875 channels_b
3876 .condition(cx_b, |list, _| list.available_channels().is_some())
3877 .await;
3878 channels_b.read_with(cx_b, |list, _| {
3879 assert_eq!(
3880 list.available_channels().unwrap(),
3881 &[ChannelDetails {
3882 id: channel_id.to_proto(),
3883 name: "test-channel".to_string()
3884 }]
3885 )
3886 });
3887
3888 let channel_b = channels_b.update(cx_b, |this, cx| {
3889 this.get_channel(channel_id.to_proto(), cx).unwrap()
3890 });
3891 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3892 channel_b
3893 .condition(&cx_b, |channel, _| {
3894 channel_messages(channel)
3895 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3896 })
3897 .await;
3898
3899 // Disconnect client B, ensuring we can still access its cached channel data.
3900 server.forbid_connections();
3901 server.disconnect_client(client_b.current_user_id(&cx_b));
3902 cx_b.foreground().advance_clock(Duration::from_secs(3));
3903 while !matches!(
3904 status_b.next().await,
3905 Some(client::Status::ReconnectionError { .. })
3906 ) {}
3907
3908 channels_b.read_with(cx_b, |channels, _| {
3909 assert_eq!(
3910 channels.available_channels().unwrap(),
3911 [ChannelDetails {
3912 id: channel_id.to_proto(),
3913 name: "test-channel".to_string()
3914 }]
3915 )
3916 });
3917 channel_b.read_with(cx_b, |channel, _| {
3918 assert_eq!(
3919 channel_messages(channel),
3920 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3921 )
3922 });
3923
3924 // Send a message from client B while it is disconnected.
3925 channel_b
3926 .update(cx_b, |channel, cx| {
3927 let task = channel
3928 .send_message("can you see this?".to_string(), cx)
3929 .unwrap();
3930 assert_eq!(
3931 channel_messages(channel),
3932 &[
3933 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3934 ("user_b".to_string(), "can you see this?".to_string(), true)
3935 ]
3936 );
3937 task
3938 })
3939 .await
3940 .unwrap_err();
3941
3942 // Send a message from client A while B is disconnected.
3943 channel_a
3944 .update(cx_a, |channel, cx| {
3945 channel
3946 .send_message("oh, hi B.".to_string(), cx)
3947 .unwrap()
3948 .detach();
3949 let task = channel.send_message("sup".to_string(), cx).unwrap();
3950 assert_eq!(
3951 channel_messages(channel),
3952 &[
3953 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3954 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3955 ("user_a".to_string(), "sup".to_string(), true)
3956 ]
3957 );
3958 task
3959 })
3960 .await
3961 .unwrap();
3962
3963 // Give client B a chance to reconnect.
3964 server.allow_connections();
3965 cx_b.foreground().advance_clock(Duration::from_secs(10));
3966
3967 // Verify that B sees the new messages upon reconnection, as well as the message client B
3968 // sent while offline.
3969 channel_b
3970 .condition(&cx_b, |channel, _| {
3971 channel_messages(channel)
3972 == [
3973 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3974 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3975 ("user_a".to_string(), "sup".to_string(), false),
3976 ("user_b".to_string(), "can you see this?".to_string(), false),
3977 ]
3978 })
3979 .await;
3980
3981 // Ensure client A and B can communicate normally after reconnection.
3982 channel_a
3983 .update(cx_a, |channel, cx| {
3984 channel.send_message("you online?".to_string(), cx).unwrap()
3985 })
3986 .await
3987 .unwrap();
3988 channel_b
3989 .condition(&cx_b, |channel, _| {
3990 channel_messages(channel)
3991 == [
3992 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3993 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3994 ("user_a".to_string(), "sup".to_string(), false),
3995 ("user_b".to_string(), "can you see this?".to_string(), false),
3996 ("user_a".to_string(), "you online?".to_string(), false),
3997 ]
3998 })
3999 .await;
4000
4001 channel_b
4002 .update(cx_b, |channel, cx| {
4003 channel.send_message("yep".to_string(), cx).unwrap()
4004 })
4005 .await
4006 .unwrap();
4007 channel_a
4008 .condition(&cx_a, |channel, _| {
4009 channel_messages(channel)
4010 == [
4011 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4012 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4013 ("user_a".to_string(), "sup".to_string(), false),
4014 ("user_b".to_string(), "can you see this?".to_string(), false),
4015 ("user_a".to_string(), "you online?".to_string(), false),
4016 ("user_b".to_string(), "yep".to_string(), false),
4017 ]
4018 })
4019 .await;
4020 }
4021
4022 #[gpui::test(iterations = 10)]
4023 async fn test_contacts(
4024 cx_a: &mut TestAppContext,
4025 cx_b: &mut TestAppContext,
4026 cx_c: &mut TestAppContext,
4027 ) {
4028 cx_a.foreground().forbid_parking();
4029 let lang_registry = Arc::new(LanguageRegistry::new());
4030 let fs = FakeFs::new(cx_a.background());
4031
4032 // Connect to a server as 3 clients.
4033 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4034 let client_a = server.create_client(cx_a, "user_a").await;
4035 let client_b = server.create_client(cx_b, "user_b").await;
4036 let client_c = server.create_client(cx_c, "user_c").await;
4037
4038 // Share a worktree as client A.
4039 fs.insert_tree(
4040 "/a",
4041 json!({
4042 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4043 }),
4044 )
4045 .await;
4046
4047 let project_a = cx_a.update(|cx| {
4048 Project::local(
4049 client_a.clone(),
4050 client_a.user_store.clone(),
4051 lang_registry.clone(),
4052 fs.clone(),
4053 cx,
4054 )
4055 });
4056 let (worktree_a, _) = project_a
4057 .update(cx_a, |p, cx| {
4058 p.find_or_create_local_worktree("/a", true, cx)
4059 })
4060 .await
4061 .unwrap();
4062 worktree_a
4063 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4064 .await;
4065
4066 client_a
4067 .user_store
4068 .condition(&cx_a, |user_store, _| {
4069 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4070 })
4071 .await;
4072 client_b
4073 .user_store
4074 .condition(&cx_b, |user_store, _| {
4075 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4076 })
4077 .await;
4078 client_c
4079 .user_store
4080 .condition(&cx_c, |user_store, _| {
4081 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4082 })
4083 .await;
4084
4085 let project_id = project_a
4086 .update(cx_a, |project, _| project.next_remote_id())
4087 .await;
4088 project_a
4089 .update(cx_a, |project, cx| project.share(cx))
4090 .await
4091 .unwrap();
4092
4093 let _project_b = Project::remote(
4094 project_id,
4095 client_b.clone(),
4096 client_b.user_store.clone(),
4097 lang_registry.clone(),
4098 fs.clone(),
4099 &mut cx_b.to_async(),
4100 )
4101 .await
4102 .unwrap();
4103
4104 client_a
4105 .user_store
4106 .condition(&cx_a, |user_store, _| {
4107 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4108 })
4109 .await;
4110 client_b
4111 .user_store
4112 .condition(&cx_b, |user_store, _| {
4113 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4114 })
4115 .await;
4116 client_c
4117 .user_store
4118 .condition(&cx_c, |user_store, _| {
4119 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4120 })
4121 .await;
4122
4123 project_a
4124 .condition(&cx_a, |project, _| {
4125 project.collaborators().contains_key(&client_b.peer_id)
4126 })
4127 .await;
4128
4129 cx_a.update(move |_| drop(project_a));
4130 client_a
4131 .user_store
4132 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4133 .await;
4134 client_b
4135 .user_store
4136 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4137 .await;
4138 client_c
4139 .user_store
4140 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4141 .await;
4142
4143 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4144 user_store
4145 .contacts()
4146 .iter()
4147 .map(|contact| {
4148 let worktrees = contact
4149 .projects
4150 .iter()
4151 .map(|p| {
4152 (
4153 p.worktree_root_names[0].as_str(),
4154 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4155 )
4156 })
4157 .collect();
4158 (contact.user.github_login.as_str(), worktrees)
4159 })
4160 .collect()
4161 }
4162 }
4163
4164 #[gpui::test(iterations = 100)]
4165 async fn test_random_collaboration(cx: &mut TestAppContext, rng: StdRng) {
4166 cx.foreground().forbid_parking();
4167 let max_peers = env::var("MAX_PEERS")
4168 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4169 .unwrap_or(5);
4170 let max_operations = env::var("OPERATIONS")
4171 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4172 .unwrap_or(10);
4173
4174 let rng = Arc::new(Mutex::new(rng));
4175
4176 let guest_lang_registry = Arc::new(LanguageRegistry::new());
4177 let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4178
4179 let fs = FakeFs::new(cx.background());
4180 fs.insert_tree(
4181 "/_collab",
4182 json!({
4183 ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4184 }),
4185 )
4186 .await;
4187
4188 let operations = Rc::new(Cell::new(0));
4189 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4190 let mut clients = Vec::new();
4191
4192 let mut next_entity_id = 100000;
4193 let mut host_cx = TestAppContext::new(
4194 cx.foreground_platform(),
4195 cx.platform(),
4196 cx.foreground(),
4197 cx.background(),
4198 cx.font_cache(),
4199 cx.leak_detector(),
4200 next_entity_id,
4201 );
4202 let host = server.create_client(&mut host_cx, "host").await;
4203 let host_project = host_cx.update(|cx| {
4204 Project::local(
4205 host.client.clone(),
4206 host.user_store.clone(),
4207 Arc::new(LanguageRegistry::new()),
4208 fs.clone(),
4209 cx,
4210 )
4211 });
4212 let host_project_id = host_project
4213 .update(&mut host_cx, |p, _| p.next_remote_id())
4214 .await;
4215
4216 let (collab_worktree, _) = host_project
4217 .update(&mut host_cx, |project, cx| {
4218 project.find_or_create_local_worktree("/_collab", true, cx)
4219 })
4220 .await
4221 .unwrap();
4222 collab_worktree
4223 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4224 .await;
4225 host_project
4226 .update(&mut host_cx, |project, cx| project.share(cx))
4227 .await
4228 .unwrap();
4229
4230 clients.push(cx.foreground().spawn(host.simulate_host(
4231 host_project,
4232 language_server_config,
4233 operations.clone(),
4234 max_operations,
4235 rng.clone(),
4236 host_cx,
4237 )));
4238
4239 while operations.get() < max_operations {
4240 cx.background().simulate_random_delay().await;
4241 if clients.len() >= max_peers {
4242 break;
4243 } else if rng.lock().gen_bool(0.05) {
4244 operations.set(operations.get() + 1);
4245
4246 let guest_id = clients.len();
4247 log::info!("Adding guest {}", guest_id);
4248 next_entity_id += 100000;
4249 let mut guest_cx = TestAppContext::new(
4250 cx.foreground_platform(),
4251 cx.platform(),
4252 cx.foreground(),
4253 cx.background(),
4254 cx.font_cache(),
4255 cx.leak_detector(),
4256 next_entity_id,
4257 );
4258 let guest = server
4259 .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4260 .await;
4261 let guest_project = Project::remote(
4262 host_project_id,
4263 guest.client.clone(),
4264 guest.user_store.clone(),
4265 guest_lang_registry.clone(),
4266 FakeFs::new(cx.background()),
4267 &mut guest_cx.to_async(),
4268 )
4269 .await
4270 .unwrap();
4271 clients.push(cx.foreground().spawn(guest.simulate_guest(
4272 guest_id,
4273 guest_project,
4274 operations.clone(),
4275 max_operations,
4276 rng.clone(),
4277 guest_cx,
4278 )));
4279
4280 log::info!("Guest {} added", guest_id);
4281 }
4282 }
4283
4284 let mut clients = futures::future::join_all(clients).await;
4285 cx.foreground().run_until_parked();
4286
4287 let (host_client, mut host_cx) = clients.remove(0);
4288 let host_project = host_client.project.as_ref().unwrap();
4289 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4290 project
4291 .worktrees(cx)
4292 .map(|worktree| {
4293 let snapshot = worktree.read(cx).snapshot();
4294 (snapshot.id(), snapshot)
4295 })
4296 .collect::<BTreeMap<_, _>>()
4297 });
4298
4299 host_client
4300 .project
4301 .as_ref()
4302 .unwrap()
4303 .read_with(&host_cx, |project, cx| project.check_invariants(cx));
4304
4305 for (guest_client, mut guest_cx) in clients.into_iter() {
4306 let guest_id = guest_client.client.id();
4307 let worktree_snapshots =
4308 guest_client
4309 .project
4310 .as_ref()
4311 .unwrap()
4312 .read_with(&guest_cx, |project, cx| {
4313 project
4314 .worktrees(cx)
4315 .map(|worktree| {
4316 let worktree = worktree.read(cx);
4317 (worktree.id(), worktree.snapshot())
4318 })
4319 .collect::<BTreeMap<_, _>>()
4320 });
4321
4322 assert_eq!(
4323 worktree_snapshots.keys().collect::<Vec<_>>(),
4324 host_worktree_snapshots.keys().collect::<Vec<_>>(),
4325 "guest {} has different worktrees than the host",
4326 guest_id
4327 );
4328 for (id, host_snapshot) in &host_worktree_snapshots {
4329 let guest_snapshot = &worktree_snapshots[id];
4330 assert_eq!(
4331 guest_snapshot.root_name(),
4332 host_snapshot.root_name(),
4333 "guest {} has different root name than the host for worktree {}",
4334 guest_id,
4335 id
4336 );
4337 assert_eq!(
4338 guest_snapshot.entries(false).collect::<Vec<_>>(),
4339 host_snapshot.entries(false).collect::<Vec<_>>(),
4340 "guest {} has different snapshot than the host for worktree {}",
4341 guest_id,
4342 id
4343 );
4344 }
4345
4346 guest_client
4347 .project
4348 .as_ref()
4349 .unwrap()
4350 .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
4351
4352 for guest_buffer in &guest_client.buffers {
4353 let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
4354 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
4355 project.buffer_for_id(buffer_id, cx).expect(&format!(
4356 "host does not have buffer for guest:{}, peer:{}, id:{}",
4357 guest_id, guest_client.peer_id, buffer_id
4358 ))
4359 });
4360 let path = host_buffer
4361 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
4362
4363 assert_eq!(
4364 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
4365 0,
4366 "guest {}, buffer {}, path {:?} has deferred operations",
4367 guest_id,
4368 buffer_id,
4369 path,
4370 );
4371 assert_eq!(
4372 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
4373 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4374 "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4375 guest_id,
4376 buffer_id,
4377 path
4378 );
4379 }
4380
4381 guest_cx.update(|_| drop(guest_client));
4382 }
4383
4384 host_cx.update(|_| drop(host_client));
4385 }
4386
4387 struct TestServer {
4388 peer: Arc<Peer>,
4389 app_state: Arc<AppState>,
4390 server: Arc<Server>,
4391 foreground: Rc<executor::Foreground>,
4392 notifications: mpsc::UnboundedReceiver<()>,
4393 connection_killers: Arc<Mutex<HashMap<UserId, barrier::Sender>>>,
4394 forbid_connections: Arc<AtomicBool>,
4395 _test_db: TestDb,
4396 }
4397
4398 impl TestServer {
4399 async fn start(
4400 foreground: Rc<executor::Foreground>,
4401 background: Arc<executor::Background>,
4402 ) -> Self {
4403 let test_db = TestDb::fake(background);
4404 let app_state = Self::build_app_state(&test_db).await;
4405 let peer = Peer::new();
4406 let notifications = mpsc::unbounded();
4407 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4408 Self {
4409 peer,
4410 app_state,
4411 server,
4412 foreground,
4413 notifications: notifications.1,
4414 connection_killers: Default::default(),
4415 forbid_connections: Default::default(),
4416 _test_db: test_db,
4417 }
4418 }
4419
4420 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4421 let http = FakeHttpClient::with_404_response();
4422 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4423 let client_name = name.to_string();
4424 let mut client = Client::new(http.clone());
4425 let server = self.server.clone();
4426 let connection_killers = self.connection_killers.clone();
4427 let forbid_connections = self.forbid_connections.clone();
4428 let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4429
4430 Arc::get_mut(&mut client)
4431 .unwrap()
4432 .override_authenticate(move |cx| {
4433 cx.spawn(|_| async move {
4434 let access_token = "the-token".to_string();
4435 Ok(Credentials {
4436 user_id: user_id.0 as u64,
4437 access_token,
4438 })
4439 })
4440 })
4441 .override_establish_connection(move |credentials, cx| {
4442 assert_eq!(credentials.user_id, user_id.0 as u64);
4443 assert_eq!(credentials.access_token, "the-token");
4444
4445 let server = server.clone();
4446 let connection_killers = connection_killers.clone();
4447 let forbid_connections = forbid_connections.clone();
4448 let client_name = client_name.clone();
4449 let connection_id_tx = connection_id_tx.clone();
4450 cx.spawn(move |cx| async move {
4451 if forbid_connections.load(SeqCst) {
4452 Err(EstablishConnectionError::other(anyhow!(
4453 "server is forbidding connections"
4454 )))
4455 } else {
4456 let (client_conn, server_conn, kill_conn) =
4457 Connection::in_memory(cx.background());
4458 connection_killers.lock().insert(user_id, kill_conn);
4459 cx.background()
4460 .spawn(server.handle_connection(
4461 server_conn,
4462 client_name,
4463 user_id,
4464 Some(connection_id_tx),
4465 cx.background(),
4466 ))
4467 .detach();
4468 Ok(client_conn)
4469 }
4470 })
4471 });
4472
4473 client
4474 .authenticate_and_connect(&cx.to_async())
4475 .await
4476 .unwrap();
4477
4478 Channel::init(&client);
4479 Project::init(&client);
4480
4481 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4482 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4483 let mut authed_user =
4484 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
4485 while authed_user.next().await.unwrap().is_none() {}
4486
4487 TestClient {
4488 client,
4489 peer_id,
4490 user_store,
4491 project: Default::default(),
4492 buffers: Default::default(),
4493 }
4494 }
4495
4496 fn disconnect_client(&self, user_id: UserId) {
4497 self.connection_killers.lock().remove(&user_id);
4498 }
4499
4500 fn forbid_connections(&self) {
4501 self.forbid_connections.store(true, SeqCst);
4502 }
4503
4504 fn allow_connections(&self) {
4505 self.forbid_connections.store(false, SeqCst);
4506 }
4507
4508 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4509 let mut config = Config::default();
4510 config.session_secret = "a".repeat(32);
4511 config.database_url = test_db.url.clone();
4512 let github_client = github::AppClient::test();
4513 Arc::new(AppState {
4514 db: test_db.db().clone(),
4515 handlebars: Default::default(),
4516 auth_client: auth::build_client("", ""),
4517 repo_client: github::RepoClient::test(&github_client),
4518 github_client,
4519 config,
4520 })
4521 }
4522
4523 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4524 self.server.store.read()
4525 }
4526
4527 async fn condition<F>(&mut self, mut predicate: F)
4528 where
4529 F: FnMut(&Store) -> bool,
4530 {
4531 async_std::future::timeout(Duration::from_millis(500), async {
4532 while !(predicate)(&*self.server.store.read()) {
4533 self.foreground.start_waiting();
4534 self.notifications.next().await;
4535 self.foreground.finish_waiting();
4536 }
4537 })
4538 .await
4539 .expect("condition timed out");
4540 }
4541 }
4542
4543 impl Drop for TestServer {
4544 fn drop(&mut self) {
4545 self.peer.reset();
4546 }
4547 }
4548
4549 struct TestClient {
4550 client: Arc<Client>,
4551 pub peer_id: PeerId,
4552 pub user_store: ModelHandle<UserStore>,
4553 project: Option<ModelHandle<Project>>,
4554 buffers: HashSet<ModelHandle<language::Buffer>>,
4555 }
4556
4557 impl Deref for TestClient {
4558 type Target = Arc<Client>;
4559
4560 fn deref(&self) -> &Self::Target {
4561 &self.client
4562 }
4563 }
4564
4565 impl TestClient {
4566 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4567 UserId::from_proto(
4568 self.user_store
4569 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4570 )
4571 }
4572
4573 fn simulate_host(
4574 mut self,
4575 project: ModelHandle<Project>,
4576 mut language_server_config: LanguageServerConfig,
4577 operations: Rc<Cell<usize>>,
4578 max_operations: usize,
4579 rng: Arc<Mutex<StdRng>>,
4580 mut cx: TestAppContext,
4581 ) -> impl Future<Output = (Self, TestAppContext)> {
4582 let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4583
4584 // Set up a fake language server.
4585 language_server_config.set_fake_initializer({
4586 let rng = rng.clone();
4587 let files = files.clone();
4588 let project = project.downgrade();
4589 move |fake_server| {
4590 fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4591 Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4592 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4593 range: lsp::Range::new(
4594 lsp::Position::new(0, 0),
4595 lsp::Position::new(0, 0),
4596 ),
4597 new_text: "the-new-text".to_string(),
4598 })),
4599 ..Default::default()
4600 }]))
4601 });
4602
4603 fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4604 Some(vec![lsp::CodeActionOrCommand::CodeAction(
4605 lsp::CodeAction {
4606 title: "the-code-action".to_string(),
4607 ..Default::default()
4608 },
4609 )])
4610 });
4611
4612 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4613 |params, _| {
4614 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4615 params.position,
4616 params.position,
4617 )))
4618 },
4619 );
4620
4621 fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4622 let files = files.clone();
4623 let rng = rng.clone();
4624 move |_, _| {
4625 let files = files.lock();
4626 let mut rng = rng.lock();
4627 let count = rng.gen_range::<usize, _>(1..3);
4628 let files = (0..count)
4629 .map(|_| files.choose(&mut *rng).unwrap())
4630 .collect::<Vec<_>>();
4631 log::info!("LSP: Returning definitions in files {:?}", &files);
4632 Some(lsp::GotoDefinitionResponse::Array(
4633 files
4634 .into_iter()
4635 .map(|file| lsp::Location {
4636 uri: lsp::Url::from_file_path(file).unwrap(),
4637 range: Default::default(),
4638 })
4639 .collect(),
4640 ))
4641 }
4642 });
4643
4644 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4645 let rng = rng.clone();
4646 let project = project.clone();
4647 move |params, mut cx| {
4648 if let Some(project) = project.upgrade(&cx) {
4649 project.update(&mut cx, |project, cx| {
4650 let path = params
4651 .text_document_position_params
4652 .text_document
4653 .uri
4654 .to_file_path()
4655 .unwrap();
4656 let (worktree, relative_path) =
4657 project.find_local_worktree(&path, cx)?;
4658 let project_path =
4659 ProjectPath::from((worktree.read(cx).id(), relative_path));
4660 let buffer =
4661 project.get_open_buffer(&project_path, cx)?.read(cx);
4662
4663 let mut highlights = Vec::new();
4664 let highlight_count = rng.lock().gen_range(1..=5);
4665 let mut prev_end = 0;
4666 for _ in 0..highlight_count {
4667 let range =
4668 buffer.random_byte_range(prev_end, &mut *rng.lock());
4669 let start = buffer
4670 .offset_to_point_utf16(range.start)
4671 .to_lsp_position();
4672 let end = buffer
4673 .offset_to_point_utf16(range.end)
4674 .to_lsp_position();
4675 highlights.push(lsp::DocumentHighlight {
4676 range: lsp::Range::new(start, end),
4677 kind: Some(lsp::DocumentHighlightKind::READ),
4678 });
4679 prev_end = range.end;
4680 }
4681 Some(highlights)
4682 })
4683 } else {
4684 None
4685 }
4686 }
4687 });
4688 }
4689 });
4690
4691 project.update(&mut cx, |project, _| {
4692 project.languages().add(Arc::new(Language::new(
4693 LanguageConfig {
4694 name: "Rust".into(),
4695 path_suffixes: vec!["rs".to_string()],
4696 language_server: Some(language_server_config),
4697 ..Default::default()
4698 },
4699 None,
4700 )));
4701 });
4702
4703 async move {
4704 let fs = project.read_with(&cx, |project, _| project.fs().clone());
4705 while operations.get() < max_operations {
4706 operations.set(operations.get() + 1);
4707
4708 let distribution = rng.lock().gen_range::<usize, _>(0..100);
4709 match distribution {
4710 0..=20 if !files.lock().is_empty() => {
4711 let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4712 let mut path = path.as_path();
4713 while let Some(parent_path) = path.parent() {
4714 path = parent_path;
4715 if rng.lock().gen() {
4716 break;
4717 }
4718 }
4719
4720 log::info!("Host: find/create local worktree {:?}", path);
4721 let find_or_create_worktree = project.update(&mut cx, |project, cx| {
4722 project.find_or_create_local_worktree(path, true, cx)
4723 });
4724 let find_or_create_worktree = async move {
4725 find_or_create_worktree.await.unwrap();
4726 };
4727 if rng.lock().gen() {
4728 cx.background().spawn(find_or_create_worktree).detach();
4729 } else {
4730 find_or_create_worktree.await;
4731 }
4732 }
4733 10..=80 if !files.lock().is_empty() => {
4734 let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4735 let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4736 let (worktree, path) = project
4737 .update(&mut cx, |project, cx| {
4738 project.find_or_create_local_worktree(
4739 file.clone(),
4740 true,
4741 cx,
4742 )
4743 })
4744 .await
4745 .unwrap();
4746 let project_path =
4747 worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4748 log::info!(
4749 "Host: opening path {:?}, worktree {}, relative_path {:?}",
4750 file,
4751 project_path.0,
4752 project_path.1
4753 );
4754 let buffer = project
4755 .update(&mut cx, |project, cx| {
4756 project.open_buffer(project_path, cx)
4757 })
4758 .await
4759 .unwrap();
4760 self.buffers.insert(buffer.clone());
4761 buffer
4762 } else {
4763 self.buffers
4764 .iter()
4765 .choose(&mut *rng.lock())
4766 .unwrap()
4767 .clone()
4768 };
4769
4770 if rng.lock().gen_bool(0.1) {
4771 cx.update(|cx| {
4772 log::info!(
4773 "Host: dropping buffer {:?}",
4774 buffer.read(cx).file().unwrap().full_path(cx)
4775 );
4776 self.buffers.remove(&buffer);
4777 drop(buffer);
4778 });
4779 } else {
4780 buffer.update(&mut cx, |buffer, cx| {
4781 log::info!(
4782 "Host: updating buffer {:?} ({})",
4783 buffer.file().unwrap().full_path(cx),
4784 buffer.remote_id()
4785 );
4786 buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4787 });
4788 }
4789 }
4790 _ => loop {
4791 let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4792 let mut path = PathBuf::new();
4793 path.push("/");
4794 for _ in 0..path_component_count {
4795 let letter = rng.lock().gen_range(b'a'..=b'z');
4796 path.push(std::str::from_utf8(&[letter]).unwrap());
4797 }
4798 path.set_extension("rs");
4799 let parent_path = path.parent().unwrap();
4800
4801 log::info!("Host: creating file {:?}", path,);
4802
4803 if fs.create_dir(&parent_path).await.is_ok()
4804 && fs.create_file(&path, Default::default()).await.is_ok()
4805 {
4806 files.lock().push(path);
4807 break;
4808 } else {
4809 log::info!("Host: cannot create file");
4810 }
4811 },
4812 }
4813
4814 cx.background().simulate_random_delay().await;
4815 }
4816
4817 log::info!("Host done");
4818
4819 self.project = Some(project);
4820 (self, cx)
4821 }
4822 }
4823
4824 pub async fn simulate_guest(
4825 mut self,
4826 guest_id: usize,
4827 project: ModelHandle<Project>,
4828 operations: Rc<Cell<usize>>,
4829 max_operations: usize,
4830 rng: Arc<Mutex<StdRng>>,
4831 mut cx: TestAppContext,
4832 ) -> (Self, TestAppContext) {
4833 while operations.get() < max_operations {
4834 let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4835 let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4836 project
4837 .worktrees(&cx)
4838 .filter(|worktree| {
4839 let worktree = worktree.read(cx);
4840 worktree.is_visible()
4841 && worktree.entries(false).any(|e| e.is_file())
4842 })
4843 .choose(&mut *rng.lock())
4844 }) {
4845 worktree
4846 } else {
4847 cx.background().simulate_random_delay().await;
4848 continue;
4849 };
4850
4851 operations.set(operations.get() + 1);
4852 let (worktree_root_name, project_path) =
4853 worktree.read_with(&cx, |worktree, _| {
4854 let entry = worktree
4855 .entries(false)
4856 .filter(|e| e.is_file())
4857 .choose(&mut *rng.lock())
4858 .unwrap();
4859 (
4860 worktree.root_name().to_string(),
4861 (worktree.id(), entry.path.clone()),
4862 )
4863 });
4864 log::info!(
4865 "Guest {}: opening path {:?} in worktree {} ({})",
4866 guest_id,
4867 project_path.1,
4868 project_path.0,
4869 worktree_root_name,
4870 );
4871 let buffer = project
4872 .update(&mut cx, |project, cx| {
4873 project.open_buffer(project_path.clone(), cx)
4874 })
4875 .await
4876 .unwrap();
4877 log::info!(
4878 "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}",
4879 guest_id,
4880 project_path.1,
4881 project_path.0,
4882 worktree_root_name,
4883 buffer.read_with(&cx, |buffer, _| buffer.remote_id())
4884 );
4885 self.buffers.insert(buffer.clone());
4886 buffer
4887 } else {
4888 operations.set(operations.get() + 1);
4889
4890 self.buffers
4891 .iter()
4892 .choose(&mut *rng.lock())
4893 .unwrap()
4894 .clone()
4895 };
4896
4897 let choice = rng.lock().gen_range(0..100);
4898 match choice {
4899 0..=9 => {
4900 cx.update(|cx| {
4901 log::info!(
4902 "Guest {}: dropping buffer {:?}",
4903 guest_id,
4904 buffer.read(cx).file().unwrap().full_path(cx)
4905 );
4906 self.buffers.remove(&buffer);
4907 drop(buffer);
4908 });
4909 }
4910 10..=19 => {
4911 let completions = project.update(&mut cx, |project, cx| {
4912 log::info!(
4913 "Guest {}: requesting completions for buffer {} ({:?})",
4914 guest_id,
4915 buffer.read(cx).remote_id(),
4916 buffer.read(cx).file().unwrap().full_path(cx)
4917 );
4918 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4919 project.completions(&buffer, offset, cx)
4920 });
4921 let completions = cx.background().spawn(async move {
4922 completions.await.expect("completions request failed");
4923 });
4924 if rng.lock().gen_bool(0.3) {
4925 log::info!("Guest {}: detaching completions request", guest_id);
4926 completions.detach();
4927 } else {
4928 completions.await;
4929 }
4930 }
4931 20..=29 => {
4932 let code_actions = project.update(&mut cx, |project, cx| {
4933 log::info!(
4934 "Guest {}: requesting code actions for buffer {} ({:?})",
4935 guest_id,
4936 buffer.read(cx).remote_id(),
4937 buffer.read(cx).file().unwrap().full_path(cx)
4938 );
4939 let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4940 project.code_actions(&buffer, range, cx)
4941 });
4942 let code_actions = cx.background().spawn(async move {
4943 code_actions.await.expect("code actions request failed");
4944 });
4945 if rng.lock().gen_bool(0.3) {
4946 log::info!("Guest {}: detaching code actions request", guest_id);
4947 code_actions.detach();
4948 } else {
4949 code_actions.await;
4950 }
4951 }
4952 30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4953 let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4954 log::info!(
4955 "Guest {}: saving buffer {} ({:?})",
4956 guest_id,
4957 buffer.remote_id(),
4958 buffer.file().unwrap().full_path(cx)
4959 );
4960 (buffer.version(), buffer.save(cx))
4961 });
4962 let save = cx.background().spawn(async move {
4963 let (saved_version, _) = save.await.expect("save request failed");
4964 assert!(saved_version.observed_all(&requested_version));
4965 });
4966 if rng.lock().gen_bool(0.3) {
4967 log::info!("Guest {}: detaching save request", guest_id);
4968 save.detach();
4969 } else {
4970 save.await;
4971 }
4972 }
4973 40..=44 => {
4974 let prepare_rename = project.update(&mut cx, |project, cx| {
4975 log::info!(
4976 "Guest {}: preparing rename for buffer {} ({:?})",
4977 guest_id,
4978 buffer.read(cx).remote_id(),
4979 buffer.read(cx).file().unwrap().full_path(cx)
4980 );
4981 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4982 project.prepare_rename(buffer, offset, cx)
4983 });
4984 let prepare_rename = cx.background().spawn(async move {
4985 prepare_rename.await.expect("prepare rename request failed");
4986 });
4987 if rng.lock().gen_bool(0.3) {
4988 log::info!("Guest {}: detaching prepare rename request", guest_id);
4989 prepare_rename.detach();
4990 } else {
4991 prepare_rename.await;
4992 }
4993 }
4994 45..=49 => {
4995 let definitions = project.update(&mut cx, |project, cx| {
4996 log::info!(
4997 "Guest {}: requesting definitions for buffer {} ({:?})",
4998 guest_id,
4999 buffer.read(cx).remote_id(),
5000 buffer.read(cx).file().unwrap().full_path(cx)
5001 );
5002 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5003 project.definition(&buffer, offset, cx)
5004 });
5005 let definitions = cx.background().spawn(async move {
5006 definitions.await.expect("definitions request failed")
5007 });
5008 if rng.lock().gen_bool(0.3) {
5009 log::info!("Guest {}: detaching definitions request", guest_id);
5010 definitions.detach();
5011 } else {
5012 self.buffers
5013 .extend(definitions.await.into_iter().map(|loc| loc.buffer));
5014 }
5015 }
5016 50..=54 => {
5017 let highlights = project.update(&mut cx, |project, cx| {
5018 log::info!(
5019 "Guest {}: requesting highlights for buffer {} ({:?})",
5020 guest_id,
5021 buffer.read(cx).remote_id(),
5022 buffer.read(cx).file().unwrap().full_path(cx)
5023 );
5024 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5025 project.document_highlights(&buffer, offset, cx)
5026 });
5027 let highlights = cx.background().spawn(async move {
5028 highlights.await.expect("highlights request failed");
5029 });
5030 if rng.lock().gen_bool(0.3) {
5031 log::info!("Guest {}: detaching highlights request", guest_id);
5032 highlights.detach();
5033 } else {
5034 highlights.await;
5035 }
5036 }
5037 55..=59 => {
5038 let search = project.update(&mut cx, |project, cx| {
5039 let query = rng.lock().gen_range('a'..='z');
5040 log::info!("Guest {}: project-wide search {:?}", guest_id, query);
5041 project.search(SearchQuery::text(query, false, false), cx)
5042 });
5043 let search = cx
5044 .background()
5045 .spawn(async move { search.await.expect("search request failed") });
5046 if rng.lock().gen_bool(0.3) {
5047 log::info!("Guest {}: detaching search request", guest_id);
5048 search.detach();
5049 } else {
5050 self.buffers.extend(search.await.into_keys());
5051 }
5052 }
5053 _ => {
5054 buffer.update(&mut cx, |buffer, cx| {
5055 log::info!(
5056 "Guest {}: updating buffer {} ({:?})",
5057 guest_id,
5058 buffer.remote_id(),
5059 buffer.file().unwrap().full_path(cx)
5060 );
5061 buffer.randomly_edit(&mut *rng.lock(), 5, cx)
5062 });
5063 }
5064 }
5065 cx.background().simulate_random_delay().await;
5066 }
5067
5068 log::info!("Guest {} done", guest_id);
5069
5070 self.project = Some(project);
5071 (self, cx)
5072 }
5073 }
5074
5075 impl Drop for TestClient {
5076 fn drop(&mut self) {
5077 self.client.tear_down();
5078 }
5079 }
5080
5081 impl Executor for Arc<gpui::executor::Background> {
5082 type Timer = gpui::executor::Timer;
5083
5084 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5085 self.spawn(future).detach();
5086 }
5087
5088 fn timer(&self, duration: Duration) -> Self::Timer {
5089 self.as_ref().timer(duration)
5090 }
5091 }
5092
5093 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5094 channel
5095 .messages()
5096 .cursor::<()>()
5097 .map(|m| {
5098 (
5099 m.sender.github_login.clone(),
5100 m.body.clone(),
5101 m.is_pending(),
5102 )
5103 })
5104 .collect()
5105 }
5106
5107 struct EmptyView;
5108
5109 impl gpui::Entity for EmptyView {
5110 type Event = ();
5111 }
5112
5113 impl gpui::View for EmptyView {
5114 fn ui_name() -> &'static str {
5115 "empty view"
5116 }
5117
5118 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5119 gpui::Element::boxed(gpui::elements::Empty)
5120 }
5121 }
5122}