lib.rs

  1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
  2//
  3// This Source Code Form is subject to the terms of the Mozilla Public
  4// License, v. 2.0. If a copy of the MPL was not distributed with this
  5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6
  7#![deny(bare_trait_objects)]
  8
  9use futures::stream::StreamExt;
 10use reqwest::{
 11    header::HeaderMap as ReqwestHeaderMap, Body as ReqwestBody, Client as ReqwestClient,
 12};
 13use std::convert::TryFrom;
 14use std::path::{Path, PathBuf};
 15use std::sync::{Arc, RwLock};
 16use tokio::fs::File;
 17use tokio_util::codec::{BytesCodec, FramedRead};
 18use tokio_xmpp::{AsyncClient as TokioXmppClient, Event as TokioXmppEvent};
 19use xmpp_parsers::{
 20    bookmarks2::Conference,
 21    caps::{compute_disco, hash_caps, Caps},
 22    disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity},
 23    hashes::Algo,
 24    http_upload::{Header as HttpUploadHeader, SlotRequest, SlotResult},
 25    iq::{Iq, IqType},
 26    message::{Body, Message, MessageType},
 27    muc::{
 28        user::{MucUser, Status},
 29        Muc,
 30    },
 31    ns,
 32    presence::{Presence, Type as PresenceType},
 33    pubsub::pubsub::{Items, PubSub},
 34    roster::{Item as RosterItem, Roster},
 35    stanza_error::{DefinedCondition, ErrorType, StanzaError},
 36    BareJid, Element, FullJid, Jid,
 37};
 38#[macro_use]
 39extern crate log;
 40
 41mod pubsub;
 42
 43pub type Error = tokio_xmpp::Error;
 44
 45#[derive(Debug)]
 46pub enum ClientType {
 47    Bot,
 48    Pc,
 49}
 50
 51impl Default for ClientType {
 52    fn default() -> Self {
 53        ClientType::Bot
 54    }
 55}
 56
 57impl ToString for ClientType {
 58    fn to_string(&self) -> String {
 59        String::from(match self {
 60            ClientType::Bot => "bot",
 61            ClientType::Pc => "pc",
 62        })
 63    }
 64}
 65
 66#[derive(PartialEq)]
 67pub enum ClientFeature {
 68    #[cfg(feature = "avatars")]
 69    Avatars,
 70    ContactList,
 71    JoinRooms,
 72}
 73
 74pub type Id = Option<String>;
 75pub type RoomNick = String;
 76
 77#[derive(Debug)]
 78pub enum Event {
 79    Online,
 80    Disconnected,
 81    ContactAdded(RosterItem),
 82    ContactRemoved(RosterItem),
 83    ContactChanged(RosterItem),
 84    #[cfg(feature = "avatars")]
 85    AvatarRetrieved(Jid, String),
 86    ChatMessage(Id, BareJid, Body),
 87    JoinRoom(BareJid, Conference),
 88    LeaveRoom(BareJid),
 89    LeaveAllRooms,
 90    RoomJoined(BareJid),
 91    RoomLeft(BareJid),
 92    RoomMessage(Id, BareJid, RoomNick, Body),
 93    ServiceMessage(Id, BareJid, Body),
 94    HttpUploadedFile(String),
 95}
 96
 97pub struct ClientBuilder<'a> {
 98    jid: BareJid,
 99    password: &'a str,
100    website: String,
101    default_nick: String,
102    lang: Vec<String>,
103    disco: (ClientType, String),
104    features: Vec<ClientFeature>,
105    resource: Option<String>,
106}
107
108impl ClientBuilder<'_> {
109    pub fn new<'a>(jid: BareJid, password: &'a str) -> ClientBuilder<'a> {
110        ClientBuilder {
111            jid,
112            password,
113            website: String::from("https://gitlab.com/xmpp-rs/tokio-xmpp"),
114            default_nick: String::from("xmpp-rs"),
115            lang: vec![String::from("en")],
116            disco: (ClientType::default(), String::from("tokio-xmpp")),
117            features: vec![],
118            resource: None,
119        }
120    }
121
122    /// Optionally set a resource associated to this device on the client
123    pub fn set_resource(mut self, resource: &str) -> Self {
124        self.resource = Some(resource.to_string());
125        self
126    }
127
128    pub fn set_client(mut self, type_: ClientType, name: &str) -> Self {
129        self.disco = (type_, String::from(name));
130        self
131    }
132
133    pub fn set_website(mut self, url: &str) -> Self {
134        self.website = String::from(url);
135        self
136    }
137
138    pub fn set_default_nick(mut self, nick: &str) -> Self {
139        self.default_nick = String::from(nick);
140        self
141    }
142
143    pub fn set_lang(mut self, lang: Vec<String>) -> Self {
144        self.lang = lang;
145        self
146    }
147
148    pub fn enable_feature(mut self, feature: ClientFeature) -> Self {
149        self.features.push(feature);
150        self
151    }
152
153    fn make_disco(&self) -> DiscoInfoResult {
154        let identities = vec![Identity::new(
155            "client",
156            self.disco.0.to_string(),
157            "en",
158            self.disco.1.to_string(),
159        )];
160        let mut features = vec![Feature::new(ns::DISCO_INFO)];
161        #[cfg(feature = "avatars")]
162        {
163            if self.features.contains(&ClientFeature::Avatars) {
164                features.push(Feature::new(format!("{}+notify", ns::AVATAR_METADATA)));
165            }
166        }
167        if self.features.contains(&ClientFeature::JoinRooms) {
168            features.push(Feature::new(format!("{}+notify", ns::BOOKMARKS2)));
169        }
170        DiscoInfoResult {
171            node: None,
172            identities,
173            features,
174            extensions: vec![],
175        }
176    }
177
178    pub fn build(self) -> Agent {
179        let jid: Jid = if let Some(resource) = &self.resource {
180            self.jid.clone().with_resource(resource.to_string()).into()
181        } else {
182            self.jid.clone().into()
183        };
184
185        let client = TokioXmppClient::new(jid, self.password);
186        self.build_impl(client)
187    }
188
189    // This function is meant to be used for testing build
190    pub(crate) fn build_impl(self, client: TokioXmppClient) -> Agent {
191        let disco = self.make_disco();
192        let node = self.website;
193
194        Agent {
195            client,
196            default_nick: Arc::new(RwLock::new(self.default_nick)),
197            lang: Arc::new(self.lang),
198            disco,
199            node,
200            uploads: Vec::new(),
201        }
202    }
203}
204
205pub struct Agent {
206    client: TokioXmppClient,
207    default_nick: Arc<RwLock<String>>,
208    lang: Arc<Vec<String>>,
209    disco: DiscoInfoResult,
210    node: String,
211    uploads: Vec<(String, Jid, PathBuf)>,
212}
213
214impl Agent {
215    pub async fn disconnect(&mut self) -> Result<(), Error> {
216        self.client.send_end().await
217    }
218
219    pub async fn join_room(
220        &mut self,
221        room: BareJid,
222        nick: Option<String>,
223        password: Option<String>,
224        lang: &str,
225        status: &str,
226    ) {
227        let mut muc = Muc::new();
228        if let Some(password) = password {
229            muc = muc.with_password(password);
230        }
231
232        let nick = nick.unwrap_or_else(|| self.default_nick.read().unwrap().clone());
233        let room_jid = room.with_resource(nick);
234        let mut presence = Presence::new(PresenceType::None).with_to(Jid::Full(room_jid));
235        presence.add_payload(muc);
236        presence.set_status(String::from(lang), String::from(status));
237        let _ = self.client.send_stanza(presence.into()).await;
238    }
239
240    pub async fn send_message(
241        &mut self,
242        recipient: Jid,
243        type_: MessageType,
244        lang: &str,
245        text: &str,
246    ) {
247        let mut message = Message::new(Some(recipient));
248        message.type_ = type_;
249        message
250            .bodies
251            .insert(String::from(lang), Body(String::from(text)));
252        let _ = self.client.send_stanza(message.into()).await;
253    }
254
255    fn make_initial_presence(disco: &DiscoInfoResult, node: &str) -> Presence {
256        let caps_data = compute_disco(disco);
257        let hash = hash_caps(&caps_data, Algo::Sha_1).unwrap();
258        let caps = Caps::new(node, hash);
259
260        let mut presence = Presence::new(PresenceType::None);
261        presence.add_payload(caps);
262        presence
263    }
264
265    async fn handle_iq(&mut self, iq: Iq) -> Vec<Event> {
266        let mut events = vec![];
267        let from = iq
268            .from
269            .clone()
270            .unwrap_or_else(|| self.client.bound_jid().unwrap().clone());
271        if let IqType::Get(payload) = iq.payload {
272            if payload.is("query", ns::DISCO_INFO) {
273                let query = DiscoInfoQuery::try_from(payload);
274                match query {
275                    Ok(query) => {
276                        let mut disco_info = self.disco.clone();
277                        disco_info.node = query.node;
278                        let iq = Iq::from_result(iq.id, Some(disco_info))
279                            .with_to(iq.from.unwrap())
280                            .into();
281                        let _ = self.client.send_stanza(iq).await;
282                    }
283                    Err(err) => {
284                        let error = StanzaError::new(
285                            ErrorType::Modify,
286                            DefinedCondition::BadRequest,
287                            "en",
288                            &format!("{}", err),
289                        );
290                        let iq = Iq::from_error(iq.id, error)
291                            .with_to(iq.from.unwrap())
292                            .into();
293                        let _ = self.client.send_stanza(iq).await;
294                    }
295                }
296            } else {
297                // We MUST answer unhandled get iqs with a service-unavailable error.
298                let error = StanzaError::new(
299                    ErrorType::Cancel,
300                    DefinedCondition::ServiceUnavailable,
301                    "en",
302                    "No handler defined for this kind of iq.",
303                );
304                let iq = Iq::from_error(iq.id, error)
305                    .with_to(iq.from.unwrap())
306                    .into();
307                let _ = self.client.send_stanza(iq).await;
308            }
309        } else if let IqType::Result(Some(payload)) = iq.payload {
310            // TODO: move private iqs like this one somewhere else, for
311            // security reasons.
312            if payload.is("query", ns::ROSTER) && Some(from.clone()) == iq.from {
313                let roster = Roster::try_from(payload).unwrap();
314                for item in roster.items.into_iter() {
315                    events.push(Event::ContactAdded(item));
316                }
317            } else if payload.is("pubsub", ns::PUBSUB) {
318                let new_events = pubsub::handle_iq_result(&from, payload);
319                events.extend(new_events);
320            } else if payload.is("slot", ns::HTTP_UPLOAD) {
321                let new_events = handle_upload_result(&from, iq.id, payload, self).await;
322                events.extend(new_events);
323            }
324        } else if let IqType::Set(_) = iq.payload {
325            // We MUST answer unhandled set iqs with a service-unavailable error.
326            let error = StanzaError::new(
327                ErrorType::Cancel,
328                DefinedCondition::ServiceUnavailable,
329                "en",
330                "No handler defined for this kind of iq.",
331            );
332            let iq = Iq::from_error(iq.id, error)
333                .with_to(iq.from.unwrap())
334                .into();
335            let _ = self.client.send_stanza(iq).await;
336        }
337
338        events
339    }
340
341    async fn handle_message(&mut self, message: Message) -> Vec<Event> {
342        let mut events = vec![];
343        let from = message.from.clone().unwrap();
344        let langs: Vec<&str> = self.lang.iter().map(String::as_str).collect();
345        match message.get_best_body(langs) {
346            Some((_lang, body)) => match message.type_ {
347                MessageType::Groupchat => {
348                    let event = match from.clone() {
349                        Jid::Full(full) => Event::RoomMessage(
350                            message.id.clone(),
351                            from.clone().into(),
352                            full.resource,
353                            body.clone(),
354                        ),
355                        Jid::Bare(bare) => {
356                            Event::ServiceMessage(message.id.clone(), bare, body.clone())
357                        }
358                    };
359                    events.push(event)
360                }
361                MessageType::Chat | MessageType::Normal => {
362                    let event =
363                        Event::ChatMessage(message.id.clone(), from.clone().into(), body.clone());
364                    events.push(event)
365                }
366                _ => (),
367            },
368            None => (),
369        }
370        for child in message.payloads {
371            if child.is("event", ns::PUBSUB_EVENT) {
372                let new_events = pubsub::handle_event(&from, child, self).await;
373                events.extend(new_events);
374            }
375        }
376
377        events
378    }
379
380    async fn handle_presence(&mut self, presence: Presence) -> Vec<Event> {
381        let mut events = vec![];
382        let from: BareJid = match presence.from.clone().unwrap() {
383            Jid::Full(FullJid { node, domain, .. }) => BareJid { node, domain },
384            Jid::Bare(bare) => bare,
385        };
386        for payload in presence.payloads.into_iter() {
387            let muc_user = match MucUser::try_from(payload) {
388                Ok(muc_user) => muc_user,
389                _ => continue,
390            };
391            for status in muc_user.status.into_iter() {
392                if status == Status::SelfPresence {
393                    events.push(Event::RoomJoined(from.clone()));
394                    break;
395                }
396            }
397        }
398
399        events
400    }
401
402    pub async fn wait_for_events(&mut self) -> Option<Vec<Event>> {
403        if let Some(event) = self.client.next().await {
404            let mut events = Vec::new();
405
406            match event {
407                TokioXmppEvent::Online { resumed: false, .. } => {
408                    let presence = Self::make_initial_presence(&self.disco, &self.node).into();
409                    let _ = self.client.send_stanza(presence).await;
410                    events.push(Event::Online);
411                    // TODO: only send this when the ContactList feature is enabled.
412                    let iq = Iq::from_get(
413                        "roster",
414                        Roster {
415                            ver: None,
416                            items: vec![],
417                        },
418                    )
419                    .into();
420                    let _ = self.client.send_stanza(iq).await;
421                    // TODO: only send this when the JoinRooms feature is enabled.
422                    let iq =
423                        Iq::from_get("bookmarks", PubSub::Items(Items::new(ns::BOOKMARKS2))).into();
424                    let _ = self.client.send_stanza(iq).await;
425                }
426                TokioXmppEvent::Online { resumed: true, .. } => {}
427                TokioXmppEvent::Disconnected(_) => {
428                    events.push(Event::Disconnected);
429                }
430                TokioXmppEvent::Stanza(elem) => {
431                    if elem.is("iq", "jabber:client") {
432                        let iq = Iq::try_from(elem).unwrap();
433                        let new_events = self.handle_iq(iq).await;
434                        events.extend(new_events);
435                    } else if elem.is("message", "jabber:client") {
436                        let message = Message::try_from(elem).unwrap();
437                        let new_events = self.handle_message(message).await;
438                        events.extend(new_events);
439                    } else if elem.is("presence", "jabber:client") {
440                        let presence = Presence::try_from(elem).unwrap();
441                        let new_events = self.handle_presence(presence).await;
442                        events.extend(new_events);
443                    } else if elem.is("error", "http://etherx.jabber.org/streams") {
444                        println!("Received a fatal stream error: {}", String::from(&elem));
445                    } else {
446                        panic!("Unknown stanza: {}", String::from(&elem));
447                    }
448                }
449            }
450
451            Some(events)
452        } else {
453            None
454        }
455    }
456
457    pub async fn upload_file_with(&mut self, service: &str, path: &Path) {
458        let name = path.file_name().unwrap().to_str().unwrap().to_string();
459        let file = File::open(path).await.unwrap();
460        let size = file.metadata().await.unwrap().len();
461        let slot_request = SlotRequest {
462            filename: name,
463            size: size,
464            content_type: None,
465        };
466        let to = service.parse::<Jid>().unwrap();
467        let request = Iq::from_get("upload1", slot_request).with_to(to.clone());
468        self.uploads
469            .push((String::from("upload1"), to, path.to_path_buf()));
470        self.client.send_stanza(request.into()).await.unwrap();
471    }
472}
473
474async fn handle_upload_result(
475    from: &Jid,
476    iqid: String,
477    elem: Element,
478    agent: &mut Agent,
479) -> impl IntoIterator<Item = Event> {
480    let mut res: Option<(usize, PathBuf)> = None;
481
482    for (i, (id, to, filepath)) in agent.uploads.iter().enumerate() {
483        if to == from && id == &iqid {
484            res = Some((i, filepath.to_path_buf()));
485            break;
486        }
487    }
488
489    if let Some((index, file)) = res {
490        agent.uploads.remove(index);
491        let slot = SlotResult::try_from(elem).unwrap();
492
493        let mut headers = ReqwestHeaderMap::new();
494        for header in slot.put.headers {
495            let (attr, val) = match header {
496                HttpUploadHeader::Authorization(val) => ("Authorization", val),
497                HttpUploadHeader::Cookie(val) => ("Cookie", val),
498                HttpUploadHeader::Expires(val) => ("Expires", val),
499            };
500            headers.insert(attr, val.parse().unwrap());
501        }
502
503        let web = ReqwestClient::new();
504        let stream = FramedRead::new(File::open(file).await.unwrap(), BytesCodec::new());
505        let body = ReqwestBody::wrap_stream(stream);
506        let res = web
507            .put(slot.put.url.as_str())
508            .headers(headers)
509            .body(body)
510            .send()
511            .await
512            .unwrap();
513        if res.status() == 201 {
514            return vec![Event::HttpUploadedFile(slot.get.url)];
515        }
516    }
517
518    return vec![];
519}
520
521#[cfg(test)]
522mod tests {
523    use super::{Agent, BareJid, ClientBuilder, ClientFeature, ClientType, Event};
524    use std::str::FromStr;
525    use tokio_xmpp::AsyncClient as TokioXmppClient;
526
527    #[tokio::test]
528    async fn test_simple() {
529        let jid = BareJid::from_str("foo@bar").unwrap();
530
531        let client = TokioXmppClient::new(jid.clone(), "meh");
532
533        // Client instance
534        let client_builder = ClientBuilder::new(jid, "meh")
535            .set_client(ClientType::Bot, "xmpp-rs")
536            .set_website("https://gitlab.com/xmpp-rs/xmpp-rs")
537            .set_default_nick("bot")
538            .enable_feature(ClientFeature::Avatars)
539            .enable_feature(ClientFeature::ContactList);
540
541        let mut agent: Agent = client_builder.build_impl(client);
542
543        while let Some(events) = agent.wait_for_events().await {
544            assert!(match events[0] {
545                Event::Disconnected => true,
546                _ => false,
547            });
548            assert_eq!(events.len(), 1);
549            break;
550        }
551    }
552}