lib.rs

  1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
  2//
  3// This Source Code Form is subject to the terms of the Mozilla Public
  4// License, v. 2.0. If a copy of the MPL was not distributed with this
  5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6
  7#![deny(bare_trait_objects)]
  8
  9use std::str::FromStr;
 10use futures::{Future,Stream, Sink, sync::mpsc};
 11use tokio_xmpp::{
 12    Client as TokioXmppClient,
 13    Event as TokioXmppEvent,
 14    Packet,
 15};
 16use xmpp_parsers::{
 17    caps::{compute_disco, hash_caps, Caps},
 18    disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity},
 19    hashes::Algo,
 20    iq::{Iq, IqType},
 21    message::{Message, MessageType, Body},
 22    muc::{
 23        Muc,
 24        user::{MucUser, Status},
 25    },
 26    ns,
 27    presence::{Presence, Type as PresenceType},
 28    pubsub::{
 29        event::PubSubEvent,
 30        pubsub::PubSub,
 31    },
 32    roster::{Roster, Item as RosterItem},
 33    stanza_error::{StanzaError, ErrorType, DefinedCondition},
 34    Jid, JidParseError, TryFrom,
 35};
 36
 37mod avatar;
 38
/// Error type carried by the event stream returned from `ClientBuilder::build`;
/// this is an alias for `tokio_xmpp::Error`.
pub type Error = tokio_xmpp::Error;
 40
 41#[derive(Debug)]
 42pub enum ClientType {
 43    Bot,
 44    Pc,
 45}
 46
 47impl Default for ClientType {
 48    fn default() -> Self {
 49        ClientType::Bot
 50    }
 51}
 52
 53impl ToString for ClientType {
 54    fn to_string(&self) -> String {
 55        String::from(
 56            match self {
 57                ClientType::Bot => "bot",
 58                ClientType::Pc => "pc",
 59            }
 60        )
 61    }
 62}
 63
 64#[derive(PartialEq)]
 65pub enum ClientFeature {
 66    Avatars,
 67    ContactList,
 68}
 69
/// High-level events yielded by the stream returned from `ClientBuilder::build`.
pub enum Event {
    /// The underlying connection came online; the initial presence has been
    /// queued for sending.
    Online,
    /// The underlying connection reported a disconnection.
    Disconnected,
    /// One item of the roster received in a roster iq result.
    ContactAdded(RosterItem),
    /// A contact was removed from the roster.
    // NOTE(review): not emitted by the code visible in this file.
    ContactRemoved(RosterItem),
    /// A contact's roster entry changed.
    // NOTE(review): not emitted by the code visible in this file.
    ContactChanged(RosterItem),
    /// An avatar was retrieved for the given JID; produced by the `avatar` module.
    // NOTE(review): the meaning of the `String` is defined in `avatar` — confirm there.
    AvatarRetrieved(Jid, String),
    /// A presence carrying the MUC self-presence status was received; the JID
    /// is the sender of that presence.
    RoomJoined(Jid),
}
 79
/// Builder collecting the settings needed to create an `Agent` and its
/// event stream.
#[derive(Default)]
pub struct ClientBuilder<'a> {
    /// Account JID, handed to the tokio-xmpp client as a string.
    jid: &'a str,
    /// Account password, handed to the tokio-xmpp client.
    password: &'a str,
    /// URL used as the capabilities node in the initial presence.
    website: String,
    /// Client type and client name advertised in the disco identity.
    disco: (ClientType, String),
    /// Optional features enabled through `enable_feature`.
    features: Vec<ClientFeature>,
}
 88
 89impl ClientBuilder<'_> {
 90    pub fn new<'a>(jid: &'a str, password: &'a str) -> ClientBuilder<'a> {
 91        ClientBuilder {
 92            jid,
 93            password,
 94            website: String::from("https://gitlab.com/xmpp-rs/tokio-xmpp"),
 95            disco: (ClientType::default(), String::from("tokio-xmpp")),
 96            features: vec![],
 97        }
 98    }
 99
100    pub fn set_client(mut self, type_: ClientType, name: &str) -> Self {
101        self.disco = (type_, String::from(name));
102        self
103    }
104
105    pub fn set_website(mut self, url: &str) -> Self {
106        self.website = String::from(url);
107        self
108    }
109
110    pub fn enable_feature(mut self, feature: ClientFeature) -> Self {
111        self.features.push(feature);
112        self
113    }
114
115    fn make_disco(&self) -> DiscoInfoResult {
116        let identities = vec![Identity::new("client", self.disco.0.to_string(),
117                                            "en", self.disco.1.to_string())];
118        let mut features = vec![
119            Feature::new(ns::DISCO_INFO),
120        ];
121        if self.features.contains(&ClientFeature::Avatars) {
122            features.push(Feature::new(format!("{}+notify", ns::AVATAR_METADATA)));
123        }
124        DiscoInfoResult {
125            node: None,
126            identities,
127            features,
128            extensions: vec![],
129        }
130    }
131
132    fn make_initial_presence(disco: &DiscoInfoResult, node: &str) -> Presence {
133        let caps_data = compute_disco(disco);
134        let hash = hash_caps(&caps_data, Algo::Sha_1).unwrap();
135        let caps = Caps::new(node, hash);
136
137        let mut presence = Presence::new(PresenceType::None);
138        presence.add_payload(caps);
139        presence
140    }
141
142    pub fn build(
143        self,
144    ) -> Result<(Agent, impl Stream<Item = Event, Error = tokio_xmpp::Error>), JidParseError> {
145        let client = TokioXmppClient::new(self.jid, self.password)?;
146        Ok(self.build_impl(client))
147    }
148
149    // This function is meant to be used for testing build
150    pub(crate) fn build_impl<S>(
151        self,
152        stream: S,
153    ) -> (Agent, impl Stream<Item = Event, Error = tokio_xmpp::Error>)
154    where
155        S: Stream<Item = tokio_xmpp::Event, Error = tokio_xmpp::Error>
156            + Sink<SinkItem = tokio_xmpp::Packet, SinkError = tokio_xmpp::Error>,
157    {
158        let disco = self.make_disco();
159        let node = self.website;
160        let (sender_tx, sender_rx) = mpsc::unbounded();
161
162        let client = stream;
163        let (sink, stream) = client.split();
164
165        let reader = {
166            let mut sender_tx = sender_tx.clone();
167            let jid = self.jid.to_owned();
168            stream.map(move |event| {
169                // Helper function to send an iq error.
170                let mut events = Vec::new();
171                let send_error = |to, id, type_, condition, text: &str| {
172                    let error = StanzaError::new(type_, condition, "en", text);
173                    let iq = Iq::from_error(id, error)
174                        .with_to(to)
175                        .into();
176                    sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
177                };
178
179                match event {
180                    TokioXmppEvent::Online => {
181                        let presence = ClientBuilder::make_initial_presence(&disco, &node).into();
182                        let packet = Packet::Stanza(presence);
183                        sender_tx.unbounded_send(packet)
184                            .unwrap();
185                        events.push(Event::Online);
186                        // TODO: only send this when the ContactList feature is enabled.
187                        let iq = Iq::from_get("roster", Roster { ver: None, items: vec![] })
188                            .into();
189                        sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
190                    }
191                    TokioXmppEvent::Disconnected => {
192                        events.push(Event::Disconnected);
193                    }
194                    TokioXmppEvent::Stanza(stanza) => {
195                        if stanza.is("iq", "jabber:client") {
196                            let iq = Iq::try_from(stanza).unwrap();
197                            if let IqType::Get(payload) = iq.payload {
198                                if payload.is("query", ns::DISCO_INFO) {
199                                    let query = DiscoInfoQuery::try_from(payload);
200                                    match query {
201                                        Ok(query) => {
202                                            let mut disco_info = disco.clone();
203                                            disco_info.node = query.node;
204                                            let iq = Iq::from_result(iq.id, Some(disco_info))
205                                                .with_to(iq.from.unwrap())
206                                                .into();
207                                            sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
208                                        },
209                                        Err(err) => {
210                                            send_error(iq.from.unwrap(), iq.id, ErrorType::Modify, DefinedCondition::BadRequest, &format!("{}", err));
211                                        },
212                                    }
213                                } else {
214                                    // We MUST answer unhandled get iqs with a service-unavailable error.
215                                    send_error(iq.from.unwrap(), iq.id, ErrorType::Cancel, DefinedCondition::ServiceUnavailable, "No handler defined for this kind of iq.");
216                                }
217                            } else if let IqType::Result(Some(payload)) = iq.payload {
218                                // TODO: move private iqs like this one somewhere else, for
219                                // security reasons.
220                                if payload.is("query", ns::ROSTER) && iq.from.is_none() {
221                                    let roster = Roster::try_from(payload).unwrap();
222                                    for item in roster.items.into_iter() {
223                                        events.push(Event::ContactAdded(item));
224                                    }
225                                } else if payload.is("pubsub", ns::PUBSUB) {
226                                    let pubsub = PubSub::try_from(payload).unwrap();
227                                    let from =
228                                        iq.from.clone().unwrap_or(Jid::from_str(&jid).unwrap());
229                                    if let PubSub::Items(items) = pubsub {
230                                        if items.node.0 == ns::AVATAR_DATA {
231                                            let new_events = avatar::handle_data_pubsub_iq(&from, &items);
232                                            events.extend(new_events);
233                                        }
234                                    }
235                                }
236                            } else if let IqType::Set(_) = iq.payload {
237                                // We MUST answer unhandled set iqs with a service-unavailable error.
238                                send_error(iq.from.unwrap(), iq.id, ErrorType::Cancel, DefinedCondition::ServiceUnavailable, "No handler defined for this kind of iq.");
239                            }
240                        } else if stanza.is("message", "jabber:client") {
241                            let message = Message::try_from(stanza).unwrap();
242                            let from = message.from.clone().unwrap();
243                            for child in message.payloads {
244                                if child.is("event", ns::PUBSUB_EVENT) {
245                                    let event = PubSubEvent::try_from(child).unwrap();
246                                    if let PubSubEvent::PublishedItems { node, items } = event {
247                                        if node.0 == ns::AVATAR_METADATA {
248                                            avatar::handle_metadata_pubsub_event(&from, &mut sender_tx, items);
249                                        }
250                                    }
251                                }
252                            }
253                        } else if stanza.is("presence", "jabber:client") {
254                            let presence = Presence::try_from(stanza).unwrap();
255                            let from = presence.from.clone().unwrap();
256                            for payload in presence.payloads.into_iter() {
257                                let muc_user = match MucUser::try_from(payload) {
258                                    Ok(muc_user) => muc_user,
259                                    _ => continue
260                                };
261                                for status in muc_user.status.into_iter() {
262                                    if status == Status::SelfPresence {
263                                        events.push(Event::RoomJoined(from.clone()));
264                                        break;
265                                    }
266                                }
267                            }
268                        } else if stanza.is("error", "http://etherx.jabber.org/streams") {
269                            println!("Received a fatal stream error: {}", String::from(&stanza));
270                        } else {
271                            panic!("Unknown stanza: {}", String::from(&stanza));
272                        }
273                    }
274                }
275
276                futures::stream::iter_ok(events)
277            })
278            .flatten()
279        };
280
281        let sender = sender_rx
282            .map_err(|e| panic!("Sink error: {:?}", e))
283            .forward(sink)
284            .map(|(rx, mut sink)| {
285                drop(rx);
286                let _ = sink.close();
287                None
288            });
289
290        // TODO is this correct?
291        // Some(Error) means a real error
292        // None means the end of the sender stream and can be ignored
293        let future = reader
294            .map(Some)
295            .select(sender.into_stream())
296            .filter_map(|x| x);
297
298        let agent = Agent { sender_tx };
299
300        (agent, future)
301    }
302}
303
/// Pairing of an outgoing packet channel with the stream of incoming events.
// NOTE(review): no constructor for this type is visible in this file.
pub struct Client {
    /// Sending half of the outgoing packet channel; cloned into every `Agent`.
    sender_tx: mpsc::UnboundedSender<Packet>,
    /// Boxed event stream handed out by `listen`.
    stream: Box<dyn Stream<Item = Event, Error = Error>>,
}
308
309impl Client {
310    pub fn get_agent(&self) -> Agent {
311        Agent {
312            sender_tx: self.sender_tx.clone(),
313        }
314    }
315
316    pub fn listen(self) -> Box<dyn Stream<Item = Event, Error = Error>> {
317        self.stream
318    }
319}
320
/// Handle used to send stanzas through the connection.
pub struct Agent {
    /// Sending half of the channel whose packets are forwarded to the
    /// connection's sink.
    sender_tx: mpsc::UnboundedSender<Packet>,
}
324
325impl Agent {
326    pub fn join_room(&mut self, room: Jid, lang: &str, status: &str) {
327        let mut presence = Presence::new(PresenceType::None)
328            .with_to(Some(room))
329            .with_payloads(vec![Muc::new().into()]);
330        presence.set_status(String::from(lang), String::from(status));
331        let presence = presence.into();
332        self.sender_tx.unbounded_send(Packet::Stanza(presence))
333            .unwrap();
334    }
335
336    pub fn send_message(&mut self, recipient: Jid, type_: MessageType, lang: &str, text: &str) {
337        let mut message = Message::new(Some(recipient));
338        message.type_ = type_;
339        message.bodies.insert(String::from(lang), Body(String::from(text)));
340        let message = message.into();
341        self.sender_tx.unbounded_send(Packet::Stanza(message))
342            .unwrap();
343    }
344}