1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{future::BoxFuture, FutureExt, StreamExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use postage::{mpsc, prelude::Sink as _};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
21use store::{Store, Worktree};
22use surf::StatusCode;
23use tide::log;
24use tide::{
25 http::headers::{HeaderName, CONNECTION, UPGRADE},
26 Request, Response,
27};
28use time::OffsetDateTime;
29
30type MessageHandler = Box<
31 dyn Send
32 + Sync
33 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
34>;
35
36pub struct Server {
37 peer: Arc<Peer>,
38 store: RwLock<Store>,
39 app_state: Arc<AppState>,
40 handlers: HashMap<TypeId, MessageHandler>,
41 notifications: Option<mpsc::Sender<()>>,
42}
43
44pub trait Executor {
45 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
46}
47
48pub struct RealExecutor;
49
50const MESSAGE_COUNT_PER_PAGE: usize = 100;
51const MAX_MESSAGE_LEN: usize = 1024;
52
53impl Server {
54 pub fn new(
55 app_state: Arc<AppState>,
56 peer: Arc<Peer>,
57 notifications: Option<mpsc::Sender<()>>,
58 ) -> Arc<Self> {
59 let mut server = Self {
60 peer,
61 app_state,
62 store: Default::default(),
63 handlers: Default::default(),
64 notifications,
65 };
66
67 server
68 .add_request_handler(Server::ping)
69 .add_request_handler(Server::register_project)
70 .add_message_handler(Server::unregister_project)
71 .add_request_handler(Server::share_project)
72 .add_message_handler(Server::unshare_project)
73 .add_request_handler(Server::join_project)
74 .add_message_handler(Server::leave_project)
75 .add_request_handler(Server::register_worktree)
76 .add_message_handler(Server::unregister_worktree)
77 .add_request_handler(Server::share_worktree)
78 .add_message_handler(Server::update_worktree)
79 .add_message_handler(Server::update_diagnostic_summary)
80 .add_message_handler(Server::disk_based_diagnostics_updating)
81 .add_message_handler(Server::disk_based_diagnostics_updated)
82 .add_request_handler(Server::get_definition)
83 .add_request_handler(Server::open_buffer)
84 .add_message_handler(Server::close_buffer)
85 .add_request_handler(Server::update_buffer)
86 .add_message_handler(Server::update_buffer_file)
87 .add_message_handler(Server::buffer_reloaded)
88 .add_message_handler(Server::buffer_saved)
89 .add_request_handler(Server::save_buffer)
90 .add_request_handler(Server::format_buffers)
91 .add_request_handler(Server::get_completions)
92 .add_request_handler(Server::apply_additional_edits_for_completion)
93 .add_request_handler(Server::get_code_actions)
94 .add_request_handler(Server::apply_code_action)
95 .add_request_handler(Server::get_channels)
96 .add_request_handler(Server::get_users)
97 .add_request_handler(Server::join_channel)
98 .add_message_handler(Server::leave_channel)
99 .add_request_handler(Server::send_channel_message)
100 .add_request_handler(Server::get_channel_messages);
101
102 Arc::new(server)
103 }
104
105 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
106 where
107 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
108 Fut: 'static + Send + Future<Output = tide::Result<()>>,
109 M: EnvelopedMessage,
110 {
111 let prev_handler = self.handlers.insert(
112 TypeId::of::<M>(),
113 Box::new(move |server, envelope| {
114 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
115 (handler)(server, *envelope).boxed()
116 }),
117 );
118 if prev_handler.is_some() {
119 panic!("registered a handler for the same message twice");
120 }
121 self
122 }
123
124 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
125 where
126 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
127 Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
128 M: RequestMessage,
129 {
130 self.add_message_handler(move |server, envelope| {
131 let receipt = envelope.receipt();
132 let response = (handler)(server.clone(), envelope);
133 async move {
134 match response.await {
135 Ok(response) => {
136 server.peer.respond(receipt, response)?;
137 Ok(())
138 }
139 Err(error) => {
140 server.peer.respond_with_error(
141 receipt,
142 proto::Error {
143 message: error.to_string(),
144 },
145 )?;
146 Err(error)
147 }
148 }
149 }
150 })
151 }
152
153 pub fn handle_connection<E: Executor>(
154 self: &Arc<Self>,
155 connection: Connection,
156 addr: String,
157 user_id: UserId,
158 mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
159 executor: E,
160 ) -> impl Future<Output = ()> {
161 let mut this = self.clone();
162 async move {
163 let (connection_id, handle_io, mut incoming_rx) =
164 this.peer.add_connection(connection).await;
165
166 if let Some(send_connection_id) = send_connection_id.as_mut() {
167 let _ = send_connection_id.send(connection_id).await;
168 }
169
170 this.state_mut().add_connection(connection_id, user_id);
171 if let Err(err) = this.update_contacts_for_users(&[user_id]) {
172 log::error!("error updating contacts for {:?}: {}", user_id, err);
173 }
174
175 let handle_io = handle_io.fuse();
176 futures::pin_mut!(handle_io);
177 loop {
178 let next_message = incoming_rx.next().fuse();
179 futures::pin_mut!(next_message);
180 futures::select_biased! {
181 result = handle_io => {
182 if let Err(err) = result {
183 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
184 }
185 break;
186 }
187 message = next_message => {
188 if let Some(message) = message {
189 let start_time = Instant::now();
190 let type_name = message.payload_type_name();
191 log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
192 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
193 let handle_message = (handler)(this.clone(), message);
194 let notifications = this.notifications.clone();
195 executor.spawn_detached(async move {
196 if let Err(err) = handle_message.await {
197 log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
198 } else {
199 log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
200 }
201 if let Some(mut notifications) = notifications {
202 let _ = notifications.send(()).await;
203 }
204 });
205 } else {
206 log::warn!("unhandled message: {}", type_name);
207 }
208 } else {
209 log::info!("rpc connection closed {:?}", addr);
210 break;
211 }
212 }
213 }
214 }
215
216 if let Err(err) = this.sign_out(connection_id).await {
217 log::error!("error signing out connection {:?} - {:?}", addr, err);
218 }
219 }
220 }
221
222 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
223 self.peer.disconnect(connection_id);
224 let removed_connection = self.state_mut().remove_connection(connection_id)?;
225
226 for (project_id, project) in removed_connection.hosted_projects {
227 if let Some(share) = project.share {
228 broadcast(
229 connection_id,
230 share.guests.keys().copied().collect(),
231 |conn_id| {
232 self.peer
233 .send(conn_id, proto::UnshareProject { project_id })
234 },
235 )?;
236 }
237 }
238
239 for (project_id, peer_ids) in removed_connection.guest_project_ids {
240 broadcast(connection_id, peer_ids, |conn_id| {
241 self.peer.send(
242 conn_id,
243 proto::RemoveProjectCollaborator {
244 project_id,
245 peer_id: connection_id.0,
246 },
247 )
248 })?;
249 }
250
251 self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
252 Ok(())
253 }
254
255 async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
256 Ok(proto::Ack {})
257 }
258
259 async fn register_project(
260 mut self: Arc<Server>,
261 request: TypedEnvelope<proto::RegisterProject>,
262 ) -> tide::Result<proto::RegisterProjectResponse> {
263 let project_id = {
264 let mut state = self.state_mut();
265 let user_id = state.user_id_for_connection(request.sender_id)?;
266 state.register_project(request.sender_id, user_id)
267 };
268 Ok(proto::RegisterProjectResponse { project_id })
269 }
270
271 async fn unregister_project(
272 mut self: Arc<Server>,
273 request: TypedEnvelope<proto::UnregisterProject>,
274 ) -> tide::Result<()> {
275 let project = self
276 .state_mut()
277 .unregister_project(request.payload.project_id, request.sender_id)?;
278 self.update_contacts_for_users(project.authorized_user_ids().iter())?;
279 Ok(())
280 }
281
282 async fn share_project(
283 mut self: Arc<Server>,
284 request: TypedEnvelope<proto::ShareProject>,
285 ) -> tide::Result<proto::Ack> {
286 self.state_mut()
287 .share_project(request.payload.project_id, request.sender_id);
288 Ok(proto::Ack {})
289 }
290
291 async fn unshare_project(
292 mut self: Arc<Server>,
293 request: TypedEnvelope<proto::UnshareProject>,
294 ) -> tide::Result<()> {
295 let project_id = request.payload.project_id;
296 let project = self
297 .state_mut()
298 .unshare_project(project_id, request.sender_id)?;
299
300 broadcast(request.sender_id, project.connection_ids, |conn_id| {
301 self.peer
302 .send(conn_id, proto::UnshareProject { project_id })
303 })?;
304 self.update_contacts_for_users(&project.authorized_user_ids)?;
305 Ok(())
306 }
307
308 async fn join_project(
309 mut self: Arc<Server>,
310 request: TypedEnvelope<proto::JoinProject>,
311 ) -> tide::Result<proto::JoinProjectResponse> {
312 let project_id = request.payload.project_id;
313
314 let user_id = self.state().user_id_for_connection(request.sender_id)?;
315 let (response, connection_ids, contact_user_ids) = self
316 .state_mut()
317 .join_project(request.sender_id, user_id, project_id)
318 .and_then(|joined| {
319 let share = joined.project.share()?;
320 let peer_count = share.guests.len();
321 let mut collaborators = Vec::with_capacity(peer_count);
322 collaborators.push(proto::Collaborator {
323 peer_id: joined.project.host_connection_id.0,
324 replica_id: 0,
325 user_id: joined.project.host_user_id.to_proto(),
326 });
327 let worktrees = joined
328 .project
329 .worktrees
330 .iter()
331 .filter_map(|(id, worktree)| {
332 worktree.share.as_ref().map(|share| proto::Worktree {
333 id: *id,
334 root_name: worktree.root_name.clone(),
335 entries: share.entries.values().cloned().collect(),
336 diagnostic_summaries: share
337 .diagnostic_summaries
338 .values()
339 .cloned()
340 .collect(),
341 weak: worktree.weak,
342 next_update_id: share.next_update_id as u64,
343 })
344 })
345 .collect();
346 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
347 if *peer_conn_id != request.sender_id {
348 collaborators.push(proto::Collaborator {
349 peer_id: peer_conn_id.0,
350 replica_id: *peer_replica_id as u32,
351 user_id: peer_user_id.to_proto(),
352 });
353 }
354 }
355 let response = proto::JoinProjectResponse {
356 worktrees,
357 replica_id: joined.replica_id as u32,
358 collaborators,
359 };
360 let connection_ids = joined.project.connection_ids();
361 let contact_user_ids = joined.project.authorized_user_ids();
362 Ok((response, connection_ids, contact_user_ids))
363 })?;
364
365 broadcast(request.sender_id, connection_ids, |conn_id| {
366 self.peer.send(
367 conn_id,
368 proto::AddProjectCollaborator {
369 project_id,
370 collaborator: Some(proto::Collaborator {
371 peer_id: request.sender_id.0,
372 replica_id: response.replica_id,
373 user_id: user_id.to_proto(),
374 }),
375 },
376 )
377 })?;
378 self.update_contacts_for_users(&contact_user_ids)?;
379 Ok(response)
380 }
381
382 async fn leave_project(
383 mut self: Arc<Server>,
384 request: TypedEnvelope<proto::LeaveProject>,
385 ) -> tide::Result<()> {
386 let sender_id = request.sender_id;
387 let project_id = request.payload.project_id;
388 let worktree = self.state_mut().leave_project(sender_id, project_id)?;
389
390 broadcast(sender_id, worktree.connection_ids, |conn_id| {
391 self.peer.send(
392 conn_id,
393 proto::RemoveProjectCollaborator {
394 project_id,
395 peer_id: sender_id.0,
396 },
397 )
398 })?;
399 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
400
401 Ok(())
402 }
403
404 async fn register_worktree(
405 mut self: Arc<Server>,
406 request: TypedEnvelope<proto::RegisterWorktree>,
407 ) -> tide::Result<proto::Ack> {
408 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
409
410 let mut contact_user_ids = HashSet::default();
411 contact_user_ids.insert(host_user_id);
412 for github_login in request.payload.authorized_logins {
413 let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
414 contact_user_ids.insert(contact_user_id);
415 }
416
417 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
418 self.state_mut().register_worktree(
419 request.payload.project_id,
420 request.payload.worktree_id,
421 request.sender_id,
422 Worktree {
423 authorized_user_ids: contact_user_ids.clone(),
424 root_name: request.payload.root_name,
425 share: None,
426 weak: false,
427 },
428 )?;
429 self.update_contacts_for_users(&contact_user_ids)?;
430 Ok(proto::Ack {})
431 }
432
433 async fn unregister_worktree(
434 mut self: Arc<Server>,
435 request: TypedEnvelope<proto::UnregisterWorktree>,
436 ) -> tide::Result<()> {
437 let project_id = request.payload.project_id;
438 let worktree_id = request.payload.worktree_id;
439 let (worktree, guest_connection_ids) =
440 self.state_mut()
441 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
442 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
443 self.peer.send(
444 conn_id,
445 proto::UnregisterWorktree {
446 project_id,
447 worktree_id,
448 },
449 )
450 })?;
451 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
452 Ok(())
453 }
454
455 async fn share_worktree(
456 mut self: Arc<Server>,
457 mut request: TypedEnvelope<proto::ShareWorktree>,
458 ) -> tide::Result<proto::Ack> {
459 let worktree = request
460 .payload
461 .worktree
462 .as_mut()
463 .ok_or_else(|| anyhow!("missing worktree"))?;
464 let entries = worktree
465 .entries
466 .iter()
467 .map(|entry| (entry.id, entry.clone()))
468 .collect();
469 let diagnostic_summaries = worktree
470 .diagnostic_summaries
471 .iter()
472 .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
473 .collect();
474
475 let shared_worktree = self.state_mut().share_worktree(
476 request.payload.project_id,
477 worktree.id,
478 request.sender_id,
479 entries,
480 diagnostic_summaries,
481 worktree.next_update_id,
482 )?;
483
484 broadcast(
485 request.sender_id,
486 shared_worktree.connection_ids,
487 |connection_id| {
488 self.peer
489 .forward_send(request.sender_id, connection_id, request.payload.clone())
490 },
491 )?;
492 self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
493
494 Ok(proto::Ack {})
495 }
496
497 async fn update_worktree(
498 mut self: Arc<Server>,
499 request: TypedEnvelope<proto::UpdateWorktree>,
500 ) -> tide::Result<()> {
501 let connection_ids = self.state_mut().update_worktree(
502 request.sender_id,
503 request.payload.project_id,
504 request.payload.worktree_id,
505 &request.payload.removed_entries,
506 &request.payload.updated_entries,
507 )?;
508
509 broadcast(request.sender_id, connection_ids, |connection_id| {
510 self.peer
511 .forward_send(request.sender_id, connection_id, request.payload.clone())
512 })?;
513
514 Ok(())
515 }
516
517 async fn update_diagnostic_summary(
518 mut self: Arc<Server>,
519 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
520 ) -> tide::Result<()> {
521 let summary = request
522 .payload
523 .summary
524 .clone()
525 .ok_or_else(|| anyhow!("invalid summary"))?;
526 let receiver_ids = self.state_mut().update_diagnostic_summary(
527 request.payload.project_id,
528 request.payload.worktree_id,
529 request.sender_id,
530 summary,
531 )?;
532
533 broadcast(request.sender_id, receiver_ids, |connection_id| {
534 self.peer
535 .forward_send(request.sender_id, connection_id, request.payload.clone())
536 })?;
537 Ok(())
538 }
539
540 async fn disk_based_diagnostics_updating(
541 self: Arc<Server>,
542 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
543 ) -> tide::Result<()> {
544 let receiver_ids = self
545 .state()
546 .project_connection_ids(request.payload.project_id, request.sender_id)?;
547 broadcast(request.sender_id, receiver_ids, |connection_id| {
548 self.peer
549 .forward_send(request.sender_id, connection_id, request.payload.clone())
550 })?;
551 Ok(())
552 }
553
554 async fn disk_based_diagnostics_updated(
555 self: Arc<Server>,
556 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
557 ) -> tide::Result<()> {
558 let receiver_ids = self
559 .state()
560 .project_connection_ids(request.payload.project_id, request.sender_id)?;
561 broadcast(request.sender_id, receiver_ids, |connection_id| {
562 self.peer
563 .forward_send(request.sender_id, connection_id, request.payload.clone())
564 })?;
565 Ok(())
566 }
567
568 async fn get_definition(
569 self: Arc<Server>,
570 request: TypedEnvelope<proto::GetDefinition>,
571 ) -> tide::Result<proto::GetDefinitionResponse> {
572 let host_connection_id = self
573 .state()
574 .read_project(request.payload.project_id, request.sender_id)?
575 .host_connection_id;
576 Ok(self
577 .peer
578 .forward_request(request.sender_id, host_connection_id, request.payload)
579 .await?)
580 }
581
582 async fn open_buffer(
583 self: Arc<Server>,
584 request: TypedEnvelope<proto::OpenBuffer>,
585 ) -> tide::Result<proto::OpenBufferResponse> {
586 let host_connection_id = self
587 .state()
588 .read_project(request.payload.project_id, request.sender_id)?
589 .host_connection_id;
590 Ok(self
591 .peer
592 .forward_request(request.sender_id, host_connection_id, request.payload)
593 .await?)
594 }
595
596 async fn close_buffer(
597 self: Arc<Server>,
598 request: TypedEnvelope<proto::CloseBuffer>,
599 ) -> tide::Result<()> {
600 let host_connection_id = self
601 .state()
602 .read_project(request.payload.project_id, request.sender_id)?
603 .host_connection_id;
604 self.peer
605 .forward_send(request.sender_id, host_connection_id, request.payload)?;
606 Ok(())
607 }
608
609 async fn save_buffer(
610 self: Arc<Server>,
611 request: TypedEnvelope<proto::SaveBuffer>,
612 ) -> tide::Result<proto::BufferSaved> {
613 let host;
614 let mut guests;
615 {
616 let state = self.state();
617 let project = state.read_project(request.payload.project_id, request.sender_id)?;
618 host = project.host_connection_id;
619 guests = project.guest_connection_ids()
620 }
621
622 let response = self
623 .peer
624 .forward_request(request.sender_id, host, request.payload.clone())
625 .await?;
626
627 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
628 broadcast(host, guests, |conn_id| {
629 self.peer.forward_send(host, conn_id, response.clone())
630 })?;
631
632 Ok(response)
633 }
634
635 async fn format_buffers(
636 self: Arc<Server>,
637 request: TypedEnvelope<proto::FormatBuffers>,
638 ) -> tide::Result<proto::FormatBuffersResponse> {
639 let host = self
640 .state()
641 .read_project(request.payload.project_id, request.sender_id)?
642 .host_connection_id;
643 Ok(self
644 .peer
645 .forward_request(request.sender_id, host, request.payload.clone())
646 .await?)
647 }
648
649 async fn get_completions(
650 self: Arc<Server>,
651 request: TypedEnvelope<proto::GetCompletions>,
652 ) -> tide::Result<proto::GetCompletionsResponse> {
653 let host = self
654 .state()
655 .read_project(request.payload.project_id, request.sender_id)?
656 .host_connection_id;
657 Ok(self
658 .peer
659 .forward_request(request.sender_id, host, request.payload.clone())
660 .await?)
661 }
662
663 async fn apply_additional_edits_for_completion(
664 self: Arc<Server>,
665 request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
666 ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
667 let host = self
668 .state()
669 .read_project(request.payload.project_id, request.sender_id)?
670 .host_connection_id;
671 Ok(self
672 .peer
673 .forward_request(request.sender_id, host, request.payload.clone())
674 .await?)
675 }
676
677 async fn get_code_actions(
678 self: Arc<Server>,
679 request: TypedEnvelope<proto::GetCodeActions>,
680 ) -> tide::Result<proto::GetCodeActionsResponse> {
681 let host = self
682 .state()
683 .read_project(request.payload.project_id, request.sender_id)?
684 .host_connection_id;
685 Ok(self
686 .peer
687 .forward_request(request.sender_id, host, request.payload.clone())
688 .await?)
689 }
690
691 async fn apply_code_action(
692 self: Arc<Server>,
693 request: TypedEnvelope<proto::ApplyCodeAction>,
694 ) -> tide::Result<proto::ApplyCodeActionResponse> {
695 let host = self
696 .state()
697 .read_project(request.payload.project_id, request.sender_id)?
698 .host_connection_id;
699 Ok(self
700 .peer
701 .forward_request(request.sender_id, host, request.payload.clone())
702 .await?)
703 }
704
705 async fn update_buffer(
706 self: Arc<Server>,
707 request: TypedEnvelope<proto::UpdateBuffer>,
708 ) -> tide::Result<proto::Ack> {
709 let receiver_ids = self
710 .state()
711 .project_connection_ids(request.payload.project_id, request.sender_id)?;
712 broadcast(request.sender_id, receiver_ids, |connection_id| {
713 self.peer
714 .forward_send(request.sender_id, connection_id, request.payload.clone())
715 })?;
716 Ok(proto::Ack {})
717 }
718
719 async fn update_buffer_file(
720 self: Arc<Server>,
721 request: TypedEnvelope<proto::UpdateBufferFile>,
722 ) -> tide::Result<()> {
723 let receiver_ids = self
724 .state()
725 .project_connection_ids(request.payload.project_id, request.sender_id)?;
726 broadcast(request.sender_id, receiver_ids, |connection_id| {
727 self.peer
728 .forward_send(request.sender_id, connection_id, request.payload.clone())
729 })?;
730 Ok(())
731 }
732
733 async fn buffer_reloaded(
734 self: Arc<Server>,
735 request: TypedEnvelope<proto::BufferReloaded>,
736 ) -> tide::Result<()> {
737 let receiver_ids = self
738 .state()
739 .project_connection_ids(request.payload.project_id, request.sender_id)?;
740 broadcast(request.sender_id, receiver_ids, |connection_id| {
741 self.peer
742 .forward_send(request.sender_id, connection_id, request.payload.clone())
743 })?;
744 Ok(())
745 }
746
747 async fn buffer_saved(
748 self: Arc<Server>,
749 request: TypedEnvelope<proto::BufferSaved>,
750 ) -> tide::Result<()> {
751 let receiver_ids = self
752 .state()
753 .project_connection_ids(request.payload.project_id, request.sender_id)?;
754 broadcast(request.sender_id, receiver_ids, |connection_id| {
755 self.peer
756 .forward_send(request.sender_id, connection_id, request.payload.clone())
757 })?;
758 Ok(())
759 }
760
761 async fn get_channels(
762 self: Arc<Server>,
763 request: TypedEnvelope<proto::GetChannels>,
764 ) -> tide::Result<proto::GetChannelsResponse> {
765 let user_id = self.state().user_id_for_connection(request.sender_id)?;
766 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
767 Ok(proto::GetChannelsResponse {
768 channels: channels
769 .into_iter()
770 .map(|chan| proto::Channel {
771 id: chan.id.to_proto(),
772 name: chan.name,
773 })
774 .collect(),
775 })
776 }
777
778 async fn get_users(
779 self: Arc<Server>,
780 request: TypedEnvelope<proto::GetUsers>,
781 ) -> tide::Result<proto::GetUsersResponse> {
782 let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
783 let users = self
784 .app_state
785 .db
786 .get_users_by_ids(user_ids)
787 .await?
788 .into_iter()
789 .map(|user| proto::User {
790 id: user.id.to_proto(),
791 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
792 github_login: user.github_login,
793 })
794 .collect();
795 Ok(proto::GetUsersResponse { users })
796 }
797
798 fn update_contacts_for_users<'a>(
799 self: &Arc<Server>,
800 user_ids: impl IntoIterator<Item = &'a UserId>,
801 ) -> anyhow::Result<()> {
802 let mut result = Ok(());
803 let state = self.state();
804 for user_id in user_ids {
805 let contacts = state.contacts_for_user(*user_id);
806 for connection_id in state.connection_ids_for_user(*user_id) {
807 if let Err(error) = self.peer.send(
808 connection_id,
809 proto::UpdateContacts {
810 contacts: contacts.clone(),
811 },
812 ) {
813 result = Err(error);
814 }
815 }
816 }
817 result
818 }
819
820 async fn join_channel(
821 mut self: Arc<Self>,
822 request: TypedEnvelope<proto::JoinChannel>,
823 ) -> tide::Result<proto::JoinChannelResponse> {
824 let user_id = self.state().user_id_for_connection(request.sender_id)?;
825 let channel_id = ChannelId::from_proto(request.payload.channel_id);
826 if !self
827 .app_state
828 .db
829 .can_user_access_channel(user_id, channel_id)
830 .await?
831 {
832 Err(anyhow!("access denied"))?;
833 }
834
835 self.state_mut().join_channel(request.sender_id, channel_id);
836 let messages = self
837 .app_state
838 .db
839 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
840 .await?
841 .into_iter()
842 .map(|msg| proto::ChannelMessage {
843 id: msg.id.to_proto(),
844 body: msg.body,
845 timestamp: msg.sent_at.unix_timestamp() as u64,
846 sender_id: msg.sender_id.to_proto(),
847 nonce: Some(msg.nonce.as_u128().into()),
848 })
849 .collect::<Vec<_>>();
850 Ok(proto::JoinChannelResponse {
851 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
852 messages,
853 })
854 }
855
856 async fn leave_channel(
857 mut self: Arc<Self>,
858 request: TypedEnvelope<proto::LeaveChannel>,
859 ) -> tide::Result<()> {
860 let user_id = self.state().user_id_for_connection(request.sender_id)?;
861 let channel_id = ChannelId::from_proto(request.payload.channel_id);
862 if !self
863 .app_state
864 .db
865 .can_user_access_channel(user_id, channel_id)
866 .await?
867 {
868 Err(anyhow!("access denied"))?;
869 }
870
871 self.state_mut()
872 .leave_channel(request.sender_id, channel_id);
873
874 Ok(())
875 }
876
877 async fn send_channel_message(
878 self: Arc<Self>,
879 request: TypedEnvelope<proto::SendChannelMessage>,
880 ) -> tide::Result<proto::SendChannelMessageResponse> {
881 let channel_id = ChannelId::from_proto(request.payload.channel_id);
882 let user_id;
883 let connection_ids;
884 {
885 let state = self.state();
886 user_id = state.user_id_for_connection(request.sender_id)?;
887 connection_ids = state.channel_connection_ids(channel_id)?;
888 }
889
890 // Validate the message body.
891 let body = request.payload.body.trim().to_string();
892 if body.len() > MAX_MESSAGE_LEN {
893 return Err(anyhow!("message is too long"))?;
894 }
895 if body.is_empty() {
896 return Err(anyhow!("message can't be blank"))?;
897 }
898
899 let timestamp = OffsetDateTime::now_utc();
900 let nonce = request
901 .payload
902 .nonce
903 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
904
905 let message_id = self
906 .app_state
907 .db
908 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
909 .await?
910 .to_proto();
911 let message = proto::ChannelMessage {
912 sender_id: user_id.to_proto(),
913 id: message_id,
914 body,
915 timestamp: timestamp.unix_timestamp() as u64,
916 nonce: Some(nonce),
917 };
918 broadcast(request.sender_id, connection_ids, |conn_id| {
919 self.peer.send(
920 conn_id,
921 proto::ChannelMessageSent {
922 channel_id: channel_id.to_proto(),
923 message: Some(message.clone()),
924 },
925 )
926 })?;
927 Ok(proto::SendChannelMessageResponse {
928 message: Some(message),
929 })
930 }
931
932 async fn get_channel_messages(
933 self: Arc<Self>,
934 request: TypedEnvelope<proto::GetChannelMessages>,
935 ) -> tide::Result<proto::GetChannelMessagesResponse> {
936 let user_id = self.state().user_id_for_connection(request.sender_id)?;
937 let channel_id = ChannelId::from_proto(request.payload.channel_id);
938 if !self
939 .app_state
940 .db
941 .can_user_access_channel(user_id, channel_id)
942 .await?
943 {
944 Err(anyhow!("access denied"))?;
945 }
946
947 let messages = self
948 .app_state
949 .db
950 .get_channel_messages(
951 channel_id,
952 MESSAGE_COUNT_PER_PAGE,
953 Some(MessageId::from_proto(request.payload.before_message_id)),
954 )
955 .await?
956 .into_iter()
957 .map(|msg| proto::ChannelMessage {
958 id: msg.id.to_proto(),
959 body: msg.body,
960 timestamp: msg.sent_at.unix_timestamp() as u64,
961 sender_id: msg.sender_id.to_proto(),
962 nonce: Some(msg.nonce.as_u128().into()),
963 })
964 .collect::<Vec<_>>();
965
966 Ok(proto::GetChannelMessagesResponse {
967 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
968 messages,
969 })
970 }
971
972 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
973 self.store.read()
974 }
975
976 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
977 self.store.write()
978 }
979}
980
981impl Executor for RealExecutor {
982 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
983 task::spawn(future);
984 }
985}
986
987fn broadcast<F>(
988 sender_id: ConnectionId,
989 receiver_ids: Vec<ConnectionId>,
990 mut f: F,
991) -> anyhow::Result<()>
992where
993 F: FnMut(ConnectionId) -> anyhow::Result<()>,
994{
995 let mut result = Ok(());
996 for receiver_id in receiver_ids {
997 if receiver_id != sender_id {
998 if let Err(error) = f(receiver_id) {
999 if result.is_ok() {
1000 result = Err(error);
1001 }
1002 }
1003 }
1004 }
1005 result
1006}
1007
1008pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1009 let server = Server::new(app.state().clone(), rpc.clone(), None);
1010 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1011 let server = server.clone();
1012 async move {
1013 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1014
1015 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1016 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1017 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1018 let client_protocol_version: Option<u32> = request
1019 .header("X-Zed-Protocol-Version")
1020 .and_then(|v| v.as_str().parse().ok());
1021
1022 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1023 return Ok(Response::new(StatusCode::UpgradeRequired));
1024 }
1025
1026 let header = match request.header("Sec-Websocket-Key") {
1027 Some(h) => h.as_str(),
1028 None => return Err(anyhow!("expected sec-websocket-key"))?,
1029 };
1030
1031 let user_id = process_auth_header(&request).await?;
1032
1033 let mut response = Response::new(StatusCode::SwitchingProtocols);
1034 response.insert_header(UPGRADE, "websocket");
1035 response.insert_header(CONNECTION, "Upgrade");
1036 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1037 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1038 response.insert_header("Sec-Websocket-Version", "13");
1039
1040 let http_res: &mut tide::http::Response = response.as_mut();
1041 let upgrade_receiver = http_res.recv_upgrade().await;
1042 let addr = request.remote().unwrap_or("unknown").to_string();
1043 task::spawn(async move {
1044 if let Some(stream) = upgrade_receiver.await {
1045 server
1046 .handle_connection(
1047 Connection::new(
1048 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1049 ),
1050 addr,
1051 user_id,
1052 None,
1053 RealExecutor,
1054 )
1055 .await;
1056 }
1057 });
1058
1059 Ok(response)
1060 }
1061 });
1062}
1063
1064fn header_contains_ignore_case<T>(
1065 request: &tide::Request<T>,
1066 header_name: HeaderName,
1067 value: &str,
1068) -> bool {
1069 request
1070 .header(header_name)
1071 .map(|h| {
1072 h.as_str()
1073 .split(',')
1074 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1075 })
1076 .unwrap_or(false)
1077}
1078
1079#[cfg(test)]
1080mod tests {
1081 use super::*;
1082 use crate::{
1083 auth,
1084 db::{tests::TestDb, UserId},
1085 github, AppState, Config,
1086 };
1087 use ::rpc::Peer;
1088 use gpui::{executor, ModelHandle, TestAppContext};
1089 use parking_lot::Mutex;
1090 use postage::{mpsc, watch};
1091 use rand::prelude::*;
1092 use rpc::PeerId;
1093 use serde_json::json;
1094 use sqlx::types::time::OffsetDateTime;
1095 use std::{
1096 cell::{Cell, RefCell},
1097 env,
1098 ops::Deref,
1099 path::Path,
1100 rc::Rc,
1101 sync::{
1102 atomic::{AtomicBool, Ordering::SeqCst},
1103 Arc,
1104 },
1105 time::Duration,
1106 };
1107 use zed::{
1108 client::{
1109 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1110 EstablishConnectionError, UserStore,
1111 },
1112 editor::{
1113 self, ConfirmCodeAction, ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer,
1114 Redo, ToggleCodeActions, Undo,
1115 },
1116 fs::{FakeFs, Fs as _},
1117 language::{
1118 tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1119 LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1120 },
1121 lsp,
1122 project::{DiagnosticSummary, Project, ProjectPath},
1123 workspace::{Workspace, WorkspaceParams},
1124 };
1125
1126 #[cfg(test)]
1127 #[ctor::ctor]
1128 fn init_logger() {
1129 if std::env::var("RUST_LOG").is_ok() {
1130 env_logger::init();
1131 }
1132 }
1133
1134 #[gpui::test(iterations = 10)]
1135 async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1136 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1137 let lang_registry = Arc::new(LanguageRegistry::new());
1138 let fs = Arc::new(FakeFs::new(cx_a.background()));
1139 cx_a.foreground().forbid_parking();
1140
1141 // Connect to a server as 2 clients.
1142 let mut server = TestServer::start(cx_a.foreground()).await;
1143 let client_a = server.create_client(&mut cx_a, "user_a").await;
1144 let client_b = server.create_client(&mut cx_b, "user_b").await;
1145
1146 // Share a project as client A
1147 fs.insert_tree(
1148 "/a",
1149 json!({
1150 ".zed.toml": r#"collaborators = ["user_b"]"#,
1151 "a.txt": "a-contents",
1152 "b.txt": "b-contents",
1153 }),
1154 )
1155 .await;
1156 let project_a = cx_a.update(|cx| {
1157 Project::local(
1158 client_a.clone(),
1159 client_a.user_store.clone(),
1160 lang_registry.clone(),
1161 fs.clone(),
1162 cx,
1163 )
1164 });
1165 let (worktree_a, _) = project_a
1166 .update(&mut cx_a, |p, cx| {
1167 p.find_or_create_local_worktree("/a", false, cx)
1168 })
1169 .await
1170 .unwrap();
1171 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1172 worktree_a
1173 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1174 .await;
1175 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1176 project_a
1177 .update(&mut cx_a, |p, cx| p.share(cx))
1178 .await
1179 .unwrap();
1180
1181 // Join that project as client B
1182 let project_b = Project::remote(
1183 project_id,
1184 client_b.clone(),
1185 client_b.user_store.clone(),
1186 lang_registry.clone(),
1187 fs.clone(),
1188 &mut cx_b.to_async(),
1189 )
1190 .await
1191 .unwrap();
1192
1193 let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1194 assert_eq!(
1195 project
1196 .collaborators()
1197 .get(&client_a.peer_id)
1198 .unwrap()
1199 .user
1200 .github_login,
1201 "user_a"
1202 );
1203 project.replica_id()
1204 });
1205 project_a
1206 .condition(&cx_a, |tree, _| {
1207 tree.collaborators()
1208 .get(&client_b.peer_id)
1209 .map_or(false, |collaborator| {
1210 collaborator.replica_id == replica_id_b
1211 && collaborator.user.github_login == "user_b"
1212 })
1213 })
1214 .await;
1215
1216 // Open the same file as client B and client A.
1217 let buffer_b = project_b
1218 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1219 .await
1220 .unwrap();
1221 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1222 buffer_b.read_with(&cx_b, |buf, cx| {
1223 assert_eq!(buf.read(cx).text(), "b-contents")
1224 });
1225 project_a.read_with(&cx_a, |project, cx| {
1226 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1227 });
1228 let buffer_a = project_a
1229 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1230 .await
1231 .unwrap();
1232
1233 let editor_b = cx_b.add_view(window_b, |cx| {
1234 Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1235 });
1236
1237 // TODO
1238 // // Create a selection set as client B and see that selection set as client A.
1239 // buffer_a
1240 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1241 // .await;
1242
1243 // Edit the buffer as client B and see that edit as client A.
1244 editor_b.update(&mut cx_b, |editor, cx| {
1245 editor.handle_input(&Input("ok, ".into()), cx)
1246 });
1247 buffer_a
1248 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1249 .await;
1250
1251 // TODO
1252 // // Remove the selection set as client B, see those selections disappear as client A.
1253 cx_b.update(move |_| drop(editor_b));
1254 // buffer_a
1255 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1256 // .await;
1257
1258 // Close the buffer as client A, see that the buffer is closed.
1259 cx_a.update(move |_| drop(buffer_a));
1260 project_a
1261 .condition(&cx_a, |project, cx| {
1262 !project.has_open_buffer((worktree_id, "b.txt"), cx)
1263 })
1264 .await;
1265
1266 // Dropping the client B's project removes client B from client A's collaborators.
1267 cx_b.update(move |_| drop(project_b));
1268 project_a
1269 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1270 .await;
1271 }
1272
1273 #[gpui::test(iterations = 10)]
1274 async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1275 let lang_registry = Arc::new(LanguageRegistry::new());
1276 let fs = Arc::new(FakeFs::new(cx_a.background()));
1277 cx_a.foreground().forbid_parking();
1278
1279 // Connect to a server as 2 clients.
1280 let mut server = TestServer::start(cx_a.foreground()).await;
1281 let client_a = server.create_client(&mut cx_a, "user_a").await;
1282 let client_b = server.create_client(&mut cx_b, "user_b").await;
1283
1284 // Share a project as client A
1285 fs.insert_tree(
1286 "/a",
1287 json!({
1288 ".zed.toml": r#"collaborators = ["user_b"]"#,
1289 "a.txt": "a-contents",
1290 "b.txt": "b-contents",
1291 }),
1292 )
1293 .await;
1294 let project_a = cx_a.update(|cx| {
1295 Project::local(
1296 client_a.clone(),
1297 client_a.user_store.clone(),
1298 lang_registry.clone(),
1299 fs.clone(),
1300 cx,
1301 )
1302 });
1303 let (worktree_a, _) = project_a
1304 .update(&mut cx_a, |p, cx| {
1305 p.find_or_create_local_worktree("/a", false, cx)
1306 })
1307 .await
1308 .unwrap();
1309 worktree_a
1310 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1311 .await;
1312 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1313 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1314 project_a
1315 .update(&mut cx_a, |p, cx| p.share(cx))
1316 .await
1317 .unwrap();
1318 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1319
1320 // Join that project as client B
1321 let project_b = Project::remote(
1322 project_id,
1323 client_b.clone(),
1324 client_b.user_store.clone(),
1325 lang_registry.clone(),
1326 fs.clone(),
1327 &mut cx_b.to_async(),
1328 )
1329 .await
1330 .unwrap();
1331 project_b
1332 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1333 .await
1334 .unwrap();
1335
1336 // Unshare the project as client A
1337 project_a
1338 .update(&mut cx_a, |project, cx| project.unshare(cx))
1339 .await
1340 .unwrap();
1341 project_b
1342 .condition(&mut cx_b, |project, _| project.is_read_only())
1343 .await;
1344 assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1345 drop(project_b);
1346
1347 // Share the project again and ensure guests can still join.
1348 project_a
1349 .update(&mut cx_a, |project, cx| project.share(cx))
1350 .await
1351 .unwrap();
1352 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1353
1354 let project_c = Project::remote(
1355 project_id,
1356 client_b.clone(),
1357 client_b.user_store.clone(),
1358 lang_registry.clone(),
1359 fs.clone(),
1360 &mut cx_b.to_async(),
1361 )
1362 .await
1363 .unwrap();
1364 project_c
1365 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1366 .await
1367 .unwrap();
1368 }
1369
1370 #[gpui::test(iterations = 10)]
1371 async fn test_propagate_saves_and_fs_changes(
1372 mut cx_a: TestAppContext,
1373 mut cx_b: TestAppContext,
1374 mut cx_c: TestAppContext,
1375 ) {
1376 let lang_registry = Arc::new(LanguageRegistry::new());
1377 let fs = Arc::new(FakeFs::new(cx_a.background()));
1378 cx_a.foreground().forbid_parking();
1379
1380 // Connect to a server as 3 clients.
1381 let mut server = TestServer::start(cx_a.foreground()).await;
1382 let client_a = server.create_client(&mut cx_a, "user_a").await;
1383 let client_b = server.create_client(&mut cx_b, "user_b").await;
1384 let client_c = server.create_client(&mut cx_c, "user_c").await;
1385
1386 // Share a worktree as client A.
1387 fs.insert_tree(
1388 "/a",
1389 json!({
1390 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1391 "file1": "",
1392 "file2": ""
1393 }),
1394 )
1395 .await;
1396 let project_a = cx_a.update(|cx| {
1397 Project::local(
1398 client_a.clone(),
1399 client_a.user_store.clone(),
1400 lang_registry.clone(),
1401 fs.clone(),
1402 cx,
1403 )
1404 });
1405 let (worktree_a, _) = project_a
1406 .update(&mut cx_a, |p, cx| {
1407 p.find_or_create_local_worktree("/a", false, cx)
1408 })
1409 .await
1410 .unwrap();
1411 worktree_a
1412 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1413 .await;
1414 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1415 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1416 project_a
1417 .update(&mut cx_a, |p, cx| p.share(cx))
1418 .await
1419 .unwrap();
1420
1421 // Join that worktree as clients B and C.
1422 let project_b = Project::remote(
1423 project_id,
1424 client_b.clone(),
1425 client_b.user_store.clone(),
1426 lang_registry.clone(),
1427 fs.clone(),
1428 &mut cx_b.to_async(),
1429 )
1430 .await
1431 .unwrap();
1432 let project_c = Project::remote(
1433 project_id,
1434 client_c.clone(),
1435 client_c.user_store.clone(),
1436 lang_registry.clone(),
1437 fs.clone(),
1438 &mut cx_c.to_async(),
1439 )
1440 .await
1441 .unwrap();
1442 let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1443 let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1444
1445 // Open and edit a buffer as both guests B and C.
1446 let buffer_b = project_b
1447 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1448 .await
1449 .unwrap();
1450 let buffer_c = project_c
1451 .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1452 .await
1453 .unwrap();
1454 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1455 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1456
1457 // Open and edit that buffer as the host.
1458 let buffer_a = project_a
1459 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1460 .await
1461 .unwrap();
1462
1463 buffer_a
1464 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1465 .await;
1466 buffer_a.update(&mut cx_a, |buf, cx| {
1467 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1468 });
1469
1470 // Wait for edits to propagate
1471 buffer_a
1472 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1473 .await;
1474 buffer_b
1475 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1476 .await;
1477 buffer_c
1478 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1479 .await;
1480
1481 // Edit the buffer as the host and concurrently save as guest B.
1482 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1483 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1484 save_b.await.unwrap();
1485 assert_eq!(
1486 fs.load("/a/file1".as_ref()).await.unwrap(),
1487 "hi-a, i-am-c, i-am-b, i-am-a"
1488 );
1489 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1490 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1491 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1492
1493 // Make changes on host's file system, see those changes on guest worktrees.
1494 fs.rename(
1495 "/a/file1".as_ref(),
1496 "/a/file1-renamed".as_ref(),
1497 Default::default(),
1498 )
1499 .await
1500 .unwrap();
1501
1502 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1503 .await
1504 .unwrap();
1505 fs.insert_file(Path::new("/a/file4"), "4".into())
1506 .await
1507 .unwrap();
1508
1509 worktree_a
1510 .condition(&cx_a, |tree, _| {
1511 tree.paths()
1512 .map(|p| p.to_string_lossy())
1513 .collect::<Vec<_>>()
1514 == [".zed.toml", "file1-renamed", "file3", "file4"]
1515 })
1516 .await;
1517 worktree_b
1518 .condition(&cx_b, |tree, _| {
1519 tree.paths()
1520 .map(|p| p.to_string_lossy())
1521 .collect::<Vec<_>>()
1522 == [".zed.toml", "file1-renamed", "file3", "file4"]
1523 })
1524 .await;
1525 worktree_c
1526 .condition(&cx_c, |tree, _| {
1527 tree.paths()
1528 .map(|p| p.to_string_lossy())
1529 .collect::<Vec<_>>()
1530 == [".zed.toml", "file1-renamed", "file3", "file4"]
1531 })
1532 .await;
1533
1534 // Ensure buffer files are updated as well.
1535 buffer_a
1536 .condition(&cx_a, |buf, _| {
1537 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1538 })
1539 .await;
1540 buffer_b
1541 .condition(&cx_b, |buf, _| {
1542 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1543 })
1544 .await;
1545 buffer_c
1546 .condition(&cx_c, |buf, _| {
1547 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1548 })
1549 .await;
1550 }
1551
1552 #[gpui::test(iterations = 10)]
1553 async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1554 cx_a.foreground().forbid_parking();
1555 let lang_registry = Arc::new(LanguageRegistry::new());
1556 let fs = Arc::new(FakeFs::new(cx_a.background()));
1557
1558 // Connect to a server as 2 clients.
1559 let mut server = TestServer::start(cx_a.foreground()).await;
1560 let client_a = server.create_client(&mut cx_a, "user_a").await;
1561 let client_b = server.create_client(&mut cx_b, "user_b").await;
1562
1563 // Share a project as client A
1564 fs.insert_tree(
1565 "/dir",
1566 json!({
1567 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1568 "a.txt": "a-contents",
1569 }),
1570 )
1571 .await;
1572
1573 let project_a = cx_a.update(|cx| {
1574 Project::local(
1575 client_a.clone(),
1576 client_a.user_store.clone(),
1577 lang_registry.clone(),
1578 fs.clone(),
1579 cx,
1580 )
1581 });
1582 let (worktree_a, _) = project_a
1583 .update(&mut cx_a, |p, cx| {
1584 p.find_or_create_local_worktree("/dir", false, cx)
1585 })
1586 .await
1587 .unwrap();
1588 worktree_a
1589 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1590 .await;
1591 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1592 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1593 project_a
1594 .update(&mut cx_a, |p, cx| p.share(cx))
1595 .await
1596 .unwrap();
1597
1598 // Join that project as client B
1599 let project_b = Project::remote(
1600 project_id,
1601 client_b.clone(),
1602 client_b.user_store.clone(),
1603 lang_registry.clone(),
1604 fs.clone(),
1605 &mut cx_b.to_async(),
1606 )
1607 .await
1608 .unwrap();
1609
1610 // Open a buffer as client B
1611 let buffer_b = project_b
1612 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1613 .await
1614 .unwrap();
1615
1616 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1617 buffer_b.read_with(&cx_b, |buf, _| {
1618 assert!(buf.is_dirty());
1619 assert!(!buf.has_conflict());
1620 });
1621
1622 buffer_b
1623 .update(&mut cx_b, |buf, cx| buf.save(cx))
1624 .await
1625 .unwrap();
1626 buffer_b
1627 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1628 .await;
1629 buffer_b.read_with(&cx_b, |buf, _| {
1630 assert!(!buf.has_conflict());
1631 });
1632
1633 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1634 buffer_b.read_with(&cx_b, |buf, _| {
1635 assert!(buf.is_dirty());
1636 assert!(!buf.has_conflict());
1637 });
1638 }
1639
1640 #[gpui::test(iterations = 10)]
1641 async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1642 cx_a.foreground().forbid_parking();
1643 let lang_registry = Arc::new(LanguageRegistry::new());
1644 let fs = Arc::new(FakeFs::new(cx_a.background()));
1645
1646 // Connect to a server as 2 clients.
1647 let mut server = TestServer::start(cx_a.foreground()).await;
1648 let client_a = server.create_client(&mut cx_a, "user_a").await;
1649 let client_b = server.create_client(&mut cx_b, "user_b").await;
1650
1651 // Share a project as client A
1652 fs.insert_tree(
1653 "/dir",
1654 json!({
1655 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1656 "a.txt": "a-contents",
1657 }),
1658 )
1659 .await;
1660
1661 let project_a = cx_a.update(|cx| {
1662 Project::local(
1663 client_a.clone(),
1664 client_a.user_store.clone(),
1665 lang_registry.clone(),
1666 fs.clone(),
1667 cx,
1668 )
1669 });
1670 let (worktree_a, _) = project_a
1671 .update(&mut cx_a, |p, cx| {
1672 p.find_or_create_local_worktree("/dir", false, cx)
1673 })
1674 .await
1675 .unwrap();
1676 worktree_a
1677 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1678 .await;
1679 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1680 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1681 project_a
1682 .update(&mut cx_a, |p, cx| p.share(cx))
1683 .await
1684 .unwrap();
1685
1686 // Join that project as client B
1687 let project_b = Project::remote(
1688 project_id,
1689 client_b.clone(),
1690 client_b.user_store.clone(),
1691 lang_registry.clone(),
1692 fs.clone(),
1693 &mut cx_b.to_async(),
1694 )
1695 .await
1696 .unwrap();
1697 let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1698
1699 // Open a buffer as client B
1700 let buffer_b = project_b
1701 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1702 .await
1703 .unwrap();
1704 buffer_b.read_with(&cx_b, |buf, _| {
1705 assert!(!buf.is_dirty());
1706 assert!(!buf.has_conflict());
1707 });
1708
1709 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1710 .await
1711 .unwrap();
1712 buffer_b
1713 .condition(&cx_b, |buf, _| {
1714 buf.text() == "new contents" && !buf.is_dirty()
1715 })
1716 .await;
1717 buffer_b.read_with(&cx_b, |buf, _| {
1718 assert!(!buf.has_conflict());
1719 });
1720 }
1721
1722 #[gpui::test(iterations = 10)]
1723 async fn test_editing_while_guest_opens_buffer(
1724 mut cx_a: TestAppContext,
1725 mut cx_b: TestAppContext,
1726 ) {
1727 cx_a.foreground().forbid_parking();
1728 let lang_registry = Arc::new(LanguageRegistry::new());
1729 let fs = Arc::new(FakeFs::new(cx_a.background()));
1730
1731 // Connect to a server as 2 clients.
1732 let mut server = TestServer::start(cx_a.foreground()).await;
1733 let client_a = server.create_client(&mut cx_a, "user_a").await;
1734 let client_b = server.create_client(&mut cx_b, "user_b").await;
1735
1736 // Share a project as client A
1737 fs.insert_tree(
1738 "/dir",
1739 json!({
1740 ".zed.toml": r#"collaborators = ["user_b"]"#,
1741 "a.txt": "a-contents",
1742 }),
1743 )
1744 .await;
1745 let project_a = cx_a.update(|cx| {
1746 Project::local(
1747 client_a.clone(),
1748 client_a.user_store.clone(),
1749 lang_registry.clone(),
1750 fs.clone(),
1751 cx,
1752 )
1753 });
1754 let (worktree_a, _) = project_a
1755 .update(&mut cx_a, |p, cx| {
1756 p.find_or_create_local_worktree("/dir", false, cx)
1757 })
1758 .await
1759 .unwrap();
1760 worktree_a
1761 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1762 .await;
1763 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1764 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1765 project_a
1766 .update(&mut cx_a, |p, cx| p.share(cx))
1767 .await
1768 .unwrap();
1769
1770 // Join that project as client B
1771 let project_b = Project::remote(
1772 project_id,
1773 client_b.clone(),
1774 client_b.user_store.clone(),
1775 lang_registry.clone(),
1776 fs.clone(),
1777 &mut cx_b.to_async(),
1778 )
1779 .await
1780 .unwrap();
1781
1782 // Open a buffer as client A
1783 let buffer_a = project_a
1784 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1785 .await
1786 .unwrap();
1787
1788 // Start opening the same buffer as client B
1789 let buffer_b = cx_b
1790 .background()
1791 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1792
1793 // Edit the buffer as client A while client B is still opening it.
1794 cx_b.background().simulate_random_delay().await;
1795 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1796 cx_b.background().simulate_random_delay().await;
1797 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1798
1799 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1800 let buffer_b = buffer_b.await.unwrap();
1801 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1802 }
1803
1804 #[gpui::test(iterations = 10)]
1805 async fn test_leaving_worktree_while_opening_buffer(
1806 mut cx_a: TestAppContext,
1807 mut cx_b: TestAppContext,
1808 ) {
1809 cx_a.foreground().forbid_parking();
1810 let lang_registry = Arc::new(LanguageRegistry::new());
1811 let fs = Arc::new(FakeFs::new(cx_a.background()));
1812
1813 // Connect to a server as 2 clients.
1814 let mut server = TestServer::start(cx_a.foreground()).await;
1815 let client_a = server.create_client(&mut cx_a, "user_a").await;
1816 let client_b = server.create_client(&mut cx_b, "user_b").await;
1817
1818 // Share a project as client A
1819 fs.insert_tree(
1820 "/dir",
1821 json!({
1822 ".zed.toml": r#"collaborators = ["user_b"]"#,
1823 "a.txt": "a-contents",
1824 }),
1825 )
1826 .await;
1827 let project_a = cx_a.update(|cx| {
1828 Project::local(
1829 client_a.clone(),
1830 client_a.user_store.clone(),
1831 lang_registry.clone(),
1832 fs.clone(),
1833 cx,
1834 )
1835 });
1836 let (worktree_a, _) = project_a
1837 .update(&mut cx_a, |p, cx| {
1838 p.find_or_create_local_worktree("/dir", false, cx)
1839 })
1840 .await
1841 .unwrap();
1842 worktree_a
1843 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1844 .await;
1845 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1846 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1847 project_a
1848 .update(&mut cx_a, |p, cx| p.share(cx))
1849 .await
1850 .unwrap();
1851
1852 // Join that project as client B
1853 let project_b = Project::remote(
1854 project_id,
1855 client_b.clone(),
1856 client_b.user_store.clone(),
1857 lang_registry.clone(),
1858 fs.clone(),
1859 &mut cx_b.to_async(),
1860 )
1861 .await
1862 .unwrap();
1863
1864 // See that a guest has joined as client A.
1865 project_a
1866 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1867 .await;
1868
1869 // Begin opening a buffer as client B, but leave the project before the open completes.
1870 let buffer_b = cx_b
1871 .background()
1872 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1873 cx_b.update(|_| drop(project_b));
1874 drop(buffer_b);
1875
1876 // See that the guest has left.
1877 project_a
1878 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1879 .await;
1880 }
1881
1882 #[gpui::test(iterations = 10)]
1883 async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1884 cx_a.foreground().forbid_parking();
1885 let lang_registry = Arc::new(LanguageRegistry::new());
1886 let fs = Arc::new(FakeFs::new(cx_a.background()));
1887
1888 // Connect to a server as 2 clients.
1889 let mut server = TestServer::start(cx_a.foreground()).await;
1890 let client_a = server.create_client(&mut cx_a, "user_a").await;
1891 let client_b = server.create_client(&mut cx_b, "user_b").await;
1892
1893 // Share a project as client A
1894 fs.insert_tree(
1895 "/a",
1896 json!({
1897 ".zed.toml": r#"collaborators = ["user_b"]"#,
1898 "a.txt": "a-contents",
1899 "b.txt": "b-contents",
1900 }),
1901 )
1902 .await;
1903 let project_a = cx_a.update(|cx| {
1904 Project::local(
1905 client_a.clone(),
1906 client_a.user_store.clone(),
1907 lang_registry.clone(),
1908 fs.clone(),
1909 cx,
1910 )
1911 });
1912 let (worktree_a, _) = project_a
1913 .update(&mut cx_a, |p, cx| {
1914 p.find_or_create_local_worktree("/a", false, cx)
1915 })
1916 .await
1917 .unwrap();
1918 worktree_a
1919 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1920 .await;
1921 let project_id = project_a
1922 .update(&mut cx_a, |project, _| project.next_remote_id())
1923 .await;
1924 project_a
1925 .update(&mut cx_a, |project, cx| project.share(cx))
1926 .await
1927 .unwrap();
1928
1929 // Join that project as client B
1930 let _project_b = Project::remote(
1931 project_id,
1932 client_b.clone(),
1933 client_b.user_store.clone(),
1934 lang_registry.clone(),
1935 fs.clone(),
1936 &mut cx_b.to_async(),
1937 )
1938 .await
1939 .unwrap();
1940
1941 // See that a guest has joined as client A.
1942 project_a
1943 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1944 .await;
1945
1946 // Drop client B's connection and ensure client A observes client B leaving the worktree.
1947 client_b.disconnect(&cx_b.to_async()).unwrap();
1948 project_a
1949 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1950 .await;
1951 }
1952
1953 #[gpui::test(iterations = 10)]
1954 async fn test_collaborating_with_diagnostics(
1955 mut cx_a: TestAppContext,
1956 mut cx_b: TestAppContext,
1957 ) {
1958 cx_a.foreground().forbid_parking();
1959 let mut lang_registry = Arc::new(LanguageRegistry::new());
1960 let fs = Arc::new(FakeFs::new(cx_a.background()));
1961
1962 // Set up a fake language server.
1963 let (language_server_config, mut fake_language_server) =
1964 LanguageServerConfig::fake(&cx_a).await;
1965 Arc::get_mut(&mut lang_registry)
1966 .unwrap()
1967 .add(Arc::new(Language::new(
1968 LanguageConfig {
1969 name: "Rust".to_string(),
1970 path_suffixes: vec!["rs".to_string()],
1971 language_server: Some(language_server_config),
1972 ..Default::default()
1973 },
1974 Some(tree_sitter_rust::language()),
1975 )));
1976
1977 // Connect to a server as 2 clients.
1978 let mut server = TestServer::start(cx_a.foreground()).await;
1979 let client_a = server.create_client(&mut cx_a, "user_a").await;
1980 let client_b = server.create_client(&mut cx_b, "user_b").await;
1981
1982 // Share a project as client A
1983 fs.insert_tree(
1984 "/a",
1985 json!({
1986 ".zed.toml": r#"collaborators = ["user_b"]"#,
1987 "a.rs": "let one = two",
1988 "other.rs": "",
1989 }),
1990 )
1991 .await;
1992 let project_a = cx_a.update(|cx| {
1993 Project::local(
1994 client_a.clone(),
1995 client_a.user_store.clone(),
1996 lang_registry.clone(),
1997 fs.clone(),
1998 cx,
1999 )
2000 });
2001 let (worktree_a, _) = project_a
2002 .update(&mut cx_a, |p, cx| {
2003 p.find_or_create_local_worktree("/a", false, cx)
2004 })
2005 .await
2006 .unwrap();
2007 worktree_a
2008 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2009 .await;
2010 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2011 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2012 project_a
2013 .update(&mut cx_a, |p, cx| p.share(cx))
2014 .await
2015 .unwrap();
2016
2017 // Cause the language server to start.
2018 let _ = cx_a
2019 .background()
2020 .spawn(project_a.update(&mut cx_a, |project, cx| {
2021 project.open_buffer(
2022 ProjectPath {
2023 worktree_id,
2024 path: Path::new("other.rs").into(),
2025 },
2026 cx,
2027 )
2028 }))
2029 .await
2030 .unwrap();
2031
2032 // Simulate a language server reporting errors for a file.
2033 fake_language_server
2034 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2035 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2036 version: None,
2037 diagnostics: vec![lsp::Diagnostic {
2038 severity: Some(lsp::DiagnosticSeverity::ERROR),
2039 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2040 message: "message 1".to_string(),
2041 ..Default::default()
2042 }],
2043 })
2044 .await;
2045
2046 // Wait for server to see the diagnostics update.
2047 server
2048 .condition(|store| {
2049 let worktree = store
2050 .project(project_id)
2051 .unwrap()
2052 .worktrees
2053 .get(&worktree_id.to_proto())
2054 .unwrap();
2055
2056 !worktree
2057 .share
2058 .as_ref()
2059 .unwrap()
2060 .diagnostic_summaries
2061 .is_empty()
2062 })
2063 .await;
2064
2065 // Join the worktree as client B.
2066 let project_b = Project::remote(
2067 project_id,
2068 client_b.clone(),
2069 client_b.user_store.clone(),
2070 lang_registry.clone(),
2071 fs.clone(),
2072 &mut cx_b.to_async(),
2073 )
2074 .await
2075 .unwrap();
2076
2077 project_b.read_with(&cx_b, |project, cx| {
2078 assert_eq!(
2079 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2080 &[(
2081 ProjectPath {
2082 worktree_id,
2083 path: Arc::from(Path::new("a.rs")),
2084 },
2085 DiagnosticSummary {
2086 error_count: 1,
2087 warning_count: 0,
2088 ..Default::default()
2089 },
2090 )]
2091 )
2092 });
2093
2094 // Simulate a language server reporting more errors for a file.
2095 fake_language_server
2096 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2097 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2098 version: None,
2099 diagnostics: vec![
2100 lsp::Diagnostic {
2101 severity: Some(lsp::DiagnosticSeverity::ERROR),
2102 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2103 message: "message 1".to_string(),
2104 ..Default::default()
2105 },
2106 lsp::Diagnostic {
2107 severity: Some(lsp::DiagnosticSeverity::WARNING),
2108 range: lsp::Range::new(
2109 lsp::Position::new(0, 10),
2110 lsp::Position::new(0, 13),
2111 ),
2112 message: "message 2".to_string(),
2113 ..Default::default()
2114 },
2115 ],
2116 })
2117 .await;
2118
2119 // Client b gets the updated summaries
2120 project_b
2121 .condition(&cx_b, |project, cx| {
2122 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2123 == &[(
2124 ProjectPath {
2125 worktree_id,
2126 path: Arc::from(Path::new("a.rs")),
2127 },
2128 DiagnosticSummary {
2129 error_count: 1,
2130 warning_count: 1,
2131 ..Default::default()
2132 },
2133 )]
2134 })
2135 .await;
2136
2137 // Open the file with the errors on client B. They should be present.
2138 let buffer_b = cx_b
2139 .background()
2140 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2141 .await
2142 .unwrap();
2143
2144 buffer_b.read_with(&cx_b, |buffer, _| {
2145 assert_eq!(
2146 buffer
2147 .snapshot()
2148 .diagnostics_in_range::<_, Point>(0..buffer.len())
2149 .map(|entry| entry)
2150 .collect::<Vec<_>>(),
2151 &[
2152 DiagnosticEntry {
2153 range: Point::new(0, 4)..Point::new(0, 7),
2154 diagnostic: Diagnostic {
2155 group_id: 0,
2156 message: "message 1".to_string(),
2157 severity: lsp::DiagnosticSeverity::ERROR,
2158 is_primary: true,
2159 ..Default::default()
2160 }
2161 },
2162 DiagnosticEntry {
2163 range: Point::new(0, 10)..Point::new(0, 13),
2164 diagnostic: Diagnostic {
2165 group_id: 1,
2166 severity: lsp::DiagnosticSeverity::WARNING,
2167 message: "message 2".to_string(),
2168 is_primary: true,
2169 ..Default::default()
2170 }
2171 }
2172 ]
2173 );
2174 });
2175 }
2176
2177 #[gpui::test(iterations = 10)]
2178 async fn test_collaborating_with_completion(
2179 mut cx_a: TestAppContext,
2180 mut cx_b: TestAppContext,
2181 ) {
2182 cx_a.foreground().forbid_parking();
2183 let mut lang_registry = Arc::new(LanguageRegistry::new());
2184 let fs = Arc::new(FakeFs::new(cx_a.background()));
2185
2186 // Set up a fake language server.
2187 let (language_server_config, mut fake_language_server) =
2188 LanguageServerConfig::fake_with_capabilities(
2189 lsp::ServerCapabilities {
2190 completion_provider: Some(lsp::CompletionOptions {
2191 trigger_characters: Some(vec![".".to_string()]),
2192 ..Default::default()
2193 }),
2194 ..Default::default()
2195 },
2196 &cx_a,
2197 )
2198 .await;
2199 Arc::get_mut(&mut lang_registry)
2200 .unwrap()
2201 .add(Arc::new(Language::new(
2202 LanguageConfig {
2203 name: "Rust".to_string(),
2204 path_suffixes: vec!["rs".to_string()],
2205 language_server: Some(language_server_config),
2206 ..Default::default()
2207 },
2208 Some(tree_sitter_rust::language()),
2209 )));
2210
2211 // Connect to a server as 2 clients.
2212 let mut server = TestServer::start(cx_a.foreground()).await;
2213 let client_a = server.create_client(&mut cx_a, "user_a").await;
2214 let client_b = server.create_client(&mut cx_b, "user_b").await;
2215
2216 // Share a project as client A
2217 fs.insert_tree(
2218 "/a",
2219 json!({
2220 ".zed.toml": r#"collaborators = ["user_b"]"#,
2221 "main.rs": "fn main() { a }",
2222 "other.rs": "",
2223 }),
2224 )
2225 .await;
2226 let project_a = cx_a.update(|cx| {
2227 Project::local(
2228 client_a.clone(),
2229 client_a.user_store.clone(),
2230 lang_registry.clone(),
2231 fs.clone(),
2232 cx,
2233 )
2234 });
2235 let (worktree_a, _) = project_a
2236 .update(&mut cx_a, |p, cx| {
2237 p.find_or_create_local_worktree("/a", false, cx)
2238 })
2239 .await
2240 .unwrap();
2241 worktree_a
2242 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2243 .await;
2244 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2245 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2246 project_a
2247 .update(&mut cx_a, |p, cx| p.share(cx))
2248 .await
2249 .unwrap();
2250
2251 // Join the worktree as client B.
2252 let project_b = Project::remote(
2253 project_id,
2254 client_b.clone(),
2255 client_b.user_store.clone(),
2256 lang_registry.clone(),
2257 fs.clone(),
2258 &mut cx_b.to_async(),
2259 )
2260 .await
2261 .unwrap();
2262
2263 // Open a file in an editor as the guest.
2264 let buffer_b = project_b
2265 .update(&mut cx_b, |p, cx| {
2266 p.open_buffer((worktree_id, "main.rs"), cx)
2267 })
2268 .await
2269 .unwrap();
2270 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2271 let editor_b = cx_b.add_view(window_b, |cx| {
2272 Editor::for_buffer(
2273 cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2274 Arc::new(|cx| EditorSettings::test(cx)),
2275 Some(project_b.clone()),
2276 cx,
2277 )
2278 });
2279
2280 // Type a completion trigger character as the guest.
2281 editor_b.update(&mut cx_b, |editor, cx| {
2282 editor.select_ranges([13..13], None, cx);
2283 editor.handle_input(&Input(".".into()), cx);
2284 cx.focus(&editor_b);
2285 });
2286
2287 // Receive a completion request as the host's language server.
2288 // Return some completions from the host's language server.
2289 fake_language_server.handle_request::<lsp::request::Completion, _>(|params| {
2290 assert_eq!(
2291 params.text_document_position.text_document.uri,
2292 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2293 );
2294 assert_eq!(
2295 params.text_document_position.position,
2296 lsp::Position::new(0, 14),
2297 );
2298
2299 Some(lsp::CompletionResponse::Array(vec![
2300 lsp::CompletionItem {
2301 label: "first_method(…)".into(),
2302 detail: Some("fn(&mut self, B) -> C".into()),
2303 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2304 new_text: "first_method($1)".to_string(),
2305 range: lsp::Range::new(
2306 lsp::Position::new(0, 14),
2307 lsp::Position::new(0, 14),
2308 ),
2309 })),
2310 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2311 ..Default::default()
2312 },
2313 lsp::CompletionItem {
2314 label: "second_method(…)".into(),
2315 detail: Some("fn(&mut self, C) -> D<E>".into()),
2316 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2317 new_text: "second_method()".to_string(),
2318 range: lsp::Range::new(
2319 lsp::Position::new(0, 14),
2320 lsp::Position::new(0, 14),
2321 ),
2322 })),
2323 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2324 ..Default::default()
2325 },
2326 ]))
2327 });
2328
2329 // Open the buffer on the host.
2330 let buffer_a = project_a
2331 .update(&mut cx_a, |p, cx| {
2332 p.open_buffer((worktree_id, "main.rs"), cx)
2333 })
2334 .await
2335 .unwrap();
2336 buffer_a
2337 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2338 .await;
2339
2340 // Confirm a completion on the guest.
2341 editor_b
2342 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2343 .await;
2344 editor_b.update(&mut cx_b, |editor, cx| {
2345 editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2346 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2347 });
2348
2349 // Return a resolved completion from the host's language server.
2350 // The resolved completion has an additional text edit.
2351 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
2352 assert_eq!(params.label, "first_method(…)");
2353 lsp::CompletionItem {
2354 label: "first_method(…)".into(),
2355 detail: Some("fn(&mut self, B) -> C".into()),
2356 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2357 new_text: "first_method($1)".to_string(),
2358 range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
2359 })),
2360 additional_text_edits: Some(vec![lsp::TextEdit {
2361 new_text: "use d::SomeTrait;\n".to_string(),
2362 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2363 }]),
2364 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2365 ..Default::default()
2366 }
2367 });
2368
2369 // The additional edit is applied.
2370 buffer_a
2371 .condition(&cx_a, |buffer, _| {
2372 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2373 })
2374 .await;
2375 buffer_b
2376 .condition(&cx_b, |buffer, _| {
2377 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2378 })
2379 .await;
2380 }
2381
2382 #[gpui::test(iterations = 10)]
2383 async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2384 cx_a.foreground().forbid_parking();
2385 let mut lang_registry = Arc::new(LanguageRegistry::new());
2386 let fs = Arc::new(FakeFs::new(cx_a.background()));
2387
2388 // Set up a fake language server.
2389 let (language_server_config, mut fake_language_server) =
2390 LanguageServerConfig::fake(&cx_a).await;
2391 Arc::get_mut(&mut lang_registry)
2392 .unwrap()
2393 .add(Arc::new(Language::new(
2394 LanguageConfig {
2395 name: "Rust".to_string(),
2396 path_suffixes: vec!["rs".to_string()],
2397 language_server: Some(language_server_config),
2398 ..Default::default()
2399 },
2400 Some(tree_sitter_rust::language()),
2401 )));
2402
2403 // Connect to a server as 2 clients.
2404 let mut server = TestServer::start(cx_a.foreground()).await;
2405 let client_a = server.create_client(&mut cx_a, "user_a").await;
2406 let client_b = server.create_client(&mut cx_b, "user_b").await;
2407
2408 // Share a project as client A
2409 fs.insert_tree(
2410 "/a",
2411 json!({
2412 ".zed.toml": r#"collaborators = ["user_b"]"#,
2413 "a.rs": "let one = two",
2414 }),
2415 )
2416 .await;
2417 let project_a = cx_a.update(|cx| {
2418 Project::local(
2419 client_a.clone(),
2420 client_a.user_store.clone(),
2421 lang_registry.clone(),
2422 fs.clone(),
2423 cx,
2424 )
2425 });
2426 let (worktree_a, _) = project_a
2427 .update(&mut cx_a, |p, cx| {
2428 p.find_or_create_local_worktree("/a", false, cx)
2429 })
2430 .await
2431 .unwrap();
2432 worktree_a
2433 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2434 .await;
2435 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2436 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2437 project_a
2438 .update(&mut cx_a, |p, cx| p.share(cx))
2439 .await
2440 .unwrap();
2441
2442 // Join the worktree as client B.
2443 let project_b = Project::remote(
2444 project_id,
2445 client_b.clone(),
2446 client_b.user_store.clone(),
2447 lang_registry.clone(),
2448 fs.clone(),
2449 &mut cx_b.to_async(),
2450 )
2451 .await
2452 .unwrap();
2453
2454 let buffer_b = cx_b
2455 .background()
2456 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2457 .await
2458 .unwrap();
2459
2460 let format = project_b.update(&mut cx_b, |project, cx| {
2461 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2462 });
2463
2464 fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2465 Some(vec![
2466 lsp::TextEdit {
2467 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2468 new_text: "h".to_string(),
2469 },
2470 lsp::TextEdit {
2471 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2472 new_text: "y".to_string(),
2473 },
2474 ])
2475 });
2476
2477 format.await.unwrap();
2478 assert_eq!(
2479 buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2480 "let honey = two"
2481 );
2482 }
2483
2484 #[gpui::test(iterations = 10)]
2485 async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2486 cx_a.foreground().forbid_parking();
2487 let mut lang_registry = Arc::new(LanguageRegistry::new());
2488 let fs = Arc::new(FakeFs::new(cx_a.background()));
2489 fs.insert_tree(
2490 "/root-1",
2491 json!({
2492 ".zed.toml": r#"collaborators = ["user_b"]"#,
2493 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2494 }),
2495 )
2496 .await;
2497 fs.insert_tree(
2498 "/root-2",
2499 json!({
2500 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2501 }),
2502 )
2503 .await;
2504
2505 // Set up a fake language server.
2506 let (language_server_config, mut fake_language_server) =
2507 LanguageServerConfig::fake(&cx_a).await;
2508 Arc::get_mut(&mut lang_registry)
2509 .unwrap()
2510 .add(Arc::new(Language::new(
2511 LanguageConfig {
2512 name: "Rust".to_string(),
2513 path_suffixes: vec!["rs".to_string()],
2514 language_server: Some(language_server_config),
2515 ..Default::default()
2516 },
2517 Some(tree_sitter_rust::language()),
2518 )));
2519
2520 // Connect to a server as 2 clients.
2521 let mut server = TestServer::start(cx_a.foreground()).await;
2522 let client_a = server.create_client(&mut cx_a, "user_a").await;
2523 let client_b = server.create_client(&mut cx_b, "user_b").await;
2524
2525 // Share a project as client A
2526 let project_a = cx_a.update(|cx| {
2527 Project::local(
2528 client_a.clone(),
2529 client_a.user_store.clone(),
2530 lang_registry.clone(),
2531 fs.clone(),
2532 cx,
2533 )
2534 });
2535 let (worktree_a, _) = project_a
2536 .update(&mut cx_a, |p, cx| {
2537 p.find_or_create_local_worktree("/root-1", false, cx)
2538 })
2539 .await
2540 .unwrap();
2541 worktree_a
2542 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2543 .await;
2544 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2545 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2546 project_a
2547 .update(&mut cx_a, |p, cx| p.share(cx))
2548 .await
2549 .unwrap();
2550
2551 // Join the worktree as client B.
2552 let project_b = Project::remote(
2553 project_id,
2554 client_b.clone(),
2555 client_b.user_store.clone(),
2556 lang_registry.clone(),
2557 fs.clone(),
2558 &mut cx_b.to_async(),
2559 )
2560 .await
2561 .unwrap();
2562
2563 // Open the file on client B.
2564 let buffer_b = cx_b
2565 .background()
2566 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2567 .await
2568 .unwrap();
2569
2570 // Request the definition of a symbol as the guest.
2571 let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2572 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2573 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2574 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2575 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2576 )))
2577 });
2578
2579 let definitions_1 = definitions_1.await.unwrap();
2580 cx_b.read(|cx| {
2581 assert_eq!(definitions_1.len(), 1);
2582 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2583 let target_buffer = definitions_1[0].target_buffer.read(cx);
2584 assert_eq!(
2585 target_buffer.text(),
2586 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2587 );
2588 assert_eq!(
2589 definitions_1[0].target_range.to_point(target_buffer),
2590 Point::new(0, 6)..Point::new(0, 9)
2591 );
2592 });
2593
2594 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2595 // the previous call to `definition`.
2596 let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2597 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2598 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2599 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2600 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2601 )))
2602 });
2603
2604 let definitions_2 = definitions_2.await.unwrap();
2605 cx_b.read(|cx| {
2606 assert_eq!(definitions_2.len(), 1);
2607 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2608 let target_buffer = definitions_2[0].target_buffer.read(cx);
2609 assert_eq!(
2610 target_buffer.text(),
2611 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2612 );
2613 assert_eq!(
2614 definitions_2[0].target_range.to_point(target_buffer),
2615 Point::new(1, 6)..Point::new(1, 11)
2616 );
2617 });
2618 assert_eq!(
2619 definitions_1[0].target_buffer,
2620 definitions_2[0].target_buffer
2621 );
2622
2623 cx_b.update(|_| {
2624 drop(definitions_1);
2625 drop(definitions_2);
2626 });
2627 project_b
2628 .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2629 .await;
2630 }
2631
2632 #[gpui::test(iterations = 10)]
2633 async fn test_open_buffer_while_getting_definition_pointing_to_it(
2634 mut cx_a: TestAppContext,
2635 mut cx_b: TestAppContext,
2636 mut rng: StdRng,
2637 ) {
2638 cx_a.foreground().forbid_parking();
2639 let mut lang_registry = Arc::new(LanguageRegistry::new());
2640 let fs = Arc::new(FakeFs::new(cx_a.background()));
2641 fs.insert_tree(
2642 "/root",
2643 json!({
2644 ".zed.toml": r#"collaborators = ["user_b"]"#,
2645 "a.rs": "const ONE: usize = b::TWO;",
2646 "b.rs": "const TWO: usize = 2",
2647 }),
2648 )
2649 .await;
2650
2651 // Set up a fake language server.
2652 let (language_server_config, mut fake_language_server) =
2653 LanguageServerConfig::fake(&cx_a).await;
2654
2655 Arc::get_mut(&mut lang_registry)
2656 .unwrap()
2657 .add(Arc::new(Language::new(
2658 LanguageConfig {
2659 name: "Rust".to_string(),
2660 path_suffixes: vec!["rs".to_string()],
2661 language_server: Some(language_server_config),
2662 ..Default::default()
2663 },
2664 Some(tree_sitter_rust::language()),
2665 )));
2666
2667 // Connect to a server as 2 clients.
2668 let mut server = TestServer::start(cx_a.foreground()).await;
2669 let client_a = server.create_client(&mut cx_a, "user_a").await;
2670 let client_b = server.create_client(&mut cx_b, "user_b").await;
2671
2672 // Share a project as client A
2673 let project_a = cx_a.update(|cx| {
2674 Project::local(
2675 client_a.clone(),
2676 client_a.user_store.clone(),
2677 lang_registry.clone(),
2678 fs.clone(),
2679 cx,
2680 )
2681 });
2682
2683 let (worktree_a, _) = project_a
2684 .update(&mut cx_a, |p, cx| {
2685 p.find_or_create_local_worktree("/root", false, cx)
2686 })
2687 .await
2688 .unwrap();
2689 worktree_a
2690 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2691 .await;
2692 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2693 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2694 project_a
2695 .update(&mut cx_a, |p, cx| p.share(cx))
2696 .await
2697 .unwrap();
2698
2699 // Join the worktree as client B.
2700 let project_b = Project::remote(
2701 project_id,
2702 client_b.clone(),
2703 client_b.user_store.clone(),
2704 lang_registry.clone(),
2705 fs.clone(),
2706 &mut cx_b.to_async(),
2707 )
2708 .await
2709 .unwrap();
2710
2711 let buffer_b1 = cx_b
2712 .background()
2713 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2714 .await
2715 .unwrap();
2716
2717 let definitions;
2718 let buffer_b2;
2719 if rng.gen() {
2720 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2721 buffer_b2 =
2722 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2723 } else {
2724 buffer_b2 =
2725 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2726 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2727 }
2728
2729 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2730 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2731 lsp::Url::from_file_path("/root/b.rs").unwrap(),
2732 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2733 )))
2734 });
2735
2736 let buffer_b2 = buffer_b2.await.unwrap();
2737 let definitions = definitions.await.unwrap();
2738 assert_eq!(definitions.len(), 1);
2739 assert_eq!(definitions[0].target_buffer, buffer_b2);
2740 }
2741
2742 #[gpui::test(iterations = 10)]
2743 async fn test_collaborating_with_code_actions(
2744 mut cx_a: TestAppContext,
2745 mut cx_b: TestAppContext,
2746 ) {
2747 cx_a.foreground().forbid_parking();
2748 let mut lang_registry = Arc::new(LanguageRegistry::new());
2749 let fs = Arc::new(FakeFs::new(cx_a.background()));
2750 let mut path_openers_b = Vec::new();
2751 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
2752
2753 // Set up a fake language server.
2754 let (language_server_config, mut fake_language_server) =
2755 LanguageServerConfig::fake_with_capabilities(
2756 lsp::ServerCapabilities {
2757 ..Default::default()
2758 },
2759 &cx_a,
2760 )
2761 .await;
2762 Arc::get_mut(&mut lang_registry)
2763 .unwrap()
2764 .add(Arc::new(Language::new(
2765 LanguageConfig {
2766 name: "Rust".to_string(),
2767 path_suffixes: vec!["rs".to_string()],
2768 language_server: Some(language_server_config),
2769 ..Default::default()
2770 },
2771 Some(tree_sitter_rust::language()),
2772 )));
2773
2774 // Connect to a server as 2 clients.
2775 let mut server = TestServer::start(cx_a.foreground()).await;
2776 let client_a = server.create_client(&mut cx_a, "user_a").await;
2777 let client_b = server.create_client(&mut cx_b, "user_b").await;
2778
2779 // Share a project as client A
2780 fs.insert_tree(
2781 "/a",
2782 json!({
2783 ".zed.toml": r#"collaborators = ["user_b"]"#,
2784 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
2785 "other.rs": "pub fn foo() -> usize { 4 }",
2786 }),
2787 )
2788 .await;
2789 let project_a = cx_a.update(|cx| {
2790 Project::local(
2791 client_a.clone(),
2792 client_a.user_store.clone(),
2793 lang_registry.clone(),
2794 fs.clone(),
2795 cx,
2796 )
2797 });
2798 let (worktree_a, _) = project_a
2799 .update(&mut cx_a, |p, cx| {
2800 p.find_or_create_local_worktree("/a", false, cx)
2801 })
2802 .await
2803 .unwrap();
2804 worktree_a
2805 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2806 .await;
2807 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2808 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2809 project_a
2810 .update(&mut cx_a, |p, cx| p.share(cx))
2811 .await
2812 .unwrap();
2813
2814 // Join the worktree as client B.
2815 let project_b = Project::remote(
2816 project_id,
2817 client_b.clone(),
2818 client_b.user_store.clone(),
2819 lang_registry.clone(),
2820 fs.clone(),
2821 &mut cx_b.to_async(),
2822 )
2823 .await
2824 .unwrap();
2825 let mut params = cx_b.update(WorkspaceParams::test);
2826 params.languages = lang_registry.clone();
2827 params.client = client_b.client.clone();
2828 params.user_store = client_b.user_store.clone();
2829 params.project = project_b;
2830 params.path_openers = path_openers_b.into();
2831
2832 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
2833 let editor_b = workspace_b
2834 .update(&mut cx_b, |workspace, cx| {
2835 workspace.open_path((worktree_id, "main.rs").into(), cx)
2836 })
2837 .await
2838 .unwrap()
2839 .downcast::<Editor>()
2840 .unwrap();
2841 fake_language_server
2842 .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2843 assert_eq!(
2844 params.text_document.uri,
2845 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2846 );
2847 assert_eq!(params.range.start, lsp::Position::new(0, 0));
2848 assert_eq!(params.range.end, lsp::Position::new(0, 0));
2849 None
2850 })
2851 .next()
2852 .await;
2853
2854 // Move cursor to a location that contains code actions.
2855 editor_b.update(&mut cx_b, |editor, cx| {
2856 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
2857 cx.focus(&editor_b);
2858 });
2859 fake_language_server.handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2860 assert_eq!(
2861 params.text_document.uri,
2862 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2863 );
2864 assert_eq!(params.range.start, lsp::Position::new(1, 31));
2865 assert_eq!(params.range.end, lsp::Position::new(1, 31));
2866
2867 Some(vec![lsp::CodeActionOrCommand::CodeAction(
2868 lsp::CodeAction {
2869 title: "Inline into all callers".to_string(),
2870 edit: Some(lsp::WorkspaceEdit {
2871 changes: Some(
2872 [
2873 (
2874 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2875 vec![lsp::TextEdit::new(
2876 lsp::Range::new(
2877 lsp::Position::new(1, 22),
2878 lsp::Position::new(1, 34),
2879 ),
2880 "4".to_string(),
2881 )],
2882 ),
2883 (
2884 lsp::Url::from_file_path("/a/other.rs").unwrap(),
2885 vec![lsp::TextEdit::new(
2886 lsp::Range::new(
2887 lsp::Position::new(0, 0),
2888 lsp::Position::new(0, 27),
2889 ),
2890 "".to_string(),
2891 )],
2892 ),
2893 ]
2894 .into_iter()
2895 .collect(),
2896 ),
2897 ..Default::default()
2898 }),
2899 data: Some(json!({
2900 "codeActionParams": {
2901 "range": {
2902 "start": {"line": 1, "column": 31},
2903 "end": {"line": 1, "column": 31},
2904 }
2905 }
2906 })),
2907 ..Default::default()
2908 },
2909 )])
2910 });
2911
2912 // Toggle code actions and wait for them to display.
2913 editor_b.update(&mut cx_b, |editor, cx| {
2914 editor.toggle_code_actions(&ToggleCodeActions(false), cx);
2915 });
2916 editor_b
2917 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2918 .await;
2919
2920 // Confirming the code action will trigger a resolve request.
2921 let confirm_action = workspace_b
2922 .update(&mut cx_b, |workspace, cx| {
2923 Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
2924 })
2925 .unwrap();
2926 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_| {
2927 lsp::CodeAction {
2928 title: "Inline into all callers".to_string(),
2929 edit: Some(lsp::WorkspaceEdit {
2930 changes: Some(
2931 [
2932 (
2933 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2934 vec![lsp::TextEdit::new(
2935 lsp::Range::new(
2936 lsp::Position::new(1, 22),
2937 lsp::Position::new(1, 34),
2938 ),
2939 "4".to_string(),
2940 )],
2941 ),
2942 (
2943 lsp::Url::from_file_path("/a/other.rs").unwrap(),
2944 vec![lsp::TextEdit::new(
2945 lsp::Range::new(
2946 lsp::Position::new(0, 0),
2947 lsp::Position::new(0, 27),
2948 ),
2949 "".to_string(),
2950 )],
2951 ),
2952 ]
2953 .into_iter()
2954 .collect(),
2955 ),
2956 ..Default::default()
2957 }),
2958 ..Default::default()
2959 }
2960 });
2961
2962 // After the action is confirmed, an editor containing both modified files is opened.
2963 confirm_action.await.unwrap();
2964 let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
2965 workspace
2966 .active_item(cx)
2967 .unwrap()
2968 .downcast::<Editor>()
2969 .unwrap()
2970 });
2971 code_action_editor.update(&mut cx_b, |editor, cx| {
2972 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
2973 editor.undo(&Undo, cx);
2974 assert_eq!(
2975 editor.text(cx),
2976 "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
2977 );
2978 editor.redo(&Redo, cx);
2979 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
2980 });
2981 }
2982
2983 #[gpui::test(iterations = 10)]
2984 async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2985 cx_a.foreground().forbid_parking();
2986
2987 // Connect to a server as 2 clients.
2988 let mut server = TestServer::start(cx_a.foreground()).await;
2989 let client_a = server.create_client(&mut cx_a, "user_a").await;
2990 let client_b = server.create_client(&mut cx_b, "user_b").await;
2991
2992 // Create an org that includes these 2 users.
2993 let db = &server.app_state.db;
2994 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2995 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2996 .await
2997 .unwrap();
2998 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2999 .await
3000 .unwrap();
3001
3002 // Create a channel that includes all the users.
3003 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3004 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3005 .await
3006 .unwrap();
3007 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3008 .await
3009 .unwrap();
3010 db.create_channel_message(
3011 channel_id,
3012 client_b.current_user_id(&cx_b),
3013 "hello A, it's B.",
3014 OffsetDateTime::now_utc(),
3015 1,
3016 )
3017 .await
3018 .unwrap();
3019
3020 let channels_a = cx_a
3021 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3022 channels_a
3023 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3024 .await;
3025 channels_a.read_with(&cx_a, |list, _| {
3026 assert_eq!(
3027 list.available_channels().unwrap(),
3028 &[ChannelDetails {
3029 id: channel_id.to_proto(),
3030 name: "test-channel".to_string()
3031 }]
3032 )
3033 });
3034 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3035 this.get_channel(channel_id.to_proto(), cx).unwrap()
3036 });
3037 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3038 channel_a
3039 .condition(&cx_a, |channel, _| {
3040 channel_messages(channel)
3041 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3042 })
3043 .await;
3044
3045 let channels_b = cx_b
3046 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3047 channels_b
3048 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3049 .await;
3050 channels_b.read_with(&cx_b, |list, _| {
3051 assert_eq!(
3052 list.available_channels().unwrap(),
3053 &[ChannelDetails {
3054 id: channel_id.to_proto(),
3055 name: "test-channel".to_string()
3056 }]
3057 )
3058 });
3059
3060 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3061 this.get_channel(channel_id.to_proto(), cx).unwrap()
3062 });
3063 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3064 channel_b
3065 .condition(&cx_b, |channel, _| {
3066 channel_messages(channel)
3067 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3068 })
3069 .await;
3070
3071 channel_a
3072 .update(&mut cx_a, |channel, cx| {
3073 channel
3074 .send_message("oh, hi B.".to_string(), cx)
3075 .unwrap()
3076 .detach();
3077 let task = channel.send_message("sup".to_string(), cx).unwrap();
3078 assert_eq!(
3079 channel_messages(channel),
3080 &[
3081 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3082 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3083 ("user_a".to_string(), "sup".to_string(), true)
3084 ]
3085 );
3086 task
3087 })
3088 .await
3089 .unwrap();
3090
3091 channel_b
3092 .condition(&cx_b, |channel, _| {
3093 channel_messages(channel)
3094 == [
3095 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3096 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3097 ("user_a".to_string(), "sup".to_string(), false),
3098 ]
3099 })
3100 .await;
3101
3102 assert_eq!(
3103 server
3104 .state()
3105 .await
3106 .channel(channel_id)
3107 .unwrap()
3108 .connection_ids
3109 .len(),
3110 2
3111 );
3112 cx_b.update(|_| drop(channel_b));
3113 server
3114 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3115 .await;
3116
3117 cx_a.update(|_| drop(channel_a));
3118 server
3119 .condition(|state| state.channel(channel_id).is_none())
3120 .await;
3121 }
3122
3123 #[gpui::test(iterations = 10)]
3124 async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3125 cx_a.foreground().forbid_parking();
3126
3127 let mut server = TestServer::start(cx_a.foreground()).await;
3128 let client_a = server.create_client(&mut cx_a, "user_a").await;
3129
3130 let db = &server.app_state.db;
3131 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3132 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3133 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3134 .await
3135 .unwrap();
3136 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3137 .await
3138 .unwrap();
3139
3140 let channels_a = cx_a
3141 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3142 channels_a
3143 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3144 .await;
3145 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3146 this.get_channel(channel_id.to_proto(), cx).unwrap()
3147 });
3148
3149 // Messages aren't allowed to be too long.
3150 channel_a
3151 .update(&mut cx_a, |channel, cx| {
3152 let long_body = "this is long.\n".repeat(1024);
3153 channel.send_message(long_body, cx).unwrap()
3154 })
3155 .await
3156 .unwrap_err();
3157
3158 // Messages aren't allowed to be blank.
3159 channel_a.update(&mut cx_a, |channel, cx| {
3160 channel.send_message(String::new(), cx).unwrap_err()
3161 });
3162
3163 // Leading and trailing whitespace are trimmed.
3164 channel_a
3165 .update(&mut cx_a, |channel, cx| {
3166 channel
3167 .send_message("\n surrounded by whitespace \n".to_string(), cx)
3168 .unwrap()
3169 })
3170 .await
3171 .unwrap();
3172 assert_eq!(
3173 db.get_channel_messages(channel_id, 10, None)
3174 .await
3175 .unwrap()
3176 .iter()
3177 .map(|m| &m.body)
3178 .collect::<Vec<_>>(),
3179 &["surrounded by whitespace"]
3180 );
3181 }
3182
3183 #[gpui::test(iterations = 10)]
3184 async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3185 cx_a.foreground().forbid_parking();
3186
3187 // Connect to a server as 2 clients.
3188 let mut server = TestServer::start(cx_a.foreground()).await;
3189 let client_a = server.create_client(&mut cx_a, "user_a").await;
3190 let client_b = server.create_client(&mut cx_b, "user_b").await;
3191 let mut status_b = client_b.status();
3192
3193 // Create an org that includes these 2 users.
3194 let db = &server.app_state.db;
3195 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3196 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3197 .await
3198 .unwrap();
3199 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3200 .await
3201 .unwrap();
3202
3203 // Create a channel that includes all the users.
3204 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3205 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3206 .await
3207 .unwrap();
3208 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3209 .await
3210 .unwrap();
3211 db.create_channel_message(
3212 channel_id,
3213 client_b.current_user_id(&cx_b),
3214 "hello A, it's B.",
3215 OffsetDateTime::now_utc(),
3216 2,
3217 )
3218 .await
3219 .unwrap();
3220
3221 let channels_a = cx_a
3222 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3223 channels_a
3224 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3225 .await;
3226
3227 channels_a.read_with(&cx_a, |list, _| {
3228 assert_eq!(
3229 list.available_channels().unwrap(),
3230 &[ChannelDetails {
3231 id: channel_id.to_proto(),
3232 name: "test-channel".to_string()
3233 }]
3234 )
3235 });
3236 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3237 this.get_channel(channel_id.to_proto(), cx).unwrap()
3238 });
3239 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3240 channel_a
3241 .condition(&cx_a, |channel, _| {
3242 channel_messages(channel)
3243 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3244 })
3245 .await;
3246
3247 let channels_b = cx_b
3248 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3249 channels_b
3250 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3251 .await;
3252 channels_b.read_with(&cx_b, |list, _| {
3253 assert_eq!(
3254 list.available_channels().unwrap(),
3255 &[ChannelDetails {
3256 id: channel_id.to_proto(),
3257 name: "test-channel".to_string()
3258 }]
3259 )
3260 });
3261
3262 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3263 this.get_channel(channel_id.to_proto(), cx).unwrap()
3264 });
3265 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3266 channel_b
3267 .condition(&cx_b, |channel, _| {
3268 channel_messages(channel)
3269 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3270 })
3271 .await;
3272
3273 // Disconnect client B, ensuring we can still access its cached channel data.
3274 server.forbid_connections();
3275 server.disconnect_client(client_b.current_user_id(&cx_b));
3276 while !matches!(
3277 status_b.next().await,
3278 Some(client::Status::ReconnectionError { .. })
3279 ) {}
3280
3281 channels_b.read_with(&cx_b, |channels, _| {
3282 assert_eq!(
3283 channels.available_channels().unwrap(),
3284 [ChannelDetails {
3285 id: channel_id.to_proto(),
3286 name: "test-channel".to_string()
3287 }]
3288 )
3289 });
3290 channel_b.read_with(&cx_b, |channel, _| {
3291 assert_eq!(
3292 channel_messages(channel),
3293 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3294 )
3295 });
3296
3297 // Send a message from client B while it is disconnected.
3298 channel_b
3299 .update(&mut cx_b, |channel, cx| {
3300 let task = channel
3301 .send_message("can you see this?".to_string(), cx)
3302 .unwrap();
3303 assert_eq!(
3304 channel_messages(channel),
3305 &[
3306 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3307 ("user_b".to_string(), "can you see this?".to_string(), true)
3308 ]
3309 );
3310 task
3311 })
3312 .await
3313 .unwrap_err();
3314
3315 // Send a message from client A while B is disconnected.
3316 channel_a
3317 .update(&mut cx_a, |channel, cx| {
3318 channel
3319 .send_message("oh, hi B.".to_string(), cx)
3320 .unwrap()
3321 .detach();
3322 let task = channel.send_message("sup".to_string(), cx).unwrap();
3323 assert_eq!(
3324 channel_messages(channel),
3325 &[
3326 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3327 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3328 ("user_a".to_string(), "sup".to_string(), true)
3329 ]
3330 );
3331 task
3332 })
3333 .await
3334 .unwrap();
3335
3336 // Give client B a chance to reconnect.
3337 server.allow_connections();
3338 cx_b.foreground().advance_clock(Duration::from_secs(10));
3339
3340 // Verify that B sees the new messages upon reconnection, as well as the message client B
3341 // sent while offline.
3342 channel_b
3343 .condition(&cx_b, |channel, _| {
3344 channel_messages(channel)
3345 == [
3346 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3347 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3348 ("user_a".to_string(), "sup".to_string(), false),
3349 ("user_b".to_string(), "can you see this?".to_string(), false),
3350 ]
3351 })
3352 .await;
3353
3354 // Ensure client A and B can communicate normally after reconnection.
3355 channel_a
3356 .update(&mut cx_a, |channel, cx| {
3357 channel.send_message("you online?".to_string(), cx).unwrap()
3358 })
3359 .await
3360 .unwrap();
3361 channel_b
3362 .condition(&cx_b, |channel, _| {
3363 channel_messages(channel)
3364 == [
3365 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3366 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3367 ("user_a".to_string(), "sup".to_string(), false),
3368 ("user_b".to_string(), "can you see this?".to_string(), false),
3369 ("user_a".to_string(), "you online?".to_string(), false),
3370 ]
3371 })
3372 .await;
3373
3374 channel_b
3375 .update(&mut cx_b, |channel, cx| {
3376 channel.send_message("yep".to_string(), cx).unwrap()
3377 })
3378 .await
3379 .unwrap();
3380 channel_a
3381 .condition(&cx_a, |channel, _| {
3382 channel_messages(channel)
3383 == [
3384 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3385 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3386 ("user_a".to_string(), "sup".to_string(), false),
3387 ("user_b".to_string(), "can you see this?".to_string(), false),
3388 ("user_a".to_string(), "you online?".to_string(), false),
3389 ("user_b".to_string(), "yep".to_string(), false),
3390 ]
3391 })
3392 .await;
3393 }
3394
3395 #[gpui::test(iterations = 10)]
3396 async fn test_contacts(
3397 mut cx_a: TestAppContext,
3398 mut cx_b: TestAppContext,
3399 mut cx_c: TestAppContext,
3400 ) {
3401 cx_a.foreground().forbid_parking();
3402 let lang_registry = Arc::new(LanguageRegistry::new());
3403 let fs = Arc::new(FakeFs::new(cx_a.background()));
3404
3405 // Connect to a server as 3 clients.
3406 let mut server = TestServer::start(cx_a.foreground()).await;
3407 let client_a = server.create_client(&mut cx_a, "user_a").await;
3408 let client_b = server.create_client(&mut cx_b, "user_b").await;
3409 let client_c = server.create_client(&mut cx_c, "user_c").await;
3410
3411 // Share a worktree as client A.
3412 fs.insert_tree(
3413 "/a",
3414 json!({
3415 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3416 }),
3417 )
3418 .await;
3419
3420 let project_a = cx_a.update(|cx| {
3421 Project::local(
3422 client_a.clone(),
3423 client_a.user_store.clone(),
3424 lang_registry.clone(),
3425 fs.clone(),
3426 cx,
3427 )
3428 });
3429 let (worktree_a, _) = project_a
3430 .update(&mut cx_a, |p, cx| {
3431 p.find_or_create_local_worktree("/a", false, cx)
3432 })
3433 .await
3434 .unwrap();
3435 worktree_a
3436 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3437 .await;
3438
3439 client_a
3440 .user_store
3441 .condition(&cx_a, |user_store, _| {
3442 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3443 })
3444 .await;
3445 client_b
3446 .user_store
3447 .condition(&cx_b, |user_store, _| {
3448 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3449 })
3450 .await;
3451 client_c
3452 .user_store
3453 .condition(&cx_c, |user_store, _| {
3454 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3455 })
3456 .await;
3457
3458 let project_id = project_a
3459 .update(&mut cx_a, |project, _| project.next_remote_id())
3460 .await;
3461 project_a
3462 .update(&mut cx_a, |project, cx| project.share(cx))
3463 .await
3464 .unwrap();
3465
3466 let _project_b = Project::remote(
3467 project_id,
3468 client_b.clone(),
3469 client_b.user_store.clone(),
3470 lang_registry.clone(),
3471 fs.clone(),
3472 &mut cx_b.to_async(),
3473 )
3474 .await
3475 .unwrap();
3476
3477 client_a
3478 .user_store
3479 .condition(&cx_a, |user_store, _| {
3480 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3481 })
3482 .await;
3483 client_b
3484 .user_store
3485 .condition(&cx_b, |user_store, _| {
3486 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3487 })
3488 .await;
3489 client_c
3490 .user_store
3491 .condition(&cx_c, |user_store, _| {
3492 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3493 })
3494 .await;
3495
3496 project_a
3497 .condition(&cx_a, |project, _| {
3498 project.collaborators().contains_key(&client_b.peer_id)
3499 })
3500 .await;
3501
3502 cx_a.update(move |_| drop(project_a));
3503 client_a
3504 .user_store
3505 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3506 .await;
3507 client_b
3508 .user_store
3509 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3510 .await;
3511 client_c
3512 .user_store
3513 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3514 .await;
3515
3516 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3517 user_store
3518 .contacts()
3519 .iter()
3520 .map(|contact| {
3521 let worktrees = contact
3522 .projects
3523 .iter()
3524 .map(|p| {
3525 (
3526 p.worktree_root_names[0].as_str(),
3527 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3528 )
3529 })
3530 .collect();
3531 (contact.user.github_login.as_str(), worktrees)
3532 })
3533 .collect()
3534 }
3535 }
3536
3537 #[gpui::test(iterations = 10)]
3538 async fn test_random_collaboration(cx: TestAppContext, rng: StdRng) {
3539 cx.foreground().forbid_parking();
3540 let max_peers = env::var("MAX_PEERS")
3541 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
3542 .unwrap_or(5);
3543 let max_operations = env::var("OPERATIONS")
3544 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
3545 .unwrap_or(10);
3546
3547 let rng = Rc::new(RefCell::new(rng));
3548 let lang_registry = Arc::new(LanguageRegistry::new());
3549 let fs = Arc::new(FakeFs::new(cx.background()));
3550 fs.insert_tree(
3551 "/_collab",
3552 json!({
3553 ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
3554 }),
3555 )
3556 .await;
3557
3558 let operations = Rc::new(Cell::new(0));
3559 let mut server = TestServer::start(cx.foreground()).await;
3560 let mut clients = Vec::new();
3561
3562 let mut next_entity_id = 100000;
3563 let mut host_cx = TestAppContext::new(
3564 cx.foreground_platform(),
3565 cx.platform(),
3566 cx.foreground(),
3567 cx.background(),
3568 cx.font_cache(),
3569 next_entity_id,
3570 );
3571 let host = server.create_client(&mut host_cx, "host").await;
3572 let host_project = host_cx.update(|cx| {
3573 Project::local(
3574 host.client.clone(),
3575 host.user_store.clone(),
3576 lang_registry.clone(),
3577 fs.clone(),
3578 cx,
3579 )
3580 });
3581 let host_project_id = host_project
3582 .update(&mut host_cx, |p, _| p.next_remote_id())
3583 .await;
3584
3585 let (collab_worktree, _) = host_project
3586 .update(&mut host_cx, |project, cx| {
3587 project.find_or_create_local_worktree("/_collab", false, cx)
3588 })
3589 .await
3590 .unwrap();
3591 collab_worktree
3592 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
3593 .await;
3594 host_project
3595 .update(&mut host_cx, |project, cx| project.share(cx))
3596 .await
3597 .unwrap();
3598
3599 clients.push(cx.foreground().spawn(host.simulate_host(
3600 host_project,
3601 operations.clone(),
3602 max_operations,
3603 rng.clone(),
3604 host_cx,
3605 )));
3606
3607 while operations.get() < max_operations {
3608 cx.background().simulate_random_delay().await;
3609 if clients.len() < max_peers && rng.borrow_mut().gen_bool(0.05) {
3610 operations.set(operations.get() + 1);
3611
3612 let guest_id = clients.len();
3613 log::info!("Adding guest {}", guest_id);
3614 next_entity_id += 100000;
3615 let mut guest_cx = TestAppContext::new(
3616 cx.foreground_platform(),
3617 cx.platform(),
3618 cx.foreground(),
3619 cx.background(),
3620 cx.font_cache(),
3621 next_entity_id,
3622 );
3623 let guest = server
3624 .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
3625 .await;
3626 let guest_project = Project::remote(
3627 host_project_id,
3628 guest.client.clone(),
3629 guest.user_store.clone(),
3630 lang_registry.clone(),
3631 fs.clone(),
3632 &mut guest_cx.to_async(),
3633 )
3634 .await
3635 .unwrap();
3636 clients.push(cx.foreground().spawn(guest.simulate_guest(
3637 guest_id,
3638 guest_project,
3639 operations.clone(),
3640 max_operations,
3641 rng.clone(),
3642 guest_cx,
3643 )));
3644
3645 log::info!("Guest {} added", guest_id);
3646 }
3647 }
3648
3649 let clients = futures::future::join_all(clients).await;
3650 for (ix, (client_a, cx_a)) in clients.iter().enumerate() {
3651 for buffer_a in &client_a.buffers {
3652 let buffer_id = buffer_a.read_with(cx_a, |buffer, _| buffer.remote_id());
3653 for (client_b, cx_b) in &clients[ix + 1..] {
3654 if let Some(buffer_b) = client_b.buffers.iter().find(|buffer| {
3655 buffer.read_with(cx_b, |buffer, _| buffer.remote_id() == buffer_id)
3656 }) {
3657 assert_eq!(
3658 buffer_a.read_with(cx_a, |buffer, _| buffer.text()),
3659 buffer_b.read_with(cx_b, |buffer, _| buffer.text())
3660 );
3661 }
3662 }
3663 }
3664 }
3665 }
3666
3667 struct TestServer {
3668 peer: Arc<Peer>,
3669 app_state: Arc<AppState>,
3670 server: Arc<Server>,
3671 foreground: Rc<executor::Foreground>,
3672 notifications: mpsc::Receiver<()>,
3673 connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3674 forbid_connections: Arc<AtomicBool>,
3675 _test_db: TestDb,
3676 }
3677
3678 impl TestServer {
3679 async fn start(foreground: Rc<executor::Foreground>) -> Self {
3680 let test_db = TestDb::new();
3681 let app_state = Self::build_app_state(&test_db).await;
3682 let peer = Peer::new();
3683 let notifications = mpsc::channel(128);
3684 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3685 Self {
3686 peer,
3687 app_state,
3688 server,
3689 foreground,
3690 notifications: notifications.1,
3691 connection_killers: Default::default(),
3692 forbid_connections: Default::default(),
3693 _test_db: test_db,
3694 }
3695 }
3696
3697 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3698 let http = FakeHttpClient::with_404_response();
3699 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3700 let client_name = name.to_string();
3701 let mut client = Client::new(http.clone());
3702 let server = self.server.clone();
3703 let connection_killers = self.connection_killers.clone();
3704 let forbid_connections = self.forbid_connections.clone();
3705 let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3706
3707 Arc::get_mut(&mut client)
3708 .unwrap()
3709 .override_authenticate(move |cx| {
3710 cx.spawn(|_| async move {
3711 let access_token = "the-token".to_string();
3712 Ok(Credentials {
3713 user_id: user_id.0 as u64,
3714 access_token,
3715 })
3716 })
3717 })
3718 .override_establish_connection(move |credentials, cx| {
3719 assert_eq!(credentials.user_id, user_id.0 as u64);
3720 assert_eq!(credentials.access_token, "the-token");
3721
3722 let server = server.clone();
3723 let connection_killers = connection_killers.clone();
3724 let forbid_connections = forbid_connections.clone();
3725 let client_name = client_name.clone();
3726 let connection_id_tx = connection_id_tx.clone();
3727 cx.spawn(move |cx| async move {
3728 if forbid_connections.load(SeqCst) {
3729 Err(EstablishConnectionError::other(anyhow!(
3730 "server is forbidding connections"
3731 )))
3732 } else {
3733 let (client_conn, server_conn, kill_conn) =
3734 Connection::in_memory(cx.background());
3735 connection_killers.lock().insert(user_id, kill_conn);
3736 cx.background()
3737 .spawn(server.handle_connection(
3738 server_conn,
3739 client_name,
3740 user_id,
3741 Some(connection_id_tx),
3742 cx.background(),
3743 ))
3744 .detach();
3745 Ok(client_conn)
3746 }
3747 })
3748 });
3749
3750 client
3751 .authenticate_and_connect(&cx.to_async())
3752 .await
3753 .unwrap();
3754
3755 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3756 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3757 let mut authed_user =
3758 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3759 while authed_user.next().await.unwrap().is_none() {}
3760
3761 TestClient {
3762 client,
3763 peer_id,
3764 user_store,
3765 project: Default::default(),
3766 buffers: Default::default(),
3767 }
3768 }
3769
3770 fn disconnect_client(&self, user_id: UserId) {
3771 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3772 let _ = kill_conn.try_send(Some(()));
3773 }
3774 }
3775
3776 fn forbid_connections(&self) {
3777 self.forbid_connections.store(true, SeqCst);
3778 }
3779
3780 fn allow_connections(&self) {
3781 self.forbid_connections.store(false, SeqCst);
3782 }
3783
3784 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3785 let mut config = Config::default();
3786 config.session_secret = "a".repeat(32);
3787 config.database_url = test_db.url.clone();
3788 let github_client = github::AppClient::test();
3789 Arc::new(AppState {
3790 db: test_db.db().clone(),
3791 handlebars: Default::default(),
3792 auth_client: auth::build_client("", ""),
3793 repo_client: github::RepoClient::test(&github_client),
3794 github_client,
3795 config,
3796 })
3797 }
3798
3799 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3800 self.server.store.read()
3801 }
3802
3803 async fn condition<F>(&mut self, mut predicate: F)
3804 where
3805 F: FnMut(&Store) -> bool,
3806 {
3807 async_std::future::timeout(Duration::from_millis(500), async {
3808 while !(predicate)(&*self.server.store.read()) {
3809 self.foreground.start_waiting();
3810 self.notifications.next().await;
3811 self.foreground.finish_waiting();
3812 }
3813 })
3814 .await
3815 .expect("condition timed out");
3816 }
3817 }
3818
3819 impl Drop for TestServer {
3820 fn drop(&mut self) {
3821 self.peer.reset();
3822 }
3823 }
3824
3825 struct TestClient {
3826 client: Arc<Client>,
3827 pub peer_id: PeerId,
3828 pub user_store: ModelHandle<UserStore>,
3829 project: Option<ModelHandle<Project>>,
3830 buffers: HashSet<ModelHandle<zed::language::Buffer>>,
3831 }
3832
3833 impl Deref for TestClient {
3834 type Target = Arc<Client>;
3835
3836 fn deref(&self) -> &Self::Target {
3837 &self.client
3838 }
3839 }
3840
3841 impl TestClient {
3842 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3843 UserId::from_proto(
3844 self.user_store
3845 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3846 )
3847 }
3848
3849 async fn simulate_host(
3850 mut self,
3851 project: ModelHandle<Project>,
3852 operations: Rc<Cell<usize>>,
3853 max_operations: usize,
3854 rng: Rc<RefCell<StdRng>>,
3855 mut cx: TestAppContext,
3856 ) -> (Self, TestAppContext) {
3857 let fs = project.read_with(&cx, |project, _| project.fs().clone());
3858 let mut files: Vec<PathBuf> = Default::default();
3859 while operations.get() < max_operations {
3860 operations.set(operations.get() + 1);
3861
3862 let distribution = rng.borrow_mut().gen_range(0..100);
3863 match distribution {
3864 0..=20 if !files.is_empty() => {
3865 let mut path = files.choose(&mut *rng.borrow_mut()).unwrap().as_path();
3866 while let Some(parent_path) = path.parent() {
3867 path = parent_path;
3868 if rng.borrow_mut().gen() {
3869 break;
3870 }
3871 }
3872
3873 log::info!("Host: find/create local worktree {:?}", path);
3874 project
3875 .update(&mut cx, |project, cx| {
3876 project.find_or_create_local_worktree(path, false, cx)
3877 })
3878 .await
3879 .unwrap();
3880 }
3881 10..=80 if !files.is_empty() => {
3882 let buffer = if self.buffers.is_empty() || rng.borrow_mut().gen() {
3883 let file = files.choose(&mut *rng.borrow_mut()).unwrap();
3884 let (worktree, path) = project
3885 .update(&mut cx, |project, cx| {
3886 project.find_or_create_local_worktree(file, false, cx)
3887 })
3888 .await
3889 .unwrap();
3890 let project_path =
3891 worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
3892 log::info!("Host: opening path {:?}", project_path);
3893 let buffer = project
3894 .update(&mut cx, |project, cx| {
3895 project.open_buffer(project_path, cx)
3896 })
3897 .await
3898 .unwrap();
3899 self.buffers.insert(buffer.clone());
3900 buffer
3901 } else {
3902 self.buffers
3903 .iter()
3904 .choose(&mut *rng.borrow_mut())
3905 .unwrap()
3906 .clone()
3907 };
3908
3909 buffer.update(&mut cx, |buffer, cx| {
3910 log::info!(
3911 "Host: updating buffer {:?}",
3912 buffer.file().unwrap().full_path(cx)
3913 );
3914 buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
3915 });
3916 }
3917 _ => loop {
3918 let path_component_count = rng.borrow_mut().gen_range(1..=5);
3919 let mut path = PathBuf::new();
3920 path.push("/");
3921 for _ in 0..path_component_count {
3922 let letter = rng.borrow_mut().gen_range(b'a'..=b'z');
3923 path.push(std::str::from_utf8(&[letter]).unwrap());
3924 }
3925 let parent_path = path.parent().unwrap();
3926
3927 log::info!("Host: creating file {:?}", path);
3928 if fs.create_dir(&parent_path).await.is_ok()
3929 && fs.create_file(&path, Default::default()).await.is_ok()
3930 {
3931 files.push(path);
3932 break;
3933 } else {
3934 log::info!("Host: cannot create file");
3935 }
3936 },
3937 }
3938
3939 cx.background().simulate_random_delay().await;
3940 }
3941
3942 self.project = Some(project);
3943 (self, cx)
3944 }
3945
3946 pub async fn simulate_guest(
3947 mut self,
3948 guest_id: usize,
3949 project: ModelHandle<Project>,
3950 operations: Rc<Cell<usize>>,
3951 max_operations: usize,
3952 rng: Rc<RefCell<StdRng>>,
3953 mut cx: TestAppContext,
3954 ) -> (Self, TestAppContext) {
3955 while operations.get() < max_operations {
3956 let buffer = if self.buffers.is_empty() || rng.borrow_mut().gen() {
3957 let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
3958 project
3959 .worktrees(&cx)
3960 .filter(|worktree| {
3961 worktree.read(cx).entries(false).any(|e| e.is_file())
3962 })
3963 .choose(&mut *rng.borrow_mut())
3964 }) {
3965 worktree
3966 } else {
3967 cx.background().simulate_random_delay().await;
3968 continue;
3969 };
3970
3971 operations.set(operations.get() + 1);
3972 let project_path = worktree.read_with(&cx, |worktree, _| {
3973 let entry = worktree
3974 .entries(false)
3975 .filter(|e| e.is_file())
3976 .choose(&mut *rng.borrow_mut())
3977 .unwrap();
3978 (worktree.id(), entry.path.clone())
3979 });
3980 log::info!("Guest {}: opening path {:?}", guest_id, project_path);
3981 let buffer = project
3982 .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
3983 .await
3984 .unwrap();
3985 self.buffers.insert(buffer.clone());
3986 buffer
3987 } else {
3988 self.buffers
3989 .iter()
3990 .choose(&mut *rng.borrow_mut())
3991 .unwrap()
3992 .clone()
3993 };
3994
3995 buffer.update(&mut cx, |buffer, cx| {
3996 log::info!(
3997 "Guest {}: updating buffer {:?}",
3998 guest_id,
3999 buffer.file().unwrap().full_path(cx)
4000 );
4001 buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
4002 });
4003
4004 cx.background().simulate_random_delay().await;
4005 }
4006
4007 self.project = Some(project);
4008 (self, cx)
4009 }
4010 }
4011
4012 impl Executor for Arc<gpui::executor::Background> {
4013 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
4014 self.spawn(future).detach();
4015 }
4016 }
4017
4018 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
4019 channel
4020 .messages()
4021 .cursor::<()>()
4022 .map(|m| {
4023 (
4024 m.sender.github_login.clone(),
4025 m.body.clone(),
4026 m.is_pending(),
4027 )
4028 })
4029 .collect()
4030 }
4031
4032 struct EmptyView;
4033
4034 impl gpui::Entity for EmptyView {
4035 type Event = ();
4036 }
4037
4038 impl gpui::View for EmptyView {
4039 fn ui_name() -> &'static str {
4040 "empty view"
4041 }
4042
4043 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
4044 gpui::Element::boxed(gpui::elements::Empty)
4045 }
4046 }
4047}