Hello world!

Emmanuel Gil Peyrot created

Change summary

.gitignore            |   3 
Cargo.toml            |  11 +
examples/hello_bot.rs |  74 +++++++++++
src/avatar.rs         |  65 ++++++++++
src/lib.rs            | 286 +++++++++++++++++++++++++++++++++++++++++++++
5 files changed, 439 insertions(+)

Detailed changes

Cargo.toml 🔗

@@ -0,0 +1,11 @@
+[package]
+name = "xmpp"
+version = "0.3.0"
+authors = ["Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>"]
+edition = "2018"
+
+[dependencies]
+tokio-xmpp = "1"
+xmpp-parsers = "0.13"
+futures = "0.1"
+tokio = "0.1"

examples/hello_bot.rs 🔗

@@ -0,0 +1,74 @@
+// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+use futures::{Future, Stream, sync::mpsc};
+use std::env::args;
+use std::process::exit;
+use std::str::FromStr;
+use tokio::runtime::current_thread::Runtime;
+use xmpp_parsers::{Jid, message::MessageType};
+use xmpp::{ClientBuilder, ClientType, ClientFeature, Event};
+
+fn main() {
+    let args: Vec<String> = args().collect();
+    if args.len() != 5 {
+        println!("Usage: {} <jid> <password> <room JID> <nick>", args[0]);
+        exit(1);
+    }
+    let jid = &args[1];
+    let password = &args[2];
+    let room_jid = &args[3];
+    let nick: &str = &args[4];
+
+    // tokio_core context
+    let mut rt = Runtime::new().unwrap();
+
+    let (value_tx, value_rx) = mpsc::unbounded();
+
+    // Client instance
+    let (client, mut agent) = ClientBuilder::new(jid, password)
+        .set_client(ClientType::Bot, "xmpp-rs")
+        .set_website("https://gitlab.com/xmpp-rs/xmpp-rs")
+        .enable_feature(ClientFeature::Avatars)
+        .build(value_tx)
+        .unwrap();
+
+    let forwarder = value_rx.for_each(|evt: Event| {
+        match evt {
+            Event::Online => {
+                println!("Online.");
+                let room_jid = Jid::from_str(room_jid).unwrap().with_resource(nick);
+                agent.join_room(room_jid, "en", "Yet another bot!");
+            },
+            Event::Disconnected => {
+                println!("Disconnected.");
+                return Err(());
+            },
+            Event::RoomJoined(jid) => {
+                println!("Joined room {}.", jid);
+                agent.send_message(jid.into_bare_jid(), MessageType::Groupchat, "en", "Hello world!");
+            },
+            Event::AvatarRetrieved(jid, path) => {
+                println!("Received avatar for {} in {}.", jid, path);
+            },
+        }
+        Ok(())
+    })
+        .map_err(|e| println!("{:?}", e));
+
+    // Start polling
+    match rt.block_on(client
+        .select2(forwarder)
+        .map(|_| ())
+        .map_err(|_| ())
+    ) {
+        Ok(_) => (),
+        Err(e) => {
+            println!("Fatal: {:?}", e);
+            ()
+        }
+    }
+}

src/avatar.rs 🔗

@@ -0,0 +1,65 @@
+// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+use futures::{Sink, sync::mpsc};
+use std::fs::{create_dir_all, File};
+use std::io::{self, Write};
+use tokio_xmpp::Packet;
+use xmpp_parsers::{
+    avatar::{Data, Metadata},
+    iq::Iq,
+    ns,
+    pubsub::{
+        event::Item,
+        pubsub::{Items, PubSub},
+        NodeName,
+    },
+    Jid, TryFrom,
+};
+use crate::Event;
+
+pub(crate) fn handle_metadata_pubsub_event(from: &Jid, tx: &mut mpsc::UnboundedSender<Packet>, items: Vec<Item>) {
+    for item in items {
+        let payload = item.payload.clone().unwrap();
+        if payload.is("metadata", ns::AVATAR_METADATA) {
+            // TODO: do something with these metadata.
+            let _metadata = Metadata::try_from(payload).unwrap();
+            let iq = download_avatar(from);
+            tx.start_send(Packet::Stanza(iq.into())).unwrap();
+        }
+    }
+}
+
+fn download_avatar(from: &Jid) -> Iq {
+    Iq::from_get("coucou", PubSub::Items(Items {
+        max_items: None,
+        node: NodeName(String::from(ns::AVATAR_DATA)),
+        subid: None,
+        items: Vec::new(),
+    }))
+    .with_to(from.clone())
+}
+
+pub(crate) fn handle_data_pubsub_iq(from: &Jid, tx: &mut mpsc::UnboundedSender<Event>, items: Items) {
+    for item in items.items {
+        if let Some(id) = item.id.clone() {
+            if let Some(payload) = &item.payload {
+                let data = Data::try_from(payload.clone()).unwrap();
+                let filename = save_avatar(from, id.0, &data.data).unwrap();
+                tx.unbounded_send(Event::AvatarRetrieved(from.clone(), filename)).unwrap();
+            }
+        }
+    }
+}
+
+fn save_avatar(from: &Jid, id: String, data: &[u8]) -> io::Result<String> {
+    let directory = format!("data/{}", from);
+    let filename = format!("data/{}/{}", from, id);
+    create_dir_all(directory)?;
+    let mut file = File::create(&filename)?;
+    file.write_all(data)?;
+    Ok(filename)
+}

src/lib.rs 🔗

@@ -0,0 +1,286 @@
+// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+use std::str::FromStr;
+use futures::{Future,Stream, Sink, sync::mpsc};
+use tokio_xmpp::{
+    Client as TokioXmppClient,
+    Event as TokioXmppEvent,
+    Packet,
+};
+use xmpp_parsers::{
+    caps::{compute_disco, hash_caps, Caps},
+    disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity},
+    hashes::Algo,
+    iq::{Iq, IqType},
+    message::{Message, MessageType, Body},
+    muc::{
+        Muc,
+        user::{MucUser, Status},
+    },
+    ns,
+    presence::{Presence, Type as PresenceType},
+    pubsub::{
+        event::PubSubEvent,
+        pubsub::PubSub,
+    },
+    stanza_error::{StanzaError, ErrorType, DefinedCondition},
+    Jid, JidParseError, TryFrom,
+};
+
+mod avatar;
+
+#[derive(Debug)]
+pub enum ClientType {
+    Bot,
+    Pc,
+}
+
+impl Default for ClientType {
+    fn default() -> Self {
+        ClientType::Bot
+    }
+}
+
+impl ToString for ClientType {
+    fn to_string(&self) -> String {
+        String::from(
+            match self {
+                ClientType::Bot => "bot",
+                ClientType::Pc => "pc",
+            }
+        )
+    }
+}
+
+#[derive(PartialEq)]
+pub enum ClientFeature {
+    Avatars,
+}
+
+pub enum Event {
+    Online,
+    Disconnected,
+    AvatarRetrieved(Jid, String),
+    RoomJoined(Jid),
+}
+
+#[derive(Default)]
+pub struct ClientBuilder<'a> {
+    jid: &'a str,
+    password: &'a str,
+    website: String,
+    disco: (ClientType, String),
+    features: Vec<ClientFeature>,
+}
+
+impl ClientBuilder<'_> {
+    pub fn new<'a>(jid: &'a str, password: &'a str) -> ClientBuilder<'a> {
+        ClientBuilder {
+            jid,
+            password,
+            website: String::from("https://gitlab.com/xmpp-rs/tokio-xmpp"),
+            disco: (ClientType::default(), String::from("tokio-xmpp")),
+            features: vec![],
+        }
+    }
+
+    pub fn set_client(mut self, type_: ClientType, name: &str) -> Self {
+        self.disco = (type_, String::from(name));
+        self
+    }
+
+    pub fn set_website(mut self, url: &str) -> Self {
+        self.website = String::from(url);
+        self
+    }
+
+    pub fn enable_feature(mut self, feature: ClientFeature) -> Self {
+        self.features.push(feature);
+        self
+    }
+
+    fn make_disco(&self) -> DiscoInfoResult {
+        let identities = vec![Identity::new("client", self.disco.0.to_string(),
+                                            "en", self.disco.1.to_string())];
+        let mut features = vec![
+            Feature::new(ns::DISCO_INFO),
+        ];
+        if self.features.contains(&ClientFeature::Avatars) {
+            features.push(Feature::new(format!("{}+notify", ns::AVATAR_METADATA)));
+        }
+        DiscoInfoResult {
+            node: None,
+            identities,
+            features,
+            extensions: vec![],
+        }
+    }
+
+    fn make_initial_presence(disco: &DiscoInfoResult, node: &str) -> Presence {
+        let caps_data = compute_disco(disco);
+        let hash = hash_caps(&caps_data, Algo::Sha_1).unwrap();
+        let caps = Caps::new(node, hash);
+
+        let mut presence = Presence::new(PresenceType::None);
+        presence.add_payload(caps);
+        presence
+    }
+
+    pub fn build(self, mut app_tx: mpsc::UnboundedSender<Event>) -> Result<(Box<Future<Item = (), Error = ()>>, Client), JidParseError> {
+        let disco = self.make_disco();
+        let node = self.website;
+        let (sender_tx, sender_rx) = mpsc::unbounded();
+
+        let client = TokioXmppClient::new(self.jid, self.password)?;
+        let (sink, stream) = client.split();
+
+        let reader = {
+            let mut sender_tx = sender_tx.clone();
+            let jid = self.jid.to_owned();
+            stream.for_each(move |event| {
+                // Helper function to send an iq error.
+                let send_error = |to, id, type_, condition, text: &str| {
+                    let error = StanzaError::new(type_, condition, "en", text);
+                    let iq = Iq::from_error(id, error)
+                        .with_to(to)
+                        .into();
+                    sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
+                };
+
+                match event {
+                    TokioXmppEvent::Online => {
+                        let presence = ClientBuilder::make_initial_presence(&disco, &node).into();
+                        let packet = Packet::Stanza(presence);
+                        sender_tx.unbounded_send(packet)
+                            .unwrap();
+                        app_tx.unbounded_send(Event::Online).unwrap();
+                    }
+                    TokioXmppEvent::Disconnected => {
+                        app_tx.unbounded_send(Event::Disconnected).unwrap();
+                    }
+                    TokioXmppEvent::Stanza(stanza) => {
+                        if stanza.is("iq", "jabber:client") {
+                            let iq = Iq::try_from(stanza).unwrap();
+                            if let IqType::Get(payload) = iq.payload {
+                                if payload.is("query", ns::DISCO_INFO) {
+                                    let query = DiscoInfoQuery::try_from(payload);
+                                    match query {
+                                        Ok(query) => {
+                                            let mut disco_info = disco.clone();
+                                            disco_info.node = query.node;
+                                            let iq = Iq::from_result(iq.id, Some(disco_info))
+                                                .with_to(iq.from.unwrap())
+                                                .into();
+                                            sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
+                                        },
+                                        Err(err) => {
+                                            send_error(iq.from.unwrap(), iq.id, ErrorType::Modify, DefinedCondition::BadRequest, &format!("{}", err));
+                                        },
+                                    }
+                                } else {
+                                    // We MUST answer unhandled get iqs with a service-unavailable error.
+                                    send_error(iq.from.unwrap(), iq.id, ErrorType::Cancel, DefinedCondition::ServiceUnavailable, "No handler defined for this kind of iq.");
+                                }
+                            } else if let IqType::Result(Some(payload)) = iq.payload {
+                                if payload.is("pubsub", ns::PUBSUB) {
+                                    let pubsub = PubSub::try_from(payload).unwrap();
+                                    let from =
+                                        iq.from.clone().unwrap_or(Jid::from_str(&jid).unwrap());
+                                    if let PubSub::Items(items) = pubsub {
+                                        if items.node.0 == ns::AVATAR_DATA {
+                                            avatar::handle_data_pubsub_iq(&from, &mut app_tx, items);
+                                        }
+                                    }
+                                }
+                            } else if let IqType::Set(_) = iq.payload {
+                                // We MUST answer unhandled set iqs with a service-unavailable error.
+                                send_error(iq.from.unwrap(), iq.id, ErrorType::Cancel, DefinedCondition::ServiceUnavailable, "No handler defined for this kind of iq.");
+                            }
+                        } else if stanza.is("message", "jabber:client") {
+                            let message = Message::try_from(stanza).unwrap();
+                            let from = message.from.clone().unwrap();
+                            for child in message.payloads {
+                                if child.is("event", ns::PUBSUB_EVENT) {
+                                    let event = PubSubEvent::try_from(child).unwrap();
+                                    if let PubSubEvent::PublishedItems { node, items } = event {
+                                        if node.0 == ns::AVATAR_METADATA {
+                                            avatar::handle_metadata_pubsub_event(&from, &mut sender_tx, items);
+                                        }
+                                    }
+                                }
+                            }
+                        } else if stanza.is("presence", "jabber:client") {
+                            let presence = Presence::try_from(stanza).unwrap();
+                            let from = presence.from.clone().unwrap();
+                            for payload in presence.payloads.into_iter() {
+                                let muc_user = match MucUser::try_from(payload) {
+                                    Ok(muc_user) => muc_user,
+                                    _ => continue
+                                };
+                                for status in muc_user.status.into_iter() {
+                                    if status == Status::SelfPresence {
+                                        app_tx.unbounded_send(Event::RoomJoined(from.clone())).unwrap();
+                                        break;
+                                    }
+                                }
+                            }
+                        } else if stanza.is("error", "http://etherx.jabber.org/streams") {
+                            println!("Received a fatal stream error: {}", String::from(&stanza));
+                        } else {
+                            panic!("Unknown stanza: {}", String::from(&stanza));
+                        }
+                    }
+                }
+
+                Ok(())
+            })
+        };
+
+        let sender = sender_rx
+            .map_err(|e| panic!("Sink error: {:?}", e))
+            .forward(sink)
+            .map(|(rx, mut sink)| {
+                drop(rx);
+                let _ = sink.close();
+            });
+
+        let future = reader.select(sender)
+            .map(|_| ())
+            .map_err(|_| ());
+
+        let agent = Client {
+            sender_tx,
+        };
+
+        Ok((Box::new(future), agent))
+    }
+}
+
+pub struct Client {
+    sender_tx: mpsc::UnboundedSender<Packet>,
+}
+
+impl Client {
+    pub fn join_room(&mut self, room: Jid, lang: &str, status: &str) {
+        let mut presence = Presence::new(PresenceType::None)
+            .with_to(Some(room))
+            .with_payloads(vec![Muc::new().into()]);
+        presence.set_status(String::from(lang), String::from(status));
+        let presence = presence.into();
+        self.sender_tx.unbounded_send(Packet::Stanza(presence))
+            .unwrap();
+    }
+
+    pub fn send_message(&mut self, recipient: Jid, type_: MessageType, lang: &str, text: &str) {
+        let mut message = Message::new(Some(recipient));
+        message.type_ = type_;
+        message.bodies.insert(String::from(lang), Body(String::from(text)));
+        let message = message.into();
+        self.sender_tx.unbounded_send(Packet::Stanza(message))
+            .unwrap();
+    }
+}