1//! `XMPPStream` is the common container for all XMPP network connections
2
3use futures::sink::Send;
4use futures::{Poll, Sink, StartSend, Stream};
5use jid::Jid;
6use minidom::Element;
7use tokio_codec::Framed;
8use tokio_io::{AsyncRead, AsyncWrite};
9
10use crate::stream_start::StreamStart;
11use crate::xmpp_codec::{Packet, XMPPCodec};
12
/// XML namespace of the `<stream:stream>` element
pub const NS_XMPP_STREAM: &str = "http://etherx.jabber.org/streams";
15
/// Wraps a framed `stream` together with the data that travels with it:
/// the local JID, the `<stream:features/>` element and the root namespace.
///
/// All fields are public, so callers may read or replace them directly.
pub struct XMPPStream<S> {
    /// The local Jabber-Id
    pub jid: Jid,
    /// The transport `S` framed with an `XMPPCodec`
    pub stream: Framed<S, XMPPCodec>,
    /// `<stream:features/>` for XMPP version 1.0
    pub stream_features: Element,
    /// Root namespace
    ///
    /// This is different for either c2s, s2s, or component
    /// connections.
    pub ns: String,
}
30
31impl<S: AsyncRead + AsyncWrite> XMPPStream<S> {
32 /// Constructor
33 pub fn new(
34 jid: Jid,
35 stream: Framed<S, XMPPCodec>,
36 ns: String,
37 stream_features: Element,
38 ) -> Self {
39 XMPPStream {
40 jid,
41 stream,
42 stream_features,
43 ns,
44 }
45 }
46
47 /// Send a `<stream:stream>` start tag
48 pub fn start(stream: S, jid: Jid, ns: String) -> StreamStart<S> {
49 let xmpp_stream = Framed::new(stream, XMPPCodec::new());
50 StreamStart::from_stream(xmpp_stream, jid, ns)
51 }
52
53 /// Unwraps the inner stream
54 pub fn into_inner(self) -> S {
55 self.stream.into_inner()
56 }
57
58 /// Re-run `start()`
59 pub fn restart(self) -> StreamStart<S> {
60 Self::start(self.stream.into_inner(), self.jid, self.ns)
61 }
62}
63
64impl<S: AsyncWrite> XMPPStream<S> {
65 /// Convenience method
66 pub fn send_stanza<E: Into<Element>>(self, e: E) -> Send<Self> {
67 self.send(Packet::Stanza(e.into()))
68 }
69}
70
71/// Proxy to self.stream
72impl<S: AsyncWrite> Sink for XMPPStream<S> {
73 type SinkItem = <Framed<S, XMPPCodec> as Sink>::SinkItem;
74 type SinkError = <Framed<S, XMPPCodec> as Sink>::SinkError;
75
76 fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
77 self.stream.start_send(item)
78 }
79
80 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
81 self.stream.poll_complete()
82 }
83}
84
85/// Proxy to self.stream
86impl<S: AsyncRead> Stream for XMPPStream<S> {
87 type Item = <Framed<S, XMPPCodec> as Stream>::Item;
88 type Error = <Framed<S, XMPPCodec> as Stream>::Error;
89
90 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
91 self.stream.poll()
92 }
93}