xmpp_stream.rs

 1use futures::{Poll, Stream, Sink, StartSend};
 2use tokio_io::{AsyncRead, AsyncWrite};
 3use tokio_codec::Framed;
 4use minidom::Element;
 5use jid::Jid;
 6
 7use xmpp_codec::XMPPCodec;
 8use stream_start::StreamStart;
 9
/// Namespace URI used for XMPP stream-level elements
/// (e.g. `<stream:stream>` and `<stream:features>`).
pub const NS_XMPP_STREAM: &str = "http://etherx.jabber.org/streams";
11
12pub struct XMPPStream<S> {
13    pub jid: Jid,
14    pub stream: Framed<S, XMPPCodec>,
15    pub stream_features: Element,
16    pub ns: String,
17}
18
19impl<S: AsyncRead + AsyncWrite> XMPPStream<S> {
20    pub fn new(jid: Jid,
21               stream: Framed<S, XMPPCodec>,
22               ns: String,
23               stream_features: Element) -> Self {
24        XMPPStream { jid, stream, stream_features, ns }
25    }
26
27    pub fn start(stream: S, jid: Jid, ns: String) -> StreamStart<S> {
28        let xmpp_stream = Framed::new(stream, XMPPCodec::new());
29        StreamStart::from_stream(xmpp_stream, jid, ns)
30    }
31
32    pub fn into_inner(self) -> S {
33        self.stream.into_inner()
34    }
35
36    pub fn restart(self) -> StreamStart<S> {
37        Self::start(self.stream.into_inner(), self.jid, self.ns)
38    }
39}
40
41/// Proxy to self.stream
42impl<S: AsyncWrite> Sink for XMPPStream<S> {
43    type SinkItem = <Framed<S, XMPPCodec> as Sink>::SinkItem;
44    type SinkError = <Framed<S, XMPPCodec> as Sink>::SinkError;
45
46    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
47        self.stream.start_send(item)
48    }
49
50    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
51        self.stream.poll_complete()
52    }
53}
54
55/// Proxy to self.stream
56impl<S: AsyncRead> Stream for XMPPStream<S> {
57    type Item = <Framed<S, XMPPCodec> as Stream>::Item;
58    type Error = <Framed<S, XMPPCodec> as Stream>::Error;
59
60    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
61        self.stream.poll()
62    }
63}