lib.rs

  1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
  2//
  3// This Source Code Form is subject to the terms of the Mozilla Public
  4// License, v. 2.0. If a copy of the MPL was not distributed with this
  5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6
  7#![deny(bare_trait_objects)]
  8
  9use futures::stream::StreamExt;
 10use reqwest::{
 11    header::HeaderMap as ReqwestHeaderMap, Body as ReqwestBody, Client as ReqwestClient,
 12};
 13use std::path::{Path, PathBuf};
 14use std::sync::{Arc, RwLock};
 15use tokio::fs::File;
 16use tokio_util::codec::{BytesCodec, FramedRead};
 17pub use tokio_xmpp::parsers;
 18use tokio_xmpp::parsers::{
 19    bookmarks2::Conference,
 20    caps::{compute_disco, hash_caps, Caps},
 21    disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity},
 22    hashes::Algo,
 23    http_upload::{Header as HttpUploadHeader, SlotRequest, SlotResult},
 24    iq::{Iq, IqType},
 25    message::{Body, Message, MessageType},
 26    muc::{
 27        user::{MucUser, Status},
 28        Muc,
 29    },
 30    ns,
 31    presence::{Presence, Type as PresenceType},
 32    pubsub::pubsub::{Items, PubSub},
 33    roster::{Item as RosterItem, Roster},
 34    stanza_error::{DefinedCondition, ErrorType, StanzaError},
 35};
 36use tokio_xmpp::{AsyncClient as TokioXmppClient, Event as TokioXmppEvent};
 37pub use tokio_xmpp::{BareJid, Element, FullJid, Jid};
 38#[macro_use]
 39extern crate log;
 40
 41mod pubsub;
 42
 43pub type Error = tokio_xmpp::Error;
 44
 45#[derive(Debug)]
 46pub enum ClientType {
 47    Bot,
 48    Pc,
 49}
 50
 51impl Default for ClientType {
 52    fn default() -> Self {
 53        ClientType::Bot
 54    }
 55}
 56
 57impl ToString for ClientType {
 58    fn to_string(&self) -> String {
 59        String::from(match self {
 60            ClientType::Bot => "bot",
 61            ClientType::Pc => "pc",
 62        })
 63    }
 64}
 65
 66#[derive(PartialEq)]
 67pub enum ClientFeature {
 68    #[cfg(feature = "avatars")]
 69    Avatars,
 70    ContactList,
 71    JoinRooms,
 72}
 73
 74pub type Id = Option<String>;
 75pub type RoomNick = String;
 76
 77#[derive(Debug)]
 78pub enum Event {
 79    Online,
 80    Disconnected(Error),
 81    ContactAdded(RosterItem),
 82    ContactRemoved(RosterItem),
 83    ContactChanged(RosterItem),
 84    #[cfg(feature = "avatars")]
 85    AvatarRetrieved(Jid, String),
 86    ChatMessage(Id, BareJid, Body),
 87    JoinRoom(BareJid, Conference),
 88    LeaveRoom(BareJid),
 89    LeaveAllRooms,
 90    RoomJoined(BareJid),
 91    RoomLeft(BareJid),
 92    RoomMessage(Id, BareJid, RoomNick, Body),
 93    /// A private message received from a room, containing the message ID, the room's BareJid,
 94    /// the sender's nickname, and the message body.
 95    RoomPrivateMessage(Id, BareJid, RoomNick, Body),
 96    ServiceMessage(Id, BareJid, Body),
 97    HttpUploadedFile(String),
 98}
 99
100pub struct ClientBuilder<'a> {
101    jid: BareJid,
102    password: &'a str,
103    website: String,
104    default_nick: String,
105    lang: Vec<String>,
106    disco: (ClientType, String),
107    features: Vec<ClientFeature>,
108    resource: Option<String>,
109}
110
111impl ClientBuilder<'_> {
112    pub fn new<'a>(jid: BareJid, password: &'a str) -> ClientBuilder<'a> {
113        ClientBuilder {
114            jid,
115            password,
116            website: String::from("https://gitlab.com/xmpp-rs/tokio-xmpp"),
117            default_nick: String::from("xmpp-rs"),
118            lang: vec![String::from("en")],
119            disco: (ClientType::default(), String::from("tokio-xmpp")),
120            features: vec![],
121            resource: None,
122        }
123    }
124
125    /// Optionally set a resource associated to this device on the client
126    pub fn set_resource(mut self, resource: &str) -> Self {
127        self.resource = Some(resource.to_string());
128        self
129    }
130
131    pub fn set_client(mut self, type_: ClientType, name: &str) -> Self {
132        self.disco = (type_, String::from(name));
133        self
134    }
135
136    pub fn set_website(mut self, url: &str) -> Self {
137        self.website = String::from(url);
138        self
139    }
140
141    pub fn set_default_nick(mut self, nick: &str) -> Self {
142        self.default_nick = String::from(nick);
143        self
144    }
145
146    pub fn set_lang(mut self, lang: Vec<String>) -> Self {
147        self.lang = lang;
148        self
149    }
150
151    pub fn enable_feature(mut self, feature: ClientFeature) -> Self {
152        self.features.push(feature);
153        self
154    }
155
156    fn make_disco(&self) -> DiscoInfoResult {
157        let identities = vec![Identity::new(
158            "client",
159            self.disco.0.to_string(),
160            "en",
161            self.disco.1.to_string(),
162        )];
163        let mut features = vec![Feature::new(ns::DISCO_INFO)];
164        #[cfg(feature = "avatars")]
165        {
166            if self.features.contains(&ClientFeature::Avatars) {
167                features.push(Feature::new(format!("{}+notify", ns::AVATAR_METADATA)));
168            }
169        }
170        if self.features.contains(&ClientFeature::JoinRooms) {
171            features.push(Feature::new(format!("{}+notify", ns::BOOKMARKS2)));
172        }
173        DiscoInfoResult {
174            node: None,
175            identities,
176            features,
177            extensions: vec![],
178        }
179    }
180
181    pub fn build(self) -> Agent {
182        let jid: Jid = if let Some(resource) = &self.resource {
183            self.jid.with_resource_str(resource).unwrap().into()
184        } else {
185            self.jid.clone().into()
186        };
187
188        let client = TokioXmppClient::new(jid, self.password);
189        self.build_impl(client)
190    }
191
192    // This function is meant to be used for testing build
193    pub(crate) fn build_impl(self, client: TokioXmppClient) -> Agent {
194        let disco = self.make_disco();
195        let node = self.website;
196
197        Agent {
198            client,
199            default_nick: Arc::new(RwLock::new(self.default_nick)),
200            lang: Arc::new(self.lang),
201            disco,
202            node,
203            uploads: Vec::new(),
204        }
205    }
206}
207
208pub struct Agent {
209    client: TokioXmppClient,
210    default_nick: Arc<RwLock<String>>,
211    lang: Arc<Vec<String>>,
212    disco: DiscoInfoResult,
213    node: String,
214    uploads: Vec<(String, Jid, PathBuf)>,
215}
216
217impl Agent {
218    pub async fn disconnect(&mut self) -> Result<(), Error> {
219        self.client.send_end().await
220    }
221
222    pub async fn join_room(
223        &mut self,
224        room: BareJid,
225        nick: Option<String>,
226        password: Option<String>,
227        lang: &str,
228        status: &str,
229    ) {
230        let mut muc = Muc::new();
231        if let Some(password) = password {
232            muc = muc.with_password(password);
233        }
234
235        let nick = nick.unwrap_or_else(|| self.default_nick.read().unwrap().clone());
236        let room_jid = room.with_resource_str(&nick).unwrap();
237        let mut presence = Presence::new(PresenceType::None).with_to(room_jid);
238        presence.add_payload(muc);
239        presence.set_status(String::from(lang), String::from(status));
240        let _ = self.client.send_stanza(presence.into()).await;
241    }
242
243    /// Send a "leave room" request to the server (specifically, an "unavailable" presence stanza).
244    ///
245    /// The returned future will resolve when the request has been sent,
246    /// not when the room has actually been left.
247    ///
248    /// If successful, a `RoomLeft` event should be received later as a confirmation.
249    ///
250    /// See: https://xmpp.org/extensions/xep-0045.html#exit
251    ///
252    /// Note that this method does NOT remove the room from the auto-join list; the latter
253    /// is more a list of bookmarks that the account knows about and that have a flag set
254    /// to indicate that they should be joined automatically after connecting (see the JoinRoom event).
255    ///
256    /// Regarding the latter, see the these minutes about auto-join behavior:
257    /// https://docs.modernxmpp.org/meetings/2019-01-brussels/#bookmarks
258    ///
259    /// # Arguments
260    ///
261    /// * `room_jid`: The JID of the room to leave.
262    /// * `nickname`: The nickname to use in the room.
263    /// * `lang`: The language of the status message.
264    /// * `status`: The status message to send.
265    pub async fn leave_room(
266        &mut self,
267        room_jid: BareJid,
268        nickname: RoomNick,
269        lang: impl Into<String>,
270        status: impl Into<String>,
271    ) {
272        // XEP-0045 specifies that, to leave a room, the client must send a presence stanza
273        // with type="unavailable".
274        let mut presence = Presence::new(PresenceType::Unavailable).with_to(
275            room_jid
276                .with_resource_str(nickname.as_str())
277                .expect("Invalid room JID after adding resource part."),
278        );
279
280        // Optionally, the client may include a status message in the presence stanza.
281        // TODO: Should this be optional? The XEP says "MAY", but the method signature requires the arguments.
282        // XEP-0045: "The occupant MAY include normal <status/> information in the unavailable presence stanzas"
283        presence.set_status(lang, status);
284
285        // Send the presence stanza.
286        if let Err(e) = self.client.send_stanza(presence.into()).await {
287            // Report any errors to the log.
288            error!("Failed to send leave room presence: {}", e);
289        }
290    }
291
292    pub async fn send_message(
293        &mut self,
294        recipient: Jid,
295        type_: MessageType,
296        lang: &str,
297        text: &str,
298    ) {
299        let mut message = Message::new(Some(recipient));
300        message.type_ = type_;
301        message
302            .bodies
303            .insert(String::from(lang), Body(String::from(text)));
304        let _ = self.client.send_stanza(message.into()).await;
305    }
306
307    pub async fn send_room_private_message(
308        &mut self,
309        room: BareJid,
310        recipient: RoomNick,
311        lang: &str,
312        text: &str,
313    ) {
314        let recipient: Jid = room.with_resource_str(&recipient).unwrap().into();
315        let mut message = Message::new(recipient).with_payload(MucUser::new());
316        message.type_ = MessageType::Chat;
317        message
318            .bodies
319            .insert(String::from(lang), Body(String::from(text)));
320        let _ = self.client.send_stanza(message.into()).await;
321    }
322
323    fn make_initial_presence(disco: &DiscoInfoResult, node: &str) -> Presence {
324        let caps_data = compute_disco(disco);
325        let hash = hash_caps(&caps_data, Algo::Sha_1).unwrap();
326        let caps = Caps::new(node, hash);
327
328        let mut presence = Presence::new(PresenceType::None);
329        presence.add_payload(caps);
330        presence
331    }
332
333    async fn handle_iq(&mut self, iq: Iq) -> Vec<Event> {
334        let mut events = vec![];
335        let from = iq
336            .from
337            .clone()
338            .unwrap_or_else(|| self.client.bound_jid().unwrap().clone());
339        if let IqType::Get(payload) = iq.payload {
340            if payload.is("query", ns::DISCO_INFO) {
341                let query = DiscoInfoQuery::try_from(payload);
342                match query {
343                    Ok(query) => {
344                        let mut disco_info = self.disco.clone();
345                        disco_info.node = query.node;
346                        let iq = Iq::from_result(iq.id, Some(disco_info))
347                            .with_to(iq.from.unwrap())
348                            .into();
349                        let _ = self.client.send_stanza(iq).await;
350                    }
351                    Err(err) => {
352                        let error = StanzaError::new(
353                            ErrorType::Modify,
354                            DefinedCondition::BadRequest,
355                            "en",
356                            &format!("{}", err),
357                        );
358                        let iq = Iq::from_error(iq.id, error)
359                            .with_to(iq.from.unwrap())
360                            .into();
361                        let _ = self.client.send_stanza(iq).await;
362                    }
363                }
364            } else {
365                // We MUST answer unhandled get iqs with a service-unavailable error.
366                let error = StanzaError::new(
367                    ErrorType::Cancel,
368                    DefinedCondition::ServiceUnavailable,
369                    "en",
370                    "No handler defined for this kind of iq.",
371                );
372                let iq = Iq::from_error(iq.id, error)
373                    .with_to(iq.from.unwrap())
374                    .into();
375                let _ = self.client.send_stanza(iq).await;
376            }
377        } else if let IqType::Result(Some(payload)) = iq.payload {
378            // TODO: move private iqs like this one somewhere else, for
379            // security reasons.
380            if payload.is("query", ns::ROSTER) && Some(from.clone()) == iq.from {
381                let roster = Roster::try_from(payload).unwrap();
382                for item in roster.items.into_iter() {
383                    events.push(Event::ContactAdded(item));
384                }
385            } else if payload.is("pubsub", ns::PUBSUB) {
386                let new_events = pubsub::handle_iq_result(&from, payload);
387                events.extend(new_events);
388            } else if payload.is("slot", ns::HTTP_UPLOAD) {
389                let new_events = handle_upload_result(&from, iq.id, payload, self).await;
390                events.extend(new_events);
391            }
392        } else if let IqType::Set(_) = iq.payload {
393            // We MUST answer unhandled set iqs with a service-unavailable error.
394            let error = StanzaError::new(
395                ErrorType::Cancel,
396                DefinedCondition::ServiceUnavailable,
397                "en",
398                "No handler defined for this kind of iq.",
399            );
400            let iq = Iq::from_error(iq.id, error)
401                .with_to(iq.from.unwrap())
402                .into();
403            let _ = self.client.send_stanza(iq).await;
404        }
405
406        events
407    }
408
409    async fn handle_message(&mut self, message: Message) -> Vec<Event> {
410        let mut events = vec![];
411        let from = message.from.clone().unwrap();
412        let langs: Vec<&str> = self.lang.iter().map(String::as_str).collect();
413        match message.get_best_body(langs) {
414            Some((_lang, body)) => match message.type_ {
415                MessageType::Groupchat => {
416                    let event = match from.clone() {
417                        Jid::Full(full) => Event::RoomMessage(
418                            message.id.clone(),
419                            from.to_bare(),
420                            full.resource_str().to_owned(),
421                            body.clone(),
422                        ),
423                        Jid::Bare(bare) => {
424                            Event::ServiceMessage(message.id.clone(), bare, body.clone())
425                        }
426                    };
427                    events.push(event)
428                }
429                MessageType::Chat | MessageType::Normal => {
430                    let mut found_special_message = false;
431
432                    for payload in &message.payloads {
433                        if let Ok(_) = MucUser::try_from(payload.clone()) {
434                            let event = match from.clone() {
435                                Jid::Bare(bare) => {
436                                    // TODO: Can a service message be of type Chat/Normal and not Groupchat?
437                                    warn!("Received misformed MessageType::Chat in muc#user namespace from a bare JID.");
438                                    Event::ServiceMessage(message.id.clone(), bare, body.clone())
439                                }
440                                Jid::Full(full) => Event::RoomPrivateMessage(
441                                    message.id.clone(),
442                                    full.to_bare(),
443                                    full.resource_str().to_owned(),
444                                    body.clone(),
445                                ),
446                            };
447
448                            found_special_message = true;
449                            events.push(event);
450                        }
451                    }
452
453                    if !found_special_message {
454                        let event =
455                            Event::ChatMessage(message.id.clone(), from.to_bare(), body.clone());
456                        events.push(event)
457                    }
458                }
459                _ => (),
460            },
461            None => (),
462        }
463        for child in message.payloads {
464            if child.is("event", ns::PUBSUB_EVENT) {
465                let new_events = pubsub::handle_event(&from, child, self).await;
466                events.extend(new_events);
467            }
468        }
469
470        events
471    }
472
473    /// Translate a `Presence` stanza into a list of higher-level `Event`s.
474    async fn handle_presence(&mut self, presence: Presence) -> Vec<Event> {
475        // Allocate an empty vector to store the events.
476        let mut events = vec![];
477
478        // Extract the JID of the sender (i.e. the one whose presence is being sent).
479        let from = presence.from.unwrap().to_bare();
480
481        // Search through the payloads for a MUC user status.
482
483        if let Some(muc) = presence
484            .payloads
485            .iter()
486            .filter_map(|p| MucUser::try_from(p.clone()).ok())
487            .next()
488        {
489            // If a MUC user status was found, search through the statuses for a self-presence.
490            if muc.status.iter().any(|s| *s == Status::SelfPresence) {
491                // If a self-presence was found, then the stanza is about the client's own presence.
492
493                match presence.type_ {
494                    PresenceType::None => {
495                        // According to https://xmpp.org/extensions/xep-0045.html#enter-pres, no type should be seen as "available".
496                        events.push(Event::RoomJoined(from.clone()));
497                    }
498                    PresenceType::Unavailable => {
499                        // According to https://xmpp.org/extensions/xep-0045.html#exit, the server will use type "unavailable" to notify the client that it has left the room/
500                        events.push(Event::RoomLeft(from.clone()));
501                    }
502                    _ => unimplemented!("Presence type {:?}", presence.type_), // TODO: What to do here?
503                }
504            }
505        }
506
507        // Return the list of events.
508        events
509    }
510
511    /// Wait for new events.
512    ///
513    /// # Returns
514    ///
515    /// - `Some(events)` if there are new events; multiple may be returned at once.
516    /// - `None` if the underlying stream is closed.
517    pub async fn wait_for_events(&mut self) -> Option<Vec<Event>> {
518        if let Some(event) = self.client.next().await {
519            let mut events = Vec::new();
520
521            match event {
522                TokioXmppEvent::Online { resumed: false, .. } => {
523                    let presence = Self::make_initial_presence(&self.disco, &self.node).into();
524                    let _ = self.client.send_stanza(presence).await;
525                    events.push(Event::Online);
526                    // TODO: only send this when the ContactList feature is enabled.
527                    let iq = Iq::from_get(
528                        "roster",
529                        Roster {
530                            ver: None,
531                            items: vec![],
532                        },
533                    )
534                    .into();
535                    let _ = self.client.send_stanza(iq).await;
536                    // TODO: only send this when the JoinRooms feature is enabled.
537                    let iq =
538                        Iq::from_get("bookmarks", PubSub::Items(Items::new(ns::BOOKMARKS2))).into();
539                    let _ = self.client.send_stanza(iq).await;
540                }
541                TokioXmppEvent::Online { resumed: true, .. } => {}
542                TokioXmppEvent::Disconnected(e) => {
543                    events.push(Event::Disconnected(e));
544                }
545                TokioXmppEvent::Stanza(elem) => {
546                    if elem.is("iq", "jabber:client") {
547                        let iq = Iq::try_from(elem).unwrap();
548                        let new_events = self.handle_iq(iq).await;
549                        events.extend(new_events);
550                    } else if elem.is("message", "jabber:client") {
551                        let message = Message::try_from(elem).unwrap();
552                        let new_events = self.handle_message(message).await;
553                        events.extend(new_events);
554                    } else if elem.is("presence", "jabber:client") {
555                        let presence = Presence::try_from(elem).unwrap();
556                        let new_events = self.handle_presence(presence).await;
557                        events.extend(new_events);
558                    } else if elem.is("error", "http://etherx.jabber.org/streams") {
559                        println!("Received a fatal stream error: {}", String::from(&elem));
560                    } else {
561                        panic!("Unknown stanza: {}", String::from(&elem));
562                    }
563                }
564            }
565
566            Some(events)
567        } else {
568            None
569        }
570    }
571
572    pub async fn upload_file_with(&mut self, service: &str, path: &Path) {
573        let name = path.file_name().unwrap().to_str().unwrap().to_string();
574        let file = File::open(path).await.unwrap();
575        let size = file.metadata().await.unwrap().len();
576        let slot_request = SlotRequest {
577            filename: name,
578            size: size,
579            content_type: None,
580        };
581        let to = service.parse::<Jid>().unwrap();
582        let request = Iq::from_get("upload1", slot_request).with_to(to.clone());
583        self.uploads
584            .push((String::from("upload1"), to, path.to_path_buf()));
585        self.client.send_stanza(request.into()).await.unwrap();
586    }
587}
588
589async fn handle_upload_result(
590    from: &Jid,
591    iqid: String,
592    elem: Element,
593    agent: &mut Agent,
594) -> impl IntoIterator<Item = Event> {
595    let mut res: Option<(usize, PathBuf)> = None;
596
597    for (i, (id, to, filepath)) in agent.uploads.iter().enumerate() {
598        if to == from && id == &iqid {
599            res = Some((i, filepath.to_path_buf()));
600            break;
601        }
602    }
603
604    if let Some((index, file)) = res {
605        agent.uploads.remove(index);
606        let slot = SlotResult::try_from(elem).unwrap();
607
608        let mut headers = ReqwestHeaderMap::new();
609        for header in slot.put.headers {
610            let (attr, val) = match header {
611                HttpUploadHeader::Authorization(val) => ("Authorization", val),
612                HttpUploadHeader::Cookie(val) => ("Cookie", val),
613                HttpUploadHeader::Expires(val) => ("Expires", val),
614            };
615            headers.insert(attr, val.parse().unwrap());
616        }
617
618        let web = ReqwestClient::new();
619        let stream = FramedRead::new(File::open(file).await.unwrap(), BytesCodec::new());
620        let body = ReqwestBody::wrap_stream(stream);
621        let res = web
622            .put(slot.put.url.as_str())
623            .headers(headers)
624            .body(body)
625            .send()
626            .await
627            .unwrap();
628        if res.status() == 201 {
629            return vec![Event::HttpUploadedFile(slot.get.url)];
630        }
631    }
632
633    return vec![];
634}
635
636#[cfg(test)]
637mod tests {
638    use super::{Agent, BareJid, ClientBuilder, ClientFeature, ClientType, Event};
639    use std::str::FromStr;
640    use tokio_xmpp::AsyncClient as TokioXmppClient;
641
642    #[tokio::test]
643    async fn test_simple() {
644        let jid = BareJid::from_str("foo@bar").unwrap();
645
646        let client = TokioXmppClient::new(jid.clone(), "meh");
647
648        // Client instance
649        let client_builder = ClientBuilder::new(jid, "meh")
650            .set_client(ClientType::Bot, "xmpp-rs")
651            .set_website("https://gitlab.com/xmpp-rs/xmpp-rs")
652            .set_default_nick("bot")
653            .enable_feature(ClientFeature::ContactList);
654
655        #[cfg(feature = "avatars")]
656        let client_builder = client_builder.enable_feature(ClientFeature::Avatars);
657
658        let mut agent: Agent = client_builder.build_impl(client);
659
660        while let Some(events) = agent.wait_for_events().await {
661            assert!(match events[0] {
662                Event::Disconnected(_) => true,
663                _ => false,
664            });
665            assert_eq!(events.len(), 1);
666            break;
667        }
668    }
669}