Detailed changes
@@ -28,39 +28,31 @@ async fn main() {
let mut client = Client::new(jid, password);
// Main loop, processes events
- let mut wait_for_stream_end = false;
- let mut stream_ended = false;
- while !stream_ended {
- if let Some(event) = client.next().await {
- if wait_for_stream_end {
- /* Do Nothing. */
- } else if event.is_online() {
- println!("Online!");
+ while let Some(event) = client.next().await {
+ if event.is_online() {
+ println!("Online!");
- let target_jid: Jid = target.clone().parse().unwrap();
- let iq = make_disco_iq(target_jid);
- println!("Sending disco#info request to {}", target.clone());
- println!(">> {:?}", iq);
- client.send_stanza(iq.into()).await.unwrap();
- } else if let Some(Stanza::Iq(iq)) = event.into_stanza() {
- if let IqType::Result(Some(payload)) = iq.payload {
- if payload.is("query", ns::DISCO_INFO) {
- if let Ok(disco_info) = DiscoInfoResult::try_from(payload) {
- for ext in disco_info.extensions {
- if let Ok(server_info) = ServerInfo::try_from(ext) {
- print_server_info(server_info);
- }
+ let target_jid: Jid = target.clone().parse().unwrap();
+ let iq = make_disco_iq(target_jid);
+ println!("Sending disco#info request to {}", target.clone());
+ println!(">> {:?}", iq);
+ client.send_stanza(iq.into()).await.unwrap();
+ } else if let Some(Stanza::Iq(iq)) = event.into_stanza() {
+ if let IqType::Result(Some(payload)) = iq.payload {
+ if payload.is("query", ns::DISCO_INFO) {
+ if let Ok(disco_info) = DiscoInfoResult::try_from(payload) {
+ for ext in disco_info.extensions {
+ if let Ok(server_info) = ServerInfo::try_from(ext) {
+ print_server_info(server_info);
}
}
}
- wait_for_stream_end = true;
- client.send_end().await.unwrap();
}
+ break;
}
- } else {
- stream_ended = true;
}
}
+ client.send_end().await.expect("Stream shutdown unclean");
}
fn make_disco_iq(target: Jid) -> Iq {
@@ -40,33 +40,29 @@ async fn main() {
let disco_info = make_disco();
// Main loop, processes events
- let mut wait_for_stream_end = false;
- let mut stream_ended = false;
- while !stream_ended {
- if let Some(event) = client.next().await {
- if wait_for_stream_end {
- /* Do nothing */
- } else if event.is_online() {
- println!("Online!");
+ while let Some(event) = client.next().await {
+ if event.is_online() {
+ println!("Online!");
- let caps = get_disco_caps(&disco_info, "https://gitlab.com/xmpp-rs/tokio-xmpp");
- let presence = make_presence(caps);
- client.send_stanza(presence.into()).await.unwrap();
- } else if let Some(stanza) = event.into_stanza() {
- match stanza {
- Stanza::Iq(iq) => {
- if let IqType::Get(payload) = iq.payload {
- if payload.is("query", ns::DISCO_INFO) {
- let query = DiscoInfoQuery::try_from(payload);
- match query {
- Ok(query) => {
- let mut disco = disco_info.clone();
- disco.node = query.node;
- let iq = Iq::from_result(iq.id, Some(disco))
- .with_to(iq.from.unwrap());
- client.send_stanza(iq.into()).await.unwrap();
- }
- Err(err) => client
+ let caps = get_disco_caps(&disco_info, "https://gitlab.com/xmpp-rs/tokio-xmpp");
+ let presence = make_presence(caps);
+ client.send_stanza(presence.into()).await.unwrap();
+ } else if let Some(stanza) = event.into_stanza() {
+ match stanza {
+ Stanza::Iq(iq) => {
+ if let IqType::Get(payload) = iq.payload {
+ if payload.is("query", ns::DISCO_INFO) {
+ let query = DiscoInfoQuery::try_from(payload);
+ match query {
+ Ok(query) => {
+ let mut disco = disco_info.clone();
+ disco.node = query.node;
+ let iq = Iq::from_result(iq.id, Some(disco))
+ .with_to(iq.from.unwrap());
+ client.send_stanza(iq.into()).await.unwrap();
+ }
+ Err(err) => {
+ client
.send_stanza(
make_error(
iq.from.unwrap(),
@@ -78,32 +74,11 @@ async fn main() {
.into(),
)
.await
- .unwrap(),
+ .unwrap();
}
- } else {
- // We MUST answer unhandled get iqs with a service-unavailable error.
- client
- .send_stanza(
- make_error(
- iq.from.unwrap(),
- iq.id,
- ErrorType::Cancel,
- DefinedCondition::ServiceUnavailable,
- "No handler defined for this kind of iq.",
- )
- .into(),
- )
- .await
- .unwrap();
- }
- } else if let IqType::Result(Some(payload)) = iq.payload {
- if payload.is("pubsub", ns::PUBSUB) {
- let pubsub = PubSub::try_from(payload).unwrap();
- let from = iq.from.clone().unwrap_or(jid.clone().into());
- handle_iq_result(pubsub, &from);
}
- } else if let IqType::Set(_) = iq.payload {
- // We MUST answer unhandled set iqs with a service-unavailable error.
+ } else {
+ // We MUST answer unhandled get iqs with a service-unavailable error.
client
.send_stanza(
make_error(
@@ -118,47 +93,64 @@ async fn main() {
.await
.unwrap();
}
+ } else if let IqType::Result(Some(payload)) = iq.payload {
+ if payload.is("pubsub", ns::PUBSUB) {
+ let pubsub = PubSub::try_from(payload).unwrap();
+ let from = iq.from.clone().unwrap_or(jid.clone().into());
+ handle_iq_result(pubsub, &from);
+ }
+ } else if let IqType::Set(_) = iq.payload {
+ // We MUST answer unhandled set iqs with a service-unavailable error.
+ client
+ .send_stanza(
+ make_error(
+ iq.from.unwrap(),
+ iq.id,
+ ErrorType::Cancel,
+ DefinedCondition::ServiceUnavailable,
+ "No handler defined for this kind of iq.",
+ )
+ .into(),
+ )
+ .await
+ .unwrap();
}
- Stanza::Message(message) => {
- let from = message.from.clone().unwrap();
- if let Some(body) = message.get_best_body(vec!["en"]) {
- if body.0 == "die" {
- println!("Secret die command triggered by {}", from);
- wait_for_stream_end = true;
- client.send_end().await.unwrap();
- }
+ }
+ Stanza::Message(message) => {
+ let from = message.from.clone().unwrap();
+ if let Some(body) = message.get_best_body(vec!["en"]) {
+ if body.0 == "die" {
+ println!("Secret die command triggered by {}", from);
+ break;
}
- for child in message.payloads {
- if child.is("event", ns::PUBSUB_EVENT) {
- let event = PubSubEvent::try_from(child).unwrap();
- if let PubSubEvent::PublishedItems { node, items } = event {
- if node.0 == ns::AVATAR_METADATA {
- for item in items.into_iter() {
- let payload = item.payload.clone().unwrap();
- if payload.is("metadata", ns::AVATAR_METADATA) {
- // TODO: do something with these metadata.
- let _metadata =
- AvatarMetadata::try_from(payload).unwrap();
- println!(
- "[1m{}[0m has published an avatar, downloading...",
- from.clone()
- );
- let iq = download_avatar(from.clone());
- client.send_stanza(iq.into()).await.unwrap();
- }
+ }
+ for child in message.payloads {
+ if child.is("event", ns::PUBSUB_EVENT) {
+ let event = PubSubEvent::try_from(child).unwrap();
+ if let PubSubEvent::PublishedItems { node, items } = event {
+ if node.0 == ns::AVATAR_METADATA {
+ for item in items.into_iter() {
+ let payload = item.payload.clone().unwrap();
+ if payload.is("metadata", ns::AVATAR_METADATA) {
+ // TODO: do something with these metadata.
+ let _metadata =
+ AvatarMetadata::try_from(payload).unwrap();
+ println!(
+ "[1m{}[0m has published an avatar, downloading...",
+ from.clone()
+ );
+ let iq = download_avatar(from.clone());
+ client.send_stanza(iq.into()).await.unwrap();
}
}
}
}
}
}
- // Nothing to do here.
- Stanza::Presence(_) => (),
}
+ // Nothing to do here.
+ Stanza::Presence(_) => (),
}
- } else {
- println!("stream_ended");
- stream_ended = true;
}
}
}
@@ -21,50 +21,41 @@ async fn main() {
// Client instance
let mut client = Client::new(jid, password.to_owned());
- client.set_reconnect(true);
// Main loop, processes events
- let mut wait_for_stream_end = false;
- let mut stream_ended = false;
- while !stream_ended {
- if let Some(event) = client.next().await {
- println!("event: {:?}", event);
- if wait_for_stream_end {
- /* Do nothing */
- } else if event.is_online() {
- let jid = event
- .get_jid()
- .map(|jid| format!("{}", jid))
- .unwrap_or("unknown".to_owned());
- println!("Online at {}", jid);
+ while let Some(event) = client.next().await {
+ println!("event: {:?}", event);
+ if event.is_online() {
+ let jid = event
+ .get_jid()
+ .map(|jid| format!("{}", jid))
+ .unwrap_or("unknown".to_owned());
+ println!("Online at {}", jid);
- let presence = make_presence();
- client.send_stanza(presence.into()).await.unwrap();
- } else if let Some(message) = event
- .into_stanza()
- .and_then(|stanza| Message::try_from(stanza).ok())
- {
- match (message.from, message.bodies.get("")) {
- (Some(ref from), Some(ref body)) if body.0 == "die" => {
- println!("Secret die command triggered by {}", from);
- wait_for_stream_end = true;
- client.send_end().await.unwrap();
- }
- (Some(ref from), Some(ref body)) => {
- if message.type_ != MessageType::Error {
- // This is a message we'll echo
- let reply = make_reply(from.clone(), &body.0);
- client.send_stanza(reply.into()).await.unwrap();
- }
+ let presence = make_presence();
+ client.send_stanza(presence.into()).await.unwrap();
+ } else if let Some(message) = event
+ .into_stanza()
+ .and_then(|stanza| Message::try_from(stanza).ok())
+ {
+ match (message.from, message.bodies.get("")) {
+ (Some(ref from), Some(ref body)) if body.0 == "die" => {
+ println!("Secret die command triggered by {}", from);
+ break;
+ }
+ (Some(ref from), Some(ref body)) => {
+ if message.type_ != MessageType::Error {
+ // This is a message we'll echo
+ let reply = make_reply(from.clone(), &body.0);
+ client.send_stanza(reply.into()).await.unwrap();
}
- _ => {}
}
+ _ => {}
}
- } else {
- println!("stream_ended");
- stream_ended = true;
}
}
+
+ client.send_end().await.unwrap();
}
// Construct a <presence/>
@@ -1,56 +0,0 @@
-use std::io;
-
-use futures::{SinkExt, StreamExt};
-use tokio::io::{AsyncBufRead, AsyncWrite};
-use xmpp_parsers::bind::{BindQuery, BindResponse};
-use xmpp_parsers::iq::{Iq, IqType};
-use xmpp_parsers::stream_features::StreamFeatures;
-
-use crate::error::{Error, ProtocolError};
-use crate::event::Stanza;
-use crate::jid::{FullJid, Jid};
-use crate::xmlstream::{ReadError, XmppStream, XmppStreamElement};
-
-const BIND_REQ_ID: &str = "resource-bind";
-
-pub async fn bind<S: AsyncBufRead + AsyncWrite + Unpin>(
- stream: &mut XmppStream<S>,
- features: &StreamFeatures,
- jid: &Jid,
-) -> Result<Option<FullJid>, Error> {
- if features.can_bind() {
- let resource = jid
- .resource()
- .and_then(|resource| Some(resource.to_string()));
- let iq = Iq::from_set(BIND_REQ_ID, BindQuery::new(resource));
- stream.send(&iq).await?;
-
- loop {
- match stream.next().await {
- Some(Ok(XmppStreamElement::Stanza(Stanza::Iq(iq)))) if iq.id == BIND_REQ_ID => {
- match iq.payload {
- IqType::Result(Some(payload)) => match BindResponse::try_from(payload) {
- Ok(v) => {
- return Ok(Some(v.into()));
- }
- Err(_) => return Err(ProtocolError::InvalidBindResponse.into()),
- },
- _ => return Err(ProtocolError::InvalidBindResponse.into()),
- }
- }
- Some(Ok(_)) => {}
- Some(Err(ReadError::SoftTimeout)) => {}
- Some(Err(ReadError::HardError(e))) => return Err(e.into()),
- Some(Err(ReadError::ParseError(e))) => {
- return Err(io::Error::new(io::ErrorKind::InvalidData, e).into())
- }
- Some(Err(ReadError::StreamFooterReceived)) | None => {
- return Err(Error::Disconnected)
- }
- }
- }
- } else {
- // No resource binding available, do nothing.
- return Ok(None);
- }
-}
@@ -9,14 +9,13 @@ use std::io;
use std::str::FromStr;
use tokio::io::{AsyncBufRead, AsyncWrite};
use xmpp_parsers::{
- jid::{FullJid, Jid},
+ jid::Jid,
ns,
sasl::{Auth, Mechanism as XMPPMechanism, Nonza, Response},
stream_features::{SaslMechanisms, StreamFeatures},
};
use crate::{
- client::bind::bind,
connect::ServerConnector,
error::{AuthError, Error, ProtocolError},
xmlstream::{
@@ -133,18 +132,3 @@ pub async fn client_auth<C: ServerConnector>(
.await?;
Ok(stream.recv_features().await?)
}
-
-/// Log into an XMPP server as a client with a jid+pass
-/// does channel binding if supported
-pub async fn client_login<C: ServerConnector>(
- server: C,
- jid: Jid,
- password: String,
- timeouts: Timeouts,
-) -> Result<(Option<FullJid>, StreamFeatures, XmppStream<C::Stream>), Error> {
- let (features, mut stream) = client_auth(server, jid.clone(), password, timeouts).await?;
-
- // XmppStream bound to user session
- let full_jid = bind(&mut stream, &features, &jid).await?;
- Ok((full_jid, features, stream))
-}
@@ -1,11 +1,18 @@
-use futures::sink::SinkExt;
+// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+use std::io;
+
use xmpp_parsers::{jid::Jid, stream_features::StreamFeatures};
use crate::{
- client::{login::client_login, stream::ClientState},
connect::ServerConnector,
error::Error,
- xmlstream::{Timeouts, XmppStream, XmppStreamElement},
+ stanzastream::{StanzaStage, StanzaState, StanzaStream, StanzaToken},
+ xmlstream::Timeouts,
Stanza,
};
@@ -16,93 +23,68 @@ use crate::connect::StartTlsServerConnector;
#[cfg(feature = "insecure-tcp")]
use crate::connect::TcpServerConnector;
-mod bind;
pub(crate) mod login;
mod stream;
/// XMPP client connection and state
///
-/// It is able to reconnect. TODO: implement session management.
-///
/// This implements the `futures` crate's [`Stream`](#impl-Stream) and
/// [`Sink`](#impl-Sink<Packet>) traits.
-pub struct Client<C: ServerConnector> {
- jid: Jid,
- password: String,
- connector: C,
- state: ClientState<C::Stream>,
- timeouts: Timeouts,
- reconnect: bool,
- // TODO: tls_required=true
+pub struct Client {
+ stream: StanzaStream,
+ bound_jid: Option<Jid>,
+ features: Option<StreamFeatures>,
}
-impl<C: ServerConnector> Client<C> {
- /// Set whether to reconnect (`true`) or let the stream end
- /// (`false`) when a connection to the server has ended.
- pub fn set_reconnect(&mut self, reconnect: bool) -> &mut Self {
- self.reconnect = reconnect;
- self
- }
-
+impl Client {
/// Get the client's bound JID (the one reported by the XMPP
/// server).
pub fn bound_jid(&self) -> Option<&Jid> {
- match self.state {
- ClientState::Connected { ref bound_jid, .. } => Some(bound_jid),
- _ => None,
- }
+ self.bound_jid.as_ref()
}
/// Send stanza
- pub async fn send_stanza(&mut self, mut stanza: Stanza) -> Result<(), Error> {
+ pub async fn send_stanza(&mut self, mut stanza: Stanza) -> Result<StanzaToken, io::Error> {
stanza.ensure_id();
- self.send(stanza).await
+ let mut token = self.stream.send(Box::new(stanza)).await;
+ match token.wait_for(StanzaStage::Sent).await {
+ // Queued < Sent, so it cannot be reached.
+ Some(StanzaState::Queued) => unreachable!(),
+
+ None | Some(StanzaState::Dropped) => Err(io::Error::new(
+ io::ErrorKind::NotConnected,
+ "stream disconnected fatally before stanza could be sent",
+ )),
+ Some(StanzaState::Failed { error }) => Err(error.into_io_error()),
+ Some(StanzaState::Sent { .. }) | Some(StanzaState::Acked { .. }) => Ok(token),
+ }
}
/// Get the stream features (`<stream:features/>`) of the underlying stream
pub fn get_stream_features(&self) -> Option<&StreamFeatures> {
- match self.state {
- ClientState::Connected { ref features, .. } => Some(features),
- _ => None,
- }
+ self.features.as_ref()
}
/// End connection by sending `</stream:stream>`
///
/// You may expect the server to respond with the same. This
/// client will then drop its connection.
- ///
- /// Make sure to disable reconnect.
- pub async fn send_end(&mut self) -> Result<(), Error> {
- match self.state {
- ClientState::Connected { ref mut stream, .. } => {
- Ok(<XmppStream<C::Stream> as SinkExt<&XmppStreamElement>>::close(stream).await?)
- }
- ClientState::Connecting { .. } => {
- self.state = ClientState::Disconnected;
- Ok(())
- }
- _ => Ok(()),
- }
+ pub async fn send_end(self) -> Result<(), Error> {
+ self.stream.close().await;
+ Ok(())
}
}
#[cfg(feature = "starttls")]
-impl Client<StartTlsServerConnector> {
+impl Client {
/// Start a new XMPP client using StartTLS transport and autoreconnect
///
/// Start polling the returned instance so that it will connect
/// and yield events.
pub fn new<J: Into<Jid>, P: Into<String>>(jid: J, password: P) -> Self {
let jid = jid.into();
- let mut client = Self::new_starttls(
- jid.clone(),
- password,
- DnsConfig::srv(&jid.domain().to_string(), "_xmpp-client._tcp", 5222),
- Timeouts::default(),
- );
- client.set_reconnect(true);
- client
+ let dns_config = DnsConfig::srv(&jid.domain().to_string(), "_xmpp-client._tcp", 5222);
+ Self::new_starttls(jid, password, dns_config, Timeouts::default())
}
/// Start a new XMPP client with StartTLS transport and specific DNS config
@@ -122,7 +104,7 @@ impl Client<StartTlsServerConnector> {
}
#[cfg(feature = "insecure-tcp")]
-impl Client<TcpServerConnector> {
+impl Client {
/// Start a new XMPP client with plaintext insecure connection and specific DNS config
pub fn new_plaintext<J: Into<Jid>, P: Into<String>>(
jid: J,
@@ -139,31 +121,18 @@ impl Client<TcpServerConnector> {
}
}
-impl<C: ServerConnector> Client<C> {
+impl Client {
/// Start a new client given that the JID is already parsed.
- pub fn new_with_connector<J: Into<Jid>, P: Into<String>>(
+ pub fn new_with_connector<J: Into<Jid>, P: Into<String>, C: ServerConnector>(
jid: J,
password: P,
connector: C,
timeouts: Timeouts,
) -> Self {
- let jid = jid.into();
- let password = password.into();
-
- let connect = tokio::spawn(client_login(
- connector.clone(),
- jid.clone(),
- password.clone(),
- timeouts,
- ));
- let client = Client {
- jid,
- password,
- connector,
- state: ClientState::Connecting(connect),
- reconnect: false,
- timeouts,
- };
- client
+ Self {
+ stream: StanzaStream::new_c2s(connector, jid.into(), password.into(), timeouts, 16),
+ bound_jid: None,
+ features: None,
+ }
}
}
@@ -1,38 +1,24 @@
-use futures::{task::Poll, Future, Sink, Stream};
-use std::io;
-use std::mem::replace;
+// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+use futures::{ready, task::Poll, Stream};
use std::pin::Pin;
use std::task::Context;
-use tokio::task::JoinHandle;
-use xmpp_parsers::{
- jid::{FullJid, Jid},
- stream_features::StreamFeatures,
-};
use crate::{
- client::{login::client_login, Client},
- connect::{AsyncReadAndWrite, ServerConnector},
- error::Error,
- xmlstream::{xmpp::XmppStreamElement, ReadError, XmppStream},
- Event, Stanza,
+ client::Client,
+ stanzastream::{Event as StanzaStreamEvent, StreamEvent},
+ Event,
};
-pub(crate) enum ClientState<S: AsyncReadAndWrite> {
- Invalid,
- Disconnected,
- Connecting(JoinHandle<Result<(Option<FullJid>, StreamFeatures, XmppStream<S>), Error>>),
- Connected {
- stream: XmppStream<S>,
- features: StreamFeatures,
- bound_jid: Jid,
- },
-}
-
/// Incoming XMPP events
///
/// In an `async fn` you may want to use this with `use
/// futures::stream::StreamExt;`
-impl<C: ServerConnector> Stream for Client<C> {
+impl Stream for Client {
type Item = Event;
/// Low-level read on the XMPP stream, allowing the underlying
@@ -46,177 +32,27 @@ impl<C: ServerConnector> Stream for Client<C> {
///
/// ...for your client
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
- let state = replace(&mut self.state, ClientState::Invalid);
-
- match state {
- ClientState::Invalid => panic!("Invalid client state"),
- ClientState::Disconnected if self.reconnect => {
- // TODO: add timeout
- let connect = tokio::spawn(client_login(
- self.connector.clone(),
- self.jid.clone(),
- self.password.clone(),
- self.timeouts,
- ));
- self.state = ClientState::Connecting(connect);
- self.poll_next(cx)
- }
- ClientState::Disconnected => {
- self.state = ClientState::Disconnected;
- Poll::Ready(None)
- }
- ClientState::Connecting(mut connect) => match Pin::new(&mut connect).poll(cx) {
- Poll::Ready(Ok(Ok((bound_jid, features, stream)))) => {
- let bound_jid = bound_jid.map(Jid::from).unwrap_or_else(|| self.jid.clone());
- self.state = ClientState::Connected {
- stream,
- bound_jid: bound_jid.clone(),
- features,
- };
- Poll::Ready(Some(Event::Online {
+ loop {
+ return Poll::Ready(match ready!(Pin::new(&mut self.stream).poll_next(cx)) {
+ None => None,
+ Some(StanzaStreamEvent::Stanza(st)) => Some(Event::Stanza(st)),
+ Some(StanzaStreamEvent::Stream(StreamEvent::Reset {
+ bound_jid,
+ features,
+ })) => {
+ self.features = Some(features);
+ self.bound_jid = Some(bound_jid.clone());
+ Some(Event::Online {
bound_jid,
resumed: false,
- }))
- }
- Poll::Ready(Ok(Err(e))) => {
- self.state = ClientState::Disconnected;
- return Poll::Ready(Some(Event::Disconnected(e.into())));
- }
- Poll::Ready(Err(e)) => {
- self.state = ClientState::Disconnected;
- panic!("connect task: {}", e);
+ })
}
- Poll::Pending => {
- self.state = ClientState::Connecting(connect);
- Poll::Pending
- }
- },
- ClientState::Connected {
- mut stream,
- features,
- bound_jid,
- } => {
- // Poll sink
- match <XmppStream<C::Stream> as Sink<&XmppStreamElement>>::poll_ready(
- Pin::new(&mut stream),
- cx,
- ) {
- Poll::Pending => (),
- Poll::Ready(Ok(())) => (),
- Poll::Ready(Err(e)) => {
- self.state = ClientState::Disconnected;
- return Poll::Ready(Some(Event::Disconnected(e.into())));
- }
- };
-
- // Poll stream
- //
- // This needs to be a loop in order to ignore packets we donβt care about, or those
- // we want to handle elsewhere. Returning something isnβt correct in those two
- // cases because it would signal to tokio that the XmppStream is also done, while
- // there could be additional packets waiting for us.
- //
- // The proper solution is thus a loop which we exit once we have something to
- // return.
- loop {
- match Pin::new(&mut stream).poll_next(cx) {
- Poll::Ready(None)
- | Poll::Ready(Some(Err(ReadError::StreamFooterReceived))) => {
- // EOF
- self.state = ClientState::Disconnected;
- return Poll::Ready(Some(Event::Disconnected(Error::Disconnected)));
- }
- Poll::Ready(Some(Err(ReadError::HardError(e)))) => {
- // Treat stream as dead on I/O errors
- self.state = ClientState::Disconnected;
- return Poll::Ready(Some(Event::Disconnected(e.into())));
- }
- Poll::Ready(Some(Err(ReadError::ParseError(e)))) => {
- // Treat stream as dead on parse errors, too (for now...)
- self.state = ClientState::Disconnected;
- return Poll::Ready(Some(Event::Disconnected(
- io::Error::new(io::ErrorKind::InvalidData, e).into(),
- )));
- }
- Poll::Ready(Some(Err(ReadError::SoftTimeout))) => {
- // TODO: do something smart about this.
- }
- Poll::Ready(Some(Ok(XmppStreamElement::Stanza(stanza)))) => {
- // Receive stanza
- self.state = ClientState::Connected {
- stream,
- features,
- bound_jid,
- };
- return Poll::Ready(Some(Event::Stanza(stanza)));
- }
- Poll::Ready(Some(Ok(_))) => {
- // We ignore these for now.
- }
- Poll::Pending => {
- // Try again later
- self.state = ClientState::Connected {
- stream,
- features,
- bound_jid,
- };
- return Poll::Pending;
- }
- }
- }
- }
- }
- }
-}
-
-/// Outgoing XMPP packets
-///
-/// See `send_stanza()` for an `async fn`
-impl<C: ServerConnector> Sink<Stanza> for Client<C> {
- type Error = Error;
-
- fn start_send(mut self: Pin<&mut Self>, item: Stanza) -> Result<(), Self::Error> {
- match self.state {
- ClientState::Connected { ref mut stream, .. } => {
- Pin::new(stream).start_send(&item).map_err(|e| e.into())
- }
- _ => Err(Error::InvalidState),
- }
- }
-
- fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
- match self.state {
- ClientState::Connected { ref mut stream, .. } => <XmppStream<C::Stream> as Sink<
- &XmppStreamElement,
- >>::poll_ready(
- Pin::new(stream), cx
- )
- .map_err(|e| e.into()),
- _ => Poll::Pending,
- }
- }
-
- fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
- match self.state {
- ClientState::Connected { ref mut stream, .. } => <XmppStream<C::Stream> as Sink<
- &XmppStreamElement,
- >>::poll_flush(
- Pin::new(stream), cx
- )
- .map_err(|e| e.into()),
- _ => Poll::Pending,
- }
- }
-
- fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
- match self.state {
- ClientState::Connected { ref mut stream, .. } => <XmppStream<C::Stream> as Sink<
- &XmppStreamElement,
- >>::poll_close(
- Pin::new(stream), cx
- )
- .map_err(|e| e.into()),
- _ => Poll::Pending,
+ Some(StanzaStreamEvent::Stream(StreamEvent::Resumed)) => Some(Event::Online {
+ bound_jid: self.bound_jid.as_ref().unwrap().clone(),
+ resumed: true,
+ }),
+ Some(StanzaStreamEvent::Stream(StreamEvent::Suspended)) => continue,
+ });
}
}
}
@@ -61,7 +61,8 @@ use crate::{
};
/// Client that connects over StartTls
-pub type StartTlsClient = Client<StartTlsServerConnector>;
+#[deprecated(since = "5.0.0", note = "use tokio_xmpp::Client instead")]
+pub type StartTlsClient = Client;
/// Connect via TCP+StartTLS to an XMPP server
#[derive(Debug, Clone)]
@@ -14,7 +14,8 @@ use crate::{
pub type TcpComponent = Component<TcpServerConnector>;
/// Client that connects over TCP
-pub type TcpClient = Client<TcpServerConnector>;
+#[deprecated(since = "5.0.0", note = "use tokio_xmpp::Client instead")]
+pub type TcpClient = Client;
/// Connect via insecure plaintext TCP to an XMPP server
/// This should only be used over localhost or otherwise when you know what you are doing
@@ -8,7 +8,6 @@ use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::RwLock;
-use tokio_xmpp::connect::ServerConnector;
pub use tokio_xmpp::parsers;
use tokio_xmpp::parsers::{disco::DiscoInfoResult, message::MessageType};
pub use tokio_xmpp::{
@@ -19,8 +18,8 @@ pub use tokio_xmpp::{
use crate::{event_loop, message, muc, upload, Error, Event, RoomNick};
-pub struct Agent<C: ServerConnector> {
- pub(crate) client: TokioXmppClient<C>,
+pub struct Agent {
+ pub(crate) client: TokioXmppClient,
pub(crate) default_nick: Arc<RwLock<String>>,
pub(crate) lang: Arc<Vec<String>>,
pub(crate) disco: DiscoInfoResult,
@@ -33,8 +32,8 @@ pub struct Agent<C: ServerConnector> {
pub(crate) rooms_leaving: HashMap<BareJid, String>,
}
-impl<C: ServerConnector> Agent<C> {
- pub async fn disconnect(&mut self) -> Result<(), Error> {
+impl Agent {
+ pub async fn disconnect(self) -> Result<(), Error> {
self.client.send_end().await
}
@@ -152,25 +152,24 @@ impl<C: ServerConnector> ClientBuilder<'_, C> {
}
}
- pub fn build(self) -> Agent<C> {
+ pub fn build(self) -> Agent {
let jid: Jid = if let Some(resource) = &self.resource {
self.jid.with_resource_str(resource).unwrap().into()
} else {
self.jid.clone().into()
};
- let mut client = TokioXmppClient::new_with_connector(
+ let client = TokioXmppClient::new_with_connector(
jid,
self.password,
self.server_connector.clone(),
self.timeouts,
);
- client.set_reconnect(true);
self.build_impl(client)
}
// This function is meant to be used for testing build
- pub(crate) fn build_impl(self, client: TokioXmppClient<C>) -> Agent<C> {
+ pub(crate) fn build_impl(self, client: TokioXmppClient) -> Agent {
let disco = self.make_disco();
let node = self.website;
@@ -4,7 +4,6 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
-use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::{
jid::Jid,
parsers::{
@@ -19,11 +18,7 @@ use tokio_xmpp::{
use crate::Agent;
-pub async fn handle_disco_info_result<C: ServerConnector>(
- agent: &mut Agent<C>,
- disco: DiscoInfoResult,
- from: Jid,
-) {
+pub async fn handle_disco_info_result(agent: &mut Agent, disco: DiscoInfoResult, from: Jid) {
// Safe unwrap because no DISCO is received when we are not online
if from == agent.client.bound_jid().unwrap().to_bare() && agent.awaiting_disco_bookmarks_type {
info!("Received disco info about bookmarks type");
@@ -5,7 +5,6 @@
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use futures::StreamExt;
-use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::{
parsers::{disco::DiscoInfoQuery, iq::Iq, roster::Roster},
Event as TokioXmppEvent, Stanza,
@@ -14,7 +13,7 @@ use tokio_xmpp::{
use crate::{iq, message, presence, Agent, Event};
/// Wait for new events, or Error::Disconnected when stream is closed and will not reconnect.
-pub async fn wait_for_events<C: ServerConnector>(agent: &mut Agent<C>) -> Vec<Event> {
+pub async fn wait_for_events(agent: &mut Agent) -> Vec<Event> {
if let Some(event) = agent.client.next().await {
let mut events = Vec::new();
@@ -4,7 +4,6 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
-use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::{
jid::Jid,
minidom::Element,
@@ -18,8 +17,8 @@ use tokio_xmpp::{
use crate::{Agent, Event};
-pub async fn handle_iq_get<C: ServerConnector>(
- agent: &mut Agent<C>,
+pub async fn handle_iq_get(
+ agent: &mut Agent,
_events: &mut Vec<Event>,
from: Jid,
_to: Option<Jid>,
@@ -4,7 +4,6 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
-use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::parsers::iq::{Iq, IqType};
use crate::{Agent, Event};
@@ -13,7 +12,7 @@ pub mod get;
pub mod result;
pub mod set;
-pub async fn handle_iq<C: ServerConnector>(agent: &mut Agent<C>, iq: Iq) -> Vec<Event> {
+pub async fn handle_iq(agent: &mut Agent, iq: Iq) -> Vec<Event> {
let mut events = vec![];
let from = iq
.from
@@ -4,7 +4,6 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
-use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::{
jid::Jid,
minidom::Element,
@@ -13,8 +12,8 @@ use tokio_xmpp::{
use crate::{disco, muc::room::JoinRoomSettings, pubsub, upload, Agent, Event};
-pub async fn handle_iq_result<C: ServerConnector>(
- agent: &mut Agent<C>,
+pub async fn handle_iq_result(
+ agent: &mut Agent,
events: &mut Vec<Event>,
from: Jid,
_to: Option<Jid>,
@@ -4,7 +4,6 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
-use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::{
jid::Jid,
minidom::Element,
@@ -16,8 +15,8 @@ use tokio_xmpp::{
use crate::{Agent, Event};
-pub async fn handle_iq_set<C: ServerConnector>(
- agent: &mut Agent<C>,
+pub async fn handle_iq_set(
+ agent: &mut Agent,
_events: &mut Vec<Event>,
from: Jid,
_to: Option<Jid>,
@@ -38,6 +38,10 @@ pub type Error = tokio_xmpp::Error;
pub type Id = Option<String>;
pub type RoomNick = String;
+// The test below is dysfunctional since we have moved to StanzaStream. The
+// StanzaStream will attempt to connect to foo@bar indefinitely.
+// Keeping it here as inspiration for future integration tests.
+/*
#[cfg(all(test, any(feature = "starttls-rust", feature = "starttls-native")))]
mod tests {
use super::jid::BareJid;
@@ -74,3 +78,4 @@ mod tests {
}
}
}
+*/
@@ -4,7 +4,6 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
-use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::{
jid::Jid,
parsers::{message::Message, muc::user::MucUser},
@@ -12,8 +11,8 @@ use tokio_xmpp::{
use crate::{delay::StanzaTimeInfo, Agent, Event};
-pub async fn handle_message_chat<C: ServerConnector>(
- agent: &mut Agent<C>,
+pub async fn handle_message_chat(
+ agent: &mut Agent,
events: &mut Vec<Event>,
from: Jid,
message: &Message,
@@ -4,13 +4,12 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
-use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::{jid::Jid, parsers::message::Message};
use crate::{delay::StanzaTimeInfo, Agent, Event};
-pub async fn handle_message_group_chat<C: ServerConnector>(
- agent: &mut Agent<C>,
+pub async fn handle_message_group_chat(
+ agent: &mut Agent,
events: &mut Vec<Event>,
from: Jid,
message: &Message,
@@ -4,7 +4,6 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
-use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::parsers::{
message::{Message, MessageType},
ns,
@@ -15,10 +14,7 @@ use crate::{delay::message_time_info, pubsub, Agent, Event};
pub mod chat;
pub mod group_chat;
-pub async fn handle_message<C: ServerConnector>(
- agent: &mut Agent<C>,
- message: Message,
-) -> Vec<Event> {
+pub async fn handle_message(agent: &mut Agent, message: Message) -> Vec<Event> {
let mut events = vec![];
let from = message.from.clone().unwrap();
let time_info = message_time_info(&message);
@@ -4,7 +4,6 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
-use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::{
jid::Jid,
parsers::message::{Body, Message, MessageType},
@@ -12,8 +11,8 @@ use tokio_xmpp::{
use crate::Agent;
-pub async fn send_message<C: ServerConnector>(
- agent: &mut Agent<C>,
+pub async fn send_message(
+ agent: &mut Agent,
recipient: Jid,
type_: MessageType,
lang: &str,
@@ -4,7 +4,6 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
-use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::{
jid::{BareJid, Jid},
parsers::{
@@ -15,8 +14,8 @@ use tokio_xmpp::{
use crate::{Agent, RoomNick};
-pub async fn send_room_private_message<C: ServerConnector>(
- agent: &mut Agent<C>,
+pub async fn send_room_private_message(
+ agent: &mut Agent,
room: BareJid,
recipient: RoomNick,
lang: &str,
@@ -5,7 +5,6 @@
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use crate::parsers::message::MessageType;
-use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::{
jid::BareJid,
parsers::{
@@ -51,10 +50,7 @@ impl<'a> JoinRoomSettings<'a> {
}
/// TODO: this method should add bookmark and ensure autojoin is true
-pub async fn join_room<'a, C: ServerConnector>(
- agent: &mut Agent<C>,
- settings: JoinRoomSettings<'a>,
-) {
+pub async fn join_room<'a>(agent: &mut Agent, settings: JoinRoomSettings<'a>) {
let JoinRoomSettings {
room,
nick,
@@ -122,10 +118,7 @@ impl<'a> LeaveRoomSettings<'a> {
/// If successful, a `RoomLeft` event should be received later as a confirmation. See [XEP-0045](https://xmpp.org/extensions/xep-0045.html#exit).
///
/// TODO: this method should set autojoin false on bookmark
-pub async fn leave_room<'a, C: ServerConnector>(
- agent: &mut Agent<C>,
- settings: LeaveRoomSettings<'a>,
-) {
+pub async fn leave_room<'a>(agent: &mut Agent, settings: LeaveRoomSettings<'a>) {
let LeaveRoomSettings { room, status } = settings;
if agent.rooms_leaving.contains_key(&room) {
@@ -188,10 +181,7 @@ impl<'a> RoomMessageSettings<'a> {
}
}
-pub async fn send_room_message<'a, C: ServerConnector>(
- agent: &mut Agent<C>,
- settings: RoomMessageSettings<'a>,
-) {
+pub async fn send_room_message<'a>(agent: &mut Agent, settings: RoomMessageSettings<'a>) {
let RoomMessageSettings {
room,
message,
@@ -4,7 +4,6 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
-use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::parsers::{
muc::user::{MucUser, Status},
presence::{Presence, Type as PresenceType},
@@ -13,10 +12,7 @@ use tokio_xmpp::parsers::{
use crate::{Agent, Event};
/// Translate a `Presence` stanza into a list of higher-level `Event`s.
-pub async fn handle_presence<C: ServerConnector>(
- agent: &mut Agent<C>,
- presence: Presence,
-) -> Vec<Event> {
+pub async fn handle_presence(agent: &mut Agent, presence: Presence) -> Vec<Event> {
// Allocate an empty vector to store the events.
let mut events = vec![];
@@ -8,7 +8,6 @@ use super::Agent;
use crate::Event;
use std::fs::{self, File};
use std::io::{self, Write};
-use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::parsers::{
avatar::{Data, Metadata},
iq::Iq,
@@ -21,9 +20,9 @@ use tokio_xmpp::parsers::{
},
};
-pub(crate) async fn handle_metadata_pubsub_event<C: ServerConnector>(
+pub(crate) async fn handle_metadata_pubsub_event(
from: &Jid,
- agent: &mut Agent<C>,
+ agent: &mut Agent,
items: Vec<Item>,
) -> Vec<Event> {
let mut events = Vec::new();
@@ -11,7 +11,6 @@ use crate::{
};
use std::str::FromStr;
use tokio_xmpp::{
- connect::ServerConnector,
jid::{BareJid, Jid},
minidom::Element,
parsers::{
@@ -23,10 +22,10 @@ use tokio_xmpp::{
#[cfg(feature = "avatars")]
pub(crate) mod avatar;
-pub(crate) async fn handle_event<C: ServerConnector>(
+pub(crate) async fn handle_event(
#[cfg_attr(not(feature = "avatars"), allow(unused_variables))] from: &Jid,
elem: Element,
- #[cfg_attr(not(feature = "avatars"), allow(unused_variables))] agent: &mut Agent<C>,
+ #[cfg_attr(not(feature = "avatars"), allow(unused_variables))] agent: &mut Agent,
) -> Vec<Event> {
// We allow the useless mut warning for no-default-features,
// since for now only avatars pushes events here.
@@ -101,10 +100,10 @@ pub(crate) async fn handle_event<C: ServerConnector>(
events
}
-pub(crate) async fn handle_iq_result<C: ServerConnector>(
+pub(crate) async fn handle_iq_result(
#[cfg_attr(not(feature = "avatars"), allow(unused_variables))] from: &Jid,
elem: Element,
- agent: &mut Agent<C>,
+ agent: &mut Agent,
) -> impl IntoIterator<Item = Event> {
// We allow the useless mut warning for no-default-features,
// since for now only avatars pushes events here.
@@ -10,7 +10,6 @@ use reqwest::{
use std::path::PathBuf;
use tokio::fs::File;
use tokio_util::codec::{BytesCodec, FramedRead};
-use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::{
jid::Jid,
minidom::Element,
@@ -19,11 +18,11 @@ use tokio_xmpp::{
use crate::{Agent, Event};
-pub async fn handle_upload_result<C: ServerConnector>(
+pub async fn handle_upload_result(
from: &Jid,
iqid: String,
elem: Element,
- agent: &mut Agent<C>,
+ agent: &mut Agent,
) -> impl IntoIterator<Item = Event> {
let mut res: Option<(usize, PathBuf)> = None;
@@ -6,7 +6,6 @@
use std::path::Path;
use tokio::fs::File;
-use tokio_xmpp::connect::ServerConnector;
use tokio_xmpp::{
jid::Jid,
parsers::{http_upload::SlotRequest, iq::Iq},
@@ -14,11 +13,7 @@ use tokio_xmpp::{
use crate::Agent;
-pub async fn upload_file_with<C: ServerConnector>(
- agent: &mut Agent<C>,
- service: &str,
- path: &Path,
-) {
+pub async fn upload_file_with(agent: &mut Agent, service: &str, path: &Path) {
let name = path.file_name().unwrap().to_str().unwrap().to_string();
let file = File::open(path).await.unwrap();
let size = file.metadata().await.unwrap().len();