rpc.rs

  1use super::{
  2    auth::{self, PeerExt as _},
  3    db::{ChannelId, UserId},
  4    AppState,
  5};
  6use anyhow::anyhow;
  7use async_std::task;
  8use async_tungstenite::{
  9    tungstenite::{protocol::Role, Error as WebSocketError, Message as WebSocketMessage},
 10    WebSocketStream,
 11};
 12use sha1::{Digest as _, Sha1};
 13use std::{
 14    collections::{HashMap, HashSet},
 15    future::Future,
 16    mem,
 17    sync::Arc,
 18    time::Instant,
 19};
 20use surf::StatusCode;
 21use tide::log;
 22use tide::{
 23    http::headers::{HeaderName, CONNECTION, UPGRADE},
 24    Request, Response,
 25};
 26use time::OffsetDateTime;
 27use zrpc::{
 28    auth::random_token,
 29    proto::{self, EnvelopedMessage},
 30    ConnectionId, Peer, Router, TypedEnvelope,
 31};
 32
 33type ReplicaId = u16;
 34
 35#[derive(Default)]
 36pub struct State {
 37    connections: HashMap<ConnectionId, Connection>,
 38    pub worktrees: HashMap<u64, Worktree>,
 39    channels: HashMap<ChannelId, Channel>,
 40    next_worktree_id: u64,
 41}
 42
 43struct Connection {
 44    user_id: UserId,
 45    worktrees: HashSet<u64>,
 46    channels: HashSet<ChannelId>,
 47}
 48
 49pub struct Worktree {
 50    host_connection_id: Option<ConnectionId>,
 51    guest_connection_ids: HashMap<ConnectionId, ReplicaId>,
 52    active_replica_ids: HashSet<ReplicaId>,
 53    access_token: String,
 54    root_name: String,
 55    entries: HashMap<u64, proto::Entry>,
 56}
 57
 58#[derive(Default)]
 59struct Channel {
 60    connection_ids: HashSet<ConnectionId>,
 61}
 62
 63impl Worktree {
 64    pub fn connection_ids(&self) -> Vec<ConnectionId> {
 65        self.guest_connection_ids
 66            .keys()
 67            .copied()
 68            .chain(self.host_connection_id)
 69            .collect()
 70    }
 71
 72    fn host_connection_id(&self) -> tide::Result<ConnectionId> {
 73        Ok(self
 74            .host_connection_id
 75            .ok_or_else(|| anyhow!("host disconnected from worktree"))?)
 76    }
 77}
 78
 79impl Channel {
 80    fn connection_ids(&self) -> Vec<ConnectionId> {
 81        self.connection_ids.iter().copied().collect()
 82    }
 83}
 84
 85impl State {
 86    // Add a new connection associated with a given user.
 87    pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId) {
 88        self.connections.insert(
 89            connection_id,
 90            Connection {
 91                user_id,
 92                worktrees: Default::default(),
 93                channels: Default::default(),
 94            },
 95        );
 96    }
 97
 98    // Remove the given connection and its association with any worktrees.
 99    pub fn remove_connection(&mut self, connection_id: ConnectionId) -> Vec<u64> {
100        let mut worktree_ids = Vec::new();
101        if let Some(connection) = self.connections.remove(&connection_id) {
102            for channel_id in connection.channels {
103                if let Some(channel) = self.channels.get_mut(&channel_id) {
104                    channel.connection_ids.remove(&connection_id);
105                }
106            }
107            for worktree_id in connection.worktrees {
108                if let Some(worktree) = self.worktrees.get_mut(&worktree_id) {
109                    if worktree.host_connection_id == Some(connection_id) {
110                        worktree_ids.push(worktree_id);
111                    } else if let Some(replica_id) =
112                        worktree.guest_connection_ids.remove(&connection_id)
113                    {
114                        worktree.active_replica_ids.remove(&replica_id);
115                        worktree_ids.push(worktree_id);
116                    }
117                }
118            }
119        }
120        worktree_ids
121    }
122
123    fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
124        if let Some(connection) = self.connections.get_mut(&connection_id) {
125            connection.channels.insert(channel_id);
126            self.channels
127                .entry(channel_id)
128                .or_default()
129                .connection_ids
130                .insert(connection_id);
131        }
132    }
133
134    // Add the given connection as a guest of the given worktree
135    pub fn join_worktree(
136        &mut self,
137        connection_id: ConnectionId,
138        worktree_id: u64,
139        access_token: &str,
140    ) -> Option<(ReplicaId, &Worktree)> {
141        if let Some(worktree) = self.worktrees.get_mut(&worktree_id) {
142            if access_token == worktree.access_token {
143                if let Some(connection) = self.connections.get_mut(&connection_id) {
144                    connection.worktrees.insert(worktree_id);
145                }
146
147                let mut replica_id = 1;
148                while worktree.active_replica_ids.contains(&replica_id) {
149                    replica_id += 1;
150                }
151                worktree.active_replica_ids.insert(replica_id);
152                worktree
153                    .guest_connection_ids
154                    .insert(connection_id, replica_id);
155                Some((replica_id, worktree))
156            } else {
157                None
158            }
159        } else {
160            None
161        }
162    }
163
164    fn user_id_for_connection(&self, connection_id: ConnectionId) -> tide::Result<UserId> {
165        Ok(self
166            .connections
167            .get(&connection_id)
168            .ok_or_else(|| anyhow!("unknown connection"))?
169            .user_id)
170    }
171
172    fn read_worktree(
173        &self,
174        worktree_id: u64,
175        connection_id: ConnectionId,
176    ) -> tide::Result<&Worktree> {
177        let worktree = self
178            .worktrees
179            .get(&worktree_id)
180            .ok_or_else(|| anyhow!("worktree not found"))?;
181
182        if worktree.host_connection_id == Some(connection_id)
183            || worktree.guest_connection_ids.contains_key(&connection_id)
184        {
185            Ok(worktree)
186        } else {
187            Err(anyhow!(
188                "{} is not a member of worktree {}",
189                connection_id,
190                worktree_id
191            ))?
192        }
193    }
194
195    fn write_worktree(
196        &mut self,
197        worktree_id: u64,
198        connection_id: ConnectionId,
199    ) -> tide::Result<&mut Worktree> {
200        let worktree = self
201            .worktrees
202            .get_mut(&worktree_id)
203            .ok_or_else(|| anyhow!("worktree not found"))?;
204
205        if worktree.host_connection_id == Some(connection_id)
206            || worktree.guest_connection_ids.contains_key(&connection_id)
207        {
208            Ok(worktree)
209        } else {
210            Err(anyhow!(
211                "{} is not a member of worktree {}",
212                connection_id,
213                worktree_id
214            ))?
215        }
216    }
217}
218
219trait MessageHandler<'a, M: proto::EnvelopedMessage> {
220    type Output: 'a + Send + Future<Output = tide::Result<()>>;
221
222    fn handle(
223        &self,
224        message: TypedEnvelope<M>,
225        rpc: &'a Arc<Peer>,
226        app_state: &'a Arc<AppState>,
227    ) -> Self::Output;
228}
229
230impl<'a, M, F, Fut> MessageHandler<'a, M> for F
231where
232    M: proto::EnvelopedMessage,
233    F: Fn(TypedEnvelope<M>, &'a Arc<Peer>, &'a Arc<AppState>) -> Fut,
234    Fut: 'a + Send + Future<Output = tide::Result<()>>,
235{
236    type Output = Fut;
237
238    fn handle(
239        &self,
240        message: TypedEnvelope<M>,
241        rpc: &'a Arc<Peer>,
242        app_state: &'a Arc<AppState>,
243    ) -> Self::Output {
244        (self)(message, rpc, app_state)
245    }
246}
247
248fn on_message<M, H>(router: &mut Router, rpc: &Arc<Peer>, app_state: &Arc<AppState>, handler: H)
249where
250    M: EnvelopedMessage,
251    H: 'static + Clone + Send + Sync + for<'a> MessageHandler<'a, M>,
252{
253    let rpc = rpc.clone();
254    let handler = handler.clone();
255    let app_state = app_state.clone();
256    router.add_message_handler(move |message| {
257        let rpc = rpc.clone();
258        let handler = handler.clone();
259        let app_state = app_state.clone();
260        async move {
261            let sender_id = message.sender_id;
262            let message_id = message.message_id;
263            let start_time = Instant::now();
264            log::info!(
265                "RPC message received. id: {}.{}, type:{}",
266                sender_id,
267                message_id,
268                M::NAME
269            );
270            if let Err(err) = handler.handle(message, &rpc, &app_state).await {
271                log::error!("error handling message: {:?}", err);
272            } else {
273                log::info!(
274                    "RPC message handled. id:{}.{}, duration:{:?}",
275                    sender_id,
276                    message_id,
277                    start_time.elapsed()
278                );
279            }
280
281            Ok(())
282        }
283    });
284}
285
286pub fn add_rpc_routes(router: &mut Router, state: &Arc<AppState>, rpc: &Arc<Peer>) {
287    on_message(router, rpc, state, share_worktree);
288    on_message(router, rpc, state, join_worktree);
289    on_message(router, rpc, state, update_worktree);
290    on_message(router, rpc, state, close_worktree);
291    on_message(router, rpc, state, open_buffer);
292    on_message(router, rpc, state, close_buffer);
293    on_message(router, rpc, state, update_buffer);
294    on_message(router, rpc, state, buffer_saved);
295    on_message(router, rpc, state, save_buffer);
296    on_message(router, rpc, state, get_channels);
297    on_message(router, rpc, state, get_users);
298    on_message(router, rpc, state, join_channel);
299    on_message(router, rpc, state, send_channel_message);
300}
301
302pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
303    let mut router = Router::new();
304    add_rpc_routes(&mut router, app.state(), rpc);
305    let router = Arc::new(router);
306
307    let rpc = rpc.clone();
308    app.at("/rpc").with(auth::VerifyToken).get(move |request: Request<Arc<AppState>>| {
309        let user_id = request.ext::<UserId>().copied();
310        let rpc = rpc.clone();
311        let router = router.clone();
312        async move {
313            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
314
315            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
316            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
317            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
318
319            if !upgrade_requested {
320                return Ok(Response::new(StatusCode::UpgradeRequired));
321            }
322
323            let header = match request.header("Sec-Websocket-Key") {
324                Some(h) => h.as_str(),
325                None => return Err(anyhow!("expected sec-websocket-key"))?,
326            };
327
328            let mut response = Response::new(StatusCode::SwitchingProtocols);
329            response.insert_header(UPGRADE, "websocket");
330            response.insert_header(CONNECTION, "Upgrade");
331            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
332            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
333            response.insert_header("Sec-Websocket-Version", "13");
334
335            let http_res: &mut tide::http::Response = response.as_mut();
336            let upgrade_receiver = http_res.recv_upgrade().await;
337            let addr = request.remote().unwrap_or("unknown").to_string();
338            let state = request.state().clone();
339            let user_id = user_id.ok_or_else(|| anyhow!("user_id is not present on request. ensure auth::VerifyToken middleware is present"))?;
340            task::spawn(async move {
341                if let Some(stream) = upgrade_receiver.await {
342                    let stream = WebSocketStream::from_raw_socket(stream, Role::Server, None).await;
343                    handle_connection(rpc, router, state, addr, stream, user_id).await;
344                }
345            });
346
347            Ok(response)
348        }
349    });
350}
351
352pub async fn handle_connection<Conn>(
353    rpc: Arc<Peer>,
354    router: Arc<Router>,
355    state: Arc<AppState>,
356    addr: String,
357    stream: Conn,
358    user_id: UserId,
359) where
360    Conn: 'static
361        + futures::Sink<WebSocketMessage, Error = WebSocketError>
362        + futures::Stream<Item = Result<WebSocketMessage, WebSocketError>>
363        + Send
364        + Unpin,
365{
366    log::info!("accepted rpc connection: {:?}", addr);
367    let (connection_id, handle_io, handle_messages) = rpc.add_connection(stream, router).await;
368    state
369        .rpc
370        .write()
371        .await
372        .add_connection(connection_id, user_id);
373
374    let handle_messages = async move {
375        handle_messages.await;
376        Ok(())
377    };
378
379    if let Err(e) = futures::try_join!(handle_messages, handle_io) {
380        log::error!("error handling rpc connection {:?} - {:?}", addr, e);
381    }
382
383    log::info!("closing connection to {:?}", addr);
384    if let Err(e) = rpc.sign_out(connection_id, &state).await {
385        log::error!("error signing out connection {:?} - {:?}", addr, e);
386    }
387}
388
389async fn share_worktree(
390    mut request: TypedEnvelope<proto::ShareWorktree>,
391    rpc: &Arc<Peer>,
392    state: &Arc<AppState>,
393) -> tide::Result<()> {
394    let mut state = state.rpc.write().await;
395    let worktree_id = state.next_worktree_id;
396    state.next_worktree_id += 1;
397    let access_token = random_token();
398    let worktree = request
399        .payload
400        .worktree
401        .as_mut()
402        .ok_or_else(|| anyhow!("missing worktree"))?;
403    let entries = mem::take(&mut worktree.entries)
404        .into_iter()
405        .map(|entry| (entry.id, entry))
406        .collect();
407    state.worktrees.insert(
408        worktree_id,
409        Worktree {
410            host_connection_id: Some(request.sender_id),
411            guest_connection_ids: Default::default(),
412            active_replica_ids: Default::default(),
413            access_token: access_token.clone(),
414            root_name: mem::take(&mut worktree.root_name),
415            entries,
416        },
417    );
418
419    rpc.respond(
420        request.receipt(),
421        proto::ShareWorktreeResponse {
422            worktree_id,
423            access_token,
424        },
425    )
426    .await?;
427    Ok(())
428}
429
430async fn join_worktree(
431    request: TypedEnvelope<proto::OpenWorktree>,
432    rpc: &Arc<Peer>,
433    state: &Arc<AppState>,
434) -> tide::Result<()> {
435    let worktree_id = request.payload.worktree_id;
436    let access_token = &request.payload.access_token;
437
438    let mut state = state.rpc.write().await;
439    if let Some((peer_replica_id, worktree)) =
440        state.join_worktree(request.sender_id, worktree_id, access_token)
441    {
442        let mut peers = Vec::new();
443        if let Some(host_connection_id) = worktree.host_connection_id {
444            peers.push(proto::Peer {
445                peer_id: host_connection_id.0,
446                replica_id: 0,
447            });
448        }
449        for (peer_conn_id, peer_replica_id) in &worktree.guest_connection_ids {
450            if *peer_conn_id != request.sender_id {
451                peers.push(proto::Peer {
452                    peer_id: peer_conn_id.0,
453                    replica_id: *peer_replica_id as u32,
454                });
455            }
456        }
457
458        broadcast(request.sender_id, worktree.connection_ids(), |conn_id| {
459            rpc.send(
460                conn_id,
461                proto::AddPeer {
462                    worktree_id,
463                    peer: Some(proto::Peer {
464                        peer_id: request.sender_id.0,
465                        replica_id: peer_replica_id as u32,
466                    }),
467                },
468            )
469        })
470        .await?;
471        rpc.respond(
472            request.receipt(),
473            proto::OpenWorktreeResponse {
474                worktree_id,
475                worktree: Some(proto::Worktree {
476                    root_name: worktree.root_name.clone(),
477                    entries: worktree.entries.values().cloned().collect(),
478                }),
479                replica_id: peer_replica_id as u32,
480                peers,
481            },
482        )
483        .await?;
484    } else {
485        rpc.respond(
486            request.receipt(),
487            proto::OpenWorktreeResponse {
488                worktree_id,
489                worktree: None,
490                replica_id: 0,
491                peers: Vec::new(),
492            },
493        )
494        .await?;
495    }
496
497    Ok(())
498}
499
500async fn update_worktree(
501    request: TypedEnvelope<proto::UpdateWorktree>,
502    rpc: &Arc<Peer>,
503    state: &Arc<AppState>,
504) -> tide::Result<()> {
505    {
506        let mut state = state.rpc.write().await;
507        let worktree = state.write_worktree(request.payload.worktree_id, request.sender_id)?;
508        for entry_id in &request.payload.removed_entries {
509            worktree.entries.remove(&entry_id);
510        }
511
512        for entry in &request.payload.updated_entries {
513            worktree.entries.insert(entry.id, entry.clone());
514        }
515    }
516
517    broadcast_in_worktree(request.payload.worktree_id, request, rpc, state).await?;
518    Ok(())
519}
520
521async fn close_worktree(
522    request: TypedEnvelope<proto::CloseWorktree>,
523    rpc: &Arc<Peer>,
524    state: &Arc<AppState>,
525) -> tide::Result<()> {
526    let connection_ids;
527    {
528        let mut state = state.rpc.write().await;
529        let worktree = state.write_worktree(request.payload.worktree_id, request.sender_id)?;
530        connection_ids = worktree.connection_ids();
531        if worktree.host_connection_id == Some(request.sender_id) {
532            worktree.host_connection_id = None;
533        } else if let Some(replica_id) = worktree.guest_connection_ids.remove(&request.sender_id) {
534            worktree.active_replica_ids.remove(&replica_id);
535        }
536    }
537
538    broadcast(request.sender_id, connection_ids, |conn_id| {
539        rpc.send(
540            conn_id,
541            proto::RemovePeer {
542                worktree_id: request.payload.worktree_id,
543                peer_id: request.sender_id.0,
544            },
545        )
546    })
547    .await?;
548
549    Ok(())
550}
551
552async fn open_buffer(
553    request: TypedEnvelope<proto::OpenBuffer>,
554    rpc: &Arc<Peer>,
555    state: &Arc<AppState>,
556) -> tide::Result<()> {
557    let receipt = request.receipt();
558    let worktree_id = request.payload.worktree_id;
559    let host_connection_id = state
560        .rpc
561        .read()
562        .await
563        .read_worktree(worktree_id, request.sender_id)?
564        .host_connection_id()?;
565
566    let response = rpc
567        .forward_request(request.sender_id, host_connection_id, request.payload)
568        .await?;
569    rpc.respond(receipt, response).await?;
570    Ok(())
571}
572
573async fn close_buffer(
574    request: TypedEnvelope<proto::CloseBuffer>,
575    rpc: &Arc<Peer>,
576    state: &Arc<AppState>,
577) -> tide::Result<()> {
578    let host_connection_id = state
579        .rpc
580        .read()
581        .await
582        .read_worktree(request.payload.worktree_id, request.sender_id)?
583        .host_connection_id()?;
584
585    rpc.forward_send(request.sender_id, host_connection_id, request.payload)
586        .await?;
587
588    Ok(())
589}
590
591async fn save_buffer(
592    request: TypedEnvelope<proto::SaveBuffer>,
593    rpc: &Arc<Peer>,
594    state: &Arc<AppState>,
595) -> tide::Result<()> {
596    let host;
597    let guests;
598    {
599        let state = state.rpc.read().await;
600        let worktree = state.read_worktree(request.payload.worktree_id, request.sender_id)?;
601        host = worktree.host_connection_id()?;
602        guests = worktree
603            .guest_connection_ids
604            .keys()
605            .copied()
606            .collect::<Vec<_>>();
607    }
608
609    let sender = request.sender_id;
610    let receipt = request.receipt();
611    let response = rpc
612        .forward_request(sender, host, request.payload.clone())
613        .await?;
614
615    broadcast(host, guests, |conn_id| {
616        let response = response.clone();
617        async move {
618            if conn_id == sender {
619                rpc.respond(receipt, response).await
620            } else {
621                rpc.forward_send(host, conn_id, response).await
622            }
623        }
624    })
625    .await?;
626
627    Ok(())
628}
629
630async fn update_buffer(
631    request: TypedEnvelope<proto::UpdateBuffer>,
632    rpc: &Arc<Peer>,
633    state: &Arc<AppState>,
634) -> tide::Result<()> {
635    broadcast_in_worktree(request.payload.worktree_id, request, rpc, state).await
636}
637
638async fn buffer_saved(
639    request: TypedEnvelope<proto::BufferSaved>,
640    rpc: &Arc<Peer>,
641    state: &Arc<AppState>,
642) -> tide::Result<()> {
643    broadcast_in_worktree(request.payload.worktree_id, request, rpc, state).await
644}
645
646async fn get_channels(
647    request: TypedEnvelope<proto::GetChannels>,
648    rpc: &Arc<Peer>,
649    state: &Arc<AppState>,
650) -> tide::Result<()> {
651    let user_id = state
652        .rpc
653        .read()
654        .await
655        .user_id_for_connection(request.sender_id)?;
656    let channels = state.db.get_channels_for_user(user_id).await?;
657    rpc.respond(
658        request.receipt(),
659        proto::GetChannelsResponse {
660            channels: channels
661                .into_iter()
662                .map(|chan| proto::Channel {
663                    id: chan.id.to_proto(),
664                    name: chan.name,
665                })
666                .collect(),
667        },
668    )
669    .await?;
670    Ok(())
671}
672
673async fn get_users(
674    request: TypedEnvelope<proto::GetUsers>,
675    rpc: &Arc<Peer>,
676    state: &Arc<AppState>,
677) -> tide::Result<()> {
678    let user_id = state
679        .rpc
680        .read()
681        .await
682        .user_id_for_connection(request.sender_id)?;
683    let receipt = request.receipt();
684    let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
685    let users = state
686        .db
687        .get_users_by_ids(user_id, user_ids)
688        .await?
689        .into_iter()
690        .map(|user| proto::User {
691            id: user.id.to_proto(),
692            github_login: user.github_login,
693            avatar_url: String::new(),
694        })
695        .collect();
696    rpc.respond(receipt, proto::GetUsersResponse { users })
697        .await?;
698    Ok(())
699}
700
701async fn join_channel(
702    request: TypedEnvelope<proto::JoinChannel>,
703    rpc: &Arc<Peer>,
704    state: &Arc<AppState>,
705) -> tide::Result<()> {
706    let user_id = state
707        .rpc
708        .read()
709        .await
710        .user_id_for_connection(request.sender_id)?;
711    let channel_id = ChannelId::from_proto(request.payload.channel_id);
712    if !state
713        .db
714        .can_user_access_channel(user_id, channel_id)
715        .await?
716    {
717        Err(anyhow!("access denied"))?;
718    }
719
720    state
721        .rpc
722        .write()
723        .await
724        .join_channel(request.sender_id, channel_id);
725    let messages = state
726        .db
727        .get_recent_channel_messages(channel_id, 50)
728        .await?
729        .into_iter()
730        .map(|msg| proto::ChannelMessage {
731            id: msg.id.to_proto(),
732            body: msg.body,
733            timestamp: msg.sent_at.unix_timestamp() as u64,
734            sender_id: msg.sender_id.to_proto(),
735        })
736        .collect();
737    rpc.respond(request.receipt(), proto::JoinChannelResponse { messages })
738        .await?;
739    Ok(())
740}
741
742async fn send_channel_message(
743    request: TypedEnvelope<proto::SendChannelMessage>,
744    peer: &Arc<Peer>,
745    app: &Arc<AppState>,
746) -> tide::Result<()> {
747    let channel_id = ChannelId::from_proto(request.payload.channel_id);
748    let user_id;
749    let connection_ids;
750    {
751        let state = app.rpc.read().await;
752        user_id = state.user_id_for_connection(request.sender_id)?;
753        if let Some(channel) = state.channels.get(&channel_id) {
754            connection_ids = channel.connection_ids();
755        } else {
756            return Ok(());
757        }
758    }
759
760    let timestamp = OffsetDateTime::now_utc();
761    let message_id = app
762        .db
763        .create_channel_message(channel_id, user_id, &request.payload.body, timestamp)
764        .await?;
765    let message = proto::ChannelMessageSent {
766        channel_id: channel_id.to_proto(),
767        message: Some(proto::ChannelMessage {
768            sender_id: user_id.to_proto(),
769            id: message_id.to_proto(),
770            body: request.payload.body,
771            timestamp: timestamp.unix_timestamp() as u64,
772        }),
773    };
774    broadcast(request.sender_id, connection_ids, |conn_id| {
775        peer.send(conn_id, message.clone())
776    })
777    .await?;
778
779    Ok(())
780}
781
782async fn broadcast_in_worktree<T: proto::EnvelopedMessage>(
783    worktree_id: u64,
784    request: TypedEnvelope<T>,
785    rpc: &Arc<Peer>,
786    state: &Arc<AppState>,
787) -> tide::Result<()> {
788    let connection_ids = state
789        .rpc
790        .read()
791        .await
792        .read_worktree(worktree_id, request.sender_id)?
793        .connection_ids();
794
795    broadcast(request.sender_id, connection_ids, |conn_id| {
796        rpc.forward_send(request.sender_id, conn_id, request.payload.clone())
797    })
798    .await?;
799
800    Ok(())
801}
802
803pub async fn broadcast<F, T>(
804    sender_id: ConnectionId,
805    receiver_ids: Vec<ConnectionId>,
806    mut f: F,
807) -> anyhow::Result<()>
808where
809    F: FnMut(ConnectionId) -> T,
810    T: Future<Output = anyhow::Result<()>>,
811{
812    let futures = receiver_ids
813        .into_iter()
814        .filter(|id| *id != sender_id)
815        .map(|id| f(id));
816    futures::future::try_join_all(futures).await?;
817    Ok(())
818}
819
820fn header_contains_ignore_case<T>(
821    request: &tide::Request<T>,
822    header_name: HeaderName,
823    value: &str,
824) -> bool {
825    request
826        .header(header_name)
827        .map(|h| {
828            h.as_str()
829                .split(',')
830                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
831        })
832        .unwrap_or(false)
833}