mod.rs

  1//! Components in XMPP are services/gateways that are logged into an
  2//! XMPP server under a JID consisting of just a domain name. They are
  3//! allowed to use any user and resource identifiers in their stanzas.
  4use futures::{sink::SinkExt, task::Poll, Sink, Stream};
  5use std::pin::Pin;
  6use std::str::FromStr;
  7use std::task::Context;
  8use xmpp_parsers::{ns, Element, Jid};
  9
 10use self::connect::component_login;
 11
 12use super::xmpp_codec::Packet;
 13use super::Error;
 14use crate::connect::ServerConnector;
 15use crate::xmpp_stream::add_stanza_id;
 16use crate::xmpp_stream::XMPPStream;
 17
 18mod auth;
 19
 20pub(crate) mod connect;
 21
 22/// Component connection to an XMPP server
 23///
 24/// This simplifies the `XMPPStream` to a `Stream`/`Sink` of `Element`
 25/// (stanzas). Connection handling however is up to the user.
 26pub struct Component<C: ServerConnector> {
 27    /// The component's Jabber-Id
 28    pub jid: Jid,
 29    stream: XMPPStream<C::Stream>,
 30}
 31
 32impl<C: ServerConnector> Component<C> {
 33    /// Start a new XMPP component
 34    pub async fn new_with_connector(
 35        jid: &str,
 36        password: &str,
 37        connector: C,
 38    ) -> Result<Self, Error> {
 39        let jid = Jid::from_str(jid)?;
 40        let password = password.to_owned();
 41        let stream = component_login(connector, jid.clone(), password).await?;
 42        Ok(Component { jid, stream })
 43    }
 44
 45    /// Send stanza
 46    pub async fn send_stanza(&mut self, stanza: Element) -> Result<(), Error> {
 47        self.send(add_stanza_id(stanza, ns::COMPONENT_ACCEPT)).await
 48    }
 49
 50    /// End connection
 51    pub async fn send_end(&mut self) -> Result<(), Error> {
 52        self.close().await
 53    }
 54}
 55
 56impl<C: ServerConnector> Stream for Component<C> {
 57    type Item = Element;
 58
 59    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
 60        loop {
 61            match Pin::new(&mut self.stream).poll_next(cx) {
 62                Poll::Ready(Some(Ok(Packet::Stanza(stanza)))) => return Poll::Ready(Some(stanza)),
 63                Poll::Ready(Some(Ok(Packet::Text(_)))) => {
 64                    // retry
 65                }
 66                Poll::Ready(Some(Ok(_))) =>
 67                // unexpected
 68                {
 69                    return Poll::Ready(None)
 70                }
 71                Poll::Ready(Some(Err(_))) => return Poll::Ready(None),
 72                Poll::Ready(None) => return Poll::Ready(None),
 73                Poll::Pending => return Poll::Pending,
 74            }
 75        }
 76    }
 77}
 78
 79impl<C: ServerConnector> Sink<Element> for Component<C> {
 80    type Error = Error;
 81
 82    fn start_send(mut self: Pin<&mut Self>, item: Element) -> Result<(), Self::Error> {
 83        Pin::new(&mut self.stream)
 84            .start_send(Packet::Stanza(item))
 85            .map_err(|e| e.into())
 86    }
 87
 88    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
 89        Pin::new(&mut self.stream)
 90            .poll_ready(cx)
 91            .map_err(|e| e.into())
 92    }
 93
 94    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
 95        Pin::new(&mut self.stream)
 96            .poll_flush(cx)
 97            .map_err(|e| e.into())
 98    }
 99
100    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
101        Pin::new(&mut self.stream)
102            .poll_close(cx)
103            .map_err(|e| e.into())
104    }
105}