1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{future::BoxFuture, FutureExt, StreamExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use postage::{mpsc, prelude::Sink as _};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EnvelopedMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
21use store::{Store, Worktree};
22use surf::StatusCode;
23use tide::log;
24use tide::{
25 http::headers::{HeaderName, CONNECTION, UPGRADE},
26 Request, Response,
27};
28use time::OffsetDateTime;
29
30type MessageHandler = Box<
31 dyn Send
32 + Sync
33 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
34>;
35
/// State shared by every RPC connection served by this module.
pub struct Server {
    /// RPC peer used to add connections and to send, forward and answer messages.
    peer: Arc<Peer>,
    /// In-memory store behind a read/write lock; reached through `state()` and
    /// `state_mut()`.
    store: RwLock<Store>,
    /// Application state; the handlers in this file use its `db` field.
    app_state: Arc<AppState>,
    /// Message handlers keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that is sent `()` after a registered handler has run
    /// for an incoming message.
    notifications: Option<mpsc::Sender<()>>,
}
43
/// Number of channel messages requested from the database per page.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Largest accepted length (`str::len`, i.e. bytes) of a trimmed channel message body.
const MAX_MESSAGE_LEN: usize = 1024;
/// Error text reported when a project lookup yields nothing.
const NO_SUCH_PROJECT: &str = "no such project";
47
48impl Server {
49 pub fn new(
50 app_state: Arc<AppState>,
51 peer: Arc<Peer>,
52 notifications: Option<mpsc::Sender<()>>,
53 ) -> Arc<Self> {
54 let mut server = Self {
55 peer,
56 app_state,
57 store: Default::default(),
58 handlers: Default::default(),
59 notifications,
60 };
61
62 server
63 .add_handler(Server::ping)
64 .add_handler(Server::register_project)
65 .add_handler(Server::unregister_project)
66 .add_handler(Server::share_project)
67 .add_handler(Server::unshare_project)
68 .add_handler(Server::join_project)
69 .add_handler(Server::leave_project)
70 .add_handler(Server::register_worktree)
71 .add_handler(Server::unregister_worktree)
72 .add_handler(Server::share_worktree)
73 .add_handler(Server::update_worktree)
74 .add_handler(Server::update_diagnostic_summary)
75 .add_handler(Server::disk_based_diagnostics_updating)
76 .add_handler(Server::disk_based_diagnostics_updated)
77 .add_handler(Server::get_definition)
78 .add_handler(Server::open_buffer)
79 .add_handler(Server::close_buffer)
80 .add_handler(Server::update_buffer)
81 .add_handler(Server::update_buffer_file)
82 .add_handler(Server::buffer_reloaded)
83 .add_handler(Server::buffer_saved)
84 .add_handler(Server::save_buffer)
85 .add_handler(Server::format_buffer)
86 .add_handler(Server::get_completions)
87 .add_handler(Server::apply_additional_edits_for_completion)
88 .add_handler(Server::get_channels)
89 .add_handler(Server::get_users)
90 .add_handler(Server::join_channel)
91 .add_handler(Server::leave_channel)
92 .add_handler(Server::send_channel_message)
93 .add_handler(Server::get_channel_messages);
94
95 Arc::new(server)
96 }
97
98 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
99 where
100 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
101 Fut: 'static + Send + Future<Output = tide::Result<()>>,
102 M: EnvelopedMessage,
103 {
104 let prev_handler = self.handlers.insert(
105 TypeId::of::<M>(),
106 Box::new(move |server, envelope| {
107 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
108 (handler)(server, *envelope).boxed()
109 }),
110 );
111 if prev_handler.is_some() {
112 panic!("registered a handler for the same message twice");
113 }
114 self
115 }
116
    /// Returns a future that serves one RPC connection until it ends.
    ///
    /// The future adds `connection` to the peer, optionally reports the new
    /// connection id through `send_connection_id`, records the connection for
    /// `user_id` in the store, and then dispatches incoming messages to the
    /// registered handlers until either the incoming stream ends or the
    /// connection's I/O future completes. Finally the connection is signed out.
    ///
    /// `addr` is only used in log messages. Errors are logged, never returned.
    pub fn handle_connection(
        self: &Arc<Self>,
        connection: Connection,
        addr: String,
        user_id: UserId,
        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
    ) -> impl Future<Output = ()> {
        let mut this = self.clone();
        async move {
            let (connection_id, handle_io, mut incoming_rx) =
                this.peer.add_connection(connection).await;

            // Best effort: a failure to report the connection id is ignored.
            if let Some(send_connection_id) = send_connection_id.as_mut() {
                let _ = send_connection_id.send(connection_id).await;
            }

            this.state_mut().add_connection(connection_id, user_id);
            if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
                log::error!("error updating contacts for {:?}: {}", user_id, err);
            }

            // The I/O future is fused and pinned once so it can be polled on
            // every loop iteration alongside the next incoming message.
            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.next().fuse();
                futures::pin_mut!(next_message);
                // Biased select: the message branch is listed before the I/O branch.
                futures::select_biased! {
                    message = next_message => {
                        if let Some(message) = message {
                            let start_time = Instant::now();
                            log::info!("RPC message received: {}", message.payload_type_name());
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                // The handler is awaited inline, so the next
                                // message is not picked up until it finishes.
                                if let Err(err) = (handler)(this.clone(), message).await {
                                    log::error!("error handling message: {:?}", err);
                                } else {
                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
                                }

                                // Signal observers whether the handler succeeded or failed.
                                if let Some(mut notifications) = this.notifications.clone() {
                                    let _ = notifications.send(()).await;
                                }
                            } else {
                                log::warn!("unhandled message: {}", message.payload_type_name());
                            }
                        } else {
                            // The incoming stream ended.
                            log::info!("rpc connection closed {:?}", addr);
                            break;
                        }
                    }
                    handle_io = handle_io => {
                        // The I/O future finished (with or without error); stop serving.
                        if let Err(err) = handle_io {
                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
                        }
                        break;
                    }
                }
            }

            if let Err(err) = this.sign_out(connection_id).await {
                log::error!("error signing out connection {:?} - {:?}", addr, err);
            }
        }
    }
180
181 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
182 self.peer.disconnect(connection_id);
183 let removed_connection = self.state_mut().remove_connection(connection_id)?;
184
185 for (project_id, project) in removed_connection.hosted_projects {
186 if let Some(share) = project.share {
187 broadcast(
188 connection_id,
189 share.guests.keys().copied().collect(),
190 |conn_id| {
191 self.peer
192 .send(conn_id, proto::UnshareProject { project_id })
193 },
194 )
195 .await?;
196 }
197 }
198
199 for (project_id, peer_ids) in removed_connection.guest_project_ids {
200 broadcast(connection_id, peer_ids, |conn_id| {
201 self.peer.send(
202 conn_id,
203 proto::RemoveProjectCollaborator {
204 project_id,
205 peer_id: connection_id.0,
206 },
207 )
208 })
209 .await?;
210 }
211
212 self.update_contacts_for_users(removed_connection.contact_ids.iter())
213 .await?;
214
215 Ok(())
216 }
217
218 async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
219 self.peer.respond(request.receipt(), proto::Ack {}).await?;
220 Ok(())
221 }
222
223 async fn register_project(
224 mut self: Arc<Server>,
225 request: TypedEnvelope<proto::RegisterProject>,
226 ) -> tide::Result<()> {
227 let project_id = {
228 let mut state = self.state_mut();
229 let user_id = state.user_id_for_connection(request.sender_id)?;
230 state.register_project(request.sender_id, user_id)
231 };
232 self.peer
233 .respond(
234 request.receipt(),
235 proto::RegisterProjectResponse { project_id },
236 )
237 .await?;
238 Ok(())
239 }
240
241 async fn unregister_project(
242 mut self: Arc<Server>,
243 request: TypedEnvelope<proto::UnregisterProject>,
244 ) -> tide::Result<()> {
245 let project = self
246 .state_mut()
247 .unregister_project(request.payload.project_id, request.sender_id)
248 .ok_or_else(|| anyhow!("no such project"))?;
249 self.update_contacts_for_users(project.authorized_user_ids().iter())
250 .await?;
251 Ok(())
252 }
253
254 async fn share_project(
255 mut self: Arc<Server>,
256 request: TypedEnvelope<proto::ShareProject>,
257 ) -> tide::Result<()> {
258 self.state_mut()
259 .share_project(request.payload.project_id, request.sender_id);
260 self.peer.respond(request.receipt(), proto::Ack {}).await?;
261 Ok(())
262 }
263
264 async fn unshare_project(
265 mut self: Arc<Server>,
266 request: TypedEnvelope<proto::UnshareProject>,
267 ) -> tide::Result<()> {
268 let project_id = request.payload.project_id;
269 let project = self
270 .state_mut()
271 .unshare_project(project_id, request.sender_id)?;
272
273 broadcast(request.sender_id, project.connection_ids, |conn_id| {
274 self.peer
275 .send(conn_id, proto::UnshareProject { project_id })
276 })
277 .await?;
278 self.update_contacts_for_users(&project.authorized_user_ids)
279 .await?;
280
281 Ok(())
282 }
283
    /// Adds the sender to a project as a collaborator.
    ///
    /// On success the other connections of the project are sent
    /// `AddProjectCollaborator`, the sender receives a `JoinProjectResponse`
    /// (shared worktrees, its replica id and the existing collaborators), and
    /// contacts are refreshed for the project's authorized users. If the store
    /// rejects the join, or the project has no share, the sender receives an
    /// error response carrying the error's text instead.
    ///
    /// # Errors
    ///
    /// Fails if the sender's user cannot be resolved or if sending fails.
    async fn join_project(
        mut self: Arc<Server>,
        request: TypedEnvelope<proto::JoinProject>,
    ) -> tide::Result<()> {
        let project_id = request.payload.project_id;

        let user_id = self.state().user_id_for_connection(request.sender_id)?;
        // The write guard created by `state_mut()` lives until the end of this
        // statement, so the closure below runs while the store is still locked
        // and may borrow from `joined`.
        let response_data = self
            .state_mut()
            .join_project(request.sender_id, user_id, project_id)
            .and_then(|joined| {
                let share = joined.project.share()?;
                let peer_count = share.guests.len();
                let mut collaborators = Vec::with_capacity(peer_count);
                // The host is always listed first, with replica id 0.
                collaborators.push(proto::Collaborator {
                    peer_id: joined.project.host_connection_id.0,
                    replica_id: 0,
                    user_id: joined.project.host_user_id.to_proto(),
                });
                // Only worktrees that currently have a share are reported.
                let worktrees = joined
                    .project
                    .worktrees
                    .iter()
                    .filter_map(|(id, worktree)| {
                        worktree.share.as_ref().map(|share| proto::Worktree {
                            id: *id,
                            root_name: worktree.root_name.clone(),
                            entries: share.entries.values().cloned().collect(),
                            diagnostic_summaries: share
                                .diagnostic_summaries
                                .values()
                                .cloned()
                                .collect(),
                            weak: worktree.weak,
                        })
                    })
                    .collect();
                // Every guest except the joining connection itself.
                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
                    if *peer_conn_id != request.sender_id {
                        collaborators.push(proto::Collaborator {
                            peer_id: peer_conn_id.0,
                            replica_id: *peer_replica_id as u32,
                            user_id: peer_user_id.to_proto(),
                        });
                    }
                }
                let response = proto::JoinProjectResponse {
                    worktrees,
                    replica_id: joined.replica_id as u32,
                    collaborators,
                };
                let connection_ids = joined.project.connection_ids();
                let contact_user_ids = joined.project.authorized_user_ids();
                Ok((response, connection_ids, contact_user_ids))
            });

        match response_data {
            Ok((response, connection_ids, contact_user_ids)) => {
                // Announce the newcomer to everyone else before answering it.
                broadcast(request.sender_id, connection_ids, |conn_id| {
                    self.peer.send(
                        conn_id,
                        proto::AddProjectCollaborator {
                            project_id: project_id,
                            collaborator: Some(proto::Collaborator {
                                peer_id: request.sender_id.0,
                                replica_id: response.replica_id,
                                user_id: user_id.to_proto(),
                            }),
                        },
                    )
                })
                .await?;
                self.peer.respond(request.receipt(), response).await?;
                self.update_contacts_for_users(&contact_user_ids).await?;
            }
            Err(error) => {
                // Report the failure to the requester instead of failing the handler.
                self.peer
                    .respond_with_error(
                        request.receipt(),
                        proto::Error {
                            message: error.to_string(),
                        },
                    )
                    .await?;
            }
        }

        Ok(())
    }
373
374 async fn leave_project(
375 mut self: Arc<Server>,
376 request: TypedEnvelope<proto::LeaveProject>,
377 ) -> tide::Result<()> {
378 let sender_id = request.sender_id;
379 let project_id = request.payload.project_id;
380 let worktree = self.state_mut().leave_project(sender_id, project_id);
381 if let Some(worktree) = worktree {
382 broadcast(sender_id, worktree.connection_ids, |conn_id| {
383 self.peer.send(
384 conn_id,
385 proto::RemoveProjectCollaborator {
386 project_id,
387 peer_id: sender_id.0,
388 },
389 )
390 })
391 .await?;
392 self.update_contacts_for_users(&worktree.authorized_user_ids)
393 .await?;
394 }
395 Ok(())
396 }
397
398 async fn register_worktree(
399 mut self: Arc<Server>,
400 request: TypedEnvelope<proto::RegisterWorktree>,
401 ) -> tide::Result<()> {
402 let receipt = request.receipt();
403 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
404
405 let mut contact_user_ids = HashSet::default();
406 contact_user_ids.insert(host_user_id);
407 for github_login in request.payload.authorized_logins {
408 match self.app_state.db.create_user(&github_login, false).await {
409 Ok(contact_user_id) => {
410 contact_user_ids.insert(contact_user_id);
411 }
412 Err(err) => {
413 let message = err.to_string();
414 self.peer
415 .respond_with_error(receipt, proto::Error { message })
416 .await?;
417 return Ok(());
418 }
419 }
420 }
421
422 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
423 let ok = self.state_mut().register_worktree(
424 request.payload.project_id,
425 request.payload.worktree_id,
426 Worktree {
427 authorized_user_ids: contact_user_ids.clone(),
428 root_name: request.payload.root_name,
429 share: None,
430 weak: false,
431 },
432 );
433
434 if ok {
435 self.peer.respond(receipt, proto::Ack {}).await?;
436 self.update_contacts_for_users(&contact_user_ids).await?;
437 } else {
438 self.peer
439 .respond_with_error(
440 receipt,
441 proto::Error {
442 message: NO_SUCH_PROJECT.to_string(),
443 },
444 )
445 .await?;
446 }
447
448 Ok(())
449 }
450
451 async fn unregister_worktree(
452 mut self: Arc<Server>,
453 request: TypedEnvelope<proto::UnregisterWorktree>,
454 ) -> tide::Result<()> {
455 let project_id = request.payload.project_id;
456 let worktree_id = request.payload.worktree_id;
457 let (worktree, guest_connection_ids) =
458 self.state_mut()
459 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
460
461 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
462 self.peer.send(
463 conn_id,
464 proto::UnregisterWorktree {
465 project_id,
466 worktree_id,
467 },
468 )
469 })
470 .await?;
471 self.update_contacts_for_users(&worktree.authorized_user_ids)
472 .await?;
473 Ok(())
474 }
475
476 async fn share_worktree(
477 mut self: Arc<Server>,
478 mut request: TypedEnvelope<proto::ShareWorktree>,
479 ) -> tide::Result<()> {
480 let worktree = request
481 .payload
482 .worktree
483 .as_mut()
484 .ok_or_else(|| anyhow!("missing worktree"))?;
485 let entries = worktree
486 .entries
487 .iter()
488 .map(|entry| (entry.id, entry.clone()))
489 .collect();
490 let diagnostic_summaries = worktree
491 .diagnostic_summaries
492 .iter()
493 .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
494 .collect();
495
496 let shared_worktree = self.state_mut().share_worktree(
497 request.payload.project_id,
498 worktree.id,
499 request.sender_id,
500 entries,
501 diagnostic_summaries,
502 );
503 if let Some(shared_worktree) = shared_worktree {
504 broadcast(
505 request.sender_id,
506 shared_worktree.connection_ids,
507 |connection_id| {
508 self.peer.forward_send(
509 request.sender_id,
510 connection_id,
511 request.payload.clone(),
512 )
513 },
514 )
515 .await?;
516 self.peer.respond(request.receipt(), proto::Ack {}).await?;
517 self.update_contacts_for_users(&shared_worktree.authorized_user_ids)
518 .await?;
519 } else {
520 self.peer
521 .respond_with_error(
522 request.receipt(),
523 proto::Error {
524 message: "no such worktree".to_string(),
525 },
526 )
527 .await?;
528 }
529 Ok(())
530 }
531
532 async fn update_worktree(
533 mut self: Arc<Server>,
534 request: TypedEnvelope<proto::UpdateWorktree>,
535 ) -> tide::Result<()> {
536 let connection_ids = self
537 .state_mut()
538 .update_worktree(
539 request.sender_id,
540 request.payload.project_id,
541 request.payload.worktree_id,
542 &request.payload.removed_entries,
543 &request.payload.updated_entries,
544 )
545 .ok_or_else(|| anyhow!("no such worktree"))?;
546
547 broadcast(request.sender_id, connection_ids, |connection_id| {
548 self.peer
549 .forward_send(request.sender_id, connection_id, request.payload.clone())
550 })
551 .await?;
552
553 Ok(())
554 }
555
556 async fn update_diagnostic_summary(
557 mut self: Arc<Server>,
558 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
559 ) -> tide::Result<()> {
560 let receiver_ids = request
561 .payload
562 .summary
563 .clone()
564 .and_then(|summary| {
565 self.state_mut().update_diagnostic_summary(
566 request.payload.project_id,
567 request.payload.worktree_id,
568 request.sender_id,
569 summary,
570 )
571 })
572 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
573
574 broadcast(request.sender_id, receiver_ids, |connection_id| {
575 self.peer
576 .forward_send(request.sender_id, connection_id, request.payload.clone())
577 })
578 .await?;
579 Ok(())
580 }
581
582 async fn disk_based_diagnostics_updating(
583 self: Arc<Server>,
584 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
585 ) -> tide::Result<()> {
586 let receiver_ids = self
587 .state()
588 .project_connection_ids(request.payload.project_id, request.sender_id)
589 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
590 broadcast(request.sender_id, receiver_ids, |connection_id| {
591 self.peer
592 .forward_send(request.sender_id, connection_id, request.payload.clone())
593 })
594 .await?;
595 Ok(())
596 }
597
598 async fn disk_based_diagnostics_updated(
599 self: Arc<Server>,
600 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
601 ) -> tide::Result<()> {
602 let receiver_ids = self
603 .state()
604 .project_connection_ids(request.payload.project_id, request.sender_id)
605 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
606 broadcast(request.sender_id, receiver_ids, |connection_id| {
607 self.peer
608 .forward_send(request.sender_id, connection_id, request.payload.clone())
609 })
610 .await?;
611 Ok(())
612 }
613
614 async fn get_definition(
615 self: Arc<Server>,
616 request: TypedEnvelope<proto::GetDefinition>,
617 ) -> tide::Result<()> {
618 let receipt = request.receipt();
619 let host_connection_id = self
620 .state()
621 .read_project(request.payload.project_id, request.sender_id)
622 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
623 .host_connection_id;
624 let response = self
625 .peer
626 .forward_request(request.sender_id, host_connection_id, request.payload)
627 .await?;
628 self.peer.respond(receipt, response).await?;
629 Ok(())
630 }
631
632 async fn open_buffer(
633 self: Arc<Server>,
634 request: TypedEnvelope<proto::OpenBuffer>,
635 ) -> tide::Result<()> {
636 let receipt = request.receipt();
637 let host_connection_id = self
638 .state()
639 .read_project(request.payload.project_id, request.sender_id)
640 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
641 .host_connection_id;
642 let response = self
643 .peer
644 .forward_request(request.sender_id, host_connection_id, request.payload)
645 .await?;
646 self.peer.respond(receipt, response).await?;
647 Ok(())
648 }
649
650 async fn close_buffer(
651 self: Arc<Server>,
652 request: TypedEnvelope<proto::CloseBuffer>,
653 ) -> tide::Result<()> {
654 let host_connection_id = self
655 .state()
656 .read_project(request.payload.project_id, request.sender_id)
657 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
658 .host_connection_id;
659 self.peer
660 .forward_send(request.sender_id, host_connection_id, request.payload)
661 .await?;
662 Ok(())
663 }
664
665 async fn save_buffer(
666 self: Arc<Server>,
667 request: TypedEnvelope<proto::SaveBuffer>,
668 ) -> tide::Result<()> {
669 let host;
670 let guests;
671 {
672 let state = self.state();
673 let project = state
674 .read_project(request.payload.project_id, request.sender_id)
675 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
676 host = project.host_connection_id;
677 guests = project.guest_connection_ids()
678 }
679
680 let sender = request.sender_id;
681 let receipt = request.receipt();
682 let response = self
683 .peer
684 .forward_request(sender, host, request.payload.clone())
685 .await?;
686
687 broadcast(host, guests, |conn_id| {
688 let response = response.clone();
689 let peer = &self.peer;
690 async move {
691 if conn_id == sender {
692 peer.respond(receipt, response).await
693 } else {
694 peer.forward_send(host, conn_id, response).await
695 }
696 }
697 })
698 .await?;
699
700 Ok(())
701 }
702
703 async fn format_buffer(
704 self: Arc<Server>,
705 request: TypedEnvelope<proto::FormatBuffer>,
706 ) -> tide::Result<()> {
707 let host;
708 {
709 let state = self.state();
710 let project = state
711 .read_project(request.payload.project_id, request.sender_id)
712 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
713 host = project.host_connection_id;
714 }
715
716 let sender = request.sender_id;
717 let receipt = request.receipt();
718 let response = self
719 .peer
720 .forward_request(sender, host, request.payload.clone())
721 .await?;
722 self.peer.respond(receipt, response).await?;
723
724 Ok(())
725 }
726
727 async fn get_completions(
728 self: Arc<Server>,
729 request: TypedEnvelope<proto::GetCompletions>,
730 ) -> tide::Result<()> {
731 let host;
732 {
733 let state = self.state();
734 let project = state
735 .read_project(request.payload.project_id, request.sender_id)
736 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
737 host = project.host_connection_id;
738 }
739
740 let sender = request.sender_id;
741 let receipt = request.receipt();
742 let response = self
743 .peer
744 .forward_request(sender, host, request.payload.clone())
745 .await?;
746 self.peer.respond(receipt, response).await?;
747
748 Ok(())
749 }
750
751 async fn apply_additional_edits_for_completion(
752 self: Arc<Server>,
753 request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
754 ) -> tide::Result<()> {
755 let host;
756 {
757 let state = self.state();
758 let project = state
759 .read_project(request.payload.project_id, request.sender_id)
760 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
761 host = project.host_connection_id;
762 }
763
764 let sender = request.sender_id;
765 let receipt = request.receipt();
766 let response = self
767 .peer
768 .forward_request(sender, host, request.payload.clone())
769 .await?;
770 self.peer.respond(receipt, response).await?;
771
772 Ok(())
773 }
774
775 async fn update_buffer(
776 self: Arc<Server>,
777 request: TypedEnvelope<proto::UpdateBuffer>,
778 ) -> tide::Result<()> {
779 let receiver_ids = self
780 .state()
781 .project_connection_ids(request.payload.project_id, request.sender_id)
782 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
783 broadcast(request.sender_id, receiver_ids, |connection_id| {
784 self.peer
785 .forward_send(request.sender_id, connection_id, request.payload.clone())
786 })
787 .await?;
788 self.peer.respond(request.receipt(), proto::Ack {}).await?;
789 Ok(())
790 }
791
792 async fn update_buffer_file(
793 self: Arc<Server>,
794 request: TypedEnvelope<proto::UpdateBufferFile>,
795 ) -> tide::Result<()> {
796 let receiver_ids = self
797 .state()
798 .project_connection_ids(request.payload.project_id, request.sender_id)
799 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
800 broadcast(request.sender_id, receiver_ids, |connection_id| {
801 self.peer
802 .forward_send(request.sender_id, connection_id, request.payload.clone())
803 })
804 .await?;
805 Ok(())
806 }
807
808 async fn buffer_reloaded(
809 self: Arc<Server>,
810 request: TypedEnvelope<proto::BufferReloaded>,
811 ) -> tide::Result<()> {
812 let receiver_ids = self
813 .state()
814 .project_connection_ids(request.payload.project_id, request.sender_id)
815 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
816 broadcast(request.sender_id, receiver_ids, |connection_id| {
817 self.peer
818 .forward_send(request.sender_id, connection_id, request.payload.clone())
819 })
820 .await?;
821 Ok(())
822 }
823
824 async fn buffer_saved(
825 self: Arc<Server>,
826 request: TypedEnvelope<proto::BufferSaved>,
827 ) -> tide::Result<()> {
828 let receiver_ids = self
829 .state()
830 .project_connection_ids(request.payload.project_id, request.sender_id)
831 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
832 broadcast(request.sender_id, receiver_ids, |connection_id| {
833 self.peer
834 .forward_send(request.sender_id, connection_id, request.payload.clone())
835 })
836 .await?;
837 Ok(())
838 }
839
840 async fn get_channels(
841 self: Arc<Server>,
842 request: TypedEnvelope<proto::GetChannels>,
843 ) -> tide::Result<()> {
844 let user_id = self.state().user_id_for_connection(request.sender_id)?;
845 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
846 self.peer
847 .respond(
848 request.receipt(),
849 proto::GetChannelsResponse {
850 channels: channels
851 .into_iter()
852 .map(|chan| proto::Channel {
853 id: chan.id.to_proto(),
854 name: chan.name,
855 })
856 .collect(),
857 },
858 )
859 .await?;
860 Ok(())
861 }
862
863 async fn get_users(
864 self: Arc<Server>,
865 request: TypedEnvelope<proto::GetUsers>,
866 ) -> tide::Result<()> {
867 let receipt = request.receipt();
868 let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
869 let users = self
870 .app_state
871 .db
872 .get_users_by_ids(user_ids)
873 .await?
874 .into_iter()
875 .map(|user| proto::User {
876 id: user.id.to_proto(),
877 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
878 github_login: user.github_login,
879 })
880 .collect();
881 self.peer
882 .respond(receipt, proto::GetUsersResponse { users })
883 .await?;
884 Ok(())
885 }
886
887 async fn update_contacts_for_users<'a>(
888 self: &Arc<Server>,
889 user_ids: impl IntoIterator<Item = &'a UserId>,
890 ) -> tide::Result<()> {
891 let mut send_futures = Vec::new();
892
893 {
894 let state = self.state();
895 for user_id in user_ids {
896 let contacts = state.contacts_for_user(*user_id);
897 for connection_id in state.connection_ids_for_user(*user_id) {
898 send_futures.push(self.peer.send(
899 connection_id,
900 proto::UpdateContacts {
901 contacts: contacts.clone(),
902 },
903 ));
904 }
905 }
906 }
907 futures::future::try_join_all(send_futures).await?;
908
909 Ok(())
910 }
911
912 async fn join_channel(
913 mut self: Arc<Self>,
914 request: TypedEnvelope<proto::JoinChannel>,
915 ) -> tide::Result<()> {
916 let user_id = self.state().user_id_for_connection(request.sender_id)?;
917 let channel_id = ChannelId::from_proto(request.payload.channel_id);
918 if !self
919 .app_state
920 .db
921 .can_user_access_channel(user_id, channel_id)
922 .await?
923 {
924 Err(anyhow!("access denied"))?;
925 }
926
927 self.state_mut().join_channel(request.sender_id, channel_id);
928 let messages = self
929 .app_state
930 .db
931 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
932 .await?
933 .into_iter()
934 .map(|msg| proto::ChannelMessage {
935 id: msg.id.to_proto(),
936 body: msg.body,
937 timestamp: msg.sent_at.unix_timestamp() as u64,
938 sender_id: msg.sender_id.to_proto(),
939 nonce: Some(msg.nonce.as_u128().into()),
940 })
941 .collect::<Vec<_>>();
942 self.peer
943 .respond(
944 request.receipt(),
945 proto::JoinChannelResponse {
946 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
947 messages,
948 },
949 )
950 .await?;
951 Ok(())
952 }
953
954 async fn leave_channel(
955 mut self: Arc<Self>,
956 request: TypedEnvelope<proto::LeaveChannel>,
957 ) -> tide::Result<()> {
958 let user_id = self.state().user_id_for_connection(request.sender_id)?;
959 let channel_id = ChannelId::from_proto(request.payload.channel_id);
960 if !self
961 .app_state
962 .db
963 .can_user_access_channel(user_id, channel_id)
964 .await?
965 {
966 Err(anyhow!("access denied"))?;
967 }
968
969 self.state_mut()
970 .leave_channel(request.sender_id, channel_id);
971
972 Ok(())
973 }
974
975 async fn send_channel_message(
976 self: Arc<Self>,
977 request: TypedEnvelope<proto::SendChannelMessage>,
978 ) -> tide::Result<()> {
979 let receipt = request.receipt();
980 let channel_id = ChannelId::from_proto(request.payload.channel_id);
981 let user_id;
982 let connection_ids;
983 {
984 let state = self.state();
985 user_id = state.user_id_for_connection(request.sender_id)?;
986 if let Some(ids) = state.channel_connection_ids(channel_id) {
987 connection_ids = ids;
988 } else {
989 return Ok(());
990 }
991 }
992
993 // Validate the message body.
994 let body = request.payload.body.trim().to_string();
995 if body.len() > MAX_MESSAGE_LEN {
996 self.peer
997 .respond_with_error(
998 receipt,
999 proto::Error {
1000 message: "message is too long".to_string(),
1001 },
1002 )
1003 .await?;
1004 return Ok(());
1005 }
1006 if body.is_empty() {
1007 self.peer
1008 .respond_with_error(
1009 receipt,
1010 proto::Error {
1011 message: "message can't be blank".to_string(),
1012 },
1013 )
1014 .await?;
1015 return Ok(());
1016 }
1017
1018 let timestamp = OffsetDateTime::now_utc();
1019 let nonce = if let Some(nonce) = request.payload.nonce {
1020 nonce
1021 } else {
1022 self.peer
1023 .respond_with_error(
1024 receipt,
1025 proto::Error {
1026 message: "nonce can't be blank".to_string(),
1027 },
1028 )
1029 .await?;
1030 return Ok(());
1031 };
1032
1033 let message_id = self
1034 .app_state
1035 .db
1036 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1037 .await?
1038 .to_proto();
1039 let message = proto::ChannelMessage {
1040 sender_id: user_id.to_proto(),
1041 id: message_id,
1042 body,
1043 timestamp: timestamp.unix_timestamp() as u64,
1044 nonce: Some(nonce),
1045 };
1046 broadcast(request.sender_id, connection_ids, |conn_id| {
1047 self.peer.send(
1048 conn_id,
1049 proto::ChannelMessageSent {
1050 channel_id: channel_id.to_proto(),
1051 message: Some(message.clone()),
1052 },
1053 )
1054 })
1055 .await?;
1056 self.peer
1057 .respond(
1058 receipt,
1059 proto::SendChannelMessageResponse {
1060 message: Some(message),
1061 },
1062 )
1063 .await?;
1064 Ok(())
1065 }
1066
1067 async fn get_channel_messages(
1068 self: Arc<Self>,
1069 request: TypedEnvelope<proto::GetChannelMessages>,
1070 ) -> tide::Result<()> {
1071 let user_id = self.state().user_id_for_connection(request.sender_id)?;
1072 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1073 if !self
1074 .app_state
1075 .db
1076 .can_user_access_channel(user_id, channel_id)
1077 .await?
1078 {
1079 Err(anyhow!("access denied"))?;
1080 }
1081
1082 let messages = self
1083 .app_state
1084 .db
1085 .get_channel_messages(
1086 channel_id,
1087 MESSAGE_COUNT_PER_PAGE,
1088 Some(MessageId::from_proto(request.payload.before_message_id)),
1089 )
1090 .await?
1091 .into_iter()
1092 .map(|msg| proto::ChannelMessage {
1093 id: msg.id.to_proto(),
1094 body: msg.body,
1095 timestamp: msg.sent_at.unix_timestamp() as u64,
1096 sender_id: msg.sender_id.to_proto(),
1097 nonce: Some(msg.nonce.as_u128().into()),
1098 })
1099 .collect::<Vec<_>>();
1100 self.peer
1101 .respond(
1102 request.receipt(),
1103 proto::GetChannelMessagesResponse {
1104 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1105 messages,
1106 },
1107 )
1108 .await?;
1109 Ok(())
1110 }
1111
1112 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1113 self.store.read()
1114 }
1115
1116 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1117 self.store.write()
1118 }
1119}
1120
1121pub async fn broadcast<F, T>(
1122 sender_id: ConnectionId,
1123 receiver_ids: Vec<ConnectionId>,
1124 mut f: F,
1125) -> anyhow::Result<()>
1126where
1127 F: FnMut(ConnectionId) -> T,
1128 T: Future<Output = anyhow::Result<()>>,
1129{
1130 let futures = receiver_ids
1131 .into_iter()
1132 .filter(|id| *id != sender_id)
1133 .map(|id| f(id));
1134 futures::future::try_join_all(futures).await?;
1135 Ok(())
1136}
1137
/// Mounts the `/rpc` GET route, which upgrades the HTTP request to a WebSocket
/// and hands the resulting connection to a shared [`Server`].
///
/// The route answers `UpgradeRequired` unless the request asks for a WebSocket
/// upgrade and its `X-Zed-Protocol-Version` header equals
/// `rpc::PROTOCOL_VERSION`. It fails when `Sec-Websocket-Key` is missing or
/// when `process_auth_header` rejects the request.
pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
    // One server instance is shared by all connections accepted on this route.
    let server = Server::new(app.state().clone(), rpc.clone(), None);
    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
        let server = server.clone();
        async move {
            // Fixed GUID appended to the client's key when computing the
            // `Sec-Websocket-Accept` value.
            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
            // A missing or unparsable version header yields `None` and is rejected below.
            let client_protocol_version: Option<u32> = request
                .header("X-Zed-Protocol-Version")
                .and_then(|v| v.as_str().parse().ok());

            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
                return Ok(Response::new(StatusCode::UpgradeRequired));
            }

            let header = match request.header("Sec-Websocket-Key") {
                Some(h) => h.as_str(),
                None => return Err(anyhow!("expected sec-websocket-key"))?,
            };

            let user_id = process_auth_header(&request).await?;

            let mut response = Response::new(StatusCode::SwitchingProtocols);
            response.insert_header(UPGRADE, "websocket");
            response.insert_header(CONNECTION, "Upgrade");
            // Accept value: base64 of SHA-1 over the client key followed by the GUID.
            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
            response.insert_header("Sec-Websocket-Version", "13");

            // Obtain the upgrade receiver from the response before it is returned.
            let http_res: &mut tide::http::Response = response.as_mut();
            let upgrade_receiver = http_res.recv_upgrade().await;
            let addr = request.remote().unwrap_or("unknown").to_string();
            // Serve the connection on its own task; nothing happens if the
            // receiver yields no stream.
            task::spawn(async move {
                if let Some(stream) = upgrade_receiver.await {
                    server
                        .handle_connection(
                            Connection::new(
                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
                            ),
                            addr,
                            user_id,
                            None,
                        )
                        .await;
                }
            });

            Ok(response)
        }
    });
}
1192
1193fn header_contains_ignore_case<T>(
1194 request: &tide::Request<T>,
1195 header_name: HeaderName,
1196 value: &str,
1197) -> bool {
1198 request
1199 .header(header_name)
1200 .map(|h| {
1201 h.as_str()
1202 .split(',')
1203 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1204 })
1205 .unwrap_or(false)
1206}
1207
1208#[cfg(test)]
1209mod tests {
1210 use super::*;
1211 use crate::{
1212 auth,
1213 db::{tests::TestDb, UserId},
1214 github, AppState, Config,
1215 };
1216 use ::rpc::Peer;
1217 use async_std::task;
1218 use gpui::{executor, ModelHandle, TestAppContext};
1219 use parking_lot::Mutex;
1220 use postage::{mpsc, watch};
1221 use rand::prelude::*;
1222 use rpc::PeerId;
1223 use serde_json::json;
1224 use sqlx::types::time::OffsetDateTime;
1225 use std::{
1226 ops::Deref,
1227 path::Path,
1228 rc::Rc,
1229 sync::{
1230 atomic::{AtomicBool, Ordering::SeqCst},
1231 Arc,
1232 },
1233 time::Duration,
1234 };
1235 use zed::{
1236 client::{
1237 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1238 EstablishConnectionError, UserStore,
1239 },
1240 editor::{Editor, EditorSettings, Input, MultiBuffer},
1241 fs::{FakeFs, Fs as _},
1242 language::{
1243 tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1244 LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1245 },
1246 lsp,
1247 project::{DiagnosticSummary, Project, ProjectPath},
1248 };
1249
1250 #[gpui::test]
1251 async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1252 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1253 let lang_registry = Arc::new(LanguageRegistry::new());
1254 let fs = Arc::new(FakeFs::new(cx_a.background()));
1255 cx_a.foreground().forbid_parking();
1256
1257 // Connect to a server as 2 clients.
1258 let mut server = TestServer::start(cx_a.foreground()).await;
1259 let client_a = server.create_client(&mut cx_a, "user_a").await;
1260 let client_b = server.create_client(&mut cx_b, "user_b").await;
1261
1262 // Share a project as client A
1263 fs.insert_tree(
1264 "/a",
1265 json!({
1266 ".zed.toml": r#"collaborators = ["user_b"]"#,
1267 "a.txt": "a-contents",
1268 "b.txt": "b-contents",
1269 }),
1270 )
1271 .await;
1272 let project_a = cx_a.update(|cx| {
1273 Project::local(
1274 client_a.clone(),
1275 client_a.user_store.clone(),
1276 lang_registry.clone(),
1277 fs.clone(),
1278 cx,
1279 )
1280 });
1281 let (worktree_a, _) = project_a
1282 .update(&mut cx_a, |p, cx| {
1283 p.find_or_create_local_worktree("/a", false, cx)
1284 })
1285 .await
1286 .unwrap();
1287 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1288 worktree_a
1289 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1290 .await;
1291 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1292 project_a
1293 .update(&mut cx_a, |p, cx| p.share(cx))
1294 .await
1295 .unwrap();
1296
1297 // Join that project as client B
1298 let project_b = Project::remote(
1299 project_id,
1300 client_b.clone(),
1301 client_b.user_store.clone(),
1302 lang_registry.clone(),
1303 fs.clone(),
1304 &mut cx_b.to_async(),
1305 )
1306 .await
1307 .unwrap();
1308
1309 let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1310 assert_eq!(
1311 project
1312 .collaborators()
1313 .get(&client_a.peer_id)
1314 .unwrap()
1315 .user
1316 .github_login,
1317 "user_a"
1318 );
1319 project.replica_id()
1320 });
1321 project_a
1322 .condition(&cx_a, |tree, _| {
1323 tree.collaborators()
1324 .get(&client_b.peer_id)
1325 .map_or(false, |collaborator| {
1326 collaborator.replica_id == replica_id_b
1327 && collaborator.user.github_login == "user_b"
1328 })
1329 })
1330 .await;
1331
1332 // Open the same file as client B and client A.
1333 let buffer_b = project_b
1334 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1335 .await
1336 .unwrap();
1337 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1338 buffer_b.read_with(&cx_b, |buf, cx| {
1339 assert_eq!(buf.read(cx).text(), "b-contents")
1340 });
1341 project_a.read_with(&cx_a, |project, cx| {
1342 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1343 });
1344 let buffer_a = project_a
1345 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1346 .await
1347 .unwrap();
1348
1349 let editor_b = cx_b.add_view(window_b, |cx| {
1350 Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
1351 });
1352
1353 // TODO
1354 // // Create a selection set as client B and see that selection set as client A.
1355 // buffer_a
1356 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1357 // .await;
1358
1359 // Edit the buffer as client B and see that edit as client A.
1360 editor_b.update(&mut cx_b, |editor, cx| {
1361 editor.handle_input(&Input("ok, ".into()), cx)
1362 });
1363 buffer_a
1364 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1365 .await;
1366
1367 // TODO
1368 // // Remove the selection set as client B, see those selections disappear as client A.
1369 cx_b.update(move |_| drop(editor_b));
1370 // buffer_a
1371 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1372 // .await;
1373
1374 // Close the buffer as client A, see that the buffer is closed.
1375 cx_a.update(move |_| drop(buffer_a));
1376 project_a
1377 .condition(&cx_a, |project, cx| {
1378 !project.has_open_buffer((worktree_id, "b.txt"), cx)
1379 })
1380 .await;
1381
1382 // Dropping the client B's project removes client B from client A's collaborators.
1383 cx_b.update(move |_| drop(project_b));
1384 project_a
1385 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1386 .await;
1387 }
1388
1389 #[gpui::test]
1390 async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1391 let lang_registry = Arc::new(LanguageRegistry::new());
1392 let fs = Arc::new(FakeFs::new(cx_a.background()));
1393 cx_a.foreground().forbid_parking();
1394
1395 // Connect to a server as 2 clients.
1396 let mut server = TestServer::start(cx_a.foreground()).await;
1397 let client_a = server.create_client(&mut cx_a, "user_a").await;
1398 let client_b = server.create_client(&mut cx_b, "user_b").await;
1399
1400 // Share a project as client A
1401 fs.insert_tree(
1402 "/a",
1403 json!({
1404 ".zed.toml": r#"collaborators = ["user_b"]"#,
1405 "a.txt": "a-contents",
1406 "b.txt": "b-contents",
1407 }),
1408 )
1409 .await;
1410 let project_a = cx_a.update(|cx| {
1411 Project::local(
1412 client_a.clone(),
1413 client_a.user_store.clone(),
1414 lang_registry.clone(),
1415 fs.clone(),
1416 cx,
1417 )
1418 });
1419 let (worktree_a, _) = project_a
1420 .update(&mut cx_a, |p, cx| {
1421 p.find_or_create_local_worktree("/a", false, cx)
1422 })
1423 .await
1424 .unwrap();
1425 worktree_a
1426 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1427 .await;
1428 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1429 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1430 project_a
1431 .update(&mut cx_a, |p, cx| p.share(cx))
1432 .await
1433 .unwrap();
1434 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1435
1436 // Join that project as client B
1437 let project_b = Project::remote(
1438 project_id,
1439 client_b.clone(),
1440 client_b.user_store.clone(),
1441 lang_registry.clone(),
1442 fs.clone(),
1443 &mut cx_b.to_async(),
1444 )
1445 .await
1446 .unwrap();
1447 project_b
1448 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1449 .await
1450 .unwrap();
1451
1452 // Unshare the project as client A
1453 project_a
1454 .update(&mut cx_a, |project, cx| project.unshare(cx))
1455 .await
1456 .unwrap();
1457 project_b
1458 .condition(&mut cx_b, |project, _| project.is_read_only())
1459 .await;
1460 assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1461 drop(project_b);
1462
1463 // Share the project again and ensure guests can still join.
1464 project_a
1465 .update(&mut cx_a, |project, cx| project.share(cx))
1466 .await
1467 .unwrap();
1468 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1469
1470 let project_c = Project::remote(
1471 project_id,
1472 client_b.clone(),
1473 client_b.user_store.clone(),
1474 lang_registry.clone(),
1475 fs.clone(),
1476 &mut cx_b.to_async(),
1477 )
1478 .await
1479 .unwrap();
1480 project_c
1481 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1482 .await
1483 .unwrap();
1484 }
1485
1486 #[gpui::test]
1487 async fn test_propagate_saves_and_fs_changes(
1488 mut cx_a: TestAppContext,
1489 mut cx_b: TestAppContext,
1490 mut cx_c: TestAppContext,
1491 ) {
1492 let lang_registry = Arc::new(LanguageRegistry::new());
1493 let fs = Arc::new(FakeFs::new(cx_a.background()));
1494 cx_a.foreground().forbid_parking();
1495
1496 // Connect to a server as 3 clients.
1497 let mut server = TestServer::start(cx_a.foreground()).await;
1498 let client_a = server.create_client(&mut cx_a, "user_a").await;
1499 let client_b = server.create_client(&mut cx_b, "user_b").await;
1500 let client_c = server.create_client(&mut cx_c, "user_c").await;
1501
1502 // Share a worktree as client A.
1503 fs.insert_tree(
1504 "/a",
1505 json!({
1506 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1507 "file1": "",
1508 "file2": ""
1509 }),
1510 )
1511 .await;
1512 let project_a = cx_a.update(|cx| {
1513 Project::local(
1514 client_a.clone(),
1515 client_a.user_store.clone(),
1516 lang_registry.clone(),
1517 fs.clone(),
1518 cx,
1519 )
1520 });
1521 let (worktree_a, _) = project_a
1522 .update(&mut cx_a, |p, cx| {
1523 p.find_or_create_local_worktree("/a", false, cx)
1524 })
1525 .await
1526 .unwrap();
1527 worktree_a
1528 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1529 .await;
1530 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1531 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1532 project_a
1533 .update(&mut cx_a, |p, cx| p.share(cx))
1534 .await
1535 .unwrap();
1536
1537 // Join that worktree as clients B and C.
1538 let project_b = Project::remote(
1539 project_id,
1540 client_b.clone(),
1541 client_b.user_store.clone(),
1542 lang_registry.clone(),
1543 fs.clone(),
1544 &mut cx_b.to_async(),
1545 )
1546 .await
1547 .unwrap();
1548 let project_c = Project::remote(
1549 project_id,
1550 client_c.clone(),
1551 client_c.user_store.clone(),
1552 lang_registry.clone(),
1553 fs.clone(),
1554 &mut cx_c.to_async(),
1555 )
1556 .await
1557 .unwrap();
1558 let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1559 let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1560
1561 // Open and edit a buffer as both guests B and C.
1562 let buffer_b = project_b
1563 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1564 .await
1565 .unwrap();
1566 let buffer_c = project_c
1567 .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1568 .await
1569 .unwrap();
1570 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1571 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1572
1573 // Open and edit that buffer as the host.
1574 let buffer_a = project_a
1575 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1576 .await
1577 .unwrap();
1578
1579 buffer_a
1580 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1581 .await;
1582 buffer_a.update(&mut cx_a, |buf, cx| {
1583 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1584 });
1585
1586 // Wait for edits to propagate
1587 buffer_a
1588 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1589 .await;
1590 buffer_b
1591 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1592 .await;
1593 buffer_c
1594 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1595 .await;
1596
1597 // Edit the buffer as the host and concurrently save as guest B.
1598 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1599 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1600 save_b.await.unwrap();
1601 assert_eq!(
1602 fs.load("/a/file1".as_ref()).await.unwrap(),
1603 "hi-a, i-am-c, i-am-b, i-am-a"
1604 );
1605 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1606 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1607 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1608
1609 // Make changes on host's file system, see those changes on guest worktrees.
1610 fs.rename("/a/file1".as_ref(), "/a/file1-renamed".as_ref())
1611 .await
1612 .unwrap();
1613 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1614 .await
1615 .unwrap();
1616 fs.insert_file(Path::new("/a/file4"), "4".into())
1617 .await
1618 .unwrap();
1619
1620 worktree_a
1621 .condition(&cx_a, |tree, _| tree.file_count() == 4)
1622 .await;
1623 worktree_b
1624 .condition(&cx_b, |tree, _| tree.file_count() == 4)
1625 .await;
1626 worktree_c
1627 .condition(&cx_c, |tree, _| tree.file_count() == 4)
1628 .await;
1629 worktree_a.read_with(&cx_a, |tree, _| {
1630 assert_eq!(
1631 tree.paths()
1632 .map(|p| p.to_string_lossy())
1633 .collect::<Vec<_>>(),
1634 &[".zed.toml", "file1-renamed", "file3", "file4"]
1635 )
1636 });
1637 worktree_b.read_with(&cx_b, |tree, _| {
1638 assert_eq!(
1639 tree.paths()
1640 .map(|p| p.to_string_lossy())
1641 .collect::<Vec<_>>(),
1642 &[".zed.toml", "file1-renamed", "file3", "file4"]
1643 )
1644 });
1645 worktree_c.read_with(&cx_c, |tree, _| {
1646 assert_eq!(
1647 tree.paths()
1648 .map(|p| p.to_string_lossy())
1649 .collect::<Vec<_>>(),
1650 &[".zed.toml", "file1-renamed", "file3", "file4"]
1651 )
1652 });
1653
1654 // Ensure buffer files are updated as well.
1655 buffer_a
1656 .condition(&cx_a, |buf, _| {
1657 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1658 })
1659 .await;
1660 buffer_b
1661 .condition(&cx_b, |buf, _| {
1662 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1663 })
1664 .await;
1665 buffer_c
1666 .condition(&cx_c, |buf, _| {
1667 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1668 })
1669 .await;
1670 }
1671
1672 #[gpui::test]
1673 async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1674 cx_a.foreground().forbid_parking();
1675 let lang_registry = Arc::new(LanguageRegistry::new());
1676 let fs = Arc::new(FakeFs::new(cx_a.background()));
1677
1678 // Connect to a server as 2 clients.
1679 let mut server = TestServer::start(cx_a.foreground()).await;
1680 let client_a = server.create_client(&mut cx_a, "user_a").await;
1681 let client_b = server.create_client(&mut cx_b, "user_b").await;
1682
1683 // Share a project as client A
1684 fs.insert_tree(
1685 "/dir",
1686 json!({
1687 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1688 "a.txt": "a-contents",
1689 }),
1690 )
1691 .await;
1692
1693 let project_a = cx_a.update(|cx| {
1694 Project::local(
1695 client_a.clone(),
1696 client_a.user_store.clone(),
1697 lang_registry.clone(),
1698 fs.clone(),
1699 cx,
1700 )
1701 });
1702 let (worktree_a, _) = project_a
1703 .update(&mut cx_a, |p, cx| {
1704 p.find_or_create_local_worktree("/dir", false, cx)
1705 })
1706 .await
1707 .unwrap();
1708 worktree_a
1709 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1710 .await;
1711 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1712 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1713 project_a
1714 .update(&mut cx_a, |p, cx| p.share(cx))
1715 .await
1716 .unwrap();
1717
1718 // Join that project as client B
1719 let project_b = Project::remote(
1720 project_id,
1721 client_b.clone(),
1722 client_b.user_store.clone(),
1723 lang_registry.clone(),
1724 fs.clone(),
1725 &mut cx_b.to_async(),
1726 )
1727 .await
1728 .unwrap();
1729 let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1730
1731 // Open a buffer as client B
1732 let buffer_b = project_b
1733 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1734 .await
1735 .unwrap();
1736 let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1737
1738 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1739 buffer_b.read_with(&cx_b, |buf, _| {
1740 assert!(buf.is_dirty());
1741 assert!(!buf.has_conflict());
1742 });
1743
1744 buffer_b
1745 .update(&mut cx_b, |buf, cx| buf.save(cx))
1746 .await
1747 .unwrap();
1748 worktree_b
1749 .condition(&cx_b, |_, cx| {
1750 buffer_b.read(cx).file().unwrap().mtime() != mtime
1751 })
1752 .await;
1753 buffer_b.read_with(&cx_b, |buf, _| {
1754 assert!(!buf.is_dirty());
1755 assert!(!buf.has_conflict());
1756 });
1757
1758 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1759 buffer_b.read_with(&cx_b, |buf, _| {
1760 assert!(buf.is_dirty());
1761 assert!(!buf.has_conflict());
1762 });
1763 }
1764
1765 #[gpui::test]
1766 async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1767 cx_a.foreground().forbid_parking();
1768 let lang_registry = Arc::new(LanguageRegistry::new());
1769 let fs = Arc::new(FakeFs::new(cx_a.background()));
1770
1771 // Connect to a server as 2 clients.
1772 let mut server = TestServer::start(cx_a.foreground()).await;
1773 let client_a = server.create_client(&mut cx_a, "user_a").await;
1774 let client_b = server.create_client(&mut cx_b, "user_b").await;
1775
1776 // Share a project as client A
1777 fs.insert_tree(
1778 "/dir",
1779 json!({
1780 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1781 "a.txt": "a-contents",
1782 }),
1783 )
1784 .await;
1785
1786 let project_a = cx_a.update(|cx| {
1787 Project::local(
1788 client_a.clone(),
1789 client_a.user_store.clone(),
1790 lang_registry.clone(),
1791 fs.clone(),
1792 cx,
1793 )
1794 });
1795 let (worktree_a, _) = project_a
1796 .update(&mut cx_a, |p, cx| {
1797 p.find_or_create_local_worktree("/dir", false, cx)
1798 })
1799 .await
1800 .unwrap();
1801 worktree_a
1802 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1803 .await;
1804 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1805 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1806 project_a
1807 .update(&mut cx_a, |p, cx| p.share(cx))
1808 .await
1809 .unwrap();
1810
1811 // Join that project as client B
1812 let project_b = Project::remote(
1813 project_id,
1814 client_b.clone(),
1815 client_b.user_store.clone(),
1816 lang_registry.clone(),
1817 fs.clone(),
1818 &mut cx_b.to_async(),
1819 )
1820 .await
1821 .unwrap();
1822 let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1823
1824 // Open a buffer as client B
1825 let buffer_b = project_b
1826 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1827 .await
1828 .unwrap();
1829 buffer_b.read_with(&cx_b, |buf, _| {
1830 assert!(!buf.is_dirty());
1831 assert!(!buf.has_conflict());
1832 });
1833
1834 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1835 .await
1836 .unwrap();
1837 buffer_b
1838 .condition(&cx_b, |buf, _| {
1839 buf.text() == "new contents" && !buf.is_dirty()
1840 })
1841 .await;
1842 buffer_b.read_with(&cx_b, |buf, _| {
1843 assert!(!buf.has_conflict());
1844 });
1845 }
1846
1847 #[gpui::test(iterations = 100)]
1848 async fn test_editing_while_guest_opens_buffer(
1849 mut cx_a: TestAppContext,
1850 mut cx_b: TestAppContext,
1851 ) {
1852 cx_a.foreground().forbid_parking();
1853 let lang_registry = Arc::new(LanguageRegistry::new());
1854 let fs = Arc::new(FakeFs::new(cx_a.background()));
1855
1856 // Connect to a server as 2 clients.
1857 let mut server = TestServer::start(cx_a.foreground()).await;
1858 let client_a = server.create_client(&mut cx_a, "user_a").await;
1859 let client_b = server.create_client(&mut cx_b, "user_b").await;
1860
1861 // Share a project as client A
1862 fs.insert_tree(
1863 "/dir",
1864 json!({
1865 ".zed.toml": r#"collaborators = ["user_b"]"#,
1866 "a.txt": "a-contents",
1867 }),
1868 )
1869 .await;
1870 let project_a = cx_a.update(|cx| {
1871 Project::local(
1872 client_a.clone(),
1873 client_a.user_store.clone(),
1874 lang_registry.clone(),
1875 fs.clone(),
1876 cx,
1877 )
1878 });
1879 let (worktree_a, _) = project_a
1880 .update(&mut cx_a, |p, cx| {
1881 p.find_or_create_local_worktree("/dir", false, cx)
1882 })
1883 .await
1884 .unwrap();
1885 worktree_a
1886 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1887 .await;
1888 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1889 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1890 project_a
1891 .update(&mut cx_a, |p, cx| p.share(cx))
1892 .await
1893 .unwrap();
1894
1895 // Join that project as client B
1896 let project_b = Project::remote(
1897 project_id,
1898 client_b.clone(),
1899 client_b.user_store.clone(),
1900 lang_registry.clone(),
1901 fs.clone(),
1902 &mut cx_b.to_async(),
1903 )
1904 .await
1905 .unwrap();
1906
1907 // Open a buffer as client A
1908 let buffer_a = project_a
1909 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1910 .await
1911 .unwrap();
1912
1913 // Start opening the same buffer as client B
1914 let buffer_b = cx_b
1915 .background()
1916 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1917 task::yield_now().await;
1918
1919 // Edit the buffer as client A while client B is still opening it.
1920 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1921
1922 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1923 let buffer_b = buffer_b.await.unwrap();
1924 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1925 }
1926
1927 #[gpui::test]
1928 async fn test_leaving_worktree_while_opening_buffer(
1929 mut cx_a: TestAppContext,
1930 mut cx_b: TestAppContext,
1931 ) {
1932 cx_a.foreground().forbid_parking();
1933 let lang_registry = Arc::new(LanguageRegistry::new());
1934 let fs = Arc::new(FakeFs::new(cx_a.background()));
1935
1936 // Connect to a server as 2 clients.
1937 let mut server = TestServer::start(cx_a.foreground()).await;
1938 let client_a = server.create_client(&mut cx_a, "user_a").await;
1939 let client_b = server.create_client(&mut cx_b, "user_b").await;
1940
1941 // Share a project as client A
1942 fs.insert_tree(
1943 "/dir",
1944 json!({
1945 ".zed.toml": r#"collaborators = ["user_b"]"#,
1946 "a.txt": "a-contents",
1947 }),
1948 )
1949 .await;
1950 let project_a = cx_a.update(|cx| {
1951 Project::local(
1952 client_a.clone(),
1953 client_a.user_store.clone(),
1954 lang_registry.clone(),
1955 fs.clone(),
1956 cx,
1957 )
1958 });
1959 let (worktree_a, _) = project_a
1960 .update(&mut cx_a, |p, cx| {
1961 p.find_or_create_local_worktree("/dir", false, cx)
1962 })
1963 .await
1964 .unwrap();
1965 worktree_a
1966 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1967 .await;
1968 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1969 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1970 project_a
1971 .update(&mut cx_a, |p, cx| p.share(cx))
1972 .await
1973 .unwrap();
1974
1975 // Join that project as client B
1976 let project_b = Project::remote(
1977 project_id,
1978 client_b.clone(),
1979 client_b.user_store.clone(),
1980 lang_registry.clone(),
1981 fs.clone(),
1982 &mut cx_b.to_async(),
1983 )
1984 .await
1985 .unwrap();
1986
1987 // See that a guest has joined as client A.
1988 project_a
1989 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1990 .await;
1991
1992 // Begin opening a buffer as client B, but leave the project before the open completes.
1993 let buffer_b = cx_b
1994 .background()
1995 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1996 cx_b.update(|_| drop(project_b));
1997 drop(buffer_b);
1998
1999 // See that the guest has left.
2000 project_a
2001 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2002 .await;
2003 }
2004
2005 #[gpui::test]
2006 async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2007 cx_a.foreground().forbid_parking();
2008 let lang_registry = Arc::new(LanguageRegistry::new());
2009 let fs = Arc::new(FakeFs::new(cx_a.background()));
2010
2011 // Connect to a server as 2 clients.
2012 let mut server = TestServer::start(cx_a.foreground()).await;
2013 let client_a = server.create_client(&mut cx_a, "user_a").await;
2014 let client_b = server.create_client(&mut cx_b, "user_b").await;
2015
2016 // Share a project as client A
2017 fs.insert_tree(
2018 "/a",
2019 json!({
2020 ".zed.toml": r#"collaborators = ["user_b"]"#,
2021 "a.txt": "a-contents",
2022 "b.txt": "b-contents",
2023 }),
2024 )
2025 .await;
2026 let project_a = cx_a.update(|cx| {
2027 Project::local(
2028 client_a.clone(),
2029 client_a.user_store.clone(),
2030 lang_registry.clone(),
2031 fs.clone(),
2032 cx,
2033 )
2034 });
2035 let (worktree_a, _) = project_a
2036 .update(&mut cx_a, |p, cx| {
2037 p.find_or_create_local_worktree("/a", false, cx)
2038 })
2039 .await
2040 .unwrap();
2041 worktree_a
2042 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2043 .await;
2044 let project_id = project_a
2045 .update(&mut cx_a, |project, _| project.next_remote_id())
2046 .await;
2047 project_a
2048 .update(&mut cx_a, |project, cx| project.share(cx))
2049 .await
2050 .unwrap();
2051
2052 // Join that project as client B
2053 let _project_b = Project::remote(
2054 project_id,
2055 client_b.clone(),
2056 client_b.user_store.clone(),
2057 lang_registry.clone(),
2058 fs.clone(),
2059 &mut cx_b.to_async(),
2060 )
2061 .await
2062 .unwrap();
2063
2064 // See that a guest has joined as client A.
2065 project_a
2066 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2067 .await;
2068
2069 // Drop client B's connection and ensure client A observes client B leaving the worktree.
2070 client_b.disconnect(&cx_b.to_async()).unwrap();
2071 project_a
2072 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2073 .await;
2074 }
2075
2076 #[gpui::test]
2077 async fn test_collaborating_with_diagnostics(
2078 mut cx_a: TestAppContext,
2079 mut cx_b: TestAppContext,
2080 ) {
2081 cx_a.foreground().forbid_parking();
2082 let mut lang_registry = Arc::new(LanguageRegistry::new());
2083 let fs = Arc::new(FakeFs::new(cx_a.background()));
2084
2085 // Set up a fake language server.
2086 let (language_server_config, mut fake_language_server) =
2087 LanguageServerConfig::fake(cx_a.background()).await;
2088 Arc::get_mut(&mut lang_registry)
2089 .unwrap()
2090 .add(Arc::new(Language::new(
2091 LanguageConfig {
2092 name: "Rust".to_string(),
2093 path_suffixes: vec!["rs".to_string()],
2094 language_server: Some(language_server_config),
2095 ..Default::default()
2096 },
2097 Some(tree_sitter_rust::language()),
2098 )));
2099
2100 // Connect to a server as 2 clients.
2101 let mut server = TestServer::start(cx_a.foreground()).await;
2102 let client_a = server.create_client(&mut cx_a, "user_a").await;
2103 let client_b = server.create_client(&mut cx_b, "user_b").await;
2104
2105 // Share a project as client A
2106 fs.insert_tree(
2107 "/a",
2108 json!({
2109 ".zed.toml": r#"collaborators = ["user_b"]"#,
2110 "a.rs": "let one = two",
2111 "other.rs": "",
2112 }),
2113 )
2114 .await;
2115 let project_a = cx_a.update(|cx| {
2116 Project::local(
2117 client_a.clone(),
2118 client_a.user_store.clone(),
2119 lang_registry.clone(),
2120 fs.clone(),
2121 cx,
2122 )
2123 });
2124 let (worktree_a, _) = project_a
2125 .update(&mut cx_a, |p, cx| {
2126 p.find_or_create_local_worktree("/a", false, cx)
2127 })
2128 .await
2129 .unwrap();
2130 worktree_a
2131 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2132 .await;
2133 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2134 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2135 project_a
2136 .update(&mut cx_a, |p, cx| p.share(cx))
2137 .await
2138 .unwrap();
2139
2140 // Cause the language server to start.
2141 let _ = cx_a
2142 .background()
2143 .spawn(project_a.update(&mut cx_a, |project, cx| {
2144 project.open_buffer(
2145 ProjectPath {
2146 worktree_id,
2147 path: Path::new("other.rs").into(),
2148 },
2149 cx,
2150 )
2151 }))
2152 .await
2153 .unwrap();
2154
2155 // Simulate a language server reporting errors for a file.
2156 fake_language_server
2157 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2158 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2159 version: None,
2160 diagnostics: vec![lsp::Diagnostic {
2161 severity: Some(lsp::DiagnosticSeverity::ERROR),
2162 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2163 message: "message 1".to_string(),
2164 ..Default::default()
2165 }],
2166 })
2167 .await;
2168
2169 // Wait for server to see the diagnostics update.
2170 server
2171 .condition(|store| {
2172 let worktree = store
2173 .project(project_id)
2174 .unwrap()
2175 .worktrees
2176 .get(&worktree_id.to_proto())
2177 .unwrap();
2178
2179 !worktree
2180 .share
2181 .as_ref()
2182 .unwrap()
2183 .diagnostic_summaries
2184 .is_empty()
2185 })
2186 .await;
2187
2188 // Join the worktree as client B.
2189 let project_b = Project::remote(
2190 project_id,
2191 client_b.clone(),
2192 client_b.user_store.clone(),
2193 lang_registry.clone(),
2194 fs.clone(),
2195 &mut cx_b.to_async(),
2196 )
2197 .await
2198 .unwrap();
2199
2200 project_b.read_with(&cx_b, |project, cx| {
2201 assert_eq!(
2202 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2203 &[(
2204 ProjectPath {
2205 worktree_id,
2206 path: Arc::from(Path::new("a.rs")),
2207 },
2208 DiagnosticSummary {
2209 error_count: 1,
2210 warning_count: 0,
2211 ..Default::default()
2212 },
2213 )]
2214 )
2215 });
2216
2217 // Simulate a language server reporting more errors for a file.
2218 fake_language_server
2219 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2220 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2221 version: None,
2222 diagnostics: vec![
2223 lsp::Diagnostic {
2224 severity: Some(lsp::DiagnosticSeverity::ERROR),
2225 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2226 message: "message 1".to_string(),
2227 ..Default::default()
2228 },
2229 lsp::Diagnostic {
2230 severity: Some(lsp::DiagnosticSeverity::WARNING),
2231 range: lsp::Range::new(
2232 lsp::Position::new(0, 10),
2233 lsp::Position::new(0, 13),
2234 ),
2235 message: "message 2".to_string(),
2236 ..Default::default()
2237 },
2238 ],
2239 })
2240 .await;
2241
2242 // Client b gets the updated summaries
2243 project_b
2244 .condition(&cx_b, |project, cx| {
2245 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2246 == &[(
2247 ProjectPath {
2248 worktree_id,
2249 path: Arc::from(Path::new("a.rs")),
2250 },
2251 DiagnosticSummary {
2252 error_count: 1,
2253 warning_count: 1,
2254 ..Default::default()
2255 },
2256 )]
2257 })
2258 .await;
2259
2260 // Open the file with the errors on client B. They should be present.
2261 let buffer_b = cx_b
2262 .background()
2263 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2264 .await
2265 .unwrap();
2266
2267 buffer_b.read_with(&cx_b, |buffer, _| {
2268 assert_eq!(
2269 buffer
2270 .snapshot()
2271 .diagnostics_in_range::<_, Point>(0..buffer.len())
2272 .map(|entry| entry)
2273 .collect::<Vec<_>>(),
2274 &[
2275 DiagnosticEntry {
2276 range: Point::new(0, 4)..Point::new(0, 7),
2277 diagnostic: Diagnostic {
2278 group_id: 0,
2279 message: "message 1".to_string(),
2280 severity: lsp::DiagnosticSeverity::ERROR,
2281 is_primary: true,
2282 ..Default::default()
2283 }
2284 },
2285 DiagnosticEntry {
2286 range: Point::new(0, 10)..Point::new(0, 13),
2287 diagnostic: Diagnostic {
2288 group_id: 1,
2289 severity: lsp::DiagnosticSeverity::WARNING,
2290 message: "message 2".to_string(),
2291 is_primary: true,
2292 ..Default::default()
2293 }
2294 }
2295 ]
2296 );
2297 });
2298 }
2299
2300 #[gpui::test]
2301 async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2302 cx_a.foreground().forbid_parking();
2303 let mut lang_registry = Arc::new(LanguageRegistry::new());
2304 let fs = Arc::new(FakeFs::new(cx_a.background()));
2305
2306 // Set up a fake language server.
2307 let (language_server_config, mut fake_language_server) =
2308 LanguageServerConfig::fake(cx_a.background()).await;
2309 Arc::get_mut(&mut lang_registry)
2310 .unwrap()
2311 .add(Arc::new(Language::new(
2312 LanguageConfig {
2313 name: "Rust".to_string(),
2314 path_suffixes: vec!["rs".to_string()],
2315 language_server: Some(language_server_config),
2316 ..Default::default()
2317 },
2318 Some(tree_sitter_rust::language()),
2319 )));
2320
2321 // Connect to a server as 2 clients.
2322 let mut server = TestServer::start(cx_a.foreground()).await;
2323 let client_a = server.create_client(&mut cx_a, "user_a").await;
2324 let client_b = server.create_client(&mut cx_b, "user_b").await;
2325
2326 // Share a project as client A
2327 fs.insert_tree(
2328 "/a",
2329 json!({
2330 ".zed.toml": r#"collaborators = ["user_b"]"#,
2331 "a.rs": "let one = two",
2332 }),
2333 )
2334 .await;
2335 let project_a = cx_a.update(|cx| {
2336 Project::local(
2337 client_a.clone(),
2338 client_a.user_store.clone(),
2339 lang_registry.clone(),
2340 fs.clone(),
2341 cx,
2342 )
2343 });
2344 let (worktree_a, _) = project_a
2345 .update(&mut cx_a, |p, cx| {
2346 p.find_or_create_local_worktree("/a", false, cx)
2347 })
2348 .await
2349 .unwrap();
2350 worktree_a
2351 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2352 .await;
2353 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2354 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2355 project_a
2356 .update(&mut cx_a, |p, cx| p.share(cx))
2357 .await
2358 .unwrap();
2359
2360 // Join the worktree as client B.
2361 let project_b = Project::remote(
2362 project_id,
2363 client_b.clone(),
2364 client_b.user_store.clone(),
2365 lang_registry.clone(),
2366 fs.clone(),
2367 &mut cx_b.to_async(),
2368 )
2369 .await
2370 .unwrap();
2371
2372 let buffer_b = cx_b
2373 .background()
2374 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2375 .await
2376 .unwrap();
2377
2378 let format = buffer_b.update(&mut cx_b, |buffer, cx| buffer.format(cx));
2379 let (request_id, _) = fake_language_server
2380 .receive_request::<lsp::request::Formatting>()
2381 .await;
2382 fake_language_server
2383 .respond(
2384 request_id,
2385 Some(vec![
2386 lsp::TextEdit {
2387 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2388 new_text: "h".to_string(),
2389 },
2390 lsp::TextEdit {
2391 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2392 new_text: "y".to_string(),
2393 },
2394 ]),
2395 )
2396 .await;
2397 format.await.unwrap();
2398 assert_eq!(
2399 buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2400 "let honey = two"
2401 );
2402 }
2403
2404 #[gpui::test]
2405 async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2406 cx_a.foreground().forbid_parking();
2407 let mut lang_registry = Arc::new(LanguageRegistry::new());
2408 let fs = Arc::new(FakeFs::new(cx_a.background()));
2409 fs.insert_tree(
2410 "/root-1",
2411 json!({
2412 ".zed.toml": r#"collaborators = ["user_b"]"#,
2413 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2414 }),
2415 )
2416 .await;
2417 fs.insert_tree(
2418 "/root-2",
2419 json!({
2420 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2421 }),
2422 )
2423 .await;
2424
2425 // Set up a fake language server.
2426 let (language_server_config, mut fake_language_server) =
2427 LanguageServerConfig::fake(cx_a.background()).await;
2428 Arc::get_mut(&mut lang_registry)
2429 .unwrap()
2430 .add(Arc::new(Language::new(
2431 LanguageConfig {
2432 name: "Rust".to_string(),
2433 path_suffixes: vec!["rs".to_string()],
2434 language_server: Some(language_server_config),
2435 ..Default::default()
2436 },
2437 Some(tree_sitter_rust::language()),
2438 )));
2439
2440 // Connect to a server as 2 clients.
2441 let mut server = TestServer::start(cx_a.foreground()).await;
2442 let client_a = server.create_client(&mut cx_a, "user_a").await;
2443 let client_b = server.create_client(&mut cx_b, "user_b").await;
2444
2445 // Share a project as client A
2446 let project_a = cx_a.update(|cx| {
2447 Project::local(
2448 client_a.clone(),
2449 client_a.user_store.clone(),
2450 lang_registry.clone(),
2451 fs.clone(),
2452 cx,
2453 )
2454 });
2455 let (worktree_a, _) = project_a
2456 .update(&mut cx_a, |p, cx| {
2457 p.find_or_create_local_worktree("/root-1", false, cx)
2458 })
2459 .await
2460 .unwrap();
2461 worktree_a
2462 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2463 .await;
2464 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2465 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2466 project_a
2467 .update(&mut cx_a, |p, cx| p.share(cx))
2468 .await
2469 .unwrap();
2470
2471 // Join the worktree as client B.
2472 let project_b = Project::remote(
2473 project_id,
2474 client_b.clone(),
2475 client_b.user_store.clone(),
2476 lang_registry.clone(),
2477 fs.clone(),
2478 &mut cx_b.to_async(),
2479 )
2480 .await
2481 .unwrap();
2482
2483 // Open the file to be formatted on client B.
2484 let buffer_b = cx_b
2485 .background()
2486 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2487 .await
2488 .unwrap();
2489
2490 let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2491 let (request_id, _) = fake_language_server
2492 .receive_request::<lsp::request::GotoDefinition>()
2493 .await;
2494 fake_language_server
2495 .respond(
2496 request_id,
2497 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2498 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2499 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2500 ))),
2501 )
2502 .await;
2503 let definitions_1 = definitions_1.await.unwrap();
2504 cx_b.read(|cx| {
2505 assert_eq!(definitions_1.len(), 1);
2506 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2507 let target_buffer = definitions_1[0].target_buffer.read(cx);
2508 assert_eq!(
2509 target_buffer.text(),
2510 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2511 );
2512 assert_eq!(
2513 definitions_1[0].target_range.to_point(target_buffer),
2514 Point::new(0, 6)..Point::new(0, 9)
2515 );
2516 });
2517
2518 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2519 // the previous call to `definition`.
2520 let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2521 let (request_id, _) = fake_language_server
2522 .receive_request::<lsp::request::GotoDefinition>()
2523 .await;
2524 fake_language_server
2525 .respond(
2526 request_id,
2527 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2528 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2529 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2530 ))),
2531 )
2532 .await;
2533 let definitions_2 = definitions_2.await.unwrap();
2534 cx_b.read(|cx| {
2535 assert_eq!(definitions_2.len(), 1);
2536 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2537 let target_buffer = definitions_2[0].target_buffer.read(cx);
2538 assert_eq!(
2539 target_buffer.text(),
2540 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2541 );
2542 assert_eq!(
2543 definitions_2[0].target_range.to_point(target_buffer),
2544 Point::new(1, 6)..Point::new(1, 11)
2545 );
2546 });
2547 assert_eq!(
2548 definitions_1[0].target_buffer,
2549 definitions_2[0].target_buffer
2550 );
2551
2552 cx_b.update(|_| {
2553 drop(definitions_1);
2554 drop(definitions_2);
2555 });
2556 project_b
2557 .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2558 .await;
2559 }
2560
2561 #[gpui::test]
2562 async fn test_open_buffer_while_getting_definition_pointing_to_it(
2563 mut cx_a: TestAppContext,
2564 mut cx_b: TestAppContext,
2565 mut rng: StdRng,
2566 ) {
2567 cx_a.foreground().forbid_parking();
2568 let mut lang_registry = Arc::new(LanguageRegistry::new());
2569 let fs = Arc::new(FakeFs::new(cx_a.background()));
2570 fs.insert_tree(
2571 "/root",
2572 json!({
2573 ".zed.toml": r#"collaborators = ["user_b"]"#,
2574 "a.rs": "const ONE: usize = b::TWO;",
2575 "b.rs": "const TWO: usize = 2",
2576 }),
2577 )
2578 .await;
2579
2580 // Set up a fake language server.
2581 let (language_server_config, mut fake_language_server) =
2582 LanguageServerConfig::fake(cx_a.background()).await;
2583 Arc::get_mut(&mut lang_registry)
2584 .unwrap()
2585 .add(Arc::new(Language::new(
2586 LanguageConfig {
2587 name: "Rust".to_string(),
2588 path_suffixes: vec!["rs".to_string()],
2589 language_server: Some(language_server_config),
2590 ..Default::default()
2591 },
2592 Some(tree_sitter_rust::language()),
2593 )));
2594
2595 // Connect to a server as 2 clients.
2596 let mut server = TestServer::start(cx_a.foreground()).await;
2597 let client_a = server.create_client(&mut cx_a, "user_a").await;
2598 let client_b = server.create_client(&mut cx_b, "user_b").await;
2599
2600 // Share a project as client A
2601 let project_a = cx_a.update(|cx| {
2602 Project::local(
2603 client_a.clone(),
2604 client_a.user_store.clone(),
2605 lang_registry.clone(),
2606 fs.clone(),
2607 cx,
2608 )
2609 });
2610 let (worktree_a, _) = project_a
2611 .update(&mut cx_a, |p, cx| {
2612 p.find_or_create_local_worktree("/root", false, cx)
2613 })
2614 .await
2615 .unwrap();
2616 worktree_a
2617 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2618 .await;
2619 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2620 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2621 project_a
2622 .update(&mut cx_a, |p, cx| p.share(cx))
2623 .await
2624 .unwrap();
2625
2626 // Join the worktree as client B.
2627 let project_b = Project::remote(
2628 project_id,
2629 client_b.clone(),
2630 client_b.user_store.clone(),
2631 lang_registry.clone(),
2632 fs.clone(),
2633 &mut cx_b.to_async(),
2634 )
2635 .await
2636 .unwrap();
2637
2638 let buffer_b1 = cx_b
2639 .background()
2640 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2641 .await
2642 .unwrap();
2643
2644 let definitions;
2645 let buffer_b2;
2646 if rng.gen() {
2647 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2648 buffer_b2 =
2649 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2650 } else {
2651 buffer_b2 =
2652 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2653 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2654 }
2655
2656 let (request_id, _) = fake_language_server
2657 .receive_request::<lsp::request::GotoDefinition>()
2658 .await;
2659 fake_language_server
2660 .respond(
2661 request_id,
2662 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2663 lsp::Url::from_file_path("/root/b.rs").unwrap(),
2664 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2665 ))),
2666 )
2667 .await;
2668
2669 let buffer_b2 = buffer_b2.await.unwrap();
2670 let definitions = definitions.await.unwrap();
2671 assert_eq!(definitions.len(), 1);
2672 assert_eq!(definitions[0].target_buffer, buffer_b2);
2673 }
2674
2675 #[gpui::test]
2676 async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2677 cx_a.foreground().forbid_parking();
2678
2679 // Connect to a server as 2 clients.
2680 let mut server = TestServer::start(cx_a.foreground()).await;
2681 let client_a = server.create_client(&mut cx_a, "user_a").await;
2682 let client_b = server.create_client(&mut cx_b, "user_b").await;
2683
2684 // Create an org that includes these 2 users.
2685 let db = &server.app_state.db;
2686 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2687 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2688 .await
2689 .unwrap();
2690 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2691 .await
2692 .unwrap();
2693
2694 // Create a channel that includes all the users.
2695 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2696 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2697 .await
2698 .unwrap();
2699 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2700 .await
2701 .unwrap();
2702 db.create_channel_message(
2703 channel_id,
2704 client_b.current_user_id(&cx_b),
2705 "hello A, it's B.",
2706 OffsetDateTime::now_utc(),
2707 1,
2708 )
2709 .await
2710 .unwrap();
2711
2712 let channels_a = cx_a
2713 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2714 channels_a
2715 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2716 .await;
2717 channels_a.read_with(&cx_a, |list, _| {
2718 assert_eq!(
2719 list.available_channels().unwrap(),
2720 &[ChannelDetails {
2721 id: channel_id.to_proto(),
2722 name: "test-channel".to_string()
2723 }]
2724 )
2725 });
2726 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2727 this.get_channel(channel_id.to_proto(), cx).unwrap()
2728 });
2729 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2730 channel_a
2731 .condition(&cx_a, |channel, _| {
2732 channel_messages(channel)
2733 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2734 })
2735 .await;
2736
2737 let channels_b = cx_b
2738 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2739 channels_b
2740 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2741 .await;
2742 channels_b.read_with(&cx_b, |list, _| {
2743 assert_eq!(
2744 list.available_channels().unwrap(),
2745 &[ChannelDetails {
2746 id: channel_id.to_proto(),
2747 name: "test-channel".to_string()
2748 }]
2749 )
2750 });
2751
2752 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2753 this.get_channel(channel_id.to_proto(), cx).unwrap()
2754 });
2755 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2756 channel_b
2757 .condition(&cx_b, |channel, _| {
2758 channel_messages(channel)
2759 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2760 })
2761 .await;
2762
2763 channel_a
2764 .update(&mut cx_a, |channel, cx| {
2765 channel
2766 .send_message("oh, hi B.".to_string(), cx)
2767 .unwrap()
2768 .detach();
2769 let task = channel.send_message("sup".to_string(), cx).unwrap();
2770 assert_eq!(
2771 channel_messages(channel),
2772 &[
2773 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2774 ("user_a".to_string(), "oh, hi B.".to_string(), true),
2775 ("user_a".to_string(), "sup".to_string(), true)
2776 ]
2777 );
2778 task
2779 })
2780 .await
2781 .unwrap();
2782
2783 channel_b
2784 .condition(&cx_b, |channel, _| {
2785 channel_messages(channel)
2786 == [
2787 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2788 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2789 ("user_a".to_string(), "sup".to_string(), false),
2790 ]
2791 })
2792 .await;
2793
2794 assert_eq!(
2795 server
2796 .state()
2797 .await
2798 .channel(channel_id)
2799 .unwrap()
2800 .connection_ids
2801 .len(),
2802 2
2803 );
2804 cx_b.update(|_| drop(channel_b));
2805 server
2806 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
2807 .await;
2808
2809 cx_a.update(|_| drop(channel_a));
2810 server
2811 .condition(|state| state.channel(channel_id).is_none())
2812 .await;
2813 }
2814
2815 #[gpui::test]
2816 async fn test_chat_message_validation(mut cx_a: TestAppContext) {
2817 cx_a.foreground().forbid_parking();
2818
2819 let mut server = TestServer::start(cx_a.foreground()).await;
2820 let client_a = server.create_client(&mut cx_a, "user_a").await;
2821
2822 let db = &server.app_state.db;
2823 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2824 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2825 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2826 .await
2827 .unwrap();
2828 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2829 .await
2830 .unwrap();
2831
2832 let channels_a = cx_a
2833 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2834 channels_a
2835 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2836 .await;
2837 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2838 this.get_channel(channel_id.to_proto(), cx).unwrap()
2839 });
2840
2841 // Messages aren't allowed to be too long.
2842 channel_a
2843 .update(&mut cx_a, |channel, cx| {
2844 let long_body = "this is long.\n".repeat(1024);
2845 channel.send_message(long_body, cx).unwrap()
2846 })
2847 .await
2848 .unwrap_err();
2849
2850 // Messages aren't allowed to be blank.
2851 channel_a.update(&mut cx_a, |channel, cx| {
2852 channel.send_message(String::new(), cx).unwrap_err()
2853 });
2854
2855 // Leading and trailing whitespace are trimmed.
2856 channel_a
2857 .update(&mut cx_a, |channel, cx| {
2858 channel
2859 .send_message("\n surrounded by whitespace \n".to_string(), cx)
2860 .unwrap()
2861 })
2862 .await
2863 .unwrap();
2864 assert_eq!(
2865 db.get_channel_messages(channel_id, 10, None)
2866 .await
2867 .unwrap()
2868 .iter()
2869 .map(|m| &m.body)
2870 .collect::<Vec<_>>(),
2871 &["surrounded by whitespace"]
2872 );
2873 }
2874
2875 #[gpui::test]
2876 async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2877 cx_a.foreground().forbid_parking();
2878
2879 // Connect to a server as 2 clients.
2880 let mut server = TestServer::start(cx_a.foreground()).await;
2881 let client_a = server.create_client(&mut cx_a, "user_a").await;
2882 let client_b = server.create_client(&mut cx_b, "user_b").await;
2883 let mut status_b = client_b.status();
2884
2885 // Create an org that includes these 2 users.
2886 let db = &server.app_state.db;
2887 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2888 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2889 .await
2890 .unwrap();
2891 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2892 .await
2893 .unwrap();
2894
2895 // Create a channel that includes all the users.
2896 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2897 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2898 .await
2899 .unwrap();
2900 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2901 .await
2902 .unwrap();
2903 db.create_channel_message(
2904 channel_id,
2905 client_b.current_user_id(&cx_b),
2906 "hello A, it's B.",
2907 OffsetDateTime::now_utc(),
2908 2,
2909 )
2910 .await
2911 .unwrap();
2912
2913 let channels_a = cx_a
2914 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2915 channels_a
2916 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2917 .await;
2918
2919 channels_a.read_with(&cx_a, |list, _| {
2920 assert_eq!(
2921 list.available_channels().unwrap(),
2922 &[ChannelDetails {
2923 id: channel_id.to_proto(),
2924 name: "test-channel".to_string()
2925 }]
2926 )
2927 });
2928 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2929 this.get_channel(channel_id.to_proto(), cx).unwrap()
2930 });
2931 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2932 channel_a
2933 .condition(&cx_a, |channel, _| {
2934 channel_messages(channel)
2935 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2936 })
2937 .await;
2938
2939 let channels_b = cx_b
2940 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2941 channels_b
2942 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2943 .await;
2944 channels_b.read_with(&cx_b, |list, _| {
2945 assert_eq!(
2946 list.available_channels().unwrap(),
2947 &[ChannelDetails {
2948 id: channel_id.to_proto(),
2949 name: "test-channel".to_string()
2950 }]
2951 )
2952 });
2953
2954 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2955 this.get_channel(channel_id.to_proto(), cx).unwrap()
2956 });
2957 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2958 channel_b
2959 .condition(&cx_b, |channel, _| {
2960 channel_messages(channel)
2961 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2962 })
2963 .await;
2964
2965 // Disconnect client B, ensuring we can still access its cached channel data.
2966 server.forbid_connections();
2967 server.disconnect_client(client_b.current_user_id(&cx_b));
2968 while !matches!(
2969 status_b.next().await,
2970 Some(client::Status::ReconnectionError { .. })
2971 ) {}
2972
2973 channels_b.read_with(&cx_b, |channels, _| {
2974 assert_eq!(
2975 channels.available_channels().unwrap(),
2976 [ChannelDetails {
2977 id: channel_id.to_proto(),
2978 name: "test-channel".to_string()
2979 }]
2980 )
2981 });
2982 channel_b.read_with(&cx_b, |channel, _| {
2983 assert_eq!(
2984 channel_messages(channel),
2985 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2986 )
2987 });
2988
2989 // Send a message from client B while it is disconnected.
2990 channel_b
2991 .update(&mut cx_b, |channel, cx| {
2992 let task = channel
2993 .send_message("can you see this?".to_string(), cx)
2994 .unwrap();
2995 assert_eq!(
2996 channel_messages(channel),
2997 &[
2998 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2999 ("user_b".to_string(), "can you see this?".to_string(), true)
3000 ]
3001 );
3002 task
3003 })
3004 .await
3005 .unwrap_err();
3006
3007 // Send a message from client A while B is disconnected.
3008 channel_a
3009 .update(&mut cx_a, |channel, cx| {
3010 channel
3011 .send_message("oh, hi B.".to_string(), cx)
3012 .unwrap()
3013 .detach();
3014 let task = channel.send_message("sup".to_string(), cx).unwrap();
3015 assert_eq!(
3016 channel_messages(channel),
3017 &[
3018 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3019 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3020 ("user_a".to_string(), "sup".to_string(), true)
3021 ]
3022 );
3023 task
3024 })
3025 .await
3026 .unwrap();
3027
3028 // Give client B a chance to reconnect.
3029 server.allow_connections();
3030 cx_b.foreground().advance_clock(Duration::from_secs(10));
3031
3032 // Verify that B sees the new messages upon reconnection, as well as the message client B
3033 // sent while offline.
3034 channel_b
3035 .condition(&cx_b, |channel, _| {
3036 channel_messages(channel)
3037 == [
3038 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3039 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3040 ("user_a".to_string(), "sup".to_string(), false),
3041 ("user_b".to_string(), "can you see this?".to_string(), false),
3042 ]
3043 })
3044 .await;
3045
3046 // Ensure client A and B can communicate normally after reconnection.
3047 channel_a
3048 .update(&mut cx_a, |channel, cx| {
3049 channel.send_message("you online?".to_string(), cx).unwrap()
3050 })
3051 .await
3052 .unwrap();
3053 channel_b
3054 .condition(&cx_b, |channel, _| {
3055 channel_messages(channel)
3056 == [
3057 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3058 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3059 ("user_a".to_string(), "sup".to_string(), false),
3060 ("user_b".to_string(), "can you see this?".to_string(), false),
3061 ("user_a".to_string(), "you online?".to_string(), false),
3062 ]
3063 })
3064 .await;
3065
3066 channel_b
3067 .update(&mut cx_b, |channel, cx| {
3068 channel.send_message("yep".to_string(), cx).unwrap()
3069 })
3070 .await
3071 .unwrap();
3072 channel_a
3073 .condition(&cx_a, |channel, _| {
3074 channel_messages(channel)
3075 == [
3076 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3077 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3078 ("user_a".to_string(), "sup".to_string(), false),
3079 ("user_b".to_string(), "can you see this?".to_string(), false),
3080 ("user_a".to_string(), "you online?".to_string(), false),
3081 ("user_b".to_string(), "yep".to_string(), false),
3082 ]
3083 })
3084 .await;
3085 }
3086
3087 #[gpui::test]
3088 async fn test_contacts(
3089 mut cx_a: TestAppContext,
3090 mut cx_b: TestAppContext,
3091 mut cx_c: TestAppContext,
3092 ) {
3093 cx_a.foreground().forbid_parking();
3094 let lang_registry = Arc::new(LanguageRegistry::new());
3095 let fs = Arc::new(FakeFs::new(cx_a.background()));
3096
3097 // Connect to a server as 3 clients.
3098 let mut server = TestServer::start(cx_a.foreground()).await;
3099 let client_a = server.create_client(&mut cx_a, "user_a").await;
3100 let client_b = server.create_client(&mut cx_b, "user_b").await;
3101 let client_c = server.create_client(&mut cx_c, "user_c").await;
3102
3103 // Share a worktree as client A.
3104 fs.insert_tree(
3105 "/a",
3106 json!({
3107 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3108 }),
3109 )
3110 .await;
3111
3112 let project_a = cx_a.update(|cx| {
3113 Project::local(
3114 client_a.clone(),
3115 client_a.user_store.clone(),
3116 lang_registry.clone(),
3117 fs.clone(),
3118 cx,
3119 )
3120 });
3121 let (worktree_a, _) = project_a
3122 .update(&mut cx_a, |p, cx| {
3123 p.find_or_create_local_worktree("/a", false, cx)
3124 })
3125 .await
3126 .unwrap();
3127 worktree_a
3128 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3129 .await;
3130
3131 client_a
3132 .user_store
3133 .condition(&cx_a, |user_store, _| {
3134 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3135 })
3136 .await;
3137 client_b
3138 .user_store
3139 .condition(&cx_b, |user_store, _| {
3140 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3141 })
3142 .await;
3143 client_c
3144 .user_store
3145 .condition(&cx_c, |user_store, _| {
3146 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3147 })
3148 .await;
3149
3150 let project_id = project_a
3151 .update(&mut cx_a, |project, _| project.next_remote_id())
3152 .await;
3153 project_a
3154 .update(&mut cx_a, |project, cx| project.share(cx))
3155 .await
3156 .unwrap();
3157
3158 let _project_b = Project::remote(
3159 project_id,
3160 client_b.clone(),
3161 client_b.user_store.clone(),
3162 lang_registry.clone(),
3163 fs.clone(),
3164 &mut cx_b.to_async(),
3165 )
3166 .await
3167 .unwrap();
3168
3169 client_a
3170 .user_store
3171 .condition(&cx_a, |user_store, _| {
3172 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3173 })
3174 .await;
3175 client_b
3176 .user_store
3177 .condition(&cx_b, |user_store, _| {
3178 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3179 })
3180 .await;
3181 client_c
3182 .user_store
3183 .condition(&cx_c, |user_store, _| {
3184 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3185 })
3186 .await;
3187
3188 project_a
3189 .condition(&cx_a, |project, _| {
3190 project.collaborators().contains_key(&client_b.peer_id)
3191 })
3192 .await;
3193
3194 cx_a.update(move |_| drop(project_a));
3195 client_a
3196 .user_store
3197 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3198 .await;
3199 client_b
3200 .user_store
3201 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3202 .await;
3203 client_c
3204 .user_store
3205 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3206 .await;
3207
3208 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3209 user_store
3210 .contacts()
3211 .iter()
3212 .map(|contact| {
3213 let worktrees = contact
3214 .projects
3215 .iter()
3216 .map(|p| {
3217 (
3218 p.worktree_root_names[0].as_str(),
3219 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3220 )
3221 })
3222 .collect();
3223 (contact.user.github_login.as_str(), worktrees)
3224 })
3225 .collect()
3226 }
3227 }
3228
3229 struct TestServer {
3230 peer: Arc<Peer>,
3231 app_state: Arc<AppState>,
3232 server: Arc<Server>,
3233 foreground: Rc<executor::Foreground>,
3234 notifications: mpsc::Receiver<()>,
3235 connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3236 forbid_connections: Arc<AtomicBool>,
3237 _test_db: TestDb,
3238 }
3239
3240 impl TestServer {
3241 async fn start(foreground: Rc<executor::Foreground>) -> Self {
3242 let test_db = TestDb::new();
3243 let app_state = Self::build_app_state(&test_db).await;
3244 let peer = Peer::new();
3245 let notifications = mpsc::channel(128);
3246 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3247 Self {
3248 peer,
3249 app_state,
3250 server,
3251 foreground,
3252 notifications: notifications.1,
3253 connection_killers: Default::default(),
3254 forbid_connections: Default::default(),
3255 _test_db: test_db,
3256 }
3257 }
3258
3259 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3260 let http = FakeHttpClient::with_404_response();
3261 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3262 let client_name = name.to_string();
3263 let mut client = Client::new(http.clone());
3264 let server = self.server.clone();
3265 let connection_killers = self.connection_killers.clone();
3266 let forbid_connections = self.forbid_connections.clone();
3267 let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3268
3269 Arc::get_mut(&mut client)
3270 .unwrap()
3271 .override_authenticate(move |cx| {
3272 cx.spawn(|_| async move {
3273 let access_token = "the-token".to_string();
3274 Ok(Credentials {
3275 user_id: user_id.0 as u64,
3276 access_token,
3277 })
3278 })
3279 })
3280 .override_establish_connection(move |credentials, cx| {
3281 assert_eq!(credentials.user_id, user_id.0 as u64);
3282 assert_eq!(credentials.access_token, "the-token");
3283
3284 let server = server.clone();
3285 let connection_killers = connection_killers.clone();
3286 let forbid_connections = forbid_connections.clone();
3287 let client_name = client_name.clone();
3288 let connection_id_tx = connection_id_tx.clone();
3289 cx.spawn(move |cx| async move {
3290 if forbid_connections.load(SeqCst) {
3291 Err(EstablishConnectionError::other(anyhow!(
3292 "server is forbidding connections"
3293 )))
3294 } else {
3295 let (client_conn, server_conn, kill_conn) =
3296 Connection::in_memory(cx.background());
3297 connection_killers.lock().insert(user_id, kill_conn);
3298 cx.background()
3299 .spawn(server.handle_connection(
3300 server_conn,
3301 client_name,
3302 user_id,
3303 Some(connection_id_tx),
3304 ))
3305 .detach();
3306 Ok(client_conn)
3307 }
3308 })
3309 });
3310
3311 client
3312 .authenticate_and_connect(&cx.to_async())
3313 .await
3314 .unwrap();
3315
3316 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3317 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3318 let mut authed_user =
3319 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3320 while authed_user.next().await.unwrap().is_none() {}
3321
3322 TestClient {
3323 client,
3324 peer_id,
3325 user_store,
3326 }
3327 }
3328
3329 fn disconnect_client(&self, user_id: UserId) {
3330 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3331 let _ = kill_conn.try_send(Some(()));
3332 }
3333 }
3334
3335 fn forbid_connections(&self) {
3336 self.forbid_connections.store(true, SeqCst);
3337 }
3338
3339 fn allow_connections(&self) {
3340 self.forbid_connections.store(false, SeqCst);
3341 }
3342
3343 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3344 let mut config = Config::default();
3345 config.session_secret = "a".repeat(32);
3346 config.database_url = test_db.url.clone();
3347 let github_client = github::AppClient::test();
3348 Arc::new(AppState {
3349 db: test_db.db().clone(),
3350 handlebars: Default::default(),
3351 auth_client: auth::build_client("", ""),
3352 repo_client: github::RepoClient::test(&github_client),
3353 github_client,
3354 config,
3355 })
3356 }
3357
3358 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3359 self.server.store.read()
3360 }
3361
3362 async fn condition<F>(&mut self, mut predicate: F)
3363 where
3364 F: FnMut(&Store) -> bool,
3365 {
3366 async_std::future::timeout(Duration::from_millis(500), async {
3367 while !(predicate)(&*self.server.store.read()) {
3368 self.foreground.start_waiting();
3369 self.notifications.next().await;
3370 self.foreground.finish_waiting();
3371 }
3372 })
3373 .await
3374 .expect("condition timed out");
3375 }
3376 }
3377
3378 impl Drop for TestServer {
3379 fn drop(&mut self) {
3380 self.peer.reset();
3381 }
3382 }
3383
/// A client built by `TestServer::create_client`, together with the identifiers the tests
/// need to refer to it.
struct TestClient {
    // The underlying client; also reachable through the `Deref` impl.
    client: Arc<Client>,
    // Peer id built from the connection id received while the client was connecting.
    pub peer_id: PeerId,
    // User store model created for this client.
    pub user_store: ModelHandle<UserStore>,
}
3389
3390 impl Deref for TestClient {
3391 type Target = Arc<Client>;
3392
3393 fn deref(&self) -> &Self::Target {
3394 &self.client
3395 }
3396 }
3397
3398 impl TestClient {
3399 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3400 UserId::from_proto(
3401 self.user_store
3402 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3403 )
3404 }
3405 }
3406
3407 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3408 channel
3409 .messages()
3410 .cursor::<()>()
3411 .map(|m| {
3412 (
3413 m.sender.github_login.clone(),
3414 m.body.clone(),
3415 m.is_pending(),
3416 )
3417 })
3418 .collect()
3419 }
3420
/// Field-less view type; its `gpui::View` impl renders an empty element.
struct EmptyView;
3422
impl gpui::Entity for EmptyView {
    // The unit type is used as this entity's event type.
    type Event = ();
}
3426
3427 impl gpui::View for EmptyView {
3428 fn ui_name() -> &'static str {
3429 "empty view"
3430 }
3431
3432 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
3433 gpui::Element::boxed(gpui::elements::Empty)
3434 }
3435 }
3436}