rpc.rs

  1use super::{
  2    auth::{self, PeerExt as _},
  3    db::{ChannelId, UserId},
  4    AppState,
  5};
  6use anyhow::anyhow;
  7use async_std::task;
  8use async_tungstenite::{
  9    tungstenite::{protocol::Role, Error as WebSocketError, Message as WebSocketMessage},
 10    WebSocketStream,
 11};
 12use futures::{future::BoxFuture, FutureExt};
 13use postage::prelude::Stream as _;
 14use sha1::{Digest as _, Sha1};
 15use std::{
 16    any::{Any, TypeId},
 17    collections::{HashMap, HashSet},
 18    future::Future,
 19    mem,
 20    sync::Arc,
 21    time::Instant,
 22};
 23use surf::StatusCode;
 24use tide::log;
 25use tide::{
 26    http::headers::{HeaderName, CONNECTION, UPGRADE},
 27    Request, Response,
 28};
 29use time::OffsetDateTime;
 30use zrpc::{
 31    auth::random_token,
 32    proto::{self, EnvelopedMessage},
 33    ConnectionId, Peer, TypedEnvelope,
 34};
 35
 36type ReplicaId = u16;
 37
 38type MessageHandler = Box<
 39    dyn Send
 40        + Sync
 41        + Fn(
 42            &mut Option<Box<dyn Any + Send + Sync>>,
 43            Arc<Server>,
 44        ) -> Option<BoxFuture<'static, tide::Result<()>>>,
 45>;
 46
 47#[derive(Default)]
 48struct ServerBuilder {
 49    handlers: Vec<MessageHandler>,
 50    handler_types: HashSet<TypeId>,
 51}
 52
 53impl ServerBuilder {
 54    pub fn on_message<F, Fut, M>(mut self, handler: F) -> Self
 55    where
 56        F: 'static + Send + Sync + Fn(Box<TypedEnvelope<M>>, Arc<Server>) -> Fut,
 57        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 58        M: EnvelopedMessage,
 59    {
 60        if self.handler_types.insert(TypeId::of::<M>()) {
 61            panic!("registered a handler for the same message twice");
 62        }
 63
 64        self.handlers
 65            .push(Box::new(move |untyped_envelope, server| {
 66                if let Some(typed_envelope) = untyped_envelope.take() {
 67                    match typed_envelope.downcast::<TypedEnvelope<M>>() {
 68                        Ok(typed_envelope) => Some((handler)(typed_envelope, server).boxed()),
 69                        Err(envelope) => {
 70                            *untyped_envelope = Some(envelope);
 71                            None
 72                        }
 73                    }
 74                } else {
 75                    None
 76                }
 77            }));
 78        self
 79    }
 80
 81    pub fn build(self, rpc: &Arc<Peer>, state: &Arc<AppState>) -> Arc<Server> {
 82        Arc::new(Server {
 83            rpc: rpc.clone(),
 84            state: state.clone(),
 85            handlers: self.handlers,
 86        })
 87    }
 88}
 89
 90pub struct Server {
 91    rpc: Arc<Peer>,
 92    state: Arc<AppState>,
 93    handlers: Vec<MessageHandler>,
 94}
 95
 96impl Server {
 97    pub async fn handle_connection<Conn>(
 98        self: &Arc<Self>,
 99        connection: Conn,
100        addr: String,
101        user_id: UserId,
102    ) where
103        Conn: 'static
104            + futures::Sink<WebSocketMessage, Error = WebSocketError>
105            + futures::Stream<Item = Result<WebSocketMessage, WebSocketError>>
106            + Send
107            + Unpin,
108    {
109        let this = self.clone();
110        let (connection_id, handle_io, mut incoming_rx) = this.rpc.add_connection(connection).await;
111        this.state
112            .rpc
113            .write()
114            .await
115            .add_connection(connection_id, user_id);
116
117        let handle_io = handle_io.fuse();
118        futures::pin_mut!(handle_io);
119        loop {
120            let next_message = incoming_rx.recv().fuse();
121            futures::pin_mut!(next_message);
122            futures::select_biased! {
123                message = next_message => {
124                    if let Some(message) = message {
125                        let start_time = Instant::now();
126                        log::info!("RPC message received");
127                        let mut message = Some(message);
128                        for handler in &this.handlers {
129                            if let Some(future) = (handler)(&mut message, this.clone()) {
130                                 if let Err(err) = future.await {
131                                    log::error!("error handling message: {:?}", err);
132                                } else {
133                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
134                                }
135                                break;
136                            }
137                        }
138
139                        if let Some(message) = message {
140                            log::warn!("unhandled message: {:?}", message);
141                        }
142                    } else {
143                        log::info!("rpc connection closed {:?}", addr);
144                        break;
145                    }
146                }
147                handle_io = handle_io => {
148                    if let Err(err) = handle_io {
149                        log::error!("error handling rpc connection {:?} - {:?}", addr, err);
150                    }
151                    break;
152                }
153            }
154        }
155
156        if let Err(err) = this.rpc.sign_out(connection_id, &this.state).await {
157            log::error!("error signing out connection {:?} - {:?}", addr, err);
158        }
159    }
160}
161
162#[derive(Default)]
163pub struct State {
164    connections: HashMap<ConnectionId, Connection>,
165    pub worktrees: HashMap<u64, Worktree>,
166    channels: HashMap<ChannelId, Channel>,
167    next_worktree_id: u64,
168}
169
170struct Connection {
171    user_id: UserId,
172    worktrees: HashSet<u64>,
173    channels: HashSet<ChannelId>,
174}
175
176pub struct Worktree {
177    host_connection_id: Option<ConnectionId>,
178    guest_connection_ids: HashMap<ConnectionId, ReplicaId>,
179    active_replica_ids: HashSet<ReplicaId>,
180    access_token: String,
181    root_name: String,
182    entries: HashMap<u64, proto::Entry>,
183}
184
185#[derive(Default)]
186struct Channel {
187    connection_ids: HashSet<ConnectionId>,
188}
189
190impl Worktree {
191    pub fn connection_ids(&self) -> Vec<ConnectionId> {
192        self.guest_connection_ids
193            .keys()
194            .copied()
195            .chain(self.host_connection_id)
196            .collect()
197    }
198
199    fn host_connection_id(&self) -> tide::Result<ConnectionId> {
200        Ok(self
201            .host_connection_id
202            .ok_or_else(|| anyhow!("host disconnected from worktree"))?)
203    }
204}
205
206impl Channel {
207    fn connection_ids(&self) -> Vec<ConnectionId> {
208        self.connection_ids.iter().copied().collect()
209    }
210}
211
212impl State {
213    // Add a new connection associated with a given user.
214    pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId) {
215        self.connections.insert(
216            connection_id,
217            Connection {
218                user_id,
219                worktrees: Default::default(),
220                channels: Default::default(),
221            },
222        );
223    }
224
225    // Remove the given connection and its association with any worktrees.
226    pub fn remove_connection(&mut self, connection_id: ConnectionId) -> Vec<u64> {
227        let mut worktree_ids = Vec::new();
228        if let Some(connection) = self.connections.remove(&connection_id) {
229            for channel_id in connection.channels {
230                if let Some(channel) = self.channels.get_mut(&channel_id) {
231                    channel.connection_ids.remove(&connection_id);
232                }
233            }
234            for worktree_id in connection.worktrees {
235                if let Some(worktree) = self.worktrees.get_mut(&worktree_id) {
236                    if worktree.host_connection_id == Some(connection_id) {
237                        worktree_ids.push(worktree_id);
238                    } else if let Some(replica_id) =
239                        worktree.guest_connection_ids.remove(&connection_id)
240                    {
241                        worktree.active_replica_ids.remove(&replica_id);
242                        worktree_ids.push(worktree_id);
243                    }
244                }
245            }
246        }
247        worktree_ids
248    }
249
250    fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
251        if let Some(connection) = self.connections.get_mut(&connection_id) {
252            connection.channels.insert(channel_id);
253            self.channels
254                .entry(channel_id)
255                .or_default()
256                .connection_ids
257                .insert(connection_id);
258        }
259    }
260
261    // Add the given connection as a guest of the given worktree
262    pub fn join_worktree(
263        &mut self,
264        connection_id: ConnectionId,
265        worktree_id: u64,
266        access_token: &str,
267    ) -> Option<(ReplicaId, &Worktree)> {
268        if let Some(worktree) = self.worktrees.get_mut(&worktree_id) {
269            if access_token == worktree.access_token {
270                if let Some(connection) = self.connections.get_mut(&connection_id) {
271                    connection.worktrees.insert(worktree_id);
272                }
273
274                let mut replica_id = 1;
275                while worktree.active_replica_ids.contains(&replica_id) {
276                    replica_id += 1;
277                }
278                worktree.active_replica_ids.insert(replica_id);
279                worktree
280                    .guest_connection_ids
281                    .insert(connection_id, replica_id);
282                Some((replica_id, worktree))
283            } else {
284                None
285            }
286        } else {
287            None
288        }
289    }
290
291    fn user_id_for_connection(&self, connection_id: ConnectionId) -> tide::Result<UserId> {
292        Ok(self
293            .connections
294            .get(&connection_id)
295            .ok_or_else(|| anyhow!("unknown connection"))?
296            .user_id)
297    }
298
299    fn read_worktree(
300        &self,
301        worktree_id: u64,
302        connection_id: ConnectionId,
303    ) -> tide::Result<&Worktree> {
304        let worktree = self
305            .worktrees
306            .get(&worktree_id)
307            .ok_or_else(|| anyhow!("worktree not found"))?;
308
309        if worktree.host_connection_id == Some(connection_id)
310            || worktree.guest_connection_ids.contains_key(&connection_id)
311        {
312            Ok(worktree)
313        } else {
314            Err(anyhow!(
315                "{} is not a member of worktree {}",
316                connection_id,
317                worktree_id
318            ))?
319        }
320    }
321
322    fn write_worktree(
323        &mut self,
324        worktree_id: u64,
325        connection_id: ConnectionId,
326    ) -> tide::Result<&mut Worktree> {
327        let worktree = self
328            .worktrees
329            .get_mut(&worktree_id)
330            .ok_or_else(|| anyhow!("worktree not found"))?;
331
332        if worktree.host_connection_id == Some(connection_id)
333            || worktree.guest_connection_ids.contains_key(&connection_id)
334        {
335            Ok(worktree)
336        } else {
337            Err(anyhow!(
338                "{} is not a member of worktree {}",
339                connection_id,
340                worktree_id
341            ))?
342        }
343    }
344}
345
346pub fn build_server(state: &Arc<AppState>, rpc: &Arc<Peer>) -> Arc<Server> {
347    ServerBuilder::default()
348        .on_message(share_worktree)
349        .on_message(join_worktree)
350        .on_message(update_worktree)
351        .on_message(close_worktree)
352        .on_message(open_buffer)
353        .on_message(close_buffer)
354        .on_message(update_buffer)
355        .on_message(buffer_saved)
356        .on_message(save_buffer)
357        .on_message(get_channels)
358        .on_message(get_users)
359        .on_message(join_channel)
360        .on_message(send_channel_message)
361        .build(rpc, state)
362}
363
364pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
365    let server = build_server(app.state(), rpc);
366    app.at("/rpc").with(auth::VerifyToken).get(move |request: Request<Arc<AppState>>| {
367        let user_id = request.ext::<UserId>().copied();
368        let server = server.clone();
369        async move {
370            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
371
372            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
373            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
374            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
375
376            if !upgrade_requested {
377                return Ok(Response::new(StatusCode::UpgradeRequired));
378            }
379
380            let header = match request.header("Sec-Websocket-Key") {
381                Some(h) => h.as_str(),
382                None => return Err(anyhow!("expected sec-websocket-key"))?,
383            };
384
385            let mut response = Response::new(StatusCode::SwitchingProtocols);
386            response.insert_header(UPGRADE, "websocket");
387            response.insert_header(CONNECTION, "Upgrade");
388            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
389            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
390            response.insert_header("Sec-Websocket-Version", "13");
391
392            let http_res: &mut tide::http::Response = response.as_mut();
393            let upgrade_receiver = http_res.recv_upgrade().await;
394            let addr = request.remote().unwrap_or("unknown").to_string();
395            let user_id = user_id.ok_or_else(|| anyhow!("user_id is not present on request. ensure auth::VerifyToken middleware is present"))?;
396            task::spawn(async move {
397                if let Some(stream) = upgrade_receiver.await {
398                    let stream = WebSocketStream::from_raw_socket(stream, Role::Server, None).await;
399                    server.handle_connection(stream, addr, user_id).await;
400                }
401            });
402
403            Ok(response)
404        }
405    });
406}
407
408async fn share_worktree(
409    mut request: Box<TypedEnvelope<proto::ShareWorktree>>,
410    server: Arc<Server>,
411) -> tide::Result<()> {
412    let mut state = server.state.rpc.write().await;
413    let worktree_id = state.next_worktree_id;
414    state.next_worktree_id += 1;
415    let access_token = random_token();
416    let worktree = request
417        .payload
418        .worktree
419        .as_mut()
420        .ok_or_else(|| anyhow!("missing worktree"))?;
421    let entries = mem::take(&mut worktree.entries)
422        .into_iter()
423        .map(|entry| (entry.id, entry))
424        .collect();
425    state.worktrees.insert(
426        worktree_id,
427        Worktree {
428            host_connection_id: Some(request.sender_id),
429            guest_connection_ids: Default::default(),
430            active_replica_ids: Default::default(),
431            access_token: access_token.clone(),
432            root_name: mem::take(&mut worktree.root_name),
433            entries,
434        },
435    );
436
437    server
438        .rpc
439        .respond(
440            request.receipt(),
441            proto::ShareWorktreeResponse {
442                worktree_id,
443                access_token,
444            },
445        )
446        .await?;
447    Ok(())
448}
449
450async fn join_worktree(
451    request: Box<TypedEnvelope<proto::OpenWorktree>>,
452    server: Arc<Server>,
453) -> tide::Result<()> {
454    let worktree_id = request.payload.worktree_id;
455    let access_token = &request.payload.access_token;
456
457    let mut state = server.state.rpc.write().await;
458    if let Some((peer_replica_id, worktree)) =
459        state.join_worktree(request.sender_id, worktree_id, access_token)
460    {
461        let mut peers = Vec::new();
462        if let Some(host_connection_id) = worktree.host_connection_id {
463            peers.push(proto::Peer {
464                peer_id: host_connection_id.0,
465                replica_id: 0,
466            });
467        }
468        for (peer_conn_id, peer_replica_id) in &worktree.guest_connection_ids {
469            if *peer_conn_id != request.sender_id {
470                peers.push(proto::Peer {
471                    peer_id: peer_conn_id.0,
472                    replica_id: *peer_replica_id as u32,
473                });
474            }
475        }
476
477        broadcast(request.sender_id, worktree.connection_ids(), |conn_id| {
478            server.rpc.send(
479                conn_id,
480                proto::AddPeer {
481                    worktree_id,
482                    peer: Some(proto::Peer {
483                        peer_id: request.sender_id.0,
484                        replica_id: peer_replica_id as u32,
485                    }),
486                },
487            )
488        })
489        .await?;
490        server
491            .rpc
492            .respond(
493                request.receipt(),
494                proto::OpenWorktreeResponse {
495                    worktree_id,
496                    worktree: Some(proto::Worktree {
497                        root_name: worktree.root_name.clone(),
498                        entries: worktree.entries.values().cloned().collect(),
499                    }),
500                    replica_id: peer_replica_id as u32,
501                    peers,
502                },
503            )
504            .await?;
505    } else {
506        server
507            .rpc
508            .respond(
509                request.receipt(),
510                proto::OpenWorktreeResponse {
511                    worktree_id,
512                    worktree: None,
513                    replica_id: 0,
514                    peers: Vec::new(),
515                },
516            )
517            .await?;
518    }
519
520    Ok(())
521}
522
523async fn update_worktree(
524    request: Box<TypedEnvelope<proto::UpdateWorktree>>,
525    server: Arc<Server>,
526) -> tide::Result<()> {
527    {
528        let mut state = server.state.rpc.write().await;
529        let worktree = state.write_worktree(request.payload.worktree_id, request.sender_id)?;
530        for entry_id in &request.payload.removed_entries {
531            worktree.entries.remove(&entry_id);
532        }
533
534        for entry in &request.payload.updated_entries {
535            worktree.entries.insert(entry.id, entry.clone());
536        }
537    }
538
539    broadcast_in_worktree(request.payload.worktree_id, &request, &server).await?;
540    Ok(())
541}
542
543async fn close_worktree(
544    request: Box<TypedEnvelope<proto::CloseWorktree>>,
545    server: Arc<Server>,
546) -> tide::Result<()> {
547    let connection_ids;
548    {
549        let mut state = server.state.rpc.write().await;
550        let worktree = state.write_worktree(request.payload.worktree_id, request.sender_id)?;
551        connection_ids = worktree.connection_ids();
552        if worktree.host_connection_id == Some(request.sender_id) {
553            worktree.host_connection_id = None;
554        } else if let Some(replica_id) = worktree.guest_connection_ids.remove(&request.sender_id) {
555            worktree.active_replica_ids.remove(&replica_id);
556        }
557    }
558
559    broadcast(request.sender_id, connection_ids, |conn_id| {
560        server.rpc.send(
561            conn_id,
562            proto::RemovePeer {
563                worktree_id: request.payload.worktree_id,
564                peer_id: request.sender_id.0,
565            },
566        )
567    })
568    .await?;
569
570    Ok(())
571}
572
573async fn open_buffer(
574    request: Box<TypedEnvelope<proto::OpenBuffer>>,
575    server: Arc<Server>,
576) -> tide::Result<()> {
577    let receipt = request.receipt();
578    let worktree_id = request.payload.worktree_id;
579    let host_connection_id = server
580        .state
581        .rpc
582        .read()
583        .await
584        .read_worktree(worktree_id, request.sender_id)?
585        .host_connection_id()?;
586
587    let response = server
588        .rpc
589        .forward_request(request.sender_id, host_connection_id, request.payload)
590        .await?;
591    server.rpc.respond(receipt, response).await?;
592    Ok(())
593}
594
595async fn close_buffer(
596    request: Box<TypedEnvelope<proto::CloseBuffer>>,
597    server: Arc<Server>,
598) -> tide::Result<()> {
599    let host_connection_id = server
600        .state
601        .rpc
602        .read()
603        .await
604        .read_worktree(request.payload.worktree_id, request.sender_id)?
605        .host_connection_id()?;
606
607    server
608        .rpc
609        .forward_send(request.sender_id, host_connection_id, request.payload)
610        .await?;
611
612    Ok(())
613}
614
615async fn save_buffer(
616    request: Box<TypedEnvelope<proto::SaveBuffer>>,
617    server: Arc<Server>,
618) -> tide::Result<()> {
619    let host;
620    let guests;
621    {
622        let state = server.state.rpc.read().await;
623        let worktree = state.read_worktree(request.payload.worktree_id, request.sender_id)?;
624        host = worktree.host_connection_id()?;
625        guests = worktree
626            .guest_connection_ids
627            .keys()
628            .copied()
629            .collect::<Vec<_>>();
630    }
631
632    let sender = request.sender_id;
633    let receipt = request.receipt();
634    let response = server
635        .rpc
636        .forward_request(sender, host, request.payload.clone())
637        .await?;
638
639    broadcast(host, guests, |conn_id| {
640        let response = response.clone();
641        let server = &server;
642        async move {
643            if conn_id == sender {
644                server.rpc.respond(receipt, response).await
645            } else {
646                server.rpc.forward_send(host, conn_id, response).await
647            }
648        }
649    })
650    .await?;
651
652    Ok(())
653}
654
655async fn update_buffer(
656    request: Box<TypedEnvelope<proto::UpdateBuffer>>,
657    server: Arc<Server>,
658) -> tide::Result<()> {
659    broadcast_in_worktree(request.payload.worktree_id, &request, &server).await
660}
661
662async fn buffer_saved(
663    request: Box<TypedEnvelope<proto::BufferSaved>>,
664    server: Arc<Server>,
665) -> tide::Result<()> {
666    broadcast_in_worktree(request.payload.worktree_id, &request, &server).await
667}
668
669async fn get_channels(
670    request: Box<TypedEnvelope<proto::GetChannels>>,
671    server: Arc<Server>,
672) -> tide::Result<()> {
673    let user_id = server
674        .state
675        .rpc
676        .read()
677        .await
678        .user_id_for_connection(request.sender_id)?;
679    let channels = server.state.db.get_channels_for_user(user_id).await?;
680    server
681        .rpc
682        .respond(
683            request.receipt(),
684            proto::GetChannelsResponse {
685                channels: channels
686                    .into_iter()
687                    .map(|chan| proto::Channel {
688                        id: chan.id.to_proto(),
689                        name: chan.name,
690                    })
691                    .collect(),
692            },
693        )
694        .await?;
695    Ok(())
696}
697
698async fn get_users(
699    request: Box<TypedEnvelope<proto::GetUsers>>,
700    server: Arc<Server>,
701) -> tide::Result<()> {
702    let user_id = server
703        .state
704        .rpc
705        .read()
706        .await
707        .user_id_for_connection(request.sender_id)?;
708    let receipt = request.receipt();
709    let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
710    let users = server
711        .state
712        .db
713        .get_users_by_ids(user_id, user_ids)
714        .await?
715        .into_iter()
716        .map(|user| proto::User {
717            id: user.id.to_proto(),
718            github_login: user.github_login,
719            avatar_url: String::new(),
720        })
721        .collect();
722    server
723        .rpc
724        .respond(receipt, proto::GetUsersResponse { users })
725        .await?;
726    Ok(())
727}
728
729async fn join_channel(
730    request: Box<TypedEnvelope<proto::JoinChannel>>,
731    server: Arc<Server>,
732) -> tide::Result<()> {
733    let user_id = server
734        .state
735        .rpc
736        .read()
737        .await
738        .user_id_for_connection(request.sender_id)?;
739    let channel_id = ChannelId::from_proto(request.payload.channel_id);
740    if !server
741        .state
742        .db
743        .can_user_access_channel(user_id, channel_id)
744        .await?
745    {
746        Err(anyhow!("access denied"))?;
747    }
748
749    server
750        .state
751        .rpc
752        .write()
753        .await
754        .join_channel(request.sender_id, channel_id);
755    let messages = server
756        .state
757        .db
758        .get_recent_channel_messages(channel_id, 50)
759        .await?
760        .into_iter()
761        .map(|msg| proto::ChannelMessage {
762            id: msg.id.to_proto(),
763            body: msg.body,
764            timestamp: msg.sent_at.unix_timestamp() as u64,
765            sender_id: msg.sender_id.to_proto(),
766        })
767        .collect();
768    server
769        .rpc
770        .respond(request.receipt(), proto::JoinChannelResponse { messages })
771        .await?;
772    Ok(())
773}
774
775async fn send_channel_message(
776    request: Box<TypedEnvelope<proto::SendChannelMessage>>,
777    server: Arc<Server>,
778) -> tide::Result<()> {
779    let channel_id = ChannelId::from_proto(request.payload.channel_id);
780    let user_id;
781    let connection_ids;
782    {
783        let state = server.state.rpc.read().await;
784        user_id = state.user_id_for_connection(request.sender_id)?;
785        if let Some(channel) = state.channels.get(&channel_id) {
786            connection_ids = channel.connection_ids();
787        } else {
788            return Ok(());
789        }
790    }
791
792    let timestamp = OffsetDateTime::now_utc();
793    let message_id = server
794        .state
795        .db
796        .create_channel_message(channel_id, user_id, &request.payload.body, timestamp)
797        .await?;
798    let message = proto::ChannelMessageSent {
799        channel_id: channel_id.to_proto(),
800        message: Some(proto::ChannelMessage {
801            sender_id: user_id.to_proto(),
802            id: message_id.to_proto(),
803            body: request.payload.body,
804            timestamp: timestamp.unix_timestamp() as u64,
805        }),
806    };
807    broadcast(request.sender_id, connection_ids, |conn_id| {
808        server.rpc.send(conn_id, message.clone())
809    })
810    .await?;
811
812    Ok(())
813}
814
815async fn broadcast_in_worktree<T: proto::EnvelopedMessage>(
816    worktree_id: u64,
817    request: &TypedEnvelope<T>,
818    server: &Arc<Server>,
819) -> tide::Result<()> {
820    let connection_ids = server
821        .state
822        .rpc
823        .read()
824        .await
825        .read_worktree(worktree_id, request.sender_id)?
826        .connection_ids();
827
828    broadcast(request.sender_id, connection_ids, |conn_id| {
829        server
830            .rpc
831            .forward_send(request.sender_id, conn_id, request.payload.clone())
832    })
833    .await?;
834
835    Ok(())
836}
837
838pub async fn broadcast<F, T>(
839    sender_id: ConnectionId,
840    receiver_ids: Vec<ConnectionId>,
841    mut f: F,
842) -> anyhow::Result<()>
843where
844    F: FnMut(ConnectionId) -> T,
845    T: Future<Output = anyhow::Result<()>>,
846{
847    let futures = receiver_ids
848        .into_iter()
849        .filter(|id| *id != sender_id)
850        .map(|id| f(id));
851    futures::future::try_join_all(futures).await?;
852    Ok(())
853}
854
855fn header_contains_ignore_case<T>(
856    request: &tide::Request<T>,
857    header_name: HeaderName,
858    value: &str,
859) -> bool {
860    request
861        .header(header_name)
862        .map(|h| {
863            h.as_str()
864                .split(',')
865                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
866        })
867        .unwrap_or(false)
868}