1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use rpc::{
15 proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
16 Connection, ConnectionId, Peer, TypedEnvelope,
17};
18use sha1::{Digest as _, Sha1};
19use std::{any::TypeId, future::Future, sync::Arc, time::Instant};
20use store::{Store, Worktree};
21use surf::StatusCode;
22use tide::log;
23use tide::{
24 http::headers::{HeaderName, CONNECTION, UPGRADE},
25 Request, Response,
26};
27use time::OffsetDateTime;
28
29type MessageHandler = Box<
30 dyn Send
31 + Sync
32 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
33>;
34
35pub struct Server {
36 peer: Arc<Peer>,
37 store: RwLock<Store>,
38 app_state: Arc<AppState>,
39 handlers: HashMap<TypeId, MessageHandler>,
40 notifications: Option<mpsc::UnboundedSender<()>>,
41}
42
43pub trait Executor {
44 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
45}
46
47pub struct RealExecutor;
48
49const MESSAGE_COUNT_PER_PAGE: usize = 100;
50const MAX_MESSAGE_LEN: usize = 1024;
51
52impl Server {
53 pub fn new(
54 app_state: Arc<AppState>,
55 peer: Arc<Peer>,
56 notifications: Option<mpsc::UnboundedSender<()>>,
57 ) -> Arc<Self> {
58 let mut server = Self {
59 peer,
60 app_state,
61 store: Default::default(),
62 handlers: Default::default(),
63 notifications,
64 };
65
66 server
67 .add_request_handler(Server::ping)
68 .add_request_handler(Server::register_project)
69 .add_message_handler(Server::unregister_project)
70 .add_request_handler(Server::share_project)
71 .add_message_handler(Server::unshare_project)
72 .add_request_handler(Server::join_project)
73 .add_message_handler(Server::leave_project)
74 .add_request_handler(Server::register_worktree)
75 .add_message_handler(Server::unregister_worktree)
76 .add_request_handler(Server::update_worktree)
77 .add_message_handler(Server::update_diagnostic_summary)
78 .add_message_handler(Server::disk_based_diagnostics_updating)
79 .add_message_handler(Server::disk_based_diagnostics_updated)
80 .add_request_handler(Server::get_definition)
81 .add_request_handler(Server::get_references)
82 .add_request_handler(Server::search_project)
83 .add_request_handler(Server::get_document_highlights)
84 .add_request_handler(Server::get_project_symbols)
85 .add_request_handler(Server::open_buffer_for_symbol)
86 .add_request_handler(Server::open_buffer)
87 .add_message_handler(Server::close_buffer)
88 .add_request_handler(Server::update_buffer)
89 .add_message_handler(Server::update_buffer_file)
90 .add_message_handler(Server::buffer_reloaded)
91 .add_message_handler(Server::buffer_saved)
92 .add_request_handler(Server::save_buffer)
93 .add_request_handler(Server::format_buffers)
94 .add_request_handler(Server::get_completions)
95 .add_request_handler(Server::apply_additional_edits_for_completion)
96 .add_request_handler(Server::get_code_actions)
97 .add_request_handler(Server::apply_code_action)
98 .add_request_handler(Server::prepare_rename)
99 .add_request_handler(Server::perform_rename)
100 .add_request_handler(Server::get_channels)
101 .add_request_handler(Server::get_users)
102 .add_request_handler(Server::join_channel)
103 .add_message_handler(Server::leave_channel)
104 .add_request_handler(Server::send_channel_message)
105 .add_request_handler(Server::get_channel_messages);
106
107 Arc::new(server)
108 }
109
110 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
111 where
112 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
113 Fut: 'static + Send + Future<Output = tide::Result<()>>,
114 M: EnvelopedMessage,
115 {
116 let prev_handler = self.handlers.insert(
117 TypeId::of::<M>(),
118 Box::new(move |server, envelope| {
119 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
120 (handler)(server, *envelope).boxed()
121 }),
122 );
123 if prev_handler.is_some() {
124 panic!("registered a handler for the same message twice");
125 }
126 self
127 }
128
129 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
130 where
131 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
132 Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
133 M: RequestMessage,
134 {
135 self.add_message_handler(move |server, envelope| {
136 let receipt = envelope.receipt();
137 let response = (handler)(server.clone(), envelope);
138 async move {
139 match response.await {
140 Ok(response) => {
141 server.peer.respond(receipt, response)?;
142 Ok(())
143 }
144 Err(error) => {
145 server.peer.respond_with_error(
146 receipt,
147 proto::Error {
148 message: error.to_string(),
149 },
150 )?;
151 Err(error)
152 }
153 }
154 }
155 })
156 }
157
158 pub fn handle_connection<E: Executor>(
159 self: &Arc<Self>,
160 connection: Connection,
161 addr: String,
162 user_id: UserId,
163 mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
164 executor: E,
165 ) -> impl Future<Output = ()> {
166 let mut this = self.clone();
167 async move {
168 let (connection_id, handle_io, mut incoming_rx) =
169 this.peer.add_connection(connection).await;
170
171 if let Some(send_connection_id) = send_connection_id.as_mut() {
172 let _ = send_connection_id.send(connection_id).await;
173 }
174
175 this.state_mut().add_connection(connection_id, user_id);
176 if let Err(err) = this.update_contacts_for_users(&[user_id]) {
177 log::error!("error updating contacts for {:?}: {}", user_id, err);
178 }
179
180 let handle_io = handle_io.fuse();
181 futures::pin_mut!(handle_io);
182 loop {
183 let next_message = incoming_rx.next().fuse();
184 futures::pin_mut!(next_message);
185 futures::select_biased! {
186 result = handle_io => {
187 if let Err(err) = result {
188 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
189 }
190 break;
191 }
192 message = next_message => {
193 if let Some(message) = message {
194 let start_time = Instant::now();
195 let type_name = message.payload_type_name();
196 log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
197 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
198 let notifications = this.notifications.clone();
199 let is_background = message.is_background();
200 let handle_message = (handler)(this.clone(), message);
201 let handle_message = async move {
202 if let Err(err) = handle_message.await {
203 log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
204 } else {
205 log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
206 }
207 if let Some(mut notifications) = notifications {
208 let _ = notifications.send(()).await;
209 }
210 };
211 if is_background {
212 executor.spawn_detached(handle_message);
213 } else {
214 handle_message.await;
215 }
216 } else {
217 log::warn!("unhandled message: {}", type_name);
218 }
219 } else {
220 log::info!("rpc connection closed {:?}", addr);
221 break;
222 }
223 }
224 }
225 }
226
227 if let Err(err) = this.sign_out(connection_id).await {
228 log::error!("error signing out connection {:?} - {:?}", addr, err);
229 }
230 }
231 }
232
233 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
234 self.peer.disconnect(connection_id);
235 let removed_connection = self.state_mut().remove_connection(connection_id)?;
236
237 for (project_id, project) in removed_connection.hosted_projects {
238 if let Some(share) = project.share {
239 broadcast(
240 connection_id,
241 share.guests.keys().copied().collect(),
242 |conn_id| {
243 self.peer
244 .send(conn_id, proto::UnshareProject { project_id })
245 },
246 )?;
247 }
248 }
249
250 for (project_id, peer_ids) in removed_connection.guest_project_ids {
251 broadcast(connection_id, peer_ids, |conn_id| {
252 self.peer.send(
253 conn_id,
254 proto::RemoveProjectCollaborator {
255 project_id,
256 peer_id: connection_id.0,
257 },
258 )
259 })?;
260 }
261
262 self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
263 Ok(())
264 }
265
266 async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
267 Ok(proto::Ack {})
268 }
269
270 async fn register_project(
271 mut self: Arc<Server>,
272 request: TypedEnvelope<proto::RegisterProject>,
273 ) -> tide::Result<proto::RegisterProjectResponse> {
274 let project_id = {
275 let mut state = self.state_mut();
276 let user_id = state.user_id_for_connection(request.sender_id)?;
277 state.register_project(request.sender_id, user_id)
278 };
279 Ok(proto::RegisterProjectResponse { project_id })
280 }
281
282 async fn unregister_project(
283 mut self: Arc<Server>,
284 request: TypedEnvelope<proto::UnregisterProject>,
285 ) -> tide::Result<()> {
286 let project = self
287 .state_mut()
288 .unregister_project(request.payload.project_id, request.sender_id)?;
289 self.update_contacts_for_users(project.authorized_user_ids().iter())?;
290 Ok(())
291 }
292
293 async fn share_project(
294 mut self: Arc<Server>,
295 request: TypedEnvelope<proto::ShareProject>,
296 ) -> tide::Result<proto::Ack> {
297 self.state_mut()
298 .share_project(request.payload.project_id, request.sender_id);
299 Ok(proto::Ack {})
300 }
301
302 async fn unshare_project(
303 mut self: Arc<Server>,
304 request: TypedEnvelope<proto::UnshareProject>,
305 ) -> tide::Result<()> {
306 let project_id = request.payload.project_id;
307 let project = self
308 .state_mut()
309 .unshare_project(project_id, request.sender_id)?;
310
311 broadcast(request.sender_id, project.connection_ids, |conn_id| {
312 self.peer
313 .send(conn_id, proto::UnshareProject { project_id })
314 })?;
315 self.update_contacts_for_users(&project.authorized_user_ids)?;
316 Ok(())
317 }
318
319 async fn join_project(
320 mut self: Arc<Server>,
321 request: TypedEnvelope<proto::JoinProject>,
322 ) -> tide::Result<proto::JoinProjectResponse> {
323 let project_id = request.payload.project_id;
324
325 let user_id = self.state().user_id_for_connection(request.sender_id)?;
326 let (response, connection_ids, contact_user_ids) = self
327 .state_mut()
328 .join_project(request.sender_id, user_id, project_id)
329 .and_then(|joined| {
330 let share = joined.project.share()?;
331 let peer_count = share.guests.len();
332 let mut collaborators = Vec::with_capacity(peer_count);
333 collaborators.push(proto::Collaborator {
334 peer_id: joined.project.host_connection_id.0,
335 replica_id: 0,
336 user_id: joined.project.host_user_id.to_proto(),
337 });
338 let worktrees = share
339 .worktrees
340 .iter()
341 .filter_map(|(id, shared_worktree)| {
342 let worktree = joined.project.worktrees.get(&id)?;
343 Some(proto::Worktree {
344 id: *id,
345 root_name: worktree.root_name.clone(),
346 entries: shared_worktree.entries.values().cloned().collect(),
347 diagnostic_summaries: shared_worktree
348 .diagnostic_summaries
349 .values()
350 .cloned()
351 .collect(),
352 weak: worktree.weak,
353 })
354 })
355 .collect();
356 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
357 if *peer_conn_id != request.sender_id {
358 collaborators.push(proto::Collaborator {
359 peer_id: peer_conn_id.0,
360 replica_id: *peer_replica_id as u32,
361 user_id: peer_user_id.to_proto(),
362 });
363 }
364 }
365 let response = proto::JoinProjectResponse {
366 worktrees,
367 replica_id: joined.replica_id as u32,
368 collaborators,
369 };
370 let connection_ids = joined.project.connection_ids();
371 let contact_user_ids = joined.project.authorized_user_ids();
372 Ok((response, connection_ids, contact_user_ids))
373 })?;
374
375 broadcast(request.sender_id, connection_ids, |conn_id| {
376 self.peer.send(
377 conn_id,
378 proto::AddProjectCollaborator {
379 project_id,
380 collaborator: Some(proto::Collaborator {
381 peer_id: request.sender_id.0,
382 replica_id: response.replica_id,
383 user_id: user_id.to_proto(),
384 }),
385 },
386 )
387 })?;
388 self.update_contacts_for_users(&contact_user_ids)?;
389 Ok(response)
390 }
391
392 async fn leave_project(
393 mut self: Arc<Server>,
394 request: TypedEnvelope<proto::LeaveProject>,
395 ) -> tide::Result<()> {
396 let sender_id = request.sender_id;
397 let project_id = request.payload.project_id;
398 let worktree = self.state_mut().leave_project(sender_id, project_id)?;
399
400 broadcast(sender_id, worktree.connection_ids, |conn_id| {
401 self.peer.send(
402 conn_id,
403 proto::RemoveProjectCollaborator {
404 project_id,
405 peer_id: sender_id.0,
406 },
407 )
408 })?;
409 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
410
411 Ok(())
412 }
413
414 async fn register_worktree(
415 mut self: Arc<Server>,
416 request: TypedEnvelope<proto::RegisterWorktree>,
417 ) -> tide::Result<proto::Ack> {
418 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
419
420 let mut contact_user_ids = HashSet::default();
421 contact_user_ids.insert(host_user_id);
422 for github_login in &request.payload.authorized_logins {
423 let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
424 contact_user_ids.insert(contact_user_id);
425 }
426
427 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
428 let guest_connection_ids;
429 {
430 let mut state = self.state_mut();
431 guest_connection_ids = state
432 .read_project(request.payload.project_id, request.sender_id)?
433 .guest_connection_ids();
434 state.register_worktree(
435 request.payload.project_id,
436 request.payload.worktree_id,
437 request.sender_id,
438 Worktree {
439 authorized_user_ids: contact_user_ids.clone(),
440 root_name: request.payload.root_name.clone(),
441 weak: request.payload.weak,
442 },
443 )?;
444 }
445 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
446 self.peer
447 .forward_send(request.sender_id, connection_id, request.payload.clone())
448 })?;
449 self.update_contacts_for_users(&contact_user_ids)?;
450 Ok(proto::Ack {})
451 }
452
453 async fn unregister_worktree(
454 mut self: Arc<Server>,
455 request: TypedEnvelope<proto::UnregisterWorktree>,
456 ) -> tide::Result<()> {
457 let project_id = request.payload.project_id;
458 let worktree_id = request.payload.worktree_id;
459 let (worktree, guest_connection_ids) =
460 self.state_mut()
461 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
462 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
463 self.peer.send(
464 conn_id,
465 proto::UnregisterWorktree {
466 project_id,
467 worktree_id,
468 },
469 )
470 })?;
471 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
472 Ok(())
473 }
474
475 async fn update_worktree(
476 mut self: Arc<Server>,
477 request: TypedEnvelope<proto::UpdateWorktree>,
478 ) -> tide::Result<proto::Ack> {
479 let connection_ids = self.state_mut().update_worktree(
480 request.sender_id,
481 request.payload.project_id,
482 request.payload.worktree_id,
483 &request.payload.removed_entries,
484 &request.payload.updated_entries,
485 )?;
486
487 broadcast(request.sender_id, connection_ids, |connection_id| {
488 self.peer
489 .forward_send(request.sender_id, connection_id, request.payload.clone())
490 })?;
491
492 Ok(proto::Ack {})
493 }
494
495 async fn update_diagnostic_summary(
496 mut self: Arc<Server>,
497 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
498 ) -> tide::Result<()> {
499 let summary = request
500 .payload
501 .summary
502 .clone()
503 .ok_or_else(|| anyhow!("invalid summary"))?;
504 let receiver_ids = self.state_mut().update_diagnostic_summary(
505 request.payload.project_id,
506 request.payload.worktree_id,
507 request.sender_id,
508 summary,
509 )?;
510
511 broadcast(request.sender_id, receiver_ids, |connection_id| {
512 self.peer
513 .forward_send(request.sender_id, connection_id, request.payload.clone())
514 })?;
515 Ok(())
516 }
517
518 async fn disk_based_diagnostics_updating(
519 self: Arc<Server>,
520 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
521 ) -> tide::Result<()> {
522 let receiver_ids = self
523 .state()
524 .project_connection_ids(request.payload.project_id, request.sender_id)?;
525 broadcast(request.sender_id, receiver_ids, |connection_id| {
526 self.peer
527 .forward_send(request.sender_id, connection_id, request.payload.clone())
528 })?;
529 Ok(())
530 }
531
532 async fn disk_based_diagnostics_updated(
533 self: Arc<Server>,
534 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
535 ) -> tide::Result<()> {
536 let receiver_ids = self
537 .state()
538 .project_connection_ids(request.payload.project_id, request.sender_id)?;
539 broadcast(request.sender_id, receiver_ids, |connection_id| {
540 self.peer
541 .forward_send(request.sender_id, connection_id, request.payload.clone())
542 })?;
543 Ok(())
544 }
545
546 async fn get_definition(
547 self: Arc<Server>,
548 request: TypedEnvelope<proto::GetDefinition>,
549 ) -> tide::Result<proto::GetDefinitionResponse> {
550 let host_connection_id = self
551 .state()
552 .read_project(request.payload.project_id, request.sender_id)?
553 .host_connection_id;
554 Ok(self
555 .peer
556 .forward_request(request.sender_id, host_connection_id, request.payload)
557 .await?)
558 }
559
560 async fn get_references(
561 self: Arc<Server>,
562 request: TypedEnvelope<proto::GetReferences>,
563 ) -> tide::Result<proto::GetReferencesResponse> {
564 let host_connection_id = self
565 .state()
566 .read_project(request.payload.project_id, request.sender_id)?
567 .host_connection_id;
568 Ok(self
569 .peer
570 .forward_request(request.sender_id, host_connection_id, request.payload)
571 .await?)
572 }
573
574 async fn search_project(
575 self: Arc<Server>,
576 request: TypedEnvelope<proto::SearchProject>,
577 ) -> tide::Result<proto::SearchProjectResponse> {
578 let host_connection_id = self
579 .state()
580 .read_project(request.payload.project_id, request.sender_id)?
581 .host_connection_id;
582 Ok(self
583 .peer
584 .forward_request(request.sender_id, host_connection_id, request.payload)
585 .await?)
586 }
587
588 async fn get_document_highlights(
589 self: Arc<Server>,
590 request: TypedEnvelope<proto::GetDocumentHighlights>,
591 ) -> tide::Result<proto::GetDocumentHighlightsResponse> {
592 let host_connection_id = self
593 .state()
594 .read_project(request.payload.project_id, request.sender_id)?
595 .host_connection_id;
596 Ok(self
597 .peer
598 .forward_request(request.sender_id, host_connection_id, request.payload)
599 .await?)
600 }
601
602 async fn get_project_symbols(
603 self: Arc<Server>,
604 request: TypedEnvelope<proto::GetProjectSymbols>,
605 ) -> tide::Result<proto::GetProjectSymbolsResponse> {
606 let host_connection_id = self
607 .state()
608 .read_project(request.payload.project_id, request.sender_id)?
609 .host_connection_id;
610 Ok(self
611 .peer
612 .forward_request(request.sender_id, host_connection_id, request.payload)
613 .await?)
614 }
615
616 async fn open_buffer_for_symbol(
617 self: Arc<Server>,
618 request: TypedEnvelope<proto::OpenBufferForSymbol>,
619 ) -> tide::Result<proto::OpenBufferForSymbolResponse> {
620 let host_connection_id = self
621 .state()
622 .read_project(request.payload.project_id, request.sender_id)?
623 .host_connection_id;
624 Ok(self
625 .peer
626 .forward_request(request.sender_id, host_connection_id, request.payload)
627 .await?)
628 }
629
630 async fn open_buffer(
631 self: Arc<Server>,
632 request: TypedEnvelope<proto::OpenBuffer>,
633 ) -> tide::Result<proto::OpenBufferResponse> {
634 let host_connection_id = self
635 .state()
636 .read_project(request.payload.project_id, request.sender_id)?
637 .host_connection_id;
638 Ok(self
639 .peer
640 .forward_request(request.sender_id, host_connection_id, request.payload)
641 .await?)
642 }
643
644 async fn close_buffer(
645 self: Arc<Server>,
646 request: TypedEnvelope<proto::CloseBuffer>,
647 ) -> tide::Result<()> {
648 let host_connection_id = self
649 .state()
650 .read_project(request.payload.project_id, request.sender_id)?
651 .host_connection_id;
652 self.peer
653 .forward_send(request.sender_id, host_connection_id, request.payload)?;
654 Ok(())
655 }
656
657 async fn save_buffer(
658 self: Arc<Server>,
659 request: TypedEnvelope<proto::SaveBuffer>,
660 ) -> tide::Result<proto::BufferSaved> {
661 let host;
662 let mut guests;
663 {
664 let state = self.state();
665 let project = state.read_project(request.payload.project_id, request.sender_id)?;
666 host = project.host_connection_id;
667 guests = project.guest_connection_ids()
668 }
669
670 let response = self
671 .peer
672 .forward_request(request.sender_id, host, request.payload.clone())
673 .await?;
674
675 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
676 broadcast(host, guests, |conn_id| {
677 self.peer.forward_send(host, conn_id, response.clone())
678 })?;
679
680 Ok(response)
681 }
682
683 async fn format_buffers(
684 self: Arc<Server>,
685 request: TypedEnvelope<proto::FormatBuffers>,
686 ) -> tide::Result<proto::FormatBuffersResponse> {
687 let host = self
688 .state()
689 .read_project(request.payload.project_id, request.sender_id)?
690 .host_connection_id;
691 Ok(self
692 .peer
693 .forward_request(request.sender_id, host, request.payload.clone())
694 .await?)
695 }
696
697 async fn get_completions(
698 self: Arc<Server>,
699 request: TypedEnvelope<proto::GetCompletions>,
700 ) -> tide::Result<proto::GetCompletionsResponse> {
701 let host = self
702 .state()
703 .read_project(request.payload.project_id, request.sender_id)?
704 .host_connection_id;
705 Ok(self
706 .peer
707 .forward_request(request.sender_id, host, request.payload.clone())
708 .await?)
709 }
710
711 async fn apply_additional_edits_for_completion(
712 self: Arc<Server>,
713 request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
714 ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
715 let host = self
716 .state()
717 .read_project(request.payload.project_id, request.sender_id)?
718 .host_connection_id;
719 Ok(self
720 .peer
721 .forward_request(request.sender_id, host, request.payload.clone())
722 .await?)
723 }
724
725 async fn get_code_actions(
726 self: Arc<Server>,
727 request: TypedEnvelope<proto::GetCodeActions>,
728 ) -> tide::Result<proto::GetCodeActionsResponse> {
729 let host = self
730 .state()
731 .read_project(request.payload.project_id, request.sender_id)?
732 .host_connection_id;
733 Ok(self
734 .peer
735 .forward_request(request.sender_id, host, request.payload.clone())
736 .await?)
737 }
738
739 async fn apply_code_action(
740 self: Arc<Server>,
741 request: TypedEnvelope<proto::ApplyCodeAction>,
742 ) -> tide::Result<proto::ApplyCodeActionResponse> {
743 let host = self
744 .state()
745 .read_project(request.payload.project_id, request.sender_id)?
746 .host_connection_id;
747 Ok(self
748 .peer
749 .forward_request(request.sender_id, host, request.payload.clone())
750 .await?)
751 }
752
753 async fn prepare_rename(
754 self: Arc<Server>,
755 request: TypedEnvelope<proto::PrepareRename>,
756 ) -> tide::Result<proto::PrepareRenameResponse> {
757 let host = self
758 .state()
759 .read_project(request.payload.project_id, request.sender_id)?
760 .host_connection_id;
761 Ok(self
762 .peer
763 .forward_request(request.sender_id, host, request.payload.clone())
764 .await?)
765 }
766
767 async fn perform_rename(
768 self: Arc<Server>,
769 request: TypedEnvelope<proto::PerformRename>,
770 ) -> tide::Result<proto::PerformRenameResponse> {
771 let host = self
772 .state()
773 .read_project(request.payload.project_id, request.sender_id)?
774 .host_connection_id;
775 Ok(self
776 .peer
777 .forward_request(request.sender_id, host, request.payload.clone())
778 .await?)
779 }
780
781 async fn update_buffer(
782 self: Arc<Server>,
783 request: TypedEnvelope<proto::UpdateBuffer>,
784 ) -> tide::Result<proto::Ack> {
785 let receiver_ids = self
786 .state()
787 .project_connection_ids(request.payload.project_id, request.sender_id)?;
788 broadcast(request.sender_id, receiver_ids, |connection_id| {
789 self.peer
790 .forward_send(request.sender_id, connection_id, request.payload.clone())
791 })?;
792 Ok(proto::Ack {})
793 }
794
795 async fn update_buffer_file(
796 self: Arc<Server>,
797 request: TypedEnvelope<proto::UpdateBufferFile>,
798 ) -> tide::Result<()> {
799 let receiver_ids = self
800 .state()
801 .project_connection_ids(request.payload.project_id, request.sender_id)?;
802 broadcast(request.sender_id, receiver_ids, |connection_id| {
803 self.peer
804 .forward_send(request.sender_id, connection_id, request.payload.clone())
805 })?;
806 Ok(())
807 }
808
809 async fn buffer_reloaded(
810 self: Arc<Server>,
811 request: TypedEnvelope<proto::BufferReloaded>,
812 ) -> tide::Result<()> {
813 let receiver_ids = self
814 .state()
815 .project_connection_ids(request.payload.project_id, request.sender_id)?;
816 broadcast(request.sender_id, receiver_ids, |connection_id| {
817 self.peer
818 .forward_send(request.sender_id, connection_id, request.payload.clone())
819 })?;
820 Ok(())
821 }
822
823 async fn buffer_saved(
824 self: Arc<Server>,
825 request: TypedEnvelope<proto::BufferSaved>,
826 ) -> tide::Result<()> {
827 let receiver_ids = self
828 .state()
829 .project_connection_ids(request.payload.project_id, request.sender_id)?;
830 broadcast(request.sender_id, receiver_ids, |connection_id| {
831 self.peer
832 .forward_send(request.sender_id, connection_id, request.payload.clone())
833 })?;
834 Ok(())
835 }
836
837 async fn get_channels(
838 self: Arc<Server>,
839 request: TypedEnvelope<proto::GetChannels>,
840 ) -> tide::Result<proto::GetChannelsResponse> {
841 let user_id = self.state().user_id_for_connection(request.sender_id)?;
842 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
843 Ok(proto::GetChannelsResponse {
844 channels: channels
845 .into_iter()
846 .map(|chan| proto::Channel {
847 id: chan.id.to_proto(),
848 name: chan.name,
849 })
850 .collect(),
851 })
852 }
853
854 async fn get_users(
855 self: Arc<Server>,
856 request: TypedEnvelope<proto::GetUsers>,
857 ) -> tide::Result<proto::GetUsersResponse> {
858 let user_ids = request
859 .payload
860 .user_ids
861 .into_iter()
862 .map(UserId::from_proto)
863 .collect();
864 let users = self
865 .app_state
866 .db
867 .get_users_by_ids(user_ids)
868 .await?
869 .into_iter()
870 .map(|user| proto::User {
871 id: user.id.to_proto(),
872 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
873 github_login: user.github_login,
874 })
875 .collect();
876 Ok(proto::GetUsersResponse { users })
877 }
878
879 fn update_contacts_for_users<'a>(
880 self: &Arc<Server>,
881 user_ids: impl IntoIterator<Item = &'a UserId>,
882 ) -> anyhow::Result<()> {
883 let mut result = Ok(());
884 let state = self.state();
885 for user_id in user_ids {
886 let contacts = state.contacts_for_user(*user_id);
887 for connection_id in state.connection_ids_for_user(*user_id) {
888 if let Err(error) = self.peer.send(
889 connection_id,
890 proto::UpdateContacts {
891 contacts: contacts.clone(),
892 },
893 ) {
894 result = Err(error);
895 }
896 }
897 }
898 result
899 }
900
901 async fn join_channel(
902 mut self: Arc<Self>,
903 request: TypedEnvelope<proto::JoinChannel>,
904 ) -> tide::Result<proto::JoinChannelResponse> {
905 let user_id = self.state().user_id_for_connection(request.sender_id)?;
906 let channel_id = ChannelId::from_proto(request.payload.channel_id);
907 if !self
908 .app_state
909 .db
910 .can_user_access_channel(user_id, channel_id)
911 .await?
912 {
913 Err(anyhow!("access denied"))?;
914 }
915
916 self.state_mut().join_channel(request.sender_id, channel_id);
917 let messages = self
918 .app_state
919 .db
920 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
921 .await?
922 .into_iter()
923 .map(|msg| proto::ChannelMessage {
924 id: msg.id.to_proto(),
925 body: msg.body,
926 timestamp: msg.sent_at.unix_timestamp() as u64,
927 sender_id: msg.sender_id.to_proto(),
928 nonce: Some(msg.nonce.as_u128().into()),
929 })
930 .collect::<Vec<_>>();
931 Ok(proto::JoinChannelResponse {
932 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
933 messages,
934 })
935 }
936
937 async fn leave_channel(
938 mut self: Arc<Self>,
939 request: TypedEnvelope<proto::LeaveChannel>,
940 ) -> tide::Result<()> {
941 let user_id = self.state().user_id_for_connection(request.sender_id)?;
942 let channel_id = ChannelId::from_proto(request.payload.channel_id);
943 if !self
944 .app_state
945 .db
946 .can_user_access_channel(user_id, channel_id)
947 .await?
948 {
949 Err(anyhow!("access denied"))?;
950 }
951
952 self.state_mut()
953 .leave_channel(request.sender_id, channel_id);
954
955 Ok(())
956 }
957
958 async fn send_channel_message(
959 self: Arc<Self>,
960 request: TypedEnvelope<proto::SendChannelMessage>,
961 ) -> tide::Result<proto::SendChannelMessageResponse> {
962 let channel_id = ChannelId::from_proto(request.payload.channel_id);
963 let user_id;
964 let connection_ids;
965 {
966 let state = self.state();
967 user_id = state.user_id_for_connection(request.sender_id)?;
968 connection_ids = state.channel_connection_ids(channel_id)?;
969 }
970
971 // Validate the message body.
972 let body = request.payload.body.trim().to_string();
973 if body.len() > MAX_MESSAGE_LEN {
974 return Err(anyhow!("message is too long"))?;
975 }
976 if body.is_empty() {
977 return Err(anyhow!("message can't be blank"))?;
978 }
979
980 let timestamp = OffsetDateTime::now_utc();
981 let nonce = request
982 .payload
983 .nonce
984 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
985
986 let message_id = self
987 .app_state
988 .db
989 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
990 .await?
991 .to_proto();
992 let message = proto::ChannelMessage {
993 sender_id: user_id.to_proto(),
994 id: message_id,
995 body,
996 timestamp: timestamp.unix_timestamp() as u64,
997 nonce: Some(nonce),
998 };
999 broadcast(request.sender_id, connection_ids, |conn_id| {
1000 self.peer.send(
1001 conn_id,
1002 proto::ChannelMessageSent {
1003 channel_id: channel_id.to_proto(),
1004 message: Some(message.clone()),
1005 },
1006 )
1007 })?;
1008 Ok(proto::SendChannelMessageResponse {
1009 message: Some(message),
1010 })
1011 }
1012
1013 async fn get_channel_messages(
1014 self: Arc<Self>,
1015 request: TypedEnvelope<proto::GetChannelMessages>,
1016 ) -> tide::Result<proto::GetChannelMessagesResponse> {
1017 let user_id = self.state().user_id_for_connection(request.sender_id)?;
1018 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1019 if !self
1020 .app_state
1021 .db
1022 .can_user_access_channel(user_id, channel_id)
1023 .await?
1024 {
1025 Err(anyhow!("access denied"))?;
1026 }
1027
1028 let messages = self
1029 .app_state
1030 .db
1031 .get_channel_messages(
1032 channel_id,
1033 MESSAGE_COUNT_PER_PAGE,
1034 Some(MessageId::from_proto(request.payload.before_message_id)),
1035 )
1036 .await?
1037 .into_iter()
1038 .map(|msg| proto::ChannelMessage {
1039 id: msg.id.to_proto(),
1040 body: msg.body,
1041 timestamp: msg.sent_at.unix_timestamp() as u64,
1042 sender_id: msg.sender_id.to_proto(),
1043 nonce: Some(msg.nonce.as_u128().into()),
1044 })
1045 .collect::<Vec<_>>();
1046
1047 Ok(proto::GetChannelMessagesResponse {
1048 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1049 messages,
1050 })
1051 }
1052
1053 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1054 self.store.read()
1055 }
1056
1057 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1058 self.store.write()
1059 }
1060}
1061
1062impl Executor for RealExecutor {
1063 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1064 task::spawn(future);
1065 }
1066}
1067
1068fn broadcast<F>(
1069 sender_id: ConnectionId,
1070 receiver_ids: Vec<ConnectionId>,
1071 mut f: F,
1072) -> anyhow::Result<()>
1073where
1074 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1075{
1076 let mut result = Ok(());
1077 for receiver_id in receiver_ids {
1078 if receiver_id != sender_id {
1079 if let Err(error) = f(receiver_id) {
1080 if result.is_ok() {
1081 result = Err(error);
1082 }
1083 }
1084 }
1085 }
1086 result
1087}
1088
1089pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1090 let server = Server::new(app.state().clone(), rpc.clone(), None);
1091 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1092 let server = server.clone();
1093 async move {
1094 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1095
1096 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1097 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1098 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1099 let client_protocol_version: Option<u32> = request
1100 .header("X-Zed-Protocol-Version")
1101 .and_then(|v| v.as_str().parse().ok());
1102
1103 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1104 return Ok(Response::new(StatusCode::UpgradeRequired));
1105 }
1106
1107 let header = match request.header("Sec-Websocket-Key") {
1108 Some(h) => h.as_str(),
1109 None => return Err(anyhow!("expected sec-websocket-key"))?,
1110 };
1111
1112 let user_id = process_auth_header(&request).await?;
1113
1114 let mut response = Response::new(StatusCode::SwitchingProtocols);
1115 response.insert_header(UPGRADE, "websocket");
1116 response.insert_header(CONNECTION, "Upgrade");
1117 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1118 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1119 response.insert_header("Sec-Websocket-Version", "13");
1120
1121 let http_res: &mut tide::http::Response = response.as_mut();
1122 let upgrade_receiver = http_res.recv_upgrade().await;
1123 let addr = request.remote().unwrap_or("unknown").to_string();
1124 task::spawn(async move {
1125 if let Some(stream) = upgrade_receiver.await {
1126 server
1127 .handle_connection(
1128 Connection::new(
1129 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1130 ),
1131 addr,
1132 user_id,
1133 None,
1134 RealExecutor,
1135 )
1136 .await;
1137 }
1138 });
1139
1140 Ok(response)
1141 }
1142 });
1143}
1144
1145fn header_contains_ignore_case<T>(
1146 request: &tide::Request<T>,
1147 header_name: HeaderName,
1148 value: &str,
1149) -> bool {
1150 request
1151 .header(header_name)
1152 .map(|h| {
1153 h.as_str()
1154 .split(',')
1155 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1156 })
1157 .unwrap_or(false)
1158}
1159
1160#[cfg(test)]
1161mod tests {
1162 use super::*;
1163 use crate::{
1164 auth,
1165 db::{tests::TestDb, UserId},
1166 github, AppState, Config,
1167 };
1168 use ::rpc::Peer;
1169 use collections::BTreeMap;
1170 use gpui::{executor, ModelHandle, TestAppContext};
1171 use parking_lot::Mutex;
1172 use postage::{sink::Sink, watch};
1173 use rand::prelude::*;
1174 use rpc::PeerId;
1175 use serde_json::json;
1176 use sqlx::types::time::OffsetDateTime;
1177 use std::{
1178 cell::Cell,
1179 env,
1180 ops::Deref,
1181 path::{Path, PathBuf},
1182 rc::Rc,
1183 sync::{
1184 atomic::{AtomicBool, Ordering::SeqCst},
1185 Arc,
1186 },
1187 time::Duration,
1188 };
1189 use zed::{
1190 client::{
1191 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1192 EstablishConnectionError, UserStore,
1193 },
1194 editor::{
1195 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, MultiBuffer,
1196 Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1197 },
1198 fs::{FakeFs, Fs as _},
1199 language::{
1200 tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1201 LanguageConfig, LanguageRegistry, LanguageServerConfig, Point, ToLspPosition,
1202 },
1203 lsp,
1204 project::{search::SearchQuery, DiagnosticSummary, Project, ProjectPath},
1205 workspace::{Settings, Workspace, WorkspaceParams},
1206 };
1207
1208 #[cfg(test)]
1209 #[ctor::ctor]
1210 fn init_logger() {
1211 if std::env::var("RUST_LOG").is_ok() {
1212 env_logger::init();
1213 }
1214 }
1215
1216 #[gpui::test(iterations = 10)]
1217 async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1218 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1219 let lang_registry = Arc::new(LanguageRegistry::new());
1220 let fs = FakeFs::new(cx_a.background());
1221 cx_a.foreground().forbid_parking();
1222
1223 // Connect to a server as 2 clients.
1224 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1225 let client_a = server.create_client(&mut cx_a, "user_a").await;
1226 let client_b = server.create_client(&mut cx_b, "user_b").await;
1227
1228 // Share a project as client A
1229 fs.insert_tree(
1230 "/a",
1231 json!({
1232 ".zed.toml": r#"collaborators = ["user_b"]"#,
1233 "a.txt": "a-contents",
1234 "b.txt": "b-contents",
1235 }),
1236 )
1237 .await;
1238 let project_a = cx_a.update(|cx| {
1239 Project::local(
1240 client_a.clone(),
1241 client_a.user_store.clone(),
1242 lang_registry.clone(),
1243 fs.clone(),
1244 cx,
1245 )
1246 });
1247 let (worktree_a, _) = project_a
1248 .update(&mut cx_a, |p, cx| {
1249 p.find_or_create_local_worktree("/a", false, cx)
1250 })
1251 .await
1252 .unwrap();
1253 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1254 worktree_a
1255 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1256 .await;
1257 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1258 project_a
1259 .update(&mut cx_a, |p, cx| p.share(cx))
1260 .await
1261 .unwrap();
1262
1263 // Join that project as client B
1264 let project_b = Project::remote(
1265 project_id,
1266 client_b.clone(),
1267 client_b.user_store.clone(),
1268 lang_registry.clone(),
1269 fs.clone(),
1270 &mut cx_b.to_async(),
1271 )
1272 .await
1273 .unwrap();
1274
1275 let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1276 assert_eq!(
1277 project
1278 .collaborators()
1279 .get(&client_a.peer_id)
1280 .unwrap()
1281 .user
1282 .github_login,
1283 "user_a"
1284 );
1285 project.replica_id()
1286 });
1287 project_a
1288 .condition(&cx_a, |tree, _| {
1289 tree.collaborators()
1290 .get(&client_b.peer_id)
1291 .map_or(false, |collaborator| {
1292 collaborator.replica_id == replica_id_b
1293 && collaborator.user.github_login == "user_b"
1294 })
1295 })
1296 .await;
1297
1298 // Open the same file as client B and client A.
1299 let buffer_b = project_b
1300 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1301 .await
1302 .unwrap();
1303 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1304 buffer_b.read_with(&cx_b, |buf, cx| {
1305 assert_eq!(buf.read(cx).text(), "b-contents")
1306 });
1307 project_a.read_with(&cx_a, |project, cx| {
1308 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1309 });
1310 let buffer_a = project_a
1311 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1312 .await
1313 .unwrap();
1314
1315 let editor_b = cx_b.add_view(window_b, |cx| {
1316 Editor::for_buffer(
1317 buffer_b,
1318 None,
1319 watch::channel_with(Settings::test(cx)).1,
1320 cx,
1321 )
1322 });
1323
1324 // TODO
1325 // // Create a selection set as client B and see that selection set as client A.
1326 // buffer_a
1327 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1328 // .await;
1329
1330 // Edit the buffer as client B and see that edit as client A.
1331 editor_b.update(&mut cx_b, |editor, cx| {
1332 editor.handle_input(&Input("ok, ".into()), cx)
1333 });
1334 buffer_a
1335 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1336 .await;
1337
1338 // TODO
1339 // // Remove the selection set as client B, see those selections disappear as client A.
1340 cx_b.update(move |_| drop(editor_b));
1341 // buffer_a
1342 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1343 // .await;
1344
1345 // Close the buffer as client A, see that the buffer is closed.
1346 cx_a.update(move |_| drop(buffer_a));
1347 project_a
1348 .condition(&cx_a, |project, cx| {
1349 !project.has_open_buffer((worktree_id, "b.txt"), cx)
1350 })
1351 .await;
1352
1353 // Dropping the client B's project removes client B from client A's collaborators.
1354 cx_b.update(move |_| drop(project_b));
1355 project_a
1356 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1357 .await;
1358 }
1359
1360 #[gpui::test(iterations = 10)]
1361 async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1362 let lang_registry = Arc::new(LanguageRegistry::new());
1363 let fs = FakeFs::new(cx_a.background());
1364 cx_a.foreground().forbid_parking();
1365
1366 // Connect to a server as 2 clients.
1367 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1368 let client_a = server.create_client(&mut cx_a, "user_a").await;
1369 let client_b = server.create_client(&mut cx_b, "user_b").await;
1370
1371 // Share a project as client A
1372 fs.insert_tree(
1373 "/a",
1374 json!({
1375 ".zed.toml": r#"collaborators = ["user_b"]"#,
1376 "a.txt": "a-contents",
1377 "b.txt": "b-contents",
1378 }),
1379 )
1380 .await;
1381 let project_a = cx_a.update(|cx| {
1382 Project::local(
1383 client_a.clone(),
1384 client_a.user_store.clone(),
1385 lang_registry.clone(),
1386 fs.clone(),
1387 cx,
1388 )
1389 });
1390 let (worktree_a, _) = project_a
1391 .update(&mut cx_a, |p, cx| {
1392 p.find_or_create_local_worktree("/a", false, cx)
1393 })
1394 .await
1395 .unwrap();
1396 worktree_a
1397 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1398 .await;
1399 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1400 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1401 project_a
1402 .update(&mut cx_a, |p, cx| p.share(cx))
1403 .await
1404 .unwrap();
1405 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1406
1407 // Join that project as client B
1408 let project_b = Project::remote(
1409 project_id,
1410 client_b.clone(),
1411 client_b.user_store.clone(),
1412 lang_registry.clone(),
1413 fs.clone(),
1414 &mut cx_b.to_async(),
1415 )
1416 .await
1417 .unwrap();
1418 project_b
1419 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1420 .await
1421 .unwrap();
1422
1423 // Unshare the project as client A
1424 project_a
1425 .update(&mut cx_a, |project, cx| project.unshare(cx))
1426 .await
1427 .unwrap();
1428 project_b
1429 .condition(&mut cx_b, |project, _| project.is_read_only())
1430 .await;
1431 assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1432 drop(project_b);
1433
1434 // Share the project again and ensure guests can still join.
1435 project_a
1436 .update(&mut cx_a, |project, cx| project.share(cx))
1437 .await
1438 .unwrap();
1439 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1440
1441 let project_c = Project::remote(
1442 project_id,
1443 client_b.clone(),
1444 client_b.user_store.clone(),
1445 lang_registry.clone(),
1446 fs.clone(),
1447 &mut cx_b.to_async(),
1448 )
1449 .await
1450 .unwrap();
1451 project_c
1452 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1453 .await
1454 .unwrap();
1455 }
1456
1457 #[gpui::test(iterations = 10)]
1458 async fn test_propagate_saves_and_fs_changes(
1459 mut cx_a: TestAppContext,
1460 mut cx_b: TestAppContext,
1461 mut cx_c: TestAppContext,
1462 ) {
1463 let lang_registry = Arc::new(LanguageRegistry::new());
1464 let fs = FakeFs::new(cx_a.background());
1465 cx_a.foreground().forbid_parking();
1466
1467 // Connect to a server as 3 clients.
1468 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1469 let client_a = server.create_client(&mut cx_a, "user_a").await;
1470 let client_b = server.create_client(&mut cx_b, "user_b").await;
1471 let client_c = server.create_client(&mut cx_c, "user_c").await;
1472
1473 // Share a worktree as client A.
1474 fs.insert_tree(
1475 "/a",
1476 json!({
1477 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1478 "file1": "",
1479 "file2": ""
1480 }),
1481 )
1482 .await;
1483 let project_a = cx_a.update(|cx| {
1484 Project::local(
1485 client_a.clone(),
1486 client_a.user_store.clone(),
1487 lang_registry.clone(),
1488 fs.clone(),
1489 cx,
1490 )
1491 });
1492 let (worktree_a, _) = project_a
1493 .update(&mut cx_a, |p, cx| {
1494 p.find_or_create_local_worktree("/a", false, cx)
1495 })
1496 .await
1497 .unwrap();
1498 worktree_a
1499 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1500 .await;
1501 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1502 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1503 project_a
1504 .update(&mut cx_a, |p, cx| p.share(cx))
1505 .await
1506 .unwrap();
1507
1508 // Join that worktree as clients B and C.
1509 let project_b = Project::remote(
1510 project_id,
1511 client_b.clone(),
1512 client_b.user_store.clone(),
1513 lang_registry.clone(),
1514 fs.clone(),
1515 &mut cx_b.to_async(),
1516 )
1517 .await
1518 .unwrap();
1519 let project_c = Project::remote(
1520 project_id,
1521 client_c.clone(),
1522 client_c.user_store.clone(),
1523 lang_registry.clone(),
1524 fs.clone(),
1525 &mut cx_c.to_async(),
1526 )
1527 .await
1528 .unwrap();
1529 let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1530 let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1531
1532 // Open and edit a buffer as both guests B and C.
1533 let buffer_b = project_b
1534 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1535 .await
1536 .unwrap();
1537 let buffer_c = project_c
1538 .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1539 .await
1540 .unwrap();
1541 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1542 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1543
1544 // Open and edit that buffer as the host.
1545 let buffer_a = project_a
1546 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1547 .await
1548 .unwrap();
1549
1550 buffer_a
1551 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1552 .await;
1553 buffer_a.update(&mut cx_a, |buf, cx| {
1554 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1555 });
1556
1557 // Wait for edits to propagate
1558 buffer_a
1559 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1560 .await;
1561 buffer_b
1562 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1563 .await;
1564 buffer_c
1565 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1566 .await;
1567
1568 // Edit the buffer as the host and concurrently save as guest B.
1569 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1570 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1571 save_b.await.unwrap();
1572 assert_eq!(
1573 fs.load("/a/file1".as_ref()).await.unwrap(),
1574 "hi-a, i-am-c, i-am-b, i-am-a"
1575 );
1576 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1577 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1578 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1579
1580 // Make changes on host's file system, see those changes on guest worktrees.
1581 fs.rename(
1582 "/a/file1".as_ref(),
1583 "/a/file1-renamed".as_ref(),
1584 Default::default(),
1585 )
1586 .await
1587 .unwrap();
1588
1589 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1590 .await
1591 .unwrap();
1592 fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1593
1594 worktree_a
1595 .condition(&cx_a, |tree, _| {
1596 tree.paths()
1597 .map(|p| p.to_string_lossy())
1598 .collect::<Vec<_>>()
1599 == [".zed.toml", "file1-renamed", "file3", "file4"]
1600 })
1601 .await;
1602 worktree_b
1603 .condition(&cx_b, |tree, _| {
1604 tree.paths()
1605 .map(|p| p.to_string_lossy())
1606 .collect::<Vec<_>>()
1607 == [".zed.toml", "file1-renamed", "file3", "file4"]
1608 })
1609 .await;
1610 worktree_c
1611 .condition(&cx_c, |tree, _| {
1612 tree.paths()
1613 .map(|p| p.to_string_lossy())
1614 .collect::<Vec<_>>()
1615 == [".zed.toml", "file1-renamed", "file3", "file4"]
1616 })
1617 .await;
1618
1619 // Ensure buffer files are updated as well.
1620 buffer_a
1621 .condition(&cx_a, |buf, _| {
1622 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1623 })
1624 .await;
1625 buffer_b
1626 .condition(&cx_b, |buf, _| {
1627 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1628 })
1629 .await;
1630 buffer_c
1631 .condition(&cx_c, |buf, _| {
1632 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1633 })
1634 .await;
1635 }
1636
1637 #[gpui::test(iterations = 10)]
1638 async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1639 cx_a.foreground().forbid_parking();
1640 let lang_registry = Arc::new(LanguageRegistry::new());
1641 let fs = FakeFs::new(cx_a.background());
1642
1643 // Connect to a server as 2 clients.
1644 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1645 let client_a = server.create_client(&mut cx_a, "user_a").await;
1646 let client_b = server.create_client(&mut cx_b, "user_b").await;
1647
1648 // Share a project as client A
1649 fs.insert_tree(
1650 "/dir",
1651 json!({
1652 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1653 "a.txt": "a-contents",
1654 }),
1655 )
1656 .await;
1657
1658 let project_a = cx_a.update(|cx| {
1659 Project::local(
1660 client_a.clone(),
1661 client_a.user_store.clone(),
1662 lang_registry.clone(),
1663 fs.clone(),
1664 cx,
1665 )
1666 });
1667 let (worktree_a, _) = project_a
1668 .update(&mut cx_a, |p, cx| {
1669 p.find_or_create_local_worktree("/dir", false, cx)
1670 })
1671 .await
1672 .unwrap();
1673 worktree_a
1674 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1675 .await;
1676 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1677 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1678 project_a
1679 .update(&mut cx_a, |p, cx| p.share(cx))
1680 .await
1681 .unwrap();
1682
1683 // Join that project as client B
1684 let project_b = Project::remote(
1685 project_id,
1686 client_b.clone(),
1687 client_b.user_store.clone(),
1688 lang_registry.clone(),
1689 fs.clone(),
1690 &mut cx_b.to_async(),
1691 )
1692 .await
1693 .unwrap();
1694
1695 // Open a buffer as client B
1696 let buffer_b = project_b
1697 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1698 .await
1699 .unwrap();
1700
1701 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1702 buffer_b.read_with(&cx_b, |buf, _| {
1703 assert!(buf.is_dirty());
1704 assert!(!buf.has_conflict());
1705 });
1706
1707 buffer_b
1708 .update(&mut cx_b, |buf, cx| buf.save(cx))
1709 .await
1710 .unwrap();
1711 buffer_b
1712 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1713 .await;
1714 buffer_b.read_with(&cx_b, |buf, _| {
1715 assert!(!buf.has_conflict());
1716 });
1717
1718 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1719 buffer_b.read_with(&cx_b, |buf, _| {
1720 assert!(buf.is_dirty());
1721 assert!(!buf.has_conflict());
1722 });
1723 }
1724
1725 #[gpui::test(iterations = 10)]
1726 async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1727 cx_a.foreground().forbid_parking();
1728 let lang_registry = Arc::new(LanguageRegistry::new());
1729 let fs = FakeFs::new(cx_a.background());
1730
1731 // Connect to a server as 2 clients.
1732 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1733 let client_a = server.create_client(&mut cx_a, "user_a").await;
1734 let client_b = server.create_client(&mut cx_b, "user_b").await;
1735
1736 // Share a project as client A
1737 fs.insert_tree(
1738 "/dir",
1739 json!({
1740 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1741 "a.txt": "a-contents",
1742 }),
1743 )
1744 .await;
1745
1746 let project_a = cx_a.update(|cx| {
1747 Project::local(
1748 client_a.clone(),
1749 client_a.user_store.clone(),
1750 lang_registry.clone(),
1751 fs.clone(),
1752 cx,
1753 )
1754 });
1755 let (worktree_a, _) = project_a
1756 .update(&mut cx_a, |p, cx| {
1757 p.find_or_create_local_worktree("/dir", false, cx)
1758 })
1759 .await
1760 .unwrap();
1761 worktree_a
1762 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1763 .await;
1764 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1765 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1766 project_a
1767 .update(&mut cx_a, |p, cx| p.share(cx))
1768 .await
1769 .unwrap();
1770
1771 // Join that project as client B
1772 let project_b = Project::remote(
1773 project_id,
1774 client_b.clone(),
1775 client_b.user_store.clone(),
1776 lang_registry.clone(),
1777 fs.clone(),
1778 &mut cx_b.to_async(),
1779 )
1780 .await
1781 .unwrap();
1782 let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1783
1784 // Open a buffer as client B
1785 let buffer_b = project_b
1786 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1787 .await
1788 .unwrap();
1789 buffer_b.read_with(&cx_b, |buf, _| {
1790 assert!(!buf.is_dirty());
1791 assert!(!buf.has_conflict());
1792 });
1793
1794 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1795 .await
1796 .unwrap();
1797 buffer_b
1798 .condition(&cx_b, |buf, _| {
1799 buf.text() == "new contents" && !buf.is_dirty()
1800 })
1801 .await;
1802 buffer_b.read_with(&cx_b, |buf, _| {
1803 assert!(!buf.has_conflict());
1804 });
1805 }
1806
1807 #[gpui::test(iterations = 10)]
1808 async fn test_editing_while_guest_opens_buffer(
1809 mut cx_a: TestAppContext,
1810 mut cx_b: TestAppContext,
1811 ) {
1812 cx_a.foreground().forbid_parking();
1813 let lang_registry = Arc::new(LanguageRegistry::new());
1814 let fs = FakeFs::new(cx_a.background());
1815
1816 // Connect to a server as 2 clients.
1817 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1818 let client_a = server.create_client(&mut cx_a, "user_a").await;
1819 let client_b = server.create_client(&mut cx_b, "user_b").await;
1820
1821 // Share a project as client A
1822 fs.insert_tree(
1823 "/dir",
1824 json!({
1825 ".zed.toml": r#"collaborators = ["user_b"]"#,
1826 "a.txt": "a-contents",
1827 }),
1828 )
1829 .await;
1830 let project_a = cx_a.update(|cx| {
1831 Project::local(
1832 client_a.clone(),
1833 client_a.user_store.clone(),
1834 lang_registry.clone(),
1835 fs.clone(),
1836 cx,
1837 )
1838 });
1839 let (worktree_a, _) = project_a
1840 .update(&mut cx_a, |p, cx| {
1841 p.find_or_create_local_worktree("/dir", false, cx)
1842 })
1843 .await
1844 .unwrap();
1845 worktree_a
1846 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1847 .await;
1848 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1849 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1850 project_a
1851 .update(&mut cx_a, |p, cx| p.share(cx))
1852 .await
1853 .unwrap();
1854
1855 // Join that project as client B
1856 let project_b = Project::remote(
1857 project_id,
1858 client_b.clone(),
1859 client_b.user_store.clone(),
1860 lang_registry.clone(),
1861 fs.clone(),
1862 &mut cx_b.to_async(),
1863 )
1864 .await
1865 .unwrap();
1866
1867 // Open a buffer as client A
1868 let buffer_a = project_a
1869 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1870 .await
1871 .unwrap();
1872
1873 // Start opening the same buffer as client B
1874 let buffer_b = cx_b
1875 .background()
1876 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1877
1878 // Edit the buffer as client A while client B is still opening it.
1879 cx_b.background().simulate_random_delay().await;
1880 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1881 cx_b.background().simulate_random_delay().await;
1882 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1883
1884 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1885 let buffer_b = buffer_b.await.unwrap();
1886 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1887 }
1888
1889 #[gpui::test(iterations = 10)]
1890 async fn test_leaving_worktree_while_opening_buffer(
1891 mut cx_a: TestAppContext,
1892 mut cx_b: TestAppContext,
1893 ) {
1894 cx_a.foreground().forbid_parking();
1895 let lang_registry = Arc::new(LanguageRegistry::new());
1896 let fs = FakeFs::new(cx_a.background());
1897
1898 // Connect to a server as 2 clients.
1899 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1900 let client_a = server.create_client(&mut cx_a, "user_a").await;
1901 let client_b = server.create_client(&mut cx_b, "user_b").await;
1902
1903 // Share a project as client A
1904 fs.insert_tree(
1905 "/dir",
1906 json!({
1907 ".zed.toml": r#"collaborators = ["user_b"]"#,
1908 "a.txt": "a-contents",
1909 }),
1910 )
1911 .await;
1912 let project_a = cx_a.update(|cx| {
1913 Project::local(
1914 client_a.clone(),
1915 client_a.user_store.clone(),
1916 lang_registry.clone(),
1917 fs.clone(),
1918 cx,
1919 )
1920 });
1921 let (worktree_a, _) = project_a
1922 .update(&mut cx_a, |p, cx| {
1923 p.find_or_create_local_worktree("/dir", false, cx)
1924 })
1925 .await
1926 .unwrap();
1927 worktree_a
1928 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1929 .await;
1930 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1931 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1932 project_a
1933 .update(&mut cx_a, |p, cx| p.share(cx))
1934 .await
1935 .unwrap();
1936
1937 // Join that project as client B
1938 let project_b = Project::remote(
1939 project_id,
1940 client_b.clone(),
1941 client_b.user_store.clone(),
1942 lang_registry.clone(),
1943 fs.clone(),
1944 &mut cx_b.to_async(),
1945 )
1946 .await
1947 .unwrap();
1948
1949 // See that a guest has joined as client A.
1950 project_a
1951 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1952 .await;
1953
1954 // Begin opening a buffer as client B, but leave the project before the open completes.
1955 let buffer_b = cx_b
1956 .background()
1957 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1958 cx_b.update(|_| drop(project_b));
1959 drop(buffer_b);
1960
1961 // See that the guest has left.
1962 project_a
1963 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1964 .await;
1965 }
1966
1967 #[gpui::test(iterations = 10)]
1968 async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1969 cx_a.foreground().forbid_parking();
1970 let lang_registry = Arc::new(LanguageRegistry::new());
1971 let fs = FakeFs::new(cx_a.background());
1972
1973 // Connect to a server as 2 clients.
1974 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1975 let client_a = server.create_client(&mut cx_a, "user_a").await;
1976 let client_b = server.create_client(&mut cx_b, "user_b").await;
1977
1978 // Share a project as client A
1979 fs.insert_tree(
1980 "/a",
1981 json!({
1982 ".zed.toml": r#"collaborators = ["user_b"]"#,
1983 "a.txt": "a-contents",
1984 "b.txt": "b-contents",
1985 }),
1986 )
1987 .await;
1988 let project_a = cx_a.update(|cx| {
1989 Project::local(
1990 client_a.clone(),
1991 client_a.user_store.clone(),
1992 lang_registry.clone(),
1993 fs.clone(),
1994 cx,
1995 )
1996 });
1997 let (worktree_a, _) = project_a
1998 .update(&mut cx_a, |p, cx| {
1999 p.find_or_create_local_worktree("/a", false, cx)
2000 })
2001 .await
2002 .unwrap();
2003 worktree_a
2004 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2005 .await;
2006 let project_id = project_a
2007 .update(&mut cx_a, |project, _| project.next_remote_id())
2008 .await;
2009 project_a
2010 .update(&mut cx_a, |project, cx| project.share(cx))
2011 .await
2012 .unwrap();
2013
2014 // Join that project as client B
2015 let _project_b = Project::remote(
2016 project_id,
2017 client_b.clone(),
2018 client_b.user_store.clone(),
2019 lang_registry.clone(),
2020 fs.clone(),
2021 &mut cx_b.to_async(),
2022 )
2023 .await
2024 .unwrap();
2025
2026 // See that a guest has joined as client A.
2027 project_a
2028 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2029 .await;
2030
2031 // Drop client B's connection and ensure client A observes client B leaving the worktree.
2032 client_b.disconnect(&cx_b.to_async()).unwrap();
2033 project_a
2034 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2035 .await;
2036 }
2037
2038 #[gpui::test(iterations = 10)]
2039 async fn test_collaborating_with_diagnostics(
2040 mut cx_a: TestAppContext,
2041 mut cx_b: TestAppContext,
2042 ) {
2043 cx_a.foreground().forbid_parking();
2044 let mut lang_registry = Arc::new(LanguageRegistry::new());
2045 let fs = FakeFs::new(cx_a.background());
2046
2047 // Set up a fake language server.
2048 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2049 Arc::get_mut(&mut lang_registry)
2050 .unwrap()
2051 .add(Arc::new(Language::new(
2052 LanguageConfig {
2053 name: "Rust".into(),
2054 path_suffixes: vec!["rs".to_string()],
2055 language_server: Some(language_server_config),
2056 ..Default::default()
2057 },
2058 Some(tree_sitter_rust::language()),
2059 )));
2060
2061 // Connect to a server as 2 clients.
2062 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2063 let client_a = server.create_client(&mut cx_a, "user_a").await;
2064 let client_b = server.create_client(&mut cx_b, "user_b").await;
2065
2066 // Share a project as client A
2067 fs.insert_tree(
2068 "/a",
2069 json!({
2070 ".zed.toml": r#"collaborators = ["user_b"]"#,
2071 "a.rs": "let one = two",
2072 "other.rs": "",
2073 }),
2074 )
2075 .await;
2076 let project_a = cx_a.update(|cx| {
2077 Project::local(
2078 client_a.clone(),
2079 client_a.user_store.clone(),
2080 lang_registry.clone(),
2081 fs.clone(),
2082 cx,
2083 )
2084 });
2085 let (worktree_a, _) = project_a
2086 .update(&mut cx_a, |p, cx| {
2087 p.find_or_create_local_worktree("/a", false, cx)
2088 })
2089 .await
2090 .unwrap();
2091 worktree_a
2092 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2093 .await;
2094 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2095 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2096 project_a
2097 .update(&mut cx_a, |p, cx| p.share(cx))
2098 .await
2099 .unwrap();
2100
2101 // Cause the language server to start.
2102 let _ = cx_a
2103 .background()
2104 .spawn(project_a.update(&mut cx_a, |project, cx| {
2105 project.open_buffer(
2106 ProjectPath {
2107 worktree_id,
2108 path: Path::new("other.rs").into(),
2109 },
2110 cx,
2111 )
2112 }))
2113 .await
2114 .unwrap();
2115
2116 // Simulate a language server reporting errors for a file.
2117 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2118 fake_language_server
2119 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2120 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2121 version: None,
2122 diagnostics: vec![lsp::Diagnostic {
2123 severity: Some(lsp::DiagnosticSeverity::ERROR),
2124 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2125 message: "message 1".to_string(),
2126 ..Default::default()
2127 }],
2128 })
2129 .await;
2130
2131 // Wait for server to see the diagnostics update.
2132 server
2133 .condition(|store| {
2134 let worktree = store
2135 .project(project_id)
2136 .unwrap()
2137 .share
2138 .as_ref()
2139 .unwrap()
2140 .worktrees
2141 .get(&worktree_id.to_proto())
2142 .unwrap();
2143
2144 !worktree.diagnostic_summaries.is_empty()
2145 })
2146 .await;
2147
2148 // Join the worktree as client B.
2149 let project_b = Project::remote(
2150 project_id,
2151 client_b.clone(),
2152 client_b.user_store.clone(),
2153 lang_registry.clone(),
2154 fs.clone(),
2155 &mut cx_b.to_async(),
2156 )
2157 .await
2158 .unwrap();
2159
2160 project_b.read_with(&cx_b, |project, cx| {
2161 assert_eq!(
2162 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2163 &[(
2164 ProjectPath {
2165 worktree_id,
2166 path: Arc::from(Path::new("a.rs")),
2167 },
2168 DiagnosticSummary {
2169 error_count: 1,
2170 warning_count: 0,
2171 ..Default::default()
2172 },
2173 )]
2174 )
2175 });
2176
2177 // Simulate a language server reporting more errors for a file.
2178 fake_language_server
2179 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2180 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2181 version: None,
2182 diagnostics: vec![
2183 lsp::Diagnostic {
2184 severity: Some(lsp::DiagnosticSeverity::ERROR),
2185 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2186 message: "message 1".to_string(),
2187 ..Default::default()
2188 },
2189 lsp::Diagnostic {
2190 severity: Some(lsp::DiagnosticSeverity::WARNING),
2191 range: lsp::Range::new(
2192 lsp::Position::new(0, 10),
2193 lsp::Position::new(0, 13),
2194 ),
2195 message: "message 2".to_string(),
2196 ..Default::default()
2197 },
2198 ],
2199 })
2200 .await;
2201
2202 // Client b gets the updated summaries
2203 project_b
2204 .condition(&cx_b, |project, cx| {
2205 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2206 == &[(
2207 ProjectPath {
2208 worktree_id,
2209 path: Arc::from(Path::new("a.rs")),
2210 },
2211 DiagnosticSummary {
2212 error_count: 1,
2213 warning_count: 1,
2214 ..Default::default()
2215 },
2216 )]
2217 })
2218 .await;
2219
2220 // Open the file with the errors on client B. They should be present.
2221 let buffer_b = cx_b
2222 .background()
2223 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2224 .await
2225 .unwrap();
2226
2227 buffer_b.read_with(&cx_b, |buffer, _| {
2228 assert_eq!(
2229 buffer
2230 .snapshot()
2231 .diagnostics_in_range::<_, Point>(0..buffer.len())
2232 .map(|entry| entry)
2233 .collect::<Vec<_>>(),
2234 &[
2235 DiagnosticEntry {
2236 range: Point::new(0, 4)..Point::new(0, 7),
2237 diagnostic: Diagnostic {
2238 group_id: 0,
2239 message: "message 1".to_string(),
2240 severity: lsp::DiagnosticSeverity::ERROR,
2241 is_primary: true,
2242 ..Default::default()
2243 }
2244 },
2245 DiagnosticEntry {
2246 range: Point::new(0, 10)..Point::new(0, 13),
2247 diagnostic: Diagnostic {
2248 group_id: 1,
2249 severity: lsp::DiagnosticSeverity::WARNING,
2250 message: "message 2".to_string(),
2251 is_primary: true,
2252 ..Default::default()
2253 }
2254 }
2255 ]
2256 );
2257 });
2258 }
2259
2260 #[gpui::test(iterations = 10)]
2261 async fn test_collaborating_with_completion(
2262 mut cx_a: TestAppContext,
2263 mut cx_b: TestAppContext,
2264 ) {
2265 cx_a.foreground().forbid_parking();
2266 let mut lang_registry = Arc::new(LanguageRegistry::new());
2267 let fs = FakeFs::new(cx_a.background());
2268
2269 // Set up a fake language server.
2270 let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2271 language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2272 completion_provider: Some(lsp::CompletionOptions {
2273 trigger_characters: Some(vec![".".to_string()]),
2274 ..Default::default()
2275 }),
2276 ..Default::default()
2277 });
2278 Arc::get_mut(&mut lang_registry)
2279 .unwrap()
2280 .add(Arc::new(Language::new(
2281 LanguageConfig {
2282 name: "Rust".into(),
2283 path_suffixes: vec!["rs".to_string()],
2284 language_server: Some(language_server_config),
2285 ..Default::default()
2286 },
2287 Some(tree_sitter_rust::language()),
2288 )));
2289
2290 // Connect to a server as 2 clients.
2291 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2292 let client_a = server.create_client(&mut cx_a, "user_a").await;
2293 let client_b = server.create_client(&mut cx_b, "user_b").await;
2294
2295 // Share a project as client A
2296 fs.insert_tree(
2297 "/a",
2298 json!({
2299 ".zed.toml": r#"collaborators = ["user_b"]"#,
2300 "main.rs": "fn main() { a }",
2301 "other.rs": "",
2302 }),
2303 )
2304 .await;
2305 let project_a = cx_a.update(|cx| {
2306 Project::local(
2307 client_a.clone(),
2308 client_a.user_store.clone(),
2309 lang_registry.clone(),
2310 fs.clone(),
2311 cx,
2312 )
2313 });
2314 let (worktree_a, _) = project_a
2315 .update(&mut cx_a, |p, cx| {
2316 p.find_or_create_local_worktree("/a", false, cx)
2317 })
2318 .await
2319 .unwrap();
2320 worktree_a
2321 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2322 .await;
2323 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2324 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2325 project_a
2326 .update(&mut cx_a, |p, cx| p.share(cx))
2327 .await
2328 .unwrap();
2329
2330 // Join the worktree as client B.
2331 let project_b = Project::remote(
2332 project_id,
2333 client_b.clone(),
2334 client_b.user_store.clone(),
2335 lang_registry.clone(),
2336 fs.clone(),
2337 &mut cx_b.to_async(),
2338 )
2339 .await
2340 .unwrap();
2341
2342 // Open a file in an editor as the guest.
2343 let buffer_b = project_b
2344 .update(&mut cx_b, |p, cx| {
2345 p.open_buffer((worktree_id, "main.rs"), cx)
2346 })
2347 .await
2348 .unwrap();
2349 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2350 let editor_b = cx_b.add_view(window_b, |cx| {
2351 Editor::for_buffer(
2352 cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2353 Some(project_b.clone()),
2354 watch::channel_with(Settings::test(cx)).1,
2355 cx,
2356 )
2357 });
2358
2359 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2360 buffer_b
2361 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2362 .await;
2363
2364 // Type a completion trigger character as the guest.
2365 editor_b.update(&mut cx_b, |editor, cx| {
2366 editor.select_ranges([13..13], None, cx);
2367 editor.handle_input(&Input(".".into()), cx);
2368 cx.focus(&editor_b);
2369 });
2370
2371 // Receive a completion request as the host's language server.
2372 // Return some completions from the host's language server.
2373 cx_a.foreground().start_waiting();
2374 fake_language_server
2375 .handle_request::<lsp::request::Completion, _>(|params, _| {
2376 assert_eq!(
2377 params.text_document_position.text_document.uri,
2378 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2379 );
2380 assert_eq!(
2381 params.text_document_position.position,
2382 lsp::Position::new(0, 14),
2383 );
2384
2385 Some(lsp::CompletionResponse::Array(vec![
2386 lsp::CompletionItem {
2387 label: "first_method(…)".into(),
2388 detail: Some("fn(&mut self, B) -> C".into()),
2389 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2390 new_text: "first_method($1)".to_string(),
2391 range: lsp::Range::new(
2392 lsp::Position::new(0, 14),
2393 lsp::Position::new(0, 14),
2394 ),
2395 })),
2396 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2397 ..Default::default()
2398 },
2399 lsp::CompletionItem {
2400 label: "second_method(…)".into(),
2401 detail: Some("fn(&mut self, C) -> D<E>".into()),
2402 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2403 new_text: "second_method()".to_string(),
2404 range: lsp::Range::new(
2405 lsp::Position::new(0, 14),
2406 lsp::Position::new(0, 14),
2407 ),
2408 })),
2409 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2410 ..Default::default()
2411 },
2412 ]))
2413 })
2414 .next()
2415 .await
2416 .unwrap();
2417 cx_a.foreground().finish_waiting();
2418
2419 // Open the buffer on the host.
2420 let buffer_a = project_a
2421 .update(&mut cx_a, |p, cx| {
2422 p.open_buffer((worktree_id, "main.rs"), cx)
2423 })
2424 .await
2425 .unwrap();
2426 buffer_a
2427 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2428 .await;
2429
2430 // Confirm a completion on the guest.
2431 editor_b
2432 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2433 .await;
2434 editor_b.update(&mut cx_b, |editor, cx| {
2435 editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2436 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2437 });
2438
2439 // Return a resolved completion from the host's language server.
2440 // The resolved completion has an additional text edit.
2441 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2442 |params, _| {
2443 assert_eq!(params.label, "first_method(…)");
2444 lsp::CompletionItem {
2445 label: "first_method(…)".into(),
2446 detail: Some("fn(&mut self, B) -> C".into()),
2447 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2448 new_text: "first_method($1)".to_string(),
2449 range: lsp::Range::new(
2450 lsp::Position::new(0, 14),
2451 lsp::Position::new(0, 14),
2452 ),
2453 })),
2454 additional_text_edits: Some(vec![lsp::TextEdit {
2455 new_text: "use d::SomeTrait;\n".to_string(),
2456 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2457 }]),
2458 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2459 ..Default::default()
2460 }
2461 },
2462 );
2463
2464 // The additional edit is applied.
2465 buffer_a
2466 .condition(&cx_a, |buffer, _| {
2467 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2468 })
2469 .await;
2470 buffer_b
2471 .condition(&cx_b, |buffer, _| {
2472 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2473 })
2474 .await;
2475 }
2476
2477 #[gpui::test(iterations = 10)]
2478 async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2479 cx_a.foreground().forbid_parking();
2480 let mut lang_registry = Arc::new(LanguageRegistry::new());
2481 let fs = FakeFs::new(cx_a.background());
2482
2483 // Set up a fake language server.
2484 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2485 Arc::get_mut(&mut lang_registry)
2486 .unwrap()
2487 .add(Arc::new(Language::new(
2488 LanguageConfig {
2489 name: "Rust".into(),
2490 path_suffixes: vec!["rs".to_string()],
2491 language_server: Some(language_server_config),
2492 ..Default::default()
2493 },
2494 Some(tree_sitter_rust::language()),
2495 )));
2496
2497 // Connect to a server as 2 clients.
2498 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2499 let client_a = server.create_client(&mut cx_a, "user_a").await;
2500 let client_b = server.create_client(&mut cx_b, "user_b").await;
2501
2502 // Share a project as client A
2503 fs.insert_tree(
2504 "/a",
2505 json!({
2506 ".zed.toml": r#"collaborators = ["user_b"]"#,
2507 "a.rs": "let one = two",
2508 }),
2509 )
2510 .await;
2511 let project_a = cx_a.update(|cx| {
2512 Project::local(
2513 client_a.clone(),
2514 client_a.user_store.clone(),
2515 lang_registry.clone(),
2516 fs.clone(),
2517 cx,
2518 )
2519 });
2520 let (worktree_a, _) = project_a
2521 .update(&mut cx_a, |p, cx| {
2522 p.find_or_create_local_worktree("/a", false, cx)
2523 })
2524 .await
2525 .unwrap();
2526 worktree_a
2527 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2528 .await;
2529 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2530 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2531 project_a
2532 .update(&mut cx_a, |p, cx| p.share(cx))
2533 .await
2534 .unwrap();
2535
2536 // Join the worktree as client B.
2537 let project_b = Project::remote(
2538 project_id,
2539 client_b.clone(),
2540 client_b.user_store.clone(),
2541 lang_registry.clone(),
2542 fs.clone(),
2543 &mut cx_b.to_async(),
2544 )
2545 .await
2546 .unwrap();
2547
2548 let buffer_b = cx_b
2549 .background()
2550 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2551 .await
2552 .unwrap();
2553
2554 let format = project_b.update(&mut cx_b, |project, cx| {
2555 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2556 });
2557
2558 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2559 fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2560 Some(vec![
2561 lsp::TextEdit {
2562 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2563 new_text: "h".to_string(),
2564 },
2565 lsp::TextEdit {
2566 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2567 new_text: "y".to_string(),
2568 },
2569 ])
2570 });
2571
2572 format.await.unwrap();
2573 assert_eq!(
2574 buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2575 "let honey = two"
2576 );
2577 }
2578
2579 #[gpui::test(iterations = 10)]
2580 async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2581 cx_a.foreground().forbid_parking();
2582 let mut lang_registry = Arc::new(LanguageRegistry::new());
2583 let fs = FakeFs::new(cx_a.background());
2584 fs.insert_tree(
2585 "/root-1",
2586 json!({
2587 ".zed.toml": r#"collaborators = ["user_b"]"#,
2588 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2589 }),
2590 )
2591 .await;
2592 fs.insert_tree(
2593 "/root-2",
2594 json!({
2595 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2596 }),
2597 )
2598 .await;
2599
2600 // Set up a fake language server.
2601 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2602 Arc::get_mut(&mut lang_registry)
2603 .unwrap()
2604 .add(Arc::new(Language::new(
2605 LanguageConfig {
2606 name: "Rust".into(),
2607 path_suffixes: vec!["rs".to_string()],
2608 language_server: Some(language_server_config),
2609 ..Default::default()
2610 },
2611 Some(tree_sitter_rust::language()),
2612 )));
2613
2614 // Connect to a server as 2 clients.
2615 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2616 let client_a = server.create_client(&mut cx_a, "user_a").await;
2617 let client_b = server.create_client(&mut cx_b, "user_b").await;
2618
2619 // Share a project as client A
2620 let project_a = cx_a.update(|cx| {
2621 Project::local(
2622 client_a.clone(),
2623 client_a.user_store.clone(),
2624 lang_registry.clone(),
2625 fs.clone(),
2626 cx,
2627 )
2628 });
2629 let (worktree_a, _) = project_a
2630 .update(&mut cx_a, |p, cx| {
2631 p.find_or_create_local_worktree("/root-1", false, cx)
2632 })
2633 .await
2634 .unwrap();
2635 worktree_a
2636 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2637 .await;
2638 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2639 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2640 project_a
2641 .update(&mut cx_a, |p, cx| p.share(cx))
2642 .await
2643 .unwrap();
2644
2645 // Join the worktree as client B.
2646 let project_b = Project::remote(
2647 project_id,
2648 client_b.clone(),
2649 client_b.user_store.clone(),
2650 lang_registry.clone(),
2651 fs.clone(),
2652 &mut cx_b.to_async(),
2653 )
2654 .await
2655 .unwrap();
2656
2657 // Open the file on client B.
2658 let buffer_b = cx_b
2659 .background()
2660 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2661 .await
2662 .unwrap();
2663
2664 // Request the definition of a symbol as the guest.
2665 let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2666
2667 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2668 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2669 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2670 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2671 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2672 )))
2673 });
2674
2675 let definitions_1 = definitions_1.await.unwrap();
2676 cx_b.read(|cx| {
2677 assert_eq!(definitions_1.len(), 1);
2678 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2679 let target_buffer = definitions_1[0].buffer.read(cx);
2680 assert_eq!(
2681 target_buffer.text(),
2682 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2683 );
2684 assert_eq!(
2685 definitions_1[0].range.to_point(target_buffer),
2686 Point::new(0, 6)..Point::new(0, 9)
2687 );
2688 });
2689
2690 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2691 // the previous call to `definition`.
2692 let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2693 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2694 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2695 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2696 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2697 )))
2698 });
2699
2700 let definitions_2 = definitions_2.await.unwrap();
2701 cx_b.read(|cx| {
2702 assert_eq!(definitions_2.len(), 1);
2703 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2704 let target_buffer = definitions_2[0].buffer.read(cx);
2705 assert_eq!(
2706 target_buffer.text(),
2707 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2708 );
2709 assert_eq!(
2710 definitions_2[0].range.to_point(target_buffer),
2711 Point::new(1, 6)..Point::new(1, 11)
2712 );
2713 });
2714 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2715
2716 cx_b.update(|_| {
2717 drop(definitions_1);
2718 drop(definitions_2);
2719 });
2720 project_b
2721 .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2722 .await;
2723 }
2724
2725 #[gpui::test(iterations = 10)]
2726 async fn test_references(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2727 cx_a.foreground().forbid_parking();
2728 let mut lang_registry = Arc::new(LanguageRegistry::new());
2729 let fs = FakeFs::new(cx_a.background());
2730 fs.insert_tree(
2731 "/root-1",
2732 json!({
2733 ".zed.toml": r#"collaborators = ["user_b"]"#,
2734 "one.rs": "const ONE: usize = 1;",
2735 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2736 }),
2737 )
2738 .await;
2739 fs.insert_tree(
2740 "/root-2",
2741 json!({
2742 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2743 }),
2744 )
2745 .await;
2746
2747 // Set up a fake language server.
2748 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2749 Arc::get_mut(&mut lang_registry)
2750 .unwrap()
2751 .add(Arc::new(Language::new(
2752 LanguageConfig {
2753 name: "Rust".into(),
2754 path_suffixes: vec!["rs".to_string()],
2755 language_server: Some(language_server_config),
2756 ..Default::default()
2757 },
2758 Some(tree_sitter_rust::language()),
2759 )));
2760
2761 // Connect to a server as 2 clients.
2762 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2763 let client_a = server.create_client(&mut cx_a, "user_a").await;
2764 let client_b = server.create_client(&mut cx_b, "user_b").await;
2765
2766 // Share a project as client A
2767 let project_a = cx_a.update(|cx| {
2768 Project::local(
2769 client_a.clone(),
2770 client_a.user_store.clone(),
2771 lang_registry.clone(),
2772 fs.clone(),
2773 cx,
2774 )
2775 });
2776 let (worktree_a, _) = project_a
2777 .update(&mut cx_a, |p, cx| {
2778 p.find_or_create_local_worktree("/root-1", false, cx)
2779 })
2780 .await
2781 .unwrap();
2782 worktree_a
2783 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2784 .await;
2785 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2786 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2787 project_a
2788 .update(&mut cx_a, |p, cx| p.share(cx))
2789 .await
2790 .unwrap();
2791
2792 // Join the worktree as client B.
2793 let project_b = Project::remote(
2794 project_id,
2795 client_b.clone(),
2796 client_b.user_store.clone(),
2797 lang_registry.clone(),
2798 fs.clone(),
2799 &mut cx_b.to_async(),
2800 )
2801 .await
2802 .unwrap();
2803
2804 // Open the file on client B.
2805 let buffer_b = cx_b
2806 .background()
2807 .spawn(project_b.update(&mut cx_b, |p, cx| {
2808 p.open_buffer((worktree_id, "one.rs"), cx)
2809 }))
2810 .await
2811 .unwrap();
2812
2813 // Request references to a symbol as the guest.
2814 let references = project_b.update(&mut cx_b, |p, cx| p.references(&buffer_b, 7, cx));
2815
2816 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2817 fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2818 assert_eq!(
2819 params.text_document_position.text_document.uri.as_str(),
2820 "file:///root-1/one.rs"
2821 );
2822 Some(vec![
2823 lsp::Location {
2824 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2825 range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2826 },
2827 lsp::Location {
2828 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2829 range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2830 },
2831 lsp::Location {
2832 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2833 range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2834 },
2835 ])
2836 });
2837
2838 let references = references.await.unwrap();
2839 cx_b.read(|cx| {
2840 assert_eq!(references.len(), 3);
2841 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2842
2843 let two_buffer = references[0].buffer.read(cx);
2844 let three_buffer = references[2].buffer.read(cx);
2845 assert_eq!(
2846 two_buffer.file().unwrap().path().as_ref(),
2847 Path::new("two.rs")
2848 );
2849 assert_eq!(references[1].buffer, references[0].buffer);
2850 assert_eq!(
2851 three_buffer.file().unwrap().full_path(cx),
2852 Path::new("three.rs")
2853 );
2854
2855 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2856 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2857 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2858 });
2859 }
2860
2861 #[gpui::test(iterations = 10)]
2862 async fn test_project_search(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2863 cx_a.foreground().forbid_parking();
2864 let lang_registry = Arc::new(LanguageRegistry::new());
2865 let fs = FakeFs::new(cx_a.background());
2866 fs.insert_tree(
2867 "/root-1",
2868 json!({
2869 ".zed.toml": r#"collaborators = ["user_b"]"#,
2870 "a": "hello world",
2871 "b": "goodnight moon",
2872 "c": "a world of goo",
2873 "d": "world champion of clown world",
2874 }),
2875 )
2876 .await;
2877 fs.insert_tree(
2878 "/root-2",
2879 json!({
2880 "e": "disney world is fun",
2881 }),
2882 )
2883 .await;
2884
2885 // Connect to a server as 2 clients.
2886 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2887 let client_a = server.create_client(&mut cx_a, "user_a").await;
2888 let client_b = server.create_client(&mut cx_b, "user_b").await;
2889
2890 // Share a project as client A
2891 let project_a = cx_a.update(|cx| {
2892 Project::local(
2893 client_a.clone(),
2894 client_a.user_store.clone(),
2895 lang_registry.clone(),
2896 fs.clone(),
2897 cx,
2898 )
2899 });
2900 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2901
2902 let (worktree_1, _) = project_a
2903 .update(&mut cx_a, |p, cx| {
2904 p.find_or_create_local_worktree("/root-1", false, cx)
2905 })
2906 .await
2907 .unwrap();
2908 worktree_1
2909 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2910 .await;
2911 let (worktree_2, _) = project_a
2912 .update(&mut cx_a, |p, cx| {
2913 p.find_or_create_local_worktree("/root-2", false, cx)
2914 })
2915 .await
2916 .unwrap();
2917 worktree_2
2918 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2919 .await;
2920
2921 eprintln!("sharing");
2922
2923 project_a
2924 .update(&mut cx_a, |p, cx| p.share(cx))
2925 .await
2926 .unwrap();
2927
2928 // Join the worktree as client B.
2929 let project_b = Project::remote(
2930 project_id,
2931 client_b.clone(),
2932 client_b.user_store.clone(),
2933 lang_registry.clone(),
2934 fs.clone(),
2935 &mut cx_b.to_async(),
2936 )
2937 .await
2938 .unwrap();
2939
2940 let results = project_b
2941 .update(&mut cx_b, |project, cx| {
2942 project.search(SearchQuery::text("world", false, false), cx)
2943 })
2944 .await
2945 .unwrap();
2946
2947 let mut ranges_by_path = results
2948 .into_iter()
2949 .map(|(buffer, ranges)| {
2950 buffer.read_with(&cx_b, |buffer, cx| {
2951 let path = buffer.file().unwrap().full_path(cx);
2952 let offset_ranges = ranges
2953 .into_iter()
2954 .map(|range| range.to_offset(buffer))
2955 .collect::<Vec<_>>();
2956 (path, offset_ranges)
2957 })
2958 })
2959 .collect::<Vec<_>>();
2960 ranges_by_path.sort_by_key(|(path, _)| path.clone());
2961
2962 assert_eq!(
2963 ranges_by_path,
2964 &[
2965 (PathBuf::from("root-1/a"), vec![6..11]),
2966 (PathBuf::from("root-1/c"), vec![2..7]),
2967 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2968 (PathBuf::from("root-2/e"), vec![7..12]),
2969 ]
2970 );
2971 }
2972
2973 #[gpui::test(iterations = 10)]
2974 async fn test_document_highlights(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2975 cx_a.foreground().forbid_parking();
2976 let lang_registry = Arc::new(LanguageRegistry::new());
2977 let fs = FakeFs::new(cx_a.background());
2978 fs.insert_tree(
2979 "/root-1",
2980 json!({
2981 ".zed.toml": r#"collaborators = ["user_b"]"#,
2982 "main.rs": "fn double(number: i32) -> i32 { number + number }",
2983 }),
2984 )
2985 .await;
2986
2987 // Set up a fake language server.
2988 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2989 lang_registry.add(Arc::new(Language::new(
2990 LanguageConfig {
2991 name: "Rust".into(),
2992 path_suffixes: vec!["rs".to_string()],
2993 language_server: Some(language_server_config),
2994 ..Default::default()
2995 },
2996 Some(tree_sitter_rust::language()),
2997 )));
2998
2999 // Connect to a server as 2 clients.
3000 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3001 let client_a = server.create_client(&mut cx_a, "user_a").await;
3002 let client_b = server.create_client(&mut cx_b, "user_b").await;
3003
3004 // Share a project as client A
3005 let project_a = cx_a.update(|cx| {
3006 Project::local(
3007 client_a.clone(),
3008 client_a.user_store.clone(),
3009 lang_registry.clone(),
3010 fs.clone(),
3011 cx,
3012 )
3013 });
3014 let (worktree_a, _) = project_a
3015 .update(&mut cx_a, |p, cx| {
3016 p.find_or_create_local_worktree("/root-1", false, cx)
3017 })
3018 .await
3019 .unwrap();
3020 worktree_a
3021 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3022 .await;
3023 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3024 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3025 project_a
3026 .update(&mut cx_a, |p, cx| p.share(cx))
3027 .await
3028 .unwrap();
3029
3030 // Join the worktree as client B.
3031 let project_b = Project::remote(
3032 project_id,
3033 client_b.clone(),
3034 client_b.user_store.clone(),
3035 lang_registry.clone(),
3036 fs.clone(),
3037 &mut cx_b.to_async(),
3038 )
3039 .await
3040 .unwrap();
3041
3042 // Open the file on client B.
3043 let buffer_b = cx_b
3044 .background()
3045 .spawn(project_b.update(&mut cx_b, |p, cx| {
3046 p.open_buffer((worktree_id, "main.rs"), cx)
3047 }))
3048 .await
3049 .unwrap();
3050
3051 // Request document highlights as the guest.
3052 let highlights =
3053 project_b.update(&mut cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx));
3054
3055 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3056 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
3057 |params, _| {
3058 assert_eq!(
3059 params
3060 .text_document_position_params
3061 .text_document
3062 .uri
3063 .as_str(),
3064 "file:///root-1/main.rs"
3065 );
3066 assert_eq!(
3067 params.text_document_position_params.position,
3068 lsp::Position::new(0, 34)
3069 );
3070 Some(vec![
3071 lsp::DocumentHighlight {
3072 kind: Some(lsp::DocumentHighlightKind::WRITE),
3073 range: lsp::Range::new(
3074 lsp::Position::new(0, 10),
3075 lsp::Position::new(0, 16),
3076 ),
3077 },
3078 lsp::DocumentHighlight {
3079 kind: Some(lsp::DocumentHighlightKind::READ),
3080 range: lsp::Range::new(
3081 lsp::Position::new(0, 32),
3082 lsp::Position::new(0, 38),
3083 ),
3084 },
3085 lsp::DocumentHighlight {
3086 kind: Some(lsp::DocumentHighlightKind::READ),
3087 range: lsp::Range::new(
3088 lsp::Position::new(0, 41),
3089 lsp::Position::new(0, 47),
3090 ),
3091 },
3092 ])
3093 },
3094 );
3095
3096 let highlights = highlights.await.unwrap();
3097 buffer_b.read_with(&cx_b, |buffer, _| {
3098 let snapshot = buffer.snapshot();
3099
3100 let highlights = highlights
3101 .into_iter()
3102 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3103 .collect::<Vec<_>>();
3104 assert_eq!(
3105 highlights,
3106 &[
3107 (lsp::DocumentHighlightKind::WRITE, 10..16),
3108 (lsp::DocumentHighlightKind::READ, 32..38),
3109 (lsp::DocumentHighlightKind::READ, 41..47)
3110 ]
3111 )
3112 });
3113 }
3114
3115 #[gpui::test(iterations = 10)]
3116 async fn test_project_symbols(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3117 cx_a.foreground().forbid_parking();
3118 let mut lang_registry = Arc::new(LanguageRegistry::new());
3119 let fs = FakeFs::new(cx_a.background());
3120 fs.insert_tree(
3121 "/code",
3122 json!({
3123 "crate-1": {
3124 ".zed.toml": r#"collaborators = ["user_b"]"#,
3125 "one.rs": "const ONE: usize = 1;",
3126 },
3127 "crate-2": {
3128 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3129 },
3130 "private": {
3131 "passwords.txt": "the-password",
3132 }
3133 }),
3134 )
3135 .await;
3136
3137 // Set up a fake language server.
3138 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3139 Arc::get_mut(&mut lang_registry)
3140 .unwrap()
3141 .add(Arc::new(Language::new(
3142 LanguageConfig {
3143 name: "Rust".into(),
3144 path_suffixes: vec!["rs".to_string()],
3145 language_server: Some(language_server_config),
3146 ..Default::default()
3147 },
3148 Some(tree_sitter_rust::language()),
3149 )));
3150
3151 // Connect to a server as 2 clients.
3152 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3153 let client_a = server.create_client(&mut cx_a, "user_a").await;
3154 let client_b = server.create_client(&mut cx_b, "user_b").await;
3155
3156 // Share a project as client A
3157 let project_a = cx_a.update(|cx| {
3158 Project::local(
3159 client_a.clone(),
3160 client_a.user_store.clone(),
3161 lang_registry.clone(),
3162 fs.clone(),
3163 cx,
3164 )
3165 });
3166 let (worktree_a, _) = project_a
3167 .update(&mut cx_a, |p, cx| {
3168 p.find_or_create_local_worktree("/code/crate-1", false, cx)
3169 })
3170 .await
3171 .unwrap();
3172 worktree_a
3173 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3174 .await;
3175 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3176 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3177 project_a
3178 .update(&mut cx_a, |p, cx| p.share(cx))
3179 .await
3180 .unwrap();
3181
3182 // Join the worktree as client B.
3183 let project_b = Project::remote(
3184 project_id,
3185 client_b.clone(),
3186 client_b.user_store.clone(),
3187 lang_registry.clone(),
3188 fs.clone(),
3189 &mut cx_b.to_async(),
3190 )
3191 .await
3192 .unwrap();
3193
3194 // Cause the language server to start.
3195 let _buffer = cx_b
3196 .background()
3197 .spawn(project_b.update(&mut cx_b, |p, cx| {
3198 p.open_buffer((worktree_id, "one.rs"), cx)
3199 }))
3200 .await
3201 .unwrap();
3202
3203 // Request the definition of a symbol as the guest.
3204 let symbols = project_b.update(&mut cx_b, |p, cx| p.symbols("two", cx));
3205 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3206 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
3207 #[allow(deprecated)]
3208 Some(vec![lsp::SymbolInformation {
3209 name: "TWO".into(),
3210 location: lsp::Location {
3211 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3212 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3213 },
3214 kind: lsp::SymbolKind::CONSTANT,
3215 tags: None,
3216 container_name: None,
3217 deprecated: None,
3218 }])
3219 });
3220
3221 let symbols = symbols.await.unwrap();
3222 assert_eq!(symbols.len(), 1);
3223 assert_eq!(symbols[0].name, "TWO");
3224
3225 // Open one of the returned symbols.
3226 let buffer_b_2 = project_b
3227 .update(&mut cx_b, |project, cx| {
3228 project.open_buffer_for_symbol(&symbols[0], cx)
3229 })
3230 .await
3231 .unwrap();
3232 buffer_b_2.read_with(&cx_b, |buffer, _| {
3233 assert_eq!(
3234 buffer.file().unwrap().path().as_ref(),
3235 Path::new("../crate-2/two.rs")
3236 );
3237 });
3238
3239 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3240 let mut fake_symbol = symbols[0].clone();
3241 fake_symbol.path = Path::new("/code/secrets").into();
3242 let error = project_b
3243 .update(&mut cx_b, |project, cx| {
3244 project.open_buffer_for_symbol(&fake_symbol, cx)
3245 })
3246 .await
3247 .unwrap_err();
3248 assert!(error.to_string().contains("invalid symbol signature"));
3249 }
3250
3251 #[gpui::test(iterations = 10)]
3252 async fn test_open_buffer_while_getting_definition_pointing_to_it(
3253 mut cx_a: TestAppContext,
3254 mut cx_b: TestAppContext,
3255 mut rng: StdRng,
3256 ) {
3257 cx_a.foreground().forbid_parking();
3258 let mut lang_registry = Arc::new(LanguageRegistry::new());
3259 let fs = FakeFs::new(cx_a.background());
3260 fs.insert_tree(
3261 "/root",
3262 json!({
3263 ".zed.toml": r#"collaborators = ["user_b"]"#,
3264 "a.rs": "const ONE: usize = b::TWO;",
3265 "b.rs": "const TWO: usize = 2",
3266 }),
3267 )
3268 .await;
3269
3270 // Set up a fake language server.
3271 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3272
3273 Arc::get_mut(&mut lang_registry)
3274 .unwrap()
3275 .add(Arc::new(Language::new(
3276 LanguageConfig {
3277 name: "Rust".into(),
3278 path_suffixes: vec!["rs".to_string()],
3279 language_server: Some(language_server_config),
3280 ..Default::default()
3281 },
3282 Some(tree_sitter_rust::language()),
3283 )));
3284
3285 // Connect to a server as 2 clients.
3286 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3287 let client_a = server.create_client(&mut cx_a, "user_a").await;
3288 let client_b = server.create_client(&mut cx_b, "user_b").await;
3289
3290 // Share a project as client A
3291 let project_a = cx_a.update(|cx| {
3292 Project::local(
3293 client_a.clone(),
3294 client_a.user_store.clone(),
3295 lang_registry.clone(),
3296 fs.clone(),
3297 cx,
3298 )
3299 });
3300
3301 let (worktree_a, _) = project_a
3302 .update(&mut cx_a, |p, cx| {
3303 p.find_or_create_local_worktree("/root", false, cx)
3304 })
3305 .await
3306 .unwrap();
3307 worktree_a
3308 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3309 .await;
3310 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3311 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3312 project_a
3313 .update(&mut cx_a, |p, cx| p.share(cx))
3314 .await
3315 .unwrap();
3316
3317 // Join the worktree as client B.
3318 let project_b = Project::remote(
3319 project_id,
3320 client_b.clone(),
3321 client_b.user_store.clone(),
3322 lang_registry.clone(),
3323 fs.clone(),
3324 &mut cx_b.to_async(),
3325 )
3326 .await
3327 .unwrap();
3328
3329 let buffer_b1 = cx_b
3330 .background()
3331 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3332 .await
3333 .unwrap();
3334
3335 let definitions;
3336 let buffer_b2;
3337 if rng.gen() {
3338 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3339 buffer_b2 =
3340 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3341 } else {
3342 buffer_b2 =
3343 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3344 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3345 }
3346
3347 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3348 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3349 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3350 lsp::Url::from_file_path("/root/b.rs").unwrap(),
3351 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3352 )))
3353 });
3354
3355 let buffer_b2 = buffer_b2.await.unwrap();
3356 let definitions = definitions.await.unwrap();
3357 assert_eq!(definitions.len(), 1);
3358 assert_eq!(definitions[0].buffer, buffer_b2);
3359 }
3360
3361 #[gpui::test(iterations = 10)]
3362 async fn test_collaborating_with_code_actions(
3363 mut cx_a: TestAppContext,
3364 mut cx_b: TestAppContext,
3365 ) {
3366 cx_a.foreground().forbid_parking();
3367 let mut lang_registry = Arc::new(LanguageRegistry::new());
3368 let fs = FakeFs::new(cx_a.background());
3369 let mut path_openers_b = Vec::new();
3370 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3371
3372 // Set up a fake language server.
3373 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3374 Arc::get_mut(&mut lang_registry)
3375 .unwrap()
3376 .add(Arc::new(Language::new(
3377 LanguageConfig {
3378 name: "Rust".into(),
3379 path_suffixes: vec!["rs".to_string()],
3380 language_server: Some(language_server_config),
3381 ..Default::default()
3382 },
3383 Some(tree_sitter_rust::language()),
3384 )));
3385
3386 // Connect to a server as 2 clients.
3387 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3388 let client_a = server.create_client(&mut cx_a, "user_a").await;
3389 let client_b = server.create_client(&mut cx_b, "user_b").await;
3390
3391 // Share a project as client A
3392 fs.insert_tree(
3393 "/a",
3394 json!({
3395 ".zed.toml": r#"collaborators = ["user_b"]"#,
3396 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3397 "other.rs": "pub fn foo() -> usize { 4 }",
3398 }),
3399 )
3400 .await;
3401 let project_a = cx_a.update(|cx| {
3402 Project::local(
3403 client_a.clone(),
3404 client_a.user_store.clone(),
3405 lang_registry.clone(),
3406 fs.clone(),
3407 cx,
3408 )
3409 });
3410 let (worktree_a, _) = project_a
3411 .update(&mut cx_a, |p, cx| {
3412 p.find_or_create_local_worktree("/a", false, cx)
3413 })
3414 .await
3415 .unwrap();
3416 worktree_a
3417 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3418 .await;
3419 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3420 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3421 project_a
3422 .update(&mut cx_a, |p, cx| p.share(cx))
3423 .await
3424 .unwrap();
3425
3426 // Join the worktree as client B.
3427 let project_b = Project::remote(
3428 project_id,
3429 client_b.clone(),
3430 client_b.user_store.clone(),
3431 lang_registry.clone(),
3432 fs.clone(),
3433 &mut cx_b.to_async(),
3434 )
3435 .await
3436 .unwrap();
3437 let mut params = cx_b.update(WorkspaceParams::test);
3438 params.languages = lang_registry.clone();
3439 params.client = client_b.client.clone();
3440 params.user_store = client_b.user_store.clone();
3441 params.project = project_b;
3442 params.path_openers = path_openers_b.into();
3443
3444 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3445 let editor_b = workspace_b
3446 .update(&mut cx_b, |workspace, cx| {
3447 workspace.open_path((worktree_id, "main.rs").into(), cx)
3448 })
3449 .await
3450 .unwrap()
3451 .downcast::<Editor>()
3452 .unwrap();
3453
3454 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3455 fake_language_server
3456 .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3457 assert_eq!(
3458 params.text_document.uri,
3459 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3460 );
3461 assert_eq!(params.range.start, lsp::Position::new(0, 0));
3462 assert_eq!(params.range.end, lsp::Position::new(0, 0));
3463 None
3464 })
3465 .next()
3466 .await;
3467
3468 // Move cursor to a location that contains code actions.
3469 editor_b.update(&mut cx_b, |editor, cx| {
3470 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3471 cx.focus(&editor_b);
3472 });
3473
3474 fake_language_server
3475 .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3476 assert_eq!(
3477 params.text_document.uri,
3478 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3479 );
3480 assert_eq!(params.range.start, lsp::Position::new(1, 31));
3481 assert_eq!(params.range.end, lsp::Position::new(1, 31));
3482
3483 Some(vec![lsp::CodeActionOrCommand::CodeAction(
3484 lsp::CodeAction {
3485 title: "Inline into all callers".to_string(),
3486 edit: Some(lsp::WorkspaceEdit {
3487 changes: Some(
3488 [
3489 (
3490 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3491 vec![lsp::TextEdit::new(
3492 lsp::Range::new(
3493 lsp::Position::new(1, 22),
3494 lsp::Position::new(1, 34),
3495 ),
3496 "4".to_string(),
3497 )],
3498 ),
3499 (
3500 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3501 vec![lsp::TextEdit::new(
3502 lsp::Range::new(
3503 lsp::Position::new(0, 0),
3504 lsp::Position::new(0, 27),
3505 ),
3506 "".to_string(),
3507 )],
3508 ),
3509 ]
3510 .into_iter()
3511 .collect(),
3512 ),
3513 ..Default::default()
3514 }),
3515 data: Some(json!({
3516 "codeActionParams": {
3517 "range": {
3518 "start": {"line": 1, "column": 31},
3519 "end": {"line": 1, "column": 31},
3520 }
3521 }
3522 })),
3523 ..Default::default()
3524 },
3525 )])
3526 })
3527 .next()
3528 .await;
3529
3530 // Toggle code actions and wait for them to display.
3531 editor_b.update(&mut cx_b, |editor, cx| {
3532 editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3533 });
3534 editor_b
3535 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3536 .await;
3537
3538 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3539
3540 // Confirming the code action will trigger a resolve request.
3541 let confirm_action = workspace_b
3542 .update(&mut cx_b, |workspace, cx| {
3543 Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3544 })
3545 .unwrap();
3546 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3547 lsp::CodeAction {
3548 title: "Inline into all callers".to_string(),
3549 edit: Some(lsp::WorkspaceEdit {
3550 changes: Some(
3551 [
3552 (
3553 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3554 vec![lsp::TextEdit::new(
3555 lsp::Range::new(
3556 lsp::Position::new(1, 22),
3557 lsp::Position::new(1, 34),
3558 ),
3559 "4".to_string(),
3560 )],
3561 ),
3562 (
3563 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3564 vec![lsp::TextEdit::new(
3565 lsp::Range::new(
3566 lsp::Position::new(0, 0),
3567 lsp::Position::new(0, 27),
3568 ),
3569 "".to_string(),
3570 )],
3571 ),
3572 ]
3573 .into_iter()
3574 .collect(),
3575 ),
3576 ..Default::default()
3577 }),
3578 ..Default::default()
3579 }
3580 });
3581
3582 // After the action is confirmed, an editor containing both modified files is opened.
3583 confirm_action.await.unwrap();
3584 let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3585 workspace
3586 .active_item(cx)
3587 .unwrap()
3588 .downcast::<Editor>()
3589 .unwrap()
3590 });
3591 code_action_editor.update(&mut cx_b, |editor, cx| {
3592 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3593 editor.undo(&Undo, cx);
3594 assert_eq!(
3595 editor.text(cx),
3596 "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3597 );
3598 editor.redo(&Redo, cx);
3599 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3600 });
3601 }
3602
3603 #[gpui::test(iterations = 10)]
3604 async fn test_collaborating_with_renames(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3605 cx_a.foreground().forbid_parking();
3606 let mut lang_registry = Arc::new(LanguageRegistry::new());
3607 let fs = FakeFs::new(cx_a.background());
3608 let mut path_openers_b = Vec::new();
3609 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3610
3611 // Set up a fake language server.
3612 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3613 Arc::get_mut(&mut lang_registry)
3614 .unwrap()
3615 .add(Arc::new(Language::new(
3616 LanguageConfig {
3617 name: "Rust".into(),
3618 path_suffixes: vec!["rs".to_string()],
3619 language_server: Some(language_server_config),
3620 ..Default::default()
3621 },
3622 Some(tree_sitter_rust::language()),
3623 )));
3624
3625 // Connect to a server as 2 clients.
3626 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3627 let client_a = server.create_client(&mut cx_a, "user_a").await;
3628 let client_b = server.create_client(&mut cx_b, "user_b").await;
3629
3630 // Share a project as client A
3631 fs.insert_tree(
3632 "/dir",
3633 json!({
3634 ".zed.toml": r#"collaborators = ["user_b"]"#,
3635 "one.rs": "const ONE: usize = 1;",
3636 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3637 }),
3638 )
3639 .await;
3640 let project_a = cx_a.update(|cx| {
3641 Project::local(
3642 client_a.clone(),
3643 client_a.user_store.clone(),
3644 lang_registry.clone(),
3645 fs.clone(),
3646 cx,
3647 )
3648 });
3649 let (worktree_a, _) = project_a
3650 .update(&mut cx_a, |p, cx| {
3651 p.find_or_create_local_worktree("/dir", false, cx)
3652 })
3653 .await
3654 .unwrap();
3655 worktree_a
3656 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3657 .await;
3658 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3659 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3660 project_a
3661 .update(&mut cx_a, |p, cx| p.share(cx))
3662 .await
3663 .unwrap();
3664
3665 // Join the worktree as client B.
3666 let project_b = Project::remote(
3667 project_id,
3668 client_b.clone(),
3669 client_b.user_store.clone(),
3670 lang_registry.clone(),
3671 fs.clone(),
3672 &mut cx_b.to_async(),
3673 )
3674 .await
3675 .unwrap();
3676 let mut params = cx_b.update(WorkspaceParams::test);
3677 params.languages = lang_registry.clone();
3678 params.client = client_b.client.clone();
3679 params.user_store = client_b.user_store.clone();
3680 params.project = project_b;
3681 params.path_openers = path_openers_b.into();
3682
3683 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3684 let editor_b = workspace_b
3685 .update(&mut cx_b, |workspace, cx| {
3686 workspace.open_path((worktree_id, "one.rs").into(), cx)
3687 })
3688 .await
3689 .unwrap()
3690 .downcast::<Editor>()
3691 .unwrap();
3692 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3693
3694 // Move cursor to a location that can be renamed.
3695 let prepare_rename = editor_b.update(&mut cx_b, |editor, cx| {
3696 editor.select_ranges([7..7], None, cx);
3697 editor.rename(&Rename, cx).unwrap()
3698 });
3699
3700 fake_language_server
3701 .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3702 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3703 assert_eq!(params.position, lsp::Position::new(0, 7));
3704 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3705 lsp::Position::new(0, 6),
3706 lsp::Position::new(0, 9),
3707 )))
3708 })
3709 .next()
3710 .await
3711 .unwrap();
3712 prepare_rename.await.unwrap();
3713 editor_b.update(&mut cx_b, |editor, cx| {
3714 let rename = editor.pending_rename().unwrap();
3715 let buffer = editor.buffer().read(cx).snapshot(cx);
3716 assert_eq!(
3717 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3718 6..9
3719 );
3720 rename.editor.update(cx, |rename_editor, cx| {
3721 rename_editor.buffer().update(cx, |rename_buffer, cx| {
3722 rename_buffer.edit([0..3], "THREE", cx);
3723 });
3724 });
3725 });
3726
3727 let confirm_rename = workspace_b.update(&mut cx_b, |workspace, cx| {
3728 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3729 });
3730 fake_language_server
3731 .handle_request::<lsp::request::Rename, _>(|params, _| {
3732 assert_eq!(
3733 params.text_document_position.text_document.uri.as_str(),
3734 "file:///dir/one.rs"
3735 );
3736 assert_eq!(
3737 params.text_document_position.position,
3738 lsp::Position::new(0, 6)
3739 );
3740 assert_eq!(params.new_name, "THREE");
3741 Some(lsp::WorkspaceEdit {
3742 changes: Some(
3743 [
3744 (
3745 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3746 vec![lsp::TextEdit::new(
3747 lsp::Range::new(
3748 lsp::Position::new(0, 6),
3749 lsp::Position::new(0, 9),
3750 ),
3751 "THREE".to_string(),
3752 )],
3753 ),
3754 (
3755 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3756 vec![
3757 lsp::TextEdit::new(
3758 lsp::Range::new(
3759 lsp::Position::new(0, 24),
3760 lsp::Position::new(0, 27),
3761 ),
3762 "THREE".to_string(),
3763 ),
3764 lsp::TextEdit::new(
3765 lsp::Range::new(
3766 lsp::Position::new(0, 35),
3767 lsp::Position::new(0, 38),
3768 ),
3769 "THREE".to_string(),
3770 ),
3771 ],
3772 ),
3773 ]
3774 .into_iter()
3775 .collect(),
3776 ),
3777 ..Default::default()
3778 })
3779 })
3780 .next()
3781 .await
3782 .unwrap();
3783 confirm_rename.await.unwrap();
3784
3785 let rename_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3786 workspace
3787 .active_item(cx)
3788 .unwrap()
3789 .downcast::<Editor>()
3790 .unwrap()
3791 });
3792 rename_editor.update(&mut cx_b, |editor, cx| {
3793 assert_eq!(
3794 editor.text(cx),
3795 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3796 );
3797 editor.undo(&Undo, cx);
3798 assert_eq!(
3799 editor.text(cx),
3800 "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3801 );
3802 editor.redo(&Redo, cx);
3803 assert_eq!(
3804 editor.text(cx),
3805 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3806 );
3807 });
3808
3809 // Ensure temporary rename edits cannot be undone/redone.
3810 editor_b.update(&mut cx_b, |editor, cx| {
3811 editor.undo(&Undo, cx);
3812 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3813 editor.undo(&Undo, cx);
3814 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3815 editor.redo(&Redo, cx);
3816 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3817 })
3818 }
3819
3820 #[gpui::test(iterations = 10)]
3821 async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3822 cx_a.foreground().forbid_parking();
3823
3824 // Connect to a server as 2 clients.
3825 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3826 let client_a = server.create_client(&mut cx_a, "user_a").await;
3827 let client_b = server.create_client(&mut cx_b, "user_b").await;
3828
3829 // Create an org that includes these 2 users.
3830 let db = &server.app_state.db;
3831 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3832 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3833 .await
3834 .unwrap();
3835 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3836 .await
3837 .unwrap();
3838
3839 // Create a channel that includes all the users.
3840 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3841 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3842 .await
3843 .unwrap();
3844 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3845 .await
3846 .unwrap();
3847 db.create_channel_message(
3848 channel_id,
3849 client_b.current_user_id(&cx_b),
3850 "hello A, it's B.",
3851 OffsetDateTime::now_utc(),
3852 1,
3853 )
3854 .await
3855 .unwrap();
3856
3857 let channels_a = cx_a
3858 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3859 channels_a
3860 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3861 .await;
3862 channels_a.read_with(&cx_a, |list, _| {
3863 assert_eq!(
3864 list.available_channels().unwrap(),
3865 &[ChannelDetails {
3866 id: channel_id.to_proto(),
3867 name: "test-channel".to_string()
3868 }]
3869 )
3870 });
3871 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3872 this.get_channel(channel_id.to_proto(), cx).unwrap()
3873 });
3874 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3875 channel_a
3876 .condition(&cx_a, |channel, _| {
3877 channel_messages(channel)
3878 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3879 })
3880 .await;
3881
3882 let channels_b = cx_b
3883 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3884 channels_b
3885 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3886 .await;
3887 channels_b.read_with(&cx_b, |list, _| {
3888 assert_eq!(
3889 list.available_channels().unwrap(),
3890 &[ChannelDetails {
3891 id: channel_id.to_proto(),
3892 name: "test-channel".to_string()
3893 }]
3894 )
3895 });
3896
3897 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3898 this.get_channel(channel_id.to_proto(), cx).unwrap()
3899 });
3900 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3901 channel_b
3902 .condition(&cx_b, |channel, _| {
3903 channel_messages(channel)
3904 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3905 })
3906 .await;
3907
3908 channel_a
3909 .update(&mut cx_a, |channel, cx| {
3910 channel
3911 .send_message("oh, hi B.".to_string(), cx)
3912 .unwrap()
3913 .detach();
3914 let task = channel.send_message("sup".to_string(), cx).unwrap();
3915 assert_eq!(
3916 channel_messages(channel),
3917 &[
3918 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3919 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3920 ("user_a".to_string(), "sup".to_string(), true)
3921 ]
3922 );
3923 task
3924 })
3925 .await
3926 .unwrap();
3927
3928 channel_b
3929 .condition(&cx_b, |channel, _| {
3930 channel_messages(channel)
3931 == [
3932 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3933 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3934 ("user_a".to_string(), "sup".to_string(), false),
3935 ]
3936 })
3937 .await;
3938
3939 assert_eq!(
3940 server
3941 .state()
3942 .await
3943 .channel(channel_id)
3944 .unwrap()
3945 .connection_ids
3946 .len(),
3947 2
3948 );
3949 cx_b.update(|_| drop(channel_b));
3950 server
3951 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3952 .await;
3953
3954 cx_a.update(|_| drop(channel_a));
3955 server
3956 .condition(|state| state.channel(channel_id).is_none())
3957 .await;
3958 }
3959
3960 #[gpui::test(iterations = 10)]
3961 async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3962 cx_a.foreground().forbid_parking();
3963
3964 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3965 let client_a = server.create_client(&mut cx_a, "user_a").await;
3966
3967 let db = &server.app_state.db;
3968 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3969 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3970 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3971 .await
3972 .unwrap();
3973 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3974 .await
3975 .unwrap();
3976
3977 let channels_a = cx_a
3978 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3979 channels_a
3980 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3981 .await;
3982 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3983 this.get_channel(channel_id.to_proto(), cx).unwrap()
3984 });
3985
3986 // Messages aren't allowed to be too long.
3987 channel_a
3988 .update(&mut cx_a, |channel, cx| {
3989 let long_body = "this is long.\n".repeat(1024);
3990 channel.send_message(long_body, cx).unwrap()
3991 })
3992 .await
3993 .unwrap_err();
3994
3995 // Messages aren't allowed to be blank.
3996 channel_a.update(&mut cx_a, |channel, cx| {
3997 channel.send_message(String::new(), cx).unwrap_err()
3998 });
3999
4000 // Leading and trailing whitespace are trimmed.
4001 channel_a
4002 .update(&mut cx_a, |channel, cx| {
4003 channel
4004 .send_message("\n surrounded by whitespace \n".to_string(), cx)
4005 .unwrap()
4006 })
4007 .await
4008 .unwrap();
4009 assert_eq!(
4010 db.get_channel_messages(channel_id, 10, None)
4011 .await
4012 .unwrap()
4013 .iter()
4014 .map(|m| &m.body)
4015 .collect::<Vec<_>>(),
4016 &["surrounded by whitespace"]
4017 );
4018 }
4019
4020 #[gpui::test(iterations = 10)]
4021 async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
4022 cx_a.foreground().forbid_parking();
4023
4024 // Connect to a server as 2 clients.
4025 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4026 let client_a = server.create_client(&mut cx_a, "user_a").await;
4027 let client_b = server.create_client(&mut cx_b, "user_b").await;
4028 let mut status_b = client_b.status();
4029
4030 // Create an org that includes these 2 users.
4031 let db = &server.app_state.db;
4032 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4033 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4034 .await
4035 .unwrap();
4036 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4037 .await
4038 .unwrap();
4039
4040 // Create a channel that includes all the users.
4041 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4042 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4043 .await
4044 .unwrap();
4045 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4046 .await
4047 .unwrap();
4048 db.create_channel_message(
4049 channel_id,
4050 client_b.current_user_id(&cx_b),
4051 "hello A, it's B.",
4052 OffsetDateTime::now_utc(),
4053 2,
4054 )
4055 .await
4056 .unwrap();
4057
4058 let channels_a = cx_a
4059 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4060 channels_a
4061 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
4062 .await;
4063
4064 channels_a.read_with(&cx_a, |list, _| {
4065 assert_eq!(
4066 list.available_channels().unwrap(),
4067 &[ChannelDetails {
4068 id: channel_id.to_proto(),
4069 name: "test-channel".to_string()
4070 }]
4071 )
4072 });
4073 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
4074 this.get_channel(channel_id.to_proto(), cx).unwrap()
4075 });
4076 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
4077 channel_a
4078 .condition(&cx_a, |channel, _| {
4079 channel_messages(channel)
4080 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4081 })
4082 .await;
4083
4084 let channels_b = cx_b
4085 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4086 channels_b
4087 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
4088 .await;
4089 channels_b.read_with(&cx_b, |list, _| {
4090 assert_eq!(
4091 list.available_channels().unwrap(),
4092 &[ChannelDetails {
4093 id: channel_id.to_proto(),
4094 name: "test-channel".to_string()
4095 }]
4096 )
4097 });
4098
4099 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
4100 this.get_channel(channel_id.to_proto(), cx).unwrap()
4101 });
4102 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
4103 channel_b
4104 .condition(&cx_b, |channel, _| {
4105 channel_messages(channel)
4106 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4107 })
4108 .await;
4109
4110 // Disconnect client B, ensuring we can still access its cached channel data.
4111 server.forbid_connections();
4112 server.disconnect_client(client_b.current_user_id(&cx_b));
4113 while !matches!(
4114 status_b.next().await,
4115 Some(client::Status::ReconnectionError { .. })
4116 ) {}
4117
4118 channels_b.read_with(&cx_b, |channels, _| {
4119 assert_eq!(
4120 channels.available_channels().unwrap(),
4121 [ChannelDetails {
4122 id: channel_id.to_proto(),
4123 name: "test-channel".to_string()
4124 }]
4125 )
4126 });
4127 channel_b.read_with(&cx_b, |channel, _| {
4128 assert_eq!(
4129 channel_messages(channel),
4130 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4131 )
4132 });
4133
4134 // Send a message from client B while it is disconnected.
4135 channel_b
4136 .update(&mut cx_b, |channel, cx| {
4137 let task = channel
4138 .send_message("can you see this?".to_string(), cx)
4139 .unwrap();
4140 assert_eq!(
4141 channel_messages(channel),
4142 &[
4143 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4144 ("user_b".to_string(), "can you see this?".to_string(), true)
4145 ]
4146 );
4147 task
4148 })
4149 .await
4150 .unwrap_err();
4151
4152 // Send a message from client A while B is disconnected.
4153 channel_a
4154 .update(&mut cx_a, |channel, cx| {
4155 channel
4156 .send_message("oh, hi B.".to_string(), cx)
4157 .unwrap()
4158 .detach();
4159 let task = channel.send_message("sup".to_string(), cx).unwrap();
4160 assert_eq!(
4161 channel_messages(channel),
4162 &[
4163 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4164 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4165 ("user_a".to_string(), "sup".to_string(), true)
4166 ]
4167 );
4168 task
4169 })
4170 .await
4171 .unwrap();
4172
4173 // Give client B a chance to reconnect.
4174 server.allow_connections();
4175 cx_b.foreground().advance_clock(Duration::from_secs(10));
4176
4177 // Verify that B sees the new messages upon reconnection, as well as the message client B
4178 // sent while offline.
4179 channel_b
4180 .condition(&cx_b, |channel, _| {
4181 channel_messages(channel)
4182 == [
4183 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4184 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4185 ("user_a".to_string(), "sup".to_string(), false),
4186 ("user_b".to_string(), "can you see this?".to_string(), false),
4187 ]
4188 })
4189 .await;
4190
4191 // Ensure client A and B can communicate normally after reconnection.
4192 channel_a
4193 .update(&mut cx_a, |channel, cx| {
4194 channel.send_message("you online?".to_string(), cx).unwrap()
4195 })
4196 .await
4197 .unwrap();
4198 channel_b
4199 .condition(&cx_b, |channel, _| {
4200 channel_messages(channel)
4201 == [
4202 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4203 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4204 ("user_a".to_string(), "sup".to_string(), false),
4205 ("user_b".to_string(), "can you see this?".to_string(), false),
4206 ("user_a".to_string(), "you online?".to_string(), false),
4207 ]
4208 })
4209 .await;
4210
4211 channel_b
4212 .update(&mut cx_b, |channel, cx| {
4213 channel.send_message("yep".to_string(), cx).unwrap()
4214 })
4215 .await
4216 .unwrap();
4217 channel_a
4218 .condition(&cx_a, |channel, _| {
4219 channel_messages(channel)
4220 == [
4221 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4222 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4223 ("user_a".to_string(), "sup".to_string(), false),
4224 ("user_b".to_string(), "can you see this?".to_string(), false),
4225 ("user_a".to_string(), "you online?".to_string(), false),
4226 ("user_b".to_string(), "yep".to_string(), false),
4227 ]
4228 })
4229 .await;
4230 }
4231
4232 #[gpui::test(iterations = 10)]
4233 async fn test_contacts(
4234 mut cx_a: TestAppContext,
4235 mut cx_b: TestAppContext,
4236 mut cx_c: TestAppContext,
4237 ) {
4238 cx_a.foreground().forbid_parking();
4239 let lang_registry = Arc::new(LanguageRegistry::new());
4240 let fs = FakeFs::new(cx_a.background());
4241
4242 // Connect to a server as 3 clients.
4243 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4244 let client_a = server.create_client(&mut cx_a, "user_a").await;
4245 let client_b = server.create_client(&mut cx_b, "user_b").await;
4246 let client_c = server.create_client(&mut cx_c, "user_c").await;
4247
4248 // Share a worktree as client A.
4249 fs.insert_tree(
4250 "/a",
4251 json!({
4252 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4253 }),
4254 )
4255 .await;
4256
4257 let project_a = cx_a.update(|cx| {
4258 Project::local(
4259 client_a.clone(),
4260 client_a.user_store.clone(),
4261 lang_registry.clone(),
4262 fs.clone(),
4263 cx,
4264 )
4265 });
4266 let (worktree_a, _) = project_a
4267 .update(&mut cx_a, |p, cx| {
4268 p.find_or_create_local_worktree("/a", false, cx)
4269 })
4270 .await
4271 .unwrap();
4272 worktree_a
4273 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4274 .await;
4275
4276 client_a
4277 .user_store
4278 .condition(&cx_a, |user_store, _| {
4279 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4280 })
4281 .await;
4282 client_b
4283 .user_store
4284 .condition(&cx_b, |user_store, _| {
4285 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4286 })
4287 .await;
4288 client_c
4289 .user_store
4290 .condition(&cx_c, |user_store, _| {
4291 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4292 })
4293 .await;
4294
4295 let project_id = project_a
4296 .update(&mut cx_a, |project, _| project.next_remote_id())
4297 .await;
4298 project_a
4299 .update(&mut cx_a, |project, cx| project.share(cx))
4300 .await
4301 .unwrap();
4302
4303 let _project_b = Project::remote(
4304 project_id,
4305 client_b.clone(),
4306 client_b.user_store.clone(),
4307 lang_registry.clone(),
4308 fs.clone(),
4309 &mut cx_b.to_async(),
4310 )
4311 .await
4312 .unwrap();
4313
4314 client_a
4315 .user_store
4316 .condition(&cx_a, |user_store, _| {
4317 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4318 })
4319 .await;
4320 client_b
4321 .user_store
4322 .condition(&cx_b, |user_store, _| {
4323 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4324 })
4325 .await;
4326 client_c
4327 .user_store
4328 .condition(&cx_c, |user_store, _| {
4329 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4330 })
4331 .await;
4332
4333 project_a
4334 .condition(&cx_a, |project, _| {
4335 project.collaborators().contains_key(&client_b.peer_id)
4336 })
4337 .await;
4338
4339 cx_a.update(move |_| drop(project_a));
4340 client_a
4341 .user_store
4342 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4343 .await;
4344 client_b
4345 .user_store
4346 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4347 .await;
4348 client_c
4349 .user_store
4350 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4351 .await;
4352
4353 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4354 user_store
4355 .contacts()
4356 .iter()
4357 .map(|contact| {
4358 let worktrees = contact
4359 .projects
4360 .iter()
4361 .map(|p| {
4362 (
4363 p.worktree_root_names[0].as_str(),
4364 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4365 )
4366 })
4367 .collect();
4368 (contact.user.github_login.as_str(), worktrees)
4369 })
4370 .collect()
4371 }
4372 }
4373
4374 #[gpui::test(iterations = 100)]
4375 async fn test_random_collaboration(cx: TestAppContext, rng: StdRng) {
4376 cx.foreground().forbid_parking();
4377 let max_peers = env::var("MAX_PEERS")
4378 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4379 .unwrap_or(5);
4380 let max_operations = env::var("OPERATIONS")
4381 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4382 .unwrap_or(10);
4383
4384 let rng = Arc::new(Mutex::new(rng));
4385
4386 let guest_lang_registry = Arc::new(LanguageRegistry::new());
4387 let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4388
4389 let fs = FakeFs::new(cx.background());
4390 fs.insert_tree(
4391 "/_collab",
4392 json!({
4393 ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4394 }),
4395 )
4396 .await;
4397
4398 let operations = Rc::new(Cell::new(0));
4399 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4400 let mut clients = Vec::new();
4401
4402 let mut next_entity_id = 100000;
4403 let mut host_cx = TestAppContext::new(
4404 cx.foreground_platform(),
4405 cx.platform(),
4406 cx.foreground(),
4407 cx.background(),
4408 cx.font_cache(),
4409 next_entity_id,
4410 );
4411 let host = server.create_client(&mut host_cx, "host").await;
4412 let host_project = host_cx.update(|cx| {
4413 Project::local(
4414 host.client.clone(),
4415 host.user_store.clone(),
4416 Arc::new(LanguageRegistry::new()),
4417 fs.clone(),
4418 cx,
4419 )
4420 });
4421 let host_project_id = host_project
4422 .update(&mut host_cx, |p, _| p.next_remote_id())
4423 .await;
4424
4425 let (collab_worktree, _) = host_project
4426 .update(&mut host_cx, |project, cx| {
4427 project.find_or_create_local_worktree("/_collab", false, cx)
4428 })
4429 .await
4430 .unwrap();
4431 collab_worktree
4432 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4433 .await;
4434 host_project
4435 .update(&mut host_cx, |project, cx| project.share(cx))
4436 .await
4437 .unwrap();
4438
4439 clients.push(cx.foreground().spawn(host.simulate_host(
4440 host_project.clone(),
4441 language_server_config,
4442 operations.clone(),
4443 max_operations,
4444 rng.clone(),
4445 host_cx.clone(),
4446 )));
4447
4448 while operations.get() < max_operations {
4449 cx.background().simulate_random_delay().await;
4450 if clients.len() < max_peers && rng.lock().gen_bool(0.05) {
4451 operations.set(operations.get() + 1);
4452
4453 let guest_id = clients.len();
4454 log::info!("Adding guest {}", guest_id);
4455 next_entity_id += 100000;
4456 let mut guest_cx = TestAppContext::new(
4457 cx.foreground_platform(),
4458 cx.platform(),
4459 cx.foreground(),
4460 cx.background(),
4461 cx.font_cache(),
4462 next_entity_id,
4463 );
4464 let guest = server
4465 .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4466 .await;
4467 let guest_project = Project::remote(
4468 host_project_id,
4469 guest.client.clone(),
4470 guest.user_store.clone(),
4471 guest_lang_registry.clone(),
4472 fs.clone(),
4473 &mut guest_cx.to_async(),
4474 )
4475 .await
4476 .unwrap();
4477 clients.push(cx.foreground().spawn(guest.simulate_guest(
4478 guest_id,
4479 guest_project,
4480 operations.clone(),
4481 max_operations,
4482 rng.clone(),
4483 guest_cx,
4484 )));
4485
4486 log::info!("Guest {} added", guest_id);
4487 }
4488 }
4489
4490 let clients = futures::future::join_all(clients).await;
4491 cx.foreground().run_until_parked();
4492
4493 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4494 project
4495 .worktrees(cx)
4496 .map(|worktree| {
4497 let snapshot = worktree.read(cx).snapshot();
4498 (snapshot.id(), snapshot)
4499 })
4500 .collect::<BTreeMap<_, _>>()
4501 });
4502
4503 for (guest_client, guest_cx) in clients.iter().skip(1) {
4504 let guest_id = guest_client.client.id();
4505 let worktree_snapshots =
4506 guest_client
4507 .project
4508 .as_ref()
4509 .unwrap()
4510 .read_with(guest_cx, |project, cx| {
4511 project
4512 .worktrees(cx)
4513 .map(|worktree| {
4514 let worktree = worktree.read(cx);
4515 (worktree.id(), worktree.snapshot())
4516 })
4517 .collect::<BTreeMap<_, _>>()
4518 });
4519
4520 assert_eq!(
4521 worktree_snapshots.keys().collect::<Vec<_>>(),
4522 host_worktree_snapshots.keys().collect::<Vec<_>>(),
4523 "guest {} has different worktrees than the host",
4524 guest_id
4525 );
4526 for (id, host_snapshot) in &host_worktree_snapshots {
4527 let guest_snapshot = &worktree_snapshots[id];
4528 assert_eq!(
4529 guest_snapshot.root_name(),
4530 host_snapshot.root_name(),
4531 "guest {} has different root name than the host for worktree {}",
4532 guest_id,
4533 id
4534 );
4535 assert_eq!(
4536 guest_snapshot.entries(false).collect::<Vec<_>>(),
4537 host_snapshot.entries(false).collect::<Vec<_>>(),
4538 "guest {} has different snapshot than the host for worktree {}",
4539 guest_id,
4540 id
4541 );
4542 }
4543
4544 guest_client
4545 .project
4546 .as_ref()
4547 .unwrap()
4548 .read_with(guest_cx, |project, _| {
4549 assert!(
4550 !project.has_buffered_operations(),
4551 "guest {} has buffered operations ",
4552 guest_id,
4553 );
4554 });
4555
4556 for guest_buffer in &guest_client.buffers {
4557 let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
4558 let host_buffer = host_project.read_with(&host_cx, |project, _| {
4559 project
4560 .shared_buffer(guest_client.peer_id, buffer_id)
4561 .expect(&format!(
4562 "host doest not have buffer for guest:{}, peer:{}, id:{}",
4563 guest_id, guest_client.peer_id, buffer_id
4564 ))
4565 });
4566 assert_eq!(
4567 guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
4568 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4569 "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4570 guest_id,
4571 buffer_id,
4572 host_buffer
4573 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx))
4574 );
4575 }
4576 }
4577 }
4578
4579 struct TestServer {
4580 peer: Arc<Peer>,
4581 app_state: Arc<AppState>,
4582 server: Arc<Server>,
4583 foreground: Rc<executor::Foreground>,
4584 notifications: mpsc::UnboundedReceiver<()>,
4585 connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
4586 forbid_connections: Arc<AtomicBool>,
4587 _test_db: TestDb,
4588 }
4589
4590 impl TestServer {
4591 async fn start(
4592 foreground: Rc<executor::Foreground>,
4593 background: Arc<executor::Background>,
4594 ) -> Self {
4595 let test_db = TestDb::fake(background);
4596 let app_state = Self::build_app_state(&test_db).await;
4597 let peer = Peer::new();
4598 let notifications = mpsc::unbounded();
4599 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4600 Self {
4601 peer,
4602 app_state,
4603 server,
4604 foreground,
4605 notifications: notifications.1,
4606 connection_killers: Default::default(),
4607 forbid_connections: Default::default(),
4608 _test_db: test_db,
4609 }
4610 }
4611
4612 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4613 let http = FakeHttpClient::with_404_response();
4614 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4615 let client_name = name.to_string();
4616 let mut client = Client::new(http.clone());
4617 let server = self.server.clone();
4618 let connection_killers = self.connection_killers.clone();
4619 let forbid_connections = self.forbid_connections.clone();
4620 let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4621
4622 Arc::get_mut(&mut client)
4623 .unwrap()
4624 .override_authenticate(move |cx| {
4625 cx.spawn(|_| async move {
4626 let access_token = "the-token".to_string();
4627 Ok(Credentials {
4628 user_id: user_id.0 as u64,
4629 access_token,
4630 })
4631 })
4632 })
4633 .override_establish_connection(move |credentials, cx| {
4634 assert_eq!(credentials.user_id, user_id.0 as u64);
4635 assert_eq!(credentials.access_token, "the-token");
4636
4637 let server = server.clone();
4638 let connection_killers = connection_killers.clone();
4639 let forbid_connections = forbid_connections.clone();
4640 let client_name = client_name.clone();
4641 let connection_id_tx = connection_id_tx.clone();
4642 cx.spawn(move |cx| async move {
4643 if forbid_connections.load(SeqCst) {
4644 Err(EstablishConnectionError::other(anyhow!(
4645 "server is forbidding connections"
4646 )))
4647 } else {
4648 let (client_conn, server_conn, kill_conn) =
4649 Connection::in_memory(cx.background());
4650 connection_killers.lock().insert(user_id, kill_conn);
4651 cx.background()
4652 .spawn(server.handle_connection(
4653 server_conn,
4654 client_name,
4655 user_id,
4656 Some(connection_id_tx),
4657 cx.background(),
4658 ))
4659 .detach();
4660 Ok(client_conn)
4661 }
4662 })
4663 });
4664
4665 client
4666 .authenticate_and_connect(&cx.to_async())
4667 .await
4668 .unwrap();
4669
4670 Channel::init(&client);
4671 Project::init(&client);
4672
4673 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4674 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4675 let mut authed_user =
4676 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
4677 while authed_user.next().await.unwrap().is_none() {}
4678
4679 TestClient {
4680 client,
4681 peer_id,
4682 user_store,
4683 project: Default::default(),
4684 buffers: Default::default(),
4685 }
4686 }
4687
4688 fn disconnect_client(&self, user_id: UserId) {
4689 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
4690 let _ = kill_conn.try_send(Some(()));
4691 }
4692 }
4693
4694 fn forbid_connections(&self) {
4695 self.forbid_connections.store(true, SeqCst);
4696 }
4697
4698 fn allow_connections(&self) {
4699 self.forbid_connections.store(false, SeqCst);
4700 }
4701
4702 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4703 let mut config = Config::default();
4704 config.session_secret = "a".repeat(32);
4705 config.database_url = test_db.url.clone();
4706 let github_client = github::AppClient::test();
4707 Arc::new(AppState {
4708 db: test_db.db().clone(),
4709 handlebars: Default::default(),
4710 auth_client: auth::build_client("", ""),
4711 repo_client: github::RepoClient::test(&github_client),
4712 github_client,
4713 config,
4714 })
4715 }
4716
4717 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4718 self.server.store.read()
4719 }
4720
4721 async fn condition<F>(&mut self, mut predicate: F)
4722 where
4723 F: FnMut(&Store) -> bool,
4724 {
4725 async_std::future::timeout(Duration::from_millis(500), async {
4726 while !(predicate)(&*self.server.store.read()) {
4727 self.foreground.start_waiting();
4728 self.notifications.next().await;
4729 self.foreground.finish_waiting();
4730 }
4731 })
4732 .await
4733 .expect("condition timed out");
4734 }
4735 }
4736
4737 impl Drop for TestServer {
4738 fn drop(&mut self) {
4739 self.peer.reset();
4740 }
4741 }
4742
4743 struct TestClient {
4744 client: Arc<Client>,
4745 pub peer_id: PeerId,
4746 pub user_store: ModelHandle<UserStore>,
4747 project: Option<ModelHandle<Project>>,
4748 buffers: HashSet<ModelHandle<zed::language::Buffer>>,
4749 }
4750
4751 impl Deref for TestClient {
4752 type Target = Arc<Client>;
4753
4754 fn deref(&self) -> &Self::Target {
4755 &self.client
4756 }
4757 }
4758
4759 impl TestClient {
4760 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4761 UserId::from_proto(
4762 self.user_store
4763 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4764 )
4765 }
4766
4767 fn simulate_host(
4768 mut self,
4769 project: ModelHandle<Project>,
4770 mut language_server_config: LanguageServerConfig,
4771 operations: Rc<Cell<usize>>,
4772 max_operations: usize,
4773 rng: Arc<Mutex<StdRng>>,
4774 mut cx: TestAppContext,
4775 ) -> impl Future<Output = (Self, TestAppContext)> {
4776 let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4777
4778 // Set up a fake language server.
4779 language_server_config.set_fake_initializer({
4780 let rng = rng.clone();
4781 let files = files.clone();
4782 let project = project.clone();
4783 move |fake_server| {
4784 fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4785 Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4786 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4787 range: lsp::Range::new(
4788 lsp::Position::new(0, 0),
4789 lsp::Position::new(0, 0),
4790 ),
4791 new_text: "the-new-text".to_string(),
4792 })),
4793 ..Default::default()
4794 }]))
4795 });
4796
4797 fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4798 Some(vec![lsp::CodeActionOrCommand::CodeAction(
4799 lsp::CodeAction {
4800 title: "the-code-action".to_string(),
4801 ..Default::default()
4802 },
4803 )])
4804 });
4805
4806 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4807 |params, _| {
4808 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4809 params.position,
4810 params.position,
4811 )))
4812 },
4813 );
4814
4815 fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4816 let files = files.clone();
4817 let rng = rng.clone();
4818 move |_, _| {
4819 let files = files.lock();
4820 let mut rng = rng.lock();
4821 let count = rng.gen_range::<usize, _>(1..3);
4822 let files = (0..count)
4823 .map(|_| files.choose(&mut *rng).unwrap())
4824 .collect::<Vec<_>>();
4825 log::info!("LSP: Returning definitions in files {:?}", &files);
4826 Some(lsp::GotoDefinitionResponse::Array(
4827 files
4828 .into_iter()
4829 .map(|file| lsp::Location {
4830 uri: lsp::Url::from_file_path(file).unwrap(),
4831 range: Default::default(),
4832 })
4833 .collect(),
4834 ))
4835 }
4836 });
4837
4838 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4839 let rng = rng.clone();
4840 let project = project.clone();
4841 move |params, mut cx| {
4842 project.update(&mut cx, |project, cx| {
4843 let path = params
4844 .text_document_position_params
4845 .text_document
4846 .uri
4847 .to_file_path()
4848 .unwrap();
4849 let (worktree, relative_path) =
4850 project.find_local_worktree(&path, cx)?;
4851 let project_path =
4852 ProjectPath::from((worktree.read(cx).id(), relative_path));
4853 let buffer = project.get_open_buffer(&project_path, cx)?.read(cx);
4854
4855 let mut highlights = Vec::new();
4856 let highlight_count = rng.lock().gen_range(1..=5);
4857 let mut prev_end = 0;
4858 for _ in 0..highlight_count {
4859 let range =
4860 buffer.random_byte_range(prev_end, &mut *rng.lock());
4861 let start =
4862 buffer.offset_to_point_utf16(range.start).to_lsp_position();
4863 let end =
4864 buffer.offset_to_point_utf16(range.end).to_lsp_position();
4865 highlights.push(lsp::DocumentHighlight {
4866 range: lsp::Range::new(start, end),
4867 kind: Some(lsp::DocumentHighlightKind::READ),
4868 });
4869 prev_end = range.end;
4870 }
4871 Some(highlights)
4872 })
4873 }
4874 });
4875 }
4876 });
4877
4878 project.update(&mut cx, |project, _| {
4879 project.languages().add(Arc::new(Language::new(
4880 LanguageConfig {
4881 name: "Rust".into(),
4882 path_suffixes: vec!["rs".to_string()],
4883 language_server: Some(language_server_config),
4884 ..Default::default()
4885 },
4886 None,
4887 )));
4888 });
4889
4890 async move {
4891 let fs = project.read_with(&cx, |project, _| project.fs().clone());
4892 while operations.get() < max_operations {
4893 operations.set(operations.get() + 1);
4894
4895 let distribution = rng.lock().gen_range::<usize, _>(0..100);
4896 match distribution {
4897 0..=20 if !files.lock().is_empty() => {
4898 let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4899 let mut path = path.as_path();
4900 while let Some(parent_path) = path.parent() {
4901 path = parent_path;
4902 if rng.lock().gen() {
4903 break;
4904 }
4905 }
4906
4907 log::info!("Host: find/create local worktree {:?}", path);
4908 project
4909 .update(&mut cx, |project, cx| {
4910 project.find_or_create_local_worktree(path, false, cx)
4911 })
4912 .await
4913 .unwrap();
4914 }
4915 10..=80 if !files.lock().is_empty() => {
4916 let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4917 let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4918 let (worktree, path) = project
4919 .update(&mut cx, |project, cx| {
4920 project.find_or_create_local_worktree(
4921 file.clone(),
4922 false,
4923 cx,
4924 )
4925 })
4926 .await
4927 .unwrap();
4928 let project_path =
4929 worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4930 log::info!("Host: opening path {:?}, {:?}", file, project_path);
4931 let buffer = project
4932 .update(&mut cx, |project, cx| {
4933 project.open_buffer(project_path, cx)
4934 })
4935 .await
4936 .unwrap();
4937 self.buffers.insert(buffer.clone());
4938 buffer
4939 } else {
4940 self.buffers
4941 .iter()
4942 .choose(&mut *rng.lock())
4943 .unwrap()
4944 .clone()
4945 };
4946
4947 if rng.lock().gen_bool(0.1) {
4948 cx.update(|cx| {
4949 log::info!(
4950 "Host: dropping buffer {:?}",
4951 buffer.read(cx).file().unwrap().full_path(cx)
4952 );
4953 self.buffers.remove(&buffer);
4954 drop(buffer);
4955 });
4956 } else {
4957 buffer.update(&mut cx, |buffer, cx| {
4958 log::info!(
4959 "Host: updating buffer {:?}",
4960 buffer.file().unwrap().full_path(cx)
4961 );
4962 buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4963 });
4964 }
4965 }
4966 _ => loop {
4967 let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4968 let mut path = PathBuf::new();
4969 path.push("/");
4970 for _ in 0..path_component_count {
4971 let letter = rng.lock().gen_range(b'a'..=b'z');
4972 path.push(std::str::from_utf8(&[letter]).unwrap());
4973 }
4974 path.set_extension("rs");
4975 let parent_path = path.parent().unwrap();
4976
4977 log::info!("Host: creating file {:?}", path,);
4978
4979 if fs.create_dir(&parent_path).await.is_ok()
4980 && fs.create_file(&path, Default::default()).await.is_ok()
4981 {
4982 files.lock().push(path);
4983 break;
4984 } else {
4985 log::info!("Host: cannot create file");
4986 }
4987 },
4988 }
4989
4990 cx.background().simulate_random_delay().await;
4991 }
4992
4993 log::info!("Host done");
4994
4995 self.project = Some(project);
4996 (self, cx)
4997 }
4998 }
4999
5000 pub async fn simulate_guest(
5001 mut self,
5002 guest_id: usize,
5003 project: ModelHandle<Project>,
5004 operations: Rc<Cell<usize>>,
5005 max_operations: usize,
5006 rng: Arc<Mutex<StdRng>>,
5007 mut cx: TestAppContext,
5008 ) -> (Self, TestAppContext) {
5009 while operations.get() < max_operations {
5010 let buffer = if self.buffers.is_empty() || rng.lock().gen() {
5011 let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
5012 project
5013 .worktrees(&cx)
5014 .filter(|worktree| {
5015 let worktree = worktree.read(cx);
5016 !worktree.is_weak() && worktree.entries(false).any(|e| e.is_file())
5017 })
5018 .choose(&mut *rng.lock())
5019 }) {
5020 worktree
5021 } else {
5022 cx.background().simulate_random_delay().await;
5023 continue;
5024 };
5025
5026 operations.set(operations.get() + 1);
5027 let (worktree_root_name, project_path) =
5028 worktree.read_with(&cx, |worktree, _| {
5029 let entry = worktree
5030 .entries(false)
5031 .filter(|e| e.is_file())
5032 .choose(&mut *rng.lock())
5033 .unwrap();
5034 (
5035 worktree.root_name().to_string(),
5036 (worktree.id(), entry.path.clone()),
5037 )
5038 });
5039 log::info!(
5040 "Guest {}: opening path in worktree {:?} {:?} {:?}",
5041 guest_id,
5042 project_path.0,
5043 worktree_root_name,
5044 project_path.1
5045 );
5046 let buffer = project
5047 .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
5048 .await
5049 .unwrap();
5050 self.buffers.insert(buffer.clone());
5051 buffer
5052 } else {
5053 operations.set(operations.get() + 1);
5054
5055 self.buffers
5056 .iter()
5057 .choose(&mut *rng.lock())
5058 .unwrap()
5059 .clone()
5060 };
5061
5062 let choice = rng.lock().gen_range(0..100);
5063 match choice {
5064 0..=9 => {
5065 cx.update(|cx| {
5066 log::info!(
5067 "Guest {}: dropping buffer {:?}",
5068 guest_id,
5069 buffer.read(cx).file().unwrap().full_path(cx)
5070 );
5071 self.buffers.remove(&buffer);
5072 drop(buffer);
5073 });
5074 }
5075 10..=19 => {
5076 let completions = project.update(&mut cx, |project, cx| {
5077 log::info!(
5078 "Guest {}: requesting completions for buffer {:?}",
5079 guest_id,
5080 buffer.read(cx).file().unwrap().full_path(cx)
5081 );
5082 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5083 project.completions(&buffer, offset, cx)
5084 });
5085 let completions = cx.background().spawn(async move {
5086 completions.await.expect("completions request failed");
5087 });
5088 if rng.lock().gen_bool(0.3) {
5089 log::info!("Guest {}: detaching completions request", guest_id);
5090 completions.detach();
5091 } else {
5092 completions.await;
5093 }
5094 }
5095 20..=29 => {
5096 let code_actions = project.update(&mut cx, |project, cx| {
5097 log::info!(
5098 "Guest {}: requesting code actions for buffer {:?}",
5099 guest_id,
5100 buffer.read(cx).file().unwrap().full_path(cx)
5101 );
5102 let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
5103 project.code_actions(&buffer, range, cx)
5104 });
5105 let code_actions = cx.background().spawn(async move {
5106 code_actions.await.expect("code actions request failed");
5107 });
5108 if rng.lock().gen_bool(0.3) {
5109 log::info!("Guest {}: detaching code actions request", guest_id);
5110 code_actions.detach();
5111 } else {
5112 code_actions.await;
5113 }
5114 }
5115 30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
5116 let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
5117 log::info!(
5118 "Guest {}: saving buffer {:?}",
5119 guest_id,
5120 buffer.file().unwrap().full_path(cx)
5121 );
5122 (buffer.version(), buffer.save(cx))
5123 });
5124 let save = cx.spawn(|cx| async move {
5125 let (saved_version, _) = save.await.expect("save request failed");
5126 buffer.read_with(&cx, |buffer, _| {
5127 assert!(buffer.version().observed_all(&saved_version));
5128 assert!(saved_version.observed_all(&requested_version));
5129 });
5130 });
5131 if rng.lock().gen_bool(0.3) {
5132 log::info!("Guest {}: detaching save request", guest_id);
5133 save.detach();
5134 } else {
5135 save.await;
5136 }
5137 }
5138 40..=45 => {
5139 let prepare_rename = project.update(&mut cx, |project, cx| {
5140 log::info!(
5141 "Guest {}: preparing rename for buffer {:?}",
5142 guest_id,
5143 buffer.read(cx).file().unwrap().full_path(cx)
5144 );
5145 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5146 project.prepare_rename(buffer, offset, cx)
5147 });
5148 let prepare_rename = cx.background().spawn(async move {
5149 prepare_rename.await.expect("prepare rename request failed");
5150 });
5151 if rng.lock().gen_bool(0.3) {
5152 log::info!("Guest {}: detaching prepare rename request", guest_id);
5153 prepare_rename.detach();
5154 } else {
5155 prepare_rename.await;
5156 }
5157 }
5158 46..=49 => {
5159 let definitions = project.update(&mut cx, |project, cx| {
5160 log::info!(
5161 "Guest {}: requesting defintions for buffer {:?}",
5162 guest_id,
5163 buffer.read(cx).file().unwrap().full_path(cx)
5164 );
5165 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5166 project.definition(&buffer, offset, cx)
5167 });
5168 let definitions = cx.background().spawn(async move {
5169 definitions.await.expect("definitions request failed")
5170 });
5171 if rng.lock().gen_bool(0.3) {
5172 log::info!("Guest {}: detaching definitions request", guest_id);
5173 definitions.detach();
5174 } else {
5175 self.buffers
5176 .extend(definitions.await.into_iter().map(|loc| loc.buffer));
5177 }
5178 }
5179 50..=55 => {
5180 let highlights = project.update(&mut cx, |project, cx| {
5181 log::info!(
5182 "Guest {}: requesting highlights for buffer {:?}",
5183 guest_id,
5184 buffer.read(cx).file().unwrap().full_path(cx)
5185 );
5186 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5187 project.document_highlights(&buffer, offset, cx)
5188 });
5189 let highlights = cx.background().spawn(async move {
5190 highlights.await.expect("highlights request failed");
5191 });
5192 if rng.lock().gen_bool(0.3) {
5193 log::info!("Guest {}: detaching highlights request", guest_id);
5194 highlights.detach();
5195 } else {
5196 highlights.await;
5197 }
5198 }
5199 _ => {
5200 buffer.update(&mut cx, |buffer, cx| {
5201 log::info!(
5202 "Guest {}: updating buffer {:?}",
5203 guest_id,
5204 buffer.file().unwrap().full_path(cx)
5205 );
5206 buffer.randomly_edit(&mut *rng.lock(), 5, cx)
5207 });
5208 }
5209 }
5210 cx.background().simulate_random_delay().await;
5211 }
5212
5213 log::info!("Guest {} done", guest_id);
5214
5215 self.project = Some(project);
5216 (self, cx)
5217 }
5218 }
5219
5220 impl Executor for Arc<gpui::executor::Background> {
5221 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5222 self.spawn(future).detach();
5223 }
5224 }
5225
5226 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5227 channel
5228 .messages()
5229 .cursor::<()>()
5230 .map(|m| {
5231 (
5232 m.sender.github_login.clone(),
5233 m.body.clone(),
5234 m.is_pending(),
5235 )
5236 })
5237 .collect()
5238 }
5239
5240 struct EmptyView;
5241
5242 impl gpui::Entity for EmptyView {
5243 type Event = ();
5244 }
5245
5246 impl gpui::View for EmptyView {
5247 fn ui_name() -> &'static str {
5248 "empty view"
5249 }
5250
5251 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5252 gpui::Element::boxed(gpui::elements::Empty)
5253 }
5254 }
5255}