1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{future::BoxFuture, FutureExt, StreamExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use postage::{mpsc, prelude::Sink as _};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EnvelopedMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
21use store::{Store, Worktree};
22use surf::StatusCode;
23use tide::log;
24use tide::{
25 http::headers::{HeaderName, CONNECTION, UPGRADE},
26 Request, Response,
27};
28use time::OffsetDateTime;
29
30type MessageHandler = Box<
31 dyn Send
32 + Sync
33 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
34>;
35
36pub struct Server {
37 peer: Arc<Peer>,
38 store: RwLock<Store>,
39 app_state: Arc<AppState>,
40 handlers: HashMap<TypeId, MessageHandler>,
41 notifications: Option<mpsc::Sender<()>>,
42}
43
44const MESSAGE_COUNT_PER_PAGE: usize = 100;
45const MAX_MESSAGE_LEN: usize = 1024;
46const NO_SUCH_PROJECT: &'static str = "no such project";
47
48impl Server {
49 pub fn new(
50 app_state: Arc<AppState>,
51 peer: Arc<Peer>,
52 notifications: Option<mpsc::Sender<()>>,
53 ) -> Arc<Self> {
54 let mut server = Self {
55 peer,
56 app_state,
57 store: Default::default(),
58 handlers: Default::default(),
59 notifications,
60 };
61
62 server
63 .add_handler(Server::ping)
64 .add_handler(Server::register_project)
65 .add_handler(Server::unregister_project)
66 .add_handler(Server::share_project)
67 .add_handler(Server::unshare_project)
68 .add_handler(Server::join_project)
69 .add_handler(Server::leave_project)
70 .add_handler(Server::register_worktree)
71 .add_handler(Server::unregister_worktree)
72 .add_handler(Server::share_worktree)
73 .add_handler(Server::update_worktree)
74 .add_handler(Server::update_diagnostic_summary)
75 .add_handler(Server::disk_based_diagnostics_updating)
76 .add_handler(Server::disk_based_diagnostics_updated)
77 .add_handler(Server::get_definition)
78 .add_handler(Server::open_buffer)
79 .add_handler(Server::close_buffer)
80 .add_handler(Server::update_buffer)
81 .add_handler(Server::update_buffer_file)
82 .add_handler(Server::buffer_reloaded)
83 .add_handler(Server::buffer_saved)
84 .add_handler(Server::save_buffer)
85 .add_handler(Server::format_buffer)
86 .add_handler(Server::get_completions)
87 .add_handler(Server::apply_additional_edits_for_completion)
88 .add_handler(Server::get_channels)
89 .add_handler(Server::get_users)
90 .add_handler(Server::join_channel)
91 .add_handler(Server::leave_channel)
92 .add_handler(Server::send_channel_message)
93 .add_handler(Server::get_channel_messages);
94
95 Arc::new(server)
96 }
97
98 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
99 where
100 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
101 Fut: 'static + Send + Future<Output = tide::Result<()>>,
102 M: EnvelopedMessage,
103 {
104 let prev_handler = self.handlers.insert(
105 TypeId::of::<M>(),
106 Box::new(move |server, envelope| {
107 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
108 (handler)(server, *envelope).boxed()
109 }),
110 );
111 if prev_handler.is_some() {
112 panic!("registered a handler for the same message twice");
113 }
114 self
115 }
116
117 pub fn handle_connection(
118 self: &Arc<Self>,
119 connection: Connection,
120 addr: String,
121 user_id: UserId,
122 mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
123 ) -> impl Future<Output = ()> {
124 let mut this = self.clone();
125 async move {
126 let (connection_id, handle_io, mut incoming_rx) =
127 this.peer.add_connection(connection).await;
128
129 if let Some(send_connection_id) = send_connection_id.as_mut() {
130 let _ = send_connection_id.send(connection_id).await;
131 }
132
133 this.state_mut().add_connection(connection_id, user_id);
134 if let Err(err) = this.update_contacts_for_users(&[user_id]) {
135 log::error!("error updating contacts for {:?}: {}", user_id, err);
136 }
137
138 let handle_io = handle_io.fuse();
139 futures::pin_mut!(handle_io);
140 loop {
141 let next_message = incoming_rx.next().fuse();
142 futures::pin_mut!(next_message);
143 futures::select_biased! {
144 result = handle_io => {
145 if let Err(err) = result {
146 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
147 }
148 break;
149 }
150 message = next_message => {
151 if let Some(message) = message {
152 let start_time = Instant::now();
153 let type_name = message.payload_type_name();
154 log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
155 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
156 if let Err(err) = (handler)(this.clone(), message).await {
157 log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
158 } else {
159 log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
160 }
161
162 if let Some(mut notifications) = this.notifications.clone() {
163 let _ = notifications.send(()).await;
164 }
165 } else {
166 log::warn!("unhandled message: {}", type_name);
167 }
168 } else {
169 log::info!("rpc connection closed {:?}", addr);
170 break;
171 }
172 }
173 }
174 }
175
176 if let Err(err) = this.sign_out(connection_id).await {
177 log::error!("error signing out connection {:?} - {:?}", addr, err);
178 }
179 }
180 }
181
182 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
183 self.peer.disconnect(connection_id);
184 let removed_connection = self.state_mut().remove_connection(connection_id)?;
185
186 for (project_id, project) in removed_connection.hosted_projects {
187 if let Some(share) = project.share {
188 broadcast(
189 connection_id,
190 share.guests.keys().copied().collect(),
191 |conn_id| {
192 self.peer
193 .send(conn_id, proto::UnshareProject { project_id })
194 },
195 )?;
196 }
197 }
198
199 for (project_id, peer_ids) in removed_connection.guest_project_ids {
200 broadcast(connection_id, peer_ids, |conn_id| {
201 self.peer.send(
202 conn_id,
203 proto::RemoveProjectCollaborator {
204 project_id,
205 peer_id: connection_id.0,
206 },
207 )
208 })?;
209 }
210
211 self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
212 Ok(())
213 }
214
215 async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
216 self.peer.respond(request.receipt(), proto::Ack {})?;
217 Ok(())
218 }
219
220 async fn register_project(
221 mut self: Arc<Server>,
222 request: TypedEnvelope<proto::RegisterProject>,
223 ) -> tide::Result<()> {
224 let project_id = {
225 let mut state = self.state_mut();
226 let user_id = state.user_id_for_connection(request.sender_id)?;
227 state.register_project(request.sender_id, user_id)
228 };
229 self.peer.respond(
230 request.receipt(),
231 proto::RegisterProjectResponse { project_id },
232 )?;
233 Ok(())
234 }
235
236 async fn unregister_project(
237 mut self: Arc<Server>,
238 request: TypedEnvelope<proto::UnregisterProject>,
239 ) -> tide::Result<()> {
240 let project = self
241 .state_mut()
242 .unregister_project(request.payload.project_id, request.sender_id)
243 .ok_or_else(|| anyhow!("no such project"))?;
244 self.update_contacts_for_users(project.authorized_user_ids().iter())?;
245 Ok(())
246 }
247
248 async fn share_project(
249 mut self: Arc<Server>,
250 request: TypedEnvelope<proto::ShareProject>,
251 ) -> tide::Result<()> {
252 self.state_mut()
253 .share_project(request.payload.project_id, request.sender_id);
254 self.peer.respond(request.receipt(), proto::Ack {})?;
255 Ok(())
256 }
257
258 async fn unshare_project(
259 mut self: Arc<Server>,
260 request: TypedEnvelope<proto::UnshareProject>,
261 ) -> tide::Result<()> {
262 let project_id = request.payload.project_id;
263 let project = self
264 .state_mut()
265 .unshare_project(project_id, request.sender_id)?;
266
267 broadcast(request.sender_id, project.connection_ids, |conn_id| {
268 self.peer
269 .send(conn_id, proto::UnshareProject { project_id })
270 })?;
271 self.update_contacts_for_users(&project.authorized_user_ids)?;
272 Ok(())
273 }
274
275 async fn join_project(
276 mut self: Arc<Server>,
277 request: TypedEnvelope<proto::JoinProject>,
278 ) -> tide::Result<()> {
279 let project_id = request.payload.project_id;
280
281 let user_id = self.state().user_id_for_connection(request.sender_id)?;
282 let response_data = self
283 .state_mut()
284 .join_project(request.sender_id, user_id, project_id)
285 .and_then(|joined| {
286 let share = joined.project.share()?;
287 let peer_count = share.guests.len();
288 let mut collaborators = Vec::with_capacity(peer_count);
289 collaborators.push(proto::Collaborator {
290 peer_id: joined.project.host_connection_id.0,
291 replica_id: 0,
292 user_id: joined.project.host_user_id.to_proto(),
293 });
294 let worktrees = joined
295 .project
296 .worktrees
297 .iter()
298 .filter_map(|(id, worktree)| {
299 worktree.share.as_ref().map(|share| proto::Worktree {
300 id: *id,
301 root_name: worktree.root_name.clone(),
302 entries: share.entries.values().cloned().collect(),
303 diagnostic_summaries: share
304 .diagnostic_summaries
305 .values()
306 .cloned()
307 .collect(),
308 weak: worktree.weak,
309 })
310 })
311 .collect();
312 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
313 if *peer_conn_id != request.sender_id {
314 collaborators.push(proto::Collaborator {
315 peer_id: peer_conn_id.0,
316 replica_id: *peer_replica_id as u32,
317 user_id: peer_user_id.to_proto(),
318 });
319 }
320 }
321 let response = proto::JoinProjectResponse {
322 worktrees,
323 replica_id: joined.replica_id as u32,
324 collaborators,
325 };
326 let connection_ids = joined.project.connection_ids();
327 let contact_user_ids = joined.project.authorized_user_ids();
328 Ok((response, connection_ids, contact_user_ids))
329 });
330
331 match response_data {
332 Ok((response, connection_ids, contact_user_ids)) => {
333 broadcast(request.sender_id, connection_ids, |conn_id| {
334 self.peer.send(
335 conn_id,
336 proto::AddProjectCollaborator {
337 project_id,
338 collaborator: Some(proto::Collaborator {
339 peer_id: request.sender_id.0,
340 replica_id: response.replica_id,
341 user_id: user_id.to_proto(),
342 }),
343 },
344 )
345 })?;
346 self.peer.respond(request.receipt(), response)?;
347 self.update_contacts_for_users(&contact_user_ids)?;
348 }
349 Err(error) => {
350 self.peer.respond_with_error(
351 request.receipt(),
352 proto::Error {
353 message: error.to_string(),
354 },
355 )?;
356 }
357 }
358
359 Ok(())
360 }
361
362 async fn leave_project(
363 mut self: Arc<Server>,
364 request: TypedEnvelope<proto::LeaveProject>,
365 ) -> tide::Result<()> {
366 let sender_id = request.sender_id;
367 let project_id = request.payload.project_id;
368 let worktree = self.state_mut().leave_project(sender_id, project_id);
369 if let Some(worktree) = worktree {
370 broadcast(sender_id, worktree.connection_ids, |conn_id| {
371 self.peer.send(
372 conn_id,
373 proto::RemoveProjectCollaborator {
374 project_id,
375 peer_id: sender_id.0,
376 },
377 )
378 })?;
379 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
380 }
381 Ok(())
382 }
383
384 async fn register_worktree(
385 mut self: Arc<Server>,
386 request: TypedEnvelope<proto::RegisterWorktree>,
387 ) -> tide::Result<()> {
388 let receipt = request.receipt();
389 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
390
391 let mut contact_user_ids = HashSet::default();
392 contact_user_ids.insert(host_user_id);
393 for github_login in request.payload.authorized_logins {
394 match self.app_state.db.create_user(&github_login, false).await {
395 Ok(contact_user_id) => {
396 contact_user_ids.insert(contact_user_id);
397 }
398 Err(err) => {
399 let message = err.to_string();
400 self.peer
401 .respond_with_error(receipt, proto::Error { message })?;
402 return Ok(());
403 }
404 }
405 }
406
407 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
408 let ok = self.state_mut().register_worktree(
409 request.payload.project_id,
410 request.payload.worktree_id,
411 Worktree {
412 authorized_user_ids: contact_user_ids.clone(),
413 root_name: request.payload.root_name,
414 share: None,
415 weak: false,
416 },
417 );
418
419 if ok {
420 self.peer.respond(receipt, proto::Ack {})?;
421 self.update_contacts_for_users(&contact_user_ids)?;
422 } else {
423 self.peer.respond_with_error(
424 receipt,
425 proto::Error {
426 message: NO_SUCH_PROJECT.to_string(),
427 },
428 )?;
429 }
430
431 Ok(())
432 }
433
434 async fn unregister_worktree(
435 mut self: Arc<Server>,
436 request: TypedEnvelope<proto::UnregisterWorktree>,
437 ) -> tide::Result<()> {
438 let project_id = request.payload.project_id;
439 let worktree_id = request.payload.worktree_id;
440 let (worktree, guest_connection_ids) =
441 self.state_mut()
442 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
443 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
444 self.peer.send(
445 conn_id,
446 proto::UnregisterWorktree {
447 project_id,
448 worktree_id,
449 },
450 )
451 })?;
452 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
453 Ok(())
454 }
455
456 async fn share_worktree(
457 mut self: Arc<Server>,
458 mut request: TypedEnvelope<proto::ShareWorktree>,
459 ) -> tide::Result<()> {
460 let worktree = request
461 .payload
462 .worktree
463 .as_mut()
464 .ok_or_else(|| anyhow!("missing worktree"))?;
465 let entries = worktree
466 .entries
467 .iter()
468 .map(|entry| (entry.id, entry.clone()))
469 .collect();
470 let diagnostic_summaries = worktree
471 .diagnostic_summaries
472 .iter()
473 .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
474 .collect();
475
476 let shared_worktree = self.state_mut().share_worktree(
477 request.payload.project_id,
478 worktree.id,
479 request.sender_id,
480 entries,
481 diagnostic_summaries,
482 );
483 if let Some(shared_worktree) = shared_worktree {
484 broadcast(
485 request.sender_id,
486 shared_worktree.connection_ids,
487 |connection_id| {
488 self.peer.forward_send(
489 request.sender_id,
490 connection_id,
491 request.payload.clone(),
492 )
493 },
494 )?;
495 self.peer.respond(request.receipt(), proto::Ack {})?;
496 self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
497 } else {
498 self.peer.respond_with_error(
499 request.receipt(),
500 proto::Error {
501 message: "no such worktree".to_string(),
502 },
503 )?;
504 }
505 Ok(())
506 }
507
508 async fn update_worktree(
509 mut self: Arc<Server>,
510 request: TypedEnvelope<proto::UpdateWorktree>,
511 ) -> tide::Result<()> {
512 let connection_ids = self
513 .state_mut()
514 .update_worktree(
515 request.sender_id,
516 request.payload.project_id,
517 request.payload.worktree_id,
518 &request.payload.removed_entries,
519 &request.payload.updated_entries,
520 )
521 .ok_or_else(|| anyhow!("no such worktree"))?;
522
523 broadcast(request.sender_id, connection_ids, |connection_id| {
524 self.peer
525 .forward_send(request.sender_id, connection_id, request.payload.clone())
526 })?;
527
528 Ok(())
529 }
530
531 async fn update_diagnostic_summary(
532 mut self: Arc<Server>,
533 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
534 ) -> tide::Result<()> {
535 let receiver_ids = request
536 .payload
537 .summary
538 .clone()
539 .and_then(|summary| {
540 self.state_mut().update_diagnostic_summary(
541 request.payload.project_id,
542 request.payload.worktree_id,
543 request.sender_id,
544 summary,
545 )
546 })
547 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
548
549 broadcast(request.sender_id, receiver_ids, |connection_id| {
550 self.peer
551 .forward_send(request.sender_id, connection_id, request.payload.clone())
552 })?;
553 Ok(())
554 }
555
556 async fn disk_based_diagnostics_updating(
557 self: Arc<Server>,
558 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
559 ) -> tide::Result<()> {
560 let receiver_ids = self
561 .state()
562 .project_connection_ids(request.payload.project_id, request.sender_id)
563 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
564 broadcast(request.sender_id, receiver_ids, |connection_id| {
565 self.peer
566 .forward_send(request.sender_id, connection_id, request.payload.clone())
567 })?;
568 Ok(())
569 }
570
571 async fn disk_based_diagnostics_updated(
572 self: Arc<Server>,
573 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
574 ) -> tide::Result<()> {
575 let receiver_ids = self
576 .state()
577 .project_connection_ids(request.payload.project_id, request.sender_id)
578 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
579 broadcast(request.sender_id, receiver_ids, |connection_id| {
580 self.peer
581 .forward_send(request.sender_id, connection_id, request.payload.clone())
582 })?;
583 Ok(())
584 }
585
586 async fn get_definition(
587 self: Arc<Server>,
588 request: TypedEnvelope<proto::GetDefinition>,
589 ) -> tide::Result<()> {
590 let receipt = request.receipt();
591 let host_connection_id = self
592 .state()
593 .read_project(request.payload.project_id, request.sender_id)
594 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
595 .host_connection_id;
596 let response = self
597 .peer
598 .forward_request(request.sender_id, host_connection_id, request.payload)
599 .await?;
600 self.peer.respond(receipt, response)?;
601 Ok(())
602 }
603
604 async fn open_buffer(
605 self: Arc<Server>,
606 request: TypedEnvelope<proto::OpenBuffer>,
607 ) -> tide::Result<()> {
608 let receipt = request.receipt();
609 let host_connection_id = self
610 .state()
611 .read_project(request.payload.project_id, request.sender_id)
612 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
613 .host_connection_id;
614 let response = self
615 .peer
616 .forward_request(request.sender_id, host_connection_id, request.payload)
617 .await?;
618 self.peer.respond(receipt, response)?;
619 Ok(())
620 }
621
622 async fn close_buffer(
623 self: Arc<Server>,
624 request: TypedEnvelope<proto::CloseBuffer>,
625 ) -> tide::Result<()> {
626 let host_connection_id = self
627 .state()
628 .read_project(request.payload.project_id, request.sender_id)
629 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
630 .host_connection_id;
631 self.peer
632 .forward_send(request.sender_id, host_connection_id, request.payload)?;
633 Ok(())
634 }
635
636 async fn save_buffer(
637 self: Arc<Server>,
638 request: TypedEnvelope<proto::SaveBuffer>,
639 ) -> tide::Result<()> {
640 let host;
641 let guests;
642 {
643 let state = self.state();
644 let project = state
645 .read_project(request.payload.project_id, request.sender_id)
646 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
647 host = project.host_connection_id;
648 guests = project.guest_connection_ids()
649 }
650
651 let sender = request.sender_id;
652 let receipt = request.receipt();
653 let response = self
654 .peer
655 .forward_request(sender, host, request.payload.clone())
656 .await?;
657
658 broadcast(host, guests, |conn_id| {
659 let response = response.clone();
660 if conn_id == sender {
661 self.peer.respond(receipt, response)
662 } else {
663 self.peer.forward_send(host, conn_id, response)
664 }
665 })?;
666
667 Ok(())
668 }
669
670 async fn format_buffer(
671 self: Arc<Server>,
672 request: TypedEnvelope<proto::FormatBuffer>,
673 ) -> tide::Result<()> {
674 let host;
675 {
676 let state = self.state();
677 let project = state
678 .read_project(request.payload.project_id, request.sender_id)
679 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
680 host = project.host_connection_id;
681 }
682
683 let sender = request.sender_id;
684 let receipt = request.receipt();
685 let response = self
686 .peer
687 .forward_request(sender, host, request.payload.clone())
688 .await?;
689 self.peer.respond(receipt, response)?;
690
691 Ok(())
692 }
693
694 async fn get_completions(
695 self: Arc<Server>,
696 request: TypedEnvelope<proto::GetCompletions>,
697 ) -> tide::Result<()> {
698 let host;
699 {
700 let state = self.state();
701 let project = state
702 .read_project(request.payload.project_id, request.sender_id)
703 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
704 host = project.host_connection_id;
705 }
706
707 let sender = request.sender_id;
708 let receipt = request.receipt();
709 let response = self
710 .peer
711 .forward_request(sender, host, request.payload.clone())
712 .await?;
713 self.peer.respond(receipt, response)?;
714 Ok(())
715 }
716
717 async fn apply_additional_edits_for_completion(
718 self: Arc<Server>,
719 request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
720 ) -> tide::Result<()> {
721 let host;
722 {
723 let state = self.state();
724 let project = state
725 .read_project(request.payload.project_id, request.sender_id)
726 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
727 host = project.host_connection_id;
728 }
729
730 let sender = request.sender_id;
731 let receipt = request.receipt();
732 let response = self
733 .peer
734 .forward_request(sender, host, request.payload.clone())
735 .await?;
736 self.peer.respond(receipt, response)?;
737 Ok(())
738 }
739
740 async fn update_buffer(
741 self: Arc<Server>,
742 request: TypedEnvelope<proto::UpdateBuffer>,
743 ) -> tide::Result<()> {
744 let receiver_ids = self
745 .state()
746 .project_connection_ids(request.payload.project_id, request.sender_id)
747 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
748 broadcast(request.sender_id, receiver_ids, |connection_id| {
749 self.peer
750 .forward_send(request.sender_id, connection_id, request.payload.clone())
751 })?;
752 self.peer.respond(request.receipt(), proto::Ack {})?;
753 Ok(())
754 }
755
756 async fn update_buffer_file(
757 self: Arc<Server>,
758 request: TypedEnvelope<proto::UpdateBufferFile>,
759 ) -> tide::Result<()> {
760 let receiver_ids = self
761 .state()
762 .project_connection_ids(request.payload.project_id, request.sender_id)
763 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
764 broadcast(request.sender_id, receiver_ids, |connection_id| {
765 self.peer
766 .forward_send(request.sender_id, connection_id, request.payload.clone())
767 })?;
768 Ok(())
769 }
770
771 async fn buffer_reloaded(
772 self: Arc<Server>,
773 request: TypedEnvelope<proto::BufferReloaded>,
774 ) -> tide::Result<()> {
775 let receiver_ids = self
776 .state()
777 .project_connection_ids(request.payload.project_id, request.sender_id)
778 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
779 broadcast(request.sender_id, receiver_ids, |connection_id| {
780 self.peer
781 .forward_send(request.sender_id, connection_id, request.payload.clone())
782 })?;
783 Ok(())
784 }
785
786 async fn buffer_saved(
787 self: Arc<Server>,
788 request: TypedEnvelope<proto::BufferSaved>,
789 ) -> tide::Result<()> {
790 let receiver_ids = self
791 .state()
792 .project_connection_ids(request.payload.project_id, request.sender_id)
793 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
794 broadcast(request.sender_id, receiver_ids, |connection_id| {
795 self.peer
796 .forward_send(request.sender_id, connection_id, request.payload.clone())
797 })?;
798 Ok(())
799 }
800
801 async fn get_channels(
802 self: Arc<Server>,
803 request: TypedEnvelope<proto::GetChannels>,
804 ) -> tide::Result<()> {
805 let user_id = self.state().user_id_for_connection(request.sender_id)?;
806 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
807 self.peer.respond(
808 request.receipt(),
809 proto::GetChannelsResponse {
810 channels: channels
811 .into_iter()
812 .map(|chan| proto::Channel {
813 id: chan.id.to_proto(),
814 name: chan.name,
815 })
816 .collect(),
817 },
818 )?;
819 Ok(())
820 }
821
822 async fn get_users(
823 self: Arc<Server>,
824 request: TypedEnvelope<proto::GetUsers>,
825 ) -> tide::Result<()> {
826 let receipt = request.receipt();
827 let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
828 let users = self
829 .app_state
830 .db
831 .get_users_by_ids(user_ids)
832 .await?
833 .into_iter()
834 .map(|user| proto::User {
835 id: user.id.to_proto(),
836 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
837 github_login: user.github_login,
838 })
839 .collect();
840 self.peer
841 .respond(receipt, proto::GetUsersResponse { users })?;
842 Ok(())
843 }
844
845 fn update_contacts_for_users<'a>(
846 self: &Arc<Server>,
847 user_ids: impl IntoIterator<Item = &'a UserId>,
848 ) -> anyhow::Result<()> {
849 let mut result = Ok(());
850 let state = self.state();
851 for user_id in user_ids {
852 let contacts = state.contacts_for_user(*user_id);
853 for connection_id in state.connection_ids_for_user(*user_id) {
854 if let Err(error) = self.peer.send(
855 connection_id,
856 proto::UpdateContacts {
857 contacts: contacts.clone(),
858 },
859 ) {
860 result = Err(error);
861 }
862 }
863 }
864 result
865 }
866
867 async fn join_channel(
868 mut self: Arc<Self>,
869 request: TypedEnvelope<proto::JoinChannel>,
870 ) -> tide::Result<()> {
871 let user_id = self.state().user_id_for_connection(request.sender_id)?;
872 let channel_id = ChannelId::from_proto(request.payload.channel_id);
873 if !self
874 .app_state
875 .db
876 .can_user_access_channel(user_id, channel_id)
877 .await?
878 {
879 Err(anyhow!("access denied"))?;
880 }
881
882 self.state_mut().join_channel(request.sender_id, channel_id);
883 let messages = self
884 .app_state
885 .db
886 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
887 .await?
888 .into_iter()
889 .map(|msg| proto::ChannelMessage {
890 id: msg.id.to_proto(),
891 body: msg.body,
892 timestamp: msg.sent_at.unix_timestamp() as u64,
893 sender_id: msg.sender_id.to_proto(),
894 nonce: Some(msg.nonce.as_u128().into()),
895 })
896 .collect::<Vec<_>>();
897 self.peer.respond(
898 request.receipt(),
899 proto::JoinChannelResponse {
900 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
901 messages,
902 },
903 )?;
904 Ok(())
905 }
906
907 async fn leave_channel(
908 mut self: Arc<Self>,
909 request: TypedEnvelope<proto::LeaveChannel>,
910 ) -> tide::Result<()> {
911 let user_id = self.state().user_id_for_connection(request.sender_id)?;
912 let channel_id = ChannelId::from_proto(request.payload.channel_id);
913 if !self
914 .app_state
915 .db
916 .can_user_access_channel(user_id, channel_id)
917 .await?
918 {
919 Err(anyhow!("access denied"))?;
920 }
921
922 self.state_mut()
923 .leave_channel(request.sender_id, channel_id);
924
925 Ok(())
926 }
927
928 async fn send_channel_message(
929 self: Arc<Self>,
930 request: TypedEnvelope<proto::SendChannelMessage>,
931 ) -> tide::Result<()> {
932 let receipt = request.receipt();
933 let channel_id = ChannelId::from_proto(request.payload.channel_id);
934 let user_id;
935 let connection_ids;
936 {
937 let state = self.state();
938 user_id = state.user_id_for_connection(request.sender_id)?;
939 if let Some(ids) = state.channel_connection_ids(channel_id) {
940 connection_ids = ids;
941 } else {
942 return Ok(());
943 }
944 }
945
946 // Validate the message body.
947 let body = request.payload.body.trim().to_string();
948 if body.len() > MAX_MESSAGE_LEN {
949 self.peer.respond_with_error(
950 receipt,
951 proto::Error {
952 message: "message is too long".to_string(),
953 },
954 )?;
955 return Ok(());
956 }
957 if body.is_empty() {
958 self.peer.respond_with_error(
959 receipt,
960 proto::Error {
961 message: "message can't be blank".to_string(),
962 },
963 )?;
964 return Ok(());
965 }
966
967 let timestamp = OffsetDateTime::now_utc();
968 let nonce = if let Some(nonce) = request.payload.nonce {
969 nonce
970 } else {
971 self.peer.respond_with_error(
972 receipt,
973 proto::Error {
974 message: "nonce can't be blank".to_string(),
975 },
976 )?;
977 return Ok(());
978 };
979
980 let message_id = self
981 .app_state
982 .db
983 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
984 .await?
985 .to_proto();
986 let message = proto::ChannelMessage {
987 sender_id: user_id.to_proto(),
988 id: message_id,
989 body,
990 timestamp: timestamp.unix_timestamp() as u64,
991 nonce: Some(nonce),
992 };
993 broadcast(request.sender_id, connection_ids, |conn_id| {
994 self.peer.send(
995 conn_id,
996 proto::ChannelMessageSent {
997 channel_id: channel_id.to_proto(),
998 message: Some(message.clone()),
999 },
1000 )
1001 })?;
1002 self.peer.respond(
1003 receipt,
1004 proto::SendChannelMessageResponse {
1005 message: Some(message),
1006 },
1007 )?;
1008 Ok(())
1009 }
1010
1011 async fn get_channel_messages(
1012 self: Arc<Self>,
1013 request: TypedEnvelope<proto::GetChannelMessages>,
1014 ) -> tide::Result<()> {
1015 let user_id = self.state().user_id_for_connection(request.sender_id)?;
1016 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1017 if !self
1018 .app_state
1019 .db
1020 .can_user_access_channel(user_id, channel_id)
1021 .await?
1022 {
1023 Err(anyhow!("access denied"))?;
1024 }
1025
1026 let messages = self
1027 .app_state
1028 .db
1029 .get_channel_messages(
1030 channel_id,
1031 MESSAGE_COUNT_PER_PAGE,
1032 Some(MessageId::from_proto(request.payload.before_message_id)),
1033 )
1034 .await?
1035 .into_iter()
1036 .map(|msg| proto::ChannelMessage {
1037 id: msg.id.to_proto(),
1038 body: msg.body,
1039 timestamp: msg.sent_at.unix_timestamp() as u64,
1040 sender_id: msg.sender_id.to_proto(),
1041 nonce: Some(msg.nonce.as_u128().into()),
1042 })
1043 .collect::<Vec<_>>();
1044 self.peer.respond(
1045 request.receipt(),
1046 proto::GetChannelMessagesResponse {
1047 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1048 messages,
1049 },
1050 )?;
1051 Ok(())
1052 }
1053
1054 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1055 self.store.read()
1056 }
1057
1058 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1059 self.store.write()
1060 }
1061}
1062
1063fn broadcast<F>(
1064 sender_id: ConnectionId,
1065 receiver_ids: Vec<ConnectionId>,
1066 mut f: F,
1067) -> anyhow::Result<()>
1068where
1069 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1070{
1071 let mut result = Ok(());
1072 for receiver_id in receiver_ids {
1073 if receiver_id != sender_id {
1074 if let Err(error) = f(receiver_id) {
1075 if result.is_ok() {
1076 result = Err(error);
1077 }
1078 }
1079 }
1080 }
1081 result
1082}
1083
1084pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1085 let server = Server::new(app.state().clone(), rpc.clone(), None);
1086 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1087 let server = server.clone();
1088 async move {
1089 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1090
1091 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1092 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1093 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1094 let client_protocol_version: Option<u32> = request
1095 .header("X-Zed-Protocol-Version")
1096 .and_then(|v| v.as_str().parse().ok());
1097
1098 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1099 return Ok(Response::new(StatusCode::UpgradeRequired));
1100 }
1101
1102 let header = match request.header("Sec-Websocket-Key") {
1103 Some(h) => h.as_str(),
1104 None => return Err(anyhow!("expected sec-websocket-key"))?,
1105 };
1106
1107 let user_id = process_auth_header(&request).await?;
1108
1109 let mut response = Response::new(StatusCode::SwitchingProtocols);
1110 response.insert_header(UPGRADE, "websocket");
1111 response.insert_header(CONNECTION, "Upgrade");
1112 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1113 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1114 response.insert_header("Sec-Websocket-Version", "13");
1115
1116 let http_res: &mut tide::http::Response = response.as_mut();
1117 let upgrade_receiver = http_res.recv_upgrade().await;
1118 let addr = request.remote().unwrap_or("unknown").to_string();
1119 task::spawn(async move {
1120 if let Some(stream) = upgrade_receiver.await {
1121 server
1122 .handle_connection(
1123 Connection::new(
1124 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1125 ),
1126 addr,
1127 user_id,
1128 None,
1129 )
1130 .await;
1131 }
1132 });
1133
1134 Ok(response)
1135 }
1136 });
1137}
1138
1139fn header_contains_ignore_case<T>(
1140 request: &tide::Request<T>,
1141 header_name: HeaderName,
1142 value: &str,
1143) -> bool {
1144 request
1145 .header(header_name)
1146 .map(|h| {
1147 h.as_str()
1148 .split(',')
1149 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1150 })
1151 .unwrap_or(false)
1152}
1153
1154#[cfg(test)]
1155mod tests {
1156 use super::*;
1157 use crate::{
1158 auth,
1159 db::{tests::TestDb, UserId},
1160 github, AppState, Config,
1161 };
1162 use ::rpc::Peer;
1163 use async_std::task;
1164 use gpui::{executor, ModelHandle, TestAppContext};
1165 use parking_lot::Mutex;
1166 use postage::{mpsc, watch};
1167 use rand::prelude::*;
1168 use rpc::PeerId;
1169 use serde_json::json;
1170 use sqlx::types::time::OffsetDateTime;
1171 use std::{
1172 ops::Deref,
1173 path::Path,
1174 rc::Rc,
1175 sync::{
1176 atomic::{AtomicBool, Ordering::SeqCst},
1177 Arc,
1178 },
1179 time::Duration,
1180 };
1181 use zed::{
1182 client::{
1183 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1184 EstablishConnectionError, UserStore,
1185 },
1186 editor::{Editor, EditorSettings, Input, MultiBuffer},
1187 fs::{FakeFs, Fs as _},
1188 language::{
1189 tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1190 LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1191 },
1192 lsp,
1193 project::{DiagnosticSummary, Project, ProjectPath},
1194 };
1195
1196 #[cfg(test)]
1197 #[ctor::ctor]
1198 fn init_logger() {
1199 // std::env::set_var("RUST_LOG", "info");
1200 env_logger::init();
1201 }
1202
1203 #[gpui::test]
1204 async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1205 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1206 let lang_registry = Arc::new(LanguageRegistry::new());
1207 let fs = Arc::new(FakeFs::new(cx_a.background()));
1208 cx_a.foreground().forbid_parking();
1209
1210 // Connect to a server as 2 clients.
1211 let mut server = TestServer::start(cx_a.foreground()).await;
1212 let client_a = server.create_client(&mut cx_a, "user_a").await;
1213 let client_b = server.create_client(&mut cx_b, "user_b").await;
1214
1215 // Share a project as client A
1216 fs.insert_tree(
1217 "/a",
1218 json!({
1219 ".zed.toml": r#"collaborators = ["user_b"]"#,
1220 "a.txt": "a-contents",
1221 "b.txt": "b-contents",
1222 }),
1223 )
1224 .await;
1225 let project_a = cx_a.update(|cx| {
1226 Project::local(
1227 client_a.clone(),
1228 client_a.user_store.clone(),
1229 lang_registry.clone(),
1230 fs.clone(),
1231 cx,
1232 )
1233 });
1234 let (worktree_a, _) = project_a
1235 .update(&mut cx_a, |p, cx| {
1236 p.find_or_create_local_worktree("/a", false, cx)
1237 })
1238 .await
1239 .unwrap();
1240 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1241 worktree_a
1242 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1243 .await;
1244 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1245 project_a
1246 .update(&mut cx_a, |p, cx| p.share(cx))
1247 .await
1248 .unwrap();
1249
1250 // Join that project as client B
1251 let project_b = Project::remote(
1252 project_id,
1253 client_b.clone(),
1254 client_b.user_store.clone(),
1255 lang_registry.clone(),
1256 fs.clone(),
1257 &mut cx_b.to_async(),
1258 )
1259 .await
1260 .unwrap();
1261
1262 let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1263 assert_eq!(
1264 project
1265 .collaborators()
1266 .get(&client_a.peer_id)
1267 .unwrap()
1268 .user
1269 .github_login,
1270 "user_a"
1271 );
1272 project.replica_id()
1273 });
1274 project_a
1275 .condition(&cx_a, |tree, _| {
1276 tree.collaborators()
1277 .get(&client_b.peer_id)
1278 .map_or(false, |collaborator| {
1279 collaborator.replica_id == replica_id_b
1280 && collaborator.user.github_login == "user_b"
1281 })
1282 })
1283 .await;
1284
1285 // Open the same file as client B and client A.
1286 let buffer_b = project_b
1287 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1288 .await
1289 .unwrap();
1290 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1291 buffer_b.read_with(&cx_b, |buf, cx| {
1292 assert_eq!(buf.read(cx).text(), "b-contents")
1293 });
1294 project_a.read_with(&cx_a, |project, cx| {
1295 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1296 });
1297 let buffer_a = project_a
1298 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1299 .await
1300 .unwrap();
1301
1302 let editor_b = cx_b.add_view(window_b, |cx| {
1303 Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
1304 });
1305
1306 // TODO
1307 // // Create a selection set as client B and see that selection set as client A.
1308 // buffer_a
1309 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1310 // .await;
1311
1312 // Edit the buffer as client B and see that edit as client A.
1313 editor_b.update(&mut cx_b, |editor, cx| {
1314 editor.handle_input(&Input("ok, ".into()), cx)
1315 });
1316 buffer_a
1317 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1318 .await;
1319
1320 // TODO
1321 // // Remove the selection set as client B, see those selections disappear as client A.
1322 cx_b.update(move |_| drop(editor_b));
1323 // buffer_a
1324 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1325 // .await;
1326
1327 // Close the buffer as client A, see that the buffer is closed.
1328 cx_a.update(move |_| drop(buffer_a));
1329 project_a
1330 .condition(&cx_a, |project, cx| {
1331 !project.has_open_buffer((worktree_id, "b.txt"), cx)
1332 })
1333 .await;
1334
1335 // Dropping the client B's project removes client B from client A's collaborators.
1336 cx_b.update(move |_| drop(project_b));
1337 project_a
1338 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1339 .await;
1340 }
1341
1342 #[gpui::test]
1343 async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1344 let lang_registry = Arc::new(LanguageRegistry::new());
1345 let fs = Arc::new(FakeFs::new(cx_a.background()));
1346 cx_a.foreground().forbid_parking();
1347
1348 // Connect to a server as 2 clients.
1349 let mut server = TestServer::start(cx_a.foreground()).await;
1350 let client_a = server.create_client(&mut cx_a, "user_a").await;
1351 let client_b = server.create_client(&mut cx_b, "user_b").await;
1352
1353 // Share a project as client A
1354 fs.insert_tree(
1355 "/a",
1356 json!({
1357 ".zed.toml": r#"collaborators = ["user_b"]"#,
1358 "a.txt": "a-contents",
1359 "b.txt": "b-contents",
1360 }),
1361 )
1362 .await;
1363 let project_a = cx_a.update(|cx| {
1364 Project::local(
1365 client_a.clone(),
1366 client_a.user_store.clone(),
1367 lang_registry.clone(),
1368 fs.clone(),
1369 cx,
1370 )
1371 });
1372 let (worktree_a, _) = project_a
1373 .update(&mut cx_a, |p, cx| {
1374 p.find_or_create_local_worktree("/a", false, cx)
1375 })
1376 .await
1377 .unwrap();
1378 worktree_a
1379 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1380 .await;
1381 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1382 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1383 project_a
1384 .update(&mut cx_a, |p, cx| p.share(cx))
1385 .await
1386 .unwrap();
1387 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1388
1389 // Join that project as client B
1390 let project_b = Project::remote(
1391 project_id,
1392 client_b.clone(),
1393 client_b.user_store.clone(),
1394 lang_registry.clone(),
1395 fs.clone(),
1396 &mut cx_b.to_async(),
1397 )
1398 .await
1399 .unwrap();
1400 project_b
1401 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1402 .await
1403 .unwrap();
1404
1405 // Unshare the project as client A
1406 project_a
1407 .update(&mut cx_a, |project, cx| project.unshare(cx))
1408 .await
1409 .unwrap();
1410 project_b
1411 .condition(&mut cx_b, |project, _| project.is_read_only())
1412 .await;
1413 assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1414 drop(project_b);
1415
1416 // Share the project again and ensure guests can still join.
1417 project_a
1418 .update(&mut cx_a, |project, cx| project.share(cx))
1419 .await
1420 .unwrap();
1421 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1422
1423 let project_c = Project::remote(
1424 project_id,
1425 client_b.clone(),
1426 client_b.user_store.clone(),
1427 lang_registry.clone(),
1428 fs.clone(),
1429 &mut cx_b.to_async(),
1430 )
1431 .await
1432 .unwrap();
1433 project_c
1434 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1435 .await
1436 .unwrap();
1437 }
1438
1439 #[gpui::test]
1440 async fn test_propagate_saves_and_fs_changes(
1441 mut cx_a: TestAppContext,
1442 mut cx_b: TestAppContext,
1443 mut cx_c: TestAppContext,
1444 ) {
1445 let lang_registry = Arc::new(LanguageRegistry::new());
1446 let fs = Arc::new(FakeFs::new(cx_a.background()));
1447 cx_a.foreground().forbid_parking();
1448
1449 // Connect to a server as 3 clients.
1450 let mut server = TestServer::start(cx_a.foreground()).await;
1451 let client_a = server.create_client(&mut cx_a, "user_a").await;
1452 let client_b = server.create_client(&mut cx_b, "user_b").await;
1453 let client_c = server.create_client(&mut cx_c, "user_c").await;
1454
1455 // Share a worktree as client A.
1456 fs.insert_tree(
1457 "/a",
1458 json!({
1459 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1460 "file1": "",
1461 "file2": ""
1462 }),
1463 )
1464 .await;
1465 let project_a = cx_a.update(|cx| {
1466 Project::local(
1467 client_a.clone(),
1468 client_a.user_store.clone(),
1469 lang_registry.clone(),
1470 fs.clone(),
1471 cx,
1472 )
1473 });
1474 let (worktree_a, _) = project_a
1475 .update(&mut cx_a, |p, cx| {
1476 p.find_or_create_local_worktree("/a", false, cx)
1477 })
1478 .await
1479 .unwrap();
1480 worktree_a
1481 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1482 .await;
1483 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1484 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1485 project_a
1486 .update(&mut cx_a, |p, cx| p.share(cx))
1487 .await
1488 .unwrap();
1489
1490 // Join that worktree as clients B and C.
1491 let project_b = Project::remote(
1492 project_id,
1493 client_b.clone(),
1494 client_b.user_store.clone(),
1495 lang_registry.clone(),
1496 fs.clone(),
1497 &mut cx_b.to_async(),
1498 )
1499 .await
1500 .unwrap();
1501 let project_c = Project::remote(
1502 project_id,
1503 client_c.clone(),
1504 client_c.user_store.clone(),
1505 lang_registry.clone(),
1506 fs.clone(),
1507 &mut cx_c.to_async(),
1508 )
1509 .await
1510 .unwrap();
1511 let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1512 let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1513
1514 // Open and edit a buffer as both guests B and C.
1515 let buffer_b = project_b
1516 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1517 .await
1518 .unwrap();
1519 let buffer_c = project_c
1520 .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1521 .await
1522 .unwrap();
1523 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1524 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1525
1526 // Open and edit that buffer as the host.
1527 let buffer_a = project_a
1528 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1529 .await
1530 .unwrap();
1531
1532 buffer_a
1533 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1534 .await;
1535 buffer_a.update(&mut cx_a, |buf, cx| {
1536 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1537 });
1538
1539 // Wait for edits to propagate
1540 buffer_a
1541 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1542 .await;
1543 buffer_b
1544 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1545 .await;
1546 buffer_c
1547 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1548 .await;
1549
1550 // Edit the buffer as the host and concurrently save as guest B.
1551 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1552 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1553 save_b.await.unwrap();
1554 assert_eq!(
1555 fs.load("/a/file1".as_ref()).await.unwrap(),
1556 "hi-a, i-am-c, i-am-b, i-am-a"
1557 );
1558 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1559 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1560 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1561
1562 // Make changes on host's file system, see those changes on guest worktrees.
1563 fs.rename("/a/file1".as_ref(), "/a/file1-renamed".as_ref())
1564 .await
1565 .unwrap();
1566 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1567 .await
1568 .unwrap();
1569 fs.insert_file(Path::new("/a/file4"), "4".into())
1570 .await
1571 .unwrap();
1572
1573 worktree_a
1574 .condition(&cx_a, |tree, _| tree.file_count() == 4)
1575 .await;
1576 worktree_b
1577 .condition(&cx_b, |tree, _| tree.file_count() == 4)
1578 .await;
1579 worktree_c
1580 .condition(&cx_c, |tree, _| tree.file_count() == 4)
1581 .await;
1582 worktree_a.read_with(&cx_a, |tree, _| {
1583 assert_eq!(
1584 tree.paths()
1585 .map(|p| p.to_string_lossy())
1586 .collect::<Vec<_>>(),
1587 &[".zed.toml", "file1-renamed", "file3", "file4"]
1588 )
1589 });
1590 worktree_b.read_with(&cx_b, |tree, _| {
1591 assert_eq!(
1592 tree.paths()
1593 .map(|p| p.to_string_lossy())
1594 .collect::<Vec<_>>(),
1595 &[".zed.toml", "file1-renamed", "file3", "file4"]
1596 )
1597 });
1598 worktree_c.read_with(&cx_c, |tree, _| {
1599 assert_eq!(
1600 tree.paths()
1601 .map(|p| p.to_string_lossy())
1602 .collect::<Vec<_>>(),
1603 &[".zed.toml", "file1-renamed", "file3", "file4"]
1604 )
1605 });
1606
1607 // Ensure buffer files are updated as well.
1608 buffer_a
1609 .condition(&cx_a, |buf, _| {
1610 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1611 })
1612 .await;
1613 buffer_b
1614 .condition(&cx_b, |buf, _| {
1615 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1616 })
1617 .await;
1618 buffer_c
1619 .condition(&cx_c, |buf, _| {
1620 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1621 })
1622 .await;
1623 }
1624
1625 #[gpui::test]
1626 async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1627 cx_a.foreground().forbid_parking();
1628 let lang_registry = Arc::new(LanguageRegistry::new());
1629 let fs = Arc::new(FakeFs::new(cx_a.background()));
1630
1631 // Connect to a server as 2 clients.
1632 let mut server = TestServer::start(cx_a.foreground()).await;
1633 let client_a = server.create_client(&mut cx_a, "user_a").await;
1634 let client_b = server.create_client(&mut cx_b, "user_b").await;
1635
1636 // Share a project as client A
1637 fs.insert_tree(
1638 "/dir",
1639 json!({
1640 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1641 "a.txt": "a-contents",
1642 }),
1643 )
1644 .await;
1645
1646 let project_a = cx_a.update(|cx| {
1647 Project::local(
1648 client_a.clone(),
1649 client_a.user_store.clone(),
1650 lang_registry.clone(),
1651 fs.clone(),
1652 cx,
1653 )
1654 });
1655 let (worktree_a, _) = project_a
1656 .update(&mut cx_a, |p, cx| {
1657 p.find_or_create_local_worktree("/dir", false, cx)
1658 })
1659 .await
1660 .unwrap();
1661 worktree_a
1662 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1663 .await;
1664 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1665 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1666 project_a
1667 .update(&mut cx_a, |p, cx| p.share(cx))
1668 .await
1669 .unwrap();
1670
1671 // Join that project as client B
1672 let project_b = Project::remote(
1673 project_id,
1674 client_b.clone(),
1675 client_b.user_store.clone(),
1676 lang_registry.clone(),
1677 fs.clone(),
1678 &mut cx_b.to_async(),
1679 )
1680 .await
1681 .unwrap();
1682 let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1683
1684 // Open a buffer as client B
1685 let buffer_b = project_b
1686 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1687 .await
1688 .unwrap();
1689 let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1690
1691 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1692 buffer_b.read_with(&cx_b, |buf, _| {
1693 assert!(buf.is_dirty());
1694 assert!(!buf.has_conflict());
1695 });
1696
1697 buffer_b
1698 .update(&mut cx_b, |buf, cx| buf.save(cx))
1699 .await
1700 .unwrap();
1701 worktree_b
1702 .condition(&cx_b, |_, cx| {
1703 buffer_b.read(cx).file().unwrap().mtime() != mtime
1704 })
1705 .await;
1706 buffer_b.read_with(&cx_b, |buf, _| {
1707 assert!(!buf.is_dirty());
1708 assert!(!buf.has_conflict());
1709 });
1710
1711 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1712 buffer_b.read_with(&cx_b, |buf, _| {
1713 assert!(buf.is_dirty());
1714 assert!(!buf.has_conflict());
1715 });
1716 }
1717
1718 #[gpui::test]
1719 async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1720 cx_a.foreground().forbid_parking();
1721 let lang_registry = Arc::new(LanguageRegistry::new());
1722 let fs = Arc::new(FakeFs::new(cx_a.background()));
1723
1724 // Connect to a server as 2 clients.
1725 let mut server = TestServer::start(cx_a.foreground()).await;
1726 let client_a = server.create_client(&mut cx_a, "user_a").await;
1727 let client_b = server.create_client(&mut cx_b, "user_b").await;
1728
1729 // Share a project as client A
1730 fs.insert_tree(
1731 "/dir",
1732 json!({
1733 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1734 "a.txt": "a-contents",
1735 }),
1736 )
1737 .await;
1738
1739 let project_a = cx_a.update(|cx| {
1740 Project::local(
1741 client_a.clone(),
1742 client_a.user_store.clone(),
1743 lang_registry.clone(),
1744 fs.clone(),
1745 cx,
1746 )
1747 });
1748 let (worktree_a, _) = project_a
1749 .update(&mut cx_a, |p, cx| {
1750 p.find_or_create_local_worktree("/dir", false, cx)
1751 })
1752 .await
1753 .unwrap();
1754 worktree_a
1755 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1756 .await;
1757 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1758 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1759 project_a
1760 .update(&mut cx_a, |p, cx| p.share(cx))
1761 .await
1762 .unwrap();
1763
1764 // Join that project as client B
1765 let project_b = Project::remote(
1766 project_id,
1767 client_b.clone(),
1768 client_b.user_store.clone(),
1769 lang_registry.clone(),
1770 fs.clone(),
1771 &mut cx_b.to_async(),
1772 )
1773 .await
1774 .unwrap();
1775 let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1776
1777 // Open a buffer as client B
1778 let buffer_b = project_b
1779 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1780 .await
1781 .unwrap();
1782 buffer_b.read_with(&cx_b, |buf, _| {
1783 assert!(!buf.is_dirty());
1784 assert!(!buf.has_conflict());
1785 });
1786
1787 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1788 .await
1789 .unwrap();
1790 buffer_b
1791 .condition(&cx_b, |buf, _| {
1792 buf.text() == "new contents" && !buf.is_dirty()
1793 })
1794 .await;
1795 buffer_b.read_with(&cx_b, |buf, _| {
1796 assert!(!buf.has_conflict());
1797 });
1798 }
1799
1800 #[gpui::test(iterations = 100)]
1801 async fn test_editing_while_guest_opens_buffer(
1802 mut cx_a: TestAppContext,
1803 mut cx_b: TestAppContext,
1804 ) {
1805 cx_a.foreground().forbid_parking();
1806 let lang_registry = Arc::new(LanguageRegistry::new());
1807 let fs = Arc::new(FakeFs::new(cx_a.background()));
1808
1809 // Connect to a server as 2 clients.
1810 let mut server = TestServer::start(cx_a.foreground()).await;
1811 let client_a = server.create_client(&mut cx_a, "user_a").await;
1812 let client_b = server.create_client(&mut cx_b, "user_b").await;
1813
1814 // Share a project as client A
1815 fs.insert_tree(
1816 "/dir",
1817 json!({
1818 ".zed.toml": r#"collaborators = ["user_b"]"#,
1819 "a.txt": "a-contents",
1820 }),
1821 )
1822 .await;
1823 let project_a = cx_a.update(|cx| {
1824 Project::local(
1825 client_a.clone(),
1826 client_a.user_store.clone(),
1827 lang_registry.clone(),
1828 fs.clone(),
1829 cx,
1830 )
1831 });
1832 let (worktree_a, _) = project_a
1833 .update(&mut cx_a, |p, cx| {
1834 p.find_or_create_local_worktree("/dir", false, cx)
1835 })
1836 .await
1837 .unwrap();
1838 worktree_a
1839 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1840 .await;
1841 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1842 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1843 project_a
1844 .update(&mut cx_a, |p, cx| p.share(cx))
1845 .await
1846 .unwrap();
1847
1848 // Join that project as client B
1849 let project_b = Project::remote(
1850 project_id,
1851 client_b.clone(),
1852 client_b.user_store.clone(),
1853 lang_registry.clone(),
1854 fs.clone(),
1855 &mut cx_b.to_async(),
1856 )
1857 .await
1858 .unwrap();
1859
1860 // Open a buffer as client A
1861 let buffer_a = project_a
1862 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1863 .await
1864 .unwrap();
1865
1866 // Start opening the same buffer as client B
1867 let buffer_b = cx_b
1868 .background()
1869 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1870 task::yield_now().await;
1871
1872 // Edit the buffer as client A while client B is still opening it.
1873 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1874
1875 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1876 let buffer_b = buffer_b.await.unwrap();
1877 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1878 }
1879
1880 #[gpui::test]
1881 async fn test_leaving_worktree_while_opening_buffer(
1882 mut cx_a: TestAppContext,
1883 mut cx_b: TestAppContext,
1884 ) {
1885 cx_a.foreground().forbid_parking();
1886 let lang_registry = Arc::new(LanguageRegistry::new());
1887 let fs = Arc::new(FakeFs::new(cx_a.background()));
1888
1889 // Connect to a server as 2 clients.
1890 let mut server = TestServer::start(cx_a.foreground()).await;
1891 let client_a = server.create_client(&mut cx_a, "user_a").await;
1892 let client_b = server.create_client(&mut cx_b, "user_b").await;
1893
1894 // Share a project as client A
1895 fs.insert_tree(
1896 "/dir",
1897 json!({
1898 ".zed.toml": r#"collaborators = ["user_b"]"#,
1899 "a.txt": "a-contents",
1900 }),
1901 )
1902 .await;
1903 let project_a = cx_a.update(|cx| {
1904 Project::local(
1905 client_a.clone(),
1906 client_a.user_store.clone(),
1907 lang_registry.clone(),
1908 fs.clone(),
1909 cx,
1910 )
1911 });
1912 let (worktree_a, _) = project_a
1913 .update(&mut cx_a, |p, cx| {
1914 p.find_or_create_local_worktree("/dir", false, cx)
1915 })
1916 .await
1917 .unwrap();
1918 worktree_a
1919 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1920 .await;
1921 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1922 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1923 project_a
1924 .update(&mut cx_a, |p, cx| p.share(cx))
1925 .await
1926 .unwrap();
1927
1928 // Join that project as client B
1929 let project_b = Project::remote(
1930 project_id,
1931 client_b.clone(),
1932 client_b.user_store.clone(),
1933 lang_registry.clone(),
1934 fs.clone(),
1935 &mut cx_b.to_async(),
1936 )
1937 .await
1938 .unwrap();
1939
1940 // See that a guest has joined as client A.
1941 project_a
1942 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1943 .await;
1944
1945 // Begin opening a buffer as client B, but leave the project before the open completes.
1946 let buffer_b = cx_b
1947 .background()
1948 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1949 cx_b.update(|_| drop(project_b));
1950 drop(buffer_b);
1951
1952 // See that the guest has left.
1953 project_a
1954 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1955 .await;
1956 }
1957
1958 #[gpui::test]
1959 async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1960 cx_a.foreground().forbid_parking();
1961 let lang_registry = Arc::new(LanguageRegistry::new());
1962 let fs = Arc::new(FakeFs::new(cx_a.background()));
1963
1964 // Connect to a server as 2 clients.
1965 let mut server = TestServer::start(cx_a.foreground()).await;
1966 let client_a = server.create_client(&mut cx_a, "user_a").await;
1967 let client_b = server.create_client(&mut cx_b, "user_b").await;
1968
1969 // Share a project as client A
1970 fs.insert_tree(
1971 "/a",
1972 json!({
1973 ".zed.toml": r#"collaborators = ["user_b"]"#,
1974 "a.txt": "a-contents",
1975 "b.txt": "b-contents",
1976 }),
1977 )
1978 .await;
1979 let project_a = cx_a.update(|cx| {
1980 Project::local(
1981 client_a.clone(),
1982 client_a.user_store.clone(),
1983 lang_registry.clone(),
1984 fs.clone(),
1985 cx,
1986 )
1987 });
1988 let (worktree_a, _) = project_a
1989 .update(&mut cx_a, |p, cx| {
1990 p.find_or_create_local_worktree("/a", false, cx)
1991 })
1992 .await
1993 .unwrap();
1994 worktree_a
1995 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1996 .await;
1997 let project_id = project_a
1998 .update(&mut cx_a, |project, _| project.next_remote_id())
1999 .await;
2000 project_a
2001 .update(&mut cx_a, |project, cx| project.share(cx))
2002 .await
2003 .unwrap();
2004
2005 // Join that project as client B
2006 let _project_b = Project::remote(
2007 project_id,
2008 client_b.clone(),
2009 client_b.user_store.clone(),
2010 lang_registry.clone(),
2011 fs.clone(),
2012 &mut cx_b.to_async(),
2013 )
2014 .await
2015 .unwrap();
2016
2017 // See that a guest has joined as client A.
2018 project_a
2019 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2020 .await;
2021
2022 // Drop client B's connection and ensure client A observes client B leaving the worktree.
2023 client_b.disconnect(&cx_b.to_async()).unwrap();
2024 project_a
2025 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2026 .await;
2027 }
2028
2029 #[gpui::test]
2030 async fn test_collaborating_with_diagnostics(
2031 mut cx_a: TestAppContext,
2032 mut cx_b: TestAppContext,
2033 ) {
2034 cx_a.foreground().forbid_parking();
2035 let mut lang_registry = Arc::new(LanguageRegistry::new());
2036 let fs = Arc::new(FakeFs::new(cx_a.background()));
2037
2038 // Set up a fake language server.
2039 let (language_server_config, mut fake_language_server) =
2040 LanguageServerConfig::fake(cx_a.background()).await;
2041 Arc::get_mut(&mut lang_registry)
2042 .unwrap()
2043 .add(Arc::new(Language::new(
2044 LanguageConfig {
2045 name: "Rust".to_string(),
2046 path_suffixes: vec!["rs".to_string()],
2047 language_server: Some(language_server_config),
2048 ..Default::default()
2049 },
2050 Some(tree_sitter_rust::language()),
2051 )));
2052
2053 // Connect to a server as 2 clients.
2054 let mut server = TestServer::start(cx_a.foreground()).await;
2055 let client_a = server.create_client(&mut cx_a, "user_a").await;
2056 let client_b = server.create_client(&mut cx_b, "user_b").await;
2057
2058 // Share a project as client A
2059 fs.insert_tree(
2060 "/a",
2061 json!({
2062 ".zed.toml": r#"collaborators = ["user_b"]"#,
2063 "a.rs": "let one = two",
2064 "other.rs": "",
2065 }),
2066 )
2067 .await;
2068 let project_a = cx_a.update(|cx| {
2069 Project::local(
2070 client_a.clone(),
2071 client_a.user_store.clone(),
2072 lang_registry.clone(),
2073 fs.clone(),
2074 cx,
2075 )
2076 });
2077 let (worktree_a, _) = project_a
2078 .update(&mut cx_a, |p, cx| {
2079 p.find_or_create_local_worktree("/a", false, cx)
2080 })
2081 .await
2082 .unwrap();
2083 worktree_a
2084 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2085 .await;
2086 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2087 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2088 project_a
2089 .update(&mut cx_a, |p, cx| p.share(cx))
2090 .await
2091 .unwrap();
2092
2093 // Cause the language server to start.
2094 let _ = cx_a
2095 .background()
2096 .spawn(project_a.update(&mut cx_a, |project, cx| {
2097 project.open_buffer(
2098 ProjectPath {
2099 worktree_id,
2100 path: Path::new("other.rs").into(),
2101 },
2102 cx,
2103 )
2104 }))
2105 .await
2106 .unwrap();
2107
2108 // Simulate a language server reporting errors for a file.
2109 fake_language_server
2110 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2111 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2112 version: None,
2113 diagnostics: vec![lsp::Diagnostic {
2114 severity: Some(lsp::DiagnosticSeverity::ERROR),
2115 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2116 message: "message 1".to_string(),
2117 ..Default::default()
2118 }],
2119 })
2120 .await;
2121
2122 // Wait for server to see the diagnostics update.
2123 server
2124 .condition(|store| {
2125 let worktree = store
2126 .project(project_id)
2127 .unwrap()
2128 .worktrees
2129 .get(&worktree_id.to_proto())
2130 .unwrap();
2131
2132 !worktree
2133 .share
2134 .as_ref()
2135 .unwrap()
2136 .diagnostic_summaries
2137 .is_empty()
2138 })
2139 .await;
2140
2141 // Join the worktree as client B.
2142 let project_b = Project::remote(
2143 project_id,
2144 client_b.clone(),
2145 client_b.user_store.clone(),
2146 lang_registry.clone(),
2147 fs.clone(),
2148 &mut cx_b.to_async(),
2149 )
2150 .await
2151 .unwrap();
2152
2153 project_b.read_with(&cx_b, |project, cx| {
2154 assert_eq!(
2155 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2156 &[(
2157 ProjectPath {
2158 worktree_id,
2159 path: Arc::from(Path::new("a.rs")),
2160 },
2161 DiagnosticSummary {
2162 error_count: 1,
2163 warning_count: 0,
2164 ..Default::default()
2165 },
2166 )]
2167 )
2168 });
2169
2170 // Simulate a language server reporting more errors for a file.
2171 fake_language_server
2172 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2173 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2174 version: None,
2175 diagnostics: vec![
2176 lsp::Diagnostic {
2177 severity: Some(lsp::DiagnosticSeverity::ERROR),
2178 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2179 message: "message 1".to_string(),
2180 ..Default::default()
2181 },
2182 lsp::Diagnostic {
2183 severity: Some(lsp::DiagnosticSeverity::WARNING),
2184 range: lsp::Range::new(
2185 lsp::Position::new(0, 10),
2186 lsp::Position::new(0, 13),
2187 ),
2188 message: "message 2".to_string(),
2189 ..Default::default()
2190 },
2191 ],
2192 })
2193 .await;
2194
2195 // Client b gets the updated summaries
2196 project_b
2197 .condition(&cx_b, |project, cx| {
2198 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2199 == &[(
2200 ProjectPath {
2201 worktree_id,
2202 path: Arc::from(Path::new("a.rs")),
2203 },
2204 DiagnosticSummary {
2205 error_count: 1,
2206 warning_count: 1,
2207 ..Default::default()
2208 },
2209 )]
2210 })
2211 .await;
2212
2213 // Open the file with the errors on client B. They should be present.
2214 let buffer_b = cx_b
2215 .background()
2216 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2217 .await
2218 .unwrap();
2219
2220 buffer_b.read_with(&cx_b, |buffer, _| {
2221 assert_eq!(
2222 buffer
2223 .snapshot()
2224 .diagnostics_in_range::<_, Point>(0..buffer.len())
2225 .map(|entry| entry)
2226 .collect::<Vec<_>>(),
2227 &[
2228 DiagnosticEntry {
2229 range: Point::new(0, 4)..Point::new(0, 7),
2230 diagnostic: Diagnostic {
2231 group_id: 0,
2232 message: "message 1".to_string(),
2233 severity: lsp::DiagnosticSeverity::ERROR,
2234 is_primary: true,
2235 ..Default::default()
2236 }
2237 },
2238 DiagnosticEntry {
2239 range: Point::new(0, 10)..Point::new(0, 13),
2240 diagnostic: Diagnostic {
2241 group_id: 1,
2242 severity: lsp::DiagnosticSeverity::WARNING,
2243 message: "message 2".to_string(),
2244 is_primary: true,
2245 ..Default::default()
2246 }
2247 }
2248 ]
2249 );
2250 });
2251 }
2252
2253 #[gpui::test]
2254 async fn test_collaborating_with_completion(
2255 mut cx_a: TestAppContext,
2256 mut cx_b: TestAppContext,
2257 ) {
2258 cx_a.foreground().forbid_parking();
2259 let mut lang_registry = Arc::new(LanguageRegistry::new());
2260 let fs = Arc::new(FakeFs::new(cx_a.background()));
2261
2262 // Set up a fake language server.
2263 let (language_server_config, mut fake_language_server) =
2264 LanguageServerConfig::fake_with_capabilities(
2265 lsp::ServerCapabilities {
2266 completion_provider: Some(lsp::CompletionOptions {
2267 trigger_characters: Some(vec![".".to_string()]),
2268 ..Default::default()
2269 }),
2270 ..Default::default()
2271 },
2272 cx_a.background(),
2273 )
2274 .await;
2275 Arc::get_mut(&mut lang_registry)
2276 .unwrap()
2277 .add(Arc::new(Language::new(
2278 LanguageConfig {
2279 name: "Rust".to_string(),
2280 path_suffixes: vec!["rs".to_string()],
2281 language_server: Some(language_server_config),
2282 ..Default::default()
2283 },
2284 Some(tree_sitter_rust::language()),
2285 )));
2286
2287 // Connect to a server as 2 clients.
2288 let mut server = TestServer::start(cx_a.foreground()).await;
2289 let client_a = server.create_client(&mut cx_a, "user_a").await;
2290 let client_b = server.create_client(&mut cx_b, "user_b").await;
2291
2292 // Share a project as client A
2293 fs.insert_tree(
2294 "/a",
2295 json!({
2296 ".zed.toml": r#"collaborators = ["user_b"]"#,
2297 "main.rs": "fn main() { a }",
2298 "other.rs": "",
2299 }),
2300 )
2301 .await;
2302 let project_a = cx_a.update(|cx| {
2303 Project::local(
2304 client_a.clone(),
2305 client_a.user_store.clone(),
2306 lang_registry.clone(),
2307 fs.clone(),
2308 cx,
2309 )
2310 });
2311 let (worktree_a, _) = project_a
2312 .update(&mut cx_a, |p, cx| {
2313 p.find_or_create_local_worktree("/a", false, cx)
2314 })
2315 .await
2316 .unwrap();
2317 worktree_a
2318 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2319 .await;
2320 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2321 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2322 project_a
2323 .update(&mut cx_a, |p, cx| p.share(cx))
2324 .await
2325 .unwrap();
2326
2327 // Join the worktree as client B.
2328 let project_b = Project::remote(
2329 project_id,
2330 client_b.clone(),
2331 client_b.user_store.clone(),
2332 lang_registry.clone(),
2333 fs.clone(),
2334 &mut cx_b.to_async(),
2335 )
2336 .await
2337 .unwrap();
2338
2339 // Open a file in an editor as the guest.
2340 let buffer_b = project_b
2341 .update(&mut cx_b, |p, cx| {
2342 p.open_buffer((worktree_id, "main.rs"), cx)
2343 })
2344 .await
2345 .unwrap();
2346 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2347 let editor_b = cx_b.add_view(window_b, |cx| {
2348 Editor::for_buffer(
2349 cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2350 Arc::new(|cx| EditorSettings::test(cx)),
2351 cx,
2352 )
2353 });
2354
2355 // Type a completion trigger character as the guest.
2356 editor_b.update(&mut cx_b, |editor, cx| {
2357 editor.select_ranges([13..13], None, cx);
2358 editor.handle_input(&Input(".".into()), cx);
2359 cx.focus(&editor_b);
2360 });
2361
2362 // Receive a completion request as the host's language server.
2363 let (request_id, params) = fake_language_server
2364 .receive_request::<lsp::request::Completion>()
2365 .await;
2366 assert_eq!(
2367 params.text_document_position.text_document.uri,
2368 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2369 );
2370 assert_eq!(
2371 params.text_document_position.position,
2372 lsp::Position::new(0, 14),
2373 );
2374
2375 // Return some completions from the host's language server.
2376 fake_language_server
2377 .respond(
2378 request_id,
2379 Some(lsp::CompletionResponse::Array(vec![
2380 lsp::CompletionItem {
2381 label: "first_method(…)".into(),
2382 detail: Some("fn(&mut self, B) -> C".into()),
2383 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2384 new_text: "first_method($1)".to_string(),
2385 range: lsp::Range::new(
2386 lsp::Position::new(0, 14),
2387 lsp::Position::new(0, 14),
2388 ),
2389 })),
2390 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2391 ..Default::default()
2392 },
2393 lsp::CompletionItem {
2394 label: "second_method(…)".into(),
2395 detail: Some("fn(&mut self, C) -> D<E>".into()),
2396 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2397 new_text: "second_method()".to_string(),
2398 range: lsp::Range::new(
2399 lsp::Position::new(0, 14),
2400 lsp::Position::new(0, 14),
2401 ),
2402 })),
2403 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2404 ..Default::default()
2405 },
2406 ])),
2407 )
2408 .await;
2409
2410 // Open the buffer on the host.
2411 let buffer_a = project_a
2412 .update(&mut cx_a, |p, cx| {
2413 p.open_buffer((worktree_id, "main.rs"), cx)
2414 })
2415 .await
2416 .unwrap();
2417 buffer_a
2418 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2419 .await;
2420
2421 // Confirm a completion on the guest.
2422 editor_b.next_notification(&cx_b).await;
2423 editor_b.update(&mut cx_b, |editor, cx| {
2424 assert!(editor.has_completions());
2425 editor.confirm_completion(Some(0), cx);
2426 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2427 });
2428
2429 buffer_a
2430 .condition(&cx_a, |buffer, _| {
2431 buffer.text() == "fn main() { a.first_method() }"
2432 })
2433 .await;
2434
2435 // Receive a request resolve the selected completion on the host's language server.
2436 let (request_id, params) = fake_language_server
2437 .receive_request::<lsp::request::ResolveCompletionItem>()
2438 .await;
2439 assert_eq!(params.label, "first_method(…)");
2440
2441 // Return a resolved completion from the host's language server.
2442 // The resolved completion has an additional text edit.
2443 fake_language_server
2444 .respond(
2445 request_id,
2446 lsp::CompletionItem {
2447 label: "first_method(…)".into(),
2448 detail: Some("fn(&mut self, B) -> C".into()),
2449 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2450 new_text: "first_method($1)".to_string(),
2451 range: lsp::Range::new(
2452 lsp::Position::new(0, 14),
2453 lsp::Position::new(0, 14),
2454 ),
2455 })),
2456 additional_text_edits: Some(vec![lsp::TextEdit {
2457 new_text: "use d::SomeTrait;\n".to_string(),
2458 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2459 }]),
2460 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2461 ..Default::default()
2462 },
2463 )
2464 .await;
2465
2466 // The additional edit is applied.
2467 buffer_b
2468 .condition(&cx_b, |buffer, _| {
2469 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2470 })
2471 .await;
2472 assert_eq!(
2473 buffer_a.read_with(&cx_a, |buffer, _| buffer.text()),
2474 buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2475 );
2476 }
2477
2478 #[gpui::test]
2479 async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2480 cx_a.foreground().forbid_parking();
2481 let mut lang_registry = Arc::new(LanguageRegistry::new());
2482 let fs = Arc::new(FakeFs::new(cx_a.background()));
2483
2484 // Set up a fake language server.
2485 let (language_server_config, mut fake_language_server) =
2486 LanguageServerConfig::fake(cx_a.background()).await;
2487 Arc::get_mut(&mut lang_registry)
2488 .unwrap()
2489 .add(Arc::new(Language::new(
2490 LanguageConfig {
2491 name: "Rust".to_string(),
2492 path_suffixes: vec!["rs".to_string()],
2493 language_server: Some(language_server_config),
2494 ..Default::default()
2495 },
2496 Some(tree_sitter_rust::language()),
2497 )));
2498
2499 // Connect to a server as 2 clients.
2500 let mut server = TestServer::start(cx_a.foreground()).await;
2501 let client_a = server.create_client(&mut cx_a, "user_a").await;
2502 let client_b = server.create_client(&mut cx_b, "user_b").await;
2503
2504 // Share a project as client A
2505 fs.insert_tree(
2506 "/a",
2507 json!({
2508 ".zed.toml": r#"collaborators = ["user_b"]"#,
2509 "a.rs": "let one = two",
2510 }),
2511 )
2512 .await;
2513 let project_a = cx_a.update(|cx| {
2514 Project::local(
2515 client_a.clone(),
2516 client_a.user_store.clone(),
2517 lang_registry.clone(),
2518 fs.clone(),
2519 cx,
2520 )
2521 });
2522 let (worktree_a, _) = project_a
2523 .update(&mut cx_a, |p, cx| {
2524 p.find_or_create_local_worktree("/a", false, cx)
2525 })
2526 .await
2527 .unwrap();
2528 worktree_a
2529 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2530 .await;
2531 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2532 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2533 project_a
2534 .update(&mut cx_a, |p, cx| p.share(cx))
2535 .await
2536 .unwrap();
2537
2538 // Join the worktree as client B.
2539 let project_b = Project::remote(
2540 project_id,
2541 client_b.clone(),
2542 client_b.user_store.clone(),
2543 lang_registry.clone(),
2544 fs.clone(),
2545 &mut cx_b.to_async(),
2546 )
2547 .await
2548 .unwrap();
2549
2550 let buffer_b = cx_b
2551 .background()
2552 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2553 .await
2554 .unwrap();
2555
2556 let format = buffer_b.update(&mut cx_b, |buffer, cx| buffer.format(cx));
2557 let (request_id, _) = fake_language_server
2558 .receive_request::<lsp::request::Formatting>()
2559 .await;
2560 fake_language_server
2561 .respond(
2562 request_id,
2563 Some(vec![
2564 lsp::TextEdit {
2565 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2566 new_text: "h".to_string(),
2567 },
2568 lsp::TextEdit {
2569 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2570 new_text: "y".to_string(),
2571 },
2572 ]),
2573 )
2574 .await;
2575 format.await.unwrap();
2576 assert_eq!(
2577 buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2578 "let honey = two"
2579 );
2580 }
2581
2582 #[gpui::test]
2583 async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2584 cx_a.foreground().forbid_parking();
2585 let mut lang_registry = Arc::new(LanguageRegistry::new());
2586 let fs = Arc::new(FakeFs::new(cx_a.background()));
2587 fs.insert_tree(
2588 "/root-1",
2589 json!({
2590 ".zed.toml": r#"collaborators = ["user_b"]"#,
2591 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2592 }),
2593 )
2594 .await;
2595 fs.insert_tree(
2596 "/root-2",
2597 json!({
2598 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2599 }),
2600 )
2601 .await;
2602
2603 // Set up a fake language server.
2604 let (language_server_config, mut fake_language_server) =
2605 LanguageServerConfig::fake(cx_a.background()).await;
2606 Arc::get_mut(&mut lang_registry)
2607 .unwrap()
2608 .add(Arc::new(Language::new(
2609 LanguageConfig {
2610 name: "Rust".to_string(),
2611 path_suffixes: vec!["rs".to_string()],
2612 language_server: Some(language_server_config),
2613 ..Default::default()
2614 },
2615 Some(tree_sitter_rust::language()),
2616 )));
2617
2618 // Connect to a server as 2 clients.
2619 let mut server = TestServer::start(cx_a.foreground()).await;
2620 let client_a = server.create_client(&mut cx_a, "user_a").await;
2621 let client_b = server.create_client(&mut cx_b, "user_b").await;
2622
2623 // Share a project as client A
2624 let project_a = cx_a.update(|cx| {
2625 Project::local(
2626 client_a.clone(),
2627 client_a.user_store.clone(),
2628 lang_registry.clone(),
2629 fs.clone(),
2630 cx,
2631 )
2632 });
2633 let (worktree_a, _) = project_a
2634 .update(&mut cx_a, |p, cx| {
2635 p.find_or_create_local_worktree("/root-1", false, cx)
2636 })
2637 .await
2638 .unwrap();
2639 worktree_a
2640 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2641 .await;
2642 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2643 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2644 project_a
2645 .update(&mut cx_a, |p, cx| p.share(cx))
2646 .await
2647 .unwrap();
2648
2649 // Join the worktree as client B.
2650 let project_b = Project::remote(
2651 project_id,
2652 client_b.clone(),
2653 client_b.user_store.clone(),
2654 lang_registry.clone(),
2655 fs.clone(),
2656 &mut cx_b.to_async(),
2657 )
2658 .await
2659 .unwrap();
2660
2661 // Open the file to be formatted on client B.
2662 let buffer_b = cx_b
2663 .background()
2664 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2665 .await
2666 .unwrap();
2667
2668 let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2669 let (request_id, _) = fake_language_server
2670 .receive_request::<lsp::request::GotoDefinition>()
2671 .await;
2672 fake_language_server
2673 .respond(
2674 request_id,
2675 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2676 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2677 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2678 ))),
2679 )
2680 .await;
2681 let definitions_1 = definitions_1.await.unwrap();
2682 cx_b.read(|cx| {
2683 assert_eq!(definitions_1.len(), 1);
2684 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2685 let target_buffer = definitions_1[0].target_buffer.read(cx);
2686 assert_eq!(
2687 target_buffer.text(),
2688 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2689 );
2690 assert_eq!(
2691 definitions_1[0].target_range.to_point(target_buffer),
2692 Point::new(0, 6)..Point::new(0, 9)
2693 );
2694 });
2695
2696 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2697 // the previous call to `definition`.
2698 let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2699 let (request_id, _) = fake_language_server
2700 .receive_request::<lsp::request::GotoDefinition>()
2701 .await;
2702 fake_language_server
2703 .respond(
2704 request_id,
2705 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2706 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2707 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2708 ))),
2709 )
2710 .await;
2711 let definitions_2 = definitions_2.await.unwrap();
2712 cx_b.read(|cx| {
2713 assert_eq!(definitions_2.len(), 1);
2714 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2715 let target_buffer = definitions_2[0].target_buffer.read(cx);
2716 assert_eq!(
2717 target_buffer.text(),
2718 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2719 );
2720 assert_eq!(
2721 definitions_2[0].target_range.to_point(target_buffer),
2722 Point::new(1, 6)..Point::new(1, 11)
2723 );
2724 });
2725 assert_eq!(
2726 definitions_1[0].target_buffer,
2727 definitions_2[0].target_buffer
2728 );
2729
2730 cx_b.update(|_| {
2731 drop(definitions_1);
2732 drop(definitions_2);
2733 });
2734 project_b
2735 .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2736 .await;
2737 }
2738
2739 #[gpui::test]
2740 async fn test_open_buffer_while_getting_definition_pointing_to_it(
2741 mut cx_a: TestAppContext,
2742 mut cx_b: TestAppContext,
2743 mut rng: StdRng,
2744 ) {
2745 cx_a.foreground().forbid_parking();
2746 let mut lang_registry = Arc::new(LanguageRegistry::new());
2747 let fs = Arc::new(FakeFs::new(cx_a.background()));
2748 fs.insert_tree(
2749 "/root",
2750 json!({
2751 ".zed.toml": r#"collaborators = ["user_b"]"#,
2752 "a.rs": "const ONE: usize = b::TWO;",
2753 "b.rs": "const TWO: usize = 2",
2754 }),
2755 )
2756 .await;
2757
2758 // Set up a fake language server.
2759 let (language_server_config, mut fake_language_server) =
2760 LanguageServerConfig::fake(cx_a.background()).await;
2761 Arc::get_mut(&mut lang_registry)
2762 .unwrap()
2763 .add(Arc::new(Language::new(
2764 LanguageConfig {
2765 name: "Rust".to_string(),
2766 path_suffixes: vec!["rs".to_string()],
2767 language_server: Some(language_server_config),
2768 ..Default::default()
2769 },
2770 Some(tree_sitter_rust::language()),
2771 )));
2772
2773 // Connect to a server as 2 clients.
2774 let mut server = TestServer::start(cx_a.foreground()).await;
2775 let client_a = server.create_client(&mut cx_a, "user_a").await;
2776 let client_b = server.create_client(&mut cx_b, "user_b").await;
2777
2778 // Share a project as client A
2779 let project_a = cx_a.update(|cx| {
2780 Project::local(
2781 client_a.clone(),
2782 client_a.user_store.clone(),
2783 lang_registry.clone(),
2784 fs.clone(),
2785 cx,
2786 )
2787 });
2788 let (worktree_a, _) = project_a
2789 .update(&mut cx_a, |p, cx| {
2790 p.find_or_create_local_worktree("/root", false, cx)
2791 })
2792 .await
2793 .unwrap();
2794 worktree_a
2795 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2796 .await;
2797 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2798 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2799 project_a
2800 .update(&mut cx_a, |p, cx| p.share(cx))
2801 .await
2802 .unwrap();
2803
2804 // Join the worktree as client B.
2805 let project_b = Project::remote(
2806 project_id,
2807 client_b.clone(),
2808 client_b.user_store.clone(),
2809 lang_registry.clone(),
2810 fs.clone(),
2811 &mut cx_b.to_async(),
2812 )
2813 .await
2814 .unwrap();
2815
2816 let buffer_b1 = cx_b
2817 .background()
2818 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2819 .await
2820 .unwrap();
2821
2822 let definitions;
2823 let buffer_b2;
2824 if rng.gen() {
2825 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2826 buffer_b2 =
2827 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2828 } else {
2829 buffer_b2 =
2830 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2831 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2832 }
2833
2834 let (request_id, _) = fake_language_server
2835 .receive_request::<lsp::request::GotoDefinition>()
2836 .await;
2837 fake_language_server
2838 .respond(
2839 request_id,
2840 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2841 lsp::Url::from_file_path("/root/b.rs").unwrap(),
2842 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2843 ))),
2844 )
2845 .await;
2846
2847 let buffer_b2 = buffer_b2.await.unwrap();
2848 let definitions = definitions.await.unwrap();
2849 assert_eq!(definitions.len(), 1);
2850 assert_eq!(definitions[0].target_buffer, buffer_b2);
2851 }
2852
2853 #[gpui::test]
2854 async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2855 cx_a.foreground().forbid_parking();
2856
2857 // Connect to a server as 2 clients.
2858 let mut server = TestServer::start(cx_a.foreground()).await;
2859 let client_a = server.create_client(&mut cx_a, "user_a").await;
2860 let client_b = server.create_client(&mut cx_b, "user_b").await;
2861
2862 // Create an org that includes these 2 users.
2863 let db = &server.app_state.db;
2864 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2865 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2866 .await
2867 .unwrap();
2868 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2869 .await
2870 .unwrap();
2871
2872 // Create a channel that includes all the users.
2873 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2874 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2875 .await
2876 .unwrap();
2877 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2878 .await
2879 .unwrap();
2880 db.create_channel_message(
2881 channel_id,
2882 client_b.current_user_id(&cx_b),
2883 "hello A, it's B.",
2884 OffsetDateTime::now_utc(),
2885 1,
2886 )
2887 .await
2888 .unwrap();
2889
2890 let channels_a = cx_a
2891 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2892 channels_a
2893 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2894 .await;
2895 channels_a.read_with(&cx_a, |list, _| {
2896 assert_eq!(
2897 list.available_channels().unwrap(),
2898 &[ChannelDetails {
2899 id: channel_id.to_proto(),
2900 name: "test-channel".to_string()
2901 }]
2902 )
2903 });
2904 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2905 this.get_channel(channel_id.to_proto(), cx).unwrap()
2906 });
2907 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2908 channel_a
2909 .condition(&cx_a, |channel, _| {
2910 channel_messages(channel)
2911 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2912 })
2913 .await;
2914
2915 let channels_b = cx_b
2916 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2917 channels_b
2918 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2919 .await;
2920 channels_b.read_with(&cx_b, |list, _| {
2921 assert_eq!(
2922 list.available_channels().unwrap(),
2923 &[ChannelDetails {
2924 id: channel_id.to_proto(),
2925 name: "test-channel".to_string()
2926 }]
2927 )
2928 });
2929
2930 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2931 this.get_channel(channel_id.to_proto(), cx).unwrap()
2932 });
2933 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2934 channel_b
2935 .condition(&cx_b, |channel, _| {
2936 channel_messages(channel)
2937 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2938 })
2939 .await;
2940
2941 channel_a
2942 .update(&mut cx_a, |channel, cx| {
2943 channel
2944 .send_message("oh, hi B.".to_string(), cx)
2945 .unwrap()
2946 .detach();
2947 let task = channel.send_message("sup".to_string(), cx).unwrap();
2948 assert_eq!(
2949 channel_messages(channel),
2950 &[
2951 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2952 ("user_a".to_string(), "oh, hi B.".to_string(), true),
2953 ("user_a".to_string(), "sup".to_string(), true)
2954 ]
2955 );
2956 task
2957 })
2958 .await
2959 .unwrap();
2960
2961 channel_b
2962 .condition(&cx_b, |channel, _| {
2963 channel_messages(channel)
2964 == [
2965 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2966 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2967 ("user_a".to_string(), "sup".to_string(), false),
2968 ]
2969 })
2970 .await;
2971
2972 assert_eq!(
2973 server
2974 .state()
2975 .await
2976 .channel(channel_id)
2977 .unwrap()
2978 .connection_ids
2979 .len(),
2980 2
2981 );
2982 cx_b.update(|_| drop(channel_b));
2983 server
2984 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
2985 .await;
2986
2987 cx_a.update(|_| drop(channel_a));
2988 server
2989 .condition(|state| state.channel(channel_id).is_none())
2990 .await;
2991 }
2992
2993 #[gpui::test]
2994 async fn test_chat_message_validation(mut cx_a: TestAppContext) {
2995 cx_a.foreground().forbid_parking();
2996
2997 let mut server = TestServer::start(cx_a.foreground()).await;
2998 let client_a = server.create_client(&mut cx_a, "user_a").await;
2999
3000 let db = &server.app_state.db;
3001 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3002 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3003 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3004 .await
3005 .unwrap();
3006 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3007 .await
3008 .unwrap();
3009
3010 let channels_a = cx_a
3011 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3012 channels_a
3013 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3014 .await;
3015 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3016 this.get_channel(channel_id.to_proto(), cx).unwrap()
3017 });
3018
3019 // Messages aren't allowed to be too long.
3020 channel_a
3021 .update(&mut cx_a, |channel, cx| {
3022 let long_body = "this is long.\n".repeat(1024);
3023 channel.send_message(long_body, cx).unwrap()
3024 })
3025 .await
3026 .unwrap_err();
3027
3028 // Messages aren't allowed to be blank.
3029 channel_a.update(&mut cx_a, |channel, cx| {
3030 channel.send_message(String::new(), cx).unwrap_err()
3031 });
3032
3033 // Leading and trailing whitespace are trimmed.
3034 channel_a
3035 .update(&mut cx_a, |channel, cx| {
3036 channel
3037 .send_message("\n surrounded by whitespace \n".to_string(), cx)
3038 .unwrap()
3039 })
3040 .await
3041 .unwrap();
3042 assert_eq!(
3043 db.get_channel_messages(channel_id, 10, None)
3044 .await
3045 .unwrap()
3046 .iter()
3047 .map(|m| &m.body)
3048 .collect::<Vec<_>>(),
3049 &["surrounded by whitespace"]
3050 );
3051 }
3052
3053 #[gpui::test]
3054 async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3055 cx_a.foreground().forbid_parking();
3056
3057 // Connect to a server as 2 clients.
3058 let mut server = TestServer::start(cx_a.foreground()).await;
3059 let client_a = server.create_client(&mut cx_a, "user_a").await;
3060 let client_b = server.create_client(&mut cx_b, "user_b").await;
3061 let mut status_b = client_b.status();
3062
3063 // Create an org that includes these 2 users.
3064 let db = &server.app_state.db;
3065 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3066 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3067 .await
3068 .unwrap();
3069 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3070 .await
3071 .unwrap();
3072
3073 // Create a channel that includes all the users.
3074 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3075 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3076 .await
3077 .unwrap();
3078 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3079 .await
3080 .unwrap();
3081 db.create_channel_message(
3082 channel_id,
3083 client_b.current_user_id(&cx_b),
3084 "hello A, it's B.",
3085 OffsetDateTime::now_utc(),
3086 2,
3087 )
3088 .await
3089 .unwrap();
3090
3091 let channels_a = cx_a
3092 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3093 channels_a
3094 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3095 .await;
3096
3097 channels_a.read_with(&cx_a, |list, _| {
3098 assert_eq!(
3099 list.available_channels().unwrap(),
3100 &[ChannelDetails {
3101 id: channel_id.to_proto(),
3102 name: "test-channel".to_string()
3103 }]
3104 )
3105 });
3106 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3107 this.get_channel(channel_id.to_proto(), cx).unwrap()
3108 });
3109 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3110 channel_a
3111 .condition(&cx_a, |channel, _| {
3112 channel_messages(channel)
3113 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3114 })
3115 .await;
3116
3117 let channels_b = cx_b
3118 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3119 channels_b
3120 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3121 .await;
3122 channels_b.read_with(&cx_b, |list, _| {
3123 assert_eq!(
3124 list.available_channels().unwrap(),
3125 &[ChannelDetails {
3126 id: channel_id.to_proto(),
3127 name: "test-channel".to_string()
3128 }]
3129 )
3130 });
3131
3132 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3133 this.get_channel(channel_id.to_proto(), cx).unwrap()
3134 });
3135 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3136 channel_b
3137 .condition(&cx_b, |channel, _| {
3138 channel_messages(channel)
3139 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3140 })
3141 .await;
3142
3143 // Disconnect client B, ensuring we can still access its cached channel data.
3144 server.forbid_connections();
3145 server.disconnect_client(client_b.current_user_id(&cx_b));
3146 while !matches!(
3147 status_b.next().await,
3148 Some(client::Status::ReconnectionError { .. })
3149 ) {}
3150
3151 channels_b.read_with(&cx_b, |channels, _| {
3152 assert_eq!(
3153 channels.available_channels().unwrap(),
3154 [ChannelDetails {
3155 id: channel_id.to_proto(),
3156 name: "test-channel".to_string()
3157 }]
3158 )
3159 });
3160 channel_b.read_with(&cx_b, |channel, _| {
3161 assert_eq!(
3162 channel_messages(channel),
3163 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3164 )
3165 });
3166
3167 // Send a message from client B while it is disconnected.
3168 channel_b
3169 .update(&mut cx_b, |channel, cx| {
3170 let task = channel
3171 .send_message("can you see this?".to_string(), cx)
3172 .unwrap();
3173 assert_eq!(
3174 channel_messages(channel),
3175 &[
3176 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3177 ("user_b".to_string(), "can you see this?".to_string(), true)
3178 ]
3179 );
3180 task
3181 })
3182 .await
3183 .unwrap_err();
3184
3185 // Send a message from client A while B is disconnected.
3186 channel_a
3187 .update(&mut cx_a, |channel, cx| {
3188 channel
3189 .send_message("oh, hi B.".to_string(), cx)
3190 .unwrap()
3191 .detach();
3192 let task = channel.send_message("sup".to_string(), cx).unwrap();
3193 assert_eq!(
3194 channel_messages(channel),
3195 &[
3196 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3197 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3198 ("user_a".to_string(), "sup".to_string(), true)
3199 ]
3200 );
3201 task
3202 })
3203 .await
3204 .unwrap();
3205
3206 // Give client B a chance to reconnect.
3207 server.allow_connections();
3208 cx_b.foreground().advance_clock(Duration::from_secs(10));
3209
3210 // Verify that B sees the new messages upon reconnection, as well as the message client B
3211 // sent while offline.
3212 channel_b
3213 .condition(&cx_b, |channel, _| {
3214 channel_messages(channel)
3215 == [
3216 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3217 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3218 ("user_a".to_string(), "sup".to_string(), false),
3219 ("user_b".to_string(), "can you see this?".to_string(), false),
3220 ]
3221 })
3222 .await;
3223
3224 // Ensure client A and B can communicate normally after reconnection.
3225 channel_a
3226 .update(&mut cx_a, |channel, cx| {
3227 channel.send_message("you online?".to_string(), cx).unwrap()
3228 })
3229 .await
3230 .unwrap();
3231 channel_b
3232 .condition(&cx_b, |channel, _| {
3233 channel_messages(channel)
3234 == [
3235 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3236 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3237 ("user_a".to_string(), "sup".to_string(), false),
3238 ("user_b".to_string(), "can you see this?".to_string(), false),
3239 ("user_a".to_string(), "you online?".to_string(), false),
3240 ]
3241 })
3242 .await;
3243
3244 channel_b
3245 .update(&mut cx_b, |channel, cx| {
3246 channel.send_message("yep".to_string(), cx).unwrap()
3247 })
3248 .await
3249 .unwrap();
3250 channel_a
3251 .condition(&cx_a, |channel, _| {
3252 channel_messages(channel)
3253 == [
3254 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3255 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3256 ("user_a".to_string(), "sup".to_string(), false),
3257 ("user_b".to_string(), "can you see this?".to_string(), false),
3258 ("user_a".to_string(), "you online?".to_string(), false),
3259 ("user_b".to_string(), "yep".to_string(), false),
3260 ]
3261 })
3262 .await;
3263 }
3264
3265 #[gpui::test]
3266 async fn test_contacts(
3267 mut cx_a: TestAppContext,
3268 mut cx_b: TestAppContext,
3269 mut cx_c: TestAppContext,
3270 ) {
3271 cx_a.foreground().forbid_parking();
3272 let lang_registry = Arc::new(LanguageRegistry::new());
3273 let fs = Arc::new(FakeFs::new(cx_a.background()));
3274
3275 // Connect to a server as 3 clients.
3276 let mut server = TestServer::start(cx_a.foreground()).await;
3277 let client_a = server.create_client(&mut cx_a, "user_a").await;
3278 let client_b = server.create_client(&mut cx_b, "user_b").await;
3279 let client_c = server.create_client(&mut cx_c, "user_c").await;
3280
3281 // Share a worktree as client A.
3282 fs.insert_tree(
3283 "/a",
3284 json!({
3285 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3286 }),
3287 )
3288 .await;
3289
3290 let project_a = cx_a.update(|cx| {
3291 Project::local(
3292 client_a.clone(),
3293 client_a.user_store.clone(),
3294 lang_registry.clone(),
3295 fs.clone(),
3296 cx,
3297 )
3298 });
3299 let (worktree_a, _) = project_a
3300 .update(&mut cx_a, |p, cx| {
3301 p.find_or_create_local_worktree("/a", false, cx)
3302 })
3303 .await
3304 .unwrap();
3305 worktree_a
3306 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3307 .await;
3308
3309 client_a
3310 .user_store
3311 .condition(&cx_a, |user_store, _| {
3312 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3313 })
3314 .await;
3315 client_b
3316 .user_store
3317 .condition(&cx_b, |user_store, _| {
3318 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3319 })
3320 .await;
3321 client_c
3322 .user_store
3323 .condition(&cx_c, |user_store, _| {
3324 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3325 })
3326 .await;
3327
3328 let project_id = project_a
3329 .update(&mut cx_a, |project, _| project.next_remote_id())
3330 .await;
3331 project_a
3332 .update(&mut cx_a, |project, cx| project.share(cx))
3333 .await
3334 .unwrap();
3335
3336 let _project_b = Project::remote(
3337 project_id,
3338 client_b.clone(),
3339 client_b.user_store.clone(),
3340 lang_registry.clone(),
3341 fs.clone(),
3342 &mut cx_b.to_async(),
3343 )
3344 .await
3345 .unwrap();
3346
3347 client_a
3348 .user_store
3349 .condition(&cx_a, |user_store, _| {
3350 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3351 })
3352 .await;
3353 client_b
3354 .user_store
3355 .condition(&cx_b, |user_store, _| {
3356 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3357 })
3358 .await;
3359 client_c
3360 .user_store
3361 .condition(&cx_c, |user_store, _| {
3362 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3363 })
3364 .await;
3365
3366 project_a
3367 .condition(&cx_a, |project, _| {
3368 project.collaborators().contains_key(&client_b.peer_id)
3369 })
3370 .await;
3371
3372 cx_a.update(move |_| drop(project_a));
3373 client_a
3374 .user_store
3375 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3376 .await;
3377 client_b
3378 .user_store
3379 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3380 .await;
3381 client_c
3382 .user_store
3383 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3384 .await;
3385
3386 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3387 user_store
3388 .contacts()
3389 .iter()
3390 .map(|contact| {
3391 let worktrees = contact
3392 .projects
3393 .iter()
3394 .map(|p| {
3395 (
3396 p.worktree_root_names[0].as_str(),
3397 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3398 )
3399 })
3400 .collect();
3401 (contact.user.github_login.as_str(), worktrees)
3402 })
3403 .collect()
3404 }
3405 }
3406
3407 struct TestServer {
3408 peer: Arc<Peer>,
3409 app_state: Arc<AppState>,
3410 server: Arc<Server>,
3411 foreground: Rc<executor::Foreground>,
3412 notifications: mpsc::Receiver<()>,
3413 connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3414 forbid_connections: Arc<AtomicBool>,
3415 _test_db: TestDb,
3416 }
3417
3418 impl TestServer {
3419 async fn start(foreground: Rc<executor::Foreground>) -> Self {
3420 let test_db = TestDb::new();
3421 let app_state = Self::build_app_state(&test_db).await;
3422 let peer = Peer::new();
3423 let notifications = mpsc::channel(128);
3424 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3425 Self {
3426 peer,
3427 app_state,
3428 server,
3429 foreground,
3430 notifications: notifications.1,
3431 connection_killers: Default::default(),
3432 forbid_connections: Default::default(),
3433 _test_db: test_db,
3434 }
3435 }
3436
3437 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3438 let http = FakeHttpClient::with_404_response();
3439 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3440 let client_name = name.to_string();
3441 let mut client = Client::new(http.clone());
3442 let server = self.server.clone();
3443 let connection_killers = self.connection_killers.clone();
3444 let forbid_connections = self.forbid_connections.clone();
3445 let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3446
3447 Arc::get_mut(&mut client)
3448 .unwrap()
3449 .override_authenticate(move |cx| {
3450 cx.spawn(|_| async move {
3451 let access_token = "the-token".to_string();
3452 Ok(Credentials {
3453 user_id: user_id.0 as u64,
3454 access_token,
3455 })
3456 })
3457 })
3458 .override_establish_connection(move |credentials, cx| {
3459 assert_eq!(credentials.user_id, user_id.0 as u64);
3460 assert_eq!(credentials.access_token, "the-token");
3461
3462 let server = server.clone();
3463 let connection_killers = connection_killers.clone();
3464 let forbid_connections = forbid_connections.clone();
3465 let client_name = client_name.clone();
3466 let connection_id_tx = connection_id_tx.clone();
3467 cx.spawn(move |cx| async move {
3468 if forbid_connections.load(SeqCst) {
3469 Err(EstablishConnectionError::other(anyhow!(
3470 "server is forbidding connections"
3471 )))
3472 } else {
3473 let (client_conn, server_conn, kill_conn) =
3474 Connection::in_memory(cx.background());
3475 connection_killers.lock().insert(user_id, kill_conn);
3476 cx.background()
3477 .spawn(server.handle_connection(
3478 server_conn,
3479 client_name,
3480 user_id,
3481 Some(connection_id_tx),
3482 ))
3483 .detach();
3484 Ok(client_conn)
3485 }
3486 })
3487 });
3488
3489 client
3490 .authenticate_and_connect(&cx.to_async())
3491 .await
3492 .unwrap();
3493
3494 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3495 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3496 let mut authed_user =
3497 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3498 while authed_user.next().await.unwrap().is_none() {}
3499
3500 TestClient {
3501 client,
3502 peer_id,
3503 user_store,
3504 }
3505 }
3506
3507 fn disconnect_client(&self, user_id: UserId) {
3508 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3509 let _ = kill_conn.try_send(Some(()));
3510 }
3511 }
3512
3513 fn forbid_connections(&self) {
3514 self.forbid_connections.store(true, SeqCst);
3515 }
3516
3517 fn allow_connections(&self) {
3518 self.forbid_connections.store(false, SeqCst);
3519 }
3520
3521 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3522 let mut config = Config::default();
3523 config.session_secret = "a".repeat(32);
3524 config.database_url = test_db.url.clone();
3525 let github_client = github::AppClient::test();
3526 Arc::new(AppState {
3527 db: test_db.db().clone(),
3528 handlebars: Default::default(),
3529 auth_client: auth::build_client("", ""),
3530 repo_client: github::RepoClient::test(&github_client),
3531 github_client,
3532 config,
3533 })
3534 }
3535
3536 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3537 self.server.store.read()
3538 }
3539
3540 async fn condition<F>(&mut self, mut predicate: F)
3541 where
3542 F: FnMut(&Store) -> bool,
3543 {
3544 async_std::future::timeout(Duration::from_millis(500), async {
3545 while !(predicate)(&*self.server.store.read()) {
3546 self.foreground.start_waiting();
3547 self.notifications.next().await;
3548 self.foreground.finish_waiting();
3549 }
3550 })
3551 .await
3552 .expect("condition timed out");
3553 }
3554 }
3555
3556 impl Drop for TestServer {
3557 fn drop(&mut self) {
3558 self.peer.reset();
3559 }
3560 }
3561
3562 struct TestClient {
3563 client: Arc<Client>,
3564 pub peer_id: PeerId,
3565 pub user_store: ModelHandle<UserStore>,
3566 }
3567
3568 impl Deref for TestClient {
3569 type Target = Arc<Client>;
3570
3571 fn deref(&self) -> &Self::Target {
3572 &self.client
3573 }
3574 }
3575
3576 impl TestClient {
3577 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3578 UserId::from_proto(
3579 self.user_store
3580 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3581 )
3582 }
3583 }
3584
3585 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3586 channel
3587 .messages()
3588 .cursor::<()>()
3589 .map(|m| {
3590 (
3591 m.sender.github_login.clone(),
3592 m.body.clone(),
3593 m.is_pending(),
3594 )
3595 })
3596 .collect()
3597 }
3598
3599 struct EmptyView;
3600
3601 impl gpui::Entity for EmptyView {
3602 type Event = ();
3603 }
3604
3605 impl gpui::View for EmptyView {
3606 fn ui_name() -> &'static str {
3607 "empty view"
3608 }
3609
3610 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
3611 gpui::Element::boxed(gpui::elements::Empty)
3612 }
3613 }
3614}