rpc.rs

  1use super::{
  2    auth::{self, PeerExt as _},
  3    db::{ChannelId, UserId},
  4    AppState,
  5};
  6use anyhow::anyhow;
  7use async_std::task;
  8use async_tungstenite::{
  9    tungstenite::{protocol::Role, Error as WebSocketError, Message as WebSocketMessage},
 10    WebSocketStream,
 11};
 12use futures::{future::BoxFuture, FutureExt};
 13use postage::prelude::Stream as _;
 14use sha1::{Digest as _, Sha1};
 15use std::{
 16    any::{Any, TypeId},
 17    collections::{HashMap, HashSet},
 18    future::Future,
 19    mem,
 20    sync::Arc,
 21    time::Instant,
 22};
 23use surf::StatusCode;
 24use tide::log;
 25use tide::{
 26    http::headers::{HeaderName, CONNECTION, UPGRADE},
 27    Request, Response,
 28};
 29use time::OffsetDateTime;
 30use zrpc::{
 31    auth::random_token,
 32    proto::{self, EnvelopedMessage},
 33    ConnectionId, Peer, TypedEnvelope,
 34};
 35
/// Identifier of one participant's replica within a shared worktree.
///
/// In this file guests are handed ids starting at 1 (`State::join_worktree`),
/// while the host is reported to joining peers with replica id 0.
type ReplicaId = u16;
 37
/// Type-erased message handler stored by `ServerBuilder` / `Server`.
///
/// A handler is given a mutable slot holding the pending envelope. A handler
/// built by `ServerBuilder::on_message` takes the envelope out of the slot when
/// it has the expected type and returns the future that processes it; otherwise
/// it leaves the envelope in the slot and returns `None`.
type Handler = Box<
    dyn Send
        + Sync
        + Fn(&mut Option<Box<dyn Any + Send + Sync>>, Arc<Server>) -> Option<BoxFuture<'static, ()>>,
>;
 43
/// Collects message handlers before they are frozen into a `Server`.
#[derive(Default)]
struct ServerBuilder {
    // Handlers in registration order; `Server::handle_connection` tries them in this order.
    handlers: Vec<Handler>,
    // Message types that already have a handler, used to detect double registration.
    handler_types: HashSet<TypeId>,
}
 49
 50impl ServerBuilder {
 51    pub fn on_message<F, Fut, M>(&mut self, handler: F) -> &mut Self
 52    where
 53        F: 'static + Send + Sync + Fn(Box<TypedEnvelope<M>>, Arc<Server>) -> Fut,
 54        Fut: 'static + Send + Future<Output = ()>,
 55        M: EnvelopedMessage,
 56    {
 57        if self.handler_types.insert(TypeId::of::<M>()) {
 58            panic!("registered a handler for the same message twice");
 59        }
 60
 61        self.handlers
 62            .push(Box::new(move |untyped_envelope, server| {
 63                if let Some(typed_envelope) = untyped_envelope.take() {
 64                    match typed_envelope.downcast::<TypedEnvelope<M>>() {
 65                        Ok(typed_envelope) => Some((handler)(typed_envelope, server).boxed()),
 66                        Err(envelope) => {
 67                            *untyped_envelope = Some(envelope);
 68                            None
 69                        }
 70                    }
 71                } else {
 72                    None
 73                }
 74            }));
 75        self
 76    }
 77
 78    pub fn build(self, rpc: &Arc<Peer>, state: &Arc<AppState>) -> Arc<Server> {
 79        Arc::new(Server {
 80            rpc: rpc.clone(),
 81            state: state.clone(),
 82            handlers: self.handlers,
 83        })
 84    }
 85}
 86
/// RPC server: the peer used to talk to connections, the shared application
/// state, and the message handlers registered through `ServerBuilder`.
pub struct Server {
    rpc: Arc<Peer>,
    state: Arc<AppState>,
    // Tried in order for every incoming message; see `handle_connection`.
    handlers: Vec<Handler>,
}
 92
impl Server {
    /// Drives a single client connection until it closes or its I/O fails.
    ///
    /// The connection is registered with the peer and with the shared RPC
    /// state for `user_id`, then incoming messages are dispatched to the
    /// registered handlers. When the loop ends the connection is signed out.
    ///
    /// `addr` is only used in log messages.
    pub async fn handle_connection<Conn>(
        self: &Arc<Self>,
        connection: Conn,
        addr: String,
        user_id: UserId,
    ) where
        Conn: 'static
            + futures::Sink<WebSocketMessage, Error = WebSocketError>
            + futures::Stream<Item = Result<WebSocketMessage, WebSocketError>>
            + Send
            + Unpin,
    {
        let this = self.clone();
        let (connection_id, handle_io, mut incoming_rx) = this.rpc.add_connection(connection).await;
        // Record the connection -> user association in the shared state.
        this.state
            .rpc
            .write()
            .await
            .add_connection(connection_id, user_id);

        // `handle_io` is fused and pinned once so it can be polled on every loop iteration.
        let handle_io = handle_io.fuse();
        futures::pin_mut!(handle_io);
        loop {
            let next_message = incoming_rx.recv().fuse();
            futures::pin_mut!(next_message);
            // `select_biased!` is used, with the incoming-message branch listed first.
            futures::select_biased! {
                message = next_message => {
                    if let Some(message) = message {
                        // Offer the message to each handler in registration order. A handler
                        // that accepts it takes it out of the slot and returns a future.
                        let mut message = Some(message);
                        for handler in &this.handlers {
                            if let Some(future) = (handler)(&mut message, this.clone()) {
                                // NOTE(review): the handler future is awaited inline, so
                                // `handle_io` is not polled while a handler is running.
                                future.await;
                                break;
                            }
                        }

                        // Still in the slot: no registered handler matched its type.
                        if let Some(message) = message {
                            log::warn!("unhandled message: {:?}", message);
                        }
                    } else {
                        // The incoming channel yielded `None`: treat the connection as closed.
                        log::info!("rpc connection closed {:?}", addr);
                        break;
                    }
                }
                handle_io = handle_io => {
                    // The I/O future finished (with or without error): stop serving.
                    if let Err(err) = handle_io {
                        log::error!("error handling rpc connection {:?} - {:?}", addr, err);
                    }
                    break;
                }
            }
        }

        // `sign_out` is presumably provided by `auth::PeerExt` (imported at the top) — confirm.
        if let Err(err) = this.rpc.sign_out(connection_id, &this.state).await {
            log::error!("error signing out connection {:?} - {:?}", addr, err);
        }
    }
}
152
/// In-memory bookkeeping for live connections, shared worktrees and channels.
#[derive(Default)]
pub struct State {
    // Every known connection, keyed by its id.
    connections: HashMap<ConnectionId, Connection>,
    // Shared worktrees, keyed by the id assigned in `share_worktree`.
    pub worktrees: HashMap<u64, Worktree>,
    // Channels that have had at least one connection join them.
    channels: HashMap<ChannelId, Channel>,
    // Id handed to the next shared worktree; incremented on each share.
    next_worktree_id: u64,
}
160
/// Per-connection record: the authenticated user plus the worktrees and
/// channels this connection has been associated with.
struct Connection {
    user_id: UserId,
    // Worktree ids recorded for this connection; walked by `State::remove_connection`.
    worktrees: HashSet<u64>,
    // Channels joined through `State::join_channel`.
    channels: HashSet<ChannelId>,
}
166
/// Server-side view of a worktree shared by a host with any number of guests.
pub struct Worktree {
    // Connection that shared the worktree; set to `None` when the host closes it.
    host_connection_id: Option<ConnectionId>,
    // Guest connections and the replica id each one was assigned.
    guest_connection_ids: HashMap<ConnectionId, ReplicaId>,
    // Replica ids currently handed out to guests.
    active_replica_ids: HashSet<ReplicaId>,
    // Token a guest must present to join (compared in `State::join_worktree`).
    access_token: String,
    root_name: String,
    // Entries keyed by `proto::Entry::id`.
    entries: HashMap<u64, proto::Entry>,
}
175
/// The set of connections currently joined to a channel.
#[derive(Default)]
struct Channel {
    connection_ids: HashSet<ConnectionId>,
}
180
181impl Worktree {
182    pub fn connection_ids(&self) -> Vec<ConnectionId> {
183        self.guest_connection_ids
184            .keys()
185            .copied()
186            .chain(self.host_connection_id)
187            .collect()
188    }
189
190    fn host_connection_id(&self) -> tide::Result<ConnectionId> {
191        Ok(self
192            .host_connection_id
193            .ok_or_else(|| anyhow!("host disconnected from worktree"))?)
194    }
195}
196
197impl Channel {
198    fn connection_ids(&self) -> Vec<ConnectionId> {
199        self.connection_ids.iter().copied().collect()
200    }
201}
202
203impl State {
204    // Add a new connection associated with a given user.
205    pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId) {
206        self.connections.insert(
207            connection_id,
208            Connection {
209                user_id,
210                worktrees: Default::default(),
211                channels: Default::default(),
212            },
213        );
214    }
215
216    // Remove the given connection and its association with any worktrees.
217    pub fn remove_connection(&mut self, connection_id: ConnectionId) -> Vec<u64> {
218        let mut worktree_ids = Vec::new();
219        if let Some(connection) = self.connections.remove(&connection_id) {
220            for channel_id in connection.channels {
221                if let Some(channel) = self.channels.get_mut(&channel_id) {
222                    channel.connection_ids.remove(&connection_id);
223                }
224            }
225            for worktree_id in connection.worktrees {
226                if let Some(worktree) = self.worktrees.get_mut(&worktree_id) {
227                    if worktree.host_connection_id == Some(connection_id) {
228                        worktree_ids.push(worktree_id);
229                    } else if let Some(replica_id) =
230                        worktree.guest_connection_ids.remove(&connection_id)
231                    {
232                        worktree.active_replica_ids.remove(&replica_id);
233                        worktree_ids.push(worktree_id);
234                    }
235                }
236            }
237        }
238        worktree_ids
239    }
240
241    fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
242        if let Some(connection) = self.connections.get_mut(&connection_id) {
243            connection.channels.insert(channel_id);
244            self.channels
245                .entry(channel_id)
246                .or_default()
247                .connection_ids
248                .insert(connection_id);
249        }
250    }
251
252    // Add the given connection as a guest of the given worktree
253    pub fn join_worktree(
254        &mut self,
255        connection_id: ConnectionId,
256        worktree_id: u64,
257        access_token: &str,
258    ) -> Option<(ReplicaId, &Worktree)> {
259        if let Some(worktree) = self.worktrees.get_mut(&worktree_id) {
260            if access_token == worktree.access_token {
261                if let Some(connection) = self.connections.get_mut(&connection_id) {
262                    connection.worktrees.insert(worktree_id);
263                }
264
265                let mut replica_id = 1;
266                while worktree.active_replica_ids.contains(&replica_id) {
267                    replica_id += 1;
268                }
269                worktree.active_replica_ids.insert(replica_id);
270                worktree
271                    .guest_connection_ids
272                    .insert(connection_id, replica_id);
273                Some((replica_id, worktree))
274            } else {
275                None
276            }
277        } else {
278            None
279        }
280    }
281
282    fn user_id_for_connection(&self, connection_id: ConnectionId) -> tide::Result<UserId> {
283        Ok(self
284            .connections
285            .get(&connection_id)
286            .ok_or_else(|| anyhow!("unknown connection"))?
287            .user_id)
288    }
289
290    fn read_worktree(
291        &self,
292        worktree_id: u64,
293        connection_id: ConnectionId,
294    ) -> tide::Result<&Worktree> {
295        let worktree = self
296            .worktrees
297            .get(&worktree_id)
298            .ok_or_else(|| anyhow!("worktree not found"))?;
299
300        if worktree.host_connection_id == Some(connection_id)
301            || worktree.guest_connection_ids.contains_key(&connection_id)
302        {
303            Ok(worktree)
304        } else {
305            Err(anyhow!(
306                "{} is not a member of worktree {}",
307                connection_id,
308                worktree_id
309            ))?
310        }
311    }
312
313    fn write_worktree(
314        &mut self,
315        worktree_id: u64,
316        connection_id: ConnectionId,
317    ) -> tide::Result<&mut Worktree> {
318        let worktree = self
319            .worktrees
320            .get_mut(&worktree_id)
321            .ok_or_else(|| anyhow!("worktree not found"))?;
322
323        if worktree.host_connection_id == Some(connection_id)
324            || worktree.guest_connection_ids.contains_key(&connection_id)
325        {
326            Ok(worktree)
327        } else {
328            Err(anyhow!(
329                "{} is not a member of worktree {}",
330                connection_id,
331                worktree_id
332            ))?
333        }
334    }
335}
336
337pub fn build_server(state: &Arc<AppState>, rpc: &Arc<Peer>) -> Arc<Server> {
338    ServerBuilder::default()
339        // .on_message(share_worktree)
340        // .on_message(join_worktree)
341        // .on_message(update_worktree)
342        // .on_message(close_worktree)
343        // .on_message(open_buffer)
344        // .on_message(close_buffer)
345        // .on_message(update_buffer)
346        // .on_message(buffer_saved)
347        // .on_message(save_buffer)
348        // .on_message(get_channels)
349        // .on_message(get_users)
350        // .on_message(join_channel)
351        // .on_message(send_channel_message)
352        .build(rpc, state)
353}
354
/// Mounts the `/rpc` WebSocket endpoint on `app`.
///
/// The route sits behind `auth::VerifyToken`. A request that asks for a
/// WebSocket upgrade receives a `101 Switching Protocols` response, and the
/// upgraded socket is served on a spawned task by `Server::handle_connection`.
/// A request that does not ask for an upgrade receives `426 Upgrade Required`.
pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
    let server = build_server(app.state(), rpc);

    // NOTE(review): this clone is not referenced by the closure below.
    let rpc = rpc.clone();
    app.at("/rpc").with(auth::VerifyToken).get(move |request: Request<Arc<AppState>>| {
        // Read from the request extensions; the error message further down expects
        // `auth::VerifyToken` to have put it there.
        let user_id = request.ext::<UserId>().copied();
        let server = server.clone();
        async move {
            // GUID that is appended to the client's key when computing the accept hash.
            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
            let upgrade_requested = connection_upgrade && upgrade_to_websocket;

            if !upgrade_requested {
                return Ok(Response::new(StatusCode::UpgradeRequired));
            }

            let header = match request.header("Sec-Websocket-Key") {
                Some(h) => h.as_str(),
                None => return Err(anyhow!("expected sec-websocket-key"))?,
            };

            let mut response = Response::new(StatusCode::SwitchingProtocols);
            response.insert_header(UPGRADE, "websocket");
            response.insert_header(CONNECTION, "Upgrade");
            // Sec-Websocket-Accept = base64(SHA-1(client key + GUID)).
            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
            response.insert_header("Sec-Websocket-Version", "13");

            // Obtain the receiver for the upgraded connection before handing the response back.
            let http_res: &mut tide::http::Response = response.as_mut();
            let upgrade_receiver = http_res.recv_upgrade().await;
            let addr = request.remote().unwrap_or("unknown").to_string();
            let user_id = user_id.ok_or_else(|| anyhow!("user_id is not present on request. ensure auth::VerifyToken middleware is present"))?;
            // Serve the connection on its own task so the HTTP response can be returned now.
            task::spawn(async move {
                if let Some(stream) = upgrade_receiver.await {
                    let stream = WebSocketStream::from_raw_socket(stream, Role::Server, None).await;
                    server.handle_connection(stream, addr, user_id).await;
                }
            });

            Ok(response)
        }
    });
}
400
401async fn share_worktree(
402    mut request: TypedEnvelope<proto::ShareWorktree>,
403    rpc: &Arc<Peer>,
404    state: &Arc<AppState>,
405) -> tide::Result<()> {
406    let mut state = state.rpc.write().await;
407    let worktree_id = state.next_worktree_id;
408    state.next_worktree_id += 1;
409    let access_token = random_token();
410    let worktree = request
411        .payload
412        .worktree
413        .as_mut()
414        .ok_or_else(|| anyhow!("missing worktree"))?;
415    let entries = mem::take(&mut worktree.entries)
416        .into_iter()
417        .map(|entry| (entry.id, entry))
418        .collect();
419    state.worktrees.insert(
420        worktree_id,
421        Worktree {
422            host_connection_id: Some(request.sender_id),
423            guest_connection_ids: Default::default(),
424            active_replica_ids: Default::default(),
425            access_token: access_token.clone(),
426            root_name: mem::take(&mut worktree.root_name),
427            entries,
428        },
429    );
430
431    rpc.respond(
432        request.receipt(),
433        proto::ShareWorktreeResponse {
434            worktree_id,
435            access_token,
436        },
437    )
438    .await?;
439    Ok(())
440}
441
442async fn join_worktree(
443    request: TypedEnvelope<proto::OpenWorktree>,
444    rpc: &Arc<Peer>,
445    state: &Arc<AppState>,
446) -> tide::Result<()> {
447    let worktree_id = request.payload.worktree_id;
448    let access_token = &request.payload.access_token;
449
450    let mut state = state.rpc.write().await;
451    if let Some((peer_replica_id, worktree)) =
452        state.join_worktree(request.sender_id, worktree_id, access_token)
453    {
454        let mut peers = Vec::new();
455        if let Some(host_connection_id) = worktree.host_connection_id {
456            peers.push(proto::Peer {
457                peer_id: host_connection_id.0,
458                replica_id: 0,
459            });
460        }
461        for (peer_conn_id, peer_replica_id) in &worktree.guest_connection_ids {
462            if *peer_conn_id != request.sender_id {
463                peers.push(proto::Peer {
464                    peer_id: peer_conn_id.0,
465                    replica_id: *peer_replica_id as u32,
466                });
467            }
468        }
469
470        broadcast(request.sender_id, worktree.connection_ids(), |conn_id| {
471            rpc.send(
472                conn_id,
473                proto::AddPeer {
474                    worktree_id,
475                    peer: Some(proto::Peer {
476                        peer_id: request.sender_id.0,
477                        replica_id: peer_replica_id as u32,
478                    }),
479                },
480            )
481        })
482        .await?;
483        rpc.respond(
484            request.receipt(),
485            proto::OpenWorktreeResponse {
486                worktree_id,
487                worktree: Some(proto::Worktree {
488                    root_name: worktree.root_name.clone(),
489                    entries: worktree.entries.values().cloned().collect(),
490                }),
491                replica_id: peer_replica_id as u32,
492                peers,
493            },
494        )
495        .await?;
496    } else {
497        rpc.respond(
498            request.receipt(),
499            proto::OpenWorktreeResponse {
500                worktree_id,
501                worktree: None,
502                replica_id: 0,
503                peers: Vec::new(),
504            },
505        )
506        .await?;
507    }
508
509    Ok(())
510}
511
512async fn update_worktree(
513    request: TypedEnvelope<proto::UpdateWorktree>,
514    rpc: &Arc<Peer>,
515    state: &Arc<AppState>,
516) -> tide::Result<()> {
517    {
518        let mut state = state.rpc.write().await;
519        let worktree = state.write_worktree(request.payload.worktree_id, request.sender_id)?;
520        for entry_id in &request.payload.removed_entries {
521            worktree.entries.remove(&entry_id);
522        }
523
524        for entry in &request.payload.updated_entries {
525            worktree.entries.insert(entry.id, entry.clone());
526        }
527    }
528
529    broadcast_in_worktree(request.payload.worktree_id, request, rpc, state).await?;
530    Ok(())
531}
532
533async fn close_worktree(
534    request: TypedEnvelope<proto::CloseWorktree>,
535    rpc: &Arc<Peer>,
536    state: &Arc<AppState>,
537) -> tide::Result<()> {
538    let connection_ids;
539    {
540        let mut state = state.rpc.write().await;
541        let worktree = state.write_worktree(request.payload.worktree_id, request.sender_id)?;
542        connection_ids = worktree.connection_ids();
543        if worktree.host_connection_id == Some(request.sender_id) {
544            worktree.host_connection_id = None;
545        } else if let Some(replica_id) = worktree.guest_connection_ids.remove(&request.sender_id) {
546            worktree.active_replica_ids.remove(&replica_id);
547        }
548    }
549
550    broadcast(request.sender_id, connection_ids, |conn_id| {
551        rpc.send(
552            conn_id,
553            proto::RemovePeer {
554                worktree_id: request.payload.worktree_id,
555                peer_id: request.sender_id.0,
556            },
557        )
558    })
559    .await?;
560
561    Ok(())
562}
563
564async fn open_buffer(
565    request: TypedEnvelope<proto::OpenBuffer>,
566    rpc: &Arc<Peer>,
567    state: &Arc<AppState>,
568) -> tide::Result<()> {
569    let receipt = request.receipt();
570    let worktree_id = request.payload.worktree_id;
571    let host_connection_id = state
572        .rpc
573        .read()
574        .await
575        .read_worktree(worktree_id, request.sender_id)?
576        .host_connection_id()?;
577
578    let response = rpc
579        .forward_request(request.sender_id, host_connection_id, request.payload)
580        .await?;
581    rpc.respond(receipt, response).await?;
582    Ok(())
583}
584
585async fn close_buffer(
586    request: TypedEnvelope<proto::CloseBuffer>,
587    rpc: &Arc<Peer>,
588    state: &Arc<AppState>,
589) -> tide::Result<()> {
590    let host_connection_id = state
591        .rpc
592        .read()
593        .await
594        .read_worktree(request.payload.worktree_id, request.sender_id)?
595        .host_connection_id()?;
596
597    rpc.forward_send(request.sender_id, host_connection_id, request.payload)
598        .await?;
599
600    Ok(())
601}
602
603async fn save_buffer(
604    request: TypedEnvelope<proto::SaveBuffer>,
605    rpc: &Arc<Peer>,
606    state: &Arc<AppState>,
607) -> tide::Result<()> {
608    let host;
609    let guests;
610    {
611        let state = state.rpc.read().await;
612        let worktree = state.read_worktree(request.payload.worktree_id, request.sender_id)?;
613        host = worktree.host_connection_id()?;
614        guests = worktree
615            .guest_connection_ids
616            .keys()
617            .copied()
618            .collect::<Vec<_>>();
619    }
620
621    let sender = request.sender_id;
622    let receipt = request.receipt();
623    let response = rpc
624        .forward_request(sender, host, request.payload.clone())
625        .await?;
626
627    broadcast(host, guests, |conn_id| {
628        let response = response.clone();
629        async move {
630            if conn_id == sender {
631                rpc.respond(receipt, response).await
632            } else {
633                rpc.forward_send(host, conn_id, response).await
634            }
635        }
636    })
637    .await?;
638
639    Ok(())
640}
641
642async fn update_buffer(
643    request: TypedEnvelope<proto::UpdateBuffer>,
644    rpc: &Arc<Peer>,
645    state: &Arc<AppState>,
646) -> tide::Result<()> {
647    broadcast_in_worktree(request.payload.worktree_id, request, rpc, state).await
648}
649
650async fn buffer_saved(
651    request: TypedEnvelope<proto::BufferSaved>,
652    rpc: &Arc<Peer>,
653    state: &Arc<AppState>,
654) -> tide::Result<()> {
655    broadcast_in_worktree(request.payload.worktree_id, request, rpc, state).await
656}
657
658async fn get_channels(
659    request: TypedEnvelope<proto::GetChannels>,
660    rpc: &Arc<Peer>,
661    state: &Arc<AppState>,
662) -> tide::Result<()> {
663    let user_id = state
664        .rpc
665        .read()
666        .await
667        .user_id_for_connection(request.sender_id)?;
668    let channels = state.db.get_channels_for_user(user_id).await?;
669    rpc.respond(
670        request.receipt(),
671        proto::GetChannelsResponse {
672            channels: channels
673                .into_iter()
674                .map(|chan| proto::Channel {
675                    id: chan.id.to_proto(),
676                    name: chan.name,
677                })
678                .collect(),
679        },
680    )
681    .await?;
682    Ok(())
683}
684
685async fn get_users(
686    request: TypedEnvelope<proto::GetUsers>,
687    rpc: &Arc<Peer>,
688    state: &Arc<AppState>,
689) -> tide::Result<()> {
690    let user_id = state
691        .rpc
692        .read()
693        .await
694        .user_id_for_connection(request.sender_id)?;
695    let receipt = request.receipt();
696    let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
697    let users = state
698        .db
699        .get_users_by_ids(user_id, user_ids)
700        .await?
701        .into_iter()
702        .map(|user| proto::User {
703            id: user.id.to_proto(),
704            github_login: user.github_login,
705            avatar_url: String::new(),
706        })
707        .collect();
708    rpc.respond(receipt, proto::GetUsersResponse { users })
709        .await?;
710    Ok(())
711}
712
713async fn join_channel(
714    request: TypedEnvelope<proto::JoinChannel>,
715    rpc: &Arc<Peer>,
716    state: &Arc<AppState>,
717) -> tide::Result<()> {
718    let user_id = state
719        .rpc
720        .read()
721        .await
722        .user_id_for_connection(request.sender_id)?;
723    let channel_id = ChannelId::from_proto(request.payload.channel_id);
724    if !state
725        .db
726        .can_user_access_channel(user_id, channel_id)
727        .await?
728    {
729        Err(anyhow!("access denied"))?;
730    }
731
732    state
733        .rpc
734        .write()
735        .await
736        .join_channel(request.sender_id, channel_id);
737    let messages = state
738        .db
739        .get_recent_channel_messages(channel_id, 50)
740        .await?
741        .into_iter()
742        .map(|msg| proto::ChannelMessage {
743            id: msg.id.to_proto(),
744            body: msg.body,
745            timestamp: msg.sent_at.unix_timestamp() as u64,
746            sender_id: msg.sender_id.to_proto(),
747        })
748        .collect();
749    rpc.respond(request.receipt(), proto::JoinChannelResponse { messages })
750        .await?;
751    Ok(())
752}
753
754async fn send_channel_message(
755    request: TypedEnvelope<proto::SendChannelMessage>,
756    peer: &Arc<Peer>,
757    app: &Arc<AppState>,
758) -> tide::Result<()> {
759    let channel_id = ChannelId::from_proto(request.payload.channel_id);
760    let user_id;
761    let connection_ids;
762    {
763        let state = app.rpc.read().await;
764        user_id = state.user_id_for_connection(request.sender_id)?;
765        if let Some(channel) = state.channels.get(&channel_id) {
766            connection_ids = channel.connection_ids();
767        } else {
768            return Ok(());
769        }
770    }
771
772    let timestamp = OffsetDateTime::now_utc();
773    let message_id = app
774        .db
775        .create_channel_message(channel_id, user_id, &request.payload.body, timestamp)
776        .await?;
777    let message = proto::ChannelMessageSent {
778        channel_id: channel_id.to_proto(),
779        message: Some(proto::ChannelMessage {
780            sender_id: user_id.to_proto(),
781            id: message_id.to_proto(),
782            body: request.payload.body,
783            timestamp: timestamp.unix_timestamp() as u64,
784        }),
785    };
786    broadcast(request.sender_id, connection_ids, |conn_id| {
787        peer.send(conn_id, message.clone())
788    })
789    .await?;
790
791    Ok(())
792}
793
794async fn broadcast_in_worktree<T: proto::EnvelopedMessage>(
795    worktree_id: u64,
796    request: TypedEnvelope<T>,
797    rpc: &Arc<Peer>,
798    state: &Arc<AppState>,
799) -> tide::Result<()> {
800    let connection_ids = state
801        .rpc
802        .read()
803        .await
804        .read_worktree(worktree_id, request.sender_id)?
805        .connection_ids();
806
807    broadcast(request.sender_id, connection_ids, |conn_id| {
808        rpc.forward_send(request.sender_id, conn_id, request.payload.clone())
809    })
810    .await?;
811
812    Ok(())
813}
814
815pub async fn broadcast<F, T>(
816    sender_id: ConnectionId,
817    receiver_ids: Vec<ConnectionId>,
818    mut f: F,
819) -> anyhow::Result<()>
820where
821    F: FnMut(ConnectionId) -> T,
822    T: Future<Output = anyhow::Result<()>>,
823{
824    let futures = receiver_ids
825        .into_iter()
826        .filter(|id| *id != sender_id)
827        .map(|id| f(id));
828    futures::future::try_join_all(futures).await?;
829    Ok(())
830}
831
832fn header_contains_ignore_case<T>(
833    request: &tide::Request<T>,
834    header_name: HeaderName,
835    value: &str,
836) -> bool {
837    request
838        .header(header_name)
839        .map(|h| {
840            h.as_str()
841                .split(',')
842                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
843        })
844        .unwrap_or(false)
845}