1use super::{
2 auth::{self, PeerExt as _},
3 db::{ChannelId, UserId},
4 AppState,
5};
6use anyhow::anyhow;
7use async_std::task;
8use async_tungstenite::{
9 tungstenite::{protocol::Role, Error as WebSocketError, Message as WebSocketMessage},
10 WebSocketStream,
11};
12use futures::{future::BoxFuture, FutureExt};
13use postage::prelude::Stream as _;
14use sha1::{Digest as _, Sha1};
15use std::{
16 any::{Any, TypeId},
17 collections::{HashMap, HashSet},
18 future::Future,
19 mem,
20 sync::Arc,
21 time::Instant,
22};
23use surf::StatusCode;
24use tide::log;
25use tide::{
26 http::headers::{HeaderName, CONNECTION, UPGRADE},
27 Request, Response,
28};
29use time::OffsetDateTime;
30use zrpc::{
31 auth::random_token,
32 proto::{self, EnvelopedMessage},
33 ConnectionId, Peer, TypedEnvelope,
34};
35
36type ReplicaId = u16;
37
38type MessageHandler = Box<
39 dyn Send
40 + Sync
41 + Fn(
42 &mut Option<Box<dyn Any + Send + Sync>>,
43 Arc<Server>,
44 ) -> Option<BoxFuture<'static, tide::Result<()>>>,
45>;
46
47#[derive(Default)]
48struct ServerBuilder {
49 handlers: Vec<MessageHandler>,
50 handler_types: HashSet<TypeId>,
51}
52
53impl ServerBuilder {
54 pub fn on_message<F, Fut, M>(mut self, handler: F) -> Self
55 where
56 F: 'static + Send + Sync + Fn(Box<TypedEnvelope<M>>, Arc<Server>) -> Fut,
57 Fut: 'static + Send + Future<Output = tide::Result<()>>,
58 M: EnvelopedMessage,
59 {
60 if self.handler_types.insert(TypeId::of::<M>()) {
61 panic!("registered a handler for the same message twice");
62 }
63
64 self.handlers
65 .push(Box::new(move |untyped_envelope, server| {
66 if let Some(typed_envelope) = untyped_envelope.take() {
67 match typed_envelope.downcast::<TypedEnvelope<M>>() {
68 Ok(typed_envelope) => Some((handler)(typed_envelope, server).boxed()),
69 Err(envelope) => {
70 *untyped_envelope = Some(envelope);
71 None
72 }
73 }
74 } else {
75 None
76 }
77 }));
78 self
79 }
80
81 pub fn build(self, rpc: &Arc<Peer>, state: &Arc<AppState>) -> Arc<Server> {
82 Arc::new(Server {
83 rpc: rpc.clone(),
84 state: state.clone(),
85 handlers: self.handlers,
86 })
87 }
88}
89
90pub struct Server {
91 rpc: Arc<Peer>,
92 state: Arc<AppState>,
93 handlers: Vec<MessageHandler>,
94}
95
96impl Server {
97 pub async fn handle_connection<Conn>(
98 self: &Arc<Self>,
99 connection: Conn,
100 addr: String,
101 user_id: UserId,
102 ) where
103 Conn: 'static
104 + futures::Sink<WebSocketMessage, Error = WebSocketError>
105 + futures::Stream<Item = Result<WebSocketMessage, WebSocketError>>
106 + Send
107 + Unpin,
108 {
109 let this = self.clone();
110 let (connection_id, handle_io, mut incoming_rx) = this.rpc.add_connection(connection).await;
111 this.state
112 .rpc
113 .write()
114 .await
115 .add_connection(connection_id, user_id);
116
117 let handle_io = handle_io.fuse();
118 futures::pin_mut!(handle_io);
119 loop {
120 let next_message = incoming_rx.recv().fuse();
121 futures::pin_mut!(next_message);
122 futures::select_biased! {
123 message = next_message => {
124 if let Some(message) = message {
125 let start_time = Instant::now();
126 log::info!("RPC message received");
127 let mut message = Some(message);
128 for handler in &this.handlers {
129 if let Some(future) = (handler)(&mut message, this.clone()) {
130 if let Err(err) = future.await {
131 log::error!("error handling message: {:?}", err);
132 } else {
133 log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
134 }
135 break;
136 }
137 }
138
139 if let Some(message) = message {
140 log::warn!("unhandled message: {:?}", message);
141 }
142 } else {
143 log::info!("rpc connection closed {:?}", addr);
144 break;
145 }
146 }
147 handle_io = handle_io => {
148 if let Err(err) = handle_io {
149 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
150 }
151 break;
152 }
153 }
154 }
155
156 if let Err(err) = this.rpc.sign_out(connection_id, &this.state).await {
157 log::error!("error signing out connection {:?} - {:?}", addr, err);
158 }
159 }
160}
161
162#[derive(Default)]
163pub struct State {
164 connections: HashMap<ConnectionId, Connection>,
165 pub worktrees: HashMap<u64, Worktree>,
166 channels: HashMap<ChannelId, Channel>,
167 next_worktree_id: u64,
168}
169
170struct Connection {
171 user_id: UserId,
172 worktrees: HashSet<u64>,
173 channels: HashSet<ChannelId>,
174}
175
176pub struct Worktree {
177 host_connection_id: Option<ConnectionId>,
178 guest_connection_ids: HashMap<ConnectionId, ReplicaId>,
179 active_replica_ids: HashSet<ReplicaId>,
180 access_token: String,
181 root_name: String,
182 entries: HashMap<u64, proto::Entry>,
183}
184
185#[derive(Default)]
186struct Channel {
187 connection_ids: HashSet<ConnectionId>,
188}
189
190impl Worktree {
191 pub fn connection_ids(&self) -> Vec<ConnectionId> {
192 self.guest_connection_ids
193 .keys()
194 .copied()
195 .chain(self.host_connection_id)
196 .collect()
197 }
198
199 fn host_connection_id(&self) -> tide::Result<ConnectionId> {
200 Ok(self
201 .host_connection_id
202 .ok_or_else(|| anyhow!("host disconnected from worktree"))?)
203 }
204}
205
206impl Channel {
207 fn connection_ids(&self) -> Vec<ConnectionId> {
208 self.connection_ids.iter().copied().collect()
209 }
210}
211
212impl State {
213 // Add a new connection associated with a given user.
214 pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId) {
215 self.connections.insert(
216 connection_id,
217 Connection {
218 user_id,
219 worktrees: Default::default(),
220 channels: Default::default(),
221 },
222 );
223 }
224
225 // Remove the given connection and its association with any worktrees.
226 pub fn remove_connection(&mut self, connection_id: ConnectionId) -> Vec<u64> {
227 let mut worktree_ids = Vec::new();
228 if let Some(connection) = self.connections.remove(&connection_id) {
229 for channel_id in connection.channels {
230 if let Some(channel) = self.channels.get_mut(&channel_id) {
231 channel.connection_ids.remove(&connection_id);
232 }
233 }
234 for worktree_id in connection.worktrees {
235 if let Some(worktree) = self.worktrees.get_mut(&worktree_id) {
236 if worktree.host_connection_id == Some(connection_id) {
237 worktree_ids.push(worktree_id);
238 } else if let Some(replica_id) =
239 worktree.guest_connection_ids.remove(&connection_id)
240 {
241 worktree.active_replica_ids.remove(&replica_id);
242 worktree_ids.push(worktree_id);
243 }
244 }
245 }
246 }
247 worktree_ids
248 }
249
250 fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
251 if let Some(connection) = self.connections.get_mut(&connection_id) {
252 connection.channels.insert(channel_id);
253 self.channels
254 .entry(channel_id)
255 .or_default()
256 .connection_ids
257 .insert(connection_id);
258 }
259 }
260
261 // Add the given connection as a guest of the given worktree
262 pub fn join_worktree(
263 &mut self,
264 connection_id: ConnectionId,
265 worktree_id: u64,
266 access_token: &str,
267 ) -> Option<(ReplicaId, &Worktree)> {
268 if let Some(worktree) = self.worktrees.get_mut(&worktree_id) {
269 if access_token == worktree.access_token {
270 if let Some(connection) = self.connections.get_mut(&connection_id) {
271 connection.worktrees.insert(worktree_id);
272 }
273
274 let mut replica_id = 1;
275 while worktree.active_replica_ids.contains(&replica_id) {
276 replica_id += 1;
277 }
278 worktree.active_replica_ids.insert(replica_id);
279 worktree
280 .guest_connection_ids
281 .insert(connection_id, replica_id);
282 Some((replica_id, worktree))
283 } else {
284 None
285 }
286 } else {
287 None
288 }
289 }
290
291 fn user_id_for_connection(&self, connection_id: ConnectionId) -> tide::Result<UserId> {
292 Ok(self
293 .connections
294 .get(&connection_id)
295 .ok_or_else(|| anyhow!("unknown connection"))?
296 .user_id)
297 }
298
299 fn read_worktree(
300 &self,
301 worktree_id: u64,
302 connection_id: ConnectionId,
303 ) -> tide::Result<&Worktree> {
304 let worktree = self
305 .worktrees
306 .get(&worktree_id)
307 .ok_or_else(|| anyhow!("worktree not found"))?;
308
309 if worktree.host_connection_id == Some(connection_id)
310 || worktree.guest_connection_ids.contains_key(&connection_id)
311 {
312 Ok(worktree)
313 } else {
314 Err(anyhow!(
315 "{} is not a member of worktree {}",
316 connection_id,
317 worktree_id
318 ))?
319 }
320 }
321
322 fn write_worktree(
323 &mut self,
324 worktree_id: u64,
325 connection_id: ConnectionId,
326 ) -> tide::Result<&mut Worktree> {
327 let worktree = self
328 .worktrees
329 .get_mut(&worktree_id)
330 .ok_or_else(|| anyhow!("worktree not found"))?;
331
332 if worktree.host_connection_id == Some(connection_id)
333 || worktree.guest_connection_ids.contains_key(&connection_id)
334 {
335 Ok(worktree)
336 } else {
337 Err(anyhow!(
338 "{} is not a member of worktree {}",
339 connection_id,
340 worktree_id
341 ))?
342 }
343 }
344}
345
346pub fn build_server(state: &Arc<AppState>, rpc: &Arc<Peer>) -> Arc<Server> {
347 ServerBuilder::default()
348 .on_message(share_worktree)
349 .on_message(join_worktree)
350 .on_message(update_worktree)
351 .on_message(close_worktree)
352 .on_message(open_buffer)
353 .on_message(close_buffer)
354 .on_message(update_buffer)
355 .on_message(buffer_saved)
356 .on_message(save_buffer)
357 .on_message(get_channels)
358 .on_message(get_users)
359 .on_message(join_channel)
360 .on_message(send_channel_message)
361 .build(rpc, state)
362}
363
364pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
365 let server = build_server(app.state(), rpc);
366 app.at("/rpc").with(auth::VerifyToken).get(move |request: Request<Arc<AppState>>| {
367 let user_id = request.ext::<UserId>().copied();
368 let server = server.clone();
369 async move {
370 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
371
372 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
373 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
374 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
375
376 if !upgrade_requested {
377 return Ok(Response::new(StatusCode::UpgradeRequired));
378 }
379
380 let header = match request.header("Sec-Websocket-Key") {
381 Some(h) => h.as_str(),
382 None => return Err(anyhow!("expected sec-websocket-key"))?,
383 };
384
385 let mut response = Response::new(StatusCode::SwitchingProtocols);
386 response.insert_header(UPGRADE, "websocket");
387 response.insert_header(CONNECTION, "Upgrade");
388 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
389 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
390 response.insert_header("Sec-Websocket-Version", "13");
391
392 let http_res: &mut tide::http::Response = response.as_mut();
393 let upgrade_receiver = http_res.recv_upgrade().await;
394 let addr = request.remote().unwrap_or("unknown").to_string();
395 let user_id = user_id.ok_or_else(|| anyhow!("user_id is not present on request. ensure auth::VerifyToken middleware is present"))?;
396 task::spawn(async move {
397 if let Some(stream) = upgrade_receiver.await {
398 let stream = WebSocketStream::from_raw_socket(stream, Role::Server, None).await;
399 server.handle_connection(stream, addr, user_id).await;
400 }
401 });
402
403 Ok(response)
404 }
405 });
406}
407
408async fn share_worktree(
409 mut request: Box<TypedEnvelope<proto::ShareWorktree>>,
410 server: Arc<Server>,
411) -> tide::Result<()> {
412 let mut state = server.state.rpc.write().await;
413 let worktree_id = state.next_worktree_id;
414 state.next_worktree_id += 1;
415 let access_token = random_token();
416 let worktree = request
417 .payload
418 .worktree
419 .as_mut()
420 .ok_or_else(|| anyhow!("missing worktree"))?;
421 let entries = mem::take(&mut worktree.entries)
422 .into_iter()
423 .map(|entry| (entry.id, entry))
424 .collect();
425 state.worktrees.insert(
426 worktree_id,
427 Worktree {
428 host_connection_id: Some(request.sender_id),
429 guest_connection_ids: Default::default(),
430 active_replica_ids: Default::default(),
431 access_token: access_token.clone(),
432 root_name: mem::take(&mut worktree.root_name),
433 entries,
434 },
435 );
436
437 server
438 .rpc
439 .respond(
440 request.receipt(),
441 proto::ShareWorktreeResponse {
442 worktree_id,
443 access_token,
444 },
445 )
446 .await?;
447 Ok(())
448}
449
450async fn join_worktree(
451 request: Box<TypedEnvelope<proto::OpenWorktree>>,
452 server: Arc<Server>,
453) -> tide::Result<()> {
454 let worktree_id = request.payload.worktree_id;
455 let access_token = &request.payload.access_token;
456
457 let mut state = server.state.rpc.write().await;
458 if let Some((peer_replica_id, worktree)) =
459 state.join_worktree(request.sender_id, worktree_id, access_token)
460 {
461 let mut peers = Vec::new();
462 if let Some(host_connection_id) = worktree.host_connection_id {
463 peers.push(proto::Peer {
464 peer_id: host_connection_id.0,
465 replica_id: 0,
466 });
467 }
468 for (peer_conn_id, peer_replica_id) in &worktree.guest_connection_ids {
469 if *peer_conn_id != request.sender_id {
470 peers.push(proto::Peer {
471 peer_id: peer_conn_id.0,
472 replica_id: *peer_replica_id as u32,
473 });
474 }
475 }
476
477 broadcast(request.sender_id, worktree.connection_ids(), |conn_id| {
478 server.rpc.send(
479 conn_id,
480 proto::AddPeer {
481 worktree_id,
482 peer: Some(proto::Peer {
483 peer_id: request.sender_id.0,
484 replica_id: peer_replica_id as u32,
485 }),
486 },
487 )
488 })
489 .await?;
490 server
491 .rpc
492 .respond(
493 request.receipt(),
494 proto::OpenWorktreeResponse {
495 worktree_id,
496 worktree: Some(proto::Worktree {
497 root_name: worktree.root_name.clone(),
498 entries: worktree.entries.values().cloned().collect(),
499 }),
500 replica_id: peer_replica_id as u32,
501 peers,
502 },
503 )
504 .await?;
505 } else {
506 server
507 .rpc
508 .respond(
509 request.receipt(),
510 proto::OpenWorktreeResponse {
511 worktree_id,
512 worktree: None,
513 replica_id: 0,
514 peers: Vec::new(),
515 },
516 )
517 .await?;
518 }
519
520 Ok(())
521}
522
523async fn update_worktree(
524 request: Box<TypedEnvelope<proto::UpdateWorktree>>,
525 server: Arc<Server>,
526) -> tide::Result<()> {
527 {
528 let mut state = server.state.rpc.write().await;
529 let worktree = state.write_worktree(request.payload.worktree_id, request.sender_id)?;
530 for entry_id in &request.payload.removed_entries {
531 worktree.entries.remove(&entry_id);
532 }
533
534 for entry in &request.payload.updated_entries {
535 worktree.entries.insert(entry.id, entry.clone());
536 }
537 }
538
539 broadcast_in_worktree(request.payload.worktree_id, &request, &server).await?;
540 Ok(())
541}
542
543async fn close_worktree(
544 request: Box<TypedEnvelope<proto::CloseWorktree>>,
545 server: Arc<Server>,
546) -> tide::Result<()> {
547 let connection_ids;
548 {
549 let mut state = server.state.rpc.write().await;
550 let worktree = state.write_worktree(request.payload.worktree_id, request.sender_id)?;
551 connection_ids = worktree.connection_ids();
552 if worktree.host_connection_id == Some(request.sender_id) {
553 worktree.host_connection_id = None;
554 } else if let Some(replica_id) = worktree.guest_connection_ids.remove(&request.sender_id) {
555 worktree.active_replica_ids.remove(&replica_id);
556 }
557 }
558
559 broadcast(request.sender_id, connection_ids, |conn_id| {
560 server.rpc.send(
561 conn_id,
562 proto::RemovePeer {
563 worktree_id: request.payload.worktree_id,
564 peer_id: request.sender_id.0,
565 },
566 )
567 })
568 .await?;
569
570 Ok(())
571}
572
573async fn open_buffer(
574 request: Box<TypedEnvelope<proto::OpenBuffer>>,
575 server: Arc<Server>,
576) -> tide::Result<()> {
577 let receipt = request.receipt();
578 let worktree_id = request.payload.worktree_id;
579 let host_connection_id = server
580 .state
581 .rpc
582 .read()
583 .await
584 .read_worktree(worktree_id, request.sender_id)?
585 .host_connection_id()?;
586
587 let response = server
588 .rpc
589 .forward_request(request.sender_id, host_connection_id, request.payload)
590 .await?;
591 server.rpc.respond(receipt, response).await?;
592 Ok(())
593}
594
595async fn close_buffer(
596 request: Box<TypedEnvelope<proto::CloseBuffer>>,
597 server: Arc<Server>,
598) -> tide::Result<()> {
599 let host_connection_id = server
600 .state
601 .rpc
602 .read()
603 .await
604 .read_worktree(request.payload.worktree_id, request.sender_id)?
605 .host_connection_id()?;
606
607 server
608 .rpc
609 .forward_send(request.sender_id, host_connection_id, request.payload)
610 .await?;
611
612 Ok(())
613}
614
615async fn save_buffer(
616 request: Box<TypedEnvelope<proto::SaveBuffer>>,
617 server: Arc<Server>,
618) -> tide::Result<()> {
619 let host;
620 let guests;
621 {
622 let state = server.state.rpc.read().await;
623 let worktree = state.read_worktree(request.payload.worktree_id, request.sender_id)?;
624 host = worktree.host_connection_id()?;
625 guests = worktree
626 .guest_connection_ids
627 .keys()
628 .copied()
629 .collect::<Vec<_>>();
630 }
631
632 let sender = request.sender_id;
633 let receipt = request.receipt();
634 let response = server
635 .rpc
636 .forward_request(sender, host, request.payload.clone())
637 .await?;
638
639 broadcast(host, guests, |conn_id| {
640 let response = response.clone();
641 let server = &server;
642 async move {
643 if conn_id == sender {
644 server.rpc.respond(receipt, response).await
645 } else {
646 server.rpc.forward_send(host, conn_id, response).await
647 }
648 }
649 })
650 .await?;
651
652 Ok(())
653}
654
655async fn update_buffer(
656 request: Box<TypedEnvelope<proto::UpdateBuffer>>,
657 server: Arc<Server>,
658) -> tide::Result<()> {
659 broadcast_in_worktree(request.payload.worktree_id, &request, &server).await
660}
661
662async fn buffer_saved(
663 request: Box<TypedEnvelope<proto::BufferSaved>>,
664 server: Arc<Server>,
665) -> tide::Result<()> {
666 broadcast_in_worktree(request.payload.worktree_id, &request, &server).await
667}
668
669async fn get_channels(
670 request: Box<TypedEnvelope<proto::GetChannels>>,
671 server: Arc<Server>,
672) -> tide::Result<()> {
673 let user_id = server
674 .state
675 .rpc
676 .read()
677 .await
678 .user_id_for_connection(request.sender_id)?;
679 let channels = server.state.db.get_channels_for_user(user_id).await?;
680 server
681 .rpc
682 .respond(
683 request.receipt(),
684 proto::GetChannelsResponse {
685 channels: channels
686 .into_iter()
687 .map(|chan| proto::Channel {
688 id: chan.id.to_proto(),
689 name: chan.name,
690 })
691 .collect(),
692 },
693 )
694 .await?;
695 Ok(())
696}
697
698async fn get_users(
699 request: Box<TypedEnvelope<proto::GetUsers>>,
700 server: Arc<Server>,
701) -> tide::Result<()> {
702 let user_id = server
703 .state
704 .rpc
705 .read()
706 .await
707 .user_id_for_connection(request.sender_id)?;
708 let receipt = request.receipt();
709 let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
710 let users = server
711 .state
712 .db
713 .get_users_by_ids(user_id, user_ids)
714 .await?
715 .into_iter()
716 .map(|user| proto::User {
717 id: user.id.to_proto(),
718 github_login: user.github_login,
719 avatar_url: String::new(),
720 })
721 .collect();
722 server
723 .rpc
724 .respond(receipt, proto::GetUsersResponse { users })
725 .await?;
726 Ok(())
727}
728
729async fn join_channel(
730 request: Box<TypedEnvelope<proto::JoinChannel>>,
731 server: Arc<Server>,
732) -> tide::Result<()> {
733 let user_id = server
734 .state
735 .rpc
736 .read()
737 .await
738 .user_id_for_connection(request.sender_id)?;
739 let channel_id = ChannelId::from_proto(request.payload.channel_id);
740 if !server
741 .state
742 .db
743 .can_user_access_channel(user_id, channel_id)
744 .await?
745 {
746 Err(anyhow!("access denied"))?;
747 }
748
749 server
750 .state
751 .rpc
752 .write()
753 .await
754 .join_channel(request.sender_id, channel_id);
755 let messages = server
756 .state
757 .db
758 .get_recent_channel_messages(channel_id, 50)
759 .await?
760 .into_iter()
761 .map(|msg| proto::ChannelMessage {
762 id: msg.id.to_proto(),
763 body: msg.body,
764 timestamp: msg.sent_at.unix_timestamp() as u64,
765 sender_id: msg.sender_id.to_proto(),
766 })
767 .collect();
768 server
769 .rpc
770 .respond(request.receipt(), proto::JoinChannelResponse { messages })
771 .await?;
772 Ok(())
773}
774
775async fn send_channel_message(
776 request: Box<TypedEnvelope<proto::SendChannelMessage>>,
777 server: Arc<Server>,
778) -> tide::Result<()> {
779 let channel_id = ChannelId::from_proto(request.payload.channel_id);
780 let user_id;
781 let connection_ids;
782 {
783 let state = server.state.rpc.read().await;
784 user_id = state.user_id_for_connection(request.sender_id)?;
785 if let Some(channel) = state.channels.get(&channel_id) {
786 connection_ids = channel.connection_ids();
787 } else {
788 return Ok(());
789 }
790 }
791
792 let timestamp = OffsetDateTime::now_utc();
793 let message_id = server
794 .state
795 .db
796 .create_channel_message(channel_id, user_id, &request.payload.body, timestamp)
797 .await?;
798 let message = proto::ChannelMessageSent {
799 channel_id: channel_id.to_proto(),
800 message: Some(proto::ChannelMessage {
801 sender_id: user_id.to_proto(),
802 id: message_id.to_proto(),
803 body: request.payload.body,
804 timestamp: timestamp.unix_timestamp() as u64,
805 }),
806 };
807 broadcast(request.sender_id, connection_ids, |conn_id| {
808 server.rpc.send(conn_id, message.clone())
809 })
810 .await?;
811
812 Ok(())
813}
814
815async fn broadcast_in_worktree<T: proto::EnvelopedMessage>(
816 worktree_id: u64,
817 request: &TypedEnvelope<T>,
818 server: &Arc<Server>,
819) -> tide::Result<()> {
820 let connection_ids = server
821 .state
822 .rpc
823 .read()
824 .await
825 .read_worktree(worktree_id, request.sender_id)?
826 .connection_ids();
827
828 broadcast(request.sender_id, connection_ids, |conn_id| {
829 server
830 .rpc
831 .forward_send(request.sender_id, conn_id, request.payload.clone())
832 })
833 .await?;
834
835 Ok(())
836}
837
838pub async fn broadcast<F, T>(
839 sender_id: ConnectionId,
840 receiver_ids: Vec<ConnectionId>,
841 mut f: F,
842) -> anyhow::Result<()>
843where
844 F: FnMut(ConnectionId) -> T,
845 T: Future<Output = anyhow::Result<()>>,
846{
847 let futures = receiver_ids
848 .into_iter()
849 .filter(|id| *id != sender_id)
850 .map(|id| f(id));
851 futures::future::try_join_all(futures).await?;
852 Ok(())
853}
854
855fn header_contains_ignore_case<T>(
856 request: &tide::Request<T>,
857 header_name: HeaderName,
858 value: &str,
859) -> bool {
860 request
861 .header(header_name)
862 .map(|h| {
863 h.as_str()
864 .split(',')
865 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
866 })
867 .unwrap_or(false)
868}