From 35ce86243f87fe69d9d7e32627473fecf5ae3d66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Sch=C3=A4fer?= Date: Tue, 20 Aug 2024 16:54:48 +0200 Subject: [PATCH] client: use stanzastream! --- tokio-xmpp/examples/contact_addr.rs | 42 ++--- tokio-xmpp/examples/download_avatars.rs | 154 ++++++++-------- tokio-xmpp/examples/echo_bot.rs | 63 +++---- tokio-xmpp/src/client/bind.rs | 56 ------ tokio-xmpp/src/client/login.rs | 18 +- tokio-xmpp/src/client/mod.rs | 119 +++++-------- tokio-xmpp/src/client/stream.rs | 222 ++++-------------------- tokio-xmpp/src/connect/starttls.rs | 3 +- tokio-xmpp/src/connect/tcp.rs | 3 +- xmpp/src/agent.rs | 9 +- xmpp/src/builder.rs | 7 +- xmpp/src/disco/mod.rs | 7 +- xmpp/src/event_loop.rs | 3 +- xmpp/src/iq/get.rs | 5 +- xmpp/src/iq/mod.rs | 3 +- xmpp/src/iq/result.rs | 5 +- xmpp/src/iq/set.rs | 5 +- xmpp/src/lib.rs | 5 + xmpp/src/message/receive/chat.rs | 5 +- xmpp/src/message/receive/group_chat.rs | 5 +- xmpp/src/message/receive/mod.rs | 6 +- xmpp/src/message/send.rs | 5 +- xmpp/src/muc/private_message.rs | 5 +- xmpp/src/muc/room.rs | 16 +- xmpp/src/presence/receive.rs | 6 +- xmpp/src/pubsub/avatar.rs | 5 +- xmpp/src/pubsub/mod.rs | 9 +- xmpp/src/upload/receive.rs | 5 +- xmpp/src/upload/send.rs | 7 +- 29 files changed, 238 insertions(+), 565 deletions(-) delete mode 100644 tokio-xmpp/src/client/bind.rs diff --git a/tokio-xmpp/examples/contact_addr.rs b/tokio-xmpp/examples/contact_addr.rs index d9a14484cec58323640dc82b89426a4a5c3fd9b6..343b90d9408b7823226e8add3a4529a480d88a09 100644 --- a/tokio-xmpp/examples/contact_addr.rs +++ b/tokio-xmpp/examples/contact_addr.rs @@ -28,39 +28,31 @@ async fn main() { let mut client = Client::new(jid, password); // Main loop, processes events - let mut wait_for_stream_end = false; - let mut stream_ended = false; - while !stream_ended { - if let Some(event) = client.next().await { - if wait_for_stream_end { - /* Do Nothing. */ - } else if event.is_online() { - println!("Online!"); + while let Some(event) = client.next().await { + if event.is_online() { + println!("Online!"); - let target_jid: Jid = target.clone().parse().unwrap(); - let iq = make_disco_iq(target_jid); - println!("Sending disco#info request to {}", target.clone()); - println!(">> {:?}", iq); - client.send_stanza(iq.into()).await.unwrap(); - } else if let Some(Stanza::Iq(iq)) = event.into_stanza() { - if let IqType::Result(Some(payload)) = iq.payload { - if payload.is("query", ns::DISCO_INFO) { - if let Ok(disco_info) = DiscoInfoResult::try_from(payload) { - for ext in disco_info.extensions { - if let Ok(server_info) = ServerInfo::try_from(ext) { - print_server_info(server_info); - } + let target_jid: Jid = target.clone().parse().unwrap(); + let iq = make_disco_iq(target_jid); + println!("Sending disco#info request to {}", target.clone()); + println!(">> {:?}", iq); + client.send_stanza(iq.into()).await.unwrap(); + } else if let Some(Stanza::Iq(iq)) = event.into_stanza() { + if let IqType::Result(Some(payload)) = iq.payload { + if payload.is("query", ns::DISCO_INFO) { + if let Ok(disco_info) = DiscoInfoResult::try_from(payload) { + for ext in disco_info.extensions { + if let Ok(server_info) = ServerInfo::try_from(ext) { + print_server_info(server_info); } } } - wait_for_stream_end = true; - client.send_end().await.unwrap(); } + break; } - } else { - stream_ended = true; } } + client.send_end().await.expect("Stream shutdown unclean"); } fn make_disco_iq(target: Jid) -> Iq { diff --git a/tokio-xmpp/examples/download_avatars.rs b/tokio-xmpp/examples/download_avatars.rs index 7192639fa47a26263b9f912373e4a922a702c865..bedae361fdd508d51172d73c179483030c100f82 100644 --- a/tokio-xmpp/examples/download_avatars.rs +++ b/tokio-xmpp/examples/download_avatars.rs @@ -40,33 +40,29 @@ async fn main() { let disco_info = make_disco(); // Main loop, processes events - let mut wait_for_stream_end = false; - let mut stream_ended = false; - while !stream_ended { - if let Some(event) = client.next().await { - if wait_for_stream_end { - /* Do nothing */ - } else if event.is_online() { - println!("Online!"); + while let Some(event) = client.next().await { + if event.is_online() { + println!("Online!"); - let caps = get_disco_caps(&disco_info, "https://gitlab.com/xmpp-rs/tokio-xmpp"); - let presence = make_presence(caps); - client.send_stanza(presence.into()).await.unwrap(); - } else if let Some(stanza) = event.into_stanza() { - match stanza { - Stanza::Iq(iq) => { - if let IqType::Get(payload) = iq.payload { - if payload.is("query", ns::DISCO_INFO) { - let query = DiscoInfoQuery::try_from(payload); - match query { - Ok(query) => { - let mut disco = disco_info.clone(); - disco.node = query.node; - let iq = Iq::from_result(iq.id, Some(disco)) - .with_to(iq.from.unwrap()); - client.send_stanza(iq.into()).await.unwrap(); - } - Err(err) => client + let caps = get_disco_caps(&disco_info, "https://gitlab.com/xmpp-rs/tokio-xmpp"); + let presence = make_presence(caps); + client.send_stanza(presence.into()).await.unwrap(); + } else if let Some(stanza) = event.into_stanza() { + match stanza { + Stanza::Iq(iq) => { + if let IqType::Get(payload) = iq.payload { + if payload.is("query", ns::DISCO_INFO) { + let query = DiscoInfoQuery::try_from(payload); + match query { + Ok(query) => { + let mut disco = disco_info.clone(); + disco.node = query.node; + let iq = Iq::from_result(iq.id, Some(disco)) + .with_to(iq.from.unwrap()); + client.send_stanza(iq.into()).await.unwrap(); + } + Err(err) => { + client .send_stanza( make_error( iq.from.unwrap(), @@ -78,32 +74,11 @@ async fn main() { .into(), ) .await - .unwrap(), + .unwrap(); } - } else { - // We MUST answer unhandled get iqs with a service-unavailable error. - client - .send_stanza( - make_error( - iq.from.unwrap(), - iq.id, - ErrorType::Cancel, - DefinedCondition::ServiceUnavailable, - "No handler defined for this kind of iq.", - ) - .into(), - ) - .await - .unwrap(); - } - } else if let IqType::Result(Some(payload)) = iq.payload { - if payload.is("pubsub", ns::PUBSUB) { - let pubsub = PubSub::try_from(payload).unwrap(); - let from = iq.from.clone().unwrap_or(jid.clone().into()); - handle_iq_result(pubsub, &from); } - } else if let IqType::Set(_) = iq.payload { - // We MUST answer unhandled set iqs with a service-unavailable error. + } else { + // We MUST answer unhandled get iqs with a service-unavailable error. client .send_stanza( make_error( @@ -118,47 +93,64 @@ async fn main() { .await .unwrap(); } + } else if let IqType::Result(Some(payload)) = iq.payload { + if payload.is("pubsub", ns::PUBSUB) { + let pubsub = PubSub::try_from(payload).unwrap(); + let from = iq.from.clone().unwrap_or(jid.clone().into()); + handle_iq_result(pubsub, &from); + } + } else if let IqType::Set(_) = iq.payload { + // We MUST answer unhandled set iqs with a service-unavailable error. + client + .send_stanza( + make_error( + iq.from.unwrap(), + iq.id, + ErrorType::Cancel, + DefinedCondition::ServiceUnavailable, + "No handler defined for this kind of iq.", + ) + .into(), + ) + .await + .unwrap(); } - Stanza::Message(message) => { - let from = message.from.clone().unwrap(); - if let Some(body) = message.get_best_body(vec!["en"]) { - if body.0 == "die" { - println!("Secret die command triggered by {}", from); - wait_for_stream_end = true; - client.send_end().await.unwrap(); - } + } + Stanza::Message(message) => { + let from = message.from.clone().unwrap(); + if let Some(body) = message.get_best_body(vec!["en"]) { + if body.0 == "die" { + println!("Secret die command triggered by {}", from); + break; } - for child in message.payloads { - if child.is("event", ns::PUBSUB_EVENT) { - let event = PubSubEvent::try_from(child).unwrap(); - if let PubSubEvent::PublishedItems { node, items } = event { - if node.0 == ns::AVATAR_METADATA { - for item in items.into_iter() { - let payload = item.payload.clone().unwrap(); - if payload.is("metadata", ns::AVATAR_METADATA) { - // TODO: do something with these metadata. - let _metadata = - AvatarMetadata::try_from(payload).unwrap(); - println!( - "{} has published an avatar, downloading...", - from.clone() - ); - let iq = download_avatar(from.clone()); - client.send_stanza(iq.into()).await.unwrap(); - } + } + for child in message.payloads { + if child.is("event", ns::PUBSUB_EVENT) { + let event = PubSubEvent::try_from(child).unwrap(); + if let PubSubEvent::PublishedItems { node, items } = event { + if node.0 == ns::AVATAR_METADATA { + for item in items.into_iter() { + let payload = item.payload.clone().unwrap(); + if payload.is("metadata", ns::AVATAR_METADATA) { + // TODO: do something with these metadata. + let _metadata = + AvatarMetadata::try_from(payload).unwrap(); + println!( + "{} has published an avatar, downloading...", + from.clone() + ); + let iq = download_avatar(from.clone()); + client.send_stanza(iq.into()).await.unwrap(); } } } } } } - // Nothing to do here. - Stanza::Presence(_) => (), } + // Nothing to do here. + Stanza::Presence(_) => (), } - } else { - println!("stream_ended"); - stream_ended = true; } } } diff --git a/tokio-xmpp/examples/echo_bot.rs b/tokio-xmpp/examples/echo_bot.rs index 055ff8f770153b72f37137203b9afe724c443000..204d2d01d76704250aeef3c6167509df5d5afd7c 100644 --- a/tokio-xmpp/examples/echo_bot.rs +++ b/tokio-xmpp/examples/echo_bot.rs @@ -21,50 +21,41 @@ async fn main() { // Client instance let mut client = Client::new(jid, password.to_owned()); - client.set_reconnect(true); // Main loop, processes events - let mut wait_for_stream_end = false; - let mut stream_ended = false; - while !stream_ended { - if let Some(event) = client.next().await { - println!("event: {:?}", event); - if wait_for_stream_end { - /* Do nothing */ - } else if event.is_online() { - let jid = event - .get_jid() - .map(|jid| format!("{}", jid)) - .unwrap_or("unknown".to_owned()); - println!("Online at {}", jid); + while let Some(event) = client.next().await { + println!("event: {:?}", event); + if event.is_online() { + let jid = event + .get_jid() + .map(|jid| format!("{}", jid)) + .unwrap_or("unknown".to_owned()); + println!("Online at {}", jid); - let presence = make_presence(); - client.send_stanza(presence.into()).await.unwrap(); - } else if let Some(message) = event - .into_stanza() - .and_then(|stanza| Message::try_from(stanza).ok()) - { - match (message.from, message.bodies.get("")) { - (Some(ref from), Some(ref body)) if body.0 == "die" => { - println!("Secret die command triggered by {}", from); - wait_for_stream_end = true; - client.send_end().await.unwrap(); - } - (Some(ref from), Some(ref body)) => { - if message.type_ != MessageType::Error { - // This is a message we'll echo - let reply = make_reply(from.clone(), &body.0); - client.send_stanza(reply.into()).await.unwrap(); - } + let presence = make_presence(); + client.send_stanza(presence.into()).await.unwrap(); + } else if let Some(message) = event + .into_stanza() + .and_then(|stanza| Message::try_from(stanza).ok()) + { + match (message.from, message.bodies.get("")) { + (Some(ref from), Some(ref body)) if body.0 == "die" => { + println!("Secret die command triggered by {}", from); + break; + } + (Some(ref from), Some(ref body)) => { + if message.type_ != MessageType::Error { + // This is a message we'll echo + let reply = make_reply(from.clone(), &body.0); + client.send_stanza(reply.into()).await.unwrap(); } - _ => {} } + _ => {} } - } else { - println!("stream_ended"); - stream_ended = true; } } + + client.send_end().await.unwrap(); } // Construct a diff --git a/tokio-xmpp/src/client/bind.rs b/tokio-xmpp/src/client/bind.rs deleted file mode 100644 index 7cacab130e1406476b20eae12d27f8637175ff5c..0000000000000000000000000000000000000000 --- a/tokio-xmpp/src/client/bind.rs +++ /dev/null @@ -1,56 +0,0 @@ -use std::io; - -use futures::{SinkExt, StreamExt}; -use tokio::io::{AsyncBufRead, AsyncWrite}; -use xmpp_parsers::bind::{BindQuery, BindResponse}; -use xmpp_parsers::iq::{Iq, IqType}; -use xmpp_parsers::stream_features::StreamFeatures; - -use crate::error::{Error, ProtocolError}; -use crate::event::Stanza; -use crate::jid::{FullJid, Jid}; -use crate::xmlstream::{ReadError, XmppStream, XmppStreamElement}; - -const BIND_REQ_ID: &str = "resource-bind"; - -pub async fn bind( - stream: &mut XmppStream, - features: &StreamFeatures, - jid: &Jid, -) -> Result, Error> { - if features.can_bind() { - let resource = jid - .resource() - .and_then(|resource| Some(resource.to_string())); - let iq = Iq::from_set(BIND_REQ_ID, BindQuery::new(resource)); - stream.send(&iq).await?; - - loop { - match stream.next().await { - Some(Ok(XmppStreamElement::Stanza(Stanza::Iq(iq)))) if iq.id == BIND_REQ_ID => { - match iq.payload { - IqType::Result(Some(payload)) => match BindResponse::try_from(payload) { - Ok(v) => { - return Ok(Some(v.into())); - } - Err(_) => return Err(ProtocolError::InvalidBindResponse.into()), - }, - _ => return Err(ProtocolError::InvalidBindResponse.into()), - } - } - Some(Ok(_)) => {} - Some(Err(ReadError::SoftTimeout)) => {} - Some(Err(ReadError::HardError(e))) => return Err(e.into()), - Some(Err(ReadError::ParseError(e))) => { - return Err(io::Error::new(io::ErrorKind::InvalidData, e).into()) - } - Some(Err(ReadError::StreamFooterReceived)) | None => { - return Err(Error::Disconnected) - } - } - } - } else { - // No resource binding available, do nothing. - return Ok(None); - } -} diff --git a/tokio-xmpp/src/client/login.rs b/tokio-xmpp/src/client/login.rs index 48193f5101f9b033b18441eebe8f0f03239eb202..d232de4dcf3306c7ac3a96cf1a1321fe6690bc48 100644 --- a/tokio-xmpp/src/client/login.rs +++ b/tokio-xmpp/src/client/login.rs @@ -9,14 +9,13 @@ use std::io; use std::str::FromStr; use tokio::io::{AsyncBufRead, AsyncWrite}; use xmpp_parsers::{ - jid::{FullJid, Jid}, + jid::Jid, ns, sasl::{Auth, Mechanism as XMPPMechanism, Nonza, Response}, stream_features::{SaslMechanisms, StreamFeatures}, }; use crate::{ - client::bind::bind, connect::ServerConnector, error::{AuthError, Error, ProtocolError}, xmlstream::{ @@ -133,18 +132,3 @@ pub async fn client_auth( .await?; Ok(stream.recv_features().await?) } - -/// Log into an XMPP server as a client with a jid+pass -/// does channel binding if supported -pub async fn client_login( - server: C, - jid: Jid, - password: String, - timeouts: Timeouts, -) -> Result<(Option, StreamFeatures, XmppStream), Error> { - let (features, mut stream) = client_auth(server, jid.clone(), password, timeouts).await?; - - // XmppStream bound to user session - let full_jid = bind(&mut stream, &features, &jid).await?; - Ok((full_jid, features, stream)) -} diff --git a/tokio-xmpp/src/client/mod.rs b/tokio-xmpp/src/client/mod.rs index dbd784bcb831d82791c9ddca378fe5a2739e703e..3d84c030fb2eb7ae4f2d521a2ca970fe8866fa42 100644 --- a/tokio-xmpp/src/client/mod.rs +++ b/tokio-xmpp/src/client/mod.rs @@ -1,11 +1,18 @@ -use futures::sink::SinkExt; +// Copyright (c) 2019 Emmanuel Gil Peyrot +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +use std::io; + use xmpp_parsers::{jid::Jid, stream_features::StreamFeatures}; use crate::{ - client::{login::client_login, stream::ClientState}, connect::ServerConnector, error::Error, - xmlstream::{Timeouts, XmppStream, XmppStreamElement}, + stanzastream::{StanzaStage, StanzaState, StanzaStream, StanzaToken}, + xmlstream::Timeouts, Stanza, }; @@ -16,93 +23,68 @@ use crate::connect::StartTlsServerConnector; #[cfg(feature = "insecure-tcp")] use crate::connect::TcpServerConnector; -mod bind; pub(crate) mod login; mod stream; /// XMPP client connection and state /// -/// It is able to reconnect. TODO: implement session management. -/// /// This implements the `futures` crate's [`Stream`](#impl-Stream) and /// [`Sink`](#impl-Sink) traits. -pub struct Client { - jid: Jid, - password: String, - connector: C, - state: ClientState, - timeouts: Timeouts, - reconnect: bool, - // TODO: tls_required=true +pub struct Client { + stream: StanzaStream, + bound_jid: Option, + features: Option, } -impl Client { - /// Set whether to reconnect (`true`) or let the stream end - /// (`false`) when a connection to the server has ended. - pub fn set_reconnect(&mut self, reconnect: bool) -> &mut Self { - self.reconnect = reconnect; - self - } - +impl Client { /// Get the client's bound JID (the one reported by the XMPP /// server). pub fn bound_jid(&self) -> Option<&Jid> { - match self.state { - ClientState::Connected { ref bound_jid, .. } => Some(bound_jid), - _ => None, - } + self.bound_jid.as_ref() } /// Send stanza - pub async fn send_stanza(&mut self, mut stanza: Stanza) -> Result<(), Error> { + pub async fn send_stanza(&mut self, mut stanza: Stanza) -> Result { stanza.ensure_id(); - self.send(stanza).await + let mut token = self.stream.send(Box::new(stanza)).await; + match token.wait_for(StanzaStage::Sent).await { + // Queued < Sent, so it cannot be reached. + Some(StanzaState::Queued) => unreachable!(), + + None | Some(StanzaState::Dropped) => Err(io::Error::new( + io::ErrorKind::NotConnected, + "stream disconnected fatally before stanza could be sent", + )), + Some(StanzaState::Failed { error }) => Err(error.into_io_error()), + Some(StanzaState::Sent { .. }) | Some(StanzaState::Acked { .. }) => Ok(token), + } } /// Get the stream features (``) of the underlying stream pub fn get_stream_features(&self) -> Option<&StreamFeatures> { - match self.state { - ClientState::Connected { ref features, .. } => Some(features), - _ => None, - } + self.features.as_ref() } /// End connection by sending `` /// /// You may expect the server to respond with the same. This /// client will then drop its connection. - /// - /// Make sure to disable reconnect. - pub async fn send_end(&mut self) -> Result<(), Error> { - match self.state { - ClientState::Connected { ref mut stream, .. } => { - Ok( as SinkExt<&XmppStreamElement>>::close(stream).await?) - } - ClientState::Connecting { .. } => { - self.state = ClientState::Disconnected; - Ok(()) - } - _ => Ok(()), - } + pub async fn send_end(self) -> Result<(), Error> { + self.stream.close().await; + Ok(()) } } #[cfg(feature = "starttls")] -impl Client { +impl Client { /// Start a new XMPP client using StartTLS transport and autoreconnect /// /// Start polling the returned instance so that it will connect /// and yield events. pub fn new, P: Into>(jid: J, password: P) -> Self { let jid = jid.into(); - let mut client = Self::new_starttls( - jid.clone(), - password, - DnsConfig::srv(&jid.domain().to_string(), "_xmpp-client._tcp", 5222), - Timeouts::default(), - ); - client.set_reconnect(true); - client + let dns_config = DnsConfig::srv(&jid.domain().to_string(), "_xmpp-client._tcp", 5222); + Self::new_starttls(jid, password, dns_config, Timeouts::default()) } /// Start a new XMPP client with StartTLS transport and specific DNS config @@ -122,7 +104,7 @@ impl Client { } #[cfg(feature = "insecure-tcp")] -impl Client { +impl Client { /// Start a new XMPP client with plaintext insecure connection and specific DNS config pub fn new_plaintext, P: Into>( jid: J, @@ -139,31 +121,18 @@ impl Client { } } -impl Client { +impl Client { /// Start a new client given that the JID is already parsed. - pub fn new_with_connector, P: Into>( + pub fn new_with_connector, P: Into, C: ServerConnector>( jid: J, password: P, connector: C, timeouts: Timeouts, ) -> Self { - let jid = jid.into(); - let password = password.into(); - - let connect = tokio::spawn(client_login( - connector.clone(), - jid.clone(), - password.clone(), - timeouts, - )); - let client = Client { - jid, - password, - connector, - state: ClientState::Connecting(connect), - reconnect: false, - timeouts, - }; - client + Self { + stream: StanzaStream::new_c2s(connector, jid.into(), password.into(), timeouts, 16), + bound_jid: None, + features: None, + } } } diff --git a/tokio-xmpp/src/client/stream.rs b/tokio-xmpp/src/client/stream.rs index faaa46e78b17a5061f1264d1bd3501b2171fbbf0..54ff8c55837d6a9f6b41d2832e0fd17076e2c1c5 100644 --- a/tokio-xmpp/src/client/stream.rs +++ b/tokio-xmpp/src/client/stream.rs @@ -1,38 +1,24 @@ -use futures::{task::Poll, Future, Sink, Stream}; -use std::io; -use std::mem::replace; +// Copyright (c) 2019 Emmanuel Gil Peyrot +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +use futures::{ready, task::Poll, Stream}; use std::pin::Pin; use std::task::Context; -use tokio::task::JoinHandle; -use xmpp_parsers::{ - jid::{FullJid, Jid}, - stream_features::StreamFeatures, -}; use crate::{ - client::{login::client_login, Client}, - connect::{AsyncReadAndWrite, ServerConnector}, - error::Error, - xmlstream::{xmpp::XmppStreamElement, ReadError, XmppStream}, - Event, Stanza, + client::Client, + stanzastream::{Event as StanzaStreamEvent, StreamEvent}, + Event, }; -pub(crate) enum ClientState { - Invalid, - Disconnected, - Connecting(JoinHandle, StreamFeatures, XmppStream), Error>>), - Connected { - stream: XmppStream, - features: StreamFeatures, - bound_jid: Jid, - }, -} - /// Incoming XMPP events /// /// In an `async fn` you may want to use this with `use /// futures::stream::StreamExt;` -impl Stream for Client { +impl Stream for Client { type Item = Event; /// Low-level read on the XMPP stream, allowing the underlying @@ -46,177 +32,27 @@ impl Stream for Client { /// /// ...for your client fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let state = replace(&mut self.state, ClientState::Invalid); - - match state { - ClientState::Invalid => panic!("Invalid client state"), - ClientState::Disconnected if self.reconnect => { - // TODO: add timeout - let connect = tokio::spawn(client_login( - self.connector.clone(), - self.jid.clone(), - self.password.clone(), - self.timeouts, - )); - self.state = ClientState::Connecting(connect); - self.poll_next(cx) - } - ClientState::Disconnected => { - self.state = ClientState::Disconnected; - Poll::Ready(None) - } - ClientState::Connecting(mut connect) => match Pin::new(&mut connect).poll(cx) { - Poll::Ready(Ok(Ok((bound_jid, features, stream)))) => { - let bound_jid = bound_jid.map(Jid::from).unwrap_or_else(|| self.jid.clone()); - self.state = ClientState::Connected { - stream, - bound_jid: bound_jid.clone(), - features, - }; - Poll::Ready(Some(Event::Online { + loop { + return Poll::Ready(match ready!(Pin::new(&mut self.stream).poll_next(cx)) { + None => None, + Some(StanzaStreamEvent::Stanza(st)) => Some(Event::Stanza(st)), + Some(StanzaStreamEvent::Stream(StreamEvent::Reset { + bound_jid, + features, + })) => { + self.features = Some(features); + self.bound_jid = Some(bound_jid.clone()); + Some(Event::Online { bound_jid, resumed: false, - })) - } - Poll::Ready(Ok(Err(e))) => { - self.state = ClientState::Disconnected; - return Poll::Ready(Some(Event::Disconnected(e.into()))); - } - Poll::Ready(Err(e)) => { - self.state = ClientState::Disconnected; - panic!("connect task: {}", e); + }) } - Poll::Pending => { - self.state = ClientState::Connecting(connect); - Poll::Pending - } - }, - ClientState::Connected { - mut stream, - features, - bound_jid, - } => { - // Poll sink - match as Sink<&XmppStreamElement>>::poll_ready( - Pin::new(&mut stream), - cx, - ) { - Poll::Pending => (), - Poll::Ready(Ok(())) => (), - Poll::Ready(Err(e)) => { - self.state = ClientState::Disconnected; - return Poll::Ready(Some(Event::Disconnected(e.into()))); - } - }; - - // Poll stream - // - // This needs to be a loop in order to ignore packets we don’t care about, or those - // we want to handle elsewhere. Returning something isn’t correct in those two - // cases because it would signal to tokio that the XmppStream is also done, while - // there could be additional packets waiting for us. - // - // The proper solution is thus a loop which we exit once we have something to - // return. - loop { - match Pin::new(&mut stream).poll_next(cx) { - Poll::Ready(None) - | Poll::Ready(Some(Err(ReadError::StreamFooterReceived))) => { - // EOF - self.state = ClientState::Disconnected; - return Poll::Ready(Some(Event::Disconnected(Error::Disconnected))); - } - Poll::Ready(Some(Err(ReadError::HardError(e)))) => { - // Treat stream as dead on I/O errors - self.state = ClientState::Disconnected; - return Poll::Ready(Some(Event::Disconnected(e.into()))); - } - Poll::Ready(Some(Err(ReadError::ParseError(e)))) => { - // Treat stream as dead on parse errors, too (for now...) - self.state = ClientState::Disconnected; - return Poll::Ready(Some(Event::Disconnected( - io::Error::new(io::ErrorKind::InvalidData, e).into(), - ))); - } - Poll::Ready(Some(Err(ReadError::SoftTimeout))) => { - // TODO: do something smart about this. - } - Poll::Ready(Some(Ok(XmppStreamElement::Stanza(stanza)))) => { - // Receive stanza - self.state = ClientState::Connected { - stream, - features, - bound_jid, - }; - return Poll::Ready(Some(Event::Stanza(stanza))); - } - Poll::Ready(Some(Ok(_))) => { - // We ignore these for now. - } - Poll::Pending => { - // Try again later - self.state = ClientState::Connected { - stream, - features, - bound_jid, - }; - return Poll::Pending; - } - } - } - } - } - } -} - -/// Outgoing XMPP packets -/// -/// See `send_stanza()` for an `async fn` -impl Sink for Client { - type Error = Error; - - fn start_send(mut self: Pin<&mut Self>, item: Stanza) -> Result<(), Self::Error> { - match self.state { - ClientState::Connected { ref mut stream, .. } => { - Pin::new(stream).start_send(&item).map_err(|e| e.into()) - } - _ => Err(Error::InvalidState), - } - } - - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match self.state { - ClientState::Connected { ref mut stream, .. } => as Sink< - &XmppStreamElement, - >>::poll_ready( - Pin::new(stream), cx - ) - .map_err(|e| e.into()), - _ => Poll::Pending, - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match self.state { - ClientState::Connected { ref mut stream, .. } => as Sink< - &XmppStreamElement, - >>::poll_flush( - Pin::new(stream), cx - ) - .map_err(|e| e.into()), - _ => Poll::Pending, - } - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match self.state { - ClientState::Connected { ref mut stream, .. } => as Sink< - &XmppStreamElement, - >>::poll_close( - Pin::new(stream), cx - ) - .map_err(|e| e.into()), - _ => Poll::Pending, + Some(StanzaStreamEvent::Stream(StreamEvent::Resumed)) => Some(Event::Online { + bound_jid: self.bound_jid.as_ref().unwrap().clone(), + resumed: true, + }), + Some(StanzaStreamEvent::Stream(StreamEvent::Suspended)) => continue, + }); } } } diff --git a/tokio-xmpp/src/connect/starttls.rs b/tokio-xmpp/src/connect/starttls.rs index 08ba90cebbbf034361abd7da19e96f646a16ab70..f8c33790a15ddf44e1d980930983e4b110583ff1 100644 --- a/tokio-xmpp/src/connect/starttls.rs +++ b/tokio-xmpp/src/connect/starttls.rs @@ -61,7 +61,8 @@ use crate::{ }; /// Client that connects over StartTls -pub type StartTlsClient = Client; +#[deprecated(since = "5.0.0", note = "use tokio_xmpp::Client instead")] +pub type StartTlsClient = Client; /// Connect via TCP+StartTLS to an XMPP server #[derive(Debug, Clone)] diff --git a/tokio-xmpp/src/connect/tcp.rs b/tokio-xmpp/src/connect/tcp.rs index 474e25b711c006060bfac3b531b5aef88f7b44fb..aa7bc2c8248ef5d68f213c5723df27e4575baba1 100644 --- a/tokio-xmpp/src/connect/tcp.rs +++ b/tokio-xmpp/src/connect/tcp.rs @@ -14,7 +14,8 @@ use crate::{ pub type TcpComponent = Component; /// Client that connects over TCP -pub type TcpClient = Client; +#[deprecated(since = "5.0.0", note = "use tokio_xmpp::Client instead")] +pub type TcpClient = Client; /// Connect via insecure plaintext TCP to an XMPP server /// This should only be used over localhost or otherwise when you know what you are doing diff --git a/xmpp/src/agent.rs b/xmpp/src/agent.rs index 5f5421d2745c841207d9a5394952221059c49007..3a32e6f75b7dc93f5d780fc297115e99d7c5b3b0 100644 --- a/xmpp/src/agent.rs +++ b/xmpp/src/agent.rs @@ -8,7 +8,6 @@ use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::Arc; use tokio::sync::RwLock; -use tokio_xmpp::connect::ServerConnector; pub use tokio_xmpp::parsers; use tokio_xmpp::parsers::{disco::DiscoInfoResult, message::MessageType}; pub use tokio_xmpp::{ @@ -19,8 +18,8 @@ pub use tokio_xmpp::{ use crate::{event_loop, message, muc, upload, Error, Event, RoomNick}; -pub struct Agent { - pub(crate) client: TokioXmppClient, +pub struct Agent { + pub(crate) client: TokioXmppClient, pub(crate) default_nick: Arc>, pub(crate) lang: Arc>, pub(crate) disco: DiscoInfoResult, @@ -33,8 +32,8 @@ pub struct Agent { pub(crate) rooms_leaving: HashMap, } -impl Agent { - pub async fn disconnect(&mut self) -> Result<(), Error> { +impl Agent { + pub async fn disconnect(self) -> Result<(), Error> { self.client.send_end().await } diff --git a/xmpp/src/builder.rs b/xmpp/src/builder.rs index 821e27549b9b6af9cff2f7dd55d8d258158a64c7..df9526b5016ece797ec0b79e85f900ca2ecc8896 100644 --- a/xmpp/src/builder.rs +++ b/xmpp/src/builder.rs @@ -152,25 +152,24 @@ impl ClientBuilder<'_, C> { } } - pub fn build(self) -> Agent { + pub fn build(self) -> Agent { let jid: Jid = if let Some(resource) = &self.resource { self.jid.with_resource_str(resource).unwrap().into() } else { self.jid.clone().into() }; - let mut client = TokioXmppClient::new_with_connector( + let client = TokioXmppClient::new_with_connector( jid, self.password, self.server_connector.clone(), self.timeouts, ); - client.set_reconnect(true); self.build_impl(client) } // This function is meant to be used for testing build - pub(crate) fn build_impl(self, client: TokioXmppClient) -> Agent { + pub(crate) fn build_impl(self, client: TokioXmppClient) -> Agent { let disco = self.make_disco(); let node = self.website; diff --git a/xmpp/src/disco/mod.rs b/xmpp/src/disco/mod.rs index ee3312e38874b21b810efd07fae04dcf9ebc0be5..87e20020b7ec76726380e7627f1623ca8ae237ff 100644 --- a/xmpp/src/disco/mod.rs +++ b/xmpp/src/disco/mod.rs @@ -4,7 +4,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::{ jid::Jid, parsers::{ @@ -19,11 +18,7 @@ use tokio_xmpp::{ use crate::Agent; -pub async fn handle_disco_info_result( - agent: &mut Agent, - disco: DiscoInfoResult, - from: Jid, -) { +pub async fn handle_disco_info_result(agent: &mut Agent, disco: DiscoInfoResult, from: Jid) { // Safe unwrap because no DISCO is received when we are not online if from == agent.client.bound_jid().unwrap().to_bare() && agent.awaiting_disco_bookmarks_type { info!("Received disco info about bookmarks type"); diff --git a/xmpp/src/event_loop.rs b/xmpp/src/event_loop.rs index 79c29305fc6ac54b2aca7ffa916ac476525197ff..86a977760dc967d705bafefc316e5de3159648fb 100644 --- a/xmpp/src/event_loop.rs +++ b/xmpp/src/event_loop.rs @@ -5,7 +5,6 @@ // file, You can obtain one at http://mozilla.org/MPL/2.0/. use futures::StreamExt; -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::{ parsers::{disco::DiscoInfoQuery, iq::Iq, roster::Roster}, Event as TokioXmppEvent, Stanza, @@ -14,7 +13,7 @@ use tokio_xmpp::{ use crate::{iq, message, presence, Agent, Event}; /// Wait for new events, or Error::Disconnected when stream is closed and will not reconnect. -pub async fn wait_for_events(agent: &mut Agent) -> Vec { +pub async fn wait_for_events(agent: &mut Agent) -> Vec { if let Some(event) = agent.client.next().await { let mut events = Vec::new(); diff --git a/xmpp/src/iq/get.rs b/xmpp/src/iq/get.rs index cb9fb708265ae3f0189af51cd88294dd5e52a021..46b9c847662697fc2b8a8e26ac353960807d9af9 100644 --- a/xmpp/src/iq/get.rs +++ b/xmpp/src/iq/get.rs @@ -4,7 +4,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::{ jid::Jid, minidom::Element, @@ -18,8 +17,8 @@ use tokio_xmpp::{ use crate::{Agent, Event}; -pub async fn handle_iq_get( - agent: &mut Agent, +pub async fn handle_iq_get( + agent: &mut Agent, _events: &mut Vec, from: Jid, _to: Option, diff --git a/xmpp/src/iq/mod.rs b/xmpp/src/iq/mod.rs index 3d097bbc66fad1c00dca54f7b67b3e50ab53a880..54db1bd083c9173524f8c0a0e49ff2ac5309c5a1 100644 --- a/xmpp/src/iq/mod.rs +++ b/xmpp/src/iq/mod.rs @@ -4,7 +4,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::parsers::iq::{Iq, IqType}; use crate::{Agent, Event}; @@ -13,7 +12,7 @@ pub mod get; pub mod result; pub mod set; -pub async fn handle_iq(agent: &mut Agent, iq: Iq) -> Vec { +pub async fn handle_iq(agent: &mut Agent, iq: Iq) -> Vec { let mut events = vec![]; let from = iq .from diff --git a/xmpp/src/iq/result.rs b/xmpp/src/iq/result.rs index 0f6f3a59d65d9d964211aa74746f1f15adc36647..0d866d2bd5fbd0f65c0d23ab9ab7b4d9970dd7d9 100644 --- a/xmpp/src/iq/result.rs +++ b/xmpp/src/iq/result.rs @@ -4,7 +4,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::{ jid::Jid, minidom::Element, @@ -13,8 +12,8 @@ use tokio_xmpp::{ use crate::{disco, muc::room::JoinRoomSettings, pubsub, upload, Agent, Event}; -pub async fn handle_iq_result( - agent: &mut Agent, +pub async fn handle_iq_result( + agent: &mut Agent, events: &mut Vec, from: Jid, _to: Option, diff --git a/xmpp/src/iq/set.rs b/xmpp/src/iq/set.rs index 821b45eeac92073d7cd787cf4118172c4d2f231e..cd26ff49ed0803d1760edf70f0debaa1312d11b0 100644 --- a/xmpp/src/iq/set.rs +++ b/xmpp/src/iq/set.rs @@ -4,7 +4,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::{ jid::Jid, minidom::Element, @@ -16,8 +15,8 @@ use tokio_xmpp::{ use crate::{Agent, Event}; -pub async fn handle_iq_set( - agent: &mut Agent, +pub async fn handle_iq_set( + agent: &mut Agent, _events: &mut Vec, from: Jid, _to: Option, diff --git a/xmpp/src/lib.rs b/xmpp/src/lib.rs index 3a9f238e3a17d851345f4bcbeca02b4c08fb7f7a..617785e7e56e6d9cf998bb8af9bad018c4247f37 100644 --- a/xmpp/src/lib.rs +++ b/xmpp/src/lib.rs @@ -38,6 +38,10 @@ pub type Error = tokio_xmpp::Error; pub type Id = Option; pub type RoomNick = String; +// The test below is dysfunctional since we have moved to StanzaStream. The +// StanzaStream will attempt to connect to foo@bar indefinitely. +// Keeping it here as inspiration for future integration tests. +/* #[cfg(all(test, any(feature = "starttls-rust", feature = "starttls-native")))] mod tests { use super::jid::BareJid; @@ -74,3 +78,4 @@ mod tests { } } } +*/ diff --git a/xmpp/src/message/receive/chat.rs b/xmpp/src/message/receive/chat.rs index da708a1d872a6174c47161f18a9fe4ee0137a795..343fff029e1eadda4fcf5cd56b8185b9f98d683f 100644 --- a/xmpp/src/message/receive/chat.rs +++ b/xmpp/src/message/receive/chat.rs @@ -4,7 +4,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::{ jid::Jid, parsers::{message::Message, muc::user::MucUser}, @@ -12,8 +11,8 @@ use tokio_xmpp::{ use crate::{delay::StanzaTimeInfo, Agent, Event}; -pub async fn handle_message_chat( - agent: &mut Agent, +pub async fn handle_message_chat( + agent: &mut Agent, events: &mut Vec, from: Jid, message: &Message, diff --git a/xmpp/src/message/receive/group_chat.rs b/xmpp/src/message/receive/group_chat.rs index 66dfb8f714df53029663e7084d3dbf6505b4acf0..fe4ac46a99731be61104858545d775087fab1e98 100644 --- a/xmpp/src/message/receive/group_chat.rs +++ b/xmpp/src/message/receive/group_chat.rs @@ -4,13 +4,12 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::{jid::Jid, parsers::message::Message}; use crate::{delay::StanzaTimeInfo, Agent, Event}; -pub async fn handle_message_group_chat( - agent: &mut Agent, +pub async fn handle_message_group_chat( + agent: &mut Agent, events: &mut Vec, from: Jid, message: &Message, diff --git a/xmpp/src/message/receive/mod.rs b/xmpp/src/message/receive/mod.rs index 5dbe8608e26f1560af2e9188fb6f9275d672d05f..0b03112db2fdf70d1e819fc3ac7c1f7093716010 100644 --- a/xmpp/src/message/receive/mod.rs +++ b/xmpp/src/message/receive/mod.rs @@ -4,7 +4,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::parsers::{ message::{Message, MessageType}, ns, @@ -15,10 +14,7 @@ use crate::{delay::message_time_info, pubsub, Agent, Event}; pub mod chat; pub mod group_chat; -pub async fn handle_message( - agent: &mut Agent, - message: Message, -) -> Vec { +pub async fn handle_message(agent: &mut Agent, message: Message) -> Vec { let mut events = vec![]; let from = message.from.clone().unwrap(); let time_info = message_time_info(&message); diff --git a/xmpp/src/message/send.rs b/xmpp/src/message/send.rs index d099d2b011ba51e237094b01093445ce6b48005c..a1368eaf4d0e2b02357ebf089a43c92bc37f2e41 100644 --- a/xmpp/src/message/send.rs +++ b/xmpp/src/message/send.rs @@ -4,7 +4,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::{ jid::Jid, parsers::message::{Body, Message, MessageType}, @@ -12,8 +11,8 @@ use tokio_xmpp::{ use crate::Agent; -pub async fn send_message( - agent: &mut Agent, +pub async fn send_message( + agent: &mut Agent, recipient: Jid, type_: MessageType, lang: &str, diff --git a/xmpp/src/muc/private_message.rs b/xmpp/src/muc/private_message.rs index dc1ca7bb56ed70a56e941eb274ffb4777901b9bb..6c473a826a315ae21f59daff5e619e78a63e72e6 100644 --- a/xmpp/src/muc/private_message.rs +++ b/xmpp/src/muc/private_message.rs @@ -4,7 +4,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::{ jid::{BareJid, Jid}, parsers::{ @@ -15,8 +14,8 @@ use tokio_xmpp::{ use crate::{Agent, RoomNick}; -pub async fn send_room_private_message( - agent: &mut Agent, +pub async fn send_room_private_message( + agent: &mut Agent, room: BareJid, recipient: RoomNick, lang: &str, diff --git a/xmpp/src/muc/room.rs b/xmpp/src/muc/room.rs index dc6e9aa510ad67e640d901b378ad29022c246545..7791b79da1c793bc22893a98a660de1ecc149ba5 100644 --- a/xmpp/src/muc/room.rs +++ b/xmpp/src/muc/room.rs @@ -5,7 +5,6 @@ // file, You can obtain one at http://mozilla.org/MPL/2.0/. use crate::parsers::message::MessageType; -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::{ jid::BareJid, parsers::{ @@ -51,10 +50,7 @@ impl<'a> JoinRoomSettings<'a> { } /// TODO: this method should add bookmark and ensure autojoin is true -pub async fn join_room<'a, C: ServerConnector>( - agent: &mut Agent, - settings: JoinRoomSettings<'a>, -) { +pub async fn join_room<'a>(agent: &mut Agent, settings: JoinRoomSettings<'a>) { let JoinRoomSettings { room, nick, @@ -122,10 +118,7 @@ impl<'a> LeaveRoomSettings<'a> { /// If successful, a `RoomLeft` event should be received later as a confirmation. See [XEP-0045](https://xmpp.org/extensions/xep-0045.html#exit). /// /// TODO: this method should set autojoin false on bookmark -pub async fn leave_room<'a, C: ServerConnector>( - agent: &mut Agent, - settings: LeaveRoomSettings<'a>, -) { +pub async fn leave_room<'a>(agent: &mut Agent, settings: LeaveRoomSettings<'a>) { let LeaveRoomSettings { room, status } = settings; if agent.rooms_leaving.contains_key(&room) { @@ -188,10 +181,7 @@ impl<'a> RoomMessageSettings<'a> { } } -pub async fn send_room_message<'a, C: ServerConnector>( - agent: &mut Agent, - settings: RoomMessageSettings<'a>, -) { +pub async fn send_room_message<'a>(agent: &mut Agent, settings: RoomMessageSettings<'a>) { let RoomMessageSettings { room, message, diff --git a/xmpp/src/presence/receive.rs b/xmpp/src/presence/receive.rs index 0f26ab629e31000bd174e745a7684077941857e9..916ae9bf21c7771ca63026ef0997e71e38a2c795 100644 --- a/xmpp/src/presence/receive.rs +++ b/xmpp/src/presence/receive.rs @@ -4,7 +4,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::parsers::{ muc::user::{MucUser, Status}, presence::{Presence, Type as PresenceType}, @@ -13,10 +12,7 @@ use tokio_xmpp::parsers::{ use crate::{Agent, Event}; /// Translate a `Presence` stanza into a list of higher-level `Event`s. -pub async fn handle_presence( - agent: &mut Agent, - presence: Presence, -) -> Vec { +pub async fn handle_presence(agent: &mut Agent, presence: Presence) -> Vec { // Allocate an empty vector to store the events. let mut events = vec![]; diff --git a/xmpp/src/pubsub/avatar.rs b/xmpp/src/pubsub/avatar.rs index 3d522b5996c0bad51f9572c5b6ad1629f6c58de9..4a04f0edd899e6b465d46743c4cd4e7737a4df62 100644 --- a/xmpp/src/pubsub/avatar.rs +++ b/xmpp/src/pubsub/avatar.rs @@ -8,7 +8,6 @@ use super::Agent; use crate::Event; use std::fs::{self, File}; use std::io::{self, Write}; -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::parsers::{ avatar::{Data, Metadata}, iq::Iq, @@ -21,9 +20,9 @@ use tokio_xmpp::parsers::{ }, }; -pub(crate) async fn handle_metadata_pubsub_event( +pub(crate) async fn handle_metadata_pubsub_event( from: &Jid, - agent: &mut Agent, + agent: &mut Agent, items: Vec, ) -> Vec { let mut events = Vec::new(); diff --git a/xmpp/src/pubsub/mod.rs b/xmpp/src/pubsub/mod.rs index f514baecc18dae864dbce20e353c8d598be61a0d..148d13d80b95c39eb82184958f4746038ef3f038 100644 --- a/xmpp/src/pubsub/mod.rs +++ b/xmpp/src/pubsub/mod.rs @@ -11,7 +11,6 @@ use crate::{ }; use std::str::FromStr; use tokio_xmpp::{ - connect::ServerConnector, jid::{BareJid, Jid}, minidom::Element, parsers::{ @@ -23,10 +22,10 @@ use tokio_xmpp::{ #[cfg(feature = "avatars")] pub(crate) mod avatar; -pub(crate) async fn handle_event( +pub(crate) async fn handle_event( #[cfg_attr(not(feature = "avatars"), allow(unused_variables))] from: &Jid, elem: Element, - #[cfg_attr(not(feature = "avatars"), allow(unused_variables))] agent: &mut Agent, + #[cfg_attr(not(feature = "avatars"), allow(unused_variables))] agent: &mut Agent, ) -> Vec { // We allow the useless mut warning for no-default-features, // since for now only avatars pushes events here. @@ -101,10 +100,10 @@ pub(crate) async fn handle_event( events } -pub(crate) async fn handle_iq_result( +pub(crate) async fn handle_iq_result( #[cfg_attr(not(feature = "avatars"), allow(unused_variables))] from: &Jid, elem: Element, - agent: &mut Agent, + agent: &mut Agent, ) -> impl IntoIterator { // We allow the useless mut warning for no-default-features, // since for now only avatars pushes events here. diff --git a/xmpp/src/upload/receive.rs b/xmpp/src/upload/receive.rs index 9f2c712caefa43e60b7cf6d0f5d23d2e27ff98dc..2c9fb66d9d08be3a62e7910849ae153a0ab50a00 100644 --- a/xmpp/src/upload/receive.rs +++ b/xmpp/src/upload/receive.rs @@ -10,7 +10,6 @@ use reqwest::{ use std::path::PathBuf; use tokio::fs::File; use tokio_util::codec::{BytesCodec, FramedRead}; -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::{ jid::Jid, minidom::Element, @@ -19,11 +18,11 @@ use tokio_xmpp::{ use crate::{Agent, Event}; -pub async fn handle_upload_result( +pub async fn handle_upload_result( from: &Jid, iqid: String, elem: Element, - agent: &mut Agent, + agent: &mut Agent, ) -> impl IntoIterator { let mut res: Option<(usize, PathBuf)> = None; diff --git a/xmpp/src/upload/send.rs b/xmpp/src/upload/send.rs index c00e098c7f77907098a420a8de24ff97f2bda724..396c12751f4d32a5f63334e7029d6310d52f9a8a 100644 --- a/xmpp/src/upload/send.rs +++ b/xmpp/src/upload/send.rs @@ -6,7 +6,6 @@ use std::path::Path; use tokio::fs::File; -use tokio_xmpp::connect::ServerConnector; use tokio_xmpp::{ jid::Jid, parsers::{http_upload::SlotRequest, iq::Iq}, @@ -14,11 +13,7 @@ use tokio_xmpp::{ use crate::Agent; -pub async fn upload_file_with( - agent: &mut Agent, - service: &str, - path: &Path, -) { +pub async fn upload_file_with(agent: &mut Agent, service: &str, path: &Path) { let name = path.file_name().unwrap().to_str().unwrap().to_string(); let file = File::open(path).await.unwrap(); let size = file.metadata().await.unwrap().len();