lib.rs

  1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
  2//
  3// This Source Code Form is subject to the terms of the Mozilla Public
  4// License, v. 2.0. If a copy of the MPL was not distributed with this
  5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6
  7#![deny(bare_trait_objects)]
  8
  9use futures::stream::StreamExt;
 10use reqwest::{Body as ReqwestBody, Client as ReqwestClient};
 11use std::cell::RefCell;
 12use std::convert::TryFrom;
 13use std::path::{Path, PathBuf};
 14use std::rc::Rc;
 15use tokio::fs::File;
 16use tokio_util::codec::{BytesCodec, FramedRead};
 17use tokio_xmpp::{AsyncClient as TokioXmppClient, Event as TokioXmppEvent};
 18use xmpp_parsers::{
 19    bookmarks2::Conference,
 20    caps::{compute_disco, hash_caps, Caps},
 21    disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity},
 22    hashes::Algo,
 23    http_upload::{SlotRequest, SlotResult},
 24    iq::{Iq, IqType},
 25    message::{Body, Message, MessageType},
 26    muc::{
 27        user::{MucUser, Status},
 28        Muc,
 29    },
 30    ns,
 31    presence::{Presence, Type as PresenceType},
 32    pubsub::pubsub::{Items, PubSub},
 33    roster::{Item as RosterItem, Roster},
 34    stanza_error::{DefinedCondition, ErrorType, StanzaError},
 35    BareJid, Element, FullJid, Jid,
 36};
 37#[macro_use]
 38extern crate log;
 39
 40mod pubsub;
 41
 42pub type Error = tokio_xmpp::Error;
 43
 44#[derive(Debug)]
 45pub enum ClientType {
 46    Bot,
 47    Pc,
 48}
 49
 50impl Default for ClientType {
 51    fn default() -> Self {
 52        ClientType::Bot
 53    }
 54}
 55
 56impl ToString for ClientType {
 57    fn to_string(&self) -> String {
 58        String::from(match self {
 59            ClientType::Bot => "bot",
 60            ClientType::Pc => "pc",
 61        })
 62    }
 63}
 64
 65#[derive(PartialEq)]
 66pub enum ClientFeature {
 67    #[cfg(feature = "avatars")]
 68    Avatars,
 69    ContactList,
 70    JoinRooms,
 71}
 72
 73pub type RoomNick = String;
 74
 75#[derive(Debug)]
 76pub enum Event {
 77    Online,
 78    Disconnected,
 79    ContactAdded(RosterItem),
 80    ContactRemoved(RosterItem),
 81    ContactChanged(RosterItem),
 82    #[cfg(feature = "avatars")]
 83    AvatarRetrieved(Jid, String),
 84    ChatMessage(BareJid, Body),
 85    JoinRoom(BareJid, Conference),
 86    LeaveRoom(BareJid),
 87    LeaveAllRooms,
 88    RoomJoined(BareJid),
 89    RoomLeft(BareJid),
 90    RoomMessage(BareJid, RoomNick, Body),
 91    HttpUploadedFile(String),
 92}
 93
 94#[derive(Default)]
 95pub struct ClientBuilder<'a> {
 96    jid: &'a str,
 97    password: &'a str,
 98    website: String,
 99    default_nick: String,
100    lang: Vec<String>,
101    disco: (ClientType, String),
102    features: Vec<ClientFeature>,
103}
104
105impl ClientBuilder<'_> {
106    pub fn new<'a>(jid: &'a str, password: &'a str) -> ClientBuilder<'a> {
107        ClientBuilder {
108            jid,
109            password,
110            website: String::from("https://gitlab.com/xmpp-rs/tokio-xmpp"),
111            default_nick: String::from("xmpp-rs"),
112            lang: vec![String::from("en")],
113            disco: (ClientType::default(), String::from("tokio-xmpp")),
114            features: vec![],
115        }
116    }
117
118    pub fn set_client(mut self, type_: ClientType, name: &str) -> Self {
119        self.disco = (type_, String::from(name));
120        self
121    }
122
123    pub fn set_website(mut self, url: &str) -> Self {
124        self.website = String::from(url);
125        self
126    }
127
128    pub fn set_default_nick(mut self, nick: &str) -> Self {
129        self.default_nick = String::from(nick);
130        self
131    }
132
133    pub fn set_lang(mut self, lang: Vec<String>) -> Self {
134        self.lang = lang;
135        self
136    }
137
138    pub fn enable_feature(mut self, feature: ClientFeature) -> Self {
139        self.features.push(feature);
140        self
141    }
142
143    fn make_disco(&self) -> DiscoInfoResult {
144        let identities = vec![Identity::new(
145            "client",
146            self.disco.0.to_string(),
147            "en",
148            self.disco.1.to_string(),
149        )];
150        let mut features = vec![Feature::new(ns::DISCO_INFO)];
151        #[cfg(feature = "avatars")]
152        {
153            if self.features.contains(&ClientFeature::Avatars) {
154                features.push(Feature::new(format!("{}+notify", ns::AVATAR_METADATA)));
155            }
156        }
157        if self.features.contains(&ClientFeature::JoinRooms) {
158            features.push(Feature::new(format!("{}+notify", ns::BOOKMARKS2)));
159        }
160        DiscoInfoResult {
161            node: None,
162            identities,
163            features,
164            extensions: vec![],
165        }
166    }
167
168    pub fn build(self) -> Result<Agent, Error> {
169        let client = TokioXmppClient::new(self.jid, self.password)?;
170        Ok(self.build_impl(client)?)
171    }
172
173    // This function is meant to be used for testing build
174    pub(crate) fn build_impl(self, client: TokioXmppClient) -> Result<Agent, Error> {
175        let disco = self.make_disco();
176        let node = self.website;
177
178        let agent = Agent {
179            client,
180            default_nick: Rc::new(RefCell::new(self.default_nick)),
181            lang: Rc::new(self.lang),
182            disco,
183            node,
184            uploads: Vec::new(),
185        };
186
187        Ok(agent)
188    }
189}
190
191pub struct Agent {
192    client: TokioXmppClient,
193    default_nick: Rc<RefCell<String>>,
194    lang: Rc<Vec<String>>,
195    disco: DiscoInfoResult,
196    node: String,
197    uploads: Vec<(String, Jid, PathBuf)>,
198}
199
200impl Agent {
201    pub async fn join_room(
202        &mut self,
203        room: BareJid,
204        nick: Option<String>,
205        password: Option<String>,
206        lang: &str,
207        status: &str,
208    ) {
209        let mut muc = Muc::new();
210        if let Some(password) = password {
211            muc = muc.with_password(password);
212        }
213
214        let nick = nick.unwrap_or_else(|| self.default_nick.borrow().clone());
215        let room_jid = room.with_resource(nick);
216        let mut presence = Presence::new(PresenceType::None).with_to(Jid::Full(room_jid));
217        presence.add_payload(muc);
218        presence.set_status(String::from(lang), String::from(status));
219        let _ = self.client.send_stanza(presence.into()).await;
220    }
221
222    pub async fn send_message(
223        &mut self,
224        recipient: Jid,
225        type_: MessageType,
226        lang: &str,
227        text: &str,
228    ) {
229        let mut message = Message::new(Some(recipient));
230        message.type_ = type_;
231        message
232            .bodies
233            .insert(String::from(lang), Body(String::from(text)));
234        let _ = self.client.send_stanza(message.into()).await;
235    }
236
237    fn make_initial_presence(disco: &DiscoInfoResult, node: &str) -> Presence {
238        let caps_data = compute_disco(disco);
239        let hash = hash_caps(&caps_data, Algo::Sha_1).unwrap();
240        let caps = Caps::new(node, hash);
241
242        let mut presence = Presence::new(PresenceType::None);
243        presence.add_payload(caps);
244        presence
245    }
246
247    async fn handle_iq(&mut self, iq: Iq) -> Vec<Event> {
248        let mut events = vec![];
249        let from = iq
250            .from
251            .clone()
252            .unwrap_or_else(|| self.client.bound_jid().unwrap().clone());
253        if let IqType::Get(payload) = iq.payload {
254            if payload.is("query", ns::DISCO_INFO) {
255                let query = DiscoInfoQuery::try_from(payload);
256                match query {
257                    Ok(query) => {
258                        let mut disco_info = self.disco.clone();
259                        disco_info.node = query.node;
260                        let iq = Iq::from_result(iq.id, Some(disco_info))
261                            .with_to(iq.from.unwrap())
262                            .into();
263                        let _ = self.client.send_stanza(iq).await;
264                    }
265                    Err(err) => {
266                        let error = StanzaError::new(
267                            ErrorType::Modify,
268                            DefinedCondition::BadRequest,
269                            "en",
270                            &format!("{}", err),
271                        );
272                        let iq = Iq::from_error(iq.id, error)
273                            .with_to(iq.from.unwrap())
274                            .into();
275                        let _ = self.client.send_stanza(iq).await;
276                    }
277                }
278            } else {
279                // We MUST answer unhandled get iqs with a service-unavailable error.
280                let error = StanzaError::new(
281                    ErrorType::Cancel,
282                    DefinedCondition::ServiceUnavailable,
283                    "en",
284                    "No handler defined for this kind of iq.",
285                );
286                let iq = Iq::from_error(iq.id, error)
287                    .with_to(iq.from.unwrap())
288                    .into();
289                let _ = self.client.send_stanza(iq).await;
290            }
291        } else if let IqType::Result(Some(payload)) = iq.payload {
292            // TODO: move private iqs like this one somewhere else, for
293            // security reasons.
294            if payload.is("query", ns::ROSTER) && iq.from.is_none() {
295                let roster = Roster::try_from(payload).unwrap();
296                for item in roster.items.into_iter() {
297                    events.push(Event::ContactAdded(item));
298                }
299            } else if payload.is("pubsub", ns::PUBSUB) {
300                let new_events = pubsub::handle_iq_result(&from, payload);
301                events.extend(new_events);
302            } else if payload.is("slot", ns::HTTP_UPLOAD) {
303                let new_events = handle_upload_result(&from, iq.id, payload, self).await;
304                events.extend(new_events);
305            }
306        } else if let IqType::Set(_) = iq.payload {
307            // We MUST answer unhandled set iqs with a service-unavailable error.
308            let error = StanzaError::new(
309                ErrorType::Cancel,
310                DefinedCondition::ServiceUnavailable,
311                "en",
312                "No handler defined for this kind of iq.",
313            );
314            let iq = Iq::from_error(iq.id, error)
315                .with_to(iq.from.unwrap())
316                .into();
317            let _ = self.client.send_stanza(iq).await;
318        }
319
320        events
321    }
322
323    async fn handle_message(&mut self, message: Message) -> Vec<Event> {
324        let mut events = vec![];
325        let from = message.from.clone().unwrap();
326        let langs: Vec<&str> = self.lang.iter().map(String::as_str).collect();
327        match message.get_best_body(langs) {
328            Some((_lang, body)) => match message.type_ {
329                MessageType::Groupchat => {
330                    let event = Event::RoomMessage(
331                        from.clone().into(),
332                        FullJid::try_from(from.clone()).unwrap().resource,
333                        body.clone(),
334                    );
335                    events.push(event)
336                }
337                MessageType::Chat | MessageType::Normal => {
338                    let event = Event::ChatMessage(from.clone().into(), body.clone());
339                    events.push(event)
340                }
341                _ => (),
342            },
343            None => (),
344        }
345        for child in message.payloads {
346            if child.is("event", ns::PUBSUB_EVENT) {
347                let new_events = pubsub::handle_event(&from, child, self).await;
348                events.extend(new_events);
349            }
350        }
351
352        events
353    }
354
355    async fn handle_presence(&mut self, presence: Presence) -> Vec<Event> {
356        let mut events = vec![];
357        let from: BareJid = match presence.from.clone().unwrap() {
358            Jid::Full(FullJid { node, domain, .. }) => BareJid { node, domain },
359            Jid::Bare(bare) => bare,
360        };
361        for payload in presence.payloads.into_iter() {
362            let muc_user = match MucUser::try_from(payload) {
363                Ok(muc_user) => muc_user,
364                _ => continue,
365            };
366            for status in muc_user.status.into_iter() {
367                if status == Status::SelfPresence {
368                    events.push(Event::RoomJoined(from.clone()));
369                    break;
370                }
371            }
372        }
373
374        events
375    }
376
377    pub async fn wait_for_events(&mut self) -> Option<Vec<Event>> {
378        if let Some(event) = self.client.next().await {
379            let mut events = Vec::new();
380
381            match event {
382                TokioXmppEvent::Online { resumed: false, .. } => {
383                    let presence = Self::make_initial_presence(&self.disco, &self.node).into();
384                    let _ = self.client.send_stanza(presence).await;
385                    events.push(Event::Online);
386                    // TODO: only send this when the ContactList feature is enabled.
387                    let iq = Iq::from_get(
388                        "roster",
389                        Roster {
390                            ver: None,
391                            items: vec![],
392                        },
393                    )
394                    .into();
395                    let _ = self.client.send_stanza(iq).await;
396                    // TODO: only send this when the JoinRooms feature is enabled.
397                    let iq =
398                        Iq::from_get("bookmarks", PubSub::Items(Items::new(ns::BOOKMARKS2))).into();
399                    let _ = self.client.send_stanza(iq).await;
400                }
401                TokioXmppEvent::Online { resumed: true, .. } => {}
402                TokioXmppEvent::Disconnected(_) => {
403                    events.push(Event::Disconnected);
404                }
405                TokioXmppEvent::Stanza(elem) => {
406                    if elem.is("iq", "jabber:client") {
407                        let iq = Iq::try_from(elem).unwrap();
408                        let new_events = self.handle_iq(iq).await;
409                        events.extend(new_events);
410                    } else if elem.is("message", "jabber:client") {
411                        let message = Message::try_from(elem).unwrap();
412                        let new_events = self.handle_message(message).await;
413                        events.extend(new_events);
414                    } else if elem.is("presence", "jabber:client") {
415                        let presence = Presence::try_from(elem).unwrap();
416                        let new_events = self.handle_presence(presence).await;
417                        events.extend(new_events);
418                    } else if elem.is("error", "http://etherx.jabber.org/streams") {
419                        println!("Received a fatal stream error: {}", String::from(&elem));
420                    } else {
421                        panic!("Unknown stanza: {}", String::from(&elem));
422                    }
423                }
424            }
425
426            Some(events)
427        } else {
428            None
429        }
430    }
431
432    pub async fn upload_file_with(&mut self, service: &str, path: &Path) {
433        let name = path.file_name().unwrap().to_str().unwrap().to_string();
434        let file = File::open(path).await.unwrap();
435        let size = file.metadata().await.unwrap().len();
436        let slot_request = SlotRequest {
437            filename: name,
438            size: size,
439            content_type: None,
440        };
441        let to = service.parse::<Jid>().unwrap();
442        let request = Iq::from_get("upload1", slot_request).with_to(to.clone());
443        self.uploads
444            .push((String::from("upload1"), to, path.to_path_buf()));
445        self.client.send_stanza(request.into()).await.unwrap();
446    }
447}
448
449async fn handle_upload_result(
450    from: &Jid,
451    iqid: String,
452    elem: Element,
453    agent: &mut Agent,
454) -> impl IntoIterator<Item = Event> {
455    let mut res: Option<(usize, PathBuf)> = None;
456
457    for (i, (id, to, filepath)) in agent.uploads.iter().enumerate() {
458        if to == from && id == &iqid {
459            res = Some((i, filepath.to_path_buf()));
460            break;
461        }
462    }
463
464    if let Some((index, file)) = res {
465        agent.uploads.remove(index);
466        let slot = SlotResult::try_from(elem).unwrap();
467        let web = ReqwestClient::new();
468        let stream = FramedRead::new(File::open(file).await.unwrap(), BytesCodec::new());
469        let body = ReqwestBody::wrap_stream(stream);
470        let res = web
471            .put(slot.put.url.as_str())
472            .body(body)
473            .send()
474            .await
475            .unwrap();
476        if res.status() == 201 {
477            return vec![Event::HttpUploadedFile(slot.get.url)];
478        }
479    }
480
481    return vec![];
482}
483
#[cfg(test)]
mod tests {
    use super::{Agent, ClientBuilder, ClientFeature, ClientType, Event};
    use tokio_xmpp::AsyncClient as TokioXmppClient;

    /// Builds an agent around a client for `foo@bar` and checks that the
    /// first batch of events, if any, is exactly one `Disconnected` event.
    #[tokio::test]
    async fn test_simple() {
        let client = TokioXmppClient::new("foo@bar", "meh").unwrap();

        // Client instance
        let client_builder = ClientBuilder::new("foo@bar", "meh")
            .set_client(ClientType::Bot, "xmpp-rs")
            .set_website("https://gitlab.com/xmpp-rs/xmpp-rs")
            .set_default_nick("bot");
        // `Avatars` only exists when the `avatars` cargo feature is enabled.
        #[cfg(feature = "avatars")]
        let client_builder = client_builder.enable_feature(ClientFeature::Avatars);
        let client_builder = client_builder.enable_feature(ClientFeature::ContactList);

        let mut agent: Agent = client_builder.build_impl(client).unwrap();

        if let Some(events) = agent.wait_for_events().await {
            // Check the length first so that the indexing below cannot panic.
            assert_eq!(events.len(), 1);
            assert!(match events[0] {
                Event::Disconnected => true,
                _ => false,
            });
        }
    }
}
512}