@@ -4,7 +4,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
-use futures::{Future, Stream, sync::mpsc};
+use futures::prelude::*;
use std::env::args;
use std::process::exit;
use std::str::FromStr;
@@ -26,18 +26,19 @@ fn main() {
// tokio_core context
let mut rt = Runtime::new().unwrap();
- let (value_tx, value_rx) = mpsc::unbounded();
// Client instance
- let (client, mut agent) = ClientBuilder::new(jid, password)
+ let (mut agent, stream) = ClientBuilder::new(jid, password)
.set_client(ClientType::Bot, "xmpp-rs")
.set_website("https://gitlab.com/xmpp-rs/xmpp-rs")
.enable_feature(ClientFeature::Avatars)
.enable_feature(ClientFeature::ContactList)
- .build(value_tx)
+ .build()
.unwrap();
- let forwarder = value_rx.for_each(|evt: Event| {
+ // We return either Some(Error) if an error was encountered
+ // or None, if we were simply disconnected
+ let handler = stream.map_err(Some).for_each(|evt: Event| {
match evt {
Event::Online => {
println!("Online.");
@@ -46,7 +47,7 @@ fn main() {
},
Event::Disconnected => {
println!("Disconnected.");
- return Err(());
+ return Err(None);
},
Event::ContactAdded(contact) => {
println!("Contact {:?} added.", contact);
@@ -66,19 +67,10 @@ fn main() {
},
}
Ok(())
- })
- .map_err(|e| println!("{:?}", e));
+ });
- // Start polling
- match rt.block_on(client
- .select2(forwarder)
- .map(|_| ())
- .map_err(|_| ())
- ) {
- Ok(_) => (),
- Err(e) => {
- println!("Fatal: {:?}", e);
- ()
- }
- }
+ rt.block_on(handler).unwrap_or_else(|e| match e {
+ Some(e) => println!("Error: {:?}", e),
+ None => println!("Disconnected."),
+ });
}
@@ -4,7 +4,8 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
-use futures::{Sink, sync::mpsc};
+use crate::Event;
+use futures::{sync::mpsc, Sink};
use std::fs::{create_dir_all, File};
use std::io::{self, Write};
use tokio_xmpp::Packet;
@@ -19,7 +20,6 @@ use xmpp_parsers::{
},
Jid, TryFrom,
};
-use crate::Event;
pub(crate) fn handle_metadata_pubsub_event(from: &Jid, tx: &mut mpsc::UnboundedSender<Packet>, items: Vec<Item>) {
for item in items {
@@ -43,16 +43,24 @@ fn download_avatar(from: &Jid) -> Iq {
.with_to(from.clone())
}
-pub(crate) fn handle_data_pubsub_iq(from: &Jid, tx: &mut mpsc::UnboundedSender<Event>, items: Items) {
- for item in items.items {
- if let Some(id) = item.id.clone() {
- if let Some(payload) = &item.payload {
+// The return value of this function will be simply pushed to a Vec in the caller function,
+// so it makes no sense to allocate a Vec here - we're lazy instead
+pub(crate) fn handle_data_pubsub_iq<'a>(
+ from: &'a Jid,
+ items: &'a Items,
+) -> impl IntoIterator<Item = Event> + 'a {
+ let from = from.clone();
+ items
+ .items
+ .iter()
+ .filter_map(move |item| match (&item.id, &item.payload) {
+ (Some(id), Some(payload)) => {
let data = Data::try_from(payload.clone()).unwrap();
- let filename = save_avatar(from, id.0, &data.data).unwrap();
- tx.unbounded_send(Event::AvatarRetrieved(from.clone(), filename)).unwrap();
+ let filename = save_avatar(&from, id.0.clone(), &data.data).unwrap();
+ Some(Event::AvatarRetrieved(from.clone(), filename))
}
- }
- }
+ _ => None,
+ })
}
fn save_avatar(from: &Jid, id: String, data: &[u8]) -> io::Result<String> {
@@ -34,6 +34,8 @@ use xmpp_parsers::{
mod avatar;
+pub type Error = tokio_xmpp::Error;
+
#[derive(Debug)]
pub enum ClientType {
Bot,
@@ -135,19 +137,35 @@ impl ClientBuilder<'_> {
presence
}
- pub fn build(self, mut app_tx: mpsc::UnboundedSender<Event>) -> Result<(Box<Future<Item = (), Error = ()>>, Client), JidParseError> {
+ pub fn build(
+ self,
+ ) -> Result<(Agent, impl Stream<Item = Event, Error = tokio_xmpp::Error>), JidParseError> {
+ let client = TokioXmppClient::new(self.jid, self.password)?;
+ Ok(self.build_impl(client))
+ }
+
+ // This function is meant to be used for testing build
+ pub(crate) fn build_impl<S>(
+ self,
+ stream: S,
+ ) -> (Agent, impl Stream<Item = Event, Error = tokio_xmpp::Error>)
+ where
+ S: Stream<Item = tokio_xmpp::Event, Error = tokio_xmpp::Error>
+ + Sink<SinkItem = tokio_xmpp::Packet, SinkError = tokio_xmpp::Error>,
+ {
let disco = self.make_disco();
let node = self.website;
let (sender_tx, sender_rx) = mpsc::unbounded();
- let client = TokioXmppClient::new(self.jid, self.password)?;
+ let client = stream;
let (sink, stream) = client.split();
let reader = {
let mut sender_tx = sender_tx.clone();
let jid = self.jid.to_owned();
- stream.for_each(move |event| {
+ stream.map(move |event| {
// Helper function to send an iq error.
+ let mut events = Vec::new();
let send_error = |to, id, type_, condition, text: &str| {
let error = StanzaError::new(type_, condition, "en", text);
let iq = Iq::from_error(id, error)
@@ -162,13 +180,13 @@ impl ClientBuilder<'_> {
let packet = Packet::Stanza(presence);
sender_tx.unbounded_send(packet)
.unwrap();
- app_tx.unbounded_send(Event::Online).unwrap();
+ events.push(Event::Online);
let iq = Iq::from_get("roster", Roster { ver: None, items: vec![] })
.into();
sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
}
TokioXmppEvent::Disconnected => {
- app_tx.unbounded_send(Event::Disconnected).unwrap();
+ events.push(Event::Disconnected);
}
TokioXmppEvent::Stanza(stanza) => {
if stanza.is("iq", "jabber:client") {
@@ -197,7 +215,7 @@ impl ClientBuilder<'_> {
if payload.is("query", ns::ROSTER) {
let roster = Roster::try_from(payload).unwrap();
for item in roster.items.into_iter() {
- app_tx.unbounded_send(Event::ContactAdded(item)).unwrap();
+ events.push(Event::ContactAdded(item));
}
} else if payload.is("pubsub", ns::PUBSUB) {
let pubsub = PubSub::try_from(payload).unwrap();
@@ -205,7 +223,8 @@ impl ClientBuilder<'_> {
iq.from.clone().unwrap_or(Jid::from_str(&jid).unwrap());
if let PubSub::Items(items) = pubsub {
if items.node.0 == ns::AVATAR_DATA {
- avatar::handle_data_pubsub_iq(&from, &mut app_tx, items);
+ let new_events = avatar::handle_data_pubsub_iq(&from, &items);
+ events.extend(new_events);
}
}
}
@@ -236,7 +255,7 @@ impl ClientBuilder<'_> {
};
for status in muc_user.status.into_iter() {
if status == Status::SelfPresence {
- app_tx.unbounded_send(Event::RoomJoined(from.clone())).unwrap();
+ events.push(Event::RoomJoined(from.clone()));
break;
}
}
@@ -249,8 +268,9 @@ impl ClientBuilder<'_> {
}
}
- Ok(())
+ futures::stream::iter_ok(events)
})
+ .flatten()
};
let sender = sender_rx
@@ -259,25 +279,45 @@ impl ClientBuilder<'_> {
.map(|(rx, mut sink)| {
drop(rx);
let _ = sink.close();
+ None
});
- let future = reader.select(sender)
- .map(|_| ())
- .map_err(|_| ());
+ // TODO is this correct?
+ // Some(Error) means a real error
+ // None means the end of the sender stream and can be ignored
+ let future = reader
+ .map(Some)
+ .select(sender.into_stream())
+ .filter_map(|x| x);
- let agent = Client {
- sender_tx,
- };
+ let agent = Agent { sender_tx };
- Ok((Box::new(future), agent))
+ (agent, future)
}
}
pub struct Client {
sender_tx: mpsc::UnboundedSender<Packet>,
+ stream: Box<Stream<Item = Event, Error = Error>>,
}
impl Client {
+ pub fn get_agent(&self) -> Agent {
+ Agent {
+ sender_tx: self.sender_tx.clone(),
+ }
+ }
+
+ pub fn listen(self) -> Box<Stream<Item = Event, Error = Error>> {
+ self.stream
+ }
+}
+
+pub struct Agent {
+ sender_tx: mpsc::UnboundedSender<Packet>,
+}
+
+impl Agent {
pub fn join_room(&mut self, room: Jid, lang: &str, status: &str) {
let mut presence = Presence::new(PresenceType::None)
.with_to(Some(room))