1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use rpc::{
15 proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
16 Connection, ConnectionId, Peer, TypedEnvelope,
17};
18use sha1::{Digest as _, Sha1};
19use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
20use store::{Store, Worktree};
21use surf::StatusCode;
22use tide::log;
23use tide::{
24 http::headers::{HeaderName, CONNECTION, UPGRADE},
25 Request, Response,
26};
27use time::OffsetDateTime;
28
29type MessageHandler = Box<
30 dyn Send
31 + Sync
32 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
33>;
34
/// RPC server state shared by every connection handler.
pub struct Server {
    /// RPC peer used to register connections and to send, forward and answer messages.
    peer: Arc<Peer>,
    /// In-memory store guarded by a read/write lock; accessed through `state()` and
    /// `state_mut()`.
    store: RwLock<Store>,
    /// Application-wide state; the handlers reach the database through `app_state.db`.
    app_state: Arc<AppState>,
    /// Registered handlers keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel on which a `()` is sent after each incoming message has been handled.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
42
/// Abstraction over "run this future without waiting for it".
pub trait Executor {
    /// Starts `future` and returns immediately; the future's completion is not
    /// observed by the caller.
    fn spawn_detached<F>(&self, future: F)
    where
        F: 'static + Send + Future<Output = ()>;
}
46
/// [`Executor`] implementation that hands futures to `async_std::task::spawn`.
pub struct RealExecutor;

/// Number of channel messages requested from the database per page; a page shorter
/// than this is reported to the client as the last one (`done`).
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Maximum length, in bytes, of a trimmed channel message body.
const MAX_MESSAGE_LEN: usize = 1024;
51
52impl Server {
53 pub fn new(
54 app_state: Arc<AppState>,
55 peer: Arc<Peer>,
56 notifications: Option<mpsc::UnboundedSender<()>>,
57 ) -> Arc<Self> {
58 let mut server = Self {
59 peer,
60 app_state,
61 store: Default::default(),
62 handlers: Default::default(),
63 notifications,
64 };
65
66 server
67 .add_request_handler(Server::ping)
68 .add_request_handler(Server::register_project)
69 .add_message_handler(Server::unregister_project)
70 .add_request_handler(Server::share_project)
71 .add_message_handler(Server::unshare_project)
72 .add_request_handler(Server::join_project)
73 .add_message_handler(Server::leave_project)
74 .add_request_handler(Server::register_worktree)
75 .add_message_handler(Server::unregister_worktree)
76 .add_request_handler(Server::share_worktree)
77 .add_request_handler(Server::update_worktree)
78 .add_message_handler(Server::update_diagnostic_summary)
79 .add_message_handler(Server::disk_based_diagnostics_updating)
80 .add_message_handler(Server::disk_based_diagnostics_updated)
81 .add_request_handler(Server::get_definition)
82 .add_request_handler(Server::get_references)
83 .add_request_handler(Server::get_document_highlights)
84 .add_request_handler(Server::get_project_symbols)
85 .add_request_handler(Server::open_buffer_for_symbol)
86 .add_request_handler(Server::open_buffer)
87 .add_message_handler(Server::close_buffer)
88 .add_request_handler(Server::update_buffer)
89 .add_message_handler(Server::update_buffer_file)
90 .add_message_handler(Server::buffer_reloaded)
91 .add_message_handler(Server::buffer_saved)
92 .add_request_handler(Server::save_buffer)
93 .add_request_handler(Server::format_buffers)
94 .add_request_handler(Server::get_completions)
95 .add_request_handler(Server::apply_additional_edits_for_completion)
96 .add_request_handler(Server::get_code_actions)
97 .add_request_handler(Server::apply_code_action)
98 .add_request_handler(Server::prepare_rename)
99 .add_request_handler(Server::perform_rename)
100 .add_request_handler(Server::get_channels)
101 .add_request_handler(Server::get_users)
102 .add_request_handler(Server::join_channel)
103 .add_message_handler(Server::leave_channel)
104 .add_request_handler(Server::send_channel_message)
105 .add_request_handler(Server::get_channel_messages);
106
107 Arc::new(server)
108 }
109
110 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
111 where
112 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
113 Fut: 'static + Send + Future<Output = tide::Result<()>>,
114 M: EnvelopedMessage,
115 {
116 let prev_handler = self.handlers.insert(
117 TypeId::of::<M>(),
118 Box::new(move |server, envelope| {
119 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
120 (handler)(server, *envelope).boxed()
121 }),
122 );
123 if prev_handler.is_some() {
124 panic!("registered a handler for the same message twice");
125 }
126 self
127 }
128
129 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
130 where
131 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
132 Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
133 M: RequestMessage,
134 {
135 self.add_message_handler(move |server, envelope| {
136 let receipt = envelope.receipt();
137 let response = (handler)(server.clone(), envelope);
138 async move {
139 match response.await {
140 Ok(response) => {
141 server.peer.respond(receipt, response)?;
142 Ok(())
143 }
144 Err(error) => {
145 server.peer.respond_with_error(
146 receipt,
147 proto::Error {
148 message: error.to_string(),
149 },
150 )?;
151 Err(error)
152 }
153 }
154 }
155 })
156 }
157
    /// Returns a future that drives one client connection from registration to sign-out.
    ///
    /// The future registers `connection` with the peer, optionally reports the new
    /// connection id through `send_connection_id`, records the connection for `user_id`
    /// in the store, and then loops until either the connection's I/O future finishes
    /// or the incoming message stream ends. Afterwards the connection is signed out.
    ///
    /// `addr` is only used in log messages. `executor` runs the handlers of messages
    /// that report themselves as background messages.
    pub fn handle_connection<E: Executor>(
        self: &Arc<Self>,
        connection: Connection,
        addr: String,
        user_id: UserId,
        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
        executor: E,
    ) -> impl Future<Output = ()> {
        let mut this = self.clone();
        async move {
            let (connection_id, handle_io, mut incoming_rx) =
                this.peer.add_connection(connection).await;

            // Reporting the id is best-effort: a failed send is ignored.
            if let Some(send_connection_id) = send_connection_id.as_mut() {
                let _ = send_connection_id.send(connection_id).await;
            }

            this.state_mut().add_connection(connection_id, user_id);
            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
                log::error!("error updating contacts for {:?}: {}", user_id, err);
            }

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.next().fuse();
                futures::pin_mut!(next_message);
                // `select_biased!` polls its branches in declaration order, so the I/O
                // future is checked before the next incoming message.
                futures::select_biased! {
                    result = handle_io => {
                        if let Err(err) = result {
                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
                        }
                        break;
                    }
                    message = next_message => {
                        if let Some(message) = message {
                            let start_time = Instant::now();
                            let type_name = message.payload_type_name();
                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                let notifications = this.notifications.clone();
                                let is_background = message.is_background();
                                let handle_message = (handler)(this.clone(), message);
                                // Wrap the handler so its outcome is logged and a
                                // notification is emitted once it has finished.
                                let handle_message = async move {
                                    if let Err(err) = handle_message.await {
                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
                                    } else {
                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
                                    }
                                    if let Some(mut notifications) = notifications {
                                        let _ = notifications.send(()).await;
                                    }
                                };
                                // Background messages are handed to the executor; all
                                // others are awaited here before the loop continues.
                                if is_background {
                                    executor.spawn_detached(handle_message);
                                } else {
                                    handle_message.await;
                                }
                            } else {
                                log::warn!("unhandled message: {}", type_name);
                            }
                        } else {
                            // The incoming stream ended.
                            log::info!("rpc connection closed {:?}", addr);
                            break;
                        }
                    }
                }
            }

            if let Err(err) = this.sign_out(connection_id).await {
                log::error!("error signing out connection {:?} - {:?}", addr, err);
            }
        }
    }
232
233 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
234 self.peer.disconnect(connection_id);
235 let removed_connection = self.state_mut().remove_connection(connection_id)?;
236
237 for (project_id, project) in removed_connection.hosted_projects {
238 if let Some(share) = project.share {
239 broadcast(
240 connection_id,
241 share.guests.keys().copied().collect(),
242 |conn_id| {
243 self.peer
244 .send(conn_id, proto::UnshareProject { project_id })
245 },
246 )?;
247 }
248 }
249
250 for (project_id, peer_ids) in removed_connection.guest_project_ids {
251 broadcast(connection_id, peer_ids, |conn_id| {
252 self.peer.send(
253 conn_id,
254 proto::RemoveProjectCollaborator {
255 project_id,
256 peer_id: connection_id.0,
257 },
258 )
259 })?;
260 }
261
262 self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
263 Ok(())
264 }
265
266 async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
267 Ok(proto::Ack {})
268 }
269
270 async fn register_project(
271 mut self: Arc<Server>,
272 request: TypedEnvelope<proto::RegisterProject>,
273 ) -> tide::Result<proto::RegisterProjectResponse> {
274 let project_id = {
275 let mut state = self.state_mut();
276 let user_id = state.user_id_for_connection(request.sender_id)?;
277 state.register_project(request.sender_id, user_id)
278 };
279 Ok(proto::RegisterProjectResponse { project_id })
280 }
281
282 async fn unregister_project(
283 mut self: Arc<Server>,
284 request: TypedEnvelope<proto::UnregisterProject>,
285 ) -> tide::Result<()> {
286 let project = self
287 .state_mut()
288 .unregister_project(request.payload.project_id, request.sender_id)?;
289 self.update_contacts_for_users(project.authorized_user_ids().iter())?;
290 Ok(())
291 }
292
293 async fn share_project(
294 mut self: Arc<Server>,
295 request: TypedEnvelope<proto::ShareProject>,
296 ) -> tide::Result<proto::Ack> {
297 self.state_mut()
298 .share_project(request.payload.project_id, request.sender_id);
299 Ok(proto::Ack {})
300 }
301
302 async fn unshare_project(
303 mut self: Arc<Server>,
304 request: TypedEnvelope<proto::UnshareProject>,
305 ) -> tide::Result<()> {
306 let project_id = request.payload.project_id;
307 let project = self
308 .state_mut()
309 .unshare_project(project_id, request.sender_id)?;
310
311 broadcast(request.sender_id, project.connection_ids, |conn_id| {
312 self.peer
313 .send(conn_id, proto::UnshareProject { project_id })
314 })?;
315 self.update_contacts_for_users(&project.authorized_user_ids)?;
316 Ok(())
317 }
318
319 async fn join_project(
320 mut self: Arc<Server>,
321 request: TypedEnvelope<proto::JoinProject>,
322 ) -> tide::Result<proto::JoinProjectResponse> {
323 let project_id = request.payload.project_id;
324
325 let user_id = self.state().user_id_for_connection(request.sender_id)?;
326 let (response, connection_ids, contact_user_ids) = self
327 .state_mut()
328 .join_project(request.sender_id, user_id, project_id)
329 .and_then(|joined| {
330 let share = joined.project.share()?;
331 let peer_count = share.guests.len();
332 let mut collaborators = Vec::with_capacity(peer_count);
333 collaborators.push(proto::Collaborator {
334 peer_id: joined.project.host_connection_id.0,
335 replica_id: 0,
336 user_id: joined.project.host_user_id.to_proto(),
337 });
338 let worktrees = joined
339 .project
340 .worktrees
341 .iter()
342 .filter_map(|(id, worktree)| {
343 worktree.share.as_ref().map(|share| proto::Worktree {
344 id: *id,
345 root_name: worktree.root_name.clone(),
346 entries: share.entries.values().cloned().collect(),
347 diagnostic_summaries: share
348 .diagnostic_summaries
349 .values()
350 .cloned()
351 .collect(),
352 weak: worktree.weak,
353 next_update_id: share.next_update_id as u64,
354 })
355 })
356 .collect();
357 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
358 if *peer_conn_id != request.sender_id {
359 collaborators.push(proto::Collaborator {
360 peer_id: peer_conn_id.0,
361 replica_id: *peer_replica_id as u32,
362 user_id: peer_user_id.to_proto(),
363 });
364 }
365 }
366 let response = proto::JoinProjectResponse {
367 worktrees,
368 replica_id: joined.replica_id as u32,
369 collaborators,
370 };
371 let connection_ids = joined.project.connection_ids();
372 let contact_user_ids = joined.project.authorized_user_ids();
373 Ok((response, connection_ids, contact_user_ids))
374 })?;
375
376 broadcast(request.sender_id, connection_ids, |conn_id| {
377 self.peer.send(
378 conn_id,
379 proto::AddProjectCollaborator {
380 project_id,
381 collaborator: Some(proto::Collaborator {
382 peer_id: request.sender_id.0,
383 replica_id: response.replica_id,
384 user_id: user_id.to_proto(),
385 }),
386 },
387 )
388 })?;
389 self.update_contacts_for_users(&contact_user_ids)?;
390 Ok(response)
391 }
392
393 async fn leave_project(
394 mut self: Arc<Server>,
395 request: TypedEnvelope<proto::LeaveProject>,
396 ) -> tide::Result<()> {
397 let sender_id = request.sender_id;
398 let project_id = request.payload.project_id;
399 let worktree = self.state_mut().leave_project(sender_id, project_id)?;
400
401 broadcast(sender_id, worktree.connection_ids, |conn_id| {
402 self.peer.send(
403 conn_id,
404 proto::RemoveProjectCollaborator {
405 project_id,
406 peer_id: sender_id.0,
407 },
408 )
409 })?;
410 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
411
412 Ok(())
413 }
414
415 async fn register_worktree(
416 mut self: Arc<Server>,
417 request: TypedEnvelope<proto::RegisterWorktree>,
418 ) -> tide::Result<proto::Ack> {
419 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
420
421 let mut contact_user_ids = HashSet::default();
422 contact_user_ids.insert(host_user_id);
423 for github_login in request.payload.authorized_logins {
424 let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
425 contact_user_ids.insert(contact_user_id);
426 }
427
428 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
429 self.state_mut().register_worktree(
430 request.payload.project_id,
431 request.payload.worktree_id,
432 request.sender_id,
433 Worktree {
434 authorized_user_ids: contact_user_ids.clone(),
435 root_name: request.payload.root_name,
436 share: None,
437 weak: false,
438 },
439 )?;
440 self.update_contacts_for_users(&contact_user_ids)?;
441 Ok(proto::Ack {})
442 }
443
444 async fn unregister_worktree(
445 mut self: Arc<Server>,
446 request: TypedEnvelope<proto::UnregisterWorktree>,
447 ) -> tide::Result<()> {
448 let project_id = request.payload.project_id;
449 let worktree_id = request.payload.worktree_id;
450 let (worktree, guest_connection_ids) =
451 self.state_mut()
452 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
453 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
454 self.peer.send(
455 conn_id,
456 proto::UnregisterWorktree {
457 project_id,
458 worktree_id,
459 },
460 )
461 })?;
462 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
463 Ok(())
464 }
465
466 async fn share_worktree(
467 mut self: Arc<Server>,
468 mut request: TypedEnvelope<proto::ShareWorktree>,
469 ) -> tide::Result<proto::Ack> {
470 let worktree = request
471 .payload
472 .worktree
473 .as_mut()
474 .ok_or_else(|| anyhow!("missing worktree"))?;
475 let entries = worktree
476 .entries
477 .iter()
478 .map(|entry| (entry.id, entry.clone()))
479 .collect();
480 let diagnostic_summaries = worktree
481 .diagnostic_summaries
482 .iter()
483 .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
484 .collect();
485
486 let shared_worktree = self.state_mut().share_worktree(
487 request.payload.project_id,
488 worktree.id,
489 request.sender_id,
490 entries,
491 diagnostic_summaries,
492 worktree.next_update_id,
493 )?;
494
495 broadcast(
496 request.sender_id,
497 shared_worktree.connection_ids,
498 |connection_id| {
499 self.peer
500 .forward_send(request.sender_id, connection_id, request.payload.clone())
501 },
502 )?;
503 self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
504
505 Ok(proto::Ack {})
506 }
507
508 async fn update_worktree(
509 mut self: Arc<Server>,
510 request: TypedEnvelope<proto::UpdateWorktree>,
511 ) -> tide::Result<proto::Ack> {
512 let connection_ids = self.state_mut().update_worktree(
513 request.sender_id,
514 request.payload.project_id,
515 request.payload.worktree_id,
516 request.payload.id,
517 &request.payload.removed_entries,
518 &request.payload.updated_entries,
519 )?;
520
521 broadcast(request.sender_id, connection_ids, |connection_id| {
522 self.peer
523 .forward_send(request.sender_id, connection_id, request.payload.clone())
524 })?;
525
526 Ok(proto::Ack {})
527 }
528
529 async fn update_diagnostic_summary(
530 mut self: Arc<Server>,
531 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
532 ) -> tide::Result<()> {
533 let summary = request
534 .payload
535 .summary
536 .clone()
537 .ok_or_else(|| anyhow!("invalid summary"))?;
538 let receiver_ids = self.state_mut().update_diagnostic_summary(
539 request.payload.project_id,
540 request.payload.worktree_id,
541 request.sender_id,
542 summary,
543 )?;
544
545 broadcast(request.sender_id, receiver_ids, |connection_id| {
546 self.peer
547 .forward_send(request.sender_id, connection_id, request.payload.clone())
548 })?;
549 Ok(())
550 }
551
552 async fn disk_based_diagnostics_updating(
553 self: Arc<Server>,
554 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
555 ) -> tide::Result<()> {
556 let receiver_ids = self
557 .state()
558 .project_connection_ids(request.payload.project_id, request.sender_id)?;
559 broadcast(request.sender_id, receiver_ids, |connection_id| {
560 self.peer
561 .forward_send(request.sender_id, connection_id, request.payload.clone())
562 })?;
563 Ok(())
564 }
565
566 async fn disk_based_diagnostics_updated(
567 self: Arc<Server>,
568 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
569 ) -> tide::Result<()> {
570 let receiver_ids = self
571 .state()
572 .project_connection_ids(request.payload.project_id, request.sender_id)?;
573 broadcast(request.sender_id, receiver_ids, |connection_id| {
574 self.peer
575 .forward_send(request.sender_id, connection_id, request.payload.clone())
576 })?;
577 Ok(())
578 }
579
580 async fn get_definition(
581 self: Arc<Server>,
582 request: TypedEnvelope<proto::GetDefinition>,
583 ) -> tide::Result<proto::GetDefinitionResponse> {
584 let host_connection_id = self
585 .state()
586 .read_project(request.payload.project_id, request.sender_id)?
587 .host_connection_id;
588 Ok(self
589 .peer
590 .forward_request(request.sender_id, host_connection_id, request.payload)
591 .await?)
592 }
593
594 async fn get_references(
595 self: Arc<Server>,
596 request: TypedEnvelope<proto::GetReferences>,
597 ) -> tide::Result<proto::GetReferencesResponse> {
598 let host_connection_id = self
599 .state()
600 .read_project(request.payload.project_id, request.sender_id)?
601 .host_connection_id;
602 Ok(self
603 .peer
604 .forward_request(request.sender_id, host_connection_id, request.payload)
605 .await?)
606 }
607
608 async fn get_document_highlights(
609 self: Arc<Server>,
610 request: TypedEnvelope<proto::GetDocumentHighlights>,
611 ) -> tide::Result<proto::GetDocumentHighlightsResponse> {
612 let host_connection_id = self
613 .state()
614 .read_project(request.payload.project_id, request.sender_id)?
615 .host_connection_id;
616 Ok(self
617 .peer
618 .forward_request(request.sender_id, host_connection_id, request.payload)
619 .await?)
620 }
621
622 async fn get_project_symbols(
623 self: Arc<Server>,
624 request: TypedEnvelope<proto::GetProjectSymbols>,
625 ) -> tide::Result<proto::GetProjectSymbolsResponse> {
626 let host_connection_id = self
627 .state()
628 .read_project(request.payload.project_id, request.sender_id)?
629 .host_connection_id;
630 Ok(self
631 .peer
632 .forward_request(request.sender_id, host_connection_id, request.payload)
633 .await?)
634 }
635
636 async fn open_buffer_for_symbol(
637 self: Arc<Server>,
638 request: TypedEnvelope<proto::OpenBufferForSymbol>,
639 ) -> tide::Result<proto::OpenBufferForSymbolResponse> {
640 let host_connection_id = self
641 .state()
642 .read_project(request.payload.project_id, request.sender_id)?
643 .host_connection_id;
644 Ok(self
645 .peer
646 .forward_request(request.sender_id, host_connection_id, request.payload)
647 .await?)
648 }
649
650 async fn open_buffer(
651 self: Arc<Server>,
652 request: TypedEnvelope<proto::OpenBuffer>,
653 ) -> tide::Result<proto::OpenBufferResponse> {
654 let host_connection_id = self
655 .state()
656 .read_project(request.payload.project_id, request.sender_id)?
657 .host_connection_id;
658 Ok(self
659 .peer
660 .forward_request(request.sender_id, host_connection_id, request.payload)
661 .await?)
662 }
663
664 async fn close_buffer(
665 self: Arc<Server>,
666 request: TypedEnvelope<proto::CloseBuffer>,
667 ) -> tide::Result<()> {
668 let host_connection_id = self
669 .state()
670 .read_project(request.payload.project_id, request.sender_id)?
671 .host_connection_id;
672 self.peer
673 .forward_send(request.sender_id, host_connection_id, request.payload)?;
674 Ok(())
675 }
676
677 async fn save_buffer(
678 self: Arc<Server>,
679 request: TypedEnvelope<proto::SaveBuffer>,
680 ) -> tide::Result<proto::BufferSaved> {
681 let host;
682 let mut guests;
683 {
684 let state = self.state();
685 let project = state.read_project(request.payload.project_id, request.sender_id)?;
686 host = project.host_connection_id;
687 guests = project.guest_connection_ids()
688 }
689
690 let response = self
691 .peer
692 .forward_request(request.sender_id, host, request.payload.clone())
693 .await?;
694
695 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
696 broadcast(host, guests, |conn_id| {
697 self.peer.forward_send(host, conn_id, response.clone())
698 })?;
699
700 Ok(response)
701 }
702
703 async fn format_buffers(
704 self: Arc<Server>,
705 request: TypedEnvelope<proto::FormatBuffers>,
706 ) -> tide::Result<proto::FormatBuffersResponse> {
707 let host = self
708 .state()
709 .read_project(request.payload.project_id, request.sender_id)?
710 .host_connection_id;
711 Ok(self
712 .peer
713 .forward_request(request.sender_id, host, request.payload.clone())
714 .await?)
715 }
716
717 async fn get_completions(
718 self: Arc<Server>,
719 request: TypedEnvelope<proto::GetCompletions>,
720 ) -> tide::Result<proto::GetCompletionsResponse> {
721 let host = self
722 .state()
723 .read_project(request.payload.project_id, request.sender_id)?
724 .host_connection_id;
725 Ok(self
726 .peer
727 .forward_request(request.sender_id, host, request.payload.clone())
728 .await?)
729 }
730
731 async fn apply_additional_edits_for_completion(
732 self: Arc<Server>,
733 request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
734 ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
735 let host = self
736 .state()
737 .read_project(request.payload.project_id, request.sender_id)?
738 .host_connection_id;
739 Ok(self
740 .peer
741 .forward_request(request.sender_id, host, request.payload.clone())
742 .await?)
743 }
744
745 async fn get_code_actions(
746 self: Arc<Server>,
747 request: TypedEnvelope<proto::GetCodeActions>,
748 ) -> tide::Result<proto::GetCodeActionsResponse> {
749 let host = self
750 .state()
751 .read_project(request.payload.project_id, request.sender_id)?
752 .host_connection_id;
753 Ok(self
754 .peer
755 .forward_request(request.sender_id, host, request.payload.clone())
756 .await?)
757 }
758
759 async fn apply_code_action(
760 self: Arc<Server>,
761 request: TypedEnvelope<proto::ApplyCodeAction>,
762 ) -> tide::Result<proto::ApplyCodeActionResponse> {
763 let host = self
764 .state()
765 .read_project(request.payload.project_id, request.sender_id)?
766 .host_connection_id;
767 Ok(self
768 .peer
769 .forward_request(request.sender_id, host, request.payload.clone())
770 .await?)
771 }
772
773 async fn prepare_rename(
774 self: Arc<Server>,
775 request: TypedEnvelope<proto::PrepareRename>,
776 ) -> tide::Result<proto::PrepareRenameResponse> {
777 let host = self
778 .state()
779 .read_project(request.payload.project_id, request.sender_id)?
780 .host_connection_id;
781 Ok(self
782 .peer
783 .forward_request(request.sender_id, host, request.payload.clone())
784 .await?)
785 }
786
787 async fn perform_rename(
788 self: Arc<Server>,
789 request: TypedEnvelope<proto::PerformRename>,
790 ) -> tide::Result<proto::PerformRenameResponse> {
791 let host = self
792 .state()
793 .read_project(request.payload.project_id, request.sender_id)?
794 .host_connection_id;
795 Ok(self
796 .peer
797 .forward_request(request.sender_id, host, request.payload.clone())
798 .await?)
799 }
800
801 async fn update_buffer(
802 self: Arc<Server>,
803 request: TypedEnvelope<proto::UpdateBuffer>,
804 ) -> tide::Result<proto::Ack> {
805 let receiver_ids = self
806 .state()
807 .project_connection_ids(request.payload.project_id, request.sender_id)?;
808 broadcast(request.sender_id, receiver_ids, |connection_id| {
809 self.peer
810 .forward_send(request.sender_id, connection_id, request.payload.clone())
811 })?;
812 Ok(proto::Ack {})
813 }
814
815 async fn update_buffer_file(
816 self: Arc<Server>,
817 request: TypedEnvelope<proto::UpdateBufferFile>,
818 ) -> tide::Result<()> {
819 let receiver_ids = self
820 .state()
821 .project_connection_ids(request.payload.project_id, request.sender_id)?;
822 broadcast(request.sender_id, receiver_ids, |connection_id| {
823 self.peer
824 .forward_send(request.sender_id, connection_id, request.payload.clone())
825 })?;
826 Ok(())
827 }
828
829 async fn buffer_reloaded(
830 self: Arc<Server>,
831 request: TypedEnvelope<proto::BufferReloaded>,
832 ) -> tide::Result<()> {
833 let receiver_ids = self
834 .state()
835 .project_connection_ids(request.payload.project_id, request.sender_id)?;
836 broadcast(request.sender_id, receiver_ids, |connection_id| {
837 self.peer
838 .forward_send(request.sender_id, connection_id, request.payload.clone())
839 })?;
840 Ok(())
841 }
842
843 async fn buffer_saved(
844 self: Arc<Server>,
845 request: TypedEnvelope<proto::BufferSaved>,
846 ) -> tide::Result<()> {
847 let receiver_ids = self
848 .state()
849 .project_connection_ids(request.payload.project_id, request.sender_id)?;
850 broadcast(request.sender_id, receiver_ids, |connection_id| {
851 self.peer
852 .forward_send(request.sender_id, connection_id, request.payload.clone())
853 })?;
854 Ok(())
855 }
856
857 async fn get_channels(
858 self: Arc<Server>,
859 request: TypedEnvelope<proto::GetChannels>,
860 ) -> tide::Result<proto::GetChannelsResponse> {
861 let user_id = self.state().user_id_for_connection(request.sender_id)?;
862 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
863 Ok(proto::GetChannelsResponse {
864 channels: channels
865 .into_iter()
866 .map(|chan| proto::Channel {
867 id: chan.id.to_proto(),
868 name: chan.name,
869 })
870 .collect(),
871 })
872 }
873
874 async fn get_users(
875 self: Arc<Server>,
876 request: TypedEnvelope<proto::GetUsers>,
877 ) -> tide::Result<proto::GetUsersResponse> {
878 let user_ids = request
879 .payload
880 .user_ids
881 .into_iter()
882 .map(UserId::from_proto)
883 .collect();
884 let users = self
885 .app_state
886 .db
887 .get_users_by_ids(user_ids)
888 .await?
889 .into_iter()
890 .map(|user| proto::User {
891 id: user.id.to_proto(),
892 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
893 github_login: user.github_login,
894 })
895 .collect();
896 Ok(proto::GetUsersResponse { users })
897 }
898
899 fn update_contacts_for_users<'a>(
900 self: &Arc<Server>,
901 user_ids: impl IntoIterator<Item = &'a UserId>,
902 ) -> anyhow::Result<()> {
903 let mut result = Ok(());
904 let state = self.state();
905 for user_id in user_ids {
906 let contacts = state.contacts_for_user(*user_id);
907 for connection_id in state.connection_ids_for_user(*user_id) {
908 if let Err(error) = self.peer.send(
909 connection_id,
910 proto::UpdateContacts {
911 contacts: contacts.clone(),
912 },
913 ) {
914 result = Err(error);
915 }
916 }
917 }
918 result
919 }
920
921 async fn join_channel(
922 mut self: Arc<Self>,
923 request: TypedEnvelope<proto::JoinChannel>,
924 ) -> tide::Result<proto::JoinChannelResponse> {
925 let user_id = self.state().user_id_for_connection(request.sender_id)?;
926 let channel_id = ChannelId::from_proto(request.payload.channel_id);
927 if !self
928 .app_state
929 .db
930 .can_user_access_channel(user_id, channel_id)
931 .await?
932 {
933 Err(anyhow!("access denied"))?;
934 }
935
936 self.state_mut().join_channel(request.sender_id, channel_id);
937 let messages = self
938 .app_state
939 .db
940 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
941 .await?
942 .into_iter()
943 .map(|msg| proto::ChannelMessage {
944 id: msg.id.to_proto(),
945 body: msg.body,
946 timestamp: msg.sent_at.unix_timestamp() as u64,
947 sender_id: msg.sender_id.to_proto(),
948 nonce: Some(msg.nonce.as_u128().into()),
949 })
950 .collect::<Vec<_>>();
951 Ok(proto::JoinChannelResponse {
952 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
953 messages,
954 })
955 }
956
957 async fn leave_channel(
958 mut self: Arc<Self>,
959 request: TypedEnvelope<proto::LeaveChannel>,
960 ) -> tide::Result<()> {
961 let user_id = self.state().user_id_for_connection(request.sender_id)?;
962 let channel_id = ChannelId::from_proto(request.payload.channel_id);
963 if !self
964 .app_state
965 .db
966 .can_user_access_channel(user_id, channel_id)
967 .await?
968 {
969 Err(anyhow!("access denied"))?;
970 }
971
972 self.state_mut()
973 .leave_channel(request.sender_id, channel_id);
974
975 Ok(())
976 }
977
978 async fn send_channel_message(
979 self: Arc<Self>,
980 request: TypedEnvelope<proto::SendChannelMessage>,
981 ) -> tide::Result<proto::SendChannelMessageResponse> {
982 let channel_id = ChannelId::from_proto(request.payload.channel_id);
983 let user_id;
984 let connection_ids;
985 {
986 let state = self.state();
987 user_id = state.user_id_for_connection(request.sender_id)?;
988 connection_ids = state.channel_connection_ids(channel_id)?;
989 }
990
991 // Validate the message body.
992 let body = request.payload.body.trim().to_string();
993 if body.len() > MAX_MESSAGE_LEN {
994 return Err(anyhow!("message is too long"))?;
995 }
996 if body.is_empty() {
997 return Err(anyhow!("message can't be blank"))?;
998 }
999
1000 let timestamp = OffsetDateTime::now_utc();
1001 let nonce = request
1002 .payload
1003 .nonce
1004 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1005
1006 let message_id = self
1007 .app_state
1008 .db
1009 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1010 .await?
1011 .to_proto();
1012 let message = proto::ChannelMessage {
1013 sender_id: user_id.to_proto(),
1014 id: message_id,
1015 body,
1016 timestamp: timestamp.unix_timestamp() as u64,
1017 nonce: Some(nonce),
1018 };
1019 broadcast(request.sender_id, connection_ids, |conn_id| {
1020 self.peer.send(
1021 conn_id,
1022 proto::ChannelMessageSent {
1023 channel_id: channel_id.to_proto(),
1024 message: Some(message.clone()),
1025 },
1026 )
1027 })?;
1028 Ok(proto::SendChannelMessageResponse {
1029 message: Some(message),
1030 })
1031 }
1032
1033 async fn get_channel_messages(
1034 self: Arc<Self>,
1035 request: TypedEnvelope<proto::GetChannelMessages>,
1036 ) -> tide::Result<proto::GetChannelMessagesResponse> {
1037 let user_id = self.state().user_id_for_connection(request.sender_id)?;
1038 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1039 if !self
1040 .app_state
1041 .db
1042 .can_user_access_channel(user_id, channel_id)
1043 .await?
1044 {
1045 Err(anyhow!("access denied"))?;
1046 }
1047
1048 let messages = self
1049 .app_state
1050 .db
1051 .get_channel_messages(
1052 channel_id,
1053 MESSAGE_COUNT_PER_PAGE,
1054 Some(MessageId::from_proto(request.payload.before_message_id)),
1055 )
1056 .await?
1057 .into_iter()
1058 .map(|msg| proto::ChannelMessage {
1059 id: msg.id.to_proto(),
1060 body: msg.body,
1061 timestamp: msg.sent_at.unix_timestamp() as u64,
1062 sender_id: msg.sender_id.to_proto(),
1063 nonce: Some(msg.nonce.as_u128().into()),
1064 })
1065 .collect::<Vec<_>>();
1066
1067 Ok(proto::GetChannelMessagesResponse {
1068 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1069 messages,
1070 })
1071 }
1072
1073 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1074 self.store.read()
1075 }
1076
1077 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1078 self.store.write()
1079 }
1080}
1081
1082impl Executor for RealExecutor {
1083 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1084 task::spawn(future);
1085 }
1086}
1087
1088fn broadcast<F>(
1089 sender_id: ConnectionId,
1090 receiver_ids: Vec<ConnectionId>,
1091 mut f: F,
1092) -> anyhow::Result<()>
1093where
1094 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1095{
1096 let mut result = Ok(());
1097 for receiver_id in receiver_ids {
1098 if receiver_id != sender_id {
1099 if let Err(error) = f(receiver_id) {
1100 if result.is_ok() {
1101 result = Err(error);
1102 }
1103 }
1104 }
1105 }
1106 result
1107}
1108
1109pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1110 let server = Server::new(app.state().clone(), rpc.clone(), None);
1111 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1112 let server = server.clone();
1113 async move {
1114 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1115
1116 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1117 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1118 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1119 let client_protocol_version: Option<u32> = request
1120 .header("X-Zed-Protocol-Version")
1121 .and_then(|v| v.as_str().parse().ok());
1122
1123 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1124 return Ok(Response::new(StatusCode::UpgradeRequired));
1125 }
1126
1127 let header = match request.header("Sec-Websocket-Key") {
1128 Some(h) => h.as_str(),
1129 None => return Err(anyhow!("expected sec-websocket-key"))?,
1130 };
1131
1132 let user_id = process_auth_header(&request).await?;
1133
1134 let mut response = Response::new(StatusCode::SwitchingProtocols);
1135 response.insert_header(UPGRADE, "websocket");
1136 response.insert_header(CONNECTION, "Upgrade");
1137 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1138 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1139 response.insert_header("Sec-Websocket-Version", "13");
1140
1141 let http_res: &mut tide::http::Response = response.as_mut();
1142 let upgrade_receiver = http_res.recv_upgrade().await;
1143 let addr = request.remote().unwrap_or("unknown").to_string();
1144 task::spawn(async move {
1145 if let Some(stream) = upgrade_receiver.await {
1146 server
1147 .handle_connection(
1148 Connection::new(
1149 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1150 ),
1151 addr,
1152 user_id,
1153 None,
1154 RealExecutor,
1155 )
1156 .await;
1157 }
1158 });
1159
1160 Ok(response)
1161 }
1162 });
1163}
1164
1165fn header_contains_ignore_case<T>(
1166 request: &tide::Request<T>,
1167 header_name: HeaderName,
1168 value: &str,
1169) -> bool {
1170 request
1171 .header(header_name)
1172 .map(|h| {
1173 h.as_str()
1174 .split(',')
1175 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1176 })
1177 .unwrap_or(false)
1178}
1179
1180#[cfg(test)]
1181mod tests {
1182 use super::*;
1183 use crate::{
1184 auth,
1185 db::{tests::TestDb, UserId},
1186 github, AppState, Config,
1187 };
1188 use ::rpc::Peer;
1189 use collections::BTreeMap;
1190 use gpui::{executor, ModelHandle, TestAppContext};
1191 use parking_lot::Mutex;
1192 use postage::{sink::Sink, watch};
1193 use rand::prelude::*;
1194 use rpc::PeerId;
1195 use serde_json::json;
1196 use sqlx::types::time::OffsetDateTime;
1197 use std::{
1198 cell::Cell,
1199 env,
1200 ops::Deref,
1201 path::Path,
1202 rc::Rc,
1203 sync::{
1204 atomic::{AtomicBool, Ordering::SeqCst},
1205 Arc,
1206 },
1207 time::Duration,
1208 };
1209 use zed::{
1210 client::{
1211 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1212 EstablishConnectionError, UserStore,
1213 },
1214 editor::{
1215 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, EditorSettings,
1216 Input, MultiBuffer, Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1217 },
1218 fs::{FakeFs, Fs as _},
1219 language::{
1220 tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1221 LanguageConfig, LanguageRegistry, LanguageServerConfig, Point, ToLspPosition,
1222 },
1223 lsp,
1224 project::{DiagnosticSummary, Project, ProjectPath},
1225 workspace::{Workspace, WorkspaceParams},
1226 };
1227
// Runs via `ctor` and turns on `env_logger` output, but only when the
// `RUST_LOG` environment variable is set.
#[cfg(test)]
#[ctor::ctor]
fn init_logger() {
    let logging_requested = std::env::var("RUST_LOG").is_ok();
    if logging_requested {
        env_logger::init();
    }
}
1235
1236 #[gpui::test(iterations = 10)]
1237 async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1238 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1239 let lang_registry = Arc::new(LanguageRegistry::new());
1240 let fs = FakeFs::new(cx_a.background());
1241 cx_a.foreground().forbid_parking();
1242
1243 // Connect to a server as 2 clients.
1244 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1245 let client_a = server.create_client(&mut cx_a, "user_a").await;
1246 let client_b = server.create_client(&mut cx_b, "user_b").await;
1247
1248 // Share a project as client A
1249 fs.insert_tree(
1250 "/a",
1251 json!({
1252 ".zed.toml": r#"collaborators = ["user_b"]"#,
1253 "a.txt": "a-contents",
1254 "b.txt": "b-contents",
1255 }),
1256 )
1257 .await;
1258 let project_a = cx_a.update(|cx| {
1259 Project::local(
1260 client_a.clone(),
1261 client_a.user_store.clone(),
1262 lang_registry.clone(),
1263 fs.clone(),
1264 cx,
1265 )
1266 });
1267 let (worktree_a, _) = project_a
1268 .update(&mut cx_a, |p, cx| {
1269 p.find_or_create_local_worktree("/a", false, cx)
1270 })
1271 .await
1272 .unwrap();
1273 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1274 worktree_a
1275 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1276 .await;
1277 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1278 project_a
1279 .update(&mut cx_a, |p, cx| p.share(cx))
1280 .await
1281 .unwrap();
1282
1283 // Join that project as client B
1284 let project_b = Project::remote(
1285 project_id,
1286 client_b.clone(),
1287 client_b.user_store.clone(),
1288 lang_registry.clone(),
1289 fs.clone(),
1290 &mut cx_b.to_async(),
1291 )
1292 .await
1293 .unwrap();
1294
1295 let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1296 assert_eq!(
1297 project
1298 .collaborators()
1299 .get(&client_a.peer_id)
1300 .unwrap()
1301 .user
1302 .github_login,
1303 "user_a"
1304 );
1305 project.replica_id()
1306 });
1307 project_a
1308 .condition(&cx_a, |tree, _| {
1309 tree.collaborators()
1310 .get(&client_b.peer_id)
1311 .map_or(false, |collaborator| {
1312 collaborator.replica_id == replica_id_b
1313 && collaborator.user.github_login == "user_b"
1314 })
1315 })
1316 .await;
1317
1318 // Open the same file as client B and client A.
1319 let buffer_b = project_b
1320 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1321 .await
1322 .unwrap();
1323 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1324 buffer_b.read_with(&cx_b, |buf, cx| {
1325 assert_eq!(buf.read(cx).text(), "b-contents")
1326 });
1327 project_a.read_with(&cx_a, |project, cx| {
1328 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1329 });
1330 let buffer_a = project_a
1331 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1332 .await
1333 .unwrap();
1334
1335 let editor_b = cx_b.add_view(window_b, |cx| {
1336 Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1337 });
1338
1339 // TODO
1340 // // Create a selection set as client B and see that selection set as client A.
1341 // buffer_a
1342 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1343 // .await;
1344
1345 // Edit the buffer as client B and see that edit as client A.
1346 editor_b.update(&mut cx_b, |editor, cx| {
1347 editor.handle_input(&Input("ok, ".into()), cx)
1348 });
1349 buffer_a
1350 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1351 .await;
1352
1353 // TODO
1354 // // Remove the selection set as client B, see those selections disappear as client A.
1355 cx_b.update(move |_| drop(editor_b));
1356 // buffer_a
1357 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1358 // .await;
1359
1360 // Close the buffer as client A, see that the buffer is closed.
1361 cx_a.update(move |_| drop(buffer_a));
1362 project_a
1363 .condition(&cx_a, |project, cx| {
1364 !project.has_open_buffer((worktree_id, "b.txt"), cx)
1365 })
1366 .await;
1367
1368 // Dropping the client B's project removes client B from client A's collaborators.
1369 cx_b.update(move |_| drop(project_b));
1370 project_a
1371 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1372 .await;
1373 }
1374
1375 #[gpui::test(iterations = 10)]
1376 async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1377 let lang_registry = Arc::new(LanguageRegistry::new());
1378 let fs = FakeFs::new(cx_a.background());
1379 cx_a.foreground().forbid_parking();
1380
1381 // Connect to a server as 2 clients.
1382 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1383 let client_a = server.create_client(&mut cx_a, "user_a").await;
1384 let client_b = server.create_client(&mut cx_b, "user_b").await;
1385
1386 // Share a project as client A
1387 fs.insert_tree(
1388 "/a",
1389 json!({
1390 ".zed.toml": r#"collaborators = ["user_b"]"#,
1391 "a.txt": "a-contents",
1392 "b.txt": "b-contents",
1393 }),
1394 )
1395 .await;
1396 let project_a = cx_a.update(|cx| {
1397 Project::local(
1398 client_a.clone(),
1399 client_a.user_store.clone(),
1400 lang_registry.clone(),
1401 fs.clone(),
1402 cx,
1403 )
1404 });
1405 let (worktree_a, _) = project_a
1406 .update(&mut cx_a, |p, cx| {
1407 p.find_or_create_local_worktree("/a", false, cx)
1408 })
1409 .await
1410 .unwrap();
1411 worktree_a
1412 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1413 .await;
1414 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1415 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1416 project_a
1417 .update(&mut cx_a, |p, cx| p.share(cx))
1418 .await
1419 .unwrap();
1420 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1421
1422 // Join that project as client B
1423 let project_b = Project::remote(
1424 project_id,
1425 client_b.clone(),
1426 client_b.user_store.clone(),
1427 lang_registry.clone(),
1428 fs.clone(),
1429 &mut cx_b.to_async(),
1430 )
1431 .await
1432 .unwrap();
1433 project_b
1434 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1435 .await
1436 .unwrap();
1437
1438 // Unshare the project as client A
1439 project_a
1440 .update(&mut cx_a, |project, cx| project.unshare(cx))
1441 .await
1442 .unwrap();
1443 project_b
1444 .condition(&mut cx_b, |project, _| project.is_read_only())
1445 .await;
1446 assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1447 drop(project_b);
1448
1449 // Share the project again and ensure guests can still join.
1450 project_a
1451 .update(&mut cx_a, |project, cx| project.share(cx))
1452 .await
1453 .unwrap();
1454 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1455
1456 let project_c = Project::remote(
1457 project_id,
1458 client_b.clone(),
1459 client_b.user_store.clone(),
1460 lang_registry.clone(),
1461 fs.clone(),
1462 &mut cx_b.to_async(),
1463 )
1464 .await
1465 .unwrap();
1466 project_c
1467 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1468 .await
1469 .unwrap();
1470 }
1471
1472 #[gpui::test(iterations = 10)]
1473 async fn test_propagate_saves_and_fs_changes(
1474 mut cx_a: TestAppContext,
1475 mut cx_b: TestAppContext,
1476 mut cx_c: TestAppContext,
1477 ) {
1478 let lang_registry = Arc::new(LanguageRegistry::new());
1479 let fs = FakeFs::new(cx_a.background());
1480 cx_a.foreground().forbid_parking();
1481
1482 // Connect to a server as 3 clients.
1483 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1484 let client_a = server.create_client(&mut cx_a, "user_a").await;
1485 let client_b = server.create_client(&mut cx_b, "user_b").await;
1486 let client_c = server.create_client(&mut cx_c, "user_c").await;
1487
1488 // Share a worktree as client A.
1489 fs.insert_tree(
1490 "/a",
1491 json!({
1492 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1493 "file1": "",
1494 "file2": ""
1495 }),
1496 )
1497 .await;
1498 let project_a = cx_a.update(|cx| {
1499 Project::local(
1500 client_a.clone(),
1501 client_a.user_store.clone(),
1502 lang_registry.clone(),
1503 fs.clone(),
1504 cx,
1505 )
1506 });
1507 let (worktree_a, _) = project_a
1508 .update(&mut cx_a, |p, cx| {
1509 p.find_or_create_local_worktree("/a", false, cx)
1510 })
1511 .await
1512 .unwrap();
1513 worktree_a
1514 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1515 .await;
1516 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1517 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1518 project_a
1519 .update(&mut cx_a, |p, cx| p.share(cx))
1520 .await
1521 .unwrap();
1522
1523 // Join that worktree as clients B and C.
1524 let project_b = Project::remote(
1525 project_id,
1526 client_b.clone(),
1527 client_b.user_store.clone(),
1528 lang_registry.clone(),
1529 fs.clone(),
1530 &mut cx_b.to_async(),
1531 )
1532 .await
1533 .unwrap();
1534 let project_c = Project::remote(
1535 project_id,
1536 client_c.clone(),
1537 client_c.user_store.clone(),
1538 lang_registry.clone(),
1539 fs.clone(),
1540 &mut cx_c.to_async(),
1541 )
1542 .await
1543 .unwrap();
1544 let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1545 let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1546
1547 // Open and edit a buffer as both guests B and C.
1548 let buffer_b = project_b
1549 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1550 .await
1551 .unwrap();
1552 let buffer_c = project_c
1553 .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1554 .await
1555 .unwrap();
1556 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1557 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1558
1559 // Open and edit that buffer as the host.
1560 let buffer_a = project_a
1561 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1562 .await
1563 .unwrap();
1564
1565 buffer_a
1566 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1567 .await;
1568 buffer_a.update(&mut cx_a, |buf, cx| {
1569 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1570 });
1571
1572 // Wait for edits to propagate
1573 buffer_a
1574 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1575 .await;
1576 buffer_b
1577 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1578 .await;
1579 buffer_c
1580 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1581 .await;
1582
1583 // Edit the buffer as the host and concurrently save as guest B.
1584 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1585 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1586 save_b.await.unwrap();
1587 assert_eq!(
1588 fs.load("/a/file1".as_ref()).await.unwrap(),
1589 "hi-a, i-am-c, i-am-b, i-am-a"
1590 );
1591 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1592 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1593 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1594
1595 // Make changes on host's file system, see those changes on guest worktrees.
1596 fs.rename(
1597 "/a/file1".as_ref(),
1598 "/a/file1-renamed".as_ref(),
1599 Default::default(),
1600 )
1601 .await
1602 .unwrap();
1603
1604 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1605 .await
1606 .unwrap();
1607 fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1608
1609 worktree_a
1610 .condition(&cx_a, |tree, _| {
1611 tree.paths()
1612 .map(|p| p.to_string_lossy())
1613 .collect::<Vec<_>>()
1614 == [".zed.toml", "file1-renamed", "file3", "file4"]
1615 })
1616 .await;
1617 worktree_b
1618 .condition(&cx_b, |tree, _| {
1619 tree.paths()
1620 .map(|p| p.to_string_lossy())
1621 .collect::<Vec<_>>()
1622 == [".zed.toml", "file1-renamed", "file3", "file4"]
1623 })
1624 .await;
1625 worktree_c
1626 .condition(&cx_c, |tree, _| {
1627 tree.paths()
1628 .map(|p| p.to_string_lossy())
1629 .collect::<Vec<_>>()
1630 == [".zed.toml", "file1-renamed", "file3", "file4"]
1631 })
1632 .await;
1633
1634 // Ensure buffer files are updated as well.
1635 buffer_a
1636 .condition(&cx_a, |buf, _| {
1637 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1638 })
1639 .await;
1640 buffer_b
1641 .condition(&cx_b, |buf, _| {
1642 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1643 })
1644 .await;
1645 buffer_c
1646 .condition(&cx_c, |buf, _| {
1647 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1648 })
1649 .await;
1650 }
1651
1652 #[gpui::test(iterations = 10)]
1653 async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1654 cx_a.foreground().forbid_parking();
1655 let lang_registry = Arc::new(LanguageRegistry::new());
1656 let fs = FakeFs::new(cx_a.background());
1657
1658 // Connect to a server as 2 clients.
1659 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1660 let client_a = server.create_client(&mut cx_a, "user_a").await;
1661 let client_b = server.create_client(&mut cx_b, "user_b").await;
1662
1663 // Share a project as client A
1664 fs.insert_tree(
1665 "/dir",
1666 json!({
1667 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1668 "a.txt": "a-contents",
1669 }),
1670 )
1671 .await;
1672
1673 let project_a = cx_a.update(|cx| {
1674 Project::local(
1675 client_a.clone(),
1676 client_a.user_store.clone(),
1677 lang_registry.clone(),
1678 fs.clone(),
1679 cx,
1680 )
1681 });
1682 let (worktree_a, _) = project_a
1683 .update(&mut cx_a, |p, cx| {
1684 p.find_or_create_local_worktree("/dir", false, cx)
1685 })
1686 .await
1687 .unwrap();
1688 worktree_a
1689 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1690 .await;
1691 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1692 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1693 project_a
1694 .update(&mut cx_a, |p, cx| p.share(cx))
1695 .await
1696 .unwrap();
1697
1698 // Join that project as client B
1699 let project_b = Project::remote(
1700 project_id,
1701 client_b.clone(),
1702 client_b.user_store.clone(),
1703 lang_registry.clone(),
1704 fs.clone(),
1705 &mut cx_b.to_async(),
1706 )
1707 .await
1708 .unwrap();
1709
1710 // Open a buffer as client B
1711 let buffer_b = project_b
1712 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1713 .await
1714 .unwrap();
1715
1716 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1717 buffer_b.read_with(&cx_b, |buf, _| {
1718 assert!(buf.is_dirty());
1719 assert!(!buf.has_conflict());
1720 });
1721
1722 buffer_b
1723 .update(&mut cx_b, |buf, cx| buf.save(cx))
1724 .await
1725 .unwrap();
1726 buffer_b
1727 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1728 .await;
1729 buffer_b.read_with(&cx_b, |buf, _| {
1730 assert!(!buf.has_conflict());
1731 });
1732
1733 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1734 buffer_b.read_with(&cx_b, |buf, _| {
1735 assert!(buf.is_dirty());
1736 assert!(!buf.has_conflict());
1737 });
1738 }
1739
1740 #[gpui::test(iterations = 10)]
1741 async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1742 cx_a.foreground().forbid_parking();
1743 let lang_registry = Arc::new(LanguageRegistry::new());
1744 let fs = FakeFs::new(cx_a.background());
1745
1746 // Connect to a server as 2 clients.
1747 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1748 let client_a = server.create_client(&mut cx_a, "user_a").await;
1749 let client_b = server.create_client(&mut cx_b, "user_b").await;
1750
1751 // Share a project as client A
1752 fs.insert_tree(
1753 "/dir",
1754 json!({
1755 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1756 "a.txt": "a-contents",
1757 }),
1758 )
1759 .await;
1760
1761 let project_a = cx_a.update(|cx| {
1762 Project::local(
1763 client_a.clone(),
1764 client_a.user_store.clone(),
1765 lang_registry.clone(),
1766 fs.clone(),
1767 cx,
1768 )
1769 });
1770 let (worktree_a, _) = project_a
1771 .update(&mut cx_a, |p, cx| {
1772 p.find_or_create_local_worktree("/dir", false, cx)
1773 })
1774 .await
1775 .unwrap();
1776 worktree_a
1777 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1778 .await;
1779 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1780 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1781 project_a
1782 .update(&mut cx_a, |p, cx| p.share(cx))
1783 .await
1784 .unwrap();
1785
1786 // Join that project as client B
1787 let project_b = Project::remote(
1788 project_id,
1789 client_b.clone(),
1790 client_b.user_store.clone(),
1791 lang_registry.clone(),
1792 fs.clone(),
1793 &mut cx_b.to_async(),
1794 )
1795 .await
1796 .unwrap();
1797 let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1798
1799 // Open a buffer as client B
1800 let buffer_b = project_b
1801 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1802 .await
1803 .unwrap();
1804 buffer_b.read_with(&cx_b, |buf, _| {
1805 assert!(!buf.is_dirty());
1806 assert!(!buf.has_conflict());
1807 });
1808
1809 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1810 .await
1811 .unwrap();
1812 buffer_b
1813 .condition(&cx_b, |buf, _| {
1814 buf.text() == "new contents" && !buf.is_dirty()
1815 })
1816 .await;
1817 buffer_b.read_with(&cx_b, |buf, _| {
1818 assert!(!buf.has_conflict());
1819 });
1820 }
1821
1822 #[gpui::test(iterations = 10)]
1823 async fn test_editing_while_guest_opens_buffer(
1824 mut cx_a: TestAppContext,
1825 mut cx_b: TestAppContext,
1826 ) {
1827 cx_a.foreground().forbid_parking();
1828 let lang_registry = Arc::new(LanguageRegistry::new());
1829 let fs = FakeFs::new(cx_a.background());
1830
1831 // Connect to a server as 2 clients.
1832 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1833 let client_a = server.create_client(&mut cx_a, "user_a").await;
1834 let client_b = server.create_client(&mut cx_b, "user_b").await;
1835
1836 // Share a project as client A
1837 fs.insert_tree(
1838 "/dir",
1839 json!({
1840 ".zed.toml": r#"collaborators = ["user_b"]"#,
1841 "a.txt": "a-contents",
1842 }),
1843 )
1844 .await;
1845 let project_a = cx_a.update(|cx| {
1846 Project::local(
1847 client_a.clone(),
1848 client_a.user_store.clone(),
1849 lang_registry.clone(),
1850 fs.clone(),
1851 cx,
1852 )
1853 });
1854 let (worktree_a, _) = project_a
1855 .update(&mut cx_a, |p, cx| {
1856 p.find_or_create_local_worktree("/dir", false, cx)
1857 })
1858 .await
1859 .unwrap();
1860 worktree_a
1861 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1862 .await;
1863 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1864 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1865 project_a
1866 .update(&mut cx_a, |p, cx| p.share(cx))
1867 .await
1868 .unwrap();
1869
1870 // Join that project as client B
1871 let project_b = Project::remote(
1872 project_id,
1873 client_b.clone(),
1874 client_b.user_store.clone(),
1875 lang_registry.clone(),
1876 fs.clone(),
1877 &mut cx_b.to_async(),
1878 )
1879 .await
1880 .unwrap();
1881
1882 // Open a buffer as client A
1883 let buffer_a = project_a
1884 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1885 .await
1886 .unwrap();
1887
1888 // Start opening the same buffer as client B
1889 let buffer_b = cx_b
1890 .background()
1891 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1892
1893 // Edit the buffer as client A while client B is still opening it.
1894 cx_b.background().simulate_random_delay().await;
1895 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1896 cx_b.background().simulate_random_delay().await;
1897 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1898
1899 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1900 let buffer_b = buffer_b.await.unwrap();
1901 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1902 }
1903
1904 #[gpui::test(iterations = 10)]
1905 async fn test_leaving_worktree_while_opening_buffer(
1906 mut cx_a: TestAppContext,
1907 mut cx_b: TestAppContext,
1908 ) {
1909 cx_a.foreground().forbid_parking();
1910 let lang_registry = Arc::new(LanguageRegistry::new());
1911 let fs = FakeFs::new(cx_a.background());
1912
1913 // Connect to a server as 2 clients.
1914 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1915 let client_a = server.create_client(&mut cx_a, "user_a").await;
1916 let client_b = server.create_client(&mut cx_b, "user_b").await;
1917
1918 // Share a project as client A
1919 fs.insert_tree(
1920 "/dir",
1921 json!({
1922 ".zed.toml": r#"collaborators = ["user_b"]"#,
1923 "a.txt": "a-contents",
1924 }),
1925 )
1926 .await;
1927 let project_a = cx_a.update(|cx| {
1928 Project::local(
1929 client_a.clone(),
1930 client_a.user_store.clone(),
1931 lang_registry.clone(),
1932 fs.clone(),
1933 cx,
1934 )
1935 });
1936 let (worktree_a, _) = project_a
1937 .update(&mut cx_a, |p, cx| {
1938 p.find_or_create_local_worktree("/dir", false, cx)
1939 })
1940 .await
1941 .unwrap();
1942 worktree_a
1943 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1944 .await;
1945 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1946 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1947 project_a
1948 .update(&mut cx_a, |p, cx| p.share(cx))
1949 .await
1950 .unwrap();
1951
1952 // Join that project as client B
1953 let project_b = Project::remote(
1954 project_id,
1955 client_b.clone(),
1956 client_b.user_store.clone(),
1957 lang_registry.clone(),
1958 fs.clone(),
1959 &mut cx_b.to_async(),
1960 )
1961 .await
1962 .unwrap();
1963
1964 // See that a guest has joined as client A.
1965 project_a
1966 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1967 .await;
1968
1969 // Begin opening a buffer as client B, but leave the project before the open completes.
1970 let buffer_b = cx_b
1971 .background()
1972 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1973 cx_b.update(|_| drop(project_b));
1974 drop(buffer_b);
1975
1976 // See that the guest has left.
1977 project_a
1978 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1979 .await;
1980 }
1981
1982 #[gpui::test(iterations = 10)]
1983 async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1984 cx_a.foreground().forbid_parking();
1985 let lang_registry = Arc::new(LanguageRegistry::new());
1986 let fs = FakeFs::new(cx_a.background());
1987
1988 // Connect to a server as 2 clients.
1989 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1990 let client_a = server.create_client(&mut cx_a, "user_a").await;
1991 let client_b = server.create_client(&mut cx_b, "user_b").await;
1992
1993 // Share a project as client A
1994 fs.insert_tree(
1995 "/a",
1996 json!({
1997 ".zed.toml": r#"collaborators = ["user_b"]"#,
1998 "a.txt": "a-contents",
1999 "b.txt": "b-contents",
2000 }),
2001 )
2002 .await;
2003 let project_a = cx_a.update(|cx| {
2004 Project::local(
2005 client_a.clone(),
2006 client_a.user_store.clone(),
2007 lang_registry.clone(),
2008 fs.clone(),
2009 cx,
2010 )
2011 });
2012 let (worktree_a, _) = project_a
2013 .update(&mut cx_a, |p, cx| {
2014 p.find_or_create_local_worktree("/a", false, cx)
2015 })
2016 .await
2017 .unwrap();
2018 worktree_a
2019 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2020 .await;
2021 let project_id = project_a
2022 .update(&mut cx_a, |project, _| project.next_remote_id())
2023 .await;
2024 project_a
2025 .update(&mut cx_a, |project, cx| project.share(cx))
2026 .await
2027 .unwrap();
2028
2029 // Join that project as client B
2030 let _project_b = Project::remote(
2031 project_id,
2032 client_b.clone(),
2033 client_b.user_store.clone(),
2034 lang_registry.clone(),
2035 fs.clone(),
2036 &mut cx_b.to_async(),
2037 )
2038 .await
2039 .unwrap();
2040
2041 // See that a guest has joined as client A.
2042 project_a
2043 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2044 .await;
2045
2046 // Drop client B's connection and ensure client A observes client B leaving the worktree.
2047 client_b.disconnect(&cx_b.to_async()).unwrap();
2048 project_a
2049 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2050 .await;
2051 }
2052
2053 #[gpui::test(iterations = 10)]
2054 async fn test_collaborating_with_diagnostics(
2055 mut cx_a: TestAppContext,
2056 mut cx_b: TestAppContext,
2057 ) {
2058 cx_a.foreground().forbid_parking();
2059 let mut lang_registry = Arc::new(LanguageRegistry::new());
2060 let fs = FakeFs::new(cx_a.background());
2061
2062 // Set up a fake language server.
2063 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2064 Arc::get_mut(&mut lang_registry)
2065 .unwrap()
2066 .add(Arc::new(Language::new(
2067 LanguageConfig {
2068 name: "Rust".into(),
2069 path_suffixes: vec!["rs".to_string()],
2070 language_server: Some(language_server_config),
2071 ..Default::default()
2072 },
2073 Some(tree_sitter_rust::language()),
2074 )));
2075
2076 // Connect to a server as 2 clients.
2077 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2078 let client_a = server.create_client(&mut cx_a, "user_a").await;
2079 let client_b = server.create_client(&mut cx_b, "user_b").await;
2080
2081 // Share a project as client A
2082 fs.insert_tree(
2083 "/a",
2084 json!({
2085 ".zed.toml": r#"collaborators = ["user_b"]"#,
2086 "a.rs": "let one = two",
2087 "other.rs": "",
2088 }),
2089 )
2090 .await;
2091 let project_a = cx_a.update(|cx| {
2092 Project::local(
2093 client_a.clone(),
2094 client_a.user_store.clone(),
2095 lang_registry.clone(),
2096 fs.clone(),
2097 cx,
2098 )
2099 });
2100 let (worktree_a, _) = project_a
2101 .update(&mut cx_a, |p, cx| {
2102 p.find_or_create_local_worktree("/a", false, cx)
2103 })
2104 .await
2105 .unwrap();
2106 worktree_a
2107 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2108 .await;
2109 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2110 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2111 project_a
2112 .update(&mut cx_a, |p, cx| p.share(cx))
2113 .await
2114 .unwrap();
2115
2116 // Cause the language server to start.
2117 let _ = cx_a
2118 .background()
2119 .spawn(project_a.update(&mut cx_a, |project, cx| {
2120 project.open_buffer(
2121 ProjectPath {
2122 worktree_id,
2123 path: Path::new("other.rs").into(),
2124 },
2125 cx,
2126 )
2127 }))
2128 .await
2129 .unwrap();
2130
2131 // Simulate a language server reporting errors for a file.
2132 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2133 fake_language_server
2134 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2135 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2136 version: None,
2137 diagnostics: vec![lsp::Diagnostic {
2138 severity: Some(lsp::DiagnosticSeverity::ERROR),
2139 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2140 message: "message 1".to_string(),
2141 ..Default::default()
2142 }],
2143 })
2144 .await;
2145
2146 // Wait for server to see the diagnostics update.
2147 server
2148 .condition(|store| {
2149 let worktree = store
2150 .project(project_id)
2151 .unwrap()
2152 .worktrees
2153 .get(&worktree_id.to_proto())
2154 .unwrap();
2155
2156 !worktree
2157 .share
2158 .as_ref()
2159 .unwrap()
2160 .diagnostic_summaries
2161 .is_empty()
2162 })
2163 .await;
2164
2165 // Join the worktree as client B.
2166 let project_b = Project::remote(
2167 project_id,
2168 client_b.clone(),
2169 client_b.user_store.clone(),
2170 lang_registry.clone(),
2171 fs.clone(),
2172 &mut cx_b.to_async(),
2173 )
2174 .await
2175 .unwrap();
2176
2177 project_b.read_with(&cx_b, |project, cx| {
2178 assert_eq!(
2179 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2180 &[(
2181 ProjectPath {
2182 worktree_id,
2183 path: Arc::from(Path::new("a.rs")),
2184 },
2185 DiagnosticSummary {
2186 error_count: 1,
2187 warning_count: 0,
2188 ..Default::default()
2189 },
2190 )]
2191 )
2192 });
2193
2194 // Simulate a language server reporting more errors for a file.
2195 fake_language_server
2196 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2197 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2198 version: None,
2199 diagnostics: vec![
2200 lsp::Diagnostic {
2201 severity: Some(lsp::DiagnosticSeverity::ERROR),
2202 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2203 message: "message 1".to_string(),
2204 ..Default::default()
2205 },
2206 lsp::Diagnostic {
2207 severity: Some(lsp::DiagnosticSeverity::WARNING),
2208 range: lsp::Range::new(
2209 lsp::Position::new(0, 10),
2210 lsp::Position::new(0, 13),
2211 ),
2212 message: "message 2".to_string(),
2213 ..Default::default()
2214 },
2215 ],
2216 })
2217 .await;
2218
2219 // Client b gets the updated summaries
2220 project_b
2221 .condition(&cx_b, |project, cx| {
2222 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2223 == &[(
2224 ProjectPath {
2225 worktree_id,
2226 path: Arc::from(Path::new("a.rs")),
2227 },
2228 DiagnosticSummary {
2229 error_count: 1,
2230 warning_count: 1,
2231 ..Default::default()
2232 },
2233 )]
2234 })
2235 .await;
2236
2237 // Open the file with the errors on client B. They should be present.
2238 let buffer_b = cx_b
2239 .background()
2240 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2241 .await
2242 .unwrap();
2243
2244 buffer_b.read_with(&cx_b, |buffer, _| {
2245 assert_eq!(
2246 buffer
2247 .snapshot()
2248 .diagnostics_in_range::<_, Point>(0..buffer.len())
2249 .map(|entry| entry)
2250 .collect::<Vec<_>>(),
2251 &[
2252 DiagnosticEntry {
2253 range: Point::new(0, 4)..Point::new(0, 7),
2254 diagnostic: Diagnostic {
2255 group_id: 0,
2256 message: "message 1".to_string(),
2257 severity: lsp::DiagnosticSeverity::ERROR,
2258 is_primary: true,
2259 ..Default::default()
2260 }
2261 },
2262 DiagnosticEntry {
2263 range: Point::new(0, 10)..Point::new(0, 13),
2264 diagnostic: Diagnostic {
2265 group_id: 1,
2266 severity: lsp::DiagnosticSeverity::WARNING,
2267 message: "message 2".to_string(),
2268 is_primary: true,
2269 ..Default::default()
2270 }
2271 }
2272 ]
2273 );
2274 });
2275 }
2276
2277 #[gpui::test(iterations = 10)]
2278 async fn test_collaborating_with_completion(
2279 mut cx_a: TestAppContext,
2280 mut cx_b: TestAppContext,
2281 ) {
2282 cx_a.foreground().forbid_parking();
2283 let mut lang_registry = Arc::new(LanguageRegistry::new());
2284 let fs = FakeFs::new(cx_a.background());
2285
2286 // Set up a fake language server.
2287 let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2288 language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2289 completion_provider: Some(lsp::CompletionOptions {
2290 trigger_characters: Some(vec![".".to_string()]),
2291 ..Default::default()
2292 }),
2293 ..Default::default()
2294 });
2295 Arc::get_mut(&mut lang_registry)
2296 .unwrap()
2297 .add(Arc::new(Language::new(
2298 LanguageConfig {
2299 name: "Rust".into(),
2300 path_suffixes: vec!["rs".to_string()],
2301 language_server: Some(language_server_config),
2302 ..Default::default()
2303 },
2304 Some(tree_sitter_rust::language()),
2305 )));
2306
2307 // Connect to a server as 2 clients.
2308 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2309 let client_a = server.create_client(&mut cx_a, "user_a").await;
2310 let client_b = server.create_client(&mut cx_b, "user_b").await;
2311
2312 // Share a project as client A
2313 fs.insert_tree(
2314 "/a",
2315 json!({
2316 ".zed.toml": r#"collaborators = ["user_b"]"#,
2317 "main.rs": "fn main() { a }",
2318 "other.rs": "",
2319 }),
2320 )
2321 .await;
2322 let project_a = cx_a.update(|cx| {
2323 Project::local(
2324 client_a.clone(),
2325 client_a.user_store.clone(),
2326 lang_registry.clone(),
2327 fs.clone(),
2328 cx,
2329 )
2330 });
2331 let (worktree_a, _) = project_a
2332 .update(&mut cx_a, |p, cx| {
2333 p.find_or_create_local_worktree("/a", false, cx)
2334 })
2335 .await
2336 .unwrap();
2337 worktree_a
2338 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2339 .await;
2340 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2341 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2342 project_a
2343 .update(&mut cx_a, |p, cx| p.share(cx))
2344 .await
2345 .unwrap();
2346
2347 // Join the worktree as client B.
2348 let project_b = Project::remote(
2349 project_id,
2350 client_b.clone(),
2351 client_b.user_store.clone(),
2352 lang_registry.clone(),
2353 fs.clone(),
2354 &mut cx_b.to_async(),
2355 )
2356 .await
2357 .unwrap();
2358
2359 // Open a file in an editor as the guest.
2360 let buffer_b = project_b
2361 .update(&mut cx_b, |p, cx| {
2362 p.open_buffer((worktree_id, "main.rs"), cx)
2363 })
2364 .await
2365 .unwrap();
2366 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2367 let editor_b = cx_b.add_view(window_b, |cx| {
2368 Editor::for_buffer(
2369 cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2370 Arc::new(|cx| EditorSettings::test(cx)),
2371 Some(project_b.clone()),
2372 cx,
2373 )
2374 });
2375
2376 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2377 buffer_b
2378 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2379 .await;
2380
2381 // Type a completion trigger character as the guest.
2382 editor_b.update(&mut cx_b, |editor, cx| {
2383 editor.select_ranges([13..13], None, cx);
2384 editor.handle_input(&Input(".".into()), cx);
2385 cx.focus(&editor_b);
2386 });
2387
2388 // Receive a completion request as the host's language server.
2389 // Return some completions from the host's language server.
2390 cx_a.foreground().start_waiting();
2391 fake_language_server
2392 .handle_request::<lsp::request::Completion, _>(|params, _| {
2393 assert_eq!(
2394 params.text_document_position.text_document.uri,
2395 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2396 );
2397 assert_eq!(
2398 params.text_document_position.position,
2399 lsp::Position::new(0, 14),
2400 );
2401
2402 Some(lsp::CompletionResponse::Array(vec![
2403 lsp::CompletionItem {
2404 label: "first_method(…)".into(),
2405 detail: Some("fn(&mut self, B) -> C".into()),
2406 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2407 new_text: "first_method($1)".to_string(),
2408 range: lsp::Range::new(
2409 lsp::Position::new(0, 14),
2410 lsp::Position::new(0, 14),
2411 ),
2412 })),
2413 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2414 ..Default::default()
2415 },
2416 lsp::CompletionItem {
2417 label: "second_method(…)".into(),
2418 detail: Some("fn(&mut self, C) -> D<E>".into()),
2419 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2420 new_text: "second_method()".to_string(),
2421 range: lsp::Range::new(
2422 lsp::Position::new(0, 14),
2423 lsp::Position::new(0, 14),
2424 ),
2425 })),
2426 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2427 ..Default::default()
2428 },
2429 ]))
2430 })
2431 .next()
2432 .await
2433 .unwrap();
2434 cx_a.foreground().finish_waiting();
2435
2436 // Open the buffer on the host.
2437 let buffer_a = project_a
2438 .update(&mut cx_a, |p, cx| {
2439 p.open_buffer((worktree_id, "main.rs"), cx)
2440 })
2441 .await
2442 .unwrap();
2443 buffer_a
2444 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2445 .await;
2446
2447 // Confirm a completion on the guest.
2448 editor_b
2449 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2450 .await;
2451 editor_b.update(&mut cx_b, |editor, cx| {
2452 editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2453 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2454 });
2455
2456 // Return a resolved completion from the host's language server.
2457 // The resolved completion has an additional text edit.
2458 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2459 |params, _| {
2460 assert_eq!(params.label, "first_method(…)");
2461 lsp::CompletionItem {
2462 label: "first_method(…)".into(),
2463 detail: Some("fn(&mut self, B) -> C".into()),
2464 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2465 new_text: "first_method($1)".to_string(),
2466 range: lsp::Range::new(
2467 lsp::Position::new(0, 14),
2468 lsp::Position::new(0, 14),
2469 ),
2470 })),
2471 additional_text_edits: Some(vec![lsp::TextEdit {
2472 new_text: "use d::SomeTrait;\n".to_string(),
2473 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2474 }]),
2475 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2476 ..Default::default()
2477 }
2478 },
2479 );
2480
2481 // The additional edit is applied.
2482 buffer_a
2483 .condition(&cx_a, |buffer, _| {
2484 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2485 })
2486 .await;
2487 buffer_b
2488 .condition(&cx_b, |buffer, _| {
2489 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2490 })
2491 .await;
2492 }
2493
2494 #[gpui::test(iterations = 10)]
2495 async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2496 cx_a.foreground().forbid_parking();
2497 let mut lang_registry = Arc::new(LanguageRegistry::new());
2498 let fs = FakeFs::new(cx_a.background());
2499
2500 // Set up a fake language server.
2501 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2502 Arc::get_mut(&mut lang_registry)
2503 .unwrap()
2504 .add(Arc::new(Language::new(
2505 LanguageConfig {
2506 name: "Rust".into(),
2507 path_suffixes: vec!["rs".to_string()],
2508 language_server: Some(language_server_config),
2509 ..Default::default()
2510 },
2511 Some(tree_sitter_rust::language()),
2512 )));
2513
2514 // Connect to a server as 2 clients.
2515 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2516 let client_a = server.create_client(&mut cx_a, "user_a").await;
2517 let client_b = server.create_client(&mut cx_b, "user_b").await;
2518
2519 // Share a project as client A
2520 fs.insert_tree(
2521 "/a",
2522 json!({
2523 ".zed.toml": r#"collaborators = ["user_b"]"#,
2524 "a.rs": "let one = two",
2525 }),
2526 )
2527 .await;
2528 let project_a = cx_a.update(|cx| {
2529 Project::local(
2530 client_a.clone(),
2531 client_a.user_store.clone(),
2532 lang_registry.clone(),
2533 fs.clone(),
2534 cx,
2535 )
2536 });
2537 let (worktree_a, _) = project_a
2538 .update(&mut cx_a, |p, cx| {
2539 p.find_or_create_local_worktree("/a", false, cx)
2540 })
2541 .await
2542 .unwrap();
2543 worktree_a
2544 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2545 .await;
2546 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2547 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2548 project_a
2549 .update(&mut cx_a, |p, cx| p.share(cx))
2550 .await
2551 .unwrap();
2552
2553 // Join the worktree as client B.
2554 let project_b = Project::remote(
2555 project_id,
2556 client_b.clone(),
2557 client_b.user_store.clone(),
2558 lang_registry.clone(),
2559 fs.clone(),
2560 &mut cx_b.to_async(),
2561 )
2562 .await
2563 .unwrap();
2564
2565 let buffer_b = cx_b
2566 .background()
2567 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2568 .await
2569 .unwrap();
2570
2571 let format = project_b.update(&mut cx_b, |project, cx| {
2572 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2573 });
2574
2575 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2576 fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2577 Some(vec![
2578 lsp::TextEdit {
2579 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2580 new_text: "h".to_string(),
2581 },
2582 lsp::TextEdit {
2583 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2584 new_text: "y".to_string(),
2585 },
2586 ])
2587 });
2588
2589 format.await.unwrap();
2590 assert_eq!(
2591 buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2592 "let honey = two"
2593 );
2594 }
2595
2596 #[gpui::test(iterations = 10)]
2597 async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2598 cx_a.foreground().forbid_parking();
2599 let mut lang_registry = Arc::new(LanguageRegistry::new());
2600 let fs = FakeFs::new(cx_a.background());
2601 fs.insert_tree(
2602 "/root-1",
2603 json!({
2604 ".zed.toml": r#"collaborators = ["user_b"]"#,
2605 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2606 }),
2607 )
2608 .await;
2609 fs.insert_tree(
2610 "/root-2",
2611 json!({
2612 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2613 }),
2614 )
2615 .await;
2616
2617 // Set up a fake language server.
2618 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2619 Arc::get_mut(&mut lang_registry)
2620 .unwrap()
2621 .add(Arc::new(Language::new(
2622 LanguageConfig {
2623 name: "Rust".into(),
2624 path_suffixes: vec!["rs".to_string()],
2625 language_server: Some(language_server_config),
2626 ..Default::default()
2627 },
2628 Some(tree_sitter_rust::language()),
2629 )));
2630
2631 // Connect to a server as 2 clients.
2632 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2633 let client_a = server.create_client(&mut cx_a, "user_a").await;
2634 let client_b = server.create_client(&mut cx_b, "user_b").await;
2635
2636 // Share a project as client A
2637 let project_a = cx_a.update(|cx| {
2638 Project::local(
2639 client_a.clone(),
2640 client_a.user_store.clone(),
2641 lang_registry.clone(),
2642 fs.clone(),
2643 cx,
2644 )
2645 });
2646 let (worktree_a, _) = project_a
2647 .update(&mut cx_a, |p, cx| {
2648 p.find_or_create_local_worktree("/root-1", false, cx)
2649 })
2650 .await
2651 .unwrap();
2652 worktree_a
2653 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2654 .await;
2655 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2656 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2657 project_a
2658 .update(&mut cx_a, |p, cx| p.share(cx))
2659 .await
2660 .unwrap();
2661
2662 // Join the worktree as client B.
2663 let project_b = Project::remote(
2664 project_id,
2665 client_b.clone(),
2666 client_b.user_store.clone(),
2667 lang_registry.clone(),
2668 fs.clone(),
2669 &mut cx_b.to_async(),
2670 )
2671 .await
2672 .unwrap();
2673
2674 // Open the file on client B.
2675 let buffer_b = cx_b
2676 .background()
2677 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2678 .await
2679 .unwrap();
2680
2681 // Request the definition of a symbol as the guest.
2682 let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2683
2684 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2685 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2686 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2687 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2688 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2689 )))
2690 });
2691
2692 let definitions_1 = definitions_1.await.unwrap();
2693 cx_b.read(|cx| {
2694 assert_eq!(definitions_1.len(), 1);
2695 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2696 let target_buffer = definitions_1[0].buffer.read(cx);
2697 assert_eq!(
2698 target_buffer.text(),
2699 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2700 );
2701 assert_eq!(
2702 definitions_1[0].range.to_point(target_buffer),
2703 Point::new(0, 6)..Point::new(0, 9)
2704 );
2705 });
2706
2707 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2708 // the previous call to `definition`.
2709 let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2710 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2711 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2712 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2713 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2714 )))
2715 });
2716
2717 let definitions_2 = definitions_2.await.unwrap();
2718 cx_b.read(|cx| {
2719 assert_eq!(definitions_2.len(), 1);
2720 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2721 let target_buffer = definitions_2[0].buffer.read(cx);
2722 assert_eq!(
2723 target_buffer.text(),
2724 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2725 );
2726 assert_eq!(
2727 definitions_2[0].range.to_point(target_buffer),
2728 Point::new(1, 6)..Point::new(1, 11)
2729 );
2730 });
2731 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2732
2733 cx_b.update(|_| {
2734 drop(definitions_1);
2735 drop(definitions_2);
2736 });
2737 project_b
2738 .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2739 .await;
2740 }
2741
2742 #[gpui::test(iterations = 10)]
2743 async fn test_references(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2744 cx_a.foreground().forbid_parking();
2745 let mut lang_registry = Arc::new(LanguageRegistry::new());
2746 let fs = FakeFs::new(cx_a.background());
2747 fs.insert_tree(
2748 "/root-1",
2749 json!({
2750 ".zed.toml": r#"collaborators = ["user_b"]"#,
2751 "one.rs": "const ONE: usize = 1;",
2752 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2753 }),
2754 )
2755 .await;
2756 fs.insert_tree(
2757 "/root-2",
2758 json!({
2759 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2760 }),
2761 )
2762 .await;
2763
2764 // Set up a fake language server.
2765 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2766 Arc::get_mut(&mut lang_registry)
2767 .unwrap()
2768 .add(Arc::new(Language::new(
2769 LanguageConfig {
2770 name: "Rust".into(),
2771 path_suffixes: vec!["rs".to_string()],
2772 language_server: Some(language_server_config),
2773 ..Default::default()
2774 },
2775 Some(tree_sitter_rust::language()),
2776 )));
2777
2778 // Connect to a server as 2 clients.
2779 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2780 let client_a = server.create_client(&mut cx_a, "user_a").await;
2781 let client_b = server.create_client(&mut cx_b, "user_b").await;
2782
2783 // Share a project as client A
2784 let project_a = cx_a.update(|cx| {
2785 Project::local(
2786 client_a.clone(),
2787 client_a.user_store.clone(),
2788 lang_registry.clone(),
2789 fs.clone(),
2790 cx,
2791 )
2792 });
2793 let (worktree_a, _) = project_a
2794 .update(&mut cx_a, |p, cx| {
2795 p.find_or_create_local_worktree("/root-1", false, cx)
2796 })
2797 .await
2798 .unwrap();
2799 worktree_a
2800 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2801 .await;
2802 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2803 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2804 project_a
2805 .update(&mut cx_a, |p, cx| p.share(cx))
2806 .await
2807 .unwrap();
2808
2809 // Join the worktree as client B.
2810 let project_b = Project::remote(
2811 project_id,
2812 client_b.clone(),
2813 client_b.user_store.clone(),
2814 lang_registry.clone(),
2815 fs.clone(),
2816 &mut cx_b.to_async(),
2817 )
2818 .await
2819 .unwrap();
2820
2821 // Open the file on client B.
2822 let buffer_b = cx_b
2823 .background()
2824 .spawn(project_b.update(&mut cx_b, |p, cx| {
2825 p.open_buffer((worktree_id, "one.rs"), cx)
2826 }))
2827 .await
2828 .unwrap();
2829
2830 // Request references to a symbol as the guest.
2831 let references = project_b.update(&mut cx_b, |p, cx| p.references(&buffer_b, 7, cx));
2832
2833 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2834 fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2835 assert_eq!(
2836 params.text_document_position.text_document.uri.as_str(),
2837 "file:///root-1/one.rs"
2838 );
2839 Some(vec![
2840 lsp::Location {
2841 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2842 range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2843 },
2844 lsp::Location {
2845 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2846 range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2847 },
2848 lsp::Location {
2849 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2850 range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2851 },
2852 ])
2853 });
2854
2855 let references = references.await.unwrap();
2856 cx_b.read(|cx| {
2857 assert_eq!(references.len(), 3);
2858 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2859
2860 let two_buffer = references[0].buffer.read(cx);
2861 let three_buffer = references[2].buffer.read(cx);
2862 assert_eq!(
2863 two_buffer.file().unwrap().path().as_ref(),
2864 Path::new("two.rs")
2865 );
2866 assert_eq!(references[1].buffer, references[0].buffer);
2867 assert_eq!(
2868 three_buffer.file().unwrap().full_path(cx),
2869 Path::new("three.rs")
2870 );
2871
2872 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2873 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2874 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2875 });
2876 }
2877
2878 #[gpui::test(iterations = 10)]
2879 async fn test_document_highlights(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2880 cx_a.foreground().forbid_parking();
2881 let lang_registry = Arc::new(LanguageRegistry::new());
2882 let fs = FakeFs::new(cx_a.background());
2883 fs.insert_tree(
2884 "/root-1",
2885 json!({
2886 ".zed.toml": r#"collaborators = ["user_b"]"#,
2887 "main.rs": "fn double(number: i32) -> i32 { number + number }",
2888 }),
2889 )
2890 .await;
2891
2892 // Set up a fake language server.
2893 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2894 lang_registry.add(Arc::new(Language::new(
2895 LanguageConfig {
2896 name: "Rust".into(),
2897 path_suffixes: vec!["rs".to_string()],
2898 language_server: Some(language_server_config),
2899 ..Default::default()
2900 },
2901 Some(tree_sitter_rust::language()),
2902 )));
2903
2904 // Connect to a server as 2 clients.
2905 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2906 let client_a = server.create_client(&mut cx_a, "user_a").await;
2907 let client_b = server.create_client(&mut cx_b, "user_b").await;
2908
2909 // Share a project as client A
2910 let project_a = cx_a.update(|cx| {
2911 Project::local(
2912 client_a.clone(),
2913 client_a.user_store.clone(),
2914 lang_registry.clone(),
2915 fs.clone(),
2916 cx,
2917 )
2918 });
2919 let (worktree_a, _) = project_a
2920 .update(&mut cx_a, |p, cx| {
2921 p.find_or_create_local_worktree("/root-1", false, cx)
2922 })
2923 .await
2924 .unwrap();
2925 worktree_a
2926 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2927 .await;
2928 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2929 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2930 project_a
2931 .update(&mut cx_a, |p, cx| p.share(cx))
2932 .await
2933 .unwrap();
2934
2935 // Join the worktree as client B.
2936 let project_b = Project::remote(
2937 project_id,
2938 client_b.clone(),
2939 client_b.user_store.clone(),
2940 lang_registry.clone(),
2941 fs.clone(),
2942 &mut cx_b.to_async(),
2943 )
2944 .await
2945 .unwrap();
2946
2947 // Open the file on client B.
2948 let buffer_b = cx_b
2949 .background()
2950 .spawn(project_b.update(&mut cx_b, |p, cx| {
2951 p.open_buffer((worktree_id, "main.rs"), cx)
2952 }))
2953 .await
2954 .unwrap();
2955
2956 // Request document highlights as the guest.
2957 let highlights =
2958 project_b.update(&mut cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx));
2959
2960 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2961 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2962 |params, _| {
2963 assert_eq!(
2964 params
2965 .text_document_position_params
2966 .text_document
2967 .uri
2968 .as_str(),
2969 "file:///root-1/main.rs"
2970 );
2971 assert_eq!(
2972 params.text_document_position_params.position,
2973 lsp::Position::new(0, 34)
2974 );
2975 Some(vec![
2976 lsp::DocumentHighlight {
2977 kind: Some(lsp::DocumentHighlightKind::WRITE),
2978 range: lsp::Range::new(
2979 lsp::Position::new(0, 10),
2980 lsp::Position::new(0, 16),
2981 ),
2982 },
2983 lsp::DocumentHighlight {
2984 kind: Some(lsp::DocumentHighlightKind::READ),
2985 range: lsp::Range::new(
2986 lsp::Position::new(0, 32),
2987 lsp::Position::new(0, 38),
2988 ),
2989 },
2990 lsp::DocumentHighlight {
2991 kind: Some(lsp::DocumentHighlightKind::READ),
2992 range: lsp::Range::new(
2993 lsp::Position::new(0, 41),
2994 lsp::Position::new(0, 47),
2995 ),
2996 },
2997 ])
2998 },
2999 );
3000
3001 let highlights = highlights.await.unwrap();
3002 buffer_b.read_with(&cx_b, |buffer, _| {
3003 let snapshot = buffer.snapshot();
3004
3005 let highlights = highlights
3006 .into_iter()
3007 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3008 .collect::<Vec<_>>();
3009 assert_eq!(
3010 highlights,
3011 &[
3012 (lsp::DocumentHighlightKind::WRITE, 10..16),
3013 (lsp::DocumentHighlightKind::READ, 32..38),
3014 (lsp::DocumentHighlightKind::READ, 41..47)
3015 ]
3016 )
3017 });
3018 }
3019
3020 #[gpui::test(iterations = 10)]
3021 async fn test_project_symbols(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3022 cx_a.foreground().forbid_parking();
3023 let mut lang_registry = Arc::new(LanguageRegistry::new());
3024 let fs = FakeFs::new(cx_a.background());
3025 fs.insert_tree(
3026 "/code",
3027 json!({
3028 "crate-1": {
3029 ".zed.toml": r#"collaborators = ["user_b"]"#,
3030 "one.rs": "const ONE: usize = 1;",
3031 },
3032 "crate-2": {
3033 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3034 },
3035 "private": {
3036 "passwords.txt": "the-password",
3037 }
3038 }),
3039 )
3040 .await;
3041
3042 // Set up a fake language server.
3043 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3044 Arc::get_mut(&mut lang_registry)
3045 .unwrap()
3046 .add(Arc::new(Language::new(
3047 LanguageConfig {
3048 name: "Rust".into(),
3049 path_suffixes: vec!["rs".to_string()],
3050 language_server: Some(language_server_config),
3051 ..Default::default()
3052 },
3053 Some(tree_sitter_rust::language()),
3054 )));
3055
3056 // Connect to a server as 2 clients.
3057 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3058 let client_a = server.create_client(&mut cx_a, "user_a").await;
3059 let client_b = server.create_client(&mut cx_b, "user_b").await;
3060
3061 // Share a project as client A
3062 let project_a = cx_a.update(|cx| {
3063 Project::local(
3064 client_a.clone(),
3065 client_a.user_store.clone(),
3066 lang_registry.clone(),
3067 fs.clone(),
3068 cx,
3069 )
3070 });
3071 let (worktree_a, _) = project_a
3072 .update(&mut cx_a, |p, cx| {
3073 p.find_or_create_local_worktree("/code/crate-1", false, cx)
3074 })
3075 .await
3076 .unwrap();
3077 worktree_a
3078 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3079 .await;
3080 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3081 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3082 project_a
3083 .update(&mut cx_a, |p, cx| p.share(cx))
3084 .await
3085 .unwrap();
3086
3087 // Join the worktree as client B.
3088 let project_b = Project::remote(
3089 project_id,
3090 client_b.clone(),
3091 client_b.user_store.clone(),
3092 lang_registry.clone(),
3093 fs.clone(),
3094 &mut cx_b.to_async(),
3095 )
3096 .await
3097 .unwrap();
3098
3099 // Cause the language server to start.
3100 let _buffer = cx_b
3101 .background()
3102 .spawn(project_b.update(&mut cx_b, |p, cx| {
3103 p.open_buffer((worktree_id, "one.rs"), cx)
3104 }))
3105 .await
3106 .unwrap();
3107
3108 // Request the definition of a symbol as the guest.
3109 let symbols = project_b.update(&mut cx_b, |p, cx| p.symbols("two", cx));
3110 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3111 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
3112 #[allow(deprecated)]
3113 Some(vec![lsp::SymbolInformation {
3114 name: "TWO".into(),
3115 location: lsp::Location {
3116 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3117 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3118 },
3119 kind: lsp::SymbolKind::CONSTANT,
3120 tags: None,
3121 container_name: None,
3122 deprecated: None,
3123 }])
3124 });
3125
3126 let symbols = symbols.await.unwrap();
3127 assert_eq!(symbols.len(), 1);
3128 assert_eq!(symbols[0].name, "TWO");
3129
3130 // Open one of the returned symbols.
3131 let buffer_b_2 = project_b
3132 .update(&mut cx_b, |project, cx| {
3133 project.open_buffer_for_symbol(&symbols[0], cx)
3134 })
3135 .await
3136 .unwrap();
3137 buffer_b_2.read_with(&cx_b, |buffer, _| {
3138 assert_eq!(
3139 buffer.file().unwrap().path().as_ref(),
3140 Path::new("../crate-2/two.rs")
3141 );
3142 });
3143
3144 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3145 let mut fake_symbol = symbols[0].clone();
3146 fake_symbol.path = Path::new("/code/secrets").into();
3147 let error = project_b
3148 .update(&mut cx_b, |project, cx| {
3149 project.open_buffer_for_symbol(&fake_symbol, cx)
3150 })
3151 .await
3152 .unwrap_err();
3153 assert!(error.to_string().contains("invalid symbol signature"));
3154 }
3155
3156 #[gpui::test(iterations = 10)]
3157 async fn test_open_buffer_while_getting_definition_pointing_to_it(
3158 mut cx_a: TestAppContext,
3159 mut cx_b: TestAppContext,
3160 mut rng: StdRng,
3161 ) {
3162 cx_a.foreground().forbid_parking();
3163 let mut lang_registry = Arc::new(LanguageRegistry::new());
3164 let fs = FakeFs::new(cx_a.background());
3165 fs.insert_tree(
3166 "/root",
3167 json!({
3168 ".zed.toml": r#"collaborators = ["user_b"]"#,
3169 "a.rs": "const ONE: usize = b::TWO;",
3170 "b.rs": "const TWO: usize = 2",
3171 }),
3172 )
3173 .await;
3174
3175 // Set up a fake language server.
3176 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3177
3178 Arc::get_mut(&mut lang_registry)
3179 .unwrap()
3180 .add(Arc::new(Language::new(
3181 LanguageConfig {
3182 name: "Rust".into(),
3183 path_suffixes: vec!["rs".to_string()],
3184 language_server: Some(language_server_config),
3185 ..Default::default()
3186 },
3187 Some(tree_sitter_rust::language()),
3188 )));
3189
3190 // Connect to a server as 2 clients.
3191 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3192 let client_a = server.create_client(&mut cx_a, "user_a").await;
3193 let client_b = server.create_client(&mut cx_b, "user_b").await;
3194
3195 // Share a project as client A
3196 let project_a = cx_a.update(|cx| {
3197 Project::local(
3198 client_a.clone(),
3199 client_a.user_store.clone(),
3200 lang_registry.clone(),
3201 fs.clone(),
3202 cx,
3203 )
3204 });
3205
3206 let (worktree_a, _) = project_a
3207 .update(&mut cx_a, |p, cx| {
3208 p.find_or_create_local_worktree("/root", false, cx)
3209 })
3210 .await
3211 .unwrap();
3212 worktree_a
3213 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3214 .await;
3215 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3216 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3217 project_a
3218 .update(&mut cx_a, |p, cx| p.share(cx))
3219 .await
3220 .unwrap();
3221
3222 // Join the worktree as client B.
3223 let project_b = Project::remote(
3224 project_id,
3225 client_b.clone(),
3226 client_b.user_store.clone(),
3227 lang_registry.clone(),
3228 fs.clone(),
3229 &mut cx_b.to_async(),
3230 )
3231 .await
3232 .unwrap();
3233
3234 let buffer_b1 = cx_b
3235 .background()
3236 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3237 .await
3238 .unwrap();
3239
3240 let definitions;
3241 let buffer_b2;
3242 if rng.gen() {
3243 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3244 buffer_b2 =
3245 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3246 } else {
3247 buffer_b2 =
3248 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3249 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3250 }
3251
3252 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3253 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3254 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3255 lsp::Url::from_file_path("/root/b.rs").unwrap(),
3256 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3257 )))
3258 });
3259
3260 let buffer_b2 = buffer_b2.await.unwrap();
3261 let definitions = definitions.await.unwrap();
3262 assert_eq!(definitions.len(), 1);
3263 assert_eq!(definitions[0].buffer, buffer_b2);
3264 }
3265
3266 #[gpui::test(iterations = 10)]
3267 async fn test_collaborating_with_code_actions(
3268 mut cx_a: TestAppContext,
3269 mut cx_b: TestAppContext,
3270 ) {
3271 cx_a.foreground().forbid_parking();
3272 let mut lang_registry = Arc::new(LanguageRegistry::new());
3273 let fs = FakeFs::new(cx_a.background());
3274 let mut path_openers_b = Vec::new();
3275 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3276
3277 // Set up a fake language server.
3278 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3279 Arc::get_mut(&mut lang_registry)
3280 .unwrap()
3281 .add(Arc::new(Language::new(
3282 LanguageConfig {
3283 name: "Rust".into(),
3284 path_suffixes: vec!["rs".to_string()],
3285 language_server: Some(language_server_config),
3286 ..Default::default()
3287 },
3288 Some(tree_sitter_rust::language()),
3289 )));
3290
3291 // Connect to a server as 2 clients.
3292 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3293 let client_a = server.create_client(&mut cx_a, "user_a").await;
3294 let client_b = server.create_client(&mut cx_b, "user_b").await;
3295
3296 // Share a project as client A
3297 fs.insert_tree(
3298 "/a",
3299 json!({
3300 ".zed.toml": r#"collaborators = ["user_b"]"#,
3301 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3302 "other.rs": "pub fn foo() -> usize { 4 }",
3303 }),
3304 )
3305 .await;
3306 let project_a = cx_a.update(|cx| {
3307 Project::local(
3308 client_a.clone(),
3309 client_a.user_store.clone(),
3310 lang_registry.clone(),
3311 fs.clone(),
3312 cx,
3313 )
3314 });
3315 let (worktree_a, _) = project_a
3316 .update(&mut cx_a, |p, cx| {
3317 p.find_or_create_local_worktree("/a", false, cx)
3318 })
3319 .await
3320 .unwrap();
3321 worktree_a
3322 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3323 .await;
3324 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3325 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3326 project_a
3327 .update(&mut cx_a, |p, cx| p.share(cx))
3328 .await
3329 .unwrap();
3330
3331 // Join the worktree as client B.
3332 let project_b = Project::remote(
3333 project_id,
3334 client_b.clone(),
3335 client_b.user_store.clone(),
3336 lang_registry.clone(),
3337 fs.clone(),
3338 &mut cx_b.to_async(),
3339 )
3340 .await
3341 .unwrap();
3342 let mut params = cx_b.update(WorkspaceParams::test);
3343 params.languages = lang_registry.clone();
3344 params.client = client_b.client.clone();
3345 params.user_store = client_b.user_store.clone();
3346 params.project = project_b;
3347 params.path_openers = path_openers_b.into();
3348
3349 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3350 let editor_b = workspace_b
3351 .update(&mut cx_b, |workspace, cx| {
3352 workspace.open_path((worktree_id, "main.rs").into(), cx)
3353 })
3354 .await
3355 .unwrap()
3356 .downcast::<Editor>()
3357 .unwrap();
3358
3359 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3360 fake_language_server
3361 .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3362 assert_eq!(
3363 params.text_document.uri,
3364 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3365 );
3366 assert_eq!(params.range.start, lsp::Position::new(0, 0));
3367 assert_eq!(params.range.end, lsp::Position::new(0, 0));
3368 None
3369 })
3370 .next()
3371 .await;
3372
3373 // Move cursor to a location that contains code actions.
3374 editor_b.update(&mut cx_b, |editor, cx| {
3375 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3376 cx.focus(&editor_b);
3377 });
3378
3379 fake_language_server
3380 .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3381 assert_eq!(
3382 params.text_document.uri,
3383 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3384 );
3385 assert_eq!(params.range.start, lsp::Position::new(1, 31));
3386 assert_eq!(params.range.end, lsp::Position::new(1, 31));
3387
3388 Some(vec![lsp::CodeActionOrCommand::CodeAction(
3389 lsp::CodeAction {
3390 title: "Inline into all callers".to_string(),
3391 edit: Some(lsp::WorkspaceEdit {
3392 changes: Some(
3393 [
3394 (
3395 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3396 vec![lsp::TextEdit::new(
3397 lsp::Range::new(
3398 lsp::Position::new(1, 22),
3399 lsp::Position::new(1, 34),
3400 ),
3401 "4".to_string(),
3402 )],
3403 ),
3404 (
3405 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3406 vec![lsp::TextEdit::new(
3407 lsp::Range::new(
3408 lsp::Position::new(0, 0),
3409 lsp::Position::new(0, 27),
3410 ),
3411 "".to_string(),
3412 )],
3413 ),
3414 ]
3415 .into_iter()
3416 .collect(),
3417 ),
3418 ..Default::default()
3419 }),
3420 data: Some(json!({
3421 "codeActionParams": {
3422 "range": {
3423 "start": {"line": 1, "column": 31},
3424 "end": {"line": 1, "column": 31},
3425 }
3426 }
3427 })),
3428 ..Default::default()
3429 },
3430 )])
3431 })
3432 .next()
3433 .await;
3434
3435 // Toggle code actions and wait for them to display.
3436 editor_b.update(&mut cx_b, |editor, cx| {
3437 editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3438 });
3439 editor_b
3440 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3441 .await;
3442
3443 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3444
3445 // Confirming the code action will trigger a resolve request.
3446 let confirm_action = workspace_b
3447 .update(&mut cx_b, |workspace, cx| {
3448 Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3449 })
3450 .unwrap();
3451 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3452 lsp::CodeAction {
3453 title: "Inline into all callers".to_string(),
3454 edit: Some(lsp::WorkspaceEdit {
3455 changes: Some(
3456 [
3457 (
3458 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3459 vec![lsp::TextEdit::new(
3460 lsp::Range::new(
3461 lsp::Position::new(1, 22),
3462 lsp::Position::new(1, 34),
3463 ),
3464 "4".to_string(),
3465 )],
3466 ),
3467 (
3468 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3469 vec![lsp::TextEdit::new(
3470 lsp::Range::new(
3471 lsp::Position::new(0, 0),
3472 lsp::Position::new(0, 27),
3473 ),
3474 "".to_string(),
3475 )],
3476 ),
3477 ]
3478 .into_iter()
3479 .collect(),
3480 ),
3481 ..Default::default()
3482 }),
3483 ..Default::default()
3484 }
3485 });
3486
3487 // After the action is confirmed, an editor containing both modified files is opened.
3488 confirm_action.await.unwrap();
3489 let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3490 workspace
3491 .active_item(cx)
3492 .unwrap()
3493 .downcast::<Editor>()
3494 .unwrap()
3495 });
3496 code_action_editor.update(&mut cx_b, |editor, cx| {
3497 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3498 editor.undo(&Undo, cx);
3499 assert_eq!(
3500 editor.text(cx),
3501 "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3502 );
3503 editor.redo(&Redo, cx);
3504 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3505 });
3506 }
3507
3508 #[gpui::test(iterations = 10)]
3509 async fn test_collaborating_with_renames(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3510 cx_a.foreground().forbid_parking();
3511 let mut lang_registry = Arc::new(LanguageRegistry::new());
3512 let fs = FakeFs::new(cx_a.background());
3513 let mut path_openers_b = Vec::new();
3514 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3515
3516 // Set up a fake language server.
3517 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3518 Arc::get_mut(&mut lang_registry)
3519 .unwrap()
3520 .add(Arc::new(Language::new(
3521 LanguageConfig {
3522 name: "Rust".into(),
3523 path_suffixes: vec!["rs".to_string()],
3524 language_server: Some(language_server_config),
3525 ..Default::default()
3526 },
3527 Some(tree_sitter_rust::language()),
3528 )));
3529
3530 // Connect to a server as 2 clients.
3531 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3532 let client_a = server.create_client(&mut cx_a, "user_a").await;
3533 let client_b = server.create_client(&mut cx_b, "user_b").await;
3534
3535 // Share a project as client A
3536 fs.insert_tree(
3537 "/dir",
3538 json!({
3539 ".zed.toml": r#"collaborators = ["user_b"]"#,
3540 "one.rs": "const ONE: usize = 1;",
3541 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3542 }),
3543 )
3544 .await;
3545 let project_a = cx_a.update(|cx| {
3546 Project::local(
3547 client_a.clone(),
3548 client_a.user_store.clone(),
3549 lang_registry.clone(),
3550 fs.clone(),
3551 cx,
3552 )
3553 });
3554 let (worktree_a, _) = project_a
3555 .update(&mut cx_a, |p, cx| {
3556 p.find_or_create_local_worktree("/dir", false, cx)
3557 })
3558 .await
3559 .unwrap();
3560 worktree_a
3561 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3562 .await;
3563 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3564 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3565 project_a
3566 .update(&mut cx_a, |p, cx| p.share(cx))
3567 .await
3568 .unwrap();
3569
3570 // Join the worktree as client B.
3571 let project_b = Project::remote(
3572 project_id,
3573 client_b.clone(),
3574 client_b.user_store.clone(),
3575 lang_registry.clone(),
3576 fs.clone(),
3577 &mut cx_b.to_async(),
3578 )
3579 .await
3580 .unwrap();
3581 let mut params = cx_b.update(WorkspaceParams::test);
3582 params.languages = lang_registry.clone();
3583 params.client = client_b.client.clone();
3584 params.user_store = client_b.user_store.clone();
3585 params.project = project_b;
3586 params.path_openers = path_openers_b.into();
3587
3588 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3589 let editor_b = workspace_b
3590 .update(&mut cx_b, |workspace, cx| {
3591 workspace.open_path((worktree_id, "one.rs").into(), cx)
3592 })
3593 .await
3594 .unwrap()
3595 .downcast::<Editor>()
3596 .unwrap();
3597 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3598
3599 // Move cursor to a location that can be renamed.
3600 let prepare_rename = editor_b.update(&mut cx_b, |editor, cx| {
3601 editor.select_ranges([7..7], None, cx);
3602 editor.rename(&Rename, cx).unwrap()
3603 });
3604
3605 fake_language_server
3606 .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3607 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3608 assert_eq!(params.position, lsp::Position::new(0, 7));
3609 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3610 lsp::Position::new(0, 6),
3611 lsp::Position::new(0, 9),
3612 )))
3613 })
3614 .next()
3615 .await
3616 .unwrap();
3617 prepare_rename.await.unwrap();
3618 editor_b.update(&mut cx_b, |editor, cx| {
3619 let rename = editor.pending_rename().unwrap();
3620 let buffer = editor.buffer().read(cx).snapshot(cx);
3621 assert_eq!(
3622 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3623 6..9
3624 );
3625 rename.editor.update(cx, |rename_editor, cx| {
3626 rename_editor.buffer().update(cx, |rename_buffer, cx| {
3627 rename_buffer.edit([0..3], "THREE", cx);
3628 });
3629 });
3630 });
3631
3632 let confirm_rename = workspace_b.update(&mut cx_b, |workspace, cx| {
3633 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3634 });
3635 fake_language_server
3636 .handle_request::<lsp::request::Rename, _>(|params, _| {
3637 assert_eq!(
3638 params.text_document_position.text_document.uri.as_str(),
3639 "file:///dir/one.rs"
3640 );
3641 assert_eq!(
3642 params.text_document_position.position,
3643 lsp::Position::new(0, 6)
3644 );
3645 assert_eq!(params.new_name, "THREE");
3646 Some(lsp::WorkspaceEdit {
3647 changes: Some(
3648 [
3649 (
3650 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3651 vec![lsp::TextEdit::new(
3652 lsp::Range::new(
3653 lsp::Position::new(0, 6),
3654 lsp::Position::new(0, 9),
3655 ),
3656 "THREE".to_string(),
3657 )],
3658 ),
3659 (
3660 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3661 vec![
3662 lsp::TextEdit::new(
3663 lsp::Range::new(
3664 lsp::Position::new(0, 24),
3665 lsp::Position::new(0, 27),
3666 ),
3667 "THREE".to_string(),
3668 ),
3669 lsp::TextEdit::new(
3670 lsp::Range::new(
3671 lsp::Position::new(0, 35),
3672 lsp::Position::new(0, 38),
3673 ),
3674 "THREE".to_string(),
3675 ),
3676 ],
3677 ),
3678 ]
3679 .into_iter()
3680 .collect(),
3681 ),
3682 ..Default::default()
3683 })
3684 })
3685 .next()
3686 .await
3687 .unwrap();
3688 confirm_rename.await.unwrap();
3689
3690 let rename_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3691 workspace
3692 .active_item(cx)
3693 .unwrap()
3694 .downcast::<Editor>()
3695 .unwrap()
3696 });
3697 rename_editor.update(&mut cx_b, |editor, cx| {
3698 assert_eq!(
3699 editor.text(cx),
3700 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3701 );
3702 editor.undo(&Undo, cx);
3703 assert_eq!(
3704 editor.text(cx),
3705 "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3706 );
3707 editor.redo(&Redo, cx);
3708 assert_eq!(
3709 editor.text(cx),
3710 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3711 );
3712 });
3713
3714 // Ensure temporary rename edits cannot be undone/redone.
3715 editor_b.update(&mut cx_b, |editor, cx| {
3716 editor.undo(&Undo, cx);
3717 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3718 editor.undo(&Undo, cx);
3719 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3720 editor.redo(&Redo, cx);
3721 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3722 })
3723 }
3724
3725 #[gpui::test(iterations = 10)]
3726 async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3727 cx_a.foreground().forbid_parking();
3728
3729 // Connect to a server as 2 clients.
3730 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3731 let client_a = server.create_client(&mut cx_a, "user_a").await;
3732 let client_b = server.create_client(&mut cx_b, "user_b").await;
3733
3734 // Create an org that includes these 2 users.
3735 let db = &server.app_state.db;
3736 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3737 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3738 .await
3739 .unwrap();
3740 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3741 .await
3742 .unwrap();
3743
3744 // Create a channel that includes all the users.
3745 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3746 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3747 .await
3748 .unwrap();
3749 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3750 .await
3751 .unwrap();
3752 db.create_channel_message(
3753 channel_id,
3754 client_b.current_user_id(&cx_b),
3755 "hello A, it's B.",
3756 OffsetDateTime::now_utc(),
3757 1,
3758 )
3759 .await
3760 .unwrap();
3761
3762 let channels_a = cx_a
3763 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3764 channels_a
3765 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3766 .await;
3767 channels_a.read_with(&cx_a, |list, _| {
3768 assert_eq!(
3769 list.available_channels().unwrap(),
3770 &[ChannelDetails {
3771 id: channel_id.to_proto(),
3772 name: "test-channel".to_string()
3773 }]
3774 )
3775 });
3776 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3777 this.get_channel(channel_id.to_proto(), cx).unwrap()
3778 });
3779 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3780 channel_a
3781 .condition(&cx_a, |channel, _| {
3782 channel_messages(channel)
3783 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3784 })
3785 .await;
3786
3787 let channels_b = cx_b
3788 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3789 channels_b
3790 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3791 .await;
3792 channels_b.read_with(&cx_b, |list, _| {
3793 assert_eq!(
3794 list.available_channels().unwrap(),
3795 &[ChannelDetails {
3796 id: channel_id.to_proto(),
3797 name: "test-channel".to_string()
3798 }]
3799 )
3800 });
3801
3802 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3803 this.get_channel(channel_id.to_proto(), cx).unwrap()
3804 });
3805 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3806 channel_b
3807 .condition(&cx_b, |channel, _| {
3808 channel_messages(channel)
3809 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3810 })
3811 .await;
3812
3813 channel_a
3814 .update(&mut cx_a, |channel, cx| {
3815 channel
3816 .send_message("oh, hi B.".to_string(), cx)
3817 .unwrap()
3818 .detach();
3819 let task = channel.send_message("sup".to_string(), cx).unwrap();
3820 assert_eq!(
3821 channel_messages(channel),
3822 &[
3823 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3824 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3825 ("user_a".to_string(), "sup".to_string(), true)
3826 ]
3827 );
3828 task
3829 })
3830 .await
3831 .unwrap();
3832
3833 channel_b
3834 .condition(&cx_b, |channel, _| {
3835 channel_messages(channel)
3836 == [
3837 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3838 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3839 ("user_a".to_string(), "sup".to_string(), false),
3840 ]
3841 })
3842 .await;
3843
3844 assert_eq!(
3845 server
3846 .state()
3847 .await
3848 .channel(channel_id)
3849 .unwrap()
3850 .connection_ids
3851 .len(),
3852 2
3853 );
3854 cx_b.update(|_| drop(channel_b));
3855 server
3856 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3857 .await;
3858
3859 cx_a.update(|_| drop(channel_a));
3860 server
3861 .condition(|state| state.channel(channel_id).is_none())
3862 .await;
3863 }
3864
3865 #[gpui::test(iterations = 10)]
3866 async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3867 cx_a.foreground().forbid_parking();
3868
3869 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3870 let client_a = server.create_client(&mut cx_a, "user_a").await;
3871
3872 let db = &server.app_state.db;
3873 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3874 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3875 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3876 .await
3877 .unwrap();
3878 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3879 .await
3880 .unwrap();
3881
3882 let channels_a = cx_a
3883 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3884 channels_a
3885 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3886 .await;
3887 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3888 this.get_channel(channel_id.to_proto(), cx).unwrap()
3889 });
3890
3891 // Messages aren't allowed to be too long.
3892 channel_a
3893 .update(&mut cx_a, |channel, cx| {
3894 let long_body = "this is long.\n".repeat(1024);
3895 channel.send_message(long_body, cx).unwrap()
3896 })
3897 .await
3898 .unwrap_err();
3899
3900 // Messages aren't allowed to be blank.
3901 channel_a.update(&mut cx_a, |channel, cx| {
3902 channel.send_message(String::new(), cx).unwrap_err()
3903 });
3904
3905 // Leading and trailing whitespace are trimmed.
3906 channel_a
3907 .update(&mut cx_a, |channel, cx| {
3908 channel
3909 .send_message("\n surrounded by whitespace \n".to_string(), cx)
3910 .unwrap()
3911 })
3912 .await
3913 .unwrap();
3914 assert_eq!(
3915 db.get_channel_messages(channel_id, 10, None)
3916 .await
3917 .unwrap()
3918 .iter()
3919 .map(|m| &m.body)
3920 .collect::<Vec<_>>(),
3921 &["surrounded by whitespace"]
3922 );
3923 }
3924
3925 #[gpui::test(iterations = 10)]
3926 async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3927 cx_a.foreground().forbid_parking();
3928
3929 // Connect to a server as 2 clients.
3930 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3931 let client_a = server.create_client(&mut cx_a, "user_a").await;
3932 let client_b = server.create_client(&mut cx_b, "user_b").await;
3933 let mut status_b = client_b.status();
3934
3935 // Create an org that includes these 2 users.
3936 let db = &server.app_state.db;
3937 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3938 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3939 .await
3940 .unwrap();
3941 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3942 .await
3943 .unwrap();
3944
3945 // Create a channel that includes all the users.
3946 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3947 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3948 .await
3949 .unwrap();
3950 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3951 .await
3952 .unwrap();
3953 db.create_channel_message(
3954 channel_id,
3955 client_b.current_user_id(&cx_b),
3956 "hello A, it's B.",
3957 OffsetDateTime::now_utc(),
3958 2,
3959 )
3960 .await
3961 .unwrap();
3962
3963 let channels_a = cx_a
3964 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3965 channels_a
3966 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3967 .await;
3968
3969 channels_a.read_with(&cx_a, |list, _| {
3970 assert_eq!(
3971 list.available_channels().unwrap(),
3972 &[ChannelDetails {
3973 id: channel_id.to_proto(),
3974 name: "test-channel".to_string()
3975 }]
3976 )
3977 });
3978 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3979 this.get_channel(channel_id.to_proto(), cx).unwrap()
3980 });
3981 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3982 channel_a
3983 .condition(&cx_a, |channel, _| {
3984 channel_messages(channel)
3985 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3986 })
3987 .await;
3988
3989 let channels_b = cx_b
3990 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3991 channels_b
3992 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3993 .await;
3994 channels_b.read_with(&cx_b, |list, _| {
3995 assert_eq!(
3996 list.available_channels().unwrap(),
3997 &[ChannelDetails {
3998 id: channel_id.to_proto(),
3999 name: "test-channel".to_string()
4000 }]
4001 )
4002 });
4003
4004 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
4005 this.get_channel(channel_id.to_proto(), cx).unwrap()
4006 });
4007 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
4008 channel_b
4009 .condition(&cx_b, |channel, _| {
4010 channel_messages(channel)
4011 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4012 })
4013 .await;
4014
4015 // Disconnect client B, ensuring we can still access its cached channel data.
4016 server.forbid_connections();
4017 server.disconnect_client(client_b.current_user_id(&cx_b));
4018 while !matches!(
4019 status_b.next().await,
4020 Some(client::Status::ReconnectionError { .. })
4021 ) {}
4022
4023 channels_b.read_with(&cx_b, |channels, _| {
4024 assert_eq!(
4025 channels.available_channels().unwrap(),
4026 [ChannelDetails {
4027 id: channel_id.to_proto(),
4028 name: "test-channel".to_string()
4029 }]
4030 )
4031 });
4032 channel_b.read_with(&cx_b, |channel, _| {
4033 assert_eq!(
4034 channel_messages(channel),
4035 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4036 )
4037 });
4038
4039 // Send a message from client B while it is disconnected.
4040 channel_b
4041 .update(&mut cx_b, |channel, cx| {
4042 let task = channel
4043 .send_message("can you see this?".to_string(), cx)
4044 .unwrap();
4045 assert_eq!(
4046 channel_messages(channel),
4047 &[
4048 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4049 ("user_b".to_string(), "can you see this?".to_string(), true)
4050 ]
4051 );
4052 task
4053 })
4054 .await
4055 .unwrap_err();
4056
4057 // Send a message from client A while B is disconnected.
4058 channel_a
4059 .update(&mut cx_a, |channel, cx| {
4060 channel
4061 .send_message("oh, hi B.".to_string(), cx)
4062 .unwrap()
4063 .detach();
4064 let task = channel.send_message("sup".to_string(), cx).unwrap();
4065 assert_eq!(
4066 channel_messages(channel),
4067 &[
4068 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4069 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4070 ("user_a".to_string(), "sup".to_string(), true)
4071 ]
4072 );
4073 task
4074 })
4075 .await
4076 .unwrap();
4077
4078 // Give client B a chance to reconnect.
4079 server.allow_connections();
4080 cx_b.foreground().advance_clock(Duration::from_secs(10));
4081
4082 // Verify that B sees the new messages upon reconnection, as well as the message client B
4083 // sent while offline.
4084 channel_b
4085 .condition(&cx_b, |channel, _| {
4086 channel_messages(channel)
4087 == [
4088 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4089 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4090 ("user_a".to_string(), "sup".to_string(), false),
4091 ("user_b".to_string(), "can you see this?".to_string(), false),
4092 ]
4093 })
4094 .await;
4095
4096 // Ensure client A and B can communicate normally after reconnection.
4097 channel_a
4098 .update(&mut cx_a, |channel, cx| {
4099 channel.send_message("you online?".to_string(), cx).unwrap()
4100 })
4101 .await
4102 .unwrap();
4103 channel_b
4104 .condition(&cx_b, |channel, _| {
4105 channel_messages(channel)
4106 == [
4107 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4108 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4109 ("user_a".to_string(), "sup".to_string(), false),
4110 ("user_b".to_string(), "can you see this?".to_string(), false),
4111 ("user_a".to_string(), "you online?".to_string(), false),
4112 ]
4113 })
4114 .await;
4115
4116 channel_b
4117 .update(&mut cx_b, |channel, cx| {
4118 channel.send_message("yep".to_string(), cx).unwrap()
4119 })
4120 .await
4121 .unwrap();
4122 channel_a
4123 .condition(&cx_a, |channel, _| {
4124 channel_messages(channel)
4125 == [
4126 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4127 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4128 ("user_a".to_string(), "sup".to_string(), false),
4129 ("user_b".to_string(), "can you see this?".to_string(), false),
4130 ("user_a".to_string(), "you online?".to_string(), false),
4131 ("user_b".to_string(), "yep".to_string(), false),
4132 ]
4133 })
4134 .await;
4135 }
4136
4137 #[gpui::test(iterations = 10)]
4138 async fn test_contacts(
4139 mut cx_a: TestAppContext,
4140 mut cx_b: TestAppContext,
4141 mut cx_c: TestAppContext,
4142 ) {
4143 cx_a.foreground().forbid_parking();
4144 let lang_registry = Arc::new(LanguageRegistry::new());
4145 let fs = FakeFs::new(cx_a.background());
4146
4147 // Connect to a server as 3 clients.
4148 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4149 let client_a = server.create_client(&mut cx_a, "user_a").await;
4150 let client_b = server.create_client(&mut cx_b, "user_b").await;
4151 let client_c = server.create_client(&mut cx_c, "user_c").await;
4152
4153 // Share a worktree as client A.
4154 fs.insert_tree(
4155 "/a",
4156 json!({
4157 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4158 }),
4159 )
4160 .await;
4161
4162 let project_a = cx_a.update(|cx| {
4163 Project::local(
4164 client_a.clone(),
4165 client_a.user_store.clone(),
4166 lang_registry.clone(),
4167 fs.clone(),
4168 cx,
4169 )
4170 });
4171 let (worktree_a, _) = project_a
4172 .update(&mut cx_a, |p, cx| {
4173 p.find_or_create_local_worktree("/a", false, cx)
4174 })
4175 .await
4176 .unwrap();
4177 worktree_a
4178 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4179 .await;
4180
4181 client_a
4182 .user_store
4183 .condition(&cx_a, |user_store, _| {
4184 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4185 })
4186 .await;
4187 client_b
4188 .user_store
4189 .condition(&cx_b, |user_store, _| {
4190 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4191 })
4192 .await;
4193 client_c
4194 .user_store
4195 .condition(&cx_c, |user_store, _| {
4196 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4197 })
4198 .await;
4199
4200 let project_id = project_a
4201 .update(&mut cx_a, |project, _| project.next_remote_id())
4202 .await;
4203 project_a
4204 .update(&mut cx_a, |project, cx| project.share(cx))
4205 .await
4206 .unwrap();
4207
4208 let _project_b = Project::remote(
4209 project_id,
4210 client_b.clone(),
4211 client_b.user_store.clone(),
4212 lang_registry.clone(),
4213 fs.clone(),
4214 &mut cx_b.to_async(),
4215 )
4216 .await
4217 .unwrap();
4218
4219 client_a
4220 .user_store
4221 .condition(&cx_a, |user_store, _| {
4222 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4223 })
4224 .await;
4225 client_b
4226 .user_store
4227 .condition(&cx_b, |user_store, _| {
4228 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4229 })
4230 .await;
4231 client_c
4232 .user_store
4233 .condition(&cx_c, |user_store, _| {
4234 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4235 })
4236 .await;
4237
4238 project_a
4239 .condition(&cx_a, |project, _| {
4240 project.collaborators().contains_key(&client_b.peer_id)
4241 })
4242 .await;
4243
4244 cx_a.update(move |_| drop(project_a));
4245 client_a
4246 .user_store
4247 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4248 .await;
4249 client_b
4250 .user_store
4251 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4252 .await;
4253 client_c
4254 .user_store
4255 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4256 .await;
4257
4258 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4259 user_store
4260 .contacts()
4261 .iter()
4262 .map(|contact| {
4263 let worktrees = contact
4264 .projects
4265 .iter()
4266 .map(|p| {
4267 (
4268 p.worktree_root_names[0].as_str(),
4269 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4270 )
4271 })
4272 .collect();
4273 (contact.user.github_login.as_str(), worktrees)
4274 })
4275 .collect()
4276 }
4277 }
4278
4279 #[gpui::test(iterations = 100)]
4280 async fn test_random_collaboration(cx: TestAppContext, rng: StdRng) {
4281 cx.foreground().forbid_parking();
4282 let max_peers = env::var("MAX_PEERS")
4283 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4284 .unwrap_or(5);
4285 let max_operations = env::var("OPERATIONS")
4286 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4287 .unwrap_or(10);
4288
4289 let rng = Arc::new(Mutex::new(rng));
4290
4291 let guest_lang_registry = Arc::new(LanguageRegistry::new());
4292 let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4293
4294 let fs = FakeFs::new(cx.background());
4295 fs.insert_tree(
4296 "/_collab",
4297 json!({
4298 ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4299 }),
4300 )
4301 .await;
4302
4303 let operations = Rc::new(Cell::new(0));
4304 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4305 let mut clients = Vec::new();
4306
4307 let mut next_entity_id = 100000;
4308 let mut host_cx = TestAppContext::new(
4309 cx.foreground_platform(),
4310 cx.platform(),
4311 cx.foreground(),
4312 cx.background(),
4313 cx.font_cache(),
4314 next_entity_id,
4315 );
4316 let host = server.create_client(&mut host_cx, "host").await;
4317 let host_project = host_cx.update(|cx| {
4318 Project::local(
4319 host.client.clone(),
4320 host.user_store.clone(),
4321 Arc::new(LanguageRegistry::new()),
4322 fs.clone(),
4323 cx,
4324 )
4325 });
4326 let host_project_id = host_project
4327 .update(&mut host_cx, |p, _| p.next_remote_id())
4328 .await;
4329
4330 let (collab_worktree, _) = host_project
4331 .update(&mut host_cx, |project, cx| {
4332 project.find_or_create_local_worktree("/_collab", false, cx)
4333 })
4334 .await
4335 .unwrap();
4336 collab_worktree
4337 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4338 .await;
4339 host_project
4340 .update(&mut host_cx, |project, cx| project.share(cx))
4341 .await
4342 .unwrap();
4343
4344 clients.push(cx.foreground().spawn(host.simulate_host(
4345 host_project.clone(),
4346 language_server_config,
4347 operations.clone(),
4348 max_operations,
4349 rng.clone(),
4350 host_cx.clone(),
4351 )));
4352
4353 while operations.get() < max_operations {
4354 cx.background().simulate_random_delay().await;
4355 if clients.len() < max_peers && rng.lock().gen_bool(0.05) {
4356 operations.set(operations.get() + 1);
4357
4358 let guest_id = clients.len();
4359 log::info!("Adding guest {}", guest_id);
4360 next_entity_id += 100000;
4361 let mut guest_cx = TestAppContext::new(
4362 cx.foreground_platform(),
4363 cx.platform(),
4364 cx.foreground(),
4365 cx.background(),
4366 cx.font_cache(),
4367 next_entity_id,
4368 );
4369 let guest = server
4370 .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4371 .await;
4372 let guest_project = Project::remote(
4373 host_project_id,
4374 guest.client.clone(),
4375 guest.user_store.clone(),
4376 guest_lang_registry.clone(),
4377 fs.clone(),
4378 &mut guest_cx.to_async(),
4379 )
4380 .await
4381 .unwrap();
4382 clients.push(cx.foreground().spawn(guest.simulate_guest(
4383 guest_id,
4384 guest_project,
4385 operations.clone(),
4386 max_operations,
4387 rng.clone(),
4388 guest_cx,
4389 )));
4390
4391 log::info!("Guest {} added", guest_id);
4392 }
4393 }
4394
4395 let clients = futures::future::join_all(clients).await;
4396 cx.foreground().run_until_parked();
4397
4398 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4399 project
4400 .worktrees(cx)
4401 .map(|worktree| {
4402 let snapshot = worktree.read(cx).snapshot();
4403 (snapshot.id(), snapshot)
4404 })
4405 .collect::<BTreeMap<_, _>>()
4406 });
4407
4408 for (guest_client, guest_cx) in clients.iter().skip(1) {
4409 let guest_id = guest_client.client.id();
4410 let worktree_snapshots =
4411 guest_client
4412 .project
4413 .as_ref()
4414 .unwrap()
4415 .read_with(guest_cx, |project, cx| {
4416 project
4417 .worktrees(cx)
4418 .map(|worktree| {
4419 let worktree = worktree.read(cx);
4420 assert!(
4421 !worktree.as_remote().unwrap().has_pending_updates(),
4422 "Guest {} worktree {:?} contains deferred updates",
4423 guest_id,
4424 worktree.id()
4425 );
4426 (worktree.id(), worktree.snapshot())
4427 })
4428 .collect::<BTreeMap<_, _>>()
4429 });
4430
4431 assert_eq!(
4432 worktree_snapshots.keys().collect::<Vec<_>>(),
4433 host_worktree_snapshots.keys().collect::<Vec<_>>(),
4434 "guest {} has different worktrees than the host",
4435 guest_id
4436 );
4437 for (id, host_snapshot) in &host_worktree_snapshots {
4438 let guest_snapshot = &worktree_snapshots[id];
4439 assert_eq!(
4440 guest_snapshot.root_name(),
4441 host_snapshot.root_name(),
4442 "guest {} has different root name than the host for worktree {}",
4443 guest_id,
4444 id
4445 );
4446 assert_eq!(
4447 guest_snapshot.entries(false).collect::<Vec<_>>(),
4448 host_snapshot.entries(false).collect::<Vec<_>>(),
4449 "guest {} has different snapshot than the host for worktree {}",
4450 guest_id,
4451 id
4452 );
4453 }
4454
4455 guest_client
4456 .project
4457 .as_ref()
4458 .unwrap()
4459 .read_with(guest_cx, |project, _| {
4460 assert!(
4461 !project.has_buffered_operations(),
4462 "guest {} has buffered operations ",
4463 guest_id,
4464 );
4465 });
4466
4467 for guest_buffer in &guest_client.buffers {
4468 let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
4469 let host_buffer = host_project.read_with(&host_cx, |project, _| {
4470 project
4471 .shared_buffer(guest_client.peer_id, buffer_id)
4472 .expect(&format!(
4473 "host doest not have buffer for guest:{}, peer:{}, id:{}",
4474 guest_id, guest_client.peer_id, buffer_id
4475 ))
4476 });
4477 assert_eq!(
4478 guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
4479 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4480 "guest {} buffer {} differs from the host's buffer",
4481 guest_id,
4482 buffer_id,
4483 );
4484 }
4485 }
4486 }
4487
    /// In-process test harness that bundles a `Server` with the state the tests
    /// need to drive and observe it.
    struct TestServer {
        /// RPC peer handed to the server at construction; `Drop` calls `reset` on it.
        peer: Arc<Peer>,
        /// Application state produced by `build_app_state`.
        app_state: Arc<AppState>,
        /// The server under test.
        server: Arc<Server>,
        /// Foreground executor; `condition` brackets its waits with
        /// `start_waiting` / `finish_waiting` on it.
        foreground: Rc<executor::Foreground>,
        /// Receiving half of the channel whose sender is passed to `Server::new`.
        notifications: mpsc::UnboundedReceiver<()>,
        /// Per-user senders used by `disconnect_client` to signal a connection kill.
        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
        /// While `true`, new connection attempts made through `create_client`'s
        /// connection hook fail with "server is forbidding connections".
        forbid_connections: Arc<AtomicBool>,
        /// Never read in this section; presumably held so the test database
        /// outlives the server (TODO confirm against `TestDb`).
        _test_db: TestDb,
    }
4498
4499 impl TestServer {
4500 async fn start(
4501 foreground: Rc<executor::Foreground>,
4502 background: Arc<executor::Background>,
4503 ) -> Self {
4504 let test_db = TestDb::fake(background);
4505 let app_state = Self::build_app_state(&test_db).await;
4506 let peer = Peer::new();
4507 let notifications = mpsc::unbounded();
4508 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4509 Self {
4510 peer,
4511 app_state,
4512 server,
4513 foreground,
4514 notifications: notifications.1,
4515 connection_killers: Default::default(),
4516 forbid_connections: Default::default(),
4517 _test_db: test_db,
4518 }
4519 }
4520
        /// Creates a user named `name` in the test database and returns a
        /// `TestClient` that is authenticated and connected to this server over
        /// an in-memory connection.
        ///
        /// Authentication and connection establishment are overridden on the
        /// client so that no real network or credential exchange takes place.
        /// Panics if user creation, authentication or connection fails.
        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
            let http = FakeHttpClient::with_404_response();
            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
            let client_name = name.to_string();
            let mut client = Client::new(http.clone());
            let server = self.server.clone();
            let connection_killers = self.connection_killers.clone();
            let forbid_connections = self.forbid_connections.clone();
            // The server reports the id of each accepted connection through this channel.
            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);

            // `Arc::get_mut` succeeds only while this is the sole handle to the
            // client, so the overrides are installed before the client is shared.
            Arc::get_mut(&mut client)
                .unwrap()
                .override_authenticate(move |cx| {
                    cx.spawn(|_| async move {
                        // Fixed token; checked again in the connection override below.
                        let access_token = "the-token".to_string();
                        Ok(Credentials {
                            user_id: user_id.0 as u64,
                            access_token,
                        })
                    })
                })
                .override_establish_connection(move |credentials, cx| {
                    assert_eq!(credentials.user_id, user_id.0 as u64);
                    assert_eq!(credentials.access_token, "the-token");

                    // Clone per invocation: the hook may run more than once,
                    // while each spawned future needs to own its captures.
                    let server = server.clone();
                    let connection_killers = connection_killers.clone();
                    let forbid_connections = forbid_connections.clone();
                    let client_name = client_name.clone();
                    let connection_id_tx = connection_id_tx.clone();
                    cx.spawn(move |cx| async move {
                        if forbid_connections.load(SeqCst) {
                            Err(EstablishConnectionError::other(anyhow!(
                                "server is forbidding connections"
                            )))
                        } else {
                            let (client_conn, server_conn, kill_conn) =
                                Connection::in_memory(cx.background());
                            // Remember the kill switch so `disconnect_client` can use it.
                            // A later connection for the same user replaces the earlier entry.
                            connection_killers.lock().insert(user_id, kill_conn);
                            // Serve the server side of the connection in the background.
                            cx.background()
                                .spawn(server.handle_connection(
                                    server_conn,
                                    client_name,
                                    user_id,
                                    Some(connection_id_tx),
                                    cx.background(),
                                ))
                                .detach();
                            Ok(client_conn)
                        }
                    })
                });

            client
                .authenticate_and_connect(&cx.to_async())
                .await
                .unwrap();

            Channel::init(&client);
            Project::init(&client);

            // The first id sent by the server identifies the connection just established.
            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
            let mut authed_user =
                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
            // Wait until the user store reports a current user.
            while authed_user.next().await.unwrap().is_none() {}

            TestClient {
                client,
                peer_id,
                user_store,
                project: Default::default(),
                buffers: Default::default(),
            }
        }
4596
4597 fn disconnect_client(&self, user_id: UserId) {
4598 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
4599 let _ = kill_conn.try_send(Some(()));
4600 }
4601 }
4602
        /// Sets the flag that makes subsequent connection attempts through the
        /// overridden connection hook fail until `allow_connections` is called.
        fn forbid_connections(&self) {
            self.forbid_connections.store(true, SeqCst);
        }
4606
        /// Clears the flag set by `forbid_connections`, so new connection
        /// attempts are accepted again.
        fn allow_connections(&self) {
            self.forbid_connections.store(false, SeqCst);
        }
4610
4611 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4612 let mut config = Config::default();
4613 config.session_secret = "a".repeat(32);
4614 config.database_url = test_db.url.clone();
4615 let github_client = github::AppClient::test();
4616 Arc::new(AppState {
4617 db: test_db.db().clone(),
4618 handlebars: Default::default(),
4619 auth_client: auth::build_client("", ""),
4620 repo_client: github::RepoClient::test(&github_client),
4621 github_client,
4622 config,
4623 })
4624 }
4625
        /// Returns a read guard over the server's `Store`.
        ///
        /// The body contains no `.await`; the lock is taken synchronously when
        /// the returned future is polled, and is held for as long as the guard lives.
        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
            self.server.store.read()
        }
4629
4630 async fn condition<F>(&mut self, mut predicate: F)
4631 where
4632 F: FnMut(&Store) -> bool,
4633 {
4634 async_std::future::timeout(Duration::from_millis(500), async {
4635 while !(predicate)(&*self.server.store.read()) {
4636 self.foreground.start_waiting();
4637 self.notifications.next().await;
4638 self.foreground.finish_waiting();
4639 }
4640 })
4641 .await
4642 .expect("condition timed out");
4643 }
4644 }
4645
    /// Resets the RPC peer when the test server goes away.
    impl Drop for TestServer {
        fn drop(&mut self) {
            self.peer.reset();
        }
    }
4651
    /// A connected client as seen by the tests, together with the state the
    /// random simulations accumulate.
    struct TestClient {
        /// The underlying client; also reachable through `Deref`.
        client: Arc<Client>,
        /// Peer id built from the connection id the server reported for this client.
        pub peer_id: PeerId,
        pub user_store: ModelHandle<UserStore>,
        /// `None` on creation; set by `simulate_host` / `simulate_guest` when they finish.
        project: Option<ModelHandle<Project>>,
        /// Buffers currently held open by the simulation.
        buffers: HashSet<ModelHandle<zed::language::Buffer>>,
    }
4659
    /// Lets a `TestClient` be used wherever an `Arc<Client>` is expected by
    /// dereferencing to the wrapped client.
    impl Deref for TestClient {
        type Target = Arc<Client>;

        fn deref(&self) -> &Self::Target {
            &self.client
        }
    }
4667
4668 impl TestClient {
4669 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4670 UserId::from_proto(
4671 self.user_store
4672 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4673 )
4674 }
4675
4676 fn simulate_host(
4677 mut self,
4678 project: ModelHandle<Project>,
4679 mut language_server_config: LanguageServerConfig,
4680 operations: Rc<Cell<usize>>,
4681 max_operations: usize,
4682 rng: Arc<Mutex<StdRng>>,
4683 mut cx: TestAppContext,
4684 ) -> impl Future<Output = (Self, TestAppContext)> {
4685 let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4686
4687 // Set up a fake language server.
4688 language_server_config.set_fake_initializer({
4689 let rng = rng.clone();
4690 let files = files.clone();
4691 let project = project.clone();
4692 move |fake_server| {
4693 fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4694 Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4695 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4696 range: lsp::Range::new(
4697 lsp::Position::new(0, 0),
4698 lsp::Position::new(0, 0),
4699 ),
4700 new_text: "the-new-text".to_string(),
4701 })),
4702 ..Default::default()
4703 }]))
4704 });
4705
4706 fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4707 Some(vec![lsp::CodeActionOrCommand::CodeAction(
4708 lsp::CodeAction {
4709 title: "the-code-action".to_string(),
4710 ..Default::default()
4711 },
4712 )])
4713 });
4714
4715 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4716 |params, _| {
4717 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4718 params.position,
4719 params.position,
4720 )))
4721 },
4722 );
4723
4724 fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4725 let files = files.clone();
4726 let rng = rng.clone();
4727 move |_, _| {
4728 let files = files.lock();
4729 let mut rng = rng.lock();
4730 let count = rng.gen_range::<usize, _>(1..3);
4731 Some(lsp::GotoDefinitionResponse::Array(
4732 (0..count)
4733 .map(|_| {
4734 let file = files.choose(&mut *rng).unwrap().as_path();
4735 lsp::Location {
4736 uri: lsp::Url::from_file_path(file).unwrap(),
4737 range: Default::default(),
4738 }
4739 })
4740 .collect(),
4741 ))
4742 }
4743 });
4744
4745 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4746 let rng = rng.clone();
4747 let project = project.clone();
4748 move |params, mut cx| {
4749 project.update(&mut cx, |project, cx| {
4750 let path = params
4751 .text_document_position_params
4752 .text_document
4753 .uri
4754 .to_file_path()
4755 .unwrap();
4756 let (worktree, relative_path) =
4757 project.find_local_worktree(&path, cx)?;
4758 let project_path =
4759 ProjectPath::from((worktree.read(cx).id(), relative_path));
4760 let buffer = project.get_open_buffer(&project_path, cx)?.read(cx);
4761
4762 let mut highlights = Vec::new();
4763 let highlight_count = rng.lock().gen_range(1..=5);
4764 let mut prev_end = 0;
4765 for _ in 0..highlight_count {
4766 let range =
4767 buffer.random_byte_range(prev_end, &mut *rng.lock());
4768 let start =
4769 buffer.offset_to_point_utf16(range.start).to_lsp_position();
4770 let end =
4771 buffer.offset_to_point_utf16(range.end).to_lsp_position();
4772 highlights.push(lsp::DocumentHighlight {
4773 range: lsp::Range::new(start, end),
4774 kind: Some(lsp::DocumentHighlightKind::READ),
4775 });
4776 prev_end = range.end;
4777 }
4778 Some(highlights)
4779 })
4780 }
4781 });
4782 }
4783 });
4784
4785 project.update(&mut cx, |project, _| {
4786 project.languages().add(Arc::new(Language::new(
4787 LanguageConfig {
4788 name: "Rust".into(),
4789 path_suffixes: vec!["rs".to_string()],
4790 language_server: Some(language_server_config),
4791 ..Default::default()
4792 },
4793 None,
4794 )));
4795 });
4796
4797 async move {
4798 let fs = project.read_with(&cx, |project, _| project.fs().clone());
4799 while operations.get() < max_operations {
4800 operations.set(operations.get() + 1);
4801
4802 let distribution = rng.lock().gen_range::<usize, _>(0..100);
4803 match distribution {
4804 0..=20 if !files.lock().is_empty() => {
4805 let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4806 let mut path = path.as_path();
4807 while let Some(parent_path) = path.parent() {
4808 path = parent_path;
4809 if rng.lock().gen() {
4810 break;
4811 }
4812 }
4813
4814 log::info!("Host: find/create local worktree {:?}", path);
4815 project
4816 .update(&mut cx, |project, cx| {
4817 project.find_or_create_local_worktree(path, false, cx)
4818 })
4819 .await
4820 .unwrap();
4821 }
4822 10..=80 if !files.lock().is_empty() => {
4823 let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4824 let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4825 let (worktree, path) = project
4826 .update(&mut cx, |project, cx| {
4827 project.find_or_create_local_worktree(file, false, cx)
4828 })
4829 .await
4830 .unwrap();
4831 let project_path =
4832 worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4833 log::info!("Host: opening path {:?}", project_path);
4834 let buffer = project
4835 .update(&mut cx, |project, cx| {
4836 project.open_buffer(project_path, cx)
4837 })
4838 .await
4839 .unwrap();
4840 self.buffers.insert(buffer.clone());
4841 buffer
4842 } else {
4843 self.buffers
4844 .iter()
4845 .choose(&mut *rng.lock())
4846 .unwrap()
4847 .clone()
4848 };
4849
4850 if rng.lock().gen_bool(0.1) {
4851 cx.update(|cx| {
4852 log::info!(
4853 "Host: dropping buffer {:?}",
4854 buffer.read(cx).file().unwrap().full_path(cx)
4855 );
4856 self.buffers.remove(&buffer);
4857 drop(buffer);
4858 });
4859 } else {
4860 buffer.update(&mut cx, |buffer, cx| {
4861 log::info!(
4862 "Host: updating buffer {:?}",
4863 buffer.file().unwrap().full_path(cx)
4864 );
4865 buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4866 });
4867 }
4868 }
4869 _ => loop {
4870 let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4871 let mut path = PathBuf::new();
4872 path.push("/");
4873 for _ in 0..path_component_count {
4874 let letter = rng.lock().gen_range(b'a'..=b'z');
4875 path.push(std::str::from_utf8(&[letter]).unwrap());
4876 }
4877 path.set_extension("rs");
4878 let parent_path = path.parent().unwrap();
4879
4880 log::info!("Host: creating file {:?}", path,);
4881
4882 if fs.create_dir(&parent_path).await.is_ok()
4883 && fs.create_file(&path, Default::default()).await.is_ok()
4884 {
4885 files.lock().push(path);
4886 break;
4887 } else {
4888 log::info!("Host: cannot create file");
4889 }
4890 },
4891 }
4892
4893 cx.background().simulate_random_delay().await;
4894 }
4895
4896 self.project = Some(project);
4897 (self, cx)
4898 }
4899 }
4900
4901 pub async fn simulate_guest(
4902 mut self,
4903 guest_id: usize,
4904 project: ModelHandle<Project>,
4905 operations: Rc<Cell<usize>>,
4906 max_operations: usize,
4907 rng: Arc<Mutex<StdRng>>,
4908 mut cx: TestAppContext,
4909 ) -> (Self, TestAppContext) {
4910 while operations.get() < max_operations {
4911 let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4912 let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4913 project
4914 .worktrees(&cx)
4915 .filter(|worktree| {
4916 worktree.read(cx).entries(false).any(|e| e.is_file())
4917 })
4918 .choose(&mut *rng.lock())
4919 }) {
4920 worktree
4921 } else {
4922 cx.background().simulate_random_delay().await;
4923 continue;
4924 };
4925
4926 operations.set(operations.get() + 1);
4927 let project_path = worktree.read_with(&cx, |worktree, _| {
4928 let entry = worktree
4929 .entries(false)
4930 .filter(|e| e.is_file())
4931 .choose(&mut *rng.lock())
4932 .unwrap();
4933 (worktree.id(), entry.path.clone())
4934 });
4935 log::info!("Guest {}: opening path {:?}", guest_id, project_path);
4936 let buffer = project
4937 .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
4938 .await
4939 .unwrap();
4940 self.buffers.insert(buffer.clone());
4941 buffer
4942 } else {
4943 operations.set(operations.get() + 1);
4944
4945 self.buffers
4946 .iter()
4947 .choose(&mut *rng.lock())
4948 .unwrap()
4949 .clone()
4950 };
4951
4952 let choice = rng.lock().gen_range(0..100);
4953 match choice {
4954 0..=9 => {
4955 cx.update(|cx| {
4956 log::info!(
4957 "Guest {}: dropping buffer {:?}",
4958 guest_id,
4959 buffer.read(cx).file().unwrap().full_path(cx)
4960 );
4961 self.buffers.remove(&buffer);
4962 drop(buffer);
4963 });
4964 }
4965 10..=19 => {
4966 let completions = project.update(&mut cx, |project, cx| {
4967 log::info!(
4968 "Guest {}: requesting completions for buffer {:?}",
4969 guest_id,
4970 buffer.read(cx).file().unwrap().full_path(cx)
4971 );
4972 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4973 project.completions(&buffer, offset, cx)
4974 });
4975 let completions = cx.background().spawn(async move {
4976 completions.await.expect("completions request failed");
4977 });
4978 if rng.lock().gen_bool(0.3) {
4979 log::info!("Guest {}: detaching completions request", guest_id);
4980 completions.detach();
4981 } else {
4982 completions.await;
4983 }
4984 }
4985 20..=29 => {
4986 let code_actions = project.update(&mut cx, |project, cx| {
4987 log::info!(
4988 "Guest {}: requesting code actions for buffer {:?}",
4989 guest_id,
4990 buffer.read(cx).file().unwrap().full_path(cx)
4991 );
4992 let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4993 project.code_actions(&buffer, range, cx)
4994 });
4995 let code_actions = cx.background().spawn(async move {
4996 code_actions.await.expect("code actions request failed");
4997 });
4998 if rng.lock().gen_bool(0.3) {
4999 log::info!("Guest {}: detaching code actions request", guest_id);
5000 code_actions.detach();
5001 } else {
5002 code_actions.await;
5003 }
5004 }
5005 30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
5006 let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
5007 log::info!(
5008 "Guest {}: saving buffer {:?}",
5009 guest_id,
5010 buffer.file().unwrap().full_path(cx)
5011 );
5012 (buffer.version(), buffer.save(cx))
5013 });
5014 let save = cx.spawn(|cx| async move {
5015 let (saved_version, _) = save.await.expect("save request failed");
5016 buffer.read_with(&cx, |buffer, _| {
5017 assert!(buffer.version().observed_all(&saved_version));
5018 assert!(saved_version.observed_all(&requested_version));
5019 });
5020 });
5021 if rng.lock().gen_bool(0.3) {
5022 log::info!("Guest {}: detaching save request", guest_id);
5023 save.detach();
5024 } else {
5025 save.await;
5026 }
5027 }
5028 40..=45 => {
5029 let prepare_rename = project.update(&mut cx, |project, cx| {
5030 log::info!(
5031 "Guest {}: preparing rename for buffer {:?}",
5032 guest_id,
5033 buffer.read(cx).file().unwrap().full_path(cx)
5034 );
5035 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5036 project.prepare_rename(buffer, offset, cx)
5037 });
5038 let prepare_rename = cx.background().spawn(async move {
5039 prepare_rename.await.expect("prepare rename request failed");
5040 });
5041 if rng.lock().gen_bool(0.3) {
5042 log::info!("Guest {}: detaching prepare rename request", guest_id);
5043 prepare_rename.detach();
5044 } else {
5045 prepare_rename.await;
5046 }
5047 }
5048 46..=49 => {
5049 let definitions = project.update(&mut cx, |project, cx| {
5050 log::info!(
5051 "Guest {}: requesting defintions for buffer {:?}",
5052 guest_id,
5053 buffer.read(cx).file().unwrap().full_path(cx)
5054 );
5055 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5056 project.definition(&buffer, offset, cx)
5057 });
5058 let definitions = cx.background().spawn(async move {
5059 definitions.await.expect("definitions request failed");
5060 });
5061 if rng.lock().gen_bool(0.3) {
5062 log::info!("Guest {}: detaching definitions request", guest_id);
5063 definitions.detach();
5064 } else {
5065 definitions.await;
5066 }
5067 }
5068 50..=55 => {
5069 let highlights = project.update(&mut cx, |project, cx| {
5070 log::info!(
5071 "Guest {}: requesting highlights for buffer {:?}",
5072 guest_id,
5073 buffer.read(cx).file().unwrap().full_path(cx)
5074 );
5075 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5076 project.document_highlights(&buffer, offset, cx)
5077 });
5078 let highlights = cx.background().spawn(async move {
5079 highlights.await.expect("highlights request failed");
5080 });
5081 if rng.lock().gen_bool(0.3) {
5082 log::info!("Guest {}: detaching highlights request", guest_id);
5083 highlights.detach();
5084 } else {
5085 highlights.await;
5086 }
5087 }
5088 _ => {
5089 buffer.update(&mut cx, |buffer, cx| {
5090 log::info!(
5091 "Guest {}: updating buffer {:?}",
5092 guest_id,
5093 buffer.file().unwrap().full_path(cx)
5094 );
5095 buffer.randomly_edit(&mut *rng.lock(), 5, cx)
5096 });
5097 }
5098 }
5099 cx.background().simulate_random_delay().await;
5100 }
5101
5102 self.project = Some(project);
5103 (self, cx)
5104 }
5105 }
5106
    /// Lets the gpui background executor be used as the server's `Executor`:
    /// the future is spawned on it and the resulting task is detached.
    impl Executor for Arc<gpui::executor::Background> {
        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
            self.spawn(future).detach();
        }
    }
5112
5113 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5114 channel
5115 .messages()
5116 .cursor::<()>()
5117 .map(|m| {
5118 (
5119 m.sender.github_login.clone(),
5120 m.body.clone(),
5121 m.is_pending(),
5122 )
5123 })
5124 .collect()
5125 }
5126
    /// Minimal view with no state that renders an empty element.
    struct EmptyView;

    impl gpui::Entity for EmptyView {
        // The view emits no meaningful events.
        type Event = ();
    }

    impl gpui::View for EmptyView {
        /// Name reported for this view.
        fn ui_name() -> &'static str {
            "empty view"
        }

        /// Renders a boxed `Empty` element.
        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
            gpui::Element::boxed(gpui::elements::Empty)
        }
    }
5142}