responder.rs

  1// Copyright (c) 2024 Jonas Schäfer <jonas@zombofant.net>
  2//
  3// This Source Code Form is subject to the terms of the Mozilla Public
  4// License, v. 2.0. If a copy of the MPL was not distributed with this
  5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6
  7use core::pin::Pin;
  8use std::borrow::Cow;
  9use std::io;
 10
 11use futures::SinkExt;
 12
 13use tokio::io::{AsyncBufRead, AsyncWrite};
 14
 15use xmpp_parsers::{stream_error::StreamError, stream_features::StreamFeatures};
 16
 17use xso::{AsXml, FromXml};
 18
 19use super::{
 20    common::{RawXmlStream, StreamHeader},
 21    XmlStream,
 22};
 23
/// Type state for a responder stream which has received a stream header
///
/// To continue stream setup, call [`send_header`][`Self::send_header`].
pub struct AcceptedStream<Io> {
    // Underlying raw XML stream; `send_header` writes the responder's header
    // to it and then moves it into the next type state.
    pub(super) stream: RawXmlStream<Io>,
    // Stream header as sent by the peer; readable through `header` and
    // extractable through `take_header`.
    pub(super) header: StreamHeader<'static>,
}
 31
 32impl<Io> AcceptedStream<Io> {
 33    /// The stream header contents as sent by the peer.
 34    pub fn header(&self) -> StreamHeader<'_> {
 35        StreamHeader {
 36            from: self.header.from.as_deref().map(Cow::Borrowed),
 37            to: self.header.to.as_deref().map(Cow::Borrowed),
 38            id: self.header.id.as_deref().map(Cow::Borrowed),
 39        }
 40    }
 41
 42    /// Extract the stream header contents as sent by the peer.
 43    pub fn take_header(&mut self) -> StreamHeader<'static> {
 44        self.header.take()
 45    }
 46}
 47
 48impl<Io: AsyncBufRead + AsyncWrite + Unpin> AcceptedStream<Io> {
 49    /// Send a stream header.
 50    ///
 51    /// Sends the given stream header to the initiator. Returns a new object
 52    /// which is prepared to send the stream features.
 53    pub async fn send_header(
 54        self,
 55        header: StreamHeader<'_>,
 56    ) -> io::Result<PendingFeaturesSend<Io>> {
 57        let Self {
 58            mut stream,
 59            header: _,
 60        } = self;
 61
 62        header.send(Pin::new(&mut stream)).await?;
 63        Ok(PendingFeaturesSend { stream })
 64    }
 65}
 66
/// Type state for a responder stream which has received and sent the stream
/// header.
///
/// To continue stream setup, call [`send_features`][`Self::send_features`].
pub struct PendingFeaturesSend<Io> {
    // Underlying raw XML stream; consumed by `send_features` (which wraps it
    // into an `XmlStream`) or by `send_error` (which closes it).
    pub(super) stream: RawXmlStream<Io>,
}
 74
 75impl<Io: AsyncBufRead + AsyncWrite + Unpin> PendingFeaturesSend<Io> {
 76    /// Send the responder's stream features.
 77    ///
 78    /// After the stream features have been sent, the stream can be used for
 79    /// exchanging stream-level elements (stanzas or "nonzas"). The Rust type
 80    /// for these elements must be given as type parameter `T`.
 81    pub async fn send_features<T: FromXml + AsXml>(
 82        self,
 83        features: &'_ StreamFeatures,
 84    ) -> io::Result<XmlStream<Io, T>> {
 85        let Self { mut stream } = self;
 86        Pin::new(&mut stream).start_send_xso(features)?;
 87        stream.flush().await?;
 88
 89        Ok(XmlStream::wrap(stream))
 90    }
 91
 92    /// Send a stream error and shut the stream down.
 93    ///
 94    /// Sends the given stream error to the peer and cleanly closes the stream
 95    /// by sending a stream footer.
 96    pub async fn send_error(self, error: &'_ StreamError) -> io::Result<()> {
 97        let Self { mut stream } = self;
 98        Pin::new(&mut stream).start_send_xso(error)?;
 99        stream.send(xso::Item::ElementFoot).await?;
100        stream.close().await?;
101
102        Ok(())
103    }
104}