1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use rpc::{
15 proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
16 Connection, ConnectionId, Peer, TypedEnvelope,
17};
18use sha1::{Digest as _, Sha1};
19use std::{any::TypeId, future::Future, sync::Arc, time::Instant};
20use store::{Store, Worktree};
21use surf::StatusCode;
22use tide::log;
23use tide::{
24 http::headers::{HeaderName, CONNECTION, UPGRADE},
25 Request, Response,
26};
27use time::OffsetDateTime;
28
29type MessageHandler = Box<
30 dyn Send
31 + Sync
32 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
33>;
34
35pub struct Server {
36 peer: Arc<Peer>,
37 store: RwLock<Store>,
38 app_state: Arc<AppState>,
39 handlers: HashMap<TypeId, MessageHandler>,
40 notifications: Option<mpsc::UnboundedSender<()>>,
41}
42
43pub trait Executor {
44 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
45}
46
47pub struct RealExecutor;
48
49const MESSAGE_COUNT_PER_PAGE: usize = 100;
50const MAX_MESSAGE_LEN: usize = 1024;
51
52impl Server {
53 pub fn new(
54 app_state: Arc<AppState>,
55 peer: Arc<Peer>,
56 notifications: Option<mpsc::UnboundedSender<()>>,
57 ) -> Arc<Self> {
58 let mut server = Self {
59 peer,
60 app_state,
61 store: Default::default(),
62 handlers: Default::default(),
63 notifications,
64 };
65
66 server
67 .add_request_handler(Server::ping)
68 .add_request_handler(Server::register_project)
69 .add_message_handler(Server::unregister_project)
70 .add_request_handler(Server::share_project)
71 .add_message_handler(Server::unshare_project)
72 .add_request_handler(Server::join_project)
73 .add_message_handler(Server::leave_project)
74 .add_request_handler(Server::register_worktree)
75 .add_message_handler(Server::unregister_worktree)
76 .add_request_handler(Server::update_worktree)
77 .add_message_handler(Server::update_diagnostic_summary)
78 .add_message_handler(Server::disk_based_diagnostics_updating)
79 .add_message_handler(Server::disk_based_diagnostics_updated)
80 .add_request_handler(Server::get_definition)
81 .add_request_handler(Server::get_references)
82 .add_request_handler(Server::get_document_highlights)
83 .add_request_handler(Server::get_project_symbols)
84 .add_request_handler(Server::open_buffer_for_symbol)
85 .add_request_handler(Server::open_buffer)
86 .add_message_handler(Server::close_buffer)
87 .add_request_handler(Server::update_buffer)
88 .add_message_handler(Server::update_buffer_file)
89 .add_message_handler(Server::buffer_reloaded)
90 .add_message_handler(Server::buffer_saved)
91 .add_request_handler(Server::save_buffer)
92 .add_request_handler(Server::format_buffers)
93 .add_request_handler(Server::get_completions)
94 .add_request_handler(Server::apply_additional_edits_for_completion)
95 .add_request_handler(Server::get_code_actions)
96 .add_request_handler(Server::apply_code_action)
97 .add_request_handler(Server::prepare_rename)
98 .add_request_handler(Server::perform_rename)
99 .add_request_handler(Server::get_channels)
100 .add_request_handler(Server::get_users)
101 .add_request_handler(Server::join_channel)
102 .add_message_handler(Server::leave_channel)
103 .add_request_handler(Server::send_channel_message)
104 .add_request_handler(Server::get_channel_messages);
105
106 Arc::new(server)
107 }
108
109 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
110 where
111 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
112 Fut: 'static + Send + Future<Output = tide::Result<()>>,
113 M: EnvelopedMessage,
114 {
115 let prev_handler = self.handlers.insert(
116 TypeId::of::<M>(),
117 Box::new(move |server, envelope| {
118 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
119 (handler)(server, *envelope).boxed()
120 }),
121 );
122 if prev_handler.is_some() {
123 panic!("registered a handler for the same message twice");
124 }
125 self
126 }
127
128 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
129 where
130 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
131 Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
132 M: RequestMessage,
133 {
134 self.add_message_handler(move |server, envelope| {
135 let receipt = envelope.receipt();
136 let response = (handler)(server.clone(), envelope);
137 async move {
138 match response.await {
139 Ok(response) => {
140 server.peer.respond(receipt, response)?;
141 Ok(())
142 }
143 Err(error) => {
144 server.peer.respond_with_error(
145 receipt,
146 proto::Error {
147 message: error.to_string(),
148 },
149 )?;
150 Err(error)
151 }
152 }
153 }
154 })
155 }
156
157 pub fn handle_connection<E: Executor>(
158 self: &Arc<Self>,
159 connection: Connection,
160 addr: String,
161 user_id: UserId,
162 mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
163 executor: E,
164 ) -> impl Future<Output = ()> {
165 let mut this = self.clone();
166 async move {
167 let (connection_id, handle_io, mut incoming_rx) =
168 this.peer.add_connection(connection).await;
169
170 if let Some(send_connection_id) = send_connection_id.as_mut() {
171 let _ = send_connection_id.send(connection_id).await;
172 }
173
174 this.state_mut().add_connection(connection_id, user_id);
175 if let Err(err) = this.update_contacts_for_users(&[user_id]) {
176 log::error!("error updating contacts for {:?}: {}", user_id, err);
177 }
178
179 let handle_io = handle_io.fuse();
180 futures::pin_mut!(handle_io);
181 loop {
182 let next_message = incoming_rx.next().fuse();
183 futures::pin_mut!(next_message);
184 futures::select_biased! {
185 result = handle_io => {
186 if let Err(err) = result {
187 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
188 }
189 break;
190 }
191 message = next_message => {
192 if let Some(message) = message {
193 let start_time = Instant::now();
194 let type_name = message.payload_type_name();
195 log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
196 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
197 let notifications = this.notifications.clone();
198 let is_background = message.is_background();
199 let handle_message = (handler)(this.clone(), message);
200 let handle_message = async move {
201 if let Err(err) = handle_message.await {
202 log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
203 } else {
204 log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
205 }
206 if let Some(mut notifications) = notifications {
207 let _ = notifications.send(()).await;
208 }
209 };
210 if is_background {
211 executor.spawn_detached(handle_message);
212 } else {
213 handle_message.await;
214 }
215 } else {
216 log::warn!("unhandled message: {}", type_name);
217 }
218 } else {
219 log::info!("rpc connection closed {:?}", addr);
220 break;
221 }
222 }
223 }
224 }
225
226 if let Err(err) = this.sign_out(connection_id).await {
227 log::error!("error signing out connection {:?} - {:?}", addr, err);
228 }
229 }
230 }
231
232 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
233 self.peer.disconnect(connection_id);
234 let removed_connection = self.state_mut().remove_connection(connection_id)?;
235
236 for (project_id, project) in removed_connection.hosted_projects {
237 if let Some(share) = project.share {
238 broadcast(
239 connection_id,
240 share.guests.keys().copied().collect(),
241 |conn_id| {
242 self.peer
243 .send(conn_id, proto::UnshareProject { project_id })
244 },
245 )?;
246 }
247 }
248
249 for (project_id, peer_ids) in removed_connection.guest_project_ids {
250 broadcast(connection_id, peer_ids, |conn_id| {
251 self.peer.send(
252 conn_id,
253 proto::RemoveProjectCollaborator {
254 project_id,
255 peer_id: connection_id.0,
256 },
257 )
258 })?;
259 }
260
261 self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
262 Ok(())
263 }
264
265 async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
266 Ok(proto::Ack {})
267 }
268
269 async fn register_project(
270 mut self: Arc<Server>,
271 request: TypedEnvelope<proto::RegisterProject>,
272 ) -> tide::Result<proto::RegisterProjectResponse> {
273 let project_id = {
274 let mut state = self.state_mut();
275 let user_id = state.user_id_for_connection(request.sender_id)?;
276 state.register_project(request.sender_id, user_id)
277 };
278 Ok(proto::RegisterProjectResponse { project_id })
279 }
280
281 async fn unregister_project(
282 mut self: Arc<Server>,
283 request: TypedEnvelope<proto::UnregisterProject>,
284 ) -> tide::Result<()> {
285 let project = self
286 .state_mut()
287 .unregister_project(request.payload.project_id, request.sender_id)?;
288 self.update_contacts_for_users(project.authorized_user_ids().iter())?;
289 Ok(())
290 }
291
292 async fn share_project(
293 mut self: Arc<Server>,
294 request: TypedEnvelope<proto::ShareProject>,
295 ) -> tide::Result<proto::Ack> {
296 self.state_mut()
297 .share_project(request.payload.project_id, request.sender_id);
298 Ok(proto::Ack {})
299 }
300
301 async fn unshare_project(
302 mut self: Arc<Server>,
303 request: TypedEnvelope<proto::UnshareProject>,
304 ) -> tide::Result<()> {
305 let project_id = request.payload.project_id;
306 let project = self
307 .state_mut()
308 .unshare_project(project_id, request.sender_id)?;
309
310 broadcast(request.sender_id, project.connection_ids, |conn_id| {
311 self.peer
312 .send(conn_id, proto::UnshareProject { project_id })
313 })?;
314 self.update_contacts_for_users(&project.authorized_user_ids)?;
315 Ok(())
316 }
317
318 async fn join_project(
319 mut self: Arc<Server>,
320 request: TypedEnvelope<proto::JoinProject>,
321 ) -> tide::Result<proto::JoinProjectResponse> {
322 let project_id = request.payload.project_id;
323
324 let user_id = self.state().user_id_for_connection(request.sender_id)?;
325 let (response, connection_ids, contact_user_ids) = self
326 .state_mut()
327 .join_project(request.sender_id, user_id, project_id)
328 .and_then(|joined| {
329 let share = joined.project.share()?;
330 let peer_count = share.guests.len();
331 let mut collaborators = Vec::with_capacity(peer_count);
332 collaborators.push(proto::Collaborator {
333 peer_id: joined.project.host_connection_id.0,
334 replica_id: 0,
335 user_id: joined.project.host_user_id.to_proto(),
336 });
337 let worktrees = share
338 .worktrees
339 .iter()
340 .filter_map(|(id, shared_worktree)| {
341 let worktree = joined.project.worktrees.get(&id)?;
342 Some(proto::Worktree {
343 id: *id,
344 root_name: worktree.root_name.clone(),
345 entries: shared_worktree.entries.values().cloned().collect(),
346 diagnostic_summaries: shared_worktree
347 .diagnostic_summaries
348 .values()
349 .cloned()
350 .collect(),
351 weak: worktree.weak,
352 })
353 })
354 .collect();
355 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
356 if *peer_conn_id != request.sender_id {
357 collaborators.push(proto::Collaborator {
358 peer_id: peer_conn_id.0,
359 replica_id: *peer_replica_id as u32,
360 user_id: peer_user_id.to_proto(),
361 });
362 }
363 }
364 let response = proto::JoinProjectResponse {
365 worktrees,
366 replica_id: joined.replica_id as u32,
367 collaborators,
368 };
369 let connection_ids = joined.project.connection_ids();
370 let contact_user_ids = joined.project.authorized_user_ids();
371 Ok((response, connection_ids, contact_user_ids))
372 })?;
373
374 broadcast(request.sender_id, connection_ids, |conn_id| {
375 self.peer.send(
376 conn_id,
377 proto::AddProjectCollaborator {
378 project_id,
379 collaborator: Some(proto::Collaborator {
380 peer_id: request.sender_id.0,
381 replica_id: response.replica_id,
382 user_id: user_id.to_proto(),
383 }),
384 },
385 )
386 })?;
387 self.update_contacts_for_users(&contact_user_ids)?;
388 Ok(response)
389 }
390
391 async fn leave_project(
392 mut self: Arc<Server>,
393 request: TypedEnvelope<proto::LeaveProject>,
394 ) -> tide::Result<()> {
395 let sender_id = request.sender_id;
396 let project_id = request.payload.project_id;
397 let worktree = self.state_mut().leave_project(sender_id, project_id)?;
398
399 broadcast(sender_id, worktree.connection_ids, |conn_id| {
400 self.peer.send(
401 conn_id,
402 proto::RemoveProjectCollaborator {
403 project_id,
404 peer_id: sender_id.0,
405 },
406 )
407 })?;
408 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
409
410 Ok(())
411 }
412
413 async fn register_worktree(
414 mut self: Arc<Server>,
415 request: TypedEnvelope<proto::RegisterWorktree>,
416 ) -> tide::Result<proto::Ack> {
417 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
418
419 let mut contact_user_ids = HashSet::default();
420 contact_user_ids.insert(host_user_id);
421 for github_login in &request.payload.authorized_logins {
422 let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
423 contact_user_ids.insert(contact_user_id);
424 }
425
426 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
427 let guest_connection_ids;
428 {
429 let mut state = self.state_mut();
430 guest_connection_ids = state
431 .read_project(request.payload.project_id, request.sender_id)?
432 .guest_connection_ids();
433 state.register_worktree(
434 request.payload.project_id,
435 request.payload.worktree_id,
436 request.sender_id,
437 Worktree {
438 authorized_user_ids: contact_user_ids.clone(),
439 root_name: request.payload.root_name.clone(),
440 weak: request.payload.weak,
441 },
442 )?;
443 }
444 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
445 self.peer
446 .forward_send(request.sender_id, connection_id, request.payload.clone())
447 })?;
448 self.update_contacts_for_users(&contact_user_ids)?;
449 Ok(proto::Ack {})
450 }
451
452 async fn unregister_worktree(
453 mut self: Arc<Server>,
454 request: TypedEnvelope<proto::UnregisterWorktree>,
455 ) -> tide::Result<()> {
456 let project_id = request.payload.project_id;
457 let worktree_id = request.payload.worktree_id;
458 let (worktree, guest_connection_ids) =
459 self.state_mut()
460 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
461 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
462 self.peer.send(
463 conn_id,
464 proto::UnregisterWorktree {
465 project_id,
466 worktree_id,
467 },
468 )
469 })?;
470 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
471 Ok(())
472 }
473
474 async fn update_worktree(
475 mut self: Arc<Server>,
476 request: TypedEnvelope<proto::UpdateWorktree>,
477 ) -> tide::Result<proto::Ack> {
478 let connection_ids = self.state_mut().update_worktree(
479 request.sender_id,
480 request.payload.project_id,
481 request.payload.worktree_id,
482 &request.payload.removed_entries,
483 &request.payload.updated_entries,
484 )?;
485
486 broadcast(request.sender_id, connection_ids, |connection_id| {
487 self.peer
488 .forward_send(request.sender_id, connection_id, request.payload.clone())
489 })?;
490
491 Ok(proto::Ack {})
492 }
493
494 async fn update_diagnostic_summary(
495 mut self: Arc<Server>,
496 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
497 ) -> tide::Result<()> {
498 let summary = request
499 .payload
500 .summary
501 .clone()
502 .ok_or_else(|| anyhow!("invalid summary"))?;
503 let receiver_ids = self.state_mut().update_diagnostic_summary(
504 request.payload.project_id,
505 request.payload.worktree_id,
506 request.sender_id,
507 summary,
508 )?;
509
510 broadcast(request.sender_id, receiver_ids, |connection_id| {
511 self.peer
512 .forward_send(request.sender_id, connection_id, request.payload.clone())
513 })?;
514 Ok(())
515 }
516
517 async fn disk_based_diagnostics_updating(
518 self: Arc<Server>,
519 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
520 ) -> tide::Result<()> {
521 let receiver_ids = self
522 .state()
523 .project_connection_ids(request.payload.project_id, request.sender_id)?;
524 broadcast(request.sender_id, receiver_ids, |connection_id| {
525 self.peer
526 .forward_send(request.sender_id, connection_id, request.payload.clone())
527 })?;
528 Ok(())
529 }
530
531 async fn disk_based_diagnostics_updated(
532 self: Arc<Server>,
533 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
534 ) -> tide::Result<()> {
535 let receiver_ids = self
536 .state()
537 .project_connection_ids(request.payload.project_id, request.sender_id)?;
538 broadcast(request.sender_id, receiver_ids, |connection_id| {
539 self.peer
540 .forward_send(request.sender_id, connection_id, request.payload.clone())
541 })?;
542 Ok(())
543 }
544
545 async fn get_definition(
546 self: Arc<Server>,
547 request: TypedEnvelope<proto::GetDefinition>,
548 ) -> tide::Result<proto::GetDefinitionResponse> {
549 let host_connection_id = self
550 .state()
551 .read_project(request.payload.project_id, request.sender_id)?
552 .host_connection_id;
553 Ok(self
554 .peer
555 .forward_request(request.sender_id, host_connection_id, request.payload)
556 .await?)
557 }
558
559 async fn get_references(
560 self: Arc<Server>,
561 request: TypedEnvelope<proto::GetReferences>,
562 ) -> tide::Result<proto::GetReferencesResponse> {
563 let host_connection_id = self
564 .state()
565 .read_project(request.payload.project_id, request.sender_id)?
566 .host_connection_id;
567 Ok(self
568 .peer
569 .forward_request(request.sender_id, host_connection_id, request.payload)
570 .await?)
571 }
572
573 async fn get_document_highlights(
574 self: Arc<Server>,
575 request: TypedEnvelope<proto::GetDocumentHighlights>,
576 ) -> tide::Result<proto::GetDocumentHighlightsResponse> {
577 let host_connection_id = self
578 .state()
579 .read_project(request.payload.project_id, request.sender_id)?
580 .host_connection_id;
581 Ok(self
582 .peer
583 .forward_request(request.sender_id, host_connection_id, request.payload)
584 .await?)
585 }
586
587 async fn get_project_symbols(
588 self: Arc<Server>,
589 request: TypedEnvelope<proto::GetProjectSymbols>,
590 ) -> tide::Result<proto::GetProjectSymbolsResponse> {
591 let host_connection_id = self
592 .state()
593 .read_project(request.payload.project_id, request.sender_id)?
594 .host_connection_id;
595 Ok(self
596 .peer
597 .forward_request(request.sender_id, host_connection_id, request.payload)
598 .await?)
599 }
600
601 async fn open_buffer_for_symbol(
602 self: Arc<Server>,
603 request: TypedEnvelope<proto::OpenBufferForSymbol>,
604 ) -> tide::Result<proto::OpenBufferForSymbolResponse> {
605 let host_connection_id = self
606 .state()
607 .read_project(request.payload.project_id, request.sender_id)?
608 .host_connection_id;
609 Ok(self
610 .peer
611 .forward_request(request.sender_id, host_connection_id, request.payload)
612 .await?)
613 }
614
615 async fn open_buffer(
616 self: Arc<Server>,
617 request: TypedEnvelope<proto::OpenBuffer>,
618 ) -> tide::Result<proto::OpenBufferResponse> {
619 let host_connection_id = self
620 .state()
621 .read_project(request.payload.project_id, request.sender_id)?
622 .host_connection_id;
623 Ok(self
624 .peer
625 .forward_request(request.sender_id, host_connection_id, request.payload)
626 .await?)
627 }
628
629 async fn close_buffer(
630 self: Arc<Server>,
631 request: TypedEnvelope<proto::CloseBuffer>,
632 ) -> tide::Result<()> {
633 let host_connection_id = self
634 .state()
635 .read_project(request.payload.project_id, request.sender_id)?
636 .host_connection_id;
637 self.peer
638 .forward_send(request.sender_id, host_connection_id, request.payload)?;
639 Ok(())
640 }
641
642 async fn save_buffer(
643 self: Arc<Server>,
644 request: TypedEnvelope<proto::SaveBuffer>,
645 ) -> tide::Result<proto::BufferSaved> {
646 let host;
647 let mut guests;
648 {
649 let state = self.state();
650 let project = state.read_project(request.payload.project_id, request.sender_id)?;
651 host = project.host_connection_id;
652 guests = project.guest_connection_ids()
653 }
654
655 let response = self
656 .peer
657 .forward_request(request.sender_id, host, request.payload.clone())
658 .await?;
659
660 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
661 broadcast(host, guests, |conn_id| {
662 self.peer.forward_send(host, conn_id, response.clone())
663 })?;
664
665 Ok(response)
666 }
667
668 async fn format_buffers(
669 self: Arc<Server>,
670 request: TypedEnvelope<proto::FormatBuffers>,
671 ) -> tide::Result<proto::FormatBuffersResponse> {
672 let host = self
673 .state()
674 .read_project(request.payload.project_id, request.sender_id)?
675 .host_connection_id;
676 Ok(self
677 .peer
678 .forward_request(request.sender_id, host, request.payload.clone())
679 .await?)
680 }
681
682 async fn get_completions(
683 self: Arc<Server>,
684 request: TypedEnvelope<proto::GetCompletions>,
685 ) -> tide::Result<proto::GetCompletionsResponse> {
686 let host = self
687 .state()
688 .read_project(request.payload.project_id, request.sender_id)?
689 .host_connection_id;
690 Ok(self
691 .peer
692 .forward_request(request.sender_id, host, request.payload.clone())
693 .await?)
694 }
695
696 async fn apply_additional_edits_for_completion(
697 self: Arc<Server>,
698 request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
699 ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
700 let host = self
701 .state()
702 .read_project(request.payload.project_id, request.sender_id)?
703 .host_connection_id;
704 Ok(self
705 .peer
706 .forward_request(request.sender_id, host, request.payload.clone())
707 .await?)
708 }
709
710 async fn get_code_actions(
711 self: Arc<Server>,
712 request: TypedEnvelope<proto::GetCodeActions>,
713 ) -> tide::Result<proto::GetCodeActionsResponse> {
714 let host = self
715 .state()
716 .read_project(request.payload.project_id, request.sender_id)?
717 .host_connection_id;
718 Ok(self
719 .peer
720 .forward_request(request.sender_id, host, request.payload.clone())
721 .await?)
722 }
723
724 async fn apply_code_action(
725 self: Arc<Server>,
726 request: TypedEnvelope<proto::ApplyCodeAction>,
727 ) -> tide::Result<proto::ApplyCodeActionResponse> {
728 let host = self
729 .state()
730 .read_project(request.payload.project_id, request.sender_id)?
731 .host_connection_id;
732 Ok(self
733 .peer
734 .forward_request(request.sender_id, host, request.payload.clone())
735 .await?)
736 }
737
738 async fn prepare_rename(
739 self: Arc<Server>,
740 request: TypedEnvelope<proto::PrepareRename>,
741 ) -> tide::Result<proto::PrepareRenameResponse> {
742 let host = self
743 .state()
744 .read_project(request.payload.project_id, request.sender_id)?
745 .host_connection_id;
746 Ok(self
747 .peer
748 .forward_request(request.sender_id, host, request.payload.clone())
749 .await?)
750 }
751
752 async fn perform_rename(
753 self: Arc<Server>,
754 request: TypedEnvelope<proto::PerformRename>,
755 ) -> tide::Result<proto::PerformRenameResponse> {
756 let host = self
757 .state()
758 .read_project(request.payload.project_id, request.sender_id)?
759 .host_connection_id;
760 Ok(self
761 .peer
762 .forward_request(request.sender_id, host, request.payload.clone())
763 .await?)
764 }
765
766 async fn update_buffer(
767 self: Arc<Server>,
768 request: TypedEnvelope<proto::UpdateBuffer>,
769 ) -> tide::Result<proto::Ack> {
770 let receiver_ids = self
771 .state()
772 .project_connection_ids(request.payload.project_id, request.sender_id)?;
773 broadcast(request.sender_id, receiver_ids, |connection_id| {
774 self.peer
775 .forward_send(request.sender_id, connection_id, request.payload.clone())
776 })?;
777 Ok(proto::Ack {})
778 }
779
780 async fn update_buffer_file(
781 self: Arc<Server>,
782 request: TypedEnvelope<proto::UpdateBufferFile>,
783 ) -> tide::Result<()> {
784 let receiver_ids = self
785 .state()
786 .project_connection_ids(request.payload.project_id, request.sender_id)?;
787 broadcast(request.sender_id, receiver_ids, |connection_id| {
788 self.peer
789 .forward_send(request.sender_id, connection_id, request.payload.clone())
790 })?;
791 Ok(())
792 }
793
794 async fn buffer_reloaded(
795 self: Arc<Server>,
796 request: TypedEnvelope<proto::BufferReloaded>,
797 ) -> tide::Result<()> {
798 let receiver_ids = self
799 .state()
800 .project_connection_ids(request.payload.project_id, request.sender_id)?;
801 broadcast(request.sender_id, receiver_ids, |connection_id| {
802 self.peer
803 .forward_send(request.sender_id, connection_id, request.payload.clone())
804 })?;
805 Ok(())
806 }
807
808 async fn buffer_saved(
809 self: Arc<Server>,
810 request: TypedEnvelope<proto::BufferSaved>,
811 ) -> tide::Result<()> {
812 let receiver_ids = self
813 .state()
814 .project_connection_ids(request.payload.project_id, request.sender_id)?;
815 broadcast(request.sender_id, receiver_ids, |connection_id| {
816 self.peer
817 .forward_send(request.sender_id, connection_id, request.payload.clone())
818 })?;
819 Ok(())
820 }
821
822 async fn get_channels(
823 self: Arc<Server>,
824 request: TypedEnvelope<proto::GetChannels>,
825 ) -> tide::Result<proto::GetChannelsResponse> {
826 let user_id = self.state().user_id_for_connection(request.sender_id)?;
827 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
828 Ok(proto::GetChannelsResponse {
829 channels: channels
830 .into_iter()
831 .map(|chan| proto::Channel {
832 id: chan.id.to_proto(),
833 name: chan.name,
834 })
835 .collect(),
836 })
837 }
838
839 async fn get_users(
840 self: Arc<Server>,
841 request: TypedEnvelope<proto::GetUsers>,
842 ) -> tide::Result<proto::GetUsersResponse> {
843 let user_ids = request
844 .payload
845 .user_ids
846 .into_iter()
847 .map(UserId::from_proto)
848 .collect();
849 let users = self
850 .app_state
851 .db
852 .get_users_by_ids(user_ids)
853 .await?
854 .into_iter()
855 .map(|user| proto::User {
856 id: user.id.to_proto(),
857 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
858 github_login: user.github_login,
859 })
860 .collect();
861 Ok(proto::GetUsersResponse { users })
862 }
863
864 fn update_contacts_for_users<'a>(
865 self: &Arc<Server>,
866 user_ids: impl IntoIterator<Item = &'a UserId>,
867 ) -> anyhow::Result<()> {
868 let mut result = Ok(());
869 let state = self.state();
870 for user_id in user_ids {
871 let contacts = state.contacts_for_user(*user_id);
872 for connection_id in state.connection_ids_for_user(*user_id) {
873 if let Err(error) = self.peer.send(
874 connection_id,
875 proto::UpdateContacts {
876 contacts: contacts.clone(),
877 },
878 ) {
879 result = Err(error);
880 }
881 }
882 }
883 result
884 }
885
886 async fn join_channel(
887 mut self: Arc<Self>,
888 request: TypedEnvelope<proto::JoinChannel>,
889 ) -> tide::Result<proto::JoinChannelResponse> {
890 let user_id = self.state().user_id_for_connection(request.sender_id)?;
891 let channel_id = ChannelId::from_proto(request.payload.channel_id);
892 if !self
893 .app_state
894 .db
895 .can_user_access_channel(user_id, channel_id)
896 .await?
897 {
898 Err(anyhow!("access denied"))?;
899 }
900
901 self.state_mut().join_channel(request.sender_id, channel_id);
902 let messages = self
903 .app_state
904 .db
905 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
906 .await?
907 .into_iter()
908 .map(|msg| proto::ChannelMessage {
909 id: msg.id.to_proto(),
910 body: msg.body,
911 timestamp: msg.sent_at.unix_timestamp() as u64,
912 sender_id: msg.sender_id.to_proto(),
913 nonce: Some(msg.nonce.as_u128().into()),
914 })
915 .collect::<Vec<_>>();
916 Ok(proto::JoinChannelResponse {
917 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
918 messages,
919 })
920 }
921
922 async fn leave_channel(
923 mut self: Arc<Self>,
924 request: TypedEnvelope<proto::LeaveChannel>,
925 ) -> tide::Result<()> {
926 let user_id = self.state().user_id_for_connection(request.sender_id)?;
927 let channel_id = ChannelId::from_proto(request.payload.channel_id);
928 if !self
929 .app_state
930 .db
931 .can_user_access_channel(user_id, channel_id)
932 .await?
933 {
934 Err(anyhow!("access denied"))?;
935 }
936
937 self.state_mut()
938 .leave_channel(request.sender_id, channel_id);
939
940 Ok(())
941 }
942
943 async fn send_channel_message(
944 self: Arc<Self>,
945 request: TypedEnvelope<proto::SendChannelMessage>,
946 ) -> tide::Result<proto::SendChannelMessageResponse> {
947 let channel_id = ChannelId::from_proto(request.payload.channel_id);
948 let user_id;
949 let connection_ids;
950 {
951 let state = self.state();
952 user_id = state.user_id_for_connection(request.sender_id)?;
953 connection_ids = state.channel_connection_ids(channel_id)?;
954 }
955
956 // Validate the message body.
957 let body = request.payload.body.trim().to_string();
958 if body.len() > MAX_MESSAGE_LEN {
959 return Err(anyhow!("message is too long"))?;
960 }
961 if body.is_empty() {
962 return Err(anyhow!("message can't be blank"))?;
963 }
964
965 let timestamp = OffsetDateTime::now_utc();
966 let nonce = request
967 .payload
968 .nonce
969 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
970
971 let message_id = self
972 .app_state
973 .db
974 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
975 .await?
976 .to_proto();
977 let message = proto::ChannelMessage {
978 sender_id: user_id.to_proto(),
979 id: message_id,
980 body,
981 timestamp: timestamp.unix_timestamp() as u64,
982 nonce: Some(nonce),
983 };
984 broadcast(request.sender_id, connection_ids, |conn_id| {
985 self.peer.send(
986 conn_id,
987 proto::ChannelMessageSent {
988 channel_id: channel_id.to_proto(),
989 message: Some(message.clone()),
990 },
991 )
992 })?;
993 Ok(proto::SendChannelMessageResponse {
994 message: Some(message),
995 })
996 }
997
998 async fn get_channel_messages(
999 self: Arc<Self>,
1000 request: TypedEnvelope<proto::GetChannelMessages>,
1001 ) -> tide::Result<proto::GetChannelMessagesResponse> {
1002 let user_id = self.state().user_id_for_connection(request.sender_id)?;
1003 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1004 if !self
1005 .app_state
1006 .db
1007 .can_user_access_channel(user_id, channel_id)
1008 .await?
1009 {
1010 Err(anyhow!("access denied"))?;
1011 }
1012
1013 let messages = self
1014 .app_state
1015 .db
1016 .get_channel_messages(
1017 channel_id,
1018 MESSAGE_COUNT_PER_PAGE,
1019 Some(MessageId::from_proto(request.payload.before_message_id)),
1020 )
1021 .await?
1022 .into_iter()
1023 .map(|msg| proto::ChannelMessage {
1024 id: msg.id.to_proto(),
1025 body: msg.body,
1026 timestamp: msg.sent_at.unix_timestamp() as u64,
1027 sender_id: msg.sender_id.to_proto(),
1028 nonce: Some(msg.nonce.as_u128().into()),
1029 })
1030 .collect::<Vec<_>>();
1031
1032 Ok(proto::GetChannelMessagesResponse {
1033 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1034 messages,
1035 })
1036 }
1037
1038 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1039 self.store.read()
1040 }
1041
1042 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1043 self.store.write()
1044 }
1045}
1046
1047impl Executor for RealExecutor {
1048 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1049 task::spawn(future);
1050 }
1051}
1052
1053fn broadcast<F>(
1054 sender_id: ConnectionId,
1055 receiver_ids: Vec<ConnectionId>,
1056 mut f: F,
1057) -> anyhow::Result<()>
1058where
1059 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1060{
1061 let mut result = Ok(());
1062 for receiver_id in receiver_ids {
1063 if receiver_id != sender_id {
1064 if let Err(error) = f(receiver_id) {
1065 if result.is_ok() {
1066 result = Err(error);
1067 }
1068 }
1069 }
1070 }
1071 result
1072}
1073
1074pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1075 let server = Server::new(app.state().clone(), rpc.clone(), None);
1076 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1077 let server = server.clone();
1078 async move {
1079 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1080
1081 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1082 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1083 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1084 let client_protocol_version: Option<u32> = request
1085 .header("X-Zed-Protocol-Version")
1086 .and_then(|v| v.as_str().parse().ok());
1087
1088 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1089 return Ok(Response::new(StatusCode::UpgradeRequired));
1090 }
1091
1092 let header = match request.header("Sec-Websocket-Key") {
1093 Some(h) => h.as_str(),
1094 None => return Err(anyhow!("expected sec-websocket-key"))?,
1095 };
1096
1097 let user_id = process_auth_header(&request).await?;
1098
1099 let mut response = Response::new(StatusCode::SwitchingProtocols);
1100 response.insert_header(UPGRADE, "websocket");
1101 response.insert_header(CONNECTION, "Upgrade");
1102 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1103 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1104 response.insert_header("Sec-Websocket-Version", "13");
1105
1106 let http_res: &mut tide::http::Response = response.as_mut();
1107 let upgrade_receiver = http_res.recv_upgrade().await;
1108 let addr = request.remote().unwrap_or("unknown").to_string();
1109 task::spawn(async move {
1110 if let Some(stream) = upgrade_receiver.await {
1111 server
1112 .handle_connection(
1113 Connection::new(
1114 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1115 ),
1116 addr,
1117 user_id,
1118 None,
1119 RealExecutor,
1120 )
1121 .await;
1122 }
1123 });
1124
1125 Ok(response)
1126 }
1127 });
1128}
1129
1130fn header_contains_ignore_case<T>(
1131 request: &tide::Request<T>,
1132 header_name: HeaderName,
1133 value: &str,
1134) -> bool {
1135 request
1136 .header(header_name)
1137 .map(|h| {
1138 h.as_str()
1139 .split(',')
1140 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1141 })
1142 .unwrap_or(false)
1143}
1144
1145#[cfg(test)]
1146mod tests {
1147 use super::*;
1148 use crate::{
1149 auth,
1150 db::{tests::TestDb, UserId},
1151 github, AppState, Config,
1152 };
1153 use ::rpc::Peer;
1154 use collections::BTreeMap;
1155 use gpui::{executor, ModelHandle, TestAppContext};
1156 use parking_lot::Mutex;
1157 use postage::{sink::Sink, watch};
1158 use rand::prelude::*;
1159 use rpc::PeerId;
1160 use serde_json::json;
1161 use sqlx::types::time::OffsetDateTime;
1162 use std::{
1163 cell::Cell,
1164 env,
1165 ops::Deref,
1166 path::{Path, PathBuf},
1167 rc::Rc,
1168 sync::{
1169 atomic::{AtomicBool, Ordering::SeqCst},
1170 Arc,
1171 },
1172 time::Duration,
1173 };
1174 use zed::{
1175 client::{
1176 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1177 EstablishConnectionError, UserStore,
1178 },
1179 editor::{
1180 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, EditorSettings,
1181 Input, MultiBuffer, Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1182 },
1183 fs::{FakeFs, Fs as _},
1184 language::{
1185 tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1186 LanguageConfig, LanguageRegistry, LanguageServerConfig, Point, ToLspPosition,
1187 },
1188 lsp,
1189 project::{DiagnosticSummary, Project, ProjectPath},
1190 workspace::{Workspace, WorkspaceParams},
1191 };
1192
1193 #[cfg(test)]
1194 #[ctor::ctor]
1195 fn init_logger() {
1196 if std::env::var("RUST_LOG").is_ok() {
1197 env_logger::init();
1198 }
1199 }
1200
1201 #[gpui::test(iterations = 10)]
1202 async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1203 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1204 let lang_registry = Arc::new(LanguageRegistry::new());
1205 let fs = FakeFs::new(cx_a.background());
1206 cx_a.foreground().forbid_parking();
1207
1208 // Connect to a server as 2 clients.
1209 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1210 let client_a = server.create_client(&mut cx_a, "user_a").await;
1211 let client_b = server.create_client(&mut cx_b, "user_b").await;
1212
1213 // Share a project as client A
1214 fs.insert_tree(
1215 "/a",
1216 json!({
1217 ".zed.toml": r#"collaborators = ["user_b"]"#,
1218 "a.txt": "a-contents",
1219 "b.txt": "b-contents",
1220 }),
1221 )
1222 .await;
1223 let project_a = cx_a.update(|cx| {
1224 Project::local(
1225 client_a.clone(),
1226 client_a.user_store.clone(),
1227 lang_registry.clone(),
1228 fs.clone(),
1229 cx,
1230 )
1231 });
1232 let (worktree_a, _) = project_a
1233 .update(&mut cx_a, |p, cx| {
1234 p.find_or_create_local_worktree("/a", false, cx)
1235 })
1236 .await
1237 .unwrap();
1238 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1239 worktree_a
1240 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1241 .await;
1242 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1243 project_a
1244 .update(&mut cx_a, |p, cx| p.share(cx))
1245 .await
1246 .unwrap();
1247
1248 // Join that project as client B
1249 let project_b = Project::remote(
1250 project_id,
1251 client_b.clone(),
1252 client_b.user_store.clone(),
1253 lang_registry.clone(),
1254 fs.clone(),
1255 &mut cx_b.to_async(),
1256 )
1257 .await
1258 .unwrap();
1259
1260 let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1261 assert_eq!(
1262 project
1263 .collaborators()
1264 .get(&client_a.peer_id)
1265 .unwrap()
1266 .user
1267 .github_login,
1268 "user_a"
1269 );
1270 project.replica_id()
1271 });
1272 project_a
1273 .condition(&cx_a, |tree, _| {
1274 tree.collaborators()
1275 .get(&client_b.peer_id)
1276 .map_or(false, |collaborator| {
1277 collaborator.replica_id == replica_id_b
1278 && collaborator.user.github_login == "user_b"
1279 })
1280 })
1281 .await;
1282
1283 // Open the same file as client B and client A.
1284 let buffer_b = project_b
1285 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1286 .await
1287 .unwrap();
1288 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1289 buffer_b.read_with(&cx_b, |buf, cx| {
1290 assert_eq!(buf.read(cx).text(), "b-contents")
1291 });
1292 project_a.read_with(&cx_a, |project, cx| {
1293 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1294 });
1295 let buffer_a = project_a
1296 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1297 .await
1298 .unwrap();
1299
1300 let editor_b = cx_b.add_view(window_b, |cx| {
1301 Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1302 });
1303
1304 // TODO
1305 // // Create a selection set as client B and see that selection set as client A.
1306 // buffer_a
1307 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1308 // .await;
1309
1310 // Edit the buffer as client B and see that edit as client A.
1311 editor_b.update(&mut cx_b, |editor, cx| {
1312 editor.handle_input(&Input("ok, ".into()), cx)
1313 });
1314 buffer_a
1315 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1316 .await;
1317
1318 // TODO
1319 // // Remove the selection set as client B, see those selections disappear as client A.
1320 cx_b.update(move |_| drop(editor_b));
1321 // buffer_a
1322 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1323 // .await;
1324
1325 // Close the buffer as client A, see that the buffer is closed.
1326 cx_a.update(move |_| drop(buffer_a));
1327 project_a
1328 .condition(&cx_a, |project, cx| {
1329 !project.has_open_buffer((worktree_id, "b.txt"), cx)
1330 })
1331 .await;
1332
1333 // Dropping the client B's project removes client B from client A's collaborators.
1334 cx_b.update(move |_| drop(project_b));
1335 project_a
1336 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1337 .await;
1338 }
1339
1340 #[gpui::test(iterations = 10)]
1341 async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1342 let lang_registry = Arc::new(LanguageRegistry::new());
1343 let fs = FakeFs::new(cx_a.background());
1344 cx_a.foreground().forbid_parking();
1345
1346 // Connect to a server as 2 clients.
1347 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1348 let client_a = server.create_client(&mut cx_a, "user_a").await;
1349 let client_b = server.create_client(&mut cx_b, "user_b").await;
1350
1351 // Share a project as client A
1352 fs.insert_tree(
1353 "/a",
1354 json!({
1355 ".zed.toml": r#"collaborators = ["user_b"]"#,
1356 "a.txt": "a-contents",
1357 "b.txt": "b-contents",
1358 }),
1359 )
1360 .await;
1361 let project_a = cx_a.update(|cx| {
1362 Project::local(
1363 client_a.clone(),
1364 client_a.user_store.clone(),
1365 lang_registry.clone(),
1366 fs.clone(),
1367 cx,
1368 )
1369 });
1370 let (worktree_a, _) = project_a
1371 .update(&mut cx_a, |p, cx| {
1372 p.find_or_create_local_worktree("/a", false, cx)
1373 })
1374 .await
1375 .unwrap();
1376 worktree_a
1377 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1378 .await;
1379 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1380 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1381 project_a
1382 .update(&mut cx_a, |p, cx| p.share(cx))
1383 .await
1384 .unwrap();
1385 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1386
1387 // Join that project as client B
1388 let project_b = Project::remote(
1389 project_id,
1390 client_b.clone(),
1391 client_b.user_store.clone(),
1392 lang_registry.clone(),
1393 fs.clone(),
1394 &mut cx_b.to_async(),
1395 )
1396 .await
1397 .unwrap();
1398 project_b
1399 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1400 .await
1401 .unwrap();
1402
1403 // Unshare the project as client A
1404 project_a
1405 .update(&mut cx_a, |project, cx| project.unshare(cx))
1406 .await
1407 .unwrap();
1408 project_b
1409 .condition(&mut cx_b, |project, _| project.is_read_only())
1410 .await;
1411 assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1412 drop(project_b);
1413
1414 // Share the project again and ensure guests can still join.
1415 project_a
1416 .update(&mut cx_a, |project, cx| project.share(cx))
1417 .await
1418 .unwrap();
1419 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1420
1421 let project_c = Project::remote(
1422 project_id,
1423 client_b.clone(),
1424 client_b.user_store.clone(),
1425 lang_registry.clone(),
1426 fs.clone(),
1427 &mut cx_b.to_async(),
1428 )
1429 .await
1430 .unwrap();
1431 project_c
1432 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1433 .await
1434 .unwrap();
1435 }
1436
1437 #[gpui::test(iterations = 10)]
1438 async fn test_propagate_saves_and_fs_changes(
1439 mut cx_a: TestAppContext,
1440 mut cx_b: TestAppContext,
1441 mut cx_c: TestAppContext,
1442 ) {
1443 let lang_registry = Arc::new(LanguageRegistry::new());
1444 let fs = FakeFs::new(cx_a.background());
1445 cx_a.foreground().forbid_parking();
1446
1447 // Connect to a server as 3 clients.
1448 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1449 let client_a = server.create_client(&mut cx_a, "user_a").await;
1450 let client_b = server.create_client(&mut cx_b, "user_b").await;
1451 let client_c = server.create_client(&mut cx_c, "user_c").await;
1452
1453 // Share a worktree as client A.
1454 fs.insert_tree(
1455 "/a",
1456 json!({
1457 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1458 "file1": "",
1459 "file2": ""
1460 }),
1461 )
1462 .await;
1463 let project_a = cx_a.update(|cx| {
1464 Project::local(
1465 client_a.clone(),
1466 client_a.user_store.clone(),
1467 lang_registry.clone(),
1468 fs.clone(),
1469 cx,
1470 )
1471 });
1472 let (worktree_a, _) = project_a
1473 .update(&mut cx_a, |p, cx| {
1474 p.find_or_create_local_worktree("/a", false, cx)
1475 })
1476 .await
1477 .unwrap();
1478 worktree_a
1479 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1480 .await;
1481 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1482 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1483 project_a
1484 .update(&mut cx_a, |p, cx| p.share(cx))
1485 .await
1486 .unwrap();
1487
1488 // Join that worktree as clients B and C.
1489 let project_b = Project::remote(
1490 project_id,
1491 client_b.clone(),
1492 client_b.user_store.clone(),
1493 lang_registry.clone(),
1494 fs.clone(),
1495 &mut cx_b.to_async(),
1496 )
1497 .await
1498 .unwrap();
1499 let project_c = Project::remote(
1500 project_id,
1501 client_c.clone(),
1502 client_c.user_store.clone(),
1503 lang_registry.clone(),
1504 fs.clone(),
1505 &mut cx_c.to_async(),
1506 )
1507 .await
1508 .unwrap();
1509 let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1510 let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1511
1512 // Open and edit a buffer as both guests B and C.
1513 let buffer_b = project_b
1514 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1515 .await
1516 .unwrap();
1517 let buffer_c = project_c
1518 .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1519 .await
1520 .unwrap();
1521 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1522 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1523
1524 // Open and edit that buffer as the host.
1525 let buffer_a = project_a
1526 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1527 .await
1528 .unwrap();
1529
1530 buffer_a
1531 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1532 .await;
1533 buffer_a.update(&mut cx_a, |buf, cx| {
1534 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1535 });
1536
1537 // Wait for edits to propagate
1538 buffer_a
1539 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1540 .await;
1541 buffer_b
1542 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1543 .await;
1544 buffer_c
1545 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1546 .await;
1547
1548 // Edit the buffer as the host and concurrently save as guest B.
1549 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1550 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1551 save_b.await.unwrap();
1552 assert_eq!(
1553 fs.load("/a/file1".as_ref()).await.unwrap(),
1554 "hi-a, i-am-c, i-am-b, i-am-a"
1555 );
1556 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1557 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1558 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1559
1560 // Make changes on host's file system, see those changes on guest worktrees.
1561 fs.rename(
1562 "/a/file1".as_ref(),
1563 "/a/file1-renamed".as_ref(),
1564 Default::default(),
1565 )
1566 .await
1567 .unwrap();
1568
1569 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1570 .await
1571 .unwrap();
1572 fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1573
1574 worktree_a
1575 .condition(&cx_a, |tree, _| {
1576 tree.paths()
1577 .map(|p| p.to_string_lossy())
1578 .collect::<Vec<_>>()
1579 == [".zed.toml", "file1-renamed", "file3", "file4"]
1580 })
1581 .await;
1582 worktree_b
1583 .condition(&cx_b, |tree, _| {
1584 tree.paths()
1585 .map(|p| p.to_string_lossy())
1586 .collect::<Vec<_>>()
1587 == [".zed.toml", "file1-renamed", "file3", "file4"]
1588 })
1589 .await;
1590 worktree_c
1591 .condition(&cx_c, |tree, _| {
1592 tree.paths()
1593 .map(|p| p.to_string_lossy())
1594 .collect::<Vec<_>>()
1595 == [".zed.toml", "file1-renamed", "file3", "file4"]
1596 })
1597 .await;
1598
1599 // Ensure buffer files are updated as well.
1600 buffer_a
1601 .condition(&cx_a, |buf, _| {
1602 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1603 })
1604 .await;
1605 buffer_b
1606 .condition(&cx_b, |buf, _| {
1607 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1608 })
1609 .await;
1610 buffer_c
1611 .condition(&cx_c, |buf, _| {
1612 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1613 })
1614 .await;
1615 }
1616
1617 #[gpui::test(iterations = 10)]
1618 async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1619 cx_a.foreground().forbid_parking();
1620 let lang_registry = Arc::new(LanguageRegistry::new());
1621 let fs = FakeFs::new(cx_a.background());
1622
1623 // Connect to a server as 2 clients.
1624 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1625 let client_a = server.create_client(&mut cx_a, "user_a").await;
1626 let client_b = server.create_client(&mut cx_b, "user_b").await;
1627
1628 // Share a project as client A
1629 fs.insert_tree(
1630 "/dir",
1631 json!({
1632 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1633 "a.txt": "a-contents",
1634 }),
1635 )
1636 .await;
1637
1638 let project_a = cx_a.update(|cx| {
1639 Project::local(
1640 client_a.clone(),
1641 client_a.user_store.clone(),
1642 lang_registry.clone(),
1643 fs.clone(),
1644 cx,
1645 )
1646 });
1647 let (worktree_a, _) = project_a
1648 .update(&mut cx_a, |p, cx| {
1649 p.find_or_create_local_worktree("/dir", false, cx)
1650 })
1651 .await
1652 .unwrap();
1653 worktree_a
1654 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1655 .await;
1656 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1657 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1658 project_a
1659 .update(&mut cx_a, |p, cx| p.share(cx))
1660 .await
1661 .unwrap();
1662
1663 // Join that project as client B
1664 let project_b = Project::remote(
1665 project_id,
1666 client_b.clone(),
1667 client_b.user_store.clone(),
1668 lang_registry.clone(),
1669 fs.clone(),
1670 &mut cx_b.to_async(),
1671 )
1672 .await
1673 .unwrap();
1674
1675 // Open a buffer as client B
1676 let buffer_b = project_b
1677 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1678 .await
1679 .unwrap();
1680
1681 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1682 buffer_b.read_with(&cx_b, |buf, _| {
1683 assert!(buf.is_dirty());
1684 assert!(!buf.has_conflict());
1685 });
1686
1687 buffer_b
1688 .update(&mut cx_b, |buf, cx| buf.save(cx))
1689 .await
1690 .unwrap();
1691 buffer_b
1692 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1693 .await;
1694 buffer_b.read_with(&cx_b, |buf, _| {
1695 assert!(!buf.has_conflict());
1696 });
1697
1698 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1699 buffer_b.read_with(&cx_b, |buf, _| {
1700 assert!(buf.is_dirty());
1701 assert!(!buf.has_conflict());
1702 });
1703 }
1704
1705 #[gpui::test(iterations = 10)]
1706 async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1707 cx_a.foreground().forbid_parking();
1708 let lang_registry = Arc::new(LanguageRegistry::new());
1709 let fs = FakeFs::new(cx_a.background());
1710
1711 // Connect to a server as 2 clients.
1712 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1713 let client_a = server.create_client(&mut cx_a, "user_a").await;
1714 let client_b = server.create_client(&mut cx_b, "user_b").await;
1715
1716 // Share a project as client A
1717 fs.insert_tree(
1718 "/dir",
1719 json!({
1720 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1721 "a.txt": "a-contents",
1722 }),
1723 )
1724 .await;
1725
1726 let project_a = cx_a.update(|cx| {
1727 Project::local(
1728 client_a.clone(),
1729 client_a.user_store.clone(),
1730 lang_registry.clone(),
1731 fs.clone(),
1732 cx,
1733 )
1734 });
1735 let (worktree_a, _) = project_a
1736 .update(&mut cx_a, |p, cx| {
1737 p.find_or_create_local_worktree("/dir", false, cx)
1738 })
1739 .await
1740 .unwrap();
1741 worktree_a
1742 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1743 .await;
1744 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1745 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1746 project_a
1747 .update(&mut cx_a, |p, cx| p.share(cx))
1748 .await
1749 .unwrap();
1750
1751 // Join that project as client B
1752 let project_b = Project::remote(
1753 project_id,
1754 client_b.clone(),
1755 client_b.user_store.clone(),
1756 lang_registry.clone(),
1757 fs.clone(),
1758 &mut cx_b.to_async(),
1759 )
1760 .await
1761 .unwrap();
1762 let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1763
1764 // Open a buffer as client B
1765 let buffer_b = project_b
1766 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1767 .await
1768 .unwrap();
1769 buffer_b.read_with(&cx_b, |buf, _| {
1770 assert!(!buf.is_dirty());
1771 assert!(!buf.has_conflict());
1772 });
1773
1774 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1775 .await
1776 .unwrap();
1777 buffer_b
1778 .condition(&cx_b, |buf, _| {
1779 buf.text() == "new contents" && !buf.is_dirty()
1780 })
1781 .await;
1782 buffer_b.read_with(&cx_b, |buf, _| {
1783 assert!(!buf.has_conflict());
1784 });
1785 }
1786
1787 #[gpui::test(iterations = 10)]
1788 async fn test_editing_while_guest_opens_buffer(
1789 mut cx_a: TestAppContext,
1790 mut cx_b: TestAppContext,
1791 ) {
1792 cx_a.foreground().forbid_parking();
1793 let lang_registry = Arc::new(LanguageRegistry::new());
1794 let fs = FakeFs::new(cx_a.background());
1795
1796 // Connect to a server as 2 clients.
1797 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1798 let client_a = server.create_client(&mut cx_a, "user_a").await;
1799 let client_b = server.create_client(&mut cx_b, "user_b").await;
1800
1801 // Share a project as client A
1802 fs.insert_tree(
1803 "/dir",
1804 json!({
1805 ".zed.toml": r#"collaborators = ["user_b"]"#,
1806 "a.txt": "a-contents",
1807 }),
1808 )
1809 .await;
1810 let project_a = cx_a.update(|cx| {
1811 Project::local(
1812 client_a.clone(),
1813 client_a.user_store.clone(),
1814 lang_registry.clone(),
1815 fs.clone(),
1816 cx,
1817 )
1818 });
1819 let (worktree_a, _) = project_a
1820 .update(&mut cx_a, |p, cx| {
1821 p.find_or_create_local_worktree("/dir", false, cx)
1822 })
1823 .await
1824 .unwrap();
1825 worktree_a
1826 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1827 .await;
1828 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1829 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1830 project_a
1831 .update(&mut cx_a, |p, cx| p.share(cx))
1832 .await
1833 .unwrap();
1834
1835 // Join that project as client B
1836 let project_b = Project::remote(
1837 project_id,
1838 client_b.clone(),
1839 client_b.user_store.clone(),
1840 lang_registry.clone(),
1841 fs.clone(),
1842 &mut cx_b.to_async(),
1843 )
1844 .await
1845 .unwrap();
1846
1847 // Open a buffer as client A
1848 let buffer_a = project_a
1849 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1850 .await
1851 .unwrap();
1852
1853 // Start opening the same buffer as client B
1854 let buffer_b = cx_b
1855 .background()
1856 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1857
1858 // Edit the buffer as client A while client B is still opening it.
1859 cx_b.background().simulate_random_delay().await;
1860 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1861 cx_b.background().simulate_random_delay().await;
1862 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1863
1864 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1865 let buffer_b = buffer_b.await.unwrap();
1866 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1867 }
1868
1869 #[gpui::test(iterations = 10)]
1870 async fn test_leaving_worktree_while_opening_buffer(
1871 mut cx_a: TestAppContext,
1872 mut cx_b: TestAppContext,
1873 ) {
1874 cx_a.foreground().forbid_parking();
1875 let lang_registry = Arc::new(LanguageRegistry::new());
1876 let fs = FakeFs::new(cx_a.background());
1877
1878 // Connect to a server as 2 clients.
1879 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1880 let client_a = server.create_client(&mut cx_a, "user_a").await;
1881 let client_b = server.create_client(&mut cx_b, "user_b").await;
1882
1883 // Share a project as client A
1884 fs.insert_tree(
1885 "/dir",
1886 json!({
1887 ".zed.toml": r#"collaborators = ["user_b"]"#,
1888 "a.txt": "a-contents",
1889 }),
1890 )
1891 .await;
1892 let project_a = cx_a.update(|cx| {
1893 Project::local(
1894 client_a.clone(),
1895 client_a.user_store.clone(),
1896 lang_registry.clone(),
1897 fs.clone(),
1898 cx,
1899 )
1900 });
1901 let (worktree_a, _) = project_a
1902 .update(&mut cx_a, |p, cx| {
1903 p.find_or_create_local_worktree("/dir", false, cx)
1904 })
1905 .await
1906 .unwrap();
1907 worktree_a
1908 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1909 .await;
1910 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1911 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1912 project_a
1913 .update(&mut cx_a, |p, cx| p.share(cx))
1914 .await
1915 .unwrap();
1916
1917 // Join that project as client B
1918 let project_b = Project::remote(
1919 project_id,
1920 client_b.clone(),
1921 client_b.user_store.clone(),
1922 lang_registry.clone(),
1923 fs.clone(),
1924 &mut cx_b.to_async(),
1925 )
1926 .await
1927 .unwrap();
1928
1929 // See that a guest has joined as client A.
1930 project_a
1931 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1932 .await;
1933
1934 // Begin opening a buffer as client B, but leave the project before the open completes.
1935 let buffer_b = cx_b
1936 .background()
1937 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1938 cx_b.update(|_| drop(project_b));
1939 drop(buffer_b);
1940
1941 // See that the guest has left.
1942 project_a
1943 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1944 .await;
1945 }
1946
1947 #[gpui::test(iterations = 10)]
1948 async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1949 cx_a.foreground().forbid_parking();
1950 let lang_registry = Arc::new(LanguageRegistry::new());
1951 let fs = FakeFs::new(cx_a.background());
1952
1953 // Connect to a server as 2 clients.
1954 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1955 let client_a = server.create_client(&mut cx_a, "user_a").await;
1956 let client_b = server.create_client(&mut cx_b, "user_b").await;
1957
1958 // Share a project as client A
1959 fs.insert_tree(
1960 "/a",
1961 json!({
1962 ".zed.toml": r#"collaborators = ["user_b"]"#,
1963 "a.txt": "a-contents",
1964 "b.txt": "b-contents",
1965 }),
1966 )
1967 .await;
1968 let project_a = cx_a.update(|cx| {
1969 Project::local(
1970 client_a.clone(),
1971 client_a.user_store.clone(),
1972 lang_registry.clone(),
1973 fs.clone(),
1974 cx,
1975 )
1976 });
1977 let (worktree_a, _) = project_a
1978 .update(&mut cx_a, |p, cx| {
1979 p.find_or_create_local_worktree("/a", false, cx)
1980 })
1981 .await
1982 .unwrap();
1983 worktree_a
1984 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1985 .await;
1986 let project_id = project_a
1987 .update(&mut cx_a, |project, _| project.next_remote_id())
1988 .await;
1989 project_a
1990 .update(&mut cx_a, |project, cx| project.share(cx))
1991 .await
1992 .unwrap();
1993
1994 // Join that project as client B
1995 let _project_b = Project::remote(
1996 project_id,
1997 client_b.clone(),
1998 client_b.user_store.clone(),
1999 lang_registry.clone(),
2000 fs.clone(),
2001 &mut cx_b.to_async(),
2002 )
2003 .await
2004 .unwrap();
2005
2006 // See that a guest has joined as client A.
2007 project_a
2008 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2009 .await;
2010
2011 // Drop client B's connection and ensure client A observes client B leaving the worktree.
2012 client_b.disconnect(&cx_b.to_async()).unwrap();
2013 project_a
2014 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2015 .await;
2016 }
2017
2018 #[gpui::test(iterations = 10)]
2019 async fn test_collaborating_with_diagnostics(
2020 mut cx_a: TestAppContext,
2021 mut cx_b: TestAppContext,
2022 ) {
2023 cx_a.foreground().forbid_parking();
2024 let mut lang_registry = Arc::new(LanguageRegistry::new());
2025 let fs = FakeFs::new(cx_a.background());
2026
2027 // Set up a fake language server.
2028 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2029 Arc::get_mut(&mut lang_registry)
2030 .unwrap()
2031 .add(Arc::new(Language::new(
2032 LanguageConfig {
2033 name: "Rust".into(),
2034 path_suffixes: vec!["rs".to_string()],
2035 language_server: Some(language_server_config),
2036 ..Default::default()
2037 },
2038 Some(tree_sitter_rust::language()),
2039 )));
2040
2041 // Connect to a server as 2 clients.
2042 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2043 let client_a = server.create_client(&mut cx_a, "user_a").await;
2044 let client_b = server.create_client(&mut cx_b, "user_b").await;
2045
2046 // Share a project as client A
2047 fs.insert_tree(
2048 "/a",
2049 json!({
2050 ".zed.toml": r#"collaborators = ["user_b"]"#,
2051 "a.rs": "let one = two",
2052 "other.rs": "",
2053 }),
2054 )
2055 .await;
2056 let project_a = cx_a.update(|cx| {
2057 Project::local(
2058 client_a.clone(),
2059 client_a.user_store.clone(),
2060 lang_registry.clone(),
2061 fs.clone(),
2062 cx,
2063 )
2064 });
2065 let (worktree_a, _) = project_a
2066 .update(&mut cx_a, |p, cx| {
2067 p.find_or_create_local_worktree("/a", false, cx)
2068 })
2069 .await
2070 .unwrap();
2071 worktree_a
2072 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2073 .await;
2074 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2075 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2076 project_a
2077 .update(&mut cx_a, |p, cx| p.share(cx))
2078 .await
2079 .unwrap();
2080
2081 // Cause the language server to start.
2082 let _ = cx_a
2083 .background()
2084 .spawn(project_a.update(&mut cx_a, |project, cx| {
2085 project.open_buffer(
2086 ProjectPath {
2087 worktree_id,
2088 path: Path::new("other.rs").into(),
2089 },
2090 cx,
2091 )
2092 }))
2093 .await
2094 .unwrap();
2095
2096 // Simulate a language server reporting errors for a file.
2097 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2098 fake_language_server
2099 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2100 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2101 version: None,
2102 diagnostics: vec![lsp::Diagnostic {
2103 severity: Some(lsp::DiagnosticSeverity::ERROR),
2104 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2105 message: "message 1".to_string(),
2106 ..Default::default()
2107 }],
2108 })
2109 .await;
2110
2111 // Wait for server to see the diagnostics update.
2112 server
2113 .condition(|store| {
2114 let worktree = store
2115 .project(project_id)
2116 .unwrap()
2117 .share
2118 .as_ref()
2119 .unwrap()
2120 .worktrees
2121 .get(&worktree_id.to_proto())
2122 .unwrap();
2123
2124 !worktree.diagnostic_summaries.is_empty()
2125 })
2126 .await;
2127
2128 // Join the worktree as client B.
2129 let project_b = Project::remote(
2130 project_id,
2131 client_b.clone(),
2132 client_b.user_store.clone(),
2133 lang_registry.clone(),
2134 fs.clone(),
2135 &mut cx_b.to_async(),
2136 )
2137 .await
2138 .unwrap();
2139
2140 project_b.read_with(&cx_b, |project, cx| {
2141 assert_eq!(
2142 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2143 &[(
2144 ProjectPath {
2145 worktree_id,
2146 path: Arc::from(Path::new("a.rs")),
2147 },
2148 DiagnosticSummary {
2149 error_count: 1,
2150 warning_count: 0,
2151 ..Default::default()
2152 },
2153 )]
2154 )
2155 });
2156
2157 // Simulate a language server reporting more errors for a file.
2158 fake_language_server
2159 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2160 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2161 version: None,
2162 diagnostics: vec![
2163 lsp::Diagnostic {
2164 severity: Some(lsp::DiagnosticSeverity::ERROR),
2165 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2166 message: "message 1".to_string(),
2167 ..Default::default()
2168 },
2169 lsp::Diagnostic {
2170 severity: Some(lsp::DiagnosticSeverity::WARNING),
2171 range: lsp::Range::new(
2172 lsp::Position::new(0, 10),
2173 lsp::Position::new(0, 13),
2174 ),
2175 message: "message 2".to_string(),
2176 ..Default::default()
2177 },
2178 ],
2179 })
2180 .await;
2181
2182 // Client b gets the updated summaries
2183 project_b
2184 .condition(&cx_b, |project, cx| {
2185 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2186 == &[(
2187 ProjectPath {
2188 worktree_id,
2189 path: Arc::from(Path::new("a.rs")),
2190 },
2191 DiagnosticSummary {
2192 error_count: 1,
2193 warning_count: 1,
2194 ..Default::default()
2195 },
2196 )]
2197 })
2198 .await;
2199
2200 // Open the file with the errors on client B. They should be present.
2201 let buffer_b = cx_b
2202 .background()
2203 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2204 .await
2205 .unwrap();
2206
2207 buffer_b.read_with(&cx_b, |buffer, _| {
2208 assert_eq!(
2209 buffer
2210 .snapshot()
2211 .diagnostics_in_range::<_, Point>(0..buffer.len())
2212 .map(|entry| entry)
2213 .collect::<Vec<_>>(),
2214 &[
2215 DiagnosticEntry {
2216 range: Point::new(0, 4)..Point::new(0, 7),
2217 diagnostic: Diagnostic {
2218 group_id: 0,
2219 message: "message 1".to_string(),
2220 severity: lsp::DiagnosticSeverity::ERROR,
2221 is_primary: true,
2222 ..Default::default()
2223 }
2224 },
2225 DiagnosticEntry {
2226 range: Point::new(0, 10)..Point::new(0, 13),
2227 diagnostic: Diagnostic {
2228 group_id: 1,
2229 severity: lsp::DiagnosticSeverity::WARNING,
2230 message: "message 2".to_string(),
2231 is_primary: true,
2232 ..Default::default()
2233 }
2234 }
2235 ]
2236 );
2237 });
2238 }
2239
2240 #[gpui::test(iterations = 10)]
2241 async fn test_collaborating_with_completion(
2242 mut cx_a: TestAppContext,
2243 mut cx_b: TestAppContext,
2244 ) {
2245 cx_a.foreground().forbid_parking();
2246 let mut lang_registry = Arc::new(LanguageRegistry::new());
2247 let fs = FakeFs::new(cx_a.background());
2248
2249 // Set up a fake language server.
2250 let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2251 language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2252 completion_provider: Some(lsp::CompletionOptions {
2253 trigger_characters: Some(vec![".".to_string()]),
2254 ..Default::default()
2255 }),
2256 ..Default::default()
2257 });
2258 Arc::get_mut(&mut lang_registry)
2259 .unwrap()
2260 .add(Arc::new(Language::new(
2261 LanguageConfig {
2262 name: "Rust".into(),
2263 path_suffixes: vec!["rs".to_string()],
2264 language_server: Some(language_server_config),
2265 ..Default::default()
2266 },
2267 Some(tree_sitter_rust::language()),
2268 )));
2269
2270 // Connect to a server as 2 clients.
2271 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2272 let client_a = server.create_client(&mut cx_a, "user_a").await;
2273 let client_b = server.create_client(&mut cx_b, "user_b").await;
2274
2275 // Share a project as client A
2276 fs.insert_tree(
2277 "/a",
2278 json!({
2279 ".zed.toml": r#"collaborators = ["user_b"]"#,
2280 "main.rs": "fn main() { a }",
2281 "other.rs": "",
2282 }),
2283 )
2284 .await;
2285 let project_a = cx_a.update(|cx| {
2286 Project::local(
2287 client_a.clone(),
2288 client_a.user_store.clone(),
2289 lang_registry.clone(),
2290 fs.clone(),
2291 cx,
2292 )
2293 });
2294 let (worktree_a, _) = project_a
2295 .update(&mut cx_a, |p, cx| {
2296 p.find_or_create_local_worktree("/a", false, cx)
2297 })
2298 .await
2299 .unwrap();
2300 worktree_a
2301 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2302 .await;
2303 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2304 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2305 project_a
2306 .update(&mut cx_a, |p, cx| p.share(cx))
2307 .await
2308 .unwrap();
2309
2310 // Join the worktree as client B.
2311 let project_b = Project::remote(
2312 project_id,
2313 client_b.clone(),
2314 client_b.user_store.clone(),
2315 lang_registry.clone(),
2316 fs.clone(),
2317 &mut cx_b.to_async(),
2318 )
2319 .await
2320 .unwrap();
2321
2322 // Open a file in an editor as the guest.
2323 let buffer_b = project_b
2324 .update(&mut cx_b, |p, cx| {
2325 p.open_buffer((worktree_id, "main.rs"), cx)
2326 })
2327 .await
2328 .unwrap();
2329 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2330 let editor_b = cx_b.add_view(window_b, |cx| {
2331 Editor::for_buffer(
2332 cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2333 Arc::new(|cx| EditorSettings::test(cx)),
2334 Some(project_b.clone()),
2335 cx,
2336 )
2337 });
2338
2339 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2340 buffer_b
2341 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2342 .await;
2343
2344 // Type a completion trigger character as the guest.
2345 editor_b.update(&mut cx_b, |editor, cx| {
2346 editor.select_ranges([13..13], None, cx);
2347 editor.handle_input(&Input(".".into()), cx);
2348 cx.focus(&editor_b);
2349 });
2350
2351 // Receive a completion request as the host's language server.
2352 // Return some completions from the host's language server.
2353 cx_a.foreground().start_waiting();
2354 fake_language_server
2355 .handle_request::<lsp::request::Completion, _>(|params, _| {
2356 assert_eq!(
2357 params.text_document_position.text_document.uri,
2358 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2359 );
2360 assert_eq!(
2361 params.text_document_position.position,
2362 lsp::Position::new(0, 14),
2363 );
2364
2365 Some(lsp::CompletionResponse::Array(vec![
2366 lsp::CompletionItem {
2367 label: "first_method(…)".into(),
2368 detail: Some("fn(&mut self, B) -> C".into()),
2369 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2370 new_text: "first_method($1)".to_string(),
2371 range: lsp::Range::new(
2372 lsp::Position::new(0, 14),
2373 lsp::Position::new(0, 14),
2374 ),
2375 })),
2376 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2377 ..Default::default()
2378 },
2379 lsp::CompletionItem {
2380 label: "second_method(…)".into(),
2381 detail: Some("fn(&mut self, C) -> D<E>".into()),
2382 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2383 new_text: "second_method()".to_string(),
2384 range: lsp::Range::new(
2385 lsp::Position::new(0, 14),
2386 lsp::Position::new(0, 14),
2387 ),
2388 })),
2389 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2390 ..Default::default()
2391 },
2392 ]))
2393 })
2394 .next()
2395 .await
2396 .unwrap();
2397 cx_a.foreground().finish_waiting();
2398
2399 // Open the buffer on the host.
2400 let buffer_a = project_a
2401 .update(&mut cx_a, |p, cx| {
2402 p.open_buffer((worktree_id, "main.rs"), cx)
2403 })
2404 .await
2405 .unwrap();
2406 buffer_a
2407 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2408 .await;
2409
2410 // Confirm a completion on the guest.
2411 editor_b
2412 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2413 .await;
2414 editor_b.update(&mut cx_b, |editor, cx| {
2415 editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2416 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2417 });
2418
2419 // Return a resolved completion from the host's language server.
2420 // The resolved completion has an additional text edit.
2421 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2422 |params, _| {
2423 assert_eq!(params.label, "first_method(…)");
2424 lsp::CompletionItem {
2425 label: "first_method(…)".into(),
2426 detail: Some("fn(&mut self, B) -> C".into()),
2427 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2428 new_text: "first_method($1)".to_string(),
2429 range: lsp::Range::new(
2430 lsp::Position::new(0, 14),
2431 lsp::Position::new(0, 14),
2432 ),
2433 })),
2434 additional_text_edits: Some(vec![lsp::TextEdit {
2435 new_text: "use d::SomeTrait;\n".to_string(),
2436 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2437 }]),
2438 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2439 ..Default::default()
2440 }
2441 },
2442 );
2443
2444 // The additional edit is applied.
2445 buffer_a
2446 .condition(&cx_a, |buffer, _| {
2447 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2448 })
2449 .await;
2450 buffer_b
2451 .condition(&cx_b, |buffer, _| {
2452 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2453 })
2454 .await;
2455 }
2456
2457 #[gpui::test(iterations = 10)]
2458 async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2459 cx_a.foreground().forbid_parking();
2460 let mut lang_registry = Arc::new(LanguageRegistry::new());
2461 let fs = FakeFs::new(cx_a.background());
2462
2463 // Set up a fake language server.
2464 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2465 Arc::get_mut(&mut lang_registry)
2466 .unwrap()
2467 .add(Arc::new(Language::new(
2468 LanguageConfig {
2469 name: "Rust".into(),
2470 path_suffixes: vec!["rs".to_string()],
2471 language_server: Some(language_server_config),
2472 ..Default::default()
2473 },
2474 Some(tree_sitter_rust::language()),
2475 )));
2476
2477 // Connect to a server as 2 clients.
2478 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2479 let client_a = server.create_client(&mut cx_a, "user_a").await;
2480 let client_b = server.create_client(&mut cx_b, "user_b").await;
2481
2482 // Share a project as client A
2483 fs.insert_tree(
2484 "/a",
2485 json!({
2486 ".zed.toml": r#"collaborators = ["user_b"]"#,
2487 "a.rs": "let one = two",
2488 }),
2489 )
2490 .await;
2491 let project_a = cx_a.update(|cx| {
2492 Project::local(
2493 client_a.clone(),
2494 client_a.user_store.clone(),
2495 lang_registry.clone(),
2496 fs.clone(),
2497 cx,
2498 )
2499 });
2500 let (worktree_a, _) = project_a
2501 .update(&mut cx_a, |p, cx| {
2502 p.find_or_create_local_worktree("/a", false, cx)
2503 })
2504 .await
2505 .unwrap();
2506 worktree_a
2507 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2508 .await;
2509 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2510 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2511 project_a
2512 .update(&mut cx_a, |p, cx| p.share(cx))
2513 .await
2514 .unwrap();
2515
2516 // Join the worktree as client B.
2517 let project_b = Project::remote(
2518 project_id,
2519 client_b.clone(),
2520 client_b.user_store.clone(),
2521 lang_registry.clone(),
2522 fs.clone(),
2523 &mut cx_b.to_async(),
2524 )
2525 .await
2526 .unwrap();
2527
2528 let buffer_b = cx_b
2529 .background()
2530 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2531 .await
2532 .unwrap();
2533
2534 let format = project_b.update(&mut cx_b, |project, cx| {
2535 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2536 });
2537
2538 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2539 fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2540 Some(vec![
2541 lsp::TextEdit {
2542 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2543 new_text: "h".to_string(),
2544 },
2545 lsp::TextEdit {
2546 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2547 new_text: "y".to_string(),
2548 },
2549 ])
2550 });
2551
2552 format.await.unwrap();
2553 assert_eq!(
2554 buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2555 "let honey = two"
2556 );
2557 }
2558
2559 #[gpui::test(iterations = 10)]
2560 async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2561 cx_a.foreground().forbid_parking();
2562 let mut lang_registry = Arc::new(LanguageRegistry::new());
2563 let fs = FakeFs::new(cx_a.background());
2564 fs.insert_tree(
2565 "/root-1",
2566 json!({
2567 ".zed.toml": r#"collaborators = ["user_b"]"#,
2568 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2569 }),
2570 )
2571 .await;
2572 fs.insert_tree(
2573 "/root-2",
2574 json!({
2575 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2576 }),
2577 )
2578 .await;
2579
2580 // Set up a fake language server.
2581 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2582 Arc::get_mut(&mut lang_registry)
2583 .unwrap()
2584 .add(Arc::new(Language::new(
2585 LanguageConfig {
2586 name: "Rust".into(),
2587 path_suffixes: vec!["rs".to_string()],
2588 language_server: Some(language_server_config),
2589 ..Default::default()
2590 },
2591 Some(tree_sitter_rust::language()),
2592 )));
2593
2594 // Connect to a server as 2 clients.
2595 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2596 let client_a = server.create_client(&mut cx_a, "user_a").await;
2597 let client_b = server.create_client(&mut cx_b, "user_b").await;
2598
2599 // Share a project as client A
2600 let project_a = cx_a.update(|cx| {
2601 Project::local(
2602 client_a.clone(),
2603 client_a.user_store.clone(),
2604 lang_registry.clone(),
2605 fs.clone(),
2606 cx,
2607 )
2608 });
2609 let (worktree_a, _) = project_a
2610 .update(&mut cx_a, |p, cx| {
2611 p.find_or_create_local_worktree("/root-1", false, cx)
2612 })
2613 .await
2614 .unwrap();
2615 worktree_a
2616 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2617 .await;
2618 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2619 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2620 project_a
2621 .update(&mut cx_a, |p, cx| p.share(cx))
2622 .await
2623 .unwrap();
2624
2625 // Join the worktree as client B.
2626 let project_b = Project::remote(
2627 project_id,
2628 client_b.clone(),
2629 client_b.user_store.clone(),
2630 lang_registry.clone(),
2631 fs.clone(),
2632 &mut cx_b.to_async(),
2633 )
2634 .await
2635 .unwrap();
2636
2637 // Open the file on client B.
2638 let buffer_b = cx_b
2639 .background()
2640 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2641 .await
2642 .unwrap();
2643
2644 // Request the definition of a symbol as the guest.
2645 let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2646
2647 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2648 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2649 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2650 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2651 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2652 )))
2653 });
2654
2655 let definitions_1 = definitions_1.await.unwrap();
2656 cx_b.read(|cx| {
2657 assert_eq!(definitions_1.len(), 1);
2658 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2659 let target_buffer = definitions_1[0].buffer.read(cx);
2660 assert_eq!(
2661 target_buffer.text(),
2662 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2663 );
2664 assert_eq!(
2665 definitions_1[0].range.to_point(target_buffer),
2666 Point::new(0, 6)..Point::new(0, 9)
2667 );
2668 });
2669
2670 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2671 // the previous call to `definition`.
2672 let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2673 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2674 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2675 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2676 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2677 )))
2678 });
2679
2680 let definitions_2 = definitions_2.await.unwrap();
2681 cx_b.read(|cx| {
2682 assert_eq!(definitions_2.len(), 1);
2683 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2684 let target_buffer = definitions_2[0].buffer.read(cx);
2685 assert_eq!(
2686 target_buffer.text(),
2687 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2688 );
2689 assert_eq!(
2690 definitions_2[0].range.to_point(target_buffer),
2691 Point::new(1, 6)..Point::new(1, 11)
2692 );
2693 });
2694 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2695
2696 cx_b.update(|_| {
2697 drop(definitions_1);
2698 drop(definitions_2);
2699 });
2700 project_b
2701 .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2702 .await;
2703 }
2704
2705 #[gpui::test(iterations = 10)]
2706 async fn test_references(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2707 cx_a.foreground().forbid_parking();
2708 let mut lang_registry = Arc::new(LanguageRegistry::new());
2709 let fs = FakeFs::new(cx_a.background());
2710 fs.insert_tree(
2711 "/root-1",
2712 json!({
2713 ".zed.toml": r#"collaborators = ["user_b"]"#,
2714 "one.rs": "const ONE: usize = 1;",
2715 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2716 }),
2717 )
2718 .await;
2719 fs.insert_tree(
2720 "/root-2",
2721 json!({
2722 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2723 }),
2724 )
2725 .await;
2726
2727 // Set up a fake language server.
2728 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2729 Arc::get_mut(&mut lang_registry)
2730 .unwrap()
2731 .add(Arc::new(Language::new(
2732 LanguageConfig {
2733 name: "Rust".into(),
2734 path_suffixes: vec!["rs".to_string()],
2735 language_server: Some(language_server_config),
2736 ..Default::default()
2737 },
2738 Some(tree_sitter_rust::language()),
2739 )));
2740
2741 // Connect to a server as 2 clients.
2742 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2743 let client_a = server.create_client(&mut cx_a, "user_a").await;
2744 let client_b = server.create_client(&mut cx_b, "user_b").await;
2745
2746 // Share a project as client A
2747 let project_a = cx_a.update(|cx| {
2748 Project::local(
2749 client_a.clone(),
2750 client_a.user_store.clone(),
2751 lang_registry.clone(),
2752 fs.clone(),
2753 cx,
2754 )
2755 });
2756 let (worktree_a, _) = project_a
2757 .update(&mut cx_a, |p, cx| {
2758 p.find_or_create_local_worktree("/root-1", false, cx)
2759 })
2760 .await
2761 .unwrap();
2762 worktree_a
2763 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2764 .await;
2765 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2766 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2767 project_a
2768 .update(&mut cx_a, |p, cx| p.share(cx))
2769 .await
2770 .unwrap();
2771
2772 // Join the worktree as client B.
2773 let project_b = Project::remote(
2774 project_id,
2775 client_b.clone(),
2776 client_b.user_store.clone(),
2777 lang_registry.clone(),
2778 fs.clone(),
2779 &mut cx_b.to_async(),
2780 )
2781 .await
2782 .unwrap();
2783
2784 // Open the file on client B.
2785 let buffer_b = cx_b
2786 .background()
2787 .spawn(project_b.update(&mut cx_b, |p, cx| {
2788 p.open_buffer((worktree_id, "one.rs"), cx)
2789 }))
2790 .await
2791 .unwrap();
2792
2793 // Request references to a symbol as the guest.
2794 let references = project_b.update(&mut cx_b, |p, cx| p.references(&buffer_b, 7, cx));
2795
2796 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2797 fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2798 assert_eq!(
2799 params.text_document_position.text_document.uri.as_str(),
2800 "file:///root-1/one.rs"
2801 );
2802 Some(vec![
2803 lsp::Location {
2804 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2805 range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2806 },
2807 lsp::Location {
2808 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2809 range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2810 },
2811 lsp::Location {
2812 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2813 range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2814 },
2815 ])
2816 });
2817
2818 let references = references.await.unwrap();
2819 cx_b.read(|cx| {
2820 assert_eq!(references.len(), 3);
2821 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2822
2823 let two_buffer = references[0].buffer.read(cx);
2824 let three_buffer = references[2].buffer.read(cx);
2825 assert_eq!(
2826 two_buffer.file().unwrap().path().as_ref(),
2827 Path::new("two.rs")
2828 );
2829 assert_eq!(references[1].buffer, references[0].buffer);
2830 assert_eq!(
2831 three_buffer.file().unwrap().full_path(cx),
2832 Path::new("three.rs")
2833 );
2834
2835 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2836 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2837 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2838 });
2839 }
2840
2841 #[gpui::test(iterations = 10)]
2842 async fn test_document_highlights(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2843 cx_a.foreground().forbid_parking();
2844 let lang_registry = Arc::new(LanguageRegistry::new());
2845 let fs = FakeFs::new(cx_a.background());
2846 fs.insert_tree(
2847 "/root-1",
2848 json!({
2849 ".zed.toml": r#"collaborators = ["user_b"]"#,
2850 "main.rs": "fn double(number: i32) -> i32 { number + number }",
2851 }),
2852 )
2853 .await;
2854
2855 // Set up a fake language server.
2856 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2857 lang_registry.add(Arc::new(Language::new(
2858 LanguageConfig {
2859 name: "Rust".into(),
2860 path_suffixes: vec!["rs".to_string()],
2861 language_server: Some(language_server_config),
2862 ..Default::default()
2863 },
2864 Some(tree_sitter_rust::language()),
2865 )));
2866
2867 // Connect to a server as 2 clients.
2868 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2869 let client_a = server.create_client(&mut cx_a, "user_a").await;
2870 let client_b = server.create_client(&mut cx_b, "user_b").await;
2871
2872 // Share a project as client A
2873 let project_a = cx_a.update(|cx| {
2874 Project::local(
2875 client_a.clone(),
2876 client_a.user_store.clone(),
2877 lang_registry.clone(),
2878 fs.clone(),
2879 cx,
2880 )
2881 });
2882 let (worktree_a, _) = project_a
2883 .update(&mut cx_a, |p, cx| {
2884 p.find_or_create_local_worktree("/root-1", false, cx)
2885 })
2886 .await
2887 .unwrap();
2888 worktree_a
2889 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2890 .await;
2891 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2892 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2893 project_a
2894 .update(&mut cx_a, |p, cx| p.share(cx))
2895 .await
2896 .unwrap();
2897
2898 // Join the worktree as client B.
2899 let project_b = Project::remote(
2900 project_id,
2901 client_b.clone(),
2902 client_b.user_store.clone(),
2903 lang_registry.clone(),
2904 fs.clone(),
2905 &mut cx_b.to_async(),
2906 )
2907 .await
2908 .unwrap();
2909
2910 // Open the file on client B.
2911 let buffer_b = cx_b
2912 .background()
2913 .spawn(project_b.update(&mut cx_b, |p, cx| {
2914 p.open_buffer((worktree_id, "main.rs"), cx)
2915 }))
2916 .await
2917 .unwrap();
2918
2919 // Request document highlights as the guest.
2920 let highlights =
2921 project_b.update(&mut cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx));
2922
2923 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2924 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2925 |params, _| {
2926 assert_eq!(
2927 params
2928 .text_document_position_params
2929 .text_document
2930 .uri
2931 .as_str(),
2932 "file:///root-1/main.rs"
2933 );
2934 assert_eq!(
2935 params.text_document_position_params.position,
2936 lsp::Position::new(0, 34)
2937 );
2938 Some(vec![
2939 lsp::DocumentHighlight {
2940 kind: Some(lsp::DocumentHighlightKind::WRITE),
2941 range: lsp::Range::new(
2942 lsp::Position::new(0, 10),
2943 lsp::Position::new(0, 16),
2944 ),
2945 },
2946 lsp::DocumentHighlight {
2947 kind: Some(lsp::DocumentHighlightKind::READ),
2948 range: lsp::Range::new(
2949 lsp::Position::new(0, 32),
2950 lsp::Position::new(0, 38),
2951 ),
2952 },
2953 lsp::DocumentHighlight {
2954 kind: Some(lsp::DocumentHighlightKind::READ),
2955 range: lsp::Range::new(
2956 lsp::Position::new(0, 41),
2957 lsp::Position::new(0, 47),
2958 ),
2959 },
2960 ])
2961 },
2962 );
2963
2964 let highlights = highlights.await.unwrap();
2965 buffer_b.read_with(&cx_b, |buffer, _| {
2966 let snapshot = buffer.snapshot();
2967
2968 let highlights = highlights
2969 .into_iter()
2970 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2971 .collect::<Vec<_>>();
2972 assert_eq!(
2973 highlights,
2974 &[
2975 (lsp::DocumentHighlightKind::WRITE, 10..16),
2976 (lsp::DocumentHighlightKind::READ, 32..38),
2977 (lsp::DocumentHighlightKind::READ, 41..47)
2978 ]
2979 )
2980 });
2981 }
2982
2983 #[gpui::test(iterations = 10)]
2984 async fn test_project_symbols(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2985 cx_a.foreground().forbid_parking();
2986 let mut lang_registry = Arc::new(LanguageRegistry::new());
2987 let fs = FakeFs::new(cx_a.background());
2988 fs.insert_tree(
2989 "/code",
2990 json!({
2991 "crate-1": {
2992 ".zed.toml": r#"collaborators = ["user_b"]"#,
2993 "one.rs": "const ONE: usize = 1;",
2994 },
2995 "crate-2": {
2996 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2997 },
2998 "private": {
2999 "passwords.txt": "the-password",
3000 }
3001 }),
3002 )
3003 .await;
3004
3005 // Set up a fake language server.
3006 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3007 Arc::get_mut(&mut lang_registry)
3008 .unwrap()
3009 .add(Arc::new(Language::new(
3010 LanguageConfig {
3011 name: "Rust".into(),
3012 path_suffixes: vec!["rs".to_string()],
3013 language_server: Some(language_server_config),
3014 ..Default::default()
3015 },
3016 Some(tree_sitter_rust::language()),
3017 )));
3018
3019 // Connect to a server as 2 clients.
3020 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3021 let client_a = server.create_client(&mut cx_a, "user_a").await;
3022 let client_b = server.create_client(&mut cx_b, "user_b").await;
3023
3024 // Share a project as client A
3025 let project_a = cx_a.update(|cx| {
3026 Project::local(
3027 client_a.clone(),
3028 client_a.user_store.clone(),
3029 lang_registry.clone(),
3030 fs.clone(),
3031 cx,
3032 )
3033 });
3034 let (worktree_a, _) = project_a
3035 .update(&mut cx_a, |p, cx| {
3036 p.find_or_create_local_worktree("/code/crate-1", false, cx)
3037 })
3038 .await
3039 .unwrap();
3040 worktree_a
3041 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3042 .await;
3043 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3044 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3045 project_a
3046 .update(&mut cx_a, |p, cx| p.share(cx))
3047 .await
3048 .unwrap();
3049
3050 // Join the worktree as client B.
3051 let project_b = Project::remote(
3052 project_id,
3053 client_b.clone(),
3054 client_b.user_store.clone(),
3055 lang_registry.clone(),
3056 fs.clone(),
3057 &mut cx_b.to_async(),
3058 )
3059 .await
3060 .unwrap();
3061
3062 // Cause the language server to start.
3063 let _buffer = cx_b
3064 .background()
3065 .spawn(project_b.update(&mut cx_b, |p, cx| {
3066 p.open_buffer((worktree_id, "one.rs"), cx)
3067 }))
3068 .await
3069 .unwrap();
3070
3071 // Request the definition of a symbol as the guest.
3072 let symbols = project_b.update(&mut cx_b, |p, cx| p.symbols("two", cx));
3073 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3074 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
3075 #[allow(deprecated)]
3076 Some(vec![lsp::SymbolInformation {
3077 name: "TWO".into(),
3078 location: lsp::Location {
3079 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3080 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3081 },
3082 kind: lsp::SymbolKind::CONSTANT,
3083 tags: None,
3084 container_name: None,
3085 deprecated: None,
3086 }])
3087 });
3088
3089 let symbols = symbols.await.unwrap();
3090 assert_eq!(symbols.len(), 1);
3091 assert_eq!(symbols[0].name, "TWO");
3092
3093 // Open one of the returned symbols.
3094 let buffer_b_2 = project_b
3095 .update(&mut cx_b, |project, cx| {
3096 project.open_buffer_for_symbol(&symbols[0], cx)
3097 })
3098 .await
3099 .unwrap();
3100 buffer_b_2.read_with(&cx_b, |buffer, _| {
3101 assert_eq!(
3102 buffer.file().unwrap().path().as_ref(),
3103 Path::new("../crate-2/two.rs")
3104 );
3105 });
3106
3107 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3108 let mut fake_symbol = symbols[0].clone();
3109 fake_symbol.path = Path::new("/code/secrets").into();
3110 let error = project_b
3111 .update(&mut cx_b, |project, cx| {
3112 project.open_buffer_for_symbol(&fake_symbol, cx)
3113 })
3114 .await
3115 .unwrap_err();
3116 assert!(error.to_string().contains("invalid symbol signature"));
3117 }
3118
3119 #[gpui::test(iterations = 10)]
3120 async fn test_open_buffer_while_getting_definition_pointing_to_it(
3121 mut cx_a: TestAppContext,
3122 mut cx_b: TestAppContext,
3123 mut rng: StdRng,
3124 ) {
3125 cx_a.foreground().forbid_parking();
3126 let mut lang_registry = Arc::new(LanguageRegistry::new());
3127 let fs = FakeFs::new(cx_a.background());
3128 fs.insert_tree(
3129 "/root",
3130 json!({
3131 ".zed.toml": r#"collaborators = ["user_b"]"#,
3132 "a.rs": "const ONE: usize = b::TWO;",
3133 "b.rs": "const TWO: usize = 2",
3134 }),
3135 )
3136 .await;
3137
3138 // Set up a fake language server.
3139 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3140
3141 Arc::get_mut(&mut lang_registry)
3142 .unwrap()
3143 .add(Arc::new(Language::new(
3144 LanguageConfig {
3145 name: "Rust".into(),
3146 path_suffixes: vec!["rs".to_string()],
3147 language_server: Some(language_server_config),
3148 ..Default::default()
3149 },
3150 Some(tree_sitter_rust::language()),
3151 )));
3152
3153 // Connect to a server as 2 clients.
3154 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3155 let client_a = server.create_client(&mut cx_a, "user_a").await;
3156 let client_b = server.create_client(&mut cx_b, "user_b").await;
3157
3158 // Share a project as client A
3159 let project_a = cx_a.update(|cx| {
3160 Project::local(
3161 client_a.clone(),
3162 client_a.user_store.clone(),
3163 lang_registry.clone(),
3164 fs.clone(),
3165 cx,
3166 )
3167 });
3168
3169 let (worktree_a, _) = project_a
3170 .update(&mut cx_a, |p, cx| {
3171 p.find_or_create_local_worktree("/root", false, cx)
3172 })
3173 .await
3174 .unwrap();
3175 worktree_a
3176 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3177 .await;
3178 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3179 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3180 project_a
3181 .update(&mut cx_a, |p, cx| p.share(cx))
3182 .await
3183 .unwrap();
3184
3185 // Join the worktree as client B.
3186 let project_b = Project::remote(
3187 project_id,
3188 client_b.clone(),
3189 client_b.user_store.clone(),
3190 lang_registry.clone(),
3191 fs.clone(),
3192 &mut cx_b.to_async(),
3193 )
3194 .await
3195 .unwrap();
3196
3197 let buffer_b1 = cx_b
3198 .background()
3199 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3200 .await
3201 .unwrap();
3202
3203 let definitions;
3204 let buffer_b2;
3205 if rng.gen() {
3206 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3207 buffer_b2 =
3208 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3209 } else {
3210 buffer_b2 =
3211 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3212 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3213 }
3214
3215 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3216 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3217 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3218 lsp::Url::from_file_path("/root/b.rs").unwrap(),
3219 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3220 )))
3221 });
3222
3223 let buffer_b2 = buffer_b2.await.unwrap();
3224 let definitions = definitions.await.unwrap();
3225 assert_eq!(definitions.len(), 1);
3226 assert_eq!(definitions[0].buffer, buffer_b2);
3227 }
3228
3229 #[gpui::test(iterations = 10)]
3230 async fn test_collaborating_with_code_actions(
3231 mut cx_a: TestAppContext,
3232 mut cx_b: TestAppContext,
3233 ) {
3234 cx_a.foreground().forbid_parking();
3235 let mut lang_registry = Arc::new(LanguageRegistry::new());
3236 let fs = FakeFs::new(cx_a.background());
3237 let mut path_openers_b = Vec::new();
3238 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3239
3240 // Set up a fake language server.
3241 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3242 Arc::get_mut(&mut lang_registry)
3243 .unwrap()
3244 .add(Arc::new(Language::new(
3245 LanguageConfig {
3246 name: "Rust".into(),
3247 path_suffixes: vec!["rs".to_string()],
3248 language_server: Some(language_server_config),
3249 ..Default::default()
3250 },
3251 Some(tree_sitter_rust::language()),
3252 )));
3253
3254 // Connect to a server as 2 clients.
3255 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3256 let client_a = server.create_client(&mut cx_a, "user_a").await;
3257 let client_b = server.create_client(&mut cx_b, "user_b").await;
3258
3259 // Share a project as client A
3260 fs.insert_tree(
3261 "/a",
3262 json!({
3263 ".zed.toml": r#"collaborators = ["user_b"]"#,
3264 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3265 "other.rs": "pub fn foo() -> usize { 4 }",
3266 }),
3267 )
3268 .await;
3269 let project_a = cx_a.update(|cx| {
3270 Project::local(
3271 client_a.clone(),
3272 client_a.user_store.clone(),
3273 lang_registry.clone(),
3274 fs.clone(),
3275 cx,
3276 )
3277 });
3278 let (worktree_a, _) = project_a
3279 .update(&mut cx_a, |p, cx| {
3280 p.find_or_create_local_worktree("/a", false, cx)
3281 })
3282 .await
3283 .unwrap();
3284 worktree_a
3285 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3286 .await;
3287 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3288 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3289 project_a
3290 .update(&mut cx_a, |p, cx| p.share(cx))
3291 .await
3292 .unwrap();
3293
3294 // Join the worktree as client B.
3295 let project_b = Project::remote(
3296 project_id,
3297 client_b.clone(),
3298 client_b.user_store.clone(),
3299 lang_registry.clone(),
3300 fs.clone(),
3301 &mut cx_b.to_async(),
3302 )
3303 .await
3304 .unwrap();
3305 let mut params = cx_b.update(WorkspaceParams::test);
3306 params.languages = lang_registry.clone();
3307 params.client = client_b.client.clone();
3308 params.user_store = client_b.user_store.clone();
3309 params.project = project_b;
3310 params.path_openers = path_openers_b.into();
3311
3312 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3313 let editor_b = workspace_b
3314 .update(&mut cx_b, |workspace, cx| {
3315 workspace.open_path((worktree_id, "main.rs").into(), cx)
3316 })
3317 .await
3318 .unwrap()
3319 .downcast::<Editor>()
3320 .unwrap();
3321
3322 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3323 fake_language_server
3324 .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3325 assert_eq!(
3326 params.text_document.uri,
3327 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3328 );
3329 assert_eq!(params.range.start, lsp::Position::new(0, 0));
3330 assert_eq!(params.range.end, lsp::Position::new(0, 0));
3331 None
3332 })
3333 .next()
3334 .await;
3335
3336 // Move cursor to a location that contains code actions.
3337 editor_b.update(&mut cx_b, |editor, cx| {
3338 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3339 cx.focus(&editor_b);
3340 });
3341
3342 fake_language_server
3343 .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3344 assert_eq!(
3345 params.text_document.uri,
3346 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3347 );
3348 assert_eq!(params.range.start, lsp::Position::new(1, 31));
3349 assert_eq!(params.range.end, lsp::Position::new(1, 31));
3350
3351 Some(vec![lsp::CodeActionOrCommand::CodeAction(
3352 lsp::CodeAction {
3353 title: "Inline into all callers".to_string(),
3354 edit: Some(lsp::WorkspaceEdit {
3355 changes: Some(
3356 [
3357 (
3358 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3359 vec![lsp::TextEdit::new(
3360 lsp::Range::new(
3361 lsp::Position::new(1, 22),
3362 lsp::Position::new(1, 34),
3363 ),
3364 "4".to_string(),
3365 )],
3366 ),
3367 (
3368 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3369 vec![lsp::TextEdit::new(
3370 lsp::Range::new(
3371 lsp::Position::new(0, 0),
3372 lsp::Position::new(0, 27),
3373 ),
3374 "".to_string(),
3375 )],
3376 ),
3377 ]
3378 .into_iter()
3379 .collect(),
3380 ),
3381 ..Default::default()
3382 }),
3383 data: Some(json!({
3384 "codeActionParams": {
3385 "range": {
3386 "start": {"line": 1, "column": 31},
3387 "end": {"line": 1, "column": 31},
3388 }
3389 }
3390 })),
3391 ..Default::default()
3392 },
3393 )])
3394 })
3395 .next()
3396 .await;
3397
3398 // Toggle code actions and wait for them to display.
3399 editor_b.update(&mut cx_b, |editor, cx| {
3400 editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3401 });
3402 editor_b
3403 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3404 .await;
3405
3406 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3407
3408 // Confirming the code action will trigger a resolve request.
3409 let confirm_action = workspace_b
3410 .update(&mut cx_b, |workspace, cx| {
3411 Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3412 })
3413 .unwrap();
3414 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3415 lsp::CodeAction {
3416 title: "Inline into all callers".to_string(),
3417 edit: Some(lsp::WorkspaceEdit {
3418 changes: Some(
3419 [
3420 (
3421 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3422 vec![lsp::TextEdit::new(
3423 lsp::Range::new(
3424 lsp::Position::new(1, 22),
3425 lsp::Position::new(1, 34),
3426 ),
3427 "4".to_string(),
3428 )],
3429 ),
3430 (
3431 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3432 vec![lsp::TextEdit::new(
3433 lsp::Range::new(
3434 lsp::Position::new(0, 0),
3435 lsp::Position::new(0, 27),
3436 ),
3437 "".to_string(),
3438 )],
3439 ),
3440 ]
3441 .into_iter()
3442 .collect(),
3443 ),
3444 ..Default::default()
3445 }),
3446 ..Default::default()
3447 }
3448 });
3449
3450 // After the action is confirmed, an editor containing both modified files is opened.
3451 confirm_action.await.unwrap();
3452 let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3453 workspace
3454 .active_item(cx)
3455 .unwrap()
3456 .downcast::<Editor>()
3457 .unwrap()
3458 });
3459 code_action_editor.update(&mut cx_b, |editor, cx| {
3460 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3461 editor.undo(&Undo, cx);
3462 assert_eq!(
3463 editor.text(cx),
3464 "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3465 );
3466 editor.redo(&Redo, cx);
3467 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3468 });
3469 }
3470
3471 #[gpui::test(iterations = 10)]
3472 async fn test_collaborating_with_renames(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3473 cx_a.foreground().forbid_parking();
3474 let mut lang_registry = Arc::new(LanguageRegistry::new());
3475 let fs = FakeFs::new(cx_a.background());
3476 let mut path_openers_b = Vec::new();
3477 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3478
3479 // Set up a fake language server.
3480 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3481 Arc::get_mut(&mut lang_registry)
3482 .unwrap()
3483 .add(Arc::new(Language::new(
3484 LanguageConfig {
3485 name: "Rust".into(),
3486 path_suffixes: vec!["rs".to_string()],
3487 language_server: Some(language_server_config),
3488 ..Default::default()
3489 },
3490 Some(tree_sitter_rust::language()),
3491 )));
3492
3493 // Connect to a server as 2 clients.
3494 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3495 let client_a = server.create_client(&mut cx_a, "user_a").await;
3496 let client_b = server.create_client(&mut cx_b, "user_b").await;
3497
3498 // Share a project as client A
3499 fs.insert_tree(
3500 "/dir",
3501 json!({
3502 ".zed.toml": r#"collaborators = ["user_b"]"#,
3503 "one.rs": "const ONE: usize = 1;",
3504 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3505 }),
3506 )
3507 .await;
3508 let project_a = cx_a.update(|cx| {
3509 Project::local(
3510 client_a.clone(),
3511 client_a.user_store.clone(),
3512 lang_registry.clone(),
3513 fs.clone(),
3514 cx,
3515 )
3516 });
3517 let (worktree_a, _) = project_a
3518 .update(&mut cx_a, |p, cx| {
3519 p.find_or_create_local_worktree("/dir", false, cx)
3520 })
3521 .await
3522 .unwrap();
3523 worktree_a
3524 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3525 .await;
3526 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3527 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3528 project_a
3529 .update(&mut cx_a, |p, cx| p.share(cx))
3530 .await
3531 .unwrap();
3532
3533 // Join the worktree as client B.
3534 let project_b = Project::remote(
3535 project_id,
3536 client_b.clone(),
3537 client_b.user_store.clone(),
3538 lang_registry.clone(),
3539 fs.clone(),
3540 &mut cx_b.to_async(),
3541 )
3542 .await
3543 .unwrap();
3544 let mut params = cx_b.update(WorkspaceParams::test);
3545 params.languages = lang_registry.clone();
3546 params.client = client_b.client.clone();
3547 params.user_store = client_b.user_store.clone();
3548 params.project = project_b;
3549 params.path_openers = path_openers_b.into();
3550
3551 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3552 let editor_b = workspace_b
3553 .update(&mut cx_b, |workspace, cx| {
3554 workspace.open_path((worktree_id, "one.rs").into(), cx)
3555 })
3556 .await
3557 .unwrap()
3558 .downcast::<Editor>()
3559 .unwrap();
3560 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3561
3562 // Move cursor to a location that can be renamed.
3563 let prepare_rename = editor_b.update(&mut cx_b, |editor, cx| {
3564 editor.select_ranges([7..7], None, cx);
3565 editor.rename(&Rename, cx).unwrap()
3566 });
3567
3568 fake_language_server
3569 .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3570 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3571 assert_eq!(params.position, lsp::Position::new(0, 7));
3572 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3573 lsp::Position::new(0, 6),
3574 lsp::Position::new(0, 9),
3575 )))
3576 })
3577 .next()
3578 .await
3579 .unwrap();
3580 prepare_rename.await.unwrap();
3581 editor_b.update(&mut cx_b, |editor, cx| {
3582 let rename = editor.pending_rename().unwrap();
3583 let buffer = editor.buffer().read(cx).snapshot(cx);
3584 assert_eq!(
3585 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3586 6..9
3587 );
3588 rename.editor.update(cx, |rename_editor, cx| {
3589 rename_editor.buffer().update(cx, |rename_buffer, cx| {
3590 rename_buffer.edit([0..3], "THREE", cx);
3591 });
3592 });
3593 });
3594
3595 let confirm_rename = workspace_b.update(&mut cx_b, |workspace, cx| {
3596 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3597 });
3598 fake_language_server
3599 .handle_request::<lsp::request::Rename, _>(|params, _| {
3600 assert_eq!(
3601 params.text_document_position.text_document.uri.as_str(),
3602 "file:///dir/one.rs"
3603 );
3604 assert_eq!(
3605 params.text_document_position.position,
3606 lsp::Position::new(0, 6)
3607 );
3608 assert_eq!(params.new_name, "THREE");
3609 Some(lsp::WorkspaceEdit {
3610 changes: Some(
3611 [
3612 (
3613 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3614 vec![lsp::TextEdit::new(
3615 lsp::Range::new(
3616 lsp::Position::new(0, 6),
3617 lsp::Position::new(0, 9),
3618 ),
3619 "THREE".to_string(),
3620 )],
3621 ),
3622 (
3623 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3624 vec![
3625 lsp::TextEdit::new(
3626 lsp::Range::new(
3627 lsp::Position::new(0, 24),
3628 lsp::Position::new(0, 27),
3629 ),
3630 "THREE".to_string(),
3631 ),
3632 lsp::TextEdit::new(
3633 lsp::Range::new(
3634 lsp::Position::new(0, 35),
3635 lsp::Position::new(0, 38),
3636 ),
3637 "THREE".to_string(),
3638 ),
3639 ],
3640 ),
3641 ]
3642 .into_iter()
3643 .collect(),
3644 ),
3645 ..Default::default()
3646 })
3647 })
3648 .next()
3649 .await
3650 .unwrap();
3651 confirm_rename.await.unwrap();
3652
3653 let rename_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3654 workspace
3655 .active_item(cx)
3656 .unwrap()
3657 .downcast::<Editor>()
3658 .unwrap()
3659 });
3660 rename_editor.update(&mut cx_b, |editor, cx| {
3661 assert_eq!(
3662 editor.text(cx),
3663 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3664 );
3665 editor.undo(&Undo, cx);
3666 assert_eq!(
3667 editor.text(cx),
3668 "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3669 );
3670 editor.redo(&Redo, cx);
3671 assert_eq!(
3672 editor.text(cx),
3673 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3674 );
3675 });
3676
3677 // Ensure temporary rename edits cannot be undone/redone.
3678 editor_b.update(&mut cx_b, |editor, cx| {
3679 editor.undo(&Undo, cx);
3680 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3681 editor.undo(&Undo, cx);
3682 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3683 editor.redo(&Redo, cx);
3684 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3685 })
3686 }
3687
3688 #[gpui::test(iterations = 10)]
3689 async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3690 cx_a.foreground().forbid_parking();
3691
3692 // Connect to a server as 2 clients.
3693 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3694 let client_a = server.create_client(&mut cx_a, "user_a").await;
3695 let client_b = server.create_client(&mut cx_b, "user_b").await;
3696
3697 // Create an org that includes these 2 users.
3698 let db = &server.app_state.db;
3699 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3700 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3701 .await
3702 .unwrap();
3703 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3704 .await
3705 .unwrap();
3706
3707 // Create a channel that includes all the users.
3708 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3709 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3710 .await
3711 .unwrap();
3712 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3713 .await
3714 .unwrap();
3715 db.create_channel_message(
3716 channel_id,
3717 client_b.current_user_id(&cx_b),
3718 "hello A, it's B.",
3719 OffsetDateTime::now_utc(),
3720 1,
3721 )
3722 .await
3723 .unwrap();
3724
3725 let channels_a = cx_a
3726 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3727 channels_a
3728 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3729 .await;
3730 channels_a.read_with(&cx_a, |list, _| {
3731 assert_eq!(
3732 list.available_channels().unwrap(),
3733 &[ChannelDetails {
3734 id: channel_id.to_proto(),
3735 name: "test-channel".to_string()
3736 }]
3737 )
3738 });
3739 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3740 this.get_channel(channel_id.to_proto(), cx).unwrap()
3741 });
3742 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3743 channel_a
3744 .condition(&cx_a, |channel, _| {
3745 channel_messages(channel)
3746 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3747 })
3748 .await;
3749
3750 let channels_b = cx_b
3751 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3752 channels_b
3753 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3754 .await;
3755 channels_b.read_with(&cx_b, |list, _| {
3756 assert_eq!(
3757 list.available_channels().unwrap(),
3758 &[ChannelDetails {
3759 id: channel_id.to_proto(),
3760 name: "test-channel".to_string()
3761 }]
3762 )
3763 });
3764
3765 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3766 this.get_channel(channel_id.to_proto(), cx).unwrap()
3767 });
3768 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3769 channel_b
3770 .condition(&cx_b, |channel, _| {
3771 channel_messages(channel)
3772 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3773 })
3774 .await;
3775
3776 channel_a
3777 .update(&mut cx_a, |channel, cx| {
3778 channel
3779 .send_message("oh, hi B.".to_string(), cx)
3780 .unwrap()
3781 .detach();
3782 let task = channel.send_message("sup".to_string(), cx).unwrap();
3783 assert_eq!(
3784 channel_messages(channel),
3785 &[
3786 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3787 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3788 ("user_a".to_string(), "sup".to_string(), true)
3789 ]
3790 );
3791 task
3792 })
3793 .await
3794 .unwrap();
3795
3796 channel_b
3797 .condition(&cx_b, |channel, _| {
3798 channel_messages(channel)
3799 == [
3800 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3801 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3802 ("user_a".to_string(), "sup".to_string(), false),
3803 ]
3804 })
3805 .await;
3806
3807 assert_eq!(
3808 server
3809 .state()
3810 .await
3811 .channel(channel_id)
3812 .unwrap()
3813 .connection_ids
3814 .len(),
3815 2
3816 );
3817 cx_b.update(|_| drop(channel_b));
3818 server
3819 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3820 .await;
3821
3822 cx_a.update(|_| drop(channel_a));
3823 server
3824 .condition(|state| state.channel(channel_id).is_none())
3825 .await;
3826 }
3827
3828 #[gpui::test(iterations = 10)]
3829 async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3830 cx_a.foreground().forbid_parking();
3831
3832 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3833 let client_a = server.create_client(&mut cx_a, "user_a").await;
3834
3835 let db = &server.app_state.db;
3836 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3837 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3838 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3839 .await
3840 .unwrap();
3841 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3842 .await
3843 .unwrap();
3844
3845 let channels_a = cx_a
3846 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3847 channels_a
3848 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3849 .await;
3850 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3851 this.get_channel(channel_id.to_proto(), cx).unwrap()
3852 });
3853
3854 // Messages aren't allowed to be too long.
3855 channel_a
3856 .update(&mut cx_a, |channel, cx| {
3857 let long_body = "this is long.\n".repeat(1024);
3858 channel.send_message(long_body, cx).unwrap()
3859 })
3860 .await
3861 .unwrap_err();
3862
3863 // Messages aren't allowed to be blank.
3864 channel_a.update(&mut cx_a, |channel, cx| {
3865 channel.send_message(String::new(), cx).unwrap_err()
3866 });
3867
3868 // Leading and trailing whitespace are trimmed.
3869 channel_a
3870 .update(&mut cx_a, |channel, cx| {
3871 channel
3872 .send_message("\n surrounded by whitespace \n".to_string(), cx)
3873 .unwrap()
3874 })
3875 .await
3876 .unwrap();
3877 assert_eq!(
3878 db.get_channel_messages(channel_id, 10, None)
3879 .await
3880 .unwrap()
3881 .iter()
3882 .map(|m| &m.body)
3883 .collect::<Vec<_>>(),
3884 &["surrounded by whitespace"]
3885 );
3886 }
3887
3888 #[gpui::test(iterations = 10)]
3889 async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3890 cx_a.foreground().forbid_parking();
3891
3892 // Connect to a server as 2 clients.
3893 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3894 let client_a = server.create_client(&mut cx_a, "user_a").await;
3895 let client_b = server.create_client(&mut cx_b, "user_b").await;
3896 let mut status_b = client_b.status();
3897
3898 // Create an org that includes these 2 users.
3899 let db = &server.app_state.db;
3900 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3901 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3902 .await
3903 .unwrap();
3904 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3905 .await
3906 .unwrap();
3907
3908 // Create a channel that includes all the users.
3909 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3910 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3911 .await
3912 .unwrap();
3913 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3914 .await
3915 .unwrap();
3916 db.create_channel_message(
3917 channel_id,
3918 client_b.current_user_id(&cx_b),
3919 "hello A, it's B.",
3920 OffsetDateTime::now_utc(),
3921 2,
3922 )
3923 .await
3924 .unwrap();
3925
3926 let channels_a = cx_a
3927 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3928 channels_a
3929 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3930 .await;
3931
3932 channels_a.read_with(&cx_a, |list, _| {
3933 assert_eq!(
3934 list.available_channels().unwrap(),
3935 &[ChannelDetails {
3936 id: channel_id.to_proto(),
3937 name: "test-channel".to_string()
3938 }]
3939 )
3940 });
3941 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3942 this.get_channel(channel_id.to_proto(), cx).unwrap()
3943 });
3944 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3945 channel_a
3946 .condition(&cx_a, |channel, _| {
3947 channel_messages(channel)
3948 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3949 })
3950 .await;
3951
3952 let channels_b = cx_b
3953 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3954 channels_b
3955 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3956 .await;
3957 channels_b.read_with(&cx_b, |list, _| {
3958 assert_eq!(
3959 list.available_channels().unwrap(),
3960 &[ChannelDetails {
3961 id: channel_id.to_proto(),
3962 name: "test-channel".to_string()
3963 }]
3964 )
3965 });
3966
3967 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3968 this.get_channel(channel_id.to_proto(), cx).unwrap()
3969 });
3970 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3971 channel_b
3972 .condition(&cx_b, |channel, _| {
3973 channel_messages(channel)
3974 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3975 })
3976 .await;
3977
3978 // Disconnect client B, ensuring we can still access its cached channel data.
3979 server.forbid_connections();
3980 server.disconnect_client(client_b.current_user_id(&cx_b));
3981 while !matches!(
3982 status_b.next().await,
3983 Some(client::Status::ReconnectionError { .. })
3984 ) {}
3985
3986 channels_b.read_with(&cx_b, |channels, _| {
3987 assert_eq!(
3988 channels.available_channels().unwrap(),
3989 [ChannelDetails {
3990 id: channel_id.to_proto(),
3991 name: "test-channel".to_string()
3992 }]
3993 )
3994 });
3995 channel_b.read_with(&cx_b, |channel, _| {
3996 assert_eq!(
3997 channel_messages(channel),
3998 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3999 )
4000 });
4001
4002 // Send a message from client B while it is disconnected.
4003 channel_b
4004 .update(&mut cx_b, |channel, cx| {
4005 let task = channel
4006 .send_message("can you see this?".to_string(), cx)
4007 .unwrap();
4008 assert_eq!(
4009 channel_messages(channel),
4010 &[
4011 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4012 ("user_b".to_string(), "can you see this?".to_string(), true)
4013 ]
4014 );
4015 task
4016 })
4017 .await
4018 .unwrap_err();
4019
4020 // Send a message from client A while B is disconnected.
4021 channel_a
4022 .update(&mut cx_a, |channel, cx| {
4023 channel
4024 .send_message("oh, hi B.".to_string(), cx)
4025 .unwrap()
4026 .detach();
4027 let task = channel.send_message("sup".to_string(), cx).unwrap();
4028 assert_eq!(
4029 channel_messages(channel),
4030 &[
4031 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4032 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4033 ("user_a".to_string(), "sup".to_string(), true)
4034 ]
4035 );
4036 task
4037 })
4038 .await
4039 .unwrap();
4040
4041 // Give client B a chance to reconnect.
4042 server.allow_connections();
4043 cx_b.foreground().advance_clock(Duration::from_secs(10));
4044
4045 // Verify that B sees the new messages upon reconnection, as well as the message client B
4046 // sent while offline.
4047 channel_b
4048 .condition(&cx_b, |channel, _| {
4049 channel_messages(channel)
4050 == [
4051 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4052 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4053 ("user_a".to_string(), "sup".to_string(), false),
4054 ("user_b".to_string(), "can you see this?".to_string(), false),
4055 ]
4056 })
4057 .await;
4058
4059 // Ensure client A and B can communicate normally after reconnection.
4060 channel_a
4061 .update(&mut cx_a, |channel, cx| {
4062 channel.send_message("you online?".to_string(), cx).unwrap()
4063 })
4064 .await
4065 .unwrap();
4066 channel_b
4067 .condition(&cx_b, |channel, _| {
4068 channel_messages(channel)
4069 == [
4070 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4071 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4072 ("user_a".to_string(), "sup".to_string(), false),
4073 ("user_b".to_string(), "can you see this?".to_string(), false),
4074 ("user_a".to_string(), "you online?".to_string(), false),
4075 ]
4076 })
4077 .await;
4078
4079 channel_b
4080 .update(&mut cx_b, |channel, cx| {
4081 channel.send_message("yep".to_string(), cx).unwrap()
4082 })
4083 .await
4084 .unwrap();
4085 channel_a
4086 .condition(&cx_a, |channel, _| {
4087 channel_messages(channel)
4088 == [
4089 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4090 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4091 ("user_a".to_string(), "sup".to_string(), false),
4092 ("user_b".to_string(), "can you see this?".to_string(), false),
4093 ("user_a".to_string(), "you online?".to_string(), false),
4094 ("user_b".to_string(), "yep".to_string(), false),
4095 ]
4096 })
4097 .await;
4098 }
4099
4100 #[gpui::test(iterations = 10)]
4101 async fn test_contacts(
4102 mut cx_a: TestAppContext,
4103 mut cx_b: TestAppContext,
4104 mut cx_c: TestAppContext,
4105 ) {
4106 cx_a.foreground().forbid_parking();
4107 let lang_registry = Arc::new(LanguageRegistry::new());
4108 let fs = FakeFs::new(cx_a.background());
4109
4110 // Connect to a server as 3 clients.
4111 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4112 let client_a = server.create_client(&mut cx_a, "user_a").await;
4113 let client_b = server.create_client(&mut cx_b, "user_b").await;
4114 let client_c = server.create_client(&mut cx_c, "user_c").await;
4115
4116 // Share a worktree as client A.
4117 fs.insert_tree(
4118 "/a",
4119 json!({
4120 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4121 }),
4122 )
4123 .await;
4124
4125 let project_a = cx_a.update(|cx| {
4126 Project::local(
4127 client_a.clone(),
4128 client_a.user_store.clone(),
4129 lang_registry.clone(),
4130 fs.clone(),
4131 cx,
4132 )
4133 });
4134 let (worktree_a, _) = project_a
4135 .update(&mut cx_a, |p, cx| {
4136 p.find_or_create_local_worktree("/a", false, cx)
4137 })
4138 .await
4139 .unwrap();
4140 worktree_a
4141 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4142 .await;
4143
4144 client_a
4145 .user_store
4146 .condition(&cx_a, |user_store, _| {
4147 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4148 })
4149 .await;
4150 client_b
4151 .user_store
4152 .condition(&cx_b, |user_store, _| {
4153 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4154 })
4155 .await;
4156 client_c
4157 .user_store
4158 .condition(&cx_c, |user_store, _| {
4159 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4160 })
4161 .await;
4162
4163 let project_id = project_a
4164 .update(&mut cx_a, |project, _| project.next_remote_id())
4165 .await;
4166 project_a
4167 .update(&mut cx_a, |project, cx| project.share(cx))
4168 .await
4169 .unwrap();
4170
4171 let _project_b = Project::remote(
4172 project_id,
4173 client_b.clone(),
4174 client_b.user_store.clone(),
4175 lang_registry.clone(),
4176 fs.clone(),
4177 &mut cx_b.to_async(),
4178 )
4179 .await
4180 .unwrap();
4181
4182 client_a
4183 .user_store
4184 .condition(&cx_a, |user_store, _| {
4185 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4186 })
4187 .await;
4188 client_b
4189 .user_store
4190 .condition(&cx_b, |user_store, _| {
4191 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4192 })
4193 .await;
4194 client_c
4195 .user_store
4196 .condition(&cx_c, |user_store, _| {
4197 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4198 })
4199 .await;
4200
4201 project_a
4202 .condition(&cx_a, |project, _| {
4203 project.collaborators().contains_key(&client_b.peer_id)
4204 })
4205 .await;
4206
4207 cx_a.update(move |_| drop(project_a));
4208 client_a
4209 .user_store
4210 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4211 .await;
4212 client_b
4213 .user_store
4214 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4215 .await;
4216 client_c
4217 .user_store
4218 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4219 .await;
4220
4221 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4222 user_store
4223 .contacts()
4224 .iter()
4225 .map(|contact| {
4226 let worktrees = contact
4227 .projects
4228 .iter()
4229 .map(|p| {
4230 (
4231 p.worktree_root_names[0].as_str(),
4232 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4233 )
4234 })
4235 .collect();
4236 (contact.user.github_login.as_str(), worktrees)
4237 })
4238 .collect()
4239 }
4240 }
4241
4242 #[gpui::test(iterations = 100)]
4243 async fn test_random_collaboration(cx: TestAppContext, rng: StdRng) {
4244 cx.foreground().forbid_parking();
4245 let max_peers = env::var("MAX_PEERS")
4246 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4247 .unwrap_or(5);
4248 let max_operations = env::var("OPERATIONS")
4249 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4250 .unwrap_or(10);
4251
4252 let rng = Arc::new(Mutex::new(rng));
4253
4254 let guest_lang_registry = Arc::new(LanguageRegistry::new());
4255 let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4256
4257 let fs = FakeFs::new(cx.background());
4258 fs.insert_tree(
4259 "/_collab",
4260 json!({
4261 ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4262 }),
4263 )
4264 .await;
4265
4266 let operations = Rc::new(Cell::new(0));
4267 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4268 let mut clients = Vec::new();
4269
4270 let mut next_entity_id = 100000;
4271 let mut host_cx = TestAppContext::new(
4272 cx.foreground_platform(),
4273 cx.platform(),
4274 cx.foreground(),
4275 cx.background(),
4276 cx.font_cache(),
4277 next_entity_id,
4278 );
4279 let host = server.create_client(&mut host_cx, "host").await;
4280 let host_project = host_cx.update(|cx| {
4281 Project::local(
4282 host.client.clone(),
4283 host.user_store.clone(),
4284 Arc::new(LanguageRegistry::new()),
4285 fs.clone(),
4286 cx,
4287 )
4288 });
4289 let host_project_id = host_project
4290 .update(&mut host_cx, |p, _| p.next_remote_id())
4291 .await;
4292
4293 let (collab_worktree, _) = host_project
4294 .update(&mut host_cx, |project, cx| {
4295 project.find_or_create_local_worktree("/_collab", false, cx)
4296 })
4297 .await
4298 .unwrap();
4299 collab_worktree
4300 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4301 .await;
4302 host_project
4303 .update(&mut host_cx, |project, cx| project.share(cx))
4304 .await
4305 .unwrap();
4306
4307 clients.push(cx.foreground().spawn(host.simulate_host(
4308 host_project.clone(),
4309 language_server_config,
4310 operations.clone(),
4311 max_operations,
4312 rng.clone(),
4313 host_cx.clone(),
4314 )));
4315
4316 while operations.get() < max_operations {
4317 cx.background().simulate_random_delay().await;
4318 if clients.len() < max_peers && rng.lock().gen_bool(0.05) {
4319 operations.set(operations.get() + 1);
4320
4321 let guest_id = clients.len();
4322 log::info!("Adding guest {}", guest_id);
4323 next_entity_id += 100000;
4324 let mut guest_cx = TestAppContext::new(
4325 cx.foreground_platform(),
4326 cx.platform(),
4327 cx.foreground(),
4328 cx.background(),
4329 cx.font_cache(),
4330 next_entity_id,
4331 );
4332 let guest = server
4333 .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4334 .await;
4335 let guest_project = Project::remote(
4336 host_project_id,
4337 guest.client.clone(),
4338 guest.user_store.clone(),
4339 guest_lang_registry.clone(),
4340 fs.clone(),
4341 &mut guest_cx.to_async(),
4342 )
4343 .await
4344 .unwrap();
4345 clients.push(cx.foreground().spawn(guest.simulate_guest(
4346 guest_id,
4347 guest_project,
4348 operations.clone(),
4349 max_operations,
4350 rng.clone(),
4351 guest_cx,
4352 )));
4353
4354 log::info!("Guest {} added", guest_id);
4355 }
4356 }
4357
4358 let clients = futures::future::join_all(clients).await;
4359 cx.foreground().run_until_parked();
4360
4361 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4362 project
4363 .worktrees(cx)
4364 .map(|worktree| {
4365 let snapshot = worktree.read(cx).snapshot();
4366 (snapshot.id(), snapshot)
4367 })
4368 .collect::<BTreeMap<_, _>>()
4369 });
4370
4371 for (guest_client, guest_cx) in clients.iter().skip(1) {
4372 let guest_id = guest_client.client.id();
4373 let worktree_snapshots =
4374 guest_client
4375 .project
4376 .as_ref()
4377 .unwrap()
4378 .read_with(guest_cx, |project, cx| {
4379 project
4380 .worktrees(cx)
4381 .map(|worktree| {
4382 let worktree = worktree.read(cx);
4383 (worktree.id(), worktree.snapshot())
4384 })
4385 .collect::<BTreeMap<_, _>>()
4386 });
4387
4388 assert_eq!(
4389 worktree_snapshots.keys().collect::<Vec<_>>(),
4390 host_worktree_snapshots.keys().collect::<Vec<_>>(),
4391 "guest {} has different worktrees than the host",
4392 guest_id
4393 );
4394 for (id, host_snapshot) in &host_worktree_snapshots {
4395 let guest_snapshot = &worktree_snapshots[id];
4396 assert_eq!(
4397 guest_snapshot.root_name(),
4398 host_snapshot.root_name(),
4399 "guest {} has different root name than the host for worktree {}",
4400 guest_id,
4401 id
4402 );
4403 assert_eq!(
4404 guest_snapshot.entries(false).collect::<Vec<_>>(),
4405 host_snapshot.entries(false).collect::<Vec<_>>(),
4406 "guest {} has different snapshot than the host for worktree {}",
4407 guest_id,
4408 id
4409 );
4410 }
4411
4412 guest_client
4413 .project
4414 .as_ref()
4415 .unwrap()
4416 .read_with(guest_cx, |project, _| {
4417 assert!(
4418 !project.has_buffered_operations(),
4419 "guest {} has buffered operations ",
4420 guest_id,
4421 );
4422 });
4423
4424 for guest_buffer in &guest_client.buffers {
4425 let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
4426 let host_buffer = host_project.read_with(&host_cx, |project, _| {
4427 project
4428 .shared_buffer(guest_client.peer_id, buffer_id)
4429 .expect(&format!(
4430 "host doest not have buffer for guest:{}, peer:{}, id:{}",
4431 guest_id, guest_client.peer_id, buffer_id
4432 ))
4433 });
4434 assert_eq!(
4435 guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
4436 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4437 "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4438 guest_id,
4439 buffer_id,
4440 host_buffer
4441 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx))
4442 );
4443 }
4444 }
4445 }
4446
4447 struct TestServer {
4448 peer: Arc<Peer>,
4449 app_state: Arc<AppState>,
4450 server: Arc<Server>,
4451 foreground: Rc<executor::Foreground>,
4452 notifications: mpsc::UnboundedReceiver<()>,
4453 connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
4454 forbid_connections: Arc<AtomicBool>,
4455 _test_db: TestDb,
4456 }
4457
4458 impl TestServer {
4459 async fn start(
4460 foreground: Rc<executor::Foreground>,
4461 background: Arc<executor::Background>,
4462 ) -> Self {
4463 let test_db = TestDb::fake(background);
4464 let app_state = Self::build_app_state(&test_db).await;
4465 let peer = Peer::new();
4466 let notifications = mpsc::unbounded();
4467 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4468 Self {
4469 peer,
4470 app_state,
4471 server,
4472 foreground,
4473 notifications: notifications.1,
4474 connection_killers: Default::default(),
4475 forbid_connections: Default::default(),
4476 _test_db: test_db,
4477 }
4478 }
4479
4480 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4481 let http = FakeHttpClient::with_404_response();
4482 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4483 let client_name = name.to_string();
4484 let mut client = Client::new(http.clone());
4485 let server = self.server.clone();
4486 let connection_killers = self.connection_killers.clone();
4487 let forbid_connections = self.forbid_connections.clone();
4488 let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4489
4490 Arc::get_mut(&mut client)
4491 .unwrap()
4492 .override_authenticate(move |cx| {
4493 cx.spawn(|_| async move {
4494 let access_token = "the-token".to_string();
4495 Ok(Credentials {
4496 user_id: user_id.0 as u64,
4497 access_token,
4498 })
4499 })
4500 })
4501 .override_establish_connection(move |credentials, cx| {
4502 assert_eq!(credentials.user_id, user_id.0 as u64);
4503 assert_eq!(credentials.access_token, "the-token");
4504
4505 let server = server.clone();
4506 let connection_killers = connection_killers.clone();
4507 let forbid_connections = forbid_connections.clone();
4508 let client_name = client_name.clone();
4509 let connection_id_tx = connection_id_tx.clone();
4510 cx.spawn(move |cx| async move {
4511 if forbid_connections.load(SeqCst) {
4512 Err(EstablishConnectionError::other(anyhow!(
4513 "server is forbidding connections"
4514 )))
4515 } else {
4516 let (client_conn, server_conn, kill_conn) =
4517 Connection::in_memory(cx.background());
4518 connection_killers.lock().insert(user_id, kill_conn);
4519 cx.background()
4520 .spawn(server.handle_connection(
4521 server_conn,
4522 client_name,
4523 user_id,
4524 Some(connection_id_tx),
4525 cx.background(),
4526 ))
4527 .detach();
4528 Ok(client_conn)
4529 }
4530 })
4531 });
4532
4533 client
4534 .authenticate_and_connect(&cx.to_async())
4535 .await
4536 .unwrap();
4537
4538 Channel::init(&client);
4539 Project::init(&client);
4540
4541 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4542 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4543 let mut authed_user =
4544 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
4545 while authed_user.next().await.unwrap().is_none() {}
4546
4547 TestClient {
4548 client,
4549 peer_id,
4550 user_store,
4551 project: Default::default(),
4552 buffers: Default::default(),
4553 }
4554 }
4555
4556 fn disconnect_client(&self, user_id: UserId) {
4557 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
4558 let _ = kill_conn.try_send(Some(()));
4559 }
4560 }
4561
4562 fn forbid_connections(&self) {
4563 self.forbid_connections.store(true, SeqCst);
4564 }
4565
4566 fn allow_connections(&self) {
4567 self.forbid_connections.store(false, SeqCst);
4568 }
4569
4570 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4571 let mut config = Config::default();
4572 config.session_secret = "a".repeat(32);
4573 config.database_url = test_db.url.clone();
4574 let github_client = github::AppClient::test();
4575 Arc::new(AppState {
4576 db: test_db.db().clone(),
4577 handlebars: Default::default(),
4578 auth_client: auth::build_client("", ""),
4579 repo_client: github::RepoClient::test(&github_client),
4580 github_client,
4581 config,
4582 })
4583 }
4584
4585 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4586 self.server.store.read()
4587 }
4588
4589 async fn condition<F>(&mut self, mut predicate: F)
4590 where
4591 F: FnMut(&Store) -> bool,
4592 {
4593 async_std::future::timeout(Duration::from_millis(500), async {
4594 while !(predicate)(&*self.server.store.read()) {
4595 self.foreground.start_waiting();
4596 self.notifications.next().await;
4597 self.foreground.finish_waiting();
4598 }
4599 })
4600 .await
4601 .expect("condition timed out");
4602 }
4603 }
4604
4605 impl Drop for TestServer {
4606 fn drop(&mut self) {
4607 self.peer.reset();
4608 }
4609 }
4610
4611 struct TestClient {
4612 client: Arc<Client>,
4613 pub peer_id: PeerId,
4614 pub user_store: ModelHandle<UserStore>,
4615 project: Option<ModelHandle<Project>>,
4616 buffers: HashSet<ModelHandle<zed::language::Buffer>>,
4617 }
4618
4619 impl Deref for TestClient {
4620 type Target = Arc<Client>;
4621
4622 fn deref(&self) -> &Self::Target {
4623 &self.client
4624 }
4625 }
4626
4627 impl TestClient {
4628 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4629 UserId::from_proto(
4630 self.user_store
4631 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4632 )
4633 }
4634
4635 fn simulate_host(
4636 mut self,
4637 project: ModelHandle<Project>,
4638 mut language_server_config: LanguageServerConfig,
4639 operations: Rc<Cell<usize>>,
4640 max_operations: usize,
4641 rng: Arc<Mutex<StdRng>>,
4642 mut cx: TestAppContext,
4643 ) -> impl Future<Output = (Self, TestAppContext)> {
4644 let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4645
4646 // Set up a fake language server.
4647 language_server_config.set_fake_initializer({
4648 let rng = rng.clone();
4649 let files = files.clone();
4650 let project = project.clone();
4651 move |fake_server| {
4652 fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4653 Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4654 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4655 range: lsp::Range::new(
4656 lsp::Position::new(0, 0),
4657 lsp::Position::new(0, 0),
4658 ),
4659 new_text: "the-new-text".to_string(),
4660 })),
4661 ..Default::default()
4662 }]))
4663 });
4664
4665 fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4666 Some(vec![lsp::CodeActionOrCommand::CodeAction(
4667 lsp::CodeAction {
4668 title: "the-code-action".to_string(),
4669 ..Default::default()
4670 },
4671 )])
4672 });
4673
4674 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4675 |params, _| {
4676 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4677 params.position,
4678 params.position,
4679 )))
4680 },
4681 );
4682
4683 fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4684 let files = files.clone();
4685 let rng = rng.clone();
4686 move |_, _| {
4687 let files = files.lock();
4688 let mut rng = rng.lock();
4689 let count = rng.gen_range::<usize, _>(1..3);
4690 let files = (0..count)
4691 .map(|_| files.choose(&mut *rng).unwrap())
4692 .collect::<Vec<_>>();
4693 log::info!("LSP: Returning definitions in files {:?}", &files);
4694 Some(lsp::GotoDefinitionResponse::Array(
4695 files
4696 .into_iter()
4697 .map(|file| lsp::Location {
4698 uri: lsp::Url::from_file_path(file).unwrap(),
4699 range: Default::default(),
4700 })
4701 .collect(),
4702 ))
4703 }
4704 });
4705
4706 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4707 let rng = rng.clone();
4708 let project = project.clone();
4709 move |params, mut cx| {
4710 project.update(&mut cx, |project, cx| {
4711 let path = params
4712 .text_document_position_params
4713 .text_document
4714 .uri
4715 .to_file_path()
4716 .unwrap();
4717 let (worktree, relative_path) =
4718 project.find_local_worktree(&path, cx)?;
4719 let project_path =
4720 ProjectPath::from((worktree.read(cx).id(), relative_path));
4721 let buffer = project.get_open_buffer(&project_path, cx)?.read(cx);
4722
4723 let mut highlights = Vec::new();
4724 let highlight_count = rng.lock().gen_range(1..=5);
4725 let mut prev_end = 0;
4726 for _ in 0..highlight_count {
4727 let range =
4728 buffer.random_byte_range(prev_end, &mut *rng.lock());
4729 let start =
4730 buffer.offset_to_point_utf16(range.start).to_lsp_position();
4731 let end =
4732 buffer.offset_to_point_utf16(range.end).to_lsp_position();
4733 highlights.push(lsp::DocumentHighlight {
4734 range: lsp::Range::new(start, end),
4735 kind: Some(lsp::DocumentHighlightKind::READ),
4736 });
4737 prev_end = range.end;
4738 }
4739 Some(highlights)
4740 })
4741 }
4742 });
4743 }
4744 });
4745
4746 project.update(&mut cx, |project, _| {
4747 project.languages().add(Arc::new(Language::new(
4748 LanguageConfig {
4749 name: "Rust".into(),
4750 path_suffixes: vec!["rs".to_string()],
4751 language_server: Some(language_server_config),
4752 ..Default::default()
4753 },
4754 None,
4755 )));
4756 });
4757
4758 async move {
4759 let fs = project.read_with(&cx, |project, _| project.fs().clone());
4760 while operations.get() < max_operations {
4761 operations.set(operations.get() + 1);
4762
4763 let distribution = rng.lock().gen_range::<usize, _>(0..100);
4764 match distribution {
4765 0..=20 if !files.lock().is_empty() => {
4766 let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4767 let mut path = path.as_path();
4768 while let Some(parent_path) = path.parent() {
4769 path = parent_path;
4770 if rng.lock().gen() {
4771 break;
4772 }
4773 }
4774
4775 log::info!("Host: find/create local worktree {:?}", path);
4776 project
4777 .update(&mut cx, |project, cx| {
4778 project.find_or_create_local_worktree(path, false, cx)
4779 })
4780 .await
4781 .unwrap();
4782 }
4783 10..=80 if !files.lock().is_empty() => {
4784 let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4785 let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4786 let (worktree, path) = project
4787 .update(&mut cx, |project, cx| {
4788 project.find_or_create_local_worktree(
4789 file.clone(),
4790 false,
4791 cx,
4792 )
4793 })
4794 .await
4795 .unwrap();
4796 let project_path =
4797 worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4798 log::info!("Host: opening path {:?}, {:?}", file, project_path);
4799 let buffer = project
4800 .update(&mut cx, |project, cx| {
4801 project.open_buffer(project_path, cx)
4802 })
4803 .await
4804 .unwrap();
4805 self.buffers.insert(buffer.clone());
4806 buffer
4807 } else {
4808 self.buffers
4809 .iter()
4810 .choose(&mut *rng.lock())
4811 .unwrap()
4812 .clone()
4813 };
4814
4815 if rng.lock().gen_bool(0.1) {
4816 cx.update(|cx| {
4817 log::info!(
4818 "Host: dropping buffer {:?}",
4819 buffer.read(cx).file().unwrap().full_path(cx)
4820 );
4821 self.buffers.remove(&buffer);
4822 drop(buffer);
4823 });
4824 } else {
4825 buffer.update(&mut cx, |buffer, cx| {
4826 log::info!(
4827 "Host: updating buffer {:?}",
4828 buffer.file().unwrap().full_path(cx)
4829 );
4830 buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4831 });
4832 }
4833 }
4834 _ => loop {
4835 let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4836 let mut path = PathBuf::new();
4837 path.push("/");
4838 for _ in 0..path_component_count {
4839 let letter = rng.lock().gen_range(b'a'..=b'z');
4840 path.push(std::str::from_utf8(&[letter]).unwrap());
4841 }
4842 path.set_extension("rs");
4843 let parent_path = path.parent().unwrap();
4844
4845 log::info!("Host: creating file {:?}", path,);
4846
4847 if fs.create_dir(&parent_path).await.is_ok()
4848 && fs.create_file(&path, Default::default()).await.is_ok()
4849 {
4850 files.lock().push(path);
4851 break;
4852 } else {
4853 log::info!("Host: cannot create file");
4854 }
4855 },
4856 }
4857
4858 cx.background().simulate_random_delay().await;
4859 }
4860
4861 log::info!("Host done");
4862
4863 self.project = Some(project);
4864 (self, cx)
4865 }
4866 }
4867
4868 pub async fn simulate_guest(
4869 mut self,
4870 guest_id: usize,
4871 project: ModelHandle<Project>,
4872 operations: Rc<Cell<usize>>,
4873 max_operations: usize,
4874 rng: Arc<Mutex<StdRng>>,
4875 mut cx: TestAppContext,
4876 ) -> (Self, TestAppContext) {
4877 while operations.get() < max_operations {
4878 let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4879 let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4880 project
4881 .worktrees(&cx)
4882 .filter(|worktree| {
4883 let worktree = worktree.read(cx);
4884 !worktree.is_weak() && worktree.entries(false).any(|e| e.is_file())
4885 })
4886 .choose(&mut *rng.lock())
4887 }) {
4888 worktree
4889 } else {
4890 cx.background().simulate_random_delay().await;
4891 continue;
4892 };
4893
4894 operations.set(operations.get() + 1);
4895 let (worktree_root_name, project_path) =
4896 worktree.read_with(&cx, |worktree, _| {
4897 let entry = worktree
4898 .entries(false)
4899 .filter(|e| e.is_file())
4900 .choose(&mut *rng.lock())
4901 .unwrap();
4902 (
4903 worktree.root_name().to_string(),
4904 (worktree.id(), entry.path.clone()),
4905 )
4906 });
4907 log::info!(
4908 "Guest {}: opening path in worktree {:?} {:?} {:?}",
4909 guest_id,
4910 project_path.0,
4911 worktree_root_name,
4912 project_path.1
4913 );
4914 let buffer = project
4915 .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
4916 .await
4917 .unwrap();
4918 self.buffers.insert(buffer.clone());
4919 buffer
4920 } else {
4921 operations.set(operations.get() + 1);
4922
4923 self.buffers
4924 .iter()
4925 .choose(&mut *rng.lock())
4926 .unwrap()
4927 .clone()
4928 };
4929
4930 let choice = rng.lock().gen_range(0..100);
4931 match choice {
4932 0..=9 => {
4933 cx.update(|cx| {
4934 log::info!(
4935 "Guest {}: dropping buffer {:?}",
4936 guest_id,
4937 buffer.read(cx).file().unwrap().full_path(cx)
4938 );
4939 self.buffers.remove(&buffer);
4940 drop(buffer);
4941 });
4942 }
4943 10..=19 => {
4944 let completions = project.update(&mut cx, |project, cx| {
4945 log::info!(
4946 "Guest {}: requesting completions for buffer {:?}",
4947 guest_id,
4948 buffer.read(cx).file().unwrap().full_path(cx)
4949 );
4950 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4951 project.completions(&buffer, offset, cx)
4952 });
4953 let completions = cx.background().spawn(async move {
4954 completions.await.expect("completions request failed");
4955 });
4956 if rng.lock().gen_bool(0.3) {
4957 log::info!("Guest {}: detaching completions request", guest_id);
4958 completions.detach();
4959 } else {
4960 completions.await;
4961 }
4962 }
4963 20..=29 => {
4964 let code_actions = project.update(&mut cx, |project, cx| {
4965 log::info!(
4966 "Guest {}: requesting code actions for buffer {:?}",
4967 guest_id,
4968 buffer.read(cx).file().unwrap().full_path(cx)
4969 );
4970 let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4971 project.code_actions(&buffer, range, cx)
4972 });
4973 let code_actions = cx.background().spawn(async move {
4974 code_actions.await.expect("code actions request failed");
4975 });
4976 if rng.lock().gen_bool(0.3) {
4977 log::info!("Guest {}: detaching code actions request", guest_id);
4978 code_actions.detach();
4979 } else {
4980 code_actions.await;
4981 }
4982 }
4983 30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4984 let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4985 log::info!(
4986 "Guest {}: saving buffer {:?}",
4987 guest_id,
4988 buffer.file().unwrap().full_path(cx)
4989 );
4990 (buffer.version(), buffer.save(cx))
4991 });
4992 let save = cx.spawn(|cx| async move {
4993 let (saved_version, _) = save.await.expect("save request failed");
4994 buffer.read_with(&cx, |buffer, _| {
4995 assert!(buffer.version().observed_all(&saved_version));
4996 assert!(saved_version.observed_all(&requested_version));
4997 });
4998 });
4999 if rng.lock().gen_bool(0.3) {
5000 log::info!("Guest {}: detaching save request", guest_id);
5001 save.detach();
5002 } else {
5003 save.await;
5004 }
5005 }
5006 40..=45 => {
5007 let prepare_rename = project.update(&mut cx, |project, cx| {
5008 log::info!(
5009 "Guest {}: preparing rename for buffer {:?}",
5010 guest_id,
5011 buffer.read(cx).file().unwrap().full_path(cx)
5012 );
5013 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5014 project.prepare_rename(buffer, offset, cx)
5015 });
5016 let prepare_rename = cx.background().spawn(async move {
5017 prepare_rename.await.expect("prepare rename request failed");
5018 });
5019 if rng.lock().gen_bool(0.3) {
5020 log::info!("Guest {}: detaching prepare rename request", guest_id);
5021 prepare_rename.detach();
5022 } else {
5023 prepare_rename.await;
5024 }
5025 }
5026 46..=49 => {
5027 let definitions = project.update(&mut cx, |project, cx| {
5028 log::info!(
5029 "Guest {}: requesting defintions for buffer {:?}",
5030 guest_id,
5031 buffer.read(cx).file().unwrap().full_path(cx)
5032 );
5033 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5034 project.definition(&buffer, offset, cx)
5035 });
5036 let definitions = cx.background().spawn(async move {
5037 definitions.await.expect("definitions request failed")
5038 });
5039 if rng.lock().gen_bool(0.3) {
5040 log::info!("Guest {}: detaching definitions request", guest_id);
5041 definitions.detach();
5042 } else {
5043 self.buffers
5044 .extend(definitions.await.into_iter().map(|loc| loc.buffer));
5045 }
5046 }
5047 50..=55 => {
5048 let highlights = project.update(&mut cx, |project, cx| {
5049 log::info!(
5050 "Guest {}: requesting highlights for buffer {:?}",
5051 guest_id,
5052 buffer.read(cx).file().unwrap().full_path(cx)
5053 );
5054 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5055 project.document_highlights(&buffer, offset, cx)
5056 });
5057 let highlights = cx.background().spawn(async move {
5058 highlights.await.expect("highlights request failed");
5059 });
5060 if rng.lock().gen_bool(0.3) {
5061 log::info!("Guest {}: detaching highlights request", guest_id);
5062 highlights.detach();
5063 } else {
5064 highlights.await;
5065 }
5066 }
5067 _ => {
5068 buffer.update(&mut cx, |buffer, cx| {
5069 log::info!(
5070 "Guest {}: updating buffer {:?}",
5071 guest_id,
5072 buffer.file().unwrap().full_path(cx)
5073 );
5074 buffer.randomly_edit(&mut *rng.lock(), 5, cx)
5075 });
5076 }
5077 }
5078 cx.background().simulate_random_delay().await;
5079 }
5080
5081 log::info!("Guest {} done", guest_id);
5082
5083 self.project = Some(project);
5084 (self, cx)
5085 }
5086 }
5087
5088 impl Executor for Arc<gpui::executor::Background> {
5089 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5090 self.spawn(future).detach();
5091 }
5092 }
5093
5094 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5095 channel
5096 .messages()
5097 .cursor::<()>()
5098 .map(|m| {
5099 (
5100 m.sender.github_login.clone(),
5101 m.body.clone(),
5102 m.is_pending(),
5103 )
5104 })
5105 .collect()
5106 }
5107
5108 struct EmptyView;
5109
5110 impl gpui::Entity for EmptyView {
5111 type Event = ();
5112 }
5113
5114 impl gpui::View for EmptyView {
5115 fn ui_name() -> &'static str {
5116 "empty view"
5117 }
5118
5119 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5120 gpui::Element::boxed(gpui::elements::Empty)
5121 }
5122 }
5123}