lib.rs

  1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
  2//
  3// This Source Code Form is subject to the terms of the Mozilla Public
  4// License, v. 2.0. If a copy of the MPL was not distributed with this
  5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6
  7#![deny(bare_trait_objects)]
  8
  9use futures::stream::StreamExt;
 10use reqwest::{
 11    header::HeaderMap as ReqwestHeaderMap, Body as ReqwestBody, Client as ReqwestClient,
 12};
 13use std::convert::TryFrom;
 14use std::path::{Path, PathBuf};
 15use std::sync::{Arc, RwLock};
 16use tokio::fs::File;
 17use tokio_util::codec::{BytesCodec, FramedRead};
 18pub use tokio_xmpp::parsers;
 19use tokio_xmpp::parsers::{
 20    bookmarks2::Conference,
 21    caps::{compute_disco, hash_caps, Caps},
 22    disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity},
 23    hashes::Algo,
 24    http_upload::{Header as HttpUploadHeader, SlotRequest, SlotResult},
 25    iq::{Iq, IqType},
 26    message::{Body, Message, MessageType},
 27    muc::{
 28        user::{MucUser, Status},
 29        Muc,
 30    },
 31    ns,
 32    presence::{Presence, Type as PresenceType},
 33    pubsub::pubsub::{Items, PubSub},
 34    roster::{Item as RosterItem, Roster},
 35    stanza_error::{DefinedCondition, ErrorType, StanzaError},
 36};
 37use tokio_xmpp::{AsyncClient as TokioXmppClient, Event as TokioXmppEvent};
 38pub use tokio_xmpp::{BareJid, Element, FullJid, Jid};
 39#[macro_use]
 40extern crate log;
 41
 42mod pubsub;
 43
 44pub type Error = tokio_xmpp::Error;
 45
 46#[derive(Debug)]
 47pub enum ClientType {
 48    Bot,
 49    Pc,
 50}
 51
 52impl Default for ClientType {
 53    fn default() -> Self {
 54        ClientType::Bot
 55    }
 56}
 57
 58impl ToString for ClientType {
 59    fn to_string(&self) -> String {
 60        String::from(match self {
 61            ClientType::Bot => "bot",
 62            ClientType::Pc => "pc",
 63        })
 64    }
 65}
 66
 67#[derive(PartialEq)]
 68pub enum ClientFeature {
 69    #[cfg(feature = "avatars")]
 70    Avatars,
 71    ContactList,
 72    JoinRooms,
 73}
 74
 75pub type Id = Option<String>;
 76pub type RoomNick = String;
 77
 78#[derive(Debug)]
 79pub enum Event {
 80    Online,
 81    Disconnected,
 82    ContactAdded(RosterItem),
 83    ContactRemoved(RosterItem),
 84    ContactChanged(RosterItem),
 85    #[cfg(feature = "avatars")]
 86    AvatarRetrieved(Jid, String),
 87    ChatMessage(Id, BareJid, Body),
 88    JoinRoom(BareJid, Conference),
 89    LeaveRoom(BareJid),
 90    LeaveAllRooms,
 91    RoomJoined(BareJid),
 92    RoomLeft(BareJid),
 93    RoomMessage(Id, BareJid, RoomNick, Body),
 94    /// A private message received from a room, containing the message ID, the room's BareJid,
 95    /// the sender's nickname, and the message body.
 96    RoomPrivateMessage(Id, BareJid, RoomNick, Body),
 97    ServiceMessage(Id, BareJid, Body),
 98    HttpUploadedFile(String),
 99}
100
101pub struct ClientBuilder<'a> {
102    jid: BareJid,
103    password: &'a str,
104    website: String,
105    default_nick: String,
106    lang: Vec<String>,
107    disco: (ClientType, String),
108    features: Vec<ClientFeature>,
109    resource: Option<String>,
110}
111
112impl ClientBuilder<'_> {
113    pub fn new<'a>(jid: BareJid, password: &'a str) -> ClientBuilder<'a> {
114        ClientBuilder {
115            jid,
116            password,
117            website: String::from("https://gitlab.com/xmpp-rs/tokio-xmpp"),
118            default_nick: String::from("xmpp-rs"),
119            lang: vec![String::from("en")],
120            disco: (ClientType::default(), String::from("tokio-xmpp")),
121            features: vec![],
122            resource: None,
123        }
124    }
125
126    /// Optionally set a resource associated to this device on the client
127    pub fn set_resource(mut self, resource: &str) -> Self {
128        self.resource = Some(resource.to_string());
129        self
130    }
131
132    pub fn set_client(mut self, type_: ClientType, name: &str) -> Self {
133        self.disco = (type_, String::from(name));
134        self
135    }
136
137    pub fn set_website(mut self, url: &str) -> Self {
138        self.website = String::from(url);
139        self
140    }
141
142    pub fn set_default_nick(mut self, nick: &str) -> Self {
143        self.default_nick = String::from(nick);
144        self
145    }
146
147    pub fn set_lang(mut self, lang: Vec<String>) -> Self {
148        self.lang = lang;
149        self
150    }
151
152    pub fn enable_feature(mut self, feature: ClientFeature) -> Self {
153        self.features.push(feature);
154        self
155    }
156
157    fn make_disco(&self) -> DiscoInfoResult {
158        let identities = vec![Identity::new(
159            "client",
160            self.disco.0.to_string(),
161            "en",
162            self.disco.1.to_string(),
163        )];
164        let mut features = vec![Feature::new(ns::DISCO_INFO)];
165        #[cfg(feature = "avatars")]
166        {
167            if self.features.contains(&ClientFeature::Avatars) {
168                features.push(Feature::new(format!("{}+notify", ns::AVATAR_METADATA)));
169            }
170        }
171        if self.features.contains(&ClientFeature::JoinRooms) {
172            features.push(Feature::new(format!("{}+notify", ns::BOOKMARKS2)));
173        }
174        DiscoInfoResult {
175            node: None,
176            identities,
177            features,
178            extensions: vec![],
179        }
180    }
181
182    pub fn build(self) -> Agent {
183        let jid: Jid = if let Some(resource) = &self.resource {
184            self.jid.with_resource_str(resource).unwrap().into()
185        } else {
186            self.jid.clone().into()
187        };
188
189        let client = TokioXmppClient::new(jid, self.password);
190        self.build_impl(client)
191    }
192
193    // This function is meant to be used for testing build
194    pub(crate) fn build_impl(self, client: TokioXmppClient) -> Agent {
195        let disco = self.make_disco();
196        let node = self.website;
197
198        Agent {
199            client,
200            default_nick: Arc::new(RwLock::new(self.default_nick)),
201            lang: Arc::new(self.lang),
202            disco,
203            node,
204            uploads: Vec::new(),
205        }
206    }
207}
208
209pub struct Agent {
210    client: TokioXmppClient,
211    default_nick: Arc<RwLock<String>>,
212    lang: Arc<Vec<String>>,
213    disco: DiscoInfoResult,
214    node: String,
215    uploads: Vec<(String, Jid, PathBuf)>,
216}
217
218impl Agent {
219    pub async fn disconnect(&mut self) -> Result<(), Error> {
220        self.client.send_end().await
221    }
222
223    pub async fn join_room(
224        &mut self,
225        room: BareJid,
226        nick: Option<String>,
227        password: Option<String>,
228        lang: &str,
229        status: &str,
230    ) {
231        let mut muc = Muc::new();
232        if let Some(password) = password {
233            muc = muc.with_password(password);
234        }
235
236        let nick = nick.unwrap_or_else(|| self.default_nick.read().unwrap().clone());
237        let room_jid = room.with_resource_str(&nick).unwrap();
238        let mut presence = Presence::new(PresenceType::None).with_to(room_jid);
239        presence.add_payload(muc);
240        presence.set_status(String::from(lang), String::from(status));
241        let _ = self.client.send_stanza(presence.into()).await;
242    }
243
244    /// Send a "leave room" request to the server (specifically, an "unavailable" presence stanza).
245    ///
246    /// The returned future will resolve when the request has been sent,
247    /// not when the room has actually been left.
248    ///
249    /// If successful, a `RoomLeft` event should be received later as a confirmation.
250    ///
251    /// See: https://xmpp.org/extensions/xep-0045.html#exit
252    ///
253    /// Note that this method does NOT remove the room from the auto-join list; the latter
254    /// is more a list of bookmarks that the account knows about and that have a flag set
255    /// to indicate that they should be joined automatically after connecting (see the JoinRoom event).
256    ///
257    /// Regarding the latter, see the these minutes about auto-join behavior:
258    /// https://docs.modernxmpp.org/meetings/2019-01-brussels/#bookmarks
259    ///
260    /// # Arguments
261    ///
262    /// * `room_jid`: The JID of the room to leave.
263    /// * `nickname`: The nickname to use in the room.
264    /// * `lang`: The language of the status message.
265    /// * `status`: The status message to send.
266    pub async fn leave_room(
267        &mut self,
268        room_jid: BareJid,
269        nickname: RoomNick,
270        lang: impl Into<String>,
271        status: impl Into<String>,
272    ) {
273        // XEP-0045 specifies that, to leave a room, the client must send a presence stanza
274        // with type="unavailable".
275        let mut presence = Presence::new(PresenceType::Unavailable).with_to(
276            room_jid
277                .with_resource_str(nickname.as_str())
278                .expect("Invalid room JID after adding resource part."),
279        );
280
281        // Optionally, the client may include a status message in the presence stanza.
282        // TODO: Should this be optional? The XEP says "MAY", but the method signature requires the arguments.
283        // XEP-0045: "The occupant MAY include normal <status/> information in the unavailable presence stanzas"
284        presence.set_status(lang, status);
285
286        // Send the presence stanza.
287        if let Err(e) = self.client.send_stanza(presence.into()).await {
288            // Report any errors to the log.
289            error!("Failed to send leave room presence: {}", e);
290        }
291    }
292
293    pub async fn send_message(
294        &mut self,
295        recipient: Jid,
296        type_: MessageType,
297        lang: &str,
298        text: &str,
299    ) {
300        let mut message = Message::new(Some(recipient));
301        message.type_ = type_;
302        message
303            .bodies
304            .insert(String::from(lang), Body(String::from(text)));
305        let _ = self.client.send_stanza(message.into()).await;
306    }
307
308    pub async fn send_room_private_message(
309        &mut self,
310        room: BareJid,
311        recipient: RoomNick,
312        lang: &str,
313        text: &str,
314    ) {
315        let recipient: Jid = room.with_resource_str(&recipient).unwrap().into();
316        let mut message = Message::new(recipient).with_payload(MucUser::new());
317        message.type_ = MessageType::Chat;
318        message
319            .bodies
320            .insert(String::from(lang), Body(String::from(text)));
321        let _ = self.client.send_stanza(message.into()).await;
322    }
323
324    fn make_initial_presence(disco: &DiscoInfoResult, node: &str) -> Presence {
325        let caps_data = compute_disco(disco);
326        let hash = hash_caps(&caps_data, Algo::Sha_1).unwrap();
327        let caps = Caps::new(node, hash);
328
329        let mut presence = Presence::new(PresenceType::None);
330        presence.add_payload(caps);
331        presence
332    }
333
334    async fn handle_iq(&mut self, iq: Iq) -> Vec<Event> {
335        let mut events = vec![];
336        let from = iq
337            .from
338            .clone()
339            .unwrap_or_else(|| self.client.bound_jid().unwrap().clone());
340        if let IqType::Get(payload) = iq.payload {
341            if payload.is("query", ns::DISCO_INFO) {
342                let query = DiscoInfoQuery::try_from(payload);
343                match query {
344                    Ok(query) => {
345                        let mut disco_info = self.disco.clone();
346                        disco_info.node = query.node;
347                        let iq = Iq::from_result(iq.id, Some(disco_info))
348                            .with_to(iq.from.unwrap())
349                            .into();
350                        let _ = self.client.send_stanza(iq).await;
351                    }
352                    Err(err) => {
353                        let error = StanzaError::new(
354                            ErrorType::Modify,
355                            DefinedCondition::BadRequest,
356                            "en",
357                            &format!("{}", err),
358                        );
359                        let iq = Iq::from_error(iq.id, error)
360                            .with_to(iq.from.unwrap())
361                            .into();
362                        let _ = self.client.send_stanza(iq).await;
363                    }
364                }
365            } else {
366                // We MUST answer unhandled get iqs with a service-unavailable error.
367                let error = StanzaError::new(
368                    ErrorType::Cancel,
369                    DefinedCondition::ServiceUnavailable,
370                    "en",
371                    "No handler defined for this kind of iq.",
372                );
373                let iq = Iq::from_error(iq.id, error)
374                    .with_to(iq.from.unwrap())
375                    .into();
376                let _ = self.client.send_stanza(iq).await;
377            }
378        } else if let IqType::Result(Some(payload)) = iq.payload {
379            // TODO: move private iqs like this one somewhere else, for
380            // security reasons.
381            if payload.is("query", ns::ROSTER) && Some(from.clone()) == iq.from {
382                let roster = Roster::try_from(payload).unwrap();
383                for item in roster.items.into_iter() {
384                    events.push(Event::ContactAdded(item));
385                }
386            } else if payload.is("pubsub", ns::PUBSUB) {
387                let new_events = pubsub::handle_iq_result(&from, payload);
388                events.extend(new_events);
389            } else if payload.is("slot", ns::HTTP_UPLOAD) {
390                let new_events = handle_upload_result(&from, iq.id, payload, self).await;
391                events.extend(new_events);
392            }
393        } else if let IqType::Set(_) = iq.payload {
394            // We MUST answer unhandled set iqs with a service-unavailable error.
395            let error = StanzaError::new(
396                ErrorType::Cancel,
397                DefinedCondition::ServiceUnavailable,
398                "en",
399                "No handler defined for this kind of iq.",
400            );
401            let iq = Iq::from_error(iq.id, error)
402                .with_to(iq.from.unwrap())
403                .into();
404            let _ = self.client.send_stanza(iq).await;
405        }
406
407        events
408    }
409
410    async fn handle_message(&mut self, message: Message) -> Vec<Event> {
411        let mut events = vec![];
412        let from = message.from.clone().unwrap();
413        let langs: Vec<&str> = self.lang.iter().map(String::as_str).collect();
414        match message.get_best_body(langs) {
415            Some((_lang, body)) => match message.type_ {
416                MessageType::Groupchat => {
417                    let event = match from.clone() {
418                        Jid::Full(full) => Event::RoomMessage(
419                            message.id.clone(),
420                            from.to_bare(),
421                            full.resource_str().to_owned(),
422                            body.clone(),
423                        ),
424                        Jid::Bare(bare) => {
425                            Event::ServiceMessage(message.id.clone(), bare, body.clone())
426                        }
427                    };
428                    events.push(event)
429                }
430                MessageType::Chat | MessageType::Normal => {
431                    let mut found_special_message = false;
432
433                    for payload in &message.payloads {
434                        if let Ok(_) = MucUser::try_from(payload.clone()) {
435                            let event = match from.clone() {
436                                Jid::Bare(bare) => {
437                                    // TODO: Can a service message be of type Chat/Normal and not Groupchat?
438                                    warn!("Received misformed MessageType::Chat in muc#user namespace from a bare JID.");
439                                    Event::ServiceMessage(message.id.clone(), bare, body.clone())
440                                }
441                                Jid::Full(full) => Event::RoomPrivateMessage(
442                                    message.id.clone(),
443                                    full.to_bare(),
444                                    full.resource_str().to_owned(),
445                                    body.clone(),
446                                ),
447                            };
448
449                            found_special_message = true;
450                            events.push(event);
451                        }
452                    }
453
454                    if !found_special_message {
455                        let event =
456                            Event::ChatMessage(message.id.clone(), from.to_bare(), body.clone());
457                        events.push(event)
458                    }
459                }
460                _ => (),
461            },
462            None => (),
463        }
464        for child in message.payloads {
465            if child.is("event", ns::PUBSUB_EVENT) {
466                let new_events = pubsub::handle_event(&from, child, self).await;
467                events.extend(new_events);
468            }
469        }
470
471        events
472    }
473
474    /// Translate a `Presence` stanza into a list of higher-level `Event`s.
475    async fn handle_presence(&mut self, presence: Presence) -> Vec<Event> {
476        // Allocate an empty vector to store the events.
477        let mut events = vec![];
478
479        // Extract the JID of the sender (i.e. the one whose presence is being sent).
480        let from = presence.from.unwrap().to_bare();
481
482        // Search through the payloads for a MUC user status.
483
484        if let Some(muc) = presence
485            .payloads
486            .iter()
487            .filter_map(|p| MucUser::try_from(p.clone()).ok())
488            .next()
489        {
490            // If a MUC user status was found, search through the statuses for a self-presence.
491            if muc.status.iter().any(|s| *s == Status::SelfPresence) {
492                // If a self-presence was found, then the stanza is about the client's own presence.
493
494                match presence.type_ {
495                    PresenceType::None => {
496                        // According to https://xmpp.org/extensions/xep-0045.html#enter-pres, no type should be seen as "available".
497                        events.push(Event::RoomJoined(from.clone()));
498                    }
499                    PresenceType::Unavailable => {
500                        // According to https://xmpp.org/extensions/xep-0045.html#exit, the server will use type "unavailable" to notify the client that it has left the room/
501                        events.push(Event::RoomLeft(from.clone()));
502                    }
503                    _ => unimplemented!("Presence type {:?}", presence.type_), // TODO: What to do here?
504                }
505            }
506        }
507
508        // Return the list of events.
509        events
510    }
511
512    /// Wait for new events.
513    ///
514    /// # Returns
515    ///
516    /// - `Some(events)` if there are new events; multiple may be returned at once.
517    /// - `None` if the underlying stream is closed.
518    pub async fn wait_for_events(&mut self) -> Option<Vec<Event>> {
519        if let Some(event) = self.client.next().await {
520            let mut events = Vec::new();
521
522            match event {
523                TokioXmppEvent::Online { resumed: false, .. } => {
524                    let presence = Self::make_initial_presence(&self.disco, &self.node).into();
525                    let _ = self.client.send_stanza(presence).await;
526                    events.push(Event::Online);
527                    // TODO: only send this when the ContactList feature is enabled.
528                    let iq = Iq::from_get(
529                        "roster",
530                        Roster {
531                            ver: None,
532                            items: vec![],
533                        },
534                    )
535                    .into();
536                    let _ = self.client.send_stanza(iq).await;
537                    // TODO: only send this when the JoinRooms feature is enabled.
538                    let iq =
539                        Iq::from_get("bookmarks", PubSub::Items(Items::new(ns::BOOKMARKS2))).into();
540                    let _ = self.client.send_stanza(iq).await;
541                }
542                TokioXmppEvent::Online { resumed: true, .. } => {}
543                TokioXmppEvent::Disconnected(_) => {
544                    events.push(Event::Disconnected);
545                }
546                TokioXmppEvent::Stanza(elem) => {
547                    if elem.is("iq", "jabber:client") {
548                        let iq = Iq::try_from(elem).unwrap();
549                        let new_events = self.handle_iq(iq).await;
550                        events.extend(new_events);
551                    } else if elem.is("message", "jabber:client") {
552                        let message = Message::try_from(elem).unwrap();
553                        let new_events = self.handle_message(message).await;
554                        events.extend(new_events);
555                    } else if elem.is("presence", "jabber:client") {
556                        let presence = Presence::try_from(elem).unwrap();
557                        let new_events = self.handle_presence(presence).await;
558                        events.extend(new_events);
559                    } else if elem.is("error", "http://etherx.jabber.org/streams") {
560                        println!("Received a fatal stream error: {}", String::from(&elem));
561                    } else {
562                        panic!("Unknown stanza: {}", String::from(&elem));
563                    }
564                }
565            }
566
567            Some(events)
568        } else {
569            None
570        }
571    }
572
573    pub async fn upload_file_with(&mut self, service: &str, path: &Path) {
574        let name = path.file_name().unwrap().to_str().unwrap().to_string();
575        let file = File::open(path).await.unwrap();
576        let size = file.metadata().await.unwrap().len();
577        let slot_request = SlotRequest {
578            filename: name,
579            size: size,
580            content_type: None,
581        };
582        let to = service.parse::<Jid>().unwrap();
583        let request = Iq::from_get("upload1", slot_request).with_to(to.clone());
584        self.uploads
585            .push((String::from("upload1"), to, path.to_path_buf()));
586        self.client.send_stanza(request.into()).await.unwrap();
587    }
588}
589
590async fn handle_upload_result(
591    from: &Jid,
592    iqid: String,
593    elem: Element,
594    agent: &mut Agent,
595) -> impl IntoIterator<Item = Event> {
596    let mut res: Option<(usize, PathBuf)> = None;
597
598    for (i, (id, to, filepath)) in agent.uploads.iter().enumerate() {
599        if to == from && id == &iqid {
600            res = Some((i, filepath.to_path_buf()));
601            break;
602        }
603    }
604
605    if let Some((index, file)) = res {
606        agent.uploads.remove(index);
607        let slot = SlotResult::try_from(elem).unwrap();
608
609        let mut headers = ReqwestHeaderMap::new();
610        for header in slot.put.headers {
611            let (attr, val) = match header {
612                HttpUploadHeader::Authorization(val) => ("Authorization", val),
613                HttpUploadHeader::Cookie(val) => ("Cookie", val),
614                HttpUploadHeader::Expires(val) => ("Expires", val),
615            };
616            headers.insert(attr, val.parse().unwrap());
617        }
618
619        let web = ReqwestClient::new();
620        let stream = FramedRead::new(File::open(file).await.unwrap(), BytesCodec::new());
621        let body = ReqwestBody::wrap_stream(stream);
622        let res = web
623            .put(slot.put.url.as_str())
624            .headers(headers)
625            .body(body)
626            .send()
627            .await
628            .unwrap();
629        if res.status() == 201 {
630            return vec![Event::HttpUploadedFile(slot.get.url)];
631        }
632    }
633
634    return vec![];
635}
636
637#[cfg(test)]
638mod tests {
639    use super::{Agent, BareJid, ClientBuilder, ClientFeature, ClientType, Event};
640    use std::str::FromStr;
641    use tokio_xmpp::AsyncClient as TokioXmppClient;
642
643    #[tokio::test]
644    async fn test_simple() {
645        let jid = BareJid::from_str("foo@bar").unwrap();
646
647        let client = TokioXmppClient::new(jid.clone(), "meh");
648
649        // Client instance
650        let client_builder = ClientBuilder::new(jid, "meh")
651            .set_client(ClientType::Bot, "xmpp-rs")
652            .set_website("https://gitlab.com/xmpp-rs/xmpp-rs")
653            .set_default_nick("bot")
654            .enable_feature(ClientFeature::ContactList);
655
656        #[cfg(feature = "avatars")]
657        let client_builder = client_builder.enable_feature(ClientFeature::Avatars);
658
659        let mut agent: Agent = client_builder.build_impl(client);
660
661        while let Some(events) = agent.wait_for_events().await {
662            assert!(match events[0] {
663                Event::Disconnected => true,
664                _ => false,
665            });
666            assert_eq!(events.len(), 1);
667            break;
668        }
669    }
670}