lib.rs

  1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
  2//
  3// This Source Code Form is subject to the terms of the Mozilla Public
  4// License, v. 2.0. If a copy of the MPL was not distributed with this
  5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6
  7use std::str::FromStr;
  8use futures::{Future,Stream, Sink, sync::mpsc};
  9use tokio_xmpp::{
 10    Client as TokioXmppClient,
 11    Event as TokioXmppEvent,
 12    Packet,
 13};
 14use xmpp_parsers::{
 15    caps::{compute_disco, hash_caps, Caps},
 16    disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity},
 17    hashes::Algo,
 18    iq::{Iq, IqType},
 19    message::{Message, MessageType, Body},
 20    muc::{
 21        Muc,
 22        user::{MucUser, Status},
 23    },
 24    ns,
 25    presence::{Presence, Type as PresenceType},
 26    pubsub::{
 27        event::PubSubEvent,
 28        pubsub::PubSub,
 29    },
 30    roster::{Roster, Item as RosterItem},
 31    stanza_error::{StanzaError, ErrorType, DefinedCondition},
 32    Jid, JidParseError, TryFrom,
 33};
 34
 35mod avatar;
 36
 37pub type Error = tokio_xmpp::Error;
 38
 39#[derive(Debug)]
 40pub enum ClientType {
 41    Bot,
 42    Pc,
 43}
 44
 45impl Default for ClientType {
 46    fn default() -> Self {
 47        ClientType::Bot
 48    }
 49}
 50
 51impl ToString for ClientType {
 52    fn to_string(&self) -> String {
 53        String::from(
 54            match self {
 55                ClientType::Bot => "bot",
 56                ClientType::Pc => "pc",
 57            }
 58        )
 59    }
 60}
 61
 62#[derive(PartialEq)]
 63pub enum ClientFeature {
 64    Avatars,
 65    ContactList,
 66}
 67
 68pub enum Event {
 69    Online,
 70    Disconnected,
 71    ContactAdded(RosterItem),
 72    ContactRemoved(RosterItem),
 73    ContactChanged(RosterItem),
 74    AvatarRetrieved(Jid, String),
 75    RoomJoined(Jid),
 76}
 77
 78#[derive(Default)]
 79pub struct ClientBuilder<'a> {
 80    jid: &'a str,
 81    password: &'a str,
 82    website: String,
 83    disco: (ClientType, String),
 84    features: Vec<ClientFeature>,
 85}
 86
 87impl ClientBuilder<'_> {
 88    pub fn new<'a>(jid: &'a str, password: &'a str) -> ClientBuilder<'a> {
 89        ClientBuilder {
 90            jid,
 91            password,
 92            website: String::from("https://gitlab.com/xmpp-rs/tokio-xmpp"),
 93            disco: (ClientType::default(), String::from("tokio-xmpp")),
 94            features: vec![],
 95        }
 96    }
 97
 98    pub fn set_client(mut self, type_: ClientType, name: &str) -> Self {
 99        self.disco = (type_, String::from(name));
100        self
101    }
102
103    pub fn set_website(mut self, url: &str) -> Self {
104        self.website = String::from(url);
105        self
106    }
107
108    pub fn enable_feature(mut self, feature: ClientFeature) -> Self {
109        self.features.push(feature);
110        self
111    }
112
113    fn make_disco(&self) -> DiscoInfoResult {
114        let identities = vec![Identity::new("client", self.disco.0.to_string(),
115                                            "en", self.disco.1.to_string())];
116        let mut features = vec![
117            Feature::new(ns::DISCO_INFO),
118        ];
119        if self.features.contains(&ClientFeature::Avatars) {
120            features.push(Feature::new(format!("{}+notify", ns::AVATAR_METADATA)));
121        }
122        DiscoInfoResult {
123            node: None,
124            identities,
125            features,
126            extensions: vec![],
127        }
128    }
129
130    fn make_initial_presence(disco: &DiscoInfoResult, node: &str) -> Presence {
131        let caps_data = compute_disco(disco);
132        let hash = hash_caps(&caps_data, Algo::Sha_1).unwrap();
133        let caps = Caps::new(node, hash);
134
135        let mut presence = Presence::new(PresenceType::None);
136        presence.add_payload(caps);
137        presence
138    }
139
140    pub fn build(
141        self,
142    ) -> Result<(Agent, impl Stream<Item = Event, Error = tokio_xmpp::Error>), JidParseError> {
143        let client = TokioXmppClient::new(self.jid, self.password)?;
144        Ok(self.build_impl(client))
145    }
146
147    // This function is meant to be used for testing build
148    pub(crate) fn build_impl<S>(
149        self,
150        stream: S,
151    ) -> (Agent, impl Stream<Item = Event, Error = tokio_xmpp::Error>)
152    where
153        S: Stream<Item = tokio_xmpp::Event, Error = tokio_xmpp::Error>
154            + Sink<SinkItem = tokio_xmpp::Packet, SinkError = tokio_xmpp::Error>,
155    {
156        let disco = self.make_disco();
157        let node = self.website;
158        let (sender_tx, sender_rx) = mpsc::unbounded();
159
160        let client = stream;
161        let (sink, stream) = client.split();
162
163        let reader = {
164            let mut sender_tx = sender_tx.clone();
165            let jid = self.jid.to_owned();
166            stream.map(move |event| {
167                // Helper function to send an iq error.
168                let mut events = Vec::new();
169                let send_error = |to, id, type_, condition, text: &str| {
170                    let error = StanzaError::new(type_, condition, "en", text);
171                    let iq = Iq::from_error(id, error)
172                        .with_to(to)
173                        .into();
174                    sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
175                };
176
177                match event {
178                    TokioXmppEvent::Online => {
179                        let presence = ClientBuilder::make_initial_presence(&disco, &node).into();
180                        let packet = Packet::Stanza(presence);
181                        sender_tx.unbounded_send(packet)
182                            .unwrap();
183                        events.push(Event::Online);
184                        let iq = Iq::from_get("roster", Roster { ver: None, items: vec![] })
185                            .into();
186                        sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
187                    }
188                    TokioXmppEvent::Disconnected => {
189                        events.push(Event::Disconnected);
190                    }
191                    TokioXmppEvent::Stanza(stanza) => {
192                        if stanza.is("iq", "jabber:client") {
193                            let iq = Iq::try_from(stanza).unwrap();
194                            if let IqType::Get(payload) = iq.payload {
195                                if payload.is("query", ns::DISCO_INFO) {
196                                    let query = DiscoInfoQuery::try_from(payload);
197                                    match query {
198                                        Ok(query) => {
199                                            let mut disco_info = disco.clone();
200                                            disco_info.node = query.node;
201                                            let iq = Iq::from_result(iq.id, Some(disco_info))
202                                                .with_to(iq.from.unwrap())
203                                                .into();
204                                            sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
205                                        },
206                                        Err(err) => {
207                                            send_error(iq.from.unwrap(), iq.id, ErrorType::Modify, DefinedCondition::BadRequest, &format!("{}", err));
208                                        },
209                                    }
210                                } else {
211                                    // We MUST answer unhandled get iqs with a service-unavailable error.
212                                    send_error(iq.from.unwrap(), iq.id, ErrorType::Cancel, DefinedCondition::ServiceUnavailable, "No handler defined for this kind of iq.");
213                                }
214                            } else if let IqType::Result(Some(payload)) = iq.payload {
215                                if payload.is("query", ns::ROSTER) {
216                                    let roster = Roster::try_from(payload).unwrap();
217                                    for item in roster.items.into_iter() {
218                                        events.push(Event::ContactAdded(item));
219                                    }
220                                } else if payload.is("pubsub", ns::PUBSUB) {
221                                    let pubsub = PubSub::try_from(payload).unwrap();
222                                    let from =
223                                        iq.from.clone().unwrap_or(Jid::from_str(&jid).unwrap());
224                                    if let PubSub::Items(items) = pubsub {
225                                        if items.node.0 == ns::AVATAR_DATA {
226                                            let new_events = avatar::handle_data_pubsub_iq(&from, &items);
227                                            events.extend(new_events);
228                                        }
229                                    }
230                                }
231                            } else if let IqType::Set(_) = iq.payload {
232                                // We MUST answer unhandled set iqs with a service-unavailable error.
233                                send_error(iq.from.unwrap(), iq.id, ErrorType::Cancel, DefinedCondition::ServiceUnavailable, "No handler defined for this kind of iq.");
234                            }
235                        } else if stanza.is("message", "jabber:client") {
236                            let message = Message::try_from(stanza).unwrap();
237                            let from = message.from.clone().unwrap();
238                            for child in message.payloads {
239                                if child.is("event", ns::PUBSUB_EVENT) {
240                                    let event = PubSubEvent::try_from(child).unwrap();
241                                    if let PubSubEvent::PublishedItems { node, items } = event {
242                                        if node.0 == ns::AVATAR_METADATA {
243                                            avatar::handle_metadata_pubsub_event(&from, &mut sender_tx, items);
244                                        }
245                                    }
246                                }
247                            }
248                        } else if stanza.is("presence", "jabber:client") {
249                            let presence = Presence::try_from(stanza).unwrap();
250                            let from = presence.from.clone().unwrap();
251                            for payload in presence.payloads.into_iter() {
252                                let muc_user = match MucUser::try_from(payload) {
253                                    Ok(muc_user) => muc_user,
254                                    _ => continue
255                                };
256                                for status in muc_user.status.into_iter() {
257                                    if status == Status::SelfPresence {
258                                        events.push(Event::RoomJoined(from.clone()));
259                                        break;
260                                    }
261                                }
262                            }
263                        } else if stanza.is("error", "http://etherx.jabber.org/streams") {
264                            println!("Received a fatal stream error: {}", String::from(&stanza));
265                        } else {
266                            panic!("Unknown stanza: {}", String::from(&stanza));
267                        }
268                    }
269                }
270
271                futures::stream::iter_ok(events)
272            })
273            .flatten()
274        };
275
276        let sender = sender_rx
277            .map_err(|e| panic!("Sink error: {:?}", e))
278            .forward(sink)
279            .map(|(rx, mut sink)| {
280                drop(rx);
281                let _ = sink.close();
282                None
283            });
284
285        // TODO is this correct?
286        // Some(Error) means a real error
287        // None means the end of the sender stream and can be ignored
288        let future = reader
289            .map(Some)
290            .select(sender.into_stream())
291            .filter_map(|x| x);
292
293        let agent = Agent { sender_tx };
294
295        (agent, future)
296    }
297}
298
299pub struct Client {
300    sender_tx: mpsc::UnboundedSender<Packet>,
301    stream: Box<Stream<Item = Event, Error = Error>>,
302}
303
304impl Client {
305    pub fn get_agent(&self) -> Agent {
306        Agent {
307            sender_tx: self.sender_tx.clone(),
308        }
309    }
310
311    pub fn listen(self) -> Box<Stream<Item = Event, Error = Error>> {
312        self.stream
313    }
314}
315
316pub struct Agent {
317    sender_tx: mpsc::UnboundedSender<Packet>,
318}
319
320impl Agent {
321    pub fn join_room(&mut self, room: Jid, lang: &str, status: &str) {
322        let mut presence = Presence::new(PresenceType::None)
323            .with_to(Some(room))
324            .with_payloads(vec![Muc::new().into()]);
325        presence.set_status(String::from(lang), String::from(status));
326        let presence = presence.into();
327        self.sender_tx.unbounded_send(Packet::Stanza(presence))
328            .unwrap();
329    }
330
331    pub fn send_message(&mut self, recipient: Jid, type_: MessageType, lang: &str, text: &str) {
332        let mut message = Message::new(Some(recipient));
333        message.type_ = type_;
334        message.bodies.insert(String::from(lang), Body(String::from(text)));
335        let message = message.into();
336        self.sender_tx.unbounded_send(Packet::Stanza(message))
337            .unwrap();
338    }
339}