1use super::{
2 auth::{self, PeerExt as _},
3 db::{ChannelId, UserId},
4 AppState,
5};
6use anyhow::anyhow;
7use async_std::task;
8use async_tungstenite::{
9 tungstenite::{protocol::Role, Error as WebSocketError, Message as WebSocketMessage},
10 WebSocketStream,
11};
12use futures::{future::BoxFuture, FutureExt};
13use postage::prelude::Stream as _;
14use sha1::{Digest as _, Sha1};
15use std::{
16 any::{Any, TypeId},
17 collections::{HashMap, HashSet},
18 future::Future,
19 mem,
20 sync::Arc,
21 time::Instant,
22};
23use surf::StatusCode;
24use tide::log;
25use tide::{
26 http::headers::{HeaderName, CONNECTION, UPGRADE},
27 Request, Response,
28};
29use time::OffsetDateTime;
30use zrpc::{
31 auth::random_token,
32 proto::{self, EnvelopedMessage},
33 ConnectionId, Peer, TypedEnvelope,
34};
35
/// Per-worktree identifier of a collaborator. In this file the host is
/// reported to peers as replica `0`, and `State::join_worktree` hands each
/// guest the lowest unused id starting at `1`.
type ReplicaId = u16;
37
/// Type-erased message handler stored by `ServerBuilder` and `Server`.
///
/// A handler receives the pending message as a mutable `Option` holding a
/// boxed `Any`. The handlers created by `ServerBuilder::on_message` take the
/// message out of the option and return `Some(future)` when it downcasts to
/// their envelope type; otherwise they put the message back and return `None`.
type Handler = Box<
    dyn Send
        + Sync
        + Fn(&mut Option<Box<dyn Any + Send + Sync>>, Arc<Server>) -> Option<BoxFuture<'static, ()>>,
>;
43
/// Accumulates message handlers before they are moved into a `Server`.
#[derive(Default)]
struct ServerBuilder {
    /// Type-erased handlers, in registration order.
    handlers: Vec<Handler>,
    /// `TypeId`s of the message types passed to `on_message` so far.
    handler_types: HashSet<TypeId>,
}
49
50impl ServerBuilder {
51 pub fn on_message<F, Fut, M>(&mut self, handler: F) -> &mut Self
52 where
53 F: 'static + Send + Sync + Fn(Box<TypedEnvelope<M>>, Arc<Server>) -> Fut,
54 Fut: 'static + Send + Future<Output = ()>,
55 M: EnvelopedMessage,
56 {
57 if self.handler_types.insert(TypeId::of::<M>()) {
58 panic!("registered a handler for the same message twice");
59 }
60
61 self.handlers
62 .push(Box::new(move |untyped_envelope, server| {
63 if let Some(typed_envelope) = untyped_envelope.take() {
64 match typed_envelope.downcast::<TypedEnvelope<M>>() {
65 Ok(typed_envelope) => Some((handler)(typed_envelope, server).boxed()),
66 Err(envelope) => {
67 *untyped_envelope = Some(envelope);
68 None
69 }
70 }
71 } else {
72 None
73 }
74 }));
75 self
76 }
77
78 pub fn build(self, rpc: &Arc<Peer>, state: &Arc<AppState>) -> Arc<Server> {
79 Arc::new(Server {
80 rpc: rpc.clone(),
81 state: state.clone(),
82 handlers: self.handlers,
83 })
84 }
85}
86
/// RPC server: dispatches messages received on peer connections to the
/// registered handlers.
pub struct Server {
    /// Peer used to register connections and to sign them out again.
    rpc: Arc<Peer>,
    /// Shared application state.
    state: Arc<AppState>,
    /// Handlers tried in order for every incoming message.
    handlers: Vec<Handler>,
}
92
impl Server {
    /// Drives a single client connection until it ends.
    ///
    /// The connection is registered with the peer and with the shared RPC
    /// state for `user_id`. Incoming messages are then offered to the
    /// handlers in order; the first handler that accepts a message is awaited
    /// to completion before the next message is read. The loop ends when the
    /// incoming stream is exhausted or the connection's I/O future finishes,
    /// after which the connection is signed out. `addr` is only used in log
    /// output.
    pub async fn handle_connection<Conn>(
        self: &Arc<Self>,
        connection: Conn,
        addr: String,
        user_id: UserId,
    ) where
        Conn: 'static
            + futures::Sink<WebSocketMessage, Error = WebSocketError>
            + futures::Stream<Item = Result<WebSocketMessage, WebSocketError>>
            + Send
            + Unpin,
    {
        let this = self.clone();
        let (connection_id, handle_io, mut incoming_rx) = this.rpc.add_connection(connection).await;
        this.state
            .rpc
            .write()
            .await
            .add_connection(connection_id, user_id);

        // Fuse and pin the I/O future so it can be polled again on every
        // iteration of the loop below.
        let handle_io = handle_io.fuse();
        futures::pin_mut!(handle_io);
        loop {
            let next_message = incoming_rx.recv().fuse();
            futures::pin_mut!(next_message);
            // `select_biased!` polls its branches in the order written, so a
            // ready incoming message wins over completion of `handle_io`.
            futures::select_biased! {
                message = next_message => {
                    if let Some(message) = message {
                        let mut message = Some(message);
                        for handler in &this.handlers {
                            // A handler that accepts the message takes it out
                            // of the option and returns a future to run.
                            // Note: `handle_io` is not polled while that
                            // future is being awaited here.
                            if let Some(future) = (handler)(&mut message, this.clone()) {
                                future.await;
                                break;
                            }
                        }

                        // Still `Some` means no handler claimed the message.
                        if let Some(message) = message {
                            log::warn!("unhandled message: {:?}", message);
                        }
                    } else {
                        log::info!("rpc connection closed {:?}", addr);
                        break;
                    }
                }
                handle_io = handle_io => {
                    if let Err(err) = handle_io {
                        log::error!("error handling rpc connection {:?} - {:?}", addr, err);
                    }
                    break;
                }
            }
        }

        // Sign the connection out regardless of why the loop ended.
        if let Err(err) = this.rpc.sign_out(connection_id, &this.state).await {
            log::error!("error signing out connection {:?} - {:?}", addr, err);
        }
    }
}
152
/// In-memory RPC bookkeeping: known connections, shared worktrees and
/// joined channels.
#[derive(Default)]
pub struct State {
    /// Per-connection records, keyed by connection id.
    connections: HashMap<ConnectionId, Connection>,
    /// Shared worktrees, keyed by worktree id.
    pub worktrees: HashMap<u64, Worktree>,
    /// Channels that at least one connection has joined, keyed by channel id.
    channels: HashMap<ChannelId, Channel>,
    /// Id that `share_worktree` assigns to the next shared worktree.
    next_worktree_id: u64,
}
160
/// What the server tracks about a single connection.
struct Connection {
    /// User this connection was registered for.
    user_id: UserId,
    /// Worktree ids recorded by `State::join_worktree` for this connection.
    worktrees: HashSet<u64>,
    /// Channel ids recorded by `State::join_channel` for this connection.
    channels: HashSet<ChannelId>,
}
166
/// A shared worktree together with the connections attached to it.
pub struct Worktree {
    /// Connection that shared the worktree; set to `None` by
    /// `close_worktree` when the host closes it.
    host_connection_id: Option<ConnectionId>,
    /// Guest connections and the replica id assigned to each.
    guest_connection_ids: HashMap<ConnectionId, ReplicaId>,
    /// Replica ids currently held by guests.
    active_replica_ids: HashSet<ReplicaId>,
    /// Token a guest must present to `State::join_worktree`.
    access_token: String,
    /// Root name supplied by the host when the worktree was shared.
    root_name: String,
    /// Worktree entries, keyed by entry id.
    entries: HashMap<u64, proto::Entry>,
}
175
/// Membership record for a single channel.
#[derive(Default)]
struct Channel {
    /// Connections that have joined this channel.
    connection_ids: HashSet<ConnectionId>,
}
180
181impl Worktree {
182 pub fn connection_ids(&self) -> Vec<ConnectionId> {
183 self.guest_connection_ids
184 .keys()
185 .copied()
186 .chain(self.host_connection_id)
187 .collect()
188 }
189
190 fn host_connection_id(&self) -> tide::Result<ConnectionId> {
191 Ok(self
192 .host_connection_id
193 .ok_or_else(|| anyhow!("host disconnected from worktree"))?)
194 }
195}
196
197impl Channel {
198 fn connection_ids(&self) -> Vec<ConnectionId> {
199 self.connection_ids.iter().copied().collect()
200 }
201}
202
203impl State {
204 // Add a new connection associated with a given user.
205 pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId) {
206 self.connections.insert(
207 connection_id,
208 Connection {
209 user_id,
210 worktrees: Default::default(),
211 channels: Default::default(),
212 },
213 );
214 }
215
216 // Remove the given connection and its association with any worktrees.
217 pub fn remove_connection(&mut self, connection_id: ConnectionId) -> Vec<u64> {
218 let mut worktree_ids = Vec::new();
219 if let Some(connection) = self.connections.remove(&connection_id) {
220 for channel_id in connection.channels {
221 if let Some(channel) = self.channels.get_mut(&channel_id) {
222 channel.connection_ids.remove(&connection_id);
223 }
224 }
225 for worktree_id in connection.worktrees {
226 if let Some(worktree) = self.worktrees.get_mut(&worktree_id) {
227 if worktree.host_connection_id == Some(connection_id) {
228 worktree_ids.push(worktree_id);
229 } else if let Some(replica_id) =
230 worktree.guest_connection_ids.remove(&connection_id)
231 {
232 worktree.active_replica_ids.remove(&replica_id);
233 worktree_ids.push(worktree_id);
234 }
235 }
236 }
237 }
238 worktree_ids
239 }
240
241 fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
242 if let Some(connection) = self.connections.get_mut(&connection_id) {
243 connection.channels.insert(channel_id);
244 self.channels
245 .entry(channel_id)
246 .or_default()
247 .connection_ids
248 .insert(connection_id);
249 }
250 }
251
252 // Add the given connection as a guest of the given worktree
253 pub fn join_worktree(
254 &mut self,
255 connection_id: ConnectionId,
256 worktree_id: u64,
257 access_token: &str,
258 ) -> Option<(ReplicaId, &Worktree)> {
259 if let Some(worktree) = self.worktrees.get_mut(&worktree_id) {
260 if access_token == worktree.access_token {
261 if let Some(connection) = self.connections.get_mut(&connection_id) {
262 connection.worktrees.insert(worktree_id);
263 }
264
265 let mut replica_id = 1;
266 while worktree.active_replica_ids.contains(&replica_id) {
267 replica_id += 1;
268 }
269 worktree.active_replica_ids.insert(replica_id);
270 worktree
271 .guest_connection_ids
272 .insert(connection_id, replica_id);
273 Some((replica_id, worktree))
274 } else {
275 None
276 }
277 } else {
278 None
279 }
280 }
281
282 fn user_id_for_connection(&self, connection_id: ConnectionId) -> tide::Result<UserId> {
283 Ok(self
284 .connections
285 .get(&connection_id)
286 .ok_or_else(|| anyhow!("unknown connection"))?
287 .user_id)
288 }
289
290 fn read_worktree(
291 &self,
292 worktree_id: u64,
293 connection_id: ConnectionId,
294 ) -> tide::Result<&Worktree> {
295 let worktree = self
296 .worktrees
297 .get(&worktree_id)
298 .ok_or_else(|| anyhow!("worktree not found"))?;
299
300 if worktree.host_connection_id == Some(connection_id)
301 || worktree.guest_connection_ids.contains_key(&connection_id)
302 {
303 Ok(worktree)
304 } else {
305 Err(anyhow!(
306 "{} is not a member of worktree {}",
307 connection_id,
308 worktree_id
309 ))?
310 }
311 }
312
313 fn write_worktree(
314 &mut self,
315 worktree_id: u64,
316 connection_id: ConnectionId,
317 ) -> tide::Result<&mut Worktree> {
318 let worktree = self
319 .worktrees
320 .get_mut(&worktree_id)
321 .ok_or_else(|| anyhow!("worktree not found"))?;
322
323 if worktree.host_connection_id == Some(connection_id)
324 || worktree.guest_connection_ids.contains_key(&connection_id)
325 {
326 Ok(worktree)
327 } else {
328 Err(anyhow!(
329 "{} is not a member of worktree {}",
330 connection_id,
331 worktree_id
332 ))?
333 }
334 }
335}
336
337pub fn build_server(state: &Arc<AppState>, rpc: &Arc<Peer>) -> Arc<Server> {
338 ServerBuilder::default()
339 // .on_message(share_worktree)
340 // .on_message(join_worktree)
341 // .on_message(update_worktree)
342 // .on_message(close_worktree)
343 // .on_message(open_buffer)
344 // .on_message(close_buffer)
345 // .on_message(update_buffer)
346 // .on_message(buffer_saved)
347 // .on_message(save_buffer)
348 // .on_message(get_channels)
349 // .on_message(get_users)
350 // .on_message(join_channel)
351 // .on_message(send_channel_message)
352 .build(rpc, state)
353}
354
/// Mounts the `/rpc` websocket endpoint on `app`.
///
/// The route is guarded by `auth::VerifyToken`. A request that does not ask
/// for a websocket upgrade is answered with `UpgradeRequired`. Otherwise the
/// handler builds the `SwitchingProtocols` response and spawns a task that
/// waits for the upgraded stream and hands it to `Server::handle_connection`.
pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
    let server = build_server(app.state(), rpc);

    // NOTE(review): this clone is not used anywhere below in this function.
    let rpc = rpc.clone();
    app.at("/rpc").with(auth::VerifyToken).get(move |request: Request<Arc<AppState>>| {
        // Read from the request extensions; checked further down, where a
        // missing value is reported as a middleware configuration error.
        let user_id = request.ext::<UserId>().copied();
        let server = server.clone();
        async move {
            // Fixed GUID that RFC 6455 appends to the client's key when
            // computing the accept value.
            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
            let upgrade_requested = connection_upgrade && upgrade_to_websocket;

            if !upgrade_requested {
                return Ok(Response::new(StatusCode::UpgradeRequired));
            }

            let header = match request.header("Sec-Websocket-Key") {
                Some(h) => h.as_str(),
                None => return Err(anyhow!("expected sec-websocket-key"))?,
            };

            let mut response = Response::new(StatusCode::SwitchingProtocols);
            response.insert_header(UPGRADE, "websocket");
            response.insert_header(CONNECTION, "Upgrade");
            // Accept value: base64 of SHA-1 over the client key followed by
            // the GUID.
            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
            response.insert_header("Sec-Websocket-Version", "13");

            // The receiver is awaited in the spawned task below and yields
            // the raw stream, if any.
            let http_res: &mut tide::http::Response = response.as_mut();
            let upgrade_receiver = http_res.recv_upgrade().await;
            let addr = request.remote().unwrap_or("unknown").to_string();
            let user_id = user_id.ok_or_else(|| anyhow!("user_id is not present on request. ensure auth::VerifyToken middleware is present"))?;
            // Serve the connection on its own task so that the handshake
            // response can be returned from this handler.
            task::spawn(async move {
                if let Some(stream) = upgrade_receiver.await {
                    let stream = WebSocketStream::from_raw_socket(stream, Role::Server, None).await;
                    server.handle_connection(stream, addr, user_id).await;
                }
            });

            Ok(response)
        }
    });
}
400
401async fn share_worktree(
402 mut request: TypedEnvelope<proto::ShareWorktree>,
403 rpc: &Arc<Peer>,
404 state: &Arc<AppState>,
405) -> tide::Result<()> {
406 let mut state = state.rpc.write().await;
407 let worktree_id = state.next_worktree_id;
408 state.next_worktree_id += 1;
409 let access_token = random_token();
410 let worktree = request
411 .payload
412 .worktree
413 .as_mut()
414 .ok_or_else(|| anyhow!("missing worktree"))?;
415 let entries = mem::take(&mut worktree.entries)
416 .into_iter()
417 .map(|entry| (entry.id, entry))
418 .collect();
419 state.worktrees.insert(
420 worktree_id,
421 Worktree {
422 host_connection_id: Some(request.sender_id),
423 guest_connection_ids: Default::default(),
424 active_replica_ids: Default::default(),
425 access_token: access_token.clone(),
426 root_name: mem::take(&mut worktree.root_name),
427 entries,
428 },
429 );
430
431 rpc.respond(
432 request.receipt(),
433 proto::ShareWorktreeResponse {
434 worktree_id,
435 access_token,
436 },
437 )
438 .await?;
439 Ok(())
440}
441
442async fn join_worktree(
443 request: TypedEnvelope<proto::OpenWorktree>,
444 rpc: &Arc<Peer>,
445 state: &Arc<AppState>,
446) -> tide::Result<()> {
447 let worktree_id = request.payload.worktree_id;
448 let access_token = &request.payload.access_token;
449
450 let mut state = state.rpc.write().await;
451 if let Some((peer_replica_id, worktree)) =
452 state.join_worktree(request.sender_id, worktree_id, access_token)
453 {
454 let mut peers = Vec::new();
455 if let Some(host_connection_id) = worktree.host_connection_id {
456 peers.push(proto::Peer {
457 peer_id: host_connection_id.0,
458 replica_id: 0,
459 });
460 }
461 for (peer_conn_id, peer_replica_id) in &worktree.guest_connection_ids {
462 if *peer_conn_id != request.sender_id {
463 peers.push(proto::Peer {
464 peer_id: peer_conn_id.0,
465 replica_id: *peer_replica_id as u32,
466 });
467 }
468 }
469
470 broadcast(request.sender_id, worktree.connection_ids(), |conn_id| {
471 rpc.send(
472 conn_id,
473 proto::AddPeer {
474 worktree_id,
475 peer: Some(proto::Peer {
476 peer_id: request.sender_id.0,
477 replica_id: peer_replica_id as u32,
478 }),
479 },
480 )
481 })
482 .await?;
483 rpc.respond(
484 request.receipt(),
485 proto::OpenWorktreeResponse {
486 worktree_id,
487 worktree: Some(proto::Worktree {
488 root_name: worktree.root_name.clone(),
489 entries: worktree.entries.values().cloned().collect(),
490 }),
491 replica_id: peer_replica_id as u32,
492 peers,
493 },
494 )
495 .await?;
496 } else {
497 rpc.respond(
498 request.receipt(),
499 proto::OpenWorktreeResponse {
500 worktree_id,
501 worktree: None,
502 replica_id: 0,
503 peers: Vec::new(),
504 },
505 )
506 .await?;
507 }
508
509 Ok(())
510}
511
512async fn update_worktree(
513 request: TypedEnvelope<proto::UpdateWorktree>,
514 rpc: &Arc<Peer>,
515 state: &Arc<AppState>,
516) -> tide::Result<()> {
517 {
518 let mut state = state.rpc.write().await;
519 let worktree = state.write_worktree(request.payload.worktree_id, request.sender_id)?;
520 for entry_id in &request.payload.removed_entries {
521 worktree.entries.remove(&entry_id);
522 }
523
524 for entry in &request.payload.updated_entries {
525 worktree.entries.insert(entry.id, entry.clone());
526 }
527 }
528
529 broadcast_in_worktree(request.payload.worktree_id, request, rpc, state).await?;
530 Ok(())
531}
532
533async fn close_worktree(
534 request: TypedEnvelope<proto::CloseWorktree>,
535 rpc: &Arc<Peer>,
536 state: &Arc<AppState>,
537) -> tide::Result<()> {
538 let connection_ids;
539 {
540 let mut state = state.rpc.write().await;
541 let worktree = state.write_worktree(request.payload.worktree_id, request.sender_id)?;
542 connection_ids = worktree.connection_ids();
543 if worktree.host_connection_id == Some(request.sender_id) {
544 worktree.host_connection_id = None;
545 } else if let Some(replica_id) = worktree.guest_connection_ids.remove(&request.sender_id) {
546 worktree.active_replica_ids.remove(&replica_id);
547 }
548 }
549
550 broadcast(request.sender_id, connection_ids, |conn_id| {
551 rpc.send(
552 conn_id,
553 proto::RemovePeer {
554 worktree_id: request.payload.worktree_id,
555 peer_id: request.sender_id.0,
556 },
557 )
558 })
559 .await?;
560
561 Ok(())
562}
563
564async fn open_buffer(
565 request: TypedEnvelope<proto::OpenBuffer>,
566 rpc: &Arc<Peer>,
567 state: &Arc<AppState>,
568) -> tide::Result<()> {
569 let receipt = request.receipt();
570 let worktree_id = request.payload.worktree_id;
571 let host_connection_id = state
572 .rpc
573 .read()
574 .await
575 .read_worktree(worktree_id, request.sender_id)?
576 .host_connection_id()?;
577
578 let response = rpc
579 .forward_request(request.sender_id, host_connection_id, request.payload)
580 .await?;
581 rpc.respond(receipt, response).await?;
582 Ok(())
583}
584
585async fn close_buffer(
586 request: TypedEnvelope<proto::CloseBuffer>,
587 rpc: &Arc<Peer>,
588 state: &Arc<AppState>,
589) -> tide::Result<()> {
590 let host_connection_id = state
591 .rpc
592 .read()
593 .await
594 .read_worktree(request.payload.worktree_id, request.sender_id)?
595 .host_connection_id()?;
596
597 rpc.forward_send(request.sender_id, host_connection_id, request.payload)
598 .await?;
599
600 Ok(())
601}
602
/// Handles `SaveBuffer`: forwards the request to the worktree's host, then
/// distributes the host's response to the worktree's guests. The guest that
/// sent the request gets it as the reply to its receipt; every other guest
/// gets it forwarded on behalf of the host.
///
/// Fails when the worktree does not exist, the sender is not a member, or
/// the worktree has no host.
async fn save_buffer(
    request: TypedEnvelope<proto::SaveBuffer>,
    rpc: &Arc<Peer>,
    state: &Arc<AppState>,
) -> tide::Result<()> {
    let host;
    let guests;
    // Copy what is needed out of the state so the read lock is released
    // before any request is sent.
    {
        let state = state.rpc.read().await;
        let worktree = state.read_worktree(request.payload.worktree_id, request.sender_id)?;
        host = worktree.host_connection_id()?;
        guests = worktree
            .guest_connection_ids
            .keys()
            .copied()
            .collect::<Vec<_>>();
    }

    let sender = request.sender_id;
    let receipt = request.receipt();
    let response = rpc
        .forward_request(sender, host, request.payload.clone())
        .await?;

    // `broadcast` skips any id equal to its first argument (`host` here).
    // NOTE(review): when the sender is the host and is not among `guests`,
    // nothing below responds to `receipt` — confirm this is intended.
    broadcast(host, guests, |conn_id| {
        let response = response.clone();
        async move {
            if conn_id == sender {
                rpc.respond(receipt, response).await
            } else {
                rpc.forward_send(host, conn_id, response).await
            }
        }
    })
    .await?;

    Ok(())
}
641
642async fn update_buffer(
643 request: TypedEnvelope<proto::UpdateBuffer>,
644 rpc: &Arc<Peer>,
645 state: &Arc<AppState>,
646) -> tide::Result<()> {
647 broadcast_in_worktree(request.payload.worktree_id, request, rpc, state).await
648}
649
650async fn buffer_saved(
651 request: TypedEnvelope<proto::BufferSaved>,
652 rpc: &Arc<Peer>,
653 state: &Arc<AppState>,
654) -> tide::Result<()> {
655 broadcast_in_worktree(request.payload.worktree_id, request, rpc, state).await
656}
657
658async fn get_channels(
659 request: TypedEnvelope<proto::GetChannels>,
660 rpc: &Arc<Peer>,
661 state: &Arc<AppState>,
662) -> tide::Result<()> {
663 let user_id = state
664 .rpc
665 .read()
666 .await
667 .user_id_for_connection(request.sender_id)?;
668 let channels = state.db.get_channels_for_user(user_id).await?;
669 rpc.respond(
670 request.receipt(),
671 proto::GetChannelsResponse {
672 channels: channels
673 .into_iter()
674 .map(|chan| proto::Channel {
675 id: chan.id.to_proto(),
676 name: chan.name,
677 })
678 .collect(),
679 },
680 )
681 .await?;
682 Ok(())
683}
684
685async fn get_users(
686 request: TypedEnvelope<proto::GetUsers>,
687 rpc: &Arc<Peer>,
688 state: &Arc<AppState>,
689) -> tide::Result<()> {
690 let user_id = state
691 .rpc
692 .read()
693 .await
694 .user_id_for_connection(request.sender_id)?;
695 let receipt = request.receipt();
696 let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
697 let users = state
698 .db
699 .get_users_by_ids(user_id, user_ids)
700 .await?
701 .into_iter()
702 .map(|user| proto::User {
703 id: user.id.to_proto(),
704 github_login: user.github_login,
705 avatar_url: String::new(),
706 })
707 .collect();
708 rpc.respond(receipt, proto::GetUsersResponse { users })
709 .await?;
710 Ok(())
711}
712
713async fn join_channel(
714 request: TypedEnvelope<proto::JoinChannel>,
715 rpc: &Arc<Peer>,
716 state: &Arc<AppState>,
717) -> tide::Result<()> {
718 let user_id = state
719 .rpc
720 .read()
721 .await
722 .user_id_for_connection(request.sender_id)?;
723 let channel_id = ChannelId::from_proto(request.payload.channel_id);
724 if !state
725 .db
726 .can_user_access_channel(user_id, channel_id)
727 .await?
728 {
729 Err(anyhow!("access denied"))?;
730 }
731
732 state
733 .rpc
734 .write()
735 .await
736 .join_channel(request.sender_id, channel_id);
737 let messages = state
738 .db
739 .get_recent_channel_messages(channel_id, 50)
740 .await?
741 .into_iter()
742 .map(|msg| proto::ChannelMessage {
743 id: msg.id.to_proto(),
744 body: msg.body,
745 timestamp: msg.sent_at.unix_timestamp() as u64,
746 sender_id: msg.sender_id.to_proto(),
747 })
748 .collect();
749 rpc.respond(request.receipt(), proto::JoinChannelResponse { messages })
750 .await?;
751 Ok(())
752}
753
754async fn send_channel_message(
755 request: TypedEnvelope<proto::SendChannelMessage>,
756 peer: &Arc<Peer>,
757 app: &Arc<AppState>,
758) -> tide::Result<()> {
759 let channel_id = ChannelId::from_proto(request.payload.channel_id);
760 let user_id;
761 let connection_ids;
762 {
763 let state = app.rpc.read().await;
764 user_id = state.user_id_for_connection(request.sender_id)?;
765 if let Some(channel) = state.channels.get(&channel_id) {
766 connection_ids = channel.connection_ids();
767 } else {
768 return Ok(());
769 }
770 }
771
772 let timestamp = OffsetDateTime::now_utc();
773 let message_id = app
774 .db
775 .create_channel_message(channel_id, user_id, &request.payload.body, timestamp)
776 .await?;
777 let message = proto::ChannelMessageSent {
778 channel_id: channel_id.to_proto(),
779 message: Some(proto::ChannelMessage {
780 sender_id: user_id.to_proto(),
781 id: message_id.to_proto(),
782 body: request.payload.body,
783 timestamp: timestamp.unix_timestamp() as u64,
784 }),
785 };
786 broadcast(request.sender_id, connection_ids, |conn_id| {
787 peer.send(conn_id, message.clone())
788 })
789 .await?;
790
791 Ok(())
792}
793
794async fn broadcast_in_worktree<T: proto::EnvelopedMessage>(
795 worktree_id: u64,
796 request: TypedEnvelope<T>,
797 rpc: &Arc<Peer>,
798 state: &Arc<AppState>,
799) -> tide::Result<()> {
800 let connection_ids = state
801 .rpc
802 .read()
803 .await
804 .read_worktree(worktree_id, request.sender_id)?
805 .connection_ids();
806
807 broadcast(request.sender_id, connection_ids, |conn_id| {
808 rpc.forward_send(request.sender_id, conn_id, request.payload.clone())
809 })
810 .await?;
811
812 Ok(())
813}
814
815pub async fn broadcast<F, T>(
816 sender_id: ConnectionId,
817 receiver_ids: Vec<ConnectionId>,
818 mut f: F,
819) -> anyhow::Result<()>
820where
821 F: FnMut(ConnectionId) -> T,
822 T: Future<Output = anyhow::Result<()>>,
823{
824 let futures = receiver_ids
825 .into_iter()
826 .filter(|id| *id != sender_id)
827 .map(|id| f(id));
828 futures::future::try_join_all(futures).await?;
829 Ok(())
830}
831
832fn header_contains_ignore_case<T>(
833 request: &tide::Request<T>,
834 header_name: HeaderName,
835 value: &str,
836) -> bool {
837 request
838 .header(header_name)
839 .map(|h| {
840 h.as_str()
841 .split(',')
842 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
843 })
844 .unwrap_or(false)
845}