1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_io::Timer;
10use async_std::task;
11use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
12use collections::{HashMap, HashSet};
13use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
14use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{
21 any::TypeId,
22 future::Future,
23 sync::Arc,
24 time::{Duration, Instant},
25};
26use store::{Store, Worktree};
27use surf::StatusCode;
28use tide::log;
29use tide::{
30 http::headers::{HeaderName, CONNECTION, UPGRADE},
31 Request, Response,
32};
33use time::OffsetDateTime;
34
/// Type-erased handler for one kind of incoming RPC message.
///
/// A handler receives the server and a boxed, not-yet-downcast envelope and
/// returns a boxed future that resolves to `tide::Result<()>`. Handlers are
/// stored in `Server::handlers`, keyed by the payload's `TypeId`.
type MessageHandler = Box<
    dyn Send
        + Sync
        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
>;
40
/// RPC server: dispatches incoming messages to registered handlers and keeps
/// the shared, lock-protected `Store`.
pub struct Server {
    // Used to respond to requests and to send/forward messages to connections.
    peer: Arc<Peer>,
    // Shared state; accessed through `state()` (read) and `state_mut()` (write).
    store: RwLock<Store>,
    // Application state; handlers use its `db` field.
    app_state: Arc<AppState>,
    // One handler per message type, keyed by the payload's `TypeId`.
    handlers: HashMap<TypeId, MessageHandler>,
    // When set, a `()` is sent after each message handler finishes.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
48
/// Abstraction over task spawning and timers used by
/// `Server::handle_connection`.
pub trait Executor: Send + Clone {
    /// Future returned by [`Executor::timer`].
    type Timer: Send + Future;
    /// Runs `future` without returning a handle to it.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
    /// Creates a timer future for `duration`.
    fn timer(&self, duration: Duration) -> Self::Timer;
}
54
/// `Executor` implementation that spawns with `async_std::task` and uses
/// `async_io::Timer` for timers.
#[derive(Clone)]
pub struct RealExecutor;
57
/// Number of channel messages requested from the database per page; a page
/// with fewer messages than this is reported to the client as `done`.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Upper bound on the byte length (`String::len`) of a trimmed chat message body.
const MAX_MESSAGE_LEN: usize = 1024;
60
61impl Server {
62 pub fn new(
63 app_state: Arc<AppState>,
64 peer: Arc<Peer>,
65 notifications: Option<mpsc::UnboundedSender<()>>,
66 ) -> Arc<Self> {
67 let mut server = Self {
68 peer,
69 app_state,
70 store: Default::default(),
71 handlers: Default::default(),
72 notifications,
73 };
74
75 server
76 .add_request_handler(Server::ping)
77 .add_request_handler(Server::register_project)
78 .add_message_handler(Server::unregister_project)
79 .add_request_handler(Server::share_project)
80 .add_message_handler(Server::unshare_project)
81 .add_request_handler(Server::join_project)
82 .add_message_handler(Server::leave_project)
83 .add_request_handler(Server::register_worktree)
84 .add_message_handler(Server::unregister_worktree)
85 .add_request_handler(Server::update_worktree)
86 .add_message_handler(Server::update_diagnostic_summary)
87 .add_message_handler(Server::disk_based_diagnostics_updating)
88 .add_message_handler(Server::disk_based_diagnostics_updated)
89 .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
90 .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
91 .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
92 .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
93 .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
94 .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
95 .add_request_handler(Server::forward_project_request::<proto::OpenBuffer>)
96 .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
97 .add_request_handler(
98 Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
99 )
100 .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
101 .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
102 .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
103 .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
104 .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
105 .add_message_handler(Server::close_buffer)
106 .add_request_handler(Server::update_buffer)
107 .add_message_handler(Server::update_buffer_file)
108 .add_message_handler(Server::buffer_reloaded)
109 .add_message_handler(Server::buffer_saved)
110 .add_request_handler(Server::save_buffer)
111 .add_request_handler(Server::get_channels)
112 .add_request_handler(Server::get_users)
113 .add_request_handler(Server::join_channel)
114 .add_message_handler(Server::leave_channel)
115 .add_request_handler(Server::send_channel_message)
116 .add_request_handler(Server::get_channel_messages);
117
118 Arc::new(server)
119 }
120
121 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
122 where
123 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
124 Fut: 'static + Send + Future<Output = tide::Result<()>>,
125 M: EnvelopedMessage,
126 {
127 let prev_handler = self.handlers.insert(
128 TypeId::of::<M>(),
129 Box::new(move |server, envelope| {
130 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
131 (handler)(server, *envelope).boxed()
132 }),
133 );
134 if prev_handler.is_some() {
135 panic!("registered a handler for the same message twice");
136 }
137 self
138 }
139
140 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
141 where
142 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
143 Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
144 M: RequestMessage,
145 {
146 self.add_message_handler(move |server, envelope| {
147 let receipt = envelope.receipt();
148 let response = (handler)(server.clone(), envelope);
149 async move {
150 match response.await {
151 Ok(response) => {
152 server.peer.respond(receipt, response)?;
153 Ok(())
154 }
155 Err(error) => {
156 server.peer.respond_with_error(
157 receipt,
158 proto::Error {
159 message: error.to_string(),
160 },
161 )?;
162 Err(error)
163 }
164 }
165 }
166 })
167 }
168
    /// Returns a future that services one client connection until it closes.
    ///
    /// The future registers `connection` with the peer, optionally reports the
    /// assigned connection id through `send_connection_id`, records the
    /// connection for `user_id` in the store, and then dispatches incoming
    /// messages to the registered handlers. When the I/O future finishes or
    /// the incoming stream ends, the connection is signed out.
    ///
    /// `addr` is only used in log messages. `executor` supplies the timer
    /// passed to the peer and spawns handlers for background messages.
    pub fn handle_connection<E: Executor>(
        self: &Arc<Self>,
        connection: Connection,
        addr: String,
        user_id: UserId,
        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
        executor: E,
    ) -> impl Future<Output = ()> {
        // Owned clone so the returned future does not borrow `self`.
        let mut this = self.clone();
        async move {
            let (connection_id, handle_io, mut incoming_rx) = this
                .peer
                .add_connection(connection, {
                    let executor = executor.clone();
                    move |duration| {
                        let timer = executor.timer(duration);
                        // Discard the timer's output; only completion matters.
                        async move {
                            timer.await;
                        }
                    }
                })
                .await;

            // Best effort: a failed send of the connection id is ignored.
            if let Some(send_connection_id) = send_connection_id.as_mut() {
                let _ = send_connection_id.send(connection_id).await;
            }

            this.state_mut().add_connection(connection_id, user_id);
            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
                log::error!("error updating contacts for {:?}: {}", user_id, err);
            }

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.next().fuse();
                futures::pin_mut!(next_message);
                // `select_biased!` polls its arms in order, so I/O completion
                // is checked before the next incoming message.
                futures::select_biased! {
                    result = handle_io => {
                        if let Err(err) = result {
                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
                        }
                        break;
                    }
                    message = next_message => {
                        if let Some(message) = message {
                            let start_time = Instant::now();
                            let type_name = message.payload_type_name();
                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                let notifications = this.notifications.clone();
                                let is_background = message.is_background();
                                let handle_message = (handler)(this.clone(), message);
                                let handle_message = async move {
                                    if let Err(err) = handle_message.await {
                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
                                    } else {
                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
                                    }
                                    // Signal that a handler finished; send errors are ignored.
                                    if let Some(mut notifications) = notifications {
                                        let _ = notifications.send(()).await;
                                    }
                                };
                                // Background messages run detached; all others are
                                // awaited here before the next message is read.
                                if is_background {
                                    executor.spawn_detached(handle_message);
                                } else {
                                    handle_message.await;
                                }
                            } else {
                                log::warn!("unhandled message: {}", type_name);
                            }
                        } else {
                            // The incoming stream ended.
                            log::info!("rpc connection closed {:?}", addr);
                            break;
                        }
                    }
                }
            }

            if let Err(err) = this.sign_out(connection_id).await {
                log::error!("error signing out connection {:?} - {:?}", addr, err);
            }
        }
    }
253
254 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
255 self.peer.disconnect(connection_id);
256 let removed_connection = self.state_mut().remove_connection(connection_id)?;
257
258 for (project_id, project) in removed_connection.hosted_projects {
259 if let Some(share) = project.share {
260 broadcast(
261 connection_id,
262 share.guests.keys().copied().collect(),
263 |conn_id| {
264 self.peer
265 .send(conn_id, proto::UnshareProject { project_id })
266 },
267 )?;
268 }
269 }
270
271 for (project_id, peer_ids) in removed_connection.guest_project_ids {
272 broadcast(connection_id, peer_ids, |conn_id| {
273 self.peer.send(
274 conn_id,
275 proto::RemoveProjectCollaborator {
276 project_id,
277 peer_id: connection_id.0,
278 },
279 )
280 })?;
281 }
282
283 self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
284 Ok(())
285 }
286
287 async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
288 Ok(proto::Ack {})
289 }
290
291 async fn register_project(
292 mut self: Arc<Server>,
293 request: TypedEnvelope<proto::RegisterProject>,
294 ) -> tide::Result<proto::RegisterProjectResponse> {
295 let project_id = {
296 let mut state = self.state_mut();
297 let user_id = state.user_id_for_connection(request.sender_id)?;
298 state.register_project(request.sender_id, user_id)
299 };
300 Ok(proto::RegisterProjectResponse { project_id })
301 }
302
303 async fn unregister_project(
304 mut self: Arc<Server>,
305 request: TypedEnvelope<proto::UnregisterProject>,
306 ) -> tide::Result<()> {
307 let project = self
308 .state_mut()
309 .unregister_project(request.payload.project_id, request.sender_id)?;
310 self.update_contacts_for_users(project.authorized_user_ids().iter())?;
311 Ok(())
312 }
313
314 async fn share_project(
315 mut self: Arc<Server>,
316 request: TypedEnvelope<proto::ShareProject>,
317 ) -> tide::Result<proto::Ack> {
318 self.state_mut()
319 .share_project(request.payload.project_id, request.sender_id);
320 Ok(proto::Ack {})
321 }
322
323 async fn unshare_project(
324 mut self: Arc<Server>,
325 request: TypedEnvelope<proto::UnshareProject>,
326 ) -> tide::Result<()> {
327 let project_id = request.payload.project_id;
328 let project = self
329 .state_mut()
330 .unshare_project(project_id, request.sender_id)?;
331
332 broadcast(request.sender_id, project.connection_ids, |conn_id| {
333 self.peer
334 .send(conn_id, proto::UnshareProject { project_id })
335 })?;
336 self.update_contacts_for_users(&project.authorized_user_ids)?;
337 Ok(())
338 }
339
340 async fn join_project(
341 mut self: Arc<Server>,
342 request: TypedEnvelope<proto::JoinProject>,
343 ) -> tide::Result<proto::JoinProjectResponse> {
344 let project_id = request.payload.project_id;
345
346 let user_id = self.state().user_id_for_connection(request.sender_id)?;
347 let (response, connection_ids, contact_user_ids) = self
348 .state_mut()
349 .join_project(request.sender_id, user_id, project_id)
350 .and_then(|joined| {
351 let share = joined.project.share()?;
352 let peer_count = share.guests.len();
353 let mut collaborators = Vec::with_capacity(peer_count);
354 collaborators.push(proto::Collaborator {
355 peer_id: joined.project.host_connection_id.0,
356 replica_id: 0,
357 user_id: joined.project.host_user_id.to_proto(),
358 });
359 let worktrees = share
360 .worktrees
361 .iter()
362 .filter_map(|(id, shared_worktree)| {
363 let worktree = joined.project.worktrees.get(&id)?;
364 Some(proto::Worktree {
365 id: *id,
366 root_name: worktree.root_name.clone(),
367 entries: shared_worktree.entries.values().cloned().collect(),
368 diagnostic_summaries: shared_worktree
369 .diagnostic_summaries
370 .values()
371 .cloned()
372 .collect(),
373 visible: worktree.visible,
374 })
375 })
376 .collect();
377 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
378 if *peer_conn_id != request.sender_id {
379 collaborators.push(proto::Collaborator {
380 peer_id: peer_conn_id.0,
381 replica_id: *peer_replica_id as u32,
382 user_id: peer_user_id.to_proto(),
383 });
384 }
385 }
386 let response = proto::JoinProjectResponse {
387 worktrees,
388 replica_id: joined.replica_id as u32,
389 collaborators,
390 };
391 let connection_ids = joined.project.connection_ids();
392 let contact_user_ids = joined.project.authorized_user_ids();
393 Ok((response, connection_ids, contact_user_ids))
394 })?;
395
396 broadcast(request.sender_id, connection_ids, |conn_id| {
397 self.peer.send(
398 conn_id,
399 proto::AddProjectCollaborator {
400 project_id,
401 collaborator: Some(proto::Collaborator {
402 peer_id: request.sender_id.0,
403 replica_id: response.replica_id,
404 user_id: user_id.to_proto(),
405 }),
406 },
407 )
408 })?;
409 self.update_contacts_for_users(&contact_user_ids)?;
410 Ok(response)
411 }
412
413 async fn leave_project(
414 mut self: Arc<Server>,
415 request: TypedEnvelope<proto::LeaveProject>,
416 ) -> tide::Result<()> {
417 let sender_id = request.sender_id;
418 let project_id = request.payload.project_id;
419 let worktree = self.state_mut().leave_project(sender_id, project_id)?;
420
421 broadcast(sender_id, worktree.connection_ids, |conn_id| {
422 self.peer.send(
423 conn_id,
424 proto::RemoveProjectCollaborator {
425 project_id,
426 peer_id: sender_id.0,
427 },
428 )
429 })?;
430 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
431
432 Ok(())
433 }
434
435 async fn register_worktree(
436 mut self: Arc<Server>,
437 request: TypedEnvelope<proto::RegisterWorktree>,
438 ) -> tide::Result<proto::Ack> {
439 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
440
441 let mut contact_user_ids = HashSet::default();
442 contact_user_ids.insert(host_user_id);
443 for github_login in &request.payload.authorized_logins {
444 let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
445 contact_user_ids.insert(contact_user_id);
446 }
447
448 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
449 let guest_connection_ids;
450 {
451 let mut state = self.state_mut();
452 guest_connection_ids = state
453 .read_project(request.payload.project_id, request.sender_id)?
454 .guest_connection_ids();
455 state.register_worktree(
456 request.payload.project_id,
457 request.payload.worktree_id,
458 request.sender_id,
459 Worktree {
460 authorized_user_ids: contact_user_ids.clone(),
461 root_name: request.payload.root_name.clone(),
462 visible: request.payload.visible,
463 },
464 )?;
465 }
466 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
467 self.peer
468 .forward_send(request.sender_id, connection_id, request.payload.clone())
469 })?;
470 self.update_contacts_for_users(&contact_user_ids)?;
471 Ok(proto::Ack {})
472 }
473
474 async fn unregister_worktree(
475 mut self: Arc<Server>,
476 request: TypedEnvelope<proto::UnregisterWorktree>,
477 ) -> tide::Result<()> {
478 let project_id = request.payload.project_id;
479 let worktree_id = request.payload.worktree_id;
480 let (worktree, guest_connection_ids) =
481 self.state_mut()
482 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
483 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
484 self.peer.send(
485 conn_id,
486 proto::UnregisterWorktree {
487 project_id,
488 worktree_id,
489 },
490 )
491 })?;
492 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
493 Ok(())
494 }
495
496 async fn update_worktree(
497 mut self: Arc<Server>,
498 request: TypedEnvelope<proto::UpdateWorktree>,
499 ) -> tide::Result<proto::Ack> {
500 let connection_ids = self.state_mut().update_worktree(
501 request.sender_id,
502 request.payload.project_id,
503 request.payload.worktree_id,
504 &request.payload.removed_entries,
505 &request.payload.updated_entries,
506 )?;
507
508 broadcast(request.sender_id, connection_ids, |connection_id| {
509 self.peer
510 .forward_send(request.sender_id, connection_id, request.payload.clone())
511 })?;
512
513 Ok(proto::Ack {})
514 }
515
516 async fn update_diagnostic_summary(
517 mut self: Arc<Server>,
518 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
519 ) -> tide::Result<()> {
520 let summary = request
521 .payload
522 .summary
523 .clone()
524 .ok_or_else(|| anyhow!("invalid summary"))?;
525 let receiver_ids = self.state_mut().update_diagnostic_summary(
526 request.payload.project_id,
527 request.payload.worktree_id,
528 request.sender_id,
529 summary,
530 )?;
531
532 broadcast(request.sender_id, receiver_ids, |connection_id| {
533 self.peer
534 .forward_send(request.sender_id, connection_id, request.payload.clone())
535 })?;
536 Ok(())
537 }
538
539 async fn disk_based_diagnostics_updating(
540 self: Arc<Server>,
541 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
542 ) -> tide::Result<()> {
543 let receiver_ids = self
544 .state()
545 .project_connection_ids(request.payload.project_id, request.sender_id)?;
546 broadcast(request.sender_id, receiver_ids, |connection_id| {
547 self.peer
548 .forward_send(request.sender_id, connection_id, request.payload.clone())
549 })?;
550 Ok(())
551 }
552
553 async fn disk_based_diagnostics_updated(
554 self: Arc<Server>,
555 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
556 ) -> tide::Result<()> {
557 let receiver_ids = self
558 .state()
559 .project_connection_ids(request.payload.project_id, request.sender_id)?;
560 broadcast(request.sender_id, receiver_ids, |connection_id| {
561 self.peer
562 .forward_send(request.sender_id, connection_id, request.payload.clone())
563 })?;
564 Ok(())
565 }
566
567 async fn forward_project_request<T>(
568 self: Arc<Server>,
569 request: TypedEnvelope<T>,
570 ) -> tide::Result<T::Response>
571 where
572 T: EntityMessage + RequestMessage,
573 {
574 let host_connection_id = self
575 .state()
576 .read_project(request.payload.remote_entity_id(), request.sender_id)?
577 .host_connection_id;
578 Ok(self
579 .peer
580 .forward_request(request.sender_id, host_connection_id, request.payload)
581 .await?)
582 }
583
584 async fn close_buffer(
585 self: Arc<Server>,
586 request: TypedEnvelope<proto::CloseBuffer>,
587 ) -> tide::Result<()> {
588 let host_connection_id = self
589 .state()
590 .read_project(request.payload.project_id, request.sender_id)?
591 .host_connection_id;
592 self.peer
593 .forward_send(request.sender_id, host_connection_id, request.payload)?;
594 Ok(())
595 }
596
597 async fn save_buffer(
598 self: Arc<Server>,
599 request: TypedEnvelope<proto::SaveBuffer>,
600 ) -> tide::Result<proto::BufferSaved> {
601 let host;
602 let mut guests;
603 {
604 let state = self.state();
605 let project = state.read_project(request.payload.project_id, request.sender_id)?;
606 host = project.host_connection_id;
607 guests = project.guest_connection_ids()
608 }
609
610 let response = self
611 .peer
612 .forward_request(request.sender_id, host, request.payload.clone())
613 .await?;
614
615 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
616 broadcast(host, guests, |conn_id| {
617 self.peer.forward_send(host, conn_id, response.clone())
618 })?;
619
620 Ok(response)
621 }
622
623 async fn update_buffer(
624 self: Arc<Server>,
625 request: TypedEnvelope<proto::UpdateBuffer>,
626 ) -> tide::Result<proto::Ack> {
627 let receiver_ids = self
628 .state()
629 .project_connection_ids(request.payload.project_id, request.sender_id)?;
630 broadcast(request.sender_id, receiver_ids, |connection_id| {
631 self.peer
632 .forward_send(request.sender_id, connection_id, request.payload.clone())
633 })?;
634 Ok(proto::Ack {})
635 }
636
637 async fn update_buffer_file(
638 self: Arc<Server>,
639 request: TypedEnvelope<proto::UpdateBufferFile>,
640 ) -> tide::Result<()> {
641 let receiver_ids = self
642 .state()
643 .project_connection_ids(request.payload.project_id, request.sender_id)?;
644 broadcast(request.sender_id, receiver_ids, |connection_id| {
645 self.peer
646 .forward_send(request.sender_id, connection_id, request.payload.clone())
647 })?;
648 Ok(())
649 }
650
651 async fn buffer_reloaded(
652 self: Arc<Server>,
653 request: TypedEnvelope<proto::BufferReloaded>,
654 ) -> tide::Result<()> {
655 let receiver_ids = self
656 .state()
657 .project_connection_ids(request.payload.project_id, request.sender_id)?;
658 broadcast(request.sender_id, receiver_ids, |connection_id| {
659 self.peer
660 .forward_send(request.sender_id, connection_id, request.payload.clone())
661 })?;
662 Ok(())
663 }
664
665 async fn buffer_saved(
666 self: Arc<Server>,
667 request: TypedEnvelope<proto::BufferSaved>,
668 ) -> tide::Result<()> {
669 let receiver_ids = self
670 .state()
671 .project_connection_ids(request.payload.project_id, request.sender_id)?;
672 broadcast(request.sender_id, receiver_ids, |connection_id| {
673 self.peer
674 .forward_send(request.sender_id, connection_id, request.payload.clone())
675 })?;
676 Ok(())
677 }
678
679 async fn get_channels(
680 self: Arc<Server>,
681 request: TypedEnvelope<proto::GetChannels>,
682 ) -> tide::Result<proto::GetChannelsResponse> {
683 let user_id = self.state().user_id_for_connection(request.sender_id)?;
684 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
685 Ok(proto::GetChannelsResponse {
686 channels: channels
687 .into_iter()
688 .map(|chan| proto::Channel {
689 id: chan.id.to_proto(),
690 name: chan.name,
691 })
692 .collect(),
693 })
694 }
695
696 async fn get_users(
697 self: Arc<Server>,
698 request: TypedEnvelope<proto::GetUsers>,
699 ) -> tide::Result<proto::GetUsersResponse> {
700 let user_ids = request
701 .payload
702 .user_ids
703 .into_iter()
704 .map(UserId::from_proto)
705 .collect();
706 let users = self
707 .app_state
708 .db
709 .get_users_by_ids(user_ids)
710 .await?
711 .into_iter()
712 .map(|user| proto::User {
713 id: user.id.to_proto(),
714 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
715 github_login: user.github_login,
716 })
717 .collect();
718 Ok(proto::GetUsersResponse { users })
719 }
720
721 fn update_contacts_for_users<'a>(
722 self: &Arc<Server>,
723 user_ids: impl IntoIterator<Item = &'a UserId>,
724 ) -> anyhow::Result<()> {
725 let mut result = Ok(());
726 let state = self.state();
727 for user_id in user_ids {
728 let contacts = state.contacts_for_user(*user_id);
729 for connection_id in state.connection_ids_for_user(*user_id) {
730 if let Err(error) = self.peer.send(
731 connection_id,
732 proto::UpdateContacts {
733 contacts: contacts.clone(),
734 },
735 ) {
736 result = Err(error);
737 }
738 }
739 }
740 result
741 }
742
743 async fn join_channel(
744 mut self: Arc<Self>,
745 request: TypedEnvelope<proto::JoinChannel>,
746 ) -> tide::Result<proto::JoinChannelResponse> {
747 let user_id = self.state().user_id_for_connection(request.sender_id)?;
748 let channel_id = ChannelId::from_proto(request.payload.channel_id);
749 if !self
750 .app_state
751 .db
752 .can_user_access_channel(user_id, channel_id)
753 .await?
754 {
755 Err(anyhow!("access denied"))?;
756 }
757
758 self.state_mut().join_channel(request.sender_id, channel_id);
759 let messages = self
760 .app_state
761 .db
762 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
763 .await?
764 .into_iter()
765 .map(|msg| proto::ChannelMessage {
766 id: msg.id.to_proto(),
767 body: msg.body,
768 timestamp: msg.sent_at.unix_timestamp() as u64,
769 sender_id: msg.sender_id.to_proto(),
770 nonce: Some(msg.nonce.as_u128().into()),
771 })
772 .collect::<Vec<_>>();
773 Ok(proto::JoinChannelResponse {
774 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
775 messages,
776 })
777 }
778
779 async fn leave_channel(
780 mut self: Arc<Self>,
781 request: TypedEnvelope<proto::LeaveChannel>,
782 ) -> tide::Result<()> {
783 let user_id = self.state().user_id_for_connection(request.sender_id)?;
784 let channel_id = ChannelId::from_proto(request.payload.channel_id);
785 if !self
786 .app_state
787 .db
788 .can_user_access_channel(user_id, channel_id)
789 .await?
790 {
791 Err(anyhow!("access denied"))?;
792 }
793
794 self.state_mut()
795 .leave_channel(request.sender_id, channel_id);
796
797 Ok(())
798 }
799
800 async fn send_channel_message(
801 self: Arc<Self>,
802 request: TypedEnvelope<proto::SendChannelMessage>,
803 ) -> tide::Result<proto::SendChannelMessageResponse> {
804 let channel_id = ChannelId::from_proto(request.payload.channel_id);
805 let user_id;
806 let connection_ids;
807 {
808 let state = self.state();
809 user_id = state.user_id_for_connection(request.sender_id)?;
810 connection_ids = state.channel_connection_ids(channel_id)?;
811 }
812
813 // Validate the message body.
814 let body = request.payload.body.trim().to_string();
815 if body.len() > MAX_MESSAGE_LEN {
816 return Err(anyhow!("message is too long"))?;
817 }
818 if body.is_empty() {
819 return Err(anyhow!("message can't be blank"))?;
820 }
821
822 let timestamp = OffsetDateTime::now_utc();
823 let nonce = request
824 .payload
825 .nonce
826 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
827
828 let message_id = self
829 .app_state
830 .db
831 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
832 .await?
833 .to_proto();
834 let message = proto::ChannelMessage {
835 sender_id: user_id.to_proto(),
836 id: message_id,
837 body,
838 timestamp: timestamp.unix_timestamp() as u64,
839 nonce: Some(nonce),
840 };
841 broadcast(request.sender_id, connection_ids, |conn_id| {
842 self.peer.send(
843 conn_id,
844 proto::ChannelMessageSent {
845 channel_id: channel_id.to_proto(),
846 message: Some(message.clone()),
847 },
848 )
849 })?;
850 Ok(proto::SendChannelMessageResponse {
851 message: Some(message),
852 })
853 }
854
855 async fn get_channel_messages(
856 self: Arc<Self>,
857 request: TypedEnvelope<proto::GetChannelMessages>,
858 ) -> tide::Result<proto::GetChannelMessagesResponse> {
859 let user_id = self.state().user_id_for_connection(request.sender_id)?;
860 let channel_id = ChannelId::from_proto(request.payload.channel_id);
861 if !self
862 .app_state
863 .db
864 .can_user_access_channel(user_id, channel_id)
865 .await?
866 {
867 Err(anyhow!("access denied"))?;
868 }
869
870 let messages = self
871 .app_state
872 .db
873 .get_channel_messages(
874 channel_id,
875 MESSAGE_COUNT_PER_PAGE,
876 Some(MessageId::from_proto(request.payload.before_message_id)),
877 )
878 .await?
879 .into_iter()
880 .map(|msg| proto::ChannelMessage {
881 id: msg.id.to_proto(),
882 body: msg.body,
883 timestamp: msg.sent_at.unix_timestamp() as u64,
884 sender_id: msg.sender_id.to_proto(),
885 nonce: Some(msg.nonce.as_u128().into()),
886 })
887 .collect::<Vec<_>>();
888
889 Ok(proto::GetChannelMessagesResponse {
890 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
891 messages,
892 })
893 }
894
    /// Acquires a read guard on the store.
    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
        self.store.read()
    }
898
    /// Acquires a write guard on the store.
    ///
    /// Takes `&mut Arc<Self>` even though the lock itself only needs a shared
    /// reference, so callers must hold the `Arc` mutably.
    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
        self.store.write()
    }
902}
903
904impl Executor for RealExecutor {
905 type Timer = Timer;
906
907 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
908 task::spawn(future);
909 }
910
911 fn timer(&self, duration: Duration) -> Self::Timer {
912 Timer::after(duration)
913 }
914}
915
916fn broadcast<F>(
917 sender_id: ConnectionId,
918 receiver_ids: Vec<ConnectionId>,
919 mut f: F,
920) -> anyhow::Result<()>
921where
922 F: FnMut(ConnectionId) -> anyhow::Result<()>,
923{
924 let mut result = Ok(());
925 for receiver_id in receiver_ids {
926 if receiver_id != sender_id {
927 if let Err(error) = f(receiver_id) {
928 if result.is_ok() {
929 result = Err(error);
930 }
931 }
932 }
933 }
934 result
935}
936
937pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
938 let server = Server::new(app.state().clone(), rpc.clone(), None);
939 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
940 let server = server.clone();
941 async move {
942 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
943
944 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
945 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
946 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
947 let client_protocol_version: Option<u32> = request
948 .header("X-Zed-Protocol-Version")
949 .and_then(|v| v.as_str().parse().ok());
950
951 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
952 return Ok(Response::new(StatusCode::UpgradeRequired));
953 }
954
955 let header = match request.header("Sec-Websocket-Key") {
956 Some(h) => h.as_str(),
957 None => return Err(anyhow!("expected sec-websocket-key"))?,
958 };
959
960 let user_id = process_auth_header(&request).await?;
961
962 let mut response = Response::new(StatusCode::SwitchingProtocols);
963 response.insert_header(UPGRADE, "websocket");
964 response.insert_header(CONNECTION, "Upgrade");
965 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
966 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
967 response.insert_header("Sec-Websocket-Version", "13");
968
969 let http_res: &mut tide::http::Response = response.as_mut();
970 let upgrade_receiver = http_res.recv_upgrade().await;
971 let addr = request.remote().unwrap_or("unknown").to_string();
972 task::spawn(async move {
973 if let Some(stream) = upgrade_receiver.await {
974 server
975 .handle_connection(
976 Connection::new(
977 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
978 ),
979 addr,
980 user_id,
981 None,
982 RealExecutor,
983 )
984 .await;
985 }
986 });
987
988 Ok(response)
989 }
990 });
991}
992
993fn header_contains_ignore_case<T>(
994 request: &tide::Request<T>,
995 header_name: HeaderName,
996 value: &str,
997) -> bool {
998 request
999 .header(header_name)
1000 .map(|h| {
1001 h.as_str()
1002 .split(',')
1003 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1004 })
1005 .unwrap_or(false)
1006}
1007
1008#[cfg(test)]
1009mod tests {
1010 use super::*;
1011 use crate::{
1012 auth,
1013 db::{tests::TestDb, UserId},
1014 github, AppState, Config,
1015 };
1016 use ::rpc::Peer;
1017 use client::{
1018 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1019 EstablishConnectionError, UserStore,
1020 };
1021 use collections::BTreeMap;
1022 use editor::{
1023 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, MultiBuffer,
1024 Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1025 };
1026 use gpui::{executor, ModelHandle, TestAppContext};
1027 use language::{
1028 tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
1029 LanguageRegistry, LanguageServerConfig, Point, ToLspPosition,
1030 };
1031 use lsp;
1032 use parking_lot::Mutex;
1033 use postage::{barrier, watch};
1034 use project::{
1035 fs::{FakeFs, Fs as _},
1036 search::SearchQuery,
1037 worktree::WorktreeHandle,
1038 DiagnosticSummary, Project, ProjectPath,
1039 };
1040 use rand::prelude::*;
1041 use rpc::PeerId;
1042 use serde_json::json;
1043 use sqlx::types::time::OffsetDateTime;
1044 use std::{
1045 cell::Cell,
1046 env,
1047 ops::Deref,
1048 path::{Path, PathBuf},
1049 rc::Rc,
1050 sync::{
1051 atomic::{AtomicBool, Ordering::SeqCst},
1052 Arc,
1053 },
1054 time::Duration,
1055 };
1056 use workspace::{Settings, Workspace, WorkspaceParams};
1057
1058 #[cfg(test)]
1059 #[ctor::ctor]
1060 fn init_logger() {
1061 if std::env::var("RUST_LOG").is_ok() {
1062 env_logger::init();
1063 }
1064 }
1065
    // Exercises the basic sharing flow between two clients connected to one
    // test server: client A shares a local project, client B joins it
    // remotely, both sides see each other as collaborators, text typed by B
    // reaches A's copy of the buffer, and B vanishes from A's collaborators
    // once B's project handle is dropped.
    #[gpui::test(iterations = 10)]
    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        // NOTE(review): the `.zed.toml` collaborators entry is presumably what
        // authorizes user_b to join — confirm against the server's checks.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        // Wait for the initial scan of "/a" before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        // This id is what client B passes to `Project::remote` below.
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // B must already list A (by peer id) as a collaborator with the right
        // login; B's replica id is captured to compare against A's view below.
        let replica_id_b = project_b.read_with(cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // A learns about B asynchronously, so poll until B shows up with the
        // same replica id and login.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
        buffer_b.read_with(cx_b, |buf, cx| {
            assert_eq!(buf.read(cx).text(), "b-contents")
        });
        // The guest's open must have left the buffer open on the host as well,
        // before the host opens it explicitly.
        project_a.read_with(cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        // Wrap B's buffer in an editor so input can be simulated through it.
        let editor_b = cx_b.add_view(window_b, |cx| {
            Editor::for_buffer(
                buffer_b,
                None,
                watch::channel_with(Settings::test(cx)).1,
                cx,
            )
        });

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1198
    // Checks the unshare/re-share cycle: after client A unshares, client B's
    // remote project becomes read-only and A's worktree is no longer shared;
    // after A shares again, B can join the same project id and open a buffer.
    #[gpui::test(iterations = 10)]
    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of "/a" before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
        // Sharing the project must mark the local worktree as shared.
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        // Opening a buffer proves the guest has working access while shared.
        project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Unshare the project as client A
        project_a
            .update(cx_a, |project, cx| project.unshare(cx))
            .await
            .unwrap();
        // The guest finds out asynchronously, so poll for the read-only state.
        project_b
            .condition(cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        // Release the stale guest project before joining again.
        cx_b.update(|_| {
            drop(project_b);
        });

        // Share the project again and ensure guests can still join.
        project_a
            .update(cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // The same project id is reused for the second join.
        let project_b2 = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b2
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1294
    // Three-client scenario (host A, guests B and C) covering:
    //   1. concurrent edits from all three peers converging to the same text,
    //   2. a guest-initiated save racing with a host edit, checked against the
    //      file written to the fake file system and the peers' dirty flags,
    //   3. renames/creations on the host's file system showing up in every
    //      peer's worktree listing and in the open buffers' file paths.
    #[gpui::test(iterations = 10)]
    async fn test_propagate_saves_and_fs_changes(
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
        cx_c: &mut TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;
        let client_c = server.create_client(cx_c, "user_c").await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of "/a" before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that worktree as clients B and C.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let project_c = Project::remote(
            project_id,
            client_c.clone(),
            client_c.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_c.to_async(),
        )
        .await
        .unwrap();
        // Each guest project is expected to hold exactly the one shared
        // worktree; grab it for the file-system assertions further down.
        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());

        // Open and edit a buffer as both guests B and C.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        let buffer_c = project_c
            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        // Both guests insert at offset 0 without waiting for each other.
        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
        buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));

        // Open and edit that buffer as the host.
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();

        // NOTE(review): the expected order of the two concurrent inserts is
        // fixed by the buffer's conflict resolution — presumably by replica
        // id; confirm before changing the expected strings.
        buffer_a
            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(cx_a, |buf, cx| {
            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        // The save is started first and only awaited after the host's edit.
        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
        save_b.await.unwrap();
        // The file on disk is expected to include the host's racing edit.
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        // A and B are checked immediately; C is polled for the clean state.
        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;

        worktree_a.flush_fs_events(cx_a).await;

        // Make changes on host's file system, see those changes on guest worktrees.
        fs.rename(
            "/a/file1".as_ref(),
            "/a/file1-renamed".as_ref(),
            Default::default(),
        )
        .await
        .unwrap();

        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into()).await;

        // All three peers must converge on the same path listing.
        worktree_a
            .condition(&cx_a, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_b
            .condition(&cx_b, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;

        // Ensure buffer files are updated as well.
        buffer_a
            .condition(&cx_a, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_c
            .condition(&cx_c, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
    }
1473
1474 #[gpui::test(iterations = 10)]
1475 async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1476 cx_a.foreground().forbid_parking();
1477 let lang_registry = Arc::new(LanguageRegistry::test());
1478 let fs = FakeFs::new(cx_a.background());
1479
1480 // Connect to a server as 2 clients.
1481 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1482 let client_a = server.create_client(cx_a, "user_a").await;
1483 let client_b = server.create_client(cx_b, "user_b").await;
1484
1485 // Share a project as client A
1486 fs.insert_tree(
1487 "/dir",
1488 json!({
1489 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1490 "a.txt": "a-contents",
1491 }),
1492 )
1493 .await;
1494
1495 let project_a = cx_a.update(|cx| {
1496 Project::local(
1497 client_a.clone(),
1498 client_a.user_store.clone(),
1499 lang_registry.clone(),
1500 fs.clone(),
1501 cx,
1502 )
1503 });
1504 let (worktree_a, _) = project_a
1505 .update(cx_a, |p, cx| {
1506 p.find_or_create_local_worktree("/dir", true, cx)
1507 })
1508 .await
1509 .unwrap();
1510 worktree_a
1511 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1512 .await;
1513 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1514 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1515 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1516
1517 // Join that project as client B
1518 let project_b = Project::remote(
1519 project_id,
1520 client_b.clone(),
1521 client_b.user_store.clone(),
1522 lang_registry.clone(),
1523 fs.clone(),
1524 &mut cx_b.to_async(),
1525 )
1526 .await
1527 .unwrap();
1528
1529 // Open a buffer as client B
1530 let buffer_b = project_b
1531 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1532 .await
1533 .unwrap();
1534
1535 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1536 buffer_b.read_with(cx_b, |buf, _| {
1537 assert!(buf.is_dirty());
1538 assert!(!buf.has_conflict());
1539 });
1540
1541 buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1542 buffer_b
1543 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1544 .await;
1545 buffer_b.read_with(cx_b, |buf, _| {
1546 assert!(!buf.has_conflict());
1547 });
1548
1549 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1550 buffer_b.read_with(cx_b, |buf, _| {
1551 assert!(buf.is_dirty());
1552 assert!(!buf.has_conflict());
1553 });
1554 }
1555
1556 #[gpui::test(iterations = 10)]
1557 async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1558 cx_a.foreground().forbid_parking();
1559 let lang_registry = Arc::new(LanguageRegistry::test());
1560 let fs = FakeFs::new(cx_a.background());
1561
1562 // Connect to a server as 2 clients.
1563 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1564 let client_a = server.create_client(cx_a, "user_a").await;
1565 let client_b = server.create_client(cx_b, "user_b").await;
1566
1567 // Share a project as client A
1568 fs.insert_tree(
1569 "/dir",
1570 json!({
1571 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1572 "a.txt": "a-contents",
1573 }),
1574 )
1575 .await;
1576
1577 let project_a = cx_a.update(|cx| {
1578 Project::local(
1579 client_a.clone(),
1580 client_a.user_store.clone(),
1581 lang_registry.clone(),
1582 fs.clone(),
1583 cx,
1584 )
1585 });
1586 let (worktree_a, _) = project_a
1587 .update(cx_a, |p, cx| {
1588 p.find_or_create_local_worktree("/dir", true, cx)
1589 })
1590 .await
1591 .unwrap();
1592 worktree_a
1593 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1594 .await;
1595 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1596 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1597 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1598
1599 // Join that project as client B
1600 let project_b = Project::remote(
1601 project_id,
1602 client_b.clone(),
1603 client_b.user_store.clone(),
1604 lang_registry.clone(),
1605 fs.clone(),
1606 &mut cx_b.to_async(),
1607 )
1608 .await
1609 .unwrap();
1610 let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1611
1612 // Open a buffer as client B
1613 let buffer_b = project_b
1614 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1615 .await
1616 .unwrap();
1617 buffer_b.read_with(cx_b, |buf, _| {
1618 assert!(!buf.is_dirty());
1619 assert!(!buf.has_conflict());
1620 });
1621
1622 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1623 .await
1624 .unwrap();
1625 buffer_b
1626 .condition(&cx_b, |buf, _| {
1627 buf.text() == "new contents" && !buf.is_dirty()
1628 })
1629 .await;
1630 buffer_b.read_with(cx_b, |buf, _| {
1631 assert!(!buf.has_conflict());
1632 });
1633 }
1634
    // Races a guest's `open_buffer` request against edits made by the host to
    // the same buffer, and checks that the guest's buffer nevertheless ends up
    // with exactly the host's text.
    #[gpui::test(iterations = 10)]
    async fn test_editing_while_guest_opens_buffer(
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of "/dir" before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open a buffer as client A
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Start opening the same buffer as client B
        // The open is spawned on the background executor and deliberately not
        // awaited yet, so the edits below overlap with it.
        let buffer_b = cx_b
            .background()
            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));

        // Edit the buffer as client A while client B is still opening it.
        cx_b.background().simulate_random_delay().await;
        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
        cx_b.background().simulate_random_delay().await;
        buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));

        // Snapshot the host's text, then finish the guest's open and poll
        // until the guest's buffer matches it.
        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
        let buffer_b = buffer_b.await.unwrap();
        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
    }
1713
1714 #[gpui::test(iterations = 10)]
1715 async fn test_leaving_worktree_while_opening_buffer(
1716 cx_a: &mut TestAppContext,
1717 cx_b: &mut TestAppContext,
1718 ) {
1719 cx_a.foreground().forbid_parking();
1720 let lang_registry = Arc::new(LanguageRegistry::test());
1721 let fs = FakeFs::new(cx_a.background());
1722
1723 // Connect to a server as 2 clients.
1724 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1725 let client_a = server.create_client(cx_a, "user_a").await;
1726 let client_b = server.create_client(cx_b, "user_b").await;
1727
1728 // Share a project as client A
1729 fs.insert_tree(
1730 "/dir",
1731 json!({
1732 ".zed.toml": r#"collaborators = ["user_b"]"#,
1733 "a.txt": "a-contents",
1734 }),
1735 )
1736 .await;
1737 let project_a = cx_a.update(|cx| {
1738 Project::local(
1739 client_a.clone(),
1740 client_a.user_store.clone(),
1741 lang_registry.clone(),
1742 fs.clone(),
1743 cx,
1744 )
1745 });
1746 let (worktree_a, _) = project_a
1747 .update(cx_a, |p, cx| {
1748 p.find_or_create_local_worktree("/dir", true, cx)
1749 })
1750 .await
1751 .unwrap();
1752 worktree_a
1753 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1754 .await;
1755 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1756 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1757 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1758
1759 // Join that project as client B
1760 let project_b = Project::remote(
1761 project_id,
1762 client_b.clone(),
1763 client_b.user_store.clone(),
1764 lang_registry.clone(),
1765 fs.clone(),
1766 &mut cx_b.to_async(),
1767 )
1768 .await
1769 .unwrap();
1770
1771 // See that a guest has joined as client A.
1772 project_a
1773 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1774 .await;
1775
1776 // Begin opening a buffer as client B, but leave the project before the open completes.
1777 let buffer_b = cx_b
1778 .background()
1779 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1780 cx_b.update(|_| drop(project_b));
1781 drop(buffer_b);
1782
1783 // See that the guest has left.
1784 project_a
1785 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1786 .await;
1787 }
1788
1789 #[gpui::test(iterations = 10)]
1790 async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1791 cx_a.foreground().forbid_parking();
1792 let lang_registry = Arc::new(LanguageRegistry::test());
1793 let fs = FakeFs::new(cx_a.background());
1794
1795 // Connect to a server as 2 clients.
1796 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1797 let client_a = server.create_client(cx_a, "user_a").await;
1798 let client_b = server.create_client(cx_b, "user_b").await;
1799
1800 // Share a project as client A
1801 fs.insert_tree(
1802 "/a",
1803 json!({
1804 ".zed.toml": r#"collaborators = ["user_b"]"#,
1805 "a.txt": "a-contents",
1806 "b.txt": "b-contents",
1807 }),
1808 )
1809 .await;
1810 let project_a = cx_a.update(|cx| {
1811 Project::local(
1812 client_a.clone(),
1813 client_a.user_store.clone(),
1814 lang_registry.clone(),
1815 fs.clone(),
1816 cx,
1817 )
1818 });
1819 let (worktree_a, _) = project_a
1820 .update(cx_a, |p, cx| {
1821 p.find_or_create_local_worktree("/a", true, cx)
1822 })
1823 .await
1824 .unwrap();
1825 worktree_a
1826 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1827 .await;
1828 let project_id = project_a
1829 .update(cx_a, |project, _| project.next_remote_id())
1830 .await;
1831 project_a
1832 .update(cx_a, |project, cx| project.share(cx))
1833 .await
1834 .unwrap();
1835
1836 // Join that project as client B
1837 let _project_b = Project::remote(
1838 project_id,
1839 client_b.clone(),
1840 client_b.user_store.clone(),
1841 lang_registry.clone(),
1842 fs.clone(),
1843 &mut cx_b.to_async(),
1844 )
1845 .await
1846 .unwrap();
1847
1848 // Client A sees that a guest has joined.
1849 project_a
1850 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1851 .await;
1852
1853 // Drop client B's connection and ensure client A observes client B leaving the project.
1854 client_b.disconnect(&cx_b.to_async()).unwrap();
1855 project_a
1856 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1857 .await;
1858
1859 // Rejoin the project as client B
1860 let _project_b = Project::remote(
1861 project_id,
1862 client_b.clone(),
1863 client_b.user_store.clone(),
1864 lang_registry.clone(),
1865 fs.clone(),
1866 &mut cx_b.to_async(),
1867 )
1868 .await
1869 .unwrap();
1870
1871 // Client A sees that a guest has re-joined.
1872 project_a
1873 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1874 .await;
1875
1876 // Simulate connection loss for client B and ensure client A observes client B leaving the project.
1877 server.disconnect_client(client_b.current_user_id(cx_b));
1878 cx_a.foreground().advance_clock(Duration::from_secs(3));
1879 project_a
1880 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1881 .await;
1882 }
1883
1884 #[gpui::test(iterations = 10)]
1885 async fn test_collaborating_with_diagnostics(
1886 cx_a: &mut TestAppContext,
1887 cx_b: &mut TestAppContext,
1888 ) {
1889 cx_a.foreground().forbid_parking();
1890 let mut lang_registry = Arc::new(LanguageRegistry::test());
1891 let fs = FakeFs::new(cx_a.background());
1892
1893 // Set up a fake language server.
1894 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1895 Arc::get_mut(&mut lang_registry)
1896 .unwrap()
1897 .add(Arc::new(Language::new(
1898 LanguageConfig {
1899 name: "Rust".into(),
1900 path_suffixes: vec!["rs".to_string()],
1901 language_server: Some(language_server_config),
1902 ..Default::default()
1903 },
1904 Some(tree_sitter_rust::language()),
1905 )));
1906
1907 // Connect to a server as 2 clients.
1908 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1909 let client_a = server.create_client(cx_a, "user_a").await;
1910 let client_b = server.create_client(cx_b, "user_b").await;
1911
1912 // Share a project as client A
1913 fs.insert_tree(
1914 "/a",
1915 json!({
1916 ".zed.toml": r#"collaborators = ["user_b"]"#,
1917 "a.rs": "let one = two",
1918 "other.rs": "",
1919 }),
1920 )
1921 .await;
1922 let project_a = cx_a.update(|cx| {
1923 Project::local(
1924 client_a.clone(),
1925 client_a.user_store.clone(),
1926 lang_registry.clone(),
1927 fs.clone(),
1928 cx,
1929 )
1930 });
1931 let (worktree_a, _) = project_a
1932 .update(cx_a, |p, cx| {
1933 p.find_or_create_local_worktree("/a", true, cx)
1934 })
1935 .await
1936 .unwrap();
1937 worktree_a
1938 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1939 .await;
1940 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1941 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1942 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1943
1944 // Cause the language server to start.
1945 let _ = cx_a
1946 .background()
1947 .spawn(project_a.update(cx_a, |project, cx| {
1948 project.open_buffer(
1949 ProjectPath {
1950 worktree_id,
1951 path: Path::new("other.rs").into(),
1952 },
1953 cx,
1954 )
1955 }))
1956 .await
1957 .unwrap();
1958
1959 // Simulate a language server reporting errors for a file.
1960 let mut fake_language_server = fake_language_servers.next().await.unwrap();
1961 fake_language_server
1962 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1963 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1964 version: None,
1965 diagnostics: vec![lsp::Diagnostic {
1966 severity: Some(lsp::DiagnosticSeverity::ERROR),
1967 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1968 message: "message 1".to_string(),
1969 ..Default::default()
1970 }],
1971 })
1972 .await;
1973
1974 // Wait for server to see the diagnostics update.
1975 server
1976 .condition(|store| {
1977 let worktree = store
1978 .project(project_id)
1979 .unwrap()
1980 .share
1981 .as_ref()
1982 .unwrap()
1983 .worktrees
1984 .get(&worktree_id.to_proto())
1985 .unwrap();
1986
1987 !worktree.diagnostic_summaries.is_empty()
1988 })
1989 .await;
1990
1991 // Join the worktree as client B.
1992 let project_b = Project::remote(
1993 project_id,
1994 client_b.clone(),
1995 client_b.user_store.clone(),
1996 lang_registry.clone(),
1997 fs.clone(),
1998 &mut cx_b.to_async(),
1999 )
2000 .await
2001 .unwrap();
2002
2003 project_b.read_with(cx_b, |project, cx| {
2004 assert_eq!(
2005 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2006 &[(
2007 ProjectPath {
2008 worktree_id,
2009 path: Arc::from(Path::new("a.rs")),
2010 },
2011 DiagnosticSummary {
2012 error_count: 1,
2013 warning_count: 0,
2014 ..Default::default()
2015 },
2016 )]
2017 )
2018 });
2019
2020 // Simulate a language server reporting more errors for a file.
2021 fake_language_server
2022 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2023 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2024 version: None,
2025 diagnostics: vec![
2026 lsp::Diagnostic {
2027 severity: Some(lsp::DiagnosticSeverity::ERROR),
2028 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2029 message: "message 1".to_string(),
2030 ..Default::default()
2031 },
2032 lsp::Diagnostic {
2033 severity: Some(lsp::DiagnosticSeverity::WARNING),
2034 range: lsp::Range::new(
2035 lsp::Position::new(0, 10),
2036 lsp::Position::new(0, 13),
2037 ),
2038 message: "message 2".to_string(),
2039 ..Default::default()
2040 },
2041 ],
2042 })
2043 .await;
2044
2045 // Client b gets the updated summaries
2046 project_b
2047 .condition(&cx_b, |project, cx| {
2048 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2049 == &[(
2050 ProjectPath {
2051 worktree_id,
2052 path: Arc::from(Path::new("a.rs")),
2053 },
2054 DiagnosticSummary {
2055 error_count: 1,
2056 warning_count: 1,
2057 ..Default::default()
2058 },
2059 )]
2060 })
2061 .await;
2062
2063 // Open the file with the errors on client B. They should be present.
2064 let buffer_b = cx_b
2065 .background()
2066 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2067 .await
2068 .unwrap();
2069
2070 buffer_b.read_with(cx_b, |buffer, _| {
2071 assert_eq!(
2072 buffer
2073 .snapshot()
2074 .diagnostics_in_range::<_, Point>(0..buffer.len())
2075 .map(|entry| entry)
2076 .collect::<Vec<_>>(),
2077 &[
2078 DiagnosticEntry {
2079 range: Point::new(0, 4)..Point::new(0, 7),
2080 diagnostic: Diagnostic {
2081 group_id: 0,
2082 message: "message 1".to_string(),
2083 severity: lsp::DiagnosticSeverity::ERROR,
2084 is_primary: true,
2085 ..Default::default()
2086 }
2087 },
2088 DiagnosticEntry {
2089 range: Point::new(0, 10)..Point::new(0, 13),
2090 diagnostic: Diagnostic {
2091 group_id: 1,
2092 severity: lsp::DiagnosticSeverity::WARNING,
2093 message: "message 2".to_string(),
2094 is_primary: true,
2095 ..Default::default()
2096 }
2097 }
2098 ]
2099 );
2100 });
2101 }
2102
2103 #[gpui::test(iterations = 10)]
2104 async fn test_collaborating_with_completion(
2105 cx_a: &mut TestAppContext,
2106 cx_b: &mut TestAppContext,
2107 ) {
2108 cx_a.foreground().forbid_parking();
2109 let mut lang_registry = Arc::new(LanguageRegistry::test());
2110 let fs = FakeFs::new(cx_a.background());
2111
2112 // Set up a fake language server.
2113 let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2114 language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2115 completion_provider: Some(lsp::CompletionOptions {
2116 trigger_characters: Some(vec![".".to_string()]),
2117 ..Default::default()
2118 }),
2119 ..Default::default()
2120 });
2121 Arc::get_mut(&mut lang_registry)
2122 .unwrap()
2123 .add(Arc::new(Language::new(
2124 LanguageConfig {
2125 name: "Rust".into(),
2126 path_suffixes: vec!["rs".to_string()],
2127 language_server: Some(language_server_config),
2128 ..Default::default()
2129 },
2130 Some(tree_sitter_rust::language()),
2131 )));
2132
2133 // Connect to a server as 2 clients.
2134 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2135 let client_a = server.create_client(cx_a, "user_a").await;
2136 let client_b = server.create_client(cx_b, "user_b").await;
2137
2138 // Share a project as client A
2139 fs.insert_tree(
2140 "/a",
2141 json!({
2142 ".zed.toml": r#"collaborators = ["user_b"]"#,
2143 "main.rs": "fn main() { a }",
2144 "other.rs": "",
2145 }),
2146 )
2147 .await;
2148 let project_a = cx_a.update(|cx| {
2149 Project::local(
2150 client_a.clone(),
2151 client_a.user_store.clone(),
2152 lang_registry.clone(),
2153 fs.clone(),
2154 cx,
2155 )
2156 });
2157 let (worktree_a, _) = project_a
2158 .update(cx_a, |p, cx| {
2159 p.find_or_create_local_worktree("/a", true, cx)
2160 })
2161 .await
2162 .unwrap();
2163 worktree_a
2164 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2165 .await;
2166 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2167 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2168 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2169
2170 // Join the worktree as client B.
2171 let project_b = Project::remote(
2172 project_id,
2173 client_b.clone(),
2174 client_b.user_store.clone(),
2175 lang_registry.clone(),
2176 fs.clone(),
2177 &mut cx_b.to_async(),
2178 )
2179 .await
2180 .unwrap();
2181
2182 // Open a file in an editor as the guest.
2183 let buffer_b = project_b
2184 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2185 .await
2186 .unwrap();
2187 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2188 let editor_b = cx_b.add_view(window_b, |cx| {
2189 Editor::for_buffer(
2190 cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2191 Some(project_b.clone()),
2192 watch::channel_with(Settings::test(cx)).1,
2193 cx,
2194 )
2195 });
2196
2197 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2198 buffer_b
2199 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2200 .await;
2201
2202 // Type a completion trigger character as the guest.
2203 editor_b.update(cx_b, |editor, cx| {
2204 editor.select_ranges([13..13], None, cx);
2205 editor.handle_input(&Input(".".into()), cx);
2206 cx.focus(&editor_b);
2207 });
2208
2209 // Receive a completion request as the host's language server.
2210 // Return some completions from the host's language server.
2211 cx_a.foreground().start_waiting();
2212 fake_language_server
2213 .handle_request::<lsp::request::Completion, _>(|params, _| {
2214 assert_eq!(
2215 params.text_document_position.text_document.uri,
2216 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2217 );
2218 assert_eq!(
2219 params.text_document_position.position,
2220 lsp::Position::new(0, 14),
2221 );
2222
2223 Some(lsp::CompletionResponse::Array(vec![
2224 lsp::CompletionItem {
2225 label: "first_method(…)".into(),
2226 detail: Some("fn(&mut self, B) -> C".into()),
2227 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2228 new_text: "first_method($1)".to_string(),
2229 range: lsp::Range::new(
2230 lsp::Position::new(0, 14),
2231 lsp::Position::new(0, 14),
2232 ),
2233 })),
2234 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2235 ..Default::default()
2236 },
2237 lsp::CompletionItem {
2238 label: "second_method(…)".into(),
2239 detail: Some("fn(&mut self, C) -> D<E>".into()),
2240 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2241 new_text: "second_method()".to_string(),
2242 range: lsp::Range::new(
2243 lsp::Position::new(0, 14),
2244 lsp::Position::new(0, 14),
2245 ),
2246 })),
2247 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2248 ..Default::default()
2249 },
2250 ]))
2251 })
2252 .next()
2253 .await
2254 .unwrap();
2255 cx_a.foreground().finish_waiting();
2256
2257 // Open the buffer on the host.
2258 let buffer_a = project_a
2259 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2260 .await
2261 .unwrap();
2262 buffer_a
2263 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2264 .await;
2265
2266 // Confirm a completion on the guest.
2267 editor_b
2268 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2269 .await;
2270 editor_b.update(cx_b, |editor, cx| {
2271 editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2272 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2273 });
2274
2275 // Return a resolved completion from the host's language server.
2276 // The resolved completion has an additional text edit.
2277 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2278 |params, _| {
2279 assert_eq!(params.label, "first_method(…)");
2280 lsp::CompletionItem {
2281 label: "first_method(…)".into(),
2282 detail: Some("fn(&mut self, B) -> C".into()),
2283 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2284 new_text: "first_method($1)".to_string(),
2285 range: lsp::Range::new(
2286 lsp::Position::new(0, 14),
2287 lsp::Position::new(0, 14),
2288 ),
2289 })),
2290 additional_text_edits: Some(vec![lsp::TextEdit {
2291 new_text: "use d::SomeTrait;\n".to_string(),
2292 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2293 }]),
2294 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2295 ..Default::default()
2296 }
2297 },
2298 );
2299
2300 // The additional edit is applied.
2301 buffer_a
2302 .condition(&cx_a, |buffer, _| {
2303 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2304 })
2305 .await;
2306 buffer_b
2307 .condition(&cx_b, |buffer, _| {
2308 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2309 })
2310 .await;
2311 }
2312
2313 #[gpui::test(iterations = 10)]
2314 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2315 cx_a.foreground().forbid_parking();
2316 let mut lang_registry = Arc::new(LanguageRegistry::test());
2317 let fs = FakeFs::new(cx_a.background());
2318
2319 // Set up a fake language server.
2320 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2321 Arc::get_mut(&mut lang_registry)
2322 .unwrap()
2323 .add(Arc::new(Language::new(
2324 LanguageConfig {
2325 name: "Rust".into(),
2326 path_suffixes: vec!["rs".to_string()],
2327 language_server: Some(language_server_config),
2328 ..Default::default()
2329 },
2330 Some(tree_sitter_rust::language()),
2331 )));
2332
2333 // Connect to a server as 2 clients.
2334 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2335 let client_a = server.create_client(cx_a, "user_a").await;
2336 let client_b = server.create_client(cx_b, "user_b").await;
2337
2338 // Share a project as client A
2339 fs.insert_tree(
2340 "/a",
2341 json!({
2342 ".zed.toml": r#"collaborators = ["user_b"]"#,
2343 "a.rs": "let one = two",
2344 }),
2345 )
2346 .await;
2347 let project_a = cx_a.update(|cx| {
2348 Project::local(
2349 client_a.clone(),
2350 client_a.user_store.clone(),
2351 lang_registry.clone(),
2352 fs.clone(),
2353 cx,
2354 )
2355 });
2356 let (worktree_a, _) = project_a
2357 .update(cx_a, |p, cx| {
2358 p.find_or_create_local_worktree("/a", true, cx)
2359 })
2360 .await
2361 .unwrap();
2362 worktree_a
2363 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2364 .await;
2365 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2366 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2367 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2368
2369 // Join the worktree as client B.
2370 let project_b = Project::remote(
2371 project_id,
2372 client_b.clone(),
2373 client_b.user_store.clone(),
2374 lang_registry.clone(),
2375 fs.clone(),
2376 &mut cx_b.to_async(),
2377 )
2378 .await
2379 .unwrap();
2380
2381 let buffer_b = cx_b
2382 .background()
2383 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2384 .await
2385 .unwrap();
2386
2387 let format = project_b.update(cx_b, |project, cx| {
2388 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2389 });
2390
2391 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2392 fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2393 Some(vec![
2394 lsp::TextEdit {
2395 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2396 new_text: "h".to_string(),
2397 },
2398 lsp::TextEdit {
2399 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2400 new_text: "y".to_string(),
2401 },
2402 ])
2403 });
2404
2405 format.await.unwrap();
2406 assert_eq!(
2407 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2408 "let honey = two"
2409 );
2410 }
2411
2412 #[gpui::test(iterations = 10)]
2413 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2414 cx_a.foreground().forbid_parking();
2415 let mut lang_registry = Arc::new(LanguageRegistry::test());
2416 let fs = FakeFs::new(cx_a.background());
2417 fs.insert_tree(
2418 "/root-1",
2419 json!({
2420 ".zed.toml": r#"collaborators = ["user_b"]"#,
2421 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2422 }),
2423 )
2424 .await;
2425 fs.insert_tree(
2426 "/root-2",
2427 json!({
2428 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2429 }),
2430 )
2431 .await;
2432
2433 // Set up a fake language server.
2434 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2435 Arc::get_mut(&mut lang_registry)
2436 .unwrap()
2437 .add(Arc::new(Language::new(
2438 LanguageConfig {
2439 name: "Rust".into(),
2440 path_suffixes: vec!["rs".to_string()],
2441 language_server: Some(language_server_config),
2442 ..Default::default()
2443 },
2444 Some(tree_sitter_rust::language()),
2445 )));
2446
2447 // Connect to a server as 2 clients.
2448 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2449 let client_a = server.create_client(cx_a, "user_a").await;
2450 let client_b = server.create_client(cx_b, "user_b").await;
2451
2452 // Share a project as client A
2453 let project_a = cx_a.update(|cx| {
2454 Project::local(
2455 client_a.clone(),
2456 client_a.user_store.clone(),
2457 lang_registry.clone(),
2458 fs.clone(),
2459 cx,
2460 )
2461 });
2462 let (worktree_a, _) = project_a
2463 .update(cx_a, |p, cx| {
2464 p.find_or_create_local_worktree("/root-1", true, cx)
2465 })
2466 .await
2467 .unwrap();
2468 worktree_a
2469 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2470 .await;
2471 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2472 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2473 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2474
2475 // Join the worktree as client B.
2476 let project_b = Project::remote(
2477 project_id,
2478 client_b.clone(),
2479 client_b.user_store.clone(),
2480 lang_registry.clone(),
2481 fs.clone(),
2482 &mut cx_b.to_async(),
2483 )
2484 .await
2485 .unwrap();
2486
2487 // Open the file on client B.
2488 let buffer_b = cx_b
2489 .background()
2490 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2491 .await
2492 .unwrap();
2493
2494 // Request the definition of a symbol as the guest.
2495 let definitions_1 = project_b.update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2496
2497 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2498 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2499 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2500 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2501 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2502 )))
2503 });
2504
2505 let definitions_1 = definitions_1.await.unwrap();
2506 cx_b.read(|cx| {
2507 assert_eq!(definitions_1.len(), 1);
2508 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2509 let target_buffer = definitions_1[0].buffer.read(cx);
2510 assert_eq!(
2511 target_buffer.text(),
2512 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2513 );
2514 assert_eq!(
2515 definitions_1[0].range.to_point(target_buffer),
2516 Point::new(0, 6)..Point::new(0, 9)
2517 );
2518 });
2519
2520 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2521 // the previous call to `definition`.
2522 let definitions_2 = project_b.update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2523 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2524 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2525 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2526 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2527 )))
2528 });
2529
2530 let definitions_2 = definitions_2.await.unwrap();
2531 cx_b.read(|cx| {
2532 assert_eq!(definitions_2.len(), 1);
2533 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2534 let target_buffer = definitions_2[0].buffer.read(cx);
2535 assert_eq!(
2536 target_buffer.text(),
2537 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2538 );
2539 assert_eq!(
2540 definitions_2[0].range.to_point(target_buffer),
2541 Point::new(1, 6)..Point::new(1, 11)
2542 );
2543 });
2544 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2545 }
2546
2547 #[gpui::test(iterations = 10)]
2548 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2549 cx_a.foreground().forbid_parking();
2550 let mut lang_registry = Arc::new(LanguageRegistry::test());
2551 let fs = FakeFs::new(cx_a.background());
2552 fs.insert_tree(
2553 "/root-1",
2554 json!({
2555 ".zed.toml": r#"collaborators = ["user_b"]"#,
2556 "one.rs": "const ONE: usize = 1;",
2557 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2558 }),
2559 )
2560 .await;
2561 fs.insert_tree(
2562 "/root-2",
2563 json!({
2564 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2565 }),
2566 )
2567 .await;
2568
2569 // Set up a fake language server.
2570 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2571 Arc::get_mut(&mut lang_registry)
2572 .unwrap()
2573 .add(Arc::new(Language::new(
2574 LanguageConfig {
2575 name: "Rust".into(),
2576 path_suffixes: vec!["rs".to_string()],
2577 language_server: Some(language_server_config),
2578 ..Default::default()
2579 },
2580 Some(tree_sitter_rust::language()),
2581 )));
2582
2583 // Connect to a server as 2 clients.
2584 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2585 let client_a = server.create_client(cx_a, "user_a").await;
2586 let client_b = server.create_client(cx_b, "user_b").await;
2587
2588 // Share a project as client A
2589 let project_a = cx_a.update(|cx| {
2590 Project::local(
2591 client_a.clone(),
2592 client_a.user_store.clone(),
2593 lang_registry.clone(),
2594 fs.clone(),
2595 cx,
2596 )
2597 });
2598 let (worktree_a, _) = project_a
2599 .update(cx_a, |p, cx| {
2600 p.find_or_create_local_worktree("/root-1", true, cx)
2601 })
2602 .await
2603 .unwrap();
2604 worktree_a
2605 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2606 .await;
2607 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2608 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2609 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2610
2611 // Join the worktree as client B.
2612 let project_b = Project::remote(
2613 project_id,
2614 client_b.clone(),
2615 client_b.user_store.clone(),
2616 lang_registry.clone(),
2617 fs.clone(),
2618 &mut cx_b.to_async(),
2619 )
2620 .await
2621 .unwrap();
2622
2623 // Open the file on client B.
2624 let buffer_b = cx_b
2625 .background()
2626 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2627 .await
2628 .unwrap();
2629
2630 // Request references to a symbol as the guest.
2631 let references = project_b.update(cx_b, |p, cx| p.references(&buffer_b, 7, cx));
2632
2633 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2634 fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2635 assert_eq!(
2636 params.text_document_position.text_document.uri.as_str(),
2637 "file:///root-1/one.rs"
2638 );
2639 Some(vec![
2640 lsp::Location {
2641 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2642 range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2643 },
2644 lsp::Location {
2645 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2646 range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2647 },
2648 lsp::Location {
2649 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2650 range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2651 },
2652 ])
2653 });
2654
2655 let references = references.await.unwrap();
2656 cx_b.read(|cx| {
2657 assert_eq!(references.len(), 3);
2658 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2659
2660 let two_buffer = references[0].buffer.read(cx);
2661 let three_buffer = references[2].buffer.read(cx);
2662 assert_eq!(
2663 two_buffer.file().unwrap().path().as_ref(),
2664 Path::new("two.rs")
2665 );
2666 assert_eq!(references[1].buffer, references[0].buffer);
2667 assert_eq!(
2668 three_buffer.file().unwrap().full_path(cx),
2669 Path::new("three.rs")
2670 );
2671
2672 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2673 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2674 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2675 });
2676 }
2677
2678 #[gpui::test(iterations = 10)]
2679 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2680 cx_a.foreground().forbid_parking();
2681 let lang_registry = Arc::new(LanguageRegistry::test());
2682 let fs = FakeFs::new(cx_a.background());
2683 fs.insert_tree(
2684 "/root-1",
2685 json!({
2686 ".zed.toml": r#"collaborators = ["user_b"]"#,
2687 "a": "hello world",
2688 "b": "goodnight moon",
2689 "c": "a world of goo",
2690 "d": "world champion of clown world",
2691 }),
2692 )
2693 .await;
2694 fs.insert_tree(
2695 "/root-2",
2696 json!({
2697 "e": "disney world is fun",
2698 }),
2699 )
2700 .await;
2701
2702 // Connect to a server as 2 clients.
2703 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2704 let client_a = server.create_client(cx_a, "user_a").await;
2705 let client_b = server.create_client(cx_b, "user_b").await;
2706
2707 // Share a project as client A
2708 let project_a = cx_a.update(|cx| {
2709 Project::local(
2710 client_a.clone(),
2711 client_a.user_store.clone(),
2712 lang_registry.clone(),
2713 fs.clone(),
2714 cx,
2715 )
2716 });
2717 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2718
2719 let (worktree_1, _) = project_a
2720 .update(cx_a, |p, cx| {
2721 p.find_or_create_local_worktree("/root-1", true, cx)
2722 })
2723 .await
2724 .unwrap();
2725 worktree_1
2726 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2727 .await;
2728 let (worktree_2, _) = project_a
2729 .update(cx_a, |p, cx| {
2730 p.find_or_create_local_worktree("/root-2", true, cx)
2731 })
2732 .await
2733 .unwrap();
2734 worktree_2
2735 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2736 .await;
2737
2738 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2739
2740 // Join the worktree as client B.
2741 let project_b = Project::remote(
2742 project_id,
2743 client_b.clone(),
2744 client_b.user_store.clone(),
2745 lang_registry.clone(),
2746 fs.clone(),
2747 &mut cx_b.to_async(),
2748 )
2749 .await
2750 .unwrap();
2751
2752 let results = project_b
2753 .update(cx_b, |project, cx| {
2754 project.search(SearchQuery::text("world", false, false), cx)
2755 })
2756 .await
2757 .unwrap();
2758
2759 let mut ranges_by_path = results
2760 .into_iter()
2761 .map(|(buffer, ranges)| {
2762 buffer.read_with(cx_b, |buffer, cx| {
2763 let path = buffer.file().unwrap().full_path(cx);
2764 let offset_ranges = ranges
2765 .into_iter()
2766 .map(|range| range.to_offset(buffer))
2767 .collect::<Vec<_>>();
2768 (path, offset_ranges)
2769 })
2770 })
2771 .collect::<Vec<_>>();
2772 ranges_by_path.sort_by_key(|(path, _)| path.clone());
2773
2774 assert_eq!(
2775 ranges_by_path,
2776 &[
2777 (PathBuf::from("root-1/a"), vec![6..11]),
2778 (PathBuf::from("root-1/c"), vec![2..7]),
2779 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2780 (PathBuf::from("root-2/e"), vec![7..12]),
2781 ]
2782 );
2783 }
2784
2785 #[gpui::test(iterations = 10)]
2786 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2787 cx_a.foreground().forbid_parking();
2788 let lang_registry = Arc::new(LanguageRegistry::test());
2789 let fs = FakeFs::new(cx_a.background());
2790 fs.insert_tree(
2791 "/root-1",
2792 json!({
2793 ".zed.toml": r#"collaborators = ["user_b"]"#,
2794 "main.rs": "fn double(number: i32) -> i32 { number + number }",
2795 }),
2796 )
2797 .await;
2798
2799 // Set up a fake language server.
2800 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2801 lang_registry.add(Arc::new(Language::new(
2802 LanguageConfig {
2803 name: "Rust".into(),
2804 path_suffixes: vec!["rs".to_string()],
2805 language_server: Some(language_server_config),
2806 ..Default::default()
2807 },
2808 Some(tree_sitter_rust::language()),
2809 )));
2810
2811 // Connect to a server as 2 clients.
2812 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2813 let client_a = server.create_client(cx_a, "user_a").await;
2814 let client_b = server.create_client(cx_b, "user_b").await;
2815
2816 // Share a project as client A
2817 let project_a = cx_a.update(|cx| {
2818 Project::local(
2819 client_a.clone(),
2820 client_a.user_store.clone(),
2821 lang_registry.clone(),
2822 fs.clone(),
2823 cx,
2824 )
2825 });
2826 let (worktree_a, _) = project_a
2827 .update(cx_a, |p, cx| {
2828 p.find_or_create_local_worktree("/root-1", true, cx)
2829 })
2830 .await
2831 .unwrap();
2832 worktree_a
2833 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2834 .await;
2835 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2836 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2837 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2838
2839 // Join the worktree as client B.
2840 let project_b = Project::remote(
2841 project_id,
2842 client_b.clone(),
2843 client_b.user_store.clone(),
2844 lang_registry.clone(),
2845 fs.clone(),
2846 &mut cx_b.to_async(),
2847 )
2848 .await
2849 .unwrap();
2850
2851 // Open the file on client B.
2852 let buffer_b = cx_b
2853 .background()
2854 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
2855 .await
2856 .unwrap();
2857
2858 // Request document highlights as the guest.
2859 let highlights = project_b.update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx));
2860
2861 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2862 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2863 |params, _| {
2864 assert_eq!(
2865 params
2866 .text_document_position_params
2867 .text_document
2868 .uri
2869 .as_str(),
2870 "file:///root-1/main.rs"
2871 );
2872 assert_eq!(
2873 params.text_document_position_params.position,
2874 lsp::Position::new(0, 34)
2875 );
2876 Some(vec![
2877 lsp::DocumentHighlight {
2878 kind: Some(lsp::DocumentHighlightKind::WRITE),
2879 range: lsp::Range::new(
2880 lsp::Position::new(0, 10),
2881 lsp::Position::new(0, 16),
2882 ),
2883 },
2884 lsp::DocumentHighlight {
2885 kind: Some(lsp::DocumentHighlightKind::READ),
2886 range: lsp::Range::new(
2887 lsp::Position::new(0, 32),
2888 lsp::Position::new(0, 38),
2889 ),
2890 },
2891 lsp::DocumentHighlight {
2892 kind: Some(lsp::DocumentHighlightKind::READ),
2893 range: lsp::Range::new(
2894 lsp::Position::new(0, 41),
2895 lsp::Position::new(0, 47),
2896 ),
2897 },
2898 ])
2899 },
2900 );
2901
2902 let highlights = highlights.await.unwrap();
2903 buffer_b.read_with(cx_b, |buffer, _| {
2904 let snapshot = buffer.snapshot();
2905
2906 let highlights = highlights
2907 .into_iter()
2908 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2909 .collect::<Vec<_>>();
2910 assert_eq!(
2911 highlights,
2912 &[
2913 (lsp::DocumentHighlightKind::WRITE, 10..16),
2914 (lsp::DocumentHighlightKind::READ, 32..38),
2915 (lsp::DocumentHighlightKind::READ, 41..47)
2916 ]
2917 )
2918 });
2919 }
2920
2921 #[gpui::test(iterations = 10)]
2922 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2923 cx_a.foreground().forbid_parking();
2924 let mut lang_registry = Arc::new(LanguageRegistry::test());
2925 let fs = FakeFs::new(cx_a.background());
2926 fs.insert_tree(
2927 "/code",
2928 json!({
2929 "crate-1": {
2930 ".zed.toml": r#"collaborators = ["user_b"]"#,
2931 "one.rs": "const ONE: usize = 1;",
2932 },
2933 "crate-2": {
2934 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2935 },
2936 "private": {
2937 "passwords.txt": "the-password",
2938 }
2939 }),
2940 )
2941 .await;
2942
2943 // Set up a fake language server.
2944 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2945 Arc::get_mut(&mut lang_registry)
2946 .unwrap()
2947 .add(Arc::new(Language::new(
2948 LanguageConfig {
2949 name: "Rust".into(),
2950 path_suffixes: vec!["rs".to_string()],
2951 language_server: Some(language_server_config),
2952 ..Default::default()
2953 },
2954 Some(tree_sitter_rust::language()),
2955 )));
2956
2957 // Connect to a server as 2 clients.
2958 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2959 let client_a = server.create_client(cx_a, "user_a").await;
2960 let client_b = server.create_client(cx_b, "user_b").await;
2961
2962 // Share a project as client A
2963 let project_a = cx_a.update(|cx| {
2964 Project::local(
2965 client_a.clone(),
2966 client_a.user_store.clone(),
2967 lang_registry.clone(),
2968 fs.clone(),
2969 cx,
2970 )
2971 });
2972 let (worktree_a, _) = project_a
2973 .update(cx_a, |p, cx| {
2974 p.find_or_create_local_worktree("/code/crate-1", true, cx)
2975 })
2976 .await
2977 .unwrap();
2978 worktree_a
2979 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2980 .await;
2981 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2982 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2983 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2984
2985 // Join the worktree as client B.
2986 let project_b = Project::remote(
2987 project_id,
2988 client_b.clone(),
2989 client_b.user_store.clone(),
2990 lang_registry.clone(),
2991 fs.clone(),
2992 &mut cx_b.to_async(),
2993 )
2994 .await
2995 .unwrap();
2996
2997 // Cause the language server to start.
2998 let _buffer = cx_b
2999 .background()
3000 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3001 .await
3002 .unwrap();
3003
3004 // Request the definition of a symbol as the guest.
3005 let symbols = project_b.update(cx_b, |p, cx| p.symbols("two", cx));
3006 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3007 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
3008 #[allow(deprecated)]
3009 Some(vec![lsp::SymbolInformation {
3010 name: "TWO".into(),
3011 location: lsp::Location {
3012 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3013 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3014 },
3015 kind: lsp::SymbolKind::CONSTANT,
3016 tags: None,
3017 container_name: None,
3018 deprecated: None,
3019 }])
3020 });
3021
3022 let symbols = symbols.await.unwrap();
3023 assert_eq!(symbols.len(), 1);
3024 assert_eq!(symbols[0].name, "TWO");
3025
3026 // Open one of the returned symbols.
3027 let buffer_b_2 = project_b
3028 .update(cx_b, |project, cx| {
3029 project.open_buffer_for_symbol(&symbols[0], cx)
3030 })
3031 .await
3032 .unwrap();
3033 buffer_b_2.read_with(cx_b, |buffer, _| {
3034 assert_eq!(
3035 buffer.file().unwrap().path().as_ref(),
3036 Path::new("../crate-2/two.rs")
3037 );
3038 });
3039
3040 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3041 let mut fake_symbol = symbols[0].clone();
3042 fake_symbol.path = Path::new("/code/secrets").into();
3043 let error = project_b
3044 .update(cx_b, |project, cx| {
3045 project.open_buffer_for_symbol(&fake_symbol, cx)
3046 })
3047 .await
3048 .unwrap_err();
3049 assert!(error.to_string().contains("invalid symbol signature"));
3050 }
3051
3052 #[gpui::test(iterations = 10)]
3053 async fn test_open_buffer_while_getting_definition_pointing_to_it(
3054 cx_a: &mut TestAppContext,
3055 cx_b: &mut TestAppContext,
3056 mut rng: StdRng,
3057 ) {
3058 cx_a.foreground().forbid_parking();
3059 let mut lang_registry = Arc::new(LanguageRegistry::test());
3060 let fs = FakeFs::new(cx_a.background());
3061 fs.insert_tree(
3062 "/root",
3063 json!({
3064 ".zed.toml": r#"collaborators = ["user_b"]"#,
3065 "a.rs": "const ONE: usize = b::TWO;",
3066 "b.rs": "const TWO: usize = 2",
3067 }),
3068 )
3069 .await;
3070
3071 // Set up a fake language server.
3072 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3073
3074 Arc::get_mut(&mut lang_registry)
3075 .unwrap()
3076 .add(Arc::new(Language::new(
3077 LanguageConfig {
3078 name: "Rust".into(),
3079 path_suffixes: vec!["rs".to_string()],
3080 language_server: Some(language_server_config),
3081 ..Default::default()
3082 },
3083 Some(tree_sitter_rust::language()),
3084 )));
3085
3086 // Connect to a server as 2 clients.
3087 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3088 let client_a = server.create_client(cx_a, "user_a").await;
3089 let client_b = server.create_client(cx_b, "user_b").await;
3090
3091 // Share a project as client A
3092 let project_a = cx_a.update(|cx| {
3093 Project::local(
3094 client_a.clone(),
3095 client_a.user_store.clone(),
3096 lang_registry.clone(),
3097 fs.clone(),
3098 cx,
3099 )
3100 });
3101
3102 let (worktree_a, _) = project_a
3103 .update(cx_a, |p, cx| {
3104 p.find_or_create_local_worktree("/root", true, cx)
3105 })
3106 .await
3107 .unwrap();
3108 worktree_a
3109 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3110 .await;
3111 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3112 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3113 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3114
3115 // Join the worktree as client B.
3116 let project_b = Project::remote(
3117 project_id,
3118 client_b.clone(),
3119 client_b.user_store.clone(),
3120 lang_registry.clone(),
3121 fs.clone(),
3122 &mut cx_b.to_async(),
3123 )
3124 .await
3125 .unwrap();
3126
3127 let buffer_b1 = cx_b
3128 .background()
3129 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3130 .await
3131 .unwrap();
3132
3133 let definitions;
3134 let buffer_b2;
3135 if rng.gen() {
3136 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3137 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3138 } else {
3139 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3140 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3141 }
3142
3143 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3144 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3145 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3146 lsp::Url::from_file_path("/root/b.rs").unwrap(),
3147 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3148 )))
3149 });
3150
3151 let buffer_b2 = buffer_b2.await.unwrap();
3152 let definitions = definitions.await.unwrap();
3153 assert_eq!(definitions.len(), 1);
3154 assert_eq!(definitions[0].buffer, buffer_b2);
3155 }
3156
3157 #[gpui::test(iterations = 10)]
3158 async fn test_collaborating_with_code_actions(
3159 cx_a: &mut TestAppContext,
3160 cx_b: &mut TestAppContext,
3161 ) {
3162 cx_a.foreground().forbid_parking();
3163 let mut lang_registry = Arc::new(LanguageRegistry::test());
3164 let fs = FakeFs::new(cx_a.background());
3165 let mut path_openers_b = Vec::new();
3166 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3167
3168 // Set up a fake language server.
3169 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3170 Arc::get_mut(&mut lang_registry)
3171 .unwrap()
3172 .add(Arc::new(Language::new(
3173 LanguageConfig {
3174 name: "Rust".into(),
3175 path_suffixes: vec!["rs".to_string()],
3176 language_server: Some(language_server_config),
3177 ..Default::default()
3178 },
3179 Some(tree_sitter_rust::language()),
3180 )));
3181
3182 // Connect to a server as 2 clients.
3183 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3184 let client_a = server.create_client(cx_a, "user_a").await;
3185 let client_b = server.create_client(cx_b, "user_b").await;
3186
3187 // Share a project as client A
3188 fs.insert_tree(
3189 "/a",
3190 json!({
3191 ".zed.toml": r#"collaborators = ["user_b"]"#,
3192 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3193 "other.rs": "pub fn foo() -> usize { 4 }",
3194 }),
3195 )
3196 .await;
3197 let project_a = cx_a.update(|cx| {
3198 Project::local(
3199 client_a.clone(),
3200 client_a.user_store.clone(),
3201 lang_registry.clone(),
3202 fs.clone(),
3203 cx,
3204 )
3205 });
3206 let (worktree_a, _) = project_a
3207 .update(cx_a, |p, cx| {
3208 p.find_or_create_local_worktree("/a", true, cx)
3209 })
3210 .await
3211 .unwrap();
3212 worktree_a
3213 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3214 .await;
3215 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3216 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3217 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3218
3219 // Join the worktree as client B.
3220 let project_b = Project::remote(
3221 project_id,
3222 client_b.clone(),
3223 client_b.user_store.clone(),
3224 lang_registry.clone(),
3225 fs.clone(),
3226 &mut cx_b.to_async(),
3227 )
3228 .await
3229 .unwrap();
3230 let mut params = cx_b.update(WorkspaceParams::test);
3231 params.languages = lang_registry.clone();
3232 params.client = client_b.client.clone();
3233 params.user_store = client_b.user_store.clone();
3234 params.project = project_b;
3235 params.path_openers = path_openers_b.into();
3236
3237 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3238 let editor_b = workspace_b
3239 .update(cx_b, |workspace, cx| {
3240 workspace.open_path((worktree_id, "main.rs").into(), cx)
3241 })
3242 .await
3243 .unwrap()
3244 .downcast::<Editor>()
3245 .unwrap();
3246
3247 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3248 fake_language_server
3249 .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3250 assert_eq!(
3251 params.text_document.uri,
3252 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3253 );
3254 assert_eq!(params.range.start, lsp::Position::new(0, 0));
3255 assert_eq!(params.range.end, lsp::Position::new(0, 0));
3256 None
3257 })
3258 .next()
3259 .await;
3260
3261 // Move cursor to a location that contains code actions.
3262 editor_b.update(cx_b, |editor, cx| {
3263 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3264 cx.focus(&editor_b);
3265 });
3266
3267 fake_language_server
3268 .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3269 assert_eq!(
3270 params.text_document.uri,
3271 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3272 );
3273 assert_eq!(params.range.start, lsp::Position::new(1, 31));
3274 assert_eq!(params.range.end, lsp::Position::new(1, 31));
3275
3276 Some(vec![lsp::CodeActionOrCommand::CodeAction(
3277 lsp::CodeAction {
3278 title: "Inline into all callers".to_string(),
3279 edit: Some(lsp::WorkspaceEdit {
3280 changes: Some(
3281 [
3282 (
3283 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3284 vec![lsp::TextEdit::new(
3285 lsp::Range::new(
3286 lsp::Position::new(1, 22),
3287 lsp::Position::new(1, 34),
3288 ),
3289 "4".to_string(),
3290 )],
3291 ),
3292 (
3293 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3294 vec![lsp::TextEdit::new(
3295 lsp::Range::new(
3296 lsp::Position::new(0, 0),
3297 lsp::Position::new(0, 27),
3298 ),
3299 "".to_string(),
3300 )],
3301 ),
3302 ]
3303 .into_iter()
3304 .collect(),
3305 ),
3306 ..Default::default()
3307 }),
3308 data: Some(json!({
3309 "codeActionParams": {
3310 "range": {
3311 "start": {"line": 1, "column": 31},
3312 "end": {"line": 1, "column": 31},
3313 }
3314 }
3315 })),
3316 ..Default::default()
3317 },
3318 )])
3319 })
3320 .next()
3321 .await;
3322
3323 // Toggle code actions and wait for them to display.
3324 editor_b.update(cx_b, |editor, cx| {
3325 editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3326 });
3327 editor_b
3328 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3329 .await;
3330
3331 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3332
3333 // Confirming the code action will trigger a resolve request.
3334 let confirm_action = workspace_b
3335 .update(cx_b, |workspace, cx| {
3336 Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3337 })
3338 .unwrap();
3339 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3340 lsp::CodeAction {
3341 title: "Inline into all callers".to_string(),
3342 edit: Some(lsp::WorkspaceEdit {
3343 changes: Some(
3344 [
3345 (
3346 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3347 vec![lsp::TextEdit::new(
3348 lsp::Range::new(
3349 lsp::Position::new(1, 22),
3350 lsp::Position::new(1, 34),
3351 ),
3352 "4".to_string(),
3353 )],
3354 ),
3355 (
3356 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3357 vec![lsp::TextEdit::new(
3358 lsp::Range::new(
3359 lsp::Position::new(0, 0),
3360 lsp::Position::new(0, 27),
3361 ),
3362 "".to_string(),
3363 )],
3364 ),
3365 ]
3366 .into_iter()
3367 .collect(),
3368 ),
3369 ..Default::default()
3370 }),
3371 ..Default::default()
3372 }
3373 });
3374
3375 // After the action is confirmed, an editor containing both modified files is opened.
3376 confirm_action.await.unwrap();
3377 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3378 workspace
3379 .active_item(cx)
3380 .unwrap()
3381 .downcast::<Editor>()
3382 .unwrap()
3383 });
3384 code_action_editor.update(cx_b, |editor, cx| {
3385 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3386 editor.undo(&Undo, cx);
3387 assert_eq!(
3388 editor.text(cx),
3389 "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3390 );
3391 editor.redo(&Redo, cx);
3392 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3393 });
3394 }
3395
3396 #[gpui::test(iterations = 10)]
3397 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3398 cx_a.foreground().forbid_parking();
3399 let mut lang_registry = Arc::new(LanguageRegistry::test());
3400 let fs = FakeFs::new(cx_a.background());
3401 let mut path_openers_b = Vec::new();
3402 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3403
3404 // Set up a fake language server.
3405 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3406 Arc::get_mut(&mut lang_registry)
3407 .unwrap()
3408 .add(Arc::new(Language::new(
3409 LanguageConfig {
3410 name: "Rust".into(),
3411 path_suffixes: vec!["rs".to_string()],
3412 language_server: Some(language_server_config),
3413 ..Default::default()
3414 },
3415 Some(tree_sitter_rust::language()),
3416 )));
3417
3418 // Connect to a server as 2 clients.
3419 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3420 let client_a = server.create_client(cx_a, "user_a").await;
3421 let client_b = server.create_client(cx_b, "user_b").await;
3422
3423 // Share a project as client A
3424 fs.insert_tree(
3425 "/dir",
3426 json!({
3427 ".zed.toml": r#"collaborators = ["user_b"]"#,
3428 "one.rs": "const ONE: usize = 1;",
3429 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3430 }),
3431 )
3432 .await;
3433 let project_a = cx_a.update(|cx| {
3434 Project::local(
3435 client_a.clone(),
3436 client_a.user_store.clone(),
3437 lang_registry.clone(),
3438 fs.clone(),
3439 cx,
3440 )
3441 });
3442 let (worktree_a, _) = project_a
3443 .update(cx_a, |p, cx| {
3444 p.find_or_create_local_worktree("/dir", true, cx)
3445 })
3446 .await
3447 .unwrap();
3448 worktree_a
3449 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3450 .await;
3451 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3452 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3453 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3454
3455 // Join the worktree as client B.
3456 let project_b = Project::remote(
3457 project_id,
3458 client_b.clone(),
3459 client_b.user_store.clone(),
3460 lang_registry.clone(),
3461 fs.clone(),
3462 &mut cx_b.to_async(),
3463 )
3464 .await
3465 .unwrap();
3466 let mut params = cx_b.update(WorkspaceParams::test);
3467 params.languages = lang_registry.clone();
3468 params.client = client_b.client.clone();
3469 params.user_store = client_b.user_store.clone();
3470 params.project = project_b;
3471 params.path_openers = path_openers_b.into();
3472
3473 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3474 let editor_b = workspace_b
3475 .update(cx_b, |workspace, cx| {
3476 workspace.open_path((worktree_id, "one.rs").into(), cx)
3477 })
3478 .await
3479 .unwrap()
3480 .downcast::<Editor>()
3481 .unwrap();
3482 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3483
3484 // Move cursor to a location that can be renamed.
3485 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3486 editor.select_ranges([7..7], None, cx);
3487 editor.rename(&Rename, cx).unwrap()
3488 });
3489
3490 fake_language_server
3491 .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3492 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3493 assert_eq!(params.position, lsp::Position::new(0, 7));
3494 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3495 lsp::Position::new(0, 6),
3496 lsp::Position::new(0, 9),
3497 )))
3498 })
3499 .next()
3500 .await
3501 .unwrap();
3502 prepare_rename.await.unwrap();
3503 editor_b.update(cx_b, |editor, cx| {
3504 let rename = editor.pending_rename().unwrap();
3505 let buffer = editor.buffer().read(cx).snapshot(cx);
3506 assert_eq!(
3507 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3508 6..9
3509 );
3510 rename.editor.update(cx, |rename_editor, cx| {
3511 rename_editor.buffer().update(cx, |rename_buffer, cx| {
3512 rename_buffer.edit([0..3], "THREE", cx);
3513 });
3514 });
3515 });
3516
3517 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3518 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3519 });
3520 fake_language_server
3521 .handle_request::<lsp::request::Rename, _>(|params, _| {
3522 assert_eq!(
3523 params.text_document_position.text_document.uri.as_str(),
3524 "file:///dir/one.rs"
3525 );
3526 assert_eq!(
3527 params.text_document_position.position,
3528 lsp::Position::new(0, 6)
3529 );
3530 assert_eq!(params.new_name, "THREE");
3531 Some(lsp::WorkspaceEdit {
3532 changes: Some(
3533 [
3534 (
3535 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3536 vec![lsp::TextEdit::new(
3537 lsp::Range::new(
3538 lsp::Position::new(0, 6),
3539 lsp::Position::new(0, 9),
3540 ),
3541 "THREE".to_string(),
3542 )],
3543 ),
3544 (
3545 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3546 vec![
3547 lsp::TextEdit::new(
3548 lsp::Range::new(
3549 lsp::Position::new(0, 24),
3550 lsp::Position::new(0, 27),
3551 ),
3552 "THREE".to_string(),
3553 ),
3554 lsp::TextEdit::new(
3555 lsp::Range::new(
3556 lsp::Position::new(0, 35),
3557 lsp::Position::new(0, 38),
3558 ),
3559 "THREE".to_string(),
3560 ),
3561 ],
3562 ),
3563 ]
3564 .into_iter()
3565 .collect(),
3566 ),
3567 ..Default::default()
3568 })
3569 })
3570 .next()
3571 .await
3572 .unwrap();
3573 confirm_rename.await.unwrap();
3574
3575 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3576 workspace
3577 .active_item(cx)
3578 .unwrap()
3579 .downcast::<Editor>()
3580 .unwrap()
3581 });
3582 rename_editor.update(cx_b, |editor, cx| {
3583 assert_eq!(
3584 editor.text(cx),
3585 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3586 );
3587 editor.undo(&Undo, cx);
3588 assert_eq!(
3589 editor.text(cx),
3590 "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3591 );
3592 editor.redo(&Redo, cx);
3593 assert_eq!(
3594 editor.text(cx),
3595 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3596 );
3597 });
3598
3599 // Ensure temporary rename edits cannot be undone/redone.
3600 editor_b.update(cx_b, |editor, cx| {
3601 editor.undo(&Undo, cx);
3602 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3603 editor.undo(&Undo, cx);
3604 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3605 editor.redo(&Redo, cx);
3606 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3607 })
3608 }
3609
3610 #[gpui::test(iterations = 10)]
3611 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3612 cx_a.foreground().forbid_parking();
3613
3614 // Connect to a server as 2 clients.
3615 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3616 let client_a = server.create_client(cx_a, "user_a").await;
3617 let client_b = server.create_client(cx_b, "user_b").await;
3618
3619 // Create an org that includes these 2 users.
3620 let db = &server.app_state.db;
3621 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3622 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3623 .await
3624 .unwrap();
3625 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3626 .await
3627 .unwrap();
3628
3629 // Create a channel that includes all the users.
3630 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3631 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3632 .await
3633 .unwrap();
3634 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3635 .await
3636 .unwrap();
3637 db.create_channel_message(
3638 channel_id,
3639 client_b.current_user_id(&cx_b),
3640 "hello A, it's B.",
3641 OffsetDateTime::now_utc(),
3642 1,
3643 )
3644 .await
3645 .unwrap();
3646
3647 let channels_a = cx_a
3648 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3649 channels_a
3650 .condition(cx_a, |list, _| list.available_channels().is_some())
3651 .await;
3652 channels_a.read_with(cx_a, |list, _| {
3653 assert_eq!(
3654 list.available_channels().unwrap(),
3655 &[ChannelDetails {
3656 id: channel_id.to_proto(),
3657 name: "test-channel".to_string()
3658 }]
3659 )
3660 });
3661 let channel_a = channels_a.update(cx_a, |this, cx| {
3662 this.get_channel(channel_id.to_proto(), cx).unwrap()
3663 });
3664 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3665 channel_a
3666 .condition(&cx_a, |channel, _| {
3667 channel_messages(channel)
3668 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3669 })
3670 .await;
3671
3672 let channels_b = cx_b
3673 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3674 channels_b
3675 .condition(cx_b, |list, _| list.available_channels().is_some())
3676 .await;
3677 channels_b.read_with(cx_b, |list, _| {
3678 assert_eq!(
3679 list.available_channels().unwrap(),
3680 &[ChannelDetails {
3681 id: channel_id.to_proto(),
3682 name: "test-channel".to_string()
3683 }]
3684 )
3685 });
3686
3687 let channel_b = channels_b.update(cx_b, |this, cx| {
3688 this.get_channel(channel_id.to_proto(), cx).unwrap()
3689 });
3690 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3691 channel_b
3692 .condition(&cx_b, |channel, _| {
3693 channel_messages(channel)
3694 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3695 })
3696 .await;
3697
3698 channel_a
3699 .update(cx_a, |channel, cx| {
3700 channel
3701 .send_message("oh, hi B.".to_string(), cx)
3702 .unwrap()
3703 .detach();
3704 let task = channel.send_message("sup".to_string(), cx).unwrap();
3705 assert_eq!(
3706 channel_messages(channel),
3707 &[
3708 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3709 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3710 ("user_a".to_string(), "sup".to_string(), true)
3711 ]
3712 );
3713 task
3714 })
3715 .await
3716 .unwrap();
3717
3718 channel_b
3719 .condition(&cx_b, |channel, _| {
3720 channel_messages(channel)
3721 == [
3722 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3723 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3724 ("user_a".to_string(), "sup".to_string(), false),
3725 ]
3726 })
3727 .await;
3728
3729 assert_eq!(
3730 server
3731 .state()
3732 .await
3733 .channel(channel_id)
3734 .unwrap()
3735 .connection_ids
3736 .len(),
3737 2
3738 );
3739 cx_b.update(|_| drop(channel_b));
3740 server
3741 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3742 .await;
3743
3744 cx_a.update(|_| drop(channel_a));
3745 server
3746 .condition(|state| state.channel(channel_id).is_none())
3747 .await;
3748 }
3749
3750 #[gpui::test(iterations = 10)]
3751 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
3752 cx_a.foreground().forbid_parking();
3753
3754 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3755 let client_a = server.create_client(cx_a, "user_a").await;
3756
3757 let db = &server.app_state.db;
3758 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3759 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3760 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3761 .await
3762 .unwrap();
3763 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3764 .await
3765 .unwrap();
3766
3767 let channels_a = cx_a
3768 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3769 channels_a
3770 .condition(cx_a, |list, _| list.available_channels().is_some())
3771 .await;
3772 let channel_a = channels_a.update(cx_a, |this, cx| {
3773 this.get_channel(channel_id.to_proto(), cx).unwrap()
3774 });
3775
3776 // Messages aren't allowed to be too long.
3777 channel_a
3778 .update(cx_a, |channel, cx| {
3779 let long_body = "this is long.\n".repeat(1024);
3780 channel.send_message(long_body, cx).unwrap()
3781 })
3782 .await
3783 .unwrap_err();
3784
3785 // Messages aren't allowed to be blank.
3786 channel_a.update(cx_a, |channel, cx| {
3787 channel.send_message(String::new(), cx).unwrap_err()
3788 });
3789
3790 // Leading and trailing whitespace are trimmed.
3791 channel_a
3792 .update(cx_a, |channel, cx| {
3793 channel
3794 .send_message("\n surrounded by whitespace \n".to_string(), cx)
3795 .unwrap()
3796 })
3797 .await
3798 .unwrap();
3799 assert_eq!(
3800 db.get_channel_messages(channel_id, 10, None)
3801 .await
3802 .unwrap()
3803 .iter()
3804 .map(|m| &m.body)
3805 .collect::<Vec<_>>(),
3806 &["surrounded by whitespace"]
3807 );
3808 }
3809
3810 #[gpui::test(iterations = 10)]
3811 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3812 cx_a.foreground().forbid_parking();
3813
3814 // Connect to a server as 2 clients.
3815 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3816 let client_a = server.create_client(cx_a, "user_a").await;
3817 let client_b = server.create_client(cx_b, "user_b").await;
3818 let mut status_b = client_b.status();
3819
3820 // Create an org that includes these 2 users.
3821 let db = &server.app_state.db;
3822 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3823 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3824 .await
3825 .unwrap();
3826 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3827 .await
3828 .unwrap();
3829
3830 // Create a channel that includes all the users.
3831 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3832 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3833 .await
3834 .unwrap();
3835 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3836 .await
3837 .unwrap();
3838 db.create_channel_message(
3839 channel_id,
3840 client_b.current_user_id(&cx_b),
3841 "hello A, it's B.",
3842 OffsetDateTime::now_utc(),
3843 2,
3844 )
3845 .await
3846 .unwrap();
3847
3848 let channels_a = cx_a
3849 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3850 channels_a
3851 .condition(cx_a, |list, _| list.available_channels().is_some())
3852 .await;
3853
3854 channels_a.read_with(cx_a, |list, _| {
3855 assert_eq!(
3856 list.available_channels().unwrap(),
3857 &[ChannelDetails {
3858 id: channel_id.to_proto(),
3859 name: "test-channel".to_string()
3860 }]
3861 )
3862 });
3863 let channel_a = channels_a.update(cx_a, |this, cx| {
3864 this.get_channel(channel_id.to_proto(), cx).unwrap()
3865 });
3866 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3867 channel_a
3868 .condition(&cx_a, |channel, _| {
3869 channel_messages(channel)
3870 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3871 })
3872 .await;
3873
3874 let channels_b = cx_b
3875 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3876 channels_b
3877 .condition(cx_b, |list, _| list.available_channels().is_some())
3878 .await;
3879 channels_b.read_with(cx_b, |list, _| {
3880 assert_eq!(
3881 list.available_channels().unwrap(),
3882 &[ChannelDetails {
3883 id: channel_id.to_proto(),
3884 name: "test-channel".to_string()
3885 }]
3886 )
3887 });
3888
3889 let channel_b = channels_b.update(cx_b, |this, cx| {
3890 this.get_channel(channel_id.to_proto(), cx).unwrap()
3891 });
3892 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3893 channel_b
3894 .condition(&cx_b, |channel, _| {
3895 channel_messages(channel)
3896 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3897 })
3898 .await;
3899
3900 // Disconnect client B, ensuring we can still access its cached channel data.
3901 server.forbid_connections();
3902 server.disconnect_client(client_b.current_user_id(&cx_b));
3903 cx_b.foreground().advance_clock(Duration::from_secs(3));
3904 while !matches!(
3905 status_b.next().await,
3906 Some(client::Status::ReconnectionError { .. })
3907 ) {}
3908
3909 channels_b.read_with(cx_b, |channels, _| {
3910 assert_eq!(
3911 channels.available_channels().unwrap(),
3912 [ChannelDetails {
3913 id: channel_id.to_proto(),
3914 name: "test-channel".to_string()
3915 }]
3916 )
3917 });
3918 channel_b.read_with(cx_b, |channel, _| {
3919 assert_eq!(
3920 channel_messages(channel),
3921 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3922 )
3923 });
3924
3925 // Send a message from client B while it is disconnected.
3926 channel_b
3927 .update(cx_b, |channel, cx| {
3928 let task = channel
3929 .send_message("can you see this?".to_string(), cx)
3930 .unwrap();
3931 assert_eq!(
3932 channel_messages(channel),
3933 &[
3934 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3935 ("user_b".to_string(), "can you see this?".to_string(), true)
3936 ]
3937 );
3938 task
3939 })
3940 .await
3941 .unwrap_err();
3942
3943 // Send a message from client A while B is disconnected.
3944 channel_a
3945 .update(cx_a, |channel, cx| {
3946 channel
3947 .send_message("oh, hi B.".to_string(), cx)
3948 .unwrap()
3949 .detach();
3950 let task = channel.send_message("sup".to_string(), cx).unwrap();
3951 assert_eq!(
3952 channel_messages(channel),
3953 &[
3954 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3955 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3956 ("user_a".to_string(), "sup".to_string(), true)
3957 ]
3958 );
3959 task
3960 })
3961 .await
3962 .unwrap();
3963
3964 // Give client B a chance to reconnect.
3965 server.allow_connections();
3966 cx_b.foreground().advance_clock(Duration::from_secs(10));
3967
3968 // Verify that B sees the new messages upon reconnection, as well as the message client B
3969 // sent while offline.
3970 channel_b
3971 .condition(&cx_b, |channel, _| {
3972 channel_messages(channel)
3973 == [
3974 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3975 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3976 ("user_a".to_string(), "sup".to_string(), false),
3977 ("user_b".to_string(), "can you see this?".to_string(), false),
3978 ]
3979 })
3980 .await;
3981
3982 // Ensure client A and B can communicate normally after reconnection.
3983 channel_a
3984 .update(cx_a, |channel, cx| {
3985 channel.send_message("you online?".to_string(), cx).unwrap()
3986 })
3987 .await
3988 .unwrap();
3989 channel_b
3990 .condition(&cx_b, |channel, _| {
3991 channel_messages(channel)
3992 == [
3993 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3994 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3995 ("user_a".to_string(), "sup".to_string(), false),
3996 ("user_b".to_string(), "can you see this?".to_string(), false),
3997 ("user_a".to_string(), "you online?".to_string(), false),
3998 ]
3999 })
4000 .await;
4001
4002 channel_b
4003 .update(cx_b, |channel, cx| {
4004 channel.send_message("yep".to_string(), cx).unwrap()
4005 })
4006 .await
4007 .unwrap();
4008 channel_a
4009 .condition(&cx_a, |channel, _| {
4010 channel_messages(channel)
4011 == [
4012 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4013 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4014 ("user_a".to_string(), "sup".to_string(), false),
4015 ("user_b".to_string(), "can you see this?".to_string(), false),
4016 ("user_a".to_string(), "you online?".to_string(), false),
4017 ("user_b".to_string(), "yep".to_string(), false),
4018 ]
4019 })
4020 .await;
4021 }
4022
4023 #[gpui::test(iterations = 10)]
4024 async fn test_contacts(
4025 cx_a: &mut TestAppContext,
4026 cx_b: &mut TestAppContext,
4027 cx_c: &mut TestAppContext,
4028 ) {
4029 cx_a.foreground().forbid_parking();
4030 let lang_registry = Arc::new(LanguageRegistry::test());
4031 let fs = FakeFs::new(cx_a.background());
4032
4033 // Connect to a server as 3 clients.
4034 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4035 let client_a = server.create_client(cx_a, "user_a").await;
4036 let client_b = server.create_client(cx_b, "user_b").await;
4037 let client_c = server.create_client(cx_c, "user_c").await;
4038
4039 // Share a worktree as client A.
4040 fs.insert_tree(
4041 "/a",
4042 json!({
4043 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4044 }),
4045 )
4046 .await;
4047
4048 let project_a = cx_a.update(|cx| {
4049 Project::local(
4050 client_a.clone(),
4051 client_a.user_store.clone(),
4052 lang_registry.clone(),
4053 fs.clone(),
4054 cx,
4055 )
4056 });
4057 let (worktree_a, _) = project_a
4058 .update(cx_a, |p, cx| {
4059 p.find_or_create_local_worktree("/a", true, cx)
4060 })
4061 .await
4062 .unwrap();
4063 worktree_a
4064 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4065 .await;
4066
4067 client_a
4068 .user_store
4069 .condition(&cx_a, |user_store, _| {
4070 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4071 })
4072 .await;
4073 client_b
4074 .user_store
4075 .condition(&cx_b, |user_store, _| {
4076 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4077 })
4078 .await;
4079 client_c
4080 .user_store
4081 .condition(&cx_c, |user_store, _| {
4082 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4083 })
4084 .await;
4085
4086 let project_id = project_a
4087 .update(cx_a, |project, _| project.next_remote_id())
4088 .await;
4089 project_a
4090 .update(cx_a, |project, cx| project.share(cx))
4091 .await
4092 .unwrap();
4093
4094 let _project_b = Project::remote(
4095 project_id,
4096 client_b.clone(),
4097 client_b.user_store.clone(),
4098 lang_registry.clone(),
4099 fs.clone(),
4100 &mut cx_b.to_async(),
4101 )
4102 .await
4103 .unwrap();
4104
4105 client_a
4106 .user_store
4107 .condition(&cx_a, |user_store, _| {
4108 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4109 })
4110 .await;
4111 client_b
4112 .user_store
4113 .condition(&cx_b, |user_store, _| {
4114 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4115 })
4116 .await;
4117 client_c
4118 .user_store
4119 .condition(&cx_c, |user_store, _| {
4120 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4121 })
4122 .await;
4123
4124 project_a
4125 .condition(&cx_a, |project, _| {
4126 project.collaborators().contains_key(&client_b.peer_id)
4127 })
4128 .await;
4129
4130 cx_a.update(move |_| drop(project_a));
4131 client_a
4132 .user_store
4133 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4134 .await;
4135 client_b
4136 .user_store
4137 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4138 .await;
4139 client_c
4140 .user_store
4141 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4142 .await;
4143
4144 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4145 user_store
4146 .contacts()
4147 .iter()
4148 .map(|contact| {
4149 let worktrees = contact
4150 .projects
4151 .iter()
4152 .map(|p| {
4153 (
4154 p.worktree_root_names[0].as_str(),
4155 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4156 )
4157 })
4158 .collect();
4159 (contact.user.github_login.as_str(), worktrees)
4160 })
4161 .collect()
4162 }
4163 }
4164
// Randomized collaboration test.
//
// A host shares a project containing the "/_collab" worktree, guests join at random
// moments, and every participant performs random operations until the shared
// `operations` counter reaches `max_operations`. Afterwards each guest's worktrees and
// open buffers are compared against the host's.
//
// Environment variables:
// - `MAX_PEERS`: upper bound on the number of simulated clients, host included (default 5).
// - `OPERATIONS`: number of operations to perform across all clients (default 10).
#[gpui::test(iterations = 100)]
async fn test_random_collaboration(cx: &mut TestAppContext, rng: StdRng) {
    cx.foreground().forbid_parking();
    let max_peers = env::var("MAX_PEERS")
        .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
        .unwrap_or(5);
    let max_operations = env::var("OPERATIONS")
        .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
        .unwrap_or(10);

    // The generator is shared by the host, every guest and the fake language server.
    let rng = Arc::new(Mutex::new(rng));

    let guest_lang_registry = Arc::new(LanguageRegistry::test());
    let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();

    // The collaborator list names every guest login that may be created below
    // ("guest-1" through "guest-5").
    let fs = FakeFs::new(cx.background());
    fs.insert_tree(
        "/_collab",
        json!({
            ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
        }),
    )
    .await;

    // Operation counter shared by all simulated clients.
    let operations = Rc::new(Cell::new(0));
    let mut server = TestServer::start(cx.foreground(), cx.background()).await;
    let mut clients = Vec::new();

    // Every client gets its own app context with a distinct starting entity id.
    // NOTE(review): presumably this keeps entity ids from colliding across contexts — confirm.
    let mut next_entity_id = 100000;
    let mut host_cx = TestAppContext::new(
        cx.foreground_platform(),
        cx.platform(),
        cx.foreground(),
        cx.background(),
        cx.font_cache(),
        cx.leak_detector(),
        next_entity_id,
    );
    let host = server.create_client(&mut host_cx, "host").await;
    let host_project = host_cx.update(|cx| {
        Project::local(
            host.client.clone(),
            host.user_store.clone(),
            Arc::new(LanguageRegistry::test()),
            fs.clone(),
            cx,
        )
    });
    let host_project_id = host_project
        .update(&mut host_cx, |p, _| p.next_remote_id())
        .await;

    // Add the "/_collab" worktree, wait for its scan to complete, then share the project.
    let (collab_worktree, _) = host_project
        .update(&mut host_cx, |project, cx| {
            project.find_or_create_local_worktree("/_collab", true, cx)
        })
        .await
        .unwrap();
    collab_worktree
        .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
        .await;
    host_project
        .update(&mut host_cx, |project, cx| project.share(cx))
        .await
        .unwrap();

    // The host task is pushed first, so it is always at index 0 of `clients`.
    clients.push(cx.foreground().spawn(host.simulate_host(
        host_project,
        language_server_config,
        operations.clone(),
        max_operations,
        rng.clone(),
        host_cx,
    )));

    // Keep adding guests at random moments until either the operation budget is used up
    // or the peer limit is reached (`clients` includes the host).
    while operations.get() < max_operations {
        cx.background().simulate_random_delay().await;
        if clients.len() >= max_peers {
            break;
        } else if rng.lock().gen_bool(0.05) {
            // Adding a guest counts as one operation.
            operations.set(operations.get() + 1);

            // The host occupies index 0, so guest ids start at 1.
            let guest_id = clients.len();
            log::info!("Adding guest {}", guest_id);
            next_entity_id += 100000;
            let mut guest_cx = TestAppContext::new(
                cx.foreground_platform(),
                cx.platform(),
                cx.foreground(),
                cx.background(),
                cx.font_cache(),
                cx.leak_detector(),
                next_entity_id,
            );
            let guest = server
                .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
                .await;
            let guest_project = Project::remote(
                host_project_id,
                guest.client.clone(),
                guest.user_store.clone(),
                guest_lang_registry.clone(),
                FakeFs::new(cx.background()),
                &mut guest_cx.to_async(),
            )
            .await
            .unwrap();
            clients.push(cx.foreground().spawn(guest.simulate_guest(
                guest_id,
                guest_project,
                operations.clone(),
                max_operations,
                rng.clone(),
                guest_cx,
            )));

            log::info!("Guest {} added", guest_id);
        }
    }

    // Wait for every simulation to finish, then run the foreground until it parks.
    let mut clients = futures::future::join_all(clients).await;
    cx.foreground().run_until_parked();

    // Index 0 is the host (see above); the remaining entries are guests.
    let (host_client, mut host_cx) = clients.remove(0);
    let host_project = host_client.project.as_ref().unwrap();
    let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
        project
            .worktrees(cx)
            .map(|worktree| {
                let snapshot = worktree.read(cx).snapshot();
                (snapshot.id(), snapshot)
            })
            .collect::<BTreeMap<_, _>>()
    });

    host_client
        .project
        .as_ref()
        .unwrap()
        .read_with(&host_cx, |project, cx| project.check_invariants(cx));

    for (guest_client, mut guest_cx) in clients.into_iter() {
        let guest_id = guest_client.client.id();
        let worktree_snapshots =
            guest_client
                .project
                .as_ref()
                .unwrap()
                .read_with(&guest_cx, |project, cx| {
                    project
                        .worktrees(cx)
                        .map(|worktree| {
                            let worktree = worktree.read(cx);
                            (worktree.id(), worktree.snapshot())
                        })
                        .collect::<BTreeMap<_, _>>()
                });

        // The guest must see exactly the host's set of worktrees...
        assert_eq!(
            worktree_snapshots.keys().collect::<Vec<_>>(),
            host_worktree_snapshots.keys().collect::<Vec<_>>(),
            "guest {} has different worktrees than the host",
            guest_id
        );
        // ...and each worktree must have the same root name and entries.
        for (id, host_snapshot) in &host_worktree_snapshots {
            let guest_snapshot = &worktree_snapshots[id];
            assert_eq!(
                guest_snapshot.root_name(),
                host_snapshot.root_name(),
                "guest {} has different root name than the host for worktree {}",
                guest_id,
                id
            );
            assert_eq!(
                guest_snapshot.entries(false).collect::<Vec<_>>(),
                host_snapshot.entries(false).collect::<Vec<_>>(),
                "guest {} has different snapshot than the host for worktree {}",
                guest_id,
                id
            );
        }

        guest_client
            .project
            .as_ref()
            .unwrap()
            .read_with(&guest_cx, |project, cx| project.check_invariants(cx));

        // Every buffer the guest still holds must exist on the host, have no deferred
        // operations left, and contain the same text as the host's copy.
        for guest_buffer in &guest_client.buffers {
            let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
            let host_buffer = host_project.read_with(&host_cx, |project, cx| {
                project.buffer_for_id(buffer_id, cx).expect(&format!(
                    "host does not have buffer for guest:{}, peer:{}, id:{}",
                    guest_id, guest_client.peer_id, buffer_id
                ))
            });
            let path = host_buffer
                .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));

            assert_eq!(
                guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
                0,
                "guest {}, buffer {}, path {:?} has deferred operations",
                guest_id,
                buffer_id,
                path,
            );
            assert_eq!(
                guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
                host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
                "guest {}, buffer {}, path {:?}, differs from the host's buffer",
                guest_id,
                buffer_id,
                path
            );
        }

        // Drop the client from inside an update of its own context.
        guest_cx.update(|_| drop(guest_client));
    }

    host_cx.update(|_| drop(host_client));
}
4387
/// An in-process RPC server together with the state tests need to drive it.
struct TestServer {
    /// Peer shared with `server`; it is reset when the `TestServer` is dropped.
    peer: Arc<Peer>,
    /// Application state handed to the server; also used to create users in the database.
    app_state: Arc<AppState>,
    /// The server under test.
    server: Arc<Server>,
    /// Foreground executor, used by `condition` to mark periods of waiting.
    foreground: Rc<executor::Foreground>,
    /// Receiving end of the notification channel whose sender is given to the server.
    notifications: mpsc::UnboundedReceiver<()>,
    /// Per-user handles for in-memory connections; removing an entry drops the handle
    /// (see `disconnect_client`).
    connection_killers: Arc<Mutex<HashMap<UserId, barrier::Sender>>>,
    /// When set, attempts to establish a new connection fail.
    forbid_connections: Arc<AtomicBool>,
    /// Held only to keep the test database alive; never read.
    _test_db: TestDb,
}
4398
4399 impl TestServer {
4400 async fn start(
4401 foreground: Rc<executor::Foreground>,
4402 background: Arc<executor::Background>,
4403 ) -> Self {
4404 let test_db = TestDb::fake(background);
4405 let app_state = Self::build_app_state(&test_db).await;
4406 let peer = Peer::new();
4407 let notifications = mpsc::unbounded();
4408 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4409 Self {
4410 peer,
4411 app_state,
4412 server,
4413 foreground,
4414 notifications: notifications.1,
4415 connection_killers: Default::default(),
4416 forbid_connections: Default::default(),
4417 _test_db: test_db,
4418 }
4419 }
4420
4421 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4422 let http = FakeHttpClient::with_404_response();
4423 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4424 let client_name = name.to_string();
4425 let mut client = Client::new(http.clone());
4426 let server = self.server.clone();
4427 let connection_killers = self.connection_killers.clone();
4428 let forbid_connections = self.forbid_connections.clone();
4429 let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4430
4431 Arc::get_mut(&mut client)
4432 .unwrap()
4433 .override_authenticate(move |cx| {
4434 cx.spawn(|_| async move {
4435 let access_token = "the-token".to_string();
4436 Ok(Credentials {
4437 user_id: user_id.0 as u64,
4438 access_token,
4439 })
4440 })
4441 })
4442 .override_establish_connection(move |credentials, cx| {
4443 assert_eq!(credentials.user_id, user_id.0 as u64);
4444 assert_eq!(credentials.access_token, "the-token");
4445
4446 let server = server.clone();
4447 let connection_killers = connection_killers.clone();
4448 let forbid_connections = forbid_connections.clone();
4449 let client_name = client_name.clone();
4450 let connection_id_tx = connection_id_tx.clone();
4451 cx.spawn(move |cx| async move {
4452 if forbid_connections.load(SeqCst) {
4453 Err(EstablishConnectionError::other(anyhow!(
4454 "server is forbidding connections"
4455 )))
4456 } else {
4457 let (client_conn, server_conn, kill_conn) =
4458 Connection::in_memory(cx.background());
4459 connection_killers.lock().insert(user_id, kill_conn);
4460 cx.background()
4461 .spawn(server.handle_connection(
4462 server_conn,
4463 client_name,
4464 user_id,
4465 Some(connection_id_tx),
4466 cx.background(),
4467 ))
4468 .detach();
4469 Ok(client_conn)
4470 }
4471 })
4472 });
4473
4474 client
4475 .authenticate_and_connect(&cx.to_async())
4476 .await
4477 .unwrap();
4478
4479 Channel::init(&client);
4480 Project::init(&client);
4481
4482 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4483 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4484 let mut authed_user =
4485 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
4486 while authed_user.next().await.unwrap().is_none() {}
4487
4488 TestClient {
4489 client,
4490 peer_id,
4491 user_store,
4492 project: Default::default(),
4493 buffers: Default::default(),
4494 }
4495 }
4496
4497 fn disconnect_client(&self, user_id: UserId) {
4498 self.connection_killers.lock().remove(&user_id);
4499 }
4500
4501 fn forbid_connections(&self) {
4502 self.forbid_connections.store(true, SeqCst);
4503 }
4504
4505 fn allow_connections(&self) {
4506 self.forbid_connections.store(false, SeqCst);
4507 }
4508
4509 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4510 let mut config = Config::default();
4511 config.session_secret = "a".repeat(32);
4512 config.database_url = test_db.url.clone();
4513 let github_client = github::AppClient::test();
4514 Arc::new(AppState {
4515 db: test_db.db().clone(),
4516 handlebars: Default::default(),
4517 auth_client: auth::build_client("", ""),
4518 repo_client: github::RepoClient::test(&github_client),
4519 github_client,
4520 config,
4521 })
4522 }
4523
4524 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4525 self.server.store.read()
4526 }
4527
4528 async fn condition<F>(&mut self, mut predicate: F)
4529 where
4530 F: FnMut(&Store) -> bool,
4531 {
4532 async_std::future::timeout(Duration::from_millis(500), async {
4533 while !(predicate)(&*self.server.store.read()) {
4534 self.foreground.start_waiting();
4535 self.notifications.next().await;
4536 self.foreground.finish_waiting();
4537 }
4538 })
4539 .await
4540 .expect("condition timed out");
4541 }
4542 }
4543
4544 impl Drop for TestServer {
4545 fn drop(&mut self) {
4546 self.peer.reset();
4547 }
4548 }
4549
/// A simulated client connected to a `TestServer`.
struct TestClient {
    /// The underlying RPC client; also reachable through `Deref`.
    client: Arc<Client>,
    /// Peer id built from the connection id the server reported for this client.
    pub peer_id: PeerId,
    /// User store created for this client.
    pub user_store: ModelHandle<UserStore>,
    /// Project the client worked on; filled in when a simulation finishes.
    project: Option<ModelHandle<Project>>,
    /// Buffers the client currently keeps open.
    buffers: HashSet<ModelHandle<language::Buffer>>,
}
4557
4558 impl Deref for TestClient {
4559 type Target = Arc<Client>;
4560
4561 fn deref(&self) -> &Self::Target {
4562 &self.client
4563 }
4564 }
4565
4566 impl TestClient {
4567 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4568 UserId::from_proto(
4569 self.user_store
4570 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4571 )
4572 }
4573
/// Plays the role of the project host in the randomized collaboration test.
///
/// Installs a fake language server for ".rs" files on `project`, then returns a future
/// that keeps performing random host operations (creating files, adding worktrees,
/// opening/editing/dropping buffers) until the shared `operations` counter reaches
/// `max_operations`. The future resolves to this client (with `project` stored in
/// `self.project`) and its app context.
fn simulate_host(
    mut self,
    project: ModelHandle<Project>,
    mut language_server_config: LanguageServerConfig,
    operations: Rc<Cell<usize>>,
    max_operations: usize,
    rng: Arc<Mutex<StdRng>>,
    mut cx: TestAppContext,
) -> impl Future<Output = (Self, TestAppContext)> {
    // Paths of the files the host has created so far; shared with the fake language server.
    let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();

    // Set up a fake language server.
    language_server_config.set_fake_initializer({
        let rng = rng.clone();
        let files = files.clone();
        // Hold the project weakly inside the language server handlers.
        let project = project.downgrade();
        move |fake_server| {
            // Completions: always a single item inserting "the-new-text" at the start
            // of the document.
            fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
                Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
                        range: lsp::Range::new(
                            lsp::Position::new(0, 0),
                            lsp::Position::new(0, 0),
                        ),
                        new_text: "the-new-text".to_string(),
                    })),
                    ..Default::default()
                }]))
            });

            // Code actions: always a single action with a fixed title.
            fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
                Some(vec![lsp::CodeActionOrCommand::CodeAction(
                    lsp::CodeAction {
                        title: "the-code-action".to_string(),
                        ..Default::default()
                    },
                )])
            });

            // Prepare rename: an empty range at the requested position.
            fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
                |params, _| {
                    Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
                        params.position,
                        params.position,
                    )))
                },
            );

            // Go to definition: one or two locations in randomly chosen host files.
            // NOTE(review): `choose` on an empty `files` list yields `None`, so the
            // `unwrap` below would panic if this handler ran before the host created
            // any file — confirm that cannot happen.
            fake_server.handle_request::<lsp::request::GotoDefinition, _>({
                let files = files.clone();
                let rng = rng.clone();
                move |_, _| {
                    let files = files.lock();
                    let mut rng = rng.lock();
                    let count = rng.gen_range::<usize, _>(1..3);
                    let files = (0..count)
                        .map(|_| files.choose(&mut *rng).unwrap())
                        .collect::<Vec<_>>();
                    log::info!("LSP: Returning definitions in files {:?}", &files);
                    Some(lsp::GotoDefinitionResponse::Array(
                        files
                            .into_iter()
                            .map(|file| lsp::Location {
                                uri: lsp::Url::from_file_path(file).unwrap(),
                                range: Default::default(),
                            })
                            .collect(),
                    ))
                }
            });

            // Document highlights: between one and five random READ highlights in the
            // requested buffer, each starting at or after the end of the previous one.
            // Yields `None` when the project is gone, the path is not in a local
            // worktree, or the buffer is not open.
            fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
                let rng = rng.clone();
                let project = project.clone();
                move |params, mut cx| {
                    if let Some(project) = project.upgrade(&cx) {
                        project.update(&mut cx, |project, cx| {
                            let path = params
                                .text_document_position_params
                                .text_document
                                .uri
                                .to_file_path()
                                .unwrap();
                            let (worktree, relative_path) =
                                project.find_local_worktree(&path, cx)?;
                            let project_path =
                                ProjectPath::from((worktree.read(cx).id(), relative_path));
                            let buffer =
                                project.get_open_buffer(&project_path, cx)?.read(cx);

                            let mut highlights = Vec::new();
                            let highlight_count = rng.lock().gen_range(1..=5);
                            let mut prev_end = 0;
                            for _ in 0..highlight_count {
                                let range =
                                    buffer.random_byte_range(prev_end, &mut *rng.lock());
                                let start = buffer
                                    .offset_to_point_utf16(range.start)
                                    .to_lsp_position();
                                let end = buffer
                                    .offset_to_point_utf16(range.end)
                                    .to_lsp_position();
                                highlights.push(lsp::DocumentHighlight {
                                    range: lsp::Range::new(start, end),
                                    kind: Some(lsp::DocumentHighlightKind::READ),
                                });
                                prev_end = range.end;
                            }
                            Some(highlights)
                        })
                    } else {
                        None
                    }
                }
            });
        }
    });

    // Register a "Rust" language for ".rs" files that uses the fake language server.
    project.update(&mut cx, |project, _| {
        project.languages().add(Arc::new(Language::new(
            LanguageConfig {
                name: "Rust".into(),
                path_suffixes: vec!["rs".to_string()],
                language_server: Some(language_server_config),
                ..Default::default()
            },
            None,
        )));
    });

    async move {
        let fs = project.read_with(&cx, |project, _| project.fs().clone());
        while operations.get() < max_operations {
            // Every loop iteration counts as one operation.
            operations.set(operations.get() + 1);

            let distribution = rng.lock().gen_range::<usize, _>(0..100);
            match distribution {
                // Find or create a local worktree rooted at a random ancestor of a
                // random known file.
                0..=20 if !files.lock().is_empty() => {
                    let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
                    let mut path = path.as_path();
                    // Walk up the ancestors, stopping at a random level.
                    while let Some(parent_path) = path.parent() {
                        path = parent_path;
                        if rng.lock().gen() {
                            break;
                        }
                    }

                    log::info!("Host: find/create local worktree {:?}", path);
                    let find_or_create_worktree = project.update(&mut cx, |project, cx| {
                        project.find_or_create_local_worktree(path, true, cx)
                    });
                    let find_or_create_worktree = async move {
                        find_or_create_worktree.await.unwrap();
                    };
                    // Randomly either let the request finish in the background or wait for it.
                    if rng.lock().gen() {
                        cx.background().spawn(find_or_create_worktree).detach();
                    } else {
                        find_or_create_worktree.await;
                    }
                }
                // Open (or reuse) a buffer, then either drop it or edit it randomly.
                // NOTE(review): this range overlaps the previous arm (`0..=20`); arms are
                // tried in order, so when `files` is non-empty the values 10..=20 are
                // always handled above. Confirm whether the overlap is intended.
                10..=80 if !files.lock().is_empty() => {
                    let buffer = if self.buffers.is_empty() || rng.lock().gen() {
                        let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
                        let (worktree, path) = project
                            .update(&mut cx, |project, cx| {
                                project.find_or_create_local_worktree(
                                    file.clone(),
                                    true,
                                    cx,
                                )
                            })
                            .await
                            .unwrap();
                        let project_path =
                            worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
                        log::info!(
                            "Host: opening path {:?}, worktree {}, relative_path {:?}",
                            file,
                            project_path.0,
                            project_path.1
                        );
                        let buffer = project
                            .update(&mut cx, |project, cx| {
                                project.open_buffer(project_path, cx)
                            })
                            .await
                            .unwrap();
                        self.buffers.insert(buffer.clone());
                        buffer
                    } else {
                        self.buffers
                            .iter()
                            .choose(&mut *rng.lock())
                            .unwrap()
                            .clone()
                    };

                    if rng.lock().gen_bool(0.1) {
                        // Drop the handle from inside an update of the app context.
                        cx.update(|cx| {
                            log::info!(
                                "Host: dropping buffer {:?}",
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            self.buffers.remove(&buffer);
                            drop(buffer);
                        });
                    } else {
                        buffer.update(&mut cx, |buffer, cx| {
                            log::info!(
                                "Host: updating buffer {:?} ({})",
                                buffer.file().unwrap().full_path(cx),
                                buffer.remote_id()
                            );
                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
                        });
                    }
                }
                // Otherwise (including whenever no file exists yet): create a new ".rs"
                // file at a random path made of one to five single-letter components,
                // retrying with a fresh path until creation succeeds.
                _ => loop {
                    let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
                    let mut path = PathBuf::new();
                    path.push("/");
                    for _ in 0..path_component_count {
                        let letter = rng.lock().gen_range(b'a'..=b'z');
                        path.push(std::str::from_utf8(&[letter]).unwrap());
                    }
                    path.set_extension("rs");
                    let parent_path = path.parent().unwrap();

                    log::info!("Host: creating file {:?}", path,);

                    if fs.create_dir(&parent_path).await.is_ok()
                        && fs.create_file(&path, Default::default()).await.is_ok()
                    {
                        files.lock().push(path);
                        break;
                    } else {
                        log::info!("Host: cannot create file");
                    }
                },
            }

            cx.background().simulate_random_delay().await;
        }

        log::info!("Host done");

        // Hand the project back to the caller through the returned client.
        self.project = Some(project);
        (self, cx)
    }
}
4824
/// Plays the role of a guest in the randomized collaboration test.
///
/// Until the shared `operations` counter reaches `max_operations`, repeatedly picks a
/// buffer (opening a random file from a visible worktree, or reusing one that is
/// already open) and performs one random action on it: dropping it, issuing a
/// language/project request, saving it, or editing it. Each request runs as a
/// background task that is detached with probability 0.3 and awaited otherwise.
///
/// Returns this client (with `project` stored in `self.project`) and its app context.
pub async fn simulate_guest(
    mut self,
    guest_id: usize,
    project: ModelHandle<Project>,
    operations: Rc<Cell<usize>>,
    max_operations: usize,
    rng: Arc<Mutex<StdRng>>,
    mut cx: TestAppContext,
) -> (Self, TestAppContext) {
    while operations.get() < max_operations {
        let buffer = if self.buffers.is_empty() || rng.lock().gen() {
            // Pick a random visible worktree that contains at least one file.
            let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
                project
                    .worktrees(&cx)
                    .filter(|worktree| {
                        let worktree = worktree.read(cx);
                        worktree.is_visible()
                            && worktree.entries(false).any(|e| e.is_file())
                    })
                    .choose(&mut *rng.lock())
            }) {
                worktree
            } else {
                // Nothing to open yet: wait and retry without counting an operation.
                cx.background().simulate_random_delay().await;
                continue;
            };

            operations.set(operations.get() + 1);
            // Pick a random file entry of that worktree.
            let (worktree_root_name, project_path) =
                worktree.read_with(&cx, |worktree, _| {
                    let entry = worktree
                        .entries(false)
                        .filter(|e| e.is_file())
                        .choose(&mut *rng.lock())
                        .unwrap();
                    (
                        worktree.root_name().to_string(),
                        (worktree.id(), entry.path.clone()),
                    )
                });
            log::info!(
                "Guest {}: opening path {:?} in worktree {} ({})",
                guest_id,
                project_path.1,
                project_path.0,
                worktree_root_name,
            );
            let buffer = project
                .update(&mut cx, |project, cx| {
                    project.open_buffer(project_path.clone(), cx)
                })
                .await
                .unwrap();
            log::info!(
                "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}",
                guest_id,
                project_path.1,
                project_path.0,
                worktree_root_name,
                buffer.read_with(&cx, |buffer, _| buffer.remote_id())
            );
            self.buffers.insert(buffer.clone());
            buffer
        } else {
            // Reuse one of the buffers that is already open.
            operations.set(operations.get() + 1);

            self.buffers
                .iter()
                .choose(&mut *rng.lock())
                .unwrap()
                .clone()
        };

        let choice = rng.lock().gen_range(0..100);
        match choice {
            // Drop the buffer from inside an update of the app context.
            0..=9 => {
                cx.update(|cx| {
                    log::info!(
                        "Guest {}: dropping buffer {:?}",
                        guest_id,
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    self.buffers.remove(&buffer);
                    drop(buffer);
                });
            }
            // Request completions at a random offset.
            10..=19 => {
                let completions = project.update(&mut cx, |project, cx| {
                    log::info!(
                        "Guest {}: requesting completions for buffer {} ({:?})",
                        guest_id,
                        buffer.read(cx).remote_id(),
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                    project.completions(&buffer, offset, cx)
                });
                let completions = cx.background().spawn(async move {
                    completions.await.expect("completions request failed");
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching completions request", guest_id);
                    completions.detach();
                } else {
                    completions.await;
                }
            }
            // Request code actions for a random byte range.
            20..=29 => {
                let code_actions = project.update(&mut cx, |project, cx| {
                    log::info!(
                        "Guest {}: requesting code actions for buffer {} ({:?})",
                        guest_id,
                        buffer.read(cx).remote_id(),
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
                    project.code_actions(&buffer, range, cx)
                });
                let code_actions = cx.background().spawn(async move {
                    code_actions.await.expect("code actions request failed");
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching code actions request", guest_id);
                    code_actions.detach();
                } else {
                    code_actions.await;
                }
            }
            // Save the buffer, but only if it is dirty; a clean buffer falls through to
            // the catch-all arm and gets edited instead.
            30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
                let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
                    log::info!(
                        "Guest {}: saving buffer {} ({:?})",
                        guest_id,
                        buffer.remote_id(),
                        buffer.file().unwrap().full_path(cx)
                    );
                    (buffer.version(), buffer.save(cx))
                });
                let save = cx.background().spawn(async move {
                    let (saved_version, _) = save.await.expect("save request failed");
                    // The saved version must include everything that was in the buffer
                    // when the save was requested.
                    assert!(saved_version.observed_all(&requested_version));
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching save request", guest_id);
                    save.detach();
                } else {
                    save.await;
                }
            }
            // Prepare a rename at a random offset.
            40..=44 => {
                let prepare_rename = project.update(&mut cx, |project, cx| {
                    log::info!(
                        "Guest {}: preparing rename for buffer {} ({:?})",
                        guest_id,
                        buffer.read(cx).remote_id(),
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                    project.prepare_rename(buffer, offset, cx)
                });
                let prepare_rename = cx.background().spawn(async move {
                    prepare_rename.await.expect("prepare rename request failed");
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching prepare rename request", guest_id);
                    prepare_rename.detach();
                } else {
                    prepare_rename.await;
                }
            }
            // Request definitions at a random offset; when awaited, keep the buffers of
            // the returned locations open.
            45..=49 => {
                let definitions = project.update(&mut cx, |project, cx| {
                    log::info!(
                        "Guest {}: requesting definitions for buffer {} ({:?})",
                        guest_id,
                        buffer.read(cx).remote_id(),
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                    project.definition(&buffer, offset, cx)
                });
                let definitions = cx.background().spawn(async move {
                    definitions.await.expect("definitions request failed")
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching definitions request", guest_id);
                    definitions.detach();
                } else {
                    self.buffers
                        .extend(definitions.await.into_iter().map(|loc| loc.buffer));
                }
            }
            // Request document highlights at a random offset.
            50..=54 => {
                let highlights = project.update(&mut cx, |project, cx| {
                    log::info!(
                        "Guest {}: requesting highlights for buffer {} ({:?})",
                        guest_id,
                        buffer.read(cx).remote_id(),
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                    project.document_highlights(&buffer, offset, cx)
                });
                let highlights = cx.background().spawn(async move {
                    highlights.await.expect("highlights request failed");
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching highlights request", guest_id);
                    highlights.detach();
                } else {
                    highlights.await;
                }
            }
            // Search the project for a random lowercase letter; when awaited, keep the
            // buffers that matched open.
            55..=59 => {
                let search = project.update(&mut cx, |project, cx| {
                    let query = rng.lock().gen_range('a'..='z');
                    log::info!("Guest {}: project-wide search {:?}", guest_id, query);
                    project.search(SearchQuery::text(query, false, false), cx)
                });
                let search = cx
                    .background()
                    .spawn(async move { search.await.expect("search request failed") });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching search request", guest_id);
                    search.detach();
                } else {
                    self.buffers.extend(search.await.into_keys());
                }
            }
            // Everything else: apply random edits to the buffer.
            _ => {
                buffer.update(&mut cx, |buffer, cx| {
                    log::info!(
                        "Guest {}: updating buffer {} ({:?})",
                        guest_id,
                        buffer.remote_id(),
                        buffer.file().unwrap().full_path(cx)
                    );
                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
                });
            }
        }
        cx.background().simulate_random_delay().await;
    }

    log::info!("Guest {} done", guest_id);

    // Hand the project back to the caller through the returned client.
    self.project = Some(project);
    (self, cx)
}
5074 }
5075
5076 impl Drop for TestClient {
5077 fn drop(&mut self) {
5078 self.client.tear_down();
5079 }
5080 }
5081
5082 impl Executor for Arc<gpui::executor::Background> {
5083 type Timer = gpui::executor::Timer;
5084
5085 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5086 self.spawn(future).detach();
5087 }
5088
5089 fn timer(&self, duration: Duration) -> Self::Timer {
5090 self.as_ref().timer(duration)
5091 }
5092 }
5093
5094 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5095 channel
5096 .messages()
5097 .cursor::<()>()
5098 .map(|m| {
5099 (
5100 m.sender.github_login.clone(),
5101 m.body.clone(),
5102 m.is_pending(),
5103 )
5104 })
5105 .collect()
5106 }
5107
/// A view with no state that renders an empty element.
struct EmptyView;
5109
impl gpui::Entity for EmptyView {
    // The view's event type is the unit type.
    type Event = ();
}
5113
5114 impl gpui::View for EmptyView {
5115 fn ui_name() -> &'static str {
5116 "empty view"
5117 }
5118
5119 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5120 gpui::Element::boxed(gpui::elements::Empty)
5121 }
5122 }
5123}