1use super::{
2 auth::{self, PeerExt as _},
3 db::{ChannelId, UserId},
4 AppState,
5};
6use anyhow::anyhow;
7use async_std::task;
8use async_tungstenite::{
9 tungstenite::{protocol::Role, Error as WebSocketError, Message as WebSocketMessage},
10 WebSocketStream,
11};
12use futures::{future::BoxFuture, FutureExt};
13use postage::prelude::Stream as _;
14use sha1::{Digest as _, Sha1};
15use std::{
16 any::{Any, TypeId},
17 collections::{HashMap, HashSet},
18 future::Future,
19 mem,
20 sync::Arc,
21 time::Instant,
22};
23use surf::StatusCode;
24use tide::log;
25use tide::{
26 http::headers::{HeaderName, CONNECTION, UPGRADE},
27 Request, Response,
28};
29use time::OffsetDateTime;
30use zrpc::{
31 auth::random_token,
32 proto::{self, AnyTypedEnvelope, EnvelopedMessage},
33 ConnectionId, Peer, TypedEnvelope,
34};
35
36type ReplicaId = u16;
37
38type MessageHandler = Box<
39 dyn Send
40 + Sync
41 + Fn(Box<dyn AnyTypedEnvelope>, Arc<Server>) -> BoxFuture<'static, tide::Result<()>>,
42>;
43
44#[derive(Default)]
45struct ServerBuilder {
46 handlers: HashMap<TypeId, MessageHandler>,
47}
48
49impl ServerBuilder {
50 pub fn on_message<F, Fut, M>(mut self, handler: F) -> Self
51 where
52 F: 'static + Send + Sync + Fn(Box<TypedEnvelope<M>>, Arc<Server>) -> Fut,
53 Fut: 'static + Send + Future<Output = tide::Result<()>>,
54 M: EnvelopedMessage,
55 {
56 let prev_handler = self.handlers.insert(
57 TypeId::of::<M>(),
58 Box::new(move |envelope, server| {
59 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
60 (handler)(envelope, server).boxed()
61 }),
62 );
63 if prev_handler.is_some() {
64 panic!("registered a handler for the same message twice");
65 }
66
67 self
68 }
69
70 pub fn build(self, rpc: &Arc<Peer>, state: &Arc<AppState>) -> Arc<Server> {
71 Arc::new(Server {
72 rpc: rpc.clone(),
73 state: state.clone(),
74 handlers: self.handlers,
75 })
76 }
77}
78
79pub struct Server {
80 rpc: Arc<Peer>,
81 state: Arc<AppState>,
82 handlers: HashMap<TypeId, MessageHandler>,
83}
84
85impl Server {
86 pub fn handle_connection<Conn>(
87 self: &Arc<Self>,
88 connection: Conn,
89 addr: String,
90 user_id: UserId,
91 ) -> impl Future<Output = ()>
92 where
93 Conn: 'static
94 + futures::Sink<WebSocketMessage, Error = WebSocketError>
95 + futures::Stream<Item = Result<WebSocketMessage, WebSocketError>>
96 + Send
97 + Unpin,
98 {
99 let this = self.clone();
100 async move {
101 let (connection_id, handle_io, mut incoming_rx) =
102 this.rpc.add_connection(connection).await;
103 this.state
104 .rpc
105 .write()
106 .await
107 .add_connection(connection_id, user_id);
108
109 let handle_io = handle_io.fuse();
110 futures::pin_mut!(handle_io);
111 loop {
112 let next_message = incoming_rx.recv().fuse();
113 futures::pin_mut!(next_message);
114 futures::select_biased! {
115 message = next_message => {
116 if let Some(message) = message {
117 let start_time = Instant::now();
118 log::info!("RPC message received: {}", message.payload_type_name());
119 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
120 if let Err(err) = (handler)(message, this.clone()).await {
121 log::error!("error handling message: {:?}", err);
122 } else {
123 log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
124 }
125 } else {
126 log::warn!("unhandled message: {}", message.payload_type_name());
127 }
128 } else {
129 log::info!("rpc connection closed {:?}", addr);
130 break;
131 }
132 }
133 handle_io = handle_io => {
134 if let Err(err) = handle_io {
135 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
136 }
137 break;
138 }
139 }
140 }
141
142 if let Err(err) = this.rpc.sign_out(connection_id, &this.state).await {
143 log::error!("error signing out connection {:?} - {:?}", addr, err);
144 }
145 }
146 }
147}
148
149#[derive(Default)]
150pub struct State {
151 connections: HashMap<ConnectionId, Connection>,
152 pub worktrees: HashMap<u64, Worktree>,
153 channels: HashMap<ChannelId, Channel>,
154 next_worktree_id: u64,
155}
156
157struct Connection {
158 user_id: UserId,
159 worktrees: HashSet<u64>,
160 channels: HashSet<ChannelId>,
161}
162
163pub struct Worktree {
164 host_connection_id: Option<ConnectionId>,
165 guest_connection_ids: HashMap<ConnectionId, ReplicaId>,
166 active_replica_ids: HashSet<ReplicaId>,
167 access_token: String,
168 root_name: String,
169 entries: HashMap<u64, proto::Entry>,
170}
171
172#[derive(Default)]
173struct Channel {
174 connection_ids: HashSet<ConnectionId>,
175}
176
177impl Worktree {
178 pub fn connection_ids(&self) -> Vec<ConnectionId> {
179 self.guest_connection_ids
180 .keys()
181 .copied()
182 .chain(self.host_connection_id)
183 .collect()
184 }
185
186 fn host_connection_id(&self) -> tide::Result<ConnectionId> {
187 Ok(self
188 .host_connection_id
189 .ok_or_else(|| anyhow!("host disconnected from worktree"))?)
190 }
191}
192
193impl Channel {
194 fn connection_ids(&self) -> Vec<ConnectionId> {
195 self.connection_ids.iter().copied().collect()
196 }
197}
198
199impl State {
200 // Add a new connection associated with a given user.
201 pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId) {
202 self.connections.insert(
203 connection_id,
204 Connection {
205 user_id,
206 worktrees: Default::default(),
207 channels: Default::default(),
208 },
209 );
210 }
211
212 // Remove the given connection and its association with any worktrees.
213 pub fn remove_connection(&mut self, connection_id: ConnectionId) -> Vec<u64> {
214 let mut worktree_ids = Vec::new();
215 if let Some(connection) = self.connections.remove(&connection_id) {
216 for channel_id in connection.channels {
217 if let Some(channel) = self.channels.get_mut(&channel_id) {
218 channel.connection_ids.remove(&connection_id);
219 }
220 }
221 for worktree_id in connection.worktrees {
222 if let Some(worktree) = self.worktrees.get_mut(&worktree_id) {
223 if worktree.host_connection_id == Some(connection_id) {
224 worktree_ids.push(worktree_id);
225 } else if let Some(replica_id) =
226 worktree.guest_connection_ids.remove(&connection_id)
227 {
228 worktree.active_replica_ids.remove(&replica_id);
229 worktree_ids.push(worktree_id);
230 }
231 }
232 }
233 }
234 worktree_ids
235 }
236
237 fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
238 if let Some(connection) = self.connections.get_mut(&connection_id) {
239 connection.channels.insert(channel_id);
240 self.channels
241 .entry(channel_id)
242 .or_default()
243 .connection_ids
244 .insert(connection_id);
245 }
246 }
247
248 // Add the given connection as a guest of the given worktree
249 pub fn join_worktree(
250 &mut self,
251 connection_id: ConnectionId,
252 worktree_id: u64,
253 access_token: &str,
254 ) -> Option<(ReplicaId, &Worktree)> {
255 if let Some(worktree) = self.worktrees.get_mut(&worktree_id) {
256 if access_token == worktree.access_token {
257 if let Some(connection) = self.connections.get_mut(&connection_id) {
258 connection.worktrees.insert(worktree_id);
259 }
260
261 let mut replica_id = 1;
262 while worktree.active_replica_ids.contains(&replica_id) {
263 replica_id += 1;
264 }
265 worktree.active_replica_ids.insert(replica_id);
266 worktree
267 .guest_connection_ids
268 .insert(connection_id, replica_id);
269 Some((replica_id, worktree))
270 } else {
271 None
272 }
273 } else {
274 None
275 }
276 }
277
278 fn user_id_for_connection(&self, connection_id: ConnectionId) -> tide::Result<UserId> {
279 Ok(self
280 .connections
281 .get(&connection_id)
282 .ok_or_else(|| anyhow!("unknown connection"))?
283 .user_id)
284 }
285
286 fn read_worktree(
287 &self,
288 worktree_id: u64,
289 connection_id: ConnectionId,
290 ) -> tide::Result<&Worktree> {
291 let worktree = self
292 .worktrees
293 .get(&worktree_id)
294 .ok_or_else(|| anyhow!("worktree not found"))?;
295
296 if worktree.host_connection_id == Some(connection_id)
297 || worktree.guest_connection_ids.contains_key(&connection_id)
298 {
299 Ok(worktree)
300 } else {
301 Err(anyhow!(
302 "{} is not a member of worktree {}",
303 connection_id,
304 worktree_id
305 ))?
306 }
307 }
308
309 fn write_worktree(
310 &mut self,
311 worktree_id: u64,
312 connection_id: ConnectionId,
313 ) -> tide::Result<&mut Worktree> {
314 let worktree = self
315 .worktrees
316 .get_mut(&worktree_id)
317 .ok_or_else(|| anyhow!("worktree not found"))?;
318
319 if worktree.host_connection_id == Some(connection_id)
320 || worktree.guest_connection_ids.contains_key(&connection_id)
321 {
322 Ok(worktree)
323 } else {
324 Err(anyhow!(
325 "{} is not a member of worktree {}",
326 connection_id,
327 worktree_id
328 ))?
329 }
330 }
331}
332
333pub fn build_server(state: &Arc<AppState>, rpc: &Arc<Peer>) -> Arc<Server> {
334 ServerBuilder::default()
335 .on_message(share_worktree)
336 .on_message(join_worktree)
337 .on_message(update_worktree)
338 .on_message(close_worktree)
339 .on_message(open_buffer)
340 .on_message(close_buffer)
341 .on_message(update_buffer)
342 .on_message(buffer_saved)
343 .on_message(save_buffer)
344 .on_message(get_channels)
345 .on_message(get_users)
346 .on_message(join_channel)
347 .on_message(send_channel_message)
348 .build(rpc, state)
349}
350
351pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
352 let server = build_server(app.state(), rpc);
353 app.at("/rpc").with(auth::VerifyToken).get(move |request: Request<Arc<AppState>>| {
354 let user_id = request.ext::<UserId>().copied();
355 let server = server.clone();
356 async move {
357 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
358
359 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
360 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
361 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
362
363 if !upgrade_requested {
364 return Ok(Response::new(StatusCode::UpgradeRequired));
365 }
366
367 let header = match request.header("Sec-Websocket-Key") {
368 Some(h) => h.as_str(),
369 None => return Err(anyhow!("expected sec-websocket-key"))?,
370 };
371
372 let mut response = Response::new(StatusCode::SwitchingProtocols);
373 response.insert_header(UPGRADE, "websocket");
374 response.insert_header(CONNECTION, "Upgrade");
375 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
376 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
377 response.insert_header("Sec-Websocket-Version", "13");
378
379 let http_res: &mut tide::http::Response = response.as_mut();
380 let upgrade_receiver = http_res.recv_upgrade().await;
381 let addr = request.remote().unwrap_or("unknown").to_string();
382 let user_id = user_id.ok_or_else(|| anyhow!("user_id is not present on request. ensure auth::VerifyToken middleware is present"))?;
383 task::spawn(async move {
384 if let Some(stream) = upgrade_receiver.await {
385 let stream = WebSocketStream::from_raw_socket(stream, Role::Server, None).await;
386 server.handle_connection(stream, addr, user_id).await;
387 }
388 });
389
390 Ok(response)
391 }
392 });
393}
394
395async fn share_worktree(
396 mut request: Box<TypedEnvelope<proto::ShareWorktree>>,
397 server: Arc<Server>,
398) -> tide::Result<()> {
399 let mut state = server.state.rpc.write().await;
400 let worktree_id = state.next_worktree_id;
401 state.next_worktree_id += 1;
402 let access_token = random_token();
403 let worktree = request
404 .payload
405 .worktree
406 .as_mut()
407 .ok_or_else(|| anyhow!("missing worktree"))?;
408 let entries = mem::take(&mut worktree.entries)
409 .into_iter()
410 .map(|entry| (entry.id, entry))
411 .collect();
412 state.worktrees.insert(
413 worktree_id,
414 Worktree {
415 host_connection_id: Some(request.sender_id),
416 guest_connection_ids: Default::default(),
417 active_replica_ids: Default::default(),
418 access_token: access_token.clone(),
419 root_name: mem::take(&mut worktree.root_name),
420 entries,
421 },
422 );
423
424 server
425 .rpc
426 .respond(
427 request.receipt(),
428 proto::ShareWorktreeResponse {
429 worktree_id,
430 access_token,
431 },
432 )
433 .await?;
434 Ok(())
435}
436
437async fn join_worktree(
438 request: Box<TypedEnvelope<proto::OpenWorktree>>,
439 server: Arc<Server>,
440) -> tide::Result<()> {
441 let worktree_id = request.payload.worktree_id;
442 let access_token = &request.payload.access_token;
443
444 let mut state = server.state.rpc.write().await;
445 if let Some((peer_replica_id, worktree)) =
446 state.join_worktree(request.sender_id, worktree_id, access_token)
447 {
448 let mut peers = Vec::new();
449 if let Some(host_connection_id) = worktree.host_connection_id {
450 peers.push(proto::Peer {
451 peer_id: host_connection_id.0,
452 replica_id: 0,
453 });
454 }
455 for (peer_conn_id, peer_replica_id) in &worktree.guest_connection_ids {
456 if *peer_conn_id != request.sender_id {
457 peers.push(proto::Peer {
458 peer_id: peer_conn_id.0,
459 replica_id: *peer_replica_id as u32,
460 });
461 }
462 }
463
464 broadcast(request.sender_id, worktree.connection_ids(), |conn_id| {
465 server.rpc.send(
466 conn_id,
467 proto::AddPeer {
468 worktree_id,
469 peer: Some(proto::Peer {
470 peer_id: request.sender_id.0,
471 replica_id: peer_replica_id as u32,
472 }),
473 },
474 )
475 })
476 .await?;
477 server
478 .rpc
479 .respond(
480 request.receipt(),
481 proto::OpenWorktreeResponse {
482 worktree_id,
483 worktree: Some(proto::Worktree {
484 root_name: worktree.root_name.clone(),
485 entries: worktree.entries.values().cloned().collect(),
486 }),
487 replica_id: peer_replica_id as u32,
488 peers,
489 },
490 )
491 .await?;
492 } else {
493 server
494 .rpc
495 .respond(
496 request.receipt(),
497 proto::OpenWorktreeResponse {
498 worktree_id,
499 worktree: None,
500 replica_id: 0,
501 peers: Vec::new(),
502 },
503 )
504 .await?;
505 }
506
507 Ok(())
508}
509
510async fn update_worktree(
511 request: Box<TypedEnvelope<proto::UpdateWorktree>>,
512 server: Arc<Server>,
513) -> tide::Result<()> {
514 {
515 let mut state = server.state.rpc.write().await;
516 let worktree = state.write_worktree(request.payload.worktree_id, request.sender_id)?;
517 for entry_id in &request.payload.removed_entries {
518 worktree.entries.remove(&entry_id);
519 }
520
521 for entry in &request.payload.updated_entries {
522 worktree.entries.insert(entry.id, entry.clone());
523 }
524 }
525
526 broadcast_in_worktree(request.payload.worktree_id, &request, &server).await?;
527 Ok(())
528}
529
530async fn close_worktree(
531 request: Box<TypedEnvelope<proto::CloseWorktree>>,
532 server: Arc<Server>,
533) -> tide::Result<()> {
534 let connection_ids;
535 {
536 let mut state = server.state.rpc.write().await;
537 let worktree = state.write_worktree(request.payload.worktree_id, request.sender_id)?;
538 connection_ids = worktree.connection_ids();
539 if worktree.host_connection_id == Some(request.sender_id) {
540 worktree.host_connection_id = None;
541 } else if let Some(replica_id) = worktree.guest_connection_ids.remove(&request.sender_id) {
542 worktree.active_replica_ids.remove(&replica_id);
543 }
544 }
545
546 broadcast(request.sender_id, connection_ids, |conn_id| {
547 server.rpc.send(
548 conn_id,
549 proto::RemovePeer {
550 worktree_id: request.payload.worktree_id,
551 peer_id: request.sender_id.0,
552 },
553 )
554 })
555 .await?;
556
557 Ok(())
558}
559
560async fn open_buffer(
561 request: Box<TypedEnvelope<proto::OpenBuffer>>,
562 server: Arc<Server>,
563) -> tide::Result<()> {
564 let receipt = request.receipt();
565 let worktree_id = request.payload.worktree_id;
566 let host_connection_id = server
567 .state
568 .rpc
569 .read()
570 .await
571 .read_worktree(worktree_id, request.sender_id)?
572 .host_connection_id()?;
573
574 let response = server
575 .rpc
576 .forward_request(request.sender_id, host_connection_id, request.payload)
577 .await?;
578 server.rpc.respond(receipt, response).await?;
579 Ok(())
580}
581
582async fn close_buffer(
583 request: Box<TypedEnvelope<proto::CloseBuffer>>,
584 server: Arc<Server>,
585) -> tide::Result<()> {
586 let host_connection_id = server
587 .state
588 .rpc
589 .read()
590 .await
591 .read_worktree(request.payload.worktree_id, request.sender_id)?
592 .host_connection_id()?;
593
594 server
595 .rpc
596 .forward_send(request.sender_id, host_connection_id, request.payload)
597 .await?;
598
599 Ok(())
600}
601
602async fn save_buffer(
603 request: Box<TypedEnvelope<proto::SaveBuffer>>,
604 server: Arc<Server>,
605) -> tide::Result<()> {
606 let host;
607 let guests;
608 {
609 let state = server.state.rpc.read().await;
610 let worktree = state.read_worktree(request.payload.worktree_id, request.sender_id)?;
611 host = worktree.host_connection_id()?;
612 guests = worktree
613 .guest_connection_ids
614 .keys()
615 .copied()
616 .collect::<Vec<_>>();
617 }
618
619 let sender = request.sender_id;
620 let receipt = request.receipt();
621 let response = server
622 .rpc
623 .forward_request(sender, host, request.payload.clone())
624 .await?;
625
626 broadcast(host, guests, |conn_id| {
627 let response = response.clone();
628 let server = &server;
629 async move {
630 if conn_id == sender {
631 server.rpc.respond(receipt, response).await
632 } else {
633 server.rpc.forward_send(host, conn_id, response).await
634 }
635 }
636 })
637 .await?;
638
639 Ok(())
640}
641
642async fn update_buffer(
643 request: Box<TypedEnvelope<proto::UpdateBuffer>>,
644 server: Arc<Server>,
645) -> tide::Result<()> {
646 broadcast_in_worktree(request.payload.worktree_id, &request, &server).await
647}
648
649async fn buffer_saved(
650 request: Box<TypedEnvelope<proto::BufferSaved>>,
651 server: Arc<Server>,
652) -> tide::Result<()> {
653 broadcast_in_worktree(request.payload.worktree_id, &request, &server).await
654}
655
656async fn get_channels(
657 request: Box<TypedEnvelope<proto::GetChannels>>,
658 server: Arc<Server>,
659) -> tide::Result<()> {
660 let user_id = server
661 .state
662 .rpc
663 .read()
664 .await
665 .user_id_for_connection(request.sender_id)?;
666 let channels = server.state.db.get_channels_for_user(user_id).await?;
667 server
668 .rpc
669 .respond(
670 request.receipt(),
671 proto::GetChannelsResponse {
672 channels: channels
673 .into_iter()
674 .map(|chan| proto::Channel {
675 id: chan.id.to_proto(),
676 name: chan.name,
677 })
678 .collect(),
679 },
680 )
681 .await?;
682 Ok(())
683}
684
685async fn get_users(
686 request: Box<TypedEnvelope<proto::GetUsers>>,
687 server: Arc<Server>,
688) -> tide::Result<()> {
689 let user_id = server
690 .state
691 .rpc
692 .read()
693 .await
694 .user_id_for_connection(request.sender_id)?;
695 let receipt = request.receipt();
696 let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
697 let users = server
698 .state
699 .db
700 .get_users_by_ids(user_id, user_ids)
701 .await?
702 .into_iter()
703 .map(|user| proto::User {
704 id: user.id.to_proto(),
705 github_login: user.github_login,
706 avatar_url: String::new(),
707 })
708 .collect();
709 server
710 .rpc
711 .respond(receipt, proto::GetUsersResponse { users })
712 .await?;
713 Ok(())
714}
715
716async fn join_channel(
717 request: Box<TypedEnvelope<proto::JoinChannel>>,
718 server: Arc<Server>,
719) -> tide::Result<()> {
720 let user_id = server
721 .state
722 .rpc
723 .read()
724 .await
725 .user_id_for_connection(request.sender_id)?;
726 let channel_id = ChannelId::from_proto(request.payload.channel_id);
727 if !server
728 .state
729 .db
730 .can_user_access_channel(user_id, channel_id)
731 .await?
732 {
733 Err(anyhow!("access denied"))?;
734 }
735
736 server
737 .state
738 .rpc
739 .write()
740 .await
741 .join_channel(request.sender_id, channel_id);
742 let messages = server
743 .state
744 .db
745 .get_recent_channel_messages(channel_id, 50)
746 .await?
747 .into_iter()
748 .map(|msg| proto::ChannelMessage {
749 id: msg.id.to_proto(),
750 body: msg.body,
751 timestamp: msg.sent_at.unix_timestamp() as u64,
752 sender_id: msg.sender_id.to_proto(),
753 })
754 .collect();
755 server
756 .rpc
757 .respond(request.receipt(), proto::JoinChannelResponse { messages })
758 .await?;
759 Ok(())
760}
761
762async fn send_channel_message(
763 request: Box<TypedEnvelope<proto::SendChannelMessage>>,
764 server: Arc<Server>,
765) -> tide::Result<()> {
766 let channel_id = ChannelId::from_proto(request.payload.channel_id);
767 let user_id;
768 let connection_ids;
769 {
770 let state = server.state.rpc.read().await;
771 user_id = state.user_id_for_connection(request.sender_id)?;
772 if let Some(channel) = state.channels.get(&channel_id) {
773 connection_ids = channel.connection_ids();
774 } else {
775 return Ok(());
776 }
777 }
778
779 let timestamp = OffsetDateTime::now_utc();
780 let message_id = server
781 .state
782 .db
783 .create_channel_message(channel_id, user_id, &request.payload.body, timestamp)
784 .await?;
785 let message = proto::ChannelMessageSent {
786 channel_id: channel_id.to_proto(),
787 message: Some(proto::ChannelMessage {
788 sender_id: user_id.to_proto(),
789 id: message_id.to_proto(),
790 body: request.payload.body,
791 timestamp: timestamp.unix_timestamp() as u64,
792 }),
793 };
794 broadcast(request.sender_id, connection_ids, |conn_id| {
795 server.rpc.send(conn_id, message.clone())
796 })
797 .await?;
798
799 Ok(())
800}
801
802async fn broadcast_in_worktree<T: proto::EnvelopedMessage>(
803 worktree_id: u64,
804 request: &TypedEnvelope<T>,
805 server: &Arc<Server>,
806) -> tide::Result<()> {
807 let connection_ids = server
808 .state
809 .rpc
810 .read()
811 .await
812 .read_worktree(worktree_id, request.sender_id)?
813 .connection_ids();
814
815 broadcast(request.sender_id, connection_ids, |conn_id| {
816 server
817 .rpc
818 .forward_send(request.sender_id, conn_id, request.payload.clone())
819 })
820 .await?;
821
822 Ok(())
823}
824
825pub async fn broadcast<F, T>(
826 sender_id: ConnectionId,
827 receiver_ids: Vec<ConnectionId>,
828 mut f: F,
829) -> anyhow::Result<()>
830where
831 F: FnMut(ConnectionId) -> T,
832 T: Future<Output = anyhow::Result<()>>,
833{
834 let futures = receiver_ids
835 .into_iter()
836 .filter(|id| *id != sender_id)
837 .map(|id| f(id));
838 futures::future::try_join_all(futures).await?;
839 Ok(())
840}
841
842fn header_contains_ignore_case<T>(
843 request: &tide::Request<T>,
844 header_name: HeaderName,
845 value: &str,
846) -> bool {
847 request
848 .header(header_name)
849 .map(|h| {
850 h.as_str()
851 .split(',')
852 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
853 })
854 .unwrap_or(false)
855}