1//! Components in XMPP are services/gateways that are logged into an
2//! XMPP server under a JID consisting of just a domain name. They are
3//! allowed to use any user and resource identifiers in their stanzas.
4use futures::{sink::SinkExt, task::Poll, Sink, Stream};
5use std::pin::Pin;
6use std::str::FromStr;
7use std::task::Context;
8use xmpp_parsers::{ns, Element, Jid};
9
10use self::connect::component_login;
11
12use super::xmpp_codec::Packet;
13use super::Error;
14use crate::connect::ServerConnector;
15use crate::xmpp_stream::add_stanza_id;
16use crate::xmpp_stream::XMPPStream;
17
18mod auth;
19
20pub(crate) mod connect;
21
22/// Component connection to an XMPP server
23///
24/// This simplifies the `XMPPStream` to a `Stream`/`Sink` of `Element`
25/// (stanzas). Connection handling however is up to the user.
26pub struct Component<C: ServerConnector> {
27 /// The component's Jabber-Id
28 pub jid: Jid,
29 stream: XMPPStream<C::Stream>,
30}
31
32impl<C: ServerConnector> Component<C> {
33 /// Start a new XMPP component
34 pub async fn new_with_connector(
35 jid: &str,
36 password: &str,
37 connector: C,
38 ) -> Result<Self, Error> {
39 let jid = Jid::from_str(jid)?;
40 let password = password.to_owned();
41 let stream = component_login(connector, jid.clone(), password).await?;
42 Ok(Component { jid, stream })
43 }
44
45 /// Send stanza
46 pub async fn send_stanza(&mut self, stanza: Element) -> Result<(), Error> {
47 self.send(add_stanza_id(stanza, ns::COMPONENT_ACCEPT)).await
48 }
49
50 /// End connection
51 pub async fn send_end(&mut self) -> Result<(), Error> {
52 self.close().await
53 }
54}
55
56impl<C: ServerConnector> Stream for Component<C> {
57 type Item = Element;
58
59 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
60 loop {
61 match Pin::new(&mut self.stream).poll_next(cx) {
62 Poll::Ready(Some(Ok(Packet::Stanza(stanza)))) => return Poll::Ready(Some(stanza)),
63 Poll::Ready(Some(Ok(Packet::Text(_)))) => {
64 // retry
65 }
66 Poll::Ready(Some(Ok(_))) =>
67 // unexpected
68 {
69 return Poll::Ready(None)
70 }
71 Poll::Ready(Some(Err(_))) => return Poll::Ready(None),
72 Poll::Ready(None) => return Poll::Ready(None),
73 Poll::Pending => return Poll::Pending,
74 }
75 }
76 }
77}
78
79impl<C: ServerConnector> Sink<Element> for Component<C> {
80 type Error = Error;
81
82 fn start_send(mut self: Pin<&mut Self>, item: Element) -> Result<(), Self::Error> {
83 Pin::new(&mut self.stream)
84 .start_send(Packet::Stanza(item))
85 .map_err(|e| e.into())
86 }
87
88 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
89 Pin::new(&mut self.stream)
90 .poll_ready(cx)
91 .map_err(|e| e.into())
92 }
93
94 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
95 Pin::new(&mut self.stream)
96 .poll_flush(cx)
97 .map_err(|e| e.into())
98 }
99
100 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
101 Pin::new(&mut self.stream)
102 .poll_close(cx)
103 .map_err(|e| e.into())
104 }
105}