1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{future::BoxFuture, FutureExt, StreamExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use postage::{mpsc, prelude::Sink as _};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EnvelopedMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
21use store::{Store, Worktree};
22use surf::StatusCode;
23use tide::log;
24use tide::{
25 http::headers::{HeaderName, CONNECTION, UPGRADE},
26 Request, Response,
27};
28use time::OffsetDateTime;
29
30type MessageHandler = Box<
31 dyn Send
32 + Sync
33 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
34>;
35
36pub struct Server {
37 peer: Arc<Peer>,
38 store: RwLock<Store>,
39 app_state: Arc<AppState>,
40 handlers: HashMap<TypeId, MessageHandler>,
41 notifications: Option<mpsc::Sender<()>>,
42}
43
44const MESSAGE_COUNT_PER_PAGE: usize = 100;
45const MAX_MESSAGE_LEN: usize = 1024;
46const NO_SUCH_PROJECT: &'static str = "no such project";
47
48impl Server {
49 pub fn new(
50 app_state: Arc<AppState>,
51 peer: Arc<Peer>,
52 notifications: Option<mpsc::Sender<()>>,
53 ) -> Arc<Self> {
54 let mut server = Self {
55 peer,
56 app_state,
57 store: Default::default(),
58 handlers: Default::default(),
59 notifications,
60 };
61
62 server
63 .add_handler(Server::ping)
64 .add_handler(Server::register_project)
65 .add_handler(Server::unregister_project)
66 .add_handler(Server::share_project)
67 .add_handler(Server::unshare_project)
68 .add_handler(Server::join_project)
69 .add_handler(Server::leave_project)
70 .add_handler(Server::register_worktree)
71 .add_handler(Server::unregister_worktree)
72 .add_handler(Server::share_worktree)
73 .add_handler(Server::update_worktree)
74 .add_handler(Server::update_diagnostic_summary)
75 .add_handler(Server::disk_based_diagnostics_updating)
76 .add_handler(Server::disk_based_diagnostics_updated)
77 .add_handler(Server::get_definition)
78 .add_handler(Server::open_buffer)
79 .add_handler(Server::close_buffer)
80 .add_handler(Server::update_buffer)
81 .add_handler(Server::update_buffer_file)
82 .add_handler(Server::buffer_reloaded)
83 .add_handler(Server::buffer_saved)
84 .add_handler(Server::save_buffer)
85 .add_handler(Server::format_buffer)
86 .add_handler(Server::get_completions)
87 .add_handler(Server::apply_additional_edits_for_completion)
88 .add_handler(Server::get_channels)
89 .add_handler(Server::get_users)
90 .add_handler(Server::join_channel)
91 .add_handler(Server::leave_channel)
92 .add_handler(Server::send_channel_message)
93 .add_handler(Server::get_channel_messages);
94
95 Arc::new(server)
96 }
97
98 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
99 where
100 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
101 Fut: 'static + Send + Future<Output = tide::Result<()>>,
102 M: EnvelopedMessage,
103 {
104 let prev_handler = self.handlers.insert(
105 TypeId::of::<M>(),
106 Box::new(move |server, envelope| {
107 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
108 (handler)(server, *envelope).boxed()
109 }),
110 );
111 if prev_handler.is_some() {
112 panic!("registered a handler for the same message twice");
113 }
114 self
115 }
116
117 pub fn handle_connection(
118 self: &Arc<Self>,
119 connection: Connection,
120 addr: String,
121 user_id: UserId,
122 mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
123 ) -> impl Future<Output = ()> {
124 let mut this = self.clone();
125 async move {
126 let (connection_id, handle_io, mut incoming_rx) =
127 this.peer.add_connection(connection).await;
128
129 if let Some(send_connection_id) = send_connection_id.as_mut() {
130 let _ = send_connection_id.send(connection_id).await;
131 }
132
133 this.state_mut().add_connection(connection_id, user_id);
134 if let Err(err) = this.update_contacts_for_users(&[user_id]) {
135 log::error!("error updating contacts for {:?}: {}", user_id, err);
136 }
137
138 let handle_io = handle_io.fuse();
139 futures::pin_mut!(handle_io);
140 loop {
141 let next_message = incoming_rx.next().fuse();
142 futures::pin_mut!(next_message);
143 futures::select_biased! {
144 result = handle_io => {
145 if let Err(err) = result {
146 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
147 }
148 break;
149 }
150 message = next_message => {
151 if let Some(message) = message {
152 let start_time = Instant::now();
153 let type_name = message.payload_type_name();
154 log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
155 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
156 if let Err(err) = (handler)(this.clone(), message).await {
157 log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
158 } else {
159 log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
160 }
161
162 if let Some(mut notifications) = this.notifications.clone() {
163 let _ = notifications.send(()).await;
164 }
165 } else {
166 log::warn!("unhandled message: {}", type_name);
167 }
168 } else {
169 log::info!("rpc connection closed {:?}", addr);
170 break;
171 }
172 }
173 }
174 }
175
176 if let Err(err) = this.sign_out(connection_id).await {
177 log::error!("error signing out connection {:?} - {:?}", addr, err);
178 }
179 }
180 }
181
182 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
183 self.peer.disconnect(connection_id);
184 let removed_connection = self.state_mut().remove_connection(connection_id)?;
185
186 for (project_id, project) in removed_connection.hosted_projects {
187 if let Some(share) = project.share {
188 broadcast(
189 connection_id,
190 share.guests.keys().copied().collect(),
191 |conn_id| {
192 self.peer
193 .send(conn_id, proto::UnshareProject { project_id })
194 },
195 )?;
196 }
197 }
198
199 for (project_id, peer_ids) in removed_connection.guest_project_ids {
200 broadcast(connection_id, peer_ids, |conn_id| {
201 self.peer.send(
202 conn_id,
203 proto::RemoveProjectCollaborator {
204 project_id,
205 peer_id: connection_id.0,
206 },
207 )
208 })?;
209 }
210
211 self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
212 Ok(())
213 }
214
215 async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
216 self.peer.respond(request.receipt(), proto::Ack {})?;
217 Ok(())
218 }
219
220 async fn register_project(
221 mut self: Arc<Server>,
222 request: TypedEnvelope<proto::RegisterProject>,
223 ) -> tide::Result<()> {
224 let project_id = {
225 let mut state = self.state_mut();
226 let user_id = state.user_id_for_connection(request.sender_id)?;
227 state.register_project(request.sender_id, user_id)
228 };
229 self.peer.respond(
230 request.receipt(),
231 proto::RegisterProjectResponse { project_id },
232 )?;
233 Ok(())
234 }
235
236 async fn unregister_project(
237 mut self: Arc<Server>,
238 request: TypedEnvelope<proto::UnregisterProject>,
239 ) -> tide::Result<()> {
240 let project = self
241 .state_mut()
242 .unregister_project(request.payload.project_id, request.sender_id)
243 .ok_or_else(|| anyhow!("no such project"))?;
244 self.update_contacts_for_users(project.authorized_user_ids().iter())?;
245 Ok(())
246 }
247
248 async fn share_project(
249 mut self: Arc<Server>,
250 request: TypedEnvelope<proto::ShareProject>,
251 ) -> tide::Result<()> {
252 self.state_mut()
253 .share_project(request.payload.project_id, request.sender_id);
254 self.peer.respond(request.receipt(), proto::Ack {})?;
255 Ok(())
256 }
257
258 async fn unshare_project(
259 mut self: Arc<Server>,
260 request: TypedEnvelope<proto::UnshareProject>,
261 ) -> tide::Result<()> {
262 let project_id = request.payload.project_id;
263 let project = self
264 .state_mut()
265 .unshare_project(project_id, request.sender_id)?;
266
267 broadcast(request.sender_id, project.connection_ids, |conn_id| {
268 self.peer
269 .send(conn_id, proto::UnshareProject { project_id })
270 })?;
271 self.update_contacts_for_users(&project.authorized_user_ids)?;
272 Ok(())
273 }
274
275 async fn join_project(
276 mut self: Arc<Server>,
277 request: TypedEnvelope<proto::JoinProject>,
278 ) -> tide::Result<()> {
279 let project_id = request.payload.project_id;
280
281 let user_id = self.state().user_id_for_connection(request.sender_id)?;
282 let response_data = self
283 .state_mut()
284 .join_project(request.sender_id, user_id, project_id)
285 .and_then(|joined| {
286 let share = joined.project.share()?;
287 let peer_count = share.guests.len();
288 let mut collaborators = Vec::with_capacity(peer_count);
289 collaborators.push(proto::Collaborator {
290 peer_id: joined.project.host_connection_id.0,
291 replica_id: 0,
292 user_id: joined.project.host_user_id.to_proto(),
293 });
294 let worktrees = joined
295 .project
296 .worktrees
297 .iter()
298 .filter_map(|(id, worktree)| {
299 worktree.share.as_ref().map(|share| proto::Worktree {
300 id: *id,
301 root_name: worktree.root_name.clone(),
302 entries: share.entries.values().cloned().collect(),
303 diagnostic_summaries: share
304 .diagnostic_summaries
305 .values()
306 .cloned()
307 .collect(),
308 weak: worktree.weak,
309 })
310 })
311 .collect();
312 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
313 if *peer_conn_id != request.sender_id {
314 collaborators.push(proto::Collaborator {
315 peer_id: peer_conn_id.0,
316 replica_id: *peer_replica_id as u32,
317 user_id: peer_user_id.to_proto(),
318 });
319 }
320 }
321 let response = proto::JoinProjectResponse {
322 worktrees,
323 replica_id: joined.replica_id as u32,
324 collaborators,
325 };
326 let connection_ids = joined.project.connection_ids();
327 let contact_user_ids = joined.project.authorized_user_ids();
328 Ok((response, connection_ids, contact_user_ids))
329 });
330
331 match response_data {
332 Ok((response, connection_ids, contact_user_ids)) => {
333 broadcast(request.sender_id, connection_ids, |conn_id| {
334 self.peer.send(
335 conn_id,
336 proto::AddProjectCollaborator {
337 project_id,
338 collaborator: Some(proto::Collaborator {
339 peer_id: request.sender_id.0,
340 replica_id: response.replica_id,
341 user_id: user_id.to_proto(),
342 }),
343 },
344 )
345 })?;
346 self.peer.respond(request.receipt(), response)?;
347 self.update_contacts_for_users(&contact_user_ids)?;
348 }
349 Err(error) => {
350 self.peer.respond_with_error(
351 request.receipt(),
352 proto::Error {
353 message: error.to_string(),
354 },
355 )?;
356 }
357 }
358
359 Ok(())
360 }
361
362 async fn leave_project(
363 mut self: Arc<Server>,
364 request: TypedEnvelope<proto::LeaveProject>,
365 ) -> tide::Result<()> {
366 let sender_id = request.sender_id;
367 let project_id = request.payload.project_id;
368 let worktree = self.state_mut().leave_project(sender_id, project_id);
369 if let Some(worktree) = worktree {
370 broadcast(sender_id, worktree.connection_ids, |conn_id| {
371 self.peer.send(
372 conn_id,
373 proto::RemoveProjectCollaborator {
374 project_id,
375 peer_id: sender_id.0,
376 },
377 )
378 })?;
379 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
380 }
381 Ok(())
382 }
383
384 async fn register_worktree(
385 mut self: Arc<Server>,
386 request: TypedEnvelope<proto::RegisterWorktree>,
387 ) -> tide::Result<()> {
388 let receipt = request.receipt();
389 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
390
391 let mut contact_user_ids = HashSet::default();
392 contact_user_ids.insert(host_user_id);
393 for github_login in request.payload.authorized_logins {
394 match self.app_state.db.create_user(&github_login, false).await {
395 Ok(contact_user_id) => {
396 contact_user_ids.insert(contact_user_id);
397 }
398 Err(err) => {
399 let message = err.to_string();
400 self.peer
401 .respond_with_error(receipt, proto::Error { message })?;
402 return Ok(());
403 }
404 }
405 }
406
407 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
408 let ok = self.state_mut().register_worktree(
409 request.payload.project_id,
410 request.payload.worktree_id,
411 Worktree {
412 authorized_user_ids: contact_user_ids.clone(),
413 root_name: request.payload.root_name,
414 share: None,
415 weak: false,
416 },
417 );
418
419 if ok {
420 self.peer.respond(receipt, proto::Ack {})?;
421 self.update_contacts_for_users(&contact_user_ids)?;
422 } else {
423 self.peer.respond_with_error(
424 receipt,
425 proto::Error {
426 message: NO_SUCH_PROJECT.to_string(),
427 },
428 )?;
429 }
430
431 Ok(())
432 }
433
434 async fn unregister_worktree(
435 mut self: Arc<Server>,
436 request: TypedEnvelope<proto::UnregisterWorktree>,
437 ) -> tide::Result<()> {
438 let project_id = request.payload.project_id;
439 let worktree_id = request.payload.worktree_id;
440 let (worktree, guest_connection_ids) =
441 self.state_mut()
442 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
443 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
444 self.peer.send(
445 conn_id,
446 proto::UnregisterWorktree {
447 project_id,
448 worktree_id,
449 },
450 )
451 })?;
452 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
453 Ok(())
454 }
455
456 async fn share_worktree(
457 mut self: Arc<Server>,
458 mut request: TypedEnvelope<proto::ShareWorktree>,
459 ) -> tide::Result<()> {
460 let worktree = request
461 .payload
462 .worktree
463 .as_mut()
464 .ok_or_else(|| anyhow!("missing worktree"))?;
465 let entries = worktree
466 .entries
467 .iter()
468 .map(|entry| (entry.id, entry.clone()))
469 .collect();
470 let diagnostic_summaries = worktree
471 .diagnostic_summaries
472 .iter()
473 .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
474 .collect();
475
476 let shared_worktree = self.state_mut().share_worktree(
477 request.payload.project_id,
478 worktree.id,
479 request.sender_id,
480 entries,
481 diagnostic_summaries,
482 );
483 if let Some(shared_worktree) = shared_worktree {
484 broadcast(
485 request.sender_id,
486 shared_worktree.connection_ids,
487 |connection_id| {
488 self.peer.forward_send(
489 request.sender_id,
490 connection_id,
491 request.payload.clone(),
492 )
493 },
494 )?;
495 self.peer.respond(request.receipt(), proto::Ack {})?;
496 self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
497 } else {
498 self.peer.respond_with_error(
499 request.receipt(),
500 proto::Error {
501 message: "no such worktree".to_string(),
502 },
503 )?;
504 }
505 Ok(())
506 }
507
508 async fn update_worktree(
509 mut self: Arc<Server>,
510 request: TypedEnvelope<proto::UpdateWorktree>,
511 ) -> tide::Result<()> {
512 let connection_ids = self
513 .state_mut()
514 .update_worktree(
515 request.sender_id,
516 request.payload.project_id,
517 request.payload.worktree_id,
518 &request.payload.removed_entries,
519 &request.payload.updated_entries,
520 )
521 .ok_or_else(|| anyhow!("no such worktree"))?;
522
523 broadcast(request.sender_id, connection_ids, |connection_id| {
524 self.peer
525 .forward_send(request.sender_id, connection_id, request.payload.clone())
526 })?;
527
528 Ok(())
529 }
530
531 async fn update_diagnostic_summary(
532 mut self: Arc<Server>,
533 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
534 ) -> tide::Result<()> {
535 let receiver_ids = request
536 .payload
537 .summary
538 .clone()
539 .and_then(|summary| {
540 self.state_mut().update_diagnostic_summary(
541 request.payload.project_id,
542 request.payload.worktree_id,
543 request.sender_id,
544 summary,
545 )
546 })
547 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
548
549 broadcast(request.sender_id, receiver_ids, |connection_id| {
550 self.peer
551 .forward_send(request.sender_id, connection_id, request.payload.clone())
552 })?;
553 Ok(())
554 }
555
556 async fn disk_based_diagnostics_updating(
557 self: Arc<Server>,
558 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
559 ) -> tide::Result<()> {
560 let receiver_ids = self
561 .state()
562 .project_connection_ids(request.payload.project_id, request.sender_id)
563 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
564 broadcast(request.sender_id, receiver_ids, |connection_id| {
565 self.peer
566 .forward_send(request.sender_id, connection_id, request.payload.clone())
567 })?;
568 Ok(())
569 }
570
571 async fn disk_based_diagnostics_updated(
572 self: Arc<Server>,
573 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
574 ) -> tide::Result<()> {
575 let receiver_ids = self
576 .state()
577 .project_connection_ids(request.payload.project_id, request.sender_id)
578 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
579 broadcast(request.sender_id, receiver_ids, |connection_id| {
580 self.peer
581 .forward_send(request.sender_id, connection_id, request.payload.clone())
582 })?;
583 Ok(())
584 }
585
586 async fn get_definition(
587 self: Arc<Server>,
588 request: TypedEnvelope<proto::GetDefinition>,
589 ) -> tide::Result<()> {
590 let receipt = request.receipt();
591 let host_connection_id = self
592 .state()
593 .read_project(request.payload.project_id, request.sender_id)
594 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
595 .host_connection_id;
596 let response = self
597 .peer
598 .forward_request(request.sender_id, host_connection_id, request.payload)
599 .await?;
600 self.peer.respond(receipt, response)?;
601 Ok(())
602 }
603
604 async fn open_buffer(
605 self: Arc<Server>,
606 request: TypedEnvelope<proto::OpenBuffer>,
607 ) -> tide::Result<()> {
608 let receipt = request.receipt();
609 let host_connection_id = self
610 .state()
611 .read_project(request.payload.project_id, request.sender_id)
612 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
613 .host_connection_id;
614 let response = self
615 .peer
616 .forward_request(request.sender_id, host_connection_id, request.payload)
617 .await?;
618 self.peer.respond(receipt, response)?;
619 Ok(())
620 }
621
622 async fn close_buffer(
623 self: Arc<Server>,
624 request: TypedEnvelope<proto::CloseBuffer>,
625 ) -> tide::Result<()> {
626 let host_connection_id = self
627 .state()
628 .read_project(request.payload.project_id, request.sender_id)
629 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
630 .host_connection_id;
631 self.peer
632 .forward_send(request.sender_id, host_connection_id, request.payload)?;
633 Ok(())
634 }
635
636 async fn save_buffer(
637 self: Arc<Server>,
638 request: TypedEnvelope<proto::SaveBuffer>,
639 ) -> tide::Result<()> {
640 let host;
641 let guests;
642 {
643 let state = self.state();
644 let project = state
645 .read_project(request.payload.project_id, request.sender_id)
646 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
647 host = project.host_connection_id;
648 guests = project.guest_connection_ids()
649 }
650
651 let sender = request.sender_id;
652 let receipt = request.receipt();
653 let response = self
654 .peer
655 .forward_request(sender, host, request.payload.clone())
656 .await?;
657
658 broadcast(host, guests, |conn_id| {
659 let response = response.clone();
660 if conn_id == sender {
661 self.peer.respond(receipt, response)
662 } else {
663 self.peer.forward_send(host, conn_id, response)
664 }
665 })?;
666
667 Ok(())
668 }
669
670 async fn format_buffer(
671 self: Arc<Server>,
672 request: TypedEnvelope<proto::FormatBuffer>,
673 ) -> tide::Result<()> {
674 let host;
675 {
676 let state = self.state();
677 let project = state
678 .read_project(request.payload.project_id, request.sender_id)
679 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
680 host = project.host_connection_id;
681 }
682
683 let sender = request.sender_id;
684 let receipt = request.receipt();
685 let response = self
686 .peer
687 .forward_request(sender, host, request.payload.clone())
688 .await?;
689 self.peer.respond(receipt, response)?;
690
691 Ok(())
692 }
693
694 async fn get_completions(
695 self: Arc<Server>,
696 request: TypedEnvelope<proto::GetCompletions>,
697 ) -> tide::Result<()> {
698 let host;
699 {
700 let state = self.state();
701 let project = state
702 .read_project(request.payload.project_id, request.sender_id)
703 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
704 host = project.host_connection_id;
705 }
706
707 let sender = request.sender_id;
708 let receipt = request.receipt();
709 let response = self
710 .peer
711 .forward_request(sender, host, request.payload.clone())
712 .await?;
713 self.peer.respond(receipt, response)?;
714 Ok(())
715 }
716
717 async fn apply_additional_edits_for_completion(
718 self: Arc<Server>,
719 request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
720 ) -> tide::Result<()> {
721 let host;
722 {
723 let state = self.state();
724 let project = state
725 .read_project(request.payload.project_id, request.sender_id)
726 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
727 host = project.host_connection_id;
728 }
729
730 let sender = request.sender_id;
731 let receipt = request.receipt();
732 let response = self
733 .peer
734 .forward_request(sender, host, request.payload.clone())
735 .await?;
736 self.peer.respond(receipt, response)?;
737 Ok(())
738 }
739
740 async fn update_buffer(
741 self: Arc<Server>,
742 request: TypedEnvelope<proto::UpdateBuffer>,
743 ) -> tide::Result<()> {
744 let receiver_ids = self
745 .state()
746 .project_connection_ids(request.payload.project_id, request.sender_id)
747 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
748 broadcast(request.sender_id, receiver_ids, |connection_id| {
749 self.peer
750 .forward_send(request.sender_id, connection_id, request.payload.clone())
751 })?;
752 self.peer.respond(request.receipt(), proto::Ack {})?;
753 Ok(())
754 }
755
756 async fn update_buffer_file(
757 self: Arc<Server>,
758 request: TypedEnvelope<proto::UpdateBufferFile>,
759 ) -> tide::Result<()> {
760 let receiver_ids = self
761 .state()
762 .project_connection_ids(request.payload.project_id, request.sender_id)
763 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
764 broadcast(request.sender_id, receiver_ids, |connection_id| {
765 self.peer
766 .forward_send(request.sender_id, connection_id, request.payload.clone())
767 })?;
768 Ok(())
769 }
770
771 async fn buffer_reloaded(
772 self: Arc<Server>,
773 request: TypedEnvelope<proto::BufferReloaded>,
774 ) -> tide::Result<()> {
775 let receiver_ids = self
776 .state()
777 .project_connection_ids(request.payload.project_id, request.sender_id)
778 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
779 broadcast(request.sender_id, receiver_ids, |connection_id| {
780 self.peer
781 .forward_send(request.sender_id, connection_id, request.payload.clone())
782 })?;
783 Ok(())
784 }
785
786 async fn buffer_saved(
787 self: Arc<Server>,
788 request: TypedEnvelope<proto::BufferSaved>,
789 ) -> tide::Result<()> {
790 let receiver_ids = self
791 .state()
792 .project_connection_ids(request.payload.project_id, request.sender_id)
793 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
794 broadcast(request.sender_id, receiver_ids, |connection_id| {
795 self.peer
796 .forward_send(request.sender_id, connection_id, request.payload.clone())
797 })?;
798 Ok(())
799 }
800
801 async fn get_channels(
802 self: Arc<Server>,
803 request: TypedEnvelope<proto::GetChannels>,
804 ) -> tide::Result<()> {
805 let user_id = self.state().user_id_for_connection(request.sender_id)?;
806 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
807 self.peer.respond(
808 request.receipt(),
809 proto::GetChannelsResponse {
810 channels: channels
811 .into_iter()
812 .map(|chan| proto::Channel {
813 id: chan.id.to_proto(),
814 name: chan.name,
815 })
816 .collect(),
817 },
818 )?;
819 Ok(())
820 }
821
822 async fn get_users(
823 self: Arc<Server>,
824 request: TypedEnvelope<proto::GetUsers>,
825 ) -> tide::Result<()> {
826 let receipt = request.receipt();
827 let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
828 let users = self
829 .app_state
830 .db
831 .get_users_by_ids(user_ids)
832 .await?
833 .into_iter()
834 .map(|user| proto::User {
835 id: user.id.to_proto(),
836 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
837 github_login: user.github_login,
838 })
839 .collect();
840 self.peer
841 .respond(receipt, proto::GetUsersResponse { users })?;
842 Ok(())
843 }
844
845 fn update_contacts_for_users<'a>(
846 self: &Arc<Server>,
847 user_ids: impl IntoIterator<Item = &'a UserId>,
848 ) -> anyhow::Result<()> {
849 let mut result = Ok(());
850 let state = self.state();
851 for user_id in user_ids {
852 let contacts = state.contacts_for_user(*user_id);
853 for connection_id in state.connection_ids_for_user(*user_id) {
854 if let Err(error) = self.peer.send(
855 connection_id,
856 proto::UpdateContacts {
857 contacts: contacts.clone(),
858 },
859 ) {
860 result = Err(error);
861 }
862 }
863 }
864 result
865 }
866
867 async fn join_channel(
868 mut self: Arc<Self>,
869 request: TypedEnvelope<proto::JoinChannel>,
870 ) -> tide::Result<()> {
871 let user_id = self.state().user_id_for_connection(request.sender_id)?;
872 let channel_id = ChannelId::from_proto(request.payload.channel_id);
873 if !self
874 .app_state
875 .db
876 .can_user_access_channel(user_id, channel_id)
877 .await?
878 {
879 Err(anyhow!("access denied"))?;
880 }
881
882 self.state_mut().join_channel(request.sender_id, channel_id);
883 let messages = self
884 .app_state
885 .db
886 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
887 .await?
888 .into_iter()
889 .map(|msg| proto::ChannelMessage {
890 id: msg.id.to_proto(),
891 body: msg.body,
892 timestamp: msg.sent_at.unix_timestamp() as u64,
893 sender_id: msg.sender_id.to_proto(),
894 nonce: Some(msg.nonce.as_u128().into()),
895 })
896 .collect::<Vec<_>>();
897 self.peer.respond(
898 request.receipt(),
899 proto::JoinChannelResponse {
900 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
901 messages,
902 },
903 )?;
904 Ok(())
905 }
906
907 async fn leave_channel(
908 mut self: Arc<Self>,
909 request: TypedEnvelope<proto::LeaveChannel>,
910 ) -> tide::Result<()> {
911 let user_id = self.state().user_id_for_connection(request.sender_id)?;
912 let channel_id = ChannelId::from_proto(request.payload.channel_id);
913 if !self
914 .app_state
915 .db
916 .can_user_access_channel(user_id, channel_id)
917 .await?
918 {
919 Err(anyhow!("access denied"))?;
920 }
921
922 self.state_mut()
923 .leave_channel(request.sender_id, channel_id);
924
925 Ok(())
926 }
927
928 async fn send_channel_message(
929 self: Arc<Self>,
930 request: TypedEnvelope<proto::SendChannelMessage>,
931 ) -> tide::Result<()> {
932 let receipt = request.receipt();
933 let channel_id = ChannelId::from_proto(request.payload.channel_id);
934 let user_id;
935 let connection_ids;
936 {
937 let state = self.state();
938 user_id = state.user_id_for_connection(request.sender_id)?;
939 if let Some(ids) = state.channel_connection_ids(channel_id) {
940 connection_ids = ids;
941 } else {
942 return Ok(());
943 }
944 }
945
946 // Validate the message body.
947 let body = request.payload.body.trim().to_string();
948 if body.len() > MAX_MESSAGE_LEN {
949 self.peer.respond_with_error(
950 receipt,
951 proto::Error {
952 message: "message is too long".to_string(),
953 },
954 )?;
955 return Ok(());
956 }
957 if body.is_empty() {
958 self.peer.respond_with_error(
959 receipt,
960 proto::Error {
961 message: "message can't be blank".to_string(),
962 },
963 )?;
964 return Ok(());
965 }
966
967 let timestamp = OffsetDateTime::now_utc();
968 let nonce = if let Some(nonce) = request.payload.nonce {
969 nonce
970 } else {
971 self.peer.respond_with_error(
972 receipt,
973 proto::Error {
974 message: "nonce can't be blank".to_string(),
975 },
976 )?;
977 return Ok(());
978 };
979
980 let message_id = self
981 .app_state
982 .db
983 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
984 .await?
985 .to_proto();
986 let message = proto::ChannelMessage {
987 sender_id: user_id.to_proto(),
988 id: message_id,
989 body,
990 timestamp: timestamp.unix_timestamp() as u64,
991 nonce: Some(nonce),
992 };
993 broadcast(request.sender_id, connection_ids, |conn_id| {
994 self.peer.send(
995 conn_id,
996 proto::ChannelMessageSent {
997 channel_id: channel_id.to_proto(),
998 message: Some(message.clone()),
999 },
1000 )
1001 })?;
1002 self.peer.respond(
1003 receipt,
1004 proto::SendChannelMessageResponse {
1005 message: Some(message),
1006 },
1007 )?;
1008 Ok(())
1009 }
1010
1011 async fn get_channel_messages(
1012 self: Arc<Self>,
1013 request: TypedEnvelope<proto::GetChannelMessages>,
1014 ) -> tide::Result<()> {
1015 let user_id = self.state().user_id_for_connection(request.sender_id)?;
1016 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1017 if !self
1018 .app_state
1019 .db
1020 .can_user_access_channel(user_id, channel_id)
1021 .await?
1022 {
1023 Err(anyhow!("access denied"))?;
1024 }
1025
1026 let messages = self
1027 .app_state
1028 .db
1029 .get_channel_messages(
1030 channel_id,
1031 MESSAGE_COUNT_PER_PAGE,
1032 Some(MessageId::from_proto(request.payload.before_message_id)),
1033 )
1034 .await?
1035 .into_iter()
1036 .map(|msg| proto::ChannelMessage {
1037 id: msg.id.to_proto(),
1038 body: msg.body,
1039 timestamp: msg.sent_at.unix_timestamp() as u64,
1040 sender_id: msg.sender_id.to_proto(),
1041 nonce: Some(msg.nonce.as_u128().into()),
1042 })
1043 .collect::<Vec<_>>();
1044 self.peer.respond(
1045 request.receipt(),
1046 proto::GetChannelMessagesResponse {
1047 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1048 messages,
1049 },
1050 )?;
1051 Ok(())
1052 }
1053
1054 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1055 self.store.read()
1056 }
1057
1058 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1059 self.store.write()
1060 }
1061}
1062
1063fn broadcast<F>(
1064 sender_id: ConnectionId,
1065 receiver_ids: Vec<ConnectionId>,
1066 mut f: F,
1067) -> anyhow::Result<()>
1068where
1069 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1070{
1071 let mut result = Ok(());
1072 for receiver_id in receiver_ids {
1073 if receiver_id != sender_id {
1074 if let Err(error) = f(receiver_id) {
1075 if result.is_ok() {
1076 result = Err(error);
1077 }
1078 }
1079 }
1080 }
1081 result
1082}
1083
1084pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1085 let server = Server::new(app.state().clone(), rpc.clone(), None);
1086 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1087 let server = server.clone();
1088 async move {
1089 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1090
1091 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1092 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1093 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1094 let client_protocol_version: Option<u32> = request
1095 .header("X-Zed-Protocol-Version")
1096 .and_then(|v| v.as_str().parse().ok());
1097
1098 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1099 return Ok(Response::new(StatusCode::UpgradeRequired));
1100 }
1101
1102 let header = match request.header("Sec-Websocket-Key") {
1103 Some(h) => h.as_str(),
1104 None => return Err(anyhow!("expected sec-websocket-key"))?,
1105 };
1106
1107 let user_id = process_auth_header(&request).await?;
1108
1109 let mut response = Response::new(StatusCode::SwitchingProtocols);
1110 response.insert_header(UPGRADE, "websocket");
1111 response.insert_header(CONNECTION, "Upgrade");
1112 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1113 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1114 response.insert_header("Sec-Websocket-Version", "13");
1115
1116 let http_res: &mut tide::http::Response = response.as_mut();
1117 let upgrade_receiver = http_res.recv_upgrade().await;
1118 let addr = request.remote().unwrap_or("unknown").to_string();
1119 task::spawn(async move {
1120 if let Some(stream) = upgrade_receiver.await {
1121 server
1122 .handle_connection(
1123 Connection::new(
1124 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1125 ),
1126 addr,
1127 user_id,
1128 None,
1129 )
1130 .await;
1131 }
1132 });
1133
1134 Ok(response)
1135 }
1136 });
1137}
1138
1139fn header_contains_ignore_case<T>(
1140 request: &tide::Request<T>,
1141 header_name: HeaderName,
1142 value: &str,
1143) -> bool {
1144 request
1145 .header(header_name)
1146 .map(|h| {
1147 h.as_str()
1148 .split(',')
1149 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1150 })
1151 .unwrap_or(false)
1152}
1153
1154#[cfg(test)]
1155mod tests {
1156 use super::*;
1157 use crate::{
1158 auth,
1159 db::{tests::TestDb, UserId},
1160 github, AppState, Config,
1161 };
1162 use ::rpc::Peer;
1163 use async_std::task;
1164 use gpui::{executor, ModelHandle, TestAppContext};
1165 use parking_lot::Mutex;
1166 use postage::{mpsc, watch};
1167 use rand::prelude::*;
1168 use rpc::PeerId;
1169 use serde_json::json;
1170 use sqlx::types::time::OffsetDateTime;
1171 use std::{
1172 ops::Deref,
1173 path::Path,
1174 rc::Rc,
1175 sync::{
1176 atomic::{AtomicBool, Ordering::SeqCst},
1177 Arc,
1178 },
1179 time::Duration,
1180 };
1181 use zed::{
1182 client::{
1183 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1184 EstablishConnectionError, UserStore,
1185 },
1186 editor::{ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer},
1187 fs::{FakeFs, Fs as _},
1188 language::{
1189 tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1190 LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1191 },
1192 lsp,
1193 project::{DiagnosticSummary, Project, ProjectPath},
1194 };
1195
1196 #[cfg(test)]
1197 #[ctor::ctor]
1198 fn init_logger() {
1199 // std::env::set_var("RUST_LOG", "info");
1200 env_logger::init();
1201 }
1202
1203 #[gpui::test]
1204 async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1205 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1206 let lang_registry = Arc::new(LanguageRegistry::new());
1207 let fs = Arc::new(FakeFs::new(cx_a.background()));
1208 cx_a.foreground().forbid_parking();
1209
1210 // Connect to a server as 2 clients.
1211 let mut server = TestServer::start(cx_a.foreground()).await;
1212 let client_a = server.create_client(&mut cx_a, "user_a").await;
1213 let client_b = server.create_client(&mut cx_b, "user_b").await;
1214
1215 // Share a project as client A
1216 fs.insert_tree(
1217 "/a",
1218 json!({
1219 ".zed.toml": r#"collaborators = ["user_b"]"#,
1220 "a.txt": "a-contents",
1221 "b.txt": "b-contents",
1222 }),
1223 )
1224 .await;
1225 let project_a = cx_a.update(|cx| {
1226 Project::local(
1227 client_a.clone(),
1228 client_a.user_store.clone(),
1229 lang_registry.clone(),
1230 fs.clone(),
1231 cx,
1232 )
1233 });
1234 let (worktree_a, _) = project_a
1235 .update(&mut cx_a, |p, cx| {
1236 p.find_or_create_local_worktree("/a", false, cx)
1237 })
1238 .await
1239 .unwrap();
1240 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1241 worktree_a
1242 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1243 .await;
1244 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1245 project_a
1246 .update(&mut cx_a, |p, cx| p.share(cx))
1247 .await
1248 .unwrap();
1249
1250 // Join that project as client B
1251 let project_b = Project::remote(
1252 project_id,
1253 client_b.clone(),
1254 client_b.user_store.clone(),
1255 lang_registry.clone(),
1256 fs.clone(),
1257 &mut cx_b.to_async(),
1258 )
1259 .await
1260 .unwrap();
1261
1262 let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1263 assert_eq!(
1264 project
1265 .collaborators()
1266 .get(&client_a.peer_id)
1267 .unwrap()
1268 .user
1269 .github_login,
1270 "user_a"
1271 );
1272 project.replica_id()
1273 });
1274 project_a
1275 .condition(&cx_a, |tree, _| {
1276 tree.collaborators()
1277 .get(&client_b.peer_id)
1278 .map_or(false, |collaborator| {
1279 collaborator.replica_id == replica_id_b
1280 && collaborator.user.github_login == "user_b"
1281 })
1282 })
1283 .await;
1284
1285 // Open the same file as client B and client A.
1286 let buffer_b = project_b
1287 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1288 .await
1289 .unwrap();
1290 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1291 buffer_b.read_with(&cx_b, |buf, cx| {
1292 assert_eq!(buf.read(cx).text(), "b-contents")
1293 });
1294 project_a.read_with(&cx_a, |project, cx| {
1295 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1296 });
1297 let buffer_a = project_a
1298 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1299 .await
1300 .unwrap();
1301
1302 let editor_b = cx_b.add_view(window_b, |cx| {
1303 Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
1304 });
1305
1306 // TODO
1307 // // Create a selection set as client B and see that selection set as client A.
1308 // buffer_a
1309 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1310 // .await;
1311
1312 // Edit the buffer as client B and see that edit as client A.
1313 editor_b.update(&mut cx_b, |editor, cx| {
1314 editor.handle_input(&Input("ok, ".into()), cx)
1315 });
1316 buffer_a
1317 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1318 .await;
1319
1320 // TODO
1321 // // Remove the selection set as client B, see those selections disappear as client A.
1322 cx_b.update(move |_| drop(editor_b));
1323 // buffer_a
1324 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1325 // .await;
1326
1327 // Close the buffer as client A, see that the buffer is closed.
1328 cx_a.update(move |_| drop(buffer_a));
1329 project_a
1330 .condition(&cx_a, |project, cx| {
1331 !project.has_open_buffer((worktree_id, "b.txt"), cx)
1332 })
1333 .await;
1334
1335 // Dropping the client B's project removes client B from client A's collaborators.
1336 cx_b.update(move |_| drop(project_b));
1337 project_a
1338 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1339 .await;
1340 }
1341
1342 #[gpui::test]
1343 async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1344 let lang_registry = Arc::new(LanguageRegistry::new());
1345 let fs = Arc::new(FakeFs::new(cx_a.background()));
1346 cx_a.foreground().forbid_parking();
1347
1348 // Connect to a server as 2 clients.
1349 let mut server = TestServer::start(cx_a.foreground()).await;
1350 let client_a = server.create_client(&mut cx_a, "user_a").await;
1351 let client_b = server.create_client(&mut cx_b, "user_b").await;
1352
1353 // Share a project as client A
1354 fs.insert_tree(
1355 "/a",
1356 json!({
1357 ".zed.toml": r#"collaborators = ["user_b"]"#,
1358 "a.txt": "a-contents",
1359 "b.txt": "b-contents",
1360 }),
1361 )
1362 .await;
1363 let project_a = cx_a.update(|cx| {
1364 Project::local(
1365 client_a.clone(),
1366 client_a.user_store.clone(),
1367 lang_registry.clone(),
1368 fs.clone(),
1369 cx,
1370 )
1371 });
1372 let (worktree_a, _) = project_a
1373 .update(&mut cx_a, |p, cx| {
1374 p.find_or_create_local_worktree("/a", false, cx)
1375 })
1376 .await
1377 .unwrap();
1378 worktree_a
1379 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1380 .await;
1381 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1382 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1383 project_a
1384 .update(&mut cx_a, |p, cx| p.share(cx))
1385 .await
1386 .unwrap();
1387 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1388
1389 // Join that project as client B
1390 let project_b = Project::remote(
1391 project_id,
1392 client_b.clone(),
1393 client_b.user_store.clone(),
1394 lang_registry.clone(),
1395 fs.clone(),
1396 &mut cx_b.to_async(),
1397 )
1398 .await
1399 .unwrap();
1400 project_b
1401 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1402 .await
1403 .unwrap();
1404
1405 // Unshare the project as client A
1406 project_a
1407 .update(&mut cx_a, |project, cx| project.unshare(cx))
1408 .await
1409 .unwrap();
1410 project_b
1411 .condition(&mut cx_b, |project, _| project.is_read_only())
1412 .await;
1413 assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1414 drop(project_b);
1415
1416 // Share the project again and ensure guests can still join.
1417 project_a
1418 .update(&mut cx_a, |project, cx| project.share(cx))
1419 .await
1420 .unwrap();
1421 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1422
1423 let project_c = Project::remote(
1424 project_id,
1425 client_b.clone(),
1426 client_b.user_store.clone(),
1427 lang_registry.clone(),
1428 fs.clone(),
1429 &mut cx_b.to_async(),
1430 )
1431 .await
1432 .unwrap();
1433 project_c
1434 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1435 .await
1436 .unwrap();
1437 }
1438
1439 #[gpui::test]
1440 async fn test_propagate_saves_and_fs_changes(
1441 mut cx_a: TestAppContext,
1442 mut cx_b: TestAppContext,
1443 mut cx_c: TestAppContext,
1444 ) {
1445 let lang_registry = Arc::new(LanguageRegistry::new());
1446 let fs = Arc::new(FakeFs::new(cx_a.background()));
1447 cx_a.foreground().forbid_parking();
1448
1449 // Connect to a server as 3 clients.
1450 let mut server = TestServer::start(cx_a.foreground()).await;
1451 let client_a = server.create_client(&mut cx_a, "user_a").await;
1452 let client_b = server.create_client(&mut cx_b, "user_b").await;
1453 let client_c = server.create_client(&mut cx_c, "user_c").await;
1454
1455 // Share a worktree as client A.
1456 fs.insert_tree(
1457 "/a",
1458 json!({
1459 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1460 "file1": "",
1461 "file2": ""
1462 }),
1463 )
1464 .await;
1465 let project_a = cx_a.update(|cx| {
1466 Project::local(
1467 client_a.clone(),
1468 client_a.user_store.clone(),
1469 lang_registry.clone(),
1470 fs.clone(),
1471 cx,
1472 )
1473 });
1474 let (worktree_a, _) = project_a
1475 .update(&mut cx_a, |p, cx| {
1476 p.find_or_create_local_worktree("/a", false, cx)
1477 })
1478 .await
1479 .unwrap();
1480 worktree_a
1481 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1482 .await;
1483 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1484 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1485 project_a
1486 .update(&mut cx_a, |p, cx| p.share(cx))
1487 .await
1488 .unwrap();
1489
1490 // Join that worktree as clients B and C.
1491 let project_b = Project::remote(
1492 project_id,
1493 client_b.clone(),
1494 client_b.user_store.clone(),
1495 lang_registry.clone(),
1496 fs.clone(),
1497 &mut cx_b.to_async(),
1498 )
1499 .await
1500 .unwrap();
1501 let project_c = Project::remote(
1502 project_id,
1503 client_c.clone(),
1504 client_c.user_store.clone(),
1505 lang_registry.clone(),
1506 fs.clone(),
1507 &mut cx_c.to_async(),
1508 )
1509 .await
1510 .unwrap();
1511 let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1512 let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1513
1514 // Open and edit a buffer as both guests B and C.
1515 let buffer_b = project_b
1516 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1517 .await
1518 .unwrap();
1519 let buffer_c = project_c
1520 .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1521 .await
1522 .unwrap();
1523 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1524 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1525
1526 // Open and edit that buffer as the host.
1527 let buffer_a = project_a
1528 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1529 .await
1530 .unwrap();
1531
1532 buffer_a
1533 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1534 .await;
1535 buffer_a.update(&mut cx_a, |buf, cx| {
1536 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1537 });
1538
1539 // Wait for edits to propagate
1540 buffer_a
1541 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1542 .await;
1543 buffer_b
1544 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1545 .await;
1546 buffer_c
1547 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1548 .await;
1549
1550 // Edit the buffer as the host and concurrently save as guest B.
1551 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1552 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1553 save_b.await.unwrap();
1554 assert_eq!(
1555 fs.load("/a/file1".as_ref()).await.unwrap(),
1556 "hi-a, i-am-c, i-am-b, i-am-a"
1557 );
1558 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1559 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1560 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1561
1562 // Make changes on host's file system, see those changes on guest worktrees.
1563 fs.rename(
1564 "/a/file1".as_ref(),
1565 "/a/file1-renamed".as_ref(),
1566 Default::default(),
1567 )
1568 .await
1569 .unwrap();
1570 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1571 .await
1572 .unwrap();
1573 fs.insert_file(Path::new("/a/file4"), "4".into())
1574 .await
1575 .unwrap();
1576
1577 worktree_a
1578 .condition(&cx_a, |tree, _| tree.file_count() == 4)
1579 .await;
1580 worktree_b
1581 .condition(&cx_b, |tree, _| tree.file_count() == 4)
1582 .await;
1583 worktree_c
1584 .condition(&cx_c, |tree, _| tree.file_count() == 4)
1585 .await;
1586 worktree_a.read_with(&cx_a, |tree, _| {
1587 assert_eq!(
1588 tree.paths()
1589 .map(|p| p.to_string_lossy())
1590 .collect::<Vec<_>>(),
1591 &[".zed.toml", "file1-renamed", "file3", "file4"]
1592 )
1593 });
1594 worktree_b.read_with(&cx_b, |tree, _| {
1595 assert_eq!(
1596 tree.paths()
1597 .map(|p| p.to_string_lossy())
1598 .collect::<Vec<_>>(),
1599 &[".zed.toml", "file1-renamed", "file3", "file4"]
1600 )
1601 });
1602 worktree_c.read_with(&cx_c, |tree, _| {
1603 assert_eq!(
1604 tree.paths()
1605 .map(|p| p.to_string_lossy())
1606 .collect::<Vec<_>>(),
1607 &[".zed.toml", "file1-renamed", "file3", "file4"]
1608 )
1609 });
1610
1611 // Ensure buffer files are updated as well.
1612 buffer_a
1613 .condition(&cx_a, |buf, _| {
1614 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1615 })
1616 .await;
1617 buffer_b
1618 .condition(&cx_b, |buf, _| {
1619 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1620 })
1621 .await;
1622 buffer_c
1623 .condition(&cx_c, |buf, _| {
1624 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1625 })
1626 .await;
1627 }
1628
1629 #[gpui::test]
1630 async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1631 cx_a.foreground().forbid_parking();
1632 let lang_registry = Arc::new(LanguageRegistry::new());
1633 let fs = Arc::new(FakeFs::new(cx_a.background()));
1634
1635 // Connect to a server as 2 clients.
1636 let mut server = TestServer::start(cx_a.foreground()).await;
1637 let client_a = server.create_client(&mut cx_a, "user_a").await;
1638 let client_b = server.create_client(&mut cx_b, "user_b").await;
1639
1640 // Share a project as client A
1641 fs.insert_tree(
1642 "/dir",
1643 json!({
1644 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1645 "a.txt": "a-contents",
1646 }),
1647 )
1648 .await;
1649
1650 let project_a = cx_a.update(|cx| {
1651 Project::local(
1652 client_a.clone(),
1653 client_a.user_store.clone(),
1654 lang_registry.clone(),
1655 fs.clone(),
1656 cx,
1657 )
1658 });
1659 let (worktree_a, _) = project_a
1660 .update(&mut cx_a, |p, cx| {
1661 p.find_or_create_local_worktree("/dir", false, cx)
1662 })
1663 .await
1664 .unwrap();
1665 worktree_a
1666 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1667 .await;
1668 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1669 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1670 project_a
1671 .update(&mut cx_a, |p, cx| p.share(cx))
1672 .await
1673 .unwrap();
1674
1675 // Join that project as client B
1676 let project_b = Project::remote(
1677 project_id,
1678 client_b.clone(),
1679 client_b.user_store.clone(),
1680 lang_registry.clone(),
1681 fs.clone(),
1682 &mut cx_b.to_async(),
1683 )
1684 .await
1685 .unwrap();
1686 let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1687
1688 // Open a buffer as client B
1689 let buffer_b = project_b
1690 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1691 .await
1692 .unwrap();
1693 let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1694
1695 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1696 buffer_b.read_with(&cx_b, |buf, _| {
1697 assert!(buf.is_dirty());
1698 assert!(!buf.has_conflict());
1699 });
1700
1701 buffer_b
1702 .update(&mut cx_b, |buf, cx| buf.save(cx))
1703 .await
1704 .unwrap();
1705 worktree_b
1706 .condition(&cx_b, |_, cx| {
1707 buffer_b.read(cx).file().unwrap().mtime() != mtime
1708 })
1709 .await;
1710 buffer_b.read_with(&cx_b, |buf, _| {
1711 assert!(!buf.is_dirty());
1712 assert!(!buf.has_conflict());
1713 });
1714
1715 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1716 buffer_b.read_with(&cx_b, |buf, _| {
1717 assert!(buf.is_dirty());
1718 assert!(!buf.has_conflict());
1719 });
1720 }
1721
1722 #[gpui::test]
1723 async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1724 cx_a.foreground().forbid_parking();
1725 let lang_registry = Arc::new(LanguageRegistry::new());
1726 let fs = Arc::new(FakeFs::new(cx_a.background()));
1727
1728 // Connect to a server as 2 clients.
1729 let mut server = TestServer::start(cx_a.foreground()).await;
1730 let client_a = server.create_client(&mut cx_a, "user_a").await;
1731 let client_b = server.create_client(&mut cx_b, "user_b").await;
1732
1733 // Share a project as client A
1734 fs.insert_tree(
1735 "/dir",
1736 json!({
1737 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1738 "a.txt": "a-contents",
1739 }),
1740 )
1741 .await;
1742
1743 let project_a = cx_a.update(|cx| {
1744 Project::local(
1745 client_a.clone(),
1746 client_a.user_store.clone(),
1747 lang_registry.clone(),
1748 fs.clone(),
1749 cx,
1750 )
1751 });
1752 let (worktree_a, _) = project_a
1753 .update(&mut cx_a, |p, cx| {
1754 p.find_or_create_local_worktree("/dir", false, cx)
1755 })
1756 .await
1757 .unwrap();
1758 worktree_a
1759 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1760 .await;
1761 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1762 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1763 project_a
1764 .update(&mut cx_a, |p, cx| p.share(cx))
1765 .await
1766 .unwrap();
1767
1768 // Join that project as client B
1769 let project_b = Project::remote(
1770 project_id,
1771 client_b.clone(),
1772 client_b.user_store.clone(),
1773 lang_registry.clone(),
1774 fs.clone(),
1775 &mut cx_b.to_async(),
1776 )
1777 .await
1778 .unwrap();
1779 let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1780
1781 // Open a buffer as client B
1782 let buffer_b = project_b
1783 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1784 .await
1785 .unwrap();
1786 buffer_b.read_with(&cx_b, |buf, _| {
1787 assert!(!buf.is_dirty());
1788 assert!(!buf.has_conflict());
1789 });
1790
1791 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1792 .await
1793 .unwrap();
1794 buffer_b
1795 .condition(&cx_b, |buf, _| {
1796 buf.text() == "new contents" && !buf.is_dirty()
1797 })
1798 .await;
1799 buffer_b.read_with(&cx_b, |buf, _| {
1800 assert!(!buf.has_conflict());
1801 });
1802 }
1803
1804 #[gpui::test(iterations = 100)]
1805 async fn test_editing_while_guest_opens_buffer(
1806 mut cx_a: TestAppContext,
1807 mut cx_b: TestAppContext,
1808 ) {
1809 cx_a.foreground().forbid_parking();
1810 let lang_registry = Arc::new(LanguageRegistry::new());
1811 let fs = Arc::new(FakeFs::new(cx_a.background()));
1812
1813 // Connect to a server as 2 clients.
1814 let mut server = TestServer::start(cx_a.foreground()).await;
1815 let client_a = server.create_client(&mut cx_a, "user_a").await;
1816 let client_b = server.create_client(&mut cx_b, "user_b").await;
1817
1818 // Share a project as client A
1819 fs.insert_tree(
1820 "/dir",
1821 json!({
1822 ".zed.toml": r#"collaborators = ["user_b"]"#,
1823 "a.txt": "a-contents",
1824 }),
1825 )
1826 .await;
1827 let project_a = cx_a.update(|cx| {
1828 Project::local(
1829 client_a.clone(),
1830 client_a.user_store.clone(),
1831 lang_registry.clone(),
1832 fs.clone(),
1833 cx,
1834 )
1835 });
1836 let (worktree_a, _) = project_a
1837 .update(&mut cx_a, |p, cx| {
1838 p.find_or_create_local_worktree("/dir", false, cx)
1839 })
1840 .await
1841 .unwrap();
1842 worktree_a
1843 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1844 .await;
1845 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1846 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1847 project_a
1848 .update(&mut cx_a, |p, cx| p.share(cx))
1849 .await
1850 .unwrap();
1851
1852 // Join that project as client B
1853 let project_b = Project::remote(
1854 project_id,
1855 client_b.clone(),
1856 client_b.user_store.clone(),
1857 lang_registry.clone(),
1858 fs.clone(),
1859 &mut cx_b.to_async(),
1860 )
1861 .await
1862 .unwrap();
1863
1864 // Open a buffer as client A
1865 let buffer_a = project_a
1866 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1867 .await
1868 .unwrap();
1869
1870 // Start opening the same buffer as client B
1871 let buffer_b = cx_b
1872 .background()
1873 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1874 task::yield_now().await;
1875
1876 // Edit the buffer as client A while client B is still opening it.
1877 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1878
1879 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1880 let buffer_b = buffer_b.await.unwrap();
1881 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1882 }
1883
1884 #[gpui::test]
1885 async fn test_leaving_worktree_while_opening_buffer(
1886 mut cx_a: TestAppContext,
1887 mut cx_b: TestAppContext,
1888 ) {
1889 cx_a.foreground().forbid_parking();
1890 let lang_registry = Arc::new(LanguageRegistry::new());
1891 let fs = Arc::new(FakeFs::new(cx_a.background()));
1892
1893 // Connect to a server as 2 clients.
1894 let mut server = TestServer::start(cx_a.foreground()).await;
1895 let client_a = server.create_client(&mut cx_a, "user_a").await;
1896 let client_b = server.create_client(&mut cx_b, "user_b").await;
1897
1898 // Share a project as client A
1899 fs.insert_tree(
1900 "/dir",
1901 json!({
1902 ".zed.toml": r#"collaborators = ["user_b"]"#,
1903 "a.txt": "a-contents",
1904 }),
1905 )
1906 .await;
1907 let project_a = cx_a.update(|cx| {
1908 Project::local(
1909 client_a.clone(),
1910 client_a.user_store.clone(),
1911 lang_registry.clone(),
1912 fs.clone(),
1913 cx,
1914 )
1915 });
1916 let (worktree_a, _) = project_a
1917 .update(&mut cx_a, |p, cx| {
1918 p.find_or_create_local_worktree("/dir", false, cx)
1919 })
1920 .await
1921 .unwrap();
1922 worktree_a
1923 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1924 .await;
1925 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1926 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1927 project_a
1928 .update(&mut cx_a, |p, cx| p.share(cx))
1929 .await
1930 .unwrap();
1931
1932 // Join that project as client B
1933 let project_b = Project::remote(
1934 project_id,
1935 client_b.clone(),
1936 client_b.user_store.clone(),
1937 lang_registry.clone(),
1938 fs.clone(),
1939 &mut cx_b.to_async(),
1940 )
1941 .await
1942 .unwrap();
1943
1944 // See that a guest has joined as client A.
1945 project_a
1946 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1947 .await;
1948
1949 // Begin opening a buffer as client B, but leave the project before the open completes.
1950 let buffer_b = cx_b
1951 .background()
1952 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1953 cx_b.update(|_| drop(project_b));
1954 drop(buffer_b);
1955
1956 // See that the guest has left.
1957 project_a
1958 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1959 .await;
1960 }
1961
1962 #[gpui::test]
1963 async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1964 cx_a.foreground().forbid_parking();
1965 let lang_registry = Arc::new(LanguageRegistry::new());
1966 let fs = Arc::new(FakeFs::new(cx_a.background()));
1967
1968 // Connect to a server as 2 clients.
1969 let mut server = TestServer::start(cx_a.foreground()).await;
1970 let client_a = server.create_client(&mut cx_a, "user_a").await;
1971 let client_b = server.create_client(&mut cx_b, "user_b").await;
1972
1973 // Share a project as client A
1974 fs.insert_tree(
1975 "/a",
1976 json!({
1977 ".zed.toml": r#"collaborators = ["user_b"]"#,
1978 "a.txt": "a-contents",
1979 "b.txt": "b-contents",
1980 }),
1981 )
1982 .await;
1983 let project_a = cx_a.update(|cx| {
1984 Project::local(
1985 client_a.clone(),
1986 client_a.user_store.clone(),
1987 lang_registry.clone(),
1988 fs.clone(),
1989 cx,
1990 )
1991 });
1992 let (worktree_a, _) = project_a
1993 .update(&mut cx_a, |p, cx| {
1994 p.find_or_create_local_worktree("/a", false, cx)
1995 })
1996 .await
1997 .unwrap();
1998 worktree_a
1999 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2000 .await;
2001 let project_id = project_a
2002 .update(&mut cx_a, |project, _| project.next_remote_id())
2003 .await;
2004 project_a
2005 .update(&mut cx_a, |project, cx| project.share(cx))
2006 .await
2007 .unwrap();
2008
2009 // Join that project as client B
2010 let _project_b = Project::remote(
2011 project_id,
2012 client_b.clone(),
2013 client_b.user_store.clone(),
2014 lang_registry.clone(),
2015 fs.clone(),
2016 &mut cx_b.to_async(),
2017 )
2018 .await
2019 .unwrap();
2020
2021 // See that a guest has joined as client A.
2022 project_a
2023 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2024 .await;
2025
2026 // Drop client B's connection and ensure client A observes client B leaving the worktree.
2027 client_b.disconnect(&cx_b.to_async()).unwrap();
2028 project_a
2029 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2030 .await;
2031 }
2032
2033 #[gpui::test]
2034 async fn test_collaborating_with_diagnostics(
2035 mut cx_a: TestAppContext,
2036 mut cx_b: TestAppContext,
2037 ) {
2038 cx_a.foreground().forbid_parking();
2039 let mut lang_registry = Arc::new(LanguageRegistry::new());
2040 let fs = Arc::new(FakeFs::new(cx_a.background()));
2041
2042 // Set up a fake language server.
2043 let (language_server_config, mut fake_language_server) =
2044 LanguageServerConfig::fake(cx_a.background()).await;
2045 Arc::get_mut(&mut lang_registry)
2046 .unwrap()
2047 .add(Arc::new(Language::new(
2048 LanguageConfig {
2049 name: "Rust".to_string(),
2050 path_suffixes: vec!["rs".to_string()],
2051 language_server: Some(language_server_config),
2052 ..Default::default()
2053 },
2054 Some(tree_sitter_rust::language()),
2055 )));
2056
2057 // Connect to a server as 2 clients.
2058 let mut server = TestServer::start(cx_a.foreground()).await;
2059 let client_a = server.create_client(&mut cx_a, "user_a").await;
2060 let client_b = server.create_client(&mut cx_b, "user_b").await;
2061
2062 // Share a project as client A
2063 fs.insert_tree(
2064 "/a",
2065 json!({
2066 ".zed.toml": r#"collaborators = ["user_b"]"#,
2067 "a.rs": "let one = two",
2068 "other.rs": "",
2069 }),
2070 )
2071 .await;
2072 let project_a = cx_a.update(|cx| {
2073 Project::local(
2074 client_a.clone(),
2075 client_a.user_store.clone(),
2076 lang_registry.clone(),
2077 fs.clone(),
2078 cx,
2079 )
2080 });
2081 let (worktree_a, _) = project_a
2082 .update(&mut cx_a, |p, cx| {
2083 p.find_or_create_local_worktree("/a", false, cx)
2084 })
2085 .await
2086 .unwrap();
2087 worktree_a
2088 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2089 .await;
2090 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2091 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2092 project_a
2093 .update(&mut cx_a, |p, cx| p.share(cx))
2094 .await
2095 .unwrap();
2096
2097 // Cause the language server to start.
2098 let _ = cx_a
2099 .background()
2100 .spawn(project_a.update(&mut cx_a, |project, cx| {
2101 project.open_buffer(
2102 ProjectPath {
2103 worktree_id,
2104 path: Path::new("other.rs").into(),
2105 },
2106 cx,
2107 )
2108 }))
2109 .await
2110 .unwrap();
2111
2112 // Simulate a language server reporting errors for a file.
2113 fake_language_server
2114 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2115 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2116 version: None,
2117 diagnostics: vec![lsp::Diagnostic {
2118 severity: Some(lsp::DiagnosticSeverity::ERROR),
2119 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2120 message: "message 1".to_string(),
2121 ..Default::default()
2122 }],
2123 })
2124 .await;
2125
2126 // Wait for server to see the diagnostics update.
2127 server
2128 .condition(|store| {
2129 let worktree = store
2130 .project(project_id)
2131 .unwrap()
2132 .worktrees
2133 .get(&worktree_id.to_proto())
2134 .unwrap();
2135
2136 !worktree
2137 .share
2138 .as_ref()
2139 .unwrap()
2140 .diagnostic_summaries
2141 .is_empty()
2142 })
2143 .await;
2144
2145 // Join the worktree as client B.
2146 let project_b = Project::remote(
2147 project_id,
2148 client_b.clone(),
2149 client_b.user_store.clone(),
2150 lang_registry.clone(),
2151 fs.clone(),
2152 &mut cx_b.to_async(),
2153 )
2154 .await
2155 .unwrap();
2156
2157 project_b.read_with(&cx_b, |project, cx| {
2158 assert_eq!(
2159 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2160 &[(
2161 ProjectPath {
2162 worktree_id,
2163 path: Arc::from(Path::new("a.rs")),
2164 },
2165 DiagnosticSummary {
2166 error_count: 1,
2167 warning_count: 0,
2168 ..Default::default()
2169 },
2170 )]
2171 )
2172 });
2173
2174 // Simulate a language server reporting more errors for a file.
2175 fake_language_server
2176 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2177 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2178 version: None,
2179 diagnostics: vec![
2180 lsp::Diagnostic {
2181 severity: Some(lsp::DiagnosticSeverity::ERROR),
2182 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2183 message: "message 1".to_string(),
2184 ..Default::default()
2185 },
2186 lsp::Diagnostic {
2187 severity: Some(lsp::DiagnosticSeverity::WARNING),
2188 range: lsp::Range::new(
2189 lsp::Position::new(0, 10),
2190 lsp::Position::new(0, 13),
2191 ),
2192 message: "message 2".to_string(),
2193 ..Default::default()
2194 },
2195 ],
2196 })
2197 .await;
2198
2199 // Client b gets the updated summaries
2200 project_b
2201 .condition(&cx_b, |project, cx| {
2202 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2203 == &[(
2204 ProjectPath {
2205 worktree_id,
2206 path: Arc::from(Path::new("a.rs")),
2207 },
2208 DiagnosticSummary {
2209 error_count: 1,
2210 warning_count: 1,
2211 ..Default::default()
2212 },
2213 )]
2214 })
2215 .await;
2216
2217 // Open the file with the errors on client B. They should be present.
2218 let buffer_b = cx_b
2219 .background()
2220 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2221 .await
2222 .unwrap();
2223
2224 buffer_b.read_with(&cx_b, |buffer, _| {
2225 assert_eq!(
2226 buffer
2227 .snapshot()
2228 .diagnostics_in_range::<_, Point>(0..buffer.len())
2229 .map(|entry| entry)
2230 .collect::<Vec<_>>(),
2231 &[
2232 DiagnosticEntry {
2233 range: Point::new(0, 4)..Point::new(0, 7),
2234 diagnostic: Diagnostic {
2235 group_id: 0,
2236 message: "message 1".to_string(),
2237 severity: lsp::DiagnosticSeverity::ERROR,
2238 is_primary: true,
2239 ..Default::default()
2240 }
2241 },
2242 DiagnosticEntry {
2243 range: Point::new(0, 10)..Point::new(0, 13),
2244 diagnostic: Diagnostic {
2245 group_id: 1,
2246 severity: lsp::DiagnosticSeverity::WARNING,
2247 message: "message 2".to_string(),
2248 is_primary: true,
2249 ..Default::default()
2250 }
2251 }
2252 ]
2253 );
2254 });
2255 }
2256
2257 #[gpui::test]
2258 async fn test_collaborating_with_completion(
2259 mut cx_a: TestAppContext,
2260 mut cx_b: TestAppContext,
2261 ) {
2262 cx_a.foreground().forbid_parking();
2263 let mut lang_registry = Arc::new(LanguageRegistry::new());
2264 let fs = Arc::new(FakeFs::new(cx_a.background()));
2265
2266 // Set up a fake language server.
2267 let (language_server_config, mut fake_language_server) =
2268 LanguageServerConfig::fake_with_capabilities(
2269 lsp::ServerCapabilities {
2270 completion_provider: Some(lsp::CompletionOptions {
2271 trigger_characters: Some(vec![".".to_string()]),
2272 ..Default::default()
2273 }),
2274 ..Default::default()
2275 },
2276 cx_a.background(),
2277 )
2278 .await;
2279 Arc::get_mut(&mut lang_registry)
2280 .unwrap()
2281 .add(Arc::new(Language::new(
2282 LanguageConfig {
2283 name: "Rust".to_string(),
2284 path_suffixes: vec!["rs".to_string()],
2285 language_server: Some(language_server_config),
2286 ..Default::default()
2287 },
2288 Some(tree_sitter_rust::language()),
2289 )));
2290
2291 // Connect to a server as 2 clients.
2292 let mut server = TestServer::start(cx_a.foreground()).await;
2293 let client_a = server.create_client(&mut cx_a, "user_a").await;
2294 let client_b = server.create_client(&mut cx_b, "user_b").await;
2295
2296 // Share a project as client A
2297 fs.insert_tree(
2298 "/a",
2299 json!({
2300 ".zed.toml": r#"collaborators = ["user_b"]"#,
2301 "main.rs": "fn main() { a }",
2302 "other.rs": "",
2303 }),
2304 )
2305 .await;
2306 let project_a = cx_a.update(|cx| {
2307 Project::local(
2308 client_a.clone(),
2309 client_a.user_store.clone(),
2310 lang_registry.clone(),
2311 fs.clone(),
2312 cx,
2313 )
2314 });
2315 let (worktree_a, _) = project_a
2316 .update(&mut cx_a, |p, cx| {
2317 p.find_or_create_local_worktree("/a", false, cx)
2318 })
2319 .await
2320 .unwrap();
2321 worktree_a
2322 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2323 .await;
2324 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2325 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2326 project_a
2327 .update(&mut cx_a, |p, cx| p.share(cx))
2328 .await
2329 .unwrap();
2330
2331 // Join the worktree as client B.
2332 let project_b = Project::remote(
2333 project_id,
2334 client_b.clone(),
2335 client_b.user_store.clone(),
2336 lang_registry.clone(),
2337 fs.clone(),
2338 &mut cx_b.to_async(),
2339 )
2340 .await
2341 .unwrap();
2342
2343 // Open a file in an editor as the guest.
2344 let buffer_b = project_b
2345 .update(&mut cx_b, |p, cx| {
2346 p.open_buffer((worktree_id, "main.rs"), cx)
2347 })
2348 .await
2349 .unwrap();
2350 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2351 let editor_b = cx_b.add_view(window_b, |cx| {
2352 Editor::for_buffer(
2353 cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2354 Arc::new(|cx| EditorSettings::test(cx)),
2355 cx,
2356 )
2357 });
2358
2359 // Type a completion trigger character as the guest.
2360 editor_b.update(&mut cx_b, |editor, cx| {
2361 editor.select_ranges([13..13], None, cx);
2362 editor.handle_input(&Input(".".into()), cx);
2363 cx.focus(&editor_b);
2364 });
2365
2366 // Receive a completion request as the host's language server.
2367 let (request_id, params) = fake_language_server
2368 .receive_request::<lsp::request::Completion>()
2369 .await;
2370 assert_eq!(
2371 params.text_document_position.text_document.uri,
2372 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2373 );
2374 assert_eq!(
2375 params.text_document_position.position,
2376 lsp::Position::new(0, 14),
2377 );
2378
2379 // Return some completions from the host's language server.
2380 fake_language_server
2381 .respond(
2382 request_id,
2383 Some(lsp::CompletionResponse::Array(vec![
2384 lsp::CompletionItem {
2385 label: "first_method(…)".into(),
2386 detail: Some("fn(&mut self, B) -> C".into()),
2387 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2388 new_text: "first_method($1)".to_string(),
2389 range: lsp::Range::new(
2390 lsp::Position::new(0, 14),
2391 lsp::Position::new(0, 14),
2392 ),
2393 })),
2394 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2395 ..Default::default()
2396 },
2397 lsp::CompletionItem {
2398 label: "second_method(…)".into(),
2399 detail: Some("fn(&mut self, C) -> D<E>".into()),
2400 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2401 new_text: "second_method()".to_string(),
2402 range: lsp::Range::new(
2403 lsp::Position::new(0, 14),
2404 lsp::Position::new(0, 14),
2405 ),
2406 })),
2407 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2408 ..Default::default()
2409 },
2410 ])),
2411 )
2412 .await;
2413
2414 // Open the buffer on the host.
2415 let buffer_a = project_a
2416 .update(&mut cx_a, |p, cx| {
2417 p.open_buffer((worktree_id, "main.rs"), cx)
2418 })
2419 .await
2420 .unwrap();
2421 buffer_a
2422 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2423 .await;
2424
2425 // Confirm a completion on the guest.
2426 editor_b.next_notification(&cx_b).await;
2427 editor_b.update(&mut cx_b, |editor, cx| {
2428 assert!(editor.showing_context_menu());
2429 editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2430 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2431 });
2432
2433 buffer_a
2434 .condition(&cx_a, |buffer, _| {
2435 buffer.text() == "fn main() { a.first_method() }"
2436 })
2437 .await;
2438
2439 // Receive a request resolve the selected completion on the host's language server.
2440 let (request_id, params) = fake_language_server
2441 .receive_request::<lsp::request::ResolveCompletionItem>()
2442 .await;
2443 assert_eq!(params.label, "first_method(…)");
2444
2445 // Return a resolved completion from the host's language server.
2446 // The resolved completion has an additional text edit.
2447 fake_language_server
2448 .respond(
2449 request_id,
2450 lsp::CompletionItem {
2451 label: "first_method(…)".into(),
2452 detail: Some("fn(&mut self, B) -> C".into()),
2453 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2454 new_text: "first_method($1)".to_string(),
2455 range: lsp::Range::new(
2456 lsp::Position::new(0, 14),
2457 lsp::Position::new(0, 14),
2458 ),
2459 })),
2460 additional_text_edits: Some(vec![lsp::TextEdit {
2461 new_text: "use d::SomeTrait;\n".to_string(),
2462 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2463 }]),
2464 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2465 ..Default::default()
2466 },
2467 )
2468 .await;
2469
2470 // The additional edit is applied.
2471 buffer_b
2472 .condition(&cx_b, |buffer, _| {
2473 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2474 })
2475 .await;
2476 assert_eq!(
2477 buffer_a.read_with(&cx_a, |buffer, _| buffer.text()),
2478 buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2479 );
2480 }
2481
2482 #[gpui::test]
2483 async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2484 cx_a.foreground().forbid_parking();
2485 let mut lang_registry = Arc::new(LanguageRegistry::new());
2486 let fs = Arc::new(FakeFs::new(cx_a.background()));
2487
2488 // Set up a fake language server.
2489 let (language_server_config, mut fake_language_server) =
2490 LanguageServerConfig::fake(cx_a.background()).await;
2491 Arc::get_mut(&mut lang_registry)
2492 .unwrap()
2493 .add(Arc::new(Language::new(
2494 LanguageConfig {
2495 name: "Rust".to_string(),
2496 path_suffixes: vec!["rs".to_string()],
2497 language_server: Some(language_server_config),
2498 ..Default::default()
2499 },
2500 Some(tree_sitter_rust::language()),
2501 )));
2502
2503 // Connect to a server as 2 clients.
2504 let mut server = TestServer::start(cx_a.foreground()).await;
2505 let client_a = server.create_client(&mut cx_a, "user_a").await;
2506 let client_b = server.create_client(&mut cx_b, "user_b").await;
2507
2508 // Share a project as client A
2509 fs.insert_tree(
2510 "/a",
2511 json!({
2512 ".zed.toml": r#"collaborators = ["user_b"]"#,
2513 "a.rs": "let one = two",
2514 }),
2515 )
2516 .await;
2517 let project_a = cx_a.update(|cx| {
2518 Project::local(
2519 client_a.clone(),
2520 client_a.user_store.clone(),
2521 lang_registry.clone(),
2522 fs.clone(),
2523 cx,
2524 )
2525 });
2526 let (worktree_a, _) = project_a
2527 .update(&mut cx_a, |p, cx| {
2528 p.find_or_create_local_worktree("/a", false, cx)
2529 })
2530 .await
2531 .unwrap();
2532 worktree_a
2533 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2534 .await;
2535 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2536 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2537 project_a
2538 .update(&mut cx_a, |p, cx| p.share(cx))
2539 .await
2540 .unwrap();
2541
2542 // Join the worktree as client B.
2543 let project_b = Project::remote(
2544 project_id,
2545 client_b.clone(),
2546 client_b.user_store.clone(),
2547 lang_registry.clone(),
2548 fs.clone(),
2549 &mut cx_b.to_async(),
2550 )
2551 .await
2552 .unwrap();
2553
2554 let buffer_b = cx_b
2555 .background()
2556 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2557 .await
2558 .unwrap();
2559
2560 let format = buffer_b.update(&mut cx_b, |buffer, cx| buffer.format(cx));
2561 let (request_id, _) = fake_language_server
2562 .receive_request::<lsp::request::Formatting>()
2563 .await;
2564 fake_language_server
2565 .respond(
2566 request_id,
2567 Some(vec![
2568 lsp::TextEdit {
2569 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2570 new_text: "h".to_string(),
2571 },
2572 lsp::TextEdit {
2573 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2574 new_text: "y".to_string(),
2575 },
2576 ]),
2577 )
2578 .await;
2579 format.await.unwrap();
2580 assert_eq!(
2581 buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2582 "let honey = two"
2583 );
2584 }
2585
2586 #[gpui::test]
2587 async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2588 cx_a.foreground().forbid_parking();
2589 let mut lang_registry = Arc::new(LanguageRegistry::new());
2590 let fs = Arc::new(FakeFs::new(cx_a.background()));
2591 fs.insert_tree(
2592 "/root-1",
2593 json!({
2594 ".zed.toml": r#"collaborators = ["user_b"]"#,
2595 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2596 }),
2597 )
2598 .await;
2599 fs.insert_tree(
2600 "/root-2",
2601 json!({
2602 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2603 }),
2604 )
2605 .await;
2606
2607 // Set up a fake language server.
2608 let (language_server_config, mut fake_language_server) =
2609 LanguageServerConfig::fake(cx_a.background()).await;
2610 Arc::get_mut(&mut lang_registry)
2611 .unwrap()
2612 .add(Arc::new(Language::new(
2613 LanguageConfig {
2614 name: "Rust".to_string(),
2615 path_suffixes: vec!["rs".to_string()],
2616 language_server: Some(language_server_config),
2617 ..Default::default()
2618 },
2619 Some(tree_sitter_rust::language()),
2620 )));
2621
2622 // Connect to a server as 2 clients.
2623 let mut server = TestServer::start(cx_a.foreground()).await;
2624 let client_a = server.create_client(&mut cx_a, "user_a").await;
2625 let client_b = server.create_client(&mut cx_b, "user_b").await;
2626
2627 // Share a project as client A
2628 let project_a = cx_a.update(|cx| {
2629 Project::local(
2630 client_a.clone(),
2631 client_a.user_store.clone(),
2632 lang_registry.clone(),
2633 fs.clone(),
2634 cx,
2635 )
2636 });
2637 let (worktree_a, _) = project_a
2638 .update(&mut cx_a, |p, cx| {
2639 p.find_or_create_local_worktree("/root-1", false, cx)
2640 })
2641 .await
2642 .unwrap();
2643 worktree_a
2644 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2645 .await;
2646 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2647 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2648 project_a
2649 .update(&mut cx_a, |p, cx| p.share(cx))
2650 .await
2651 .unwrap();
2652
2653 // Join the worktree as client B.
2654 let project_b = Project::remote(
2655 project_id,
2656 client_b.clone(),
2657 client_b.user_store.clone(),
2658 lang_registry.clone(),
2659 fs.clone(),
2660 &mut cx_b.to_async(),
2661 )
2662 .await
2663 .unwrap();
2664
2665 // Open the file to be formatted on client B.
2666 let buffer_b = cx_b
2667 .background()
2668 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2669 .await
2670 .unwrap();
2671
2672 let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2673 let (request_id, _) = fake_language_server
2674 .receive_request::<lsp::request::GotoDefinition>()
2675 .await;
2676 fake_language_server
2677 .respond(
2678 request_id,
2679 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2680 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2681 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2682 ))),
2683 )
2684 .await;
2685 let definitions_1 = definitions_1.await.unwrap();
2686 cx_b.read(|cx| {
2687 assert_eq!(definitions_1.len(), 1);
2688 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2689 let target_buffer = definitions_1[0].target_buffer.read(cx);
2690 assert_eq!(
2691 target_buffer.text(),
2692 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2693 );
2694 assert_eq!(
2695 definitions_1[0].target_range.to_point(target_buffer),
2696 Point::new(0, 6)..Point::new(0, 9)
2697 );
2698 });
2699
2700 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2701 // the previous call to `definition`.
2702 let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2703 let (request_id, _) = fake_language_server
2704 .receive_request::<lsp::request::GotoDefinition>()
2705 .await;
2706 fake_language_server
2707 .respond(
2708 request_id,
2709 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2710 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2711 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2712 ))),
2713 )
2714 .await;
2715 let definitions_2 = definitions_2.await.unwrap();
2716 cx_b.read(|cx| {
2717 assert_eq!(definitions_2.len(), 1);
2718 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2719 let target_buffer = definitions_2[0].target_buffer.read(cx);
2720 assert_eq!(
2721 target_buffer.text(),
2722 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2723 );
2724 assert_eq!(
2725 definitions_2[0].target_range.to_point(target_buffer),
2726 Point::new(1, 6)..Point::new(1, 11)
2727 );
2728 });
2729 assert_eq!(
2730 definitions_1[0].target_buffer,
2731 definitions_2[0].target_buffer
2732 );
2733
2734 cx_b.update(|_| {
2735 drop(definitions_1);
2736 drop(definitions_2);
2737 });
2738 project_b
2739 .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2740 .await;
2741 }
2742
2743 #[gpui::test]
2744 async fn test_open_buffer_while_getting_definition_pointing_to_it(
2745 mut cx_a: TestAppContext,
2746 mut cx_b: TestAppContext,
2747 mut rng: StdRng,
2748 ) {
2749 cx_a.foreground().forbid_parking();
2750 let mut lang_registry = Arc::new(LanguageRegistry::new());
2751 let fs = Arc::new(FakeFs::new(cx_a.background()));
2752 fs.insert_tree(
2753 "/root",
2754 json!({
2755 ".zed.toml": r#"collaborators = ["user_b"]"#,
2756 "a.rs": "const ONE: usize = b::TWO;",
2757 "b.rs": "const TWO: usize = 2",
2758 }),
2759 )
2760 .await;
2761
2762 // Set up a fake language server.
2763 let (language_server_config, mut fake_language_server) =
2764 LanguageServerConfig::fake(cx_a.background()).await;
2765 Arc::get_mut(&mut lang_registry)
2766 .unwrap()
2767 .add(Arc::new(Language::new(
2768 LanguageConfig {
2769 name: "Rust".to_string(),
2770 path_suffixes: vec!["rs".to_string()],
2771 language_server: Some(language_server_config),
2772 ..Default::default()
2773 },
2774 Some(tree_sitter_rust::language()),
2775 )));
2776
2777 // Connect to a server as 2 clients.
2778 let mut server = TestServer::start(cx_a.foreground()).await;
2779 let client_a = server.create_client(&mut cx_a, "user_a").await;
2780 let client_b = server.create_client(&mut cx_b, "user_b").await;
2781
2782 // Share a project as client A
2783 let project_a = cx_a.update(|cx| {
2784 Project::local(
2785 client_a.clone(),
2786 client_a.user_store.clone(),
2787 lang_registry.clone(),
2788 fs.clone(),
2789 cx,
2790 )
2791 });
2792 let (worktree_a, _) = project_a
2793 .update(&mut cx_a, |p, cx| {
2794 p.find_or_create_local_worktree("/root", false, cx)
2795 })
2796 .await
2797 .unwrap();
2798 worktree_a
2799 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2800 .await;
2801 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2802 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2803 project_a
2804 .update(&mut cx_a, |p, cx| p.share(cx))
2805 .await
2806 .unwrap();
2807
2808 // Join the worktree as client B.
2809 let project_b = Project::remote(
2810 project_id,
2811 client_b.clone(),
2812 client_b.user_store.clone(),
2813 lang_registry.clone(),
2814 fs.clone(),
2815 &mut cx_b.to_async(),
2816 )
2817 .await
2818 .unwrap();
2819
2820 let buffer_b1 = cx_b
2821 .background()
2822 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2823 .await
2824 .unwrap();
2825
2826 let definitions;
2827 let buffer_b2;
2828 if rng.gen() {
2829 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2830 buffer_b2 =
2831 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2832 } else {
2833 buffer_b2 =
2834 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2835 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2836 }
2837
2838 let (request_id, _) = fake_language_server
2839 .receive_request::<lsp::request::GotoDefinition>()
2840 .await;
2841 fake_language_server
2842 .respond(
2843 request_id,
2844 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2845 lsp::Url::from_file_path("/root/b.rs").unwrap(),
2846 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2847 ))),
2848 )
2849 .await;
2850
2851 let buffer_b2 = buffer_b2.await.unwrap();
2852 let definitions = definitions.await.unwrap();
2853 assert_eq!(definitions.len(), 1);
2854 assert_eq!(definitions[0].target_buffer, buffer_b2);
2855 }
2856
2857 #[gpui::test]
2858 async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2859 cx_a.foreground().forbid_parking();
2860
2861 // Connect to a server as 2 clients.
2862 let mut server = TestServer::start(cx_a.foreground()).await;
2863 let client_a = server.create_client(&mut cx_a, "user_a").await;
2864 let client_b = server.create_client(&mut cx_b, "user_b").await;
2865
2866 // Create an org that includes these 2 users.
2867 let db = &server.app_state.db;
2868 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2869 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2870 .await
2871 .unwrap();
2872 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2873 .await
2874 .unwrap();
2875
2876 // Create a channel that includes all the users.
2877 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2878 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2879 .await
2880 .unwrap();
2881 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2882 .await
2883 .unwrap();
2884 db.create_channel_message(
2885 channel_id,
2886 client_b.current_user_id(&cx_b),
2887 "hello A, it's B.",
2888 OffsetDateTime::now_utc(),
2889 1,
2890 )
2891 .await
2892 .unwrap();
2893
2894 let channels_a = cx_a
2895 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2896 channels_a
2897 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2898 .await;
2899 channels_a.read_with(&cx_a, |list, _| {
2900 assert_eq!(
2901 list.available_channels().unwrap(),
2902 &[ChannelDetails {
2903 id: channel_id.to_proto(),
2904 name: "test-channel".to_string()
2905 }]
2906 )
2907 });
2908 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2909 this.get_channel(channel_id.to_proto(), cx).unwrap()
2910 });
2911 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2912 channel_a
2913 .condition(&cx_a, |channel, _| {
2914 channel_messages(channel)
2915 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2916 })
2917 .await;
2918
2919 let channels_b = cx_b
2920 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2921 channels_b
2922 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2923 .await;
2924 channels_b.read_with(&cx_b, |list, _| {
2925 assert_eq!(
2926 list.available_channels().unwrap(),
2927 &[ChannelDetails {
2928 id: channel_id.to_proto(),
2929 name: "test-channel".to_string()
2930 }]
2931 )
2932 });
2933
2934 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2935 this.get_channel(channel_id.to_proto(), cx).unwrap()
2936 });
2937 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2938 channel_b
2939 .condition(&cx_b, |channel, _| {
2940 channel_messages(channel)
2941 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2942 })
2943 .await;
2944
2945 channel_a
2946 .update(&mut cx_a, |channel, cx| {
2947 channel
2948 .send_message("oh, hi B.".to_string(), cx)
2949 .unwrap()
2950 .detach();
2951 let task = channel.send_message("sup".to_string(), cx).unwrap();
2952 assert_eq!(
2953 channel_messages(channel),
2954 &[
2955 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2956 ("user_a".to_string(), "oh, hi B.".to_string(), true),
2957 ("user_a".to_string(), "sup".to_string(), true)
2958 ]
2959 );
2960 task
2961 })
2962 .await
2963 .unwrap();
2964
2965 channel_b
2966 .condition(&cx_b, |channel, _| {
2967 channel_messages(channel)
2968 == [
2969 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2970 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2971 ("user_a".to_string(), "sup".to_string(), false),
2972 ]
2973 })
2974 .await;
2975
2976 assert_eq!(
2977 server
2978 .state()
2979 .await
2980 .channel(channel_id)
2981 .unwrap()
2982 .connection_ids
2983 .len(),
2984 2
2985 );
2986 cx_b.update(|_| drop(channel_b));
2987 server
2988 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
2989 .await;
2990
2991 cx_a.update(|_| drop(channel_a));
2992 server
2993 .condition(|state| state.channel(channel_id).is_none())
2994 .await;
2995 }
2996
2997 #[gpui::test]
2998 async fn test_chat_message_validation(mut cx_a: TestAppContext) {
2999 cx_a.foreground().forbid_parking();
3000
3001 let mut server = TestServer::start(cx_a.foreground()).await;
3002 let client_a = server.create_client(&mut cx_a, "user_a").await;
3003
3004 let db = &server.app_state.db;
3005 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3006 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3007 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3008 .await
3009 .unwrap();
3010 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3011 .await
3012 .unwrap();
3013
3014 let channels_a = cx_a
3015 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3016 channels_a
3017 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3018 .await;
3019 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3020 this.get_channel(channel_id.to_proto(), cx).unwrap()
3021 });
3022
3023 // Messages aren't allowed to be too long.
3024 channel_a
3025 .update(&mut cx_a, |channel, cx| {
3026 let long_body = "this is long.\n".repeat(1024);
3027 channel.send_message(long_body, cx).unwrap()
3028 })
3029 .await
3030 .unwrap_err();
3031
3032 // Messages aren't allowed to be blank.
3033 channel_a.update(&mut cx_a, |channel, cx| {
3034 channel.send_message(String::new(), cx).unwrap_err()
3035 });
3036
3037 // Leading and trailing whitespace are trimmed.
3038 channel_a
3039 .update(&mut cx_a, |channel, cx| {
3040 channel
3041 .send_message("\n surrounded by whitespace \n".to_string(), cx)
3042 .unwrap()
3043 })
3044 .await
3045 .unwrap();
3046 assert_eq!(
3047 db.get_channel_messages(channel_id, 10, None)
3048 .await
3049 .unwrap()
3050 .iter()
3051 .map(|m| &m.body)
3052 .collect::<Vec<_>>(),
3053 &["surrounded by whitespace"]
3054 );
3055 }
3056
3057 #[gpui::test]
3058 async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3059 cx_a.foreground().forbid_parking();
3060
3061 // Connect to a server as 2 clients.
3062 let mut server = TestServer::start(cx_a.foreground()).await;
3063 let client_a = server.create_client(&mut cx_a, "user_a").await;
3064 let client_b = server.create_client(&mut cx_b, "user_b").await;
3065 let mut status_b = client_b.status();
3066
3067 // Create an org that includes these 2 users.
3068 let db = &server.app_state.db;
3069 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3070 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3071 .await
3072 .unwrap();
3073 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3074 .await
3075 .unwrap();
3076
3077 // Create a channel that includes all the users.
3078 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3079 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3080 .await
3081 .unwrap();
3082 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3083 .await
3084 .unwrap();
3085 db.create_channel_message(
3086 channel_id,
3087 client_b.current_user_id(&cx_b),
3088 "hello A, it's B.",
3089 OffsetDateTime::now_utc(),
3090 2,
3091 )
3092 .await
3093 .unwrap();
3094
3095 let channels_a = cx_a
3096 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3097 channels_a
3098 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3099 .await;
3100
3101 channels_a.read_with(&cx_a, |list, _| {
3102 assert_eq!(
3103 list.available_channels().unwrap(),
3104 &[ChannelDetails {
3105 id: channel_id.to_proto(),
3106 name: "test-channel".to_string()
3107 }]
3108 )
3109 });
3110 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3111 this.get_channel(channel_id.to_proto(), cx).unwrap()
3112 });
3113 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3114 channel_a
3115 .condition(&cx_a, |channel, _| {
3116 channel_messages(channel)
3117 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3118 })
3119 .await;
3120
3121 let channels_b = cx_b
3122 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3123 channels_b
3124 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3125 .await;
3126 channels_b.read_with(&cx_b, |list, _| {
3127 assert_eq!(
3128 list.available_channels().unwrap(),
3129 &[ChannelDetails {
3130 id: channel_id.to_proto(),
3131 name: "test-channel".to_string()
3132 }]
3133 )
3134 });
3135
3136 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3137 this.get_channel(channel_id.to_proto(), cx).unwrap()
3138 });
3139 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3140 channel_b
3141 .condition(&cx_b, |channel, _| {
3142 channel_messages(channel)
3143 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3144 })
3145 .await;
3146
3147 // Disconnect client B, ensuring we can still access its cached channel data.
3148 server.forbid_connections();
3149 server.disconnect_client(client_b.current_user_id(&cx_b));
3150 while !matches!(
3151 status_b.next().await,
3152 Some(client::Status::ReconnectionError { .. })
3153 ) {}
3154
3155 channels_b.read_with(&cx_b, |channels, _| {
3156 assert_eq!(
3157 channels.available_channels().unwrap(),
3158 [ChannelDetails {
3159 id: channel_id.to_proto(),
3160 name: "test-channel".to_string()
3161 }]
3162 )
3163 });
3164 channel_b.read_with(&cx_b, |channel, _| {
3165 assert_eq!(
3166 channel_messages(channel),
3167 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3168 )
3169 });
3170
3171 // Send a message from client B while it is disconnected.
3172 channel_b
3173 .update(&mut cx_b, |channel, cx| {
3174 let task = channel
3175 .send_message("can you see this?".to_string(), cx)
3176 .unwrap();
3177 assert_eq!(
3178 channel_messages(channel),
3179 &[
3180 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3181 ("user_b".to_string(), "can you see this?".to_string(), true)
3182 ]
3183 );
3184 task
3185 })
3186 .await
3187 .unwrap_err();
3188
3189 // Send a message from client A while B is disconnected.
3190 channel_a
3191 .update(&mut cx_a, |channel, cx| {
3192 channel
3193 .send_message("oh, hi B.".to_string(), cx)
3194 .unwrap()
3195 .detach();
3196 let task = channel.send_message("sup".to_string(), cx).unwrap();
3197 assert_eq!(
3198 channel_messages(channel),
3199 &[
3200 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3201 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3202 ("user_a".to_string(), "sup".to_string(), true)
3203 ]
3204 );
3205 task
3206 })
3207 .await
3208 .unwrap();
3209
3210 // Give client B a chance to reconnect.
3211 server.allow_connections();
3212 cx_b.foreground().advance_clock(Duration::from_secs(10));
3213
3214 // Verify that B sees the new messages upon reconnection, as well as the message client B
3215 // sent while offline.
3216 channel_b
3217 .condition(&cx_b, |channel, _| {
3218 channel_messages(channel)
3219 == [
3220 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3221 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3222 ("user_a".to_string(), "sup".to_string(), false),
3223 ("user_b".to_string(), "can you see this?".to_string(), false),
3224 ]
3225 })
3226 .await;
3227
3228 // Ensure client A and B can communicate normally after reconnection.
3229 channel_a
3230 .update(&mut cx_a, |channel, cx| {
3231 channel.send_message("you online?".to_string(), cx).unwrap()
3232 })
3233 .await
3234 .unwrap();
3235 channel_b
3236 .condition(&cx_b, |channel, _| {
3237 channel_messages(channel)
3238 == [
3239 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3240 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3241 ("user_a".to_string(), "sup".to_string(), false),
3242 ("user_b".to_string(), "can you see this?".to_string(), false),
3243 ("user_a".to_string(), "you online?".to_string(), false),
3244 ]
3245 })
3246 .await;
3247
3248 channel_b
3249 .update(&mut cx_b, |channel, cx| {
3250 channel.send_message("yep".to_string(), cx).unwrap()
3251 })
3252 .await
3253 .unwrap();
3254 channel_a
3255 .condition(&cx_a, |channel, _| {
3256 channel_messages(channel)
3257 == [
3258 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3259 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3260 ("user_a".to_string(), "sup".to_string(), false),
3261 ("user_b".to_string(), "can you see this?".to_string(), false),
3262 ("user_a".to_string(), "you online?".to_string(), false),
3263 ("user_b".to_string(), "yep".to_string(), false),
3264 ]
3265 })
3266 .await;
3267 }
3268
3269 #[gpui::test]
3270 async fn test_contacts(
3271 mut cx_a: TestAppContext,
3272 mut cx_b: TestAppContext,
3273 mut cx_c: TestAppContext,
3274 ) {
3275 cx_a.foreground().forbid_parking();
3276 let lang_registry = Arc::new(LanguageRegistry::new());
3277 let fs = Arc::new(FakeFs::new(cx_a.background()));
3278
3279 // Connect to a server as 3 clients.
3280 let mut server = TestServer::start(cx_a.foreground()).await;
3281 let client_a = server.create_client(&mut cx_a, "user_a").await;
3282 let client_b = server.create_client(&mut cx_b, "user_b").await;
3283 let client_c = server.create_client(&mut cx_c, "user_c").await;
3284
3285 // Share a worktree as client A.
3286 fs.insert_tree(
3287 "/a",
3288 json!({
3289 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3290 }),
3291 )
3292 .await;
3293
3294 let project_a = cx_a.update(|cx| {
3295 Project::local(
3296 client_a.clone(),
3297 client_a.user_store.clone(),
3298 lang_registry.clone(),
3299 fs.clone(),
3300 cx,
3301 )
3302 });
3303 let (worktree_a, _) = project_a
3304 .update(&mut cx_a, |p, cx| {
3305 p.find_or_create_local_worktree("/a", false, cx)
3306 })
3307 .await
3308 .unwrap();
3309 worktree_a
3310 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3311 .await;
3312
3313 client_a
3314 .user_store
3315 .condition(&cx_a, |user_store, _| {
3316 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3317 })
3318 .await;
3319 client_b
3320 .user_store
3321 .condition(&cx_b, |user_store, _| {
3322 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3323 })
3324 .await;
3325 client_c
3326 .user_store
3327 .condition(&cx_c, |user_store, _| {
3328 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3329 })
3330 .await;
3331
3332 let project_id = project_a
3333 .update(&mut cx_a, |project, _| project.next_remote_id())
3334 .await;
3335 project_a
3336 .update(&mut cx_a, |project, cx| project.share(cx))
3337 .await
3338 .unwrap();
3339
3340 let _project_b = Project::remote(
3341 project_id,
3342 client_b.clone(),
3343 client_b.user_store.clone(),
3344 lang_registry.clone(),
3345 fs.clone(),
3346 &mut cx_b.to_async(),
3347 )
3348 .await
3349 .unwrap();
3350
3351 client_a
3352 .user_store
3353 .condition(&cx_a, |user_store, _| {
3354 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3355 })
3356 .await;
3357 client_b
3358 .user_store
3359 .condition(&cx_b, |user_store, _| {
3360 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3361 })
3362 .await;
3363 client_c
3364 .user_store
3365 .condition(&cx_c, |user_store, _| {
3366 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3367 })
3368 .await;
3369
3370 project_a
3371 .condition(&cx_a, |project, _| {
3372 project.collaborators().contains_key(&client_b.peer_id)
3373 })
3374 .await;
3375
3376 cx_a.update(move |_| drop(project_a));
3377 client_a
3378 .user_store
3379 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3380 .await;
3381 client_b
3382 .user_store
3383 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3384 .await;
3385 client_c
3386 .user_store
3387 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3388 .await;
3389
3390 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3391 user_store
3392 .contacts()
3393 .iter()
3394 .map(|contact| {
3395 let worktrees = contact
3396 .projects
3397 .iter()
3398 .map(|p| {
3399 (
3400 p.worktree_root_names[0].as_str(),
3401 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3402 )
3403 })
3404 .collect();
3405 (contact.user.github_login.as_str(), worktrees)
3406 })
3407 .collect()
3408 }
3409 }
3410
3411 struct TestServer {
3412 peer: Arc<Peer>,
3413 app_state: Arc<AppState>,
3414 server: Arc<Server>,
3415 foreground: Rc<executor::Foreground>,
3416 notifications: mpsc::Receiver<()>,
3417 connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3418 forbid_connections: Arc<AtomicBool>,
3419 _test_db: TestDb,
3420 }
3421
3422 impl TestServer {
3423 async fn start(foreground: Rc<executor::Foreground>) -> Self {
3424 let test_db = TestDb::new();
3425 let app_state = Self::build_app_state(&test_db).await;
3426 let peer = Peer::new();
3427 let notifications = mpsc::channel(128);
3428 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3429 Self {
3430 peer,
3431 app_state,
3432 server,
3433 foreground,
3434 notifications: notifications.1,
3435 connection_killers: Default::default(),
3436 forbid_connections: Default::default(),
3437 _test_db: test_db,
3438 }
3439 }
3440
3441 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3442 let http = FakeHttpClient::with_404_response();
3443 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3444 let client_name = name.to_string();
3445 let mut client = Client::new(http.clone());
3446 let server = self.server.clone();
3447 let connection_killers = self.connection_killers.clone();
3448 let forbid_connections = self.forbid_connections.clone();
3449 let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3450
3451 Arc::get_mut(&mut client)
3452 .unwrap()
3453 .override_authenticate(move |cx| {
3454 cx.spawn(|_| async move {
3455 let access_token = "the-token".to_string();
3456 Ok(Credentials {
3457 user_id: user_id.0 as u64,
3458 access_token,
3459 })
3460 })
3461 })
3462 .override_establish_connection(move |credentials, cx| {
3463 assert_eq!(credentials.user_id, user_id.0 as u64);
3464 assert_eq!(credentials.access_token, "the-token");
3465
3466 let server = server.clone();
3467 let connection_killers = connection_killers.clone();
3468 let forbid_connections = forbid_connections.clone();
3469 let client_name = client_name.clone();
3470 let connection_id_tx = connection_id_tx.clone();
3471 cx.spawn(move |cx| async move {
3472 if forbid_connections.load(SeqCst) {
3473 Err(EstablishConnectionError::other(anyhow!(
3474 "server is forbidding connections"
3475 )))
3476 } else {
3477 let (client_conn, server_conn, kill_conn) =
3478 Connection::in_memory(cx.background());
3479 connection_killers.lock().insert(user_id, kill_conn);
3480 cx.background()
3481 .spawn(server.handle_connection(
3482 server_conn,
3483 client_name,
3484 user_id,
3485 Some(connection_id_tx),
3486 ))
3487 .detach();
3488 Ok(client_conn)
3489 }
3490 })
3491 });
3492
3493 client
3494 .authenticate_and_connect(&cx.to_async())
3495 .await
3496 .unwrap();
3497
3498 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3499 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3500 let mut authed_user =
3501 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3502 while authed_user.next().await.unwrap().is_none() {}
3503
3504 TestClient {
3505 client,
3506 peer_id,
3507 user_store,
3508 }
3509 }
3510
3511 fn disconnect_client(&self, user_id: UserId) {
3512 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3513 let _ = kill_conn.try_send(Some(()));
3514 }
3515 }
3516
3517 fn forbid_connections(&self) {
3518 self.forbid_connections.store(true, SeqCst);
3519 }
3520
3521 fn allow_connections(&self) {
3522 self.forbid_connections.store(false, SeqCst);
3523 }
3524
3525 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3526 let mut config = Config::default();
3527 config.session_secret = "a".repeat(32);
3528 config.database_url = test_db.url.clone();
3529 let github_client = github::AppClient::test();
3530 Arc::new(AppState {
3531 db: test_db.db().clone(),
3532 handlebars: Default::default(),
3533 auth_client: auth::build_client("", ""),
3534 repo_client: github::RepoClient::test(&github_client),
3535 github_client,
3536 config,
3537 })
3538 }
3539
3540 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3541 self.server.store.read()
3542 }
3543
3544 async fn condition<F>(&mut self, mut predicate: F)
3545 where
3546 F: FnMut(&Store) -> bool,
3547 {
3548 async_std::future::timeout(Duration::from_millis(500), async {
3549 while !(predicate)(&*self.server.store.read()) {
3550 self.foreground.start_waiting();
3551 self.notifications.next().await;
3552 self.foreground.finish_waiting();
3553 }
3554 })
3555 .await
3556 .expect("condition timed out");
3557 }
3558 }
3559
3560 impl Drop for TestServer {
3561 fn drop(&mut self) {
3562 self.peer.reset();
3563 }
3564 }
3565
3566 struct TestClient {
3567 client: Arc<Client>,
3568 pub peer_id: PeerId,
3569 pub user_store: ModelHandle<UserStore>,
3570 }
3571
3572 impl Deref for TestClient {
3573 type Target = Arc<Client>;
3574
3575 fn deref(&self) -> &Self::Target {
3576 &self.client
3577 }
3578 }
3579
3580 impl TestClient {
3581 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3582 UserId::from_proto(
3583 self.user_store
3584 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3585 )
3586 }
3587 }
3588
3589 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3590 channel
3591 .messages()
3592 .cursor::<()>()
3593 .map(|m| {
3594 (
3595 m.sender.github_login.clone(),
3596 m.body.clone(),
3597 m.is_pending(),
3598 )
3599 })
3600 .collect()
3601 }
3602
3603 struct EmptyView;
3604
3605 impl gpui::Entity for EmptyView {
3606 type Event = ();
3607 }
3608
3609 impl gpui::View for EmptyView {
3610 fn ui_name() -> &'static str {
3611 "empty view"
3612 }
3613
3614 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
3615 gpui::Element::boxed(gpui::elements::Empty)
3616 }
3617 }
3618}