mod.rs

  1//! Components in XMPP are services/gateways that are logged into an
  2//! XMPP server under a JID consisting of just a domain name. They are
  3//! allowed to use any user and resource identifiers in their stanzas.
  4use futures::{sink::SinkExt, task::Poll, Sink, Stream};
  5use std::pin::Pin;
  6use std::str::FromStr;
  7use std::task::Context;
  8use tokio::net::TcpStream;
  9use xmpp_parsers::{ns, Element, Jid};
 10
 11use super::happy_eyeballs::connect_to_host;
 12use super::xmpp_codec::Packet;
 13use super::xmpp_stream;
 14use super::Error;
 15
 16mod auth;
 17
/// Component connection to an XMPP server
///
/// This simplifies the `XMPPStream` to a `Stream`/`Sink` of `Element`
/// (stanzas). Connection handling however is up to the user.
///
/// As a `Stream` it yields received stanzas as `Element`s; as a
/// `Sink<Element>` it wraps each item in `Packet::Stanza` before handing
/// it to the underlying stream.
pub struct Component {
    /// The component's Jabber-Id
    pub jid: Jid,
    /// The underlying XMPP stream; `Component::new` only stores it after
    /// the stream has been started and authentication has succeeded.
    stream: XMPPStream,
}
 27
/// Concrete stream type used by `Component`: an `xmpp_stream::XMPPStream`
/// running over a tokio `TcpStream`.
type XMPPStream = xmpp_stream::XMPPStream<TcpStream>;
 29
 30impl Component {
 31    /// Start a new XMPP component
 32    pub async fn new(jid: &str, password: &str, server: &str, port: u16) -> Result<Self, Error> {
 33        let jid = Jid::from_str(jid)?;
 34        let password = password.to_owned();
 35        let stream = Self::connect(jid.clone(), password, server, port).await?;
 36        Ok(Component { jid, stream })
 37    }
 38
 39    async fn connect(
 40        jid: Jid,
 41        password: String,
 42        server: &str,
 43        port: u16,
 44    ) -> Result<XMPPStream, Error> {
 45        let password = password;
 46        let tcp_stream = connect_to_host(server, port).await?;
 47        let mut xmpp_stream =
 48            xmpp_stream::XMPPStream::start(tcp_stream, jid, ns::COMPONENT_ACCEPT.to_owned())
 49                .await?;
 50        auth::auth(&mut xmpp_stream, password).await?;
 51        Ok(xmpp_stream)
 52    }
 53
 54    /// Send stanza
 55    pub async fn send_stanza(&mut self, stanza: Element) -> Result<(), Error> {
 56        self.send(stanza).await
 57    }
 58
 59    /// End connection
 60    pub async fn send_end(&mut self) -> Result<(), Error> {
 61        self.close().await
 62    }
 63}
 64
 65impl Stream for Component {
 66    type Item = Element;
 67
 68    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
 69        loop {
 70            match Pin::new(&mut self.stream).poll_next(cx) {
 71                Poll::Ready(Some(Ok(Packet::Stanza(stanza)))) => return Poll::Ready(Some(stanza)),
 72                Poll::Ready(Some(Ok(Packet::Text(_)))) => {
 73                    // retry
 74                }
 75                Poll::Ready(Some(Ok(_))) =>
 76                // unexpected
 77                {
 78                    return Poll::Ready(None)
 79                }
 80                Poll::Ready(Some(Err(_))) => return Poll::Ready(None),
 81                Poll::Ready(None) => return Poll::Ready(None),
 82                Poll::Pending => return Poll::Pending,
 83            }
 84        }
 85    }
 86}
 87
 88impl Sink<Element> for Component {
 89    type Error = Error;
 90
 91    fn start_send(mut self: Pin<&mut Self>, item: Element) -> Result<(), Self::Error> {
 92        Pin::new(&mut self.stream)
 93            .start_send(Packet::Stanza(item))
 94            .map_err(|e| e.into())
 95    }
 96
 97    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
 98        Pin::new(&mut self.stream)
 99            .poll_ready(cx)
100            .map_err(|e| e.into())
101    }
102
103    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
104        Pin::new(&mut self.stream)
105            .poll_flush(cx)
106            .map_err(|e| e.into())
107    }
108
109    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
110        Pin::new(&mut self.stream)
111            .poll_close(cx)
112            .map_err(|e| e.into())
113    }
114}