1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use rpc::{
15 proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
16 Connection, ConnectionId, Peer, TypedEnvelope,
17};
18use sha1::{Digest as _, Sha1};
19use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
20use store::{Store, Worktree};
21use surf::StatusCode;
22use tide::log;
23use tide::{
24 http::headers::{HeaderName, CONNECTION, UPGRADE},
25 Request, Response,
26};
27use time::OffsetDateTime;
28
29type MessageHandler = Box<
30 dyn Send
31 + Sync
32 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
33>;
34
/// RPC server state shared by all connection handlers.
pub struct Server {
    /// RPC peer used to add connections and to send, forward and respond to messages.
    peer: Arc<Peer>,
    /// Lock-guarded store; accessed through `state()` and `state_mut()`.
    store: RwLock<Store>,
    /// Shared application state; handlers reach the database through `app_state.db`.
    app_state: Arc<AppState>,
    /// Registered message handlers, keyed by the `TypeId` of the message type.
    handlers: HashMap<TypeId, MessageHandler>,
    /// When present, receives `()` after each incoming message has been handled.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
42
/// Abstraction over "run this future without waiting for it".
pub trait Executor {
    /// Hands `future` to the executor; nothing is returned to the caller.
    fn spawn_detached<F>(&self, future: F)
    where
        F: 'static + Send + Future<Output = ()>;
}
46
/// [`Executor`] whose implementation spawns futures with `async_std::task::spawn`.
pub struct RealExecutor;
48
/// Number of channel messages requested from the database per page; a page shorter than
/// this is reported to the client as `done`.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Upper bound on the trimmed channel message body, compared against `String::len` (bytes).
const MAX_MESSAGE_LEN: usize = 1024;
51
52impl Server {
53 pub fn new(
54 app_state: Arc<AppState>,
55 peer: Arc<Peer>,
56 notifications: Option<mpsc::UnboundedSender<()>>,
57 ) -> Arc<Self> {
58 let mut server = Self {
59 peer,
60 app_state,
61 store: Default::default(),
62 handlers: Default::default(),
63 notifications,
64 };
65
66 server
67 .add_request_handler(Server::ping)
68 .add_request_handler(Server::register_project)
69 .add_message_handler(Server::unregister_project)
70 .add_request_handler(Server::share_project)
71 .add_message_handler(Server::unshare_project)
72 .add_request_handler(Server::join_project)
73 .add_message_handler(Server::leave_project)
74 .add_request_handler(Server::register_worktree)
75 .add_message_handler(Server::unregister_worktree)
76 .add_request_handler(Server::share_worktree)
77 .add_request_handler(Server::update_worktree)
78 .add_message_handler(Server::update_diagnostic_summary)
79 .add_message_handler(Server::disk_based_diagnostics_updating)
80 .add_message_handler(Server::disk_based_diagnostics_updated)
81 .add_request_handler(Server::get_definition)
82 .add_request_handler(Server::get_references)
83 .add_request_handler(Server::get_project_symbols)
84 .add_request_handler(Server::open_buffer_for_symbol)
85 .add_request_handler(Server::open_buffer)
86 .add_message_handler(Server::close_buffer)
87 .add_request_handler(Server::update_buffer)
88 .add_message_handler(Server::update_buffer_file)
89 .add_message_handler(Server::buffer_reloaded)
90 .add_message_handler(Server::buffer_saved)
91 .add_request_handler(Server::save_buffer)
92 .add_request_handler(Server::format_buffers)
93 .add_request_handler(Server::get_completions)
94 .add_request_handler(Server::apply_additional_edits_for_completion)
95 .add_request_handler(Server::get_code_actions)
96 .add_request_handler(Server::apply_code_action)
97 .add_request_handler(Server::prepare_rename)
98 .add_request_handler(Server::perform_rename)
99 .add_request_handler(Server::get_channels)
100 .add_request_handler(Server::get_users)
101 .add_request_handler(Server::join_channel)
102 .add_message_handler(Server::leave_channel)
103 .add_request_handler(Server::send_channel_message)
104 .add_request_handler(Server::get_channel_messages);
105
106 Arc::new(server)
107 }
108
109 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
110 where
111 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
112 Fut: 'static + Send + Future<Output = tide::Result<()>>,
113 M: EnvelopedMessage,
114 {
115 let prev_handler = self.handlers.insert(
116 TypeId::of::<M>(),
117 Box::new(move |server, envelope| {
118 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
119 (handler)(server, *envelope).boxed()
120 }),
121 );
122 if prev_handler.is_some() {
123 panic!("registered a handler for the same message twice");
124 }
125 self
126 }
127
128 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
129 where
130 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
131 Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
132 M: RequestMessage,
133 {
134 self.add_message_handler(move |server, envelope| {
135 let receipt = envelope.receipt();
136 let response = (handler)(server.clone(), envelope);
137 async move {
138 match response.await {
139 Ok(response) => {
140 server.peer.respond(receipt, response)?;
141 Ok(())
142 }
143 Err(error) => {
144 server.peer.respond_with_error(
145 receipt,
146 proto::Error {
147 message: error.to_string(),
148 },
149 )?;
150 Err(error)
151 }
152 }
153 }
154 })
155 }
156
/// Drives a single client connection until its I/O finishes or its message stream ends.
///
/// The returned future registers the connection with the peer and the store, dispatches
/// each incoming message to the handler registered for its payload type, and finally
/// signs the connection out. Errors along the way are logged rather than returned.
///
/// * `addr` is only used in log messages.
/// * `send_connection_id`, when provided, is sent the id assigned to the connection.
/// * `executor` runs handlers for messages flagged as background.
pub fn handle_connection<E: Executor>(
    self: &Arc<Self>,
    connection: Connection,
    addr: String,
    user_id: UserId,
    mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
    executor: E,
) -> impl Future<Output = ()> {
    let mut this = self.clone();
    async move {
        let (connection_id, handle_io, mut incoming_rx) =
            this.peer.add_connection(connection).await;

        // Reporting the id is best effort: a failed send is ignored.
        if let Some(send_connection_id) = send_connection_id.as_mut() {
            let _ = send_connection_id.send(connection_id).await;
        }

        this.state_mut().add_connection(connection_id, user_id);
        if let Err(err) = this.update_contacts_for_users(&[user_id]) {
            log::error!("error updating contacts for {:?}: {}", user_id, err);
        }

        let handle_io = handle_io.fuse();
        futures::pin_mut!(handle_io);
        loop {
            let next_message = incoming_rx.next().fuse();
            futures::pin_mut!(next_message);
            // `select_biased!` polls the branches in the order written, so completion of
            // the I/O future takes priority over the next incoming message.
            futures::select_biased! {
                result = handle_io => {
                    if let Err(err) = result {
                        log::error!("error handling rpc connection {:?} - {:?}", addr, err);
                    }
                    break;
                }
                message = next_message => {
                    if let Some(message) = message {
                        let start_time = Instant::now();
                        let type_name = message.payload_type_name();
                        log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
                        if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                            let notifications = this.notifications.clone();
                            let is_background = message.is_background();
                            let handle_message = (handler)(this.clone(), message);
                            let handle_message = async move {
                                if let Err(err) = handle_message.await {
                                    log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
                                } else {
                                    log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
                                }
                                // Signal that one message has been fully handled.
                                if let Some(mut notifications) = notifications {
                                    let _ = notifications.send(()).await;
                                }
                            };
                            // Background messages are detached; all others are awaited here,
                            // so the next message is not read until this one is done.
                            if is_background {
                                executor.spawn_detached(handle_message);
                            } else {
                                handle_message.await;
                            }
                        } else {
                            log::warn!("unhandled message: {}", type_name);
                        }
                    } else {
                        // The incoming stream ended.
                        log::info!("rpc connection closed {:?}", addr);
                        break;
                    }
                }
            }
        }

        if let Err(err) = this.sign_out(connection_id).await {
            log::error!("error signing out connection {:?} - {:?}", addr, err);
        }
    }
}
231
232 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
233 self.peer.disconnect(connection_id);
234 let removed_connection = self.state_mut().remove_connection(connection_id)?;
235
236 for (project_id, project) in removed_connection.hosted_projects {
237 if let Some(share) = project.share {
238 broadcast(
239 connection_id,
240 share.guests.keys().copied().collect(),
241 |conn_id| {
242 self.peer
243 .send(conn_id, proto::UnshareProject { project_id })
244 },
245 )?;
246 }
247 }
248
249 for (project_id, peer_ids) in removed_connection.guest_project_ids {
250 broadcast(connection_id, peer_ids, |conn_id| {
251 self.peer.send(
252 conn_id,
253 proto::RemoveProjectCollaborator {
254 project_id,
255 peer_id: connection_id.0,
256 },
257 )
258 })?;
259 }
260
261 self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
262 Ok(())
263 }
264
265 async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
266 Ok(proto::Ack {})
267 }
268
269 async fn register_project(
270 mut self: Arc<Server>,
271 request: TypedEnvelope<proto::RegisterProject>,
272 ) -> tide::Result<proto::RegisterProjectResponse> {
273 let project_id = {
274 let mut state = self.state_mut();
275 let user_id = state.user_id_for_connection(request.sender_id)?;
276 state.register_project(request.sender_id, user_id)
277 };
278 Ok(proto::RegisterProjectResponse { project_id })
279 }
280
281 async fn unregister_project(
282 mut self: Arc<Server>,
283 request: TypedEnvelope<proto::UnregisterProject>,
284 ) -> tide::Result<()> {
285 let project = self
286 .state_mut()
287 .unregister_project(request.payload.project_id, request.sender_id)?;
288 self.update_contacts_for_users(project.authorized_user_ids().iter())?;
289 Ok(())
290 }
291
292 async fn share_project(
293 mut self: Arc<Server>,
294 request: TypedEnvelope<proto::ShareProject>,
295 ) -> tide::Result<proto::Ack> {
296 self.state_mut()
297 .share_project(request.payload.project_id, request.sender_id);
298 Ok(proto::Ack {})
299 }
300
301 async fn unshare_project(
302 mut self: Arc<Server>,
303 request: TypedEnvelope<proto::UnshareProject>,
304 ) -> tide::Result<()> {
305 let project_id = request.payload.project_id;
306 let project = self
307 .state_mut()
308 .unshare_project(project_id, request.sender_id)?;
309
310 broadcast(request.sender_id, project.connection_ids, |conn_id| {
311 self.peer
312 .send(conn_id, proto::UnshareProject { project_id })
313 })?;
314 self.update_contacts_for_users(&project.authorized_user_ids)?;
315 Ok(())
316 }
317
318 async fn join_project(
319 mut self: Arc<Server>,
320 request: TypedEnvelope<proto::JoinProject>,
321 ) -> tide::Result<proto::JoinProjectResponse> {
322 let project_id = request.payload.project_id;
323
324 let user_id = self.state().user_id_for_connection(request.sender_id)?;
325 let (response, connection_ids, contact_user_ids) = self
326 .state_mut()
327 .join_project(request.sender_id, user_id, project_id)
328 .and_then(|joined| {
329 let share = joined.project.share()?;
330 let peer_count = share.guests.len();
331 let mut collaborators = Vec::with_capacity(peer_count);
332 collaborators.push(proto::Collaborator {
333 peer_id: joined.project.host_connection_id.0,
334 replica_id: 0,
335 user_id: joined.project.host_user_id.to_proto(),
336 });
337 let worktrees = joined
338 .project
339 .worktrees
340 .iter()
341 .filter_map(|(id, worktree)| {
342 worktree.share.as_ref().map(|share| proto::Worktree {
343 id: *id,
344 root_name: worktree.root_name.clone(),
345 entries: share.entries.values().cloned().collect(),
346 diagnostic_summaries: share
347 .diagnostic_summaries
348 .values()
349 .cloned()
350 .collect(),
351 weak: worktree.weak,
352 next_update_id: share.next_update_id as u64,
353 })
354 })
355 .collect();
356 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
357 if *peer_conn_id != request.sender_id {
358 collaborators.push(proto::Collaborator {
359 peer_id: peer_conn_id.0,
360 replica_id: *peer_replica_id as u32,
361 user_id: peer_user_id.to_proto(),
362 });
363 }
364 }
365 let response = proto::JoinProjectResponse {
366 worktrees,
367 replica_id: joined.replica_id as u32,
368 collaborators,
369 };
370 let connection_ids = joined.project.connection_ids();
371 let contact_user_ids = joined.project.authorized_user_ids();
372 Ok((response, connection_ids, contact_user_ids))
373 })?;
374
375 broadcast(request.sender_id, connection_ids, |conn_id| {
376 self.peer.send(
377 conn_id,
378 proto::AddProjectCollaborator {
379 project_id,
380 collaborator: Some(proto::Collaborator {
381 peer_id: request.sender_id.0,
382 replica_id: response.replica_id,
383 user_id: user_id.to_proto(),
384 }),
385 },
386 )
387 })?;
388 self.update_contacts_for_users(&contact_user_ids)?;
389 Ok(response)
390 }
391
392 async fn leave_project(
393 mut self: Arc<Server>,
394 request: TypedEnvelope<proto::LeaveProject>,
395 ) -> tide::Result<()> {
396 let sender_id = request.sender_id;
397 let project_id = request.payload.project_id;
398 let worktree = self.state_mut().leave_project(sender_id, project_id)?;
399
400 broadcast(sender_id, worktree.connection_ids, |conn_id| {
401 self.peer.send(
402 conn_id,
403 proto::RemoveProjectCollaborator {
404 project_id,
405 peer_id: sender_id.0,
406 },
407 )
408 })?;
409 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
410
411 Ok(())
412 }
413
414 async fn register_worktree(
415 mut self: Arc<Server>,
416 request: TypedEnvelope<proto::RegisterWorktree>,
417 ) -> tide::Result<proto::Ack> {
418 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
419
420 let mut contact_user_ids = HashSet::default();
421 contact_user_ids.insert(host_user_id);
422 for github_login in request.payload.authorized_logins {
423 let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
424 contact_user_ids.insert(contact_user_id);
425 }
426
427 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
428 self.state_mut().register_worktree(
429 request.payload.project_id,
430 request.payload.worktree_id,
431 request.sender_id,
432 Worktree {
433 authorized_user_ids: contact_user_ids.clone(),
434 root_name: request.payload.root_name,
435 share: None,
436 weak: false,
437 },
438 )?;
439 self.update_contacts_for_users(&contact_user_ids)?;
440 Ok(proto::Ack {})
441 }
442
443 async fn unregister_worktree(
444 mut self: Arc<Server>,
445 request: TypedEnvelope<proto::UnregisterWorktree>,
446 ) -> tide::Result<()> {
447 let project_id = request.payload.project_id;
448 let worktree_id = request.payload.worktree_id;
449 let (worktree, guest_connection_ids) =
450 self.state_mut()
451 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
452 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
453 self.peer.send(
454 conn_id,
455 proto::UnregisterWorktree {
456 project_id,
457 worktree_id,
458 },
459 )
460 })?;
461 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
462 Ok(())
463 }
464
465 async fn share_worktree(
466 mut self: Arc<Server>,
467 mut request: TypedEnvelope<proto::ShareWorktree>,
468 ) -> tide::Result<proto::Ack> {
469 let worktree = request
470 .payload
471 .worktree
472 .as_mut()
473 .ok_or_else(|| anyhow!("missing worktree"))?;
474 let entries = worktree
475 .entries
476 .iter()
477 .map(|entry| (entry.id, entry.clone()))
478 .collect();
479 let diagnostic_summaries = worktree
480 .diagnostic_summaries
481 .iter()
482 .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
483 .collect();
484
485 let shared_worktree = self.state_mut().share_worktree(
486 request.payload.project_id,
487 worktree.id,
488 request.sender_id,
489 entries,
490 diagnostic_summaries,
491 worktree.next_update_id,
492 )?;
493
494 broadcast(
495 request.sender_id,
496 shared_worktree.connection_ids,
497 |connection_id| {
498 self.peer
499 .forward_send(request.sender_id, connection_id, request.payload.clone())
500 },
501 )?;
502 self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
503
504 Ok(proto::Ack {})
505 }
506
507 async fn update_worktree(
508 mut self: Arc<Server>,
509 request: TypedEnvelope<proto::UpdateWorktree>,
510 ) -> tide::Result<proto::Ack> {
511 let connection_ids = self.state_mut().update_worktree(
512 request.sender_id,
513 request.payload.project_id,
514 request.payload.worktree_id,
515 request.payload.id,
516 &request.payload.removed_entries,
517 &request.payload.updated_entries,
518 )?;
519
520 broadcast(request.sender_id, connection_ids, |connection_id| {
521 self.peer
522 .forward_send(request.sender_id, connection_id, request.payload.clone())
523 })?;
524
525 Ok(proto::Ack {})
526 }
527
528 async fn update_diagnostic_summary(
529 mut self: Arc<Server>,
530 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
531 ) -> tide::Result<()> {
532 let summary = request
533 .payload
534 .summary
535 .clone()
536 .ok_or_else(|| anyhow!("invalid summary"))?;
537 let receiver_ids = self.state_mut().update_diagnostic_summary(
538 request.payload.project_id,
539 request.payload.worktree_id,
540 request.sender_id,
541 summary,
542 )?;
543
544 broadcast(request.sender_id, receiver_ids, |connection_id| {
545 self.peer
546 .forward_send(request.sender_id, connection_id, request.payload.clone())
547 })?;
548 Ok(())
549 }
550
551 async fn disk_based_diagnostics_updating(
552 self: Arc<Server>,
553 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
554 ) -> tide::Result<()> {
555 let receiver_ids = self
556 .state()
557 .project_connection_ids(request.payload.project_id, request.sender_id)?;
558 broadcast(request.sender_id, receiver_ids, |connection_id| {
559 self.peer
560 .forward_send(request.sender_id, connection_id, request.payload.clone())
561 })?;
562 Ok(())
563 }
564
565 async fn disk_based_diagnostics_updated(
566 self: Arc<Server>,
567 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
568 ) -> tide::Result<()> {
569 let receiver_ids = self
570 .state()
571 .project_connection_ids(request.payload.project_id, request.sender_id)?;
572 broadcast(request.sender_id, receiver_ids, |connection_id| {
573 self.peer
574 .forward_send(request.sender_id, connection_id, request.payload.clone())
575 })?;
576 Ok(())
577 }
578
579 async fn get_definition(
580 self: Arc<Server>,
581 request: TypedEnvelope<proto::GetDefinition>,
582 ) -> tide::Result<proto::GetDefinitionResponse> {
583 let host_connection_id = self
584 .state()
585 .read_project(request.payload.project_id, request.sender_id)?
586 .host_connection_id;
587 Ok(self
588 .peer
589 .forward_request(request.sender_id, host_connection_id, request.payload)
590 .await?)
591 }
592
593 async fn get_references(
594 self: Arc<Server>,
595 request: TypedEnvelope<proto::GetReferences>,
596 ) -> tide::Result<proto::GetReferencesResponse> {
597 let host_connection_id = self
598 .state()
599 .read_project(request.payload.project_id, request.sender_id)?
600 .host_connection_id;
601 Ok(self
602 .peer
603 .forward_request(request.sender_id, host_connection_id, request.payload)
604 .await?)
605 }
606
607 async fn get_project_symbols(
608 self: Arc<Server>,
609 request: TypedEnvelope<proto::GetProjectSymbols>,
610 ) -> tide::Result<proto::GetProjectSymbolsResponse> {
611 let host_connection_id = self
612 .state()
613 .read_project(request.payload.project_id, request.sender_id)?
614 .host_connection_id;
615 Ok(self
616 .peer
617 .forward_request(request.sender_id, host_connection_id, request.payload)
618 .await?)
619 }
620
621 async fn open_buffer_for_symbol(
622 self: Arc<Server>,
623 request: TypedEnvelope<proto::OpenBufferForSymbol>,
624 ) -> tide::Result<proto::OpenBufferForSymbolResponse> {
625 let host_connection_id = self
626 .state()
627 .read_project(request.payload.project_id, request.sender_id)?
628 .host_connection_id;
629 Ok(self
630 .peer
631 .forward_request(request.sender_id, host_connection_id, request.payload)
632 .await?)
633 }
634
635 async fn open_buffer(
636 self: Arc<Server>,
637 request: TypedEnvelope<proto::OpenBuffer>,
638 ) -> tide::Result<proto::OpenBufferResponse> {
639 let host_connection_id = self
640 .state()
641 .read_project(request.payload.project_id, request.sender_id)?
642 .host_connection_id;
643 Ok(self
644 .peer
645 .forward_request(request.sender_id, host_connection_id, request.payload)
646 .await?)
647 }
648
649 async fn close_buffer(
650 self: Arc<Server>,
651 request: TypedEnvelope<proto::CloseBuffer>,
652 ) -> tide::Result<()> {
653 let host_connection_id = self
654 .state()
655 .read_project(request.payload.project_id, request.sender_id)?
656 .host_connection_id;
657 self.peer
658 .forward_send(request.sender_id, host_connection_id, request.payload)?;
659 Ok(())
660 }
661
662 async fn save_buffer(
663 self: Arc<Server>,
664 request: TypedEnvelope<proto::SaveBuffer>,
665 ) -> tide::Result<proto::BufferSaved> {
666 let host;
667 let mut guests;
668 {
669 let state = self.state();
670 let project = state.read_project(request.payload.project_id, request.sender_id)?;
671 host = project.host_connection_id;
672 guests = project.guest_connection_ids()
673 }
674
675 let response = self
676 .peer
677 .forward_request(request.sender_id, host, request.payload.clone())
678 .await?;
679
680 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
681 broadcast(host, guests, |conn_id| {
682 self.peer.forward_send(host, conn_id, response.clone())
683 })?;
684
685 Ok(response)
686 }
687
688 async fn format_buffers(
689 self: Arc<Server>,
690 request: TypedEnvelope<proto::FormatBuffers>,
691 ) -> tide::Result<proto::FormatBuffersResponse> {
692 let host = self
693 .state()
694 .read_project(request.payload.project_id, request.sender_id)?
695 .host_connection_id;
696 Ok(self
697 .peer
698 .forward_request(request.sender_id, host, request.payload.clone())
699 .await?)
700 }
701
702 async fn get_completions(
703 self: Arc<Server>,
704 request: TypedEnvelope<proto::GetCompletions>,
705 ) -> tide::Result<proto::GetCompletionsResponse> {
706 let host = self
707 .state()
708 .read_project(request.payload.project_id, request.sender_id)?
709 .host_connection_id;
710 Ok(self
711 .peer
712 .forward_request(request.sender_id, host, request.payload.clone())
713 .await?)
714 }
715
716 async fn apply_additional_edits_for_completion(
717 self: Arc<Server>,
718 request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
719 ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
720 let host = self
721 .state()
722 .read_project(request.payload.project_id, request.sender_id)?
723 .host_connection_id;
724 Ok(self
725 .peer
726 .forward_request(request.sender_id, host, request.payload.clone())
727 .await?)
728 }
729
730 async fn get_code_actions(
731 self: Arc<Server>,
732 request: TypedEnvelope<proto::GetCodeActions>,
733 ) -> tide::Result<proto::GetCodeActionsResponse> {
734 let host = self
735 .state()
736 .read_project(request.payload.project_id, request.sender_id)?
737 .host_connection_id;
738 Ok(self
739 .peer
740 .forward_request(request.sender_id, host, request.payload.clone())
741 .await?)
742 }
743
744 async fn apply_code_action(
745 self: Arc<Server>,
746 request: TypedEnvelope<proto::ApplyCodeAction>,
747 ) -> tide::Result<proto::ApplyCodeActionResponse> {
748 let host = self
749 .state()
750 .read_project(request.payload.project_id, request.sender_id)?
751 .host_connection_id;
752 Ok(self
753 .peer
754 .forward_request(request.sender_id, host, request.payload.clone())
755 .await?)
756 }
757
758 async fn prepare_rename(
759 self: Arc<Server>,
760 request: TypedEnvelope<proto::PrepareRename>,
761 ) -> tide::Result<proto::PrepareRenameResponse> {
762 let host = self
763 .state()
764 .read_project(request.payload.project_id, request.sender_id)?
765 .host_connection_id;
766 Ok(self
767 .peer
768 .forward_request(request.sender_id, host, request.payload.clone())
769 .await?)
770 }
771
772 async fn perform_rename(
773 self: Arc<Server>,
774 request: TypedEnvelope<proto::PerformRename>,
775 ) -> tide::Result<proto::PerformRenameResponse> {
776 let host = self
777 .state()
778 .read_project(request.payload.project_id, request.sender_id)?
779 .host_connection_id;
780 Ok(self
781 .peer
782 .forward_request(request.sender_id, host, request.payload.clone())
783 .await?)
784 }
785
786 async fn update_buffer(
787 self: Arc<Server>,
788 request: TypedEnvelope<proto::UpdateBuffer>,
789 ) -> tide::Result<proto::Ack> {
790 let receiver_ids = self
791 .state()
792 .project_connection_ids(request.payload.project_id, request.sender_id)?;
793 broadcast(request.sender_id, receiver_ids, |connection_id| {
794 self.peer
795 .forward_send(request.sender_id, connection_id, request.payload.clone())
796 })?;
797 Ok(proto::Ack {})
798 }
799
800 async fn update_buffer_file(
801 self: Arc<Server>,
802 request: TypedEnvelope<proto::UpdateBufferFile>,
803 ) -> tide::Result<()> {
804 let receiver_ids = self
805 .state()
806 .project_connection_ids(request.payload.project_id, request.sender_id)?;
807 broadcast(request.sender_id, receiver_ids, |connection_id| {
808 self.peer
809 .forward_send(request.sender_id, connection_id, request.payload.clone())
810 })?;
811 Ok(())
812 }
813
814 async fn buffer_reloaded(
815 self: Arc<Server>,
816 request: TypedEnvelope<proto::BufferReloaded>,
817 ) -> tide::Result<()> {
818 let receiver_ids = self
819 .state()
820 .project_connection_ids(request.payload.project_id, request.sender_id)?;
821 broadcast(request.sender_id, receiver_ids, |connection_id| {
822 self.peer
823 .forward_send(request.sender_id, connection_id, request.payload.clone())
824 })?;
825 Ok(())
826 }
827
828 async fn buffer_saved(
829 self: Arc<Server>,
830 request: TypedEnvelope<proto::BufferSaved>,
831 ) -> tide::Result<()> {
832 let receiver_ids = self
833 .state()
834 .project_connection_ids(request.payload.project_id, request.sender_id)?;
835 broadcast(request.sender_id, receiver_ids, |connection_id| {
836 self.peer
837 .forward_send(request.sender_id, connection_id, request.payload.clone())
838 })?;
839 Ok(())
840 }
841
842 async fn get_channels(
843 self: Arc<Server>,
844 request: TypedEnvelope<proto::GetChannels>,
845 ) -> tide::Result<proto::GetChannelsResponse> {
846 let user_id = self.state().user_id_for_connection(request.sender_id)?;
847 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
848 Ok(proto::GetChannelsResponse {
849 channels: channels
850 .into_iter()
851 .map(|chan| proto::Channel {
852 id: chan.id.to_proto(),
853 name: chan.name,
854 })
855 .collect(),
856 })
857 }
858
859 async fn get_users(
860 self: Arc<Server>,
861 request: TypedEnvelope<proto::GetUsers>,
862 ) -> tide::Result<proto::GetUsersResponse> {
863 let user_ids = request
864 .payload
865 .user_ids
866 .into_iter()
867 .map(UserId::from_proto)
868 .collect();
869 let users = self
870 .app_state
871 .db
872 .get_users_by_ids(user_ids)
873 .await?
874 .into_iter()
875 .map(|user| proto::User {
876 id: user.id.to_proto(),
877 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
878 github_login: user.github_login,
879 })
880 .collect();
881 Ok(proto::GetUsersResponse { users })
882 }
883
884 fn update_contacts_for_users<'a>(
885 self: &Arc<Server>,
886 user_ids: impl IntoIterator<Item = &'a UserId>,
887 ) -> anyhow::Result<()> {
888 let mut result = Ok(());
889 let state = self.state();
890 for user_id in user_ids {
891 let contacts = state.contacts_for_user(*user_id);
892 for connection_id in state.connection_ids_for_user(*user_id) {
893 if let Err(error) = self.peer.send(
894 connection_id,
895 proto::UpdateContacts {
896 contacts: contacts.clone(),
897 },
898 ) {
899 result = Err(error);
900 }
901 }
902 }
903 result
904 }
905
906 async fn join_channel(
907 mut self: Arc<Self>,
908 request: TypedEnvelope<proto::JoinChannel>,
909 ) -> tide::Result<proto::JoinChannelResponse> {
910 let user_id = self.state().user_id_for_connection(request.sender_id)?;
911 let channel_id = ChannelId::from_proto(request.payload.channel_id);
912 if !self
913 .app_state
914 .db
915 .can_user_access_channel(user_id, channel_id)
916 .await?
917 {
918 Err(anyhow!("access denied"))?;
919 }
920
921 self.state_mut().join_channel(request.sender_id, channel_id);
922 let messages = self
923 .app_state
924 .db
925 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
926 .await?
927 .into_iter()
928 .map(|msg| proto::ChannelMessage {
929 id: msg.id.to_proto(),
930 body: msg.body,
931 timestamp: msg.sent_at.unix_timestamp() as u64,
932 sender_id: msg.sender_id.to_proto(),
933 nonce: Some(msg.nonce.as_u128().into()),
934 })
935 .collect::<Vec<_>>();
936 Ok(proto::JoinChannelResponse {
937 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
938 messages,
939 })
940 }
941
942 async fn leave_channel(
943 mut self: Arc<Self>,
944 request: TypedEnvelope<proto::LeaveChannel>,
945 ) -> tide::Result<()> {
946 let user_id = self.state().user_id_for_connection(request.sender_id)?;
947 let channel_id = ChannelId::from_proto(request.payload.channel_id);
948 if !self
949 .app_state
950 .db
951 .can_user_access_channel(user_id, channel_id)
952 .await?
953 {
954 Err(anyhow!("access denied"))?;
955 }
956
957 self.state_mut()
958 .leave_channel(request.sender_id, channel_id);
959
960 Ok(())
961 }
962
963 async fn send_channel_message(
964 self: Arc<Self>,
965 request: TypedEnvelope<proto::SendChannelMessage>,
966 ) -> tide::Result<proto::SendChannelMessageResponse> {
967 let channel_id = ChannelId::from_proto(request.payload.channel_id);
968 let user_id;
969 let connection_ids;
970 {
971 let state = self.state();
972 user_id = state.user_id_for_connection(request.sender_id)?;
973 connection_ids = state.channel_connection_ids(channel_id)?;
974 }
975
976 // Validate the message body.
977 let body = request.payload.body.trim().to_string();
978 if body.len() > MAX_MESSAGE_LEN {
979 return Err(anyhow!("message is too long"))?;
980 }
981 if body.is_empty() {
982 return Err(anyhow!("message can't be blank"))?;
983 }
984
985 let timestamp = OffsetDateTime::now_utc();
986 let nonce = request
987 .payload
988 .nonce
989 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
990
991 let message_id = self
992 .app_state
993 .db
994 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
995 .await?
996 .to_proto();
997 let message = proto::ChannelMessage {
998 sender_id: user_id.to_proto(),
999 id: message_id,
1000 body,
1001 timestamp: timestamp.unix_timestamp() as u64,
1002 nonce: Some(nonce),
1003 };
1004 broadcast(request.sender_id, connection_ids, |conn_id| {
1005 self.peer.send(
1006 conn_id,
1007 proto::ChannelMessageSent {
1008 channel_id: channel_id.to_proto(),
1009 message: Some(message.clone()),
1010 },
1011 )
1012 })?;
1013 Ok(proto::SendChannelMessageResponse {
1014 message: Some(message),
1015 })
1016 }
1017
1018 async fn get_channel_messages(
1019 self: Arc<Self>,
1020 request: TypedEnvelope<proto::GetChannelMessages>,
1021 ) -> tide::Result<proto::GetChannelMessagesResponse> {
1022 let user_id = self.state().user_id_for_connection(request.sender_id)?;
1023 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1024 if !self
1025 .app_state
1026 .db
1027 .can_user_access_channel(user_id, channel_id)
1028 .await?
1029 {
1030 Err(anyhow!("access denied"))?;
1031 }
1032
1033 let messages = self
1034 .app_state
1035 .db
1036 .get_channel_messages(
1037 channel_id,
1038 MESSAGE_COUNT_PER_PAGE,
1039 Some(MessageId::from_proto(request.payload.before_message_id)),
1040 )
1041 .await?
1042 .into_iter()
1043 .map(|msg| proto::ChannelMessage {
1044 id: msg.id.to_proto(),
1045 body: msg.body,
1046 timestamp: msg.sent_at.unix_timestamp() as u64,
1047 sender_id: msg.sender_id.to_proto(),
1048 nonce: Some(msg.nonce.as_u128().into()),
1049 })
1050 .collect::<Vec<_>>();
1051
1052 Ok(proto::GetChannelMessagesResponse {
1053 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1054 messages,
1055 })
1056 }
1057
1058 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1059 self.store.read()
1060 }
1061
1062 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1063 self.store.write()
1064 }
1065}
1066
1067impl Executor for RealExecutor {
1068 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1069 task::spawn(future);
1070 }
1071}
1072
1073fn broadcast<F>(
1074 sender_id: ConnectionId,
1075 receiver_ids: Vec<ConnectionId>,
1076 mut f: F,
1077) -> anyhow::Result<()>
1078where
1079 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1080{
1081 let mut result = Ok(());
1082 for receiver_id in receiver_ids {
1083 if receiver_id != sender_id {
1084 if let Err(error) = f(receiver_id) {
1085 if result.is_ok() {
1086 result = Err(error);
1087 }
1088 }
1089 }
1090 }
1091 result
1092}
1093
1094pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1095 let server = Server::new(app.state().clone(), rpc.clone(), None);
1096 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1097 let server = server.clone();
1098 async move {
1099 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1100
1101 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1102 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1103 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1104 let client_protocol_version: Option<u32> = request
1105 .header("X-Zed-Protocol-Version")
1106 .and_then(|v| v.as_str().parse().ok());
1107
1108 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1109 return Ok(Response::new(StatusCode::UpgradeRequired));
1110 }
1111
1112 let header = match request.header("Sec-Websocket-Key") {
1113 Some(h) => h.as_str(),
1114 None => return Err(anyhow!("expected sec-websocket-key"))?,
1115 };
1116
1117 let user_id = process_auth_header(&request).await?;
1118
1119 let mut response = Response::new(StatusCode::SwitchingProtocols);
1120 response.insert_header(UPGRADE, "websocket");
1121 response.insert_header(CONNECTION, "Upgrade");
1122 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1123 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1124 response.insert_header("Sec-Websocket-Version", "13");
1125
1126 let http_res: &mut tide::http::Response = response.as_mut();
1127 let upgrade_receiver = http_res.recv_upgrade().await;
1128 let addr = request.remote().unwrap_or("unknown").to_string();
1129 task::spawn(async move {
1130 if let Some(stream) = upgrade_receiver.await {
1131 server
1132 .handle_connection(
1133 Connection::new(
1134 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1135 ),
1136 addr,
1137 user_id,
1138 None,
1139 RealExecutor,
1140 )
1141 .await;
1142 }
1143 });
1144
1145 Ok(response)
1146 }
1147 });
1148}
1149
1150fn header_contains_ignore_case<T>(
1151 request: &tide::Request<T>,
1152 header_name: HeaderName,
1153 value: &str,
1154) -> bool {
1155 request
1156 .header(header_name)
1157 .map(|h| {
1158 h.as_str()
1159 .split(',')
1160 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1161 })
1162 .unwrap_or(false)
1163}
1164
1165#[cfg(test)]
1166mod tests {
1167 use super::*;
1168 use crate::{
1169 auth,
1170 db::{tests::TestDb, UserId},
1171 github, AppState, Config,
1172 };
1173 use ::rpc::Peer;
1174 use collections::BTreeMap;
1175 use gpui::{executor, ModelHandle, TestAppContext};
1176 use parking_lot::Mutex;
1177 use postage::{sink::Sink, watch};
1178 use rand::prelude::*;
1179 use rpc::PeerId;
1180 use serde_json::json;
1181 use sqlx::types::time::OffsetDateTime;
1182 use std::{
1183 cell::Cell,
1184 env,
1185 ops::Deref,
1186 path::Path,
1187 rc::Rc,
1188 sync::{
1189 atomic::{AtomicBool, Ordering::SeqCst},
1190 Arc,
1191 },
1192 time::Duration,
1193 };
1194 use zed::{
1195 client::{
1196 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1197 EstablishConnectionError, UserStore,
1198 },
1199 editor::{
1200 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, EditorSettings,
1201 Input, MultiBuffer, Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1202 },
1203 fs::{FakeFs, Fs as _},
1204 language::{
1205 tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1206 LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1207 },
1208 lsp,
1209 project::{DiagnosticSummary, Project, ProjectPath},
1210 workspace::{Workspace, WorkspaceParams},
1211 };
1212
/// Initializes `env_logger` for the test binary, but only when the `RUST_LOG`
/// environment variable is set to a valid unicode value.
///
/// Registered through `#[ctor::ctor]` rather than being called from any test.
#[cfg(test)]
#[ctor::ctor]
fn init_logger() {
    let logging_requested = std::env::var("RUST_LOG").is_ok();
    if logging_requested {
        env_logger::init();
    }
}
1220
1221 #[gpui::test(iterations = 10)]
1222 async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1223 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1224 let lang_registry = Arc::new(LanguageRegistry::new());
1225 let fs = FakeFs::new(cx_a.background());
1226 cx_a.foreground().forbid_parking();
1227
1228 // Connect to a server as 2 clients.
1229 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1230 let client_a = server.create_client(&mut cx_a, "user_a").await;
1231 let client_b = server.create_client(&mut cx_b, "user_b").await;
1232
1233 // Share a project as client A
1234 fs.insert_tree(
1235 "/a",
1236 json!({
1237 ".zed.toml": r#"collaborators = ["user_b"]"#,
1238 "a.txt": "a-contents",
1239 "b.txt": "b-contents",
1240 }),
1241 )
1242 .await;
1243 let project_a = cx_a.update(|cx| {
1244 Project::local(
1245 client_a.clone(),
1246 client_a.user_store.clone(),
1247 lang_registry.clone(),
1248 fs.clone(),
1249 cx,
1250 )
1251 });
1252 let (worktree_a, _) = project_a
1253 .update(&mut cx_a, |p, cx| {
1254 p.find_or_create_local_worktree("/a", false, cx)
1255 })
1256 .await
1257 .unwrap();
1258 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1259 worktree_a
1260 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1261 .await;
1262 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1263 project_a
1264 .update(&mut cx_a, |p, cx| p.share(cx))
1265 .await
1266 .unwrap();
1267
1268 // Join that project as client B
1269 let project_b = Project::remote(
1270 project_id,
1271 client_b.clone(),
1272 client_b.user_store.clone(),
1273 lang_registry.clone(),
1274 fs.clone(),
1275 &mut cx_b.to_async(),
1276 )
1277 .await
1278 .unwrap();
1279
1280 let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1281 assert_eq!(
1282 project
1283 .collaborators()
1284 .get(&client_a.peer_id)
1285 .unwrap()
1286 .user
1287 .github_login,
1288 "user_a"
1289 );
1290 project.replica_id()
1291 });
1292 project_a
1293 .condition(&cx_a, |tree, _| {
1294 tree.collaborators()
1295 .get(&client_b.peer_id)
1296 .map_or(false, |collaborator| {
1297 collaborator.replica_id == replica_id_b
1298 && collaborator.user.github_login == "user_b"
1299 })
1300 })
1301 .await;
1302
1303 // Open the same file as client B and client A.
1304 let buffer_b = project_b
1305 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1306 .await
1307 .unwrap();
1308 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1309 buffer_b.read_with(&cx_b, |buf, cx| {
1310 assert_eq!(buf.read(cx).text(), "b-contents")
1311 });
1312 project_a.read_with(&cx_a, |project, cx| {
1313 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1314 });
1315 let buffer_a = project_a
1316 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1317 .await
1318 .unwrap();
1319
1320 let editor_b = cx_b.add_view(window_b, |cx| {
1321 Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1322 });
1323
1324 // TODO
1325 // // Create a selection set as client B and see that selection set as client A.
1326 // buffer_a
1327 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1328 // .await;
1329
1330 // Edit the buffer as client B and see that edit as client A.
1331 editor_b.update(&mut cx_b, |editor, cx| {
1332 editor.handle_input(&Input("ok, ".into()), cx)
1333 });
1334 buffer_a
1335 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1336 .await;
1337
1338 // TODO
1339 // // Remove the selection set as client B, see those selections disappear as client A.
1340 cx_b.update(move |_| drop(editor_b));
1341 // buffer_a
1342 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1343 // .await;
1344
1345 // Close the buffer as client A, see that the buffer is closed.
1346 cx_a.update(move |_| drop(buffer_a));
1347 project_a
1348 .condition(&cx_a, |project, cx| {
1349 !project.has_open_buffer((worktree_id, "b.txt"), cx)
1350 })
1351 .await;
1352
1353 // Dropping the client B's project removes client B from client A's collaborators.
1354 cx_b.update(move |_| drop(project_b));
1355 project_a
1356 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1357 .await;
1358 }
1359
1360 #[gpui::test(iterations = 10)]
1361 async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1362 let lang_registry = Arc::new(LanguageRegistry::new());
1363 let fs = FakeFs::new(cx_a.background());
1364 cx_a.foreground().forbid_parking();
1365
1366 // Connect to a server as 2 clients.
1367 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1368 let client_a = server.create_client(&mut cx_a, "user_a").await;
1369 let client_b = server.create_client(&mut cx_b, "user_b").await;
1370
1371 // Share a project as client A
1372 fs.insert_tree(
1373 "/a",
1374 json!({
1375 ".zed.toml": r#"collaborators = ["user_b"]"#,
1376 "a.txt": "a-contents",
1377 "b.txt": "b-contents",
1378 }),
1379 )
1380 .await;
1381 let project_a = cx_a.update(|cx| {
1382 Project::local(
1383 client_a.clone(),
1384 client_a.user_store.clone(),
1385 lang_registry.clone(),
1386 fs.clone(),
1387 cx,
1388 )
1389 });
1390 let (worktree_a, _) = project_a
1391 .update(&mut cx_a, |p, cx| {
1392 p.find_or_create_local_worktree("/a", false, cx)
1393 })
1394 .await
1395 .unwrap();
1396 worktree_a
1397 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1398 .await;
1399 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1400 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1401 project_a
1402 .update(&mut cx_a, |p, cx| p.share(cx))
1403 .await
1404 .unwrap();
1405 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1406
1407 // Join that project as client B
1408 let project_b = Project::remote(
1409 project_id,
1410 client_b.clone(),
1411 client_b.user_store.clone(),
1412 lang_registry.clone(),
1413 fs.clone(),
1414 &mut cx_b.to_async(),
1415 )
1416 .await
1417 .unwrap();
1418 project_b
1419 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1420 .await
1421 .unwrap();
1422
1423 // Unshare the project as client A
1424 project_a
1425 .update(&mut cx_a, |project, cx| project.unshare(cx))
1426 .await
1427 .unwrap();
1428 project_b
1429 .condition(&mut cx_b, |project, _| project.is_read_only())
1430 .await;
1431 assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1432 drop(project_b);
1433
1434 // Share the project again and ensure guests can still join.
1435 project_a
1436 .update(&mut cx_a, |project, cx| project.share(cx))
1437 .await
1438 .unwrap();
1439 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1440
1441 let project_c = Project::remote(
1442 project_id,
1443 client_b.clone(),
1444 client_b.user_store.clone(),
1445 lang_registry.clone(),
1446 fs.clone(),
1447 &mut cx_b.to_async(),
1448 )
1449 .await
1450 .unwrap();
1451 project_c
1452 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1453 .await
1454 .unwrap();
1455 }
1456
1457 #[gpui::test(iterations = 10)]
1458 async fn test_propagate_saves_and_fs_changes(
1459 mut cx_a: TestAppContext,
1460 mut cx_b: TestAppContext,
1461 mut cx_c: TestAppContext,
1462 ) {
1463 let lang_registry = Arc::new(LanguageRegistry::new());
1464 let fs = FakeFs::new(cx_a.background());
1465 cx_a.foreground().forbid_parking();
1466
1467 // Connect to a server as 3 clients.
1468 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1469 let client_a = server.create_client(&mut cx_a, "user_a").await;
1470 let client_b = server.create_client(&mut cx_b, "user_b").await;
1471 let client_c = server.create_client(&mut cx_c, "user_c").await;
1472
1473 // Share a worktree as client A.
1474 fs.insert_tree(
1475 "/a",
1476 json!({
1477 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1478 "file1": "",
1479 "file2": ""
1480 }),
1481 )
1482 .await;
1483 let project_a = cx_a.update(|cx| {
1484 Project::local(
1485 client_a.clone(),
1486 client_a.user_store.clone(),
1487 lang_registry.clone(),
1488 fs.clone(),
1489 cx,
1490 )
1491 });
1492 let (worktree_a, _) = project_a
1493 .update(&mut cx_a, |p, cx| {
1494 p.find_or_create_local_worktree("/a", false, cx)
1495 })
1496 .await
1497 .unwrap();
1498 worktree_a
1499 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1500 .await;
1501 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1502 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1503 project_a
1504 .update(&mut cx_a, |p, cx| p.share(cx))
1505 .await
1506 .unwrap();
1507
1508 // Join that worktree as clients B and C.
1509 let project_b = Project::remote(
1510 project_id,
1511 client_b.clone(),
1512 client_b.user_store.clone(),
1513 lang_registry.clone(),
1514 fs.clone(),
1515 &mut cx_b.to_async(),
1516 )
1517 .await
1518 .unwrap();
1519 let project_c = Project::remote(
1520 project_id,
1521 client_c.clone(),
1522 client_c.user_store.clone(),
1523 lang_registry.clone(),
1524 fs.clone(),
1525 &mut cx_c.to_async(),
1526 )
1527 .await
1528 .unwrap();
1529 let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1530 let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1531
1532 // Open and edit a buffer as both guests B and C.
1533 let buffer_b = project_b
1534 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1535 .await
1536 .unwrap();
1537 let buffer_c = project_c
1538 .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1539 .await
1540 .unwrap();
1541 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1542 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1543
1544 // Open and edit that buffer as the host.
1545 let buffer_a = project_a
1546 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1547 .await
1548 .unwrap();
1549
1550 buffer_a
1551 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1552 .await;
1553 buffer_a.update(&mut cx_a, |buf, cx| {
1554 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1555 });
1556
1557 // Wait for edits to propagate
1558 buffer_a
1559 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1560 .await;
1561 buffer_b
1562 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1563 .await;
1564 buffer_c
1565 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1566 .await;
1567
1568 // Edit the buffer as the host and concurrently save as guest B.
1569 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1570 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1571 save_b.await.unwrap();
1572 assert_eq!(
1573 fs.load("/a/file1".as_ref()).await.unwrap(),
1574 "hi-a, i-am-c, i-am-b, i-am-a"
1575 );
1576 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1577 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1578 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1579
1580 // Make changes on host's file system, see those changes on guest worktrees.
1581 fs.rename(
1582 "/a/file1".as_ref(),
1583 "/a/file1-renamed".as_ref(),
1584 Default::default(),
1585 )
1586 .await
1587 .unwrap();
1588
1589 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1590 .await
1591 .unwrap();
1592 fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1593
1594 worktree_a
1595 .condition(&cx_a, |tree, _| {
1596 tree.paths()
1597 .map(|p| p.to_string_lossy())
1598 .collect::<Vec<_>>()
1599 == [".zed.toml", "file1-renamed", "file3", "file4"]
1600 })
1601 .await;
1602 worktree_b
1603 .condition(&cx_b, |tree, _| {
1604 tree.paths()
1605 .map(|p| p.to_string_lossy())
1606 .collect::<Vec<_>>()
1607 == [".zed.toml", "file1-renamed", "file3", "file4"]
1608 })
1609 .await;
1610 worktree_c
1611 .condition(&cx_c, |tree, _| {
1612 tree.paths()
1613 .map(|p| p.to_string_lossy())
1614 .collect::<Vec<_>>()
1615 == [".zed.toml", "file1-renamed", "file3", "file4"]
1616 })
1617 .await;
1618
1619 // Ensure buffer files are updated as well.
1620 buffer_a
1621 .condition(&cx_a, |buf, _| {
1622 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1623 })
1624 .await;
1625 buffer_b
1626 .condition(&cx_b, |buf, _| {
1627 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1628 })
1629 .await;
1630 buffer_c
1631 .condition(&cx_c, |buf, _| {
1632 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1633 })
1634 .await;
1635 }
1636
1637 #[gpui::test(iterations = 10)]
1638 async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1639 cx_a.foreground().forbid_parking();
1640 let lang_registry = Arc::new(LanguageRegistry::new());
1641 let fs = FakeFs::new(cx_a.background());
1642
1643 // Connect to a server as 2 clients.
1644 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1645 let client_a = server.create_client(&mut cx_a, "user_a").await;
1646 let client_b = server.create_client(&mut cx_b, "user_b").await;
1647
1648 // Share a project as client A
1649 fs.insert_tree(
1650 "/dir",
1651 json!({
1652 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1653 "a.txt": "a-contents",
1654 }),
1655 )
1656 .await;
1657
1658 let project_a = cx_a.update(|cx| {
1659 Project::local(
1660 client_a.clone(),
1661 client_a.user_store.clone(),
1662 lang_registry.clone(),
1663 fs.clone(),
1664 cx,
1665 )
1666 });
1667 let (worktree_a, _) = project_a
1668 .update(&mut cx_a, |p, cx| {
1669 p.find_or_create_local_worktree("/dir", false, cx)
1670 })
1671 .await
1672 .unwrap();
1673 worktree_a
1674 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1675 .await;
1676 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1677 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1678 project_a
1679 .update(&mut cx_a, |p, cx| p.share(cx))
1680 .await
1681 .unwrap();
1682
1683 // Join that project as client B
1684 let project_b = Project::remote(
1685 project_id,
1686 client_b.clone(),
1687 client_b.user_store.clone(),
1688 lang_registry.clone(),
1689 fs.clone(),
1690 &mut cx_b.to_async(),
1691 )
1692 .await
1693 .unwrap();
1694
1695 // Open a buffer as client B
1696 let buffer_b = project_b
1697 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1698 .await
1699 .unwrap();
1700
1701 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1702 buffer_b.read_with(&cx_b, |buf, _| {
1703 assert!(buf.is_dirty());
1704 assert!(!buf.has_conflict());
1705 });
1706
1707 buffer_b
1708 .update(&mut cx_b, |buf, cx| buf.save(cx))
1709 .await
1710 .unwrap();
1711 buffer_b
1712 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1713 .await;
1714 buffer_b.read_with(&cx_b, |buf, _| {
1715 assert!(!buf.has_conflict());
1716 });
1717
1718 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1719 buffer_b.read_with(&cx_b, |buf, _| {
1720 assert!(buf.is_dirty());
1721 assert!(!buf.has_conflict());
1722 });
1723 }
1724
1725 #[gpui::test(iterations = 10)]
1726 async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1727 cx_a.foreground().forbid_parking();
1728 let lang_registry = Arc::new(LanguageRegistry::new());
1729 let fs = FakeFs::new(cx_a.background());
1730
1731 // Connect to a server as 2 clients.
1732 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1733 let client_a = server.create_client(&mut cx_a, "user_a").await;
1734 let client_b = server.create_client(&mut cx_b, "user_b").await;
1735
1736 // Share a project as client A
1737 fs.insert_tree(
1738 "/dir",
1739 json!({
1740 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1741 "a.txt": "a-contents",
1742 }),
1743 )
1744 .await;
1745
1746 let project_a = cx_a.update(|cx| {
1747 Project::local(
1748 client_a.clone(),
1749 client_a.user_store.clone(),
1750 lang_registry.clone(),
1751 fs.clone(),
1752 cx,
1753 )
1754 });
1755 let (worktree_a, _) = project_a
1756 .update(&mut cx_a, |p, cx| {
1757 p.find_or_create_local_worktree("/dir", false, cx)
1758 })
1759 .await
1760 .unwrap();
1761 worktree_a
1762 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1763 .await;
1764 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1765 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1766 project_a
1767 .update(&mut cx_a, |p, cx| p.share(cx))
1768 .await
1769 .unwrap();
1770
1771 // Join that project as client B
1772 let project_b = Project::remote(
1773 project_id,
1774 client_b.clone(),
1775 client_b.user_store.clone(),
1776 lang_registry.clone(),
1777 fs.clone(),
1778 &mut cx_b.to_async(),
1779 )
1780 .await
1781 .unwrap();
1782 let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1783
1784 // Open a buffer as client B
1785 let buffer_b = project_b
1786 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1787 .await
1788 .unwrap();
1789 buffer_b.read_with(&cx_b, |buf, _| {
1790 assert!(!buf.is_dirty());
1791 assert!(!buf.has_conflict());
1792 });
1793
1794 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1795 .await
1796 .unwrap();
1797 buffer_b
1798 .condition(&cx_b, |buf, _| {
1799 buf.text() == "new contents" && !buf.is_dirty()
1800 })
1801 .await;
1802 buffer_b.read_with(&cx_b, |buf, _| {
1803 assert!(!buf.has_conflict());
1804 });
1805 }
1806
1807 #[gpui::test(iterations = 10)]
1808 async fn test_editing_while_guest_opens_buffer(
1809 mut cx_a: TestAppContext,
1810 mut cx_b: TestAppContext,
1811 ) {
1812 cx_a.foreground().forbid_parking();
1813 let lang_registry = Arc::new(LanguageRegistry::new());
1814 let fs = FakeFs::new(cx_a.background());
1815
1816 // Connect to a server as 2 clients.
1817 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1818 let client_a = server.create_client(&mut cx_a, "user_a").await;
1819 let client_b = server.create_client(&mut cx_b, "user_b").await;
1820
1821 // Share a project as client A
1822 fs.insert_tree(
1823 "/dir",
1824 json!({
1825 ".zed.toml": r#"collaborators = ["user_b"]"#,
1826 "a.txt": "a-contents",
1827 }),
1828 )
1829 .await;
1830 let project_a = cx_a.update(|cx| {
1831 Project::local(
1832 client_a.clone(),
1833 client_a.user_store.clone(),
1834 lang_registry.clone(),
1835 fs.clone(),
1836 cx,
1837 )
1838 });
1839 let (worktree_a, _) = project_a
1840 .update(&mut cx_a, |p, cx| {
1841 p.find_or_create_local_worktree("/dir", false, cx)
1842 })
1843 .await
1844 .unwrap();
1845 worktree_a
1846 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1847 .await;
1848 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1849 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1850 project_a
1851 .update(&mut cx_a, |p, cx| p.share(cx))
1852 .await
1853 .unwrap();
1854
1855 // Join that project as client B
1856 let project_b = Project::remote(
1857 project_id,
1858 client_b.clone(),
1859 client_b.user_store.clone(),
1860 lang_registry.clone(),
1861 fs.clone(),
1862 &mut cx_b.to_async(),
1863 )
1864 .await
1865 .unwrap();
1866
1867 // Open a buffer as client A
1868 let buffer_a = project_a
1869 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1870 .await
1871 .unwrap();
1872
1873 // Start opening the same buffer as client B
1874 let buffer_b = cx_b
1875 .background()
1876 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1877
1878 // Edit the buffer as client A while client B is still opening it.
1879 cx_b.background().simulate_random_delay().await;
1880 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1881 cx_b.background().simulate_random_delay().await;
1882 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1883
1884 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1885 let buffer_b = buffer_b.await.unwrap();
1886 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1887 }
1888
1889 #[gpui::test(iterations = 10)]
1890 async fn test_leaving_worktree_while_opening_buffer(
1891 mut cx_a: TestAppContext,
1892 mut cx_b: TestAppContext,
1893 ) {
1894 cx_a.foreground().forbid_parking();
1895 let lang_registry = Arc::new(LanguageRegistry::new());
1896 let fs = FakeFs::new(cx_a.background());
1897
1898 // Connect to a server as 2 clients.
1899 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1900 let client_a = server.create_client(&mut cx_a, "user_a").await;
1901 let client_b = server.create_client(&mut cx_b, "user_b").await;
1902
1903 // Share a project as client A
1904 fs.insert_tree(
1905 "/dir",
1906 json!({
1907 ".zed.toml": r#"collaborators = ["user_b"]"#,
1908 "a.txt": "a-contents",
1909 }),
1910 )
1911 .await;
1912 let project_a = cx_a.update(|cx| {
1913 Project::local(
1914 client_a.clone(),
1915 client_a.user_store.clone(),
1916 lang_registry.clone(),
1917 fs.clone(),
1918 cx,
1919 )
1920 });
1921 let (worktree_a, _) = project_a
1922 .update(&mut cx_a, |p, cx| {
1923 p.find_or_create_local_worktree("/dir", false, cx)
1924 })
1925 .await
1926 .unwrap();
1927 worktree_a
1928 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1929 .await;
1930 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1931 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1932 project_a
1933 .update(&mut cx_a, |p, cx| p.share(cx))
1934 .await
1935 .unwrap();
1936
1937 // Join that project as client B
1938 let project_b = Project::remote(
1939 project_id,
1940 client_b.clone(),
1941 client_b.user_store.clone(),
1942 lang_registry.clone(),
1943 fs.clone(),
1944 &mut cx_b.to_async(),
1945 )
1946 .await
1947 .unwrap();
1948
1949 // See that a guest has joined as client A.
1950 project_a
1951 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1952 .await;
1953
1954 // Begin opening a buffer as client B, but leave the project before the open completes.
1955 let buffer_b = cx_b
1956 .background()
1957 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1958 cx_b.update(|_| drop(project_b));
1959 drop(buffer_b);
1960
1961 // See that the guest has left.
1962 project_a
1963 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1964 .await;
1965 }
1966
1967 #[gpui::test(iterations = 10)]
1968 async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1969 cx_a.foreground().forbid_parking();
1970 let lang_registry = Arc::new(LanguageRegistry::new());
1971 let fs = FakeFs::new(cx_a.background());
1972
1973 // Connect to a server as 2 clients.
1974 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1975 let client_a = server.create_client(&mut cx_a, "user_a").await;
1976 let client_b = server.create_client(&mut cx_b, "user_b").await;
1977
1978 // Share a project as client A
1979 fs.insert_tree(
1980 "/a",
1981 json!({
1982 ".zed.toml": r#"collaborators = ["user_b"]"#,
1983 "a.txt": "a-contents",
1984 "b.txt": "b-contents",
1985 }),
1986 )
1987 .await;
1988 let project_a = cx_a.update(|cx| {
1989 Project::local(
1990 client_a.clone(),
1991 client_a.user_store.clone(),
1992 lang_registry.clone(),
1993 fs.clone(),
1994 cx,
1995 )
1996 });
1997 let (worktree_a, _) = project_a
1998 .update(&mut cx_a, |p, cx| {
1999 p.find_or_create_local_worktree("/a", false, cx)
2000 })
2001 .await
2002 .unwrap();
2003 worktree_a
2004 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2005 .await;
2006 let project_id = project_a
2007 .update(&mut cx_a, |project, _| project.next_remote_id())
2008 .await;
2009 project_a
2010 .update(&mut cx_a, |project, cx| project.share(cx))
2011 .await
2012 .unwrap();
2013
2014 // Join that project as client B
2015 let _project_b = Project::remote(
2016 project_id,
2017 client_b.clone(),
2018 client_b.user_store.clone(),
2019 lang_registry.clone(),
2020 fs.clone(),
2021 &mut cx_b.to_async(),
2022 )
2023 .await
2024 .unwrap();
2025
2026 // See that a guest has joined as client A.
2027 project_a
2028 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2029 .await;
2030
2031 // Drop client B's connection and ensure client A observes client B leaving the worktree.
2032 client_b.disconnect(&cx_b.to_async()).unwrap();
2033 project_a
2034 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2035 .await;
2036 }
2037
2038 #[gpui::test(iterations = 10)]
2039 async fn test_collaborating_with_diagnostics(
2040 mut cx_a: TestAppContext,
2041 mut cx_b: TestAppContext,
2042 ) {
2043 cx_a.foreground().forbid_parking();
2044 let mut lang_registry = Arc::new(LanguageRegistry::new());
2045 let fs = FakeFs::new(cx_a.background());
2046
2047 // Set up a fake language server.
2048 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2049 Arc::get_mut(&mut lang_registry)
2050 .unwrap()
2051 .add(Arc::new(Language::new(
2052 LanguageConfig {
2053 name: "Rust".into(),
2054 path_suffixes: vec!["rs".to_string()],
2055 language_server: Some(language_server_config),
2056 ..Default::default()
2057 },
2058 Some(tree_sitter_rust::language()),
2059 )));
2060
2061 // Connect to a server as 2 clients.
2062 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2063 let client_a = server.create_client(&mut cx_a, "user_a").await;
2064 let client_b = server.create_client(&mut cx_b, "user_b").await;
2065
2066 // Share a project as client A
2067 fs.insert_tree(
2068 "/a",
2069 json!({
2070 ".zed.toml": r#"collaborators = ["user_b"]"#,
2071 "a.rs": "let one = two",
2072 "other.rs": "",
2073 }),
2074 )
2075 .await;
2076 let project_a = cx_a.update(|cx| {
2077 Project::local(
2078 client_a.clone(),
2079 client_a.user_store.clone(),
2080 lang_registry.clone(),
2081 fs.clone(),
2082 cx,
2083 )
2084 });
2085 let (worktree_a, _) = project_a
2086 .update(&mut cx_a, |p, cx| {
2087 p.find_or_create_local_worktree("/a", false, cx)
2088 })
2089 .await
2090 .unwrap();
2091 worktree_a
2092 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2093 .await;
2094 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2095 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2096 project_a
2097 .update(&mut cx_a, |p, cx| p.share(cx))
2098 .await
2099 .unwrap();
2100
2101 // Cause the language server to start.
2102 let _ = cx_a
2103 .background()
2104 .spawn(project_a.update(&mut cx_a, |project, cx| {
2105 project.open_buffer(
2106 ProjectPath {
2107 worktree_id,
2108 path: Path::new("other.rs").into(),
2109 },
2110 cx,
2111 )
2112 }))
2113 .await
2114 .unwrap();
2115
2116 // Simulate a language server reporting errors for a file.
2117 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2118 fake_language_server
2119 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2120 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2121 version: None,
2122 diagnostics: vec![lsp::Diagnostic {
2123 severity: Some(lsp::DiagnosticSeverity::ERROR),
2124 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2125 message: "message 1".to_string(),
2126 ..Default::default()
2127 }],
2128 })
2129 .await;
2130
2131 // Wait for server to see the diagnostics update.
2132 server
2133 .condition(|store| {
2134 let worktree = store
2135 .project(project_id)
2136 .unwrap()
2137 .worktrees
2138 .get(&worktree_id.to_proto())
2139 .unwrap();
2140
2141 !worktree
2142 .share
2143 .as_ref()
2144 .unwrap()
2145 .diagnostic_summaries
2146 .is_empty()
2147 })
2148 .await;
2149
2150 // Join the worktree as client B.
2151 let project_b = Project::remote(
2152 project_id,
2153 client_b.clone(),
2154 client_b.user_store.clone(),
2155 lang_registry.clone(),
2156 fs.clone(),
2157 &mut cx_b.to_async(),
2158 )
2159 .await
2160 .unwrap();
2161
2162 project_b.read_with(&cx_b, |project, cx| {
2163 assert_eq!(
2164 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2165 &[(
2166 ProjectPath {
2167 worktree_id,
2168 path: Arc::from(Path::new("a.rs")),
2169 },
2170 DiagnosticSummary {
2171 error_count: 1,
2172 warning_count: 0,
2173 ..Default::default()
2174 },
2175 )]
2176 )
2177 });
2178
2179 // Simulate a language server reporting more errors for a file.
2180 fake_language_server
2181 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2182 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2183 version: None,
2184 diagnostics: vec![
2185 lsp::Diagnostic {
2186 severity: Some(lsp::DiagnosticSeverity::ERROR),
2187 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2188 message: "message 1".to_string(),
2189 ..Default::default()
2190 },
2191 lsp::Diagnostic {
2192 severity: Some(lsp::DiagnosticSeverity::WARNING),
2193 range: lsp::Range::new(
2194 lsp::Position::new(0, 10),
2195 lsp::Position::new(0, 13),
2196 ),
2197 message: "message 2".to_string(),
2198 ..Default::default()
2199 },
2200 ],
2201 })
2202 .await;
2203
2204 // Client b gets the updated summaries
2205 project_b
2206 .condition(&cx_b, |project, cx| {
2207 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2208 == &[(
2209 ProjectPath {
2210 worktree_id,
2211 path: Arc::from(Path::new("a.rs")),
2212 },
2213 DiagnosticSummary {
2214 error_count: 1,
2215 warning_count: 1,
2216 ..Default::default()
2217 },
2218 )]
2219 })
2220 .await;
2221
2222 // Open the file with the errors on client B. They should be present.
2223 let buffer_b = cx_b
2224 .background()
2225 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2226 .await
2227 .unwrap();
2228
2229 buffer_b.read_with(&cx_b, |buffer, _| {
2230 assert_eq!(
2231 buffer
2232 .snapshot()
2233 .diagnostics_in_range::<_, Point>(0..buffer.len())
2234 .map(|entry| entry)
2235 .collect::<Vec<_>>(),
2236 &[
2237 DiagnosticEntry {
2238 range: Point::new(0, 4)..Point::new(0, 7),
2239 diagnostic: Diagnostic {
2240 group_id: 0,
2241 message: "message 1".to_string(),
2242 severity: lsp::DiagnosticSeverity::ERROR,
2243 is_primary: true,
2244 ..Default::default()
2245 }
2246 },
2247 DiagnosticEntry {
2248 range: Point::new(0, 10)..Point::new(0, 13),
2249 diagnostic: Diagnostic {
2250 group_id: 1,
2251 severity: lsp::DiagnosticSeverity::WARNING,
2252 message: "message 2".to_string(),
2253 is_primary: true,
2254 ..Default::default()
2255 }
2256 }
2257 ]
2258 );
2259 });
2260 }
2261
2262 #[gpui::test(iterations = 10)]
2263 async fn test_collaborating_with_completion(
2264 mut cx_a: TestAppContext,
2265 mut cx_b: TestAppContext,
2266 ) {
2267 cx_a.foreground().forbid_parking();
2268 let mut lang_registry = Arc::new(LanguageRegistry::new());
2269 let fs = FakeFs::new(cx_a.background());
2270
2271 // Set up a fake language server.
2272 let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2273 language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2274 completion_provider: Some(lsp::CompletionOptions {
2275 trigger_characters: Some(vec![".".to_string()]),
2276 ..Default::default()
2277 }),
2278 ..Default::default()
2279 });
2280 Arc::get_mut(&mut lang_registry)
2281 .unwrap()
2282 .add(Arc::new(Language::new(
2283 LanguageConfig {
2284 name: "Rust".into(),
2285 path_suffixes: vec!["rs".to_string()],
2286 language_server: Some(language_server_config),
2287 ..Default::default()
2288 },
2289 Some(tree_sitter_rust::language()),
2290 )));
2291
2292 // Connect to a server as 2 clients.
2293 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2294 let client_a = server.create_client(&mut cx_a, "user_a").await;
2295 let client_b = server.create_client(&mut cx_b, "user_b").await;
2296
2297 // Share a project as client A
2298 fs.insert_tree(
2299 "/a",
2300 json!({
2301 ".zed.toml": r#"collaborators = ["user_b"]"#,
2302 "main.rs": "fn main() { a }",
2303 "other.rs": "",
2304 }),
2305 )
2306 .await;
2307 let project_a = cx_a.update(|cx| {
2308 Project::local(
2309 client_a.clone(),
2310 client_a.user_store.clone(),
2311 lang_registry.clone(),
2312 fs.clone(),
2313 cx,
2314 )
2315 });
2316 let (worktree_a, _) = project_a
2317 .update(&mut cx_a, |p, cx| {
2318 p.find_or_create_local_worktree("/a", false, cx)
2319 })
2320 .await
2321 .unwrap();
2322 worktree_a
2323 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2324 .await;
2325 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2326 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2327 project_a
2328 .update(&mut cx_a, |p, cx| p.share(cx))
2329 .await
2330 .unwrap();
2331
2332 // Join the worktree as client B.
2333 let project_b = Project::remote(
2334 project_id,
2335 client_b.clone(),
2336 client_b.user_store.clone(),
2337 lang_registry.clone(),
2338 fs.clone(),
2339 &mut cx_b.to_async(),
2340 )
2341 .await
2342 .unwrap();
2343
2344 // Open a file in an editor as the guest.
2345 let buffer_b = project_b
2346 .update(&mut cx_b, |p, cx| {
2347 p.open_buffer((worktree_id, "main.rs"), cx)
2348 })
2349 .await
2350 .unwrap();
2351 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2352 let editor_b = cx_b.add_view(window_b, |cx| {
2353 Editor::for_buffer(
2354 cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2355 Arc::new(|cx| EditorSettings::test(cx)),
2356 Some(project_b.clone()),
2357 cx,
2358 )
2359 });
2360
2361 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2362 buffer_b
2363 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2364 .await;
2365
2366 // Type a completion trigger character as the guest.
2367 editor_b.update(&mut cx_b, |editor, cx| {
2368 editor.select_ranges([13..13], None, cx);
2369 editor.handle_input(&Input(".".into()), cx);
2370 cx.focus(&editor_b);
2371 });
2372
2373 // Receive a completion request as the host's language server.
2374 // Return some completions from the host's language server.
2375 cx_a.foreground().start_waiting();
2376 fake_language_server
2377 .handle_request::<lsp::request::Completion, _>(|params| {
2378 assert_eq!(
2379 params.text_document_position.text_document.uri,
2380 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2381 );
2382 assert_eq!(
2383 params.text_document_position.position,
2384 lsp::Position::new(0, 14),
2385 );
2386
2387 Some(lsp::CompletionResponse::Array(vec![
2388 lsp::CompletionItem {
2389 label: "first_method(…)".into(),
2390 detail: Some("fn(&mut self, B) -> C".into()),
2391 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2392 new_text: "first_method($1)".to_string(),
2393 range: lsp::Range::new(
2394 lsp::Position::new(0, 14),
2395 lsp::Position::new(0, 14),
2396 ),
2397 })),
2398 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2399 ..Default::default()
2400 },
2401 lsp::CompletionItem {
2402 label: "second_method(…)".into(),
2403 detail: Some("fn(&mut self, C) -> D<E>".into()),
2404 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2405 new_text: "second_method()".to_string(),
2406 range: lsp::Range::new(
2407 lsp::Position::new(0, 14),
2408 lsp::Position::new(0, 14),
2409 ),
2410 })),
2411 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2412 ..Default::default()
2413 },
2414 ]))
2415 })
2416 .next()
2417 .await
2418 .unwrap();
2419 cx_a.foreground().finish_waiting();
2420
2421 // Open the buffer on the host.
2422 let buffer_a = project_a
2423 .update(&mut cx_a, |p, cx| {
2424 p.open_buffer((worktree_id, "main.rs"), cx)
2425 })
2426 .await
2427 .unwrap();
2428 buffer_a
2429 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2430 .await;
2431
2432 // Confirm a completion on the guest.
2433 editor_b
2434 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2435 .await;
2436 editor_b.update(&mut cx_b, |editor, cx| {
2437 editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2438 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2439 });
2440
2441 // Return a resolved completion from the host's language server.
2442 // The resolved completion has an additional text edit.
2443 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
2444 assert_eq!(params.label, "first_method(…)");
2445 lsp::CompletionItem {
2446 label: "first_method(…)".into(),
2447 detail: Some("fn(&mut self, B) -> C".into()),
2448 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2449 new_text: "first_method($1)".to_string(),
2450 range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
2451 })),
2452 additional_text_edits: Some(vec![lsp::TextEdit {
2453 new_text: "use d::SomeTrait;\n".to_string(),
2454 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2455 }]),
2456 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2457 ..Default::default()
2458 }
2459 });
2460
2461 // The additional edit is applied.
2462 buffer_a
2463 .condition(&cx_a, |buffer, _| {
2464 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2465 })
2466 .await;
2467 buffer_b
2468 .condition(&cx_b, |buffer, _| {
2469 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2470 })
2471 .await;
2472 }
2473
2474 #[gpui::test(iterations = 10)]
2475 async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2476 cx_a.foreground().forbid_parking();
2477 let mut lang_registry = Arc::new(LanguageRegistry::new());
2478 let fs = FakeFs::new(cx_a.background());
2479
2480 // Set up a fake language server.
2481 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2482 Arc::get_mut(&mut lang_registry)
2483 .unwrap()
2484 .add(Arc::new(Language::new(
2485 LanguageConfig {
2486 name: "Rust".into(),
2487 path_suffixes: vec!["rs".to_string()],
2488 language_server: Some(language_server_config),
2489 ..Default::default()
2490 },
2491 Some(tree_sitter_rust::language()),
2492 )));
2493
2494 // Connect to a server as 2 clients.
2495 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2496 let client_a = server.create_client(&mut cx_a, "user_a").await;
2497 let client_b = server.create_client(&mut cx_b, "user_b").await;
2498
2499 // Share a project as client A
2500 fs.insert_tree(
2501 "/a",
2502 json!({
2503 ".zed.toml": r#"collaborators = ["user_b"]"#,
2504 "a.rs": "let one = two",
2505 }),
2506 )
2507 .await;
2508 let project_a = cx_a.update(|cx| {
2509 Project::local(
2510 client_a.clone(),
2511 client_a.user_store.clone(),
2512 lang_registry.clone(),
2513 fs.clone(),
2514 cx,
2515 )
2516 });
2517 let (worktree_a, _) = project_a
2518 .update(&mut cx_a, |p, cx| {
2519 p.find_or_create_local_worktree("/a", false, cx)
2520 })
2521 .await
2522 .unwrap();
2523 worktree_a
2524 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2525 .await;
2526 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2527 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2528 project_a
2529 .update(&mut cx_a, |p, cx| p.share(cx))
2530 .await
2531 .unwrap();
2532
2533 // Join the worktree as client B.
2534 let project_b = Project::remote(
2535 project_id,
2536 client_b.clone(),
2537 client_b.user_store.clone(),
2538 lang_registry.clone(),
2539 fs.clone(),
2540 &mut cx_b.to_async(),
2541 )
2542 .await
2543 .unwrap();
2544
2545 let buffer_b = cx_b
2546 .background()
2547 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2548 .await
2549 .unwrap();
2550
2551 let format = project_b.update(&mut cx_b, |project, cx| {
2552 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2553 });
2554
2555 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2556 fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2557 Some(vec![
2558 lsp::TextEdit {
2559 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2560 new_text: "h".to_string(),
2561 },
2562 lsp::TextEdit {
2563 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2564 new_text: "y".to_string(),
2565 },
2566 ])
2567 });
2568
2569 format.await.unwrap();
2570 assert_eq!(
2571 buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2572 "let honey = two"
2573 );
2574 }
2575
2576 #[gpui::test(iterations = 10)]
2577 async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2578 cx_a.foreground().forbid_parking();
2579 let mut lang_registry = Arc::new(LanguageRegistry::new());
2580 let fs = FakeFs::new(cx_a.background());
2581 fs.insert_tree(
2582 "/root-1",
2583 json!({
2584 ".zed.toml": r#"collaborators = ["user_b"]"#,
2585 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2586 }),
2587 )
2588 .await;
2589 fs.insert_tree(
2590 "/root-2",
2591 json!({
2592 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2593 }),
2594 )
2595 .await;
2596
2597 // Set up a fake language server.
2598 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2599 Arc::get_mut(&mut lang_registry)
2600 .unwrap()
2601 .add(Arc::new(Language::new(
2602 LanguageConfig {
2603 name: "Rust".into(),
2604 path_suffixes: vec!["rs".to_string()],
2605 language_server: Some(language_server_config),
2606 ..Default::default()
2607 },
2608 Some(tree_sitter_rust::language()),
2609 )));
2610
2611 // Connect to a server as 2 clients.
2612 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2613 let client_a = server.create_client(&mut cx_a, "user_a").await;
2614 let client_b = server.create_client(&mut cx_b, "user_b").await;
2615
2616 // Share a project as client A
2617 let project_a = cx_a.update(|cx| {
2618 Project::local(
2619 client_a.clone(),
2620 client_a.user_store.clone(),
2621 lang_registry.clone(),
2622 fs.clone(),
2623 cx,
2624 )
2625 });
2626 let (worktree_a, _) = project_a
2627 .update(&mut cx_a, |p, cx| {
2628 p.find_or_create_local_worktree("/root-1", false, cx)
2629 })
2630 .await
2631 .unwrap();
2632 worktree_a
2633 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2634 .await;
2635 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2636 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2637 project_a
2638 .update(&mut cx_a, |p, cx| p.share(cx))
2639 .await
2640 .unwrap();
2641
2642 // Join the worktree as client B.
2643 let project_b = Project::remote(
2644 project_id,
2645 client_b.clone(),
2646 client_b.user_store.clone(),
2647 lang_registry.clone(),
2648 fs.clone(),
2649 &mut cx_b.to_async(),
2650 )
2651 .await
2652 .unwrap();
2653
2654 // Open the file on client B.
2655 let buffer_b = cx_b
2656 .background()
2657 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2658 .await
2659 .unwrap();
2660
2661 // Request the definition of a symbol as the guest.
2662 let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2663
2664 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2665 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2666 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2667 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2668 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2669 )))
2670 });
2671
2672 let definitions_1 = definitions_1.await.unwrap();
2673 cx_b.read(|cx| {
2674 assert_eq!(definitions_1.len(), 1);
2675 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2676 let target_buffer = definitions_1[0].buffer.read(cx);
2677 assert_eq!(
2678 target_buffer.text(),
2679 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2680 );
2681 assert_eq!(
2682 definitions_1[0].range.to_point(target_buffer),
2683 Point::new(0, 6)..Point::new(0, 9)
2684 );
2685 });
2686
2687 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2688 // the previous call to `definition`.
2689 let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2690 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2691 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2692 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2693 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2694 )))
2695 });
2696
2697 let definitions_2 = definitions_2.await.unwrap();
2698 cx_b.read(|cx| {
2699 assert_eq!(definitions_2.len(), 1);
2700 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2701 let target_buffer = definitions_2[0].buffer.read(cx);
2702 assert_eq!(
2703 target_buffer.text(),
2704 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2705 );
2706 assert_eq!(
2707 definitions_2[0].range.to_point(target_buffer),
2708 Point::new(1, 6)..Point::new(1, 11)
2709 );
2710 });
2711 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2712
2713 cx_b.update(|_| {
2714 drop(definitions_1);
2715 drop(definitions_2);
2716 });
2717 project_b
2718 .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2719 .await;
2720 }
2721
2722 #[gpui::test(iterations = 10)]
2723 async fn test_references(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2724 cx_a.foreground().forbid_parking();
2725 let mut lang_registry = Arc::new(LanguageRegistry::new());
2726 let fs = FakeFs::new(cx_a.background());
2727 fs.insert_tree(
2728 "/root-1",
2729 json!({
2730 ".zed.toml": r#"collaborators = ["user_b"]"#,
2731 "one.rs": "const ONE: usize = 1;",
2732 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2733 }),
2734 )
2735 .await;
2736 fs.insert_tree(
2737 "/root-2",
2738 json!({
2739 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2740 }),
2741 )
2742 .await;
2743
2744 // Set up a fake language server.
2745 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2746 Arc::get_mut(&mut lang_registry)
2747 .unwrap()
2748 .add(Arc::new(Language::new(
2749 LanguageConfig {
2750 name: "Rust".into(),
2751 path_suffixes: vec!["rs".to_string()],
2752 language_server: Some(language_server_config),
2753 ..Default::default()
2754 },
2755 Some(tree_sitter_rust::language()),
2756 )));
2757
2758 // Connect to a server as 2 clients.
2759 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2760 let client_a = server.create_client(&mut cx_a, "user_a").await;
2761 let client_b = server.create_client(&mut cx_b, "user_b").await;
2762
2763 // Share a project as client A
2764 let project_a = cx_a.update(|cx| {
2765 Project::local(
2766 client_a.clone(),
2767 client_a.user_store.clone(),
2768 lang_registry.clone(),
2769 fs.clone(),
2770 cx,
2771 )
2772 });
2773 let (worktree_a, _) = project_a
2774 .update(&mut cx_a, |p, cx| {
2775 p.find_or_create_local_worktree("/root-1", false, cx)
2776 })
2777 .await
2778 .unwrap();
2779 worktree_a
2780 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2781 .await;
2782 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2783 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2784 project_a
2785 .update(&mut cx_a, |p, cx| p.share(cx))
2786 .await
2787 .unwrap();
2788
2789 // Join the worktree as client B.
2790 let project_b = Project::remote(
2791 project_id,
2792 client_b.clone(),
2793 client_b.user_store.clone(),
2794 lang_registry.clone(),
2795 fs.clone(),
2796 &mut cx_b.to_async(),
2797 )
2798 .await
2799 .unwrap();
2800
2801 // Open the file on client B.
2802 let buffer_b = cx_b
2803 .background()
2804 .spawn(project_b.update(&mut cx_b, |p, cx| {
2805 p.open_buffer((worktree_id, "one.rs"), cx)
2806 }))
2807 .await
2808 .unwrap();
2809
2810 // Request references to a symbol as the guest.
2811 let references = project_b.update(&mut cx_b, |p, cx| p.references(&buffer_b, 7, cx));
2812
2813 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2814 fake_language_server.handle_request::<lsp::request::References, _>(|params| {
2815 assert_eq!(
2816 params.text_document_position.text_document.uri.as_str(),
2817 "file:///root-1/one.rs"
2818 );
2819 Some(vec![
2820 lsp::Location {
2821 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2822 range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2823 },
2824 lsp::Location {
2825 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2826 range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2827 },
2828 lsp::Location {
2829 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2830 range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2831 },
2832 ])
2833 });
2834
2835 let references = references.await.unwrap();
2836 cx_b.read(|cx| {
2837 assert_eq!(references.len(), 3);
2838 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2839
2840 let two_buffer = references[0].buffer.read(cx);
2841 let three_buffer = references[2].buffer.read(cx);
2842 assert_eq!(
2843 two_buffer.file().unwrap().path().as_ref(),
2844 Path::new("two.rs")
2845 );
2846 assert_eq!(references[1].buffer, references[0].buffer);
2847 assert_eq!(
2848 three_buffer.file().unwrap().full_path(cx),
2849 Path::new("three.rs")
2850 );
2851
2852 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2853 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2854 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2855 });
2856 }
2857
2858 #[gpui::test(iterations = 10)]
2859 async fn test_project_symbols(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2860 cx_a.foreground().forbid_parking();
2861 let mut lang_registry = Arc::new(LanguageRegistry::new());
2862 let fs = FakeFs::new(cx_a.background());
2863 fs.insert_tree(
2864 "/code",
2865 json!({
2866 "crate-1": {
2867 ".zed.toml": r#"collaborators = ["user_b"]"#,
2868 "one.rs": "const ONE: usize = 1;",
2869 },
2870 "crate-2": {
2871 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2872 },
2873 "private": {
2874 "passwords.txt": "the-password",
2875 }
2876 }),
2877 )
2878 .await;
2879
2880 // Set up a fake language server.
2881 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2882 Arc::get_mut(&mut lang_registry)
2883 .unwrap()
2884 .add(Arc::new(Language::new(
2885 LanguageConfig {
2886 name: "Rust".into(),
2887 path_suffixes: vec!["rs".to_string()],
2888 language_server: Some(language_server_config),
2889 ..Default::default()
2890 },
2891 Some(tree_sitter_rust::language()),
2892 )));
2893
2894 // Connect to a server as 2 clients.
2895 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2896 let client_a = server.create_client(&mut cx_a, "user_a").await;
2897 let client_b = server.create_client(&mut cx_b, "user_b").await;
2898
2899 // Share a project as client A
2900 let project_a = cx_a.update(|cx| {
2901 Project::local(
2902 client_a.clone(),
2903 client_a.user_store.clone(),
2904 lang_registry.clone(),
2905 fs.clone(),
2906 cx,
2907 )
2908 });
2909 let (worktree_a, _) = project_a
2910 .update(&mut cx_a, |p, cx| {
2911 p.find_or_create_local_worktree("/code/crate-1", false, cx)
2912 })
2913 .await
2914 .unwrap();
2915 worktree_a
2916 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2917 .await;
2918 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2919 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2920 project_a
2921 .update(&mut cx_a, |p, cx| p.share(cx))
2922 .await
2923 .unwrap();
2924
2925 // Join the worktree as client B.
2926 let project_b = Project::remote(
2927 project_id,
2928 client_b.clone(),
2929 client_b.user_store.clone(),
2930 lang_registry.clone(),
2931 fs.clone(),
2932 &mut cx_b.to_async(),
2933 )
2934 .await
2935 .unwrap();
2936
2937 // Cause the language server to start.
2938 let _buffer = cx_b
2939 .background()
2940 .spawn(project_b.update(&mut cx_b, |p, cx| {
2941 p.open_buffer((worktree_id, "one.rs"), cx)
2942 }))
2943 .await
2944 .unwrap();
2945
2946 // Request the definition of a symbol as the guest.
2947 let symbols = project_b.update(&mut cx_b, |p, cx| p.symbols("two", cx));
2948 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2949 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_| {
2950 #[allow(deprecated)]
2951 Some(vec![lsp::SymbolInformation {
2952 name: "TWO".into(),
2953 location: lsp::Location {
2954 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
2955 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2956 },
2957 kind: lsp::SymbolKind::CONSTANT,
2958 tags: None,
2959 container_name: None,
2960 deprecated: None,
2961 }])
2962 });
2963
2964 let symbols = symbols.await.unwrap();
2965 assert_eq!(symbols.len(), 1);
2966 assert_eq!(symbols[0].name, "TWO");
2967
2968 // Open one of the returned symbols.
2969 let buffer_b_2 = project_b
2970 .update(&mut cx_b, |project, cx| {
2971 project.open_buffer_for_symbol(&symbols[0], cx)
2972 })
2973 .await
2974 .unwrap();
2975 buffer_b_2.read_with(&cx_b, |buffer, _| {
2976 assert_eq!(
2977 buffer.file().unwrap().path().as_ref(),
2978 Path::new("../crate-2/two.rs")
2979 );
2980 });
2981
2982 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
2983 let mut fake_symbol = symbols[0].clone();
2984 fake_symbol.path = Path::new("/code/secrets").into();
2985 let error = project_b
2986 .update(&mut cx_b, |project, cx| {
2987 project.open_buffer_for_symbol(&fake_symbol, cx)
2988 })
2989 .await
2990 .unwrap_err();
2991 assert!(error.to_string().contains("invalid symbol signature"));
2992 }
2993
2994 #[gpui::test(iterations = 10)]
2995 async fn test_open_buffer_while_getting_definition_pointing_to_it(
2996 mut cx_a: TestAppContext,
2997 mut cx_b: TestAppContext,
2998 mut rng: StdRng,
2999 ) {
3000 cx_a.foreground().forbid_parking();
3001 let mut lang_registry = Arc::new(LanguageRegistry::new());
3002 let fs = FakeFs::new(cx_a.background());
3003 fs.insert_tree(
3004 "/root",
3005 json!({
3006 ".zed.toml": r#"collaborators = ["user_b"]"#,
3007 "a.rs": "const ONE: usize = b::TWO;",
3008 "b.rs": "const TWO: usize = 2",
3009 }),
3010 )
3011 .await;
3012
3013 // Set up a fake language server.
3014 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3015
3016 Arc::get_mut(&mut lang_registry)
3017 .unwrap()
3018 .add(Arc::new(Language::new(
3019 LanguageConfig {
3020 name: "Rust".into(),
3021 path_suffixes: vec!["rs".to_string()],
3022 language_server: Some(language_server_config),
3023 ..Default::default()
3024 },
3025 Some(tree_sitter_rust::language()),
3026 )));
3027
3028 // Connect to a server as 2 clients.
3029 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3030 let client_a = server.create_client(&mut cx_a, "user_a").await;
3031 let client_b = server.create_client(&mut cx_b, "user_b").await;
3032
3033 // Share a project as client A
3034 let project_a = cx_a.update(|cx| {
3035 Project::local(
3036 client_a.clone(),
3037 client_a.user_store.clone(),
3038 lang_registry.clone(),
3039 fs.clone(),
3040 cx,
3041 )
3042 });
3043
3044 let (worktree_a, _) = project_a
3045 .update(&mut cx_a, |p, cx| {
3046 p.find_or_create_local_worktree("/root", false, cx)
3047 })
3048 .await
3049 .unwrap();
3050 worktree_a
3051 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3052 .await;
3053 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3054 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3055 project_a
3056 .update(&mut cx_a, |p, cx| p.share(cx))
3057 .await
3058 .unwrap();
3059
3060 // Join the worktree as client B.
3061 let project_b = Project::remote(
3062 project_id,
3063 client_b.clone(),
3064 client_b.user_store.clone(),
3065 lang_registry.clone(),
3066 fs.clone(),
3067 &mut cx_b.to_async(),
3068 )
3069 .await
3070 .unwrap();
3071
3072 let buffer_b1 = cx_b
3073 .background()
3074 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3075 .await
3076 .unwrap();
3077
3078 let definitions;
3079 let buffer_b2;
3080 if rng.gen() {
3081 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3082 buffer_b2 =
3083 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3084 } else {
3085 buffer_b2 =
3086 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3087 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3088 }
3089
3090 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3091 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
3092 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3093 lsp::Url::from_file_path("/root/b.rs").unwrap(),
3094 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3095 )))
3096 });
3097
3098 let buffer_b2 = buffer_b2.await.unwrap();
3099 let definitions = definitions.await.unwrap();
3100 assert_eq!(definitions.len(), 1);
3101 assert_eq!(definitions[0].buffer, buffer_b2);
3102 }
3103
3104 #[gpui::test(iterations = 10)]
3105 async fn test_collaborating_with_code_actions(
3106 mut cx_a: TestAppContext,
3107 mut cx_b: TestAppContext,
3108 ) {
3109 cx_a.foreground().forbid_parking();
3110 let mut lang_registry = Arc::new(LanguageRegistry::new());
3111 let fs = FakeFs::new(cx_a.background());
3112 let mut path_openers_b = Vec::new();
3113 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3114
3115 // Set up a fake language server.
3116 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3117 Arc::get_mut(&mut lang_registry)
3118 .unwrap()
3119 .add(Arc::new(Language::new(
3120 LanguageConfig {
3121 name: "Rust".into(),
3122 path_suffixes: vec!["rs".to_string()],
3123 language_server: Some(language_server_config),
3124 ..Default::default()
3125 },
3126 Some(tree_sitter_rust::language()),
3127 )));
3128
3129 // Connect to a server as 2 clients.
3130 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3131 let client_a = server.create_client(&mut cx_a, "user_a").await;
3132 let client_b = server.create_client(&mut cx_b, "user_b").await;
3133
3134 // Share a project as client A
3135 fs.insert_tree(
3136 "/a",
3137 json!({
3138 ".zed.toml": r#"collaborators = ["user_b"]"#,
3139 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3140 "other.rs": "pub fn foo() -> usize { 4 }",
3141 }),
3142 )
3143 .await;
3144 let project_a = cx_a.update(|cx| {
3145 Project::local(
3146 client_a.clone(),
3147 client_a.user_store.clone(),
3148 lang_registry.clone(),
3149 fs.clone(),
3150 cx,
3151 )
3152 });
3153 let (worktree_a, _) = project_a
3154 .update(&mut cx_a, |p, cx| {
3155 p.find_or_create_local_worktree("/a", false, cx)
3156 })
3157 .await
3158 .unwrap();
3159 worktree_a
3160 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3161 .await;
3162 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3163 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3164 project_a
3165 .update(&mut cx_a, |p, cx| p.share(cx))
3166 .await
3167 .unwrap();
3168
3169 // Join the worktree as client B.
3170 let project_b = Project::remote(
3171 project_id,
3172 client_b.clone(),
3173 client_b.user_store.clone(),
3174 lang_registry.clone(),
3175 fs.clone(),
3176 &mut cx_b.to_async(),
3177 )
3178 .await
3179 .unwrap();
3180 let mut params = cx_b.update(WorkspaceParams::test);
3181 params.languages = lang_registry.clone();
3182 params.client = client_b.client.clone();
3183 params.user_store = client_b.user_store.clone();
3184 params.project = project_b;
3185 params.path_openers = path_openers_b.into();
3186
3187 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3188 let editor_b = workspace_b
3189 .update(&mut cx_b, |workspace, cx| {
3190 workspace.open_path((worktree_id, "main.rs").into(), cx)
3191 })
3192 .await
3193 .unwrap()
3194 .downcast::<Editor>()
3195 .unwrap();
3196
3197 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3198 fake_language_server
3199 .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
3200 assert_eq!(
3201 params.text_document.uri,
3202 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3203 );
3204 assert_eq!(params.range.start, lsp::Position::new(0, 0));
3205 assert_eq!(params.range.end, lsp::Position::new(0, 0));
3206 None
3207 })
3208 .next()
3209 .await;
3210
3211 // Move cursor to a location that contains code actions.
3212 editor_b.update(&mut cx_b, |editor, cx| {
3213 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3214 cx.focus(&editor_b);
3215 });
3216
3217 fake_language_server
3218 .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
3219 assert_eq!(
3220 params.text_document.uri,
3221 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3222 );
3223 assert_eq!(params.range.start, lsp::Position::new(1, 31));
3224 assert_eq!(params.range.end, lsp::Position::new(1, 31));
3225
3226 Some(vec![lsp::CodeActionOrCommand::CodeAction(
3227 lsp::CodeAction {
3228 title: "Inline into all callers".to_string(),
3229 edit: Some(lsp::WorkspaceEdit {
3230 changes: Some(
3231 [
3232 (
3233 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3234 vec![lsp::TextEdit::new(
3235 lsp::Range::new(
3236 lsp::Position::new(1, 22),
3237 lsp::Position::new(1, 34),
3238 ),
3239 "4".to_string(),
3240 )],
3241 ),
3242 (
3243 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3244 vec![lsp::TextEdit::new(
3245 lsp::Range::new(
3246 lsp::Position::new(0, 0),
3247 lsp::Position::new(0, 27),
3248 ),
3249 "".to_string(),
3250 )],
3251 ),
3252 ]
3253 .into_iter()
3254 .collect(),
3255 ),
3256 ..Default::default()
3257 }),
3258 data: Some(json!({
3259 "codeActionParams": {
3260 "range": {
3261 "start": {"line": 1, "column": 31},
3262 "end": {"line": 1, "column": 31},
3263 }
3264 }
3265 })),
3266 ..Default::default()
3267 },
3268 )])
3269 })
3270 .next()
3271 .await;
3272
3273 // Toggle code actions and wait for them to display.
3274 editor_b.update(&mut cx_b, |editor, cx| {
3275 editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3276 });
3277 editor_b
3278 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3279 .await;
3280
3281 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3282
3283 // Confirming the code action will trigger a resolve request.
3284 let confirm_action = workspace_b
3285 .update(&mut cx_b, |workspace, cx| {
3286 Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3287 })
3288 .unwrap();
3289 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_| {
3290 lsp::CodeAction {
3291 title: "Inline into all callers".to_string(),
3292 edit: Some(lsp::WorkspaceEdit {
3293 changes: Some(
3294 [
3295 (
3296 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3297 vec![lsp::TextEdit::new(
3298 lsp::Range::new(
3299 lsp::Position::new(1, 22),
3300 lsp::Position::new(1, 34),
3301 ),
3302 "4".to_string(),
3303 )],
3304 ),
3305 (
3306 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3307 vec![lsp::TextEdit::new(
3308 lsp::Range::new(
3309 lsp::Position::new(0, 0),
3310 lsp::Position::new(0, 27),
3311 ),
3312 "".to_string(),
3313 )],
3314 ),
3315 ]
3316 .into_iter()
3317 .collect(),
3318 ),
3319 ..Default::default()
3320 }),
3321 ..Default::default()
3322 }
3323 });
3324
3325 // After the action is confirmed, an editor containing both modified files is opened.
3326 confirm_action.await.unwrap();
3327 let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3328 workspace
3329 .active_item(cx)
3330 .unwrap()
3331 .downcast::<Editor>()
3332 .unwrap()
3333 });
3334 code_action_editor.update(&mut cx_b, |editor, cx| {
3335 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3336 editor.undo(&Undo, cx);
3337 assert_eq!(
3338 editor.text(cx),
3339 "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3340 );
3341 editor.redo(&Redo, cx);
3342 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3343 });
3344 }
3345
3346 #[gpui::test(iterations = 10)]
3347 async fn test_collaborating_with_renames(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3348 cx_a.foreground().forbid_parking();
3349 let mut lang_registry = Arc::new(LanguageRegistry::new());
3350 let fs = FakeFs::new(cx_a.background());
3351 let mut path_openers_b = Vec::new();
3352 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3353
3354 // Set up a fake language server.
3355 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3356 Arc::get_mut(&mut lang_registry)
3357 .unwrap()
3358 .add(Arc::new(Language::new(
3359 LanguageConfig {
3360 name: "Rust".into(),
3361 path_suffixes: vec!["rs".to_string()],
3362 language_server: Some(language_server_config),
3363 ..Default::default()
3364 },
3365 Some(tree_sitter_rust::language()),
3366 )));
3367
3368 // Connect to a server as 2 clients.
3369 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3370 let client_a = server.create_client(&mut cx_a, "user_a").await;
3371 let client_b = server.create_client(&mut cx_b, "user_b").await;
3372
3373 // Share a project as client A
3374 fs.insert_tree(
3375 "/dir",
3376 json!({
3377 ".zed.toml": r#"collaborators = ["user_b"]"#,
3378 "one.rs": "const ONE: usize = 1;",
3379 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3380 }),
3381 )
3382 .await;
3383 let project_a = cx_a.update(|cx| {
3384 Project::local(
3385 client_a.clone(),
3386 client_a.user_store.clone(),
3387 lang_registry.clone(),
3388 fs.clone(),
3389 cx,
3390 )
3391 });
3392 let (worktree_a, _) = project_a
3393 .update(&mut cx_a, |p, cx| {
3394 p.find_or_create_local_worktree("/dir", false, cx)
3395 })
3396 .await
3397 .unwrap();
3398 worktree_a
3399 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3400 .await;
3401 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3402 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3403 project_a
3404 .update(&mut cx_a, |p, cx| p.share(cx))
3405 .await
3406 .unwrap();
3407
3408 // Join the worktree as client B.
3409 let project_b = Project::remote(
3410 project_id,
3411 client_b.clone(),
3412 client_b.user_store.clone(),
3413 lang_registry.clone(),
3414 fs.clone(),
3415 &mut cx_b.to_async(),
3416 )
3417 .await
3418 .unwrap();
3419 let mut params = cx_b.update(WorkspaceParams::test);
3420 params.languages = lang_registry.clone();
3421 params.client = client_b.client.clone();
3422 params.user_store = client_b.user_store.clone();
3423 params.project = project_b;
3424 params.path_openers = path_openers_b.into();
3425
3426 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3427 let editor_b = workspace_b
3428 .update(&mut cx_b, |workspace, cx| {
3429 workspace.open_path((worktree_id, "one.rs").into(), cx)
3430 })
3431 .await
3432 .unwrap()
3433 .downcast::<Editor>()
3434 .unwrap();
3435 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3436
3437 // Move cursor to a location that can be renamed.
3438 let prepare_rename = editor_b.update(&mut cx_b, |editor, cx| {
3439 editor.select_ranges([7..7], None, cx);
3440 editor.rename(&Rename, cx).unwrap()
3441 });
3442
3443 fake_language_server
3444 .handle_request::<lsp::request::PrepareRenameRequest, _>(|params| {
3445 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3446 assert_eq!(params.position, lsp::Position::new(0, 7));
3447 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3448 lsp::Position::new(0, 6),
3449 lsp::Position::new(0, 9),
3450 )))
3451 })
3452 .next()
3453 .await
3454 .unwrap();
3455 prepare_rename.await.unwrap();
3456 editor_b.update(&mut cx_b, |editor, cx| {
3457 let rename = editor.pending_rename().unwrap();
3458 let buffer = editor.buffer().read(cx).snapshot(cx);
3459 assert_eq!(
3460 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3461 6..9
3462 );
3463 rename.editor.update(cx, |rename_editor, cx| {
3464 rename_editor.buffer().update(cx, |rename_buffer, cx| {
3465 rename_buffer.edit([0..3], "THREE", cx);
3466 });
3467 });
3468 });
3469
3470 let confirm_rename = workspace_b.update(&mut cx_b, |workspace, cx| {
3471 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3472 });
3473 fake_language_server
3474 .handle_request::<lsp::request::Rename, _>(|params| {
3475 assert_eq!(
3476 params.text_document_position.text_document.uri.as_str(),
3477 "file:///dir/one.rs"
3478 );
3479 assert_eq!(
3480 params.text_document_position.position,
3481 lsp::Position::new(0, 6)
3482 );
3483 assert_eq!(params.new_name, "THREE");
3484 Some(lsp::WorkspaceEdit {
3485 changes: Some(
3486 [
3487 (
3488 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3489 vec![lsp::TextEdit::new(
3490 lsp::Range::new(
3491 lsp::Position::new(0, 6),
3492 lsp::Position::new(0, 9),
3493 ),
3494 "THREE".to_string(),
3495 )],
3496 ),
3497 (
3498 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3499 vec![
3500 lsp::TextEdit::new(
3501 lsp::Range::new(
3502 lsp::Position::new(0, 24),
3503 lsp::Position::new(0, 27),
3504 ),
3505 "THREE".to_string(),
3506 ),
3507 lsp::TextEdit::new(
3508 lsp::Range::new(
3509 lsp::Position::new(0, 35),
3510 lsp::Position::new(0, 38),
3511 ),
3512 "THREE".to_string(),
3513 ),
3514 ],
3515 ),
3516 ]
3517 .into_iter()
3518 .collect(),
3519 ),
3520 ..Default::default()
3521 })
3522 })
3523 .next()
3524 .await
3525 .unwrap();
3526 confirm_rename.await.unwrap();
3527
3528 let rename_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3529 workspace
3530 .active_item(cx)
3531 .unwrap()
3532 .downcast::<Editor>()
3533 .unwrap()
3534 });
3535 rename_editor.update(&mut cx_b, |editor, cx| {
3536 assert_eq!(
3537 editor.text(cx),
3538 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3539 );
3540 editor.undo(&Undo, cx);
3541 assert_eq!(
3542 editor.text(cx),
3543 "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3544 );
3545 editor.redo(&Redo, cx);
3546 assert_eq!(
3547 editor.text(cx),
3548 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3549 );
3550 });
3551
3552 // Ensure temporary rename edits cannot be undone/redone.
3553 editor_b.update(&mut cx_b, |editor, cx| {
3554 editor.undo(&Undo, cx);
3555 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3556 editor.undo(&Undo, cx);
3557 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3558 editor.redo(&Redo, cx);
3559 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3560 })
3561 }
3562
3563 #[gpui::test(iterations = 10)]
3564 async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3565 cx_a.foreground().forbid_parking();
3566
3567 // Connect to a server as 2 clients.
3568 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3569 let client_a = server.create_client(&mut cx_a, "user_a").await;
3570 let client_b = server.create_client(&mut cx_b, "user_b").await;
3571
3572 // Create an org that includes these 2 users.
3573 let db = &server.app_state.db;
3574 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3575 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3576 .await
3577 .unwrap();
3578 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3579 .await
3580 .unwrap();
3581
3582 // Create a channel that includes all the users.
3583 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3584 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3585 .await
3586 .unwrap();
3587 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3588 .await
3589 .unwrap();
3590 db.create_channel_message(
3591 channel_id,
3592 client_b.current_user_id(&cx_b),
3593 "hello A, it's B.",
3594 OffsetDateTime::now_utc(),
3595 1,
3596 )
3597 .await
3598 .unwrap();
3599
3600 let channels_a = cx_a
3601 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3602 channels_a
3603 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3604 .await;
3605 channels_a.read_with(&cx_a, |list, _| {
3606 assert_eq!(
3607 list.available_channels().unwrap(),
3608 &[ChannelDetails {
3609 id: channel_id.to_proto(),
3610 name: "test-channel".to_string()
3611 }]
3612 )
3613 });
3614 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3615 this.get_channel(channel_id.to_proto(), cx).unwrap()
3616 });
3617 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3618 channel_a
3619 .condition(&cx_a, |channel, _| {
3620 channel_messages(channel)
3621 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3622 })
3623 .await;
3624
3625 let channels_b = cx_b
3626 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3627 channels_b
3628 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3629 .await;
3630 channels_b.read_with(&cx_b, |list, _| {
3631 assert_eq!(
3632 list.available_channels().unwrap(),
3633 &[ChannelDetails {
3634 id: channel_id.to_proto(),
3635 name: "test-channel".to_string()
3636 }]
3637 )
3638 });
3639
3640 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3641 this.get_channel(channel_id.to_proto(), cx).unwrap()
3642 });
3643 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3644 channel_b
3645 .condition(&cx_b, |channel, _| {
3646 channel_messages(channel)
3647 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3648 })
3649 .await;
3650
3651 channel_a
3652 .update(&mut cx_a, |channel, cx| {
3653 channel
3654 .send_message("oh, hi B.".to_string(), cx)
3655 .unwrap()
3656 .detach();
3657 let task = channel.send_message("sup".to_string(), cx).unwrap();
3658 assert_eq!(
3659 channel_messages(channel),
3660 &[
3661 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3662 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3663 ("user_a".to_string(), "sup".to_string(), true)
3664 ]
3665 );
3666 task
3667 })
3668 .await
3669 .unwrap();
3670
3671 channel_b
3672 .condition(&cx_b, |channel, _| {
3673 channel_messages(channel)
3674 == [
3675 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3676 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3677 ("user_a".to_string(), "sup".to_string(), false),
3678 ]
3679 })
3680 .await;
3681
3682 assert_eq!(
3683 server
3684 .state()
3685 .await
3686 .channel(channel_id)
3687 .unwrap()
3688 .connection_ids
3689 .len(),
3690 2
3691 );
3692 cx_b.update(|_| drop(channel_b));
3693 server
3694 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3695 .await;
3696
3697 cx_a.update(|_| drop(channel_a));
3698 server
3699 .condition(|state| state.channel(channel_id).is_none())
3700 .await;
3701 }
3702
3703 #[gpui::test(iterations = 10)]
3704 async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3705 cx_a.foreground().forbid_parking();
3706
3707 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3708 let client_a = server.create_client(&mut cx_a, "user_a").await;
3709
3710 let db = &server.app_state.db;
3711 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3712 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3713 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3714 .await
3715 .unwrap();
3716 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3717 .await
3718 .unwrap();
3719
3720 let channels_a = cx_a
3721 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3722 channels_a
3723 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3724 .await;
3725 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3726 this.get_channel(channel_id.to_proto(), cx).unwrap()
3727 });
3728
3729 // Messages aren't allowed to be too long.
3730 channel_a
3731 .update(&mut cx_a, |channel, cx| {
3732 let long_body = "this is long.\n".repeat(1024);
3733 channel.send_message(long_body, cx).unwrap()
3734 })
3735 .await
3736 .unwrap_err();
3737
3738 // Messages aren't allowed to be blank.
3739 channel_a.update(&mut cx_a, |channel, cx| {
3740 channel.send_message(String::new(), cx).unwrap_err()
3741 });
3742
3743 // Leading and trailing whitespace are trimmed.
3744 channel_a
3745 .update(&mut cx_a, |channel, cx| {
3746 channel
3747 .send_message("\n surrounded by whitespace \n".to_string(), cx)
3748 .unwrap()
3749 })
3750 .await
3751 .unwrap();
3752 assert_eq!(
3753 db.get_channel_messages(channel_id, 10, None)
3754 .await
3755 .unwrap()
3756 .iter()
3757 .map(|m| &m.body)
3758 .collect::<Vec<_>>(),
3759 &["surrounded by whitespace"]
3760 );
3761 }
3762
3763 #[gpui::test(iterations = 10)]
3764 async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3765 cx_a.foreground().forbid_parking();
3766
3767 // Connect to a server as 2 clients.
3768 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3769 let client_a = server.create_client(&mut cx_a, "user_a").await;
3770 let client_b = server.create_client(&mut cx_b, "user_b").await;
3771 let mut status_b = client_b.status();
3772
3773 // Create an org that includes these 2 users.
3774 let db = &server.app_state.db;
3775 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3776 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3777 .await
3778 .unwrap();
3779 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3780 .await
3781 .unwrap();
3782
3783 // Create a channel that includes all the users.
3784 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3785 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3786 .await
3787 .unwrap();
3788 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3789 .await
3790 .unwrap();
3791 db.create_channel_message(
3792 channel_id,
3793 client_b.current_user_id(&cx_b),
3794 "hello A, it's B.",
3795 OffsetDateTime::now_utc(),
3796 2,
3797 )
3798 .await
3799 .unwrap();
3800
3801 let channels_a = cx_a
3802 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3803 channels_a
3804 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3805 .await;
3806
3807 channels_a.read_with(&cx_a, |list, _| {
3808 assert_eq!(
3809 list.available_channels().unwrap(),
3810 &[ChannelDetails {
3811 id: channel_id.to_proto(),
3812 name: "test-channel".to_string()
3813 }]
3814 )
3815 });
3816 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3817 this.get_channel(channel_id.to_proto(), cx).unwrap()
3818 });
3819 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3820 channel_a
3821 .condition(&cx_a, |channel, _| {
3822 channel_messages(channel)
3823 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3824 })
3825 .await;
3826
3827 let channels_b = cx_b
3828 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3829 channels_b
3830 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3831 .await;
3832 channels_b.read_with(&cx_b, |list, _| {
3833 assert_eq!(
3834 list.available_channels().unwrap(),
3835 &[ChannelDetails {
3836 id: channel_id.to_proto(),
3837 name: "test-channel".to_string()
3838 }]
3839 )
3840 });
3841
3842 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3843 this.get_channel(channel_id.to_proto(), cx).unwrap()
3844 });
3845 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3846 channel_b
3847 .condition(&cx_b, |channel, _| {
3848 channel_messages(channel)
3849 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3850 })
3851 .await;
3852
3853 // Disconnect client B, ensuring we can still access its cached channel data.
3854 server.forbid_connections();
3855 server.disconnect_client(client_b.current_user_id(&cx_b));
3856 while !matches!(
3857 status_b.next().await,
3858 Some(client::Status::ReconnectionError { .. })
3859 ) {}
3860
3861 channels_b.read_with(&cx_b, |channels, _| {
3862 assert_eq!(
3863 channels.available_channels().unwrap(),
3864 [ChannelDetails {
3865 id: channel_id.to_proto(),
3866 name: "test-channel".to_string()
3867 }]
3868 )
3869 });
3870 channel_b.read_with(&cx_b, |channel, _| {
3871 assert_eq!(
3872 channel_messages(channel),
3873 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3874 )
3875 });
3876
3877 // Send a message from client B while it is disconnected.
3878 channel_b
3879 .update(&mut cx_b, |channel, cx| {
3880 let task = channel
3881 .send_message("can you see this?".to_string(), cx)
3882 .unwrap();
3883 assert_eq!(
3884 channel_messages(channel),
3885 &[
3886 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3887 ("user_b".to_string(), "can you see this?".to_string(), true)
3888 ]
3889 );
3890 task
3891 })
3892 .await
3893 .unwrap_err();
3894
3895 // Send a message from client A while B is disconnected.
3896 channel_a
3897 .update(&mut cx_a, |channel, cx| {
3898 channel
3899 .send_message("oh, hi B.".to_string(), cx)
3900 .unwrap()
3901 .detach();
3902 let task = channel.send_message("sup".to_string(), cx).unwrap();
3903 assert_eq!(
3904 channel_messages(channel),
3905 &[
3906 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3907 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3908 ("user_a".to_string(), "sup".to_string(), true)
3909 ]
3910 );
3911 task
3912 })
3913 .await
3914 .unwrap();
3915
3916 // Give client B a chance to reconnect.
3917 server.allow_connections();
3918 cx_b.foreground().advance_clock(Duration::from_secs(10));
3919
3920 // Verify that B sees the new messages upon reconnection, as well as the message client B
3921 // sent while offline.
3922 channel_b
3923 .condition(&cx_b, |channel, _| {
3924 channel_messages(channel)
3925 == [
3926 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3927 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3928 ("user_a".to_string(), "sup".to_string(), false),
3929 ("user_b".to_string(), "can you see this?".to_string(), false),
3930 ]
3931 })
3932 .await;
3933
3934 // Ensure client A and B can communicate normally after reconnection.
3935 channel_a
3936 .update(&mut cx_a, |channel, cx| {
3937 channel.send_message("you online?".to_string(), cx).unwrap()
3938 })
3939 .await
3940 .unwrap();
3941 channel_b
3942 .condition(&cx_b, |channel, _| {
3943 channel_messages(channel)
3944 == [
3945 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3946 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3947 ("user_a".to_string(), "sup".to_string(), false),
3948 ("user_b".to_string(), "can you see this?".to_string(), false),
3949 ("user_a".to_string(), "you online?".to_string(), false),
3950 ]
3951 })
3952 .await;
3953
3954 channel_b
3955 .update(&mut cx_b, |channel, cx| {
3956 channel.send_message("yep".to_string(), cx).unwrap()
3957 })
3958 .await
3959 .unwrap();
3960 channel_a
3961 .condition(&cx_a, |channel, _| {
3962 channel_messages(channel)
3963 == [
3964 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3965 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3966 ("user_a".to_string(), "sup".to_string(), false),
3967 ("user_b".to_string(), "can you see this?".to_string(), false),
3968 ("user_a".to_string(), "you online?".to_string(), false),
3969 ("user_b".to_string(), "yep".to_string(), false),
3970 ]
3971 })
3972 .await;
3973 }
3974
3975 #[gpui::test(iterations = 10)]
3976 async fn test_contacts(
3977 mut cx_a: TestAppContext,
3978 mut cx_b: TestAppContext,
3979 mut cx_c: TestAppContext,
3980 ) {
3981 cx_a.foreground().forbid_parking();
3982 let lang_registry = Arc::new(LanguageRegistry::new());
3983 let fs = FakeFs::new(cx_a.background());
3984
3985 // Connect to a server as 3 clients.
3986 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3987 let client_a = server.create_client(&mut cx_a, "user_a").await;
3988 let client_b = server.create_client(&mut cx_b, "user_b").await;
3989 let client_c = server.create_client(&mut cx_c, "user_c").await;
3990
3991 // Share a worktree as client A.
3992 fs.insert_tree(
3993 "/a",
3994 json!({
3995 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3996 }),
3997 )
3998 .await;
3999
4000 let project_a = cx_a.update(|cx| {
4001 Project::local(
4002 client_a.clone(),
4003 client_a.user_store.clone(),
4004 lang_registry.clone(),
4005 fs.clone(),
4006 cx,
4007 )
4008 });
4009 let (worktree_a, _) = project_a
4010 .update(&mut cx_a, |p, cx| {
4011 p.find_or_create_local_worktree("/a", false, cx)
4012 })
4013 .await
4014 .unwrap();
4015 worktree_a
4016 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4017 .await;
4018
4019 client_a
4020 .user_store
4021 .condition(&cx_a, |user_store, _| {
4022 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4023 })
4024 .await;
4025 client_b
4026 .user_store
4027 .condition(&cx_b, |user_store, _| {
4028 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4029 })
4030 .await;
4031 client_c
4032 .user_store
4033 .condition(&cx_c, |user_store, _| {
4034 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4035 })
4036 .await;
4037
4038 let project_id = project_a
4039 .update(&mut cx_a, |project, _| project.next_remote_id())
4040 .await;
4041 project_a
4042 .update(&mut cx_a, |project, cx| project.share(cx))
4043 .await
4044 .unwrap();
4045
4046 let _project_b = Project::remote(
4047 project_id,
4048 client_b.clone(),
4049 client_b.user_store.clone(),
4050 lang_registry.clone(),
4051 fs.clone(),
4052 &mut cx_b.to_async(),
4053 )
4054 .await
4055 .unwrap();
4056
4057 client_a
4058 .user_store
4059 .condition(&cx_a, |user_store, _| {
4060 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4061 })
4062 .await;
4063 client_b
4064 .user_store
4065 .condition(&cx_b, |user_store, _| {
4066 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4067 })
4068 .await;
4069 client_c
4070 .user_store
4071 .condition(&cx_c, |user_store, _| {
4072 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4073 })
4074 .await;
4075
4076 project_a
4077 .condition(&cx_a, |project, _| {
4078 project.collaborators().contains_key(&client_b.peer_id)
4079 })
4080 .await;
4081
4082 cx_a.update(move |_| drop(project_a));
4083 client_a
4084 .user_store
4085 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4086 .await;
4087 client_b
4088 .user_store
4089 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4090 .await;
4091 client_c
4092 .user_store
4093 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4094 .await;
4095
4096 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4097 user_store
4098 .contacts()
4099 .iter()
4100 .map(|contact| {
4101 let worktrees = contact
4102 .projects
4103 .iter()
4104 .map(|p| {
4105 (
4106 p.worktree_root_names[0].as_str(),
4107 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4108 )
4109 })
4110 .collect();
4111 (contact.user.github_login.as_str(), worktrees)
4112 })
4113 .collect()
4114 }
4115 }
4116
4117 #[gpui::test(iterations = 100)]
4118 async fn test_random_collaboration(cx: TestAppContext, rng: StdRng) {
4119 cx.foreground().forbid_parking();
4120 let max_peers = env::var("MAX_PEERS")
4121 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4122 .unwrap_or(5);
4123 let max_operations = env::var("OPERATIONS")
4124 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4125 .unwrap_or(10);
4126
4127 let rng = Arc::new(Mutex::new(rng));
4128
4129 let guest_lang_registry = Arc::new(LanguageRegistry::new());
4130 let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4131
4132 let fs = FakeFs::new(cx.background());
4133 fs.insert_tree(
4134 "/_collab",
4135 json!({
4136 ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4137 }),
4138 )
4139 .await;
4140
4141 let operations = Rc::new(Cell::new(0));
4142 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4143 let mut clients = Vec::new();
4144
4145 let mut next_entity_id = 100000;
4146 let mut host_cx = TestAppContext::new(
4147 cx.foreground_platform(),
4148 cx.platform(),
4149 cx.foreground(),
4150 cx.background(),
4151 cx.font_cache(),
4152 next_entity_id,
4153 );
4154 let host = server.create_client(&mut host_cx, "host").await;
4155 let host_project = host_cx.update(|cx| {
4156 Project::local(
4157 host.client.clone(),
4158 host.user_store.clone(),
4159 Arc::new(LanguageRegistry::new()),
4160 fs.clone(),
4161 cx,
4162 )
4163 });
4164 let host_project_id = host_project
4165 .update(&mut host_cx, |p, _| p.next_remote_id())
4166 .await;
4167
4168 let (collab_worktree, _) = host_project
4169 .update(&mut host_cx, |project, cx| {
4170 project.find_or_create_local_worktree("/_collab", false, cx)
4171 })
4172 .await
4173 .unwrap();
4174 collab_worktree
4175 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4176 .await;
4177 host_project
4178 .update(&mut host_cx, |project, cx| project.share(cx))
4179 .await
4180 .unwrap();
4181
4182 clients.push(cx.foreground().spawn(host.simulate_host(
4183 host_project.clone(),
4184 language_server_config,
4185 operations.clone(),
4186 max_operations,
4187 rng.clone(),
4188 host_cx.clone(),
4189 )));
4190
4191 while operations.get() < max_operations {
4192 cx.background().simulate_random_delay().await;
4193 if clients.len() < max_peers && rng.lock().gen_bool(0.05) {
4194 operations.set(operations.get() + 1);
4195
4196 let guest_id = clients.len();
4197 log::info!("Adding guest {}", guest_id);
4198 next_entity_id += 100000;
4199 let mut guest_cx = TestAppContext::new(
4200 cx.foreground_platform(),
4201 cx.platform(),
4202 cx.foreground(),
4203 cx.background(),
4204 cx.font_cache(),
4205 next_entity_id,
4206 );
4207 let guest = server
4208 .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4209 .await;
4210 let guest_project = Project::remote(
4211 host_project_id,
4212 guest.client.clone(),
4213 guest.user_store.clone(),
4214 guest_lang_registry.clone(),
4215 fs.clone(),
4216 &mut guest_cx.to_async(),
4217 )
4218 .await
4219 .unwrap();
4220 clients.push(cx.foreground().spawn(guest.simulate_guest(
4221 guest_id,
4222 guest_project,
4223 operations.clone(),
4224 max_operations,
4225 rng.clone(),
4226 guest_cx,
4227 )));
4228
4229 log::info!("Guest {} added", guest_id);
4230 }
4231 }
4232
4233 let clients = futures::future::join_all(clients).await;
4234 cx.foreground().run_until_parked();
4235
4236 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4237 project
4238 .worktrees(cx)
4239 .map(|worktree| {
4240 let snapshot = worktree.read(cx).snapshot();
4241 (snapshot.id(), snapshot)
4242 })
4243 .collect::<BTreeMap<_, _>>()
4244 });
4245
4246 for (guest_client, guest_cx) in clients.iter().skip(1) {
4247 let guest_id = guest_client.client.id();
4248 let worktree_snapshots =
4249 guest_client
4250 .project
4251 .as_ref()
4252 .unwrap()
4253 .read_with(guest_cx, |project, cx| {
4254 project
4255 .worktrees(cx)
4256 .map(|worktree| {
4257 let worktree = worktree.read(cx);
4258 assert!(
4259 !worktree.as_remote().unwrap().has_pending_updates(),
4260 "Guest {} worktree {:?} contains deferred updates",
4261 guest_id,
4262 worktree.id()
4263 );
4264 (worktree.id(), worktree.snapshot())
4265 })
4266 .collect::<BTreeMap<_, _>>()
4267 });
4268
4269 assert_eq!(
4270 worktree_snapshots.keys().collect::<Vec<_>>(),
4271 host_worktree_snapshots.keys().collect::<Vec<_>>(),
4272 "guest {} has different worktrees than the host",
4273 guest_id
4274 );
4275 for (id, host_snapshot) in &host_worktree_snapshots {
4276 let guest_snapshot = &worktree_snapshots[id];
4277 assert_eq!(
4278 guest_snapshot.root_name(),
4279 host_snapshot.root_name(),
4280 "guest {} has different root name than the host for worktree {}",
4281 guest_id,
4282 id
4283 );
4284 assert_eq!(
4285 guest_snapshot.entries(false).collect::<Vec<_>>(),
4286 host_snapshot.entries(false).collect::<Vec<_>>(),
4287 "guest {} has different snapshot than the host for worktree {}",
4288 guest_id,
4289 id
4290 );
4291 }
4292
4293 guest_client
4294 .project
4295 .as_ref()
4296 .unwrap()
4297 .read_with(guest_cx, |project, _| {
4298 assert!(
4299 !project.has_buffered_operations(),
4300 "guest {} has buffered operations ",
4301 guest_id,
4302 );
4303 });
4304
4305 for guest_buffer in &guest_client.buffers {
4306 let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
4307 let host_buffer = host_project.read_with(&host_cx, |project, _| {
4308 project
4309 .shared_buffer(guest_client.peer_id, buffer_id)
4310 .expect(&format!(
4311 "host doest not have buffer for guest:{}, peer:{}, id:{}",
4312 guest_id, guest_client.peer_id, buffer_id
4313 ))
4314 });
4315 assert_eq!(
4316 guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
4317 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4318 "guest {} buffer {} differs from the host's buffer",
4319 guest_id,
4320 buffer_id,
4321 );
4322 }
4323 }
4324 }
4325
/// Test harness bundling the [`Server`] under test with the state that the
/// tests use to drive and observe it.
struct TestServer {
    /// RPC peer handed to the server on construction; reset when the harness is dropped.
    peer: Arc<Peer>,
    /// Application state produced by `build_app_state` from the test database.
    app_state: Arc<AppState>,
    /// The server under test.
    server: Arc<Server>,
    /// Foreground executor; `condition` brackets each wait with
    /// `start_waiting` / `finish_waiting` on it.
    foreground: Rc<executor::Foreground>,
    /// Receiving half of the channel whose sender is passed to `Server::new`.
    notifications: mpsc::UnboundedReceiver<()>,
    /// Kill switches registered by `create_client`, keyed by user; `disconnect_client`
    /// removes and fires the entry for a user.
    connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
    /// While `true`, connection attempts made by clients from `create_client` fail.
    forbid_connections: Arc<AtomicBool>,
    // NOTE(review): presumably held only so the test database outlives the
    // server (fields drop in declaration order) — confirm.
    _test_db: TestDb,
}
4336
4337 impl TestServer {
4338 async fn start(
4339 foreground: Rc<executor::Foreground>,
4340 background: Arc<executor::Background>,
4341 ) -> Self {
4342 let test_db = TestDb::fake(background);
4343 let app_state = Self::build_app_state(&test_db).await;
4344 let peer = Peer::new();
4345 let notifications = mpsc::unbounded();
4346 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4347 Self {
4348 peer,
4349 app_state,
4350 server,
4351 foreground,
4352 notifications: notifications.1,
4353 connection_killers: Default::default(),
4354 forbid_connections: Default::default(),
4355 _test_db: test_db,
4356 }
4357 }
4358
4359 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4360 let http = FakeHttpClient::with_404_response();
4361 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4362 let client_name = name.to_string();
4363 let mut client = Client::new(http.clone());
4364 let server = self.server.clone();
4365 let connection_killers = self.connection_killers.clone();
4366 let forbid_connections = self.forbid_connections.clone();
4367 let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4368
4369 Arc::get_mut(&mut client)
4370 .unwrap()
4371 .override_authenticate(move |cx| {
4372 cx.spawn(|_| async move {
4373 let access_token = "the-token".to_string();
4374 Ok(Credentials {
4375 user_id: user_id.0 as u64,
4376 access_token,
4377 })
4378 })
4379 })
4380 .override_establish_connection(move |credentials, cx| {
4381 assert_eq!(credentials.user_id, user_id.0 as u64);
4382 assert_eq!(credentials.access_token, "the-token");
4383
4384 let server = server.clone();
4385 let connection_killers = connection_killers.clone();
4386 let forbid_connections = forbid_connections.clone();
4387 let client_name = client_name.clone();
4388 let connection_id_tx = connection_id_tx.clone();
4389 cx.spawn(move |cx| async move {
4390 if forbid_connections.load(SeqCst) {
4391 Err(EstablishConnectionError::other(anyhow!(
4392 "server is forbidding connections"
4393 )))
4394 } else {
4395 let (client_conn, server_conn, kill_conn) =
4396 Connection::in_memory(cx.background());
4397 connection_killers.lock().insert(user_id, kill_conn);
4398 cx.background()
4399 .spawn(server.handle_connection(
4400 server_conn,
4401 client_name,
4402 user_id,
4403 Some(connection_id_tx),
4404 cx.background(),
4405 ))
4406 .detach();
4407 Ok(client_conn)
4408 }
4409 })
4410 });
4411
4412 client
4413 .authenticate_and_connect(&cx.to_async())
4414 .await
4415 .unwrap();
4416
4417 Channel::init(&client);
4418 Project::init(&client);
4419
4420 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4421 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4422 let mut authed_user =
4423 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
4424 while authed_user.next().await.unwrap().is_none() {}
4425
4426 TestClient {
4427 client,
4428 peer_id,
4429 user_store,
4430 project: Default::default(),
4431 buffers: Default::default(),
4432 }
4433 }
4434
4435 fn disconnect_client(&self, user_id: UserId) {
4436 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
4437 let _ = kill_conn.try_send(Some(()));
4438 }
4439 }
4440
4441 fn forbid_connections(&self) {
4442 self.forbid_connections.store(true, SeqCst);
4443 }
4444
4445 fn allow_connections(&self) {
4446 self.forbid_connections.store(false, SeqCst);
4447 }
4448
4449 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4450 let mut config = Config::default();
4451 config.session_secret = "a".repeat(32);
4452 config.database_url = test_db.url.clone();
4453 let github_client = github::AppClient::test();
4454 Arc::new(AppState {
4455 db: test_db.db().clone(),
4456 handlebars: Default::default(),
4457 auth_client: auth::build_client("", ""),
4458 repo_client: github::RepoClient::test(&github_client),
4459 github_client,
4460 config,
4461 })
4462 }
4463
4464 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4465 self.server.store.read()
4466 }
4467
4468 async fn condition<F>(&mut self, mut predicate: F)
4469 where
4470 F: FnMut(&Store) -> bool,
4471 {
4472 async_std::future::timeout(Duration::from_millis(500), async {
4473 while !(predicate)(&*self.server.store.read()) {
4474 self.foreground.start_waiting();
4475 self.notifications.next().await;
4476 self.foreground.finish_waiting();
4477 }
4478 })
4479 .await
4480 .expect("condition timed out");
4481 }
4482 }
4483
4484 impl Drop for TestServer {
4485 fn drop(&mut self) {
4486 self.peer.reset();
4487 }
4488 }
4489
/// A client connected to the [`TestServer`], together with the state the
/// randomized simulations accumulate for it.
struct TestClient {
    /// The underlying RPC client; also reachable through `Deref`.
    client: Arc<Client>,
    /// Peer id built from the connection id reported when the client connected.
    pub peer_id: PeerId,
    /// User store created for this client in `create_client`.
    pub user_store: ModelHandle<UserStore>,
    /// Starts out as the default (`None`); `simulate_host` / `simulate_guest`
    /// store their project here when they finish.
    project: Option<ModelHandle<Project>>,
    /// Buffers opened by the simulations and not yet dropped.
    buffers: HashSet<ModelHandle<zed::language::Buffer>>,
}
4497
4498 impl Deref for TestClient {
4499 type Target = Arc<Client>;
4500
4501 fn deref(&self) -> &Self::Target {
4502 &self.client
4503 }
4504 }
4505
4506 impl TestClient {
4507 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4508 UserId::from_proto(
4509 self.user_store
4510 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4511 )
4512 }
4513
4514 fn simulate_host(
4515 mut self,
4516 project: ModelHandle<Project>,
4517 mut language_server_config: LanguageServerConfig,
4518 operations: Rc<Cell<usize>>,
4519 max_operations: usize,
4520 rng: Arc<Mutex<StdRng>>,
4521 mut cx: TestAppContext,
4522 ) -> impl Future<Output = (Self, TestAppContext)> {
4523 let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4524
4525 // Set up a fake language server.
4526 language_server_config.set_fake_initializer({
4527 let rng = rng.clone();
4528 let files = files.clone();
4529 move |fake_server| {
4530 fake_server.handle_request::<lsp::request::Completion, _>(|_| {
4531 Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4532 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4533 range: lsp::Range::new(
4534 lsp::Position::new(0, 0),
4535 lsp::Position::new(0, 0),
4536 ),
4537 new_text: "the-new-text".to_string(),
4538 })),
4539 ..Default::default()
4540 }]))
4541 });
4542
4543 fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_| {
4544 Some(vec![lsp::CodeActionOrCommand::CodeAction(
4545 lsp::CodeAction {
4546 title: "the-code-action".to_string(),
4547 ..Default::default()
4548 },
4549 )])
4550 });
4551
4552 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(|params| {
4553 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4554 params.position,
4555 params.position,
4556 )))
4557 });
4558
4559 fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4560 let files = files.clone();
4561 let rng = rng.clone();
4562 move |_| {
4563 let files = files.lock();
4564 let mut rng = rng.lock();
4565 let count = rng.gen_range::<usize, _>(1..3);
4566 Some(lsp::GotoDefinitionResponse::Array(
4567 (0..count)
4568 .map(|_| {
4569 let file = files.choose(&mut *rng).unwrap().as_path();
4570 lsp::Location {
4571 uri: lsp::Url::from_file_path(file).unwrap(),
4572 range: Default::default(),
4573 }
4574 })
4575 .collect(),
4576 ))
4577 }
4578 });
4579 }
4580 });
4581
4582 project.update(&mut cx, |project, _| {
4583 project.languages().add(Arc::new(Language::new(
4584 LanguageConfig {
4585 name: "Rust".into(),
4586 path_suffixes: vec!["rs".to_string()],
4587 language_server: Some(language_server_config),
4588 ..Default::default()
4589 },
4590 None,
4591 )));
4592 });
4593
4594 async move {
4595 let fs = project.read_with(&cx, |project, _| project.fs().clone());
4596 while operations.get() < max_operations {
4597 operations.set(operations.get() + 1);
4598
4599 let distribution = rng.lock().gen_range::<usize, _>(0..100);
4600 match distribution {
4601 0..=20 if !files.lock().is_empty() => {
4602 let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4603 let mut path = path.as_path();
4604 while let Some(parent_path) = path.parent() {
4605 path = parent_path;
4606 if rng.lock().gen() {
4607 break;
4608 }
4609 }
4610
4611 log::info!("Host: find/create local worktree {:?}", path);
4612 project
4613 .update(&mut cx, |project, cx| {
4614 project.find_or_create_local_worktree(path, false, cx)
4615 })
4616 .await
4617 .unwrap();
4618 }
4619 10..=80 if !files.lock().is_empty() => {
4620 let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4621 let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4622 let (worktree, path) = project
4623 .update(&mut cx, |project, cx| {
4624 project.find_or_create_local_worktree(file, false, cx)
4625 })
4626 .await
4627 .unwrap();
4628 let project_path =
4629 worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4630 log::info!("Host: opening path {:?}", project_path);
4631 let buffer = project
4632 .update(&mut cx, |project, cx| {
4633 project.open_buffer(project_path, cx)
4634 })
4635 .await
4636 .unwrap();
4637 self.buffers.insert(buffer.clone());
4638 buffer
4639 } else {
4640 self.buffers
4641 .iter()
4642 .choose(&mut *rng.lock())
4643 .unwrap()
4644 .clone()
4645 };
4646
4647 if rng.lock().gen_bool(0.1) {
4648 cx.update(|cx| {
4649 log::info!(
4650 "Host: dropping buffer {:?}",
4651 buffer.read(cx).file().unwrap().full_path(cx)
4652 );
4653 self.buffers.remove(&buffer);
4654 drop(buffer);
4655 });
4656 } else {
4657 buffer.update(&mut cx, |buffer, cx| {
4658 log::info!(
4659 "Host: updating buffer {:?}",
4660 buffer.file().unwrap().full_path(cx)
4661 );
4662 buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4663 });
4664 }
4665 }
4666 _ => loop {
4667 let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4668 let mut path = PathBuf::new();
4669 path.push("/");
4670 for _ in 0..path_component_count {
4671 let letter = rng.lock().gen_range(b'a'..=b'z');
4672 path.push(std::str::from_utf8(&[letter]).unwrap());
4673 }
4674 path.set_extension("rs");
4675 let parent_path = path.parent().unwrap();
4676
4677 log::info!("Host: creating file {:?}", path,);
4678
4679 if fs.create_dir(&parent_path).await.is_ok()
4680 && fs.create_file(&path, Default::default()).await.is_ok()
4681 {
4682 files.lock().push(path);
4683 break;
4684 } else {
4685 log::info!("Host: cannot create file");
4686 }
4687 },
4688 }
4689
4690 cx.background().simulate_random_delay().await;
4691 }
4692
4693 self.project = Some(project);
4694 (self, cx)
4695 }
4696 }
4697
/// Plays a guest in a randomized collaboration test.
///
/// Loops while the shared `operations` counter is below `max_operations`.
/// Each iteration picks a buffer (opening a random file from a random
/// worktree, or reusing a held buffer), bumps the counter, and then performs
/// one randomly chosen action on that buffer: drop it, request completions,
/// request code actions, save it (only if dirty), prepare a rename, request
/// definitions, or apply random edits. A simulated random delay follows
/// every iteration.
///
/// When the loop ends the project is stored in `self.project` and
/// `(self, cx)` is returned. `guest_id` is used only in log messages.
///
/// Panics if opening a buffer fails or if an awaited or detached request
/// fails.
pub async fn simulate_guest(
    mut self,
    guest_id: usize,
    project: ModelHandle<Project>,
    operations: Rc<Cell<usize>>,
    max_operations: usize,
    rng: Arc<Mutex<StdRng>>,
    mut cx: TestAppContext,
) -> (Self, TestAppContext) {
    while operations.get() < max_operations {
        // Open a new buffer when none is held, otherwise on a coin flip.
        let buffer = if self.buffers.is_empty() || rng.lock().gen() {
            // Only worktrees that contain at least one file are candidates.
            let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
                project
                    .worktrees(&cx)
                    .filter(|worktree| {
                        worktree.read(cx).entries(false).any(|e| e.is_file())
                    })
                    .choose(&mut *rng.lock())
            }) {
                worktree
            } else {
                // Nothing to open yet: wait and retry without counting an operation.
                cx.background().simulate_random_delay().await;
                continue;
            };

            operations.set(operations.get() + 1);
            let project_path = worktree.read_with(&cx, |worktree, _| {
                let entry = worktree
                    .entries(false)
                    .filter(|e| e.is_file())
                    .choose(&mut *rng.lock())
                    .unwrap();
                (worktree.id(), entry.path.clone())
            });
            log::info!("Guest {}: opening path {:?}", guest_id, project_path);
            let buffer = project
                .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
                .await
                .unwrap();
            self.buffers.insert(buffer.clone());
            buffer
        } else {
            operations.set(operations.get() + 1);

            self.buffers
                .iter()
                .choose(&mut *rng.lock())
                .unwrap()
                .clone()
        };

        let choice = rng.lock().gen_range(0..100);
        match choice {
            // Drop the buffer.
            0..=9 => {
                cx.update(|cx| {
                    log::info!(
                        "Guest {}: dropping buffer {:?}",
                        guest_id,
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    self.buffers.remove(&buffer);
                    drop(buffer);
                });
            }
            // Request completions at a random offset.
            10..=19 => {
                let completions = project.update(&mut cx, |project, cx| {
                    log::info!(
                        "Guest {}: requesting completions for buffer {:?}",
                        guest_id,
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                    project.completions(&buffer, offset, cx)
                });
                let completions = cx.background().spawn(async move {
                    completions.await.expect("completions request failed");
                });
                // With probability 0.3 the request is left running instead of awaited.
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching completions request", guest_id);
                    completions.detach();
                } else {
                    completions.await;
                }
            }
            // Request code actions for a random byte range.
            20..=29 => {
                let code_actions = project.update(&mut cx, |project, cx| {
                    log::info!(
                        "Guest {}: requesting code actions for buffer {:?}",
                        guest_id,
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
                    project.code_actions(&buffer, range, cx)
                });
                let code_actions = cx.background().spawn(async move {
                    code_actions.await.expect("code actions request failed");
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching code actions request", guest_id);
                    code_actions.detach();
                } else {
                    code_actions.await;
                }
            }
            // Save the buffer, but only when it is dirty; a clean buffer in
            // this range falls through to the random-edit arm below.
            30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
                let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
                    log::info!(
                        "Guest {}: saving buffer {:?}",
                        guest_id,
                        buffer.file().unwrap().full_path(cx)
                    );
                    (buffer.version(), buffer.save(cx))
                });
                let save = cx.spawn(|cx| async move {
                    let (saved_version, _) = save.await.expect("save request failed");
                    // The saved version must lie between the version at request
                    // time and the buffer's current version.
                    buffer.read_with(&cx, |buffer, _| {
                        assert!(buffer.version().observed_all(&saved_version));
                        assert!(saved_version.observed_all(&requested_version));
                    });
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching save request", guest_id);
                    save.detach();
                } else {
                    save.await;
                }
            }
            // Prepare a rename at a random offset.
            40..=45 => {
                let prepare_rename = project.update(&mut cx, |project, cx| {
                    log::info!(
                        "Guest {}: preparing rename for buffer {:?}",
                        guest_id,
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                    project.prepare_rename(buffer, offset, cx)
                });
                let prepare_rename = cx.background().spawn(async move {
                    prepare_rename.await.expect("prepare rename request failed");
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching prepare rename request", guest_id);
                    prepare_rename.detach();
                } else {
                    prepare_rename.await;
                }
            }
            // Request definitions at a random offset.
            46..=49 => {
                let definitions = project.update(&mut cx, |project, cx| {
                    log::info!(
                        "Guest {}: requesting defintions for buffer {:?}",
                        guest_id,
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                    project.definition(&buffer, offset, cx)
                });
                let definitions = cx.background().spawn(async move {
                    definitions.await.expect("definitions request failed");
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching definitions request", guest_id);
                    definitions.detach();
                } else {
                    definitions.await;
                }
            }
            // Everything else: apply random edits to the buffer.
            _ => {
                buffer.update(&mut cx, |buffer, cx| {
                    log::info!(
                        "Guest {}: updating buffer {:?}",
                        guest_id,
                        buffer.file().unwrap().full_path(cx)
                    );
                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
                });
            }
        }
        cx.background().simulate_random_delay().await;
    }

    self.project = Some(project);
    (self, cx)
}
4882 }
4883
4884 impl Executor for Arc<gpui::executor::Background> {
4885 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
4886 self.spawn(future).detach();
4887 }
4888 }
4889
4890 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
4891 channel
4892 .messages()
4893 .cursor::<()>()
4894 .map(|m| {
4895 (
4896 m.sender.github_login.clone(),
4897 m.body.clone(),
4898 m.is_pending(),
4899 )
4900 })
4901 .collect()
4902 }
4903
/// A stateless view whose `render` produces `gpui::elements::Empty`.
struct EmptyView;
4905
/// `EmptyView` as a gpui entity; its event type is the unit type.
impl gpui::Entity for EmptyView {
    type Event = ();
}
4909
4910 impl gpui::View for EmptyView {
4911 fn ui_name() -> &'static str {
4912 "empty view"
4913 }
4914
4915 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
4916 gpui::Element::boxed(gpui::elements::Empty)
4917 }
4918 }
4919}