lib.rs

  1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
  2//
  3// This Source Code Form is subject to the terms of the Mozilla Public
  4// License, v. 2.0. If a copy of the MPL was not distributed with this
  5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6
  7#![deny(bare_trait_objects)]
  8
  9use futures::stream::StreamExt;
 10use reqwest::{
 11    header::HeaderMap as ReqwestHeaderMap, Body as ReqwestBody, Client as ReqwestClient,
 12};
 13use std::cell::RefCell;
 14use std::convert::TryFrom;
 15use std::path::{Path, PathBuf};
 16use std::rc::Rc;
 17use tokio::fs::File;
 18use tokio_util::codec::{BytesCodec, FramedRead};
 19use tokio_xmpp::{AsyncClient as TokioXmppClient, Event as TokioXmppEvent};
 20use xmpp_parsers::{
 21    bookmarks2::Conference,
 22    caps::{compute_disco, hash_caps, Caps},
 23    disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity},
 24    hashes::Algo,
 25    http_upload::{Header as HttpUploadHeader, SlotRequest, SlotResult},
 26    iq::{Iq, IqType},
 27    message::{Body, Message, MessageType},
 28    muc::{
 29        user::{MucUser, Status},
 30        Muc,
 31    },
 32    ns,
 33    presence::{Presence, Type as PresenceType},
 34    pubsub::pubsub::{Items, PubSub},
 35    roster::{Item as RosterItem, Roster},
 36    stanza_error::{DefinedCondition, ErrorType, StanzaError},
 37    BareJid, Element, FullJid, Jid,
 38};
 39#[macro_use]
 40extern crate log;
 41
 42mod pubsub;
 43
 44pub type Error = tokio_xmpp::Error;
 45
 46#[derive(Debug)]
 47pub enum ClientType {
 48    Bot,
 49    Pc,
 50}
 51
 52impl Default for ClientType {
 53    fn default() -> Self {
 54        ClientType::Bot
 55    }
 56}
 57
 58impl ToString for ClientType {
 59    fn to_string(&self) -> String {
 60        String::from(match self {
 61            ClientType::Bot => "bot",
 62            ClientType::Pc => "pc",
 63        })
 64    }
 65}
 66
 67#[derive(PartialEq)]
 68pub enum ClientFeature {
 69    #[cfg(feature = "avatars")]
 70    Avatars,
 71    ContactList,
 72    JoinRooms,
 73}
 74
 75pub type RoomNick = String;
 76
 77#[derive(Debug)]
 78pub enum Event {
 79    Online,
 80    Disconnected,
 81    ContactAdded(RosterItem),
 82    ContactRemoved(RosterItem),
 83    ContactChanged(RosterItem),
 84    #[cfg(feature = "avatars")]
 85    AvatarRetrieved(Jid, String),
 86    ChatMessage(BareJid, Body),
 87    JoinRoom(BareJid, Conference),
 88    LeaveRoom(BareJid),
 89    LeaveAllRooms,
 90    RoomJoined(BareJid),
 91    RoomLeft(BareJid),
 92    RoomMessage(BareJid, RoomNick, Body),
 93    HttpUploadedFile(String),
 94}
 95
 96#[derive(Default)]
 97pub struct ClientBuilder<'a> {
 98    jid: &'a str,
 99    password: &'a str,
100    website: String,
101    default_nick: String,
102    lang: Vec<String>,
103    disco: (ClientType, String),
104    features: Vec<ClientFeature>,
105}
106
107impl ClientBuilder<'_> {
108    pub fn new<'a>(jid: &'a str, password: &'a str) -> ClientBuilder<'a> {
109        ClientBuilder {
110            jid,
111            password,
112            website: String::from("https://gitlab.com/xmpp-rs/tokio-xmpp"),
113            default_nick: String::from("xmpp-rs"),
114            lang: vec![String::from("en")],
115            disco: (ClientType::default(), String::from("tokio-xmpp")),
116            features: vec![],
117        }
118    }
119
120    pub fn set_client(mut self, type_: ClientType, name: &str) -> Self {
121        self.disco = (type_, String::from(name));
122        self
123    }
124
125    pub fn set_website(mut self, url: &str) -> Self {
126        self.website = String::from(url);
127        self
128    }
129
130    pub fn set_default_nick(mut self, nick: &str) -> Self {
131        self.default_nick = String::from(nick);
132        self
133    }
134
135    pub fn set_lang(mut self, lang: Vec<String>) -> Self {
136        self.lang = lang;
137        self
138    }
139
140    pub fn enable_feature(mut self, feature: ClientFeature) -> Self {
141        self.features.push(feature);
142        self
143    }
144
145    fn make_disco(&self) -> DiscoInfoResult {
146        let identities = vec![Identity::new(
147            "client",
148            self.disco.0.to_string(),
149            "en",
150            self.disco.1.to_string(),
151        )];
152        let mut features = vec![Feature::new(ns::DISCO_INFO)];
153        #[cfg(feature = "avatars")]
154        {
155            if self.features.contains(&ClientFeature::Avatars) {
156                features.push(Feature::new(format!("{}+notify", ns::AVATAR_METADATA)));
157            }
158        }
159        if self.features.contains(&ClientFeature::JoinRooms) {
160            features.push(Feature::new(format!("{}+notify", ns::BOOKMARKS2)));
161        }
162        DiscoInfoResult {
163            node: None,
164            identities,
165            features,
166            extensions: vec![],
167        }
168    }
169
170    pub fn build(self) -> Result<Agent, Error> {
171        let client = TokioXmppClient::new(self.jid, self.password)?;
172        Ok(self.build_impl(client)?)
173    }
174
175    // This function is meant to be used for testing build
176    pub(crate) fn build_impl(self, client: TokioXmppClient) -> Result<Agent, Error> {
177        let disco = self.make_disco();
178        let node = self.website;
179
180        let agent = Agent {
181            client,
182            default_nick: Rc::new(RefCell::new(self.default_nick)),
183            lang: Rc::new(self.lang),
184            disco,
185            node,
186            uploads: Vec::new(),
187        };
188
189        Ok(agent)
190    }
191}
192
193pub struct Agent {
194    client: TokioXmppClient,
195    default_nick: Rc<RefCell<String>>,
196    lang: Rc<Vec<String>>,
197    disco: DiscoInfoResult,
198    node: String,
199    uploads: Vec<(String, Jid, PathBuf)>,
200}
201
202impl Agent {
203    pub async fn disconnect(&mut self) -> Result<(), Error> {
204        self.client.send_end().await
205    }
206
207    pub async fn join_room(
208        &mut self,
209        room: BareJid,
210        nick: Option<String>,
211        password: Option<String>,
212        lang: &str,
213        status: &str,
214    ) {
215        let mut muc = Muc::new();
216        if let Some(password) = password {
217            muc = muc.with_password(password);
218        }
219
220        let nick = nick.unwrap_or_else(|| self.default_nick.borrow().clone());
221        let room_jid = room.with_resource(nick);
222        let mut presence = Presence::new(PresenceType::None).with_to(Jid::Full(room_jid));
223        presence.add_payload(muc);
224        presence.set_status(String::from(lang), String::from(status));
225        let _ = self.client.send_stanza(presence.into()).await;
226    }
227
228    pub async fn send_message(
229        &mut self,
230        recipient: Jid,
231        type_: MessageType,
232        lang: &str,
233        text: &str,
234    ) {
235        let mut message = Message::new(Some(recipient));
236        message.type_ = type_;
237        message
238            .bodies
239            .insert(String::from(lang), Body(String::from(text)));
240        let _ = self.client.send_stanza(message.into()).await;
241    }
242
243    fn make_initial_presence(disco: &DiscoInfoResult, node: &str) -> Presence {
244        let caps_data = compute_disco(disco);
245        let hash = hash_caps(&caps_data, Algo::Sha_1).unwrap();
246        let caps = Caps::new(node, hash);
247
248        let mut presence = Presence::new(PresenceType::None);
249        presence.add_payload(caps);
250        presence
251    }
252
253    async fn handle_iq(&mut self, iq: Iq) -> Vec<Event> {
254        let mut events = vec![];
255        let from = iq
256            .from
257            .clone()
258            .unwrap_or_else(|| self.client.bound_jid().unwrap().clone());
259        if let IqType::Get(payload) = iq.payload {
260            if payload.is("query", ns::DISCO_INFO) {
261                let query = DiscoInfoQuery::try_from(payload);
262                match query {
263                    Ok(query) => {
264                        let mut disco_info = self.disco.clone();
265                        disco_info.node = query.node;
266                        let iq = Iq::from_result(iq.id, Some(disco_info))
267                            .with_to(iq.from.unwrap())
268                            .into();
269                        let _ = self.client.send_stanza(iq).await;
270                    }
271                    Err(err) => {
272                        let error = StanzaError::new(
273                            ErrorType::Modify,
274                            DefinedCondition::BadRequest,
275                            "en",
276                            &format!("{}", err),
277                        );
278                        let iq = Iq::from_error(iq.id, error)
279                            .with_to(iq.from.unwrap())
280                            .into();
281                        let _ = self.client.send_stanza(iq).await;
282                    }
283                }
284            } else {
285                // We MUST answer unhandled get iqs with a service-unavailable error.
286                let error = StanzaError::new(
287                    ErrorType::Cancel,
288                    DefinedCondition::ServiceUnavailable,
289                    "en",
290                    "No handler defined for this kind of iq.",
291                );
292                let iq = Iq::from_error(iq.id, error)
293                    .with_to(iq.from.unwrap())
294                    .into();
295                let _ = self.client.send_stanza(iq).await;
296            }
297        } else if let IqType::Result(Some(payload)) = iq.payload {
298            // TODO: move private iqs like this one somewhere else, for
299            // security reasons.
300            if payload.is("query", ns::ROSTER) && iq.from.is_none() {
301                let roster = Roster::try_from(payload).unwrap();
302                for item in roster.items.into_iter() {
303                    events.push(Event::ContactAdded(item));
304                }
305            } else if payload.is("pubsub", ns::PUBSUB) {
306                let new_events = pubsub::handle_iq_result(&from, payload);
307                events.extend(new_events);
308            } else if payload.is("slot", ns::HTTP_UPLOAD) {
309                let new_events = handle_upload_result(&from, iq.id, payload, self).await;
310                events.extend(new_events);
311            }
312        } else if let IqType::Set(_) = iq.payload {
313            // We MUST answer unhandled set iqs with a service-unavailable error.
314            let error = StanzaError::new(
315                ErrorType::Cancel,
316                DefinedCondition::ServiceUnavailable,
317                "en",
318                "No handler defined for this kind of iq.",
319            );
320            let iq = Iq::from_error(iq.id, error)
321                .with_to(iq.from.unwrap())
322                .into();
323            let _ = self.client.send_stanza(iq).await;
324        }
325
326        events
327    }
328
329    async fn handle_message(&mut self, message: Message) -> Vec<Event> {
330        let mut events = vec![];
331        let from = message.from.clone().unwrap();
332        let langs: Vec<&str> = self.lang.iter().map(String::as_str).collect();
333        match message.get_best_body(langs) {
334            Some((_lang, body)) => match message.type_ {
335                MessageType::Groupchat => {
336                    let event = Event::RoomMessage(
337                        from.clone().into(),
338                        FullJid::try_from(from.clone()).unwrap().resource,
339                        body.clone(),
340                    );
341                    events.push(event)
342                }
343                MessageType::Chat | MessageType::Normal => {
344                    let event = Event::ChatMessage(from.clone().into(), body.clone());
345                    events.push(event)
346                }
347                _ => (),
348            },
349            None => (),
350        }
351        for child in message.payloads {
352            if child.is("event", ns::PUBSUB_EVENT) {
353                let new_events = pubsub::handle_event(&from, child, self).await;
354                events.extend(new_events);
355            }
356        }
357
358        events
359    }
360
361    async fn handle_presence(&mut self, presence: Presence) -> Vec<Event> {
362        let mut events = vec![];
363        let from: BareJid = match presence.from.clone().unwrap() {
364            Jid::Full(FullJid { node, domain, .. }) => BareJid { node, domain },
365            Jid::Bare(bare) => bare,
366        };
367        for payload in presence.payloads.into_iter() {
368            let muc_user = match MucUser::try_from(payload) {
369                Ok(muc_user) => muc_user,
370                _ => continue,
371            };
372            for status in muc_user.status.into_iter() {
373                if status == Status::SelfPresence {
374                    events.push(Event::RoomJoined(from.clone()));
375                    break;
376                }
377            }
378        }
379
380        events
381    }
382
383    pub async fn wait_for_events(&mut self) -> Option<Vec<Event>> {
384        if let Some(event) = self.client.next().await {
385            let mut events = Vec::new();
386
387            match event {
388                TokioXmppEvent::Online { resumed: false, .. } => {
389                    let presence = Self::make_initial_presence(&self.disco, &self.node).into();
390                    let _ = self.client.send_stanza(presence).await;
391                    events.push(Event::Online);
392                    // TODO: only send this when the ContactList feature is enabled.
393                    let iq = Iq::from_get(
394                        "roster",
395                        Roster {
396                            ver: None,
397                            items: vec![],
398                        },
399                    )
400                    .into();
401                    let _ = self.client.send_stanza(iq).await;
402                    // TODO: only send this when the JoinRooms feature is enabled.
403                    let iq =
404                        Iq::from_get("bookmarks", PubSub::Items(Items::new(ns::BOOKMARKS2))).into();
405                    let _ = self.client.send_stanza(iq).await;
406                }
407                TokioXmppEvent::Online { resumed: true, .. } => {}
408                TokioXmppEvent::Disconnected(_) => {
409                    events.push(Event::Disconnected);
410                }
411                TokioXmppEvent::Stanza(elem) => {
412                    if elem.is("iq", "jabber:client") {
413                        let iq = Iq::try_from(elem).unwrap();
414                        let new_events = self.handle_iq(iq).await;
415                        events.extend(new_events);
416                    } else if elem.is("message", "jabber:client") {
417                        let message = Message::try_from(elem).unwrap();
418                        let new_events = self.handle_message(message).await;
419                        events.extend(new_events);
420                    } else if elem.is("presence", "jabber:client") {
421                        let presence = Presence::try_from(elem).unwrap();
422                        let new_events = self.handle_presence(presence).await;
423                        events.extend(new_events);
424                    } else if elem.is("error", "http://etherx.jabber.org/streams") {
425                        println!("Received a fatal stream error: {}", String::from(&elem));
426                    } else {
427                        panic!("Unknown stanza: {}", String::from(&elem));
428                    }
429                }
430            }
431
432            Some(events)
433        } else {
434            None
435        }
436    }
437
438    pub async fn upload_file_with(&mut self, service: &str, path: &Path) {
439        let name = path.file_name().unwrap().to_str().unwrap().to_string();
440        let file = File::open(path).await.unwrap();
441        let size = file.metadata().await.unwrap().len();
442        let slot_request = SlotRequest {
443            filename: name,
444            size: size,
445            content_type: None,
446        };
447        let to = service.parse::<Jid>().unwrap();
448        let request = Iq::from_get("upload1", slot_request).with_to(to.clone());
449        self.uploads
450            .push((String::from("upload1"), to, path.to_path_buf()));
451        self.client.send_stanza(request.into()).await.unwrap();
452    }
453}
454
455async fn handle_upload_result(
456    from: &Jid,
457    iqid: String,
458    elem: Element,
459    agent: &mut Agent,
460) -> impl IntoIterator<Item = Event> {
461    let mut res: Option<(usize, PathBuf)> = None;
462
463    for (i, (id, to, filepath)) in agent.uploads.iter().enumerate() {
464        if to == from && id == &iqid {
465            res = Some((i, filepath.to_path_buf()));
466            break;
467        }
468    }
469
470    if let Some((index, file)) = res {
471        agent.uploads.remove(index);
472        let slot = SlotResult::try_from(elem).unwrap();
473
474        let mut headers = ReqwestHeaderMap::new();
475        for header in slot.put.headers {
476            let (attr, val) = match header {
477                HttpUploadHeader::Authorization(val) => ("Authorization", val),
478                HttpUploadHeader::Cookie(val) => ("Cookie", val),
479                HttpUploadHeader::Expires(val) => ("Expires", val),
480            };
481            headers.insert(attr, val.parse().unwrap());
482        }
483
484        let web = ReqwestClient::new();
485        let stream = FramedRead::new(File::open(file).await.unwrap(), BytesCodec::new());
486        let body = ReqwestBody::wrap_stream(stream);
487        let res = web
488            .put(slot.put.url.as_str())
489            .headers(headers)
490            .body(body)
491            .send()
492            .await
493            .unwrap();
494        if res.status() == 201 {
495            return vec![Event::HttpUploadedFile(slot.get.url)];
496        }
497    }
498
499    return vec![];
500}
501
#[cfg(test)]
mod tests {
    use super::{Agent, ClientBuilder, ClientFeature, ClientType, Event};
    use tokio_xmpp::AsyncClient as TokioXmppClient;

    /// Builds an agent around a client for `foo@bar` and checks that the
    /// first batch of events, if any, is exactly one `Disconnected`.
    #[tokio::test]
    async fn test_simple() {
        let client = TokioXmppClient::new("foo@bar", "meh").unwrap();

        // Client instance
        let client_builder = ClientBuilder::new("foo@bar", "meh")
            .set_client(ClientType::Bot, "xmpp-rs")
            .set_website("https://gitlab.com/xmpp-rs/xmpp-rs")
            .set_default_nick("bot")
            .enable_feature(ClientFeature::ContactList);
        // The variant only exists with the `avatars` cargo feature; gating
        // it keeps the test compiling when that feature is disabled.
        #[cfg(feature = "avatars")]
        let client_builder = client_builder.enable_feature(ClientFeature::Avatars);

        let mut agent: Agent = client_builder.build_impl(client).unwrap();

        // Only the first batch is inspected.
        if let Some(events) = agent.wait_for_events().await {
            match events.as_slice() {
                [Event::Disconnected] => {}
                other => panic!("expected a single Disconnected event, got {:?}", other),
            }
        }
    }
}