1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{future::BoxFuture, FutureExt, StreamExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use postage::{mpsc, prelude::Sink as _};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EnvelopedMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
21use store::{Store, Worktree};
22use surf::StatusCode;
23use tide::log;
24use tide::{
25 http::headers::{HeaderName, CONNECTION, UPGRADE},
26 Request, Response,
27};
28use time::OffsetDateTime;
29
30type MessageHandler = Box<
31 dyn Send
32 + Sync
33 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
34>;
35
36pub struct Server {
37 peer: Arc<Peer>,
38 store: RwLock<Store>,
39 app_state: Arc<AppState>,
40 handlers: HashMap<TypeId, MessageHandler>,
41 notifications: Option<mpsc::Sender<()>>,
42}
43
44const MESSAGE_COUNT_PER_PAGE: usize = 100;
45const MAX_MESSAGE_LEN: usize = 1024;
46const NO_SUCH_PROJECT: &'static str = "no such project";
47
48impl Server {
49 pub fn new(
50 app_state: Arc<AppState>,
51 peer: Arc<Peer>,
52 notifications: Option<mpsc::Sender<()>>,
53 ) -> Arc<Self> {
54 let mut server = Self {
55 peer,
56 app_state,
57 store: Default::default(),
58 handlers: Default::default(),
59 notifications,
60 };
61
62 server
63 .add_handler(Server::ping)
64 .add_handler(Server::register_project)
65 .add_handler(Server::unregister_project)
66 .add_handler(Server::share_project)
67 .add_handler(Server::unshare_project)
68 .add_handler(Server::join_project)
69 .add_handler(Server::leave_project)
70 .add_handler(Server::register_worktree)
71 .add_handler(Server::unregister_worktree)
72 .add_handler(Server::share_worktree)
73 .add_handler(Server::update_worktree)
74 .add_handler(Server::update_diagnostic_summary)
75 .add_handler(Server::disk_based_diagnostics_updating)
76 .add_handler(Server::disk_based_diagnostics_updated)
77 .add_handler(Server::get_definition)
78 .add_handler(Server::open_buffer)
79 .add_handler(Server::close_buffer)
80 .add_handler(Server::update_buffer)
81 .add_handler(Server::update_buffer_file)
82 .add_handler(Server::buffer_reloaded)
83 .add_handler(Server::buffer_saved)
84 .add_handler(Server::save_buffer)
85 .add_handler(Server::format_buffer)
86 .add_handler(Server::get_completions)
87 .add_handler(Server::apply_additional_edits_for_completion)
88 .add_handler(Server::get_channels)
89 .add_handler(Server::get_users)
90 .add_handler(Server::join_channel)
91 .add_handler(Server::leave_channel)
92 .add_handler(Server::send_channel_message)
93 .add_handler(Server::get_channel_messages);
94
95 Arc::new(server)
96 }
97
98 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
99 where
100 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
101 Fut: 'static + Send + Future<Output = tide::Result<()>>,
102 M: EnvelopedMessage,
103 {
104 let prev_handler = self.handlers.insert(
105 TypeId::of::<M>(),
106 Box::new(move |server, envelope| {
107 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
108 (handler)(server, *envelope).boxed()
109 }),
110 );
111 if prev_handler.is_some() {
112 panic!("registered a handler for the same message twice");
113 }
114 self
115 }
116
117 pub fn handle_connection(
118 self: &Arc<Self>,
119 connection: Connection,
120 addr: String,
121 user_id: UserId,
122 mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
123 ) -> impl Future<Output = ()> {
124 let mut this = self.clone();
125 async move {
126 let (connection_id, handle_io, mut incoming_rx) =
127 this.peer.add_connection(connection).await;
128
129 if let Some(send_connection_id) = send_connection_id.as_mut() {
130 let _ = send_connection_id.send(connection_id).await;
131 }
132
133 this.state_mut().add_connection(connection_id, user_id);
134 if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
135 log::error!("error updating contacts for {:?}: {}", user_id, err);
136 }
137
138 let handle_io = handle_io.fuse();
139 futures::pin_mut!(handle_io);
140 loop {
141 let next_message = incoming_rx.next().fuse();
142 futures::pin_mut!(next_message);
143 futures::select_biased! {
144 message = next_message => {
145 if let Some(message) = message {
146 let start_time = Instant::now();
147 log::info!("RPC message received: {}", message.payload_type_name());
148 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
149 if let Err(err) = (handler)(this.clone(), message).await {
150 log::error!("error handling message: {:?}", err);
151 } else {
152 log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
153 }
154
155 if let Some(mut notifications) = this.notifications.clone() {
156 let _ = notifications.send(()).await;
157 }
158 } else {
159 log::warn!("unhandled message: {}", message.payload_type_name());
160 }
161 } else {
162 log::info!("rpc connection closed {:?}", addr);
163 break;
164 }
165 }
166 handle_io = handle_io => {
167 if let Err(err) = handle_io {
168 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
169 }
170 break;
171 }
172 }
173 }
174
175 if let Err(err) = this.sign_out(connection_id).await {
176 log::error!("error signing out connection {:?} - {:?}", addr, err);
177 }
178 }
179 }
180
181 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
182 self.peer.disconnect(connection_id);
183 let removed_connection = self.state_mut().remove_connection(connection_id)?;
184
185 for (project_id, project) in removed_connection.hosted_projects {
186 if let Some(share) = project.share {
187 broadcast(
188 connection_id,
189 share.guests.keys().copied().collect(),
190 |conn_id| {
191 self.peer
192 .send(conn_id, proto::UnshareProject { project_id })
193 },
194 )
195 .await?;
196 }
197 }
198
199 for (project_id, peer_ids) in removed_connection.guest_project_ids {
200 broadcast(connection_id, peer_ids, |conn_id| {
201 self.peer.send(
202 conn_id,
203 proto::RemoveProjectCollaborator {
204 project_id,
205 peer_id: connection_id.0,
206 },
207 )
208 })
209 .await?;
210 }
211
212 self.update_contacts_for_users(removed_connection.contact_ids.iter())
213 .await?;
214
215 Ok(())
216 }
217
218 async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
219 self.peer.respond(request.receipt(), proto::Ack {}).await?;
220 Ok(())
221 }
222
223 async fn register_project(
224 mut self: Arc<Server>,
225 request: TypedEnvelope<proto::RegisterProject>,
226 ) -> tide::Result<()> {
227 let project_id = {
228 let mut state = self.state_mut();
229 let user_id = state.user_id_for_connection(request.sender_id)?;
230 state.register_project(request.sender_id, user_id)
231 };
232 self.peer
233 .respond(
234 request.receipt(),
235 proto::RegisterProjectResponse { project_id },
236 )
237 .await?;
238 Ok(())
239 }
240
241 async fn unregister_project(
242 mut self: Arc<Server>,
243 request: TypedEnvelope<proto::UnregisterProject>,
244 ) -> tide::Result<()> {
245 let project = self
246 .state_mut()
247 .unregister_project(request.payload.project_id, request.sender_id)
248 .ok_or_else(|| anyhow!("no such project"))?;
249 self.update_contacts_for_users(project.authorized_user_ids().iter())
250 .await?;
251 Ok(())
252 }
253
254 async fn share_project(
255 mut self: Arc<Server>,
256 request: TypedEnvelope<proto::ShareProject>,
257 ) -> tide::Result<()> {
258 self.state_mut()
259 .share_project(request.payload.project_id, request.sender_id);
260 self.peer.respond(request.receipt(), proto::Ack {}).await?;
261 Ok(())
262 }
263
264 async fn unshare_project(
265 mut self: Arc<Server>,
266 request: TypedEnvelope<proto::UnshareProject>,
267 ) -> tide::Result<()> {
268 let project_id = request.payload.project_id;
269 let project = self
270 .state_mut()
271 .unshare_project(project_id, request.sender_id)?;
272
273 broadcast(request.sender_id, project.connection_ids, |conn_id| {
274 self.peer
275 .send(conn_id, proto::UnshareProject { project_id })
276 })
277 .await?;
278 self.update_contacts_for_users(&project.authorized_user_ids)
279 .await?;
280
281 Ok(())
282 }
283
284 async fn join_project(
285 mut self: Arc<Server>,
286 request: TypedEnvelope<proto::JoinProject>,
287 ) -> tide::Result<()> {
288 let project_id = request.payload.project_id;
289
290 let user_id = self.state().user_id_for_connection(request.sender_id)?;
291 let response_data = self
292 .state_mut()
293 .join_project(request.sender_id, user_id, project_id)
294 .and_then(|joined| {
295 let share = joined.project.share()?;
296 let peer_count = share.guests.len();
297 let mut collaborators = Vec::with_capacity(peer_count);
298 collaborators.push(proto::Collaborator {
299 peer_id: joined.project.host_connection_id.0,
300 replica_id: 0,
301 user_id: joined.project.host_user_id.to_proto(),
302 });
303 let worktrees = joined
304 .project
305 .worktrees
306 .iter()
307 .filter_map(|(id, worktree)| {
308 worktree.share.as_ref().map(|share| proto::Worktree {
309 id: *id,
310 root_name: worktree.root_name.clone(),
311 entries: share.entries.values().cloned().collect(),
312 diagnostic_summaries: share
313 .diagnostic_summaries
314 .values()
315 .cloned()
316 .collect(),
317 weak: worktree.weak,
318 })
319 })
320 .collect();
321 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
322 if *peer_conn_id != request.sender_id {
323 collaborators.push(proto::Collaborator {
324 peer_id: peer_conn_id.0,
325 replica_id: *peer_replica_id as u32,
326 user_id: peer_user_id.to_proto(),
327 });
328 }
329 }
330 let response = proto::JoinProjectResponse {
331 worktrees,
332 replica_id: joined.replica_id as u32,
333 collaborators,
334 };
335 let connection_ids = joined.project.connection_ids();
336 let contact_user_ids = joined.project.authorized_user_ids();
337 Ok((response, connection_ids, contact_user_ids))
338 });
339
340 match response_data {
341 Ok((response, connection_ids, contact_user_ids)) => {
342 broadcast(request.sender_id, connection_ids, |conn_id| {
343 self.peer.send(
344 conn_id,
345 proto::AddProjectCollaborator {
346 project_id,
347 collaborator: Some(proto::Collaborator {
348 peer_id: request.sender_id.0,
349 replica_id: response.replica_id,
350 user_id: user_id.to_proto(),
351 }),
352 },
353 )
354 })
355 .await?;
356 self.peer.respond(request.receipt(), response).await?;
357 self.update_contacts_for_users(&contact_user_ids).await?;
358 }
359 Err(error) => {
360 self.peer
361 .respond_with_error(
362 request.receipt(),
363 proto::Error {
364 message: error.to_string(),
365 },
366 )
367 .await?;
368 }
369 }
370
371 Ok(())
372 }
373
374 async fn leave_project(
375 mut self: Arc<Server>,
376 request: TypedEnvelope<proto::LeaveProject>,
377 ) -> tide::Result<()> {
378 let sender_id = request.sender_id;
379 let project_id = request.payload.project_id;
380 let worktree = self.state_mut().leave_project(sender_id, project_id);
381 if let Some(worktree) = worktree {
382 broadcast(sender_id, worktree.connection_ids, |conn_id| {
383 self.peer.send(
384 conn_id,
385 proto::RemoveProjectCollaborator {
386 project_id,
387 peer_id: sender_id.0,
388 },
389 )
390 })
391 .await?;
392 self.update_contacts_for_users(&worktree.authorized_user_ids)
393 .await?;
394 }
395 Ok(())
396 }
397
398 async fn register_worktree(
399 mut self: Arc<Server>,
400 request: TypedEnvelope<proto::RegisterWorktree>,
401 ) -> tide::Result<()> {
402 let receipt = request.receipt();
403 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
404
405 let mut contact_user_ids = HashSet::default();
406 contact_user_ids.insert(host_user_id);
407 for github_login in request.payload.authorized_logins {
408 match self.app_state.db.create_user(&github_login, false).await {
409 Ok(contact_user_id) => {
410 contact_user_ids.insert(contact_user_id);
411 }
412 Err(err) => {
413 let message = err.to_string();
414 self.peer
415 .respond_with_error(receipt, proto::Error { message })
416 .await?;
417 return Ok(());
418 }
419 }
420 }
421
422 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
423 let ok = self.state_mut().register_worktree(
424 request.payload.project_id,
425 request.payload.worktree_id,
426 Worktree {
427 authorized_user_ids: contact_user_ids.clone(),
428 root_name: request.payload.root_name,
429 share: None,
430 weak: false,
431 },
432 );
433
434 if ok {
435 self.peer.respond(receipt, proto::Ack {}).await?;
436 self.update_contacts_for_users(&contact_user_ids).await?;
437 } else {
438 self.peer
439 .respond_with_error(
440 receipt,
441 proto::Error {
442 message: NO_SUCH_PROJECT.to_string(),
443 },
444 )
445 .await?;
446 }
447
448 Ok(())
449 }
450
451 async fn unregister_worktree(
452 mut self: Arc<Server>,
453 request: TypedEnvelope<proto::UnregisterWorktree>,
454 ) -> tide::Result<()> {
455 let project_id = request.payload.project_id;
456 let worktree_id = request.payload.worktree_id;
457 let (worktree, guest_connection_ids) =
458 self.state_mut()
459 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
460
461 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
462 self.peer.send(
463 conn_id,
464 proto::UnregisterWorktree {
465 project_id,
466 worktree_id,
467 },
468 )
469 })
470 .await?;
471 self.update_contacts_for_users(&worktree.authorized_user_ids)
472 .await?;
473 Ok(())
474 }
475
476 async fn share_worktree(
477 mut self: Arc<Server>,
478 mut request: TypedEnvelope<proto::ShareWorktree>,
479 ) -> tide::Result<()> {
480 let worktree = request
481 .payload
482 .worktree
483 .as_mut()
484 .ok_or_else(|| anyhow!("missing worktree"))?;
485 let entries = worktree
486 .entries
487 .iter()
488 .map(|entry| (entry.id, entry.clone()))
489 .collect();
490 let diagnostic_summaries = worktree
491 .diagnostic_summaries
492 .iter()
493 .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
494 .collect();
495
496 let shared_worktree = self.state_mut().share_worktree(
497 request.payload.project_id,
498 worktree.id,
499 request.sender_id,
500 entries,
501 diagnostic_summaries,
502 );
503 if let Some(shared_worktree) = shared_worktree {
504 broadcast(
505 request.sender_id,
506 shared_worktree.connection_ids,
507 |connection_id| {
508 self.peer.forward_send(
509 request.sender_id,
510 connection_id,
511 request.payload.clone(),
512 )
513 },
514 )
515 .await?;
516 self.peer.respond(request.receipt(), proto::Ack {}).await?;
517 self.update_contacts_for_users(&shared_worktree.authorized_user_ids)
518 .await?;
519 } else {
520 self.peer
521 .respond_with_error(
522 request.receipt(),
523 proto::Error {
524 message: "no such worktree".to_string(),
525 },
526 )
527 .await?;
528 }
529 Ok(())
530 }
531
532 async fn update_worktree(
533 mut self: Arc<Server>,
534 request: TypedEnvelope<proto::UpdateWorktree>,
535 ) -> tide::Result<()> {
536 let connection_ids = self
537 .state_mut()
538 .update_worktree(
539 request.sender_id,
540 request.payload.project_id,
541 request.payload.worktree_id,
542 &request.payload.removed_entries,
543 &request.payload.updated_entries,
544 )
545 .ok_or_else(|| anyhow!("no such worktree"))?;
546
547 broadcast(request.sender_id, connection_ids, |connection_id| {
548 self.peer
549 .forward_send(request.sender_id, connection_id, request.payload.clone())
550 })
551 .await?;
552
553 Ok(())
554 }
555
556 async fn update_diagnostic_summary(
557 mut self: Arc<Server>,
558 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
559 ) -> tide::Result<()> {
560 let receiver_ids = request
561 .payload
562 .summary
563 .clone()
564 .and_then(|summary| {
565 self.state_mut().update_diagnostic_summary(
566 request.payload.project_id,
567 request.payload.worktree_id,
568 request.sender_id,
569 summary,
570 )
571 })
572 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
573
574 broadcast(request.sender_id, receiver_ids, |connection_id| {
575 self.peer
576 .forward_send(request.sender_id, connection_id, request.payload.clone())
577 })
578 .await?;
579 Ok(())
580 }
581
582 async fn disk_based_diagnostics_updating(
583 self: Arc<Server>,
584 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
585 ) -> tide::Result<()> {
586 let receiver_ids = self
587 .state()
588 .project_connection_ids(request.payload.project_id, request.sender_id)
589 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
590 broadcast(request.sender_id, receiver_ids, |connection_id| {
591 self.peer
592 .forward_send(request.sender_id, connection_id, request.payload.clone())
593 })
594 .await?;
595 Ok(())
596 }
597
598 async fn disk_based_diagnostics_updated(
599 self: Arc<Server>,
600 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
601 ) -> tide::Result<()> {
602 let receiver_ids = self
603 .state()
604 .project_connection_ids(request.payload.project_id, request.sender_id)
605 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
606 broadcast(request.sender_id, receiver_ids, |connection_id| {
607 self.peer
608 .forward_send(request.sender_id, connection_id, request.payload.clone())
609 })
610 .await?;
611 Ok(())
612 }
613
614 async fn get_definition(
615 self: Arc<Server>,
616 request: TypedEnvelope<proto::GetDefinition>,
617 ) -> tide::Result<()> {
618 let receipt = request.receipt();
619 let host_connection_id = self
620 .state()
621 .read_project(request.payload.project_id, request.sender_id)
622 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
623 .host_connection_id;
624 let response = self
625 .peer
626 .forward_request(request.sender_id, host_connection_id, request.payload)
627 .await?;
628 self.peer.respond(receipt, response).await?;
629 Ok(())
630 }
631
632 async fn open_buffer(
633 self: Arc<Server>,
634 request: TypedEnvelope<proto::OpenBuffer>,
635 ) -> tide::Result<()> {
636 let receipt = request.receipt();
637 let host_connection_id = self
638 .state()
639 .read_project(request.payload.project_id, request.sender_id)
640 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
641 .host_connection_id;
642 let response = self
643 .peer
644 .forward_request(request.sender_id, host_connection_id, request.payload)
645 .await?;
646 self.peer.respond(receipt, response).await?;
647 Ok(())
648 }
649
650 async fn close_buffer(
651 self: Arc<Server>,
652 request: TypedEnvelope<proto::CloseBuffer>,
653 ) -> tide::Result<()> {
654 let host_connection_id = self
655 .state()
656 .read_project(request.payload.project_id, request.sender_id)
657 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
658 .host_connection_id;
659 self.peer
660 .forward_send(request.sender_id, host_connection_id, request.payload)
661 .await?;
662 Ok(())
663 }
664
665 async fn save_buffer(
666 self: Arc<Server>,
667 request: TypedEnvelope<proto::SaveBuffer>,
668 ) -> tide::Result<()> {
669 let host;
670 let guests;
671 {
672 let state = self.state();
673 let project = state
674 .read_project(request.payload.project_id, request.sender_id)
675 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
676 host = project.host_connection_id;
677 guests = project.guest_connection_ids()
678 }
679
680 let sender = request.sender_id;
681 let receipt = request.receipt();
682 let response = self
683 .peer
684 .forward_request(sender, host, request.payload.clone())
685 .await?;
686
687 broadcast(host, guests, |conn_id| {
688 let response = response.clone();
689 let peer = &self.peer;
690 async move {
691 if conn_id == sender {
692 peer.respond(receipt, response).await
693 } else {
694 peer.forward_send(host, conn_id, response).await
695 }
696 }
697 })
698 .await?;
699
700 Ok(())
701 }
702
703 async fn format_buffer(
704 self: Arc<Server>,
705 request: TypedEnvelope<proto::FormatBuffer>,
706 ) -> tide::Result<()> {
707 let host;
708 {
709 let state = self.state();
710 let project = state
711 .read_project(request.payload.project_id, request.sender_id)
712 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
713 host = project.host_connection_id;
714 }
715
716 let sender = request.sender_id;
717 let receipt = request.receipt();
718 let response = self
719 .peer
720 .forward_request(sender, host, request.payload.clone())
721 .await?;
722 self.peer.respond(receipt, response).await?;
723
724 Ok(())
725 }
726
727 async fn get_completions(
728 self: Arc<Server>,
729 request: TypedEnvelope<proto::GetCompletions>,
730 ) -> tide::Result<()> {
731 let host;
732 {
733 let state = self.state();
734 let project = state
735 .read_project(request.payload.project_id, request.sender_id)
736 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
737 host = project.host_connection_id;
738 }
739
740 let sender = request.sender_id;
741 let receipt = request.receipt();
742 let response = self
743 .peer
744 .forward_request(sender, host, request.payload.clone())
745 .await?;
746 self.peer.respond(receipt, response).await?;
747
748 Ok(())
749 }
750
751 async fn apply_additional_edits_for_completion(
752 self: Arc<Server>,
753 request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
754 ) -> tide::Result<()> {
755 let host;
756 {
757 let state = self.state();
758 let project = state
759 .read_project(request.payload.project_id, request.sender_id)
760 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
761 host = project.host_connection_id;
762 }
763
764 let sender = request.sender_id;
765 let receipt = request.receipt();
766 let response = self
767 .peer
768 .forward_request(sender, host, request.payload.clone())
769 .await?;
770 self.peer.respond(receipt, response).await?;
771
772 Ok(())
773 }
774
775 async fn update_buffer(
776 self: Arc<Server>,
777 request: TypedEnvelope<proto::UpdateBuffer>,
778 ) -> tide::Result<()> {
779 let receiver_ids = self
780 .state()
781 .project_connection_ids(request.payload.project_id, request.sender_id)
782 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
783 broadcast(request.sender_id, receiver_ids, |connection_id| {
784 self.peer
785 .forward_send(request.sender_id, connection_id, request.payload.clone())
786 })
787 .await?;
788 self.peer.respond(request.receipt(), proto::Ack {}).await?;
789 Ok(())
790 }
791
792 async fn update_buffer_file(
793 self: Arc<Server>,
794 request: TypedEnvelope<proto::UpdateBufferFile>,
795 ) -> tide::Result<()> {
796 let receiver_ids = self
797 .state()
798 .project_connection_ids(request.payload.project_id, request.sender_id)
799 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
800 broadcast(request.sender_id, receiver_ids, |connection_id| {
801 self.peer
802 .forward_send(request.sender_id, connection_id, request.payload.clone())
803 })
804 .await?;
805 Ok(())
806 }
807
808 async fn buffer_reloaded(
809 self: Arc<Server>,
810 request: TypedEnvelope<proto::BufferReloaded>,
811 ) -> tide::Result<()> {
812 let receiver_ids = self
813 .state()
814 .project_connection_ids(request.payload.project_id, request.sender_id)
815 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
816 broadcast(request.sender_id, receiver_ids, |connection_id| {
817 self.peer
818 .forward_send(request.sender_id, connection_id, request.payload.clone())
819 })
820 .await?;
821 Ok(())
822 }
823
824 async fn buffer_saved(
825 self: Arc<Server>,
826 request: TypedEnvelope<proto::BufferSaved>,
827 ) -> tide::Result<()> {
828 let receiver_ids = self
829 .state()
830 .project_connection_ids(request.payload.project_id, request.sender_id)
831 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
832 broadcast(request.sender_id, receiver_ids, |connection_id| {
833 self.peer
834 .forward_send(request.sender_id, connection_id, request.payload.clone())
835 })
836 .await?;
837 Ok(())
838 }
839
840 async fn get_channels(
841 self: Arc<Server>,
842 request: TypedEnvelope<proto::GetChannels>,
843 ) -> tide::Result<()> {
844 let user_id = self.state().user_id_for_connection(request.sender_id)?;
845 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
846 self.peer
847 .respond(
848 request.receipt(),
849 proto::GetChannelsResponse {
850 channels: channels
851 .into_iter()
852 .map(|chan| proto::Channel {
853 id: chan.id.to_proto(),
854 name: chan.name,
855 })
856 .collect(),
857 },
858 )
859 .await?;
860 Ok(())
861 }
862
863 async fn get_users(
864 self: Arc<Server>,
865 request: TypedEnvelope<proto::GetUsers>,
866 ) -> tide::Result<()> {
867 let receipt = request.receipt();
868 let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
869 let users = self
870 .app_state
871 .db
872 .get_users_by_ids(user_ids)
873 .await?
874 .into_iter()
875 .map(|user| proto::User {
876 id: user.id.to_proto(),
877 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
878 github_login: user.github_login,
879 })
880 .collect();
881 self.peer
882 .respond(receipt, proto::GetUsersResponse { users })
883 .await?;
884 Ok(())
885 }
886
887 async fn update_contacts_for_users<'a>(
888 self: &Arc<Server>,
889 user_ids: impl IntoIterator<Item = &'a UserId>,
890 ) -> tide::Result<()> {
891 let mut send_futures = Vec::new();
892
893 {
894 let state = self.state();
895 for user_id in user_ids {
896 let contacts = state.contacts_for_user(*user_id);
897 for connection_id in state.connection_ids_for_user(*user_id) {
898 send_futures.push(self.peer.send(
899 connection_id,
900 proto::UpdateContacts {
901 contacts: contacts.clone(),
902 },
903 ));
904 }
905 }
906 }
907 futures::future::try_join_all(send_futures).await?;
908
909 Ok(())
910 }
911
912 async fn join_channel(
913 mut self: Arc<Self>,
914 request: TypedEnvelope<proto::JoinChannel>,
915 ) -> tide::Result<()> {
916 let user_id = self.state().user_id_for_connection(request.sender_id)?;
917 let channel_id = ChannelId::from_proto(request.payload.channel_id);
918 if !self
919 .app_state
920 .db
921 .can_user_access_channel(user_id, channel_id)
922 .await?
923 {
924 Err(anyhow!("access denied"))?;
925 }
926
927 self.state_mut().join_channel(request.sender_id, channel_id);
928 let messages = self
929 .app_state
930 .db
931 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
932 .await?
933 .into_iter()
934 .map(|msg| proto::ChannelMessage {
935 id: msg.id.to_proto(),
936 body: msg.body,
937 timestamp: msg.sent_at.unix_timestamp() as u64,
938 sender_id: msg.sender_id.to_proto(),
939 nonce: Some(msg.nonce.as_u128().into()),
940 })
941 .collect::<Vec<_>>();
942 self.peer
943 .respond(
944 request.receipt(),
945 proto::JoinChannelResponse {
946 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
947 messages,
948 },
949 )
950 .await?;
951 Ok(())
952 }
953
954 async fn leave_channel(
955 mut self: Arc<Self>,
956 request: TypedEnvelope<proto::LeaveChannel>,
957 ) -> tide::Result<()> {
958 let user_id = self.state().user_id_for_connection(request.sender_id)?;
959 let channel_id = ChannelId::from_proto(request.payload.channel_id);
960 if !self
961 .app_state
962 .db
963 .can_user_access_channel(user_id, channel_id)
964 .await?
965 {
966 Err(anyhow!("access denied"))?;
967 }
968
969 self.state_mut()
970 .leave_channel(request.sender_id, channel_id);
971
972 Ok(())
973 }
974
975 async fn send_channel_message(
976 self: Arc<Self>,
977 request: TypedEnvelope<proto::SendChannelMessage>,
978 ) -> tide::Result<()> {
979 let receipt = request.receipt();
980 let channel_id = ChannelId::from_proto(request.payload.channel_id);
981 let user_id;
982 let connection_ids;
983 {
984 let state = self.state();
985 user_id = state.user_id_for_connection(request.sender_id)?;
986 if let Some(ids) = state.channel_connection_ids(channel_id) {
987 connection_ids = ids;
988 } else {
989 return Ok(());
990 }
991 }
992
993 // Validate the message body.
994 let body = request.payload.body.trim().to_string();
995 if body.len() > MAX_MESSAGE_LEN {
996 self.peer
997 .respond_with_error(
998 receipt,
999 proto::Error {
1000 message: "message is too long".to_string(),
1001 },
1002 )
1003 .await?;
1004 return Ok(());
1005 }
1006 if body.is_empty() {
1007 self.peer
1008 .respond_with_error(
1009 receipt,
1010 proto::Error {
1011 message: "message can't be blank".to_string(),
1012 },
1013 )
1014 .await?;
1015 return Ok(());
1016 }
1017
1018 let timestamp = OffsetDateTime::now_utc();
1019 let nonce = if let Some(nonce) = request.payload.nonce {
1020 nonce
1021 } else {
1022 self.peer
1023 .respond_with_error(
1024 receipt,
1025 proto::Error {
1026 message: "nonce can't be blank".to_string(),
1027 },
1028 )
1029 .await?;
1030 return Ok(());
1031 };
1032
1033 let message_id = self
1034 .app_state
1035 .db
1036 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1037 .await?
1038 .to_proto();
1039 let message = proto::ChannelMessage {
1040 sender_id: user_id.to_proto(),
1041 id: message_id,
1042 body,
1043 timestamp: timestamp.unix_timestamp() as u64,
1044 nonce: Some(nonce),
1045 };
1046 broadcast(request.sender_id, connection_ids, |conn_id| {
1047 self.peer.send(
1048 conn_id,
1049 proto::ChannelMessageSent {
1050 channel_id: channel_id.to_proto(),
1051 message: Some(message.clone()),
1052 },
1053 )
1054 })
1055 .await?;
1056 self.peer
1057 .respond(
1058 receipt,
1059 proto::SendChannelMessageResponse {
1060 message: Some(message),
1061 },
1062 )
1063 .await?;
1064 Ok(())
1065 }
1066
1067 async fn get_channel_messages(
1068 self: Arc<Self>,
1069 request: TypedEnvelope<proto::GetChannelMessages>,
1070 ) -> tide::Result<()> {
1071 let user_id = self.state().user_id_for_connection(request.sender_id)?;
1072 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1073 if !self
1074 .app_state
1075 .db
1076 .can_user_access_channel(user_id, channel_id)
1077 .await?
1078 {
1079 Err(anyhow!("access denied"))?;
1080 }
1081
1082 let messages = self
1083 .app_state
1084 .db
1085 .get_channel_messages(
1086 channel_id,
1087 MESSAGE_COUNT_PER_PAGE,
1088 Some(MessageId::from_proto(request.payload.before_message_id)),
1089 )
1090 .await?
1091 .into_iter()
1092 .map(|msg| proto::ChannelMessage {
1093 id: msg.id.to_proto(),
1094 body: msg.body,
1095 timestamp: msg.sent_at.unix_timestamp() as u64,
1096 sender_id: msg.sender_id.to_proto(),
1097 nonce: Some(msg.nonce.as_u128().into()),
1098 })
1099 .collect::<Vec<_>>();
1100 self.peer
1101 .respond(
1102 request.receipt(),
1103 proto::GetChannelMessagesResponse {
1104 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1105 messages,
1106 },
1107 )
1108 .await?;
1109 Ok(())
1110 }
1111
1112 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1113 self.store.read()
1114 }
1115
1116 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1117 self.store.write()
1118 }
1119}
1120
1121pub async fn broadcast<F, T>(
1122 sender_id: ConnectionId,
1123 receiver_ids: Vec<ConnectionId>,
1124 mut f: F,
1125) -> anyhow::Result<()>
1126where
1127 F: FnMut(ConnectionId) -> T,
1128 T: Future<Output = anyhow::Result<()>>,
1129{
1130 let futures = receiver_ids
1131 .into_iter()
1132 .filter(|id| *id != sender_id)
1133 .map(|id| f(id));
1134 futures::future::try_join_all(futures).await?;
1135 Ok(())
1136}
1137
1138pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1139 let server = Server::new(app.state().clone(), rpc.clone(), None);
1140 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1141 let server = server.clone();
1142 async move {
1143 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1144
1145 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1146 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1147 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1148 let client_protocol_version: Option<u32> = request
1149 .header("X-Zed-Protocol-Version")
1150 .and_then(|v| v.as_str().parse().ok());
1151
1152 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1153 return Ok(Response::new(StatusCode::UpgradeRequired));
1154 }
1155
1156 let header = match request.header("Sec-Websocket-Key") {
1157 Some(h) => h.as_str(),
1158 None => return Err(anyhow!("expected sec-websocket-key"))?,
1159 };
1160
1161 let user_id = process_auth_header(&request).await?;
1162
1163 let mut response = Response::new(StatusCode::SwitchingProtocols);
1164 response.insert_header(UPGRADE, "websocket");
1165 response.insert_header(CONNECTION, "Upgrade");
1166 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1167 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1168 response.insert_header("Sec-Websocket-Version", "13");
1169
1170 let http_res: &mut tide::http::Response = response.as_mut();
1171 let upgrade_receiver = http_res.recv_upgrade().await;
1172 let addr = request.remote().unwrap_or("unknown").to_string();
1173 task::spawn(async move {
1174 if let Some(stream) = upgrade_receiver.await {
1175 server
1176 .handle_connection(
1177 Connection::new(
1178 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1179 ),
1180 addr,
1181 user_id,
1182 None,
1183 )
1184 .await;
1185 }
1186 });
1187
1188 Ok(response)
1189 }
1190 });
1191}
1192
1193fn header_contains_ignore_case<T>(
1194 request: &tide::Request<T>,
1195 header_name: HeaderName,
1196 value: &str,
1197) -> bool {
1198 request
1199 .header(header_name)
1200 .map(|h| {
1201 h.as_str()
1202 .split(',')
1203 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1204 })
1205 .unwrap_or(false)
1206}
1207
1208#[cfg(test)]
1209mod tests {
1210 use super::*;
1211 use crate::{
1212 auth,
1213 db::{tests::TestDb, UserId},
1214 github, AppState, Config,
1215 };
1216 use ::rpc::Peer;
1217 use async_std::task;
1218 use gpui::{executor, ModelHandle, TestAppContext};
1219 use parking_lot::Mutex;
1220 use postage::{mpsc, watch};
1221 use rand::prelude::*;
1222 use rpc::PeerId;
1223 use serde_json::json;
1224 use sqlx::types::time::OffsetDateTime;
1225 use std::{
1226 ops::Deref,
1227 path::Path,
1228 rc::Rc,
1229 sync::{
1230 atomic::{AtomicBool, Ordering::SeqCst},
1231 Arc,
1232 },
1233 time::Duration,
1234 };
1235 use zed::{
1236 client::{
1237 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1238 EstablishConnectionError, UserStore,
1239 },
1240 editor::{Editor, EditorSettings, Input, MultiBuffer},
1241 fs::{FakeFs, Fs as _},
1242 language::{
1243 tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1244 LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1245 },
1246 lsp,
1247 project::{DiagnosticSummary, Project, ProjectPath},
1248 };
1249
1250 #[gpui::test]
1251 async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1252 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1253 let lang_registry = Arc::new(LanguageRegistry::new());
1254 let fs = Arc::new(FakeFs::new(cx_a.background()));
1255 cx_a.foreground().forbid_parking();
1256
1257 // Connect to a server as 2 clients.
1258 let mut server = TestServer::start(cx_a.foreground()).await;
1259 let client_a = server.create_client(&mut cx_a, "user_a").await;
1260 let client_b = server.create_client(&mut cx_b, "user_b").await;
1261
1262 // Share a project as client A
1263 fs.insert_tree(
1264 "/a",
1265 json!({
1266 ".zed.toml": r#"collaborators = ["user_b"]"#,
1267 "a.txt": "a-contents",
1268 "b.txt": "b-contents",
1269 }),
1270 )
1271 .await;
1272 let project_a = cx_a.update(|cx| {
1273 Project::local(
1274 client_a.clone(),
1275 client_a.user_store.clone(),
1276 lang_registry.clone(),
1277 fs.clone(),
1278 cx,
1279 )
1280 });
1281 let (worktree_a, _) = project_a
1282 .update(&mut cx_a, |p, cx| {
1283 p.find_or_create_local_worktree("/a", false, cx)
1284 })
1285 .await
1286 .unwrap();
1287 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1288 worktree_a
1289 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1290 .await;
1291 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1292 project_a
1293 .update(&mut cx_a, |p, cx| p.share(cx))
1294 .await
1295 .unwrap();
1296
1297 // Join that project as client B
1298 let project_b = Project::remote(
1299 project_id,
1300 client_b.clone(),
1301 client_b.user_store.clone(),
1302 lang_registry.clone(),
1303 fs.clone(),
1304 &mut cx_b.to_async(),
1305 )
1306 .await
1307 .unwrap();
1308
1309 let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1310 assert_eq!(
1311 project
1312 .collaborators()
1313 .get(&client_a.peer_id)
1314 .unwrap()
1315 .user
1316 .github_login,
1317 "user_a"
1318 );
1319 project.replica_id()
1320 });
1321 project_a
1322 .condition(&cx_a, |tree, _| {
1323 tree.collaborators()
1324 .get(&client_b.peer_id)
1325 .map_or(false, |collaborator| {
1326 collaborator.replica_id == replica_id_b
1327 && collaborator.user.github_login == "user_b"
1328 })
1329 })
1330 .await;
1331
1332 // Open the same file as client B and client A.
1333 let buffer_b = project_b
1334 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1335 .await
1336 .unwrap();
1337 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1338 buffer_b.read_with(&cx_b, |buf, cx| {
1339 assert_eq!(buf.read(cx).text(), "b-contents")
1340 });
1341 project_a.read_with(&cx_a, |project, cx| {
1342 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1343 });
1344 let buffer_a = project_a
1345 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1346 .await
1347 .unwrap();
1348
1349 let editor_b = cx_b.add_view(window_b, |cx| {
1350 Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
1351 });
1352
1353 // TODO
1354 // // Create a selection set as client B and see that selection set as client A.
1355 // buffer_a
1356 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1357 // .await;
1358
1359 // Edit the buffer as client B and see that edit as client A.
1360 editor_b.update(&mut cx_b, |editor, cx| {
1361 editor.handle_input(&Input("ok, ".into()), cx)
1362 });
1363 buffer_a
1364 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1365 .await;
1366
1367 // TODO
1368 // // Remove the selection set as client B, see those selections disappear as client A.
1369 cx_b.update(move |_| drop(editor_b));
1370 // buffer_a
1371 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1372 // .await;
1373
1374 // Close the buffer as client A, see that the buffer is closed.
1375 cx_a.update(move |_| drop(buffer_a));
1376 project_a
1377 .condition(&cx_a, |project, cx| {
1378 !project.has_open_buffer((worktree_id, "b.txt"), cx)
1379 })
1380 .await;
1381
1382 // Dropping the client B's project removes client B from client A's collaborators.
1383 cx_b.update(move |_| drop(project_b));
1384 project_a
1385 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1386 .await;
1387 }
1388
1389 #[gpui::test]
1390 async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1391 let lang_registry = Arc::new(LanguageRegistry::new());
1392 let fs = Arc::new(FakeFs::new(cx_a.background()));
1393 cx_a.foreground().forbid_parking();
1394
1395 // Connect to a server as 2 clients.
1396 let mut server = TestServer::start(cx_a.foreground()).await;
1397 let client_a = server.create_client(&mut cx_a, "user_a").await;
1398 let client_b = server.create_client(&mut cx_b, "user_b").await;
1399
1400 // Share a project as client A
1401 fs.insert_tree(
1402 "/a",
1403 json!({
1404 ".zed.toml": r#"collaborators = ["user_b"]"#,
1405 "a.txt": "a-contents",
1406 "b.txt": "b-contents",
1407 }),
1408 )
1409 .await;
1410 let project_a = cx_a.update(|cx| {
1411 Project::local(
1412 client_a.clone(),
1413 client_a.user_store.clone(),
1414 lang_registry.clone(),
1415 fs.clone(),
1416 cx,
1417 )
1418 });
1419 let (worktree_a, _) = project_a
1420 .update(&mut cx_a, |p, cx| {
1421 p.find_or_create_local_worktree("/a", false, cx)
1422 })
1423 .await
1424 .unwrap();
1425 worktree_a
1426 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1427 .await;
1428 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1429 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1430 project_a
1431 .update(&mut cx_a, |p, cx| p.share(cx))
1432 .await
1433 .unwrap();
1434 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1435
1436 // Join that project as client B
1437 let project_b = Project::remote(
1438 project_id,
1439 client_b.clone(),
1440 client_b.user_store.clone(),
1441 lang_registry.clone(),
1442 fs.clone(),
1443 &mut cx_b.to_async(),
1444 )
1445 .await
1446 .unwrap();
1447 project_b
1448 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1449 .await
1450 .unwrap();
1451
1452 // Unshare the project as client A
1453 project_a
1454 .update(&mut cx_a, |project, cx| project.unshare(cx))
1455 .await
1456 .unwrap();
1457 project_b
1458 .condition(&mut cx_b, |project, _| project.is_read_only())
1459 .await;
1460 assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1461 drop(project_b);
1462
1463 // Share the project again and ensure guests can still join.
1464 project_a
1465 .update(&mut cx_a, |project, cx| project.share(cx))
1466 .await
1467 .unwrap();
1468 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1469
1470 let project_c = Project::remote(
1471 project_id,
1472 client_b.clone(),
1473 client_b.user_store.clone(),
1474 lang_registry.clone(),
1475 fs.clone(),
1476 &mut cx_b.to_async(),
1477 )
1478 .await
1479 .unwrap();
1480 project_c
1481 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1482 .await
1483 .unwrap();
1484 }
1485
1486 #[gpui::test]
1487 async fn test_propagate_saves_and_fs_changes(
1488 mut cx_a: TestAppContext,
1489 mut cx_b: TestAppContext,
1490 mut cx_c: TestAppContext,
1491 ) {
1492 let lang_registry = Arc::new(LanguageRegistry::new());
1493 let fs = Arc::new(FakeFs::new(cx_a.background()));
1494 cx_a.foreground().forbid_parking();
1495
1496 // Connect to a server as 3 clients.
1497 let mut server = TestServer::start(cx_a.foreground()).await;
1498 let client_a = server.create_client(&mut cx_a, "user_a").await;
1499 let client_b = server.create_client(&mut cx_b, "user_b").await;
1500 let client_c = server.create_client(&mut cx_c, "user_c").await;
1501
1502 // Share a worktree as client A.
1503 fs.insert_tree(
1504 "/a",
1505 json!({
1506 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1507 "file1": "",
1508 "file2": ""
1509 }),
1510 )
1511 .await;
1512 let project_a = cx_a.update(|cx| {
1513 Project::local(
1514 client_a.clone(),
1515 client_a.user_store.clone(),
1516 lang_registry.clone(),
1517 fs.clone(),
1518 cx,
1519 )
1520 });
1521 let (worktree_a, _) = project_a
1522 .update(&mut cx_a, |p, cx| {
1523 p.find_or_create_local_worktree("/a", false, cx)
1524 })
1525 .await
1526 .unwrap();
1527 worktree_a
1528 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1529 .await;
1530 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1531 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1532 project_a
1533 .update(&mut cx_a, |p, cx| p.share(cx))
1534 .await
1535 .unwrap();
1536
1537 // Join that worktree as clients B and C.
1538 let project_b = Project::remote(
1539 project_id,
1540 client_b.clone(),
1541 client_b.user_store.clone(),
1542 lang_registry.clone(),
1543 fs.clone(),
1544 &mut cx_b.to_async(),
1545 )
1546 .await
1547 .unwrap();
1548 let project_c = Project::remote(
1549 project_id,
1550 client_c.clone(),
1551 client_c.user_store.clone(),
1552 lang_registry.clone(),
1553 fs.clone(),
1554 &mut cx_c.to_async(),
1555 )
1556 .await
1557 .unwrap();
1558 let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1559 let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1560
1561 // Open and edit a buffer as both guests B and C.
1562 let buffer_b = project_b
1563 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1564 .await
1565 .unwrap();
1566 let buffer_c = project_c
1567 .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1568 .await
1569 .unwrap();
1570 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1571 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1572
1573 // Open and edit that buffer as the host.
1574 let buffer_a = project_a
1575 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1576 .await
1577 .unwrap();
1578
1579 buffer_a
1580 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1581 .await;
1582 buffer_a.update(&mut cx_a, |buf, cx| {
1583 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1584 });
1585
1586 // Wait for edits to propagate
1587 buffer_a
1588 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1589 .await;
1590 buffer_b
1591 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1592 .await;
1593 buffer_c
1594 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1595 .await;
1596
1597 // Edit the buffer as the host and concurrently save as guest B.
1598 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1599 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1600 save_b.await.unwrap();
1601 assert_eq!(
1602 fs.load("/a/file1".as_ref()).await.unwrap(),
1603 "hi-a, i-am-c, i-am-b, i-am-a"
1604 );
1605 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1606 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1607 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1608
1609 // Make changes on host's file system, see those changes on guest worktrees.
1610 fs.rename("/a/file1".as_ref(), "/a/file1-renamed".as_ref())
1611 .await
1612 .unwrap();
1613 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1614 .await
1615 .unwrap();
1616 fs.insert_file(Path::new("/a/file4"), "4".into())
1617 .await
1618 .unwrap();
1619
1620 worktree_a
1621 .condition(&cx_a, |tree, _| tree.file_count() == 4)
1622 .await;
1623 worktree_b
1624 .condition(&cx_b, |tree, _| tree.file_count() == 4)
1625 .await;
1626 worktree_c
1627 .condition(&cx_c, |tree, _| tree.file_count() == 4)
1628 .await;
1629 worktree_a.read_with(&cx_a, |tree, _| {
1630 assert_eq!(
1631 tree.paths()
1632 .map(|p| p.to_string_lossy())
1633 .collect::<Vec<_>>(),
1634 &[".zed.toml", "file1-renamed", "file3", "file4"]
1635 )
1636 });
1637 worktree_b.read_with(&cx_b, |tree, _| {
1638 assert_eq!(
1639 tree.paths()
1640 .map(|p| p.to_string_lossy())
1641 .collect::<Vec<_>>(),
1642 &[".zed.toml", "file1-renamed", "file3", "file4"]
1643 )
1644 });
1645 worktree_c.read_with(&cx_c, |tree, _| {
1646 assert_eq!(
1647 tree.paths()
1648 .map(|p| p.to_string_lossy())
1649 .collect::<Vec<_>>(),
1650 &[".zed.toml", "file1-renamed", "file3", "file4"]
1651 )
1652 });
1653
1654 // Ensure buffer files are updated as well.
1655 buffer_a
1656 .condition(&cx_a, |buf, _| {
1657 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1658 })
1659 .await;
1660 buffer_b
1661 .condition(&cx_b, |buf, _| {
1662 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1663 })
1664 .await;
1665 buffer_c
1666 .condition(&cx_c, |buf, _| {
1667 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1668 })
1669 .await;
1670 }
1671
1672 #[gpui::test]
1673 async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1674 cx_a.foreground().forbid_parking();
1675 let lang_registry = Arc::new(LanguageRegistry::new());
1676 let fs = Arc::new(FakeFs::new(cx_a.background()));
1677
1678 // Connect to a server as 2 clients.
1679 let mut server = TestServer::start(cx_a.foreground()).await;
1680 let client_a = server.create_client(&mut cx_a, "user_a").await;
1681 let client_b = server.create_client(&mut cx_b, "user_b").await;
1682
1683 // Share a project as client A
1684 fs.insert_tree(
1685 "/dir",
1686 json!({
1687 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1688 "a.txt": "a-contents",
1689 }),
1690 )
1691 .await;
1692
1693 let project_a = cx_a.update(|cx| {
1694 Project::local(
1695 client_a.clone(),
1696 client_a.user_store.clone(),
1697 lang_registry.clone(),
1698 fs.clone(),
1699 cx,
1700 )
1701 });
1702 let (worktree_a, _) = project_a
1703 .update(&mut cx_a, |p, cx| {
1704 p.find_or_create_local_worktree("/dir", false, cx)
1705 })
1706 .await
1707 .unwrap();
1708 worktree_a
1709 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1710 .await;
1711 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1712 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1713 project_a
1714 .update(&mut cx_a, |p, cx| p.share(cx))
1715 .await
1716 .unwrap();
1717
1718 // Join that project as client B
1719 let project_b = Project::remote(
1720 project_id,
1721 client_b.clone(),
1722 client_b.user_store.clone(),
1723 lang_registry.clone(),
1724 fs.clone(),
1725 &mut cx_b.to_async(),
1726 )
1727 .await
1728 .unwrap();
1729 let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1730
1731 // Open a buffer as client B
1732 let buffer_b = project_b
1733 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1734 .await
1735 .unwrap();
1736 let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1737
1738 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1739 buffer_b.read_with(&cx_b, |buf, _| {
1740 assert!(buf.is_dirty());
1741 assert!(!buf.has_conflict());
1742 });
1743
1744 buffer_b
1745 .update(&mut cx_b, |buf, cx| buf.save(cx))
1746 .await
1747 .unwrap();
1748 worktree_b
1749 .condition(&cx_b, |_, cx| {
1750 buffer_b.read(cx).file().unwrap().mtime() != mtime
1751 })
1752 .await;
1753 buffer_b.read_with(&cx_b, |buf, _| {
1754 assert!(!buf.is_dirty());
1755 assert!(!buf.has_conflict());
1756 });
1757
1758 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1759 buffer_b.read_with(&cx_b, |buf, _| {
1760 assert!(buf.is_dirty());
1761 assert!(!buf.has_conflict());
1762 });
1763 }
1764
1765 #[gpui::test]
1766 async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1767 cx_a.foreground().forbid_parking();
1768 let lang_registry = Arc::new(LanguageRegistry::new());
1769 let fs = Arc::new(FakeFs::new(cx_a.background()));
1770
1771 // Connect to a server as 2 clients.
1772 let mut server = TestServer::start(cx_a.foreground()).await;
1773 let client_a = server.create_client(&mut cx_a, "user_a").await;
1774 let client_b = server.create_client(&mut cx_b, "user_b").await;
1775
1776 // Share a project as client A
1777 fs.insert_tree(
1778 "/dir",
1779 json!({
1780 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1781 "a.txt": "a-contents",
1782 }),
1783 )
1784 .await;
1785
1786 let project_a = cx_a.update(|cx| {
1787 Project::local(
1788 client_a.clone(),
1789 client_a.user_store.clone(),
1790 lang_registry.clone(),
1791 fs.clone(),
1792 cx,
1793 )
1794 });
1795 let (worktree_a, _) = project_a
1796 .update(&mut cx_a, |p, cx| {
1797 p.find_or_create_local_worktree("/dir", false, cx)
1798 })
1799 .await
1800 .unwrap();
1801 worktree_a
1802 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1803 .await;
1804 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1805 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1806 project_a
1807 .update(&mut cx_a, |p, cx| p.share(cx))
1808 .await
1809 .unwrap();
1810
1811 // Join that project as client B
1812 let project_b = Project::remote(
1813 project_id,
1814 client_b.clone(),
1815 client_b.user_store.clone(),
1816 lang_registry.clone(),
1817 fs.clone(),
1818 &mut cx_b.to_async(),
1819 )
1820 .await
1821 .unwrap();
1822 let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1823
1824 // Open a buffer as client B
1825 let buffer_b = project_b
1826 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1827 .await
1828 .unwrap();
1829 buffer_b.read_with(&cx_b, |buf, _| {
1830 assert!(!buf.is_dirty());
1831 assert!(!buf.has_conflict());
1832 });
1833
1834 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1835 .await
1836 .unwrap();
1837 buffer_b
1838 .condition(&cx_b, |buf, _| {
1839 buf.text() == "new contents" && !buf.is_dirty()
1840 })
1841 .await;
1842 buffer_b.read_with(&cx_b, |buf, _| {
1843 assert!(!buf.has_conflict());
1844 });
1845 }
1846
1847 #[gpui::test(iterations = 100)]
1848 async fn test_editing_while_guest_opens_buffer(
1849 mut cx_a: TestAppContext,
1850 mut cx_b: TestAppContext,
1851 ) {
1852 cx_a.foreground().forbid_parking();
1853 let lang_registry = Arc::new(LanguageRegistry::new());
1854 let fs = Arc::new(FakeFs::new(cx_a.background()));
1855
1856 // Connect to a server as 2 clients.
1857 let mut server = TestServer::start(cx_a.foreground()).await;
1858 let client_a = server.create_client(&mut cx_a, "user_a").await;
1859 let client_b = server.create_client(&mut cx_b, "user_b").await;
1860
1861 // Share a project as client A
1862 fs.insert_tree(
1863 "/dir",
1864 json!({
1865 ".zed.toml": r#"collaborators = ["user_b"]"#,
1866 "a.txt": "a-contents",
1867 }),
1868 )
1869 .await;
1870 let project_a = cx_a.update(|cx| {
1871 Project::local(
1872 client_a.clone(),
1873 client_a.user_store.clone(),
1874 lang_registry.clone(),
1875 fs.clone(),
1876 cx,
1877 )
1878 });
1879 let (worktree_a, _) = project_a
1880 .update(&mut cx_a, |p, cx| {
1881 p.find_or_create_local_worktree("/dir", false, cx)
1882 })
1883 .await
1884 .unwrap();
1885 worktree_a
1886 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1887 .await;
1888 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1889 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1890 project_a
1891 .update(&mut cx_a, |p, cx| p.share(cx))
1892 .await
1893 .unwrap();
1894
1895 // Join that project as client B
1896 let project_b = Project::remote(
1897 project_id,
1898 client_b.clone(),
1899 client_b.user_store.clone(),
1900 lang_registry.clone(),
1901 fs.clone(),
1902 &mut cx_b.to_async(),
1903 )
1904 .await
1905 .unwrap();
1906
1907 // Open a buffer as client A
1908 let buffer_a = project_a
1909 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1910 .await
1911 .unwrap();
1912
1913 // Start opening the same buffer as client B
1914 let buffer_b = cx_b
1915 .background()
1916 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1917 task::yield_now().await;
1918
1919 // Edit the buffer as client A while client B is still opening it.
1920 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1921
1922 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1923 let buffer_b = buffer_b.await.unwrap();
1924 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1925 }
1926
1927 #[gpui::test]
1928 async fn test_leaving_worktree_while_opening_buffer(
1929 mut cx_a: TestAppContext,
1930 mut cx_b: TestAppContext,
1931 ) {
1932 cx_a.foreground().forbid_parking();
1933 let lang_registry = Arc::new(LanguageRegistry::new());
1934 let fs = Arc::new(FakeFs::new(cx_a.background()));
1935
1936 // Connect to a server as 2 clients.
1937 let mut server = TestServer::start(cx_a.foreground()).await;
1938 let client_a = server.create_client(&mut cx_a, "user_a").await;
1939 let client_b = server.create_client(&mut cx_b, "user_b").await;
1940
1941 // Share a project as client A
1942 fs.insert_tree(
1943 "/dir",
1944 json!({
1945 ".zed.toml": r#"collaborators = ["user_b"]"#,
1946 "a.txt": "a-contents",
1947 }),
1948 )
1949 .await;
1950 let project_a = cx_a.update(|cx| {
1951 Project::local(
1952 client_a.clone(),
1953 client_a.user_store.clone(),
1954 lang_registry.clone(),
1955 fs.clone(),
1956 cx,
1957 )
1958 });
1959 let (worktree_a, _) = project_a
1960 .update(&mut cx_a, |p, cx| {
1961 p.find_or_create_local_worktree("/dir", false, cx)
1962 })
1963 .await
1964 .unwrap();
1965 worktree_a
1966 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1967 .await;
1968 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1969 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1970 project_a
1971 .update(&mut cx_a, |p, cx| p.share(cx))
1972 .await
1973 .unwrap();
1974
1975 // Join that project as client B
1976 let project_b = Project::remote(
1977 project_id,
1978 client_b.clone(),
1979 client_b.user_store.clone(),
1980 lang_registry.clone(),
1981 fs.clone(),
1982 &mut cx_b.to_async(),
1983 )
1984 .await
1985 .unwrap();
1986
1987 // See that a guest has joined as client A.
1988 project_a
1989 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1990 .await;
1991
1992 // Begin opening a buffer as client B, but leave the project before the open completes.
1993 let buffer_b = cx_b
1994 .background()
1995 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1996 cx_b.update(|_| drop(project_b));
1997 drop(buffer_b);
1998
1999 // See that the guest has left.
2000 project_a
2001 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2002 .await;
2003 }
2004
2005 #[gpui::test]
2006 async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2007 cx_a.foreground().forbid_parking();
2008 let lang_registry = Arc::new(LanguageRegistry::new());
2009 let fs = Arc::new(FakeFs::new(cx_a.background()));
2010
2011 // Connect to a server as 2 clients.
2012 let mut server = TestServer::start(cx_a.foreground()).await;
2013 let client_a = server.create_client(&mut cx_a, "user_a").await;
2014 let client_b = server.create_client(&mut cx_b, "user_b").await;
2015
2016 // Share a project as client A
2017 fs.insert_tree(
2018 "/a",
2019 json!({
2020 ".zed.toml": r#"collaborators = ["user_b"]"#,
2021 "a.txt": "a-contents",
2022 "b.txt": "b-contents",
2023 }),
2024 )
2025 .await;
2026 let project_a = cx_a.update(|cx| {
2027 Project::local(
2028 client_a.clone(),
2029 client_a.user_store.clone(),
2030 lang_registry.clone(),
2031 fs.clone(),
2032 cx,
2033 )
2034 });
2035 let (worktree_a, _) = project_a
2036 .update(&mut cx_a, |p, cx| {
2037 p.find_or_create_local_worktree("/a", false, cx)
2038 })
2039 .await
2040 .unwrap();
2041 worktree_a
2042 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2043 .await;
2044 let project_id = project_a
2045 .update(&mut cx_a, |project, _| project.next_remote_id())
2046 .await;
2047 project_a
2048 .update(&mut cx_a, |project, cx| project.share(cx))
2049 .await
2050 .unwrap();
2051
2052 // Join that project as client B
2053 let _project_b = Project::remote(
2054 project_id,
2055 client_b.clone(),
2056 client_b.user_store.clone(),
2057 lang_registry.clone(),
2058 fs.clone(),
2059 &mut cx_b.to_async(),
2060 )
2061 .await
2062 .unwrap();
2063
2064 // See that a guest has joined as client A.
2065 project_a
2066 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2067 .await;
2068
2069 // Drop client B's connection and ensure client A observes client B leaving the worktree.
2070 client_b.disconnect(&cx_b.to_async()).unwrap();
2071 project_a
2072 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2073 .await;
2074 }
2075
2076 #[gpui::test]
2077 async fn test_collaborating_with_diagnostics(
2078 mut cx_a: TestAppContext,
2079 mut cx_b: TestAppContext,
2080 ) {
2081 cx_a.foreground().forbid_parking();
2082 let mut lang_registry = Arc::new(LanguageRegistry::new());
2083 let fs = Arc::new(FakeFs::new(cx_a.background()));
2084
2085 // Set up a fake language server.
2086 let (language_server_config, mut fake_language_server) =
2087 LanguageServerConfig::fake(cx_a.background()).await;
2088 Arc::get_mut(&mut lang_registry)
2089 .unwrap()
2090 .add(Arc::new(Language::new(
2091 LanguageConfig {
2092 name: "Rust".to_string(),
2093 path_suffixes: vec!["rs".to_string()],
2094 language_server: Some(language_server_config),
2095 ..Default::default()
2096 },
2097 Some(tree_sitter_rust::language()),
2098 )));
2099
2100 // Connect to a server as 2 clients.
2101 let mut server = TestServer::start(cx_a.foreground()).await;
2102 let client_a = server.create_client(&mut cx_a, "user_a").await;
2103 let client_b = server.create_client(&mut cx_b, "user_b").await;
2104
2105 // Share a project as client A
2106 fs.insert_tree(
2107 "/a",
2108 json!({
2109 ".zed.toml": r#"collaborators = ["user_b"]"#,
2110 "a.rs": "let one = two",
2111 "other.rs": "",
2112 }),
2113 )
2114 .await;
2115 let project_a = cx_a.update(|cx| {
2116 Project::local(
2117 client_a.clone(),
2118 client_a.user_store.clone(),
2119 lang_registry.clone(),
2120 fs.clone(),
2121 cx,
2122 )
2123 });
2124 let (worktree_a, _) = project_a
2125 .update(&mut cx_a, |p, cx| {
2126 p.find_or_create_local_worktree("/a", false, cx)
2127 })
2128 .await
2129 .unwrap();
2130 worktree_a
2131 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2132 .await;
2133 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2134 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2135 project_a
2136 .update(&mut cx_a, |p, cx| p.share(cx))
2137 .await
2138 .unwrap();
2139
2140 // Cause the language server to start.
2141 let _ = cx_a
2142 .background()
2143 .spawn(project_a.update(&mut cx_a, |project, cx| {
2144 project.open_buffer(
2145 ProjectPath {
2146 worktree_id,
2147 path: Path::new("other.rs").into(),
2148 },
2149 cx,
2150 )
2151 }))
2152 .await
2153 .unwrap();
2154
2155 // Simulate a language server reporting errors for a file.
2156 fake_language_server
2157 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2158 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2159 version: None,
2160 diagnostics: vec![lsp::Diagnostic {
2161 severity: Some(lsp::DiagnosticSeverity::ERROR),
2162 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2163 message: "message 1".to_string(),
2164 ..Default::default()
2165 }],
2166 })
2167 .await;
2168
2169 // Wait for server to see the diagnostics update.
2170 server
2171 .condition(|store| {
2172 let worktree = store
2173 .project(project_id)
2174 .unwrap()
2175 .worktrees
2176 .get(&worktree_id.to_proto())
2177 .unwrap();
2178
2179 !worktree
2180 .share
2181 .as_ref()
2182 .unwrap()
2183 .diagnostic_summaries
2184 .is_empty()
2185 })
2186 .await;
2187
2188 // Join the worktree as client B.
2189 let project_b = Project::remote(
2190 project_id,
2191 client_b.clone(),
2192 client_b.user_store.clone(),
2193 lang_registry.clone(),
2194 fs.clone(),
2195 &mut cx_b.to_async(),
2196 )
2197 .await
2198 .unwrap();
2199
2200 project_b.read_with(&cx_b, |project, cx| {
2201 assert_eq!(
2202 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2203 &[(
2204 ProjectPath {
2205 worktree_id,
2206 path: Arc::from(Path::new("a.rs")),
2207 },
2208 DiagnosticSummary {
2209 error_count: 1,
2210 warning_count: 0,
2211 ..Default::default()
2212 },
2213 )]
2214 )
2215 });
2216
2217 // Simulate a language server reporting more errors for a file.
2218 fake_language_server
2219 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2220 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2221 version: None,
2222 diagnostics: vec![
2223 lsp::Diagnostic {
2224 severity: Some(lsp::DiagnosticSeverity::ERROR),
2225 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2226 message: "message 1".to_string(),
2227 ..Default::default()
2228 },
2229 lsp::Diagnostic {
2230 severity: Some(lsp::DiagnosticSeverity::WARNING),
2231 range: lsp::Range::new(
2232 lsp::Position::new(0, 10),
2233 lsp::Position::new(0, 13),
2234 ),
2235 message: "message 2".to_string(),
2236 ..Default::default()
2237 },
2238 ],
2239 })
2240 .await;
2241
2242 // Client b gets the updated summaries
2243 project_b
2244 .condition(&cx_b, |project, cx| {
2245 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2246 == &[(
2247 ProjectPath {
2248 worktree_id,
2249 path: Arc::from(Path::new("a.rs")),
2250 },
2251 DiagnosticSummary {
2252 error_count: 1,
2253 warning_count: 1,
2254 ..Default::default()
2255 },
2256 )]
2257 })
2258 .await;
2259
2260 // Open the file with the errors on client B. They should be present.
2261 let buffer_b = cx_b
2262 .background()
2263 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2264 .await
2265 .unwrap();
2266
2267 buffer_b.read_with(&cx_b, |buffer, _| {
2268 assert_eq!(
2269 buffer
2270 .snapshot()
2271 .diagnostics_in_range::<_, Point>(0..buffer.len())
2272 .map(|entry| entry)
2273 .collect::<Vec<_>>(),
2274 &[
2275 DiagnosticEntry {
2276 range: Point::new(0, 4)..Point::new(0, 7),
2277 diagnostic: Diagnostic {
2278 group_id: 0,
2279 message: "message 1".to_string(),
2280 severity: lsp::DiagnosticSeverity::ERROR,
2281 is_primary: true,
2282 ..Default::default()
2283 }
2284 },
2285 DiagnosticEntry {
2286 range: Point::new(0, 10)..Point::new(0, 13),
2287 diagnostic: Diagnostic {
2288 group_id: 1,
2289 severity: lsp::DiagnosticSeverity::WARNING,
2290 message: "message 2".to_string(),
2291 is_primary: true,
2292 ..Default::default()
2293 }
2294 }
2295 ]
2296 );
2297 });
2298 }
2299
2300 #[gpui::test]
2301 async fn test_collaborating_with_completion(
2302 mut cx_a: TestAppContext,
2303 mut cx_b: TestAppContext,
2304 ) {
2305 cx_a.foreground().forbid_parking();
2306 let mut lang_registry = Arc::new(LanguageRegistry::new());
2307 let fs = Arc::new(FakeFs::new(cx_a.background()));
2308
2309 // Set up a fake language server.
2310 let (language_server_config, mut fake_language_server) =
2311 LanguageServerConfig::fake_with_capabilities(
2312 lsp::ServerCapabilities {
2313 completion_provider: Some(lsp::CompletionOptions {
2314 trigger_characters: Some(vec![".".to_string()]),
2315 ..Default::default()
2316 }),
2317 ..Default::default()
2318 },
2319 cx_a.background(),
2320 )
2321 .await;
2322 Arc::get_mut(&mut lang_registry)
2323 .unwrap()
2324 .add(Arc::new(Language::new(
2325 LanguageConfig {
2326 name: "Rust".to_string(),
2327 path_suffixes: vec!["rs".to_string()],
2328 language_server: Some(language_server_config),
2329 ..Default::default()
2330 },
2331 Some(tree_sitter_rust::language()),
2332 )));
2333
2334 // Connect to a server as 2 clients.
2335 let mut server = TestServer::start(cx_a.foreground()).await;
2336 let client_a = server.create_client(&mut cx_a, "user_a").await;
2337 let client_b = server.create_client(&mut cx_b, "user_b").await;
2338
2339 // Share a project as client A
2340 fs.insert_tree(
2341 "/a",
2342 json!({
2343 ".zed.toml": r#"collaborators = ["user_b"]"#,
2344 "main.rs": "fn main() { a }",
2345 "other.rs": "",
2346 }),
2347 )
2348 .await;
2349 let project_a = cx_a.update(|cx| {
2350 Project::local(
2351 client_a.clone(),
2352 client_a.user_store.clone(),
2353 lang_registry.clone(),
2354 fs.clone(),
2355 cx,
2356 )
2357 });
2358 let (worktree_a, _) = project_a
2359 .update(&mut cx_a, |p, cx| {
2360 p.find_or_create_local_worktree("/a", false, cx)
2361 })
2362 .await
2363 .unwrap();
2364 worktree_a
2365 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2366 .await;
2367 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2368 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2369 project_a
2370 .update(&mut cx_a, |p, cx| p.share(cx))
2371 .await
2372 .unwrap();
2373
2374 // Join the worktree as client B.
2375 let project_b = Project::remote(
2376 project_id,
2377 client_b.clone(),
2378 client_b.user_store.clone(),
2379 lang_registry.clone(),
2380 fs.clone(),
2381 &mut cx_b.to_async(),
2382 )
2383 .await
2384 .unwrap();
2385
2386 // Open a file in an editor as the guest.
2387 let buffer_b = project_b
2388 .update(&mut cx_b, |p, cx| {
2389 p.open_buffer((worktree_id, "main.rs"), cx)
2390 })
2391 .await
2392 .unwrap();
2393 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2394 let editor_b = cx_b.add_view(window_b, |cx| {
2395 Editor::for_buffer(
2396 cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2397 Arc::new(|cx| EditorSettings::test(cx)),
2398 cx,
2399 )
2400 });
2401
2402 // Type a completion trigger character as the guest.
2403 editor_b.update(&mut cx_b, |editor, cx| {
2404 editor.select_ranges([13..13], None, cx);
2405 editor.handle_input(&Input(".".into()), cx);
2406 cx.focus(&editor_b);
2407 });
2408
2409 // Receive a completion request as the host's language server.
2410 let (request_id, params) = fake_language_server
2411 .receive_request::<lsp::request::Completion>()
2412 .await;
2413 assert_eq!(
2414 params.text_document_position.text_document.uri,
2415 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2416 );
2417 assert_eq!(
2418 params.text_document_position.position,
2419 lsp::Position::new(0, 14),
2420 );
2421
2422 // Return some completions from the host's language server.
2423 fake_language_server
2424 .respond(
2425 request_id,
2426 Some(lsp::CompletionResponse::Array(vec![
2427 lsp::CompletionItem {
2428 label: "first_method(…)".into(),
2429 detail: Some("fn(&mut self, B) -> C".into()),
2430 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2431 new_text: "first_method($1)".to_string(),
2432 range: lsp::Range::new(
2433 lsp::Position::new(0, 14),
2434 lsp::Position::new(0, 14),
2435 ),
2436 })),
2437 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2438 ..Default::default()
2439 },
2440 lsp::CompletionItem {
2441 label: "second_method(…)".into(),
2442 detail: Some("fn(&mut self, C) -> D<E>".into()),
2443 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2444 new_text: "second_method()".to_string(),
2445 range: lsp::Range::new(
2446 lsp::Position::new(0, 14),
2447 lsp::Position::new(0, 14),
2448 ),
2449 })),
2450 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2451 ..Default::default()
2452 },
2453 ])),
2454 )
2455 .await;
2456
2457 // Open the buffer on the host.
2458 let buffer_a = project_a
2459 .update(&mut cx_a, |p, cx| {
2460 p.open_buffer((worktree_id, "main.rs"), cx)
2461 })
2462 .await
2463 .unwrap();
2464 buffer_a
2465 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2466 .await;
2467
2468 // Confirm a completion on the guest.
2469 editor_b.next_notification(&cx_b).await;
2470 editor_b.update(&mut cx_b, |editor, cx| {
2471 assert!(editor.has_completions());
2472 editor.confirm_completion(Some(0), cx);
2473 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2474 });
2475
2476 buffer_a
2477 .condition(&cx_a, |buffer, _| {
2478 buffer.text() == "fn main() { a.first_method() }"
2479 })
2480 .await;
2481
2482 // Receive a request resolve the selected completion on the host's language server.
2483 let (request_id, params) = fake_language_server
2484 .receive_request::<lsp::request::ResolveCompletionItem>()
2485 .await;
2486 assert_eq!(params.label, "first_method(…)");
2487
2488 // Return a resolved completion from the host's language server.
2489 // The resolved completion has an additional text edit.
2490 fake_language_server
2491 .respond(
2492 request_id,
2493 lsp::CompletionItem {
2494 label: "first_method(…)".into(),
2495 detail: Some("fn(&mut self, B) -> C".into()),
2496 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2497 new_text: "first_method($1)".to_string(),
2498 range: lsp::Range::new(
2499 lsp::Position::new(0, 14),
2500 lsp::Position::new(0, 14),
2501 ),
2502 })),
2503 additional_text_edits: Some(vec![lsp::TextEdit {
2504 new_text: "use d::SomeTrait;\n".to_string(),
2505 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2506 }]),
2507 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2508 ..Default::default()
2509 },
2510 )
2511 .await;
2512
2513 // The additional edit is applied.
2514 buffer_b
2515 .condition(&cx_b, |buffer, _| {
2516 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2517 })
2518 .await;
2519 assert_eq!(
2520 buffer_a.read_with(&cx_a, |buffer, _| buffer.text()),
2521 buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2522 );
2523 }
2524
2525 #[gpui::test]
2526 async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2527 cx_a.foreground().forbid_parking();
2528 let mut lang_registry = Arc::new(LanguageRegistry::new());
2529 let fs = Arc::new(FakeFs::new(cx_a.background()));
2530
2531 // Set up a fake language server.
2532 let (language_server_config, mut fake_language_server) =
2533 LanguageServerConfig::fake(cx_a.background()).await;
2534 Arc::get_mut(&mut lang_registry)
2535 .unwrap()
2536 .add(Arc::new(Language::new(
2537 LanguageConfig {
2538 name: "Rust".to_string(),
2539 path_suffixes: vec!["rs".to_string()],
2540 language_server: Some(language_server_config),
2541 ..Default::default()
2542 },
2543 Some(tree_sitter_rust::language()),
2544 )));
2545
2546 // Connect to a server as 2 clients.
2547 let mut server = TestServer::start(cx_a.foreground()).await;
2548 let client_a = server.create_client(&mut cx_a, "user_a").await;
2549 let client_b = server.create_client(&mut cx_b, "user_b").await;
2550
2551 // Share a project as client A
2552 fs.insert_tree(
2553 "/a",
2554 json!({
2555 ".zed.toml": r#"collaborators = ["user_b"]"#,
2556 "a.rs": "let one = two",
2557 }),
2558 )
2559 .await;
2560 let project_a = cx_a.update(|cx| {
2561 Project::local(
2562 client_a.clone(),
2563 client_a.user_store.clone(),
2564 lang_registry.clone(),
2565 fs.clone(),
2566 cx,
2567 )
2568 });
2569 let (worktree_a, _) = project_a
2570 .update(&mut cx_a, |p, cx| {
2571 p.find_or_create_local_worktree("/a", false, cx)
2572 })
2573 .await
2574 .unwrap();
2575 worktree_a
2576 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2577 .await;
2578 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2579 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2580 project_a
2581 .update(&mut cx_a, |p, cx| p.share(cx))
2582 .await
2583 .unwrap();
2584
2585 // Join the worktree as client B.
2586 let project_b = Project::remote(
2587 project_id,
2588 client_b.clone(),
2589 client_b.user_store.clone(),
2590 lang_registry.clone(),
2591 fs.clone(),
2592 &mut cx_b.to_async(),
2593 )
2594 .await
2595 .unwrap();
2596
2597 let buffer_b = cx_b
2598 .background()
2599 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2600 .await
2601 .unwrap();
2602
2603 let format = buffer_b.update(&mut cx_b, |buffer, cx| buffer.format(cx));
2604 let (request_id, _) = fake_language_server
2605 .receive_request::<lsp::request::Formatting>()
2606 .await;
2607 fake_language_server
2608 .respond(
2609 request_id,
2610 Some(vec![
2611 lsp::TextEdit {
2612 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2613 new_text: "h".to_string(),
2614 },
2615 lsp::TextEdit {
2616 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2617 new_text: "y".to_string(),
2618 },
2619 ]),
2620 )
2621 .await;
2622 format.await.unwrap();
2623 assert_eq!(
2624 buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2625 "let honey = two"
2626 );
2627 }
2628
2629 #[gpui::test]
2630 async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2631 cx_a.foreground().forbid_parking();
2632 let mut lang_registry = Arc::new(LanguageRegistry::new());
2633 let fs = Arc::new(FakeFs::new(cx_a.background()));
2634 fs.insert_tree(
2635 "/root-1",
2636 json!({
2637 ".zed.toml": r#"collaborators = ["user_b"]"#,
2638 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2639 }),
2640 )
2641 .await;
2642 fs.insert_tree(
2643 "/root-2",
2644 json!({
2645 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2646 }),
2647 )
2648 .await;
2649
2650 // Set up a fake language server.
2651 let (language_server_config, mut fake_language_server) =
2652 LanguageServerConfig::fake(cx_a.background()).await;
2653 Arc::get_mut(&mut lang_registry)
2654 .unwrap()
2655 .add(Arc::new(Language::new(
2656 LanguageConfig {
2657 name: "Rust".to_string(),
2658 path_suffixes: vec!["rs".to_string()],
2659 language_server: Some(language_server_config),
2660 ..Default::default()
2661 },
2662 Some(tree_sitter_rust::language()),
2663 )));
2664
2665 // Connect to a server as 2 clients.
2666 let mut server = TestServer::start(cx_a.foreground()).await;
2667 let client_a = server.create_client(&mut cx_a, "user_a").await;
2668 let client_b = server.create_client(&mut cx_b, "user_b").await;
2669
2670 // Share a project as client A
2671 let project_a = cx_a.update(|cx| {
2672 Project::local(
2673 client_a.clone(),
2674 client_a.user_store.clone(),
2675 lang_registry.clone(),
2676 fs.clone(),
2677 cx,
2678 )
2679 });
2680 let (worktree_a, _) = project_a
2681 .update(&mut cx_a, |p, cx| {
2682 p.find_or_create_local_worktree("/root-1", false, cx)
2683 })
2684 .await
2685 .unwrap();
2686 worktree_a
2687 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2688 .await;
2689 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2690 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2691 project_a
2692 .update(&mut cx_a, |p, cx| p.share(cx))
2693 .await
2694 .unwrap();
2695
2696 // Join the worktree as client B.
2697 let project_b = Project::remote(
2698 project_id,
2699 client_b.clone(),
2700 client_b.user_store.clone(),
2701 lang_registry.clone(),
2702 fs.clone(),
2703 &mut cx_b.to_async(),
2704 )
2705 .await
2706 .unwrap();
2707
2708 // Open the file to be formatted on client B.
2709 let buffer_b = cx_b
2710 .background()
2711 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2712 .await
2713 .unwrap();
2714
2715 let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2716 let (request_id, _) = fake_language_server
2717 .receive_request::<lsp::request::GotoDefinition>()
2718 .await;
2719 fake_language_server
2720 .respond(
2721 request_id,
2722 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2723 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2724 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2725 ))),
2726 )
2727 .await;
2728 let definitions_1 = definitions_1.await.unwrap();
2729 cx_b.read(|cx| {
2730 assert_eq!(definitions_1.len(), 1);
2731 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2732 let target_buffer = definitions_1[0].target_buffer.read(cx);
2733 assert_eq!(
2734 target_buffer.text(),
2735 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2736 );
2737 assert_eq!(
2738 definitions_1[0].target_range.to_point(target_buffer),
2739 Point::new(0, 6)..Point::new(0, 9)
2740 );
2741 });
2742
2743 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2744 // the previous call to `definition`.
2745 let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2746 let (request_id, _) = fake_language_server
2747 .receive_request::<lsp::request::GotoDefinition>()
2748 .await;
2749 fake_language_server
2750 .respond(
2751 request_id,
2752 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2753 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2754 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2755 ))),
2756 )
2757 .await;
2758 let definitions_2 = definitions_2.await.unwrap();
2759 cx_b.read(|cx| {
2760 assert_eq!(definitions_2.len(), 1);
2761 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2762 let target_buffer = definitions_2[0].target_buffer.read(cx);
2763 assert_eq!(
2764 target_buffer.text(),
2765 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2766 );
2767 assert_eq!(
2768 definitions_2[0].target_range.to_point(target_buffer),
2769 Point::new(1, 6)..Point::new(1, 11)
2770 );
2771 });
2772 assert_eq!(
2773 definitions_1[0].target_buffer,
2774 definitions_2[0].target_buffer
2775 );
2776
2777 cx_b.update(|_| {
2778 drop(definitions_1);
2779 drop(definitions_2);
2780 });
2781 project_b
2782 .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2783 .await;
2784 }
2785
2786 #[gpui::test]
2787 async fn test_open_buffer_while_getting_definition_pointing_to_it(
2788 mut cx_a: TestAppContext,
2789 mut cx_b: TestAppContext,
2790 mut rng: StdRng,
2791 ) {
2792 cx_a.foreground().forbid_parking();
2793 let mut lang_registry = Arc::new(LanguageRegistry::new());
2794 let fs = Arc::new(FakeFs::new(cx_a.background()));
2795 fs.insert_tree(
2796 "/root",
2797 json!({
2798 ".zed.toml": r#"collaborators = ["user_b"]"#,
2799 "a.rs": "const ONE: usize = b::TWO;",
2800 "b.rs": "const TWO: usize = 2",
2801 }),
2802 )
2803 .await;
2804
2805 // Set up a fake language server.
2806 let (language_server_config, mut fake_language_server) =
2807 LanguageServerConfig::fake(cx_a.background()).await;
2808 Arc::get_mut(&mut lang_registry)
2809 .unwrap()
2810 .add(Arc::new(Language::new(
2811 LanguageConfig {
2812 name: "Rust".to_string(),
2813 path_suffixes: vec!["rs".to_string()],
2814 language_server: Some(language_server_config),
2815 ..Default::default()
2816 },
2817 Some(tree_sitter_rust::language()),
2818 )));
2819
2820 // Connect to a server as 2 clients.
2821 let mut server = TestServer::start(cx_a.foreground()).await;
2822 let client_a = server.create_client(&mut cx_a, "user_a").await;
2823 let client_b = server.create_client(&mut cx_b, "user_b").await;
2824
2825 // Share a project as client A
2826 let project_a = cx_a.update(|cx| {
2827 Project::local(
2828 client_a.clone(),
2829 client_a.user_store.clone(),
2830 lang_registry.clone(),
2831 fs.clone(),
2832 cx,
2833 )
2834 });
2835 let (worktree_a, _) = project_a
2836 .update(&mut cx_a, |p, cx| {
2837 p.find_or_create_local_worktree("/root", false, cx)
2838 })
2839 .await
2840 .unwrap();
2841 worktree_a
2842 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2843 .await;
2844 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2845 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2846 project_a
2847 .update(&mut cx_a, |p, cx| p.share(cx))
2848 .await
2849 .unwrap();
2850
2851 // Join the worktree as client B.
2852 let project_b = Project::remote(
2853 project_id,
2854 client_b.clone(),
2855 client_b.user_store.clone(),
2856 lang_registry.clone(),
2857 fs.clone(),
2858 &mut cx_b.to_async(),
2859 )
2860 .await
2861 .unwrap();
2862
2863 let buffer_b1 = cx_b
2864 .background()
2865 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2866 .await
2867 .unwrap();
2868
2869 let definitions;
2870 let buffer_b2;
2871 if rng.gen() {
2872 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2873 buffer_b2 =
2874 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2875 } else {
2876 buffer_b2 =
2877 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2878 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2879 }
2880
2881 let (request_id, _) = fake_language_server
2882 .receive_request::<lsp::request::GotoDefinition>()
2883 .await;
2884 fake_language_server
2885 .respond(
2886 request_id,
2887 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2888 lsp::Url::from_file_path("/root/b.rs").unwrap(),
2889 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2890 ))),
2891 )
2892 .await;
2893
2894 let buffer_b2 = buffer_b2.await.unwrap();
2895 let definitions = definitions.await.unwrap();
2896 assert_eq!(definitions.len(), 1);
2897 assert_eq!(definitions[0].target_buffer, buffer_b2);
2898 }
2899
2900 #[gpui::test]
2901 async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2902 cx_a.foreground().forbid_parking();
2903
2904 // Connect to a server as 2 clients.
2905 let mut server = TestServer::start(cx_a.foreground()).await;
2906 let client_a = server.create_client(&mut cx_a, "user_a").await;
2907 let client_b = server.create_client(&mut cx_b, "user_b").await;
2908
2909 // Create an org that includes these 2 users.
2910 let db = &server.app_state.db;
2911 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2912 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2913 .await
2914 .unwrap();
2915 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2916 .await
2917 .unwrap();
2918
2919 // Create a channel that includes all the users.
2920 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2921 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2922 .await
2923 .unwrap();
2924 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2925 .await
2926 .unwrap();
2927 db.create_channel_message(
2928 channel_id,
2929 client_b.current_user_id(&cx_b),
2930 "hello A, it's B.",
2931 OffsetDateTime::now_utc(),
2932 1,
2933 )
2934 .await
2935 .unwrap();
2936
2937 let channels_a = cx_a
2938 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2939 channels_a
2940 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2941 .await;
2942 channels_a.read_with(&cx_a, |list, _| {
2943 assert_eq!(
2944 list.available_channels().unwrap(),
2945 &[ChannelDetails {
2946 id: channel_id.to_proto(),
2947 name: "test-channel".to_string()
2948 }]
2949 )
2950 });
2951 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2952 this.get_channel(channel_id.to_proto(), cx).unwrap()
2953 });
2954 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2955 channel_a
2956 .condition(&cx_a, |channel, _| {
2957 channel_messages(channel)
2958 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2959 })
2960 .await;
2961
2962 let channels_b = cx_b
2963 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2964 channels_b
2965 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2966 .await;
2967 channels_b.read_with(&cx_b, |list, _| {
2968 assert_eq!(
2969 list.available_channels().unwrap(),
2970 &[ChannelDetails {
2971 id: channel_id.to_proto(),
2972 name: "test-channel".to_string()
2973 }]
2974 )
2975 });
2976
2977 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2978 this.get_channel(channel_id.to_proto(), cx).unwrap()
2979 });
2980 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2981 channel_b
2982 .condition(&cx_b, |channel, _| {
2983 channel_messages(channel)
2984 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2985 })
2986 .await;
2987
2988 channel_a
2989 .update(&mut cx_a, |channel, cx| {
2990 channel
2991 .send_message("oh, hi B.".to_string(), cx)
2992 .unwrap()
2993 .detach();
2994 let task = channel.send_message("sup".to_string(), cx).unwrap();
2995 assert_eq!(
2996 channel_messages(channel),
2997 &[
2998 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2999 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3000 ("user_a".to_string(), "sup".to_string(), true)
3001 ]
3002 );
3003 task
3004 })
3005 .await
3006 .unwrap();
3007
3008 channel_b
3009 .condition(&cx_b, |channel, _| {
3010 channel_messages(channel)
3011 == [
3012 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3013 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3014 ("user_a".to_string(), "sup".to_string(), false),
3015 ]
3016 })
3017 .await;
3018
3019 assert_eq!(
3020 server
3021 .state()
3022 .await
3023 .channel(channel_id)
3024 .unwrap()
3025 .connection_ids
3026 .len(),
3027 2
3028 );
3029 cx_b.update(|_| drop(channel_b));
3030 server
3031 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3032 .await;
3033
3034 cx_a.update(|_| drop(channel_a));
3035 server
3036 .condition(|state| state.channel(channel_id).is_none())
3037 .await;
3038 }
3039
3040 #[gpui::test]
3041 async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3042 cx_a.foreground().forbid_parking();
3043
3044 let mut server = TestServer::start(cx_a.foreground()).await;
3045 let client_a = server.create_client(&mut cx_a, "user_a").await;
3046
3047 let db = &server.app_state.db;
3048 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3049 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3050 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3051 .await
3052 .unwrap();
3053 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3054 .await
3055 .unwrap();
3056
3057 let channels_a = cx_a
3058 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3059 channels_a
3060 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3061 .await;
3062 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3063 this.get_channel(channel_id.to_proto(), cx).unwrap()
3064 });
3065
3066 // Messages aren't allowed to be too long.
3067 channel_a
3068 .update(&mut cx_a, |channel, cx| {
3069 let long_body = "this is long.\n".repeat(1024);
3070 channel.send_message(long_body, cx).unwrap()
3071 })
3072 .await
3073 .unwrap_err();
3074
3075 // Messages aren't allowed to be blank.
3076 channel_a.update(&mut cx_a, |channel, cx| {
3077 channel.send_message(String::new(), cx).unwrap_err()
3078 });
3079
3080 // Leading and trailing whitespace are trimmed.
3081 channel_a
3082 .update(&mut cx_a, |channel, cx| {
3083 channel
3084 .send_message("\n surrounded by whitespace \n".to_string(), cx)
3085 .unwrap()
3086 })
3087 .await
3088 .unwrap();
3089 assert_eq!(
3090 db.get_channel_messages(channel_id, 10, None)
3091 .await
3092 .unwrap()
3093 .iter()
3094 .map(|m| &m.body)
3095 .collect::<Vec<_>>(),
3096 &["surrounded by whitespace"]
3097 );
3098 }
3099
3100 #[gpui::test]
3101 async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3102 cx_a.foreground().forbid_parking();
3103
3104 // Connect to a server as 2 clients.
3105 let mut server = TestServer::start(cx_a.foreground()).await;
3106 let client_a = server.create_client(&mut cx_a, "user_a").await;
3107 let client_b = server.create_client(&mut cx_b, "user_b").await;
3108 let mut status_b = client_b.status();
3109
3110 // Create an org that includes these 2 users.
3111 let db = &server.app_state.db;
3112 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3113 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3114 .await
3115 .unwrap();
3116 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3117 .await
3118 .unwrap();
3119
3120 // Create a channel that includes all the users.
3121 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3122 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3123 .await
3124 .unwrap();
3125 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3126 .await
3127 .unwrap();
3128 db.create_channel_message(
3129 channel_id,
3130 client_b.current_user_id(&cx_b),
3131 "hello A, it's B.",
3132 OffsetDateTime::now_utc(),
3133 2,
3134 )
3135 .await
3136 .unwrap();
3137
3138 let channels_a = cx_a
3139 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3140 channels_a
3141 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3142 .await;
3143
3144 channels_a.read_with(&cx_a, |list, _| {
3145 assert_eq!(
3146 list.available_channels().unwrap(),
3147 &[ChannelDetails {
3148 id: channel_id.to_proto(),
3149 name: "test-channel".to_string()
3150 }]
3151 )
3152 });
3153 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3154 this.get_channel(channel_id.to_proto(), cx).unwrap()
3155 });
3156 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3157 channel_a
3158 .condition(&cx_a, |channel, _| {
3159 channel_messages(channel)
3160 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3161 })
3162 .await;
3163
3164 let channels_b = cx_b
3165 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3166 channels_b
3167 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3168 .await;
3169 channels_b.read_with(&cx_b, |list, _| {
3170 assert_eq!(
3171 list.available_channels().unwrap(),
3172 &[ChannelDetails {
3173 id: channel_id.to_proto(),
3174 name: "test-channel".to_string()
3175 }]
3176 )
3177 });
3178
3179 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3180 this.get_channel(channel_id.to_proto(), cx).unwrap()
3181 });
3182 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3183 channel_b
3184 .condition(&cx_b, |channel, _| {
3185 channel_messages(channel)
3186 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3187 })
3188 .await;
3189
3190 // Disconnect client B, ensuring we can still access its cached channel data.
3191 server.forbid_connections();
3192 server.disconnect_client(client_b.current_user_id(&cx_b));
3193 while !matches!(
3194 status_b.next().await,
3195 Some(client::Status::ReconnectionError { .. })
3196 ) {}
3197
3198 channels_b.read_with(&cx_b, |channels, _| {
3199 assert_eq!(
3200 channels.available_channels().unwrap(),
3201 [ChannelDetails {
3202 id: channel_id.to_proto(),
3203 name: "test-channel".to_string()
3204 }]
3205 )
3206 });
3207 channel_b.read_with(&cx_b, |channel, _| {
3208 assert_eq!(
3209 channel_messages(channel),
3210 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3211 )
3212 });
3213
3214 // Send a message from client B while it is disconnected.
3215 channel_b
3216 .update(&mut cx_b, |channel, cx| {
3217 let task = channel
3218 .send_message("can you see this?".to_string(), cx)
3219 .unwrap();
3220 assert_eq!(
3221 channel_messages(channel),
3222 &[
3223 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3224 ("user_b".to_string(), "can you see this?".to_string(), true)
3225 ]
3226 );
3227 task
3228 })
3229 .await
3230 .unwrap_err();
3231
3232 // Send a message from client A while B is disconnected.
3233 channel_a
3234 .update(&mut cx_a, |channel, cx| {
3235 channel
3236 .send_message("oh, hi B.".to_string(), cx)
3237 .unwrap()
3238 .detach();
3239 let task = channel.send_message("sup".to_string(), cx).unwrap();
3240 assert_eq!(
3241 channel_messages(channel),
3242 &[
3243 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3244 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3245 ("user_a".to_string(), "sup".to_string(), true)
3246 ]
3247 );
3248 task
3249 })
3250 .await
3251 .unwrap();
3252
3253 // Give client B a chance to reconnect.
3254 server.allow_connections();
3255 cx_b.foreground().advance_clock(Duration::from_secs(10));
3256
3257 // Verify that B sees the new messages upon reconnection, as well as the message client B
3258 // sent while offline.
3259 channel_b
3260 .condition(&cx_b, |channel, _| {
3261 channel_messages(channel)
3262 == [
3263 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3264 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3265 ("user_a".to_string(), "sup".to_string(), false),
3266 ("user_b".to_string(), "can you see this?".to_string(), false),
3267 ]
3268 })
3269 .await;
3270
3271 // Ensure client A and B can communicate normally after reconnection.
3272 channel_a
3273 .update(&mut cx_a, |channel, cx| {
3274 channel.send_message("you online?".to_string(), cx).unwrap()
3275 })
3276 .await
3277 .unwrap();
3278 channel_b
3279 .condition(&cx_b, |channel, _| {
3280 channel_messages(channel)
3281 == [
3282 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3283 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3284 ("user_a".to_string(), "sup".to_string(), false),
3285 ("user_b".to_string(), "can you see this?".to_string(), false),
3286 ("user_a".to_string(), "you online?".to_string(), false),
3287 ]
3288 })
3289 .await;
3290
3291 channel_b
3292 .update(&mut cx_b, |channel, cx| {
3293 channel.send_message("yep".to_string(), cx).unwrap()
3294 })
3295 .await
3296 .unwrap();
3297 channel_a
3298 .condition(&cx_a, |channel, _| {
3299 channel_messages(channel)
3300 == [
3301 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3302 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3303 ("user_a".to_string(), "sup".to_string(), false),
3304 ("user_b".to_string(), "can you see this?".to_string(), false),
3305 ("user_a".to_string(), "you online?".to_string(), false),
3306 ("user_b".to_string(), "yep".to_string(), false),
3307 ]
3308 })
3309 .await;
3310 }
3311
3312 #[gpui::test]
3313 async fn test_contacts(
3314 mut cx_a: TestAppContext,
3315 mut cx_b: TestAppContext,
3316 mut cx_c: TestAppContext,
3317 ) {
3318 cx_a.foreground().forbid_parking();
3319 let lang_registry = Arc::new(LanguageRegistry::new());
3320 let fs = Arc::new(FakeFs::new(cx_a.background()));
3321
3322 // Connect to a server as 3 clients.
3323 let mut server = TestServer::start(cx_a.foreground()).await;
3324 let client_a = server.create_client(&mut cx_a, "user_a").await;
3325 let client_b = server.create_client(&mut cx_b, "user_b").await;
3326 let client_c = server.create_client(&mut cx_c, "user_c").await;
3327
3328 // Share a worktree as client A.
3329 fs.insert_tree(
3330 "/a",
3331 json!({
3332 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3333 }),
3334 )
3335 .await;
3336
3337 let project_a = cx_a.update(|cx| {
3338 Project::local(
3339 client_a.clone(),
3340 client_a.user_store.clone(),
3341 lang_registry.clone(),
3342 fs.clone(),
3343 cx,
3344 )
3345 });
3346 let (worktree_a, _) = project_a
3347 .update(&mut cx_a, |p, cx| {
3348 p.find_or_create_local_worktree("/a", false, cx)
3349 })
3350 .await
3351 .unwrap();
3352 worktree_a
3353 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3354 .await;
3355
3356 client_a
3357 .user_store
3358 .condition(&cx_a, |user_store, _| {
3359 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3360 })
3361 .await;
3362 client_b
3363 .user_store
3364 .condition(&cx_b, |user_store, _| {
3365 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3366 })
3367 .await;
3368 client_c
3369 .user_store
3370 .condition(&cx_c, |user_store, _| {
3371 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3372 })
3373 .await;
3374
3375 let project_id = project_a
3376 .update(&mut cx_a, |project, _| project.next_remote_id())
3377 .await;
3378 project_a
3379 .update(&mut cx_a, |project, cx| project.share(cx))
3380 .await
3381 .unwrap();
3382
3383 let _project_b = Project::remote(
3384 project_id,
3385 client_b.clone(),
3386 client_b.user_store.clone(),
3387 lang_registry.clone(),
3388 fs.clone(),
3389 &mut cx_b.to_async(),
3390 )
3391 .await
3392 .unwrap();
3393
3394 client_a
3395 .user_store
3396 .condition(&cx_a, |user_store, _| {
3397 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3398 })
3399 .await;
3400 client_b
3401 .user_store
3402 .condition(&cx_b, |user_store, _| {
3403 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3404 })
3405 .await;
3406 client_c
3407 .user_store
3408 .condition(&cx_c, |user_store, _| {
3409 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3410 })
3411 .await;
3412
3413 project_a
3414 .condition(&cx_a, |project, _| {
3415 project.collaborators().contains_key(&client_b.peer_id)
3416 })
3417 .await;
3418
3419 cx_a.update(move |_| drop(project_a));
3420 client_a
3421 .user_store
3422 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3423 .await;
3424 client_b
3425 .user_store
3426 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3427 .await;
3428 client_c
3429 .user_store
3430 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3431 .await;
3432
3433 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3434 user_store
3435 .contacts()
3436 .iter()
3437 .map(|contact| {
3438 let worktrees = contact
3439 .projects
3440 .iter()
3441 .map(|p| {
3442 (
3443 p.worktree_root_names[0].as_str(),
3444 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3445 )
3446 })
3447 .collect();
3448 (contact.user.github_login.as_str(), worktrees)
3449 })
3450 .collect()
3451 }
3452 }
3453
3454 struct TestServer {
3455 peer: Arc<Peer>,
3456 app_state: Arc<AppState>,
3457 server: Arc<Server>,
3458 foreground: Rc<executor::Foreground>,
3459 notifications: mpsc::Receiver<()>,
3460 connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3461 forbid_connections: Arc<AtomicBool>,
3462 _test_db: TestDb,
3463 }
3464
3465 impl TestServer {
3466 async fn start(foreground: Rc<executor::Foreground>) -> Self {
3467 let test_db = TestDb::new();
3468 let app_state = Self::build_app_state(&test_db).await;
3469 let peer = Peer::new();
3470 let notifications = mpsc::channel(128);
3471 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3472 Self {
3473 peer,
3474 app_state,
3475 server,
3476 foreground,
3477 notifications: notifications.1,
3478 connection_killers: Default::default(),
3479 forbid_connections: Default::default(),
3480 _test_db: test_db,
3481 }
3482 }
3483
3484 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3485 let http = FakeHttpClient::with_404_response();
3486 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3487 let client_name = name.to_string();
3488 let mut client = Client::new(http.clone());
3489 let server = self.server.clone();
3490 let connection_killers = self.connection_killers.clone();
3491 let forbid_connections = self.forbid_connections.clone();
3492 let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3493
3494 Arc::get_mut(&mut client)
3495 .unwrap()
3496 .override_authenticate(move |cx| {
3497 cx.spawn(|_| async move {
3498 let access_token = "the-token".to_string();
3499 Ok(Credentials {
3500 user_id: user_id.0 as u64,
3501 access_token,
3502 })
3503 })
3504 })
3505 .override_establish_connection(move |credentials, cx| {
3506 assert_eq!(credentials.user_id, user_id.0 as u64);
3507 assert_eq!(credentials.access_token, "the-token");
3508
3509 let server = server.clone();
3510 let connection_killers = connection_killers.clone();
3511 let forbid_connections = forbid_connections.clone();
3512 let client_name = client_name.clone();
3513 let connection_id_tx = connection_id_tx.clone();
3514 cx.spawn(move |cx| async move {
3515 if forbid_connections.load(SeqCst) {
3516 Err(EstablishConnectionError::other(anyhow!(
3517 "server is forbidding connections"
3518 )))
3519 } else {
3520 let (client_conn, server_conn, kill_conn) =
3521 Connection::in_memory(cx.background());
3522 connection_killers.lock().insert(user_id, kill_conn);
3523 cx.background()
3524 .spawn(server.handle_connection(
3525 server_conn,
3526 client_name,
3527 user_id,
3528 Some(connection_id_tx),
3529 ))
3530 .detach();
3531 Ok(client_conn)
3532 }
3533 })
3534 });
3535
3536 client
3537 .authenticate_and_connect(&cx.to_async())
3538 .await
3539 .unwrap();
3540
3541 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3542 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3543 let mut authed_user =
3544 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3545 while authed_user.next().await.unwrap().is_none() {}
3546
3547 TestClient {
3548 client,
3549 peer_id,
3550 user_store,
3551 }
3552 }
3553
3554 fn disconnect_client(&self, user_id: UserId) {
3555 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3556 let _ = kill_conn.try_send(Some(()));
3557 }
3558 }
3559
3560 fn forbid_connections(&self) {
3561 self.forbid_connections.store(true, SeqCst);
3562 }
3563
3564 fn allow_connections(&self) {
3565 self.forbid_connections.store(false, SeqCst);
3566 }
3567
3568 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3569 let mut config = Config::default();
3570 config.session_secret = "a".repeat(32);
3571 config.database_url = test_db.url.clone();
3572 let github_client = github::AppClient::test();
3573 Arc::new(AppState {
3574 db: test_db.db().clone(),
3575 handlebars: Default::default(),
3576 auth_client: auth::build_client("", ""),
3577 repo_client: github::RepoClient::test(&github_client),
3578 github_client,
3579 config,
3580 })
3581 }
3582
3583 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3584 self.server.store.read()
3585 }
3586
3587 async fn condition<F>(&mut self, mut predicate: F)
3588 where
3589 F: FnMut(&Store) -> bool,
3590 {
3591 async_std::future::timeout(Duration::from_millis(500), async {
3592 while !(predicate)(&*self.server.store.read()) {
3593 self.foreground.start_waiting();
3594 self.notifications.next().await;
3595 self.foreground.finish_waiting();
3596 }
3597 })
3598 .await
3599 .expect("condition timed out");
3600 }
3601 }
3602
3603 impl Drop for TestServer {
3604 fn drop(&mut self) {
3605 self.peer.reset();
3606 }
3607 }
3608
3609 struct TestClient {
3610 client: Arc<Client>,
3611 pub peer_id: PeerId,
3612 pub user_store: ModelHandle<UserStore>,
3613 }
3614
3615 impl Deref for TestClient {
3616 type Target = Arc<Client>;
3617
3618 fn deref(&self) -> &Self::Target {
3619 &self.client
3620 }
3621 }
3622
3623 impl TestClient {
3624 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3625 UserId::from_proto(
3626 self.user_store
3627 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3628 )
3629 }
3630 }
3631
3632 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3633 channel
3634 .messages()
3635 .cursor::<()>()
3636 .map(|m| {
3637 (
3638 m.sender.github_login.clone(),
3639 m.body.clone(),
3640 m.is_pending(),
3641 )
3642 })
3643 .collect()
3644 }
3645
3646 struct EmptyView;
3647
3648 impl gpui::Entity for EmptyView {
3649 type Event = ();
3650 }
3651
3652 impl gpui::View for EmptyView {
3653 fn ui_name() -> &'static str {
3654 "empty view"
3655 }
3656
3657 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
3658 gpui::Element::boxed(gpui::elements::Empty)
3659 }
3660 }
3661}