lib.rs

  1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
  2//
  3// This Source Code Form is subject to the terms of the Mozilla Public
  4// License, v. 2.0. If a copy of the MPL was not distributed with this
  5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6
  7use std::str::FromStr;
  8use futures::{Future,Stream, Sink, sync::mpsc};
  9use tokio_xmpp::{
 10    Client as TokioXmppClient,
 11    Event as TokioXmppEvent,
 12    Packet,
 13};
 14use xmpp_parsers::{
 15    caps::{compute_disco, hash_caps, Caps},
 16    disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity},
 17    hashes::Algo,
 18    iq::{Iq, IqType},
 19    message::{Message, MessageType, Body},
 20    muc::{
 21        Muc,
 22        user::{MucUser, Status},
 23    },
 24    ns,
 25    presence::{Presence, Type as PresenceType},
 26    pubsub::{
 27        event::PubSubEvent,
 28        pubsub::PubSub,
 29    },
 30    roster::{Roster, Item as RosterItem},
 31    stanza_error::{StanzaError, ErrorType, DefinedCondition},
 32    Jid, JidParseError, TryFrom,
 33};
 34
 35mod avatar;
 36
 37#[derive(Debug)]
 38pub enum ClientType {
 39    Bot,
 40    Pc,
 41}
 42
 43impl Default for ClientType {
 44    fn default() -> Self {
 45        ClientType::Bot
 46    }
 47}
 48
 49impl ToString for ClientType {
 50    fn to_string(&self) -> String {
 51        String::from(
 52            match self {
 53                ClientType::Bot => "bot",
 54                ClientType::Pc => "pc",
 55            }
 56        )
 57    }
 58}
 59
 60#[derive(PartialEq)]
 61pub enum ClientFeature {
 62    Avatars,
 63    ContactList,
 64}
 65
 66pub enum Event {
 67    Online,
 68    Disconnected,
 69    ContactAdded(RosterItem),
 70    ContactRemoved(RosterItem),
 71    ContactChanged(RosterItem),
 72    AvatarRetrieved(Jid, String),
 73    RoomJoined(Jid),
 74}
 75
 76#[derive(Default)]
 77pub struct ClientBuilder<'a> {
 78    jid: &'a str,
 79    password: &'a str,
 80    website: String,
 81    disco: (ClientType, String),
 82    features: Vec<ClientFeature>,
 83}
 84
 85impl ClientBuilder<'_> {
 86    pub fn new<'a>(jid: &'a str, password: &'a str) -> ClientBuilder<'a> {
 87        ClientBuilder {
 88            jid,
 89            password,
 90            website: String::from("https://gitlab.com/xmpp-rs/tokio-xmpp"),
 91            disco: (ClientType::default(), String::from("tokio-xmpp")),
 92            features: vec![],
 93        }
 94    }
 95
 96    pub fn set_client(mut self, type_: ClientType, name: &str) -> Self {
 97        self.disco = (type_, String::from(name));
 98        self
 99    }
100
101    pub fn set_website(mut self, url: &str) -> Self {
102        self.website = String::from(url);
103        self
104    }
105
106    pub fn enable_feature(mut self, feature: ClientFeature) -> Self {
107        self.features.push(feature);
108        self
109    }
110
111    fn make_disco(&self) -> DiscoInfoResult {
112        let identities = vec![Identity::new("client", self.disco.0.to_string(),
113                                            "en", self.disco.1.to_string())];
114        let mut features = vec![
115            Feature::new(ns::DISCO_INFO),
116        ];
117        if self.features.contains(&ClientFeature::Avatars) {
118            features.push(Feature::new(format!("{}+notify", ns::AVATAR_METADATA)));
119        }
120        DiscoInfoResult {
121            node: None,
122            identities,
123            features,
124            extensions: vec![],
125        }
126    }
127
128    fn make_initial_presence(disco: &DiscoInfoResult, node: &str) -> Presence {
129        let caps_data = compute_disco(disco);
130        let hash = hash_caps(&caps_data, Algo::Sha_1).unwrap();
131        let caps = Caps::new(node, hash);
132
133        let mut presence = Presence::new(PresenceType::None);
134        presence.add_payload(caps);
135        presence
136    }
137
138    pub fn build(self, mut app_tx: mpsc::UnboundedSender<Event>) -> Result<(Box<Future<Item = (), Error = ()>>, Client), JidParseError> {
139        let disco = self.make_disco();
140        let node = self.website;
141        let (sender_tx, sender_rx) = mpsc::unbounded();
142
143        let client = TokioXmppClient::new(self.jid, self.password)?;
144        let (sink, stream) = client.split();
145
146        let reader = {
147            let mut sender_tx = sender_tx.clone();
148            let jid = self.jid.to_owned();
149            stream.for_each(move |event| {
150                // Helper function to send an iq error.
151                let send_error = |to, id, type_, condition, text: &str| {
152                    let error = StanzaError::new(type_, condition, "en", text);
153                    let iq = Iq::from_error(id, error)
154                        .with_to(to)
155                        .into();
156                    sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
157                };
158
159                match event {
160                    TokioXmppEvent::Online => {
161                        let presence = ClientBuilder::make_initial_presence(&disco, &node).into();
162                        let packet = Packet::Stanza(presence);
163                        sender_tx.unbounded_send(packet)
164                            .unwrap();
165                        app_tx.unbounded_send(Event::Online).unwrap();
166                        let iq = Iq::from_get("roster", Roster { ver: None, items: vec![] })
167                            .into();
168                        sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
169                    }
170                    TokioXmppEvent::Disconnected => {
171                        app_tx.unbounded_send(Event::Disconnected).unwrap();
172                    }
173                    TokioXmppEvent::Stanza(stanza) => {
174                        if stanza.is("iq", "jabber:client") {
175                            let iq = Iq::try_from(stanza).unwrap();
176                            if let IqType::Get(payload) = iq.payload {
177                                if payload.is("query", ns::DISCO_INFO) {
178                                    let query = DiscoInfoQuery::try_from(payload);
179                                    match query {
180                                        Ok(query) => {
181                                            let mut disco_info = disco.clone();
182                                            disco_info.node = query.node;
183                                            let iq = Iq::from_result(iq.id, Some(disco_info))
184                                                .with_to(iq.from.unwrap())
185                                                .into();
186                                            sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
187                                        },
188                                        Err(err) => {
189                                            send_error(iq.from.unwrap(), iq.id, ErrorType::Modify, DefinedCondition::BadRequest, &format!("{}", err));
190                                        },
191                                    }
192                                } else {
193                                    // We MUST answer unhandled get iqs with a service-unavailable error.
194                                    send_error(iq.from.unwrap(), iq.id, ErrorType::Cancel, DefinedCondition::ServiceUnavailable, "No handler defined for this kind of iq.");
195                                }
196                            } else if let IqType::Result(Some(payload)) = iq.payload {
197                                if payload.is("query", ns::ROSTER) {
198                                    let roster = Roster::try_from(payload).unwrap();
199                                    for item in roster.items.into_iter() {
200                                        app_tx.unbounded_send(Event::ContactAdded(item)).unwrap();
201                                    }
202                                } else if payload.is("pubsub", ns::PUBSUB) {
203                                    let pubsub = PubSub::try_from(payload).unwrap();
204                                    let from =
205                                        iq.from.clone().unwrap_or(Jid::from_str(&jid).unwrap());
206                                    if let PubSub::Items(items) = pubsub {
207                                        if items.node.0 == ns::AVATAR_DATA {
208                                            avatar::handle_data_pubsub_iq(&from, &mut app_tx, items);
209                                        }
210                                    }
211                                }
212                            } else if let IqType::Set(_) = iq.payload {
213                                // We MUST answer unhandled set iqs with a service-unavailable error.
214                                send_error(iq.from.unwrap(), iq.id, ErrorType::Cancel, DefinedCondition::ServiceUnavailable, "No handler defined for this kind of iq.");
215                            }
216                        } else if stanza.is("message", "jabber:client") {
217                            let message = Message::try_from(stanza).unwrap();
218                            let from = message.from.clone().unwrap();
219                            for child in message.payloads {
220                                if child.is("event", ns::PUBSUB_EVENT) {
221                                    let event = PubSubEvent::try_from(child).unwrap();
222                                    if let PubSubEvent::PublishedItems { node, items } = event {
223                                        if node.0 == ns::AVATAR_METADATA {
224                                            avatar::handle_metadata_pubsub_event(&from, &mut sender_tx, items);
225                                        }
226                                    }
227                                }
228                            }
229                        } else if stanza.is("presence", "jabber:client") {
230                            let presence = Presence::try_from(stanza).unwrap();
231                            let from = presence.from.clone().unwrap();
232                            for payload in presence.payloads.into_iter() {
233                                let muc_user = match MucUser::try_from(payload) {
234                                    Ok(muc_user) => muc_user,
235                                    _ => continue
236                                };
237                                for status in muc_user.status.into_iter() {
238                                    if status == Status::SelfPresence {
239                                        app_tx.unbounded_send(Event::RoomJoined(from.clone())).unwrap();
240                                        break;
241                                    }
242                                }
243                            }
244                        } else if stanza.is("error", "http://etherx.jabber.org/streams") {
245                            println!("Received a fatal stream error: {}", String::from(&stanza));
246                        } else {
247                            panic!("Unknown stanza: {}", String::from(&stanza));
248                        }
249                    }
250                }
251
252                Ok(())
253            })
254        };
255
256        let sender = sender_rx
257            .map_err(|e| panic!("Sink error: {:?}", e))
258            .forward(sink)
259            .map(|(rx, mut sink)| {
260                drop(rx);
261                let _ = sink.close();
262            });
263
264        let future = reader.select(sender)
265            .map(|_| ())
266            .map_err(|_| ());
267
268        let agent = Client {
269            sender_tx,
270        };
271
272        Ok((Box::new(future), agent))
273    }
274}
275
276pub struct Client {
277    sender_tx: mpsc::UnboundedSender<Packet>,
278}
279
280impl Client {
281    pub fn join_room(&mut self, room: Jid, lang: &str, status: &str) {
282        let mut presence = Presence::new(PresenceType::None)
283            .with_to(Some(room))
284            .with_payloads(vec![Muc::new().into()]);
285        presence.set_status(String::from(lang), String::from(status));
286        let presence = presence.into();
287        self.sender_tx.unbounded_send(Packet::Stanza(presence))
288            .unwrap();
289    }
290
291    pub fn send_message(&mut self, recipient: Jid, type_: MessageType, lang: &str, text: &str) {
292        let mut message = Message::new(Some(recipient));
293        message.type_ = type_;
294        message.bodies.insert(String::from(lang), Body(String::from(text)));
295        let message = message.into();
296        self.sender_tx.unbounded_send(Packet::Stanza(message))
297            .unwrap();
298    }
299}