rpc.rs

  1use super::{
  2    auth::{self, PeerExt as _},
  3    db::{ChannelId, UserId},
  4    AppState,
  5};
  6use anyhow::anyhow;
  7use async_std::task;
  8use async_tungstenite::{
  9    tungstenite::{protocol::Role, Error as WebSocketError, Message as WebSocketMessage},
 10    WebSocketStream,
 11};
 12use futures::{future::BoxFuture, FutureExt};
 13use postage::prelude::Stream as _;
 14use sha1::{Digest as _, Sha1};
 15use std::{
 16    any::{Any, TypeId},
 17    collections::{HashMap, HashSet},
 18    future::Future,
 19    mem,
 20    sync::Arc,
 21    time::Instant,
 22};
 23use surf::StatusCode;
 24use tide::log;
 25use tide::{
 26    http::headers::{HeaderName, CONNECTION, UPGRADE},
 27    Request, Response,
 28};
 29use time::OffsetDateTime;
 30use zrpc::{
 31    auth::random_token,
 32    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
 33    ConnectionId, Peer, TypedEnvelope,
 34};
 35
 36type ReplicaId = u16;
 37
 38type MessageHandler = Box<
 39    dyn Send
 40        + Sync
 41        + Fn(Box<dyn AnyTypedEnvelope>, Arc<Server>) -> BoxFuture<'static, tide::Result<()>>,
 42>;
 43
 44#[derive(Default)]
 45struct ServerBuilder {
 46    handlers: HashMap<TypeId, MessageHandler>,
 47}
 48
 49impl ServerBuilder {
 50    pub fn on_message<F, Fut, M>(mut self, handler: F) -> Self
 51    where
 52        F: 'static + Send + Sync + Fn(Box<TypedEnvelope<M>>, Arc<Server>) -> Fut,
 53        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 54        M: EnvelopedMessage,
 55    {
 56        let prev_handler = self.handlers.insert(
 57            TypeId::of::<M>(),
 58            Box::new(move |envelope, server| {
 59                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 60                (handler)(envelope, server).boxed()
 61            }),
 62        );
 63        if prev_handler.is_some() {
 64            panic!("registered a handler for the same message twice");
 65        }
 66
 67        self
 68    }
 69
 70    pub fn build(self, rpc: &Arc<Peer>, state: &Arc<AppState>) -> Arc<Server> {
 71        Arc::new(Server {
 72            rpc: rpc.clone(),
 73            state: state.clone(),
 74            handlers: self.handlers,
 75        })
 76    }
 77}
 78
 79pub struct Server {
 80    rpc: Arc<Peer>,
 81    state: Arc<AppState>,
 82    handlers: HashMap<TypeId, MessageHandler>,
 83}
 84
 85impl Server {
 86    pub fn handle_connection<Conn>(
 87        self: &Arc<Self>,
 88        connection: Conn,
 89        addr: String,
 90        user_id: UserId,
 91    ) -> impl Future<Output = ()>
 92    where
 93        Conn: 'static
 94            + futures::Sink<WebSocketMessage, Error = WebSocketError>
 95            + futures::Stream<Item = Result<WebSocketMessage, WebSocketError>>
 96            + Send
 97            + Unpin,
 98    {
 99        let this = self.clone();
100        async move {
101            let (connection_id, handle_io, mut incoming_rx) =
102                this.rpc.add_connection(connection).await;
103            this.state
104                .rpc
105                .write()
106                .await
107                .add_connection(connection_id, user_id);
108
109            let handle_io = handle_io.fuse();
110            futures::pin_mut!(handle_io);
111            loop {
112                let next_message = incoming_rx.recv().fuse();
113                futures::pin_mut!(next_message);
114                futures::select_biased! {
115                    message = next_message => {
116                        if let Some(message) = message {
117                            let start_time = Instant::now();
118                            log::info!("RPC message received: {}", message.payload_type_name());
119                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
120                                if let Err(err) = (handler)(message, this.clone()).await {
121                                    log::error!("error handling message: {:?}", err);
122                                } else {
123                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
124                                }
125                            } else {
126                                log::warn!("unhandled message: {}", message.payload_type_name());
127                            }
128                        } else {
129                            log::info!("rpc connection closed {:?}", addr);
130                            break;
131                        }
132                    }
133                    handle_io = handle_io => {
134                        if let Err(err) = handle_io {
135                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
136                        }
137                        break;
138                    }
139                }
140            }
141
142            if let Err(err) = this.rpc.sign_out(connection_id, &this.state).await {
143                log::error!("error signing out connection {:?} - {:?}", addr, err);
144            }
145        }
146    }
147}
148
/// In-memory bookkeeping for the RPC server: registered connections, shared
/// worktrees and the connections that joined each channel.
#[derive(Default)]
pub struct State {
    /// Per-connection record, keyed by connection id.
    connections: HashMap<ConnectionId, Connection>,
    /// Shared worktrees, keyed by worktree id.
    pub worktrees: HashMap<u64, Worktree>,
    /// Channel membership; an entry is created the first time a channel is joined.
    channels: HashMap<ChannelId, Channel>,
    /// Id that will be given to the next shared worktree.
    next_worktree_id: u64,
}
156
/// What the server tracks for a single connection.
struct Connection {
    /// User the connection was registered for.
    user_id: UserId,
    /// Ids of the worktrees associated with this connection.
    worktrees: HashSet<u64>,
    /// Ids of the channels this connection has joined.
    channels: HashSet<ChannelId>,
}
162
/// A shared worktree together with the connections collaborating on it.
pub struct Worktree {
    /// Connection that shared the worktree; `None` once the host has closed it.
    host_connection_id: Option<ConnectionId>,
    /// Guest connections, mapped to the replica id each was assigned on joining.
    guest_connection_ids: HashMap<ConnectionId, ReplicaId>,
    /// Replica ids currently assigned to guests.
    active_replica_ids: HashSet<ReplicaId>,
    /// Token a connection has to present in order to join.
    access_token: String,
    /// Root name supplied by the host when the worktree was shared.
    root_name: String,
    /// Worktree entries, keyed by entry id.
    entries: HashMap<u64, proto::Entry>,
}
171
/// Membership record for a single channel.
#[derive(Default)]
struct Channel {
    /// Connections that have joined the channel.
    connection_ids: HashSet<ConnectionId>,
}
176
177impl Worktree {
178    pub fn connection_ids(&self) -> Vec<ConnectionId> {
179        self.guest_connection_ids
180            .keys()
181            .copied()
182            .chain(self.host_connection_id)
183            .collect()
184    }
185
186    fn host_connection_id(&self) -> tide::Result<ConnectionId> {
187        Ok(self
188            .host_connection_id
189            .ok_or_else(|| anyhow!("host disconnected from worktree"))?)
190    }
191}
192
193impl Channel {
194    fn connection_ids(&self) -> Vec<ConnectionId> {
195        self.connection_ids.iter().copied().collect()
196    }
197}
198
199impl State {
200    // Add a new connection associated with a given user.
201    pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId) {
202        self.connections.insert(
203            connection_id,
204            Connection {
205                user_id,
206                worktrees: Default::default(),
207                channels: Default::default(),
208            },
209        );
210    }
211
212    // Remove the given connection and its association with any worktrees.
213    pub fn remove_connection(&mut self, connection_id: ConnectionId) -> Vec<u64> {
214        let mut worktree_ids = Vec::new();
215        if let Some(connection) = self.connections.remove(&connection_id) {
216            for channel_id in connection.channels {
217                if let Some(channel) = self.channels.get_mut(&channel_id) {
218                    channel.connection_ids.remove(&connection_id);
219                }
220            }
221            for worktree_id in connection.worktrees {
222                if let Some(worktree) = self.worktrees.get_mut(&worktree_id) {
223                    if worktree.host_connection_id == Some(connection_id) {
224                        worktree_ids.push(worktree_id);
225                    } else if let Some(replica_id) =
226                        worktree.guest_connection_ids.remove(&connection_id)
227                    {
228                        worktree.active_replica_ids.remove(&replica_id);
229                        worktree_ids.push(worktree_id);
230                    }
231                }
232            }
233        }
234        worktree_ids
235    }
236
237    fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
238        if let Some(connection) = self.connections.get_mut(&connection_id) {
239            connection.channels.insert(channel_id);
240            self.channels
241                .entry(channel_id)
242                .or_default()
243                .connection_ids
244                .insert(connection_id);
245        }
246    }
247
248    // Add the given connection as a guest of the given worktree
249    pub fn join_worktree(
250        &mut self,
251        connection_id: ConnectionId,
252        worktree_id: u64,
253        access_token: &str,
254    ) -> Option<(ReplicaId, &Worktree)> {
255        if let Some(worktree) = self.worktrees.get_mut(&worktree_id) {
256            if access_token == worktree.access_token {
257                if let Some(connection) = self.connections.get_mut(&connection_id) {
258                    connection.worktrees.insert(worktree_id);
259                }
260
261                let mut replica_id = 1;
262                while worktree.active_replica_ids.contains(&replica_id) {
263                    replica_id += 1;
264                }
265                worktree.active_replica_ids.insert(replica_id);
266                worktree
267                    .guest_connection_ids
268                    .insert(connection_id, replica_id);
269                Some((replica_id, worktree))
270            } else {
271                None
272            }
273        } else {
274            None
275        }
276    }
277
278    fn user_id_for_connection(&self, connection_id: ConnectionId) -> tide::Result<UserId> {
279        Ok(self
280            .connections
281            .get(&connection_id)
282            .ok_or_else(|| anyhow!("unknown connection"))?
283            .user_id)
284    }
285
286    fn read_worktree(
287        &self,
288        worktree_id: u64,
289        connection_id: ConnectionId,
290    ) -> tide::Result<&Worktree> {
291        let worktree = self
292            .worktrees
293            .get(&worktree_id)
294            .ok_or_else(|| anyhow!("worktree not found"))?;
295
296        if worktree.host_connection_id == Some(connection_id)
297            || worktree.guest_connection_ids.contains_key(&connection_id)
298        {
299            Ok(worktree)
300        } else {
301            Err(anyhow!(
302                "{} is not a member of worktree {}",
303                connection_id,
304                worktree_id
305            ))?
306        }
307    }
308
309    fn write_worktree(
310        &mut self,
311        worktree_id: u64,
312        connection_id: ConnectionId,
313    ) -> tide::Result<&mut Worktree> {
314        let worktree = self
315            .worktrees
316            .get_mut(&worktree_id)
317            .ok_or_else(|| anyhow!("worktree not found"))?;
318
319        if worktree.host_connection_id == Some(connection_id)
320            || worktree.guest_connection_ids.contains_key(&connection_id)
321        {
322            Ok(worktree)
323        } else {
324            Err(anyhow!(
325                "{} is not a member of worktree {}",
326                connection_id,
327                worktree_id
328            ))?
329        }
330    }
331}
332
333pub fn build_server(state: &Arc<AppState>, rpc: &Arc<Peer>) -> Arc<Server> {
334    ServerBuilder::default()
335        .on_message(share_worktree)
336        .on_message(join_worktree)
337        .on_message(update_worktree)
338        .on_message(close_worktree)
339        .on_message(open_buffer)
340        .on_message(close_buffer)
341        .on_message(update_buffer)
342        .on_message(buffer_saved)
343        .on_message(save_buffer)
344        .on_message(get_channels)
345        .on_message(get_users)
346        .on_message(join_channel)
347        .on_message(send_channel_message)
348        .build(rpc, state)
349}
350
351pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
352    let server = build_server(app.state(), rpc);
353    app.at("/rpc").with(auth::VerifyToken).get(move |request: Request<Arc<AppState>>| {
354        let user_id = request.ext::<UserId>().copied();
355        let server = server.clone();
356        async move {
357            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
358
359            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
360            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
361            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
362
363            if !upgrade_requested {
364                return Ok(Response::new(StatusCode::UpgradeRequired));
365            }
366
367            let header = match request.header("Sec-Websocket-Key") {
368                Some(h) => h.as_str(),
369                None => return Err(anyhow!("expected sec-websocket-key"))?,
370            };
371
372            let mut response = Response::new(StatusCode::SwitchingProtocols);
373            response.insert_header(UPGRADE, "websocket");
374            response.insert_header(CONNECTION, "Upgrade");
375            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
376            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
377            response.insert_header("Sec-Websocket-Version", "13");
378
379            let http_res: &mut tide::http::Response = response.as_mut();
380            let upgrade_receiver = http_res.recv_upgrade().await;
381            let addr = request.remote().unwrap_or("unknown").to_string();
382            let user_id = user_id.ok_or_else(|| anyhow!("user_id is not present on request. ensure auth::VerifyToken middleware is present"))?;
383            task::spawn(async move {
384                if let Some(stream) = upgrade_receiver.await {
385                    let stream = WebSocketStream::from_raw_socket(stream, Role::Server, None).await;
386                    server.handle_connection(stream, addr, user_id).await;
387                }
388            });
389
390            Ok(response)
391        }
392    });
393}
394
395async fn share_worktree(
396    mut request: Box<TypedEnvelope<proto::ShareWorktree>>,
397    server: Arc<Server>,
398) -> tide::Result<()> {
399    let mut state = server.state.rpc.write().await;
400    let worktree_id = state.next_worktree_id;
401    state.next_worktree_id += 1;
402    let access_token = random_token();
403    let worktree = request
404        .payload
405        .worktree
406        .as_mut()
407        .ok_or_else(|| anyhow!("missing worktree"))?;
408    let entries = mem::take(&mut worktree.entries)
409        .into_iter()
410        .map(|entry| (entry.id, entry))
411        .collect();
412    state.worktrees.insert(
413        worktree_id,
414        Worktree {
415            host_connection_id: Some(request.sender_id),
416            guest_connection_ids: Default::default(),
417            active_replica_ids: Default::default(),
418            access_token: access_token.clone(),
419            root_name: mem::take(&mut worktree.root_name),
420            entries,
421        },
422    );
423
424    server
425        .rpc
426        .respond(
427            request.receipt(),
428            proto::ShareWorktreeResponse {
429                worktree_id,
430                access_token,
431            },
432        )
433        .await?;
434    Ok(())
435}
436
437async fn join_worktree(
438    request: Box<TypedEnvelope<proto::OpenWorktree>>,
439    server: Arc<Server>,
440) -> tide::Result<()> {
441    let worktree_id = request.payload.worktree_id;
442    let access_token = &request.payload.access_token;
443
444    let mut state = server.state.rpc.write().await;
445    if let Some((peer_replica_id, worktree)) =
446        state.join_worktree(request.sender_id, worktree_id, access_token)
447    {
448        let mut peers = Vec::new();
449        if let Some(host_connection_id) = worktree.host_connection_id {
450            peers.push(proto::Peer {
451                peer_id: host_connection_id.0,
452                replica_id: 0,
453            });
454        }
455        for (peer_conn_id, peer_replica_id) in &worktree.guest_connection_ids {
456            if *peer_conn_id != request.sender_id {
457                peers.push(proto::Peer {
458                    peer_id: peer_conn_id.0,
459                    replica_id: *peer_replica_id as u32,
460                });
461            }
462        }
463
464        broadcast(request.sender_id, worktree.connection_ids(), |conn_id| {
465            server.rpc.send(
466                conn_id,
467                proto::AddPeer {
468                    worktree_id,
469                    peer: Some(proto::Peer {
470                        peer_id: request.sender_id.0,
471                        replica_id: peer_replica_id as u32,
472                    }),
473                },
474            )
475        })
476        .await?;
477        server
478            .rpc
479            .respond(
480                request.receipt(),
481                proto::OpenWorktreeResponse {
482                    worktree_id,
483                    worktree: Some(proto::Worktree {
484                        root_name: worktree.root_name.clone(),
485                        entries: worktree.entries.values().cloned().collect(),
486                    }),
487                    replica_id: peer_replica_id as u32,
488                    peers,
489                },
490            )
491            .await?;
492    } else {
493        server
494            .rpc
495            .respond(
496                request.receipt(),
497                proto::OpenWorktreeResponse {
498                    worktree_id,
499                    worktree: None,
500                    replica_id: 0,
501                    peers: Vec::new(),
502                },
503            )
504            .await?;
505    }
506
507    Ok(())
508}
509
510async fn update_worktree(
511    request: Box<TypedEnvelope<proto::UpdateWorktree>>,
512    server: Arc<Server>,
513) -> tide::Result<()> {
514    {
515        let mut state = server.state.rpc.write().await;
516        let worktree = state.write_worktree(request.payload.worktree_id, request.sender_id)?;
517        for entry_id in &request.payload.removed_entries {
518            worktree.entries.remove(&entry_id);
519        }
520
521        for entry in &request.payload.updated_entries {
522            worktree.entries.insert(entry.id, entry.clone());
523        }
524    }
525
526    broadcast_in_worktree(request.payload.worktree_id, &request, &server).await?;
527    Ok(())
528}
529
530async fn close_worktree(
531    request: Box<TypedEnvelope<proto::CloseWorktree>>,
532    server: Arc<Server>,
533) -> tide::Result<()> {
534    let connection_ids;
535    {
536        let mut state = server.state.rpc.write().await;
537        let worktree = state.write_worktree(request.payload.worktree_id, request.sender_id)?;
538        connection_ids = worktree.connection_ids();
539        if worktree.host_connection_id == Some(request.sender_id) {
540            worktree.host_connection_id = None;
541        } else if let Some(replica_id) = worktree.guest_connection_ids.remove(&request.sender_id) {
542            worktree.active_replica_ids.remove(&replica_id);
543        }
544    }
545
546    broadcast(request.sender_id, connection_ids, |conn_id| {
547        server.rpc.send(
548            conn_id,
549            proto::RemovePeer {
550                worktree_id: request.payload.worktree_id,
551                peer_id: request.sender_id.0,
552            },
553        )
554    })
555    .await?;
556
557    Ok(())
558}
559
560async fn open_buffer(
561    request: Box<TypedEnvelope<proto::OpenBuffer>>,
562    server: Arc<Server>,
563) -> tide::Result<()> {
564    let receipt = request.receipt();
565    let worktree_id = request.payload.worktree_id;
566    let host_connection_id = server
567        .state
568        .rpc
569        .read()
570        .await
571        .read_worktree(worktree_id, request.sender_id)?
572        .host_connection_id()?;
573
574    let response = server
575        .rpc
576        .forward_request(request.sender_id, host_connection_id, request.payload)
577        .await?;
578    server.rpc.respond(receipt, response).await?;
579    Ok(())
580}
581
582async fn close_buffer(
583    request: Box<TypedEnvelope<proto::CloseBuffer>>,
584    server: Arc<Server>,
585) -> tide::Result<()> {
586    let host_connection_id = server
587        .state
588        .rpc
589        .read()
590        .await
591        .read_worktree(request.payload.worktree_id, request.sender_id)?
592        .host_connection_id()?;
593
594    server
595        .rpc
596        .forward_send(request.sender_id, host_connection_id, request.payload)
597        .await?;
598
599    Ok(())
600}
601
602async fn save_buffer(
603    request: Box<TypedEnvelope<proto::SaveBuffer>>,
604    server: Arc<Server>,
605) -> tide::Result<()> {
606    let host;
607    let guests;
608    {
609        let state = server.state.rpc.read().await;
610        let worktree = state.read_worktree(request.payload.worktree_id, request.sender_id)?;
611        host = worktree.host_connection_id()?;
612        guests = worktree
613            .guest_connection_ids
614            .keys()
615            .copied()
616            .collect::<Vec<_>>();
617    }
618
619    let sender = request.sender_id;
620    let receipt = request.receipt();
621    let response = server
622        .rpc
623        .forward_request(sender, host, request.payload.clone())
624        .await?;
625
626    broadcast(host, guests, |conn_id| {
627        let response = response.clone();
628        let server = &server;
629        async move {
630            if conn_id == sender {
631                server.rpc.respond(receipt, response).await
632            } else {
633                server.rpc.forward_send(host, conn_id, response).await
634            }
635        }
636    })
637    .await?;
638
639    Ok(())
640}
641
642async fn update_buffer(
643    request: Box<TypedEnvelope<proto::UpdateBuffer>>,
644    server: Arc<Server>,
645) -> tide::Result<()> {
646    broadcast_in_worktree(request.payload.worktree_id, &request, &server).await
647}
648
649async fn buffer_saved(
650    request: Box<TypedEnvelope<proto::BufferSaved>>,
651    server: Arc<Server>,
652) -> tide::Result<()> {
653    broadcast_in_worktree(request.payload.worktree_id, &request, &server).await
654}
655
656async fn get_channels(
657    request: Box<TypedEnvelope<proto::GetChannels>>,
658    server: Arc<Server>,
659) -> tide::Result<()> {
660    let user_id = server
661        .state
662        .rpc
663        .read()
664        .await
665        .user_id_for_connection(request.sender_id)?;
666    let channels = server.state.db.get_channels_for_user(user_id).await?;
667    server
668        .rpc
669        .respond(
670            request.receipt(),
671            proto::GetChannelsResponse {
672                channels: channels
673                    .into_iter()
674                    .map(|chan| proto::Channel {
675                        id: chan.id.to_proto(),
676                        name: chan.name,
677                    })
678                    .collect(),
679            },
680        )
681        .await?;
682    Ok(())
683}
684
685async fn get_users(
686    request: Box<TypedEnvelope<proto::GetUsers>>,
687    server: Arc<Server>,
688) -> tide::Result<()> {
689    let user_id = server
690        .state
691        .rpc
692        .read()
693        .await
694        .user_id_for_connection(request.sender_id)?;
695    let receipt = request.receipt();
696    let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
697    let users = server
698        .state
699        .db
700        .get_users_by_ids(user_id, user_ids)
701        .await?
702        .into_iter()
703        .map(|user| proto::User {
704            id: user.id.to_proto(),
705            github_login: user.github_login,
706            avatar_url: String::new(),
707        })
708        .collect();
709    server
710        .rpc
711        .respond(receipt, proto::GetUsersResponse { users })
712        .await?;
713    Ok(())
714}
715
716async fn join_channel(
717    request: Box<TypedEnvelope<proto::JoinChannel>>,
718    server: Arc<Server>,
719) -> tide::Result<()> {
720    let user_id = server
721        .state
722        .rpc
723        .read()
724        .await
725        .user_id_for_connection(request.sender_id)?;
726    let channel_id = ChannelId::from_proto(request.payload.channel_id);
727    if !server
728        .state
729        .db
730        .can_user_access_channel(user_id, channel_id)
731        .await?
732    {
733        Err(anyhow!("access denied"))?;
734    }
735
736    server
737        .state
738        .rpc
739        .write()
740        .await
741        .join_channel(request.sender_id, channel_id);
742    let messages = server
743        .state
744        .db
745        .get_recent_channel_messages(channel_id, 50)
746        .await?
747        .into_iter()
748        .map(|msg| proto::ChannelMessage {
749            id: msg.id.to_proto(),
750            body: msg.body,
751            timestamp: msg.sent_at.unix_timestamp() as u64,
752            sender_id: msg.sender_id.to_proto(),
753        })
754        .collect();
755    server
756        .rpc
757        .respond(request.receipt(), proto::JoinChannelResponse { messages })
758        .await?;
759    Ok(())
760}
761
762async fn send_channel_message(
763    request: Box<TypedEnvelope<proto::SendChannelMessage>>,
764    server: Arc<Server>,
765) -> tide::Result<()> {
766    let channel_id = ChannelId::from_proto(request.payload.channel_id);
767    let user_id;
768    let connection_ids;
769    {
770        let state = server.state.rpc.read().await;
771        user_id = state.user_id_for_connection(request.sender_id)?;
772        if let Some(channel) = state.channels.get(&channel_id) {
773            connection_ids = channel.connection_ids();
774        } else {
775            return Ok(());
776        }
777    }
778
779    let timestamp = OffsetDateTime::now_utc();
780    let message_id = server
781        .state
782        .db
783        .create_channel_message(channel_id, user_id, &request.payload.body, timestamp)
784        .await?;
785    let message = proto::ChannelMessageSent {
786        channel_id: channel_id.to_proto(),
787        message: Some(proto::ChannelMessage {
788            sender_id: user_id.to_proto(),
789            id: message_id.to_proto(),
790            body: request.payload.body,
791            timestamp: timestamp.unix_timestamp() as u64,
792        }),
793    };
794    broadcast(request.sender_id, connection_ids, |conn_id| {
795        server.rpc.send(conn_id, message.clone())
796    })
797    .await?;
798
799    Ok(())
800}
801
802async fn broadcast_in_worktree<T: proto::EnvelopedMessage>(
803    worktree_id: u64,
804    request: &TypedEnvelope<T>,
805    server: &Arc<Server>,
806) -> tide::Result<()> {
807    let connection_ids = server
808        .state
809        .rpc
810        .read()
811        .await
812        .read_worktree(worktree_id, request.sender_id)?
813        .connection_ids();
814
815    broadcast(request.sender_id, connection_ids, |conn_id| {
816        server
817            .rpc
818            .forward_send(request.sender_id, conn_id, request.payload.clone())
819    })
820    .await?;
821
822    Ok(())
823}
824
825pub async fn broadcast<F, T>(
826    sender_id: ConnectionId,
827    receiver_ids: Vec<ConnectionId>,
828    mut f: F,
829) -> anyhow::Result<()>
830where
831    F: FnMut(ConnectionId) -> T,
832    T: Future<Output = anyhow::Result<()>>,
833{
834    let futures = receiver_ids
835        .into_iter()
836        .filter(|id| *id != sender_id)
837        .map(|id| f(id));
838    futures::future::try_join_all(futures).await?;
839    Ok(())
840}
841
842fn header_contains_ignore_case<T>(
843    request: &tide::Request<T>,
844    header_name: HeaderName,
845    value: &str,
846) -> bool {
847    request
848        .header(header_name)
849        .map(|h| {
850            h.as_str()
851                .split(',')
852                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
853        })
854        .unwrap_or(false)
855}