lib.rs

  1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
  2//
  3// This Source Code Form is subject to the terms of the Mozilla Public
  4// License, v. 2.0. If a copy of the MPL was not distributed with this
  5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6
  7#![deny(bare_trait_objects)]
  8
  9use futures::stream::StreamExt;
 10use reqwest::{
 11    header::HeaderMap as ReqwestHeaderMap, Body as ReqwestBody, Client as ReqwestClient,
 12};
 13use std::convert::TryFrom;
 14use std::path::{Path, PathBuf};
 15use std::sync::{Arc, RwLock};
 16use tokio::fs::File;
 17use tokio_util::codec::{BytesCodec, FramedRead};
 18pub use tokio_xmpp::parsers;
 19use tokio_xmpp::parsers::{
 20    bookmarks2::Conference,
 21    caps::{compute_disco, hash_caps, Caps},
 22    disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity},
 23    hashes::Algo,
 24    http_upload::{Header as HttpUploadHeader, SlotRequest, SlotResult},
 25    iq::{Iq, IqType},
 26    message::{Body, Message, MessageType},
 27    muc::{
 28        user::{MucUser, Status},
 29        Muc,
 30    },
 31    ns,
 32    presence::{Presence, Type as PresenceType},
 33    pubsub::pubsub::{Items, PubSub},
 34    roster::{Item as RosterItem, Roster},
 35    stanza_error::{DefinedCondition, ErrorType, StanzaError},
 36};
 37use tokio_xmpp::{AsyncClient as TokioXmppClient, Event as TokioXmppEvent};
 38pub use tokio_xmpp::{BareJid, Element, FullJid, Jid};
 39#[macro_use]
 40extern crate log;
 41
 42mod pubsub;
 43
 44pub type Error = tokio_xmpp::Error;
 45
 46#[derive(Debug)]
 47pub enum ClientType {
 48    Bot,
 49    Pc,
 50}
 51
 52impl Default for ClientType {
 53    fn default() -> Self {
 54        ClientType::Bot
 55    }
 56}
 57
 58impl ToString for ClientType {
 59    fn to_string(&self) -> String {
 60        String::from(match self {
 61            ClientType::Bot => "bot",
 62            ClientType::Pc => "pc",
 63        })
 64    }
 65}
 66
 67#[derive(PartialEq)]
 68pub enum ClientFeature {
 69    #[cfg(feature = "avatars")]
 70    Avatars,
 71    ContactList,
 72    JoinRooms,
 73}
 74
 75pub type Id = Option<String>;
 76pub type RoomNick = String;
 77
 78#[derive(Debug)]
 79pub enum Event {
 80    Online,
 81    Disconnected,
 82    ContactAdded(RosterItem),
 83    ContactRemoved(RosterItem),
 84    ContactChanged(RosterItem),
 85    #[cfg(feature = "avatars")]
 86    AvatarRetrieved(Jid, String),
 87    ChatMessage(Id, BareJid, Body),
 88    JoinRoom(BareJid, Conference),
 89    LeaveRoom(BareJid),
 90    LeaveAllRooms,
 91    RoomJoined(BareJid),
 92    RoomLeft(BareJid),
 93    RoomMessage(Id, BareJid, RoomNick, Body),
 94    /// A private message received from a room, containing the message ID, the room's BareJid,
 95    /// the sender's nickname, and the message body.
 96    RoomPrivateMessage(Id, BareJid, RoomNick, Body),
 97    ServiceMessage(Id, BareJid, Body),
 98    HttpUploadedFile(String),
 99}
100
101pub struct ClientBuilder<'a> {
102    jid: BareJid,
103    password: &'a str,
104    website: String,
105    default_nick: String,
106    lang: Vec<String>,
107    disco: (ClientType, String),
108    features: Vec<ClientFeature>,
109    resource: Option<String>,
110}
111
112impl ClientBuilder<'_> {
113    pub fn new<'a>(jid: BareJid, password: &'a str) -> ClientBuilder<'a> {
114        ClientBuilder {
115            jid,
116            password,
117            website: String::from("https://gitlab.com/xmpp-rs/tokio-xmpp"),
118            default_nick: String::from("xmpp-rs"),
119            lang: vec![String::from("en")],
120            disco: (ClientType::default(), String::from("tokio-xmpp")),
121            features: vec![],
122            resource: None,
123        }
124    }
125
126    /// Optionally set a resource associated to this device on the client
127    pub fn set_resource(mut self, resource: &str) -> Self {
128        self.resource = Some(resource.to_string());
129        self
130    }
131
132    pub fn set_client(mut self, type_: ClientType, name: &str) -> Self {
133        self.disco = (type_, String::from(name));
134        self
135    }
136
137    pub fn set_website(mut self, url: &str) -> Self {
138        self.website = String::from(url);
139        self
140    }
141
142    pub fn set_default_nick(mut self, nick: &str) -> Self {
143        self.default_nick = String::from(nick);
144        self
145    }
146
147    pub fn set_lang(mut self, lang: Vec<String>) -> Self {
148        self.lang = lang;
149        self
150    }
151
152    pub fn enable_feature(mut self, feature: ClientFeature) -> Self {
153        self.features.push(feature);
154        self
155    }
156
157    fn make_disco(&self) -> DiscoInfoResult {
158        let identities = vec![Identity::new(
159            "client",
160            self.disco.0.to_string(),
161            "en",
162            self.disco.1.to_string(),
163        )];
164        let mut features = vec![Feature::new(ns::DISCO_INFO)];
165        #[cfg(feature = "avatars")]
166        {
167            if self.features.contains(&ClientFeature::Avatars) {
168                features.push(Feature::new(format!("{}+notify", ns::AVATAR_METADATA)));
169            }
170        }
171        if self.features.contains(&ClientFeature::JoinRooms) {
172            features.push(Feature::new(format!("{}+notify", ns::BOOKMARKS2)));
173        }
174        DiscoInfoResult {
175            node: None,
176            identities,
177            features,
178            extensions: vec![],
179        }
180    }
181
182    pub fn build(self) -> Agent {
183        let jid: Jid = if let Some(resource) = &self.resource {
184            self.jid.with_resource_str(resource).unwrap().into()
185        } else {
186            self.jid.clone().into()
187        };
188
189        let client = TokioXmppClient::new(jid, self.password);
190        self.build_impl(client)
191    }
192
193    // This function is meant to be used for testing build
194    pub(crate) fn build_impl(self, client: TokioXmppClient) -> Agent {
195        let disco = self.make_disco();
196        let node = self.website;
197
198        Agent {
199            client,
200            default_nick: Arc::new(RwLock::new(self.default_nick)),
201            lang: Arc::new(self.lang),
202            disco,
203            node,
204            uploads: Vec::new(),
205        }
206    }
207}
208
209pub struct Agent {
210    client: TokioXmppClient,
211    default_nick: Arc<RwLock<String>>,
212    lang: Arc<Vec<String>>,
213    disco: DiscoInfoResult,
214    node: String,
215    uploads: Vec<(String, Jid, PathBuf)>,
216}
217
218impl Agent {
219    pub async fn disconnect(&mut self) -> Result<(), Error> {
220        self.client.send_end().await
221    }
222
223    pub async fn join_room(
224        &mut self,
225        room: BareJid,
226        nick: Option<String>,
227        password: Option<String>,
228        lang: &str,
229        status: &str,
230    ) {
231        let mut muc = Muc::new();
232        if let Some(password) = password {
233            muc = muc.with_password(password);
234        }
235
236        let nick = nick.unwrap_or_else(|| self.default_nick.read().unwrap().clone());
237        let room_jid = room.with_resource_str(&nick).unwrap();
238        let mut presence = Presence::new(PresenceType::None).with_to(room_jid);
239        presence.add_payload(muc);
240        presence.set_status(String::from(lang), String::from(status));
241        let _ = self.client.send_stanza(presence.into()).await;
242    }
243
244    pub async fn send_message(
245        &mut self,
246        recipient: Jid,
247        type_: MessageType,
248        lang: &str,
249        text: &str,
250    ) {
251        let mut message = Message::new(Some(recipient));
252        message.type_ = type_;
253        message
254            .bodies
255            .insert(String::from(lang), Body(String::from(text)));
256        let _ = self.client.send_stanza(message.into()).await;
257    }
258
259    pub async fn send_room_private_message(
260        &mut self,
261        room: BareJid,
262        recipient: RoomNick,
263        lang: &str,
264        text: &str,
265    ) {
266        let recipient: Jid = room.with_resource_str(&recipient).unwrap().into();
267        let mut message = Message::new(recipient).with_payload(MucUser::new());
268        message.type_ = MessageType::Chat;
269        message
270            .bodies
271            .insert(String::from(lang), Body(String::from(text)));
272        let _ = self.client.send_stanza(message.into()).await;
273    }
274
275    fn make_initial_presence(disco: &DiscoInfoResult, node: &str) -> Presence {
276        let caps_data = compute_disco(disco);
277        let hash = hash_caps(&caps_data, Algo::Sha_1).unwrap();
278        let caps = Caps::new(node, hash);
279
280        let mut presence = Presence::new(PresenceType::None);
281        presence.add_payload(caps);
282        presence
283    }
284
285    async fn handle_iq(&mut self, iq: Iq) -> Vec<Event> {
286        let mut events = vec![];
287        let from = iq
288            .from
289            .clone()
290            .unwrap_or_else(|| self.client.bound_jid().unwrap().clone());
291        if let IqType::Get(payload) = iq.payload {
292            if payload.is("query", ns::DISCO_INFO) {
293                let query = DiscoInfoQuery::try_from(payload);
294                match query {
295                    Ok(query) => {
296                        let mut disco_info = self.disco.clone();
297                        disco_info.node = query.node;
298                        let iq = Iq::from_result(iq.id, Some(disco_info))
299                            .with_to(iq.from.unwrap())
300                            .into();
301                        let _ = self.client.send_stanza(iq).await;
302                    }
303                    Err(err) => {
304                        let error = StanzaError::new(
305                            ErrorType::Modify,
306                            DefinedCondition::BadRequest,
307                            "en",
308                            &format!("{}", err),
309                        );
310                        let iq = Iq::from_error(iq.id, error)
311                            .with_to(iq.from.unwrap())
312                            .into();
313                        let _ = self.client.send_stanza(iq).await;
314                    }
315                }
316            } else {
317                // We MUST answer unhandled get iqs with a service-unavailable error.
318                let error = StanzaError::new(
319                    ErrorType::Cancel,
320                    DefinedCondition::ServiceUnavailable,
321                    "en",
322                    "No handler defined for this kind of iq.",
323                );
324                let iq = Iq::from_error(iq.id, error)
325                    .with_to(iq.from.unwrap())
326                    .into();
327                let _ = self.client.send_stanza(iq).await;
328            }
329        } else if let IqType::Result(Some(payload)) = iq.payload {
330            // TODO: move private iqs like this one somewhere else, for
331            // security reasons.
332            if payload.is("query", ns::ROSTER) && Some(from.clone()) == iq.from {
333                let roster = Roster::try_from(payload).unwrap();
334                for item in roster.items.into_iter() {
335                    events.push(Event::ContactAdded(item));
336                }
337            } else if payload.is("pubsub", ns::PUBSUB) {
338                let new_events = pubsub::handle_iq_result(&from, payload);
339                events.extend(new_events);
340            } else if payload.is("slot", ns::HTTP_UPLOAD) {
341                let new_events = handle_upload_result(&from, iq.id, payload, self).await;
342                events.extend(new_events);
343            }
344        } else if let IqType::Set(_) = iq.payload {
345            // We MUST answer unhandled set iqs with a service-unavailable error.
346            let error = StanzaError::new(
347                ErrorType::Cancel,
348                DefinedCondition::ServiceUnavailable,
349                "en",
350                "No handler defined for this kind of iq.",
351            );
352            let iq = Iq::from_error(iq.id, error)
353                .with_to(iq.from.unwrap())
354                .into();
355            let _ = self.client.send_stanza(iq).await;
356        }
357
358        events
359    }
360
361    async fn handle_message(&mut self, message: Message) -> Vec<Event> {
362        let mut events = vec![];
363        let from = message.from.clone().unwrap();
364        let langs: Vec<&str> = self.lang.iter().map(String::as_str).collect();
365        match message.get_best_body(langs) {
366            Some((_lang, body)) => match message.type_ {
367                MessageType::Groupchat => {
368                    let event = match from.clone() {
369                        Jid::Full(full) => Event::RoomMessage(
370                            message.id.clone(),
371                            from.to_bare(),
372                            full.resource_str().to_owned(),
373                            body.clone(),
374                        ),
375                        Jid::Bare(bare) => {
376                            Event::ServiceMessage(message.id.clone(), bare, body.clone())
377                        }
378                    };
379                    events.push(event)
380                }
381                MessageType::Chat | MessageType::Normal => {
382                    let mut found_special_message = false;
383
384                    for payload in &message.payloads {
385                        if let Ok(_) = MucUser::try_from(payload.clone()) {
386                            let event = match from.clone() {
387                                Jid::Bare(bare) => {
388                                    // TODO: Can a service message be of type Chat/Normal and not Groupchat?
389                                    warn!("Received misformed MessageType::Chat in muc#user namespace from a bare JID.");
390                                    Event::ServiceMessage(message.id.clone(), bare, body.clone())
391                                }
392                                Jid::Full(full) => Event::RoomPrivateMessage(
393                                    message.id.clone(),
394                                    full.to_bare(),
395                                    full.resource_str().to_owned(),
396                                    body.clone(),
397                                ),
398                            };
399
400                            found_special_message = true;
401                            events.push(event);
402                        }
403                    }
404
405                    if !found_special_message {
406                        let event =
407                            Event::ChatMessage(message.id.clone(), from.to_bare(), body.clone());
408                        events.push(event)
409                    }
410                }
411                _ => (),
412            },
413            None => (),
414        }
415        for child in message.payloads {
416            if child.is("event", ns::PUBSUB_EVENT) {
417                let new_events = pubsub::handle_event(&from, child, self).await;
418                events.extend(new_events);
419            }
420        }
421
422        events
423    }
424
425    async fn handle_presence(&mut self, presence: Presence) -> Vec<Event> {
426        let mut events = vec![];
427        let from = presence.from.unwrap().to_bare();
428        for payload in presence.payloads.into_iter() {
429            let muc_user = match MucUser::try_from(payload) {
430                Ok(muc_user) => muc_user,
431                _ => continue,
432            };
433            for status in muc_user.status.into_iter() {
434                if status == Status::SelfPresence {
435                    events.push(Event::RoomJoined(from.clone()));
436                    break;
437                }
438            }
439        }
440
441        events
442    }
443
444    pub async fn wait_for_events(&mut self) -> Option<Vec<Event>> {
445        if let Some(event) = self.client.next().await {
446            let mut events = Vec::new();
447
448            match event {
449                TokioXmppEvent::Online { resumed: false, .. } => {
450                    let presence = Self::make_initial_presence(&self.disco, &self.node).into();
451                    let _ = self.client.send_stanza(presence).await;
452                    events.push(Event::Online);
453                    // TODO: only send this when the ContactList feature is enabled.
454                    let iq = Iq::from_get(
455                        "roster",
456                        Roster {
457                            ver: None,
458                            items: vec![],
459                        },
460                    )
461                    .into();
462                    let _ = self.client.send_stanza(iq).await;
463                    // TODO: only send this when the JoinRooms feature is enabled.
464                    let iq =
465                        Iq::from_get("bookmarks", PubSub::Items(Items::new(ns::BOOKMARKS2))).into();
466                    let _ = self.client.send_stanza(iq).await;
467                }
468                TokioXmppEvent::Online { resumed: true, .. } => {}
469                TokioXmppEvent::Disconnected(_) => {
470                    events.push(Event::Disconnected);
471                }
472                TokioXmppEvent::Stanza(elem) => {
473                    if elem.is("iq", "jabber:client") {
474                        let iq = Iq::try_from(elem).unwrap();
475                        let new_events = self.handle_iq(iq).await;
476                        events.extend(new_events);
477                    } else if elem.is("message", "jabber:client") {
478                        let message = Message::try_from(elem).unwrap();
479                        let new_events = self.handle_message(message).await;
480                        events.extend(new_events);
481                    } else if elem.is("presence", "jabber:client") {
482                        let presence = Presence::try_from(elem).unwrap();
483                        let new_events = self.handle_presence(presence).await;
484                        events.extend(new_events);
485                    } else if elem.is("error", "http://etherx.jabber.org/streams") {
486                        println!("Received a fatal stream error: {}", String::from(&elem));
487                    } else {
488                        panic!("Unknown stanza: {}", String::from(&elem));
489                    }
490                }
491            }
492
493            Some(events)
494        } else {
495            None
496        }
497    }
498
499    pub async fn upload_file_with(&mut self, service: &str, path: &Path) {
500        let name = path.file_name().unwrap().to_str().unwrap().to_string();
501        let file = File::open(path).await.unwrap();
502        let size = file.metadata().await.unwrap().len();
503        let slot_request = SlotRequest {
504            filename: name,
505            size: size,
506            content_type: None,
507        };
508        let to = service.parse::<Jid>().unwrap();
509        let request = Iq::from_get("upload1", slot_request).with_to(to.clone());
510        self.uploads
511            .push((String::from("upload1"), to, path.to_path_buf()));
512        self.client.send_stanza(request.into()).await.unwrap();
513    }
514}
515
516async fn handle_upload_result(
517    from: &Jid,
518    iqid: String,
519    elem: Element,
520    agent: &mut Agent,
521) -> impl IntoIterator<Item = Event> {
522    let mut res: Option<(usize, PathBuf)> = None;
523
524    for (i, (id, to, filepath)) in agent.uploads.iter().enumerate() {
525        if to == from && id == &iqid {
526            res = Some((i, filepath.to_path_buf()));
527            break;
528        }
529    }
530
531    if let Some((index, file)) = res {
532        agent.uploads.remove(index);
533        let slot = SlotResult::try_from(elem).unwrap();
534
535        let mut headers = ReqwestHeaderMap::new();
536        for header in slot.put.headers {
537            let (attr, val) = match header {
538                HttpUploadHeader::Authorization(val) => ("Authorization", val),
539                HttpUploadHeader::Cookie(val) => ("Cookie", val),
540                HttpUploadHeader::Expires(val) => ("Expires", val),
541            };
542            headers.insert(attr, val.parse().unwrap());
543        }
544
545        let web = ReqwestClient::new();
546        let stream = FramedRead::new(File::open(file).await.unwrap(), BytesCodec::new());
547        let body = ReqwestBody::wrap_stream(stream);
548        let res = web
549            .put(slot.put.url.as_str())
550            .headers(headers)
551            .body(body)
552            .send()
553            .await
554            .unwrap();
555        if res.status() == 201 {
556            return vec![Event::HttpUploadedFile(slot.get.url)];
557        }
558    }
559
560    return vec![];
561}
562
563#[cfg(test)]
564mod tests {
565    use super::{Agent, BareJid, ClientBuilder, ClientFeature, ClientType, Event};
566    use std::str::FromStr;
567    use tokio_xmpp::AsyncClient as TokioXmppClient;
568
569    #[tokio::test]
570    async fn test_simple() {
571        let jid = BareJid::from_str("foo@bar").unwrap();
572
573        let client = TokioXmppClient::new(jid.clone(), "meh");
574
575        // Client instance
576        let client_builder = ClientBuilder::new(jid, "meh")
577            .set_client(ClientType::Bot, "xmpp-rs")
578            .set_website("https://gitlab.com/xmpp-rs/xmpp-rs")
579            .set_default_nick("bot")
580            .enable_feature(ClientFeature::ContactList);
581
582        #[cfg(feature = "avatars")]
583        let client_builder = client_builder.enable_feature(ClientFeature::Avatars);
584
585        let mut agent: Agent = client_builder.build_impl(client);
586
587        while let Some(events) = agent.wait_for_events().await {
588            assert!(match events[0] {
589                Event::Disconnected => true,
590                _ => false,
591            });
592            assert_eq!(events.len(), 1);
593            break;
594        }
595    }
596}