xmpp_stream.rs

 1//! `XMPPStream` is the common container for all XMPP network connections
 2
 3use futures::sink::Send;
 4use futures::{Poll, Sink, StartSend, Stream};
 5use jid::Jid;
 6use minidom::Element;
 7use tokio_codec::Framed;
 8use tokio_io::{AsyncRead, AsyncWrite};
 9
10use crate::stream_start::StreamStart;
11use crate::xmpp_codec::{Packet, XMPPCodec};
12
/// Namespace URI of the `<stream:stream>` element.
///
/// The tag name is written in backticks so that rustdoc renders it as code
/// instead of treating it as an (unclosed) HTML tag.
pub const NS_XMPP_STREAM: &str = "http://etherx.jabber.org/streams";
15
16/// Wraps a `stream`
17pub struct XMPPStream<S> {
18    /// The local Jabber-Id
19    pub jid: Jid,
20    /// Codec instance
21    pub stream: Framed<S, XMPPCodec>,
22    /// `<stream:features/>` for XMPP version 1.0
23    pub stream_features: Element,
24    /// Root namespace
25    ///
26    /// This is different for either c2s, s2s, or component
27    /// connections.
28    pub ns: String,
29}
30
31impl<S: AsyncRead + AsyncWrite> XMPPStream<S> {
32    /// Constructor
33    pub fn new(
34        jid: Jid,
35        stream: Framed<S, XMPPCodec>,
36        ns: String,
37        stream_features: Element,
38    ) -> Self {
39        XMPPStream {
40            jid,
41            stream,
42            stream_features,
43            ns,
44        }
45    }
46
47    /// Send a `<stream:stream>` start tag
48    pub fn start(stream: S, jid: Jid, ns: String) -> StreamStart<S> {
49        let xmpp_stream = Framed::new(stream, XMPPCodec::new());
50        StreamStart::from_stream(xmpp_stream, jid, ns)
51    }
52
53    /// Unwraps the inner stream
54    pub fn into_inner(self) -> S {
55        self.stream.into_inner()
56    }
57
58    /// Re-run `start()`
59    pub fn restart(self) -> StreamStart<S> {
60        Self::start(self.stream.into_inner(), self.jid, self.ns)
61    }
62}
63
64impl<S: AsyncWrite> XMPPStream<S> {
65    /// Convenience method
66    pub fn send_stanza<E: Into<Element>>(self, e: E) -> Send<Self> {
67        self.send(Packet::Stanza(e.into()))
68    }
69}
70
71/// Proxy to self.stream
72impl<S: AsyncWrite> Sink for XMPPStream<S> {
73    type SinkItem = <Framed<S, XMPPCodec> as Sink>::SinkItem;
74    type SinkError = <Framed<S, XMPPCodec> as Sink>::SinkError;
75
76    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
77        self.stream.start_send(item)
78    }
79
80    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
81        self.stream.poll_complete()
82    }
83}
84
85/// Proxy to self.stream
86impl<S: AsyncRead> Stream for XMPPStream<S> {
87    type Item = <Framed<S, XMPPCodec> as Stream>::Item;
88    type Error = <Framed<S, XMPPCodec> as Stream>::Error;
89
90    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
91        self.stream.poll()
92    }
93}