1use super::{
2 auth::{self, PeerExt as _},
3 db::{ChannelId, UserId},
4 AppState,
5};
6use anyhow::anyhow;
7use async_std::task;
8use async_tungstenite::{
9 tungstenite::{protocol::Role, Error as WebSocketError, Message as WebSocketMessage},
10 WebSocketStream,
11};
12use sha1::{Digest as _, Sha1};
13use std::{
14 collections::{HashMap, HashSet},
15 future::Future,
16 mem,
17 sync::Arc,
18 time::Instant,
19};
20use surf::StatusCode;
21use tide::log;
22use tide::{
23 http::headers::{HeaderName, CONNECTION, UPGRADE},
24 Request, Response,
25};
26use time::OffsetDateTime;
27use zrpc::{
28 auth::random_token,
29 proto::{self, EnvelopedMessage},
30 ConnectionId, Peer, Router, TypedEnvelope,
31};
32
// Identifier of a participant within a shared worktree. The join handler in
// this file reports the host as replica `0`, while `State::join_worktree`
// hands guests the lowest unused id starting from `1`.
type ReplicaId = u16;
34
/// In-memory bookkeeping for the RPC server: registered connections, shared
/// worktrees and chat channels.
#[derive(Default)]
pub struct State {
    /// Registered connections, keyed by connection id.
    connections: HashMap<ConnectionId, Connection>,
    /// Shared worktrees, keyed by the id assigned when they were shared.
    pub worktrees: HashMap<u64, Worktree>,
    /// Channels, created lazily the first time a connection joins them.
    channels: HashMap<ChannelId, Channel>,
    /// Id that will be given to the next shared worktree.
    next_worktree_id: u64,
}
42
/// Per-connection record kept in `State::connections`.
struct Connection {
    /// User the connection was registered for.
    user_id: UserId,
    /// Ids of worktrees recorded for this connection.
    worktrees: HashSet<u64>,
    /// Ids of channels this connection has joined.
    channels: HashSet<ChannelId>,
}
48
/// A worktree shared by a host connection and joined by guest connections.
pub struct Worktree {
    /// Connection that shared the worktree; `None` once the host has closed it.
    host_connection_id: Option<ConnectionId>,
    /// Guest connections, each mapped to the replica id it was assigned.
    guest_connection_ids: HashMap<ConnectionId, ReplicaId>,
    /// Replica ids currently handed out to guests.
    active_replica_ids: HashSet<ReplicaId>,
    /// Token a guest must present to join the worktree.
    access_token: String,
    /// Root name reported by the host when sharing.
    root_name: String,
    /// Worktree entries, keyed by entry id.
    entries: HashMap<u64, proto::Entry>,
}
57
/// A chat channel as tracked by the RPC state.
#[derive(Default)]
struct Channel {
    /// Connections that have joined the channel.
    connection_ids: HashSet<ConnectionId>,
}
62
63impl Worktree {
64 pub fn connection_ids(&self) -> Vec<ConnectionId> {
65 self.guest_connection_ids
66 .keys()
67 .copied()
68 .chain(self.host_connection_id)
69 .collect()
70 }
71
72 fn host_connection_id(&self) -> tide::Result<ConnectionId> {
73 Ok(self
74 .host_connection_id
75 .ok_or_else(|| anyhow!("host disconnected from worktree"))?)
76 }
77}
78
79impl Channel {
80 fn connection_ids(&self) -> Vec<ConnectionId> {
81 self.connection_ids.iter().copied().collect()
82 }
83}
84
85impl State {
86 // Add a new connection associated with a given user.
87 pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId) {
88 self.connections.insert(
89 connection_id,
90 Connection {
91 user_id,
92 worktrees: Default::default(),
93 channels: Default::default(),
94 },
95 );
96 }
97
98 // Remove the given connection and its association with any worktrees.
99 pub fn remove_connection(&mut self, connection_id: ConnectionId) -> Vec<u64> {
100 let mut worktree_ids = Vec::new();
101 if let Some(connection) = self.connections.remove(&connection_id) {
102 for channel_id in connection.channels {
103 if let Some(channel) = self.channels.get_mut(&channel_id) {
104 channel.connection_ids.remove(&connection_id);
105 }
106 }
107 for worktree_id in connection.worktrees {
108 if let Some(worktree) = self.worktrees.get_mut(&worktree_id) {
109 if worktree.host_connection_id == Some(connection_id) {
110 worktree_ids.push(worktree_id);
111 } else if let Some(replica_id) =
112 worktree.guest_connection_ids.remove(&connection_id)
113 {
114 worktree.active_replica_ids.remove(&replica_id);
115 worktree_ids.push(worktree_id);
116 }
117 }
118 }
119 }
120 worktree_ids
121 }
122
123 fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
124 if let Some(connection) = self.connections.get_mut(&connection_id) {
125 connection.channels.insert(channel_id);
126 self.channels
127 .entry(channel_id)
128 .or_default()
129 .connection_ids
130 .insert(connection_id);
131 }
132 }
133
134 // Add the given connection as a guest of the given worktree
135 pub fn join_worktree(
136 &mut self,
137 connection_id: ConnectionId,
138 worktree_id: u64,
139 access_token: &str,
140 ) -> Option<(ReplicaId, &Worktree)> {
141 if let Some(worktree) = self.worktrees.get_mut(&worktree_id) {
142 if access_token == worktree.access_token {
143 if let Some(connection) = self.connections.get_mut(&connection_id) {
144 connection.worktrees.insert(worktree_id);
145 }
146
147 let mut replica_id = 1;
148 while worktree.active_replica_ids.contains(&replica_id) {
149 replica_id += 1;
150 }
151 worktree.active_replica_ids.insert(replica_id);
152 worktree
153 .guest_connection_ids
154 .insert(connection_id, replica_id);
155 Some((replica_id, worktree))
156 } else {
157 None
158 }
159 } else {
160 None
161 }
162 }
163
164 fn user_id_for_connection(&self, connection_id: ConnectionId) -> tide::Result<UserId> {
165 Ok(self
166 .connections
167 .get(&connection_id)
168 .ok_or_else(|| anyhow!("unknown connection"))?
169 .user_id)
170 }
171
172 fn read_worktree(
173 &self,
174 worktree_id: u64,
175 connection_id: ConnectionId,
176 ) -> tide::Result<&Worktree> {
177 let worktree = self
178 .worktrees
179 .get(&worktree_id)
180 .ok_or_else(|| anyhow!("worktree not found"))?;
181
182 if worktree.host_connection_id == Some(connection_id)
183 || worktree.guest_connection_ids.contains_key(&connection_id)
184 {
185 Ok(worktree)
186 } else {
187 Err(anyhow!(
188 "{} is not a member of worktree {}",
189 connection_id,
190 worktree_id
191 ))?
192 }
193 }
194
195 fn write_worktree(
196 &mut self,
197 worktree_id: u64,
198 connection_id: ConnectionId,
199 ) -> tide::Result<&mut Worktree> {
200 let worktree = self
201 .worktrees
202 .get_mut(&worktree_id)
203 .ok_or_else(|| anyhow!("worktree not found"))?;
204
205 if worktree.host_connection_id == Some(connection_id)
206 || worktree.guest_connection_ids.contains_key(&connection_id)
207 {
208 Ok(worktree)
209 } else {
210 Err(anyhow!(
211 "{} is not a member of worktree {}",
212 connection_id,
213 worktree_id
214 ))?
215 }
216 }
217}
218
/// A handler for incoming RPC messages of type `M`.
///
/// The lifetime `'a` is the lifetime of the borrowed peer and app state, which
/// the returned future is allowed to capture.
trait MessageHandler<'a, M: proto::EnvelopedMessage> {
    /// Future produced by `handle`; it may borrow for `'a` and must be `Send`.
    type Output: 'a + Send + Future<Output = tide::Result<()>>;

    /// Processes `message` using the given peer and application state.
    fn handle(
        &self,
        message: TypedEnvelope<M>,
        rpc: &'a Arc<Peer>,
        app_state: &'a Arc<AppState>,
    ) -> Self::Output;
}
229
230impl<'a, M, F, Fut> MessageHandler<'a, M> for F
231where
232 M: proto::EnvelopedMessage,
233 F: Fn(TypedEnvelope<M>, &'a Arc<Peer>, &'a Arc<AppState>) -> Fut,
234 Fut: 'a + Send + Future<Output = tide::Result<()>>,
235{
236 type Output = Fut;
237
238 fn handle(
239 &self,
240 message: TypedEnvelope<M>,
241 rpc: &'a Arc<Peer>,
242 app_state: &'a Arc<AppState>,
243 ) -> Self::Output {
244 (self)(message, rpc, app_state)
245 }
246}
247
248fn on_message<M, H>(router: &mut Router, rpc: &Arc<Peer>, app_state: &Arc<AppState>, handler: H)
249where
250 M: EnvelopedMessage,
251 H: 'static + Clone + Send + Sync + for<'a> MessageHandler<'a, M>,
252{
253 let rpc = rpc.clone();
254 let handler = handler.clone();
255 let app_state = app_state.clone();
256 router.add_message_handler(move |message| {
257 let rpc = rpc.clone();
258 let handler = handler.clone();
259 let app_state = app_state.clone();
260 async move {
261 let sender_id = message.sender_id;
262 let message_id = message.message_id;
263 let start_time = Instant::now();
264 log::info!(
265 "RPC message received. id: {}.{}, type:{}",
266 sender_id,
267 message_id,
268 M::NAME
269 );
270 if let Err(err) = handler.handle(message, &rpc, &app_state).await {
271 log::error!("error handling message: {:?}", err);
272 } else {
273 log::info!(
274 "RPC message handled. id:{}.{}, duration:{:?}",
275 sender_id,
276 message_id,
277 start_time.elapsed()
278 );
279 }
280
281 Ok(())
282 }
283 });
284}
285
/// Registers every RPC message handler defined in this file on `router`.
///
/// Each `on_message` call wires one handler function to the message type taken
/// by that handler's first parameter.
pub fn add_rpc_routes(router: &mut Router, state: &Arc<AppState>, rpc: &Arc<Peer>) {
    // Worktree sharing and membership.
    on_message(router, rpc, state, share_worktree);
    on_message(router, rpc, state, join_worktree);
    on_message(router, rpc, state, update_worktree);
    on_message(router, rpc, state, close_worktree);
    // Buffer operations within a worktree.
    on_message(router, rpc, state, open_buffer);
    on_message(router, rpc, state, close_buffer);
    on_message(router, rpc, state, update_buffer);
    on_message(router, rpc, state, buffer_saved);
    on_message(router, rpc, state, save_buffer);
    // Channels and users.
    on_message(router, rpc, state, get_channels);
    on_message(router, rpc, state, get_users);
    on_message(router, rpc, state, join_channel);
    on_message(router, rpc, state, send_channel_message);
}
301
302pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
303 let mut router = Router::new();
304 add_rpc_routes(&mut router, app.state(), rpc);
305 let router = Arc::new(router);
306
307 let rpc = rpc.clone();
308 app.at("/rpc").with(auth::VerifyToken).get(move |request: Request<Arc<AppState>>| {
309 let user_id = request.ext::<UserId>().copied();
310 let rpc = rpc.clone();
311 let router = router.clone();
312 async move {
313 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
314
315 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
316 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
317 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
318
319 if !upgrade_requested {
320 return Ok(Response::new(StatusCode::UpgradeRequired));
321 }
322
323 let header = match request.header("Sec-Websocket-Key") {
324 Some(h) => h.as_str(),
325 None => return Err(anyhow!("expected sec-websocket-key"))?,
326 };
327
328 let mut response = Response::new(StatusCode::SwitchingProtocols);
329 response.insert_header(UPGRADE, "websocket");
330 response.insert_header(CONNECTION, "Upgrade");
331 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
332 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
333 response.insert_header("Sec-Websocket-Version", "13");
334
335 let http_res: &mut tide::http::Response = response.as_mut();
336 let upgrade_receiver = http_res.recv_upgrade().await;
337 let addr = request.remote().unwrap_or("unknown").to_string();
338 let state = request.state().clone();
339 let user_id = user_id.ok_or_else(|| anyhow!("user_id is not present on request. ensure auth::VerifyToken middleware is present"))?;
340 task::spawn(async move {
341 if let Some(stream) = upgrade_receiver.await {
342 let stream = WebSocketStream::from_raw_socket(stream, Role::Server, None).await;
343 handle_connection(rpc, router, state, addr, stream, user_id).await;
344 }
345 });
346
347 Ok(response)
348 }
349 });
350}
351
352pub async fn handle_connection<Conn>(
353 rpc: Arc<Peer>,
354 router: Arc<Router>,
355 state: Arc<AppState>,
356 addr: String,
357 stream: Conn,
358 user_id: UserId,
359) where
360 Conn: 'static
361 + futures::Sink<WebSocketMessage, Error = WebSocketError>
362 + futures::Stream<Item = Result<WebSocketMessage, WebSocketError>>
363 + Send
364 + Unpin,
365{
366 log::info!("accepted rpc connection: {:?}", addr);
367 let (connection_id, handle_io, handle_messages) = rpc.add_connection(stream, router).await;
368 state
369 .rpc
370 .write()
371 .await
372 .add_connection(connection_id, user_id);
373
374 let handle_messages = async move {
375 handle_messages.await;
376 Ok(())
377 };
378
379 if let Err(e) = futures::try_join!(handle_messages, handle_io) {
380 log::error!("error handling rpc connection {:?} - {:?}", addr, e);
381 }
382
383 log::info!("closing connection to {:?}", addr);
384 if let Err(e) = rpc.sign_out(connection_id, &state).await {
385 log::error!("error signing out connection {:?} - {:?}", addr, e);
386 }
387}
388
389async fn share_worktree(
390 mut request: TypedEnvelope<proto::ShareWorktree>,
391 rpc: &Arc<Peer>,
392 state: &Arc<AppState>,
393) -> tide::Result<()> {
394 let mut state = state.rpc.write().await;
395 let worktree_id = state.next_worktree_id;
396 state.next_worktree_id += 1;
397 let access_token = random_token();
398 let worktree = request
399 .payload
400 .worktree
401 .as_mut()
402 .ok_or_else(|| anyhow!("missing worktree"))?;
403 let entries = mem::take(&mut worktree.entries)
404 .into_iter()
405 .map(|entry| (entry.id, entry))
406 .collect();
407 state.worktrees.insert(
408 worktree_id,
409 Worktree {
410 host_connection_id: Some(request.sender_id),
411 guest_connection_ids: Default::default(),
412 active_replica_ids: Default::default(),
413 access_token: access_token.clone(),
414 root_name: mem::take(&mut worktree.root_name),
415 entries,
416 },
417 );
418
419 rpc.respond(
420 request.receipt(),
421 proto::ShareWorktreeResponse {
422 worktree_id,
423 access_token,
424 },
425 )
426 .await?;
427 Ok(())
428}
429
430async fn join_worktree(
431 request: TypedEnvelope<proto::OpenWorktree>,
432 rpc: &Arc<Peer>,
433 state: &Arc<AppState>,
434) -> tide::Result<()> {
435 let worktree_id = request.payload.worktree_id;
436 let access_token = &request.payload.access_token;
437
438 let mut state = state.rpc.write().await;
439 if let Some((peer_replica_id, worktree)) =
440 state.join_worktree(request.sender_id, worktree_id, access_token)
441 {
442 let mut peers = Vec::new();
443 if let Some(host_connection_id) = worktree.host_connection_id {
444 peers.push(proto::Peer {
445 peer_id: host_connection_id.0,
446 replica_id: 0,
447 });
448 }
449 for (peer_conn_id, peer_replica_id) in &worktree.guest_connection_ids {
450 if *peer_conn_id != request.sender_id {
451 peers.push(proto::Peer {
452 peer_id: peer_conn_id.0,
453 replica_id: *peer_replica_id as u32,
454 });
455 }
456 }
457
458 broadcast(request.sender_id, worktree.connection_ids(), |conn_id| {
459 rpc.send(
460 conn_id,
461 proto::AddPeer {
462 worktree_id,
463 peer: Some(proto::Peer {
464 peer_id: request.sender_id.0,
465 replica_id: peer_replica_id as u32,
466 }),
467 },
468 )
469 })
470 .await?;
471 rpc.respond(
472 request.receipt(),
473 proto::OpenWorktreeResponse {
474 worktree_id,
475 worktree: Some(proto::Worktree {
476 root_name: worktree.root_name.clone(),
477 entries: worktree.entries.values().cloned().collect(),
478 }),
479 replica_id: peer_replica_id as u32,
480 peers,
481 },
482 )
483 .await?;
484 } else {
485 rpc.respond(
486 request.receipt(),
487 proto::OpenWorktreeResponse {
488 worktree_id,
489 worktree: None,
490 replica_id: 0,
491 peers: Vec::new(),
492 },
493 )
494 .await?;
495 }
496
497 Ok(())
498}
499
500async fn update_worktree(
501 request: TypedEnvelope<proto::UpdateWorktree>,
502 rpc: &Arc<Peer>,
503 state: &Arc<AppState>,
504) -> tide::Result<()> {
505 {
506 let mut state = state.rpc.write().await;
507 let worktree = state.write_worktree(request.payload.worktree_id, request.sender_id)?;
508 for entry_id in &request.payload.removed_entries {
509 worktree.entries.remove(&entry_id);
510 }
511
512 for entry in &request.payload.updated_entries {
513 worktree.entries.insert(entry.id, entry.clone());
514 }
515 }
516
517 broadcast_in_worktree(request.payload.worktree_id, request, rpc, state).await?;
518 Ok(())
519}
520
521async fn close_worktree(
522 request: TypedEnvelope<proto::CloseWorktree>,
523 rpc: &Arc<Peer>,
524 state: &Arc<AppState>,
525) -> tide::Result<()> {
526 let connection_ids;
527 {
528 let mut state = state.rpc.write().await;
529 let worktree = state.write_worktree(request.payload.worktree_id, request.sender_id)?;
530 connection_ids = worktree.connection_ids();
531 if worktree.host_connection_id == Some(request.sender_id) {
532 worktree.host_connection_id = None;
533 } else if let Some(replica_id) = worktree.guest_connection_ids.remove(&request.sender_id) {
534 worktree.active_replica_ids.remove(&replica_id);
535 }
536 }
537
538 broadcast(request.sender_id, connection_ids, |conn_id| {
539 rpc.send(
540 conn_id,
541 proto::RemovePeer {
542 worktree_id: request.payload.worktree_id,
543 peer_id: request.sender_id.0,
544 },
545 )
546 })
547 .await?;
548
549 Ok(())
550}
551
552async fn open_buffer(
553 request: TypedEnvelope<proto::OpenBuffer>,
554 rpc: &Arc<Peer>,
555 state: &Arc<AppState>,
556) -> tide::Result<()> {
557 let receipt = request.receipt();
558 let worktree_id = request.payload.worktree_id;
559 let host_connection_id = state
560 .rpc
561 .read()
562 .await
563 .read_worktree(worktree_id, request.sender_id)?
564 .host_connection_id()?;
565
566 let response = rpc
567 .forward_request(request.sender_id, host_connection_id, request.payload)
568 .await?;
569 rpc.respond(receipt, response).await?;
570 Ok(())
571}
572
573async fn close_buffer(
574 request: TypedEnvelope<proto::CloseBuffer>,
575 rpc: &Arc<Peer>,
576 state: &Arc<AppState>,
577) -> tide::Result<()> {
578 let host_connection_id = state
579 .rpc
580 .read()
581 .await
582 .read_worktree(request.payload.worktree_id, request.sender_id)?
583 .host_connection_id()?;
584
585 rpc.forward_send(request.sender_id, host_connection_id, request.payload)
586 .await?;
587
588 Ok(())
589}
590
591async fn save_buffer(
592 request: TypedEnvelope<proto::SaveBuffer>,
593 rpc: &Arc<Peer>,
594 state: &Arc<AppState>,
595) -> tide::Result<()> {
596 let host;
597 let guests;
598 {
599 let state = state.rpc.read().await;
600 let worktree = state.read_worktree(request.payload.worktree_id, request.sender_id)?;
601 host = worktree.host_connection_id()?;
602 guests = worktree
603 .guest_connection_ids
604 .keys()
605 .copied()
606 .collect::<Vec<_>>();
607 }
608
609 let sender = request.sender_id;
610 let receipt = request.receipt();
611 let response = rpc
612 .forward_request(sender, host, request.payload.clone())
613 .await?;
614
615 broadcast(host, guests, |conn_id| {
616 let response = response.clone();
617 async move {
618 if conn_id == sender {
619 rpc.respond(receipt, response).await
620 } else {
621 rpc.forward_send(host, conn_id, response).await
622 }
623 }
624 })
625 .await?;
626
627 Ok(())
628}
629
630async fn update_buffer(
631 request: TypedEnvelope<proto::UpdateBuffer>,
632 rpc: &Arc<Peer>,
633 state: &Arc<AppState>,
634) -> tide::Result<()> {
635 broadcast_in_worktree(request.payload.worktree_id, request, rpc, state).await
636}
637
638async fn buffer_saved(
639 request: TypedEnvelope<proto::BufferSaved>,
640 rpc: &Arc<Peer>,
641 state: &Arc<AppState>,
642) -> tide::Result<()> {
643 broadcast_in_worktree(request.payload.worktree_id, request, rpc, state).await
644}
645
646async fn get_channels(
647 request: TypedEnvelope<proto::GetChannels>,
648 rpc: &Arc<Peer>,
649 state: &Arc<AppState>,
650) -> tide::Result<()> {
651 let user_id = state
652 .rpc
653 .read()
654 .await
655 .user_id_for_connection(request.sender_id)?;
656 let channels = state.db.get_channels_for_user(user_id).await?;
657 rpc.respond(
658 request.receipt(),
659 proto::GetChannelsResponse {
660 channels: channels
661 .into_iter()
662 .map(|chan| proto::Channel {
663 id: chan.id.to_proto(),
664 name: chan.name,
665 })
666 .collect(),
667 },
668 )
669 .await?;
670 Ok(())
671}
672
673async fn get_users(
674 request: TypedEnvelope<proto::GetUsers>,
675 rpc: &Arc<Peer>,
676 state: &Arc<AppState>,
677) -> tide::Result<()> {
678 let user_id = state
679 .rpc
680 .read()
681 .await
682 .user_id_for_connection(request.sender_id)?;
683 let receipt = request.receipt();
684 let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
685 let users = state
686 .db
687 .get_users_by_ids(user_id, user_ids)
688 .await?
689 .into_iter()
690 .map(|user| proto::User {
691 id: user.id.to_proto(),
692 github_login: user.github_login,
693 avatar_url: String::new(),
694 })
695 .collect();
696 rpc.respond(receipt, proto::GetUsersResponse { users })
697 .await?;
698 Ok(())
699}
700
701async fn join_channel(
702 request: TypedEnvelope<proto::JoinChannel>,
703 rpc: &Arc<Peer>,
704 state: &Arc<AppState>,
705) -> tide::Result<()> {
706 let user_id = state
707 .rpc
708 .read()
709 .await
710 .user_id_for_connection(request.sender_id)?;
711 let channel_id = ChannelId::from_proto(request.payload.channel_id);
712 if !state
713 .db
714 .can_user_access_channel(user_id, channel_id)
715 .await?
716 {
717 Err(anyhow!("access denied"))?;
718 }
719
720 state
721 .rpc
722 .write()
723 .await
724 .join_channel(request.sender_id, channel_id);
725 let messages = state
726 .db
727 .get_recent_channel_messages(channel_id, 50)
728 .await?
729 .into_iter()
730 .map(|msg| proto::ChannelMessage {
731 id: msg.id.to_proto(),
732 body: msg.body,
733 timestamp: msg.sent_at.unix_timestamp() as u64,
734 sender_id: msg.sender_id.to_proto(),
735 })
736 .collect();
737 rpc.respond(request.receipt(), proto::JoinChannelResponse { messages })
738 .await?;
739 Ok(())
740}
741
742async fn send_channel_message(
743 request: TypedEnvelope<proto::SendChannelMessage>,
744 peer: &Arc<Peer>,
745 app: &Arc<AppState>,
746) -> tide::Result<()> {
747 let channel_id = ChannelId::from_proto(request.payload.channel_id);
748 let user_id;
749 let connection_ids;
750 {
751 let state = app.rpc.read().await;
752 user_id = state.user_id_for_connection(request.sender_id)?;
753 if let Some(channel) = state.channels.get(&channel_id) {
754 connection_ids = channel.connection_ids();
755 } else {
756 return Ok(());
757 }
758 }
759
760 let timestamp = OffsetDateTime::now_utc();
761 let message_id = app
762 .db
763 .create_channel_message(channel_id, user_id, &request.payload.body, timestamp)
764 .await?;
765 let message = proto::ChannelMessageSent {
766 channel_id: channel_id.to_proto(),
767 message: Some(proto::ChannelMessage {
768 sender_id: user_id.to_proto(),
769 id: message_id.to_proto(),
770 body: request.payload.body,
771 timestamp: timestamp.unix_timestamp() as u64,
772 }),
773 };
774 broadcast(request.sender_id, connection_ids, |conn_id| {
775 peer.send(conn_id, message.clone())
776 })
777 .await?;
778
779 Ok(())
780}
781
782async fn broadcast_in_worktree<T: proto::EnvelopedMessage>(
783 worktree_id: u64,
784 request: TypedEnvelope<T>,
785 rpc: &Arc<Peer>,
786 state: &Arc<AppState>,
787) -> tide::Result<()> {
788 let connection_ids = state
789 .rpc
790 .read()
791 .await
792 .read_worktree(worktree_id, request.sender_id)?
793 .connection_ids();
794
795 broadcast(request.sender_id, connection_ids, |conn_id| {
796 rpc.forward_send(request.sender_id, conn_id, request.payload.clone())
797 })
798 .await?;
799
800 Ok(())
801}
802
803pub async fn broadcast<F, T>(
804 sender_id: ConnectionId,
805 receiver_ids: Vec<ConnectionId>,
806 mut f: F,
807) -> anyhow::Result<()>
808where
809 F: FnMut(ConnectionId) -> T,
810 T: Future<Output = anyhow::Result<()>>,
811{
812 let futures = receiver_ids
813 .into_iter()
814 .filter(|id| *id != sender_id)
815 .map(|id| f(id));
816 futures::future::try_join_all(futures).await?;
817 Ok(())
818}
819
820fn header_contains_ignore_case<T>(
821 request: &tide::Request<T>,
822 header_name: HeaderName,
823 value: &str,
824) -> bool {
825 request
826 .header(header_name)
827 .map(|h| {
828 h.as_str()
829 .split(',')
830 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
831 })
832 .unwrap_or(false)
833}