lib.rs

  1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
  2//
  3// This Source Code Form is subject to the terms of the Mozilla Public
  4// License, v. 2.0. If a copy of the MPL was not distributed with this
  5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6
  7#![deny(bare_trait_objects)]
  8
  9use std::str::FromStr;
 10use std::rc::Rc;
 11use std::cell::RefCell;
 12use std::convert::TryFrom;
 13use futures::{Future,Stream, Sink, sync::mpsc};
 14use tokio_xmpp::{
 15    Client as TokioXmppClient,
 16    Event as TokioXmppEvent,
 17    Packet,
 18};
 19use xmpp_parsers::{
 20    bookmarks2::Conference,
 21    caps::{compute_disco, hash_caps, Caps},
 22    disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity},
 23    hashes::Algo,
 24    iq::{Iq, IqType},
 25    message::{Message, MessageType, Body},
 26    muc::{
 27        Muc,
 28        user::{MucUser, Status},
 29    },
 30    ns,
 31    presence::{Presence, Type as PresenceType},
 32    pubsub::pubsub::{PubSub, Items},
 33    roster::{Roster, Item as RosterItem},
 34    stanza_error::{StanzaError, ErrorType, DefinedCondition},
 35    Jid, BareJid, FullJid, JidParseError,
 36};
 37#[macro_use]
 38extern crate log;
 39
 40mod pubsub;
 41
 42pub type Error = tokio_xmpp::Error;
 43
 44#[derive(Debug)]
 45pub enum ClientType {
 46    Bot,
 47    Pc,
 48}
 49
 50impl Default for ClientType {
 51    fn default() -> Self {
 52        ClientType::Bot
 53    }
 54}
 55
 56impl ToString for ClientType {
 57    fn to_string(&self) -> String {
 58        String::from(
 59            match self {
 60                ClientType::Bot => "bot",
 61                ClientType::Pc => "pc",
 62            }
 63        )
 64    }
 65}
 66
 67#[derive(PartialEq)]
 68pub enum ClientFeature {
 69    #[cfg(feature = "avatars")]
 70    Avatars,
 71    ContactList,
 72    JoinRooms,
 73}
 74
 75#[derive(Debug)]
 76pub enum Event {
 77    Online,
 78    Disconnected,
 79    ContactAdded(RosterItem),
 80    ContactRemoved(RosterItem),
 81    ContactChanged(RosterItem),
 82    #[cfg(feature = "avatars")]
 83    AvatarRetrieved(Jid, String),
 84    JoinRoom(BareJid, Conference),
 85    LeaveRoom(BareJid),
 86    LeaveAllRooms,
 87    RoomJoined(BareJid),
 88    RoomLeft(BareJid),
 89}
 90
 91#[derive(Default)]
 92pub struct ClientBuilder<'a> {
 93    jid: &'a str,
 94    password: &'a str,
 95    website: String,
 96    default_nick: String,
 97    disco: (ClientType, String),
 98    features: Vec<ClientFeature>,
 99}
100
101impl ClientBuilder<'_> {
102    pub fn new<'a>(jid: &'a str, password: &'a str) -> ClientBuilder<'a> {
103        ClientBuilder {
104            jid,
105            password,
106            website: String::from("https://gitlab.com/xmpp-rs/tokio-xmpp"),
107            default_nick: String::from("xmpp-rs"),
108            disco: (ClientType::default(), String::from("tokio-xmpp")),
109            features: vec![],
110        }
111    }
112
113    pub fn set_client(mut self, type_: ClientType, name: &str) -> Self {
114        self.disco = (type_, String::from(name));
115        self
116    }
117
118    pub fn set_website(mut self, url: &str) -> Self {
119        self.website = String::from(url);
120        self
121    }
122
123    pub fn set_default_nick(mut self, nick: &str) -> Self {
124        self.default_nick = String::from(nick);
125        self
126    }
127
128    pub fn enable_feature(mut self, feature: ClientFeature) -> Self {
129        self.features.push(feature);
130        self
131    }
132
133    fn make_disco(&self) -> DiscoInfoResult {
134        let identities = vec![Identity::new("client", self.disco.0.to_string(),
135                                            "en", self.disco.1.to_string())];
136        let mut features = vec![
137            Feature::new(ns::DISCO_INFO),
138        ];
139        #[cfg(feature = "avatars")]
140        {
141            if self.features.contains(&ClientFeature::Avatars) {
142                features.push(Feature::new(format!("{}+notify", ns::AVATAR_METADATA)));
143            }
144        }
145        if self.features.contains(&ClientFeature::JoinRooms) {
146            features.push(Feature::new(format!("{}+notify", ns::BOOKMARKS2)));
147        }
148        DiscoInfoResult {
149            node: None,
150            identities,
151            features,
152            extensions: vec![],
153        }
154    }
155
156    fn make_initial_presence(disco: &DiscoInfoResult, node: &str) -> Presence {
157        let caps_data = compute_disco(disco);
158        let hash = hash_caps(&caps_data, Algo::Sha_1).unwrap();
159        let caps = Caps::new(node, hash);
160
161        let mut presence = Presence::new(PresenceType::None);
162        presence.add_payload(caps);
163        presence
164    }
165
166    pub fn build(
167        self,
168    ) -> Result<(Agent, impl Stream<Item = Event, Error = tokio_xmpp::Error>), JidParseError> {
169        let client = TokioXmppClient::new(self.jid, self.password)?;
170        Ok(self.build_impl(client))
171    }
172
173    // This function is meant to be used for testing build
174    pub(crate) fn build_impl<S>(
175        self,
176        stream: S,
177    ) -> (Agent, impl Stream<Item = Event, Error = tokio_xmpp::Error>)
178    where
179        S: Stream<Item = tokio_xmpp::Event, Error = tokio_xmpp::Error>
180            + Sink<SinkItem = tokio_xmpp::Packet, SinkError = tokio_xmpp::Error>,
181    {
182        let disco = self.make_disco();
183        let node = self.website;
184        let (sender_tx, sender_rx) = mpsc::unbounded();
185
186        let client = stream;
187        let (sink, stream) = client.split();
188
189        let reader = {
190            let mut sender_tx = sender_tx.clone();
191            let jid = self.jid.to_owned();
192            stream.map(move |event| {
193                // Helper function to send an iq error.
194                let mut events = Vec::new();
195                let send_error = |to, id, type_, condition, text: &str| {
196                    let error = StanzaError::new(type_, condition, "en", text);
197                    let iq = Iq::from_error(id, error)
198                        .with_to(to)
199                        .into();
200                    sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
201                };
202
203                match event {
204                    TokioXmppEvent::Online => {
205                        let presence = ClientBuilder::make_initial_presence(&disco, &node).into();
206                        let packet = Packet::Stanza(presence);
207                        sender_tx.unbounded_send(packet)
208                            .unwrap();
209                        events.push(Event::Online);
210                        // TODO: only send this when the ContactList feature is enabled.
211                        let iq = Iq::from_get("roster", Roster { ver: None, items: vec![] })
212                            .into();
213                        sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
214                        // TODO: only send this when the JoinRooms feature is enabled.
215                        let iq = Iq::from_get("bookmarks", PubSub::Items(Items::new(ns::BOOKMARKS2)))
216                            .into();
217                        sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
218                    }
219                    TokioXmppEvent::Disconnected => {
220                        events.push(Event::Disconnected);
221                    }
222                    TokioXmppEvent::Stanza(stanza) => {
223                        if stanza.is("iq", "jabber:client") {
224                            let iq = Iq::try_from(stanza).unwrap();
225                            let from =
226                                iq.from.clone().unwrap_or_else(|| Jid::from_str(&jid).unwrap());
227                            if let IqType::Get(payload) = iq.payload {
228                                if payload.is("query", ns::DISCO_INFO) {
229                                    let query = DiscoInfoQuery::try_from(payload);
230                                    match query {
231                                        Ok(query) => {
232                                            let mut disco_info = disco.clone();
233                                            disco_info.node = query.node;
234                                            let iq = Iq::from_result(iq.id, Some(disco_info))
235                                                .with_to(iq.from.unwrap())
236                                                .into();
237                                            sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
238                                        },
239                                        Err(err) => {
240                                            send_error(iq.from.unwrap(), iq.id, ErrorType::Modify, DefinedCondition::BadRequest, &format!("{}", err));
241                                        },
242                                    }
243                                } else {
244                                    // We MUST answer unhandled get iqs with a service-unavailable error.
245                                    send_error(iq.from.unwrap(), iq.id, ErrorType::Cancel, DefinedCondition::ServiceUnavailable, "No handler defined for this kind of iq.");
246                                }
247                            } else if let IqType::Result(Some(payload)) = iq.payload {
248                                // TODO: move private iqs like this one somewhere else, for
249                                // security reasons.
250                                if payload.is("query", ns::ROSTER) && iq.from.is_none() {
251                                    let roster = Roster::try_from(payload).unwrap();
252                                    for item in roster.items.into_iter() {
253                                        events.push(Event::ContactAdded(item));
254                                    }
255                                } else if payload.is("pubsub", ns::PUBSUB) {
256                                    let new_events = pubsub::handle_iq_result(&from, payload);
257                                    events.extend(new_events);
258                                }
259                            } else if let IqType::Set(_) = iq.payload {
260                                // We MUST answer unhandled set iqs with a service-unavailable error.
261                                send_error(iq.from.unwrap(), iq.id, ErrorType::Cancel, DefinedCondition::ServiceUnavailable, "No handler defined for this kind of iq.");
262                            }
263                        } else if stanza.is("message", "jabber:client") {
264                            let message = Message::try_from(stanza).unwrap();
265                            let from = message.from.clone().unwrap();
266                            for child in message.payloads {
267                                if child.is("event", ns::PUBSUB_EVENT) {
268                                    let new_events = pubsub::handle_event(&from, child, &mut sender_tx);
269                                    events.extend(new_events);
270                                }
271                            }
272                        } else if stanza.is("presence", "jabber:client") {
273                            let presence = Presence::try_from(stanza).unwrap();
274                            let from: BareJid = match presence.from.clone().unwrap() {
275                                Jid::Full(FullJid { node, domain, .. }) => BareJid { node, domain },
276                                Jid::Bare(bare) => bare,
277                            };
278                            for payload in presence.payloads.into_iter() {
279                                let muc_user = match MucUser::try_from(payload) {
280                                    Ok(muc_user) => muc_user,
281                                    _ => continue
282                                };
283                                for status in muc_user.status.into_iter() {
284                                    if status == Status::SelfPresence {
285                                        events.push(Event::RoomJoined(from.clone()));
286                                        break;
287                                    }
288                                }
289                            }
290                        } else if stanza.is("error", "http://etherx.jabber.org/streams") {
291                            println!("Received a fatal stream error: {}", String::from(&stanza));
292                        } else {
293                            panic!("Unknown stanza: {}", String::from(&stanza));
294                        }
295                    }
296                }
297
298                futures::stream::iter_ok(events)
299            })
300            .flatten()
301        };
302
303        let sender = sender_rx
304            .map_err(|e| panic!("Sink error: {:?}", e))
305            .forward(sink)
306            .map(|(rx, mut sink)| {
307                drop(rx);
308                let _ = sink.close();
309                None
310            });
311
312        // TODO is this correct?
313        // Some(Error) means a real error
314        // None means the end of the sender stream and can be ignored
315        let future = reader
316            .map(Some)
317            .select(sender.into_stream())
318            .filter_map(|x| x);
319
320        let agent = Agent {
321            sender_tx,
322            default_nick: Rc::new(RefCell::new(self.default_nick)),
323        };
324
325        (agent, future)
326    }
327}
328
329#[derive(Clone)]
330pub struct Agent {
331    sender_tx: mpsc::UnboundedSender<Packet>,
332    default_nick: Rc<RefCell<String>>,
333}
334
335impl Agent {
336    pub fn join_room(&mut self, room: BareJid, nick: Option<String>, password: Option<String>,
337                     lang: &str, status: &str) {
338        let mut muc = Muc::new();
339        if let Some(password) = password {
340            muc = muc.with_password(password);
341        }
342
343        let nick = nick.unwrap_or_else(|| self.default_nick.borrow().clone());
344        let room_jid = room.with_resource(nick);
345        let mut presence = Presence::new(PresenceType::None)
346            .with_to(Jid::Full(room_jid));
347        presence.add_payload(muc);
348        presence.set_status(String::from(lang), String::from(status));
349        let presence = presence.into();
350        self.sender_tx.unbounded_send(Packet::Stanza(presence))
351            .unwrap();
352    }
353
354    pub fn send_message(&mut self, recipient: Jid, type_: MessageType, lang: &str, text: &str) {
355        let mut message = Message::new(Some(recipient));
356        message.type_ = type_;
357        message.bodies.insert(String::from(lang), Body(String::from(text)));
358        let message = message.into();
359        self.sender_tx.unbounded_send(Packet::Stanza(message))
360            .unwrap();
361    }
362}