lib.rs

  1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
  2//
  3// This Source Code Form is subject to the terms of the Mozilla Public
  4// License, v. 2.0. If a copy of the MPL was not distributed with this
  5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6
  7#![deny(bare_trait_objects)]
  8
  9use futures::stream::StreamExt;
 10use reqwest::{
 11    header::HeaderMap as ReqwestHeaderMap, Body as ReqwestBody, Client as ReqwestClient,
 12};
 13use std::cell::RefCell;
 14use std::convert::TryFrom;
 15use std::path::{Path, PathBuf};
 16use std::rc::Rc;
 17use tokio::fs::File;
 18use tokio_util::codec::{BytesCodec, FramedRead};
 19use tokio_xmpp::{AsyncClient as TokioXmppClient, Event as TokioXmppEvent};
 20use xmpp_parsers::{
 21    bookmarks2::Conference,
 22    caps::{compute_disco, hash_caps, Caps},
 23    disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity},
 24    hashes::Algo,
 25    http_upload::{Header as HttpUploadHeader, SlotRequest, SlotResult},
 26    iq::{Iq, IqType},
 27    message::{Body, Message, MessageType},
 28    muc::{
 29        user::{MucUser, Status},
 30        Muc,
 31    },
 32    ns,
 33    presence::{Presence, Type as PresenceType},
 34    pubsub::pubsub::{Items, PubSub},
 35    roster::{Item as RosterItem, Roster},
 36    stanza_error::{DefinedCondition, ErrorType, StanzaError},
 37    BareJid, Element, FullJid, Jid,
 38};
 39#[macro_use]
 40extern crate log;
 41
 42mod pubsub;
 43
 44pub type Error = tokio_xmpp::Error;
 45
 46#[derive(Debug)]
 47pub enum ClientType {
 48    Bot,
 49    Pc,
 50}
 51
 52impl Default for ClientType {
 53    fn default() -> Self {
 54        ClientType::Bot
 55    }
 56}
 57
 58impl ToString for ClientType {
 59    fn to_string(&self) -> String {
 60        String::from(match self {
 61            ClientType::Bot => "bot",
 62            ClientType::Pc => "pc",
 63        })
 64    }
 65}
 66
 67#[derive(PartialEq)]
 68pub enum ClientFeature {
 69    #[cfg(feature = "avatars")]
 70    Avatars,
 71    ContactList,
 72    JoinRooms,
 73}
 74
 75pub type Id = Option<String>;
 76pub type RoomNick = String;
 77
 78#[derive(Debug)]
 79pub enum Event {
 80    Online,
 81    Disconnected,
 82    ContactAdded(RosterItem),
 83    ContactRemoved(RosterItem),
 84    ContactChanged(RosterItem),
 85    #[cfg(feature = "avatars")]
 86    AvatarRetrieved(Jid, String),
 87    ChatMessage(Id, BareJid, Body),
 88    JoinRoom(BareJid, Conference),
 89    LeaveRoom(BareJid),
 90    LeaveAllRooms,
 91    RoomJoined(BareJid),
 92    RoomLeft(BareJid),
 93    RoomMessage(Id, BareJid, RoomNick, Body),
 94    ServiceMessage(Id, BareJid, Body),
 95    HttpUploadedFile(String),
 96}
 97
 98#[derive(Default)]
 99pub struct ClientBuilder<'a> {
100    jid: &'a str,
101    password: &'a str,
102    website: String,
103    default_nick: String,
104    lang: Vec<String>,
105    disco: (ClientType, String),
106    features: Vec<ClientFeature>,
107}
108
109impl ClientBuilder<'_> {
110    pub fn new<'a>(jid: &'a str, password: &'a str) -> ClientBuilder<'a> {
111        ClientBuilder {
112            jid,
113            password,
114            website: String::from("https://gitlab.com/xmpp-rs/tokio-xmpp"),
115            default_nick: String::from("xmpp-rs"),
116            lang: vec![String::from("en")],
117            disco: (ClientType::default(), String::from("tokio-xmpp")),
118            features: vec![],
119        }
120    }
121
122    pub fn set_client(mut self, type_: ClientType, name: &str) -> Self {
123        self.disco = (type_, String::from(name));
124        self
125    }
126
127    pub fn set_website(mut self, url: &str) -> Self {
128        self.website = String::from(url);
129        self
130    }
131
132    pub fn set_default_nick(mut self, nick: &str) -> Self {
133        self.default_nick = String::from(nick);
134        self
135    }
136
137    pub fn set_lang(mut self, lang: Vec<String>) -> Self {
138        self.lang = lang;
139        self
140    }
141
142    pub fn enable_feature(mut self, feature: ClientFeature) -> Self {
143        self.features.push(feature);
144        self
145    }
146
147    fn make_disco(&self) -> DiscoInfoResult {
148        let identities = vec![Identity::new(
149            "client",
150            self.disco.0.to_string(),
151            "en",
152            self.disco.1.to_string(),
153        )];
154        let mut features = vec![Feature::new(ns::DISCO_INFO)];
155        #[cfg(feature = "avatars")]
156        {
157            if self.features.contains(&ClientFeature::Avatars) {
158                features.push(Feature::new(format!("{}+notify", ns::AVATAR_METADATA)));
159            }
160        }
161        if self.features.contains(&ClientFeature::JoinRooms) {
162            features.push(Feature::new(format!("{}+notify", ns::BOOKMARKS2)));
163        }
164        DiscoInfoResult {
165            node: None,
166            identities,
167            features,
168            extensions: vec![],
169        }
170    }
171
172    pub fn build(self) -> Result<Agent, Error> {
173        let client = TokioXmppClient::new(self.jid, self.password)?;
174        Ok(self.build_impl(client)?)
175    }
176
177    // This function is meant to be used for testing build
178    pub(crate) fn build_impl(self, client: TokioXmppClient) -> Result<Agent, Error> {
179        let disco = self.make_disco();
180        let node = self.website;
181
182        let agent = Agent {
183            client,
184            default_nick: Rc::new(RefCell::new(self.default_nick)),
185            lang: Rc::new(self.lang),
186            disco,
187            node,
188            uploads: Vec::new(),
189        };
190
191        Ok(agent)
192    }
193}
194
195pub struct Agent {
196    client: TokioXmppClient,
197    default_nick: Rc<RefCell<String>>,
198    lang: Rc<Vec<String>>,
199    disco: DiscoInfoResult,
200    node: String,
201    uploads: Vec<(String, Jid, PathBuf)>,
202}
203
204impl Agent {
205    pub async fn disconnect(&mut self) -> Result<(), Error> {
206        self.client.send_end().await
207    }
208
209    pub async fn join_room(
210        &mut self,
211        room: BareJid,
212        nick: Option<String>,
213        password: Option<String>,
214        lang: &str,
215        status: &str,
216    ) {
217        let mut muc = Muc::new();
218        if let Some(password) = password {
219            muc = muc.with_password(password);
220        }
221
222        let nick = nick.unwrap_or_else(|| self.default_nick.borrow().clone());
223        let room_jid = room.with_resource(nick);
224        let mut presence = Presence::new(PresenceType::None).with_to(Jid::Full(room_jid));
225        presence.add_payload(muc);
226        presence.set_status(String::from(lang), String::from(status));
227        let _ = self.client.send_stanza(presence.into()).await;
228    }
229
230    pub async fn send_message(
231        &mut self,
232        recipient: Jid,
233        type_: MessageType,
234        lang: &str,
235        text: &str,
236    ) {
237        let mut message = Message::new(Some(recipient));
238        message.type_ = type_;
239        message
240            .bodies
241            .insert(String::from(lang), Body(String::from(text)));
242        let _ = self.client.send_stanza(message.into()).await;
243    }
244
245    fn make_initial_presence(disco: &DiscoInfoResult, node: &str) -> Presence {
246        let caps_data = compute_disco(disco);
247        let hash = hash_caps(&caps_data, Algo::Sha_1).unwrap();
248        let caps = Caps::new(node, hash);
249
250        let mut presence = Presence::new(PresenceType::None);
251        presence.add_payload(caps);
252        presence
253    }
254
255    async fn handle_iq(&mut self, iq: Iq) -> Vec<Event> {
256        let mut events = vec![];
257        let from = iq
258            .from
259            .clone()
260            .unwrap_or_else(|| self.client.bound_jid().unwrap().clone());
261        if let IqType::Get(payload) = iq.payload {
262            if payload.is("query", ns::DISCO_INFO) {
263                let query = DiscoInfoQuery::try_from(payload);
264                match query {
265                    Ok(query) => {
266                        let mut disco_info = self.disco.clone();
267                        disco_info.node = query.node;
268                        let iq = Iq::from_result(iq.id, Some(disco_info))
269                            .with_to(iq.from.unwrap())
270                            .into();
271                        let _ = self.client.send_stanza(iq).await;
272                    }
273                    Err(err) => {
274                        let error = StanzaError::new(
275                            ErrorType::Modify,
276                            DefinedCondition::BadRequest,
277                            "en",
278                            &format!("{}", err),
279                        );
280                        let iq = Iq::from_error(iq.id, error)
281                            .with_to(iq.from.unwrap())
282                            .into();
283                        let _ = self.client.send_stanza(iq).await;
284                    }
285                }
286            } else {
287                // We MUST answer unhandled get iqs with a service-unavailable error.
288                let error = StanzaError::new(
289                    ErrorType::Cancel,
290                    DefinedCondition::ServiceUnavailable,
291                    "en",
292                    "No handler defined for this kind of iq.",
293                );
294                let iq = Iq::from_error(iq.id, error)
295                    .with_to(iq.from.unwrap())
296                    .into();
297                let _ = self.client.send_stanza(iq).await;
298            }
299        } else if let IqType::Result(Some(payload)) = iq.payload {
300            // TODO: move private iqs like this one somewhere else, for
301            // security reasons.
302            if payload.is("query", ns::ROSTER) && iq.from.is_none() {
303                let roster = Roster::try_from(payload).unwrap();
304                for item in roster.items.into_iter() {
305                    events.push(Event::ContactAdded(item));
306                }
307            } else if payload.is("pubsub", ns::PUBSUB) {
308                let new_events = pubsub::handle_iq_result(&from, payload);
309                events.extend(new_events);
310            } else if payload.is("slot", ns::HTTP_UPLOAD) {
311                let new_events = handle_upload_result(&from, iq.id, payload, self).await;
312                events.extend(new_events);
313            }
314        } else if let IqType::Set(_) = iq.payload {
315            // We MUST answer unhandled set iqs with a service-unavailable error.
316            let error = StanzaError::new(
317                ErrorType::Cancel,
318                DefinedCondition::ServiceUnavailable,
319                "en",
320                "No handler defined for this kind of iq.",
321            );
322            let iq = Iq::from_error(iq.id, error)
323                .with_to(iq.from.unwrap())
324                .into();
325            let _ = self.client.send_stanza(iq).await;
326        }
327
328        events
329    }
330
331    async fn handle_message(&mut self, message: Message) -> Vec<Event> {
332        let mut events = vec![];
333        let from = message.from.clone().unwrap();
334        let langs: Vec<&str> = self.lang.iter().map(String::as_str).collect();
335        match message.get_best_body(langs) {
336            Some((_lang, body)) => match message.type_ {
337                MessageType::Groupchat => {
338                    let event = match from.clone() {
339                        Jid::Full(full) => Event::RoomMessage(
340                            message.id.clone(),
341                            from.clone().into(),
342                            full.resource,
343                            body.clone(),
344                        ),
345                        Jid::Bare(bare) => {
346                            Event::ServiceMessage(message.id.clone(), bare, body.clone())
347                        }
348                    };
349                    events.push(event)
350                }
351                MessageType::Chat | MessageType::Normal => {
352                    let event =
353                        Event::ChatMessage(message.id.clone(), from.clone().into(), body.clone());
354                    events.push(event)
355                }
356                _ => (),
357            },
358            None => (),
359        }
360        for child in message.payloads {
361            if child.is("event", ns::PUBSUB_EVENT) {
362                let new_events = pubsub::handle_event(&from, child, self).await;
363                events.extend(new_events);
364            }
365        }
366
367        events
368    }
369
370    async fn handle_presence(&mut self, presence: Presence) -> Vec<Event> {
371        let mut events = vec![];
372        let from: BareJid = match presence.from.clone().unwrap() {
373            Jid::Full(FullJid { node, domain, .. }) => BareJid { node, domain },
374            Jid::Bare(bare) => bare,
375        };
376        for payload in presence.payloads.into_iter() {
377            let muc_user = match MucUser::try_from(payload) {
378                Ok(muc_user) => muc_user,
379                _ => continue,
380            };
381            for status in muc_user.status.into_iter() {
382                if status == Status::SelfPresence {
383                    events.push(Event::RoomJoined(from.clone()));
384                    break;
385                }
386            }
387        }
388
389        events
390    }
391
392    pub async fn wait_for_events(&mut self) -> Option<Vec<Event>> {
393        if let Some(event) = self.client.next().await {
394            let mut events = Vec::new();
395
396            match event {
397                TokioXmppEvent::Online { resumed: false, .. } => {
398                    let presence = Self::make_initial_presence(&self.disco, &self.node).into();
399                    let _ = self.client.send_stanza(presence).await;
400                    events.push(Event::Online);
401                    // TODO: only send this when the ContactList feature is enabled.
402                    let iq = Iq::from_get(
403                        "roster",
404                        Roster {
405                            ver: None,
406                            items: vec![],
407                        },
408                    )
409                    .into();
410                    let _ = self.client.send_stanza(iq).await;
411                    // TODO: only send this when the JoinRooms feature is enabled.
412                    let iq =
413                        Iq::from_get("bookmarks", PubSub::Items(Items::new(ns::BOOKMARKS2))).into();
414                    let _ = self.client.send_stanza(iq).await;
415                }
416                TokioXmppEvent::Online { resumed: true, .. } => {}
417                TokioXmppEvent::Disconnected(_) => {
418                    events.push(Event::Disconnected);
419                }
420                TokioXmppEvent::Stanza(elem) => {
421                    if elem.is("iq", "jabber:client") {
422                        let iq = Iq::try_from(elem).unwrap();
423                        let new_events = self.handle_iq(iq).await;
424                        events.extend(new_events);
425                    } else if elem.is("message", "jabber:client") {
426                        let message = Message::try_from(elem).unwrap();
427                        let new_events = self.handle_message(message).await;
428                        events.extend(new_events);
429                    } else if elem.is("presence", "jabber:client") {
430                        let presence = Presence::try_from(elem).unwrap();
431                        let new_events = self.handle_presence(presence).await;
432                        events.extend(new_events);
433                    } else if elem.is("error", "http://etherx.jabber.org/streams") {
434                        println!("Received a fatal stream error: {}", String::from(&elem));
435                    } else {
436                        panic!("Unknown stanza: {}", String::from(&elem));
437                    }
438                }
439            }
440
441            Some(events)
442        } else {
443            None
444        }
445    }
446
447    pub async fn upload_file_with(&mut self, service: &str, path: &Path) {
448        let name = path.file_name().unwrap().to_str().unwrap().to_string();
449        let file = File::open(path).await.unwrap();
450        let size = file.metadata().await.unwrap().len();
451        let slot_request = SlotRequest {
452            filename: name,
453            size: size,
454            content_type: None,
455        };
456        let to = service.parse::<Jid>().unwrap();
457        let request = Iq::from_get("upload1", slot_request).with_to(to.clone());
458        self.uploads
459            .push((String::from("upload1"), to, path.to_path_buf()));
460        self.client.send_stanza(request.into()).await.unwrap();
461    }
462}
463
464async fn handle_upload_result(
465    from: &Jid,
466    iqid: String,
467    elem: Element,
468    agent: &mut Agent,
469) -> impl IntoIterator<Item = Event> {
470    let mut res: Option<(usize, PathBuf)> = None;
471
472    for (i, (id, to, filepath)) in agent.uploads.iter().enumerate() {
473        if to == from && id == &iqid {
474            res = Some((i, filepath.to_path_buf()));
475            break;
476        }
477    }
478
479    if let Some((index, file)) = res {
480        agent.uploads.remove(index);
481        let slot = SlotResult::try_from(elem).unwrap();
482
483        let mut headers = ReqwestHeaderMap::new();
484        for header in slot.put.headers {
485            let (attr, val) = match header {
486                HttpUploadHeader::Authorization(val) => ("Authorization", val),
487                HttpUploadHeader::Cookie(val) => ("Cookie", val),
488                HttpUploadHeader::Expires(val) => ("Expires", val),
489            };
490            headers.insert(attr, val.parse().unwrap());
491        }
492
493        let web = ReqwestClient::new();
494        let stream = FramedRead::new(File::open(file).await.unwrap(), BytesCodec::new());
495        let body = ReqwestBody::wrap_stream(stream);
496        let res = web
497            .put(slot.put.url.as_str())
498            .headers(headers)
499            .body(body)
500            .send()
501            .await
502            .unwrap();
503        if res.status() == 201 {
504            return vec![Event::HttpUploadedFile(slot.get.url)];
505        }
506    }
507
508    return vec![];
509}
510
511#[cfg(test)]
512mod tests {
513    use super::{Agent, ClientBuilder, ClientFeature, ClientType, Event};
514    use tokio_xmpp::AsyncClient as TokioXmppClient;
515
516    #[tokio::test]
517    async fn test_simple() {
518        let client = TokioXmppClient::new("foo@bar", "meh").unwrap();
519
520        // Client instance
521        let client_builder = ClientBuilder::new("foo@bar", "meh")
522            .set_client(ClientType::Bot, "xmpp-rs")
523            .set_website("https://gitlab.com/xmpp-rs/xmpp-rs")
524            .set_default_nick("bot")
525            .enable_feature(ClientFeature::Avatars)
526            .enable_feature(ClientFeature::ContactList);
527
528        let mut agent: Agent = client_builder.build_impl(client).unwrap();
529
530        while let Some(events) = agent.wait_for_events().await {
531            assert!(match events[0] {
532                Event::Disconnected => true,
533                _ => false,
534            });
535            assert_eq!(events.len(), 1);
536            break;
537        }
538    }
539}