1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use rpc::{
15 proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
16 Connection, ConnectionId, Peer, TypedEnvelope,
17};
18use sha1::{Digest as _, Sha1};
19use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
20use store::{Store, Worktree};
21use surf::StatusCode;
22use tide::log;
23use tide::{
24 http::headers::{HeaderName, CONNECTION, UPGRADE},
25 Request, Response,
26};
27use time::OffsetDateTime;
28
29type MessageHandler = Box<
30 dyn Send
31 + Sync
32 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
33>;
34
35pub struct Server {
36 peer: Arc<Peer>,
37 store: RwLock<Store>,
38 app_state: Arc<AppState>,
39 handlers: HashMap<TypeId, MessageHandler>,
40 notifications: Option<mpsc::UnboundedSender<()>>,
41}
42
43pub trait Executor {
44 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
45}
46
47pub struct RealExecutor;
48
49const MESSAGE_COUNT_PER_PAGE: usize = 100;
50const MAX_MESSAGE_LEN: usize = 1024;
51
52impl Server {
53 pub fn new(
54 app_state: Arc<AppState>,
55 peer: Arc<Peer>,
56 notifications: Option<mpsc::UnboundedSender<()>>,
57 ) -> Arc<Self> {
58 let mut server = Self {
59 peer,
60 app_state,
61 store: Default::default(),
62 handlers: Default::default(),
63 notifications,
64 };
65
66 server
67 .add_request_handler(Server::ping)
68 .add_request_handler(Server::register_project)
69 .add_message_handler(Server::unregister_project)
70 .add_request_handler(Server::share_project)
71 .add_message_handler(Server::unshare_project)
72 .add_request_handler(Server::join_project)
73 .add_message_handler(Server::leave_project)
74 .add_request_handler(Server::register_worktree)
75 .add_message_handler(Server::unregister_worktree)
76 .add_request_handler(Server::share_worktree)
77 .add_request_handler(Server::update_worktree)
78 .add_message_handler(Server::update_diagnostic_summary)
79 .add_message_handler(Server::disk_based_diagnostics_updating)
80 .add_message_handler(Server::disk_based_diagnostics_updated)
81 .add_request_handler(Server::get_definition)
82 .add_request_handler(Server::get_project_symbols)
83 .add_request_handler(Server::open_buffer)
84 .add_message_handler(Server::close_buffer)
85 .add_request_handler(Server::update_buffer)
86 .add_message_handler(Server::update_buffer_file)
87 .add_message_handler(Server::buffer_reloaded)
88 .add_message_handler(Server::buffer_saved)
89 .add_request_handler(Server::save_buffer)
90 .add_request_handler(Server::format_buffers)
91 .add_request_handler(Server::get_completions)
92 .add_request_handler(Server::apply_additional_edits_for_completion)
93 .add_request_handler(Server::get_code_actions)
94 .add_request_handler(Server::apply_code_action)
95 .add_request_handler(Server::prepare_rename)
96 .add_request_handler(Server::perform_rename)
97 .add_request_handler(Server::get_channels)
98 .add_request_handler(Server::get_users)
99 .add_request_handler(Server::join_channel)
100 .add_message_handler(Server::leave_channel)
101 .add_request_handler(Server::send_channel_message)
102 .add_request_handler(Server::get_channel_messages);
103
104 Arc::new(server)
105 }
106
107 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
108 where
109 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
110 Fut: 'static + Send + Future<Output = tide::Result<()>>,
111 M: EnvelopedMessage,
112 {
113 let prev_handler = self.handlers.insert(
114 TypeId::of::<M>(),
115 Box::new(move |server, envelope| {
116 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
117 (handler)(server, *envelope).boxed()
118 }),
119 );
120 if prev_handler.is_some() {
121 panic!("registered a handler for the same message twice");
122 }
123 self
124 }
125
126 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
127 where
128 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
129 Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
130 M: RequestMessage,
131 {
132 self.add_message_handler(move |server, envelope| {
133 let receipt = envelope.receipt();
134 let response = (handler)(server.clone(), envelope);
135 async move {
136 match response.await {
137 Ok(response) => {
138 server.peer.respond(receipt, response)?;
139 Ok(())
140 }
141 Err(error) => {
142 server.peer.respond_with_error(
143 receipt,
144 proto::Error {
145 message: error.to_string(),
146 },
147 )?;
148 Err(error)
149 }
150 }
151 }
152 })
153 }
154
155 pub fn handle_connection<E: Executor>(
156 self: &Arc<Self>,
157 connection: Connection,
158 addr: String,
159 user_id: UserId,
160 mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
161 executor: E,
162 ) -> impl Future<Output = ()> {
163 let mut this = self.clone();
164 async move {
165 let (connection_id, handle_io, mut incoming_rx) =
166 this.peer.add_connection(connection).await;
167
168 if let Some(send_connection_id) = send_connection_id.as_mut() {
169 let _ = send_connection_id.send(connection_id).await;
170 }
171
172 this.state_mut().add_connection(connection_id, user_id);
173 if let Err(err) = this.update_contacts_for_users(&[user_id]) {
174 log::error!("error updating contacts for {:?}: {}", user_id, err);
175 }
176
177 let handle_io = handle_io.fuse();
178 futures::pin_mut!(handle_io);
179 loop {
180 let next_message = incoming_rx.next().fuse();
181 futures::pin_mut!(next_message);
182 futures::select_biased! {
183 result = handle_io => {
184 if let Err(err) = result {
185 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
186 }
187 break;
188 }
189 message = next_message => {
190 if let Some(message) = message {
191 let start_time = Instant::now();
192 let type_name = message.payload_type_name();
193 log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
194 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
195 let notifications = this.notifications.clone();
196 let is_background = message.is_background();
197 let handle_message = (handler)(this.clone(), message);
198 let handle_message = async move {
199 if let Err(err) = handle_message.await {
200 log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
201 } else {
202 log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
203 }
204 if let Some(mut notifications) = notifications {
205 let _ = notifications.send(()).await;
206 }
207 };
208 if is_background {
209 executor.spawn_detached(handle_message);
210 } else {
211 handle_message.await;
212 }
213 } else {
214 log::warn!("unhandled message: {}", type_name);
215 }
216 } else {
217 log::info!("rpc connection closed {:?}", addr);
218 break;
219 }
220 }
221 }
222 }
223
224 if let Err(err) = this.sign_out(connection_id).await {
225 log::error!("error signing out connection {:?} - {:?}", addr, err);
226 }
227 }
228 }
229
230 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
231 self.peer.disconnect(connection_id);
232 let removed_connection = self.state_mut().remove_connection(connection_id)?;
233
234 for (project_id, project) in removed_connection.hosted_projects {
235 if let Some(share) = project.share {
236 broadcast(
237 connection_id,
238 share.guests.keys().copied().collect(),
239 |conn_id| {
240 self.peer
241 .send(conn_id, proto::UnshareProject { project_id })
242 },
243 )?;
244 }
245 }
246
247 for (project_id, peer_ids) in removed_connection.guest_project_ids {
248 broadcast(connection_id, peer_ids, |conn_id| {
249 self.peer.send(
250 conn_id,
251 proto::RemoveProjectCollaborator {
252 project_id,
253 peer_id: connection_id.0,
254 },
255 )
256 })?;
257 }
258
259 self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
260 Ok(())
261 }
262
263 async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
264 Ok(proto::Ack {})
265 }
266
267 async fn register_project(
268 mut self: Arc<Server>,
269 request: TypedEnvelope<proto::RegisterProject>,
270 ) -> tide::Result<proto::RegisterProjectResponse> {
271 let project_id = {
272 let mut state = self.state_mut();
273 let user_id = state.user_id_for_connection(request.sender_id)?;
274 state.register_project(request.sender_id, user_id)
275 };
276 Ok(proto::RegisterProjectResponse { project_id })
277 }
278
279 async fn unregister_project(
280 mut self: Arc<Server>,
281 request: TypedEnvelope<proto::UnregisterProject>,
282 ) -> tide::Result<()> {
283 let project = self
284 .state_mut()
285 .unregister_project(request.payload.project_id, request.sender_id)?;
286 self.update_contacts_for_users(project.authorized_user_ids().iter())?;
287 Ok(())
288 }
289
290 async fn share_project(
291 mut self: Arc<Server>,
292 request: TypedEnvelope<proto::ShareProject>,
293 ) -> tide::Result<proto::Ack> {
294 self.state_mut()
295 .share_project(request.payload.project_id, request.sender_id);
296 Ok(proto::Ack {})
297 }
298
299 async fn unshare_project(
300 mut self: Arc<Server>,
301 request: TypedEnvelope<proto::UnshareProject>,
302 ) -> tide::Result<()> {
303 let project_id = request.payload.project_id;
304 let project = self
305 .state_mut()
306 .unshare_project(project_id, request.sender_id)?;
307
308 broadcast(request.sender_id, project.connection_ids, |conn_id| {
309 self.peer
310 .send(conn_id, proto::UnshareProject { project_id })
311 })?;
312 self.update_contacts_for_users(&project.authorized_user_ids)?;
313 Ok(())
314 }
315
316 async fn join_project(
317 mut self: Arc<Server>,
318 request: TypedEnvelope<proto::JoinProject>,
319 ) -> tide::Result<proto::JoinProjectResponse> {
320 let project_id = request.payload.project_id;
321
322 let user_id = self.state().user_id_for_connection(request.sender_id)?;
323 let (response, connection_ids, contact_user_ids) = self
324 .state_mut()
325 .join_project(request.sender_id, user_id, project_id)
326 .and_then(|joined| {
327 let share = joined.project.share()?;
328 let peer_count = share.guests.len();
329 let mut collaborators = Vec::with_capacity(peer_count);
330 collaborators.push(proto::Collaborator {
331 peer_id: joined.project.host_connection_id.0,
332 replica_id: 0,
333 user_id: joined.project.host_user_id.to_proto(),
334 });
335 let worktrees = joined
336 .project
337 .worktrees
338 .iter()
339 .filter_map(|(id, worktree)| {
340 worktree.share.as_ref().map(|share| proto::Worktree {
341 id: *id,
342 root_name: worktree.root_name.clone(),
343 entries: share.entries.values().cloned().collect(),
344 diagnostic_summaries: share
345 .diagnostic_summaries
346 .values()
347 .cloned()
348 .collect(),
349 weak: worktree.weak,
350 next_update_id: share.next_update_id as u64,
351 })
352 })
353 .collect();
354 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
355 if *peer_conn_id != request.sender_id {
356 collaborators.push(proto::Collaborator {
357 peer_id: peer_conn_id.0,
358 replica_id: *peer_replica_id as u32,
359 user_id: peer_user_id.to_proto(),
360 });
361 }
362 }
363 let response = proto::JoinProjectResponse {
364 worktrees,
365 replica_id: joined.replica_id as u32,
366 collaborators,
367 };
368 let connection_ids = joined.project.connection_ids();
369 let contact_user_ids = joined.project.authorized_user_ids();
370 Ok((response, connection_ids, contact_user_ids))
371 })?;
372
373 broadcast(request.sender_id, connection_ids, |conn_id| {
374 self.peer.send(
375 conn_id,
376 proto::AddProjectCollaborator {
377 project_id,
378 collaborator: Some(proto::Collaborator {
379 peer_id: request.sender_id.0,
380 replica_id: response.replica_id,
381 user_id: user_id.to_proto(),
382 }),
383 },
384 )
385 })?;
386 self.update_contacts_for_users(&contact_user_ids)?;
387 Ok(response)
388 }
389
390 async fn leave_project(
391 mut self: Arc<Server>,
392 request: TypedEnvelope<proto::LeaveProject>,
393 ) -> tide::Result<()> {
394 let sender_id = request.sender_id;
395 let project_id = request.payload.project_id;
396 let worktree = self.state_mut().leave_project(sender_id, project_id)?;
397
398 broadcast(sender_id, worktree.connection_ids, |conn_id| {
399 self.peer.send(
400 conn_id,
401 proto::RemoveProjectCollaborator {
402 project_id,
403 peer_id: sender_id.0,
404 },
405 )
406 })?;
407 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
408
409 Ok(())
410 }
411
412 async fn register_worktree(
413 mut self: Arc<Server>,
414 request: TypedEnvelope<proto::RegisterWorktree>,
415 ) -> tide::Result<proto::Ack> {
416 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
417
418 let mut contact_user_ids = HashSet::default();
419 contact_user_ids.insert(host_user_id);
420 for github_login in request.payload.authorized_logins {
421 let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
422 contact_user_ids.insert(contact_user_id);
423 }
424
425 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
426 self.state_mut().register_worktree(
427 request.payload.project_id,
428 request.payload.worktree_id,
429 request.sender_id,
430 Worktree {
431 authorized_user_ids: contact_user_ids.clone(),
432 root_name: request.payload.root_name,
433 share: None,
434 weak: false,
435 },
436 )?;
437 self.update_contacts_for_users(&contact_user_ids)?;
438 Ok(proto::Ack {})
439 }
440
441 async fn unregister_worktree(
442 mut self: Arc<Server>,
443 request: TypedEnvelope<proto::UnregisterWorktree>,
444 ) -> tide::Result<()> {
445 let project_id = request.payload.project_id;
446 let worktree_id = request.payload.worktree_id;
447 let (worktree, guest_connection_ids) =
448 self.state_mut()
449 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
450 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
451 self.peer.send(
452 conn_id,
453 proto::UnregisterWorktree {
454 project_id,
455 worktree_id,
456 },
457 )
458 })?;
459 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
460 Ok(())
461 }
462
463 async fn share_worktree(
464 mut self: Arc<Server>,
465 mut request: TypedEnvelope<proto::ShareWorktree>,
466 ) -> tide::Result<proto::Ack> {
467 let worktree = request
468 .payload
469 .worktree
470 .as_mut()
471 .ok_or_else(|| anyhow!("missing worktree"))?;
472 let entries = worktree
473 .entries
474 .iter()
475 .map(|entry| (entry.id, entry.clone()))
476 .collect();
477 let diagnostic_summaries = worktree
478 .diagnostic_summaries
479 .iter()
480 .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
481 .collect();
482
483 let shared_worktree = self.state_mut().share_worktree(
484 request.payload.project_id,
485 worktree.id,
486 request.sender_id,
487 entries,
488 diagnostic_summaries,
489 worktree.next_update_id,
490 )?;
491
492 broadcast(
493 request.sender_id,
494 shared_worktree.connection_ids,
495 |connection_id| {
496 self.peer
497 .forward_send(request.sender_id, connection_id, request.payload.clone())
498 },
499 )?;
500 self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
501
502 Ok(proto::Ack {})
503 }
504
505 async fn update_worktree(
506 mut self: Arc<Server>,
507 request: TypedEnvelope<proto::UpdateWorktree>,
508 ) -> tide::Result<proto::Ack> {
509 let connection_ids = self.state_mut().update_worktree(
510 request.sender_id,
511 request.payload.project_id,
512 request.payload.worktree_id,
513 request.payload.id,
514 &request.payload.removed_entries,
515 &request.payload.updated_entries,
516 )?;
517
518 broadcast(request.sender_id, connection_ids, |connection_id| {
519 self.peer
520 .forward_send(request.sender_id, connection_id, request.payload.clone())
521 })?;
522
523 Ok(proto::Ack {})
524 }
525
526 async fn update_diagnostic_summary(
527 mut self: Arc<Server>,
528 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
529 ) -> tide::Result<()> {
530 let summary = request
531 .payload
532 .summary
533 .clone()
534 .ok_or_else(|| anyhow!("invalid summary"))?;
535 let receiver_ids = self.state_mut().update_diagnostic_summary(
536 request.payload.project_id,
537 request.payload.worktree_id,
538 request.sender_id,
539 summary,
540 )?;
541
542 broadcast(request.sender_id, receiver_ids, |connection_id| {
543 self.peer
544 .forward_send(request.sender_id, connection_id, request.payload.clone())
545 })?;
546 Ok(())
547 }
548
549 async fn disk_based_diagnostics_updating(
550 self: Arc<Server>,
551 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
552 ) -> tide::Result<()> {
553 let receiver_ids = self
554 .state()
555 .project_connection_ids(request.payload.project_id, request.sender_id)?;
556 broadcast(request.sender_id, receiver_ids, |connection_id| {
557 self.peer
558 .forward_send(request.sender_id, connection_id, request.payload.clone())
559 })?;
560 Ok(())
561 }
562
563 async fn disk_based_diagnostics_updated(
564 self: Arc<Server>,
565 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
566 ) -> tide::Result<()> {
567 let receiver_ids = self
568 .state()
569 .project_connection_ids(request.payload.project_id, request.sender_id)?;
570 broadcast(request.sender_id, receiver_ids, |connection_id| {
571 self.peer
572 .forward_send(request.sender_id, connection_id, request.payload.clone())
573 })?;
574 Ok(())
575 }
576
577 async fn get_definition(
578 self: Arc<Server>,
579 request: TypedEnvelope<proto::GetDefinition>,
580 ) -> tide::Result<proto::GetDefinitionResponse> {
581 let host_connection_id = self
582 .state()
583 .read_project(request.payload.project_id, request.sender_id)?
584 .host_connection_id;
585 Ok(self
586 .peer
587 .forward_request(request.sender_id, host_connection_id, request.payload)
588 .await?)
589 }
590
591 async fn get_project_symbols(
592 self: Arc<Server>,
593 request: TypedEnvelope<proto::GetProjectSymbols>,
594 ) -> tide::Result<proto::GetProjectSymbolsResponse> {
595 let host_connection_id = self
596 .state()
597 .read_project(request.payload.project_id, request.sender_id)?
598 .host_connection_id;
599 Ok(self
600 .peer
601 .forward_request(request.sender_id, host_connection_id, request.payload)
602 .await?)
603 }
604
605 async fn open_buffer(
606 self: Arc<Server>,
607 request: TypedEnvelope<proto::OpenBuffer>,
608 ) -> tide::Result<proto::OpenBufferResponse> {
609 let host_connection_id = self
610 .state()
611 .read_project(request.payload.project_id, request.sender_id)?
612 .host_connection_id;
613 Ok(self
614 .peer
615 .forward_request(request.sender_id, host_connection_id, request.payload)
616 .await?)
617 }
618
619 async fn close_buffer(
620 self: Arc<Server>,
621 request: TypedEnvelope<proto::CloseBuffer>,
622 ) -> tide::Result<()> {
623 let host_connection_id = self
624 .state()
625 .read_project(request.payload.project_id, request.sender_id)?
626 .host_connection_id;
627 self.peer
628 .forward_send(request.sender_id, host_connection_id, request.payload)?;
629 Ok(())
630 }
631
632 async fn save_buffer(
633 self: Arc<Server>,
634 request: TypedEnvelope<proto::SaveBuffer>,
635 ) -> tide::Result<proto::BufferSaved> {
636 let host;
637 let mut guests;
638 {
639 let state = self.state();
640 let project = state.read_project(request.payload.project_id, request.sender_id)?;
641 host = project.host_connection_id;
642 guests = project.guest_connection_ids()
643 }
644
645 let response = self
646 .peer
647 .forward_request(request.sender_id, host, request.payload.clone())
648 .await?;
649
650 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
651 broadcast(host, guests, |conn_id| {
652 self.peer.forward_send(host, conn_id, response.clone())
653 })?;
654
655 Ok(response)
656 }
657
658 async fn format_buffers(
659 self: Arc<Server>,
660 request: TypedEnvelope<proto::FormatBuffers>,
661 ) -> tide::Result<proto::FormatBuffersResponse> {
662 let host = self
663 .state()
664 .read_project(request.payload.project_id, request.sender_id)?
665 .host_connection_id;
666 Ok(self
667 .peer
668 .forward_request(request.sender_id, host, request.payload.clone())
669 .await?)
670 }
671
672 async fn get_completions(
673 self: Arc<Server>,
674 request: TypedEnvelope<proto::GetCompletions>,
675 ) -> tide::Result<proto::GetCompletionsResponse> {
676 let host = self
677 .state()
678 .read_project(request.payload.project_id, request.sender_id)?
679 .host_connection_id;
680 Ok(self
681 .peer
682 .forward_request(request.sender_id, host, request.payload.clone())
683 .await?)
684 }
685
686 async fn apply_additional_edits_for_completion(
687 self: Arc<Server>,
688 request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
689 ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
690 let host = self
691 .state()
692 .read_project(request.payload.project_id, request.sender_id)?
693 .host_connection_id;
694 Ok(self
695 .peer
696 .forward_request(request.sender_id, host, request.payload.clone())
697 .await?)
698 }
699
700 async fn get_code_actions(
701 self: Arc<Server>,
702 request: TypedEnvelope<proto::GetCodeActions>,
703 ) -> tide::Result<proto::GetCodeActionsResponse> {
704 let host = self
705 .state()
706 .read_project(request.payload.project_id, request.sender_id)?
707 .host_connection_id;
708 Ok(self
709 .peer
710 .forward_request(request.sender_id, host, request.payload.clone())
711 .await?)
712 }
713
714 async fn apply_code_action(
715 self: Arc<Server>,
716 request: TypedEnvelope<proto::ApplyCodeAction>,
717 ) -> tide::Result<proto::ApplyCodeActionResponse> {
718 let host = self
719 .state()
720 .read_project(request.payload.project_id, request.sender_id)?
721 .host_connection_id;
722 Ok(self
723 .peer
724 .forward_request(request.sender_id, host, request.payload.clone())
725 .await?)
726 }
727
728 async fn prepare_rename(
729 self: Arc<Server>,
730 request: TypedEnvelope<proto::PrepareRename>,
731 ) -> tide::Result<proto::PrepareRenameResponse> {
732 let host = self
733 .state()
734 .read_project(request.payload.project_id, request.sender_id)?
735 .host_connection_id;
736 Ok(self
737 .peer
738 .forward_request(request.sender_id, host, request.payload.clone())
739 .await?)
740 }
741
742 async fn perform_rename(
743 self: Arc<Server>,
744 request: TypedEnvelope<proto::PerformRename>,
745 ) -> tide::Result<proto::PerformRenameResponse> {
746 let host = self
747 .state()
748 .read_project(request.payload.project_id, request.sender_id)?
749 .host_connection_id;
750 Ok(self
751 .peer
752 .forward_request(request.sender_id, host, request.payload.clone())
753 .await?)
754 }
755
756 async fn update_buffer(
757 self: Arc<Server>,
758 request: TypedEnvelope<proto::UpdateBuffer>,
759 ) -> tide::Result<proto::Ack> {
760 let receiver_ids = self
761 .state()
762 .project_connection_ids(request.payload.project_id, request.sender_id)?;
763 broadcast(request.sender_id, receiver_ids, |connection_id| {
764 self.peer
765 .forward_send(request.sender_id, connection_id, request.payload.clone())
766 })?;
767 Ok(proto::Ack {})
768 }
769
770 async fn update_buffer_file(
771 self: Arc<Server>,
772 request: TypedEnvelope<proto::UpdateBufferFile>,
773 ) -> tide::Result<()> {
774 let receiver_ids = self
775 .state()
776 .project_connection_ids(request.payload.project_id, request.sender_id)?;
777 broadcast(request.sender_id, receiver_ids, |connection_id| {
778 self.peer
779 .forward_send(request.sender_id, connection_id, request.payload.clone())
780 })?;
781 Ok(())
782 }
783
784 async fn buffer_reloaded(
785 self: Arc<Server>,
786 request: TypedEnvelope<proto::BufferReloaded>,
787 ) -> tide::Result<()> {
788 let receiver_ids = self
789 .state()
790 .project_connection_ids(request.payload.project_id, request.sender_id)?;
791 broadcast(request.sender_id, receiver_ids, |connection_id| {
792 self.peer
793 .forward_send(request.sender_id, connection_id, request.payload.clone())
794 })?;
795 Ok(())
796 }
797
798 async fn buffer_saved(
799 self: Arc<Server>,
800 request: TypedEnvelope<proto::BufferSaved>,
801 ) -> tide::Result<()> {
802 let receiver_ids = self
803 .state()
804 .project_connection_ids(request.payload.project_id, request.sender_id)?;
805 broadcast(request.sender_id, receiver_ids, |connection_id| {
806 self.peer
807 .forward_send(request.sender_id, connection_id, request.payload.clone())
808 })?;
809 Ok(())
810 }
811
812 async fn get_channels(
813 self: Arc<Server>,
814 request: TypedEnvelope<proto::GetChannels>,
815 ) -> tide::Result<proto::GetChannelsResponse> {
816 let user_id = self.state().user_id_for_connection(request.sender_id)?;
817 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
818 Ok(proto::GetChannelsResponse {
819 channels: channels
820 .into_iter()
821 .map(|chan| proto::Channel {
822 id: chan.id.to_proto(),
823 name: chan.name,
824 })
825 .collect(),
826 })
827 }
828
829 async fn get_users(
830 self: Arc<Server>,
831 request: TypedEnvelope<proto::GetUsers>,
832 ) -> tide::Result<proto::GetUsersResponse> {
833 let user_ids = request
834 .payload
835 .user_ids
836 .into_iter()
837 .map(UserId::from_proto)
838 .collect();
839 let users = self
840 .app_state
841 .db
842 .get_users_by_ids(user_ids)
843 .await?
844 .into_iter()
845 .map(|user| proto::User {
846 id: user.id.to_proto(),
847 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
848 github_login: user.github_login,
849 })
850 .collect();
851 Ok(proto::GetUsersResponse { users })
852 }
853
854 fn update_contacts_for_users<'a>(
855 self: &Arc<Server>,
856 user_ids: impl IntoIterator<Item = &'a UserId>,
857 ) -> anyhow::Result<()> {
858 let mut result = Ok(());
859 let state = self.state();
860 for user_id in user_ids {
861 let contacts = state.contacts_for_user(*user_id);
862 for connection_id in state.connection_ids_for_user(*user_id) {
863 if let Err(error) = self.peer.send(
864 connection_id,
865 proto::UpdateContacts {
866 contacts: contacts.clone(),
867 },
868 ) {
869 result = Err(error);
870 }
871 }
872 }
873 result
874 }
875
876 async fn join_channel(
877 mut self: Arc<Self>,
878 request: TypedEnvelope<proto::JoinChannel>,
879 ) -> tide::Result<proto::JoinChannelResponse> {
880 let user_id = self.state().user_id_for_connection(request.sender_id)?;
881 let channel_id = ChannelId::from_proto(request.payload.channel_id);
882 if !self
883 .app_state
884 .db
885 .can_user_access_channel(user_id, channel_id)
886 .await?
887 {
888 Err(anyhow!("access denied"))?;
889 }
890
891 self.state_mut().join_channel(request.sender_id, channel_id);
892 let messages = self
893 .app_state
894 .db
895 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
896 .await?
897 .into_iter()
898 .map(|msg| proto::ChannelMessage {
899 id: msg.id.to_proto(),
900 body: msg.body,
901 timestamp: msg.sent_at.unix_timestamp() as u64,
902 sender_id: msg.sender_id.to_proto(),
903 nonce: Some(msg.nonce.as_u128().into()),
904 })
905 .collect::<Vec<_>>();
906 Ok(proto::JoinChannelResponse {
907 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
908 messages,
909 })
910 }
911
912 async fn leave_channel(
913 mut self: Arc<Self>,
914 request: TypedEnvelope<proto::LeaveChannel>,
915 ) -> tide::Result<()> {
916 let user_id = self.state().user_id_for_connection(request.sender_id)?;
917 let channel_id = ChannelId::from_proto(request.payload.channel_id);
918 if !self
919 .app_state
920 .db
921 .can_user_access_channel(user_id, channel_id)
922 .await?
923 {
924 Err(anyhow!("access denied"))?;
925 }
926
927 self.state_mut()
928 .leave_channel(request.sender_id, channel_id);
929
930 Ok(())
931 }
932
933 async fn send_channel_message(
934 self: Arc<Self>,
935 request: TypedEnvelope<proto::SendChannelMessage>,
936 ) -> tide::Result<proto::SendChannelMessageResponse> {
937 let channel_id = ChannelId::from_proto(request.payload.channel_id);
938 let user_id;
939 let connection_ids;
940 {
941 let state = self.state();
942 user_id = state.user_id_for_connection(request.sender_id)?;
943 connection_ids = state.channel_connection_ids(channel_id)?;
944 }
945
946 // Validate the message body.
947 let body = request.payload.body.trim().to_string();
948 if body.len() > MAX_MESSAGE_LEN {
949 return Err(anyhow!("message is too long"))?;
950 }
951 if body.is_empty() {
952 return Err(anyhow!("message can't be blank"))?;
953 }
954
955 let timestamp = OffsetDateTime::now_utc();
956 let nonce = request
957 .payload
958 .nonce
959 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
960
961 let message_id = self
962 .app_state
963 .db
964 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
965 .await?
966 .to_proto();
967 let message = proto::ChannelMessage {
968 sender_id: user_id.to_proto(),
969 id: message_id,
970 body,
971 timestamp: timestamp.unix_timestamp() as u64,
972 nonce: Some(nonce),
973 };
974 broadcast(request.sender_id, connection_ids, |conn_id| {
975 self.peer.send(
976 conn_id,
977 proto::ChannelMessageSent {
978 channel_id: channel_id.to_proto(),
979 message: Some(message.clone()),
980 },
981 )
982 })?;
983 Ok(proto::SendChannelMessageResponse {
984 message: Some(message),
985 })
986 }
987
988 async fn get_channel_messages(
989 self: Arc<Self>,
990 request: TypedEnvelope<proto::GetChannelMessages>,
991 ) -> tide::Result<proto::GetChannelMessagesResponse> {
992 let user_id = self.state().user_id_for_connection(request.sender_id)?;
993 let channel_id = ChannelId::from_proto(request.payload.channel_id);
994 if !self
995 .app_state
996 .db
997 .can_user_access_channel(user_id, channel_id)
998 .await?
999 {
1000 Err(anyhow!("access denied"))?;
1001 }
1002
1003 let messages = self
1004 .app_state
1005 .db
1006 .get_channel_messages(
1007 channel_id,
1008 MESSAGE_COUNT_PER_PAGE,
1009 Some(MessageId::from_proto(request.payload.before_message_id)),
1010 )
1011 .await?
1012 .into_iter()
1013 .map(|msg| proto::ChannelMessage {
1014 id: msg.id.to_proto(),
1015 body: msg.body,
1016 timestamp: msg.sent_at.unix_timestamp() as u64,
1017 sender_id: msg.sender_id.to_proto(),
1018 nonce: Some(msg.nonce.as_u128().into()),
1019 })
1020 .collect::<Vec<_>>();
1021
1022 Ok(proto::GetChannelMessagesResponse {
1023 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1024 messages,
1025 })
1026 }
1027
1028 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1029 self.store.read()
1030 }
1031
1032 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1033 self.store.write()
1034 }
1035}
1036
1037impl Executor for RealExecutor {
1038 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1039 task::spawn(future);
1040 }
1041}
1042
1043fn broadcast<F>(
1044 sender_id: ConnectionId,
1045 receiver_ids: Vec<ConnectionId>,
1046 mut f: F,
1047) -> anyhow::Result<()>
1048where
1049 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1050{
1051 let mut result = Ok(());
1052 for receiver_id in receiver_ids {
1053 if receiver_id != sender_id {
1054 if let Err(error) = f(receiver_id) {
1055 if result.is_ok() {
1056 result = Err(error);
1057 }
1058 }
1059 }
1060 }
1061 result
1062}
1063
1064pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1065 let server = Server::new(app.state().clone(), rpc.clone(), None);
1066 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1067 let server = server.clone();
1068 async move {
1069 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1070
1071 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1072 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1073 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1074 let client_protocol_version: Option<u32> = request
1075 .header("X-Zed-Protocol-Version")
1076 .and_then(|v| v.as_str().parse().ok());
1077
1078 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1079 return Ok(Response::new(StatusCode::UpgradeRequired));
1080 }
1081
1082 let header = match request.header("Sec-Websocket-Key") {
1083 Some(h) => h.as_str(),
1084 None => return Err(anyhow!("expected sec-websocket-key"))?,
1085 };
1086
1087 let user_id = process_auth_header(&request).await?;
1088
1089 let mut response = Response::new(StatusCode::SwitchingProtocols);
1090 response.insert_header(UPGRADE, "websocket");
1091 response.insert_header(CONNECTION, "Upgrade");
1092 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1093 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1094 response.insert_header("Sec-Websocket-Version", "13");
1095
1096 let http_res: &mut tide::http::Response = response.as_mut();
1097 let upgrade_receiver = http_res.recv_upgrade().await;
1098 let addr = request.remote().unwrap_or("unknown").to_string();
1099 task::spawn(async move {
1100 if let Some(stream) = upgrade_receiver.await {
1101 server
1102 .handle_connection(
1103 Connection::new(
1104 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1105 ),
1106 addr,
1107 user_id,
1108 None,
1109 RealExecutor,
1110 )
1111 .await;
1112 }
1113 });
1114
1115 Ok(response)
1116 }
1117 });
1118}
1119
1120fn header_contains_ignore_case<T>(
1121 request: &tide::Request<T>,
1122 header_name: HeaderName,
1123 value: &str,
1124) -> bool {
1125 request
1126 .header(header_name)
1127 .map(|h| {
1128 h.as_str()
1129 .split(',')
1130 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1131 })
1132 .unwrap_or(false)
1133}
1134
1135#[cfg(test)]
1136mod tests {
1137 use super::*;
1138 use crate::{
1139 auth,
1140 db::{tests::TestDb, UserId},
1141 github, AppState, Config,
1142 };
1143 use ::rpc::Peer;
1144 use collections::BTreeMap;
1145 use gpui::{executor, ModelHandle, TestAppContext};
1146 use parking_lot::Mutex;
1147 use postage::{sink::Sink, watch};
1148 use rand::prelude::*;
1149 use rpc::PeerId;
1150 use serde_json::json;
1151 use sqlx::types::time::OffsetDateTime;
1152 use std::{
1153 cell::{Cell, RefCell},
1154 env,
1155 ops::Deref,
1156 path::Path,
1157 rc::Rc,
1158 sync::{
1159 atomic::{AtomicBool, Ordering::SeqCst},
1160 Arc,
1161 },
1162 time::Duration,
1163 };
1164 use zed::{
1165 client::{
1166 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1167 EstablishConnectionError, UserStore,
1168 },
1169 editor::{
1170 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, EditorSettings,
1171 Input, MultiBuffer, Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1172 },
1173 fs::{FakeFs, Fs as _},
1174 language::{
1175 tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1176 LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1177 },
1178 lsp,
1179 project::{DiagnosticSummary, Project, ProjectPath},
1180 workspace::{Workspace, WorkspaceParams},
1181 };
1182
1183 #[cfg(test)]
1184 #[ctor::ctor]
1185 fn init_logger() {
1186 if std::env::var("RUST_LOG").is_ok() {
1187 env_logger::init();
1188 }
1189 }
1190
1191 #[gpui::test(iterations = 10)]
1192 async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1193 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1194 let lang_registry = Arc::new(LanguageRegistry::new());
1195 let fs = FakeFs::new(cx_a.background());
1196 cx_a.foreground().forbid_parking();
1197
1198 // Connect to a server as 2 clients.
1199 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1200 let client_a = server.create_client(&mut cx_a, "user_a").await;
1201 let client_b = server.create_client(&mut cx_b, "user_b").await;
1202
1203 // Share a project as client A
1204 fs.insert_tree(
1205 "/a",
1206 json!({
1207 ".zed.toml": r#"collaborators = ["user_b"]"#,
1208 "a.txt": "a-contents",
1209 "b.txt": "b-contents",
1210 }),
1211 )
1212 .await;
1213 let project_a = cx_a.update(|cx| {
1214 Project::local(
1215 client_a.clone(),
1216 client_a.user_store.clone(),
1217 lang_registry.clone(),
1218 fs.clone(),
1219 cx,
1220 )
1221 });
1222 let (worktree_a, _) = project_a
1223 .update(&mut cx_a, |p, cx| {
1224 p.find_or_create_local_worktree("/a", false, cx)
1225 })
1226 .await
1227 .unwrap();
1228 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1229 worktree_a
1230 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1231 .await;
1232 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1233 project_a
1234 .update(&mut cx_a, |p, cx| p.share(cx))
1235 .await
1236 .unwrap();
1237
1238 // Join that project as client B
1239 let project_b = Project::remote(
1240 project_id,
1241 client_b.clone(),
1242 client_b.user_store.clone(),
1243 lang_registry.clone(),
1244 fs.clone(),
1245 &mut cx_b.to_async(),
1246 )
1247 .await
1248 .unwrap();
1249
1250 let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1251 assert_eq!(
1252 project
1253 .collaborators()
1254 .get(&client_a.peer_id)
1255 .unwrap()
1256 .user
1257 .github_login,
1258 "user_a"
1259 );
1260 project.replica_id()
1261 });
1262 project_a
1263 .condition(&cx_a, |tree, _| {
1264 tree.collaborators()
1265 .get(&client_b.peer_id)
1266 .map_or(false, |collaborator| {
1267 collaborator.replica_id == replica_id_b
1268 && collaborator.user.github_login == "user_b"
1269 })
1270 })
1271 .await;
1272
1273 // Open the same file as client B and client A.
1274 let buffer_b = project_b
1275 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1276 .await
1277 .unwrap();
1278 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1279 buffer_b.read_with(&cx_b, |buf, cx| {
1280 assert_eq!(buf.read(cx).text(), "b-contents")
1281 });
1282 project_a.read_with(&cx_a, |project, cx| {
1283 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1284 });
1285 let buffer_a = project_a
1286 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1287 .await
1288 .unwrap();
1289
1290 let editor_b = cx_b.add_view(window_b, |cx| {
1291 Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1292 });
1293
1294 // TODO
1295 // // Create a selection set as client B and see that selection set as client A.
1296 // buffer_a
1297 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1298 // .await;
1299
1300 // Edit the buffer as client B and see that edit as client A.
1301 editor_b.update(&mut cx_b, |editor, cx| {
1302 editor.handle_input(&Input("ok, ".into()), cx)
1303 });
1304 buffer_a
1305 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1306 .await;
1307
1308 // TODO
1309 // // Remove the selection set as client B, see those selections disappear as client A.
1310 cx_b.update(move |_| drop(editor_b));
1311 // buffer_a
1312 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1313 // .await;
1314
1315 // Close the buffer as client A, see that the buffer is closed.
1316 cx_a.update(move |_| drop(buffer_a));
1317 project_a
1318 .condition(&cx_a, |project, cx| {
1319 !project.has_open_buffer((worktree_id, "b.txt"), cx)
1320 })
1321 .await;
1322
1323 // Dropping the client B's project removes client B from client A's collaborators.
1324 cx_b.update(move |_| drop(project_b));
1325 project_a
1326 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1327 .await;
1328 }
1329
1330 #[gpui::test(iterations = 10)]
1331 async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1332 let lang_registry = Arc::new(LanguageRegistry::new());
1333 let fs = FakeFs::new(cx_a.background());
1334 cx_a.foreground().forbid_parking();
1335
1336 // Connect to a server as 2 clients.
1337 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1338 let client_a = server.create_client(&mut cx_a, "user_a").await;
1339 let client_b = server.create_client(&mut cx_b, "user_b").await;
1340
1341 // Share a project as client A
1342 fs.insert_tree(
1343 "/a",
1344 json!({
1345 ".zed.toml": r#"collaborators = ["user_b"]"#,
1346 "a.txt": "a-contents",
1347 "b.txt": "b-contents",
1348 }),
1349 )
1350 .await;
1351 let project_a = cx_a.update(|cx| {
1352 Project::local(
1353 client_a.clone(),
1354 client_a.user_store.clone(),
1355 lang_registry.clone(),
1356 fs.clone(),
1357 cx,
1358 )
1359 });
1360 let (worktree_a, _) = project_a
1361 .update(&mut cx_a, |p, cx| {
1362 p.find_or_create_local_worktree("/a", false, cx)
1363 })
1364 .await
1365 .unwrap();
1366 worktree_a
1367 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1368 .await;
1369 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1370 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1371 project_a
1372 .update(&mut cx_a, |p, cx| p.share(cx))
1373 .await
1374 .unwrap();
1375 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1376
1377 // Join that project as client B
1378 let project_b = Project::remote(
1379 project_id,
1380 client_b.clone(),
1381 client_b.user_store.clone(),
1382 lang_registry.clone(),
1383 fs.clone(),
1384 &mut cx_b.to_async(),
1385 )
1386 .await
1387 .unwrap();
1388 project_b
1389 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1390 .await
1391 .unwrap();
1392
1393 // Unshare the project as client A
1394 project_a
1395 .update(&mut cx_a, |project, cx| project.unshare(cx))
1396 .await
1397 .unwrap();
1398 project_b
1399 .condition(&mut cx_b, |project, _| project.is_read_only())
1400 .await;
1401 assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1402 drop(project_b);
1403
1404 // Share the project again and ensure guests can still join.
1405 project_a
1406 .update(&mut cx_a, |project, cx| project.share(cx))
1407 .await
1408 .unwrap();
1409 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1410
1411 let project_c = Project::remote(
1412 project_id,
1413 client_b.clone(),
1414 client_b.user_store.clone(),
1415 lang_registry.clone(),
1416 fs.clone(),
1417 &mut cx_b.to_async(),
1418 )
1419 .await
1420 .unwrap();
1421 project_c
1422 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1423 .await
1424 .unwrap();
1425 }
1426
1427 #[gpui::test(iterations = 10)]
1428 async fn test_propagate_saves_and_fs_changes(
1429 mut cx_a: TestAppContext,
1430 mut cx_b: TestAppContext,
1431 mut cx_c: TestAppContext,
1432 ) {
1433 let lang_registry = Arc::new(LanguageRegistry::new());
1434 let fs = FakeFs::new(cx_a.background());
1435 cx_a.foreground().forbid_parking();
1436
1437 // Connect to a server as 3 clients.
1438 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1439 let client_a = server.create_client(&mut cx_a, "user_a").await;
1440 let client_b = server.create_client(&mut cx_b, "user_b").await;
1441 let client_c = server.create_client(&mut cx_c, "user_c").await;
1442
1443 // Share a worktree as client A.
1444 fs.insert_tree(
1445 "/a",
1446 json!({
1447 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1448 "file1": "",
1449 "file2": ""
1450 }),
1451 )
1452 .await;
1453 let project_a = cx_a.update(|cx| {
1454 Project::local(
1455 client_a.clone(),
1456 client_a.user_store.clone(),
1457 lang_registry.clone(),
1458 fs.clone(),
1459 cx,
1460 )
1461 });
1462 let (worktree_a, _) = project_a
1463 .update(&mut cx_a, |p, cx| {
1464 p.find_or_create_local_worktree("/a", false, cx)
1465 })
1466 .await
1467 .unwrap();
1468 worktree_a
1469 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1470 .await;
1471 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1472 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1473 project_a
1474 .update(&mut cx_a, |p, cx| p.share(cx))
1475 .await
1476 .unwrap();
1477
1478 // Join that worktree as clients B and C.
1479 let project_b = Project::remote(
1480 project_id,
1481 client_b.clone(),
1482 client_b.user_store.clone(),
1483 lang_registry.clone(),
1484 fs.clone(),
1485 &mut cx_b.to_async(),
1486 )
1487 .await
1488 .unwrap();
1489 let project_c = Project::remote(
1490 project_id,
1491 client_c.clone(),
1492 client_c.user_store.clone(),
1493 lang_registry.clone(),
1494 fs.clone(),
1495 &mut cx_c.to_async(),
1496 )
1497 .await
1498 .unwrap();
1499 let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1500 let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1501
1502 // Open and edit a buffer as both guests B and C.
1503 let buffer_b = project_b
1504 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1505 .await
1506 .unwrap();
1507 let buffer_c = project_c
1508 .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1509 .await
1510 .unwrap();
1511 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1512 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1513
1514 // Open and edit that buffer as the host.
1515 let buffer_a = project_a
1516 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1517 .await
1518 .unwrap();
1519
1520 buffer_a
1521 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1522 .await;
1523 buffer_a.update(&mut cx_a, |buf, cx| {
1524 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1525 });
1526
1527 // Wait for edits to propagate
1528 buffer_a
1529 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1530 .await;
1531 buffer_b
1532 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1533 .await;
1534 buffer_c
1535 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1536 .await;
1537
1538 // Edit the buffer as the host and concurrently save as guest B.
1539 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1540 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1541 save_b.await.unwrap();
1542 assert_eq!(
1543 fs.load("/a/file1".as_ref()).await.unwrap(),
1544 "hi-a, i-am-c, i-am-b, i-am-a"
1545 );
1546 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1547 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1548 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1549
1550 // Make changes on host's file system, see those changes on guest worktrees.
1551 fs.rename(
1552 "/a/file1".as_ref(),
1553 "/a/file1-renamed".as_ref(),
1554 Default::default(),
1555 )
1556 .await
1557 .unwrap();
1558
1559 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1560 .await
1561 .unwrap();
1562 fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1563
1564 worktree_a
1565 .condition(&cx_a, |tree, _| {
1566 tree.paths()
1567 .map(|p| p.to_string_lossy())
1568 .collect::<Vec<_>>()
1569 == [".zed.toml", "file1-renamed", "file3", "file4"]
1570 })
1571 .await;
1572 worktree_b
1573 .condition(&cx_b, |tree, _| {
1574 tree.paths()
1575 .map(|p| p.to_string_lossy())
1576 .collect::<Vec<_>>()
1577 == [".zed.toml", "file1-renamed", "file3", "file4"]
1578 })
1579 .await;
1580 worktree_c
1581 .condition(&cx_c, |tree, _| {
1582 tree.paths()
1583 .map(|p| p.to_string_lossy())
1584 .collect::<Vec<_>>()
1585 == [".zed.toml", "file1-renamed", "file3", "file4"]
1586 })
1587 .await;
1588
1589 // Ensure buffer files are updated as well.
1590 buffer_a
1591 .condition(&cx_a, |buf, _| {
1592 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1593 })
1594 .await;
1595 buffer_b
1596 .condition(&cx_b, |buf, _| {
1597 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1598 })
1599 .await;
1600 buffer_c
1601 .condition(&cx_c, |buf, _| {
1602 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1603 })
1604 .await;
1605 }
1606
1607 #[gpui::test(iterations = 10)]
1608 async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1609 cx_a.foreground().forbid_parking();
1610 let lang_registry = Arc::new(LanguageRegistry::new());
1611 let fs = FakeFs::new(cx_a.background());
1612
1613 // Connect to a server as 2 clients.
1614 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1615 let client_a = server.create_client(&mut cx_a, "user_a").await;
1616 let client_b = server.create_client(&mut cx_b, "user_b").await;
1617
1618 // Share a project as client A
1619 fs.insert_tree(
1620 "/dir",
1621 json!({
1622 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1623 "a.txt": "a-contents",
1624 }),
1625 )
1626 .await;
1627
1628 let project_a = cx_a.update(|cx| {
1629 Project::local(
1630 client_a.clone(),
1631 client_a.user_store.clone(),
1632 lang_registry.clone(),
1633 fs.clone(),
1634 cx,
1635 )
1636 });
1637 let (worktree_a, _) = project_a
1638 .update(&mut cx_a, |p, cx| {
1639 p.find_or_create_local_worktree("/dir", false, cx)
1640 })
1641 .await
1642 .unwrap();
1643 worktree_a
1644 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1645 .await;
1646 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1647 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1648 project_a
1649 .update(&mut cx_a, |p, cx| p.share(cx))
1650 .await
1651 .unwrap();
1652
1653 // Join that project as client B
1654 let project_b = Project::remote(
1655 project_id,
1656 client_b.clone(),
1657 client_b.user_store.clone(),
1658 lang_registry.clone(),
1659 fs.clone(),
1660 &mut cx_b.to_async(),
1661 )
1662 .await
1663 .unwrap();
1664
1665 // Open a buffer as client B
1666 let buffer_b = project_b
1667 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1668 .await
1669 .unwrap();
1670
1671 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1672 buffer_b.read_with(&cx_b, |buf, _| {
1673 assert!(buf.is_dirty());
1674 assert!(!buf.has_conflict());
1675 });
1676
1677 buffer_b
1678 .update(&mut cx_b, |buf, cx| buf.save(cx))
1679 .await
1680 .unwrap();
1681 buffer_b
1682 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1683 .await;
1684 buffer_b.read_with(&cx_b, |buf, _| {
1685 assert!(!buf.has_conflict());
1686 });
1687
1688 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1689 buffer_b.read_with(&cx_b, |buf, _| {
1690 assert!(buf.is_dirty());
1691 assert!(!buf.has_conflict());
1692 });
1693 }
1694
1695 #[gpui::test(iterations = 10)]
1696 async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1697 cx_a.foreground().forbid_parking();
1698 let lang_registry = Arc::new(LanguageRegistry::new());
1699 let fs = FakeFs::new(cx_a.background());
1700
1701 // Connect to a server as 2 clients.
1702 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1703 let client_a = server.create_client(&mut cx_a, "user_a").await;
1704 let client_b = server.create_client(&mut cx_b, "user_b").await;
1705
1706 // Share a project as client A
1707 fs.insert_tree(
1708 "/dir",
1709 json!({
1710 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1711 "a.txt": "a-contents",
1712 }),
1713 )
1714 .await;
1715
1716 let project_a = cx_a.update(|cx| {
1717 Project::local(
1718 client_a.clone(),
1719 client_a.user_store.clone(),
1720 lang_registry.clone(),
1721 fs.clone(),
1722 cx,
1723 )
1724 });
1725 let (worktree_a, _) = project_a
1726 .update(&mut cx_a, |p, cx| {
1727 p.find_or_create_local_worktree("/dir", false, cx)
1728 })
1729 .await
1730 .unwrap();
1731 worktree_a
1732 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1733 .await;
1734 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1735 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1736 project_a
1737 .update(&mut cx_a, |p, cx| p.share(cx))
1738 .await
1739 .unwrap();
1740
1741 // Join that project as client B
1742 let project_b = Project::remote(
1743 project_id,
1744 client_b.clone(),
1745 client_b.user_store.clone(),
1746 lang_registry.clone(),
1747 fs.clone(),
1748 &mut cx_b.to_async(),
1749 )
1750 .await
1751 .unwrap();
1752 let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1753
1754 // Open a buffer as client B
1755 let buffer_b = project_b
1756 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1757 .await
1758 .unwrap();
1759 buffer_b.read_with(&cx_b, |buf, _| {
1760 assert!(!buf.is_dirty());
1761 assert!(!buf.has_conflict());
1762 });
1763
1764 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1765 .await
1766 .unwrap();
1767 buffer_b
1768 .condition(&cx_b, |buf, _| {
1769 buf.text() == "new contents" && !buf.is_dirty()
1770 })
1771 .await;
1772 buffer_b.read_with(&cx_b, |buf, _| {
1773 assert!(!buf.has_conflict());
1774 });
1775 }
1776
1777 #[gpui::test(iterations = 10)]
1778 async fn test_editing_while_guest_opens_buffer(
1779 mut cx_a: TestAppContext,
1780 mut cx_b: TestAppContext,
1781 ) {
1782 cx_a.foreground().forbid_parking();
1783 let lang_registry = Arc::new(LanguageRegistry::new());
1784 let fs = FakeFs::new(cx_a.background());
1785
1786 // Connect to a server as 2 clients.
1787 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1788 let client_a = server.create_client(&mut cx_a, "user_a").await;
1789 let client_b = server.create_client(&mut cx_b, "user_b").await;
1790
1791 // Share a project as client A
1792 fs.insert_tree(
1793 "/dir",
1794 json!({
1795 ".zed.toml": r#"collaborators = ["user_b"]"#,
1796 "a.txt": "a-contents",
1797 }),
1798 )
1799 .await;
1800 let project_a = cx_a.update(|cx| {
1801 Project::local(
1802 client_a.clone(),
1803 client_a.user_store.clone(),
1804 lang_registry.clone(),
1805 fs.clone(),
1806 cx,
1807 )
1808 });
1809 let (worktree_a, _) = project_a
1810 .update(&mut cx_a, |p, cx| {
1811 p.find_or_create_local_worktree("/dir", false, cx)
1812 })
1813 .await
1814 .unwrap();
1815 worktree_a
1816 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1817 .await;
1818 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1819 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1820 project_a
1821 .update(&mut cx_a, |p, cx| p.share(cx))
1822 .await
1823 .unwrap();
1824
1825 // Join that project as client B
1826 let project_b = Project::remote(
1827 project_id,
1828 client_b.clone(),
1829 client_b.user_store.clone(),
1830 lang_registry.clone(),
1831 fs.clone(),
1832 &mut cx_b.to_async(),
1833 )
1834 .await
1835 .unwrap();
1836
1837 // Open a buffer as client A
1838 let buffer_a = project_a
1839 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1840 .await
1841 .unwrap();
1842
1843 // Start opening the same buffer as client B
1844 let buffer_b = cx_b
1845 .background()
1846 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1847
1848 // Edit the buffer as client A while client B is still opening it.
1849 cx_b.background().simulate_random_delay().await;
1850 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1851 cx_b.background().simulate_random_delay().await;
1852 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1853
1854 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1855 let buffer_b = buffer_b.await.unwrap();
1856 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1857 }
1858
1859 #[gpui::test(iterations = 10)]
1860 async fn test_leaving_worktree_while_opening_buffer(
1861 mut cx_a: TestAppContext,
1862 mut cx_b: TestAppContext,
1863 ) {
1864 cx_a.foreground().forbid_parking();
1865 let lang_registry = Arc::new(LanguageRegistry::new());
1866 let fs = FakeFs::new(cx_a.background());
1867
1868 // Connect to a server as 2 clients.
1869 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1870 let client_a = server.create_client(&mut cx_a, "user_a").await;
1871 let client_b = server.create_client(&mut cx_b, "user_b").await;
1872
1873 // Share a project as client A
1874 fs.insert_tree(
1875 "/dir",
1876 json!({
1877 ".zed.toml": r#"collaborators = ["user_b"]"#,
1878 "a.txt": "a-contents",
1879 }),
1880 )
1881 .await;
1882 let project_a = cx_a.update(|cx| {
1883 Project::local(
1884 client_a.clone(),
1885 client_a.user_store.clone(),
1886 lang_registry.clone(),
1887 fs.clone(),
1888 cx,
1889 )
1890 });
1891 let (worktree_a, _) = project_a
1892 .update(&mut cx_a, |p, cx| {
1893 p.find_or_create_local_worktree("/dir", false, cx)
1894 })
1895 .await
1896 .unwrap();
1897 worktree_a
1898 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1899 .await;
1900 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1901 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1902 project_a
1903 .update(&mut cx_a, |p, cx| p.share(cx))
1904 .await
1905 .unwrap();
1906
1907 // Join that project as client B
1908 let project_b = Project::remote(
1909 project_id,
1910 client_b.clone(),
1911 client_b.user_store.clone(),
1912 lang_registry.clone(),
1913 fs.clone(),
1914 &mut cx_b.to_async(),
1915 )
1916 .await
1917 .unwrap();
1918
1919 // See that a guest has joined as client A.
1920 project_a
1921 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1922 .await;
1923
1924 // Begin opening a buffer as client B, but leave the project before the open completes.
1925 let buffer_b = cx_b
1926 .background()
1927 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1928 cx_b.update(|_| drop(project_b));
1929 drop(buffer_b);
1930
1931 // See that the guest has left.
1932 project_a
1933 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1934 .await;
1935 }
1936
1937 #[gpui::test(iterations = 10)]
1938 async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1939 cx_a.foreground().forbid_parking();
1940 let lang_registry = Arc::new(LanguageRegistry::new());
1941 let fs = FakeFs::new(cx_a.background());
1942
1943 // Connect to a server as 2 clients.
1944 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1945 let client_a = server.create_client(&mut cx_a, "user_a").await;
1946 let client_b = server.create_client(&mut cx_b, "user_b").await;
1947
1948 // Share a project as client A
1949 fs.insert_tree(
1950 "/a",
1951 json!({
1952 ".zed.toml": r#"collaborators = ["user_b"]"#,
1953 "a.txt": "a-contents",
1954 "b.txt": "b-contents",
1955 }),
1956 )
1957 .await;
1958 let project_a = cx_a.update(|cx| {
1959 Project::local(
1960 client_a.clone(),
1961 client_a.user_store.clone(),
1962 lang_registry.clone(),
1963 fs.clone(),
1964 cx,
1965 )
1966 });
1967 let (worktree_a, _) = project_a
1968 .update(&mut cx_a, |p, cx| {
1969 p.find_or_create_local_worktree("/a", false, cx)
1970 })
1971 .await
1972 .unwrap();
1973 worktree_a
1974 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1975 .await;
1976 let project_id = project_a
1977 .update(&mut cx_a, |project, _| project.next_remote_id())
1978 .await;
1979 project_a
1980 .update(&mut cx_a, |project, cx| project.share(cx))
1981 .await
1982 .unwrap();
1983
1984 // Join that project as client B
1985 let _project_b = Project::remote(
1986 project_id,
1987 client_b.clone(),
1988 client_b.user_store.clone(),
1989 lang_registry.clone(),
1990 fs.clone(),
1991 &mut cx_b.to_async(),
1992 )
1993 .await
1994 .unwrap();
1995
1996 // See that a guest has joined as client A.
1997 project_a
1998 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1999 .await;
2000
2001 // Drop client B's connection and ensure client A observes client B leaving the worktree.
2002 client_b.disconnect(&cx_b.to_async()).unwrap();
2003 project_a
2004 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2005 .await;
2006 }
2007
2008 #[gpui::test(iterations = 10)]
2009 async fn test_collaborating_with_diagnostics(
2010 mut cx_a: TestAppContext,
2011 mut cx_b: TestAppContext,
2012 ) {
2013 cx_a.foreground().forbid_parking();
2014 let mut lang_registry = Arc::new(LanguageRegistry::new());
2015 let fs = FakeFs::new(cx_a.background());
2016
2017 // Set up a fake language server.
2018 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2019 Arc::get_mut(&mut lang_registry)
2020 .unwrap()
2021 .add(Arc::new(Language::new(
2022 LanguageConfig {
2023 name: "Rust".to_string(),
2024 path_suffixes: vec!["rs".to_string()],
2025 language_server: Some(language_server_config),
2026 ..Default::default()
2027 },
2028 Some(tree_sitter_rust::language()),
2029 )));
2030
2031 // Connect to a server as 2 clients.
2032 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2033 let client_a = server.create_client(&mut cx_a, "user_a").await;
2034 let client_b = server.create_client(&mut cx_b, "user_b").await;
2035
2036 // Share a project as client A
2037 fs.insert_tree(
2038 "/a",
2039 json!({
2040 ".zed.toml": r#"collaborators = ["user_b"]"#,
2041 "a.rs": "let one = two",
2042 "other.rs": "",
2043 }),
2044 )
2045 .await;
2046 let project_a = cx_a.update(|cx| {
2047 Project::local(
2048 client_a.clone(),
2049 client_a.user_store.clone(),
2050 lang_registry.clone(),
2051 fs.clone(),
2052 cx,
2053 )
2054 });
2055 let (worktree_a, _) = project_a
2056 .update(&mut cx_a, |p, cx| {
2057 p.find_or_create_local_worktree("/a", false, cx)
2058 })
2059 .await
2060 .unwrap();
2061 worktree_a
2062 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2063 .await;
2064 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2065 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2066 project_a
2067 .update(&mut cx_a, |p, cx| p.share(cx))
2068 .await
2069 .unwrap();
2070
2071 // Cause the language server to start.
2072 let _ = cx_a
2073 .background()
2074 .spawn(project_a.update(&mut cx_a, |project, cx| {
2075 project.open_buffer(
2076 ProjectPath {
2077 worktree_id,
2078 path: Path::new("other.rs").into(),
2079 },
2080 cx,
2081 )
2082 }))
2083 .await
2084 .unwrap();
2085
2086 // Simulate a language server reporting errors for a file.
2087 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2088 fake_language_server
2089 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2090 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2091 version: None,
2092 diagnostics: vec![lsp::Diagnostic {
2093 severity: Some(lsp::DiagnosticSeverity::ERROR),
2094 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2095 message: "message 1".to_string(),
2096 ..Default::default()
2097 }],
2098 })
2099 .await;
2100
2101 // Wait for server to see the diagnostics update.
2102 server
2103 .condition(|store| {
2104 let worktree = store
2105 .project(project_id)
2106 .unwrap()
2107 .worktrees
2108 .get(&worktree_id.to_proto())
2109 .unwrap();
2110
2111 !worktree
2112 .share
2113 .as_ref()
2114 .unwrap()
2115 .diagnostic_summaries
2116 .is_empty()
2117 })
2118 .await;
2119
2120 // Join the worktree as client B.
2121 let project_b = Project::remote(
2122 project_id,
2123 client_b.clone(),
2124 client_b.user_store.clone(),
2125 lang_registry.clone(),
2126 fs.clone(),
2127 &mut cx_b.to_async(),
2128 )
2129 .await
2130 .unwrap();
2131
2132 project_b.read_with(&cx_b, |project, cx| {
2133 assert_eq!(
2134 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2135 &[(
2136 ProjectPath {
2137 worktree_id,
2138 path: Arc::from(Path::new("a.rs")),
2139 },
2140 DiagnosticSummary {
2141 error_count: 1,
2142 warning_count: 0,
2143 ..Default::default()
2144 },
2145 )]
2146 )
2147 });
2148
2149 // Simulate a language server reporting more errors for a file.
2150 fake_language_server
2151 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2152 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2153 version: None,
2154 diagnostics: vec![
2155 lsp::Diagnostic {
2156 severity: Some(lsp::DiagnosticSeverity::ERROR),
2157 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2158 message: "message 1".to_string(),
2159 ..Default::default()
2160 },
2161 lsp::Diagnostic {
2162 severity: Some(lsp::DiagnosticSeverity::WARNING),
2163 range: lsp::Range::new(
2164 lsp::Position::new(0, 10),
2165 lsp::Position::new(0, 13),
2166 ),
2167 message: "message 2".to_string(),
2168 ..Default::default()
2169 },
2170 ],
2171 })
2172 .await;
2173
2174 // Client b gets the updated summaries
2175 project_b
2176 .condition(&cx_b, |project, cx| {
2177 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2178 == &[(
2179 ProjectPath {
2180 worktree_id,
2181 path: Arc::from(Path::new("a.rs")),
2182 },
2183 DiagnosticSummary {
2184 error_count: 1,
2185 warning_count: 1,
2186 ..Default::default()
2187 },
2188 )]
2189 })
2190 .await;
2191
2192 // Open the file with the errors on client B. They should be present.
2193 let buffer_b = cx_b
2194 .background()
2195 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2196 .await
2197 .unwrap();
2198
2199 buffer_b.read_with(&cx_b, |buffer, _| {
2200 assert_eq!(
2201 buffer
2202 .snapshot()
2203 .diagnostics_in_range::<_, Point>(0..buffer.len())
2204 .map(|entry| entry)
2205 .collect::<Vec<_>>(),
2206 &[
2207 DiagnosticEntry {
2208 range: Point::new(0, 4)..Point::new(0, 7),
2209 diagnostic: Diagnostic {
2210 group_id: 0,
2211 message: "message 1".to_string(),
2212 severity: lsp::DiagnosticSeverity::ERROR,
2213 is_primary: true,
2214 ..Default::default()
2215 }
2216 },
2217 DiagnosticEntry {
2218 range: Point::new(0, 10)..Point::new(0, 13),
2219 diagnostic: Diagnostic {
2220 group_id: 1,
2221 severity: lsp::DiagnosticSeverity::WARNING,
2222 message: "message 2".to_string(),
2223 is_primary: true,
2224 ..Default::default()
2225 }
2226 }
2227 ]
2228 );
2229 });
2230 }
2231
2232 #[gpui::test(iterations = 10)]
2233 async fn test_collaborating_with_completion(
2234 mut cx_a: TestAppContext,
2235 mut cx_b: TestAppContext,
2236 ) {
2237 cx_a.foreground().forbid_parking();
2238 let mut lang_registry = Arc::new(LanguageRegistry::new());
2239 let fs = FakeFs::new(cx_a.background());
2240
2241 // Set up a fake language server.
2242 let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2243 language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2244 completion_provider: Some(lsp::CompletionOptions {
2245 trigger_characters: Some(vec![".".to_string()]),
2246 ..Default::default()
2247 }),
2248 ..Default::default()
2249 });
2250 Arc::get_mut(&mut lang_registry)
2251 .unwrap()
2252 .add(Arc::new(Language::new(
2253 LanguageConfig {
2254 name: "Rust".to_string(),
2255 path_suffixes: vec!["rs".to_string()],
2256 language_server: Some(language_server_config),
2257 ..Default::default()
2258 },
2259 Some(tree_sitter_rust::language()),
2260 )));
2261
2262 // Connect to a server as 2 clients.
2263 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2264 let client_a = server.create_client(&mut cx_a, "user_a").await;
2265 let client_b = server.create_client(&mut cx_b, "user_b").await;
2266
2267 // Share a project as client A
2268 fs.insert_tree(
2269 "/a",
2270 json!({
2271 ".zed.toml": r#"collaborators = ["user_b"]"#,
2272 "main.rs": "fn main() { a }",
2273 "other.rs": "",
2274 }),
2275 )
2276 .await;
2277 let project_a = cx_a.update(|cx| {
2278 Project::local(
2279 client_a.clone(),
2280 client_a.user_store.clone(),
2281 lang_registry.clone(),
2282 fs.clone(),
2283 cx,
2284 )
2285 });
2286 let (worktree_a, _) = project_a
2287 .update(&mut cx_a, |p, cx| {
2288 p.find_or_create_local_worktree("/a", false, cx)
2289 })
2290 .await
2291 .unwrap();
2292 worktree_a
2293 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2294 .await;
2295 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2296 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2297 project_a
2298 .update(&mut cx_a, |p, cx| p.share(cx))
2299 .await
2300 .unwrap();
2301
2302 // Join the worktree as client B.
2303 let project_b = Project::remote(
2304 project_id,
2305 client_b.clone(),
2306 client_b.user_store.clone(),
2307 lang_registry.clone(),
2308 fs.clone(),
2309 &mut cx_b.to_async(),
2310 )
2311 .await
2312 .unwrap();
2313
2314 // Open a file in an editor as the guest.
2315 let buffer_b = project_b
2316 .update(&mut cx_b, |p, cx| {
2317 p.open_buffer((worktree_id, "main.rs"), cx)
2318 })
2319 .await
2320 .unwrap();
2321 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2322 let editor_b = cx_b.add_view(window_b, |cx| {
2323 Editor::for_buffer(
2324 cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2325 Arc::new(|cx| EditorSettings::test(cx)),
2326 Some(project_b.clone()),
2327 cx,
2328 )
2329 });
2330
2331 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2332 buffer_b
2333 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2334 .await;
2335
2336 // Type a completion trigger character as the guest.
2337 editor_b.update(&mut cx_b, |editor, cx| {
2338 editor.select_ranges([13..13], None, cx);
2339 editor.handle_input(&Input(".".into()), cx);
2340 cx.focus(&editor_b);
2341 });
2342
2343 // Receive a completion request as the host's language server.
2344 // Return some completions from the host's language server.
2345 cx_a.foreground().start_waiting();
2346 fake_language_server
2347 .handle_request::<lsp::request::Completion, _>(|params| {
2348 assert_eq!(
2349 params.text_document_position.text_document.uri,
2350 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2351 );
2352 assert_eq!(
2353 params.text_document_position.position,
2354 lsp::Position::new(0, 14),
2355 );
2356
2357 Some(lsp::CompletionResponse::Array(vec![
2358 lsp::CompletionItem {
2359 label: "first_method(…)".into(),
2360 detail: Some("fn(&mut self, B) -> C".into()),
2361 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2362 new_text: "first_method($1)".to_string(),
2363 range: lsp::Range::new(
2364 lsp::Position::new(0, 14),
2365 lsp::Position::new(0, 14),
2366 ),
2367 })),
2368 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2369 ..Default::default()
2370 },
2371 lsp::CompletionItem {
2372 label: "second_method(…)".into(),
2373 detail: Some("fn(&mut self, C) -> D<E>".into()),
2374 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2375 new_text: "second_method()".to_string(),
2376 range: lsp::Range::new(
2377 lsp::Position::new(0, 14),
2378 lsp::Position::new(0, 14),
2379 ),
2380 })),
2381 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2382 ..Default::default()
2383 },
2384 ]))
2385 })
2386 .next()
2387 .await
2388 .unwrap();
2389 cx_a.foreground().finish_waiting();
2390
2391 // Open the buffer on the host.
2392 let buffer_a = project_a
2393 .update(&mut cx_a, |p, cx| {
2394 p.open_buffer((worktree_id, "main.rs"), cx)
2395 })
2396 .await
2397 .unwrap();
2398 buffer_a
2399 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2400 .await;
2401
2402 // Confirm a completion on the guest.
2403 editor_b
2404 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2405 .await;
2406 editor_b.update(&mut cx_b, |editor, cx| {
2407 editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2408 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2409 });
2410
2411 // Return a resolved completion from the host's language server.
2412 // The resolved completion has an additional text edit.
2413 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
2414 assert_eq!(params.label, "first_method(…)");
2415 lsp::CompletionItem {
2416 label: "first_method(…)".into(),
2417 detail: Some("fn(&mut self, B) -> C".into()),
2418 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2419 new_text: "first_method($1)".to_string(),
2420 range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
2421 })),
2422 additional_text_edits: Some(vec![lsp::TextEdit {
2423 new_text: "use d::SomeTrait;\n".to_string(),
2424 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2425 }]),
2426 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2427 ..Default::default()
2428 }
2429 });
2430
2431 // The additional edit is applied.
2432 buffer_a
2433 .condition(&cx_a, |buffer, _| {
2434 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2435 })
2436 .await;
2437 buffer_b
2438 .condition(&cx_b, |buffer, _| {
2439 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2440 })
2441 .await;
2442 }
2443
2444 #[gpui::test(iterations = 10)]
2445 async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2446 cx_a.foreground().forbid_parking();
2447 let mut lang_registry = Arc::new(LanguageRegistry::new());
2448 let fs = FakeFs::new(cx_a.background());
2449
2450 // Set up a fake language server.
2451 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2452 Arc::get_mut(&mut lang_registry)
2453 .unwrap()
2454 .add(Arc::new(Language::new(
2455 LanguageConfig {
2456 name: "Rust".to_string(),
2457 path_suffixes: vec!["rs".to_string()],
2458 language_server: Some(language_server_config),
2459 ..Default::default()
2460 },
2461 Some(tree_sitter_rust::language()),
2462 )));
2463
2464 // Connect to a server as 2 clients.
2465 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2466 let client_a = server.create_client(&mut cx_a, "user_a").await;
2467 let client_b = server.create_client(&mut cx_b, "user_b").await;
2468
2469 // Share a project as client A
2470 fs.insert_tree(
2471 "/a",
2472 json!({
2473 ".zed.toml": r#"collaborators = ["user_b"]"#,
2474 "a.rs": "let one = two",
2475 }),
2476 )
2477 .await;
2478 let project_a = cx_a.update(|cx| {
2479 Project::local(
2480 client_a.clone(),
2481 client_a.user_store.clone(),
2482 lang_registry.clone(),
2483 fs.clone(),
2484 cx,
2485 )
2486 });
2487 let (worktree_a, _) = project_a
2488 .update(&mut cx_a, |p, cx| {
2489 p.find_or_create_local_worktree("/a", false, cx)
2490 })
2491 .await
2492 .unwrap();
2493 worktree_a
2494 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2495 .await;
2496 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2497 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2498 project_a
2499 .update(&mut cx_a, |p, cx| p.share(cx))
2500 .await
2501 .unwrap();
2502
2503 // Join the worktree as client B.
2504 let project_b = Project::remote(
2505 project_id,
2506 client_b.clone(),
2507 client_b.user_store.clone(),
2508 lang_registry.clone(),
2509 fs.clone(),
2510 &mut cx_b.to_async(),
2511 )
2512 .await
2513 .unwrap();
2514
2515 let buffer_b = cx_b
2516 .background()
2517 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2518 .await
2519 .unwrap();
2520
2521 let format = project_b.update(&mut cx_b, |project, cx| {
2522 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2523 });
2524
2525 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2526 fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2527 Some(vec![
2528 lsp::TextEdit {
2529 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2530 new_text: "h".to_string(),
2531 },
2532 lsp::TextEdit {
2533 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2534 new_text: "y".to_string(),
2535 },
2536 ])
2537 });
2538
2539 format.await.unwrap();
2540 assert_eq!(
2541 buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2542 "let honey = two"
2543 );
2544 }
2545
2546 #[gpui::test(iterations = 10)]
2547 async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2548 cx_a.foreground().forbid_parking();
2549 let mut lang_registry = Arc::new(LanguageRegistry::new());
2550 let fs = FakeFs::new(cx_a.background());
2551 fs.insert_tree(
2552 "/root-1",
2553 json!({
2554 ".zed.toml": r#"collaborators = ["user_b"]"#,
2555 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2556 }),
2557 )
2558 .await;
2559 fs.insert_tree(
2560 "/root-2",
2561 json!({
2562 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2563 }),
2564 )
2565 .await;
2566
2567 // Set up a fake language server.
2568 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2569 Arc::get_mut(&mut lang_registry)
2570 .unwrap()
2571 .add(Arc::new(Language::new(
2572 LanguageConfig {
2573 name: "Rust".to_string(),
2574 path_suffixes: vec!["rs".to_string()],
2575 language_server: Some(language_server_config),
2576 ..Default::default()
2577 },
2578 Some(tree_sitter_rust::language()),
2579 )));
2580
2581 // Connect to a server as 2 clients.
2582 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2583 let client_a = server.create_client(&mut cx_a, "user_a").await;
2584 let client_b = server.create_client(&mut cx_b, "user_b").await;
2585
2586 // Share a project as client A
2587 let project_a = cx_a.update(|cx| {
2588 Project::local(
2589 client_a.clone(),
2590 client_a.user_store.clone(),
2591 lang_registry.clone(),
2592 fs.clone(),
2593 cx,
2594 )
2595 });
2596 let (worktree_a, _) = project_a
2597 .update(&mut cx_a, |p, cx| {
2598 p.find_or_create_local_worktree("/root-1", false, cx)
2599 })
2600 .await
2601 .unwrap();
2602 worktree_a
2603 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2604 .await;
2605 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2606 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2607 project_a
2608 .update(&mut cx_a, |p, cx| p.share(cx))
2609 .await
2610 .unwrap();
2611
2612 // Join the worktree as client B.
2613 let project_b = Project::remote(
2614 project_id,
2615 client_b.clone(),
2616 client_b.user_store.clone(),
2617 lang_registry.clone(),
2618 fs.clone(),
2619 &mut cx_b.to_async(),
2620 )
2621 .await
2622 .unwrap();
2623
2624 // Open the file on client B.
2625 let buffer_b = cx_b
2626 .background()
2627 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2628 .await
2629 .unwrap();
2630
2631 // Request the definition of a symbol as the guest.
2632 let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2633
2634 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2635 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2636 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2637 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2638 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2639 )))
2640 });
2641
2642 let definitions_1 = definitions_1.await.unwrap();
2643 cx_b.read(|cx| {
2644 assert_eq!(definitions_1.len(), 1);
2645 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2646 let target_buffer = definitions_1[0].target_buffer.read(cx);
2647 assert_eq!(
2648 target_buffer.text(),
2649 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2650 );
2651 assert_eq!(
2652 definitions_1[0].target_range.to_point(target_buffer),
2653 Point::new(0, 6)..Point::new(0, 9)
2654 );
2655 });
2656
2657 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2658 // the previous call to `definition`.
2659 let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2660 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2661 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2662 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2663 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2664 )))
2665 });
2666
2667 let definitions_2 = definitions_2.await.unwrap();
2668 cx_b.read(|cx| {
2669 assert_eq!(definitions_2.len(), 1);
2670 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2671 let target_buffer = definitions_2[0].target_buffer.read(cx);
2672 assert_eq!(
2673 target_buffer.text(),
2674 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2675 );
2676 assert_eq!(
2677 definitions_2[0].target_range.to_point(target_buffer),
2678 Point::new(1, 6)..Point::new(1, 11)
2679 );
2680 });
2681 assert_eq!(
2682 definitions_1[0].target_buffer,
2683 definitions_2[0].target_buffer
2684 );
2685
2686 cx_b.update(|_| {
2687 drop(definitions_1);
2688 drop(definitions_2);
2689 });
2690 project_b
2691 .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2692 .await;
2693 }
2694
2695 #[gpui::test(iterations = 10)]
2696 async fn test_open_buffer_while_getting_definition_pointing_to_it(
2697 mut cx_a: TestAppContext,
2698 mut cx_b: TestAppContext,
2699 mut rng: StdRng,
2700 ) {
2701 cx_a.foreground().forbid_parking();
2702 let mut lang_registry = Arc::new(LanguageRegistry::new());
2703 let fs = FakeFs::new(cx_a.background());
2704 fs.insert_tree(
2705 "/root",
2706 json!({
2707 ".zed.toml": r#"collaborators = ["user_b"]"#,
2708 "a.rs": "const ONE: usize = b::TWO;",
2709 "b.rs": "const TWO: usize = 2",
2710 }),
2711 )
2712 .await;
2713
2714 // Set up a fake language server.
2715 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2716
2717 Arc::get_mut(&mut lang_registry)
2718 .unwrap()
2719 .add(Arc::new(Language::new(
2720 LanguageConfig {
2721 name: "Rust".to_string(),
2722 path_suffixes: vec!["rs".to_string()],
2723 language_server: Some(language_server_config),
2724 ..Default::default()
2725 },
2726 Some(tree_sitter_rust::language()),
2727 )));
2728
2729 // Connect to a server as 2 clients.
2730 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2731 let client_a = server.create_client(&mut cx_a, "user_a").await;
2732 let client_b = server.create_client(&mut cx_b, "user_b").await;
2733
2734 // Share a project as client A
2735 let project_a = cx_a.update(|cx| {
2736 Project::local(
2737 client_a.clone(),
2738 client_a.user_store.clone(),
2739 lang_registry.clone(),
2740 fs.clone(),
2741 cx,
2742 )
2743 });
2744
2745 let (worktree_a, _) = project_a
2746 .update(&mut cx_a, |p, cx| {
2747 p.find_or_create_local_worktree("/root", false, cx)
2748 })
2749 .await
2750 .unwrap();
2751 worktree_a
2752 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2753 .await;
2754 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2755 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2756 project_a
2757 .update(&mut cx_a, |p, cx| p.share(cx))
2758 .await
2759 .unwrap();
2760
2761 // Join the worktree as client B.
2762 let project_b = Project::remote(
2763 project_id,
2764 client_b.clone(),
2765 client_b.user_store.clone(),
2766 lang_registry.clone(),
2767 fs.clone(),
2768 &mut cx_b.to_async(),
2769 )
2770 .await
2771 .unwrap();
2772
2773 let buffer_b1 = cx_b
2774 .background()
2775 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2776 .await
2777 .unwrap();
2778
2779 let definitions;
2780 let buffer_b2;
2781 if rng.gen() {
2782 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2783 buffer_b2 =
2784 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2785 } else {
2786 buffer_b2 =
2787 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2788 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2789 }
2790
2791 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2792 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2793 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2794 lsp::Url::from_file_path("/root/b.rs").unwrap(),
2795 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2796 )))
2797 });
2798
2799 let buffer_b2 = buffer_b2.await.unwrap();
2800 let definitions = definitions.await.unwrap();
2801 assert_eq!(definitions.len(), 1);
2802 assert_eq!(definitions[0].target_buffer, buffer_b2);
2803 }
2804
2805 #[gpui::test(iterations = 10)]
2806 async fn test_collaborating_with_code_actions(
2807 mut cx_a: TestAppContext,
2808 mut cx_b: TestAppContext,
2809 ) {
2810 cx_a.foreground().forbid_parking();
2811 let mut lang_registry = Arc::new(LanguageRegistry::new());
2812 let fs = FakeFs::new(cx_a.background());
2813 let mut path_openers_b = Vec::new();
2814 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
2815
2816 // Set up a fake language server.
2817 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2818 Arc::get_mut(&mut lang_registry)
2819 .unwrap()
2820 .add(Arc::new(Language::new(
2821 LanguageConfig {
2822 name: "Rust".to_string(),
2823 path_suffixes: vec!["rs".to_string()],
2824 language_server: Some(language_server_config),
2825 ..Default::default()
2826 },
2827 Some(tree_sitter_rust::language()),
2828 )));
2829
2830 // Connect to a server as 2 clients.
2831 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2832 let client_a = server.create_client(&mut cx_a, "user_a").await;
2833 let client_b = server.create_client(&mut cx_b, "user_b").await;
2834
2835 // Share a project as client A
2836 fs.insert_tree(
2837 "/a",
2838 json!({
2839 ".zed.toml": r#"collaborators = ["user_b"]"#,
2840 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
2841 "other.rs": "pub fn foo() -> usize { 4 }",
2842 }),
2843 )
2844 .await;
2845 let project_a = cx_a.update(|cx| {
2846 Project::local(
2847 client_a.clone(),
2848 client_a.user_store.clone(),
2849 lang_registry.clone(),
2850 fs.clone(),
2851 cx,
2852 )
2853 });
2854 let (worktree_a, _) = project_a
2855 .update(&mut cx_a, |p, cx| {
2856 p.find_or_create_local_worktree("/a", false, cx)
2857 })
2858 .await
2859 .unwrap();
2860 worktree_a
2861 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2862 .await;
2863 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2864 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2865 project_a
2866 .update(&mut cx_a, |p, cx| p.share(cx))
2867 .await
2868 .unwrap();
2869
2870 // Join the worktree as client B.
2871 let project_b = Project::remote(
2872 project_id,
2873 client_b.clone(),
2874 client_b.user_store.clone(),
2875 lang_registry.clone(),
2876 fs.clone(),
2877 &mut cx_b.to_async(),
2878 )
2879 .await
2880 .unwrap();
2881 let mut params = cx_b.update(WorkspaceParams::test);
2882 params.languages = lang_registry.clone();
2883 params.client = client_b.client.clone();
2884 params.user_store = client_b.user_store.clone();
2885 params.project = project_b;
2886 params.path_openers = path_openers_b.into();
2887
2888 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
2889 let editor_b = workspace_b
2890 .update(&mut cx_b, |workspace, cx| {
2891 workspace.open_path((worktree_id, "main.rs").into(), cx)
2892 })
2893 .await
2894 .unwrap()
2895 .downcast::<Editor>()
2896 .unwrap();
2897
2898 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2899 fake_language_server
2900 .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2901 assert_eq!(
2902 params.text_document.uri,
2903 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2904 );
2905 assert_eq!(params.range.start, lsp::Position::new(0, 0));
2906 assert_eq!(params.range.end, lsp::Position::new(0, 0));
2907 None
2908 })
2909 .next()
2910 .await;
2911
2912 // Move cursor to a location that contains code actions.
2913 editor_b.update(&mut cx_b, |editor, cx| {
2914 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
2915 cx.focus(&editor_b);
2916 });
2917
2918 fake_language_server
2919 .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2920 assert_eq!(
2921 params.text_document.uri,
2922 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2923 );
2924 assert_eq!(params.range.start, lsp::Position::new(1, 31));
2925 assert_eq!(params.range.end, lsp::Position::new(1, 31));
2926
2927 Some(vec![lsp::CodeActionOrCommand::CodeAction(
2928 lsp::CodeAction {
2929 title: "Inline into all callers".to_string(),
2930 edit: Some(lsp::WorkspaceEdit {
2931 changes: Some(
2932 [
2933 (
2934 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2935 vec![lsp::TextEdit::new(
2936 lsp::Range::new(
2937 lsp::Position::new(1, 22),
2938 lsp::Position::new(1, 34),
2939 ),
2940 "4".to_string(),
2941 )],
2942 ),
2943 (
2944 lsp::Url::from_file_path("/a/other.rs").unwrap(),
2945 vec![lsp::TextEdit::new(
2946 lsp::Range::new(
2947 lsp::Position::new(0, 0),
2948 lsp::Position::new(0, 27),
2949 ),
2950 "".to_string(),
2951 )],
2952 ),
2953 ]
2954 .into_iter()
2955 .collect(),
2956 ),
2957 ..Default::default()
2958 }),
2959 data: Some(json!({
2960 "codeActionParams": {
2961 "range": {
2962 "start": {"line": 1, "column": 31},
2963 "end": {"line": 1, "column": 31},
2964 }
2965 }
2966 })),
2967 ..Default::default()
2968 },
2969 )])
2970 })
2971 .next()
2972 .await;
2973
2974 // Toggle code actions and wait for them to display.
2975 editor_b.update(&mut cx_b, |editor, cx| {
2976 editor.toggle_code_actions(&ToggleCodeActions(false), cx);
2977 });
2978 editor_b
2979 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2980 .await;
2981
2982 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
2983
2984 // Confirming the code action will trigger a resolve request.
2985 let confirm_action = workspace_b
2986 .update(&mut cx_b, |workspace, cx| {
2987 Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
2988 })
2989 .unwrap();
2990 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_| {
2991 lsp::CodeAction {
2992 title: "Inline into all callers".to_string(),
2993 edit: Some(lsp::WorkspaceEdit {
2994 changes: Some(
2995 [
2996 (
2997 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2998 vec![lsp::TextEdit::new(
2999 lsp::Range::new(
3000 lsp::Position::new(1, 22),
3001 lsp::Position::new(1, 34),
3002 ),
3003 "4".to_string(),
3004 )],
3005 ),
3006 (
3007 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3008 vec![lsp::TextEdit::new(
3009 lsp::Range::new(
3010 lsp::Position::new(0, 0),
3011 lsp::Position::new(0, 27),
3012 ),
3013 "".to_string(),
3014 )],
3015 ),
3016 ]
3017 .into_iter()
3018 .collect(),
3019 ),
3020 ..Default::default()
3021 }),
3022 ..Default::default()
3023 }
3024 });
3025
3026 // After the action is confirmed, an editor containing both modified files is opened.
3027 confirm_action.await.unwrap();
3028 let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3029 workspace
3030 .active_item(cx)
3031 .unwrap()
3032 .downcast::<Editor>()
3033 .unwrap()
3034 });
3035 code_action_editor.update(&mut cx_b, |editor, cx| {
3036 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3037 editor.undo(&Undo, cx);
3038 assert_eq!(
3039 editor.text(cx),
3040 "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3041 );
3042 editor.redo(&Redo, cx);
3043 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3044 });
3045 }
3046
3047 #[gpui::test(iterations = 10)]
3048 async fn test_collaborating_with_renames(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3049 cx_a.foreground().forbid_parking();
3050 let mut lang_registry = Arc::new(LanguageRegistry::new());
3051 let fs = FakeFs::new(cx_a.background());
3052 let mut path_openers_b = Vec::new();
3053 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3054
3055 // Set up a fake language server.
3056 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3057 Arc::get_mut(&mut lang_registry)
3058 .unwrap()
3059 .add(Arc::new(Language::new(
3060 LanguageConfig {
3061 name: "Rust".to_string(),
3062 path_suffixes: vec!["rs".to_string()],
3063 language_server: Some(language_server_config),
3064 ..Default::default()
3065 },
3066 Some(tree_sitter_rust::language()),
3067 )));
3068
3069 // Connect to a server as 2 clients.
3070 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3071 let client_a = server.create_client(&mut cx_a, "user_a").await;
3072 let client_b = server.create_client(&mut cx_b, "user_b").await;
3073
3074 // Share a project as client A
3075 fs.insert_tree(
3076 "/dir",
3077 json!({
3078 ".zed.toml": r#"collaborators = ["user_b"]"#,
3079 "one.rs": "const ONE: usize = 1;",
3080 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3081 }),
3082 )
3083 .await;
3084 let project_a = cx_a.update(|cx| {
3085 Project::local(
3086 client_a.clone(),
3087 client_a.user_store.clone(),
3088 lang_registry.clone(),
3089 fs.clone(),
3090 cx,
3091 )
3092 });
3093 let (worktree_a, _) = project_a
3094 .update(&mut cx_a, |p, cx| {
3095 p.find_or_create_local_worktree("/dir", false, cx)
3096 })
3097 .await
3098 .unwrap();
3099 worktree_a
3100 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3101 .await;
3102 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3103 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3104 project_a
3105 .update(&mut cx_a, |p, cx| p.share(cx))
3106 .await
3107 .unwrap();
3108
3109 // Join the worktree as client B.
3110 let project_b = Project::remote(
3111 project_id,
3112 client_b.clone(),
3113 client_b.user_store.clone(),
3114 lang_registry.clone(),
3115 fs.clone(),
3116 &mut cx_b.to_async(),
3117 )
3118 .await
3119 .unwrap();
3120 let mut params = cx_b.update(WorkspaceParams::test);
3121 params.languages = lang_registry.clone();
3122 params.client = client_b.client.clone();
3123 params.user_store = client_b.user_store.clone();
3124 params.project = project_b;
3125 params.path_openers = path_openers_b.into();
3126
3127 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3128 let editor_b = workspace_b
3129 .update(&mut cx_b, |workspace, cx| {
3130 workspace.open_path((worktree_id, "one.rs").into(), cx)
3131 })
3132 .await
3133 .unwrap()
3134 .downcast::<Editor>()
3135 .unwrap();
3136 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3137
3138 // Move cursor to a location that can be renamed.
3139 let prepare_rename = editor_b.update(&mut cx_b, |editor, cx| {
3140 editor.select_ranges([7..7], None, cx);
3141 editor.rename(&Rename, cx).unwrap()
3142 });
3143
3144 fake_language_server
3145 .handle_request::<lsp::request::PrepareRenameRequest, _>(|params| {
3146 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3147 assert_eq!(params.position, lsp::Position::new(0, 7));
3148 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3149 lsp::Position::new(0, 6),
3150 lsp::Position::new(0, 9),
3151 )))
3152 })
3153 .next()
3154 .await
3155 .unwrap();
3156 prepare_rename.await.unwrap();
3157 editor_b.update(&mut cx_b, |editor, cx| {
3158 let rename = editor.pending_rename().unwrap();
3159 let buffer = editor.buffer().read(cx).snapshot(cx);
3160 assert_eq!(
3161 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3162 6..9
3163 );
3164 rename.editor.update(cx, |rename_editor, cx| {
3165 rename_editor.buffer().update(cx, |rename_buffer, cx| {
3166 rename_buffer.edit([0..3], "THREE", cx);
3167 });
3168 });
3169 });
3170
3171 let confirm_rename = workspace_b.update(&mut cx_b, |workspace, cx| {
3172 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3173 });
3174 fake_language_server
3175 .handle_request::<lsp::request::Rename, _>(|params| {
3176 assert_eq!(
3177 params.text_document_position.text_document.uri.as_str(),
3178 "file:///dir/one.rs"
3179 );
3180 assert_eq!(
3181 params.text_document_position.position,
3182 lsp::Position::new(0, 6)
3183 );
3184 assert_eq!(params.new_name, "THREE");
3185 Some(lsp::WorkspaceEdit {
3186 changes: Some(
3187 [
3188 (
3189 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3190 vec![lsp::TextEdit::new(
3191 lsp::Range::new(
3192 lsp::Position::new(0, 6),
3193 lsp::Position::new(0, 9),
3194 ),
3195 "THREE".to_string(),
3196 )],
3197 ),
3198 (
3199 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3200 vec![
3201 lsp::TextEdit::new(
3202 lsp::Range::new(
3203 lsp::Position::new(0, 24),
3204 lsp::Position::new(0, 27),
3205 ),
3206 "THREE".to_string(),
3207 ),
3208 lsp::TextEdit::new(
3209 lsp::Range::new(
3210 lsp::Position::new(0, 35),
3211 lsp::Position::new(0, 38),
3212 ),
3213 "THREE".to_string(),
3214 ),
3215 ],
3216 ),
3217 ]
3218 .into_iter()
3219 .collect(),
3220 ),
3221 ..Default::default()
3222 })
3223 })
3224 .next()
3225 .await
3226 .unwrap();
3227 confirm_rename.await.unwrap();
3228
3229 let rename_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3230 workspace
3231 .active_item(cx)
3232 .unwrap()
3233 .downcast::<Editor>()
3234 .unwrap()
3235 });
3236 rename_editor.update(&mut cx_b, |editor, cx| {
3237 assert_eq!(
3238 editor.text(cx),
3239 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3240 );
3241 editor.undo(&Undo, cx);
3242 assert_eq!(
3243 editor.text(cx),
3244 "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3245 );
3246 editor.redo(&Redo, cx);
3247 assert_eq!(
3248 editor.text(cx),
3249 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3250 );
3251 });
3252
3253 // Ensure temporary rename edits cannot be undone/redone.
3254 editor_b.update(&mut cx_b, |editor, cx| {
3255 editor.undo(&Undo, cx);
3256 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3257 editor.undo(&Undo, cx);
3258 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3259 editor.redo(&Redo, cx);
3260 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3261 })
3262 }
3263
3264 #[gpui::test(iterations = 10)]
3265 async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3266 cx_a.foreground().forbid_parking();
3267
3268 // Connect to a server as 2 clients.
3269 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3270 let client_a = server.create_client(&mut cx_a, "user_a").await;
3271 let client_b = server.create_client(&mut cx_b, "user_b").await;
3272
3273 // Create an org that includes these 2 users.
3274 let db = &server.app_state.db;
3275 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3276 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3277 .await
3278 .unwrap();
3279 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3280 .await
3281 .unwrap();
3282
3283 // Create a channel that includes all the users.
3284 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3285 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3286 .await
3287 .unwrap();
3288 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3289 .await
3290 .unwrap();
3291 db.create_channel_message(
3292 channel_id,
3293 client_b.current_user_id(&cx_b),
3294 "hello A, it's B.",
3295 OffsetDateTime::now_utc(),
3296 1,
3297 )
3298 .await
3299 .unwrap();
3300
3301 let channels_a = cx_a
3302 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3303 channels_a
3304 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3305 .await;
3306 channels_a.read_with(&cx_a, |list, _| {
3307 assert_eq!(
3308 list.available_channels().unwrap(),
3309 &[ChannelDetails {
3310 id: channel_id.to_proto(),
3311 name: "test-channel".to_string()
3312 }]
3313 )
3314 });
3315 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3316 this.get_channel(channel_id.to_proto(), cx).unwrap()
3317 });
3318 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3319 channel_a
3320 .condition(&cx_a, |channel, _| {
3321 channel_messages(channel)
3322 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3323 })
3324 .await;
3325
3326 let channels_b = cx_b
3327 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3328 channels_b
3329 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3330 .await;
3331 channels_b.read_with(&cx_b, |list, _| {
3332 assert_eq!(
3333 list.available_channels().unwrap(),
3334 &[ChannelDetails {
3335 id: channel_id.to_proto(),
3336 name: "test-channel".to_string()
3337 }]
3338 )
3339 });
3340
3341 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3342 this.get_channel(channel_id.to_proto(), cx).unwrap()
3343 });
3344 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3345 channel_b
3346 .condition(&cx_b, |channel, _| {
3347 channel_messages(channel)
3348 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3349 })
3350 .await;
3351
3352 channel_a
3353 .update(&mut cx_a, |channel, cx| {
3354 channel
3355 .send_message("oh, hi B.".to_string(), cx)
3356 .unwrap()
3357 .detach();
3358 let task = channel.send_message("sup".to_string(), cx).unwrap();
3359 assert_eq!(
3360 channel_messages(channel),
3361 &[
3362 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3363 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3364 ("user_a".to_string(), "sup".to_string(), true)
3365 ]
3366 );
3367 task
3368 })
3369 .await
3370 .unwrap();
3371
3372 channel_b
3373 .condition(&cx_b, |channel, _| {
3374 channel_messages(channel)
3375 == [
3376 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3377 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3378 ("user_a".to_string(), "sup".to_string(), false),
3379 ]
3380 })
3381 .await;
3382
3383 assert_eq!(
3384 server
3385 .state()
3386 .await
3387 .channel(channel_id)
3388 .unwrap()
3389 .connection_ids
3390 .len(),
3391 2
3392 );
3393 cx_b.update(|_| drop(channel_b));
3394 server
3395 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3396 .await;
3397
3398 cx_a.update(|_| drop(channel_a));
3399 server
3400 .condition(|state| state.channel(channel_id).is_none())
3401 .await;
3402 }
3403
3404 #[gpui::test(iterations = 10)]
3405 async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3406 cx_a.foreground().forbid_parking();
3407
3408 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3409 let client_a = server.create_client(&mut cx_a, "user_a").await;
3410
3411 let db = &server.app_state.db;
3412 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3413 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3414 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3415 .await
3416 .unwrap();
3417 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3418 .await
3419 .unwrap();
3420
3421 let channels_a = cx_a
3422 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3423 channels_a
3424 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3425 .await;
3426 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3427 this.get_channel(channel_id.to_proto(), cx).unwrap()
3428 });
3429
3430 // Messages aren't allowed to be too long.
3431 channel_a
3432 .update(&mut cx_a, |channel, cx| {
3433 let long_body = "this is long.\n".repeat(1024);
3434 channel.send_message(long_body, cx).unwrap()
3435 })
3436 .await
3437 .unwrap_err();
3438
3439 // Messages aren't allowed to be blank.
3440 channel_a.update(&mut cx_a, |channel, cx| {
3441 channel.send_message(String::new(), cx).unwrap_err()
3442 });
3443
3444 // Leading and trailing whitespace are trimmed.
3445 channel_a
3446 .update(&mut cx_a, |channel, cx| {
3447 channel
3448 .send_message("\n surrounded by whitespace \n".to_string(), cx)
3449 .unwrap()
3450 })
3451 .await
3452 .unwrap();
3453 assert_eq!(
3454 db.get_channel_messages(channel_id, 10, None)
3455 .await
3456 .unwrap()
3457 .iter()
3458 .map(|m| &m.body)
3459 .collect::<Vec<_>>(),
3460 &["surrounded by whitespace"]
3461 );
3462 }
3463
3464 #[gpui::test(iterations = 10)]
3465 async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3466 cx_a.foreground().forbid_parking();
3467
3468 // Connect to a server as 2 clients.
3469 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3470 let client_a = server.create_client(&mut cx_a, "user_a").await;
3471 let client_b = server.create_client(&mut cx_b, "user_b").await;
3472 let mut status_b = client_b.status();
3473
3474 // Create an org that includes these 2 users.
3475 let db = &server.app_state.db;
3476 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3477 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3478 .await
3479 .unwrap();
3480 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3481 .await
3482 .unwrap();
3483
3484 // Create a channel that includes all the users.
3485 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3486 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3487 .await
3488 .unwrap();
3489 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3490 .await
3491 .unwrap();
3492 db.create_channel_message(
3493 channel_id,
3494 client_b.current_user_id(&cx_b),
3495 "hello A, it's B.",
3496 OffsetDateTime::now_utc(),
3497 2,
3498 )
3499 .await
3500 .unwrap();
3501
3502 let channels_a = cx_a
3503 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3504 channels_a
3505 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3506 .await;
3507
3508 channels_a.read_with(&cx_a, |list, _| {
3509 assert_eq!(
3510 list.available_channels().unwrap(),
3511 &[ChannelDetails {
3512 id: channel_id.to_proto(),
3513 name: "test-channel".to_string()
3514 }]
3515 )
3516 });
3517 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3518 this.get_channel(channel_id.to_proto(), cx).unwrap()
3519 });
3520 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3521 channel_a
3522 .condition(&cx_a, |channel, _| {
3523 channel_messages(channel)
3524 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3525 })
3526 .await;
3527
3528 let channels_b = cx_b
3529 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3530 channels_b
3531 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3532 .await;
3533 channels_b.read_with(&cx_b, |list, _| {
3534 assert_eq!(
3535 list.available_channels().unwrap(),
3536 &[ChannelDetails {
3537 id: channel_id.to_proto(),
3538 name: "test-channel".to_string()
3539 }]
3540 )
3541 });
3542
3543 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3544 this.get_channel(channel_id.to_proto(), cx).unwrap()
3545 });
3546 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3547 channel_b
3548 .condition(&cx_b, |channel, _| {
3549 channel_messages(channel)
3550 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3551 })
3552 .await;
3553
3554 // Disconnect client B, ensuring we can still access its cached channel data.
3555 server.forbid_connections();
3556 server.disconnect_client(client_b.current_user_id(&cx_b));
3557 while !matches!(
3558 status_b.next().await,
3559 Some(client::Status::ReconnectionError { .. })
3560 ) {}
3561
3562 channels_b.read_with(&cx_b, |channels, _| {
3563 assert_eq!(
3564 channels.available_channels().unwrap(),
3565 [ChannelDetails {
3566 id: channel_id.to_proto(),
3567 name: "test-channel".to_string()
3568 }]
3569 )
3570 });
3571 channel_b.read_with(&cx_b, |channel, _| {
3572 assert_eq!(
3573 channel_messages(channel),
3574 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3575 )
3576 });
3577
3578 // Send a message from client B while it is disconnected.
3579 channel_b
3580 .update(&mut cx_b, |channel, cx| {
3581 let task = channel
3582 .send_message("can you see this?".to_string(), cx)
3583 .unwrap();
3584 assert_eq!(
3585 channel_messages(channel),
3586 &[
3587 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3588 ("user_b".to_string(), "can you see this?".to_string(), true)
3589 ]
3590 );
3591 task
3592 })
3593 .await
3594 .unwrap_err();
3595
3596 // Send a message from client A while B is disconnected.
3597 channel_a
3598 .update(&mut cx_a, |channel, cx| {
3599 channel
3600 .send_message("oh, hi B.".to_string(), cx)
3601 .unwrap()
3602 .detach();
3603 let task = channel.send_message("sup".to_string(), cx).unwrap();
3604 assert_eq!(
3605 channel_messages(channel),
3606 &[
3607 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3608 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3609 ("user_a".to_string(), "sup".to_string(), true)
3610 ]
3611 );
3612 task
3613 })
3614 .await
3615 .unwrap();
3616
3617 // Give client B a chance to reconnect.
3618 server.allow_connections();
3619 cx_b.foreground().advance_clock(Duration::from_secs(10));
3620
3621 // Verify that B sees the new messages upon reconnection, as well as the message client B
3622 // sent while offline.
3623 channel_b
3624 .condition(&cx_b, |channel, _| {
3625 channel_messages(channel)
3626 == [
3627 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3628 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3629 ("user_a".to_string(), "sup".to_string(), false),
3630 ("user_b".to_string(), "can you see this?".to_string(), false),
3631 ]
3632 })
3633 .await;
3634
3635 // Ensure client A and B can communicate normally after reconnection.
3636 channel_a
3637 .update(&mut cx_a, |channel, cx| {
3638 channel.send_message("you online?".to_string(), cx).unwrap()
3639 })
3640 .await
3641 .unwrap();
3642 channel_b
3643 .condition(&cx_b, |channel, _| {
3644 channel_messages(channel)
3645 == [
3646 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3647 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3648 ("user_a".to_string(), "sup".to_string(), false),
3649 ("user_b".to_string(), "can you see this?".to_string(), false),
3650 ("user_a".to_string(), "you online?".to_string(), false),
3651 ]
3652 })
3653 .await;
3654
3655 channel_b
3656 .update(&mut cx_b, |channel, cx| {
3657 channel.send_message("yep".to_string(), cx).unwrap()
3658 })
3659 .await
3660 .unwrap();
3661 channel_a
3662 .condition(&cx_a, |channel, _| {
3663 channel_messages(channel)
3664 == [
3665 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3666 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3667 ("user_a".to_string(), "sup".to_string(), false),
3668 ("user_b".to_string(), "can you see this?".to_string(), false),
3669 ("user_a".to_string(), "you online?".to_string(), false),
3670 ("user_b".to_string(), "yep".to_string(), false),
3671 ]
3672 })
3673 .await;
3674 }
3675
3676 #[gpui::test(iterations = 10)]
3677 async fn test_contacts(
3678 mut cx_a: TestAppContext,
3679 mut cx_b: TestAppContext,
3680 mut cx_c: TestAppContext,
3681 ) {
3682 cx_a.foreground().forbid_parking();
3683 let lang_registry = Arc::new(LanguageRegistry::new());
3684 let fs = FakeFs::new(cx_a.background());
3685
3686 // Connect to a server as 3 clients.
3687 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3688 let client_a = server.create_client(&mut cx_a, "user_a").await;
3689 let client_b = server.create_client(&mut cx_b, "user_b").await;
3690 let client_c = server.create_client(&mut cx_c, "user_c").await;
3691
3692 // Share a worktree as client A.
3693 fs.insert_tree(
3694 "/a",
3695 json!({
3696 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3697 }),
3698 )
3699 .await;
3700
3701 let project_a = cx_a.update(|cx| {
3702 Project::local(
3703 client_a.clone(),
3704 client_a.user_store.clone(),
3705 lang_registry.clone(),
3706 fs.clone(),
3707 cx,
3708 )
3709 });
3710 let (worktree_a, _) = project_a
3711 .update(&mut cx_a, |p, cx| {
3712 p.find_or_create_local_worktree("/a", false, cx)
3713 })
3714 .await
3715 .unwrap();
3716 worktree_a
3717 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3718 .await;
3719
3720 client_a
3721 .user_store
3722 .condition(&cx_a, |user_store, _| {
3723 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3724 })
3725 .await;
3726 client_b
3727 .user_store
3728 .condition(&cx_b, |user_store, _| {
3729 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3730 })
3731 .await;
3732 client_c
3733 .user_store
3734 .condition(&cx_c, |user_store, _| {
3735 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3736 })
3737 .await;
3738
3739 let project_id = project_a
3740 .update(&mut cx_a, |project, _| project.next_remote_id())
3741 .await;
3742 project_a
3743 .update(&mut cx_a, |project, cx| project.share(cx))
3744 .await
3745 .unwrap();
3746
3747 let _project_b = Project::remote(
3748 project_id,
3749 client_b.clone(),
3750 client_b.user_store.clone(),
3751 lang_registry.clone(),
3752 fs.clone(),
3753 &mut cx_b.to_async(),
3754 )
3755 .await
3756 .unwrap();
3757
3758 client_a
3759 .user_store
3760 .condition(&cx_a, |user_store, _| {
3761 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3762 })
3763 .await;
3764 client_b
3765 .user_store
3766 .condition(&cx_b, |user_store, _| {
3767 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3768 })
3769 .await;
3770 client_c
3771 .user_store
3772 .condition(&cx_c, |user_store, _| {
3773 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3774 })
3775 .await;
3776
3777 project_a
3778 .condition(&cx_a, |project, _| {
3779 project.collaborators().contains_key(&client_b.peer_id)
3780 })
3781 .await;
3782
3783 cx_a.update(move |_| drop(project_a));
3784 client_a
3785 .user_store
3786 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3787 .await;
3788 client_b
3789 .user_store
3790 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3791 .await;
3792 client_c
3793 .user_store
3794 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3795 .await;
3796
3797 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3798 user_store
3799 .contacts()
3800 .iter()
3801 .map(|contact| {
3802 let worktrees = contact
3803 .projects
3804 .iter()
3805 .map(|p| {
3806 (
3807 p.worktree_root_names[0].as_str(),
3808 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3809 )
3810 })
3811 .collect();
3812 (contact.user.github_login.as_str(), worktrees)
3813 })
3814 .collect()
3815 }
3816 }
3817
3818 #[gpui::test(iterations = 100)]
3819 async fn test_random_collaboration(cx: TestAppContext, rng: StdRng) {
3820 cx.foreground().forbid_parking();
3821 let max_peers = env::var("MAX_PEERS")
3822 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
3823 .unwrap_or(5);
3824 let max_operations = env::var("OPERATIONS")
3825 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
3826 .unwrap_or(10);
3827
3828 let rng = Rc::new(RefCell::new(rng));
3829
3830 let mut host_lang_registry = Arc::new(LanguageRegistry::new());
3831 let guest_lang_registry = Arc::new(LanguageRegistry::new());
3832
3833 // Set up a fake language server.
3834 let (mut language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
3835 language_server_config.set_fake_initializer(|fake_server| {
3836 fake_server.handle_request::<lsp::request::Completion, _>(|_| {
3837 Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
3838 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3839 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3840 new_text: "the-new-text".to_string(),
3841 })),
3842 ..Default::default()
3843 }]))
3844 });
3845
3846 fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_| {
3847 Some(vec![lsp::CodeActionOrCommand::CodeAction(
3848 lsp::CodeAction {
3849 title: "the-code-action".to_string(),
3850 ..Default::default()
3851 },
3852 )])
3853 });
3854
3855 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(|params| {
3856 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3857 params.position,
3858 params.position,
3859 )))
3860 });
3861 });
3862
3863 Arc::get_mut(&mut host_lang_registry)
3864 .unwrap()
3865 .add(Arc::new(Language::new(
3866 LanguageConfig {
3867 name: "Rust".to_string(),
3868 path_suffixes: vec!["rs".to_string()],
3869 language_server: Some(language_server_config),
3870 ..Default::default()
3871 },
3872 None,
3873 )));
3874
3875 let fs = FakeFs::new(cx.background());
3876 fs.insert_tree(
3877 "/_collab",
3878 json!({
3879 ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
3880 }),
3881 )
3882 .await;
3883
3884 let operations = Rc::new(Cell::new(0));
3885 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
3886 let mut clients = Vec::new();
3887
3888 let mut next_entity_id = 100000;
3889 let mut host_cx = TestAppContext::new(
3890 cx.foreground_platform(),
3891 cx.platform(),
3892 cx.foreground(),
3893 cx.background(),
3894 cx.font_cache(),
3895 next_entity_id,
3896 );
3897 let host = server.create_client(&mut host_cx, "host").await;
3898 let host_project = host_cx.update(|cx| {
3899 Project::local(
3900 host.client.clone(),
3901 host.user_store.clone(),
3902 host_lang_registry.clone(),
3903 fs.clone(),
3904 cx,
3905 )
3906 });
3907 let host_project_id = host_project
3908 .update(&mut host_cx, |p, _| p.next_remote_id())
3909 .await;
3910
3911 let (collab_worktree, _) = host_project
3912 .update(&mut host_cx, |project, cx| {
3913 project.find_or_create_local_worktree("/_collab", false, cx)
3914 })
3915 .await
3916 .unwrap();
3917 collab_worktree
3918 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
3919 .await;
3920 host_project
3921 .update(&mut host_cx, |project, cx| project.share(cx))
3922 .await
3923 .unwrap();
3924
3925 clients.push(cx.foreground().spawn(host.simulate_host(
3926 host_project.clone(),
3927 operations.clone(),
3928 max_operations,
3929 rng.clone(),
3930 host_cx.clone(),
3931 )));
3932
3933 while operations.get() < max_operations {
3934 cx.background().simulate_random_delay().await;
3935 if clients.len() < max_peers && rng.borrow_mut().gen_bool(0.05) {
3936 operations.set(operations.get() + 1);
3937
3938 let guest_id = clients.len();
3939 log::info!("Adding guest {}", guest_id);
3940 next_entity_id += 100000;
3941 let mut guest_cx = TestAppContext::new(
3942 cx.foreground_platform(),
3943 cx.platform(),
3944 cx.foreground(),
3945 cx.background(),
3946 cx.font_cache(),
3947 next_entity_id,
3948 );
3949 let guest = server
3950 .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
3951 .await;
3952 let guest_project = Project::remote(
3953 host_project_id,
3954 guest.client.clone(),
3955 guest.user_store.clone(),
3956 guest_lang_registry.clone(),
3957 fs.clone(),
3958 &mut guest_cx.to_async(),
3959 )
3960 .await
3961 .unwrap();
3962 clients.push(cx.foreground().spawn(guest.simulate_guest(
3963 guest_id,
3964 guest_project,
3965 operations.clone(),
3966 max_operations,
3967 rng.clone(),
3968 guest_cx,
3969 )));
3970
3971 log::info!("Guest {} added", guest_id);
3972 }
3973 }
3974
3975 let clients = futures::future::join_all(clients).await;
3976 cx.foreground().run_until_parked();
3977
3978 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
3979 project
3980 .worktrees(cx)
3981 .map(|worktree| {
3982 let snapshot = worktree.read(cx).snapshot();
3983 (snapshot.id(), snapshot)
3984 })
3985 .collect::<BTreeMap<_, _>>()
3986 });
3987
3988 for (guest_client, guest_cx) in clients.iter().skip(1) {
3989 let guest_id = guest_client.client.id();
3990 let worktree_snapshots =
3991 guest_client
3992 .project
3993 .as_ref()
3994 .unwrap()
3995 .read_with(guest_cx, |project, cx| {
3996 project
3997 .worktrees(cx)
3998 .map(|worktree| {
3999 let worktree = worktree.read(cx);
4000 assert!(
4001 !worktree.as_remote().unwrap().has_pending_updates(),
4002 "Guest {} worktree {:?} contains deferred updates",
4003 guest_id,
4004 worktree.id()
4005 );
4006 (worktree.id(), worktree.snapshot())
4007 })
4008 .collect::<BTreeMap<_, _>>()
4009 });
4010
4011 assert_eq!(
4012 worktree_snapshots.keys().collect::<Vec<_>>(),
4013 host_worktree_snapshots.keys().collect::<Vec<_>>(),
4014 "guest {} has different worktrees than the host",
4015 guest_id
4016 );
4017 for (id, host_snapshot) in &host_worktree_snapshots {
4018 let guest_snapshot = &worktree_snapshots[id];
4019 assert_eq!(
4020 guest_snapshot.root_name(),
4021 host_snapshot.root_name(),
4022 "guest {} has different root name than the host for worktree {}",
4023 guest_id,
4024 id
4025 );
4026 assert_eq!(
4027 guest_snapshot.entries(false).collect::<Vec<_>>(),
4028 host_snapshot.entries(false).collect::<Vec<_>>(),
4029 "guest {} has different snapshot than the host for worktree {}",
4030 guest_id,
4031 id
4032 );
4033 }
4034
4035 guest_client
4036 .project
4037 .as_ref()
4038 .unwrap()
4039 .read_with(guest_cx, |project, _| {
4040 assert!(
4041 !project.has_buffered_operations(),
4042 "guest {} has buffered operations ",
4043 guest_id,
4044 );
4045 });
4046
4047 for guest_buffer in &guest_client.buffers {
4048 let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
4049 let host_buffer = host_project.read_with(&host_cx, |project, _| {
4050 project
4051 .shared_buffer(guest_client.peer_id, buffer_id)
4052 .expect(&format!(
4053 "host doest not have buffer for guest:{}, peer:{}, id:{}",
4054 guest_id, guest_client.peer_id, buffer_id
4055 ))
4056 });
4057 assert_eq!(
4058 guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
4059 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4060 "guest {} buffer {} differs from the host's buffer",
4061 guest_id,
4062 buffer_id,
4063 );
4064 }
4065 }
4066 }
4067
4068 struct TestServer {
4069 peer: Arc<Peer>,
4070 app_state: Arc<AppState>,
4071 server: Arc<Server>,
4072 foreground: Rc<executor::Foreground>,
4073 notifications: mpsc::UnboundedReceiver<()>,
4074 connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
4075 forbid_connections: Arc<AtomicBool>,
4076 _test_db: TestDb,
4077 }
4078
4079 impl TestServer {
4080 async fn start(
4081 foreground: Rc<executor::Foreground>,
4082 background: Arc<executor::Background>,
4083 ) -> Self {
4084 let test_db = TestDb::fake(background);
4085 let app_state = Self::build_app_state(&test_db).await;
4086 let peer = Peer::new();
4087 let notifications = mpsc::unbounded();
4088 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4089 Self {
4090 peer,
4091 app_state,
4092 server,
4093 foreground,
4094 notifications: notifications.1,
4095 connection_killers: Default::default(),
4096 forbid_connections: Default::default(),
4097 _test_db: test_db,
4098 }
4099 }
4100
4101 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4102 let http = FakeHttpClient::with_404_response();
4103 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4104 let client_name = name.to_string();
4105 let mut client = Client::new(http.clone());
4106 let server = self.server.clone();
4107 let connection_killers = self.connection_killers.clone();
4108 let forbid_connections = self.forbid_connections.clone();
4109 let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4110
4111 Arc::get_mut(&mut client)
4112 .unwrap()
4113 .override_authenticate(move |cx| {
4114 cx.spawn(|_| async move {
4115 let access_token = "the-token".to_string();
4116 Ok(Credentials {
4117 user_id: user_id.0 as u64,
4118 access_token,
4119 })
4120 })
4121 })
4122 .override_establish_connection(move |credentials, cx| {
4123 assert_eq!(credentials.user_id, user_id.0 as u64);
4124 assert_eq!(credentials.access_token, "the-token");
4125
4126 let server = server.clone();
4127 let connection_killers = connection_killers.clone();
4128 let forbid_connections = forbid_connections.clone();
4129 let client_name = client_name.clone();
4130 let connection_id_tx = connection_id_tx.clone();
4131 cx.spawn(move |cx| async move {
4132 if forbid_connections.load(SeqCst) {
4133 Err(EstablishConnectionError::other(anyhow!(
4134 "server is forbidding connections"
4135 )))
4136 } else {
4137 let (client_conn, server_conn, kill_conn) =
4138 Connection::in_memory(cx.background());
4139 connection_killers.lock().insert(user_id, kill_conn);
4140 cx.background()
4141 .spawn(server.handle_connection(
4142 server_conn,
4143 client_name,
4144 user_id,
4145 Some(connection_id_tx),
4146 cx.background(),
4147 ))
4148 .detach();
4149 Ok(client_conn)
4150 }
4151 })
4152 });
4153
4154 client
4155 .authenticate_and_connect(&cx.to_async())
4156 .await
4157 .unwrap();
4158
4159 Channel::init(&client);
4160 Project::init(&client);
4161
4162 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4163 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4164 let mut authed_user =
4165 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
4166 while authed_user.next().await.unwrap().is_none() {}
4167
4168 TestClient {
4169 client,
4170 peer_id,
4171 user_store,
4172 project: Default::default(),
4173 buffers: Default::default(),
4174 }
4175 }
4176
4177 fn disconnect_client(&self, user_id: UserId) {
4178 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
4179 let _ = kill_conn.try_send(Some(()));
4180 }
4181 }
4182
4183 fn forbid_connections(&self) {
4184 self.forbid_connections.store(true, SeqCst);
4185 }
4186
4187 fn allow_connections(&self) {
4188 self.forbid_connections.store(false, SeqCst);
4189 }
4190
4191 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4192 let mut config = Config::default();
4193 config.session_secret = "a".repeat(32);
4194 config.database_url = test_db.url.clone();
4195 let github_client = github::AppClient::test();
4196 Arc::new(AppState {
4197 db: test_db.db().clone(),
4198 handlebars: Default::default(),
4199 auth_client: auth::build_client("", ""),
4200 repo_client: github::RepoClient::test(&github_client),
4201 github_client,
4202 config,
4203 })
4204 }
4205
4206 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4207 self.server.store.read()
4208 }
4209
4210 async fn condition<F>(&mut self, mut predicate: F)
4211 where
4212 F: FnMut(&Store) -> bool,
4213 {
4214 async_std::future::timeout(Duration::from_millis(500), async {
4215 while !(predicate)(&*self.server.store.read()) {
4216 self.foreground.start_waiting();
4217 self.notifications.next().await;
4218 self.foreground.finish_waiting();
4219 }
4220 })
4221 .await
4222 .expect("condition timed out");
4223 }
4224 }
4225
4226 impl Drop for TestServer {
4227 fn drop(&mut self) {
4228 self.peer.reset();
4229 }
4230 }
4231
4232 struct TestClient {
4233 client: Arc<Client>,
4234 pub peer_id: PeerId,
4235 pub user_store: ModelHandle<UserStore>,
4236 project: Option<ModelHandle<Project>>,
4237 buffers: HashSet<ModelHandle<zed::language::Buffer>>,
4238 }
4239
4240 impl Deref for TestClient {
4241 type Target = Arc<Client>;
4242
4243 fn deref(&self) -> &Self::Target {
4244 &self.client
4245 }
4246 }
4247
4248 impl TestClient {
4249 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4250 UserId::from_proto(
4251 self.user_store
4252 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4253 )
4254 }
4255
4256 async fn simulate_host(
4257 mut self,
4258 project: ModelHandle<Project>,
4259 operations: Rc<Cell<usize>>,
4260 max_operations: usize,
4261 rng: Rc<RefCell<StdRng>>,
4262 mut cx: TestAppContext,
4263 ) -> (Self, TestAppContext) {
4264 let fs = project.read_with(&cx, |project, _| project.fs().clone());
4265 let mut files: Vec<PathBuf> = Default::default();
4266 while operations.get() < max_operations {
4267 operations.set(operations.get() + 1);
4268
4269 let distribution = rng.borrow_mut().gen_range(0..100);
4270 match distribution {
4271 0..=20 if !files.is_empty() => {
4272 let mut path = files.choose(&mut *rng.borrow_mut()).unwrap().as_path();
4273 while let Some(parent_path) = path.parent() {
4274 path = parent_path;
4275 if rng.borrow_mut().gen() {
4276 break;
4277 }
4278 }
4279
4280 log::info!("Host: find/create local worktree {:?}", path);
4281 project
4282 .update(&mut cx, |project, cx| {
4283 project.find_or_create_local_worktree(path, false, cx)
4284 })
4285 .await
4286 .unwrap();
4287 }
4288 10..=80 if !files.is_empty() => {
4289 let buffer = if self.buffers.is_empty() || rng.borrow_mut().gen() {
4290 let file = files.choose(&mut *rng.borrow_mut()).unwrap();
4291 let (worktree, path) = project
4292 .update(&mut cx, |project, cx| {
4293 project.find_or_create_local_worktree(file, false, cx)
4294 })
4295 .await
4296 .unwrap();
4297 let project_path =
4298 worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4299 log::info!("Host: opening path {:?}", project_path);
4300 let buffer = project
4301 .update(&mut cx, |project, cx| {
4302 project.open_buffer(project_path, cx)
4303 })
4304 .await
4305 .unwrap();
4306 self.buffers.insert(buffer.clone());
4307 buffer
4308 } else {
4309 self.buffers
4310 .iter()
4311 .choose(&mut *rng.borrow_mut())
4312 .unwrap()
4313 .clone()
4314 };
4315
4316 if rng.borrow_mut().gen_bool(0.1) {
4317 cx.update(|cx| {
4318 log::info!(
4319 "Host: dropping buffer {:?}",
4320 buffer.read(cx).file().unwrap().full_path(cx)
4321 );
4322 self.buffers.remove(&buffer);
4323 drop(buffer);
4324 });
4325 } else {
4326 buffer.update(&mut cx, |buffer, cx| {
4327 log::info!(
4328 "Host: updating buffer {:?}",
4329 buffer.file().unwrap().full_path(cx)
4330 );
4331 buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
4332 });
4333 }
4334 }
4335 _ => loop {
4336 let path_component_count = rng.borrow_mut().gen_range(1..=5);
4337 let mut path = PathBuf::new();
4338 path.push("/");
4339 for _ in 0..path_component_count {
4340 let letter = rng.borrow_mut().gen_range(b'a'..=b'z');
4341 path.push(std::str::from_utf8(&[letter]).unwrap());
4342 }
4343 path.set_extension("rs");
4344 let parent_path = path.parent().unwrap();
4345
4346 log::info!("Host: creating file {:?}", path);
4347 if fs.create_dir(&parent_path).await.is_ok()
4348 && fs.create_file(&path, Default::default()).await.is_ok()
4349 {
4350 files.push(path);
4351 break;
4352 } else {
4353 log::info!("Host: cannot create file");
4354 }
4355 },
4356 }
4357
4358 cx.background().simulate_random_delay().await;
4359 }
4360
4361 self.project = Some(project);
4362 (self, cx)
4363 }
4364
4365 pub async fn simulate_guest(
4366 mut self,
4367 guest_id: usize,
4368 project: ModelHandle<Project>,
4369 operations: Rc<Cell<usize>>,
4370 max_operations: usize,
4371 rng: Rc<RefCell<StdRng>>,
4372 mut cx: TestAppContext,
4373 ) -> (Self, TestAppContext) {
4374 while operations.get() < max_operations {
4375 let buffer = if self.buffers.is_empty() || rng.borrow_mut().gen() {
4376 let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4377 project
4378 .worktrees(&cx)
4379 .filter(|worktree| {
4380 worktree.read(cx).entries(false).any(|e| e.is_file())
4381 })
4382 .choose(&mut *rng.borrow_mut())
4383 }) {
4384 worktree
4385 } else {
4386 cx.background().simulate_random_delay().await;
4387 continue;
4388 };
4389
4390 operations.set(operations.get() + 1);
4391 let project_path = worktree.read_with(&cx, |worktree, _| {
4392 let entry = worktree
4393 .entries(false)
4394 .filter(|e| e.is_file())
4395 .choose(&mut *rng.borrow_mut())
4396 .unwrap();
4397 (worktree.id(), entry.path.clone())
4398 });
4399 log::info!("Guest {}: opening path {:?}", guest_id, project_path);
4400 let buffer = project
4401 .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
4402 .await
4403 .unwrap();
4404 self.buffers.insert(buffer.clone());
4405 buffer
4406 } else {
4407 operations.set(operations.get() + 1);
4408
4409 self.buffers
4410 .iter()
4411 .choose(&mut *rng.borrow_mut())
4412 .unwrap()
4413 .clone()
4414 };
4415
4416 let choice = rng.borrow_mut().gen_range(0..100);
4417 match choice {
4418 0..=9 => {
4419 cx.update(|cx| {
4420 log::info!(
4421 "Guest {}: dropping buffer {:?}",
4422 guest_id,
4423 buffer.read(cx).file().unwrap().full_path(cx)
4424 );
4425 self.buffers.remove(&buffer);
4426 drop(buffer);
4427 });
4428 }
4429 10..=19 => {
4430 let completions = project.update(&mut cx, |project, cx| {
4431 log::info!(
4432 "Guest {}: requesting completions for buffer {:?}",
4433 guest_id,
4434 buffer.read(cx).file().unwrap().full_path(cx)
4435 );
4436 let offset = rng.borrow_mut().gen_range(0..=buffer.read(cx).len());
4437 project.completions(&buffer, offset, cx)
4438 });
4439 let completions = cx.background().spawn(async move {
4440 completions.await.expect("completions request failed");
4441 });
4442 if rng.borrow_mut().gen_bool(0.3) {
4443 log::info!("Guest {}: detaching completions request", guest_id);
4444 completions.detach();
4445 } else {
4446 completions.await;
4447 }
4448 }
4449 20..=29 => {
4450 let code_actions = project.update(&mut cx, |project, cx| {
4451 log::info!(
4452 "Guest {}: requesting code actions for buffer {:?}",
4453 guest_id,
4454 buffer.read(cx).file().unwrap().full_path(cx)
4455 );
4456 let range =
4457 buffer.read(cx).random_byte_range(0, &mut *rng.borrow_mut());
4458 project.code_actions(&buffer, range, cx)
4459 });
4460 let code_actions = cx.background().spawn(async move {
4461 code_actions.await.expect("code actions request failed");
4462 });
4463 if rng.borrow_mut().gen_bool(0.3) {
4464 log::info!("Guest {}: detaching code actions request", guest_id);
4465 code_actions.detach();
4466 } else {
4467 code_actions.await;
4468 }
4469 }
4470 30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4471 let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4472 log::info!(
4473 "Guest {}: saving buffer {:?}",
4474 guest_id,
4475 buffer.file().unwrap().full_path(cx)
4476 );
4477 (buffer.version(), buffer.save(cx))
4478 });
4479 let save = cx.spawn(|cx| async move {
4480 let (saved_version, _) = save.await.expect("save request failed");
4481 buffer.read_with(&cx, |buffer, _| {
4482 assert!(buffer.version().observed_all(&saved_version));
4483 assert!(saved_version.observed_all(&requested_version));
4484 });
4485 });
4486 if rng.borrow_mut().gen_bool(0.3) {
4487 log::info!("Guest {}: detaching save request", guest_id);
4488 save.detach();
4489 } else {
4490 save.await;
4491 }
4492 }
4493 40..=45 => {
4494 let prepare_rename = project.update(&mut cx, |project, cx| {
4495 log::info!(
4496 "Guest {}: preparing rename for buffer {:?}",
4497 guest_id,
4498 buffer.read(cx).file().unwrap().full_path(cx)
4499 );
4500 let offset = rng.borrow_mut().gen_range(0..=buffer.read(cx).len());
4501 project.prepare_rename(buffer, offset, cx)
4502 });
4503 let prepare_rename = cx.background().spawn(async move {
4504 prepare_rename.await.expect("prepare rename request failed");
4505 });
4506 if rng.borrow_mut().gen_bool(0.3) {
4507 log::info!("Guest {}: detaching prepare rename request", guest_id);
4508 prepare_rename.detach();
4509 } else {
4510 prepare_rename.await;
4511 }
4512 }
4513 _ => {
4514 buffer.update(&mut cx, |buffer, cx| {
4515 log::info!(
4516 "Guest {}: updating buffer {:?}",
4517 guest_id,
4518 buffer.file().unwrap().full_path(cx)
4519 );
4520 buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
4521 });
4522 }
4523 }
4524 cx.background().simulate_random_delay().await;
4525 }
4526
4527 self.project = Some(project);
4528 (self, cx)
4529 }
4530 }
4531
4532 impl Executor for Arc<gpui::executor::Background> {
4533 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
4534 self.spawn(future).detach();
4535 }
4536 }
4537
4538 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
4539 channel
4540 .messages()
4541 .cursor::<()>()
4542 .map(|m| {
4543 (
4544 m.sender.github_login.clone(),
4545 m.body.clone(),
4546 m.is_pending(),
4547 )
4548 })
4549 .collect()
4550 }
4551
4552 struct EmptyView;
4553
4554 impl gpui::Entity for EmptyView {
4555 type Event = ();
4556 }
4557
4558 impl gpui::View for EmptyView {
4559 fn ui_name() -> &'static str {
4560 "empty view"
4561 }
4562
4563 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
4564 gpui::Element::boxed(gpui::elements::Empty)
4565 }
4566 }
4567}