1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{future::BoxFuture, FutureExt, StreamExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use postage::{mpsc, prelude::Sink as _};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EnvelopedMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
21use store::{Store, Worktree};
22use surf::StatusCode;
23use tide::log;
24use tide::{
25 http::headers::{HeaderName, CONNECTION, UPGRADE},
26 Request, Response,
27};
28use time::OffsetDateTime;
29
/// Type-erased message handler as stored in `Server::handlers`.
///
/// A handler receives the server and a boxed envelope of any message type and
/// returns a boxed future resolving to `tide::Result<()>`. `Server::add_handler`
/// wraps strongly-typed handlers into this shape.
type MessageHandler = Box<
    dyn Send
        + Sync
        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
>;
35
/// RPC server shared by every client connection.
pub struct Server {
    /// Used to add and disconnect connections and to send, forward and answer messages.
    peer: Arc<Peer>,
    /// In-memory store guarded by a read/write lock (see `state` / `state_mut`).
    store: RwLock<Store>,
    /// Shared application state; handlers reach the database through `app_state.db`.
    app_state: Arc<AppState>,
    /// Registered handlers, keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that receives `()` after an incoming message has been run
    /// through its handler.
    notifications: Option<mpsc::Sender<()>>,
}
43
/// Number of channel messages requested per page; a shorter page marks the end
/// of the history (`done`).
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Maximum length, in bytes, of a trimmed channel message body.
const MAX_MESSAGE_LEN: usize = 1024;
/// Error text used when a request refers to a project that cannot be found.
// `'static` is implied for references in `const` items, so it is not spelled out.
const NO_SUCH_PROJECT: &str = "no such project";
47
48impl Server {
49 pub fn new(
50 app_state: Arc<AppState>,
51 peer: Arc<Peer>,
52 notifications: Option<mpsc::Sender<()>>,
53 ) -> Arc<Self> {
54 let mut server = Self {
55 peer,
56 app_state,
57 store: Default::default(),
58 handlers: Default::default(),
59 notifications,
60 };
61
62 server
63 .add_handler(Server::ping)
64 .add_handler(Server::register_project)
65 .add_handler(Server::unregister_project)
66 .add_handler(Server::share_project)
67 .add_handler(Server::unshare_project)
68 .add_handler(Server::join_project)
69 .add_handler(Server::leave_project)
70 .add_handler(Server::register_worktree)
71 .add_handler(Server::unregister_worktree)
72 .add_handler(Server::share_worktree)
73 .add_handler(Server::update_worktree)
74 .add_handler(Server::update_diagnostic_summary)
75 .add_handler(Server::disk_based_diagnostics_updating)
76 .add_handler(Server::disk_based_diagnostics_updated)
77 .add_handler(Server::get_definition)
78 .add_handler(Server::open_buffer)
79 .add_handler(Server::close_buffer)
80 .add_handler(Server::update_buffer)
81 .add_handler(Server::update_buffer_file)
82 .add_handler(Server::buffer_reloaded)
83 .add_handler(Server::buffer_saved)
84 .add_handler(Server::save_buffer)
85 .add_handler(Server::format_buffer)
86 .add_handler(Server::get_completions)
87 .add_handler(Server::apply_additional_edits_for_completion)
88 .add_handler(Server::get_channels)
89 .add_handler(Server::get_users)
90 .add_handler(Server::join_channel)
91 .add_handler(Server::leave_channel)
92 .add_handler(Server::send_channel_message)
93 .add_handler(Server::get_channel_messages);
94
95 Arc::new(server)
96 }
97
98 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
99 where
100 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
101 Fut: 'static + Send + Future<Output = tide::Result<()>>,
102 M: EnvelopedMessage,
103 {
104 let prev_handler = self.handlers.insert(
105 TypeId::of::<M>(),
106 Box::new(move |server, envelope| {
107 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
108 (handler)(server, *envelope).boxed()
109 }),
110 );
111 if prev_handler.is_some() {
112 panic!("registered a handler for the same message twice");
113 }
114 self
115 }
116
    /// Returns a future that services `connection` until it closes.
    ///
    /// The future registers the connection with the peer and the store, then
    /// alternates between driving the connection's I/O and dispatching incoming
    /// messages to the registered handlers. When either the I/O future finishes
    /// or the incoming stream ends, the connection is signed out.
    ///
    /// * `addr` - remote address, used only in log messages.
    /// * `user_id` - user the connection belongs to.
    /// * `send_connection_id` - optional channel that is sent the id assigned to
    ///   this connection as soon as it is known.
    pub fn handle_connection(
        self: &Arc<Self>,
        connection: Connection,
        addr: String,
        user_id: UserId,
        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
    ) -> impl Future<Output = ()> {
        let mut this = self.clone();
        async move {
            let (connection_id, handle_io, mut incoming_rx) =
                this.peer.add_connection(connection).await;

            // Failure to report the id is ignored.
            if let Some(send_connection_id) = send_connection_id.as_mut() {
                let _ = send_connection_id.send(connection_id).await;
            }

            this.state_mut().add_connection(connection_id, user_id);
            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
                log::error!("error updating contacts for {:?}: {}", user_id, err);
            }

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.next().fuse();
                futures::pin_mut!(next_message);
                // `select_biased!` polls its branches in order, so connection
                // I/O is checked before the next incoming message.
                futures::select_biased! {
                    result = handle_io => {
                        if let Err(err) = result {
                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
                        }
                        break;
                    }
                    message = next_message => {
                        if let Some(message) = message {
                            let start_time = Instant::now();
                            log::info!("RPC message received: {}", message.payload_type_name());
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                // The handler is awaited inline, so messages on
                                // one connection are handled one at a time.
                                if let Err(err) = (handler)(this.clone(), message).await {
                                    log::error!("error handling message: {:?}", err);
                                } else {
                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
                                }

                                // Signal that a message was processed, whether
                                // or not the handler succeeded.
                                if let Some(mut notifications) = this.notifications.clone() {
                                    let _ = notifications.send(()).await;
                                }
                            } else {
                                log::warn!("unhandled message: {}", message.payload_type_name());
                            }
                        } else {
                            log::info!("rpc connection closed {:?}", addr);
                            break;
                        }
                    }
                }
            }

            if let Err(err) = this.sign_out(connection_id).await {
                log::error!("error signing out connection {:?} - {:?}", addr, err);
            }
        }
    }
180
181 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
182 self.peer.disconnect(connection_id);
183 let removed_connection = self.state_mut().remove_connection(connection_id)?;
184
185 for (project_id, project) in removed_connection.hosted_projects {
186 if let Some(share) = project.share {
187 broadcast(
188 connection_id,
189 share.guests.keys().copied().collect(),
190 |conn_id| {
191 self.peer
192 .send(conn_id, proto::UnshareProject { project_id })
193 },
194 )?;
195 }
196 }
197
198 for (project_id, peer_ids) in removed_connection.guest_project_ids {
199 broadcast(connection_id, peer_ids, |conn_id| {
200 self.peer.send(
201 conn_id,
202 proto::RemoveProjectCollaborator {
203 project_id,
204 peer_id: connection_id.0,
205 },
206 )
207 })?;
208 }
209
210 self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
211 Ok(())
212 }
213
214 async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
215 self.peer.respond(request.receipt(), proto::Ack {})?;
216 Ok(())
217 }
218
219 async fn register_project(
220 mut self: Arc<Server>,
221 request: TypedEnvelope<proto::RegisterProject>,
222 ) -> tide::Result<()> {
223 let project_id = {
224 let mut state = self.state_mut();
225 let user_id = state.user_id_for_connection(request.sender_id)?;
226 state.register_project(request.sender_id, user_id)
227 };
228 self.peer.respond(
229 request.receipt(),
230 proto::RegisterProjectResponse { project_id },
231 )?;
232 Ok(())
233 }
234
235 async fn unregister_project(
236 mut self: Arc<Server>,
237 request: TypedEnvelope<proto::UnregisterProject>,
238 ) -> tide::Result<()> {
239 let project = self
240 .state_mut()
241 .unregister_project(request.payload.project_id, request.sender_id)
242 .ok_or_else(|| anyhow!("no such project"))?;
243 self.update_contacts_for_users(project.authorized_user_ids().iter())?;
244 Ok(())
245 }
246
247 async fn share_project(
248 mut self: Arc<Server>,
249 request: TypedEnvelope<proto::ShareProject>,
250 ) -> tide::Result<()> {
251 self.state_mut()
252 .share_project(request.payload.project_id, request.sender_id);
253 self.peer.respond(request.receipt(), proto::Ack {})?;
254 Ok(())
255 }
256
257 async fn unshare_project(
258 mut self: Arc<Server>,
259 request: TypedEnvelope<proto::UnshareProject>,
260 ) -> tide::Result<()> {
261 let project_id = request.payload.project_id;
262 let project = self
263 .state_mut()
264 .unshare_project(project_id, request.sender_id)?;
265
266 broadcast(request.sender_id, project.connection_ids, |conn_id| {
267 self.peer
268 .send(conn_id, proto::UnshareProject { project_id })
269 })?;
270 self.update_contacts_for_users(&project.authorized_user_ids)?;
271 Ok(())
272 }
273
    /// Adds the sender to a shared project as a guest.
    ///
    /// On success the existing project connections are sent
    /// `AddProjectCollaborator`, the sender receives a `JoinProjectResponse`
    /// with the shared worktrees, its replica id and the current collaborators,
    /// and the contacts of the authorized users are refreshed. If joining fails
    /// the sender receives an error response carrying the failure message.
    async fn join_project(
        mut self: Arc<Server>,
        request: TypedEnvelope<proto::JoinProject>,
    ) -> tide::Result<()> {
        let project_id = request.payload.project_id;

        let user_id = self.state().user_id_for_connection(request.sender_id)?;
        // Everything needed after the lock is released is computed inside the
        // closure, while the store's write guard is still alive.
        let response_data = self
            .state_mut()
            .join_project(request.sender_id, user_id, project_id)
            .and_then(|joined| {
                let share = joined.project.share()?;
                let peer_count = share.guests.len();
                let mut collaborators = Vec::with_capacity(peer_count);
                // The host is always listed first, with replica id 0.
                collaborators.push(proto::Collaborator {
                    peer_id: joined.project.host_connection_id.0,
                    replica_id: 0,
                    user_id: joined.project.host_user_id.to_proto(),
                });
                // Only worktrees that have share data are included.
                let worktrees = joined
                    .project
                    .worktrees
                    .iter()
                    .filter_map(|(id, worktree)| {
                        worktree.share.as_ref().map(|share| proto::Worktree {
                            id: *id,
                            root_name: worktree.root_name.clone(),
                            entries: share.entries.values().cloned().collect(),
                            diagnostic_summaries: share
                                .diagnostic_summaries
                                .values()
                                .cloned()
                                .collect(),
                            weak: worktree.weak,
                        })
                    })
                    .collect();
                // List the other guests; the joining connection is skipped.
                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
                    if *peer_conn_id != request.sender_id {
                        collaborators.push(proto::Collaborator {
                            peer_id: peer_conn_id.0,
                            replica_id: *peer_replica_id as u32,
                            user_id: peer_user_id.to_proto(),
                        });
                    }
                }
                let response = proto::JoinProjectResponse {
                    worktrees,
                    replica_id: joined.replica_id as u32,
                    collaborators,
                };
                let connection_ids = joined.project.connection_ids();
                let contact_user_ids = joined.project.authorized_user_ids();
                Ok((response, connection_ids, contact_user_ids))
            });

        match response_data {
            Ok((response, connection_ids, contact_user_ids)) => {
                // Announce the new collaborator to everyone but the sender.
                broadcast(request.sender_id, connection_ids, |conn_id| {
                    self.peer.send(
                        conn_id,
                        proto::AddProjectCollaborator {
                            project_id,
                            collaborator: Some(proto::Collaborator {
                                peer_id: request.sender_id.0,
                                replica_id: response.replica_id,
                                user_id: user_id.to_proto(),
                            }),
                        },
                    )
                })?;
                self.peer.respond(request.receipt(), response)?;
                self.update_contacts_for_users(&contact_user_ids)?;
            }
            Err(error) => {
                // Report the failure to the requester instead of failing the handler.
                self.peer.respond_with_error(
                    request.receipt(),
                    proto::Error {
                        message: error.to_string(),
                    },
                )?;
            }
        }

        Ok(())
    }
360
361 async fn leave_project(
362 mut self: Arc<Server>,
363 request: TypedEnvelope<proto::LeaveProject>,
364 ) -> tide::Result<()> {
365 let sender_id = request.sender_id;
366 let project_id = request.payload.project_id;
367 let worktree = self.state_mut().leave_project(sender_id, project_id);
368 if let Some(worktree) = worktree {
369 broadcast(sender_id, worktree.connection_ids, |conn_id| {
370 self.peer.send(
371 conn_id,
372 proto::RemoveProjectCollaborator {
373 project_id,
374 peer_id: sender_id.0,
375 },
376 )
377 })?;
378 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
379 }
380 Ok(())
381 }
382
383 async fn register_worktree(
384 mut self: Arc<Server>,
385 request: TypedEnvelope<proto::RegisterWorktree>,
386 ) -> tide::Result<()> {
387 let receipt = request.receipt();
388 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
389
390 let mut contact_user_ids = HashSet::default();
391 contact_user_ids.insert(host_user_id);
392 for github_login in request.payload.authorized_logins {
393 match self.app_state.db.create_user(&github_login, false).await {
394 Ok(contact_user_id) => {
395 contact_user_ids.insert(contact_user_id);
396 }
397 Err(err) => {
398 let message = err.to_string();
399 self.peer
400 .respond_with_error(receipt, proto::Error { message })?;
401 return Ok(());
402 }
403 }
404 }
405
406 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
407 let ok = self.state_mut().register_worktree(
408 request.payload.project_id,
409 request.payload.worktree_id,
410 Worktree {
411 authorized_user_ids: contact_user_ids.clone(),
412 root_name: request.payload.root_name,
413 share: None,
414 weak: false,
415 },
416 );
417
418 if ok {
419 self.peer.respond(receipt, proto::Ack {})?;
420 self.update_contacts_for_users(&contact_user_ids)?;
421 } else {
422 self.peer.respond_with_error(
423 receipt,
424 proto::Error {
425 message: NO_SUCH_PROJECT.to_string(),
426 },
427 )?;
428 }
429
430 Ok(())
431 }
432
433 async fn unregister_worktree(
434 mut self: Arc<Server>,
435 request: TypedEnvelope<proto::UnregisterWorktree>,
436 ) -> tide::Result<()> {
437 let project_id = request.payload.project_id;
438 let worktree_id = request.payload.worktree_id;
439 let (worktree, guest_connection_ids) =
440 self.state_mut()
441 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
442 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
443 self.peer.send(
444 conn_id,
445 proto::UnregisterWorktree {
446 project_id,
447 worktree_id,
448 },
449 )
450 })?;
451 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
452 Ok(())
453 }
454
455 async fn share_worktree(
456 mut self: Arc<Server>,
457 mut request: TypedEnvelope<proto::ShareWorktree>,
458 ) -> tide::Result<()> {
459 let worktree = request
460 .payload
461 .worktree
462 .as_mut()
463 .ok_or_else(|| anyhow!("missing worktree"))?;
464 let entries = worktree
465 .entries
466 .iter()
467 .map(|entry| (entry.id, entry.clone()))
468 .collect();
469 let diagnostic_summaries = worktree
470 .diagnostic_summaries
471 .iter()
472 .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
473 .collect();
474
475 let shared_worktree = self.state_mut().share_worktree(
476 request.payload.project_id,
477 worktree.id,
478 request.sender_id,
479 entries,
480 diagnostic_summaries,
481 );
482 if let Some(shared_worktree) = shared_worktree {
483 broadcast(
484 request.sender_id,
485 shared_worktree.connection_ids,
486 |connection_id| {
487 self.peer.forward_send(
488 request.sender_id,
489 connection_id,
490 request.payload.clone(),
491 )
492 },
493 )?;
494 self.peer.respond(request.receipt(), proto::Ack {})?;
495 self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
496 } else {
497 self.peer.respond_with_error(
498 request.receipt(),
499 proto::Error {
500 message: "no such worktree".to_string(),
501 },
502 )?;
503 }
504 Ok(())
505 }
506
507 async fn update_worktree(
508 mut self: Arc<Server>,
509 request: TypedEnvelope<proto::UpdateWorktree>,
510 ) -> tide::Result<()> {
511 let connection_ids = self
512 .state_mut()
513 .update_worktree(
514 request.sender_id,
515 request.payload.project_id,
516 request.payload.worktree_id,
517 &request.payload.removed_entries,
518 &request.payload.updated_entries,
519 )
520 .ok_or_else(|| anyhow!("no such worktree"))?;
521
522 broadcast(request.sender_id, connection_ids, |connection_id| {
523 self.peer
524 .forward_send(request.sender_id, connection_id, request.payload.clone())
525 })?;
526
527 Ok(())
528 }
529
530 async fn update_diagnostic_summary(
531 mut self: Arc<Server>,
532 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
533 ) -> tide::Result<()> {
534 let receiver_ids = request
535 .payload
536 .summary
537 .clone()
538 .and_then(|summary| {
539 self.state_mut().update_diagnostic_summary(
540 request.payload.project_id,
541 request.payload.worktree_id,
542 request.sender_id,
543 summary,
544 )
545 })
546 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
547
548 broadcast(request.sender_id, receiver_ids, |connection_id| {
549 self.peer
550 .forward_send(request.sender_id, connection_id, request.payload.clone())
551 })?;
552 Ok(())
553 }
554
555 async fn disk_based_diagnostics_updating(
556 self: Arc<Server>,
557 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
558 ) -> tide::Result<()> {
559 let receiver_ids = self
560 .state()
561 .project_connection_ids(request.payload.project_id, request.sender_id)
562 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
563 broadcast(request.sender_id, receiver_ids, |connection_id| {
564 self.peer
565 .forward_send(request.sender_id, connection_id, request.payload.clone())
566 })?;
567 Ok(())
568 }
569
570 async fn disk_based_diagnostics_updated(
571 self: Arc<Server>,
572 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
573 ) -> tide::Result<()> {
574 let receiver_ids = self
575 .state()
576 .project_connection_ids(request.payload.project_id, request.sender_id)
577 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
578 broadcast(request.sender_id, receiver_ids, |connection_id| {
579 self.peer
580 .forward_send(request.sender_id, connection_id, request.payload.clone())
581 })?;
582 Ok(())
583 }
584
585 async fn get_definition(
586 self: Arc<Server>,
587 request: TypedEnvelope<proto::GetDefinition>,
588 ) -> tide::Result<()> {
589 let receipt = request.receipt();
590 let host_connection_id = self
591 .state()
592 .read_project(request.payload.project_id, request.sender_id)
593 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
594 .host_connection_id;
595 let response = self
596 .peer
597 .forward_request(request.sender_id, host_connection_id, request.payload)
598 .await?;
599 self.peer.respond(receipt, response)?;
600 Ok(())
601 }
602
603 async fn open_buffer(
604 self: Arc<Server>,
605 request: TypedEnvelope<proto::OpenBuffer>,
606 ) -> tide::Result<()> {
607 let receipt = request.receipt();
608 let host_connection_id = self
609 .state()
610 .read_project(request.payload.project_id, request.sender_id)
611 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
612 .host_connection_id;
613 let response = self
614 .peer
615 .forward_request(request.sender_id, host_connection_id, request.payload)
616 .await?;
617 self.peer.respond(receipt, response)?;
618 Ok(())
619 }
620
621 async fn close_buffer(
622 self: Arc<Server>,
623 request: TypedEnvelope<proto::CloseBuffer>,
624 ) -> tide::Result<()> {
625 let host_connection_id = self
626 .state()
627 .read_project(request.payload.project_id, request.sender_id)
628 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
629 .host_connection_id;
630 self.peer
631 .forward_send(request.sender_id, host_connection_id, request.payload)?;
632 Ok(())
633 }
634
635 async fn save_buffer(
636 self: Arc<Server>,
637 request: TypedEnvelope<proto::SaveBuffer>,
638 ) -> tide::Result<()> {
639 let host;
640 let guests;
641 {
642 let state = self.state();
643 let project = state
644 .read_project(request.payload.project_id, request.sender_id)
645 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
646 host = project.host_connection_id;
647 guests = project.guest_connection_ids()
648 }
649
650 let sender = request.sender_id;
651 let receipt = request.receipt();
652 let response = self
653 .peer
654 .forward_request(sender, host, request.payload.clone())
655 .await?;
656
657 broadcast(host, guests, |conn_id| {
658 let response = response.clone();
659 if conn_id == sender {
660 self.peer.respond(receipt, response)
661 } else {
662 self.peer.forward_send(host, conn_id, response)
663 }
664 })?;
665
666 Ok(())
667 }
668
669 async fn format_buffer(
670 self: Arc<Server>,
671 request: TypedEnvelope<proto::FormatBuffer>,
672 ) -> tide::Result<()> {
673 let host;
674 {
675 let state = self.state();
676 let project = state
677 .read_project(request.payload.project_id, request.sender_id)
678 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
679 host = project.host_connection_id;
680 }
681
682 let sender = request.sender_id;
683 let receipt = request.receipt();
684 let response = self
685 .peer
686 .forward_request(sender, host, request.payload.clone())
687 .await?;
688 self.peer.respond(receipt, response)?;
689
690 Ok(())
691 }
692
693 async fn get_completions(
694 self: Arc<Server>,
695 request: TypedEnvelope<proto::GetCompletions>,
696 ) -> tide::Result<()> {
697 let host;
698 {
699 let state = self.state();
700 let project = state
701 .read_project(request.payload.project_id, request.sender_id)
702 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
703 host = project.host_connection_id;
704 }
705
706 let sender = request.sender_id;
707 let receipt = request.receipt();
708 let response = self
709 .peer
710 .forward_request(sender, host, request.payload.clone())
711 .await?;
712 self.peer.respond(receipt, response)?;
713 Ok(())
714 }
715
716 async fn apply_additional_edits_for_completion(
717 self: Arc<Server>,
718 request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
719 ) -> tide::Result<()> {
720 let host;
721 {
722 let state = self.state();
723 let project = state
724 .read_project(request.payload.project_id, request.sender_id)
725 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
726 host = project.host_connection_id;
727 }
728
729 let sender = request.sender_id;
730 let receipt = request.receipt();
731 let response = self
732 .peer
733 .forward_request(sender, host, request.payload.clone())
734 .await?;
735 self.peer.respond(receipt, response)?;
736 Ok(())
737 }
738
739 async fn update_buffer(
740 self: Arc<Server>,
741 request: TypedEnvelope<proto::UpdateBuffer>,
742 ) -> tide::Result<()> {
743 let receiver_ids = self
744 .state()
745 .project_connection_ids(request.payload.project_id, request.sender_id)
746 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
747 broadcast(request.sender_id, receiver_ids, |connection_id| {
748 self.peer
749 .forward_send(request.sender_id, connection_id, request.payload.clone())
750 })?;
751 self.peer.respond(request.receipt(), proto::Ack {})?;
752 Ok(())
753 }
754
755 async fn update_buffer_file(
756 self: Arc<Server>,
757 request: TypedEnvelope<proto::UpdateBufferFile>,
758 ) -> tide::Result<()> {
759 let receiver_ids = self
760 .state()
761 .project_connection_ids(request.payload.project_id, request.sender_id)
762 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
763 broadcast(request.sender_id, receiver_ids, |connection_id| {
764 self.peer
765 .forward_send(request.sender_id, connection_id, request.payload.clone())
766 })?;
767 Ok(())
768 }
769
770 async fn buffer_reloaded(
771 self: Arc<Server>,
772 request: TypedEnvelope<proto::BufferReloaded>,
773 ) -> tide::Result<()> {
774 let receiver_ids = self
775 .state()
776 .project_connection_ids(request.payload.project_id, request.sender_id)
777 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
778 broadcast(request.sender_id, receiver_ids, |connection_id| {
779 self.peer
780 .forward_send(request.sender_id, connection_id, request.payload.clone())
781 })?;
782 Ok(())
783 }
784
785 async fn buffer_saved(
786 self: Arc<Server>,
787 request: TypedEnvelope<proto::BufferSaved>,
788 ) -> tide::Result<()> {
789 let receiver_ids = self
790 .state()
791 .project_connection_ids(request.payload.project_id, request.sender_id)
792 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
793 broadcast(request.sender_id, receiver_ids, |connection_id| {
794 self.peer
795 .forward_send(request.sender_id, connection_id, request.payload.clone())
796 })?;
797 Ok(())
798 }
799
800 async fn get_channels(
801 self: Arc<Server>,
802 request: TypedEnvelope<proto::GetChannels>,
803 ) -> tide::Result<()> {
804 let user_id = self.state().user_id_for_connection(request.sender_id)?;
805 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
806 self.peer.respond(
807 request.receipt(),
808 proto::GetChannelsResponse {
809 channels: channels
810 .into_iter()
811 .map(|chan| proto::Channel {
812 id: chan.id.to_proto(),
813 name: chan.name,
814 })
815 .collect(),
816 },
817 )?;
818 Ok(())
819 }
820
821 async fn get_users(
822 self: Arc<Server>,
823 request: TypedEnvelope<proto::GetUsers>,
824 ) -> tide::Result<()> {
825 let receipt = request.receipt();
826 let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
827 let users = self
828 .app_state
829 .db
830 .get_users_by_ids(user_ids)
831 .await?
832 .into_iter()
833 .map(|user| proto::User {
834 id: user.id.to_proto(),
835 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
836 github_login: user.github_login,
837 })
838 .collect();
839 self.peer
840 .respond(receipt, proto::GetUsersResponse { users })?;
841 Ok(())
842 }
843
844 fn update_contacts_for_users<'a>(
845 self: &Arc<Server>,
846 user_ids: impl IntoIterator<Item = &'a UserId>,
847 ) -> anyhow::Result<()> {
848 let mut result = Ok(());
849 let state = self.state();
850 for user_id in user_ids {
851 let contacts = state.contacts_for_user(*user_id);
852 for connection_id in state.connection_ids_for_user(*user_id) {
853 if let Err(error) = self.peer.send(
854 connection_id,
855 proto::UpdateContacts {
856 contacts: contacts.clone(),
857 },
858 ) {
859 result = Err(error);
860 }
861 }
862 }
863 result
864 }
865
866 async fn join_channel(
867 mut self: Arc<Self>,
868 request: TypedEnvelope<proto::JoinChannel>,
869 ) -> tide::Result<()> {
870 let user_id = self.state().user_id_for_connection(request.sender_id)?;
871 let channel_id = ChannelId::from_proto(request.payload.channel_id);
872 if !self
873 .app_state
874 .db
875 .can_user_access_channel(user_id, channel_id)
876 .await?
877 {
878 Err(anyhow!("access denied"))?;
879 }
880
881 self.state_mut().join_channel(request.sender_id, channel_id);
882 let messages = self
883 .app_state
884 .db
885 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
886 .await?
887 .into_iter()
888 .map(|msg| proto::ChannelMessage {
889 id: msg.id.to_proto(),
890 body: msg.body,
891 timestamp: msg.sent_at.unix_timestamp() as u64,
892 sender_id: msg.sender_id.to_proto(),
893 nonce: Some(msg.nonce.as_u128().into()),
894 })
895 .collect::<Vec<_>>();
896 self.peer.respond(
897 request.receipt(),
898 proto::JoinChannelResponse {
899 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
900 messages,
901 },
902 )?;
903 Ok(())
904 }
905
906 async fn leave_channel(
907 mut self: Arc<Self>,
908 request: TypedEnvelope<proto::LeaveChannel>,
909 ) -> tide::Result<()> {
910 let user_id = self.state().user_id_for_connection(request.sender_id)?;
911 let channel_id = ChannelId::from_proto(request.payload.channel_id);
912 if !self
913 .app_state
914 .db
915 .can_user_access_channel(user_id, channel_id)
916 .await?
917 {
918 Err(anyhow!("access denied"))?;
919 }
920
921 self.state_mut()
922 .leave_channel(request.sender_id, channel_id);
923
924 Ok(())
925 }
926
927 async fn send_channel_message(
928 self: Arc<Self>,
929 request: TypedEnvelope<proto::SendChannelMessage>,
930 ) -> tide::Result<()> {
931 let receipt = request.receipt();
932 let channel_id = ChannelId::from_proto(request.payload.channel_id);
933 let user_id;
934 let connection_ids;
935 {
936 let state = self.state();
937 user_id = state.user_id_for_connection(request.sender_id)?;
938 if let Some(ids) = state.channel_connection_ids(channel_id) {
939 connection_ids = ids;
940 } else {
941 return Ok(());
942 }
943 }
944
945 // Validate the message body.
946 let body = request.payload.body.trim().to_string();
947 if body.len() > MAX_MESSAGE_LEN {
948 self.peer.respond_with_error(
949 receipt,
950 proto::Error {
951 message: "message is too long".to_string(),
952 },
953 )?;
954 return Ok(());
955 }
956 if body.is_empty() {
957 self.peer.respond_with_error(
958 receipt,
959 proto::Error {
960 message: "message can't be blank".to_string(),
961 },
962 )?;
963 return Ok(());
964 }
965
966 let timestamp = OffsetDateTime::now_utc();
967 let nonce = if let Some(nonce) = request.payload.nonce {
968 nonce
969 } else {
970 self.peer.respond_with_error(
971 receipt,
972 proto::Error {
973 message: "nonce can't be blank".to_string(),
974 },
975 )?;
976 return Ok(());
977 };
978
979 let message_id = self
980 .app_state
981 .db
982 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
983 .await?
984 .to_proto();
985 let message = proto::ChannelMessage {
986 sender_id: user_id.to_proto(),
987 id: message_id,
988 body,
989 timestamp: timestamp.unix_timestamp() as u64,
990 nonce: Some(nonce),
991 };
992 broadcast(request.sender_id, connection_ids, |conn_id| {
993 self.peer.send(
994 conn_id,
995 proto::ChannelMessageSent {
996 channel_id: channel_id.to_proto(),
997 message: Some(message.clone()),
998 },
999 )
1000 })?;
1001 self.peer.respond(
1002 receipt,
1003 proto::SendChannelMessageResponse {
1004 message: Some(message),
1005 },
1006 )?;
1007 Ok(())
1008 }
1009
1010 async fn get_channel_messages(
1011 self: Arc<Self>,
1012 request: TypedEnvelope<proto::GetChannelMessages>,
1013 ) -> tide::Result<()> {
1014 let user_id = self.state().user_id_for_connection(request.sender_id)?;
1015 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1016 if !self
1017 .app_state
1018 .db
1019 .can_user_access_channel(user_id, channel_id)
1020 .await?
1021 {
1022 Err(anyhow!("access denied"))?;
1023 }
1024
1025 let messages = self
1026 .app_state
1027 .db
1028 .get_channel_messages(
1029 channel_id,
1030 MESSAGE_COUNT_PER_PAGE,
1031 Some(MessageId::from_proto(request.payload.before_message_id)),
1032 )
1033 .await?
1034 .into_iter()
1035 .map(|msg| proto::ChannelMessage {
1036 id: msg.id.to_proto(),
1037 body: msg.body,
1038 timestamp: msg.sent_at.unix_timestamp() as u64,
1039 sender_id: msg.sender_id.to_proto(),
1040 nonce: Some(msg.nonce.as_u128().into()),
1041 })
1042 .collect::<Vec<_>>();
1043 self.peer.respond(
1044 request.receipt(),
1045 proto::GetChannelMessagesResponse {
1046 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1047 messages,
1048 },
1049 )?;
1050 Ok(())
1051 }
1052
1053 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1054 self.store.read()
1055 }
1056
1057 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1058 self.store.write()
1059 }
1060}
1061
1062fn broadcast<F>(
1063 sender_id: ConnectionId,
1064 receiver_ids: Vec<ConnectionId>,
1065 mut f: F,
1066) -> anyhow::Result<()>
1067where
1068 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1069{
1070 let mut result = Ok(());
1071 for receiver_id in receiver_ids {
1072 if receiver_id != sender_id {
1073 if let Err(error) = f(receiver_id) {
1074 if result.is_ok() {
1075 result = Err(error);
1076 }
1077 }
1078 }
1079 }
1080 result
1081}
1082
1083pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1084 let server = Server::new(app.state().clone(), rpc.clone(), None);
1085 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1086 let server = server.clone();
1087 async move {
1088 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1089
1090 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1091 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1092 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1093 let client_protocol_version: Option<u32> = request
1094 .header("X-Zed-Protocol-Version")
1095 .and_then(|v| v.as_str().parse().ok());
1096
1097 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1098 return Ok(Response::new(StatusCode::UpgradeRequired));
1099 }
1100
1101 let header = match request.header("Sec-Websocket-Key") {
1102 Some(h) => h.as_str(),
1103 None => return Err(anyhow!("expected sec-websocket-key"))?,
1104 };
1105
1106 let user_id = process_auth_header(&request).await?;
1107
1108 let mut response = Response::new(StatusCode::SwitchingProtocols);
1109 response.insert_header(UPGRADE, "websocket");
1110 response.insert_header(CONNECTION, "Upgrade");
1111 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1112 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1113 response.insert_header("Sec-Websocket-Version", "13");
1114
1115 let http_res: &mut tide::http::Response = response.as_mut();
1116 let upgrade_receiver = http_res.recv_upgrade().await;
1117 let addr = request.remote().unwrap_or("unknown").to_string();
1118 task::spawn(async move {
1119 if let Some(stream) = upgrade_receiver.await {
1120 server
1121 .handle_connection(
1122 Connection::new(
1123 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1124 ),
1125 addr,
1126 user_id,
1127 None,
1128 )
1129 .await;
1130 }
1131 });
1132
1133 Ok(response)
1134 }
1135 });
1136}
1137
1138fn header_contains_ignore_case<T>(
1139 request: &tide::Request<T>,
1140 header_name: HeaderName,
1141 value: &str,
1142) -> bool {
1143 request
1144 .header(header_name)
1145 .map(|h| {
1146 h.as_str()
1147 .split(',')
1148 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1149 })
1150 .unwrap_or(false)
1151}
1152
1153#[cfg(test)]
1154mod tests {
1155 use super::*;
1156 use crate::{
1157 auth,
1158 db::{tests::TestDb, UserId},
1159 github, AppState, Config,
1160 };
1161 use ::rpc::Peer;
1162 use async_std::task;
1163 use gpui::{executor, ModelHandle, TestAppContext};
1164 use parking_lot::Mutex;
1165 use postage::{mpsc, watch};
1166 use rand::prelude::*;
1167 use rpc::PeerId;
1168 use serde_json::json;
1169 use sqlx::types::time::OffsetDateTime;
1170 use std::{
1171 ops::Deref,
1172 path::Path,
1173 rc::Rc,
1174 sync::{
1175 atomic::{AtomicBool, Ordering::SeqCst},
1176 Arc,
1177 },
1178 time::Duration,
1179 };
1180 use zed::{
1181 client::{
1182 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1183 EstablishConnectionError, UserStore,
1184 },
1185 editor::{Editor, EditorSettings, Input, MultiBuffer},
1186 fs::{FakeFs, Fs as _},
1187 language::{
1188 tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1189 LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1190 },
1191 lsp,
1192 project::{DiagnosticSummary, Project, ProjectPath},
1193 };
1194
    // Exercises the basic sharing flow between a host (client A) and a guest (client B):
    // A shares a local project, B joins it remotely, both open `b.txt`, text typed by B
    // shows up in A's buffer, and dropping B's project empties A's collaborator list.
    #[gpui::test]
    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        // Window on client B that hosts the editor created further down.
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        // The fixture's `.zed.toml` names `user_b` as a collaborator.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        // Wait for the local worktree's scan to complete before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        // The remote id is what the guest passes to `Project::remote` below.
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // The guest sees the host as a collaborator; remember the guest's replica id
        // so the host's view of the guest can be checked against it.
        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Wait until the host lists the guest with the matching replica id and login.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
        buffer_b.read_with(&cx_b, |buf, cx| {
            assert_eq!(buf.read(cx).text(), "b-contents")
        });
        // The host's project already reports the buffer as open before A opens it itself.
        project_a.read_with(&cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| {
            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
        });

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(&mut cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        // NOTE: the editor is still dropped here; only the assertion that followed is disabled.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Close the buffer as client A, see that the buffer is closed.
        cx_a.update(move |_| drop(buffer_a));
        project_a
            .condition(&cx_a, |project, cx| {
                !project.has_open_buffer((worktree_id, "b.txt"), cx)
            })
            .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1333
    // Checks that unsharing a project turns the guest's copy read-only and marks the
    // host's worktree as no longer shared, and that the same project can afterwards be
    // shared again and joined (and a buffer opened) by a guest.
    #[gpui::test]
    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Wait for the local worktree's scan to complete before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();
        // Once `share` has resolved, the local worktree reports itself as shared.
        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        // Opening a buffer must succeed while the project is shared.
        project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Unshare the project as client A
        project_a
            .update(&mut cx_a, |project, cx| project.unshare(cx))
            .await
            .unwrap();
        // The guest's project eventually becomes read-only, and the host's worktree
        // is no longer marked as shared.
        project_b
            .condition(&mut cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        drop(project_b);

        // Share the project again and ensure guests can still join.
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join a second time with the same client (B) and the same project id.
        let project_c = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_c
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1430
    // With one host (A) and two guests (B, C) on the same project, checks that:
    // - edits made by all three converge to the same buffer text,
    // - a save started by a guest while the host edits leaves every buffer clean, and
    // - renames/insertions on the host's file system show up in every worktree and in
    //   the paths of already-open buffers.
    #[gpui::test]
    async fn test_propagate_saves_and_fs_changes(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        mut cx_c: TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        let client_c = server.create_client(&mut cx_c, "user_c").await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Wait for the local worktree's scan to complete before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that worktree as clients B and C.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let project_c = Project::remote(
            project_id,
            client_c.clone(),
            client_c.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_c.to_async(),
        )
        .await
        .unwrap();
        // Each guest project is expected to contain at least one worktree; take the first.
        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());

        // Open and edit a buffer as both guests B and C.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        let buffer_c = project_c
            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        // Both guests insert at offset 0 of the (initially empty) file.
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));

        // Open and edit that buffer as the host.
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();

        // Wait for both guest edits to reach the host before appending the host's own text.
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(&mut cx_a, |buf, cx| {
            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        // The save is started first and only awaited after the host's edit was applied.
        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
        save_b.await.unwrap();
        // The file on disk is expected to include the host's concurrent edit.
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        // A and B are checked immediately; C is only required to become clean eventually.
        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;

        // Make changes on host's file system, see those changes on guest worktrees.
        fs.rename("/a/file1".as_ref(), "/a/file1-renamed".as_ref())
            .await
            .unwrap();
        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into())
            .await
            .unwrap();

        // Four files are expected afterwards: `.zed.toml`, `file1-renamed`, `file3`, `file4`.
        worktree_a
            .condition(&cx_a, |tree, _| tree.file_count() == 4)
            .await;
        worktree_b
            .condition(&cx_b, |tree, _| tree.file_count() == 4)
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| tree.file_count() == 4)
            .await;
        worktree_a.read_with(&cx_a, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1-renamed", "file3", "file4"]
            )
        });
        worktree_b.read_with(&cx_b, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1-renamed", "file3", "file4"]
            )
        });
        worktree_c.read_with(&cx_c, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1-renamed", "file3", "file4"]
            )
        });

        // Ensure buffer files are updated as well.
        buffer_a
            .condition(&cx_a, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_c
            .condition(&cx_c, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
    }
1616
    // Checks the dirty/conflict flags of a guest's buffer around a save: after editing
    // the buffer is dirty but not conflicted, after saving (and once the file's mtime
    // has changed) it is clean and not conflicted, and a further edit makes it dirty
    // again without reporting a conflict.
    #[gpui::test]
    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        // NOTE(review): the fixture also lists `user_c`, but no such client is created here.
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", false, cx)
            })
            .await
            .unwrap();
        // Wait for the local worktree's scan to complete before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());

        // Open a buffer as client B
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
        // Remember the file's mtime so the effect of the save can be detected below.
        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());

        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        buffer_b
            .update(&mut cx_b, |buf, cx| buf.save(cx))
            .await
            .unwrap();
        // Wait until the guest's worktree reports a different mtime for the buffer's file.
        worktree_b
            .condition(&cx_b, |_, cx| {
                buffer_b.read(cx).file().unwrap().mtime() != mtime
            })
            .await;
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(!buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Editing again after the save must not be reported as a conflict.
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });
    }
1709
    // Checks that when the file behind a guest's unmodified buffer is rewritten on the
    // file system, the guest's buffer picks up the new contents and ends up neither
    // dirty nor conflicted.
    #[gpui::test]
    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        // NOTE(review): the fixture also lists `user_c`, but no such client is created here.
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", false, cx)
            })
            .await
            .unwrap();
        // Wait for the local worktree's scan to complete before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        // Not used again below; bound to a name so the handle lives until the end of the test.
        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());

        // Open a buffer as client B
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(!buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Rewrite the file directly through the fake file system, bypassing the buffer.
        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
            .await
            .unwrap();
        // The guest's buffer should eventually show the new text and still be clean.
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.text() == "new contents" && !buf.is_dirty()
            })
            .await;
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(!buf.has_conflict());
        });
    }
1791
    // Checks that an edit made by the host while a guest is still in the middle of
    // opening the same buffer is not lost: the guest's buffer must eventually have the
    // same text as the host's.
    // NOTE(review): declared with `iterations = 100`, presumably so the race is exercised
    // under many different schedules — confirm against the `gpui::test` macro.
    #[gpui::test(iterations = 100)]
    async fn test_editing_while_guest_opens_buffer(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", false, cx)
            })
            .await
            .unwrap();
        // Wait for the local worktree's scan to complete before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open a buffer as client A
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Start opening the same buffer as client B
        // The open request is spawned rather than awaited so that it is still pending
        // when the host edits; the single yield presumably lets it make some progress first.
        let buffer_b = cx_b
            .background()
            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
        task::yield_now().await;

        // Edit the buffer as client A while client B is still opening it.
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));

        // Capture the host's text, then wait for the guest's buffer to match it.
        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
        let buffer_b = buffer_b.await.unwrap();
        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
    }
1871
1872 #[gpui::test]
1873 async fn test_leaving_worktree_while_opening_buffer(
1874 mut cx_a: TestAppContext,
1875 mut cx_b: TestAppContext,
1876 ) {
1877 cx_a.foreground().forbid_parking();
1878 let lang_registry = Arc::new(LanguageRegistry::new());
1879 let fs = Arc::new(FakeFs::new(cx_a.background()));
1880
1881 // Connect to a server as 2 clients.
1882 let mut server = TestServer::start(cx_a.foreground()).await;
1883 let client_a = server.create_client(&mut cx_a, "user_a").await;
1884 let client_b = server.create_client(&mut cx_b, "user_b").await;
1885
1886 // Share a project as client A
1887 fs.insert_tree(
1888 "/dir",
1889 json!({
1890 ".zed.toml": r#"collaborators = ["user_b"]"#,
1891 "a.txt": "a-contents",
1892 }),
1893 )
1894 .await;
1895 let project_a = cx_a.update(|cx| {
1896 Project::local(
1897 client_a.clone(),
1898 client_a.user_store.clone(),
1899 lang_registry.clone(),
1900 fs.clone(),
1901 cx,
1902 )
1903 });
1904 let (worktree_a, _) = project_a
1905 .update(&mut cx_a, |p, cx| {
1906 p.find_or_create_local_worktree("/dir", false, cx)
1907 })
1908 .await
1909 .unwrap();
1910 worktree_a
1911 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1912 .await;
1913 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1914 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1915 project_a
1916 .update(&mut cx_a, |p, cx| p.share(cx))
1917 .await
1918 .unwrap();
1919
1920 // Join that project as client B
1921 let project_b = Project::remote(
1922 project_id,
1923 client_b.clone(),
1924 client_b.user_store.clone(),
1925 lang_registry.clone(),
1926 fs.clone(),
1927 &mut cx_b.to_async(),
1928 )
1929 .await
1930 .unwrap();
1931
1932 // See that a guest has joined as client A.
1933 project_a
1934 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1935 .await;
1936
1937 // Begin opening a buffer as client B, but leave the project before the open completes.
1938 let buffer_b = cx_b
1939 .background()
1940 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1941 cx_b.update(|_| drop(project_b));
1942 drop(buffer_b);
1943
1944 // See that the guest has left.
1945 project_a
1946 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1947 .await;
1948 }
1949
1950 #[gpui::test]
1951 async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1952 cx_a.foreground().forbid_parking();
1953 let lang_registry = Arc::new(LanguageRegistry::new());
1954 let fs = Arc::new(FakeFs::new(cx_a.background()));
1955
1956 // Connect to a server as 2 clients.
1957 let mut server = TestServer::start(cx_a.foreground()).await;
1958 let client_a = server.create_client(&mut cx_a, "user_a").await;
1959 let client_b = server.create_client(&mut cx_b, "user_b").await;
1960
1961 // Share a project as client A
1962 fs.insert_tree(
1963 "/a",
1964 json!({
1965 ".zed.toml": r#"collaborators = ["user_b"]"#,
1966 "a.txt": "a-contents",
1967 "b.txt": "b-contents",
1968 }),
1969 )
1970 .await;
1971 let project_a = cx_a.update(|cx| {
1972 Project::local(
1973 client_a.clone(),
1974 client_a.user_store.clone(),
1975 lang_registry.clone(),
1976 fs.clone(),
1977 cx,
1978 )
1979 });
1980 let (worktree_a, _) = project_a
1981 .update(&mut cx_a, |p, cx| {
1982 p.find_or_create_local_worktree("/a", false, cx)
1983 })
1984 .await
1985 .unwrap();
1986 worktree_a
1987 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1988 .await;
1989 let project_id = project_a
1990 .update(&mut cx_a, |project, _| project.next_remote_id())
1991 .await;
1992 project_a
1993 .update(&mut cx_a, |project, cx| project.share(cx))
1994 .await
1995 .unwrap();
1996
1997 // Join that project as client B
1998 let _project_b = Project::remote(
1999 project_id,
2000 client_b.clone(),
2001 client_b.user_store.clone(),
2002 lang_registry.clone(),
2003 fs.clone(),
2004 &mut cx_b.to_async(),
2005 )
2006 .await
2007 .unwrap();
2008
2009 // See that a guest has joined as client A.
2010 project_a
2011 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2012 .await;
2013
2014 // Drop client B's connection and ensure client A observes client B leaving the worktree.
2015 client_b.disconnect(&cx_b.to_async()).unwrap();
2016 project_a
2017 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2018 .await;
2019 }
2020
2021 #[gpui::test]
2022 async fn test_collaborating_with_diagnostics(
2023 mut cx_a: TestAppContext,
2024 mut cx_b: TestAppContext,
2025 ) {
2026 cx_a.foreground().forbid_parking();
2027 let mut lang_registry = Arc::new(LanguageRegistry::new());
2028 let fs = Arc::new(FakeFs::new(cx_a.background()));
2029
2030 // Set up a fake language server.
2031 let (language_server_config, mut fake_language_server) =
2032 LanguageServerConfig::fake(cx_a.background()).await;
2033 Arc::get_mut(&mut lang_registry)
2034 .unwrap()
2035 .add(Arc::new(Language::new(
2036 LanguageConfig {
2037 name: "Rust".to_string(),
2038 path_suffixes: vec!["rs".to_string()],
2039 language_server: Some(language_server_config),
2040 ..Default::default()
2041 },
2042 Some(tree_sitter_rust::language()),
2043 )));
2044
2045 // Connect to a server as 2 clients.
2046 let mut server = TestServer::start(cx_a.foreground()).await;
2047 let client_a = server.create_client(&mut cx_a, "user_a").await;
2048 let client_b = server.create_client(&mut cx_b, "user_b").await;
2049
2050 // Share a project as client A
2051 fs.insert_tree(
2052 "/a",
2053 json!({
2054 ".zed.toml": r#"collaborators = ["user_b"]"#,
2055 "a.rs": "let one = two",
2056 "other.rs": "",
2057 }),
2058 )
2059 .await;
2060 let project_a = cx_a.update(|cx| {
2061 Project::local(
2062 client_a.clone(),
2063 client_a.user_store.clone(),
2064 lang_registry.clone(),
2065 fs.clone(),
2066 cx,
2067 )
2068 });
2069 let (worktree_a, _) = project_a
2070 .update(&mut cx_a, |p, cx| {
2071 p.find_or_create_local_worktree("/a", false, cx)
2072 })
2073 .await
2074 .unwrap();
2075 worktree_a
2076 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2077 .await;
2078 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2079 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2080 project_a
2081 .update(&mut cx_a, |p, cx| p.share(cx))
2082 .await
2083 .unwrap();
2084
2085 // Cause the language server to start.
2086 let _ = cx_a
2087 .background()
2088 .spawn(project_a.update(&mut cx_a, |project, cx| {
2089 project.open_buffer(
2090 ProjectPath {
2091 worktree_id,
2092 path: Path::new("other.rs").into(),
2093 },
2094 cx,
2095 )
2096 }))
2097 .await
2098 .unwrap();
2099
2100 // Simulate a language server reporting errors for a file.
2101 fake_language_server
2102 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2103 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2104 version: None,
2105 diagnostics: vec![lsp::Diagnostic {
2106 severity: Some(lsp::DiagnosticSeverity::ERROR),
2107 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2108 message: "message 1".to_string(),
2109 ..Default::default()
2110 }],
2111 })
2112 .await;
2113
2114 // Wait for server to see the diagnostics update.
2115 server
2116 .condition(|store| {
2117 let worktree = store
2118 .project(project_id)
2119 .unwrap()
2120 .worktrees
2121 .get(&worktree_id.to_proto())
2122 .unwrap();
2123
2124 !worktree
2125 .share
2126 .as_ref()
2127 .unwrap()
2128 .diagnostic_summaries
2129 .is_empty()
2130 })
2131 .await;
2132
2133 // Join the worktree as client B.
2134 let project_b = Project::remote(
2135 project_id,
2136 client_b.clone(),
2137 client_b.user_store.clone(),
2138 lang_registry.clone(),
2139 fs.clone(),
2140 &mut cx_b.to_async(),
2141 )
2142 .await
2143 .unwrap();
2144
2145 project_b.read_with(&cx_b, |project, cx| {
2146 assert_eq!(
2147 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2148 &[(
2149 ProjectPath {
2150 worktree_id,
2151 path: Arc::from(Path::new("a.rs")),
2152 },
2153 DiagnosticSummary {
2154 error_count: 1,
2155 warning_count: 0,
2156 ..Default::default()
2157 },
2158 )]
2159 )
2160 });
2161
2162 // Simulate a language server reporting more errors for a file.
2163 fake_language_server
2164 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2165 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2166 version: None,
2167 diagnostics: vec![
2168 lsp::Diagnostic {
2169 severity: Some(lsp::DiagnosticSeverity::ERROR),
2170 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2171 message: "message 1".to_string(),
2172 ..Default::default()
2173 },
2174 lsp::Diagnostic {
2175 severity: Some(lsp::DiagnosticSeverity::WARNING),
2176 range: lsp::Range::new(
2177 lsp::Position::new(0, 10),
2178 lsp::Position::new(0, 13),
2179 ),
2180 message: "message 2".to_string(),
2181 ..Default::default()
2182 },
2183 ],
2184 })
2185 .await;
2186
2187 // Client b gets the updated summaries
2188 project_b
2189 .condition(&cx_b, |project, cx| {
2190 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2191 == &[(
2192 ProjectPath {
2193 worktree_id,
2194 path: Arc::from(Path::new("a.rs")),
2195 },
2196 DiagnosticSummary {
2197 error_count: 1,
2198 warning_count: 1,
2199 ..Default::default()
2200 },
2201 )]
2202 })
2203 .await;
2204
2205 // Open the file with the errors on client B. They should be present.
2206 let buffer_b = cx_b
2207 .background()
2208 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2209 .await
2210 .unwrap();
2211
2212 buffer_b.read_with(&cx_b, |buffer, _| {
2213 assert_eq!(
2214 buffer
2215 .snapshot()
2216 .diagnostics_in_range::<_, Point>(0..buffer.len())
2217 .map(|entry| entry)
2218 .collect::<Vec<_>>(),
2219 &[
2220 DiagnosticEntry {
2221 range: Point::new(0, 4)..Point::new(0, 7),
2222 diagnostic: Diagnostic {
2223 group_id: 0,
2224 message: "message 1".to_string(),
2225 severity: lsp::DiagnosticSeverity::ERROR,
2226 is_primary: true,
2227 ..Default::default()
2228 }
2229 },
2230 DiagnosticEntry {
2231 range: Point::new(0, 10)..Point::new(0, 13),
2232 diagnostic: Diagnostic {
2233 group_id: 1,
2234 severity: lsp::DiagnosticSeverity::WARNING,
2235 message: "message 2".to_string(),
2236 is_primary: true,
2237 ..Default::default()
2238 }
2239 }
2240 ]
2241 );
2242 });
2243 }
2244
// Exercises completions in a shared project: the guest (client B) types a trigger
// character, the host's fake language server answers the completion request and the
// follow-up resolve request, and the test checks that the resulting text appears in
// both the guest's and the host's copy of the buffer.
#[gpui::test]
async fn test_collaborating_with_completion(
    mut cx_a: TestAppContext,
    mut cx_b: TestAppContext,
) {
    cx_a.foreground().forbid_parking();
    let mut lang_registry = Arc::new(LanguageRegistry::new());
    let fs = Arc::new(FakeFs::new(cx_a.background()));

    // Set up a fake language server.
    // It advertises "." as a completion trigger character.
    let (language_server_config, mut fake_language_server) =
        LanguageServerConfig::fake_with_capabilities(
            lsp::ServerCapabilities {
                completion_provider: Some(lsp::CompletionOptions {
                    trigger_characters: Some(vec![".".to_string()]),
                    ..Default::default()
                }),
                ..Default::default()
            },
            cx_a.background(),
        )
        .await;
    // Register a "Rust" language for `.rs` files that uses the fake language server.
    Arc::get_mut(&mut lang_registry)
        .unwrap()
        .add(Arc::new(Language::new(
            LanguageConfig {
                name: "Rust".to_string(),
                path_suffixes: vec!["rs".to_string()],
                language_server: Some(language_server_config),
                ..Default::default()
            },
            Some(tree_sitter_rust::language()),
        )));

    // Connect to a server as 2 clients.
    let mut server = TestServer::start(cx_a.foreground()).await;
    let client_a = server.create_client(&mut cx_a, "user_a").await;
    let client_b = server.create_client(&mut cx_b, "user_b").await;

    // Share a project as client A
    fs.insert_tree(
        "/a",
        json!({
            ".zed.toml": r#"collaborators = ["user_b"]"#,
            "main.rs": "fn main() { a }",
            "other.rs": "",
        }),
    )
    .await;
    let project_a = cx_a.update(|cx| {
        Project::local(
            client_a.clone(),
            client_a.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            cx,
        )
    });
    let (worktree_a, _) = project_a
        .update(&mut cx_a, |p, cx| {
            p.find_or_create_local_worktree("/a", false, cx)
        })
        .await
        .unwrap();
    // Wait for the host's initial worktree scan before sharing.
    worktree_a
        .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
        .await;
    let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
    let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
    project_a
        .update(&mut cx_a, |p, cx| p.share(cx))
        .await
        .unwrap();

    // Join the worktree as client B.
    let project_b = Project::remote(
        project_id,
        client_b.clone(),
        client_b.user_store.clone(),
        lang_registry.clone(),
        fs.clone(),
        &mut cx_b.to_async(),
    )
    .await
    .unwrap();

    // Open a file in an editor as the guest.
    let buffer_b = project_b
        .update(&mut cx_b, |p, cx| {
            p.open_buffer((worktree_id, "main.rs"), cx)
        })
        .await
        .unwrap();
    let (window_b, _) = cx_b.add_window(|_| EmptyView);
    let editor_b = cx_b.add_view(window_b, |cx| {
        Editor::for_buffer(
            cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
            Arc::new(|cx| EditorSettings::test(cx)),
            cx,
        )
    });

    // Type a completion trigger character as the guest.
    // Offset 13 is directly after the `a` in "fn main() { a }".
    editor_b.update(&mut cx_b, |editor, cx| {
        editor.select_ranges([13..13], None, cx);
        editor.handle_input(&Input(".".into()), cx);
        cx.focus(&editor_b);
    });

    // Receive a completion request as the host's language server.
    // The requested position (0, 14) is the column just after the typed ".".
    let (request_id, params) = fake_language_server
        .receive_request::<lsp::request::Completion>()
        .await;
    assert_eq!(
        params.text_document_position.text_document.uri,
        lsp::Url::from_file_path("/a/main.rs").unwrap(),
    );
    assert_eq!(
        params.text_document_position.position,
        lsp::Position::new(0, 14),
    );

    // Return some completions from the host's language server.
    fake_language_server
        .respond(
            request_id,
            Some(lsp::CompletionResponse::Array(vec![
                lsp::CompletionItem {
                    label: "first_method(…)".into(),
                    detail: Some("fn(&mut self, B) -> C".into()),
                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
                        new_text: "first_method($1)".to_string(),
                        range: lsp::Range::new(
                            lsp::Position::new(0, 14),
                            lsp::Position::new(0, 14),
                        ),
                    })),
                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
                    ..Default::default()
                },
                lsp::CompletionItem {
                    label: "second_method(…)".into(),
                    detail: Some("fn(&mut self, C) -> D<E>".into()),
                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
                        new_text: "second_method()".to_string(),
                        range: lsp::Range::new(
                            lsp::Position::new(0, 14),
                            lsp::Position::new(0, 14),
                        ),
                    })),
                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
                    ..Default::default()
                },
            ])),
        )
        .await;

    // Open the buffer on the host.
    // The host's copy must eventually contain the "." typed by the guest.
    let buffer_a = project_a
        .update(&mut cx_a, |p, cx| {
            p.open_buffer((worktree_id, "main.rs"), cx)
        })
        .await
        .unwrap();
    buffer_a
        .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
        .await;

    // Confirm a completion on the guest.
    // The snippet placeholder `$1` is expected to expand to nothing, leaving `first_method()`.
    editor_b.next_notification(&cx_b).await;
    editor_b.update(&mut cx_b, |editor, cx| {
        assert!(editor.has_completions());
        editor.confirm_completion(Some(0), cx);
        assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
    });

    // The confirmed completion must reach the host's buffer as well.
    buffer_a
        .condition(&cx_a, |buffer, _| {
            buffer.text() == "fn main() { a.first_method() }"
        })
        .await;

    // Receive a request to resolve the selected completion on the host's language server.
    let (request_id, params) = fake_language_server
        .receive_request::<lsp::request::ResolveCompletionItem>()
        .await;
    assert_eq!(params.label, "first_method(…)");

    // Return a resolved completion from the host's language server.
    // The resolved completion has an additional text edit.
    fake_language_server
        .respond(
            request_id,
            lsp::CompletionItem {
                label: "first_method(…)".into(),
                detail: Some("fn(&mut self, B) -> C".into()),
                text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
                    new_text: "first_method($1)".to_string(),
                    range: lsp::Range::new(
                        lsp::Position::new(0, 14),
                        lsp::Position::new(0, 14),
                    ),
                })),
                additional_text_edits: Some(vec![lsp::TextEdit {
                    new_text: "use d::SomeTrait;\n".to_string(),
                    range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
                }]),
                insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
                ..Default::default()
            },
        )
        .await;

    // The additional edit is applied.
    buffer_b
        .condition(&cx_b, |buffer, _| {
            buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
        })
        .await;
    // Host and guest must end up with identical buffer contents.
    assert_eq!(
        buffer_a.read_with(&cx_a, |buffer, _| buffer.text()),
        buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
    );
}
2469
2470 #[gpui::test]
2471 async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2472 cx_a.foreground().forbid_parking();
2473 let mut lang_registry = Arc::new(LanguageRegistry::new());
2474 let fs = Arc::new(FakeFs::new(cx_a.background()));
2475
2476 // Set up a fake language server.
2477 let (language_server_config, mut fake_language_server) =
2478 LanguageServerConfig::fake(cx_a.background()).await;
2479 Arc::get_mut(&mut lang_registry)
2480 .unwrap()
2481 .add(Arc::new(Language::new(
2482 LanguageConfig {
2483 name: "Rust".to_string(),
2484 path_suffixes: vec!["rs".to_string()],
2485 language_server: Some(language_server_config),
2486 ..Default::default()
2487 },
2488 Some(tree_sitter_rust::language()),
2489 )));
2490
2491 // Connect to a server as 2 clients.
2492 let mut server = TestServer::start(cx_a.foreground()).await;
2493 let client_a = server.create_client(&mut cx_a, "user_a").await;
2494 let client_b = server.create_client(&mut cx_b, "user_b").await;
2495
2496 // Share a project as client A
2497 fs.insert_tree(
2498 "/a",
2499 json!({
2500 ".zed.toml": r#"collaborators = ["user_b"]"#,
2501 "a.rs": "let one = two",
2502 }),
2503 )
2504 .await;
2505 let project_a = cx_a.update(|cx| {
2506 Project::local(
2507 client_a.clone(),
2508 client_a.user_store.clone(),
2509 lang_registry.clone(),
2510 fs.clone(),
2511 cx,
2512 )
2513 });
2514 let (worktree_a, _) = project_a
2515 .update(&mut cx_a, |p, cx| {
2516 p.find_or_create_local_worktree("/a", false, cx)
2517 })
2518 .await
2519 .unwrap();
2520 worktree_a
2521 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2522 .await;
2523 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2524 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2525 project_a
2526 .update(&mut cx_a, |p, cx| p.share(cx))
2527 .await
2528 .unwrap();
2529
2530 // Join the worktree as client B.
2531 let project_b = Project::remote(
2532 project_id,
2533 client_b.clone(),
2534 client_b.user_store.clone(),
2535 lang_registry.clone(),
2536 fs.clone(),
2537 &mut cx_b.to_async(),
2538 )
2539 .await
2540 .unwrap();
2541
2542 let buffer_b = cx_b
2543 .background()
2544 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2545 .await
2546 .unwrap();
2547
2548 let format = buffer_b.update(&mut cx_b, |buffer, cx| buffer.format(cx));
2549 let (request_id, _) = fake_language_server
2550 .receive_request::<lsp::request::Formatting>()
2551 .await;
2552 fake_language_server
2553 .respond(
2554 request_id,
2555 Some(vec![
2556 lsp::TextEdit {
2557 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2558 new_text: "h".to_string(),
2559 },
2560 lsp::TextEdit {
2561 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2562 new_text: "y".to_string(),
2563 },
2564 ]),
2565 )
2566 .await;
2567 format.await.unwrap();
2568 assert_eq!(
2569 buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2570 "let honey = two"
2571 );
2572 }
2573
// Exercises go-to-definition from a guest in a shared project when the definition lives
// outside the shared worktree ("/root-2" while only "/root-1" is added to the project).
// The test expects the guest's project to report a second worktree while definition
// results are held, to reuse the same target buffer across requests, and to drop back to
// one worktree once the results are released.
#[gpui::test]
async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
    cx_a.foreground().forbid_parking();
    let mut lang_registry = Arc::new(LanguageRegistry::new());
    let fs = Arc::new(FakeFs::new(cx_a.background()));
    // "/root-1" holds the file with the references; "/root-2" holds the definitions.
    fs.insert_tree(
        "/root-1",
        json!({
            ".zed.toml": r#"collaborators = ["user_b"]"#,
            "a.rs": "const ONE: usize = b::TWO + b::THREE;",
        }),
    )
    .await;
    fs.insert_tree(
        "/root-2",
        json!({
            "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
        }),
    )
    .await;

    // Set up a fake language server.
    let (language_server_config, mut fake_language_server) =
        LanguageServerConfig::fake(cx_a.background()).await;
    Arc::get_mut(&mut lang_registry)
        .unwrap()
        .add(Arc::new(Language::new(
            LanguageConfig {
                name: "Rust".to_string(),
                path_suffixes: vec!["rs".to_string()],
                language_server: Some(language_server_config),
                ..Default::default()
            },
            Some(tree_sitter_rust::language()),
        )));

    // Connect to a server as 2 clients.
    let mut server = TestServer::start(cx_a.foreground()).await;
    let client_a = server.create_client(&mut cx_a, "user_a").await;
    let client_b = server.create_client(&mut cx_b, "user_b").await;

    // Share a project as client A
    let project_a = cx_a.update(|cx| {
        Project::local(
            client_a.clone(),
            client_a.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            cx,
        )
    });
    let (worktree_a, _) = project_a
        .update(&mut cx_a, |p, cx| {
            p.find_or_create_local_worktree("/root-1", false, cx)
        })
        .await
        .unwrap();
    // Wait for the host's initial worktree scan before sharing.
    worktree_a
        .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
        .await;
    let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
    let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
    project_a
        .update(&mut cx_a, |p, cx| p.share(cx))
        .await
        .unwrap();

    // Join the worktree as client B.
    let project_b = Project::remote(
        project_id,
        client_b.clone(),
        client_b.user_store.clone(),
        lang_registry.clone(),
        fs.clone(),
        &mut cx_b.to_async(),
    )
    .await
    .unwrap();

    // Open the file containing the references on client B.
    let buffer_b = cx_b
        .background()
        .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
        .await
        .unwrap();

    // Request the definition at offset 23, which falls inside `TWO` in `a.rs`.
    let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
    let (request_id, _) = fake_language_server
        .receive_request::<lsp::request::GotoDefinition>()
        .await;
    // The host's language server points at `TWO` in "/root-2/b.rs".
    fake_language_server
        .respond(
            request_id,
            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
            ))),
        )
        .await;
    let definitions_1 = definitions_1.await.unwrap();
    cx_b.read(|cx| {
        assert_eq!(definitions_1.len(), 1);
        // The guest's project is expected to have gained a second worktree for the target.
        assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
        let target_buffer = definitions_1[0].target_buffer.read(cx);
        assert_eq!(
            target_buffer.text(),
            "const TWO: usize = 2;\nconst THREE: usize = 3;"
        );
        assert_eq!(
            definitions_1[0].target_range.to_point(target_buffer),
            Point::new(0, 6)..Point::new(0, 9)
        );
    });

    // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
    // the previous call to `definition`.
    // Offset 33 falls inside `THREE` in `a.rs`.
    let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
    let (request_id, _) = fake_language_server
        .receive_request::<lsp::request::GotoDefinition>()
        .await;
    fake_language_server
        .respond(
            request_id,
            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
            ))),
        )
        .await;
    let definitions_2 = definitions_2.await.unwrap();
    cx_b.read(|cx| {
        assert_eq!(definitions_2.len(), 1);
        // Still two worktrees: no additional worktree is expected for the second lookup.
        assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
        let target_buffer = definitions_2[0].target_buffer.read(cx);
        assert_eq!(
            target_buffer.text(),
            "const TWO: usize = 2;\nconst THREE: usize = 3;"
        );
        assert_eq!(
            definitions_2[0].target_range.to_point(target_buffer),
            Point::new(1, 6)..Point::new(1, 11)
        );
    });
    // Both lookups must refer to the same target buffer.
    assert_eq!(
        definitions_1[0].target_buffer,
        definitions_2[0].target_buffer
    );

    // Release the definition results; afterwards the project is expected to go back to
    // having a single worktree.
    cx_b.update(|_| {
        drop(definitions_1);
        drop(definitions_2);
    });
    project_b
        .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
        .await;
}
2730
2731 #[gpui::test]
2732 async fn test_open_buffer_while_getting_definition_pointing_to_it(
2733 mut cx_a: TestAppContext,
2734 mut cx_b: TestAppContext,
2735 mut rng: StdRng,
2736 ) {
2737 cx_a.foreground().forbid_parking();
2738 let mut lang_registry = Arc::new(LanguageRegistry::new());
2739 let fs = Arc::new(FakeFs::new(cx_a.background()));
2740 fs.insert_tree(
2741 "/root",
2742 json!({
2743 ".zed.toml": r#"collaborators = ["user_b"]"#,
2744 "a.rs": "const ONE: usize = b::TWO;",
2745 "b.rs": "const TWO: usize = 2",
2746 }),
2747 )
2748 .await;
2749
2750 // Set up a fake language server.
2751 let (language_server_config, mut fake_language_server) =
2752 LanguageServerConfig::fake(cx_a.background()).await;
2753 Arc::get_mut(&mut lang_registry)
2754 .unwrap()
2755 .add(Arc::new(Language::new(
2756 LanguageConfig {
2757 name: "Rust".to_string(),
2758 path_suffixes: vec!["rs".to_string()],
2759 language_server: Some(language_server_config),
2760 ..Default::default()
2761 },
2762 Some(tree_sitter_rust::language()),
2763 )));
2764
2765 // Connect to a server as 2 clients.
2766 let mut server = TestServer::start(cx_a.foreground()).await;
2767 let client_a = server.create_client(&mut cx_a, "user_a").await;
2768 let client_b = server.create_client(&mut cx_b, "user_b").await;
2769
2770 // Share a project as client A
2771 let project_a = cx_a.update(|cx| {
2772 Project::local(
2773 client_a.clone(),
2774 client_a.user_store.clone(),
2775 lang_registry.clone(),
2776 fs.clone(),
2777 cx,
2778 )
2779 });
2780 let (worktree_a, _) = project_a
2781 .update(&mut cx_a, |p, cx| {
2782 p.find_or_create_local_worktree("/root", false, cx)
2783 })
2784 .await
2785 .unwrap();
2786 worktree_a
2787 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2788 .await;
2789 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2790 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2791 project_a
2792 .update(&mut cx_a, |p, cx| p.share(cx))
2793 .await
2794 .unwrap();
2795
2796 // Join the worktree as client B.
2797 let project_b = Project::remote(
2798 project_id,
2799 client_b.clone(),
2800 client_b.user_store.clone(),
2801 lang_registry.clone(),
2802 fs.clone(),
2803 &mut cx_b.to_async(),
2804 )
2805 .await
2806 .unwrap();
2807
2808 let buffer_b1 = cx_b
2809 .background()
2810 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2811 .await
2812 .unwrap();
2813
2814 let definitions;
2815 let buffer_b2;
2816 if rng.gen() {
2817 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2818 buffer_b2 =
2819 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2820 } else {
2821 buffer_b2 =
2822 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2823 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2824 }
2825
2826 let (request_id, _) = fake_language_server
2827 .receive_request::<lsp::request::GotoDefinition>()
2828 .await;
2829 fake_language_server
2830 .respond(
2831 request_id,
2832 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2833 lsp::Url::from_file_path("/root/b.rs").unwrap(),
2834 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2835 ))),
2836 )
2837 .await;
2838
2839 let buffer_b2 = buffer_b2.await.unwrap();
2840 let definitions = definitions.await.unwrap();
2841 assert_eq!(definitions.len(), 1);
2842 assert_eq!(definitions[0].target_buffer, buffer_b2);
2843 }
2844
// Exercises the basic chat flow between two clients: both see the channel and its
// pre-existing message, messages sent by A show up for B, and the server's per-channel
// connection bookkeeping follows the lifetime of the clients' channel handles.
#[gpui::test]
async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
    cx_a.foreground().forbid_parking();

    // Connect to a server as 2 clients.
    let mut server = TestServer::start(cx_a.foreground()).await;
    let client_a = server.create_client(&mut cx_a, "user_a").await;
    let client_b = server.create_client(&mut cx_b, "user_b").await;

    // Create an org that includes these 2 users.
    let db = &server.app_state.db;
    let org_id = db.create_org("Test Org", "test-org").await.unwrap();
    db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
        .await
        .unwrap();
    db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
        .await
        .unwrap();

    // Create a channel that includes all the users.
    let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
    db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
        .await
        .unwrap();
    db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
        .await
        .unwrap();
    // Seed the channel with one message from user B, written directly to the database.
    // NOTE(review): the trailing integer argument is presumably a message nonce — confirm
    // against `create_channel_message`.
    db.create_channel_message(
        channel_id,
        client_b.current_user_id(&cx_b),
        "hello A, it's B.",
        OffsetDateTime::now_utc(),
        1,
    )
    .await
    .unwrap();

    // Client A loads its channel list and waits until the available channels are known.
    let channels_a = cx_a
        .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
    channels_a
        .condition(&mut cx_a, |list, _| list.available_channels().is_some())
        .await;
    channels_a.read_with(&cx_a, |list, _| {
        assert_eq!(
            list.available_channels().unwrap(),
            &[ChannelDetails {
                id: channel_id.to_proto(),
                name: "test-channel".to_string()
            }]
        )
    });
    let channel_a = channels_a.update(&mut cx_a, |this, cx| {
        this.get_channel(channel_id.to_proto(), cx).unwrap()
    });
    // The channel starts out empty and then receives the seeded message.
    // NOTE(review): the third tuple field from `channel_messages` presumably marks a
    // message that is still pending — confirm against that helper.
    channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
    channel_a
        .condition(&cx_a, |channel, _| {
            channel_messages(channel)
                == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
        })
        .await;

    // Client B goes through the same steps and must see the same channel and message.
    let channels_b = cx_b
        .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
    channels_b
        .condition(&mut cx_b, |list, _| list.available_channels().is_some())
        .await;
    channels_b.read_with(&cx_b, |list, _| {
        assert_eq!(
            list.available_channels().unwrap(),
            &[ChannelDetails {
                id: channel_id.to_proto(),
                name: "test-channel".to_string()
            }]
        )
    });

    let channel_b = channels_b.update(&mut cx_b, |this, cx| {
        this.get_channel(channel_id.to_proto(), cx).unwrap()
    });
    channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
    channel_b
        .condition(&cx_b, |channel, _| {
            channel_messages(channel)
                == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
        })
        .await;

    // Client A sends two messages back to back. Both appear in A's channel immediately,
    // with the third tuple field set to `true`; only the second send is awaited.
    channel_a
        .update(&mut cx_a, |channel, cx| {
            channel
                .send_message("oh, hi B.".to_string(), cx)
                .unwrap()
                .detach();
            let task = channel.send_message("sup".to_string(), cx).unwrap();
            assert_eq!(
                channel_messages(channel),
                &[
                    ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                    ("user_a".to_string(), "oh, hi B.".to_string(), true),
                    ("user_a".to_string(), "sup".to_string(), true)
                ]
            );
            task
        })
        .await
        .unwrap();

    // Client B eventually sees both of A's messages, in order.
    channel_b
        .condition(&cx_b, |channel, _| {
            channel_messages(channel)
                == [
                    ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                    ("user_a".to_string(), "oh, hi B.".to_string(), false),
                    ("user_a".to_string(), "sup".to_string(), false),
                ]
        })
        .await;

    // While both clients hold a channel handle, the server tracks two connections for it.
    assert_eq!(
        server
            .state()
            .await
            .channel(channel_id)
            .unwrap()
            .connection_ids
            .len(),
        2
    );
    // Dropping B's handle must bring the server's connection count for the channel to one.
    cx_b.update(|_| drop(channel_b));
    server
        .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
        .await;

    // Dropping A's handle as well must remove the channel from the server's state.
    cx_a.update(|_| drop(channel_a));
    server
        .condition(|state| state.channel(channel_id).is_none())
        .await;
}
2984
2985 #[gpui::test]
2986 async fn test_chat_message_validation(mut cx_a: TestAppContext) {
2987 cx_a.foreground().forbid_parking();
2988
2989 let mut server = TestServer::start(cx_a.foreground()).await;
2990 let client_a = server.create_client(&mut cx_a, "user_a").await;
2991
2992 let db = &server.app_state.db;
2993 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2994 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2995 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2996 .await
2997 .unwrap();
2998 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2999 .await
3000 .unwrap();
3001
3002 let channels_a = cx_a
3003 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3004 channels_a
3005 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3006 .await;
3007 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3008 this.get_channel(channel_id.to_proto(), cx).unwrap()
3009 });
3010
3011 // Messages aren't allowed to be too long.
3012 channel_a
3013 .update(&mut cx_a, |channel, cx| {
3014 let long_body = "this is long.\n".repeat(1024);
3015 channel.send_message(long_body, cx).unwrap()
3016 })
3017 .await
3018 .unwrap_err();
3019
3020 // Messages aren't allowed to be blank.
3021 channel_a.update(&mut cx_a, |channel, cx| {
3022 channel.send_message(String::new(), cx).unwrap_err()
3023 });
3024
3025 // Leading and trailing whitespace are trimmed.
3026 channel_a
3027 .update(&mut cx_a, |channel, cx| {
3028 channel
3029 .send_message("\n surrounded by whitespace \n".to_string(), cx)
3030 .unwrap()
3031 })
3032 .await
3033 .unwrap();
3034 assert_eq!(
3035 db.get_channel_messages(channel_id, 10, None)
3036 .await
3037 .unwrap()
3038 .iter()
3039 .map(|m| &m.body)
3040 .collect::<Vec<_>>(),
3041 &["surrounded by whitespace"]
3042 );
3043 }
3044
3045 #[gpui::test]
3046 async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3047 cx_a.foreground().forbid_parking();
3048
3049 // Connect to a server as 2 clients.
3050 let mut server = TestServer::start(cx_a.foreground()).await;
3051 let client_a = server.create_client(&mut cx_a, "user_a").await;
3052 let client_b = server.create_client(&mut cx_b, "user_b").await;
3053 let mut status_b = client_b.status();
3054
3055 // Create an org that includes these 2 users.
3056 let db = &server.app_state.db;
3057 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3058 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3059 .await
3060 .unwrap();
3061 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3062 .await
3063 .unwrap();
3064
3065 // Create a channel that includes all the users.
3066 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3067 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3068 .await
3069 .unwrap();
3070 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3071 .await
3072 .unwrap();
3073 db.create_channel_message(
3074 channel_id,
3075 client_b.current_user_id(&cx_b),
3076 "hello A, it's B.",
3077 OffsetDateTime::now_utc(),
3078 2,
3079 )
3080 .await
3081 .unwrap();
3082
3083 let channels_a = cx_a
3084 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3085 channels_a
3086 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3087 .await;
3088
3089 channels_a.read_with(&cx_a, |list, _| {
3090 assert_eq!(
3091 list.available_channels().unwrap(),
3092 &[ChannelDetails {
3093 id: channel_id.to_proto(),
3094 name: "test-channel".to_string()
3095 }]
3096 )
3097 });
3098 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3099 this.get_channel(channel_id.to_proto(), cx).unwrap()
3100 });
3101 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3102 channel_a
3103 .condition(&cx_a, |channel, _| {
3104 channel_messages(channel)
3105 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3106 })
3107 .await;
3108
3109 let channels_b = cx_b
3110 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3111 channels_b
3112 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3113 .await;
3114 channels_b.read_with(&cx_b, |list, _| {
3115 assert_eq!(
3116 list.available_channels().unwrap(),
3117 &[ChannelDetails {
3118 id: channel_id.to_proto(),
3119 name: "test-channel".to_string()
3120 }]
3121 )
3122 });
3123
3124 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3125 this.get_channel(channel_id.to_proto(), cx).unwrap()
3126 });
3127 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3128 channel_b
3129 .condition(&cx_b, |channel, _| {
3130 channel_messages(channel)
3131 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3132 })
3133 .await;
3134
3135 // Disconnect client B, ensuring we can still access its cached channel data.
3136 server.forbid_connections();
3137 server.disconnect_client(client_b.current_user_id(&cx_b));
3138 while !matches!(
3139 status_b.next().await,
3140 Some(client::Status::ReconnectionError { .. })
3141 ) {}
3142
3143 channels_b.read_with(&cx_b, |channels, _| {
3144 assert_eq!(
3145 channels.available_channels().unwrap(),
3146 [ChannelDetails {
3147 id: channel_id.to_proto(),
3148 name: "test-channel".to_string()
3149 }]
3150 )
3151 });
3152 channel_b.read_with(&cx_b, |channel, _| {
3153 assert_eq!(
3154 channel_messages(channel),
3155 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3156 )
3157 });
3158
3159 // Send a message from client B while it is disconnected.
3160 channel_b
3161 .update(&mut cx_b, |channel, cx| {
3162 let task = channel
3163 .send_message("can you see this?".to_string(), cx)
3164 .unwrap();
3165 assert_eq!(
3166 channel_messages(channel),
3167 &[
3168 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3169 ("user_b".to_string(), "can you see this?".to_string(), true)
3170 ]
3171 );
3172 task
3173 })
3174 .await
3175 .unwrap_err();
3176
3177 // Send a message from client A while B is disconnected.
3178 channel_a
3179 .update(&mut cx_a, |channel, cx| {
3180 channel
3181 .send_message("oh, hi B.".to_string(), cx)
3182 .unwrap()
3183 .detach();
3184 let task = channel.send_message("sup".to_string(), cx).unwrap();
3185 assert_eq!(
3186 channel_messages(channel),
3187 &[
3188 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3189 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3190 ("user_a".to_string(), "sup".to_string(), true)
3191 ]
3192 );
3193 task
3194 })
3195 .await
3196 .unwrap();
3197
3198 // Give client B a chance to reconnect.
3199 server.allow_connections();
3200 cx_b.foreground().advance_clock(Duration::from_secs(10));
3201
3202 // Verify that B sees the new messages upon reconnection, as well as the message client B
3203 // sent while offline.
3204 channel_b
3205 .condition(&cx_b, |channel, _| {
3206 channel_messages(channel)
3207 == [
3208 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3209 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3210 ("user_a".to_string(), "sup".to_string(), false),
3211 ("user_b".to_string(), "can you see this?".to_string(), false),
3212 ]
3213 })
3214 .await;
3215
3216 // Ensure client A and B can communicate normally after reconnection.
3217 channel_a
3218 .update(&mut cx_a, |channel, cx| {
3219 channel.send_message("you online?".to_string(), cx).unwrap()
3220 })
3221 .await
3222 .unwrap();
3223 channel_b
3224 .condition(&cx_b, |channel, _| {
3225 channel_messages(channel)
3226 == [
3227 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3228 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3229 ("user_a".to_string(), "sup".to_string(), false),
3230 ("user_b".to_string(), "can you see this?".to_string(), false),
3231 ("user_a".to_string(), "you online?".to_string(), false),
3232 ]
3233 })
3234 .await;
3235
3236 channel_b
3237 .update(&mut cx_b, |channel, cx| {
3238 channel.send_message("yep".to_string(), cx).unwrap()
3239 })
3240 .await
3241 .unwrap();
3242 channel_a
3243 .condition(&cx_a, |channel, _| {
3244 channel_messages(channel)
3245 == [
3246 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3247 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3248 ("user_a".to_string(), "sup".to_string(), false),
3249 ("user_b".to_string(), "can you see this?".to_string(), false),
3250 ("user_a".to_string(), "you online?".to_string(), false),
3251 ("user_b".to_string(), "yep".to_string(), false),
3252 ]
3253 })
3254 .await;
3255 }
3256
3257 #[gpui::test]
3258 async fn test_contacts(
3259 mut cx_a: TestAppContext,
3260 mut cx_b: TestAppContext,
3261 mut cx_c: TestAppContext,
3262 ) {
3263 cx_a.foreground().forbid_parking();
3264 let lang_registry = Arc::new(LanguageRegistry::new());
3265 let fs = Arc::new(FakeFs::new(cx_a.background()));
3266
3267 // Connect to a server as 3 clients.
3268 let mut server = TestServer::start(cx_a.foreground()).await;
3269 let client_a = server.create_client(&mut cx_a, "user_a").await;
3270 let client_b = server.create_client(&mut cx_b, "user_b").await;
3271 let client_c = server.create_client(&mut cx_c, "user_c").await;
3272
3273 // Share a worktree as client A.
3274 fs.insert_tree(
3275 "/a",
3276 json!({
3277 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3278 }),
3279 )
3280 .await;
3281
3282 let project_a = cx_a.update(|cx| {
3283 Project::local(
3284 client_a.clone(),
3285 client_a.user_store.clone(),
3286 lang_registry.clone(),
3287 fs.clone(),
3288 cx,
3289 )
3290 });
3291 let (worktree_a, _) = project_a
3292 .update(&mut cx_a, |p, cx| {
3293 p.find_or_create_local_worktree("/a", false, cx)
3294 })
3295 .await
3296 .unwrap();
3297 worktree_a
3298 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3299 .await;
3300
3301 client_a
3302 .user_store
3303 .condition(&cx_a, |user_store, _| {
3304 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3305 })
3306 .await;
3307 client_b
3308 .user_store
3309 .condition(&cx_b, |user_store, _| {
3310 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3311 })
3312 .await;
3313 client_c
3314 .user_store
3315 .condition(&cx_c, |user_store, _| {
3316 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3317 })
3318 .await;
3319
3320 let project_id = project_a
3321 .update(&mut cx_a, |project, _| project.next_remote_id())
3322 .await;
3323 project_a
3324 .update(&mut cx_a, |project, cx| project.share(cx))
3325 .await
3326 .unwrap();
3327
3328 let _project_b = Project::remote(
3329 project_id,
3330 client_b.clone(),
3331 client_b.user_store.clone(),
3332 lang_registry.clone(),
3333 fs.clone(),
3334 &mut cx_b.to_async(),
3335 )
3336 .await
3337 .unwrap();
3338
3339 client_a
3340 .user_store
3341 .condition(&cx_a, |user_store, _| {
3342 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3343 })
3344 .await;
3345 client_b
3346 .user_store
3347 .condition(&cx_b, |user_store, _| {
3348 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3349 })
3350 .await;
3351 client_c
3352 .user_store
3353 .condition(&cx_c, |user_store, _| {
3354 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3355 })
3356 .await;
3357
3358 project_a
3359 .condition(&cx_a, |project, _| {
3360 project.collaborators().contains_key(&client_b.peer_id)
3361 })
3362 .await;
3363
3364 cx_a.update(move |_| drop(project_a));
3365 client_a
3366 .user_store
3367 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3368 .await;
3369 client_b
3370 .user_store
3371 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3372 .await;
3373 client_c
3374 .user_store
3375 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3376 .await;
3377
3378 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3379 user_store
3380 .contacts()
3381 .iter()
3382 .map(|contact| {
3383 let worktrees = contact
3384 .projects
3385 .iter()
3386 .map(|p| {
3387 (
3388 p.worktree_root_names[0].as_str(),
3389 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3390 )
3391 })
3392 .collect();
3393 (contact.user.github_login.as_str(), worktrees)
3394 })
3395 .collect()
3396 }
3397 }
3398
3399 struct TestServer {
3400 peer: Arc<Peer>,
3401 app_state: Arc<AppState>,
3402 server: Arc<Server>,
3403 foreground: Rc<executor::Foreground>,
3404 notifications: mpsc::Receiver<()>,
3405 connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3406 forbid_connections: Arc<AtomicBool>,
3407 _test_db: TestDb,
3408 }
3409
3410 impl TestServer {
3411 async fn start(foreground: Rc<executor::Foreground>) -> Self {
3412 let test_db = TestDb::new();
3413 let app_state = Self::build_app_state(&test_db).await;
3414 let peer = Peer::new();
3415 let notifications = mpsc::channel(128);
3416 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3417 Self {
3418 peer,
3419 app_state,
3420 server,
3421 foreground,
3422 notifications: notifications.1,
3423 connection_killers: Default::default(),
3424 forbid_connections: Default::default(),
3425 _test_db: test_db,
3426 }
3427 }
3428
3429 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3430 let http = FakeHttpClient::with_404_response();
3431 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3432 let client_name = name.to_string();
3433 let mut client = Client::new(http.clone());
3434 let server = self.server.clone();
3435 let connection_killers = self.connection_killers.clone();
3436 let forbid_connections = self.forbid_connections.clone();
3437 let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3438
3439 Arc::get_mut(&mut client)
3440 .unwrap()
3441 .override_authenticate(move |cx| {
3442 cx.spawn(|_| async move {
3443 let access_token = "the-token".to_string();
3444 Ok(Credentials {
3445 user_id: user_id.0 as u64,
3446 access_token,
3447 })
3448 })
3449 })
3450 .override_establish_connection(move |credentials, cx| {
3451 assert_eq!(credentials.user_id, user_id.0 as u64);
3452 assert_eq!(credentials.access_token, "the-token");
3453
3454 let server = server.clone();
3455 let connection_killers = connection_killers.clone();
3456 let forbid_connections = forbid_connections.clone();
3457 let client_name = client_name.clone();
3458 let connection_id_tx = connection_id_tx.clone();
3459 cx.spawn(move |cx| async move {
3460 if forbid_connections.load(SeqCst) {
3461 Err(EstablishConnectionError::other(anyhow!(
3462 "server is forbidding connections"
3463 )))
3464 } else {
3465 let (client_conn, server_conn, kill_conn) =
3466 Connection::in_memory(cx.background());
3467 connection_killers.lock().insert(user_id, kill_conn);
3468 cx.background()
3469 .spawn(server.handle_connection(
3470 server_conn,
3471 client_name,
3472 user_id,
3473 Some(connection_id_tx),
3474 ))
3475 .detach();
3476 Ok(client_conn)
3477 }
3478 })
3479 });
3480
3481 client
3482 .authenticate_and_connect(&cx.to_async())
3483 .await
3484 .unwrap();
3485
3486 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3487 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3488 let mut authed_user =
3489 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3490 while authed_user.next().await.unwrap().is_none() {}
3491
3492 TestClient {
3493 client,
3494 peer_id,
3495 user_store,
3496 }
3497 }
3498
3499 fn disconnect_client(&self, user_id: UserId) {
3500 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3501 let _ = kill_conn.try_send(Some(()));
3502 }
3503 }
3504
3505 fn forbid_connections(&self) {
3506 self.forbid_connections.store(true, SeqCst);
3507 }
3508
3509 fn allow_connections(&self) {
3510 self.forbid_connections.store(false, SeqCst);
3511 }
3512
3513 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3514 let mut config = Config::default();
3515 config.session_secret = "a".repeat(32);
3516 config.database_url = test_db.url.clone();
3517 let github_client = github::AppClient::test();
3518 Arc::new(AppState {
3519 db: test_db.db().clone(),
3520 handlebars: Default::default(),
3521 auth_client: auth::build_client("", ""),
3522 repo_client: github::RepoClient::test(&github_client),
3523 github_client,
3524 config,
3525 })
3526 }
3527
3528 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3529 self.server.store.read()
3530 }
3531
3532 async fn condition<F>(&mut self, mut predicate: F)
3533 where
3534 F: FnMut(&Store) -> bool,
3535 {
3536 async_std::future::timeout(Duration::from_millis(500), async {
3537 while !(predicate)(&*self.server.store.read()) {
3538 self.foreground.start_waiting();
3539 self.notifications.next().await;
3540 self.foreground.finish_waiting();
3541 }
3542 })
3543 .await
3544 .expect("condition timed out");
3545 }
3546 }
3547
3548 impl Drop for TestServer {
3549 fn drop(&mut self) {
3550 self.peer.reset();
3551 }
3552 }
3553
3554 struct TestClient {
3555 client: Arc<Client>,
3556 pub peer_id: PeerId,
3557 pub user_store: ModelHandle<UserStore>,
3558 }
3559
3560 impl Deref for TestClient {
3561 type Target = Arc<Client>;
3562
3563 fn deref(&self) -> &Self::Target {
3564 &self.client
3565 }
3566 }
3567
3568 impl TestClient {
3569 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3570 UserId::from_proto(
3571 self.user_store
3572 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3573 )
3574 }
3575 }
3576
3577 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3578 channel
3579 .messages()
3580 .cursor::<()>()
3581 .map(|m| {
3582 (
3583 m.sender.github_login.clone(),
3584 m.body.clone(),
3585 m.is_pending(),
3586 )
3587 })
3588 .collect()
3589 }
3590
3591 struct EmptyView;
3592
3593 impl gpui::Entity for EmptyView {
3594 type Event = ();
3595 }
3596
3597 impl gpui::View for EmptyView {
3598 fn ui_name() -> &'static str {
3599 "empty view"
3600 }
3601
3602 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
3603 gpui::Element::boxed(gpui::elements::Empty)
3604 }
3605 }
3606}