1//! Components in XMPP are services/gateways that are logged into an
2//! XMPP server under a JID consisting of just a domain name. They are
3//! allowed to use any user and resource identifiers in their stanzas.
4use futures::{sink::SinkExt, task::Poll, Sink, Stream};
5use std::pin::Pin;
6use std::str::FromStr;
7use std::task::Context;
8use tokio::net::TcpStream;
9use xmpp_parsers::{ns, Element, Jid};
10
11use super::happy_eyeballs::connect_to_host;
12use super::xmpp_codec::Packet;
13use super::xmpp_stream;
14use super::Error;
15use crate::xmpp_stream::add_stanza_id;
16
17mod auth;
18
19/// Component connection to an XMPP server
20///
21/// This simplifies the `XMPPStream` to a `Stream`/`Sink` of `Element`
22/// (stanzas). Connection handling however is up to the user.
23pub struct Component {
24 /// The component's Jabber-Id
25 pub jid: Jid,
26 stream: XMPPStream,
27}
28
29type XMPPStream = xmpp_stream::XMPPStream<TcpStream>;
30
31impl Component {
32 /// Start a new XMPP component
33 pub async fn new(jid: &str, password: &str, server: &str, port: u16) -> Result<Self, Error> {
34 let jid = Jid::from_str(jid)?;
35 let password = password.to_owned();
36 let stream = Self::connect(jid.clone(), password, server, port).await?;
37 Ok(Component { jid, stream })
38 }
39
40 async fn connect(
41 jid: Jid,
42 password: String,
43 server: &str,
44 port: u16,
45 ) -> Result<XMPPStream, Error> {
46 let password = password;
47 let tcp_stream = connect_to_host(server, port).await?;
48 let mut xmpp_stream =
49 xmpp_stream::XMPPStream::start(tcp_stream, jid, ns::COMPONENT_ACCEPT.to_owned())
50 .await?;
51 auth::auth(&mut xmpp_stream, password).await?;
52 Ok(xmpp_stream)
53 }
54
55 /// Send stanza
56 pub async fn send_stanza(&mut self, stanza: Element) -> Result<(), Error> {
57 self.send(add_stanza_id(stanza, ns::COMPONENT_ACCEPT)).await
58 }
59
60 /// End connection
61 pub async fn send_end(&mut self) -> Result<(), Error> {
62 self.close().await
63 }
64}
65
66impl Stream for Component {
67 type Item = Element;
68
69 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
70 loop {
71 match Pin::new(&mut self.stream).poll_next(cx) {
72 Poll::Ready(Some(Ok(Packet::Stanza(stanza)))) => return Poll::Ready(Some(stanza)),
73 Poll::Ready(Some(Ok(Packet::Text(_)))) => {
74 // retry
75 }
76 Poll::Ready(Some(Ok(_))) =>
77 // unexpected
78 {
79 return Poll::Ready(None)
80 }
81 Poll::Ready(Some(Err(_))) => return Poll::Ready(None),
82 Poll::Ready(None) => return Poll::Ready(None),
83 Poll::Pending => return Poll::Pending,
84 }
85 }
86 }
87}
88
89impl Sink<Element> for Component {
90 type Error = Error;
91
92 fn start_send(mut self: Pin<&mut Self>, item: Element) -> Result<(), Self::Error> {
93 Pin::new(&mut self.stream)
94 .start_send(Packet::Stanza(item))
95 .map_err(|e| e.into())
96 }
97
98 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
99 Pin::new(&mut self.stream)
100 .poll_ready(cx)
101 .map_err(|e| e.into())
102 }
103
104 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
105 Pin::new(&mut self.stream)
106 .poll_flush(cx)
107 .map_err(|e| e.into())
108 }
109
110 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
111 Pin::new(&mut self.stream)
112 .poll_close(cx)
113 .map_err(|e| e.into())
114 }
115}