@@ -0,0 +1,37 @@
+use std::io::{Read, stdin};
+use std::env::args;
+use std::process::exit;
+use std::str::FromStr;
+use tokio;
+use tokio_xmpp::OneshotClient as Client;
+use xmpp_parsers::message::{Body, Message};
+use xmpp_parsers::Jid;
+
+#[tokio::main]
+async fn main() {
+ let args: Vec<String> = args().collect();
+ if args.len() != 4 {
+ println!("Usage: {} <jid> <password> <recipient>", args[0]);
+ exit(1);
+ }
+ // Configuration
+ let jid = &args[1];
+ let password = &args[2];
+ let recipient = Jid::from_str(&args[3]).unwrap();
+
+ // Client instance
+ let mut client = Client::new(jid, password.to_owned()).await.unwrap();
+
+ // Read from stdin
+ println!("Client connected, type message and submit with Ctrl-D");
+ let mut body = String::new();
+ stdin().lock().read_to_string(&mut body).unwrap();
+
+ // Send message
+ let mut message = Message::new(Some(recipient));
+ message.bodies.insert(String::new(), Body(body.to_owned()));
+ client.send_stanza(message).await.unwrap();
+
+ // Close client connection
+ client.end().await.unwrap();
+}
@@ -0,0 +1,154 @@
+use futures::{sink::SinkExt, Sink, Stream};
+use idna;
+use sasl::common::{ChannelBinding, Credentials};
+use std::pin::Pin;
+use std::str::FromStr;
+use std::task::{Context, Poll};
+use tokio::{net::TcpStream, stream::StreamExt};
+use tokio_tls::TlsStream;
+use xmpp_parsers::{Element, Jid};
+
+use crate::happy_eyeballs::connect;
+use crate::starttls::starttls;
+use crate::xmpp_codec::Packet;
+use crate::xmpp_stream;
+use crate::{Error, ProtocolError};
+use super::auth::auth;
+use super::bind::bind;
+
+/// XMPP client connection and state
+///
+/// This implements the `futures` crate's [`Stream`](#impl-Stream) and
+/// [`Sink`](#impl-Sink<Packet>) traits.
+pub struct Client {
+ stream: XMPPStream,
+}
+
+type XMPPStream = xmpp_stream::XMPPStream<TlsStream<TcpStream>>;
+const NS_JABBER_CLIENT: &str = "jabber:client";
+
+impl Client {
+ /// Start a new XMPP client and wait for a usable session
+ pub async fn new<P: Into<String>>(jid: &str, password: P) -> Result<Self, Error> {
+ let jid = Jid::from_str(jid)?;
+ let client = Self::new_with_jid(jid, password.into()).await?;
+ Ok(client)
+ }
+
+ /// Start a new client given that the JID is already parsed.
+ pub async fn new_with_jid(jid: Jid, password: String) -> Result<Self, Error> {
+ let stream = Self::connect(jid.clone(), password.clone()).await?;
+ Ok(Client { stream })
+ }
+
+ async fn connect(jid: Jid, password: String) -> Result<XMPPStream, Error> {
+ let username = jid.clone().node().unwrap();
+ let password = password;
+ let domain = idna::domain_to_ascii(&jid.clone().domain()).map_err(|_| Error::Idna)?;
+
+ // TCP connection
+ let tcp_stream = connect(&domain, Some("_xmpp-client._tcp"), 5222).await?;
+
+ // Unencryped XMPPStream
+ let xmpp_stream =
+ xmpp_stream::XMPPStream::start(tcp_stream, jid.clone(), NS_JABBER_CLIENT.to_owned()).await?;
+
+ let xmpp_stream = if xmpp_stream.stream_features.can_starttls() {
+ // TlsStream
+ let tls_stream = starttls(xmpp_stream).await?;
+ // Encrypted XMPPStream
+ xmpp_stream::XMPPStream::start(tls_stream, jid.clone(), NS_JABBER_CLIENT.to_owned()).await?
+ } else {
+ return Err(Error::Protocol(ProtocolError::NoTls));
+ };
+
+ let creds = Credentials::default()
+ .with_username(username)
+ .with_password(password)
+ .with_channel_binding(ChannelBinding::None);
+ // Authenticated (unspecified) stream
+ let stream = auth(xmpp_stream, creds).await?;
+ // Authenticated XMPPStream
+ let xmpp_stream = xmpp_stream::XMPPStream::start(stream, jid, NS_JABBER_CLIENT.to_owned()).await?;
+
+ // XMPPStream bound to user session
+ let xmpp_stream = bind(xmpp_stream).await?;
+ Ok(xmpp_stream)
+ }
+
+ /// Get the client's bound JID (the one reported by the XMPP
+ /// server).
+ pub fn bound_jid(&self) -> &Jid {
+ &self.stream.jid
+ }
+
+ /// Send stanza
+ pub async fn send_stanza<E>(&mut self, stanza: E) -> Result<(), Error>
+ where
+ E: Into<Element>,
+ {
+ self.send(Packet::Stanza(stanza.into())).await
+ }
+
+ /// End connection by sending `</stream:stream>`
+ ///
+ /// You may expect the server to respond with the same. This
+ /// client will then drop its connection.
+ pub async fn end(mut self) -> Result<(), Error> {
+ self.send(Packet::StreamEnd).await?;
+
+ // Wait for stream end from server
+ while let Some(Ok(_)) = self.next().await {}
+
+ Ok(())
+ }
+}
+
+/// Incoming XMPP events
+///
+/// In an `async fn` you may want to use this with `use
+/// futures::stream::StreamExt;`
+impl Stream for Client {
+ type Item = Result<Element, Error>;
+
+ /// Low-level read on the XMPP stream
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+ loop {
+ match Pin::new(&mut self.stream).poll_next(cx) {
+ Poll::Pending =>
+ return Poll::Pending,
+ Poll::Ready(Some(Ok(Packet::Stanza(stanza)))) =>
+ return Poll::Ready(Some(Ok(stanza))),
+ Poll::Ready(Some(Ok(Packet::Text(_)))) => {
+ // Ignore, retry
+ }
+ Poll::Ready(_) =>
+ // Unexpected and errors, just end
+ return Poll::Ready(None),
+ }
+ }
+ }
+}
+
+/// Outgoing XMPP packets
+///
+/// See `send_stanza()` for an `async fn`
+impl Sink<Packet> for Client {
+ type Error = Error;
+
+ fn start_send(mut self: Pin<&mut Self>, item: Packet) -> Result<(), Self::Error> {
+ Pin::new(&mut self.stream).start_send(item)
+ }
+
+ fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
+ Pin::new(&mut self.stream).poll_ready(cx)
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
+ Pin::new(&mut self.stream).poll_flush(cx)
+ }
+
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
+ Pin::new(&mut self.stream).poll_close(cx)
+ }
+}
@@ -12,7 +12,7 @@ pub mod xmpp_stream;
pub mod stream_features;
pub use crate::event::Event;
mod client;
-pub use crate::client::Client;
+pub use client::{Client, oneshot_client::Client as OneshotClient};
mod component;
pub use crate::component::Component;
mod error;