1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{future::BoxFuture, FutureExt, StreamExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use postage::{mpsc, prelude::Sink as _};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EnvelopedMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
21use store::{Store, Worktree};
22use surf::StatusCode;
23use tide::log;
24use tide::{
25 http::headers::{HeaderName, CONNECTION, UPGRADE},
26 Request, Response,
27};
28use time::OffsetDateTime;
29
30type MessageHandler = Box<
31 dyn Send
32 + Sync
33 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
34>;
35
36pub struct Server {
37 peer: Arc<Peer>,
38 store: RwLock<Store>,
39 app_state: Arc<AppState>,
40 handlers: HashMap<TypeId, MessageHandler>,
41 notifications: Option<mpsc::Sender<()>>,
42}
43
44const MESSAGE_COUNT_PER_PAGE: usize = 100;
45const MAX_MESSAGE_LEN: usize = 1024;
46const NO_SUCH_PROJECT: &'static str = "no such project";
47
48impl Server {
49 pub fn new(
50 app_state: Arc<AppState>,
51 peer: Arc<Peer>,
52 notifications: Option<mpsc::Sender<()>>,
53 ) -> Arc<Self> {
54 let mut server = Self {
55 peer,
56 app_state,
57 store: Default::default(),
58 handlers: Default::default(),
59 notifications,
60 };
61
62 server
63 .add_handler(Server::ping)
64 .add_handler(Server::register_project)
65 .add_handler(Server::unregister_project)
66 .add_handler(Server::share_project)
67 .add_handler(Server::unshare_project)
68 .add_handler(Server::join_project)
69 .add_handler(Server::leave_project)
70 .add_handler(Server::register_worktree)
71 .add_handler(Server::unregister_worktree)
72 .add_handler(Server::share_worktree)
73 .add_handler(Server::update_worktree)
74 .add_handler(Server::update_diagnostic_summary)
75 .add_handler(Server::disk_based_diagnostics_updating)
76 .add_handler(Server::disk_based_diagnostics_updated)
77 .add_handler(Server::get_definition)
78 .add_handler(Server::open_buffer)
79 .add_handler(Server::close_buffer)
80 .add_handler(Server::update_buffer)
81 .add_handler(Server::update_buffer_file)
82 .add_handler(Server::buffer_reloaded)
83 .add_handler(Server::buffer_saved)
84 .add_handler(Server::save_buffer)
85 .add_handler(Server::format_buffer)
86 .add_handler(Server::get_completions)
87 .add_handler(Server::get_channels)
88 .add_handler(Server::get_users)
89 .add_handler(Server::join_channel)
90 .add_handler(Server::leave_channel)
91 .add_handler(Server::send_channel_message)
92 .add_handler(Server::get_channel_messages);
93
94 Arc::new(server)
95 }
96
97 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
98 where
99 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
100 Fut: 'static + Send + Future<Output = tide::Result<()>>,
101 M: EnvelopedMessage,
102 {
103 let prev_handler = self.handlers.insert(
104 TypeId::of::<M>(),
105 Box::new(move |server, envelope| {
106 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
107 (handler)(server, *envelope).boxed()
108 }),
109 );
110 if prev_handler.is_some() {
111 panic!("registered a handler for the same message twice");
112 }
113 self
114 }
115
116 pub fn handle_connection(
117 self: &Arc<Self>,
118 connection: Connection,
119 addr: String,
120 user_id: UserId,
121 mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
122 ) -> impl Future<Output = ()> {
123 let mut this = self.clone();
124 async move {
125 let (connection_id, handle_io, mut incoming_rx) =
126 this.peer.add_connection(connection).await;
127
128 if let Some(send_connection_id) = send_connection_id.as_mut() {
129 let _ = send_connection_id.send(connection_id).await;
130 }
131
132 this.state_mut().add_connection(connection_id, user_id);
133 if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
134 log::error!("error updating contacts for {:?}: {}", user_id, err);
135 }
136
137 let handle_io = handle_io.fuse();
138 futures::pin_mut!(handle_io);
139 loop {
140 let next_message = incoming_rx.next().fuse();
141 futures::pin_mut!(next_message);
142 futures::select_biased! {
143 message = next_message => {
144 if let Some(message) = message {
145 let start_time = Instant::now();
146 log::info!("RPC message received: {}", message.payload_type_name());
147 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
148 if let Err(err) = (handler)(this.clone(), message).await {
149 log::error!("error handling message: {:?}", err);
150 } else {
151 log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
152 }
153
154 if let Some(mut notifications) = this.notifications.clone() {
155 let _ = notifications.send(()).await;
156 }
157 } else {
158 log::warn!("unhandled message: {}", message.payload_type_name());
159 }
160 } else {
161 log::info!("rpc connection closed {:?}", addr);
162 break;
163 }
164 }
165 handle_io = handle_io => {
166 if let Err(err) = handle_io {
167 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
168 }
169 break;
170 }
171 }
172 }
173
174 if let Err(err) = this.sign_out(connection_id).await {
175 log::error!("error signing out connection {:?} - {:?}", addr, err);
176 }
177 }
178 }
179
180 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
181 self.peer.disconnect(connection_id);
182 let removed_connection = self.state_mut().remove_connection(connection_id)?;
183
184 for (project_id, project) in removed_connection.hosted_projects {
185 if let Some(share) = project.share {
186 broadcast(
187 connection_id,
188 share.guests.keys().copied().collect(),
189 |conn_id| {
190 self.peer
191 .send(conn_id, proto::UnshareProject { project_id })
192 },
193 )
194 .await?;
195 }
196 }
197
198 for (project_id, peer_ids) in removed_connection.guest_project_ids {
199 broadcast(connection_id, peer_ids, |conn_id| {
200 self.peer.send(
201 conn_id,
202 proto::RemoveProjectCollaborator {
203 project_id,
204 peer_id: connection_id.0,
205 },
206 )
207 })
208 .await?;
209 }
210
211 self.update_contacts_for_users(removed_connection.contact_ids.iter())
212 .await?;
213
214 Ok(())
215 }
216
217 async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
218 self.peer.respond(request.receipt(), proto::Ack {}).await?;
219 Ok(())
220 }
221
222 async fn register_project(
223 mut self: Arc<Server>,
224 request: TypedEnvelope<proto::RegisterProject>,
225 ) -> tide::Result<()> {
226 let project_id = {
227 let mut state = self.state_mut();
228 let user_id = state.user_id_for_connection(request.sender_id)?;
229 state.register_project(request.sender_id, user_id)
230 };
231 self.peer
232 .respond(
233 request.receipt(),
234 proto::RegisterProjectResponse { project_id },
235 )
236 .await?;
237 Ok(())
238 }
239
240 async fn unregister_project(
241 mut self: Arc<Server>,
242 request: TypedEnvelope<proto::UnregisterProject>,
243 ) -> tide::Result<()> {
244 let project = self
245 .state_mut()
246 .unregister_project(request.payload.project_id, request.sender_id)
247 .ok_or_else(|| anyhow!("no such project"))?;
248 self.update_contacts_for_users(project.authorized_user_ids().iter())
249 .await?;
250 Ok(())
251 }
252
253 async fn share_project(
254 mut self: Arc<Server>,
255 request: TypedEnvelope<proto::ShareProject>,
256 ) -> tide::Result<()> {
257 self.state_mut()
258 .share_project(request.payload.project_id, request.sender_id);
259 self.peer.respond(request.receipt(), proto::Ack {}).await?;
260 Ok(())
261 }
262
263 async fn unshare_project(
264 mut self: Arc<Server>,
265 request: TypedEnvelope<proto::UnshareProject>,
266 ) -> tide::Result<()> {
267 let project_id = request.payload.project_id;
268 let project = self
269 .state_mut()
270 .unshare_project(project_id, request.sender_id)?;
271
272 broadcast(request.sender_id, project.connection_ids, |conn_id| {
273 self.peer
274 .send(conn_id, proto::UnshareProject { project_id })
275 })
276 .await?;
277 self.update_contacts_for_users(&project.authorized_user_ids)
278 .await?;
279
280 Ok(())
281 }
282
283 async fn join_project(
284 mut self: Arc<Server>,
285 request: TypedEnvelope<proto::JoinProject>,
286 ) -> tide::Result<()> {
287 let project_id = request.payload.project_id;
288
289 let user_id = self.state().user_id_for_connection(request.sender_id)?;
290 let response_data = self
291 .state_mut()
292 .join_project(request.sender_id, user_id, project_id)
293 .and_then(|joined| {
294 let share = joined.project.share()?;
295 let peer_count = share.guests.len();
296 let mut collaborators = Vec::with_capacity(peer_count);
297 collaborators.push(proto::Collaborator {
298 peer_id: joined.project.host_connection_id.0,
299 replica_id: 0,
300 user_id: joined.project.host_user_id.to_proto(),
301 });
302 let worktrees = joined
303 .project
304 .worktrees
305 .iter()
306 .filter_map(|(id, worktree)| {
307 worktree.share.as_ref().map(|share| proto::Worktree {
308 id: *id,
309 root_name: worktree.root_name.clone(),
310 entries: share.entries.values().cloned().collect(),
311 diagnostic_summaries: share
312 .diagnostic_summaries
313 .values()
314 .cloned()
315 .collect(),
316 weak: worktree.weak,
317 })
318 })
319 .collect();
320 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
321 if *peer_conn_id != request.sender_id {
322 collaborators.push(proto::Collaborator {
323 peer_id: peer_conn_id.0,
324 replica_id: *peer_replica_id as u32,
325 user_id: peer_user_id.to_proto(),
326 });
327 }
328 }
329 let response = proto::JoinProjectResponse {
330 worktrees,
331 replica_id: joined.replica_id as u32,
332 collaborators,
333 };
334 let connection_ids = joined.project.connection_ids();
335 let contact_user_ids = joined.project.authorized_user_ids();
336 Ok((response, connection_ids, contact_user_ids))
337 });
338
339 match response_data {
340 Ok((response, connection_ids, contact_user_ids)) => {
341 broadcast(request.sender_id, connection_ids, |conn_id| {
342 self.peer.send(
343 conn_id,
344 proto::AddProjectCollaborator {
345 project_id: project_id,
346 collaborator: Some(proto::Collaborator {
347 peer_id: request.sender_id.0,
348 replica_id: response.replica_id,
349 user_id: user_id.to_proto(),
350 }),
351 },
352 )
353 })
354 .await?;
355 self.peer.respond(request.receipt(), response).await?;
356 self.update_contacts_for_users(&contact_user_ids).await?;
357 }
358 Err(error) => {
359 self.peer
360 .respond_with_error(
361 request.receipt(),
362 proto::Error {
363 message: error.to_string(),
364 },
365 )
366 .await?;
367 }
368 }
369
370 Ok(())
371 }
372
373 async fn leave_project(
374 mut self: Arc<Server>,
375 request: TypedEnvelope<proto::LeaveProject>,
376 ) -> tide::Result<()> {
377 let sender_id = request.sender_id;
378 let project_id = request.payload.project_id;
379 let worktree = self.state_mut().leave_project(sender_id, project_id);
380 if let Some(worktree) = worktree {
381 broadcast(sender_id, worktree.connection_ids, |conn_id| {
382 self.peer.send(
383 conn_id,
384 proto::RemoveProjectCollaborator {
385 project_id,
386 peer_id: sender_id.0,
387 },
388 )
389 })
390 .await?;
391 self.update_contacts_for_users(&worktree.authorized_user_ids)
392 .await?;
393 }
394 Ok(())
395 }
396
397 async fn register_worktree(
398 mut self: Arc<Server>,
399 request: TypedEnvelope<proto::RegisterWorktree>,
400 ) -> tide::Result<()> {
401 let receipt = request.receipt();
402 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
403
404 let mut contact_user_ids = HashSet::default();
405 contact_user_ids.insert(host_user_id);
406 for github_login in request.payload.authorized_logins {
407 match self.app_state.db.create_user(&github_login, false).await {
408 Ok(contact_user_id) => {
409 contact_user_ids.insert(contact_user_id);
410 }
411 Err(err) => {
412 let message = err.to_string();
413 self.peer
414 .respond_with_error(receipt, proto::Error { message })
415 .await?;
416 return Ok(());
417 }
418 }
419 }
420
421 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
422 let ok = self.state_mut().register_worktree(
423 request.payload.project_id,
424 request.payload.worktree_id,
425 Worktree {
426 authorized_user_ids: contact_user_ids.clone(),
427 root_name: request.payload.root_name,
428 share: None,
429 weak: false,
430 },
431 );
432
433 if ok {
434 self.peer.respond(receipt, proto::Ack {}).await?;
435 self.update_contacts_for_users(&contact_user_ids).await?;
436 } else {
437 self.peer
438 .respond_with_error(
439 receipt,
440 proto::Error {
441 message: NO_SUCH_PROJECT.to_string(),
442 },
443 )
444 .await?;
445 }
446
447 Ok(())
448 }
449
450 async fn unregister_worktree(
451 mut self: Arc<Server>,
452 request: TypedEnvelope<proto::UnregisterWorktree>,
453 ) -> tide::Result<()> {
454 let project_id = request.payload.project_id;
455 let worktree_id = request.payload.worktree_id;
456 let (worktree, guest_connection_ids) =
457 self.state_mut()
458 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
459
460 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
461 self.peer.send(
462 conn_id,
463 proto::UnregisterWorktree {
464 project_id,
465 worktree_id,
466 },
467 )
468 })
469 .await?;
470 self.update_contacts_for_users(&worktree.authorized_user_ids)
471 .await?;
472 Ok(())
473 }
474
475 async fn share_worktree(
476 mut self: Arc<Server>,
477 mut request: TypedEnvelope<proto::ShareWorktree>,
478 ) -> tide::Result<()> {
479 let worktree = request
480 .payload
481 .worktree
482 .as_mut()
483 .ok_or_else(|| anyhow!("missing worktree"))?;
484 let entries = worktree
485 .entries
486 .iter()
487 .map(|entry| (entry.id, entry.clone()))
488 .collect();
489 let diagnostic_summaries = worktree
490 .diagnostic_summaries
491 .iter()
492 .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
493 .collect();
494
495 let shared_worktree = self.state_mut().share_worktree(
496 request.payload.project_id,
497 worktree.id,
498 request.sender_id,
499 entries,
500 diagnostic_summaries,
501 );
502 if let Some(shared_worktree) = shared_worktree {
503 broadcast(
504 request.sender_id,
505 shared_worktree.connection_ids,
506 |connection_id| {
507 self.peer.forward_send(
508 request.sender_id,
509 connection_id,
510 request.payload.clone(),
511 )
512 },
513 )
514 .await?;
515 self.peer.respond(request.receipt(), proto::Ack {}).await?;
516 self.update_contacts_for_users(&shared_worktree.authorized_user_ids)
517 .await?;
518 } else {
519 self.peer
520 .respond_with_error(
521 request.receipt(),
522 proto::Error {
523 message: "no such worktree".to_string(),
524 },
525 )
526 .await?;
527 }
528 Ok(())
529 }
530
531 async fn update_worktree(
532 mut self: Arc<Server>,
533 request: TypedEnvelope<proto::UpdateWorktree>,
534 ) -> tide::Result<()> {
535 let connection_ids = self
536 .state_mut()
537 .update_worktree(
538 request.sender_id,
539 request.payload.project_id,
540 request.payload.worktree_id,
541 &request.payload.removed_entries,
542 &request.payload.updated_entries,
543 )
544 .ok_or_else(|| anyhow!("no such worktree"))?;
545
546 broadcast(request.sender_id, connection_ids, |connection_id| {
547 self.peer
548 .forward_send(request.sender_id, connection_id, request.payload.clone())
549 })
550 .await?;
551
552 Ok(())
553 }
554
555 async fn update_diagnostic_summary(
556 mut self: Arc<Server>,
557 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
558 ) -> tide::Result<()> {
559 let receiver_ids = request
560 .payload
561 .summary
562 .clone()
563 .and_then(|summary| {
564 self.state_mut().update_diagnostic_summary(
565 request.payload.project_id,
566 request.payload.worktree_id,
567 request.sender_id,
568 summary,
569 )
570 })
571 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
572
573 broadcast(request.sender_id, receiver_ids, |connection_id| {
574 self.peer
575 .forward_send(request.sender_id, connection_id, request.payload.clone())
576 })
577 .await?;
578 Ok(())
579 }
580
581 async fn disk_based_diagnostics_updating(
582 self: Arc<Server>,
583 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
584 ) -> tide::Result<()> {
585 let receiver_ids = self
586 .state()
587 .project_connection_ids(request.payload.project_id, request.sender_id)
588 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
589 broadcast(request.sender_id, receiver_ids, |connection_id| {
590 self.peer
591 .forward_send(request.sender_id, connection_id, request.payload.clone())
592 })
593 .await?;
594 Ok(())
595 }
596
597 async fn disk_based_diagnostics_updated(
598 self: Arc<Server>,
599 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
600 ) -> tide::Result<()> {
601 let receiver_ids = self
602 .state()
603 .project_connection_ids(request.payload.project_id, request.sender_id)
604 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
605 broadcast(request.sender_id, receiver_ids, |connection_id| {
606 self.peer
607 .forward_send(request.sender_id, connection_id, request.payload.clone())
608 })
609 .await?;
610 Ok(())
611 }
612
613 async fn get_definition(
614 self: Arc<Server>,
615 request: TypedEnvelope<proto::GetDefinition>,
616 ) -> tide::Result<()> {
617 let receipt = request.receipt();
618 let host_connection_id = self
619 .state()
620 .read_project(request.payload.project_id, request.sender_id)
621 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
622 .host_connection_id;
623 let response = self
624 .peer
625 .forward_request(request.sender_id, host_connection_id, request.payload)
626 .await?;
627 self.peer.respond(receipt, response).await?;
628 Ok(())
629 }
630
631 async fn open_buffer(
632 self: Arc<Server>,
633 request: TypedEnvelope<proto::OpenBuffer>,
634 ) -> tide::Result<()> {
635 let receipt = request.receipt();
636 let host_connection_id = self
637 .state()
638 .read_project(request.payload.project_id, request.sender_id)
639 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
640 .host_connection_id;
641 let response = self
642 .peer
643 .forward_request(request.sender_id, host_connection_id, request.payload)
644 .await?;
645 self.peer.respond(receipt, response).await?;
646 Ok(())
647 }
648
649 async fn close_buffer(
650 self: Arc<Server>,
651 request: TypedEnvelope<proto::CloseBuffer>,
652 ) -> tide::Result<()> {
653 let host_connection_id = self
654 .state()
655 .read_project(request.payload.project_id, request.sender_id)
656 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
657 .host_connection_id;
658 self.peer
659 .forward_send(request.sender_id, host_connection_id, request.payload)
660 .await?;
661 Ok(())
662 }
663
664 async fn save_buffer(
665 self: Arc<Server>,
666 request: TypedEnvelope<proto::SaveBuffer>,
667 ) -> tide::Result<()> {
668 let host;
669 let guests;
670 {
671 let state = self.state();
672 let project = state
673 .read_project(request.payload.project_id, request.sender_id)
674 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
675 host = project.host_connection_id;
676 guests = project.guest_connection_ids()
677 }
678
679 let sender = request.sender_id;
680 let receipt = request.receipt();
681 let response = self
682 .peer
683 .forward_request(sender, host, request.payload.clone())
684 .await?;
685
686 broadcast(host, guests, |conn_id| {
687 let response = response.clone();
688 let peer = &self.peer;
689 async move {
690 if conn_id == sender {
691 peer.respond(receipt, response).await
692 } else {
693 peer.forward_send(host, conn_id, response).await
694 }
695 }
696 })
697 .await?;
698
699 Ok(())
700 }
701
702 async fn format_buffer(
703 self: Arc<Server>,
704 request: TypedEnvelope<proto::FormatBuffer>,
705 ) -> tide::Result<()> {
706 let host;
707 {
708 let state = self.state();
709 let project = state
710 .read_project(request.payload.project_id, request.sender_id)
711 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
712 host = project.host_connection_id;
713 }
714
715 let sender = request.sender_id;
716 let receipt = request.receipt();
717 let response = self
718 .peer
719 .forward_request(sender, host, request.payload.clone())
720 .await?;
721 self.peer.respond(receipt, response).await?;
722
723 Ok(())
724 }
725
726 async fn get_completions(
727 self: Arc<Server>,
728 request: TypedEnvelope<proto::GetCompletions>,
729 ) -> tide::Result<()> {
730 let host;
731 {
732 let state = self.state();
733 let project = state
734 .read_project(request.payload.project_id, request.sender_id)
735 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
736 host = project.host_connection_id;
737 }
738
739 let sender = request.sender_id;
740 let receipt = request.receipt();
741 let response = self
742 .peer
743 .forward_request(sender, host, request.payload.clone())
744 .await?;
745 self.peer.respond(receipt, response).await?;
746
747 Ok(())
748 }
749
750 async fn update_buffer(
751 self: Arc<Server>,
752 request: TypedEnvelope<proto::UpdateBuffer>,
753 ) -> tide::Result<()> {
754 let receiver_ids = self
755 .state()
756 .project_connection_ids(request.payload.project_id, request.sender_id)
757 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
758 broadcast(request.sender_id, receiver_ids, |connection_id| {
759 self.peer
760 .forward_send(request.sender_id, connection_id, request.payload.clone())
761 })
762 .await?;
763 self.peer.respond(request.receipt(), proto::Ack {}).await?;
764 Ok(())
765 }
766
767 async fn update_buffer_file(
768 self: Arc<Server>,
769 request: TypedEnvelope<proto::UpdateBufferFile>,
770 ) -> tide::Result<()> {
771 let receiver_ids = self
772 .state()
773 .project_connection_ids(request.payload.project_id, request.sender_id)
774 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
775 broadcast(request.sender_id, receiver_ids, |connection_id| {
776 self.peer
777 .forward_send(request.sender_id, connection_id, request.payload.clone())
778 })
779 .await?;
780 Ok(())
781 }
782
783 async fn buffer_reloaded(
784 self: Arc<Server>,
785 request: TypedEnvelope<proto::BufferReloaded>,
786 ) -> tide::Result<()> {
787 let receiver_ids = self
788 .state()
789 .project_connection_ids(request.payload.project_id, request.sender_id)
790 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
791 broadcast(request.sender_id, receiver_ids, |connection_id| {
792 self.peer
793 .forward_send(request.sender_id, connection_id, request.payload.clone())
794 })
795 .await?;
796 Ok(())
797 }
798
799 async fn buffer_saved(
800 self: Arc<Server>,
801 request: TypedEnvelope<proto::BufferSaved>,
802 ) -> tide::Result<()> {
803 let receiver_ids = self
804 .state()
805 .project_connection_ids(request.payload.project_id, request.sender_id)
806 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
807 broadcast(request.sender_id, receiver_ids, |connection_id| {
808 self.peer
809 .forward_send(request.sender_id, connection_id, request.payload.clone())
810 })
811 .await?;
812 Ok(())
813 }
814
815 async fn get_channels(
816 self: Arc<Server>,
817 request: TypedEnvelope<proto::GetChannels>,
818 ) -> tide::Result<()> {
819 let user_id = self.state().user_id_for_connection(request.sender_id)?;
820 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
821 self.peer
822 .respond(
823 request.receipt(),
824 proto::GetChannelsResponse {
825 channels: channels
826 .into_iter()
827 .map(|chan| proto::Channel {
828 id: chan.id.to_proto(),
829 name: chan.name,
830 })
831 .collect(),
832 },
833 )
834 .await?;
835 Ok(())
836 }
837
838 async fn get_users(
839 self: Arc<Server>,
840 request: TypedEnvelope<proto::GetUsers>,
841 ) -> tide::Result<()> {
842 let receipt = request.receipt();
843 let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
844 let users = self
845 .app_state
846 .db
847 .get_users_by_ids(user_ids)
848 .await?
849 .into_iter()
850 .map(|user| proto::User {
851 id: user.id.to_proto(),
852 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
853 github_login: user.github_login,
854 })
855 .collect();
856 self.peer
857 .respond(receipt, proto::GetUsersResponse { users })
858 .await?;
859 Ok(())
860 }
861
862 async fn update_contacts_for_users<'a>(
863 self: &Arc<Server>,
864 user_ids: impl IntoIterator<Item = &'a UserId>,
865 ) -> tide::Result<()> {
866 let mut send_futures = Vec::new();
867
868 {
869 let state = self.state();
870 for user_id in user_ids {
871 let contacts = state.contacts_for_user(*user_id);
872 for connection_id in state.connection_ids_for_user(*user_id) {
873 send_futures.push(self.peer.send(
874 connection_id,
875 proto::UpdateContacts {
876 contacts: contacts.clone(),
877 },
878 ));
879 }
880 }
881 }
882 futures::future::try_join_all(send_futures).await?;
883
884 Ok(())
885 }
886
887 async fn join_channel(
888 mut self: Arc<Self>,
889 request: TypedEnvelope<proto::JoinChannel>,
890 ) -> tide::Result<()> {
891 let user_id = self.state().user_id_for_connection(request.sender_id)?;
892 let channel_id = ChannelId::from_proto(request.payload.channel_id);
893 if !self
894 .app_state
895 .db
896 .can_user_access_channel(user_id, channel_id)
897 .await?
898 {
899 Err(anyhow!("access denied"))?;
900 }
901
902 self.state_mut().join_channel(request.sender_id, channel_id);
903 let messages = self
904 .app_state
905 .db
906 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
907 .await?
908 .into_iter()
909 .map(|msg| proto::ChannelMessage {
910 id: msg.id.to_proto(),
911 body: msg.body,
912 timestamp: msg.sent_at.unix_timestamp() as u64,
913 sender_id: msg.sender_id.to_proto(),
914 nonce: Some(msg.nonce.as_u128().into()),
915 })
916 .collect::<Vec<_>>();
917 self.peer
918 .respond(
919 request.receipt(),
920 proto::JoinChannelResponse {
921 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
922 messages,
923 },
924 )
925 .await?;
926 Ok(())
927 }
928
929 async fn leave_channel(
930 mut self: Arc<Self>,
931 request: TypedEnvelope<proto::LeaveChannel>,
932 ) -> tide::Result<()> {
933 let user_id = self.state().user_id_for_connection(request.sender_id)?;
934 let channel_id = ChannelId::from_proto(request.payload.channel_id);
935 if !self
936 .app_state
937 .db
938 .can_user_access_channel(user_id, channel_id)
939 .await?
940 {
941 Err(anyhow!("access denied"))?;
942 }
943
944 self.state_mut()
945 .leave_channel(request.sender_id, channel_id);
946
947 Ok(())
948 }
949
950 async fn send_channel_message(
951 self: Arc<Self>,
952 request: TypedEnvelope<proto::SendChannelMessage>,
953 ) -> tide::Result<()> {
954 let receipt = request.receipt();
955 let channel_id = ChannelId::from_proto(request.payload.channel_id);
956 let user_id;
957 let connection_ids;
958 {
959 let state = self.state();
960 user_id = state.user_id_for_connection(request.sender_id)?;
961 if let Some(ids) = state.channel_connection_ids(channel_id) {
962 connection_ids = ids;
963 } else {
964 return Ok(());
965 }
966 }
967
968 // Validate the message body.
969 let body = request.payload.body.trim().to_string();
970 if body.len() > MAX_MESSAGE_LEN {
971 self.peer
972 .respond_with_error(
973 receipt,
974 proto::Error {
975 message: "message is too long".to_string(),
976 },
977 )
978 .await?;
979 return Ok(());
980 }
981 if body.is_empty() {
982 self.peer
983 .respond_with_error(
984 receipt,
985 proto::Error {
986 message: "message can't be blank".to_string(),
987 },
988 )
989 .await?;
990 return Ok(());
991 }
992
993 let timestamp = OffsetDateTime::now_utc();
994 let nonce = if let Some(nonce) = request.payload.nonce {
995 nonce
996 } else {
997 self.peer
998 .respond_with_error(
999 receipt,
1000 proto::Error {
1001 message: "nonce can't be blank".to_string(),
1002 },
1003 )
1004 .await?;
1005 return Ok(());
1006 };
1007
1008 let message_id = self
1009 .app_state
1010 .db
1011 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1012 .await?
1013 .to_proto();
1014 let message = proto::ChannelMessage {
1015 sender_id: user_id.to_proto(),
1016 id: message_id,
1017 body,
1018 timestamp: timestamp.unix_timestamp() as u64,
1019 nonce: Some(nonce),
1020 };
1021 broadcast(request.sender_id, connection_ids, |conn_id| {
1022 self.peer.send(
1023 conn_id,
1024 proto::ChannelMessageSent {
1025 channel_id: channel_id.to_proto(),
1026 message: Some(message.clone()),
1027 },
1028 )
1029 })
1030 .await?;
1031 self.peer
1032 .respond(
1033 receipt,
1034 proto::SendChannelMessageResponse {
1035 message: Some(message),
1036 },
1037 )
1038 .await?;
1039 Ok(())
1040 }
1041
1042 async fn get_channel_messages(
1043 self: Arc<Self>,
1044 request: TypedEnvelope<proto::GetChannelMessages>,
1045 ) -> tide::Result<()> {
1046 let user_id = self.state().user_id_for_connection(request.sender_id)?;
1047 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1048 if !self
1049 .app_state
1050 .db
1051 .can_user_access_channel(user_id, channel_id)
1052 .await?
1053 {
1054 Err(anyhow!("access denied"))?;
1055 }
1056
1057 let messages = self
1058 .app_state
1059 .db
1060 .get_channel_messages(
1061 channel_id,
1062 MESSAGE_COUNT_PER_PAGE,
1063 Some(MessageId::from_proto(request.payload.before_message_id)),
1064 )
1065 .await?
1066 .into_iter()
1067 .map(|msg| proto::ChannelMessage {
1068 id: msg.id.to_proto(),
1069 body: msg.body,
1070 timestamp: msg.sent_at.unix_timestamp() as u64,
1071 sender_id: msg.sender_id.to_proto(),
1072 nonce: Some(msg.nonce.as_u128().into()),
1073 })
1074 .collect::<Vec<_>>();
1075 self.peer
1076 .respond(
1077 request.receipt(),
1078 proto::GetChannelMessagesResponse {
1079 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1080 messages,
1081 },
1082 )
1083 .await?;
1084 Ok(())
1085 }
1086
1087 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1088 self.store.read()
1089 }
1090
1091 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1092 self.store.write()
1093 }
1094}
1095
1096pub async fn broadcast<F, T>(
1097 sender_id: ConnectionId,
1098 receiver_ids: Vec<ConnectionId>,
1099 mut f: F,
1100) -> anyhow::Result<()>
1101where
1102 F: FnMut(ConnectionId) -> T,
1103 T: Future<Output = anyhow::Result<()>>,
1104{
1105 let futures = receiver_ids
1106 .into_iter()
1107 .filter(|id| *id != sender_id)
1108 .map(|id| f(id));
1109 futures::future::try_join_all(futures).await?;
1110 Ok(())
1111}
1112
1113pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1114 let server = Server::new(app.state().clone(), rpc.clone(), None);
1115 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1116 let server = server.clone();
1117 async move {
1118 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1119
1120 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1121 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1122 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1123 let client_protocol_version: Option<u32> = request
1124 .header("X-Zed-Protocol-Version")
1125 .and_then(|v| v.as_str().parse().ok());
1126
1127 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1128 return Ok(Response::new(StatusCode::UpgradeRequired));
1129 }
1130
1131 let header = match request.header("Sec-Websocket-Key") {
1132 Some(h) => h.as_str(),
1133 None => return Err(anyhow!("expected sec-websocket-key"))?,
1134 };
1135
1136 let user_id = process_auth_header(&request).await?;
1137
1138 let mut response = Response::new(StatusCode::SwitchingProtocols);
1139 response.insert_header(UPGRADE, "websocket");
1140 response.insert_header(CONNECTION, "Upgrade");
1141 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1142 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1143 response.insert_header("Sec-Websocket-Version", "13");
1144
1145 let http_res: &mut tide::http::Response = response.as_mut();
1146 let upgrade_receiver = http_res.recv_upgrade().await;
1147 let addr = request.remote().unwrap_or("unknown").to_string();
1148 task::spawn(async move {
1149 if let Some(stream) = upgrade_receiver.await {
1150 server
1151 .handle_connection(
1152 Connection::new(
1153 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1154 ),
1155 addr,
1156 user_id,
1157 None,
1158 )
1159 .await;
1160 }
1161 });
1162
1163 Ok(response)
1164 }
1165 });
1166}
1167
1168fn header_contains_ignore_case<T>(
1169 request: &tide::Request<T>,
1170 header_name: HeaderName,
1171 value: &str,
1172) -> bool {
1173 request
1174 .header(header_name)
1175 .map(|h| {
1176 h.as_str()
1177 .split(',')
1178 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1179 })
1180 .unwrap_or(false)
1181}
1182
1183#[cfg(test)]
1184mod tests {
1185 use super::*;
1186 use crate::{
1187 auth,
1188 db::{tests::TestDb, UserId},
1189 github, AppState, Config,
1190 };
1191 use ::rpc::Peer;
1192 use async_std::task;
1193 use gpui::{executor, ModelHandle, TestAppContext};
1194 use parking_lot::Mutex;
1195 use postage::{mpsc, watch};
1196 use rand::prelude::*;
1197 use rpc::PeerId;
1198 use serde_json::json;
1199 use sqlx::types::time::OffsetDateTime;
1200 use std::{
1201 ops::Deref,
1202 path::Path,
1203 rc::Rc,
1204 sync::{
1205 atomic::{AtomicBool, Ordering::SeqCst},
1206 Arc,
1207 },
1208 time::Duration,
1209 };
1210 use zed::{
1211 client::{
1212 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1213 EstablishConnectionError, UserStore,
1214 },
1215 editor::{Editor, EditorSettings, Input, MultiBuffer},
1216 fs::{FakeFs, Fs as _},
1217 language::{
1218 tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1219 LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1220 },
1221 lsp,
1222 project::{DiagnosticSummary, Project, ProjectPath},
1223 };
1224
1225 #[gpui::test]
1226 async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1227 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1228 let lang_registry = Arc::new(LanguageRegistry::new());
1229 let fs = Arc::new(FakeFs::new(cx_a.background()));
1230 cx_a.foreground().forbid_parking();
1231
1232 // Connect to a server as 2 clients.
1233 let mut server = TestServer::start(cx_a.foreground()).await;
1234 let client_a = server.create_client(&mut cx_a, "user_a").await;
1235 let client_b = server.create_client(&mut cx_b, "user_b").await;
1236
1237 // Share a project as client A
1238 fs.insert_tree(
1239 "/a",
1240 json!({
1241 ".zed.toml": r#"collaborators = ["user_b"]"#,
1242 "a.txt": "a-contents",
1243 "b.txt": "b-contents",
1244 }),
1245 )
1246 .await;
1247 let project_a = cx_a.update(|cx| {
1248 Project::local(
1249 client_a.clone(),
1250 client_a.user_store.clone(),
1251 lang_registry.clone(),
1252 fs.clone(),
1253 cx,
1254 )
1255 });
1256 let (worktree_a, _) = project_a
1257 .update(&mut cx_a, |p, cx| {
1258 p.find_or_create_local_worktree("/a", false, cx)
1259 })
1260 .await
1261 .unwrap();
1262 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1263 worktree_a
1264 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1265 .await;
1266 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1267 project_a
1268 .update(&mut cx_a, |p, cx| p.share(cx))
1269 .await
1270 .unwrap();
1271
1272 // Join that project as client B
1273 let project_b = Project::remote(
1274 project_id,
1275 client_b.clone(),
1276 client_b.user_store.clone(),
1277 lang_registry.clone(),
1278 fs.clone(),
1279 &mut cx_b.to_async(),
1280 )
1281 .await
1282 .unwrap();
1283
1284 let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1285 assert_eq!(
1286 project
1287 .collaborators()
1288 .get(&client_a.peer_id)
1289 .unwrap()
1290 .user
1291 .github_login,
1292 "user_a"
1293 );
1294 project.replica_id()
1295 });
1296 project_a
1297 .condition(&cx_a, |tree, _| {
1298 tree.collaborators()
1299 .get(&client_b.peer_id)
1300 .map_or(false, |collaborator| {
1301 collaborator.replica_id == replica_id_b
1302 && collaborator.user.github_login == "user_b"
1303 })
1304 })
1305 .await;
1306
1307 // Open the same file as client B and client A.
1308 let buffer_b = project_b
1309 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1310 .await
1311 .unwrap();
1312 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1313 buffer_b.read_with(&cx_b, |buf, cx| {
1314 assert_eq!(buf.read(cx).text(), "b-contents")
1315 });
1316 project_a.read_with(&cx_a, |project, cx| {
1317 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1318 });
1319 let buffer_a = project_a
1320 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1321 .await
1322 .unwrap();
1323
1324 let editor_b = cx_b.add_view(window_b, |cx| {
1325 Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
1326 });
1327
1328 // TODO
1329 // // Create a selection set as client B and see that selection set as client A.
1330 // buffer_a
1331 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1332 // .await;
1333
1334 // Edit the buffer as client B and see that edit as client A.
1335 editor_b.update(&mut cx_b, |editor, cx| {
1336 editor.handle_input(&Input("ok, ".into()), cx)
1337 });
1338 buffer_a
1339 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1340 .await;
1341
1342 // TODO
1343 // // Remove the selection set as client B, see those selections disappear as client A.
1344 cx_b.update(move |_| drop(editor_b));
1345 // buffer_a
1346 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1347 // .await;
1348
1349 // Close the buffer as client A, see that the buffer is closed.
1350 cx_a.update(move |_| drop(buffer_a));
1351 project_a
1352 .condition(&cx_a, |project, cx| {
1353 !project.has_open_buffer((worktree_id, "b.txt"), cx)
1354 })
1355 .await;
1356
1357 // Dropping the client B's project removes client B from client A's collaborators.
1358 cx_b.update(move |_| drop(project_b));
1359 project_a
1360 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1361 .await;
1362 }
1363
1364 #[gpui::test]
1365 async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1366 let lang_registry = Arc::new(LanguageRegistry::new());
1367 let fs = Arc::new(FakeFs::new(cx_a.background()));
1368 cx_a.foreground().forbid_parking();
1369
1370 // Connect to a server as 2 clients.
1371 let mut server = TestServer::start(cx_a.foreground()).await;
1372 let client_a = server.create_client(&mut cx_a, "user_a").await;
1373 let client_b = server.create_client(&mut cx_b, "user_b").await;
1374
1375 // Share a project as client A
1376 fs.insert_tree(
1377 "/a",
1378 json!({
1379 ".zed.toml": r#"collaborators = ["user_b"]"#,
1380 "a.txt": "a-contents",
1381 "b.txt": "b-contents",
1382 }),
1383 )
1384 .await;
1385 let project_a = cx_a.update(|cx| {
1386 Project::local(
1387 client_a.clone(),
1388 client_a.user_store.clone(),
1389 lang_registry.clone(),
1390 fs.clone(),
1391 cx,
1392 )
1393 });
1394 let (worktree_a, _) = project_a
1395 .update(&mut cx_a, |p, cx| {
1396 p.find_or_create_local_worktree("/a", false, cx)
1397 })
1398 .await
1399 .unwrap();
1400 worktree_a
1401 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1402 .await;
1403 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1404 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1405 project_a
1406 .update(&mut cx_a, |p, cx| p.share(cx))
1407 .await
1408 .unwrap();
1409 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1410
1411 // Join that project as client B
1412 let project_b = Project::remote(
1413 project_id,
1414 client_b.clone(),
1415 client_b.user_store.clone(),
1416 lang_registry.clone(),
1417 fs.clone(),
1418 &mut cx_b.to_async(),
1419 )
1420 .await
1421 .unwrap();
1422 project_b
1423 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1424 .await
1425 .unwrap();
1426
1427 // Unshare the project as client A
1428 project_a
1429 .update(&mut cx_a, |project, cx| project.unshare(cx))
1430 .await
1431 .unwrap();
1432 project_b
1433 .condition(&mut cx_b, |project, _| project.is_read_only())
1434 .await;
1435 assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1436 drop(project_b);
1437
1438 // Share the project again and ensure guests can still join.
1439 project_a
1440 .update(&mut cx_a, |project, cx| project.share(cx))
1441 .await
1442 .unwrap();
1443 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1444
1445 let project_c = Project::remote(
1446 project_id,
1447 client_b.clone(),
1448 client_b.user_store.clone(),
1449 lang_registry.clone(),
1450 fs.clone(),
1451 &mut cx_b.to_async(),
1452 )
1453 .await
1454 .unwrap();
1455 project_c
1456 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1457 .await
1458 .unwrap();
1459 }
1460
1461 #[gpui::test]
1462 async fn test_propagate_saves_and_fs_changes(
1463 mut cx_a: TestAppContext,
1464 mut cx_b: TestAppContext,
1465 mut cx_c: TestAppContext,
1466 ) {
1467 let lang_registry = Arc::new(LanguageRegistry::new());
1468 let fs = Arc::new(FakeFs::new(cx_a.background()));
1469 cx_a.foreground().forbid_parking();
1470
1471 // Connect to a server as 3 clients.
1472 let mut server = TestServer::start(cx_a.foreground()).await;
1473 let client_a = server.create_client(&mut cx_a, "user_a").await;
1474 let client_b = server.create_client(&mut cx_b, "user_b").await;
1475 let client_c = server.create_client(&mut cx_c, "user_c").await;
1476
1477 // Share a worktree as client A.
1478 fs.insert_tree(
1479 "/a",
1480 json!({
1481 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1482 "file1": "",
1483 "file2": ""
1484 }),
1485 )
1486 .await;
1487 let project_a = cx_a.update(|cx| {
1488 Project::local(
1489 client_a.clone(),
1490 client_a.user_store.clone(),
1491 lang_registry.clone(),
1492 fs.clone(),
1493 cx,
1494 )
1495 });
1496 let (worktree_a, _) = project_a
1497 .update(&mut cx_a, |p, cx| {
1498 p.find_or_create_local_worktree("/a", false, cx)
1499 })
1500 .await
1501 .unwrap();
1502 worktree_a
1503 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1504 .await;
1505 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1506 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1507 project_a
1508 .update(&mut cx_a, |p, cx| p.share(cx))
1509 .await
1510 .unwrap();
1511
1512 // Join that worktree as clients B and C.
1513 let project_b = Project::remote(
1514 project_id,
1515 client_b.clone(),
1516 client_b.user_store.clone(),
1517 lang_registry.clone(),
1518 fs.clone(),
1519 &mut cx_b.to_async(),
1520 )
1521 .await
1522 .unwrap();
1523 let project_c = Project::remote(
1524 project_id,
1525 client_c.clone(),
1526 client_c.user_store.clone(),
1527 lang_registry.clone(),
1528 fs.clone(),
1529 &mut cx_c.to_async(),
1530 )
1531 .await
1532 .unwrap();
1533 let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1534 let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1535
1536 // Open and edit a buffer as both guests B and C.
1537 let buffer_b = project_b
1538 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1539 .await
1540 .unwrap();
1541 let buffer_c = project_c
1542 .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1543 .await
1544 .unwrap();
1545 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1546 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1547
1548 // Open and edit that buffer as the host.
1549 let buffer_a = project_a
1550 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1551 .await
1552 .unwrap();
1553
1554 buffer_a
1555 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1556 .await;
1557 buffer_a.update(&mut cx_a, |buf, cx| {
1558 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1559 });
1560
1561 // Wait for edits to propagate
1562 buffer_a
1563 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1564 .await;
1565 buffer_b
1566 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1567 .await;
1568 buffer_c
1569 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1570 .await;
1571
1572 // Edit the buffer as the host and concurrently save as guest B.
1573 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1574 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1575 save_b.await.unwrap();
1576 assert_eq!(
1577 fs.load("/a/file1".as_ref()).await.unwrap(),
1578 "hi-a, i-am-c, i-am-b, i-am-a"
1579 );
1580 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1581 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1582 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1583
1584 // Make changes on host's file system, see those changes on guest worktrees.
1585 fs.rename("/a/file1".as_ref(), "/a/file1-renamed".as_ref())
1586 .await
1587 .unwrap();
1588 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1589 .await
1590 .unwrap();
1591 fs.insert_file(Path::new("/a/file4"), "4".into())
1592 .await
1593 .unwrap();
1594
1595 worktree_a
1596 .condition(&cx_a, |tree, _| tree.file_count() == 4)
1597 .await;
1598 worktree_b
1599 .condition(&cx_b, |tree, _| tree.file_count() == 4)
1600 .await;
1601 worktree_c
1602 .condition(&cx_c, |tree, _| tree.file_count() == 4)
1603 .await;
1604 worktree_a.read_with(&cx_a, |tree, _| {
1605 assert_eq!(
1606 tree.paths()
1607 .map(|p| p.to_string_lossy())
1608 .collect::<Vec<_>>(),
1609 &[".zed.toml", "file1-renamed", "file3", "file4"]
1610 )
1611 });
1612 worktree_b.read_with(&cx_b, |tree, _| {
1613 assert_eq!(
1614 tree.paths()
1615 .map(|p| p.to_string_lossy())
1616 .collect::<Vec<_>>(),
1617 &[".zed.toml", "file1-renamed", "file3", "file4"]
1618 )
1619 });
1620 worktree_c.read_with(&cx_c, |tree, _| {
1621 assert_eq!(
1622 tree.paths()
1623 .map(|p| p.to_string_lossy())
1624 .collect::<Vec<_>>(),
1625 &[".zed.toml", "file1-renamed", "file3", "file4"]
1626 )
1627 });
1628
1629 // Ensure buffer files are updated as well.
1630 buffer_a
1631 .condition(&cx_a, |buf, _| {
1632 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1633 })
1634 .await;
1635 buffer_b
1636 .condition(&cx_b, |buf, _| {
1637 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1638 })
1639 .await;
1640 buffer_c
1641 .condition(&cx_c, |buf, _| {
1642 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1643 })
1644 .await;
1645 }
1646
1647 #[gpui::test]
1648 async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1649 cx_a.foreground().forbid_parking();
1650 let lang_registry = Arc::new(LanguageRegistry::new());
1651 let fs = Arc::new(FakeFs::new(cx_a.background()));
1652
1653 // Connect to a server as 2 clients.
1654 let mut server = TestServer::start(cx_a.foreground()).await;
1655 let client_a = server.create_client(&mut cx_a, "user_a").await;
1656 let client_b = server.create_client(&mut cx_b, "user_b").await;
1657
1658 // Share a project as client A
1659 fs.insert_tree(
1660 "/dir",
1661 json!({
1662 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1663 "a.txt": "a-contents",
1664 }),
1665 )
1666 .await;
1667
1668 let project_a = cx_a.update(|cx| {
1669 Project::local(
1670 client_a.clone(),
1671 client_a.user_store.clone(),
1672 lang_registry.clone(),
1673 fs.clone(),
1674 cx,
1675 )
1676 });
1677 let (worktree_a, _) = project_a
1678 .update(&mut cx_a, |p, cx| {
1679 p.find_or_create_local_worktree("/dir", false, cx)
1680 })
1681 .await
1682 .unwrap();
1683 worktree_a
1684 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1685 .await;
1686 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1687 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1688 project_a
1689 .update(&mut cx_a, |p, cx| p.share(cx))
1690 .await
1691 .unwrap();
1692
1693 // Join that project as client B
1694 let project_b = Project::remote(
1695 project_id,
1696 client_b.clone(),
1697 client_b.user_store.clone(),
1698 lang_registry.clone(),
1699 fs.clone(),
1700 &mut cx_b.to_async(),
1701 )
1702 .await
1703 .unwrap();
1704 let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1705
1706 // Open a buffer as client B
1707 let buffer_b = project_b
1708 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1709 .await
1710 .unwrap();
1711 let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1712
1713 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1714 buffer_b.read_with(&cx_b, |buf, _| {
1715 assert!(buf.is_dirty());
1716 assert!(!buf.has_conflict());
1717 });
1718
1719 buffer_b
1720 .update(&mut cx_b, |buf, cx| buf.save(cx))
1721 .await
1722 .unwrap();
1723 worktree_b
1724 .condition(&cx_b, |_, cx| {
1725 buffer_b.read(cx).file().unwrap().mtime() != mtime
1726 })
1727 .await;
1728 buffer_b.read_with(&cx_b, |buf, _| {
1729 assert!(!buf.is_dirty());
1730 assert!(!buf.has_conflict());
1731 });
1732
1733 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1734 buffer_b.read_with(&cx_b, |buf, _| {
1735 assert!(buf.is_dirty());
1736 assert!(!buf.has_conflict());
1737 });
1738 }
1739
1740 #[gpui::test]
1741 async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1742 cx_a.foreground().forbid_parking();
1743 let lang_registry = Arc::new(LanguageRegistry::new());
1744 let fs = Arc::new(FakeFs::new(cx_a.background()));
1745
1746 // Connect to a server as 2 clients.
1747 let mut server = TestServer::start(cx_a.foreground()).await;
1748 let client_a = server.create_client(&mut cx_a, "user_a").await;
1749 let client_b = server.create_client(&mut cx_b, "user_b").await;
1750
1751 // Share a project as client A
1752 fs.insert_tree(
1753 "/dir",
1754 json!({
1755 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1756 "a.txt": "a-contents",
1757 }),
1758 )
1759 .await;
1760
1761 let project_a = cx_a.update(|cx| {
1762 Project::local(
1763 client_a.clone(),
1764 client_a.user_store.clone(),
1765 lang_registry.clone(),
1766 fs.clone(),
1767 cx,
1768 )
1769 });
1770 let (worktree_a, _) = project_a
1771 .update(&mut cx_a, |p, cx| {
1772 p.find_or_create_local_worktree("/dir", false, cx)
1773 })
1774 .await
1775 .unwrap();
1776 worktree_a
1777 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1778 .await;
1779 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1780 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1781 project_a
1782 .update(&mut cx_a, |p, cx| p.share(cx))
1783 .await
1784 .unwrap();
1785
1786 // Join that project as client B
1787 let project_b = Project::remote(
1788 project_id,
1789 client_b.clone(),
1790 client_b.user_store.clone(),
1791 lang_registry.clone(),
1792 fs.clone(),
1793 &mut cx_b.to_async(),
1794 )
1795 .await
1796 .unwrap();
1797 let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1798
1799 // Open a buffer as client B
1800 let buffer_b = project_b
1801 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1802 .await
1803 .unwrap();
1804 buffer_b.read_with(&cx_b, |buf, _| {
1805 assert!(!buf.is_dirty());
1806 assert!(!buf.has_conflict());
1807 });
1808
1809 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1810 .await
1811 .unwrap();
1812 buffer_b
1813 .condition(&cx_b, |buf, _| {
1814 buf.text() == "new contents" && !buf.is_dirty()
1815 })
1816 .await;
1817 buffer_b.read_with(&cx_b, |buf, _| {
1818 assert!(!buf.has_conflict());
1819 });
1820 }
1821
1822 #[gpui::test(iterations = 100)]
1823 async fn test_editing_while_guest_opens_buffer(
1824 mut cx_a: TestAppContext,
1825 mut cx_b: TestAppContext,
1826 ) {
1827 cx_a.foreground().forbid_parking();
1828 let lang_registry = Arc::new(LanguageRegistry::new());
1829 let fs = Arc::new(FakeFs::new(cx_a.background()));
1830
1831 // Connect to a server as 2 clients.
1832 let mut server = TestServer::start(cx_a.foreground()).await;
1833 let client_a = server.create_client(&mut cx_a, "user_a").await;
1834 let client_b = server.create_client(&mut cx_b, "user_b").await;
1835
1836 // Share a project as client A
1837 fs.insert_tree(
1838 "/dir",
1839 json!({
1840 ".zed.toml": r#"collaborators = ["user_b"]"#,
1841 "a.txt": "a-contents",
1842 }),
1843 )
1844 .await;
1845 let project_a = cx_a.update(|cx| {
1846 Project::local(
1847 client_a.clone(),
1848 client_a.user_store.clone(),
1849 lang_registry.clone(),
1850 fs.clone(),
1851 cx,
1852 )
1853 });
1854 let (worktree_a, _) = project_a
1855 .update(&mut cx_a, |p, cx| {
1856 p.find_or_create_local_worktree("/dir", false, cx)
1857 })
1858 .await
1859 .unwrap();
1860 worktree_a
1861 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1862 .await;
1863 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1864 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1865 project_a
1866 .update(&mut cx_a, |p, cx| p.share(cx))
1867 .await
1868 .unwrap();
1869
1870 // Join that project as client B
1871 let project_b = Project::remote(
1872 project_id,
1873 client_b.clone(),
1874 client_b.user_store.clone(),
1875 lang_registry.clone(),
1876 fs.clone(),
1877 &mut cx_b.to_async(),
1878 )
1879 .await
1880 .unwrap();
1881
1882 // Open a buffer as client A
1883 let buffer_a = project_a
1884 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1885 .await
1886 .unwrap();
1887
1888 // Start opening the same buffer as client B
1889 let buffer_b = cx_b
1890 .background()
1891 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1892 task::yield_now().await;
1893
1894 // Edit the buffer as client A while client B is still opening it.
1895 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1896
1897 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1898 let buffer_b = buffer_b.await.unwrap();
1899 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1900 }
1901
1902 #[gpui::test]
1903 async fn test_leaving_worktree_while_opening_buffer(
1904 mut cx_a: TestAppContext,
1905 mut cx_b: TestAppContext,
1906 ) {
1907 cx_a.foreground().forbid_parking();
1908 let lang_registry = Arc::new(LanguageRegistry::new());
1909 let fs = Arc::new(FakeFs::new(cx_a.background()));
1910
1911 // Connect to a server as 2 clients.
1912 let mut server = TestServer::start(cx_a.foreground()).await;
1913 let client_a = server.create_client(&mut cx_a, "user_a").await;
1914 let client_b = server.create_client(&mut cx_b, "user_b").await;
1915
1916 // Share a project as client A
1917 fs.insert_tree(
1918 "/dir",
1919 json!({
1920 ".zed.toml": r#"collaborators = ["user_b"]"#,
1921 "a.txt": "a-contents",
1922 }),
1923 )
1924 .await;
1925 let project_a = cx_a.update(|cx| {
1926 Project::local(
1927 client_a.clone(),
1928 client_a.user_store.clone(),
1929 lang_registry.clone(),
1930 fs.clone(),
1931 cx,
1932 )
1933 });
1934 let (worktree_a, _) = project_a
1935 .update(&mut cx_a, |p, cx| {
1936 p.find_or_create_local_worktree("/dir", false, cx)
1937 })
1938 .await
1939 .unwrap();
1940 worktree_a
1941 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1942 .await;
1943 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1944 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1945 project_a
1946 .update(&mut cx_a, |p, cx| p.share(cx))
1947 .await
1948 .unwrap();
1949
1950 // Join that project as client B
1951 let project_b = Project::remote(
1952 project_id,
1953 client_b.clone(),
1954 client_b.user_store.clone(),
1955 lang_registry.clone(),
1956 fs.clone(),
1957 &mut cx_b.to_async(),
1958 )
1959 .await
1960 .unwrap();
1961
1962 // See that a guest has joined as client A.
1963 project_a
1964 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1965 .await;
1966
1967 // Begin opening a buffer as client B, but leave the project before the open completes.
1968 let buffer_b = cx_b
1969 .background()
1970 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1971 cx_b.update(|_| drop(project_b));
1972 drop(buffer_b);
1973
1974 // See that the guest has left.
1975 project_a
1976 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1977 .await;
1978 }
1979
1980 #[gpui::test]
1981 async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1982 cx_a.foreground().forbid_parking();
1983 let lang_registry = Arc::new(LanguageRegistry::new());
1984 let fs = Arc::new(FakeFs::new(cx_a.background()));
1985
1986 // Connect to a server as 2 clients.
1987 let mut server = TestServer::start(cx_a.foreground()).await;
1988 let client_a = server.create_client(&mut cx_a, "user_a").await;
1989 let client_b = server.create_client(&mut cx_b, "user_b").await;
1990
1991 // Share a project as client A
1992 fs.insert_tree(
1993 "/a",
1994 json!({
1995 ".zed.toml": r#"collaborators = ["user_b"]"#,
1996 "a.txt": "a-contents",
1997 "b.txt": "b-contents",
1998 }),
1999 )
2000 .await;
2001 let project_a = cx_a.update(|cx| {
2002 Project::local(
2003 client_a.clone(),
2004 client_a.user_store.clone(),
2005 lang_registry.clone(),
2006 fs.clone(),
2007 cx,
2008 )
2009 });
2010 let (worktree_a, _) = project_a
2011 .update(&mut cx_a, |p, cx| {
2012 p.find_or_create_local_worktree("/a", false, cx)
2013 })
2014 .await
2015 .unwrap();
2016 worktree_a
2017 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2018 .await;
2019 let project_id = project_a
2020 .update(&mut cx_a, |project, _| project.next_remote_id())
2021 .await;
2022 project_a
2023 .update(&mut cx_a, |project, cx| project.share(cx))
2024 .await
2025 .unwrap();
2026
2027 // Join that project as client B
2028 let _project_b = Project::remote(
2029 project_id,
2030 client_b.clone(),
2031 client_b.user_store.clone(),
2032 lang_registry.clone(),
2033 fs.clone(),
2034 &mut cx_b.to_async(),
2035 )
2036 .await
2037 .unwrap();
2038
2039 // See that a guest has joined as client A.
2040 project_a
2041 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2042 .await;
2043
2044 // Drop client B's connection and ensure client A observes client B leaving the worktree.
2045 client_b.disconnect(&cx_b.to_async()).unwrap();
2046 project_a
2047 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2048 .await;
2049 }
2050
2051 #[gpui::test]
2052 async fn test_collaborating_with_diagnostics(
2053 mut cx_a: TestAppContext,
2054 mut cx_b: TestAppContext,
2055 ) {
2056 cx_a.foreground().forbid_parking();
2057 let mut lang_registry = Arc::new(LanguageRegistry::new());
2058 let fs = Arc::new(FakeFs::new(cx_a.background()));
2059
2060 // Set up a fake language server.
2061 let (language_server_config, mut fake_language_server) =
2062 LanguageServerConfig::fake(cx_a.background()).await;
2063 Arc::get_mut(&mut lang_registry)
2064 .unwrap()
2065 .add(Arc::new(Language::new(
2066 LanguageConfig {
2067 name: "Rust".to_string(),
2068 path_suffixes: vec!["rs".to_string()],
2069 language_server: Some(language_server_config),
2070 ..Default::default()
2071 },
2072 Some(tree_sitter_rust::language()),
2073 )));
2074
2075 // Connect to a server as 2 clients.
2076 let mut server = TestServer::start(cx_a.foreground()).await;
2077 let client_a = server.create_client(&mut cx_a, "user_a").await;
2078 let client_b = server.create_client(&mut cx_b, "user_b").await;
2079
2080 // Share a project as client A
2081 fs.insert_tree(
2082 "/a",
2083 json!({
2084 ".zed.toml": r#"collaborators = ["user_b"]"#,
2085 "a.rs": "let one = two",
2086 "other.rs": "",
2087 }),
2088 )
2089 .await;
2090 let project_a = cx_a.update(|cx| {
2091 Project::local(
2092 client_a.clone(),
2093 client_a.user_store.clone(),
2094 lang_registry.clone(),
2095 fs.clone(),
2096 cx,
2097 )
2098 });
2099 let (worktree_a, _) = project_a
2100 .update(&mut cx_a, |p, cx| {
2101 p.find_or_create_local_worktree("/a", false, cx)
2102 })
2103 .await
2104 .unwrap();
2105 worktree_a
2106 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2107 .await;
2108 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2109 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2110 project_a
2111 .update(&mut cx_a, |p, cx| p.share(cx))
2112 .await
2113 .unwrap();
2114
2115 // Cause the language server to start.
2116 let _ = cx_a
2117 .background()
2118 .spawn(project_a.update(&mut cx_a, |project, cx| {
2119 project.open_buffer(
2120 ProjectPath {
2121 worktree_id,
2122 path: Path::new("other.rs").into(),
2123 },
2124 cx,
2125 )
2126 }))
2127 .await
2128 .unwrap();
2129
2130 // Simulate a language server reporting errors for a file.
2131 fake_language_server
2132 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2133 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2134 version: None,
2135 diagnostics: vec![lsp::Diagnostic {
2136 severity: Some(lsp::DiagnosticSeverity::ERROR),
2137 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2138 message: "message 1".to_string(),
2139 ..Default::default()
2140 }],
2141 })
2142 .await;
2143
2144 // Wait for server to see the diagnostics update.
2145 server
2146 .condition(|store| {
2147 let worktree = store
2148 .project(project_id)
2149 .unwrap()
2150 .worktrees
2151 .get(&worktree_id.to_proto())
2152 .unwrap();
2153
2154 !worktree
2155 .share
2156 .as_ref()
2157 .unwrap()
2158 .diagnostic_summaries
2159 .is_empty()
2160 })
2161 .await;
2162
2163 // Join the worktree as client B.
2164 let project_b = Project::remote(
2165 project_id,
2166 client_b.clone(),
2167 client_b.user_store.clone(),
2168 lang_registry.clone(),
2169 fs.clone(),
2170 &mut cx_b.to_async(),
2171 )
2172 .await
2173 .unwrap();
2174
2175 project_b.read_with(&cx_b, |project, cx| {
2176 assert_eq!(
2177 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2178 &[(
2179 ProjectPath {
2180 worktree_id,
2181 path: Arc::from(Path::new("a.rs")),
2182 },
2183 DiagnosticSummary {
2184 error_count: 1,
2185 warning_count: 0,
2186 ..Default::default()
2187 },
2188 )]
2189 )
2190 });
2191
2192 // Simulate a language server reporting more errors for a file.
2193 fake_language_server
2194 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2195 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2196 version: None,
2197 diagnostics: vec![
2198 lsp::Diagnostic {
2199 severity: Some(lsp::DiagnosticSeverity::ERROR),
2200 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2201 message: "message 1".to_string(),
2202 ..Default::default()
2203 },
2204 lsp::Diagnostic {
2205 severity: Some(lsp::DiagnosticSeverity::WARNING),
2206 range: lsp::Range::new(
2207 lsp::Position::new(0, 10),
2208 lsp::Position::new(0, 13),
2209 ),
2210 message: "message 2".to_string(),
2211 ..Default::default()
2212 },
2213 ],
2214 })
2215 .await;
2216
2217 // Client b gets the updated summaries
2218 project_b
2219 .condition(&cx_b, |project, cx| {
2220 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2221 == &[(
2222 ProjectPath {
2223 worktree_id,
2224 path: Arc::from(Path::new("a.rs")),
2225 },
2226 DiagnosticSummary {
2227 error_count: 1,
2228 warning_count: 1,
2229 ..Default::default()
2230 },
2231 )]
2232 })
2233 .await;
2234
2235 // Open the file with the errors on client B. They should be present.
2236 let buffer_b = cx_b
2237 .background()
2238 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2239 .await
2240 .unwrap();
2241
2242 buffer_b.read_with(&cx_b, |buffer, _| {
2243 assert_eq!(
2244 buffer
2245 .snapshot()
2246 .diagnostics_in_range::<_, Point>(0..buffer.len())
2247 .map(|entry| entry)
2248 .collect::<Vec<_>>(),
2249 &[
2250 DiagnosticEntry {
2251 range: Point::new(0, 4)..Point::new(0, 7),
2252 diagnostic: Diagnostic {
2253 group_id: 0,
2254 message: "message 1".to_string(),
2255 severity: lsp::DiagnosticSeverity::ERROR,
2256 is_primary: true,
2257 ..Default::default()
2258 }
2259 },
2260 DiagnosticEntry {
2261 range: Point::new(0, 10)..Point::new(0, 13),
2262 diagnostic: Diagnostic {
2263 group_id: 1,
2264 severity: lsp::DiagnosticSeverity::WARNING,
2265 message: "message 2".to_string(),
2266 is_primary: true,
2267 ..Default::default()
2268 }
2269 }
2270 ]
2271 );
2272 });
2273 }
2274
2275 #[gpui::test]
2276 async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2277 cx_a.foreground().forbid_parking();
2278 let mut lang_registry = Arc::new(LanguageRegistry::new());
2279 let fs = Arc::new(FakeFs::new(cx_a.background()));
2280
2281 // Set up a fake language server.
2282 let (language_server_config, mut fake_language_server) =
2283 LanguageServerConfig::fake(cx_a.background()).await;
2284 Arc::get_mut(&mut lang_registry)
2285 .unwrap()
2286 .add(Arc::new(Language::new(
2287 LanguageConfig {
2288 name: "Rust".to_string(),
2289 path_suffixes: vec!["rs".to_string()],
2290 language_server: Some(language_server_config),
2291 ..Default::default()
2292 },
2293 Some(tree_sitter_rust::language()),
2294 )));
2295
2296 // Connect to a server as 2 clients.
2297 let mut server = TestServer::start(cx_a.foreground()).await;
2298 let client_a = server.create_client(&mut cx_a, "user_a").await;
2299 let client_b = server.create_client(&mut cx_b, "user_b").await;
2300
2301 // Share a project as client A
2302 fs.insert_tree(
2303 "/a",
2304 json!({
2305 ".zed.toml": r#"collaborators = ["user_b"]"#,
2306 "a.rs": "let one = two",
2307 }),
2308 )
2309 .await;
2310 let project_a = cx_a.update(|cx| {
2311 Project::local(
2312 client_a.clone(),
2313 client_a.user_store.clone(),
2314 lang_registry.clone(),
2315 fs.clone(),
2316 cx,
2317 )
2318 });
2319 let (worktree_a, _) = project_a
2320 .update(&mut cx_a, |p, cx| {
2321 p.find_or_create_local_worktree("/a", false, cx)
2322 })
2323 .await
2324 .unwrap();
2325 worktree_a
2326 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2327 .await;
2328 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2329 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2330 project_a
2331 .update(&mut cx_a, |p, cx| p.share(cx))
2332 .await
2333 .unwrap();
2334
2335 // Join the worktree as client B.
2336 let project_b = Project::remote(
2337 project_id,
2338 client_b.clone(),
2339 client_b.user_store.clone(),
2340 lang_registry.clone(),
2341 fs.clone(),
2342 &mut cx_b.to_async(),
2343 )
2344 .await
2345 .unwrap();
2346
2347 let buffer_b = cx_b
2348 .background()
2349 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2350 .await
2351 .unwrap();
2352
2353 let format = buffer_b.update(&mut cx_b, |buffer, cx| buffer.format(cx));
2354 let (request_id, _) = fake_language_server
2355 .receive_request::<lsp::request::Formatting>()
2356 .await;
2357 fake_language_server
2358 .respond(
2359 request_id,
2360 Some(vec![
2361 lsp::TextEdit {
2362 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2363 new_text: "h".to_string(),
2364 },
2365 lsp::TextEdit {
2366 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2367 new_text: "y".to_string(),
2368 },
2369 ]),
2370 )
2371 .await;
2372 format.await.unwrap();
2373 assert_eq!(
2374 buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2375 "let honey = two"
2376 );
2377 }
2378
2379 #[gpui::test]
2380 async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2381 cx_a.foreground().forbid_parking();
2382 let mut lang_registry = Arc::new(LanguageRegistry::new());
2383 let fs = Arc::new(FakeFs::new(cx_a.background()));
2384 fs.insert_tree(
2385 "/root-1",
2386 json!({
2387 ".zed.toml": r#"collaborators = ["user_b"]"#,
2388 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2389 }),
2390 )
2391 .await;
2392 fs.insert_tree(
2393 "/root-2",
2394 json!({
2395 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2396 }),
2397 )
2398 .await;
2399
2400 // Set up a fake language server.
2401 let (language_server_config, mut fake_language_server) =
2402 LanguageServerConfig::fake(cx_a.background()).await;
2403 Arc::get_mut(&mut lang_registry)
2404 .unwrap()
2405 .add(Arc::new(Language::new(
2406 LanguageConfig {
2407 name: "Rust".to_string(),
2408 path_suffixes: vec!["rs".to_string()],
2409 language_server: Some(language_server_config),
2410 ..Default::default()
2411 },
2412 Some(tree_sitter_rust::language()),
2413 )));
2414
2415 // Connect to a server as 2 clients.
2416 let mut server = TestServer::start(cx_a.foreground()).await;
2417 let client_a = server.create_client(&mut cx_a, "user_a").await;
2418 let client_b = server.create_client(&mut cx_b, "user_b").await;
2419
2420 // Share a project as client A
2421 let project_a = cx_a.update(|cx| {
2422 Project::local(
2423 client_a.clone(),
2424 client_a.user_store.clone(),
2425 lang_registry.clone(),
2426 fs.clone(),
2427 cx,
2428 )
2429 });
2430 let (worktree_a, _) = project_a
2431 .update(&mut cx_a, |p, cx| {
2432 p.find_or_create_local_worktree("/root-1", false, cx)
2433 })
2434 .await
2435 .unwrap();
2436 worktree_a
2437 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2438 .await;
2439 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2440 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2441 project_a
2442 .update(&mut cx_a, |p, cx| p.share(cx))
2443 .await
2444 .unwrap();
2445
2446 // Join the worktree as client B.
2447 let project_b = Project::remote(
2448 project_id,
2449 client_b.clone(),
2450 client_b.user_store.clone(),
2451 lang_registry.clone(),
2452 fs.clone(),
2453 &mut cx_b.to_async(),
2454 )
2455 .await
2456 .unwrap();
2457
2458 // Open the file to be formatted on client B.
2459 let buffer_b = cx_b
2460 .background()
2461 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2462 .await
2463 .unwrap();
2464
2465 let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2466 let (request_id, _) = fake_language_server
2467 .receive_request::<lsp::request::GotoDefinition>()
2468 .await;
2469 fake_language_server
2470 .respond(
2471 request_id,
2472 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2473 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2474 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2475 ))),
2476 )
2477 .await;
2478 let definitions_1 = definitions_1.await.unwrap();
2479 cx_b.read(|cx| {
2480 assert_eq!(definitions_1.len(), 1);
2481 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2482 let target_buffer = definitions_1[0].target_buffer.read(cx);
2483 assert_eq!(
2484 target_buffer.text(),
2485 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2486 );
2487 assert_eq!(
2488 definitions_1[0].target_range.to_point(target_buffer),
2489 Point::new(0, 6)..Point::new(0, 9)
2490 );
2491 });
2492
2493 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2494 // the previous call to `definition`.
2495 let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2496 let (request_id, _) = fake_language_server
2497 .receive_request::<lsp::request::GotoDefinition>()
2498 .await;
2499 fake_language_server
2500 .respond(
2501 request_id,
2502 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2503 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2504 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2505 ))),
2506 )
2507 .await;
2508 let definitions_2 = definitions_2.await.unwrap();
2509 cx_b.read(|cx| {
2510 assert_eq!(definitions_2.len(), 1);
2511 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2512 let target_buffer = definitions_2[0].target_buffer.read(cx);
2513 assert_eq!(
2514 target_buffer.text(),
2515 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2516 );
2517 assert_eq!(
2518 definitions_2[0].target_range.to_point(target_buffer),
2519 Point::new(1, 6)..Point::new(1, 11)
2520 );
2521 });
2522 assert_eq!(
2523 definitions_1[0].target_buffer,
2524 definitions_2[0].target_buffer
2525 );
2526
2527 cx_b.update(|_| {
2528 drop(definitions_1);
2529 drop(definitions_2);
2530 });
2531 project_b
2532 .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2533 .await;
2534 }
2535
2536 #[gpui::test]
2537 async fn test_open_buffer_while_getting_definition_pointing_to_it(
2538 mut cx_a: TestAppContext,
2539 mut cx_b: TestAppContext,
2540 mut rng: StdRng,
2541 ) {
2542 cx_a.foreground().forbid_parking();
2543 let mut lang_registry = Arc::new(LanguageRegistry::new());
2544 let fs = Arc::new(FakeFs::new(cx_a.background()));
2545 fs.insert_tree(
2546 "/root",
2547 json!({
2548 ".zed.toml": r#"collaborators = ["user_b"]"#,
2549 "a.rs": "const ONE: usize = b::TWO;",
2550 "b.rs": "const TWO: usize = 2",
2551 }),
2552 )
2553 .await;
2554
2555 // Set up a fake language server.
2556 let (language_server_config, mut fake_language_server) =
2557 LanguageServerConfig::fake(cx_a.background()).await;
2558 Arc::get_mut(&mut lang_registry)
2559 .unwrap()
2560 .add(Arc::new(Language::new(
2561 LanguageConfig {
2562 name: "Rust".to_string(),
2563 path_suffixes: vec!["rs".to_string()],
2564 language_server: Some(language_server_config),
2565 ..Default::default()
2566 },
2567 Some(tree_sitter_rust::language()),
2568 )));
2569
2570 // Connect to a server as 2 clients.
2571 let mut server = TestServer::start(cx_a.foreground()).await;
2572 let client_a = server.create_client(&mut cx_a, "user_a").await;
2573 let client_b = server.create_client(&mut cx_b, "user_b").await;
2574
2575 // Share a project as client A
2576 let project_a = cx_a.update(|cx| {
2577 Project::local(
2578 client_a.clone(),
2579 client_a.user_store.clone(),
2580 lang_registry.clone(),
2581 fs.clone(),
2582 cx,
2583 )
2584 });
2585 let (worktree_a, _) = project_a
2586 .update(&mut cx_a, |p, cx| {
2587 p.find_or_create_local_worktree("/root", false, cx)
2588 })
2589 .await
2590 .unwrap();
2591 worktree_a
2592 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2593 .await;
2594 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2595 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2596 project_a
2597 .update(&mut cx_a, |p, cx| p.share(cx))
2598 .await
2599 .unwrap();
2600
2601 // Join the worktree as client B.
2602 let project_b = Project::remote(
2603 project_id,
2604 client_b.clone(),
2605 client_b.user_store.clone(),
2606 lang_registry.clone(),
2607 fs.clone(),
2608 &mut cx_b.to_async(),
2609 )
2610 .await
2611 .unwrap();
2612
2613 let buffer_b1 = cx_b
2614 .background()
2615 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2616 .await
2617 .unwrap();
2618
2619 let definitions;
2620 let buffer_b2;
2621 if rng.gen() {
2622 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2623 buffer_b2 =
2624 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2625 } else {
2626 buffer_b2 =
2627 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2628 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2629 }
2630
2631 let (request_id, _) = fake_language_server
2632 .receive_request::<lsp::request::GotoDefinition>()
2633 .await;
2634 fake_language_server
2635 .respond(
2636 request_id,
2637 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2638 lsp::Url::from_file_path("/root/b.rs").unwrap(),
2639 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2640 ))),
2641 )
2642 .await;
2643
2644 let buffer_b2 = buffer_b2.await.unwrap();
2645 let definitions = definitions.await.unwrap();
2646 assert_eq!(definitions.len(), 1);
2647 assert_eq!(definitions[0].target_buffer, buffer_b2);
2648 }
2649
2650 #[gpui::test]
2651 async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2652 cx_a.foreground().forbid_parking();
2653
2654 // Connect to a server as 2 clients.
2655 let mut server = TestServer::start(cx_a.foreground()).await;
2656 let client_a = server.create_client(&mut cx_a, "user_a").await;
2657 let client_b = server.create_client(&mut cx_b, "user_b").await;
2658
2659 // Create an org that includes these 2 users.
2660 let db = &server.app_state.db;
2661 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2662 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2663 .await
2664 .unwrap();
2665 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2666 .await
2667 .unwrap();
2668
2669 // Create a channel that includes all the users.
2670 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2671 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2672 .await
2673 .unwrap();
2674 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2675 .await
2676 .unwrap();
2677 db.create_channel_message(
2678 channel_id,
2679 client_b.current_user_id(&cx_b),
2680 "hello A, it's B.",
2681 OffsetDateTime::now_utc(),
2682 1,
2683 )
2684 .await
2685 .unwrap();
2686
2687 let channels_a = cx_a
2688 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2689 channels_a
2690 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2691 .await;
2692 channels_a.read_with(&cx_a, |list, _| {
2693 assert_eq!(
2694 list.available_channels().unwrap(),
2695 &[ChannelDetails {
2696 id: channel_id.to_proto(),
2697 name: "test-channel".to_string()
2698 }]
2699 )
2700 });
2701 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2702 this.get_channel(channel_id.to_proto(), cx).unwrap()
2703 });
2704 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2705 channel_a
2706 .condition(&cx_a, |channel, _| {
2707 channel_messages(channel)
2708 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2709 })
2710 .await;
2711
2712 let channels_b = cx_b
2713 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2714 channels_b
2715 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2716 .await;
2717 channels_b.read_with(&cx_b, |list, _| {
2718 assert_eq!(
2719 list.available_channels().unwrap(),
2720 &[ChannelDetails {
2721 id: channel_id.to_proto(),
2722 name: "test-channel".to_string()
2723 }]
2724 )
2725 });
2726
2727 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2728 this.get_channel(channel_id.to_proto(), cx).unwrap()
2729 });
2730 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2731 channel_b
2732 .condition(&cx_b, |channel, _| {
2733 channel_messages(channel)
2734 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2735 })
2736 .await;
2737
2738 channel_a
2739 .update(&mut cx_a, |channel, cx| {
2740 channel
2741 .send_message("oh, hi B.".to_string(), cx)
2742 .unwrap()
2743 .detach();
2744 let task = channel.send_message("sup".to_string(), cx).unwrap();
2745 assert_eq!(
2746 channel_messages(channel),
2747 &[
2748 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2749 ("user_a".to_string(), "oh, hi B.".to_string(), true),
2750 ("user_a".to_string(), "sup".to_string(), true)
2751 ]
2752 );
2753 task
2754 })
2755 .await
2756 .unwrap();
2757
2758 channel_b
2759 .condition(&cx_b, |channel, _| {
2760 channel_messages(channel)
2761 == [
2762 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2763 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2764 ("user_a".to_string(), "sup".to_string(), false),
2765 ]
2766 })
2767 .await;
2768
2769 assert_eq!(
2770 server
2771 .state()
2772 .await
2773 .channel(channel_id)
2774 .unwrap()
2775 .connection_ids
2776 .len(),
2777 2
2778 );
2779 cx_b.update(|_| drop(channel_b));
2780 server
2781 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
2782 .await;
2783
2784 cx_a.update(|_| drop(channel_a));
2785 server
2786 .condition(|state| state.channel(channel_id).is_none())
2787 .await;
2788 }
2789
2790 #[gpui::test]
2791 async fn test_chat_message_validation(mut cx_a: TestAppContext) {
2792 cx_a.foreground().forbid_parking();
2793
2794 let mut server = TestServer::start(cx_a.foreground()).await;
2795 let client_a = server.create_client(&mut cx_a, "user_a").await;
2796
2797 let db = &server.app_state.db;
2798 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2799 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2800 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2801 .await
2802 .unwrap();
2803 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2804 .await
2805 .unwrap();
2806
2807 let channels_a = cx_a
2808 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2809 channels_a
2810 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2811 .await;
2812 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2813 this.get_channel(channel_id.to_proto(), cx).unwrap()
2814 });
2815
2816 // Messages aren't allowed to be too long.
2817 channel_a
2818 .update(&mut cx_a, |channel, cx| {
2819 let long_body = "this is long.\n".repeat(1024);
2820 channel.send_message(long_body, cx).unwrap()
2821 })
2822 .await
2823 .unwrap_err();
2824
2825 // Messages aren't allowed to be blank.
2826 channel_a.update(&mut cx_a, |channel, cx| {
2827 channel.send_message(String::new(), cx).unwrap_err()
2828 });
2829
2830 // Leading and trailing whitespace are trimmed.
2831 channel_a
2832 .update(&mut cx_a, |channel, cx| {
2833 channel
2834 .send_message("\n surrounded by whitespace \n".to_string(), cx)
2835 .unwrap()
2836 })
2837 .await
2838 .unwrap();
2839 assert_eq!(
2840 db.get_channel_messages(channel_id, 10, None)
2841 .await
2842 .unwrap()
2843 .iter()
2844 .map(|m| &m.body)
2845 .collect::<Vec<_>>(),
2846 &["surrounded by whitespace"]
2847 );
2848 }
2849
2850 #[gpui::test]
2851 async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2852 cx_a.foreground().forbid_parking();
2853
2854 // Connect to a server as 2 clients.
2855 let mut server = TestServer::start(cx_a.foreground()).await;
2856 let client_a = server.create_client(&mut cx_a, "user_a").await;
2857 let client_b = server.create_client(&mut cx_b, "user_b").await;
2858 let mut status_b = client_b.status();
2859
2860 // Create an org that includes these 2 users.
2861 let db = &server.app_state.db;
2862 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2863 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2864 .await
2865 .unwrap();
2866 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2867 .await
2868 .unwrap();
2869
2870 // Create a channel that includes all the users.
2871 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2872 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2873 .await
2874 .unwrap();
2875 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2876 .await
2877 .unwrap();
2878 db.create_channel_message(
2879 channel_id,
2880 client_b.current_user_id(&cx_b),
2881 "hello A, it's B.",
2882 OffsetDateTime::now_utc(),
2883 2,
2884 )
2885 .await
2886 .unwrap();
2887
2888 let channels_a = cx_a
2889 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2890 channels_a
2891 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2892 .await;
2893
2894 channels_a.read_with(&cx_a, |list, _| {
2895 assert_eq!(
2896 list.available_channels().unwrap(),
2897 &[ChannelDetails {
2898 id: channel_id.to_proto(),
2899 name: "test-channel".to_string()
2900 }]
2901 )
2902 });
2903 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2904 this.get_channel(channel_id.to_proto(), cx).unwrap()
2905 });
2906 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2907 channel_a
2908 .condition(&cx_a, |channel, _| {
2909 channel_messages(channel)
2910 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2911 })
2912 .await;
2913
2914 let channels_b = cx_b
2915 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2916 channels_b
2917 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2918 .await;
2919 channels_b.read_with(&cx_b, |list, _| {
2920 assert_eq!(
2921 list.available_channels().unwrap(),
2922 &[ChannelDetails {
2923 id: channel_id.to_proto(),
2924 name: "test-channel".to_string()
2925 }]
2926 )
2927 });
2928
2929 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2930 this.get_channel(channel_id.to_proto(), cx).unwrap()
2931 });
2932 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2933 channel_b
2934 .condition(&cx_b, |channel, _| {
2935 channel_messages(channel)
2936 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2937 })
2938 .await;
2939
2940 // Disconnect client B, ensuring we can still access its cached channel data.
2941 server.forbid_connections();
2942 server.disconnect_client(client_b.current_user_id(&cx_b));
2943 while !matches!(
2944 status_b.next().await,
2945 Some(client::Status::ReconnectionError { .. })
2946 ) {}
2947
2948 channels_b.read_with(&cx_b, |channels, _| {
2949 assert_eq!(
2950 channels.available_channels().unwrap(),
2951 [ChannelDetails {
2952 id: channel_id.to_proto(),
2953 name: "test-channel".to_string()
2954 }]
2955 )
2956 });
2957 channel_b.read_with(&cx_b, |channel, _| {
2958 assert_eq!(
2959 channel_messages(channel),
2960 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2961 )
2962 });
2963
2964 // Send a message from client B while it is disconnected.
2965 channel_b
2966 .update(&mut cx_b, |channel, cx| {
2967 let task = channel
2968 .send_message("can you see this?".to_string(), cx)
2969 .unwrap();
2970 assert_eq!(
2971 channel_messages(channel),
2972 &[
2973 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2974 ("user_b".to_string(), "can you see this?".to_string(), true)
2975 ]
2976 );
2977 task
2978 })
2979 .await
2980 .unwrap_err();
2981
2982 // Send a message from client A while B is disconnected.
2983 channel_a
2984 .update(&mut cx_a, |channel, cx| {
2985 channel
2986 .send_message("oh, hi B.".to_string(), cx)
2987 .unwrap()
2988 .detach();
2989 let task = channel.send_message("sup".to_string(), cx).unwrap();
2990 assert_eq!(
2991 channel_messages(channel),
2992 &[
2993 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2994 ("user_a".to_string(), "oh, hi B.".to_string(), true),
2995 ("user_a".to_string(), "sup".to_string(), true)
2996 ]
2997 );
2998 task
2999 })
3000 .await
3001 .unwrap();
3002
3003 // Give client B a chance to reconnect.
3004 server.allow_connections();
3005 cx_b.foreground().advance_clock(Duration::from_secs(10));
3006
3007 // Verify that B sees the new messages upon reconnection, as well as the message client B
3008 // sent while offline.
3009 channel_b
3010 .condition(&cx_b, |channel, _| {
3011 channel_messages(channel)
3012 == [
3013 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3014 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3015 ("user_a".to_string(), "sup".to_string(), false),
3016 ("user_b".to_string(), "can you see this?".to_string(), false),
3017 ]
3018 })
3019 .await;
3020
3021 // Ensure client A and B can communicate normally after reconnection.
3022 channel_a
3023 .update(&mut cx_a, |channel, cx| {
3024 channel.send_message("you online?".to_string(), cx).unwrap()
3025 })
3026 .await
3027 .unwrap();
3028 channel_b
3029 .condition(&cx_b, |channel, _| {
3030 channel_messages(channel)
3031 == [
3032 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3033 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3034 ("user_a".to_string(), "sup".to_string(), false),
3035 ("user_b".to_string(), "can you see this?".to_string(), false),
3036 ("user_a".to_string(), "you online?".to_string(), false),
3037 ]
3038 })
3039 .await;
3040
3041 channel_b
3042 .update(&mut cx_b, |channel, cx| {
3043 channel.send_message("yep".to_string(), cx).unwrap()
3044 })
3045 .await
3046 .unwrap();
3047 channel_a
3048 .condition(&cx_a, |channel, _| {
3049 channel_messages(channel)
3050 == [
3051 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3052 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3053 ("user_a".to_string(), "sup".to_string(), false),
3054 ("user_b".to_string(), "can you see this?".to_string(), false),
3055 ("user_a".to_string(), "you online?".to_string(), false),
3056 ("user_b".to_string(), "yep".to_string(), false),
3057 ]
3058 })
3059 .await;
3060 }
3061
3062 #[gpui::test]
3063 async fn test_contacts(
3064 mut cx_a: TestAppContext,
3065 mut cx_b: TestAppContext,
3066 mut cx_c: TestAppContext,
3067 ) {
3068 cx_a.foreground().forbid_parking();
3069 let lang_registry = Arc::new(LanguageRegistry::new());
3070 let fs = Arc::new(FakeFs::new(cx_a.background()));
3071
3072 // Connect to a server as 3 clients.
3073 let mut server = TestServer::start(cx_a.foreground()).await;
3074 let client_a = server.create_client(&mut cx_a, "user_a").await;
3075 let client_b = server.create_client(&mut cx_b, "user_b").await;
3076 let client_c = server.create_client(&mut cx_c, "user_c").await;
3077
3078 // Share a worktree as client A.
3079 fs.insert_tree(
3080 "/a",
3081 json!({
3082 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3083 }),
3084 )
3085 .await;
3086
3087 let project_a = cx_a.update(|cx| {
3088 Project::local(
3089 client_a.clone(),
3090 client_a.user_store.clone(),
3091 lang_registry.clone(),
3092 fs.clone(),
3093 cx,
3094 )
3095 });
3096 let (worktree_a, _) = project_a
3097 .update(&mut cx_a, |p, cx| {
3098 p.find_or_create_local_worktree("/a", false, cx)
3099 })
3100 .await
3101 .unwrap();
3102 worktree_a
3103 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3104 .await;
3105
3106 client_a
3107 .user_store
3108 .condition(&cx_a, |user_store, _| {
3109 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3110 })
3111 .await;
3112 client_b
3113 .user_store
3114 .condition(&cx_b, |user_store, _| {
3115 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3116 })
3117 .await;
3118 client_c
3119 .user_store
3120 .condition(&cx_c, |user_store, _| {
3121 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3122 })
3123 .await;
3124
3125 let project_id = project_a
3126 .update(&mut cx_a, |project, _| project.next_remote_id())
3127 .await;
3128 project_a
3129 .update(&mut cx_a, |project, cx| project.share(cx))
3130 .await
3131 .unwrap();
3132
3133 let _project_b = Project::remote(
3134 project_id,
3135 client_b.clone(),
3136 client_b.user_store.clone(),
3137 lang_registry.clone(),
3138 fs.clone(),
3139 &mut cx_b.to_async(),
3140 )
3141 .await
3142 .unwrap();
3143
3144 client_a
3145 .user_store
3146 .condition(&cx_a, |user_store, _| {
3147 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3148 })
3149 .await;
3150 client_b
3151 .user_store
3152 .condition(&cx_b, |user_store, _| {
3153 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3154 })
3155 .await;
3156 client_c
3157 .user_store
3158 .condition(&cx_c, |user_store, _| {
3159 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3160 })
3161 .await;
3162
3163 project_a
3164 .condition(&cx_a, |project, _| {
3165 project.collaborators().contains_key(&client_b.peer_id)
3166 })
3167 .await;
3168
3169 cx_a.update(move |_| drop(project_a));
3170 client_a
3171 .user_store
3172 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3173 .await;
3174 client_b
3175 .user_store
3176 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3177 .await;
3178 client_c
3179 .user_store
3180 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3181 .await;
3182
3183 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3184 user_store
3185 .contacts()
3186 .iter()
3187 .map(|contact| {
3188 let worktrees = contact
3189 .projects
3190 .iter()
3191 .map(|p| {
3192 (
3193 p.worktree_root_names[0].as_str(),
3194 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3195 )
3196 })
3197 .collect();
3198 (contact.user.github_login.as_str(), worktrees)
3199 })
3200 .collect()
3201 }
3202 }
3203
3204 struct TestServer {
3205 peer: Arc<Peer>,
3206 app_state: Arc<AppState>,
3207 server: Arc<Server>,
3208 foreground: Rc<executor::Foreground>,
3209 notifications: mpsc::Receiver<()>,
3210 connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3211 forbid_connections: Arc<AtomicBool>,
3212 _test_db: TestDb,
3213 }
3214
3215 impl TestServer {
3216 async fn start(foreground: Rc<executor::Foreground>) -> Self {
3217 let test_db = TestDb::new();
3218 let app_state = Self::build_app_state(&test_db).await;
3219 let peer = Peer::new();
3220 let notifications = mpsc::channel(128);
3221 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3222 Self {
3223 peer,
3224 app_state,
3225 server,
3226 foreground,
3227 notifications: notifications.1,
3228 connection_killers: Default::default(),
3229 forbid_connections: Default::default(),
3230 _test_db: test_db,
3231 }
3232 }
3233
3234 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3235 let http = FakeHttpClient::with_404_response();
3236 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3237 let client_name = name.to_string();
3238 let mut client = Client::new(http.clone());
3239 let server = self.server.clone();
3240 let connection_killers = self.connection_killers.clone();
3241 let forbid_connections = self.forbid_connections.clone();
3242 let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3243
3244 Arc::get_mut(&mut client)
3245 .unwrap()
3246 .override_authenticate(move |cx| {
3247 cx.spawn(|_| async move {
3248 let access_token = "the-token".to_string();
3249 Ok(Credentials {
3250 user_id: user_id.0 as u64,
3251 access_token,
3252 })
3253 })
3254 })
3255 .override_establish_connection(move |credentials, cx| {
3256 assert_eq!(credentials.user_id, user_id.0 as u64);
3257 assert_eq!(credentials.access_token, "the-token");
3258
3259 let server = server.clone();
3260 let connection_killers = connection_killers.clone();
3261 let forbid_connections = forbid_connections.clone();
3262 let client_name = client_name.clone();
3263 let connection_id_tx = connection_id_tx.clone();
3264 cx.spawn(move |cx| async move {
3265 if forbid_connections.load(SeqCst) {
3266 Err(EstablishConnectionError::other(anyhow!(
3267 "server is forbidding connections"
3268 )))
3269 } else {
3270 let (client_conn, server_conn, kill_conn) =
3271 Connection::in_memory(cx.background());
3272 connection_killers.lock().insert(user_id, kill_conn);
3273 cx.background()
3274 .spawn(server.handle_connection(
3275 server_conn,
3276 client_name,
3277 user_id,
3278 Some(connection_id_tx),
3279 ))
3280 .detach();
3281 Ok(client_conn)
3282 }
3283 })
3284 });
3285
3286 client
3287 .authenticate_and_connect(&cx.to_async())
3288 .await
3289 .unwrap();
3290
3291 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3292 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3293 let mut authed_user =
3294 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3295 while authed_user.next().await.unwrap().is_none() {}
3296
3297 TestClient {
3298 client,
3299 peer_id,
3300 user_store,
3301 }
3302 }
3303
3304 fn disconnect_client(&self, user_id: UserId) {
3305 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3306 let _ = kill_conn.try_send(Some(()));
3307 }
3308 }
3309
3310 fn forbid_connections(&self) {
3311 self.forbid_connections.store(true, SeqCst);
3312 }
3313
3314 fn allow_connections(&self) {
3315 self.forbid_connections.store(false, SeqCst);
3316 }
3317
3318 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3319 let mut config = Config::default();
3320 config.session_secret = "a".repeat(32);
3321 config.database_url = test_db.url.clone();
3322 let github_client = github::AppClient::test();
3323 Arc::new(AppState {
3324 db: test_db.db().clone(),
3325 handlebars: Default::default(),
3326 auth_client: auth::build_client("", ""),
3327 repo_client: github::RepoClient::test(&github_client),
3328 github_client,
3329 config,
3330 })
3331 }
3332
3333 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3334 self.server.store.read()
3335 }
3336
3337 async fn condition<F>(&mut self, mut predicate: F)
3338 where
3339 F: FnMut(&Store) -> bool,
3340 {
3341 async_std::future::timeout(Duration::from_millis(500), async {
3342 while !(predicate)(&*self.server.store.read()) {
3343 self.foreground.start_waiting();
3344 self.notifications.next().await;
3345 self.foreground.finish_waiting();
3346 }
3347 })
3348 .await
3349 .expect("condition timed out");
3350 }
3351 }
3352
3353 impl Drop for TestServer {
3354 fn drop(&mut self) {
3355 self.peer.reset();
3356 }
3357 }
3358
3359 struct TestClient {
3360 client: Arc<Client>,
3361 pub peer_id: PeerId,
3362 pub user_store: ModelHandle<UserStore>,
3363 }
3364
3365 impl Deref for TestClient {
3366 type Target = Arc<Client>;
3367
3368 fn deref(&self) -> &Self::Target {
3369 &self.client
3370 }
3371 }
3372
3373 impl TestClient {
3374 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3375 UserId::from_proto(
3376 self.user_store
3377 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3378 )
3379 }
3380 }
3381
3382 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3383 channel
3384 .messages()
3385 .cursor::<()>()
3386 .map(|m| {
3387 (
3388 m.sender.github_login.clone(),
3389 m.body.clone(),
3390 m.is_pending(),
3391 )
3392 })
3393 .collect()
3394 }
3395
3396 struct EmptyView;
3397
3398 impl gpui::Entity for EmptyView {
3399 type Event = ();
3400 }
3401
3402 impl gpui::View for EmptyView {
3403 fn ui_name() -> &'static str {
3404 "empty view"
3405 }
3406
3407 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
3408 gpui::Element::boxed(gpui::elements::Empty)
3409 }
3410 }
3411}