1//! Components in XMPP are services/gateways that are logged into an
2//! XMPP server under a JID consisting of just a domain name. They are
3//! allowed to use any user and resource identifiers in their stanzas.
4use futures::{sink::SinkExt, task::Poll, Sink, Stream};
5use std::pin::Pin;
6use std::str::FromStr;
7use std::task::Context;
8use tokio::net::TcpStream;
9use xmpp_parsers::{ns, Element, Jid};
10
11use super::happy_eyeballs::connect_to_host;
12use super::xmpp_codec::Packet;
13use super::xmpp_stream;
14use super::Error;
15
16mod auth;
17
18/// Component connection to an XMPP server
19///
20/// This simplifies the `XMPPStream` to a `Stream`/`Sink` of `Element`
21/// (stanzas). Connection handling however is up to the user.
22pub struct Component {
23 /// The component's Jabber-Id
24 pub jid: Jid,
25 stream: XMPPStream,
26}
27
28type XMPPStream = xmpp_stream::XMPPStream<TcpStream>;
29
30impl Component {
31 /// Start a new XMPP component
32 pub async fn new(jid: &str, password: &str, server: &str, port: u16) -> Result<Self, Error> {
33 let jid = Jid::from_str(jid)?;
34 let password = password.to_owned();
35 let stream = Self::connect(jid.clone(), password, server, port).await?;
36 Ok(Component { jid, stream })
37 }
38
39 async fn connect(
40 jid: Jid,
41 password: String,
42 server: &str,
43 port: u16,
44 ) -> Result<XMPPStream, Error> {
45 let password = password;
46 let tcp_stream = connect_to_host(server, port).await?;
47 let mut xmpp_stream =
48 xmpp_stream::XMPPStream::start(tcp_stream, jid, ns::COMPONENT_ACCEPT.to_owned())
49 .await?;
50 auth::auth(&mut xmpp_stream, password).await?;
51 Ok(xmpp_stream)
52 }
53
54 /// Send stanza
55 pub async fn send_stanza(&mut self, stanza: Element) -> Result<(), Error> {
56 self.send(stanza).await
57 }
58
59 /// End connection
60 pub async fn send_end(&mut self) -> Result<(), Error> {
61 self.close().await
62 }
63}
64
65impl Stream for Component {
66 type Item = Element;
67
68 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
69 loop {
70 match Pin::new(&mut self.stream).poll_next(cx) {
71 Poll::Ready(Some(Ok(Packet::Stanza(stanza)))) => return Poll::Ready(Some(stanza)),
72 Poll::Ready(Some(Ok(Packet::Text(_)))) => {
73 // retry
74 }
75 Poll::Ready(Some(Ok(_))) =>
76 // unexpected
77 {
78 return Poll::Ready(None)
79 }
80 Poll::Ready(Some(Err(_))) => return Poll::Ready(None),
81 Poll::Ready(None) => return Poll::Ready(None),
82 Poll::Pending => return Poll::Pending,
83 }
84 }
85 }
86}
87
88impl Sink<Element> for Component {
89 type Error = Error;
90
91 fn start_send(mut self: Pin<&mut Self>, item: Element) -> Result<(), Self::Error> {
92 Pin::new(&mut self.stream)
93 .start_send(Packet::Stanza(item))
94 .map_err(|e| e.into())
95 }
96
97 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
98 Pin::new(&mut self.stream)
99 .poll_ready(cx)
100 .map_err(|e| e.into())
101 }
102
103 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
104 Pin::new(&mut self.stream)
105 .poll_flush(cx)
106 .map_err(|e| e.into())
107 }
108
109 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
110 Pin::new(&mut self.stream)
111 .poll_close(cx)
112 .map_err(|e| e.into())
113 }
114}