1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use rpc::{
15 proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
16 Connection, ConnectionId, Peer, TypedEnvelope,
17};
18use sha1::{Digest as _, Sha1};
19use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
20use store::{Store, Worktree};
21use surf::StatusCode;
22use tide::log;
23use tide::{
24 http::headers::{HeaderName, CONNECTION, UPGRADE},
25 Request, Response,
26};
27use time::OffsetDateTime;
28
29type MessageHandler = Box<
30 dyn Send
31 + Sync
32 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
33>;
34
35pub struct Server {
36 peer: Arc<Peer>,
37 store: RwLock<Store>,
38 app_state: Arc<AppState>,
39 handlers: HashMap<TypeId, MessageHandler>,
40 notifications: Option<mpsc::UnboundedSender<()>>,
41}
42
43pub trait Executor {
44 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
45}
46
47pub struct RealExecutor;
48
49const MESSAGE_COUNT_PER_PAGE: usize = 100;
50const MAX_MESSAGE_LEN: usize = 1024;
51
52impl Server {
53 pub fn new(
54 app_state: Arc<AppState>,
55 peer: Arc<Peer>,
56 notifications: Option<mpsc::UnboundedSender<()>>,
57 ) -> Arc<Self> {
58 let mut server = Self {
59 peer,
60 app_state,
61 store: Default::default(),
62 handlers: Default::default(),
63 notifications,
64 };
65
66 server
67 .add_request_handler(Server::ping)
68 .add_request_handler(Server::register_project)
69 .add_message_handler(Server::unregister_project)
70 .add_request_handler(Server::share_project)
71 .add_message_handler(Server::unshare_project)
72 .add_request_handler(Server::join_project)
73 .add_message_handler(Server::leave_project)
74 .add_request_handler(Server::register_worktree)
75 .add_message_handler(Server::unregister_worktree)
76 .add_request_handler(Server::share_worktree)
77 .add_request_handler(Server::update_worktree)
78 .add_message_handler(Server::update_diagnostic_summary)
79 .add_message_handler(Server::disk_based_diagnostics_updating)
80 .add_message_handler(Server::disk_based_diagnostics_updated)
81 .add_request_handler(Server::get_definition)
82 .add_request_handler(Server::get_project_symbols)
83 .add_request_handler(Server::open_buffer_for_symbol)
84 .add_request_handler(Server::open_buffer)
85 .add_message_handler(Server::close_buffer)
86 .add_request_handler(Server::update_buffer)
87 .add_message_handler(Server::update_buffer_file)
88 .add_message_handler(Server::buffer_reloaded)
89 .add_message_handler(Server::buffer_saved)
90 .add_request_handler(Server::save_buffer)
91 .add_request_handler(Server::format_buffers)
92 .add_request_handler(Server::get_completions)
93 .add_request_handler(Server::apply_additional_edits_for_completion)
94 .add_request_handler(Server::get_code_actions)
95 .add_request_handler(Server::apply_code_action)
96 .add_request_handler(Server::prepare_rename)
97 .add_request_handler(Server::perform_rename)
98 .add_request_handler(Server::get_channels)
99 .add_request_handler(Server::get_users)
100 .add_request_handler(Server::join_channel)
101 .add_message_handler(Server::leave_channel)
102 .add_request_handler(Server::send_channel_message)
103 .add_request_handler(Server::get_channel_messages);
104
105 Arc::new(server)
106 }
107
108 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
109 where
110 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
111 Fut: 'static + Send + Future<Output = tide::Result<()>>,
112 M: EnvelopedMessage,
113 {
114 let prev_handler = self.handlers.insert(
115 TypeId::of::<M>(),
116 Box::new(move |server, envelope| {
117 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
118 (handler)(server, *envelope).boxed()
119 }),
120 );
121 if prev_handler.is_some() {
122 panic!("registered a handler for the same message twice");
123 }
124 self
125 }
126
127 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
128 where
129 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
130 Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
131 M: RequestMessage,
132 {
133 self.add_message_handler(move |server, envelope| {
134 let receipt = envelope.receipt();
135 let response = (handler)(server.clone(), envelope);
136 async move {
137 match response.await {
138 Ok(response) => {
139 server.peer.respond(receipt, response)?;
140 Ok(())
141 }
142 Err(error) => {
143 server.peer.respond_with_error(
144 receipt,
145 proto::Error {
146 message: error.to_string(),
147 },
148 )?;
149 Err(error)
150 }
151 }
152 }
153 })
154 }
155
156 pub fn handle_connection<E: Executor>(
157 self: &Arc<Self>,
158 connection: Connection,
159 addr: String,
160 user_id: UserId,
161 mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
162 executor: E,
163 ) -> impl Future<Output = ()> {
164 let mut this = self.clone();
165 async move {
166 let (connection_id, handle_io, mut incoming_rx) =
167 this.peer.add_connection(connection).await;
168
169 if let Some(send_connection_id) = send_connection_id.as_mut() {
170 let _ = send_connection_id.send(connection_id).await;
171 }
172
173 this.state_mut().add_connection(connection_id, user_id);
174 if let Err(err) = this.update_contacts_for_users(&[user_id]) {
175 log::error!("error updating contacts for {:?}: {}", user_id, err);
176 }
177
178 let handle_io = handle_io.fuse();
179 futures::pin_mut!(handle_io);
180 loop {
181 let next_message = incoming_rx.next().fuse();
182 futures::pin_mut!(next_message);
183 futures::select_biased! {
184 result = handle_io => {
185 if let Err(err) = result {
186 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
187 }
188 break;
189 }
190 message = next_message => {
191 if let Some(message) = message {
192 let start_time = Instant::now();
193 let type_name = message.payload_type_name();
194 log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
195 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
196 let notifications = this.notifications.clone();
197 let is_background = message.is_background();
198 let handle_message = (handler)(this.clone(), message);
199 let handle_message = async move {
200 if let Err(err) = handle_message.await {
201 log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
202 } else {
203 log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
204 }
205 if let Some(mut notifications) = notifications {
206 let _ = notifications.send(()).await;
207 }
208 };
209 if is_background {
210 executor.spawn_detached(handle_message);
211 } else {
212 handle_message.await;
213 }
214 } else {
215 log::warn!("unhandled message: {}", type_name);
216 }
217 } else {
218 log::info!("rpc connection closed {:?}", addr);
219 break;
220 }
221 }
222 }
223 }
224
225 if let Err(err) = this.sign_out(connection_id).await {
226 log::error!("error signing out connection {:?} - {:?}", addr, err);
227 }
228 }
229 }
230
231 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
232 self.peer.disconnect(connection_id);
233 let removed_connection = self.state_mut().remove_connection(connection_id)?;
234
235 for (project_id, project) in removed_connection.hosted_projects {
236 if let Some(share) = project.share {
237 broadcast(
238 connection_id,
239 share.guests.keys().copied().collect(),
240 |conn_id| {
241 self.peer
242 .send(conn_id, proto::UnshareProject { project_id })
243 },
244 )?;
245 }
246 }
247
248 for (project_id, peer_ids) in removed_connection.guest_project_ids {
249 broadcast(connection_id, peer_ids, |conn_id| {
250 self.peer.send(
251 conn_id,
252 proto::RemoveProjectCollaborator {
253 project_id,
254 peer_id: connection_id.0,
255 },
256 )
257 })?;
258 }
259
260 self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
261 Ok(())
262 }
263
264 async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
265 Ok(proto::Ack {})
266 }
267
268 async fn register_project(
269 mut self: Arc<Server>,
270 request: TypedEnvelope<proto::RegisterProject>,
271 ) -> tide::Result<proto::RegisterProjectResponse> {
272 let project_id = {
273 let mut state = self.state_mut();
274 let user_id = state.user_id_for_connection(request.sender_id)?;
275 state.register_project(request.sender_id, user_id)
276 };
277 Ok(proto::RegisterProjectResponse { project_id })
278 }
279
280 async fn unregister_project(
281 mut self: Arc<Server>,
282 request: TypedEnvelope<proto::UnregisterProject>,
283 ) -> tide::Result<()> {
284 let project = self
285 .state_mut()
286 .unregister_project(request.payload.project_id, request.sender_id)?;
287 self.update_contacts_for_users(project.authorized_user_ids().iter())?;
288 Ok(())
289 }
290
291 async fn share_project(
292 mut self: Arc<Server>,
293 request: TypedEnvelope<proto::ShareProject>,
294 ) -> tide::Result<proto::Ack> {
295 self.state_mut()
296 .share_project(request.payload.project_id, request.sender_id);
297 Ok(proto::Ack {})
298 }
299
300 async fn unshare_project(
301 mut self: Arc<Server>,
302 request: TypedEnvelope<proto::UnshareProject>,
303 ) -> tide::Result<()> {
304 let project_id = request.payload.project_id;
305 let project = self
306 .state_mut()
307 .unshare_project(project_id, request.sender_id)?;
308
309 broadcast(request.sender_id, project.connection_ids, |conn_id| {
310 self.peer
311 .send(conn_id, proto::UnshareProject { project_id })
312 })?;
313 self.update_contacts_for_users(&project.authorized_user_ids)?;
314 Ok(())
315 }
316
317 async fn join_project(
318 mut self: Arc<Server>,
319 request: TypedEnvelope<proto::JoinProject>,
320 ) -> tide::Result<proto::JoinProjectResponse> {
321 let project_id = request.payload.project_id;
322
323 let user_id = self.state().user_id_for_connection(request.sender_id)?;
324 let (response, connection_ids, contact_user_ids) = self
325 .state_mut()
326 .join_project(request.sender_id, user_id, project_id)
327 .and_then(|joined| {
328 let share = joined.project.share()?;
329 let peer_count = share.guests.len();
330 let mut collaborators = Vec::with_capacity(peer_count);
331 collaborators.push(proto::Collaborator {
332 peer_id: joined.project.host_connection_id.0,
333 replica_id: 0,
334 user_id: joined.project.host_user_id.to_proto(),
335 });
336 let worktrees = joined
337 .project
338 .worktrees
339 .iter()
340 .filter_map(|(id, worktree)| {
341 worktree.share.as_ref().map(|share| proto::Worktree {
342 id: *id,
343 root_name: worktree.root_name.clone(),
344 entries: share.entries.values().cloned().collect(),
345 diagnostic_summaries: share
346 .diagnostic_summaries
347 .values()
348 .cloned()
349 .collect(),
350 weak: worktree.weak,
351 next_update_id: share.next_update_id as u64,
352 })
353 })
354 .collect();
355 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
356 if *peer_conn_id != request.sender_id {
357 collaborators.push(proto::Collaborator {
358 peer_id: peer_conn_id.0,
359 replica_id: *peer_replica_id as u32,
360 user_id: peer_user_id.to_proto(),
361 });
362 }
363 }
364 let response = proto::JoinProjectResponse {
365 worktrees,
366 replica_id: joined.replica_id as u32,
367 collaborators,
368 };
369 let connection_ids = joined.project.connection_ids();
370 let contact_user_ids = joined.project.authorized_user_ids();
371 Ok((response, connection_ids, contact_user_ids))
372 })?;
373
374 broadcast(request.sender_id, connection_ids, |conn_id| {
375 self.peer.send(
376 conn_id,
377 proto::AddProjectCollaborator {
378 project_id,
379 collaborator: Some(proto::Collaborator {
380 peer_id: request.sender_id.0,
381 replica_id: response.replica_id,
382 user_id: user_id.to_proto(),
383 }),
384 },
385 )
386 })?;
387 self.update_contacts_for_users(&contact_user_ids)?;
388 Ok(response)
389 }
390
391 async fn leave_project(
392 mut self: Arc<Server>,
393 request: TypedEnvelope<proto::LeaveProject>,
394 ) -> tide::Result<()> {
395 let sender_id = request.sender_id;
396 let project_id = request.payload.project_id;
397 let worktree = self.state_mut().leave_project(sender_id, project_id)?;
398
399 broadcast(sender_id, worktree.connection_ids, |conn_id| {
400 self.peer.send(
401 conn_id,
402 proto::RemoveProjectCollaborator {
403 project_id,
404 peer_id: sender_id.0,
405 },
406 )
407 })?;
408 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
409
410 Ok(())
411 }
412
413 async fn register_worktree(
414 mut self: Arc<Server>,
415 request: TypedEnvelope<proto::RegisterWorktree>,
416 ) -> tide::Result<proto::Ack> {
417 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
418
419 let mut contact_user_ids = HashSet::default();
420 contact_user_ids.insert(host_user_id);
421 for github_login in request.payload.authorized_logins {
422 let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
423 contact_user_ids.insert(contact_user_id);
424 }
425
426 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
427 self.state_mut().register_worktree(
428 request.payload.project_id,
429 request.payload.worktree_id,
430 request.sender_id,
431 Worktree {
432 authorized_user_ids: contact_user_ids.clone(),
433 root_name: request.payload.root_name,
434 share: None,
435 weak: false,
436 },
437 )?;
438 self.update_contacts_for_users(&contact_user_ids)?;
439 Ok(proto::Ack {})
440 }
441
442 async fn unregister_worktree(
443 mut self: Arc<Server>,
444 request: TypedEnvelope<proto::UnregisterWorktree>,
445 ) -> tide::Result<()> {
446 let project_id = request.payload.project_id;
447 let worktree_id = request.payload.worktree_id;
448 let (worktree, guest_connection_ids) =
449 self.state_mut()
450 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
451 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
452 self.peer.send(
453 conn_id,
454 proto::UnregisterWorktree {
455 project_id,
456 worktree_id,
457 },
458 )
459 })?;
460 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
461 Ok(())
462 }
463
464 async fn share_worktree(
465 mut self: Arc<Server>,
466 mut request: TypedEnvelope<proto::ShareWorktree>,
467 ) -> tide::Result<proto::Ack> {
468 let worktree = request
469 .payload
470 .worktree
471 .as_mut()
472 .ok_or_else(|| anyhow!("missing worktree"))?;
473 let entries = worktree
474 .entries
475 .iter()
476 .map(|entry| (entry.id, entry.clone()))
477 .collect();
478 let diagnostic_summaries = worktree
479 .diagnostic_summaries
480 .iter()
481 .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
482 .collect();
483
484 let shared_worktree = self.state_mut().share_worktree(
485 request.payload.project_id,
486 worktree.id,
487 request.sender_id,
488 entries,
489 diagnostic_summaries,
490 worktree.next_update_id,
491 )?;
492
493 broadcast(
494 request.sender_id,
495 shared_worktree.connection_ids,
496 |connection_id| {
497 self.peer
498 .forward_send(request.sender_id, connection_id, request.payload.clone())
499 },
500 )?;
501 self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
502
503 Ok(proto::Ack {})
504 }
505
506 async fn update_worktree(
507 mut self: Arc<Server>,
508 request: TypedEnvelope<proto::UpdateWorktree>,
509 ) -> tide::Result<proto::Ack> {
510 let connection_ids = self.state_mut().update_worktree(
511 request.sender_id,
512 request.payload.project_id,
513 request.payload.worktree_id,
514 request.payload.id,
515 &request.payload.removed_entries,
516 &request.payload.updated_entries,
517 )?;
518
519 broadcast(request.sender_id, connection_ids, |connection_id| {
520 self.peer
521 .forward_send(request.sender_id, connection_id, request.payload.clone())
522 })?;
523
524 Ok(proto::Ack {})
525 }
526
527 async fn update_diagnostic_summary(
528 mut self: Arc<Server>,
529 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
530 ) -> tide::Result<()> {
531 let summary = request
532 .payload
533 .summary
534 .clone()
535 .ok_or_else(|| anyhow!("invalid summary"))?;
536 let receiver_ids = self.state_mut().update_diagnostic_summary(
537 request.payload.project_id,
538 request.payload.worktree_id,
539 request.sender_id,
540 summary,
541 )?;
542
543 broadcast(request.sender_id, receiver_ids, |connection_id| {
544 self.peer
545 .forward_send(request.sender_id, connection_id, request.payload.clone())
546 })?;
547 Ok(())
548 }
549
550 async fn disk_based_diagnostics_updating(
551 self: Arc<Server>,
552 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
553 ) -> tide::Result<()> {
554 let receiver_ids = self
555 .state()
556 .project_connection_ids(request.payload.project_id, request.sender_id)?;
557 broadcast(request.sender_id, receiver_ids, |connection_id| {
558 self.peer
559 .forward_send(request.sender_id, connection_id, request.payload.clone())
560 })?;
561 Ok(())
562 }
563
564 async fn disk_based_diagnostics_updated(
565 self: Arc<Server>,
566 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
567 ) -> tide::Result<()> {
568 let receiver_ids = self
569 .state()
570 .project_connection_ids(request.payload.project_id, request.sender_id)?;
571 broadcast(request.sender_id, receiver_ids, |connection_id| {
572 self.peer
573 .forward_send(request.sender_id, connection_id, request.payload.clone())
574 })?;
575 Ok(())
576 }
577
578 async fn get_definition(
579 self: Arc<Server>,
580 request: TypedEnvelope<proto::GetDefinition>,
581 ) -> tide::Result<proto::GetDefinitionResponse> {
582 let host_connection_id = self
583 .state()
584 .read_project(request.payload.project_id, request.sender_id)?
585 .host_connection_id;
586 Ok(self
587 .peer
588 .forward_request(request.sender_id, host_connection_id, request.payload)
589 .await?)
590 }
591
592 async fn get_project_symbols(
593 self: Arc<Server>,
594 request: TypedEnvelope<proto::GetProjectSymbols>,
595 ) -> tide::Result<proto::GetProjectSymbolsResponse> {
596 let host_connection_id = self
597 .state()
598 .read_project(request.payload.project_id, request.sender_id)?
599 .host_connection_id;
600 Ok(self
601 .peer
602 .forward_request(request.sender_id, host_connection_id, request.payload)
603 .await?)
604 }
605
606 async fn open_buffer_for_symbol(
607 self: Arc<Server>,
608 request: TypedEnvelope<proto::OpenBufferForSymbol>,
609 ) -> tide::Result<proto::OpenBufferForSymbolResponse> {
610 let host_connection_id = self
611 .state()
612 .read_project(request.payload.project_id, request.sender_id)?
613 .host_connection_id;
614 Ok(self
615 .peer
616 .forward_request(request.sender_id, host_connection_id, request.payload)
617 .await?)
618 }
619
620 async fn open_buffer(
621 self: Arc<Server>,
622 request: TypedEnvelope<proto::OpenBuffer>,
623 ) -> tide::Result<proto::OpenBufferResponse> {
624 let host_connection_id = self
625 .state()
626 .read_project(request.payload.project_id, request.sender_id)?
627 .host_connection_id;
628 Ok(self
629 .peer
630 .forward_request(request.sender_id, host_connection_id, request.payload)
631 .await?)
632 }
633
634 async fn close_buffer(
635 self: Arc<Server>,
636 request: TypedEnvelope<proto::CloseBuffer>,
637 ) -> tide::Result<()> {
638 let host_connection_id = self
639 .state()
640 .read_project(request.payload.project_id, request.sender_id)?
641 .host_connection_id;
642 self.peer
643 .forward_send(request.sender_id, host_connection_id, request.payload)?;
644 Ok(())
645 }
646
647 async fn save_buffer(
648 self: Arc<Server>,
649 request: TypedEnvelope<proto::SaveBuffer>,
650 ) -> tide::Result<proto::BufferSaved> {
651 let host;
652 let mut guests;
653 {
654 let state = self.state();
655 let project = state.read_project(request.payload.project_id, request.sender_id)?;
656 host = project.host_connection_id;
657 guests = project.guest_connection_ids()
658 }
659
660 let response = self
661 .peer
662 .forward_request(request.sender_id, host, request.payload.clone())
663 .await?;
664
665 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
666 broadcast(host, guests, |conn_id| {
667 self.peer.forward_send(host, conn_id, response.clone())
668 })?;
669
670 Ok(response)
671 }
672
673 async fn format_buffers(
674 self: Arc<Server>,
675 request: TypedEnvelope<proto::FormatBuffers>,
676 ) -> tide::Result<proto::FormatBuffersResponse> {
677 let host = self
678 .state()
679 .read_project(request.payload.project_id, request.sender_id)?
680 .host_connection_id;
681 Ok(self
682 .peer
683 .forward_request(request.sender_id, host, request.payload.clone())
684 .await?)
685 }
686
687 async fn get_completions(
688 self: Arc<Server>,
689 request: TypedEnvelope<proto::GetCompletions>,
690 ) -> tide::Result<proto::GetCompletionsResponse> {
691 let host = self
692 .state()
693 .read_project(request.payload.project_id, request.sender_id)?
694 .host_connection_id;
695 Ok(self
696 .peer
697 .forward_request(request.sender_id, host, request.payload.clone())
698 .await?)
699 }
700
701 async fn apply_additional_edits_for_completion(
702 self: Arc<Server>,
703 request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
704 ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
705 let host = self
706 .state()
707 .read_project(request.payload.project_id, request.sender_id)?
708 .host_connection_id;
709 Ok(self
710 .peer
711 .forward_request(request.sender_id, host, request.payload.clone())
712 .await?)
713 }
714
715 async fn get_code_actions(
716 self: Arc<Server>,
717 request: TypedEnvelope<proto::GetCodeActions>,
718 ) -> tide::Result<proto::GetCodeActionsResponse> {
719 let host = self
720 .state()
721 .read_project(request.payload.project_id, request.sender_id)?
722 .host_connection_id;
723 Ok(self
724 .peer
725 .forward_request(request.sender_id, host, request.payload.clone())
726 .await?)
727 }
728
729 async fn apply_code_action(
730 self: Arc<Server>,
731 request: TypedEnvelope<proto::ApplyCodeAction>,
732 ) -> tide::Result<proto::ApplyCodeActionResponse> {
733 let host = self
734 .state()
735 .read_project(request.payload.project_id, request.sender_id)?
736 .host_connection_id;
737 Ok(self
738 .peer
739 .forward_request(request.sender_id, host, request.payload.clone())
740 .await?)
741 }
742
743 async fn prepare_rename(
744 self: Arc<Server>,
745 request: TypedEnvelope<proto::PrepareRename>,
746 ) -> tide::Result<proto::PrepareRenameResponse> {
747 let host = self
748 .state()
749 .read_project(request.payload.project_id, request.sender_id)?
750 .host_connection_id;
751 Ok(self
752 .peer
753 .forward_request(request.sender_id, host, request.payload.clone())
754 .await?)
755 }
756
757 async fn perform_rename(
758 self: Arc<Server>,
759 request: TypedEnvelope<proto::PerformRename>,
760 ) -> tide::Result<proto::PerformRenameResponse> {
761 let host = self
762 .state()
763 .read_project(request.payload.project_id, request.sender_id)?
764 .host_connection_id;
765 Ok(self
766 .peer
767 .forward_request(request.sender_id, host, request.payload.clone())
768 .await?)
769 }
770
771 async fn update_buffer(
772 self: Arc<Server>,
773 request: TypedEnvelope<proto::UpdateBuffer>,
774 ) -> tide::Result<proto::Ack> {
775 let receiver_ids = self
776 .state()
777 .project_connection_ids(request.payload.project_id, request.sender_id)?;
778 broadcast(request.sender_id, receiver_ids, |connection_id| {
779 self.peer
780 .forward_send(request.sender_id, connection_id, request.payload.clone())
781 })?;
782 Ok(proto::Ack {})
783 }
784
785 async fn update_buffer_file(
786 self: Arc<Server>,
787 request: TypedEnvelope<proto::UpdateBufferFile>,
788 ) -> tide::Result<()> {
789 let receiver_ids = self
790 .state()
791 .project_connection_ids(request.payload.project_id, request.sender_id)?;
792 broadcast(request.sender_id, receiver_ids, |connection_id| {
793 self.peer
794 .forward_send(request.sender_id, connection_id, request.payload.clone())
795 })?;
796 Ok(())
797 }
798
799 async fn buffer_reloaded(
800 self: Arc<Server>,
801 request: TypedEnvelope<proto::BufferReloaded>,
802 ) -> tide::Result<()> {
803 let receiver_ids = self
804 .state()
805 .project_connection_ids(request.payload.project_id, request.sender_id)?;
806 broadcast(request.sender_id, receiver_ids, |connection_id| {
807 self.peer
808 .forward_send(request.sender_id, connection_id, request.payload.clone())
809 })?;
810 Ok(())
811 }
812
813 async fn buffer_saved(
814 self: Arc<Server>,
815 request: TypedEnvelope<proto::BufferSaved>,
816 ) -> tide::Result<()> {
817 let receiver_ids = self
818 .state()
819 .project_connection_ids(request.payload.project_id, request.sender_id)?;
820 broadcast(request.sender_id, receiver_ids, |connection_id| {
821 self.peer
822 .forward_send(request.sender_id, connection_id, request.payload.clone())
823 })?;
824 Ok(())
825 }
826
827 async fn get_channels(
828 self: Arc<Server>,
829 request: TypedEnvelope<proto::GetChannels>,
830 ) -> tide::Result<proto::GetChannelsResponse> {
831 let user_id = self.state().user_id_for_connection(request.sender_id)?;
832 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
833 Ok(proto::GetChannelsResponse {
834 channels: channels
835 .into_iter()
836 .map(|chan| proto::Channel {
837 id: chan.id.to_proto(),
838 name: chan.name,
839 })
840 .collect(),
841 })
842 }
843
844 async fn get_users(
845 self: Arc<Server>,
846 request: TypedEnvelope<proto::GetUsers>,
847 ) -> tide::Result<proto::GetUsersResponse> {
848 let user_ids = request
849 .payload
850 .user_ids
851 .into_iter()
852 .map(UserId::from_proto)
853 .collect();
854 let users = self
855 .app_state
856 .db
857 .get_users_by_ids(user_ids)
858 .await?
859 .into_iter()
860 .map(|user| proto::User {
861 id: user.id.to_proto(),
862 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
863 github_login: user.github_login,
864 })
865 .collect();
866 Ok(proto::GetUsersResponse { users })
867 }
868
869 fn update_contacts_for_users<'a>(
870 self: &Arc<Server>,
871 user_ids: impl IntoIterator<Item = &'a UserId>,
872 ) -> anyhow::Result<()> {
873 let mut result = Ok(());
874 let state = self.state();
875 for user_id in user_ids {
876 let contacts = state.contacts_for_user(*user_id);
877 for connection_id in state.connection_ids_for_user(*user_id) {
878 if let Err(error) = self.peer.send(
879 connection_id,
880 proto::UpdateContacts {
881 contacts: contacts.clone(),
882 },
883 ) {
884 result = Err(error);
885 }
886 }
887 }
888 result
889 }
890
891 async fn join_channel(
892 mut self: Arc<Self>,
893 request: TypedEnvelope<proto::JoinChannel>,
894 ) -> tide::Result<proto::JoinChannelResponse> {
895 let user_id = self.state().user_id_for_connection(request.sender_id)?;
896 let channel_id = ChannelId::from_proto(request.payload.channel_id);
897 if !self
898 .app_state
899 .db
900 .can_user_access_channel(user_id, channel_id)
901 .await?
902 {
903 Err(anyhow!("access denied"))?;
904 }
905
906 self.state_mut().join_channel(request.sender_id, channel_id);
907 let messages = self
908 .app_state
909 .db
910 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
911 .await?
912 .into_iter()
913 .map(|msg| proto::ChannelMessage {
914 id: msg.id.to_proto(),
915 body: msg.body,
916 timestamp: msg.sent_at.unix_timestamp() as u64,
917 sender_id: msg.sender_id.to_proto(),
918 nonce: Some(msg.nonce.as_u128().into()),
919 })
920 .collect::<Vec<_>>();
921 Ok(proto::JoinChannelResponse {
922 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
923 messages,
924 })
925 }
926
927 async fn leave_channel(
928 mut self: Arc<Self>,
929 request: TypedEnvelope<proto::LeaveChannel>,
930 ) -> tide::Result<()> {
931 let user_id = self.state().user_id_for_connection(request.sender_id)?;
932 let channel_id = ChannelId::from_proto(request.payload.channel_id);
933 if !self
934 .app_state
935 .db
936 .can_user_access_channel(user_id, channel_id)
937 .await?
938 {
939 Err(anyhow!("access denied"))?;
940 }
941
942 self.state_mut()
943 .leave_channel(request.sender_id, channel_id);
944
945 Ok(())
946 }
947
948 async fn send_channel_message(
949 self: Arc<Self>,
950 request: TypedEnvelope<proto::SendChannelMessage>,
951 ) -> tide::Result<proto::SendChannelMessageResponse> {
952 let channel_id = ChannelId::from_proto(request.payload.channel_id);
953 let user_id;
954 let connection_ids;
955 {
956 let state = self.state();
957 user_id = state.user_id_for_connection(request.sender_id)?;
958 connection_ids = state.channel_connection_ids(channel_id)?;
959 }
960
961 // Validate the message body.
962 let body = request.payload.body.trim().to_string();
963 if body.len() > MAX_MESSAGE_LEN {
964 return Err(anyhow!("message is too long"))?;
965 }
966 if body.is_empty() {
967 return Err(anyhow!("message can't be blank"))?;
968 }
969
970 let timestamp = OffsetDateTime::now_utc();
971 let nonce = request
972 .payload
973 .nonce
974 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
975
976 let message_id = self
977 .app_state
978 .db
979 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
980 .await?
981 .to_proto();
982 let message = proto::ChannelMessage {
983 sender_id: user_id.to_proto(),
984 id: message_id,
985 body,
986 timestamp: timestamp.unix_timestamp() as u64,
987 nonce: Some(nonce),
988 };
989 broadcast(request.sender_id, connection_ids, |conn_id| {
990 self.peer.send(
991 conn_id,
992 proto::ChannelMessageSent {
993 channel_id: channel_id.to_proto(),
994 message: Some(message.clone()),
995 },
996 )
997 })?;
998 Ok(proto::SendChannelMessageResponse {
999 message: Some(message),
1000 })
1001 }
1002
1003 async fn get_channel_messages(
1004 self: Arc<Self>,
1005 request: TypedEnvelope<proto::GetChannelMessages>,
1006 ) -> tide::Result<proto::GetChannelMessagesResponse> {
1007 let user_id = self.state().user_id_for_connection(request.sender_id)?;
1008 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1009 if !self
1010 .app_state
1011 .db
1012 .can_user_access_channel(user_id, channel_id)
1013 .await?
1014 {
1015 Err(anyhow!("access denied"))?;
1016 }
1017
1018 let messages = self
1019 .app_state
1020 .db
1021 .get_channel_messages(
1022 channel_id,
1023 MESSAGE_COUNT_PER_PAGE,
1024 Some(MessageId::from_proto(request.payload.before_message_id)),
1025 )
1026 .await?
1027 .into_iter()
1028 .map(|msg| proto::ChannelMessage {
1029 id: msg.id.to_proto(),
1030 body: msg.body,
1031 timestamp: msg.sent_at.unix_timestamp() as u64,
1032 sender_id: msg.sender_id.to_proto(),
1033 nonce: Some(msg.nonce.as_u128().into()),
1034 })
1035 .collect::<Vec<_>>();
1036
1037 Ok(proto::GetChannelMessagesResponse {
1038 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1039 messages,
1040 })
1041 }
1042
1043 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1044 self.store.read()
1045 }
1046
1047 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1048 self.store.write()
1049 }
1050}
1051
1052impl Executor for RealExecutor {
1053 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1054 task::spawn(future);
1055 }
1056}
1057
1058fn broadcast<F>(
1059 sender_id: ConnectionId,
1060 receiver_ids: Vec<ConnectionId>,
1061 mut f: F,
1062) -> anyhow::Result<()>
1063where
1064 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1065{
1066 let mut result = Ok(());
1067 for receiver_id in receiver_ids {
1068 if receiver_id != sender_id {
1069 if let Err(error) = f(receiver_id) {
1070 if result.is_ok() {
1071 result = Err(error);
1072 }
1073 }
1074 }
1075 }
1076 result
1077}
1078
1079pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1080 let server = Server::new(app.state().clone(), rpc.clone(), None);
1081 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1082 let server = server.clone();
1083 async move {
1084 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1085
1086 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1087 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1088 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1089 let client_protocol_version: Option<u32> = request
1090 .header("X-Zed-Protocol-Version")
1091 .and_then(|v| v.as_str().parse().ok());
1092
1093 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1094 return Ok(Response::new(StatusCode::UpgradeRequired));
1095 }
1096
1097 let header = match request.header("Sec-Websocket-Key") {
1098 Some(h) => h.as_str(),
1099 None => return Err(anyhow!("expected sec-websocket-key"))?,
1100 };
1101
1102 let user_id = process_auth_header(&request).await?;
1103
1104 let mut response = Response::new(StatusCode::SwitchingProtocols);
1105 response.insert_header(UPGRADE, "websocket");
1106 response.insert_header(CONNECTION, "Upgrade");
1107 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1108 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1109 response.insert_header("Sec-Websocket-Version", "13");
1110
1111 let http_res: &mut tide::http::Response = response.as_mut();
1112 let upgrade_receiver = http_res.recv_upgrade().await;
1113 let addr = request.remote().unwrap_or("unknown").to_string();
1114 task::spawn(async move {
1115 if let Some(stream) = upgrade_receiver.await {
1116 server
1117 .handle_connection(
1118 Connection::new(
1119 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1120 ),
1121 addr,
1122 user_id,
1123 None,
1124 RealExecutor,
1125 )
1126 .await;
1127 }
1128 });
1129
1130 Ok(response)
1131 }
1132 });
1133}
1134
1135fn header_contains_ignore_case<T>(
1136 request: &tide::Request<T>,
1137 header_name: HeaderName,
1138 value: &str,
1139) -> bool {
1140 request
1141 .header(header_name)
1142 .map(|h| {
1143 h.as_str()
1144 .split(',')
1145 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1146 })
1147 .unwrap_or(false)
1148}
1149
1150#[cfg(test)]
1151mod tests {
1152 use super::*;
1153 use crate::{
1154 auth,
1155 db::{tests::TestDb, UserId},
1156 github, AppState, Config,
1157 };
1158 use ::rpc::Peer;
1159 use collections::BTreeMap;
1160 use gpui::{executor, ModelHandle, TestAppContext};
1161 use parking_lot::Mutex;
1162 use postage::{sink::Sink, watch};
1163 use rand::prelude::*;
1164 use rpc::PeerId;
1165 use serde_json::json;
1166 use sqlx::types::time::OffsetDateTime;
1167 use std::{
1168 cell::Cell,
1169 env,
1170 ops::Deref,
1171 path::Path,
1172 rc::Rc,
1173 sync::{
1174 atomic::{AtomicBool, Ordering::SeqCst},
1175 Arc,
1176 },
1177 time::Duration,
1178 };
1179 use zed::{
1180 client::{
1181 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1182 EstablishConnectionError, UserStore,
1183 },
1184 editor::{
1185 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, EditorSettings,
1186 Input, MultiBuffer, Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1187 },
1188 fs::{FakeFs, Fs as _},
1189 language::{
1190 tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1191 LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1192 },
1193 lsp,
1194 project::{DiagnosticSummary, Project, ProjectPath},
1195 workspace::{Workspace, WorkspaceParams},
1196 };
1197
1198 #[cfg(test)]
1199 #[ctor::ctor]
1200 fn init_logger() {
1201 if std::env::var("RUST_LOG").is_ok() {
1202 env_logger::init();
1203 }
1204 }
1205
1206 #[gpui::test(iterations = 10)]
1207 async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1208 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1209 let lang_registry = Arc::new(LanguageRegistry::new());
1210 let fs = FakeFs::new(cx_a.background());
1211 cx_a.foreground().forbid_parking();
1212
1213 // Connect to a server as 2 clients.
1214 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1215 let client_a = server.create_client(&mut cx_a, "user_a").await;
1216 let client_b = server.create_client(&mut cx_b, "user_b").await;
1217
1218 // Share a project as client A
1219 fs.insert_tree(
1220 "/a",
1221 json!({
1222 ".zed.toml": r#"collaborators = ["user_b"]"#,
1223 "a.txt": "a-contents",
1224 "b.txt": "b-contents",
1225 }),
1226 )
1227 .await;
1228 let project_a = cx_a.update(|cx| {
1229 Project::local(
1230 client_a.clone(),
1231 client_a.user_store.clone(),
1232 lang_registry.clone(),
1233 fs.clone(),
1234 cx,
1235 )
1236 });
1237 let (worktree_a, _) = project_a
1238 .update(&mut cx_a, |p, cx| {
1239 p.find_or_create_local_worktree("/a", false, cx)
1240 })
1241 .await
1242 .unwrap();
1243 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1244 worktree_a
1245 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1246 .await;
1247 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1248 project_a
1249 .update(&mut cx_a, |p, cx| p.share(cx))
1250 .await
1251 .unwrap();
1252
1253 // Join that project as client B
1254 let project_b = Project::remote(
1255 project_id,
1256 client_b.clone(),
1257 client_b.user_store.clone(),
1258 lang_registry.clone(),
1259 fs.clone(),
1260 &mut cx_b.to_async(),
1261 )
1262 .await
1263 .unwrap();
1264
1265 let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1266 assert_eq!(
1267 project
1268 .collaborators()
1269 .get(&client_a.peer_id)
1270 .unwrap()
1271 .user
1272 .github_login,
1273 "user_a"
1274 );
1275 project.replica_id()
1276 });
1277 project_a
1278 .condition(&cx_a, |tree, _| {
1279 tree.collaborators()
1280 .get(&client_b.peer_id)
1281 .map_or(false, |collaborator| {
1282 collaborator.replica_id == replica_id_b
1283 && collaborator.user.github_login == "user_b"
1284 })
1285 })
1286 .await;
1287
1288 // Open the same file as client B and client A.
1289 let buffer_b = project_b
1290 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1291 .await
1292 .unwrap();
1293 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1294 buffer_b.read_with(&cx_b, |buf, cx| {
1295 assert_eq!(buf.read(cx).text(), "b-contents")
1296 });
1297 project_a.read_with(&cx_a, |project, cx| {
1298 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1299 });
1300 let buffer_a = project_a
1301 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1302 .await
1303 .unwrap();
1304
1305 let editor_b = cx_b.add_view(window_b, |cx| {
1306 Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1307 });
1308
1309 // TODO
1310 // // Create a selection set as client B and see that selection set as client A.
1311 // buffer_a
1312 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1313 // .await;
1314
1315 // Edit the buffer as client B and see that edit as client A.
1316 editor_b.update(&mut cx_b, |editor, cx| {
1317 editor.handle_input(&Input("ok, ".into()), cx)
1318 });
1319 buffer_a
1320 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1321 .await;
1322
1323 // TODO
1324 // // Remove the selection set as client B, see those selections disappear as client A.
1325 cx_b.update(move |_| drop(editor_b));
1326 // buffer_a
1327 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1328 // .await;
1329
1330 // Close the buffer as client A, see that the buffer is closed.
1331 cx_a.update(move |_| drop(buffer_a));
1332 project_a
1333 .condition(&cx_a, |project, cx| {
1334 !project.has_open_buffer((worktree_id, "b.txt"), cx)
1335 })
1336 .await;
1337
1338 // Dropping the client B's project removes client B from client A's collaborators.
1339 cx_b.update(move |_| drop(project_b));
1340 project_a
1341 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1342 .await;
1343 }
1344
1345 #[gpui::test(iterations = 10)]
1346 async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1347 let lang_registry = Arc::new(LanguageRegistry::new());
1348 let fs = FakeFs::new(cx_a.background());
1349 cx_a.foreground().forbid_parking();
1350
1351 // Connect to a server as 2 clients.
1352 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1353 let client_a = server.create_client(&mut cx_a, "user_a").await;
1354 let client_b = server.create_client(&mut cx_b, "user_b").await;
1355
1356 // Share a project as client A
1357 fs.insert_tree(
1358 "/a",
1359 json!({
1360 ".zed.toml": r#"collaborators = ["user_b"]"#,
1361 "a.txt": "a-contents",
1362 "b.txt": "b-contents",
1363 }),
1364 )
1365 .await;
1366 let project_a = cx_a.update(|cx| {
1367 Project::local(
1368 client_a.clone(),
1369 client_a.user_store.clone(),
1370 lang_registry.clone(),
1371 fs.clone(),
1372 cx,
1373 )
1374 });
1375 let (worktree_a, _) = project_a
1376 .update(&mut cx_a, |p, cx| {
1377 p.find_or_create_local_worktree("/a", false, cx)
1378 })
1379 .await
1380 .unwrap();
1381 worktree_a
1382 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1383 .await;
1384 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1385 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1386 project_a
1387 .update(&mut cx_a, |p, cx| p.share(cx))
1388 .await
1389 .unwrap();
1390 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1391
1392 // Join that project as client B
1393 let project_b = Project::remote(
1394 project_id,
1395 client_b.clone(),
1396 client_b.user_store.clone(),
1397 lang_registry.clone(),
1398 fs.clone(),
1399 &mut cx_b.to_async(),
1400 )
1401 .await
1402 .unwrap();
1403 project_b
1404 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1405 .await
1406 .unwrap();
1407
1408 // Unshare the project as client A
1409 project_a
1410 .update(&mut cx_a, |project, cx| project.unshare(cx))
1411 .await
1412 .unwrap();
1413 project_b
1414 .condition(&mut cx_b, |project, _| project.is_read_only())
1415 .await;
1416 assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1417 drop(project_b);
1418
1419 // Share the project again and ensure guests can still join.
1420 project_a
1421 .update(&mut cx_a, |project, cx| project.share(cx))
1422 .await
1423 .unwrap();
1424 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1425
1426 let project_c = Project::remote(
1427 project_id,
1428 client_b.clone(),
1429 client_b.user_store.clone(),
1430 lang_registry.clone(),
1431 fs.clone(),
1432 &mut cx_b.to_async(),
1433 )
1434 .await
1435 .unwrap();
1436 project_c
1437 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1438 .await
1439 .unwrap();
1440 }
1441
1442 #[gpui::test(iterations = 10)]
1443 async fn test_propagate_saves_and_fs_changes(
1444 mut cx_a: TestAppContext,
1445 mut cx_b: TestAppContext,
1446 mut cx_c: TestAppContext,
1447 ) {
1448 let lang_registry = Arc::new(LanguageRegistry::new());
1449 let fs = FakeFs::new(cx_a.background());
1450 cx_a.foreground().forbid_parking();
1451
1452 // Connect to a server as 3 clients.
1453 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1454 let client_a = server.create_client(&mut cx_a, "user_a").await;
1455 let client_b = server.create_client(&mut cx_b, "user_b").await;
1456 let client_c = server.create_client(&mut cx_c, "user_c").await;
1457
1458 // Share a worktree as client A.
1459 fs.insert_tree(
1460 "/a",
1461 json!({
1462 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1463 "file1": "",
1464 "file2": ""
1465 }),
1466 )
1467 .await;
1468 let project_a = cx_a.update(|cx| {
1469 Project::local(
1470 client_a.clone(),
1471 client_a.user_store.clone(),
1472 lang_registry.clone(),
1473 fs.clone(),
1474 cx,
1475 )
1476 });
1477 let (worktree_a, _) = project_a
1478 .update(&mut cx_a, |p, cx| {
1479 p.find_or_create_local_worktree("/a", false, cx)
1480 })
1481 .await
1482 .unwrap();
1483 worktree_a
1484 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1485 .await;
1486 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1487 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1488 project_a
1489 .update(&mut cx_a, |p, cx| p.share(cx))
1490 .await
1491 .unwrap();
1492
1493 // Join that worktree as clients B and C.
1494 let project_b = Project::remote(
1495 project_id,
1496 client_b.clone(),
1497 client_b.user_store.clone(),
1498 lang_registry.clone(),
1499 fs.clone(),
1500 &mut cx_b.to_async(),
1501 )
1502 .await
1503 .unwrap();
1504 let project_c = Project::remote(
1505 project_id,
1506 client_c.clone(),
1507 client_c.user_store.clone(),
1508 lang_registry.clone(),
1509 fs.clone(),
1510 &mut cx_c.to_async(),
1511 )
1512 .await
1513 .unwrap();
1514 let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1515 let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1516
1517 // Open and edit a buffer as both guests B and C.
1518 let buffer_b = project_b
1519 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1520 .await
1521 .unwrap();
1522 let buffer_c = project_c
1523 .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1524 .await
1525 .unwrap();
1526 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1527 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1528
1529 // Open and edit that buffer as the host.
1530 let buffer_a = project_a
1531 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1532 .await
1533 .unwrap();
1534
1535 buffer_a
1536 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1537 .await;
1538 buffer_a.update(&mut cx_a, |buf, cx| {
1539 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1540 });
1541
1542 // Wait for edits to propagate
1543 buffer_a
1544 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1545 .await;
1546 buffer_b
1547 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1548 .await;
1549 buffer_c
1550 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1551 .await;
1552
1553 // Edit the buffer as the host and concurrently save as guest B.
1554 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1555 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1556 save_b.await.unwrap();
1557 assert_eq!(
1558 fs.load("/a/file1".as_ref()).await.unwrap(),
1559 "hi-a, i-am-c, i-am-b, i-am-a"
1560 );
1561 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1562 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1563 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1564
1565 // Make changes on host's file system, see those changes on guest worktrees.
1566 fs.rename(
1567 "/a/file1".as_ref(),
1568 "/a/file1-renamed".as_ref(),
1569 Default::default(),
1570 )
1571 .await
1572 .unwrap();
1573
1574 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1575 .await
1576 .unwrap();
1577 fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1578
1579 worktree_a
1580 .condition(&cx_a, |tree, _| {
1581 tree.paths()
1582 .map(|p| p.to_string_lossy())
1583 .collect::<Vec<_>>()
1584 == [".zed.toml", "file1-renamed", "file3", "file4"]
1585 })
1586 .await;
1587 worktree_b
1588 .condition(&cx_b, |tree, _| {
1589 tree.paths()
1590 .map(|p| p.to_string_lossy())
1591 .collect::<Vec<_>>()
1592 == [".zed.toml", "file1-renamed", "file3", "file4"]
1593 })
1594 .await;
1595 worktree_c
1596 .condition(&cx_c, |tree, _| {
1597 tree.paths()
1598 .map(|p| p.to_string_lossy())
1599 .collect::<Vec<_>>()
1600 == [".zed.toml", "file1-renamed", "file3", "file4"]
1601 })
1602 .await;
1603
1604 // Ensure buffer files are updated as well.
1605 buffer_a
1606 .condition(&cx_a, |buf, _| {
1607 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1608 })
1609 .await;
1610 buffer_b
1611 .condition(&cx_b, |buf, _| {
1612 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1613 })
1614 .await;
1615 buffer_c
1616 .condition(&cx_c, |buf, _| {
1617 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1618 })
1619 .await;
1620 }
1621
1622 #[gpui::test(iterations = 10)]
1623 async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1624 cx_a.foreground().forbid_parking();
1625 let lang_registry = Arc::new(LanguageRegistry::new());
1626 let fs = FakeFs::new(cx_a.background());
1627
1628 // Connect to a server as 2 clients.
1629 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1630 let client_a = server.create_client(&mut cx_a, "user_a").await;
1631 let client_b = server.create_client(&mut cx_b, "user_b").await;
1632
1633 // Share a project as client A
1634 fs.insert_tree(
1635 "/dir",
1636 json!({
1637 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1638 "a.txt": "a-contents",
1639 }),
1640 )
1641 .await;
1642
1643 let project_a = cx_a.update(|cx| {
1644 Project::local(
1645 client_a.clone(),
1646 client_a.user_store.clone(),
1647 lang_registry.clone(),
1648 fs.clone(),
1649 cx,
1650 )
1651 });
1652 let (worktree_a, _) = project_a
1653 .update(&mut cx_a, |p, cx| {
1654 p.find_or_create_local_worktree("/dir", false, cx)
1655 })
1656 .await
1657 .unwrap();
1658 worktree_a
1659 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1660 .await;
1661 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1662 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1663 project_a
1664 .update(&mut cx_a, |p, cx| p.share(cx))
1665 .await
1666 .unwrap();
1667
1668 // Join that project as client B
1669 let project_b = Project::remote(
1670 project_id,
1671 client_b.clone(),
1672 client_b.user_store.clone(),
1673 lang_registry.clone(),
1674 fs.clone(),
1675 &mut cx_b.to_async(),
1676 )
1677 .await
1678 .unwrap();
1679
1680 // Open a buffer as client B
1681 let buffer_b = project_b
1682 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1683 .await
1684 .unwrap();
1685
1686 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1687 buffer_b.read_with(&cx_b, |buf, _| {
1688 assert!(buf.is_dirty());
1689 assert!(!buf.has_conflict());
1690 });
1691
1692 buffer_b
1693 .update(&mut cx_b, |buf, cx| buf.save(cx))
1694 .await
1695 .unwrap();
1696 buffer_b
1697 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1698 .await;
1699 buffer_b.read_with(&cx_b, |buf, _| {
1700 assert!(!buf.has_conflict());
1701 });
1702
1703 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1704 buffer_b.read_with(&cx_b, |buf, _| {
1705 assert!(buf.is_dirty());
1706 assert!(!buf.has_conflict());
1707 });
1708 }
1709
1710 #[gpui::test(iterations = 10)]
1711 async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1712 cx_a.foreground().forbid_parking();
1713 let lang_registry = Arc::new(LanguageRegistry::new());
1714 let fs = FakeFs::new(cx_a.background());
1715
1716 // Connect to a server as 2 clients.
1717 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1718 let client_a = server.create_client(&mut cx_a, "user_a").await;
1719 let client_b = server.create_client(&mut cx_b, "user_b").await;
1720
1721 // Share a project as client A
1722 fs.insert_tree(
1723 "/dir",
1724 json!({
1725 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1726 "a.txt": "a-contents",
1727 }),
1728 )
1729 .await;
1730
1731 let project_a = cx_a.update(|cx| {
1732 Project::local(
1733 client_a.clone(),
1734 client_a.user_store.clone(),
1735 lang_registry.clone(),
1736 fs.clone(),
1737 cx,
1738 )
1739 });
1740 let (worktree_a, _) = project_a
1741 .update(&mut cx_a, |p, cx| {
1742 p.find_or_create_local_worktree("/dir", false, cx)
1743 })
1744 .await
1745 .unwrap();
1746 worktree_a
1747 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1748 .await;
1749 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1750 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1751 project_a
1752 .update(&mut cx_a, |p, cx| p.share(cx))
1753 .await
1754 .unwrap();
1755
1756 // Join that project as client B
1757 let project_b = Project::remote(
1758 project_id,
1759 client_b.clone(),
1760 client_b.user_store.clone(),
1761 lang_registry.clone(),
1762 fs.clone(),
1763 &mut cx_b.to_async(),
1764 )
1765 .await
1766 .unwrap();
1767 let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1768
1769 // Open a buffer as client B
1770 let buffer_b = project_b
1771 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1772 .await
1773 .unwrap();
1774 buffer_b.read_with(&cx_b, |buf, _| {
1775 assert!(!buf.is_dirty());
1776 assert!(!buf.has_conflict());
1777 });
1778
1779 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1780 .await
1781 .unwrap();
1782 buffer_b
1783 .condition(&cx_b, |buf, _| {
1784 buf.text() == "new contents" && !buf.is_dirty()
1785 })
1786 .await;
1787 buffer_b.read_with(&cx_b, |buf, _| {
1788 assert!(!buf.has_conflict());
1789 });
1790 }
1791
1792 #[gpui::test(iterations = 10)]
1793 async fn test_editing_while_guest_opens_buffer(
1794 mut cx_a: TestAppContext,
1795 mut cx_b: TestAppContext,
1796 ) {
1797 cx_a.foreground().forbid_parking();
1798 let lang_registry = Arc::new(LanguageRegistry::new());
1799 let fs = FakeFs::new(cx_a.background());
1800
1801 // Connect to a server as 2 clients.
1802 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1803 let client_a = server.create_client(&mut cx_a, "user_a").await;
1804 let client_b = server.create_client(&mut cx_b, "user_b").await;
1805
1806 // Share a project as client A
1807 fs.insert_tree(
1808 "/dir",
1809 json!({
1810 ".zed.toml": r#"collaborators = ["user_b"]"#,
1811 "a.txt": "a-contents",
1812 }),
1813 )
1814 .await;
1815 let project_a = cx_a.update(|cx| {
1816 Project::local(
1817 client_a.clone(),
1818 client_a.user_store.clone(),
1819 lang_registry.clone(),
1820 fs.clone(),
1821 cx,
1822 )
1823 });
1824 let (worktree_a, _) = project_a
1825 .update(&mut cx_a, |p, cx| {
1826 p.find_or_create_local_worktree("/dir", false, cx)
1827 })
1828 .await
1829 .unwrap();
1830 worktree_a
1831 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1832 .await;
1833 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1834 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1835 project_a
1836 .update(&mut cx_a, |p, cx| p.share(cx))
1837 .await
1838 .unwrap();
1839
1840 // Join that project as client B
1841 let project_b = Project::remote(
1842 project_id,
1843 client_b.clone(),
1844 client_b.user_store.clone(),
1845 lang_registry.clone(),
1846 fs.clone(),
1847 &mut cx_b.to_async(),
1848 )
1849 .await
1850 .unwrap();
1851
1852 // Open a buffer as client A
1853 let buffer_a = project_a
1854 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1855 .await
1856 .unwrap();
1857
1858 // Start opening the same buffer as client B
1859 let buffer_b = cx_b
1860 .background()
1861 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1862
1863 // Edit the buffer as client A while client B is still opening it.
1864 cx_b.background().simulate_random_delay().await;
1865 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1866 cx_b.background().simulate_random_delay().await;
1867 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1868
1869 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1870 let buffer_b = buffer_b.await.unwrap();
1871 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1872 }
1873
1874 #[gpui::test(iterations = 10)]
1875 async fn test_leaving_worktree_while_opening_buffer(
1876 mut cx_a: TestAppContext,
1877 mut cx_b: TestAppContext,
1878 ) {
1879 cx_a.foreground().forbid_parking();
1880 let lang_registry = Arc::new(LanguageRegistry::new());
1881 let fs = FakeFs::new(cx_a.background());
1882
1883 // Connect to a server as 2 clients.
1884 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1885 let client_a = server.create_client(&mut cx_a, "user_a").await;
1886 let client_b = server.create_client(&mut cx_b, "user_b").await;
1887
1888 // Share a project as client A
1889 fs.insert_tree(
1890 "/dir",
1891 json!({
1892 ".zed.toml": r#"collaborators = ["user_b"]"#,
1893 "a.txt": "a-contents",
1894 }),
1895 )
1896 .await;
1897 let project_a = cx_a.update(|cx| {
1898 Project::local(
1899 client_a.clone(),
1900 client_a.user_store.clone(),
1901 lang_registry.clone(),
1902 fs.clone(),
1903 cx,
1904 )
1905 });
1906 let (worktree_a, _) = project_a
1907 .update(&mut cx_a, |p, cx| {
1908 p.find_or_create_local_worktree("/dir", false, cx)
1909 })
1910 .await
1911 .unwrap();
1912 worktree_a
1913 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1914 .await;
1915 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1916 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1917 project_a
1918 .update(&mut cx_a, |p, cx| p.share(cx))
1919 .await
1920 .unwrap();
1921
1922 // Join that project as client B
1923 let project_b = Project::remote(
1924 project_id,
1925 client_b.clone(),
1926 client_b.user_store.clone(),
1927 lang_registry.clone(),
1928 fs.clone(),
1929 &mut cx_b.to_async(),
1930 )
1931 .await
1932 .unwrap();
1933
1934 // See that a guest has joined as client A.
1935 project_a
1936 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1937 .await;
1938
1939 // Begin opening a buffer as client B, but leave the project before the open completes.
1940 let buffer_b = cx_b
1941 .background()
1942 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1943 cx_b.update(|_| drop(project_b));
1944 drop(buffer_b);
1945
1946 // See that the guest has left.
1947 project_a
1948 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1949 .await;
1950 }
1951
1952 #[gpui::test(iterations = 10)]
1953 async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1954 cx_a.foreground().forbid_parking();
1955 let lang_registry = Arc::new(LanguageRegistry::new());
1956 let fs = FakeFs::new(cx_a.background());
1957
1958 // Connect to a server as 2 clients.
1959 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1960 let client_a = server.create_client(&mut cx_a, "user_a").await;
1961 let client_b = server.create_client(&mut cx_b, "user_b").await;
1962
1963 // Share a project as client A
1964 fs.insert_tree(
1965 "/a",
1966 json!({
1967 ".zed.toml": r#"collaborators = ["user_b"]"#,
1968 "a.txt": "a-contents",
1969 "b.txt": "b-contents",
1970 }),
1971 )
1972 .await;
1973 let project_a = cx_a.update(|cx| {
1974 Project::local(
1975 client_a.clone(),
1976 client_a.user_store.clone(),
1977 lang_registry.clone(),
1978 fs.clone(),
1979 cx,
1980 )
1981 });
1982 let (worktree_a, _) = project_a
1983 .update(&mut cx_a, |p, cx| {
1984 p.find_or_create_local_worktree("/a", false, cx)
1985 })
1986 .await
1987 .unwrap();
1988 worktree_a
1989 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1990 .await;
1991 let project_id = project_a
1992 .update(&mut cx_a, |project, _| project.next_remote_id())
1993 .await;
1994 project_a
1995 .update(&mut cx_a, |project, cx| project.share(cx))
1996 .await
1997 .unwrap();
1998
1999 // Join that project as client B
2000 let _project_b = Project::remote(
2001 project_id,
2002 client_b.clone(),
2003 client_b.user_store.clone(),
2004 lang_registry.clone(),
2005 fs.clone(),
2006 &mut cx_b.to_async(),
2007 )
2008 .await
2009 .unwrap();
2010
2011 // See that a guest has joined as client A.
2012 project_a
2013 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2014 .await;
2015
2016 // Drop client B's connection and ensure client A observes client B leaving the worktree.
2017 client_b.disconnect(&cx_b.to_async()).unwrap();
2018 project_a
2019 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2020 .await;
2021 }
2022
2023 #[gpui::test(iterations = 10)]
2024 async fn test_collaborating_with_diagnostics(
2025 mut cx_a: TestAppContext,
2026 mut cx_b: TestAppContext,
2027 ) {
2028 cx_a.foreground().forbid_parking();
2029 let mut lang_registry = Arc::new(LanguageRegistry::new());
2030 let fs = FakeFs::new(cx_a.background());
2031
2032 // Set up a fake language server.
2033 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2034 Arc::get_mut(&mut lang_registry)
2035 .unwrap()
2036 .add(Arc::new(Language::new(
2037 LanguageConfig {
2038 name: "Rust".into(),
2039 path_suffixes: vec!["rs".to_string()],
2040 language_server: Some(language_server_config),
2041 ..Default::default()
2042 },
2043 Some(tree_sitter_rust::language()),
2044 )));
2045
2046 // Connect to a server as 2 clients.
2047 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2048 let client_a = server.create_client(&mut cx_a, "user_a").await;
2049 let client_b = server.create_client(&mut cx_b, "user_b").await;
2050
2051 // Share a project as client A
2052 fs.insert_tree(
2053 "/a",
2054 json!({
2055 ".zed.toml": r#"collaborators = ["user_b"]"#,
2056 "a.rs": "let one = two",
2057 "other.rs": "",
2058 }),
2059 )
2060 .await;
2061 let project_a = cx_a.update(|cx| {
2062 Project::local(
2063 client_a.clone(),
2064 client_a.user_store.clone(),
2065 lang_registry.clone(),
2066 fs.clone(),
2067 cx,
2068 )
2069 });
2070 let (worktree_a, _) = project_a
2071 .update(&mut cx_a, |p, cx| {
2072 p.find_or_create_local_worktree("/a", false, cx)
2073 })
2074 .await
2075 .unwrap();
2076 worktree_a
2077 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2078 .await;
2079 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2080 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2081 project_a
2082 .update(&mut cx_a, |p, cx| p.share(cx))
2083 .await
2084 .unwrap();
2085
2086 // Cause the language server to start.
2087 let _ = cx_a
2088 .background()
2089 .spawn(project_a.update(&mut cx_a, |project, cx| {
2090 project.open_buffer(
2091 ProjectPath {
2092 worktree_id,
2093 path: Path::new("other.rs").into(),
2094 },
2095 cx,
2096 )
2097 }))
2098 .await
2099 .unwrap();
2100
2101 // Simulate a language server reporting errors for a file.
2102 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2103 fake_language_server
2104 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2105 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2106 version: None,
2107 diagnostics: vec![lsp::Diagnostic {
2108 severity: Some(lsp::DiagnosticSeverity::ERROR),
2109 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2110 message: "message 1".to_string(),
2111 ..Default::default()
2112 }],
2113 })
2114 .await;
2115
2116 // Wait for server to see the diagnostics update.
2117 server
2118 .condition(|store| {
2119 let worktree = store
2120 .project(project_id)
2121 .unwrap()
2122 .worktrees
2123 .get(&worktree_id.to_proto())
2124 .unwrap();
2125
2126 !worktree
2127 .share
2128 .as_ref()
2129 .unwrap()
2130 .diagnostic_summaries
2131 .is_empty()
2132 })
2133 .await;
2134
2135 // Join the worktree as client B.
2136 let project_b = Project::remote(
2137 project_id,
2138 client_b.clone(),
2139 client_b.user_store.clone(),
2140 lang_registry.clone(),
2141 fs.clone(),
2142 &mut cx_b.to_async(),
2143 )
2144 .await
2145 .unwrap();
2146
2147 project_b.read_with(&cx_b, |project, cx| {
2148 assert_eq!(
2149 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2150 &[(
2151 ProjectPath {
2152 worktree_id,
2153 path: Arc::from(Path::new("a.rs")),
2154 },
2155 DiagnosticSummary {
2156 error_count: 1,
2157 warning_count: 0,
2158 ..Default::default()
2159 },
2160 )]
2161 )
2162 });
2163
2164 // Simulate a language server reporting more errors for a file.
2165 fake_language_server
2166 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2167 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2168 version: None,
2169 diagnostics: vec![
2170 lsp::Diagnostic {
2171 severity: Some(lsp::DiagnosticSeverity::ERROR),
2172 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2173 message: "message 1".to_string(),
2174 ..Default::default()
2175 },
2176 lsp::Diagnostic {
2177 severity: Some(lsp::DiagnosticSeverity::WARNING),
2178 range: lsp::Range::new(
2179 lsp::Position::new(0, 10),
2180 lsp::Position::new(0, 13),
2181 ),
2182 message: "message 2".to_string(),
2183 ..Default::default()
2184 },
2185 ],
2186 })
2187 .await;
2188
2189 // Client b gets the updated summaries
2190 project_b
2191 .condition(&cx_b, |project, cx| {
2192 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2193 == &[(
2194 ProjectPath {
2195 worktree_id,
2196 path: Arc::from(Path::new("a.rs")),
2197 },
2198 DiagnosticSummary {
2199 error_count: 1,
2200 warning_count: 1,
2201 ..Default::default()
2202 },
2203 )]
2204 })
2205 .await;
2206
2207 // Open the file with the errors on client B. They should be present.
2208 let buffer_b = cx_b
2209 .background()
2210 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2211 .await
2212 .unwrap();
2213
2214 buffer_b.read_with(&cx_b, |buffer, _| {
2215 assert_eq!(
2216 buffer
2217 .snapshot()
2218 .diagnostics_in_range::<_, Point>(0..buffer.len())
2219 .map(|entry| entry)
2220 .collect::<Vec<_>>(),
2221 &[
2222 DiagnosticEntry {
2223 range: Point::new(0, 4)..Point::new(0, 7),
2224 diagnostic: Diagnostic {
2225 group_id: 0,
2226 message: "message 1".to_string(),
2227 severity: lsp::DiagnosticSeverity::ERROR,
2228 is_primary: true,
2229 ..Default::default()
2230 }
2231 },
2232 DiagnosticEntry {
2233 range: Point::new(0, 10)..Point::new(0, 13),
2234 diagnostic: Diagnostic {
2235 group_id: 1,
2236 severity: lsp::DiagnosticSeverity::WARNING,
2237 message: "message 2".to_string(),
2238 is_primary: true,
2239 ..Default::default()
2240 }
2241 }
2242 ]
2243 );
2244 });
2245 }
2246
2247 #[gpui::test(iterations = 10)]
2248 async fn test_collaborating_with_completion(
2249 mut cx_a: TestAppContext,
2250 mut cx_b: TestAppContext,
2251 ) {
2252 cx_a.foreground().forbid_parking();
2253 let mut lang_registry = Arc::new(LanguageRegistry::new());
2254 let fs = FakeFs::new(cx_a.background());
2255
2256 // Set up a fake language server.
2257 let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2258 language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2259 completion_provider: Some(lsp::CompletionOptions {
2260 trigger_characters: Some(vec![".".to_string()]),
2261 ..Default::default()
2262 }),
2263 ..Default::default()
2264 });
2265 Arc::get_mut(&mut lang_registry)
2266 .unwrap()
2267 .add(Arc::new(Language::new(
2268 LanguageConfig {
2269 name: "Rust".into(),
2270 path_suffixes: vec!["rs".to_string()],
2271 language_server: Some(language_server_config),
2272 ..Default::default()
2273 },
2274 Some(tree_sitter_rust::language()),
2275 )));
2276
2277 // Connect to a server as 2 clients.
2278 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2279 let client_a = server.create_client(&mut cx_a, "user_a").await;
2280 let client_b = server.create_client(&mut cx_b, "user_b").await;
2281
2282 // Share a project as client A
2283 fs.insert_tree(
2284 "/a",
2285 json!({
2286 ".zed.toml": r#"collaborators = ["user_b"]"#,
2287 "main.rs": "fn main() { a }",
2288 "other.rs": "",
2289 }),
2290 )
2291 .await;
2292 let project_a = cx_a.update(|cx| {
2293 Project::local(
2294 client_a.clone(),
2295 client_a.user_store.clone(),
2296 lang_registry.clone(),
2297 fs.clone(),
2298 cx,
2299 )
2300 });
2301 let (worktree_a, _) = project_a
2302 .update(&mut cx_a, |p, cx| {
2303 p.find_or_create_local_worktree("/a", false, cx)
2304 })
2305 .await
2306 .unwrap();
2307 worktree_a
2308 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2309 .await;
2310 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2311 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2312 project_a
2313 .update(&mut cx_a, |p, cx| p.share(cx))
2314 .await
2315 .unwrap();
2316
2317 // Join the worktree as client B.
2318 let project_b = Project::remote(
2319 project_id,
2320 client_b.clone(),
2321 client_b.user_store.clone(),
2322 lang_registry.clone(),
2323 fs.clone(),
2324 &mut cx_b.to_async(),
2325 )
2326 .await
2327 .unwrap();
2328
2329 // Open a file in an editor as the guest.
2330 let buffer_b = project_b
2331 .update(&mut cx_b, |p, cx| {
2332 p.open_buffer((worktree_id, "main.rs"), cx)
2333 })
2334 .await
2335 .unwrap();
2336 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2337 let editor_b = cx_b.add_view(window_b, |cx| {
2338 Editor::for_buffer(
2339 cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2340 Arc::new(|cx| EditorSettings::test(cx)),
2341 Some(project_b.clone()),
2342 cx,
2343 )
2344 });
2345
2346 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2347 buffer_b
2348 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2349 .await;
2350
2351 // Type a completion trigger character as the guest.
2352 editor_b.update(&mut cx_b, |editor, cx| {
2353 editor.select_ranges([13..13], None, cx);
2354 editor.handle_input(&Input(".".into()), cx);
2355 cx.focus(&editor_b);
2356 });
2357
2358 // Receive a completion request as the host's language server.
2359 // Return some completions from the host's language server.
2360 cx_a.foreground().start_waiting();
2361 fake_language_server
2362 .handle_request::<lsp::request::Completion, _>(|params| {
2363 assert_eq!(
2364 params.text_document_position.text_document.uri,
2365 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2366 );
2367 assert_eq!(
2368 params.text_document_position.position,
2369 lsp::Position::new(0, 14),
2370 );
2371
2372 Some(lsp::CompletionResponse::Array(vec![
2373 lsp::CompletionItem {
2374 label: "first_method(…)".into(),
2375 detail: Some("fn(&mut self, B) -> C".into()),
2376 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2377 new_text: "first_method($1)".to_string(),
2378 range: lsp::Range::new(
2379 lsp::Position::new(0, 14),
2380 lsp::Position::new(0, 14),
2381 ),
2382 })),
2383 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2384 ..Default::default()
2385 },
2386 lsp::CompletionItem {
2387 label: "second_method(…)".into(),
2388 detail: Some("fn(&mut self, C) -> D<E>".into()),
2389 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2390 new_text: "second_method()".to_string(),
2391 range: lsp::Range::new(
2392 lsp::Position::new(0, 14),
2393 lsp::Position::new(0, 14),
2394 ),
2395 })),
2396 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2397 ..Default::default()
2398 },
2399 ]))
2400 })
2401 .next()
2402 .await
2403 .unwrap();
2404 cx_a.foreground().finish_waiting();
2405
2406 // Open the buffer on the host.
2407 let buffer_a = project_a
2408 .update(&mut cx_a, |p, cx| {
2409 p.open_buffer((worktree_id, "main.rs"), cx)
2410 })
2411 .await
2412 .unwrap();
2413 buffer_a
2414 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2415 .await;
2416
2417 // Confirm a completion on the guest.
2418 editor_b
2419 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2420 .await;
2421 editor_b.update(&mut cx_b, |editor, cx| {
2422 editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2423 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2424 });
2425
2426 // Return a resolved completion from the host's language server.
2427 // The resolved completion has an additional text edit.
2428 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
2429 assert_eq!(params.label, "first_method(…)");
2430 lsp::CompletionItem {
2431 label: "first_method(…)".into(),
2432 detail: Some("fn(&mut self, B) -> C".into()),
2433 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2434 new_text: "first_method($1)".to_string(),
2435 range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
2436 })),
2437 additional_text_edits: Some(vec![lsp::TextEdit {
2438 new_text: "use d::SomeTrait;\n".to_string(),
2439 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2440 }]),
2441 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2442 ..Default::default()
2443 }
2444 });
2445
2446 // The additional edit is applied.
2447 buffer_a
2448 .condition(&cx_a, |buffer, _| {
2449 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2450 })
2451 .await;
2452 buffer_b
2453 .condition(&cx_b, |buffer, _| {
2454 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2455 })
2456 .await;
2457 }
2458
2459 #[gpui::test(iterations = 10)]
2460 async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2461 cx_a.foreground().forbid_parking();
2462 let mut lang_registry = Arc::new(LanguageRegistry::new());
2463 let fs = FakeFs::new(cx_a.background());
2464
2465 // Set up a fake language server.
2466 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2467 Arc::get_mut(&mut lang_registry)
2468 .unwrap()
2469 .add(Arc::new(Language::new(
2470 LanguageConfig {
2471 name: "Rust".into(),
2472 path_suffixes: vec!["rs".to_string()],
2473 language_server: Some(language_server_config),
2474 ..Default::default()
2475 },
2476 Some(tree_sitter_rust::language()),
2477 )));
2478
2479 // Connect to a server as 2 clients.
2480 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2481 let client_a = server.create_client(&mut cx_a, "user_a").await;
2482 let client_b = server.create_client(&mut cx_b, "user_b").await;
2483
2484 // Share a project as client A
2485 fs.insert_tree(
2486 "/a",
2487 json!({
2488 ".zed.toml": r#"collaborators = ["user_b"]"#,
2489 "a.rs": "let one = two",
2490 }),
2491 )
2492 .await;
2493 let project_a = cx_a.update(|cx| {
2494 Project::local(
2495 client_a.clone(),
2496 client_a.user_store.clone(),
2497 lang_registry.clone(),
2498 fs.clone(),
2499 cx,
2500 )
2501 });
2502 let (worktree_a, _) = project_a
2503 .update(&mut cx_a, |p, cx| {
2504 p.find_or_create_local_worktree("/a", false, cx)
2505 })
2506 .await
2507 .unwrap();
2508 worktree_a
2509 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2510 .await;
2511 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2512 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2513 project_a
2514 .update(&mut cx_a, |p, cx| p.share(cx))
2515 .await
2516 .unwrap();
2517
2518 // Join the worktree as client B.
2519 let project_b = Project::remote(
2520 project_id,
2521 client_b.clone(),
2522 client_b.user_store.clone(),
2523 lang_registry.clone(),
2524 fs.clone(),
2525 &mut cx_b.to_async(),
2526 )
2527 .await
2528 .unwrap();
2529
2530 let buffer_b = cx_b
2531 .background()
2532 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2533 .await
2534 .unwrap();
2535
2536 let format = project_b.update(&mut cx_b, |project, cx| {
2537 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2538 });
2539
2540 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2541 fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2542 Some(vec![
2543 lsp::TextEdit {
2544 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2545 new_text: "h".to_string(),
2546 },
2547 lsp::TextEdit {
2548 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2549 new_text: "y".to_string(),
2550 },
2551 ])
2552 });
2553
2554 format.await.unwrap();
2555 assert_eq!(
2556 buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2557 "let honey = two"
2558 );
2559 }
2560
2561 #[gpui::test(iterations = 10)]
2562 async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2563 cx_a.foreground().forbid_parking();
2564 let mut lang_registry = Arc::new(LanguageRegistry::new());
2565 let fs = FakeFs::new(cx_a.background());
2566 fs.insert_tree(
2567 "/root-1",
2568 json!({
2569 ".zed.toml": r#"collaborators = ["user_b"]"#,
2570 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2571 }),
2572 )
2573 .await;
2574 fs.insert_tree(
2575 "/root-2",
2576 json!({
2577 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2578 }),
2579 )
2580 .await;
2581
2582 // Set up a fake language server.
2583 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2584 Arc::get_mut(&mut lang_registry)
2585 .unwrap()
2586 .add(Arc::new(Language::new(
2587 LanguageConfig {
2588 name: "Rust".into(),
2589 path_suffixes: vec!["rs".to_string()],
2590 language_server: Some(language_server_config),
2591 ..Default::default()
2592 },
2593 Some(tree_sitter_rust::language()),
2594 )));
2595
2596 // Connect to a server as 2 clients.
2597 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2598 let client_a = server.create_client(&mut cx_a, "user_a").await;
2599 let client_b = server.create_client(&mut cx_b, "user_b").await;
2600
2601 // Share a project as client A
2602 let project_a = cx_a.update(|cx| {
2603 Project::local(
2604 client_a.clone(),
2605 client_a.user_store.clone(),
2606 lang_registry.clone(),
2607 fs.clone(),
2608 cx,
2609 )
2610 });
2611 let (worktree_a, _) = project_a
2612 .update(&mut cx_a, |p, cx| {
2613 p.find_or_create_local_worktree("/root-1", false, cx)
2614 })
2615 .await
2616 .unwrap();
2617 worktree_a
2618 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2619 .await;
2620 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2621 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2622 project_a
2623 .update(&mut cx_a, |p, cx| p.share(cx))
2624 .await
2625 .unwrap();
2626
2627 // Join the worktree as client B.
2628 let project_b = Project::remote(
2629 project_id,
2630 client_b.clone(),
2631 client_b.user_store.clone(),
2632 lang_registry.clone(),
2633 fs.clone(),
2634 &mut cx_b.to_async(),
2635 )
2636 .await
2637 .unwrap();
2638
2639 // Open the file on client B.
2640 let buffer_b = cx_b
2641 .background()
2642 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2643 .await
2644 .unwrap();
2645
2646 // Request the definition of a symbol as the guest.
2647 let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2648
2649 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2650 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2651 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2652 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2653 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2654 )))
2655 });
2656
2657 let definitions_1 = definitions_1.await.unwrap();
2658 cx_b.read(|cx| {
2659 assert_eq!(definitions_1.len(), 1);
2660 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2661 let target_buffer = definitions_1[0].target_buffer.read(cx);
2662 assert_eq!(
2663 target_buffer.text(),
2664 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2665 );
2666 assert_eq!(
2667 definitions_1[0].target_range.to_point(target_buffer),
2668 Point::new(0, 6)..Point::new(0, 9)
2669 );
2670 });
2671
2672 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2673 // the previous call to `definition`.
2674 let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2675 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2676 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2677 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2678 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2679 )))
2680 });
2681
2682 let definitions_2 = definitions_2.await.unwrap();
2683 cx_b.read(|cx| {
2684 assert_eq!(definitions_2.len(), 1);
2685 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2686 let target_buffer = definitions_2[0].target_buffer.read(cx);
2687 assert_eq!(
2688 target_buffer.text(),
2689 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2690 );
2691 assert_eq!(
2692 definitions_2[0].target_range.to_point(target_buffer),
2693 Point::new(1, 6)..Point::new(1, 11)
2694 );
2695 });
2696 assert_eq!(
2697 definitions_1[0].target_buffer,
2698 definitions_2[0].target_buffer
2699 );
2700
2701 cx_b.update(|_| {
2702 drop(definitions_1);
2703 drop(definitions_2);
2704 });
2705 project_b
2706 .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2707 .await;
2708 }
2709
2710 #[gpui::test(iterations = 10)]
2711 async fn test_project_symbols(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2712 cx_a.foreground().forbid_parking();
2713 let mut lang_registry = Arc::new(LanguageRegistry::new());
2714 let fs = FakeFs::new(cx_a.background());
2715 fs.insert_tree(
2716 "/code",
2717 json!({
2718 "crate-1": {
2719 ".zed.toml": r#"collaborators = ["user_b"]"#,
2720 "one.rs": "const ONE: usize = 1;",
2721 },
2722 "crate-2": {
2723 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2724 },
2725 "private": {
2726 "passwords.txt": "the-password",
2727 }
2728 }),
2729 )
2730 .await;
2731
2732 // Set up a fake language server.
2733 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2734 Arc::get_mut(&mut lang_registry)
2735 .unwrap()
2736 .add(Arc::new(Language::new(
2737 LanguageConfig {
2738 name: "Rust".into(),
2739 path_suffixes: vec!["rs".to_string()],
2740 language_server: Some(language_server_config),
2741 ..Default::default()
2742 },
2743 Some(tree_sitter_rust::language()),
2744 )));
2745
2746 // Connect to a server as 2 clients.
2747 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2748 let client_a = server.create_client(&mut cx_a, "user_a").await;
2749 let client_b = server.create_client(&mut cx_b, "user_b").await;
2750
2751 // Share a project as client A
2752 let project_a = cx_a.update(|cx| {
2753 Project::local(
2754 client_a.clone(),
2755 client_a.user_store.clone(),
2756 lang_registry.clone(),
2757 fs.clone(),
2758 cx,
2759 )
2760 });
2761 let (worktree_a, _) = project_a
2762 .update(&mut cx_a, |p, cx| {
2763 p.find_or_create_local_worktree("/code/crate-1", false, cx)
2764 })
2765 .await
2766 .unwrap();
2767 worktree_a
2768 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2769 .await;
2770 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2771 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2772 project_a
2773 .update(&mut cx_a, |p, cx| p.share(cx))
2774 .await
2775 .unwrap();
2776
2777 // Join the worktree as client B.
2778 let project_b = Project::remote(
2779 project_id,
2780 client_b.clone(),
2781 client_b.user_store.clone(),
2782 lang_registry.clone(),
2783 fs.clone(),
2784 &mut cx_b.to_async(),
2785 )
2786 .await
2787 .unwrap();
2788
2789 // Cause the language server to start.
2790 let _buffer = cx_b
2791 .background()
2792 .spawn(project_b.update(&mut cx_b, |p, cx| {
2793 p.open_buffer((worktree_id, "one.rs"), cx)
2794 }))
2795 .await
2796 .unwrap();
2797
2798 // Request the definition of a symbol as the guest.
2799 let symbols = project_b.update(&mut cx_b, |p, cx| p.symbols("two", cx));
2800 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2801 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_| {
2802 #[allow(deprecated)]
2803 Some(vec![lsp::SymbolInformation {
2804 name: "TWO".into(),
2805 location: lsp::Location {
2806 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
2807 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2808 },
2809 kind: lsp::SymbolKind::CONSTANT,
2810 tags: None,
2811 container_name: None,
2812 deprecated: None,
2813 }])
2814 });
2815
2816 let symbols = symbols.await.unwrap();
2817 assert_eq!(symbols.len(), 1);
2818 assert_eq!(symbols[0].name, "TWO");
2819
2820 // Open one of the returned symbols.
2821 let buffer_b_2 = project_b
2822 .update(&mut cx_b, |project, cx| {
2823 project.open_buffer_for_symbol(&symbols[0], cx)
2824 })
2825 .await
2826 .unwrap();
2827 buffer_b_2.read_with(&cx_b, |buffer, _| {
2828 assert_eq!(
2829 buffer.file().unwrap().path().as_ref(),
2830 Path::new("../crate-2/two.rs")
2831 );
2832 });
2833
2834 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
2835 let mut fake_symbol = symbols[0].clone();
2836 fake_symbol.path = Path::new("/code/secrets").into();
2837 let error = project_b
2838 .update(&mut cx_b, |project, cx| {
2839 project.open_buffer_for_symbol(&fake_symbol, cx)
2840 })
2841 .await
2842 .unwrap_err();
2843 assert!(error.to_string().contains("invalid symbol signature"));
2844 }
2845
2846 #[gpui::test(iterations = 10)]
2847 async fn test_open_buffer_while_getting_definition_pointing_to_it(
2848 mut cx_a: TestAppContext,
2849 mut cx_b: TestAppContext,
2850 mut rng: StdRng,
2851 ) {
2852 cx_a.foreground().forbid_parking();
2853 let mut lang_registry = Arc::new(LanguageRegistry::new());
2854 let fs = FakeFs::new(cx_a.background());
2855 fs.insert_tree(
2856 "/root",
2857 json!({
2858 ".zed.toml": r#"collaborators = ["user_b"]"#,
2859 "a.rs": "const ONE: usize = b::TWO;",
2860 "b.rs": "const TWO: usize = 2",
2861 }),
2862 )
2863 .await;
2864
2865 // Set up a fake language server.
2866 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2867
2868 Arc::get_mut(&mut lang_registry)
2869 .unwrap()
2870 .add(Arc::new(Language::new(
2871 LanguageConfig {
2872 name: "Rust".into(),
2873 path_suffixes: vec!["rs".to_string()],
2874 language_server: Some(language_server_config),
2875 ..Default::default()
2876 },
2877 Some(tree_sitter_rust::language()),
2878 )));
2879
2880 // Connect to a server as 2 clients.
2881 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2882 let client_a = server.create_client(&mut cx_a, "user_a").await;
2883 let client_b = server.create_client(&mut cx_b, "user_b").await;
2884
2885 // Share a project as client A
2886 let project_a = cx_a.update(|cx| {
2887 Project::local(
2888 client_a.clone(),
2889 client_a.user_store.clone(),
2890 lang_registry.clone(),
2891 fs.clone(),
2892 cx,
2893 )
2894 });
2895
2896 let (worktree_a, _) = project_a
2897 .update(&mut cx_a, |p, cx| {
2898 p.find_or_create_local_worktree("/root", false, cx)
2899 })
2900 .await
2901 .unwrap();
2902 worktree_a
2903 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2904 .await;
2905 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2906 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2907 project_a
2908 .update(&mut cx_a, |p, cx| p.share(cx))
2909 .await
2910 .unwrap();
2911
2912 // Join the worktree as client B.
2913 let project_b = Project::remote(
2914 project_id,
2915 client_b.clone(),
2916 client_b.user_store.clone(),
2917 lang_registry.clone(),
2918 fs.clone(),
2919 &mut cx_b.to_async(),
2920 )
2921 .await
2922 .unwrap();
2923
2924 let buffer_b1 = cx_b
2925 .background()
2926 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2927 .await
2928 .unwrap();
2929
2930 let definitions;
2931 let buffer_b2;
2932 if rng.gen() {
2933 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2934 buffer_b2 =
2935 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2936 } else {
2937 buffer_b2 =
2938 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2939 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2940 }
2941
2942 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2943 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2944 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2945 lsp::Url::from_file_path("/root/b.rs").unwrap(),
2946 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2947 )))
2948 });
2949
2950 let buffer_b2 = buffer_b2.await.unwrap();
2951 let definitions = definitions.await.unwrap();
2952 assert_eq!(definitions.len(), 1);
2953 assert_eq!(definitions[0].target_buffer, buffer_b2);
2954 }
2955
2956 #[gpui::test(iterations = 10)]
2957 async fn test_collaborating_with_code_actions(
2958 mut cx_a: TestAppContext,
2959 mut cx_b: TestAppContext,
2960 ) {
2961 cx_a.foreground().forbid_parking();
2962 let mut lang_registry = Arc::new(LanguageRegistry::new());
2963 let fs = FakeFs::new(cx_a.background());
2964 let mut path_openers_b = Vec::new();
2965 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
2966
2967 // Set up a fake language server.
2968 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2969 Arc::get_mut(&mut lang_registry)
2970 .unwrap()
2971 .add(Arc::new(Language::new(
2972 LanguageConfig {
2973 name: "Rust".into(),
2974 path_suffixes: vec!["rs".to_string()],
2975 language_server: Some(language_server_config),
2976 ..Default::default()
2977 },
2978 Some(tree_sitter_rust::language()),
2979 )));
2980
2981 // Connect to a server as 2 clients.
2982 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2983 let client_a = server.create_client(&mut cx_a, "user_a").await;
2984 let client_b = server.create_client(&mut cx_b, "user_b").await;
2985
2986 // Share a project as client A
2987 fs.insert_tree(
2988 "/a",
2989 json!({
2990 ".zed.toml": r#"collaborators = ["user_b"]"#,
2991 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
2992 "other.rs": "pub fn foo() -> usize { 4 }",
2993 }),
2994 )
2995 .await;
2996 let project_a = cx_a.update(|cx| {
2997 Project::local(
2998 client_a.clone(),
2999 client_a.user_store.clone(),
3000 lang_registry.clone(),
3001 fs.clone(),
3002 cx,
3003 )
3004 });
3005 let (worktree_a, _) = project_a
3006 .update(&mut cx_a, |p, cx| {
3007 p.find_or_create_local_worktree("/a", false, cx)
3008 })
3009 .await
3010 .unwrap();
3011 worktree_a
3012 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3013 .await;
3014 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3015 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3016 project_a
3017 .update(&mut cx_a, |p, cx| p.share(cx))
3018 .await
3019 .unwrap();
3020
3021 // Join the worktree as client B.
3022 let project_b = Project::remote(
3023 project_id,
3024 client_b.clone(),
3025 client_b.user_store.clone(),
3026 lang_registry.clone(),
3027 fs.clone(),
3028 &mut cx_b.to_async(),
3029 )
3030 .await
3031 .unwrap();
3032 let mut params = cx_b.update(WorkspaceParams::test);
3033 params.languages = lang_registry.clone();
3034 params.client = client_b.client.clone();
3035 params.user_store = client_b.user_store.clone();
3036 params.project = project_b;
3037 params.path_openers = path_openers_b.into();
3038
3039 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3040 let editor_b = workspace_b
3041 .update(&mut cx_b, |workspace, cx| {
3042 workspace.open_path((worktree_id, "main.rs").into(), cx)
3043 })
3044 .await
3045 .unwrap()
3046 .downcast::<Editor>()
3047 .unwrap();
3048
3049 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3050 fake_language_server
3051 .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
3052 assert_eq!(
3053 params.text_document.uri,
3054 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3055 );
3056 assert_eq!(params.range.start, lsp::Position::new(0, 0));
3057 assert_eq!(params.range.end, lsp::Position::new(0, 0));
3058 None
3059 })
3060 .next()
3061 .await;
3062
3063 // Move cursor to a location that contains code actions.
3064 editor_b.update(&mut cx_b, |editor, cx| {
3065 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3066 cx.focus(&editor_b);
3067 });
3068
3069 fake_language_server
3070 .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
3071 assert_eq!(
3072 params.text_document.uri,
3073 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3074 );
3075 assert_eq!(params.range.start, lsp::Position::new(1, 31));
3076 assert_eq!(params.range.end, lsp::Position::new(1, 31));
3077
3078 Some(vec![lsp::CodeActionOrCommand::CodeAction(
3079 lsp::CodeAction {
3080 title: "Inline into all callers".to_string(),
3081 edit: Some(lsp::WorkspaceEdit {
3082 changes: Some(
3083 [
3084 (
3085 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3086 vec![lsp::TextEdit::new(
3087 lsp::Range::new(
3088 lsp::Position::new(1, 22),
3089 lsp::Position::new(1, 34),
3090 ),
3091 "4".to_string(),
3092 )],
3093 ),
3094 (
3095 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3096 vec![lsp::TextEdit::new(
3097 lsp::Range::new(
3098 lsp::Position::new(0, 0),
3099 lsp::Position::new(0, 27),
3100 ),
3101 "".to_string(),
3102 )],
3103 ),
3104 ]
3105 .into_iter()
3106 .collect(),
3107 ),
3108 ..Default::default()
3109 }),
3110 data: Some(json!({
3111 "codeActionParams": {
3112 "range": {
3113 "start": {"line": 1, "column": 31},
3114 "end": {"line": 1, "column": 31},
3115 }
3116 }
3117 })),
3118 ..Default::default()
3119 },
3120 )])
3121 })
3122 .next()
3123 .await;
3124
3125 // Toggle code actions and wait for them to display.
3126 editor_b.update(&mut cx_b, |editor, cx| {
3127 editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3128 });
3129 editor_b
3130 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3131 .await;
3132
3133 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3134
3135 // Confirming the code action will trigger a resolve request.
3136 let confirm_action = workspace_b
3137 .update(&mut cx_b, |workspace, cx| {
3138 Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3139 })
3140 .unwrap();
3141 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_| {
3142 lsp::CodeAction {
3143 title: "Inline into all callers".to_string(),
3144 edit: Some(lsp::WorkspaceEdit {
3145 changes: Some(
3146 [
3147 (
3148 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3149 vec![lsp::TextEdit::new(
3150 lsp::Range::new(
3151 lsp::Position::new(1, 22),
3152 lsp::Position::new(1, 34),
3153 ),
3154 "4".to_string(),
3155 )],
3156 ),
3157 (
3158 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3159 vec![lsp::TextEdit::new(
3160 lsp::Range::new(
3161 lsp::Position::new(0, 0),
3162 lsp::Position::new(0, 27),
3163 ),
3164 "".to_string(),
3165 )],
3166 ),
3167 ]
3168 .into_iter()
3169 .collect(),
3170 ),
3171 ..Default::default()
3172 }),
3173 ..Default::default()
3174 }
3175 });
3176
3177 // After the action is confirmed, an editor containing both modified files is opened.
3178 confirm_action.await.unwrap();
3179 let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3180 workspace
3181 .active_item(cx)
3182 .unwrap()
3183 .downcast::<Editor>()
3184 .unwrap()
3185 });
3186 code_action_editor.update(&mut cx_b, |editor, cx| {
3187 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3188 editor.undo(&Undo, cx);
3189 assert_eq!(
3190 editor.text(cx),
3191 "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3192 );
3193 editor.redo(&Redo, cx);
3194 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3195 });
3196 }
3197
3198 #[gpui::test(iterations = 10)]
3199 async fn test_collaborating_with_renames(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3200 cx_a.foreground().forbid_parking();
3201 let mut lang_registry = Arc::new(LanguageRegistry::new());
3202 let fs = FakeFs::new(cx_a.background());
3203 let mut path_openers_b = Vec::new();
3204 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3205
3206 // Set up a fake language server.
3207 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3208 Arc::get_mut(&mut lang_registry)
3209 .unwrap()
3210 .add(Arc::new(Language::new(
3211 LanguageConfig {
3212 name: "Rust".into(),
3213 path_suffixes: vec!["rs".to_string()],
3214 language_server: Some(language_server_config),
3215 ..Default::default()
3216 },
3217 Some(tree_sitter_rust::language()),
3218 )));
3219
3220 // Connect to a server as 2 clients.
3221 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3222 let client_a = server.create_client(&mut cx_a, "user_a").await;
3223 let client_b = server.create_client(&mut cx_b, "user_b").await;
3224
3225 // Share a project as client A
3226 fs.insert_tree(
3227 "/dir",
3228 json!({
3229 ".zed.toml": r#"collaborators = ["user_b"]"#,
3230 "one.rs": "const ONE: usize = 1;",
3231 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3232 }),
3233 )
3234 .await;
3235 let project_a = cx_a.update(|cx| {
3236 Project::local(
3237 client_a.clone(),
3238 client_a.user_store.clone(),
3239 lang_registry.clone(),
3240 fs.clone(),
3241 cx,
3242 )
3243 });
3244 let (worktree_a, _) = project_a
3245 .update(&mut cx_a, |p, cx| {
3246 p.find_or_create_local_worktree("/dir", false, cx)
3247 })
3248 .await
3249 .unwrap();
3250 worktree_a
3251 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3252 .await;
3253 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3254 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3255 project_a
3256 .update(&mut cx_a, |p, cx| p.share(cx))
3257 .await
3258 .unwrap();
3259
3260 // Join the worktree as client B.
3261 let project_b = Project::remote(
3262 project_id,
3263 client_b.clone(),
3264 client_b.user_store.clone(),
3265 lang_registry.clone(),
3266 fs.clone(),
3267 &mut cx_b.to_async(),
3268 )
3269 .await
3270 .unwrap();
3271 let mut params = cx_b.update(WorkspaceParams::test);
3272 params.languages = lang_registry.clone();
3273 params.client = client_b.client.clone();
3274 params.user_store = client_b.user_store.clone();
3275 params.project = project_b;
3276 params.path_openers = path_openers_b.into();
3277
3278 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3279 let editor_b = workspace_b
3280 .update(&mut cx_b, |workspace, cx| {
3281 workspace.open_path((worktree_id, "one.rs").into(), cx)
3282 })
3283 .await
3284 .unwrap()
3285 .downcast::<Editor>()
3286 .unwrap();
3287 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3288
3289 // Move cursor to a location that can be renamed.
3290 let prepare_rename = editor_b.update(&mut cx_b, |editor, cx| {
3291 editor.select_ranges([7..7], None, cx);
3292 editor.rename(&Rename, cx).unwrap()
3293 });
3294
3295 fake_language_server
3296 .handle_request::<lsp::request::PrepareRenameRequest, _>(|params| {
3297 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3298 assert_eq!(params.position, lsp::Position::new(0, 7));
3299 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3300 lsp::Position::new(0, 6),
3301 lsp::Position::new(0, 9),
3302 )))
3303 })
3304 .next()
3305 .await
3306 .unwrap();
3307 prepare_rename.await.unwrap();
3308 editor_b.update(&mut cx_b, |editor, cx| {
3309 let rename = editor.pending_rename().unwrap();
3310 let buffer = editor.buffer().read(cx).snapshot(cx);
3311 assert_eq!(
3312 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3313 6..9
3314 );
3315 rename.editor.update(cx, |rename_editor, cx| {
3316 rename_editor.buffer().update(cx, |rename_buffer, cx| {
3317 rename_buffer.edit([0..3], "THREE", cx);
3318 });
3319 });
3320 });
3321
3322 let confirm_rename = workspace_b.update(&mut cx_b, |workspace, cx| {
3323 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3324 });
3325 fake_language_server
3326 .handle_request::<lsp::request::Rename, _>(|params| {
3327 assert_eq!(
3328 params.text_document_position.text_document.uri.as_str(),
3329 "file:///dir/one.rs"
3330 );
3331 assert_eq!(
3332 params.text_document_position.position,
3333 lsp::Position::new(0, 6)
3334 );
3335 assert_eq!(params.new_name, "THREE");
3336 Some(lsp::WorkspaceEdit {
3337 changes: Some(
3338 [
3339 (
3340 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3341 vec![lsp::TextEdit::new(
3342 lsp::Range::new(
3343 lsp::Position::new(0, 6),
3344 lsp::Position::new(0, 9),
3345 ),
3346 "THREE".to_string(),
3347 )],
3348 ),
3349 (
3350 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3351 vec![
3352 lsp::TextEdit::new(
3353 lsp::Range::new(
3354 lsp::Position::new(0, 24),
3355 lsp::Position::new(0, 27),
3356 ),
3357 "THREE".to_string(),
3358 ),
3359 lsp::TextEdit::new(
3360 lsp::Range::new(
3361 lsp::Position::new(0, 35),
3362 lsp::Position::new(0, 38),
3363 ),
3364 "THREE".to_string(),
3365 ),
3366 ],
3367 ),
3368 ]
3369 .into_iter()
3370 .collect(),
3371 ),
3372 ..Default::default()
3373 })
3374 })
3375 .next()
3376 .await
3377 .unwrap();
3378 confirm_rename.await.unwrap();
3379
3380 let rename_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3381 workspace
3382 .active_item(cx)
3383 .unwrap()
3384 .downcast::<Editor>()
3385 .unwrap()
3386 });
3387 rename_editor.update(&mut cx_b, |editor, cx| {
3388 assert_eq!(
3389 editor.text(cx),
3390 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3391 );
3392 editor.undo(&Undo, cx);
3393 assert_eq!(
3394 editor.text(cx),
3395 "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3396 );
3397 editor.redo(&Redo, cx);
3398 assert_eq!(
3399 editor.text(cx),
3400 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3401 );
3402 });
3403
3404 // Ensure temporary rename edits cannot be undone/redone.
3405 editor_b.update(&mut cx_b, |editor, cx| {
3406 editor.undo(&Undo, cx);
3407 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3408 editor.undo(&Undo, cx);
3409 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3410 editor.redo(&Redo, cx);
3411 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3412 })
3413 }
3414
3415 #[gpui::test(iterations = 10)]
3416 async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3417 cx_a.foreground().forbid_parking();
3418
3419 // Connect to a server as 2 clients.
3420 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3421 let client_a = server.create_client(&mut cx_a, "user_a").await;
3422 let client_b = server.create_client(&mut cx_b, "user_b").await;
3423
3424 // Create an org that includes these 2 users.
3425 let db = &server.app_state.db;
3426 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3427 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3428 .await
3429 .unwrap();
3430 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3431 .await
3432 .unwrap();
3433
3434 // Create a channel that includes all the users.
3435 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3436 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3437 .await
3438 .unwrap();
3439 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3440 .await
3441 .unwrap();
3442 db.create_channel_message(
3443 channel_id,
3444 client_b.current_user_id(&cx_b),
3445 "hello A, it's B.",
3446 OffsetDateTime::now_utc(),
3447 1,
3448 )
3449 .await
3450 .unwrap();
3451
3452 let channels_a = cx_a
3453 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3454 channels_a
3455 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3456 .await;
3457 channels_a.read_with(&cx_a, |list, _| {
3458 assert_eq!(
3459 list.available_channels().unwrap(),
3460 &[ChannelDetails {
3461 id: channel_id.to_proto(),
3462 name: "test-channel".to_string()
3463 }]
3464 )
3465 });
3466 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3467 this.get_channel(channel_id.to_proto(), cx).unwrap()
3468 });
3469 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3470 channel_a
3471 .condition(&cx_a, |channel, _| {
3472 channel_messages(channel)
3473 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3474 })
3475 .await;
3476
3477 let channels_b = cx_b
3478 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3479 channels_b
3480 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3481 .await;
3482 channels_b.read_with(&cx_b, |list, _| {
3483 assert_eq!(
3484 list.available_channels().unwrap(),
3485 &[ChannelDetails {
3486 id: channel_id.to_proto(),
3487 name: "test-channel".to_string()
3488 }]
3489 )
3490 });
3491
3492 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3493 this.get_channel(channel_id.to_proto(), cx).unwrap()
3494 });
3495 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3496 channel_b
3497 .condition(&cx_b, |channel, _| {
3498 channel_messages(channel)
3499 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3500 })
3501 .await;
3502
3503 channel_a
3504 .update(&mut cx_a, |channel, cx| {
3505 channel
3506 .send_message("oh, hi B.".to_string(), cx)
3507 .unwrap()
3508 .detach();
3509 let task = channel.send_message("sup".to_string(), cx).unwrap();
3510 assert_eq!(
3511 channel_messages(channel),
3512 &[
3513 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3514 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3515 ("user_a".to_string(), "sup".to_string(), true)
3516 ]
3517 );
3518 task
3519 })
3520 .await
3521 .unwrap();
3522
3523 channel_b
3524 .condition(&cx_b, |channel, _| {
3525 channel_messages(channel)
3526 == [
3527 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3528 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3529 ("user_a".to_string(), "sup".to_string(), false),
3530 ]
3531 })
3532 .await;
3533
3534 assert_eq!(
3535 server
3536 .state()
3537 .await
3538 .channel(channel_id)
3539 .unwrap()
3540 .connection_ids
3541 .len(),
3542 2
3543 );
3544 cx_b.update(|_| drop(channel_b));
3545 server
3546 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3547 .await;
3548
3549 cx_a.update(|_| drop(channel_a));
3550 server
3551 .condition(|state| state.channel(channel_id).is_none())
3552 .await;
3553 }
3554
3555 #[gpui::test(iterations = 10)]
3556 async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3557 cx_a.foreground().forbid_parking();
3558
3559 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3560 let client_a = server.create_client(&mut cx_a, "user_a").await;
3561
3562 let db = &server.app_state.db;
3563 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3564 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3565 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3566 .await
3567 .unwrap();
3568 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3569 .await
3570 .unwrap();
3571
3572 let channels_a = cx_a
3573 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3574 channels_a
3575 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3576 .await;
3577 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3578 this.get_channel(channel_id.to_proto(), cx).unwrap()
3579 });
3580
3581 // Messages aren't allowed to be too long.
3582 channel_a
3583 .update(&mut cx_a, |channel, cx| {
3584 let long_body = "this is long.\n".repeat(1024);
3585 channel.send_message(long_body, cx).unwrap()
3586 })
3587 .await
3588 .unwrap_err();
3589
3590 // Messages aren't allowed to be blank.
3591 channel_a.update(&mut cx_a, |channel, cx| {
3592 channel.send_message(String::new(), cx).unwrap_err()
3593 });
3594
3595 // Leading and trailing whitespace are trimmed.
3596 channel_a
3597 .update(&mut cx_a, |channel, cx| {
3598 channel
3599 .send_message("\n surrounded by whitespace \n".to_string(), cx)
3600 .unwrap()
3601 })
3602 .await
3603 .unwrap();
3604 assert_eq!(
3605 db.get_channel_messages(channel_id, 10, None)
3606 .await
3607 .unwrap()
3608 .iter()
3609 .map(|m| &m.body)
3610 .collect::<Vec<_>>(),
3611 &["surrounded by whitespace"]
3612 );
3613 }
3614
3615 #[gpui::test(iterations = 10)]
3616 async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3617 cx_a.foreground().forbid_parking();
3618
3619 // Connect to a server as 2 clients.
3620 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3621 let client_a = server.create_client(&mut cx_a, "user_a").await;
3622 let client_b = server.create_client(&mut cx_b, "user_b").await;
3623 let mut status_b = client_b.status();
3624
3625 // Create an org that includes these 2 users.
3626 let db = &server.app_state.db;
3627 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3628 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3629 .await
3630 .unwrap();
3631 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3632 .await
3633 .unwrap();
3634
3635 // Create a channel that includes all the users.
3636 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3637 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3638 .await
3639 .unwrap();
3640 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3641 .await
3642 .unwrap();
3643 db.create_channel_message(
3644 channel_id,
3645 client_b.current_user_id(&cx_b),
3646 "hello A, it's B.",
3647 OffsetDateTime::now_utc(),
3648 2,
3649 )
3650 .await
3651 .unwrap();
3652
3653 let channels_a = cx_a
3654 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3655 channels_a
3656 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3657 .await;
3658
3659 channels_a.read_with(&cx_a, |list, _| {
3660 assert_eq!(
3661 list.available_channels().unwrap(),
3662 &[ChannelDetails {
3663 id: channel_id.to_proto(),
3664 name: "test-channel".to_string()
3665 }]
3666 )
3667 });
3668 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3669 this.get_channel(channel_id.to_proto(), cx).unwrap()
3670 });
3671 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3672 channel_a
3673 .condition(&cx_a, |channel, _| {
3674 channel_messages(channel)
3675 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3676 })
3677 .await;
3678
3679 let channels_b = cx_b
3680 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3681 channels_b
3682 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3683 .await;
3684 channels_b.read_with(&cx_b, |list, _| {
3685 assert_eq!(
3686 list.available_channels().unwrap(),
3687 &[ChannelDetails {
3688 id: channel_id.to_proto(),
3689 name: "test-channel".to_string()
3690 }]
3691 )
3692 });
3693
3694 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3695 this.get_channel(channel_id.to_proto(), cx).unwrap()
3696 });
3697 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3698 channel_b
3699 .condition(&cx_b, |channel, _| {
3700 channel_messages(channel)
3701 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3702 })
3703 .await;
3704
3705 // Disconnect client B, ensuring we can still access its cached channel data.
3706 server.forbid_connections();
3707 server.disconnect_client(client_b.current_user_id(&cx_b));
3708 while !matches!(
3709 status_b.next().await,
3710 Some(client::Status::ReconnectionError { .. })
3711 ) {}
3712
3713 channels_b.read_with(&cx_b, |channels, _| {
3714 assert_eq!(
3715 channels.available_channels().unwrap(),
3716 [ChannelDetails {
3717 id: channel_id.to_proto(),
3718 name: "test-channel".to_string()
3719 }]
3720 )
3721 });
3722 channel_b.read_with(&cx_b, |channel, _| {
3723 assert_eq!(
3724 channel_messages(channel),
3725 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3726 )
3727 });
3728
3729 // Send a message from client B while it is disconnected.
3730 channel_b
3731 .update(&mut cx_b, |channel, cx| {
3732 let task = channel
3733 .send_message("can you see this?".to_string(), cx)
3734 .unwrap();
3735 assert_eq!(
3736 channel_messages(channel),
3737 &[
3738 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3739 ("user_b".to_string(), "can you see this?".to_string(), true)
3740 ]
3741 );
3742 task
3743 })
3744 .await
3745 .unwrap_err();
3746
3747 // Send a message from client A while B is disconnected.
3748 channel_a
3749 .update(&mut cx_a, |channel, cx| {
3750 channel
3751 .send_message("oh, hi B.".to_string(), cx)
3752 .unwrap()
3753 .detach();
3754 let task = channel.send_message("sup".to_string(), cx).unwrap();
3755 assert_eq!(
3756 channel_messages(channel),
3757 &[
3758 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3759 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3760 ("user_a".to_string(), "sup".to_string(), true)
3761 ]
3762 );
3763 task
3764 })
3765 .await
3766 .unwrap();
3767
3768 // Give client B a chance to reconnect.
3769 server.allow_connections();
3770 cx_b.foreground().advance_clock(Duration::from_secs(10));
3771
3772 // Verify that B sees the new messages upon reconnection, as well as the message client B
3773 // sent while offline.
3774 channel_b
3775 .condition(&cx_b, |channel, _| {
3776 channel_messages(channel)
3777 == [
3778 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3779 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3780 ("user_a".to_string(), "sup".to_string(), false),
3781 ("user_b".to_string(), "can you see this?".to_string(), false),
3782 ]
3783 })
3784 .await;
3785
3786 // Ensure client A and B can communicate normally after reconnection.
3787 channel_a
3788 .update(&mut cx_a, |channel, cx| {
3789 channel.send_message("you online?".to_string(), cx).unwrap()
3790 })
3791 .await
3792 .unwrap();
3793 channel_b
3794 .condition(&cx_b, |channel, _| {
3795 channel_messages(channel)
3796 == [
3797 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3798 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3799 ("user_a".to_string(), "sup".to_string(), false),
3800 ("user_b".to_string(), "can you see this?".to_string(), false),
3801 ("user_a".to_string(), "you online?".to_string(), false),
3802 ]
3803 })
3804 .await;
3805
3806 channel_b
3807 .update(&mut cx_b, |channel, cx| {
3808 channel.send_message("yep".to_string(), cx).unwrap()
3809 })
3810 .await
3811 .unwrap();
3812 channel_a
3813 .condition(&cx_a, |channel, _| {
3814 channel_messages(channel)
3815 == [
3816 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3817 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3818 ("user_a".to_string(), "sup".to_string(), false),
3819 ("user_b".to_string(), "can you see this?".to_string(), false),
3820 ("user_a".to_string(), "you online?".to_string(), false),
3821 ("user_b".to_string(), "yep".to_string(), false),
3822 ]
3823 })
3824 .await;
3825 }
3826
3827 #[gpui::test(iterations = 10)]
3828 async fn test_contacts(
3829 mut cx_a: TestAppContext,
3830 mut cx_b: TestAppContext,
3831 mut cx_c: TestAppContext,
3832 ) {
3833 cx_a.foreground().forbid_parking();
3834 let lang_registry = Arc::new(LanguageRegistry::new());
3835 let fs = FakeFs::new(cx_a.background());
3836
3837 // Connect to a server as 3 clients.
3838 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3839 let client_a = server.create_client(&mut cx_a, "user_a").await;
3840 let client_b = server.create_client(&mut cx_b, "user_b").await;
3841 let client_c = server.create_client(&mut cx_c, "user_c").await;
3842
3843 // Share a worktree as client A.
3844 fs.insert_tree(
3845 "/a",
3846 json!({
3847 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3848 }),
3849 )
3850 .await;
3851
3852 let project_a = cx_a.update(|cx| {
3853 Project::local(
3854 client_a.clone(),
3855 client_a.user_store.clone(),
3856 lang_registry.clone(),
3857 fs.clone(),
3858 cx,
3859 )
3860 });
3861 let (worktree_a, _) = project_a
3862 .update(&mut cx_a, |p, cx| {
3863 p.find_or_create_local_worktree("/a", false, cx)
3864 })
3865 .await
3866 .unwrap();
3867 worktree_a
3868 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3869 .await;
3870
3871 client_a
3872 .user_store
3873 .condition(&cx_a, |user_store, _| {
3874 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3875 })
3876 .await;
3877 client_b
3878 .user_store
3879 .condition(&cx_b, |user_store, _| {
3880 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3881 })
3882 .await;
3883 client_c
3884 .user_store
3885 .condition(&cx_c, |user_store, _| {
3886 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3887 })
3888 .await;
3889
3890 let project_id = project_a
3891 .update(&mut cx_a, |project, _| project.next_remote_id())
3892 .await;
3893 project_a
3894 .update(&mut cx_a, |project, cx| project.share(cx))
3895 .await
3896 .unwrap();
3897
3898 let _project_b = Project::remote(
3899 project_id,
3900 client_b.clone(),
3901 client_b.user_store.clone(),
3902 lang_registry.clone(),
3903 fs.clone(),
3904 &mut cx_b.to_async(),
3905 )
3906 .await
3907 .unwrap();
3908
3909 client_a
3910 .user_store
3911 .condition(&cx_a, |user_store, _| {
3912 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3913 })
3914 .await;
3915 client_b
3916 .user_store
3917 .condition(&cx_b, |user_store, _| {
3918 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3919 })
3920 .await;
3921 client_c
3922 .user_store
3923 .condition(&cx_c, |user_store, _| {
3924 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3925 })
3926 .await;
3927
3928 project_a
3929 .condition(&cx_a, |project, _| {
3930 project.collaborators().contains_key(&client_b.peer_id)
3931 })
3932 .await;
3933
3934 cx_a.update(move |_| drop(project_a));
3935 client_a
3936 .user_store
3937 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3938 .await;
3939 client_b
3940 .user_store
3941 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3942 .await;
3943 client_c
3944 .user_store
3945 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3946 .await;
3947
3948 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3949 user_store
3950 .contacts()
3951 .iter()
3952 .map(|contact| {
3953 let worktrees = contact
3954 .projects
3955 .iter()
3956 .map(|p| {
3957 (
3958 p.worktree_root_names[0].as_str(),
3959 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3960 )
3961 })
3962 .collect();
3963 (contact.user.github_login.as_str(), worktrees)
3964 })
3965 .collect()
3966 }
3967 }
3968
3969 #[gpui::test(iterations = 100)]
3970 async fn test_random_collaboration(cx: TestAppContext, rng: StdRng) {
3971 cx.foreground().forbid_parking();
3972 let max_peers = env::var("MAX_PEERS")
3973 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
3974 .unwrap_or(5);
3975 let max_operations = env::var("OPERATIONS")
3976 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
3977 .unwrap_or(10);
3978
3979 let rng = Arc::new(Mutex::new(rng));
3980
3981 let guest_lang_registry = Arc::new(LanguageRegistry::new());
3982 let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
3983
3984 let fs = FakeFs::new(cx.background());
3985 fs.insert_tree(
3986 "/_collab",
3987 json!({
3988 ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
3989 }),
3990 )
3991 .await;
3992
3993 let operations = Rc::new(Cell::new(0));
3994 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
3995 let mut clients = Vec::new();
3996
3997 let mut next_entity_id = 100000;
3998 let mut host_cx = TestAppContext::new(
3999 cx.foreground_platform(),
4000 cx.platform(),
4001 cx.foreground(),
4002 cx.background(),
4003 cx.font_cache(),
4004 next_entity_id,
4005 );
4006 let host = server.create_client(&mut host_cx, "host").await;
4007 let host_project = host_cx.update(|cx| {
4008 Project::local(
4009 host.client.clone(),
4010 host.user_store.clone(),
4011 Arc::new(LanguageRegistry::new()),
4012 fs.clone(),
4013 cx,
4014 )
4015 });
4016 let host_project_id = host_project
4017 .update(&mut host_cx, |p, _| p.next_remote_id())
4018 .await;
4019
4020 let (collab_worktree, _) = host_project
4021 .update(&mut host_cx, |project, cx| {
4022 project.find_or_create_local_worktree("/_collab", false, cx)
4023 })
4024 .await
4025 .unwrap();
4026 collab_worktree
4027 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4028 .await;
4029 host_project
4030 .update(&mut host_cx, |project, cx| project.share(cx))
4031 .await
4032 .unwrap();
4033
4034 clients.push(cx.foreground().spawn(host.simulate_host(
4035 host_project.clone(),
4036 language_server_config,
4037 operations.clone(),
4038 max_operations,
4039 rng.clone(),
4040 host_cx.clone(),
4041 )));
4042
4043 while operations.get() < max_operations {
4044 cx.background().simulate_random_delay().await;
4045 if clients.len() < max_peers && rng.lock().gen_bool(0.05) {
4046 operations.set(operations.get() + 1);
4047
4048 let guest_id = clients.len();
4049 log::info!("Adding guest {}", guest_id);
4050 next_entity_id += 100000;
4051 let mut guest_cx = TestAppContext::new(
4052 cx.foreground_platform(),
4053 cx.platform(),
4054 cx.foreground(),
4055 cx.background(),
4056 cx.font_cache(),
4057 next_entity_id,
4058 );
4059 let guest = server
4060 .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4061 .await;
4062 let guest_project = Project::remote(
4063 host_project_id,
4064 guest.client.clone(),
4065 guest.user_store.clone(),
4066 guest_lang_registry.clone(),
4067 fs.clone(),
4068 &mut guest_cx.to_async(),
4069 )
4070 .await
4071 .unwrap();
4072 clients.push(cx.foreground().spawn(guest.simulate_guest(
4073 guest_id,
4074 guest_project,
4075 operations.clone(),
4076 max_operations,
4077 rng.clone(),
4078 guest_cx,
4079 )));
4080
4081 log::info!("Guest {} added", guest_id);
4082 }
4083 }
4084
4085 let clients = futures::future::join_all(clients).await;
4086 cx.foreground().run_until_parked();
4087
4088 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4089 project
4090 .worktrees(cx)
4091 .map(|worktree| {
4092 let snapshot = worktree.read(cx).snapshot();
4093 (snapshot.id(), snapshot)
4094 })
4095 .collect::<BTreeMap<_, _>>()
4096 });
4097
4098 for (guest_client, guest_cx) in clients.iter().skip(1) {
4099 let guest_id = guest_client.client.id();
4100 let worktree_snapshots =
4101 guest_client
4102 .project
4103 .as_ref()
4104 .unwrap()
4105 .read_with(guest_cx, |project, cx| {
4106 project
4107 .worktrees(cx)
4108 .map(|worktree| {
4109 let worktree = worktree.read(cx);
4110 assert!(
4111 !worktree.as_remote().unwrap().has_pending_updates(),
4112 "Guest {} worktree {:?} contains deferred updates",
4113 guest_id,
4114 worktree.id()
4115 );
4116 (worktree.id(), worktree.snapshot())
4117 })
4118 .collect::<BTreeMap<_, _>>()
4119 });
4120
4121 assert_eq!(
4122 worktree_snapshots.keys().collect::<Vec<_>>(),
4123 host_worktree_snapshots.keys().collect::<Vec<_>>(),
4124 "guest {} has different worktrees than the host",
4125 guest_id
4126 );
4127 for (id, host_snapshot) in &host_worktree_snapshots {
4128 let guest_snapshot = &worktree_snapshots[id];
4129 assert_eq!(
4130 guest_snapshot.root_name(),
4131 host_snapshot.root_name(),
4132 "guest {} has different root name than the host for worktree {}",
4133 guest_id,
4134 id
4135 );
4136 assert_eq!(
4137 guest_snapshot.entries(false).collect::<Vec<_>>(),
4138 host_snapshot.entries(false).collect::<Vec<_>>(),
4139 "guest {} has different snapshot than the host for worktree {}",
4140 guest_id,
4141 id
4142 );
4143 }
4144
4145 guest_client
4146 .project
4147 .as_ref()
4148 .unwrap()
4149 .read_with(guest_cx, |project, _| {
4150 assert!(
4151 !project.has_buffered_operations(),
4152 "guest {} has buffered operations ",
4153 guest_id,
4154 );
4155 });
4156
4157 for guest_buffer in &guest_client.buffers {
4158 let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
4159 let host_buffer = host_project.read_with(&host_cx, |project, _| {
4160 project
4161 .shared_buffer(guest_client.peer_id, buffer_id)
4162 .expect(&format!(
4163 "host doest not have buffer for guest:{}, peer:{}, id:{}",
4164 guest_id, guest_client.peer_id, buffer_id
4165 ))
4166 });
4167 assert_eq!(
4168 guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
4169 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4170 "guest {} buffer {} differs from the host's buffer",
4171 guest_id,
4172 buffer_id,
4173 );
4174 }
4175 }
4176 }
4177
4178 struct TestServer {
4179 peer: Arc<Peer>,
4180 app_state: Arc<AppState>,
4181 server: Arc<Server>,
4182 foreground: Rc<executor::Foreground>,
4183 notifications: mpsc::UnboundedReceiver<()>,
4184 connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
4185 forbid_connections: Arc<AtomicBool>,
4186 _test_db: TestDb,
4187 }
4188
4189 impl TestServer {
4190 async fn start(
4191 foreground: Rc<executor::Foreground>,
4192 background: Arc<executor::Background>,
4193 ) -> Self {
4194 let test_db = TestDb::fake(background);
4195 let app_state = Self::build_app_state(&test_db).await;
4196 let peer = Peer::new();
4197 let notifications = mpsc::unbounded();
4198 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4199 Self {
4200 peer,
4201 app_state,
4202 server,
4203 foreground,
4204 notifications: notifications.1,
4205 connection_killers: Default::default(),
4206 forbid_connections: Default::default(),
4207 _test_db: test_db,
4208 }
4209 }
4210
4211 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4212 let http = FakeHttpClient::with_404_response();
4213 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4214 let client_name = name.to_string();
4215 let mut client = Client::new(http.clone());
4216 let server = self.server.clone();
4217 let connection_killers = self.connection_killers.clone();
4218 let forbid_connections = self.forbid_connections.clone();
4219 let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4220
4221 Arc::get_mut(&mut client)
4222 .unwrap()
4223 .override_authenticate(move |cx| {
4224 cx.spawn(|_| async move {
4225 let access_token = "the-token".to_string();
4226 Ok(Credentials {
4227 user_id: user_id.0 as u64,
4228 access_token,
4229 })
4230 })
4231 })
4232 .override_establish_connection(move |credentials, cx| {
4233 assert_eq!(credentials.user_id, user_id.0 as u64);
4234 assert_eq!(credentials.access_token, "the-token");
4235
4236 let server = server.clone();
4237 let connection_killers = connection_killers.clone();
4238 let forbid_connections = forbid_connections.clone();
4239 let client_name = client_name.clone();
4240 let connection_id_tx = connection_id_tx.clone();
4241 cx.spawn(move |cx| async move {
4242 if forbid_connections.load(SeqCst) {
4243 Err(EstablishConnectionError::other(anyhow!(
4244 "server is forbidding connections"
4245 )))
4246 } else {
4247 let (client_conn, server_conn, kill_conn) =
4248 Connection::in_memory(cx.background());
4249 connection_killers.lock().insert(user_id, kill_conn);
4250 cx.background()
4251 .spawn(server.handle_connection(
4252 server_conn,
4253 client_name,
4254 user_id,
4255 Some(connection_id_tx),
4256 cx.background(),
4257 ))
4258 .detach();
4259 Ok(client_conn)
4260 }
4261 })
4262 });
4263
4264 client
4265 .authenticate_and_connect(&cx.to_async())
4266 .await
4267 .unwrap();
4268
4269 Channel::init(&client);
4270 Project::init(&client);
4271
4272 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4273 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4274 let mut authed_user =
4275 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
4276 while authed_user.next().await.unwrap().is_none() {}
4277
4278 TestClient {
4279 client,
4280 peer_id,
4281 user_store,
4282 project: Default::default(),
4283 buffers: Default::default(),
4284 }
4285 }
4286
4287 fn disconnect_client(&self, user_id: UserId) {
4288 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
4289 let _ = kill_conn.try_send(Some(()));
4290 }
4291 }
4292
4293 fn forbid_connections(&self) {
4294 self.forbid_connections.store(true, SeqCst);
4295 }
4296
4297 fn allow_connections(&self) {
4298 self.forbid_connections.store(false, SeqCst);
4299 }
4300
4301 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4302 let mut config = Config::default();
4303 config.session_secret = "a".repeat(32);
4304 config.database_url = test_db.url.clone();
4305 let github_client = github::AppClient::test();
4306 Arc::new(AppState {
4307 db: test_db.db().clone(),
4308 handlebars: Default::default(),
4309 auth_client: auth::build_client("", ""),
4310 repo_client: github::RepoClient::test(&github_client),
4311 github_client,
4312 config,
4313 })
4314 }
4315
4316 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4317 self.server.store.read()
4318 }
4319
4320 async fn condition<F>(&mut self, mut predicate: F)
4321 where
4322 F: FnMut(&Store) -> bool,
4323 {
4324 async_std::future::timeout(Duration::from_millis(500), async {
4325 while !(predicate)(&*self.server.store.read()) {
4326 self.foreground.start_waiting();
4327 self.notifications.next().await;
4328 self.foreground.finish_waiting();
4329 }
4330 })
4331 .await
4332 .expect("condition timed out");
4333 }
4334 }
4335
4336 impl Drop for TestServer {
4337 fn drop(&mut self) {
4338 self.peer.reset();
4339 }
4340 }
4341
4342 struct TestClient {
4343 client: Arc<Client>,
4344 pub peer_id: PeerId,
4345 pub user_store: ModelHandle<UserStore>,
4346 project: Option<ModelHandle<Project>>,
4347 buffers: HashSet<ModelHandle<zed::language::Buffer>>,
4348 }
4349
4350 impl Deref for TestClient {
4351 type Target = Arc<Client>;
4352
4353 fn deref(&self) -> &Self::Target {
4354 &self.client
4355 }
4356 }
4357
4358 impl TestClient {
4359 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4360 UserId::from_proto(
4361 self.user_store
4362 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4363 )
4364 }
4365
4366 fn simulate_host(
4367 mut self,
4368 project: ModelHandle<Project>,
4369 mut language_server_config: LanguageServerConfig,
4370 operations: Rc<Cell<usize>>,
4371 max_operations: usize,
4372 rng: Arc<Mutex<StdRng>>,
4373 mut cx: TestAppContext,
4374 ) -> impl Future<Output = (Self, TestAppContext)> {
4375 let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4376
4377 // Set up a fake language server.
4378 language_server_config.set_fake_initializer({
4379 let rng = rng.clone();
4380 let files = files.clone();
4381 move |fake_server| {
4382 fake_server.handle_request::<lsp::request::Completion, _>(|_| {
4383 Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4384 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4385 range: lsp::Range::new(
4386 lsp::Position::new(0, 0),
4387 lsp::Position::new(0, 0),
4388 ),
4389 new_text: "the-new-text".to_string(),
4390 })),
4391 ..Default::default()
4392 }]))
4393 });
4394
4395 fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_| {
4396 Some(vec![lsp::CodeActionOrCommand::CodeAction(
4397 lsp::CodeAction {
4398 title: "the-code-action".to_string(),
4399 ..Default::default()
4400 },
4401 )])
4402 });
4403
4404 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(|params| {
4405 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4406 params.position,
4407 params.position,
4408 )))
4409 });
4410
4411 fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4412 let files = files.clone();
4413 let rng = rng.clone();
4414 move |_| {
4415 let files = files.lock();
4416 let mut rng = rng.lock();
4417 let count = rng.gen_range::<usize, _>(1..3);
4418 Some(lsp::GotoDefinitionResponse::Array(
4419 (0..count)
4420 .map(|_| {
4421 let file = files.choose(&mut *rng).unwrap().as_path();
4422 lsp::Location {
4423 uri: lsp::Url::from_file_path(file).unwrap(),
4424 range: Default::default(),
4425 }
4426 })
4427 .collect(),
4428 ))
4429 }
4430 });
4431 }
4432 });
4433
4434 project.update(&mut cx, |project, _| {
4435 project.languages().add(Arc::new(Language::new(
4436 LanguageConfig {
4437 name: "Rust".into(),
4438 path_suffixes: vec!["rs".to_string()],
4439 language_server: Some(language_server_config),
4440 ..Default::default()
4441 },
4442 None,
4443 )));
4444 });
4445
4446 async move {
4447 let fs = project.read_with(&cx, |project, _| project.fs().clone());
4448 while operations.get() < max_operations {
4449 operations.set(operations.get() + 1);
4450
4451 let distribution = rng.lock().gen_range::<usize, _>(0..100);
4452 match distribution {
4453 0..=20 if !files.lock().is_empty() => {
4454 let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4455 let mut path = path.as_path();
4456 while let Some(parent_path) = path.parent() {
4457 path = parent_path;
4458 if rng.lock().gen() {
4459 break;
4460 }
4461 }
4462
4463 log::info!("Host: find/create local worktree {:?}", path);
4464 project
4465 .update(&mut cx, |project, cx| {
4466 project.find_or_create_local_worktree(path, false, cx)
4467 })
4468 .await
4469 .unwrap();
4470 }
4471 10..=80 if !files.lock().is_empty() => {
4472 let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4473 let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4474 let (worktree, path) = project
4475 .update(&mut cx, |project, cx| {
4476 project.find_or_create_local_worktree(file, false, cx)
4477 })
4478 .await
4479 .unwrap();
4480 let project_path =
4481 worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4482 log::info!("Host: opening path {:?}", project_path);
4483 let buffer = project
4484 .update(&mut cx, |project, cx| {
4485 project.open_buffer(project_path, cx)
4486 })
4487 .await
4488 .unwrap();
4489 self.buffers.insert(buffer.clone());
4490 buffer
4491 } else {
4492 self.buffers
4493 .iter()
4494 .choose(&mut *rng.lock())
4495 .unwrap()
4496 .clone()
4497 };
4498
4499 if rng.lock().gen_bool(0.1) {
4500 cx.update(|cx| {
4501 log::info!(
4502 "Host: dropping buffer {:?}",
4503 buffer.read(cx).file().unwrap().full_path(cx)
4504 );
4505 self.buffers.remove(&buffer);
4506 drop(buffer);
4507 });
4508 } else {
4509 buffer.update(&mut cx, |buffer, cx| {
4510 log::info!(
4511 "Host: updating buffer {:?}",
4512 buffer.file().unwrap().full_path(cx)
4513 );
4514 buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4515 });
4516 }
4517 }
4518 _ => loop {
4519 let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4520 let mut path = PathBuf::new();
4521 path.push("/");
4522 for _ in 0..path_component_count {
4523 let letter = rng.lock().gen_range(b'a'..=b'z');
4524 path.push(std::str::from_utf8(&[letter]).unwrap());
4525 }
4526 path.set_extension("rs");
4527 let parent_path = path.parent().unwrap();
4528
4529 log::info!("Host: creating file {:?}", path,);
4530
4531 if fs.create_dir(&parent_path).await.is_ok()
4532 && fs.create_file(&path, Default::default()).await.is_ok()
4533 {
4534 files.lock().push(path);
4535 break;
4536 } else {
4537 log::info!("Host: cannot create file");
4538 }
4539 },
4540 }
4541
4542 cx.background().simulate_random_delay().await;
4543 }
4544
4545 self.project = Some(project);
4546 (self, cx)
4547 }
4548 }
4549
4550 pub async fn simulate_guest(
4551 mut self,
4552 guest_id: usize,
4553 project: ModelHandle<Project>,
4554 operations: Rc<Cell<usize>>,
4555 max_operations: usize,
4556 rng: Arc<Mutex<StdRng>>,
4557 mut cx: TestAppContext,
4558 ) -> (Self, TestAppContext) {
4559 while operations.get() < max_operations {
4560 let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4561 let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4562 project
4563 .worktrees(&cx)
4564 .filter(|worktree| {
4565 worktree.read(cx).entries(false).any(|e| e.is_file())
4566 })
4567 .choose(&mut *rng.lock())
4568 }) {
4569 worktree
4570 } else {
4571 cx.background().simulate_random_delay().await;
4572 continue;
4573 };
4574
4575 operations.set(operations.get() + 1);
4576 let project_path = worktree.read_with(&cx, |worktree, _| {
4577 let entry = worktree
4578 .entries(false)
4579 .filter(|e| e.is_file())
4580 .choose(&mut *rng.lock())
4581 .unwrap();
4582 (worktree.id(), entry.path.clone())
4583 });
4584 log::info!("Guest {}: opening path {:?}", guest_id, project_path);
4585 let buffer = project
4586 .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
4587 .await
4588 .unwrap();
4589 self.buffers.insert(buffer.clone());
4590 buffer
4591 } else {
4592 operations.set(operations.get() + 1);
4593
4594 self.buffers
4595 .iter()
4596 .choose(&mut *rng.lock())
4597 .unwrap()
4598 .clone()
4599 };
4600
4601 let choice = rng.lock().gen_range(0..100);
4602 match choice {
4603 0..=9 => {
4604 cx.update(|cx| {
4605 log::info!(
4606 "Guest {}: dropping buffer {:?}",
4607 guest_id,
4608 buffer.read(cx).file().unwrap().full_path(cx)
4609 );
4610 self.buffers.remove(&buffer);
4611 drop(buffer);
4612 });
4613 }
4614 10..=19 => {
4615 let completions = project.update(&mut cx, |project, cx| {
4616 log::info!(
4617 "Guest {}: requesting completions for buffer {:?}",
4618 guest_id,
4619 buffer.read(cx).file().unwrap().full_path(cx)
4620 );
4621 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4622 project.completions(&buffer, offset, cx)
4623 });
4624 let completions = cx.background().spawn(async move {
4625 completions.await.expect("completions request failed");
4626 });
4627 if rng.lock().gen_bool(0.3) {
4628 log::info!("Guest {}: detaching completions request", guest_id);
4629 completions.detach();
4630 } else {
4631 completions.await;
4632 }
4633 }
4634 20..=29 => {
4635 let code_actions = project.update(&mut cx, |project, cx| {
4636 log::info!(
4637 "Guest {}: requesting code actions for buffer {:?}",
4638 guest_id,
4639 buffer.read(cx).file().unwrap().full_path(cx)
4640 );
4641 let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4642 project.code_actions(&buffer, range, cx)
4643 });
4644 let code_actions = cx.background().spawn(async move {
4645 code_actions.await.expect("code actions request failed");
4646 });
4647 if rng.lock().gen_bool(0.3) {
4648 log::info!("Guest {}: detaching code actions request", guest_id);
4649 code_actions.detach();
4650 } else {
4651 code_actions.await;
4652 }
4653 }
4654 30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4655 let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4656 log::info!(
4657 "Guest {}: saving buffer {:?}",
4658 guest_id,
4659 buffer.file().unwrap().full_path(cx)
4660 );
4661 (buffer.version(), buffer.save(cx))
4662 });
4663 let save = cx.spawn(|cx| async move {
4664 let (saved_version, _) = save.await.expect("save request failed");
4665 buffer.read_with(&cx, |buffer, _| {
4666 assert!(buffer.version().observed_all(&saved_version));
4667 assert!(saved_version.observed_all(&requested_version));
4668 });
4669 });
4670 if rng.lock().gen_bool(0.3) {
4671 log::info!("Guest {}: detaching save request", guest_id);
4672 save.detach();
4673 } else {
4674 save.await;
4675 }
4676 }
4677 40..=45 => {
4678 let prepare_rename = project.update(&mut cx, |project, cx| {
4679 log::info!(
4680 "Guest {}: preparing rename for buffer {:?}",
4681 guest_id,
4682 buffer.read(cx).file().unwrap().full_path(cx)
4683 );
4684 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4685 project.prepare_rename(buffer, offset, cx)
4686 });
4687 let prepare_rename = cx.background().spawn(async move {
4688 prepare_rename.await.expect("prepare rename request failed");
4689 });
4690 if rng.lock().gen_bool(0.3) {
4691 log::info!("Guest {}: detaching prepare rename request", guest_id);
4692 prepare_rename.detach();
4693 } else {
4694 prepare_rename.await;
4695 }
4696 }
4697 46..=49 => {
4698 let definitions = project.update(&mut cx, |project, cx| {
4699 log::info!(
4700 "Guest {}: requesting defintions for buffer {:?}",
4701 guest_id,
4702 buffer.read(cx).file().unwrap().full_path(cx)
4703 );
4704 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4705 project.definition(&buffer, offset, cx)
4706 });
4707 let definitions = cx.background().spawn(async move {
4708 definitions.await.expect("definitions request failed");
4709 });
4710 if rng.lock().gen_bool(0.3) {
4711 log::info!("Guest {}: detaching definitions request", guest_id);
4712 definitions.detach();
4713 } else {
4714 definitions.await;
4715 }
4716 }
4717 _ => {
4718 buffer.update(&mut cx, |buffer, cx| {
4719 log::info!(
4720 "Guest {}: updating buffer {:?}",
4721 guest_id,
4722 buffer.file().unwrap().full_path(cx)
4723 );
4724 buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4725 });
4726 }
4727 }
4728 cx.background().simulate_random_delay().await;
4729 }
4730
4731 self.project = Some(project);
4732 (self, cx)
4733 }
4734 }
4735
4736 impl Executor for Arc<gpui::executor::Background> {
4737 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
4738 self.spawn(future).detach();
4739 }
4740 }
4741
4742 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
4743 channel
4744 .messages()
4745 .cursor::<()>()
4746 .map(|m| {
4747 (
4748 m.sender.github_login.clone(),
4749 m.body.clone(),
4750 m.is_pending(),
4751 )
4752 })
4753 .collect()
4754 }
4755
4756 struct EmptyView;
4757
4758 impl gpui::Entity for EmptyView {
4759 type Event = ();
4760 }
4761
4762 impl gpui::View for EmptyView {
4763 fn ui_name() -> &'static str {
4764 "empty view"
4765 }
4766
4767 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
4768 gpui::Element::boxed(gpui::elements::Empty)
4769 }
4770 }
4771}