1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#![deny(bare_trait_objects)]
8
9use std::str::FromStr;
10use std::rc::Rc;
11use std::cell::RefCell;
12use std::convert::TryFrom;
13use futures::{Future,Stream, Sink, sync::mpsc};
14use tokio_xmpp::{
15 Client as TokioXmppClient,
16 Event as TokioXmppEvent,
17 Packet,
18};
19use xmpp_parsers::{
20 bookmarks2::Conference,
21 caps::{compute_disco, hash_caps, Caps},
22 disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity},
23 hashes::Algo,
24 iq::{Iq, IqType},
25 message::{Message, MessageType, Body},
26 muc::{
27 Muc,
28 user::{MucUser, Status},
29 },
30 ns,
31 presence::{Presence, Type as PresenceType},
32 pubsub::pubsub::{PubSub, Items},
33 roster::{Roster, Item as RosterItem},
34 stanza_error::{StanzaError, ErrorType, DefinedCondition},
35 Jid, BareJid, FullJid, JidParseError,
36};
37#[macro_use]
38extern crate log;
39
40mod pubsub;
41
42pub type Error = tokio_xmpp::Error;
43
/// The kind of client advertised in the service-discovery identity.
///
/// The textual form produced by `Display` / `to_string()` (`"bot"` or `"pc"`)
/// is what gets used as the identity type when the disco information is
/// built.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClientType {
    /// An automated client; rendered as `"bot"`.
    Bot,
    /// A client running on a personal computer; rendered as `"pc"`.
    Pc,
}

impl Default for ClientType {
    /// Clients are bots unless stated otherwise.
    fn default() -> Self {
        ClientType::Bot
    }
}

// `Display` is implemented instead of `ToString`: the standard library's
// blanket `impl<T: Display> ToString for T` keeps `.to_string()` working for
// existing callers while also allowing `{}` formatting.
impl std::fmt::Display for ClientType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            ClientType::Bot => "bot",
            ClientType::Pc => "pc",
        })
    }
}
66
/// Optional behaviours that can be switched on through
/// `ClientBuilder::enable_feature`.
///
/// `Debug` is derived so that the public type can be logged and used in
/// assertions; `Clone`/`Copy`/`Eq` are free for a field-less enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClientFeature {
    /// Advertises `<avatar metadata namespace>+notify` in the disco features.
    #[cfg(feature = "avatars")]
    Avatars,
    /// Contact-list (roster) support.
    // NOTE(review): nothing in this file consults this variant yet; the
    // roster is currently requested unconditionally once online.
    ContactList,
    /// Advertises `<bookmarks2 namespace>+notify` in the disco features.
    JoinRooms,
}
74
/// High-level events yielded by the stream returned from
/// `ClientBuilder::build`.
#[derive(Debug)]
pub enum Event {
    /// The underlying tokio-xmpp client reported that it is online.
    Online,
    /// The underlying tokio-xmpp client reported a disconnection.
    Disconnected,
    /// Emitted once per item of a roster result that carries no `from`.
    ContactAdded(RosterItem),
    /// A contact was removed.
    // NOTE(review): not emitted by the code in this file — confirm producer.
    ContactRemoved(RosterItem),
    /// A contact was changed.
    // NOTE(review): not emitted by the code in this file — confirm producer.
    ContactChanged(RosterItem),
    /// An avatar was retrieved for the given JID.
    // NOTE(review): the meaning of the `String` is not visible here
    // (presumably produced by the `pubsub` module) — confirm.
    #[cfg(feature = "avatars")]
    AvatarRetrieved(Jid, String),
    /// A room should be joined, together with its bookmark data.
    // NOTE(review): presumably produced by the `pubsub` module from
    // bookmarks — confirm.
    JoinRoom(BareJid, Conference),
    /// A room should be left.
    // NOTE(review): not emitted by the code in this file — confirm producer.
    LeaveRoom(BareJid),
    /// All rooms should be left.
    // NOTE(review): not emitted by the code in this file — confirm producer.
    LeaveAllRooms,
    /// A presence carrying a MUC user payload with the self-presence status
    /// was received; the JID is the bare form of the presence sender.
    RoomJoined(BareJid),
    /// A room was left.
    // NOTE(review): not emitted by the code in this file — confirm producer.
    RoomLeft(BareJid),
}
90
/// Builder collecting everything needed to create an `Agent` and its
/// associated event stream.
#[derive(Default)]
pub struct ClientBuilder<'a> {
    /// JID handed to the tokio-xmpp client; also used as the sender of iqs
    /// that arrive without a `from`.
    jid: &'a str,
    /// Password handed to the tokio-xmpp client.
    password: &'a str,
    /// Used as the caps node of the initial presence.
    website: String,
    /// Nickname stored in the `Agent`, used by `join_room` when the caller
    /// passes none.
    default_nick: String,
    /// Client type and client name used for the disco identity.
    disco: (ClientType, String),
    /// Features enabled through `enable_feature`.
    features: Vec<ClientFeature>,
}
100
101impl ClientBuilder<'_> {
102 pub fn new<'a>(jid: &'a str, password: &'a str) -> ClientBuilder<'a> {
103 ClientBuilder {
104 jid,
105 password,
106 website: String::from("https://gitlab.com/xmpp-rs/tokio-xmpp"),
107 default_nick: String::from("xmpp-rs"),
108 disco: (ClientType::default(), String::from("tokio-xmpp")),
109 features: vec![],
110 }
111 }
112
113 pub fn set_client(mut self, type_: ClientType, name: &str) -> Self {
114 self.disco = (type_, String::from(name));
115 self
116 }
117
118 pub fn set_website(mut self, url: &str) -> Self {
119 self.website = String::from(url);
120 self
121 }
122
123 pub fn set_default_nick(mut self, nick: &str) -> Self {
124 self.default_nick = String::from(nick);
125 self
126 }
127
128 pub fn enable_feature(mut self, feature: ClientFeature) -> Self {
129 self.features.push(feature);
130 self
131 }
132
133 fn make_disco(&self) -> DiscoInfoResult {
134 let identities = vec![Identity::new("client", self.disco.0.to_string(),
135 "en", self.disco.1.to_string())];
136 let mut features = vec![
137 Feature::new(ns::DISCO_INFO),
138 ];
139 #[cfg(feature = "avatars")]
140 {
141 if self.features.contains(&ClientFeature::Avatars) {
142 features.push(Feature::new(format!("{}+notify", ns::AVATAR_METADATA)));
143 }
144 }
145 if self.features.contains(&ClientFeature::JoinRooms) {
146 features.push(Feature::new(format!("{}+notify", ns::BOOKMARKS2)));
147 }
148 DiscoInfoResult {
149 node: None,
150 identities,
151 features,
152 extensions: vec![],
153 }
154 }
155
156 fn make_initial_presence(disco: &DiscoInfoResult, node: &str) -> Presence {
157 let caps_data = compute_disco(disco);
158 let hash = hash_caps(&caps_data, Algo::Sha_1).unwrap();
159 let caps = Caps::new(node, hash);
160
161 let mut presence = Presence::new(PresenceType::None);
162 presence.add_payload(caps);
163 presence
164 }
165
166 pub fn build(
167 self,
168 ) -> Result<(Agent, impl Stream<Item = Event, Error = tokio_xmpp::Error>), JidParseError> {
169 let client = TokioXmppClient::new(self.jid, self.password)?;
170 Ok(self.build_impl(client))
171 }
172
173 // This function is meant to be used for testing build
174 pub(crate) fn build_impl<S>(
175 self,
176 stream: S,
177 ) -> (Agent, impl Stream<Item = Event, Error = tokio_xmpp::Error>)
178 where
179 S: Stream<Item = tokio_xmpp::Event, Error = tokio_xmpp::Error>
180 + Sink<SinkItem = tokio_xmpp::Packet, SinkError = tokio_xmpp::Error>,
181 {
182 let disco = self.make_disco();
183 let node = self.website;
184 let (sender_tx, sender_rx) = mpsc::unbounded();
185
186 let client = stream;
187 let (sink, stream) = client.split();
188
189 let reader = {
190 let mut sender_tx = sender_tx.clone();
191 let jid = self.jid.to_owned();
192 stream.map(move |event| {
193 // Helper function to send an iq error.
194 let mut events = Vec::new();
195 let send_error = |to, id, type_, condition, text: &str| {
196 let error = StanzaError::new(type_, condition, "en", text);
197 let iq = Iq::from_error(id, error)
198 .with_to(to)
199 .into();
200 sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
201 };
202
203 match event {
204 TokioXmppEvent::Online => {
205 let presence = ClientBuilder::make_initial_presence(&disco, &node).into();
206 let packet = Packet::Stanza(presence);
207 sender_tx.unbounded_send(packet)
208 .unwrap();
209 events.push(Event::Online);
210 // TODO: only send this when the ContactList feature is enabled.
211 let iq = Iq::from_get("roster", Roster { ver: None, items: vec![] })
212 .into();
213 sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
214 // TODO: only send this when the JoinRooms feature is enabled.
215 let iq = Iq::from_get("bookmarks", PubSub::Items(Items::new(ns::BOOKMARKS2)))
216 .into();
217 sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
218 }
219 TokioXmppEvent::Disconnected => {
220 events.push(Event::Disconnected);
221 }
222 TokioXmppEvent::Stanza(stanza) => {
223 if stanza.is("iq", "jabber:client") {
224 let iq = Iq::try_from(stanza).unwrap();
225 let from =
226 iq.from.clone().unwrap_or_else(|| Jid::from_str(&jid).unwrap());
227 if let IqType::Get(payload) = iq.payload {
228 if payload.is("query", ns::DISCO_INFO) {
229 let query = DiscoInfoQuery::try_from(payload);
230 match query {
231 Ok(query) => {
232 let mut disco_info = disco.clone();
233 disco_info.node = query.node;
234 let iq = Iq::from_result(iq.id, Some(disco_info))
235 .with_to(iq.from.unwrap())
236 .into();
237 sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
238 },
239 Err(err) => {
240 send_error(iq.from.unwrap(), iq.id, ErrorType::Modify, DefinedCondition::BadRequest, &format!("{}", err));
241 },
242 }
243 } else {
244 // We MUST answer unhandled get iqs with a service-unavailable error.
245 send_error(iq.from.unwrap(), iq.id, ErrorType::Cancel, DefinedCondition::ServiceUnavailable, "No handler defined for this kind of iq.");
246 }
247 } else if let IqType::Result(Some(payload)) = iq.payload {
248 // TODO: move private iqs like this one somewhere else, for
249 // security reasons.
250 if payload.is("query", ns::ROSTER) && iq.from.is_none() {
251 let roster = Roster::try_from(payload).unwrap();
252 for item in roster.items.into_iter() {
253 events.push(Event::ContactAdded(item));
254 }
255 } else if payload.is("pubsub", ns::PUBSUB) {
256 let new_events = pubsub::handle_iq_result(&from, payload);
257 events.extend(new_events);
258 }
259 } else if let IqType::Set(_) = iq.payload {
260 // We MUST answer unhandled set iqs with a service-unavailable error.
261 send_error(iq.from.unwrap(), iq.id, ErrorType::Cancel, DefinedCondition::ServiceUnavailable, "No handler defined for this kind of iq.");
262 }
263 } else if stanza.is("message", "jabber:client") {
264 let message = Message::try_from(stanza).unwrap();
265 let from = message.from.clone().unwrap();
266 for child in message.payloads {
267 if child.is("event", ns::PUBSUB_EVENT) {
268 let new_events = pubsub::handle_event(&from, child, &mut sender_tx);
269 events.extend(new_events);
270 }
271 }
272 } else if stanza.is("presence", "jabber:client") {
273 let presence = Presence::try_from(stanza).unwrap();
274 let from: BareJid = match presence.from.clone().unwrap() {
275 Jid::Full(FullJid { node, domain, .. }) => BareJid { node, domain },
276 Jid::Bare(bare) => bare,
277 };
278 for payload in presence.payloads.into_iter() {
279 let muc_user = match MucUser::try_from(payload) {
280 Ok(muc_user) => muc_user,
281 _ => continue
282 };
283 for status in muc_user.status.into_iter() {
284 if status == Status::SelfPresence {
285 events.push(Event::RoomJoined(from.clone()));
286 break;
287 }
288 }
289 }
290 } else if stanza.is("error", "http://etherx.jabber.org/streams") {
291 println!("Received a fatal stream error: {}", String::from(&stanza));
292 } else {
293 panic!("Unknown stanza: {}", String::from(&stanza));
294 }
295 }
296 }
297
298 futures::stream::iter_ok(events)
299 })
300 .flatten()
301 };
302
303 let sender = sender_rx
304 .map_err(|e| panic!("Sink error: {:?}", e))
305 .forward(sink)
306 .map(|(rx, mut sink)| {
307 drop(rx);
308 let _ = sink.close();
309 None
310 });
311
312 // TODO is this correct?
313 // Some(Error) means a real error
314 // None means the end of the sender stream and can be ignored
315 let future = reader
316 .map(Some)
317 .select(sender.into_stream())
318 .filter_map(|x| x);
319
320 let agent = Agent {
321 sender_tx,
322 default_nick: Rc::new(RefCell::new(self.default_nick)),
323 };
324
325 (agent, future)
326 }
327}
328
/// Handle used to send stanzas through the connection created by
/// `ClientBuilder`.
///
/// Cloning yields another handle onto the same channel and the same shared
/// default nickname.
#[derive(Clone)]
pub struct Agent {
    /// Sending side of the channel whose packets are forwarded to the
    /// connection's sink.
    sender_tx: mpsc::UnboundedSender<Packet>,
    /// Nickname used by `join_room` when the caller does not provide one.
    default_nick: Rc<RefCell<String>>,
}
334
335impl Agent {
336 pub fn join_room(&mut self, room: BareJid, nick: Option<String>, password: Option<String>,
337 lang: &str, status: &str) {
338 let mut muc = Muc::new();
339 if let Some(password) = password {
340 muc = muc.with_password(password);
341 }
342
343 let nick = nick.unwrap_or_else(|| self.default_nick.borrow().clone());
344 let room_jid = room.with_resource(nick);
345 let mut presence = Presence::new(PresenceType::None)
346 .with_to(Jid::Full(room_jid));
347 presence.add_payload(muc);
348 presence.set_status(String::from(lang), String::from(status));
349 let presence = presence.into();
350 self.sender_tx.unbounded_send(Packet::Stanza(presence))
351 .unwrap();
352 }
353
354 pub fn send_message(&mut self, recipient: Jid, type_: MessageType, lang: &str, text: &str) {
355 let mut message = Message::new(Some(recipient));
356 message.type_ = type_;
357 message.bodies.insert(String::from(lang), Body(String::from(text)));
358 let message = message.into();
359 self.sender_tx.unbounded_send(Packet::Stanza(message))
360 .unwrap();
361 }
362}