mod.rs

  1//! Components in XMPP are services/gateways that are logged into an
  2//! XMPP server under a JID consisting of just a domain name. They are
  3//! allowed to use any user and resource identifiers in their stanzas.
  4use futures::{sink::SinkExt, task::Poll, Sink, Stream};
  5use std::pin::Pin;
  6use std::str::FromStr;
  7use std::task::Context;
  8use tokio::net::TcpStream;
  9use xmpp_parsers::{ns, Element, Jid};
 10
 11use super::happy_eyeballs::connect_to_host;
 12use super::xmpp_codec::Packet;
 13use super::xmpp_stream;
 14use super::Error;
 15use crate::xmpp_stream::add_stanza_id;
 16
 17mod auth;
 18
 19/// Component connection to an XMPP server
 20///
 21/// This simplifies the `XMPPStream` to a `Stream`/`Sink` of `Element`
 22/// (stanzas). Connection handling however is up to the user.
 23pub struct Component {
 24    /// The component's Jabber-Id
 25    pub jid: Jid,
 26    stream: XMPPStream,
 27}
 28
 29type XMPPStream = xmpp_stream::XMPPStream<TcpStream>;
 30
 31impl Component {
 32    /// Start a new XMPP component
 33    pub async fn new(jid: &str, password: &str, server: &str, port: u16) -> Result<Self, Error> {
 34        let jid = Jid::from_str(jid)?;
 35        let password = password.to_owned();
 36        let stream = Self::connect(jid.clone(), password, server, port).await?;
 37        Ok(Component { jid, stream })
 38    }
 39
 40    async fn connect(
 41        jid: Jid,
 42        password: String,
 43        server: &str,
 44        port: u16,
 45    ) -> Result<XMPPStream, Error> {
 46        let password = password;
 47        let tcp_stream = connect_to_host(server, port).await?;
 48        let mut xmpp_stream =
 49            xmpp_stream::XMPPStream::start(tcp_stream, jid, ns::COMPONENT_ACCEPT.to_owned())
 50                .await?;
 51        auth::auth(&mut xmpp_stream, password).await?;
 52        Ok(xmpp_stream)
 53    }
 54
 55    /// Send stanza
 56    pub async fn send_stanza(&mut self, stanza: Element) -> Result<(), Error> {
 57        self.send(add_stanza_id(stanza, ns::COMPONENT_ACCEPT)).await
 58    }
 59
 60    /// End connection
 61    pub async fn send_end(&mut self) -> Result<(), Error> {
 62        self.close().await
 63    }
 64}
 65
 66impl Stream for Component {
 67    type Item = Element;
 68
 69    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
 70        loop {
 71            match Pin::new(&mut self.stream).poll_next(cx) {
 72                Poll::Ready(Some(Ok(Packet::Stanza(stanza)))) => return Poll::Ready(Some(stanza)),
 73                Poll::Ready(Some(Ok(Packet::Text(_)))) => {
 74                    // retry
 75                }
 76                Poll::Ready(Some(Ok(_))) =>
 77                // unexpected
 78                {
 79                    return Poll::Ready(None)
 80                }
 81                Poll::Ready(Some(Err(_))) => return Poll::Ready(None),
 82                Poll::Ready(None) => return Poll::Ready(None),
 83                Poll::Pending => return Poll::Pending,
 84            }
 85        }
 86    }
 87}
 88
 89impl Sink<Element> for Component {
 90    type Error = Error;
 91
 92    fn start_send(mut self: Pin<&mut Self>, item: Element) -> Result<(), Self::Error> {
 93        Pin::new(&mut self.stream)
 94            .start_send(Packet::Stanza(item))
 95            .map_err(|e| e.into())
 96    }
 97
 98    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
 99        Pin::new(&mut self.stream)
100            .poll_ready(cx)
101            .map_err(|e| e.into())
102    }
103
104    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
105        Pin::new(&mut self.stream)
106            .poll_flush(cx)
107            .map_err(|e| e.into())
108    }
109
110    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
111        Pin::new(&mut self.stream)
112            .poll_close(cx)
113            .map_err(|e| e.into())
114    }
115}