// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#![deny(bare_trait_objects)]
8
9use futures::stream::StreamExt;
10use reqwest::{Body as ReqwestBody, Client as ReqwestClient};
11use std::cell::RefCell;
12use std::convert::TryFrom;
13use std::path::{Path, PathBuf};
14use std::rc::Rc;
15use tokio::fs::File;
16use tokio_util::codec::{BytesCodec, FramedRead};
17use tokio_xmpp::{AsyncClient as TokioXmppClient, Event as TokioXmppEvent};
18use xmpp_parsers::{
19 bookmarks2::Conference,
20 caps::{compute_disco, hash_caps, Caps},
21 disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity},
22 hashes::Algo,
23 http_upload::{SlotRequest, SlotResult},
24 iq::{Iq, IqType},
25 message::{Body, Message, MessageType},
26 muc::{
27 user::{MucUser, Status},
28 Muc,
29 },
30 ns,
31 presence::{Presence, Type as PresenceType},
32 pubsub::pubsub::{Items, PubSub},
33 roster::{Item as RosterItem, Roster},
34 stanza_error::{DefinedCondition, ErrorType, StanzaError},
35 BareJid, Element, FullJid, Jid,
36};
37#[macro_use]
38extern crate log;
39
40mod pubsub;
41
/// Error type returned by this crate's fallible functions; an alias for
/// [`tokio_xmpp::Error`].
pub type Error = tokio_xmpp::Error;
43
/// Kind of client advertised in the service-discovery identity.
///
/// The builder passes the string form of this value as the identity type of
/// the `"client"` category when it assembles the disco#info result.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClientType {
    /// Rendered as `"bot"`; this is the default.
    Bot,
    /// Rendered as `"pc"`.
    Pc,
}
49
50impl Default for ClientType {
51 fn default() -> Self {
52 ClientType::Bot
53 }
54}
55
56impl ToString for ClientType {
57 fn to_string(&self) -> String {
58 String::from(match self {
59 ClientType::Bot => "bot",
60 ClientType::Pc => "pc",
61 })
62 }
63}
64
/// Optional capabilities that can be switched on through
/// `ClientBuilder::enable_feature`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClientFeature {
    /// When enabled, the disco#info result advertises
    /// `ns::AVATAR_METADATA` with the `+notify` suffix. Only available when
    /// the crate is compiled with the `avatars` feature.
    #[cfg(feature = "avatars")]
    Avatars,
    /// Contact-list (roster) support. No code in this file consults this
    /// flag yet; a TODO in `Agent::wait_for_events` notes that the roster
    /// query should eventually depend on it.
    ContactList,
    /// When enabled, the disco#info result advertises `ns::BOOKMARKS2` with
    /// the `+notify` suffix.
    JoinRooms,
}
72
/// Nickname of a room occupant. For `Event::RoomMessage` it is taken from the
/// resource part of the sender's full JID.
pub type RoomNick = String;
74
/// High-level events handed to the application by `Agent::wait_for_events`.
#[derive(Debug)]
pub enum Event {
    /// The client came online with a fresh (non-resumed) session.
    Online,
    /// The underlying client reported a disconnection.
    Disconnected,
    /// One item of a roster result; emitted once per item received.
    ContactAdded(RosterItem),
    /// Not emitted by any code in this file.
    ContactRemoved(RosterItem),
    /// Not emitted by any code in this file.
    ContactChanged(RosterItem),
    /// Only available with the `avatars` feature. Not emitted in this file;
    /// presumably produced by the `pubsub` module — TODO confirm what the
    /// `String` holds.
    #[cfg(feature = "avatars")]
    AvatarRetrieved(Jid, String),
    /// A `chat` or `normal` message with a body in one of the configured
    /// languages: the sender's bare JID and the selected body.
    ChatMessage(BareJid, Body),
    /// Not emitted in this file; presumably produced by the `pubsub` module
    /// when handling bookmarks — TODO confirm.
    JoinRoom(BareJid, Conference),
    /// Not emitted in this file; presumably produced by the `pubsub` module
    /// when handling bookmarks — TODO confirm.
    LeaveRoom(BareJid),
    /// Not emitted in this file; presumably produced by the `pubsub` module
    /// when handling bookmarks — TODO confirm.
    LeaveAllRooms,
    /// A presence carrying a MUC user payload with the self-presence status
    /// was received from this (bare) room JID.
    RoomJoined(BareJid),
    /// Not emitted by any code in this file.
    RoomLeft(BareJid),
    /// A `groupchat` message with a body: the room's bare JID, the sender's
    /// nick (resource part of the sender JID) and the selected body.
    RoomMessage(BareJid, RoomNick, Body),
    /// An HTTP upload finished with status 201; carries the slot's GET URL.
    HttpUploadedFile(String),
}
93
/// Collects the settings needed to create an [`Agent`].
#[derive(Default)]
pub struct ClientBuilder<'a> {
    /// JID handed to the tokio-xmpp client constructor.
    jid: &'a str,
    /// Password handed to the tokio-xmpp client constructor.
    password: &'a str,
    /// Becomes the agent's `node`, which is used as the caps node of the
    /// initial presence.
    website: String,
    /// Nick used by `Agent::join_room` when the caller does not give one.
    default_nick: String,
    /// Languages, in order of preference, used to pick the body of incoming
    /// messages.
    lang: Vec<String>,
    /// Client type and client name advertised in the disco identity.
    disco: (ClientType, String),
    /// Features enabled through `enable_feature`.
    features: Vec<ClientFeature>,
}
104
105impl ClientBuilder<'_> {
106 pub fn new<'a>(jid: &'a str, password: &'a str) -> ClientBuilder<'a> {
107 ClientBuilder {
108 jid,
109 password,
110 website: String::from("https://gitlab.com/xmpp-rs/tokio-xmpp"),
111 default_nick: String::from("xmpp-rs"),
112 lang: vec![String::from("en")],
113 disco: (ClientType::default(), String::from("tokio-xmpp")),
114 features: vec![],
115 }
116 }
117
118 pub fn set_client(mut self, type_: ClientType, name: &str) -> Self {
119 self.disco = (type_, String::from(name));
120 self
121 }
122
123 pub fn set_website(mut self, url: &str) -> Self {
124 self.website = String::from(url);
125 self
126 }
127
128 pub fn set_default_nick(mut self, nick: &str) -> Self {
129 self.default_nick = String::from(nick);
130 self
131 }
132
133 pub fn set_lang(mut self, lang: Vec<String>) -> Self {
134 self.lang = lang;
135 self
136 }
137
138 pub fn enable_feature(mut self, feature: ClientFeature) -> Self {
139 self.features.push(feature);
140 self
141 }
142
143 fn make_disco(&self) -> DiscoInfoResult {
144 let identities = vec![Identity::new(
145 "client",
146 self.disco.0.to_string(),
147 "en",
148 self.disco.1.to_string(),
149 )];
150 let mut features = vec![Feature::new(ns::DISCO_INFO)];
151 #[cfg(feature = "avatars")]
152 {
153 if self.features.contains(&ClientFeature::Avatars) {
154 features.push(Feature::new(format!("{}+notify", ns::AVATAR_METADATA)));
155 }
156 }
157 if self.features.contains(&ClientFeature::JoinRooms) {
158 features.push(Feature::new(format!("{}+notify", ns::BOOKMARKS2)));
159 }
160 DiscoInfoResult {
161 node: None,
162 identities,
163 features,
164 extensions: vec![],
165 }
166 }
167
168 pub fn build(self) -> Result<Agent, Error> {
169 let client = TokioXmppClient::new(self.jid, self.password)?;
170 Ok(self.build_impl(client)?)
171 }
172
173 // This function is meant to be used for testing build
174 pub(crate) fn build_impl(self, client: TokioXmppClient) -> Result<Agent, Error> {
175 let disco = self.make_disco();
176 let node = self.website;
177
178 let agent = Agent {
179 client,
180 default_nick: Rc::new(RefCell::new(self.default_nick)),
181 lang: Rc::new(self.lang),
182 disco,
183 node,
184 uploads: Vec::new(),
185 };
186
187 Ok(agent)
188 }
189}
190
/// A connected XMPP client together with the state needed to answer
/// service-discovery queries, pick message bodies and track HTTP uploads.
pub struct Agent {
    /// Underlying tokio-xmpp client used to send and receive stanzas.
    client: TokioXmppClient,
    /// Nick used by `join_room` when the caller does not give one.
    default_nick: Rc<RefCell<String>>,
    /// Preferred languages used to select the body of incoming messages.
    lang: Rc<Vec<String>>,
    /// disco#info result returned to disco queries and hashed into the caps
    /// of the initial presence.
    disco: DiscoInfoResult,
    /// Caps node sent in the initial presence (the builder's website).
    node: String,
    /// Pending HTTP upload slot requests: (iq id, upload service JID, path of
    /// the file to upload).
    uploads: Vec<(String, Jid, PathBuf)>,
}
199
200impl Agent {
201 pub async fn join_room(
202 &mut self,
203 room: BareJid,
204 nick: Option<String>,
205 password: Option<String>,
206 lang: &str,
207 status: &str,
208 ) {
209 let mut muc = Muc::new();
210 if let Some(password) = password {
211 muc = muc.with_password(password);
212 }
213
214 let nick = nick.unwrap_or_else(|| self.default_nick.borrow().clone());
215 let room_jid = room.with_resource(nick);
216 let mut presence = Presence::new(PresenceType::None).with_to(Jid::Full(room_jid));
217 presence.add_payload(muc);
218 presence.set_status(String::from(lang), String::from(status));
219 let _ = self.client.send_stanza(presence.into()).await;
220 }
221
222 pub async fn send_message(
223 &mut self,
224 recipient: Jid,
225 type_: MessageType,
226 lang: &str,
227 text: &str,
228 ) {
229 let mut message = Message::new(Some(recipient));
230 message.type_ = type_;
231 message
232 .bodies
233 .insert(String::from(lang), Body(String::from(text)));
234 let _ = self.client.send_stanza(message.into()).await;
235 }
236
237 fn make_initial_presence(disco: &DiscoInfoResult, node: &str) -> Presence {
238 let caps_data = compute_disco(disco);
239 let hash = hash_caps(&caps_data, Algo::Sha_1).unwrap();
240 let caps = Caps::new(node, hash);
241
242 let mut presence = Presence::new(PresenceType::None);
243 presence.add_payload(caps);
244 presence
245 }
246
247 async fn handle_iq(&mut self, iq: Iq) -> Vec<Event> {
248 let mut events = vec![];
249 let from = iq
250 .from
251 .clone()
252 .unwrap_or_else(|| self.client.bound_jid().unwrap().clone());
253 if let IqType::Get(payload) = iq.payload {
254 if payload.is("query", ns::DISCO_INFO) {
255 let query = DiscoInfoQuery::try_from(payload);
256 match query {
257 Ok(query) => {
258 let mut disco_info = self.disco.clone();
259 disco_info.node = query.node;
260 let iq = Iq::from_result(iq.id, Some(disco_info))
261 .with_to(iq.from.unwrap())
262 .into();
263 let _ = self.client.send_stanza(iq).await;
264 }
265 Err(err) => {
266 let error = StanzaError::new(
267 ErrorType::Modify,
268 DefinedCondition::BadRequest,
269 "en",
270 &format!("{}", err),
271 );
272 let iq = Iq::from_error(iq.id, error)
273 .with_to(iq.from.unwrap())
274 .into();
275 let _ = self.client.send_stanza(iq).await;
276 }
277 }
278 } else {
279 // We MUST answer unhandled get iqs with a service-unavailable error.
280 let error = StanzaError::new(
281 ErrorType::Cancel,
282 DefinedCondition::ServiceUnavailable,
283 "en",
284 "No handler defined for this kind of iq.",
285 );
286 let iq = Iq::from_error(iq.id, error)
287 .with_to(iq.from.unwrap())
288 .into();
289 let _ = self.client.send_stanza(iq).await;
290 }
291 } else if let IqType::Result(Some(payload)) = iq.payload {
292 // TODO: move private iqs like this one somewhere else, for
293 // security reasons.
294 if payload.is("query", ns::ROSTER) && iq.from.is_none() {
295 let roster = Roster::try_from(payload).unwrap();
296 for item in roster.items.into_iter() {
297 events.push(Event::ContactAdded(item));
298 }
299 } else if payload.is("pubsub", ns::PUBSUB) {
300 let new_events = pubsub::handle_iq_result(&from, payload);
301 events.extend(new_events);
302 } else if payload.is("slot", ns::HTTP_UPLOAD) {
303 let new_events = handle_upload_result(&from, iq.id, payload, self).await;
304 events.extend(new_events);
305 }
306 } else if let IqType::Set(_) = iq.payload {
307 // We MUST answer unhandled set iqs with a service-unavailable error.
308 let error = StanzaError::new(
309 ErrorType::Cancel,
310 DefinedCondition::ServiceUnavailable,
311 "en",
312 "No handler defined for this kind of iq.",
313 );
314 let iq = Iq::from_error(iq.id, error)
315 .with_to(iq.from.unwrap())
316 .into();
317 let _ = self.client.send_stanza(iq).await;
318 }
319
320 events
321 }
322
323 async fn handle_message(&mut self, message: Message) -> Vec<Event> {
324 let mut events = vec![];
325 let from = message.from.clone().unwrap();
326 let langs: Vec<&str> = self.lang.iter().map(String::as_str).collect();
327 match message.get_best_body(langs) {
328 Some((_lang, body)) => match message.type_ {
329 MessageType::Groupchat => {
330 let event = Event::RoomMessage(
331 from.clone().into(),
332 FullJid::try_from(from.clone()).unwrap().resource,
333 body.clone(),
334 );
335 events.push(event)
336 }
337 MessageType::Chat | MessageType::Normal => {
338 let event = Event::ChatMessage(from.clone().into(), body.clone());
339 events.push(event)
340 }
341 _ => (),
342 },
343 None => (),
344 }
345 for child in message.payloads {
346 if child.is("event", ns::PUBSUB_EVENT) {
347 let new_events = pubsub::handle_event(&from, child, self).await;
348 events.extend(new_events);
349 }
350 }
351
352 events
353 }
354
355 async fn handle_presence(&mut self, presence: Presence) -> Vec<Event> {
356 let mut events = vec![];
357 let from: BareJid = match presence.from.clone().unwrap() {
358 Jid::Full(FullJid { node, domain, .. }) => BareJid { node, domain },
359 Jid::Bare(bare) => bare,
360 };
361 for payload in presence.payloads.into_iter() {
362 let muc_user = match MucUser::try_from(payload) {
363 Ok(muc_user) => muc_user,
364 _ => continue,
365 };
366 for status in muc_user.status.into_iter() {
367 if status == Status::SelfPresence {
368 events.push(Event::RoomJoined(from.clone()));
369 break;
370 }
371 }
372 }
373
374 events
375 }
376
377 pub async fn wait_for_events(&mut self) -> Option<Vec<Event>> {
378 if let Some(event) = self.client.next().await {
379 let mut events = Vec::new();
380
381 match event {
382 TokioXmppEvent::Online { resumed: false, .. } => {
383 let presence = Self::make_initial_presence(&self.disco, &self.node).into();
384 let _ = self.client.send_stanza(presence).await;
385 events.push(Event::Online);
386 // TODO: only send this when the ContactList feature is enabled.
387 let iq = Iq::from_get(
388 "roster",
389 Roster {
390 ver: None,
391 items: vec![],
392 },
393 )
394 .into();
395 let _ = self.client.send_stanza(iq).await;
396 // TODO: only send this when the JoinRooms feature is enabled.
397 let iq =
398 Iq::from_get("bookmarks", PubSub::Items(Items::new(ns::BOOKMARKS2))).into();
399 let _ = self.client.send_stanza(iq).await;
400 }
401 TokioXmppEvent::Online { resumed: true, .. } => {}
402 TokioXmppEvent::Disconnected(_) => {
403 events.push(Event::Disconnected);
404 }
405 TokioXmppEvent::Stanza(elem) => {
406 if elem.is("iq", "jabber:client") {
407 let iq = Iq::try_from(elem).unwrap();
408 let new_events = self.handle_iq(iq).await;
409 events.extend(new_events);
410 } else if elem.is("message", "jabber:client") {
411 let message = Message::try_from(elem).unwrap();
412 let new_events = self.handle_message(message).await;
413 events.extend(new_events);
414 } else if elem.is("presence", "jabber:client") {
415 let presence = Presence::try_from(elem).unwrap();
416 let new_events = self.handle_presence(presence).await;
417 events.extend(new_events);
418 } else if elem.is("error", "http://etherx.jabber.org/streams") {
419 println!("Received a fatal stream error: {}", String::from(&elem));
420 } else {
421 panic!("Unknown stanza: {}", String::from(&elem));
422 }
423 }
424 }
425
426 Some(events)
427 } else {
428 None
429 }
430 }
431
432 pub async fn upload_file_with(&mut self, service: &str, path: &Path) {
433 let name = path.file_name().unwrap().to_str().unwrap().to_string();
434 let file = File::open(path).await.unwrap();
435 let size = file.metadata().await.unwrap().len();
436 let slot_request = SlotRequest {
437 filename: name,
438 size: size,
439 content_type: None,
440 };
441 let to = service.parse::<Jid>().unwrap();
442 let request = Iq::from_get("upload1", slot_request).with_to(to.clone());
443 self.uploads
444 .push((String::from("upload1"), to, path.to_path_buf()));
445 self.client.send_stanza(request.into()).await.unwrap();
446 }
447}
448
449async fn handle_upload_result(
450 from: &Jid,
451 iqid: String,
452 elem: Element,
453 agent: &mut Agent,
454) -> impl IntoIterator<Item = Event> {
455 let mut res: Option<(usize, PathBuf)> = None;
456
457 for (i, (id, to, filepath)) in agent.uploads.iter().enumerate() {
458 if to == from && id == &iqid {
459 res = Some((i, filepath.to_path_buf()));
460 break;
461 }
462 }
463
464 if let Some((index, file)) = res {
465 agent.uploads.remove(index);
466 let slot = SlotResult::try_from(elem).unwrap();
467 let web = ReqwestClient::new();
468 let stream = FramedRead::new(File::open(file).await.unwrap(), BytesCodec::new());
469 let body = ReqwestBody::wrap_stream(stream);
470 let res = web
471 .put(slot.put.url.as_str())
472 .body(body)
473 .send()
474 .await
475 .unwrap();
476 if res.status() == 201 {
477 return vec![Event::HttpUploadedFile(slot.get.url)];
478 }
479 }
480
481 return vec![];
482}
483
#[cfg(test)]
mod tests {
    use super::{Agent, ClientBuilder, ClientFeature, ClientType, Event};
    use tokio_xmpp::AsyncClient as TokioXmppClient;

    /// Builds an agent around a client for a non-existent account and checks
    /// that the first batch of events is exactly one `Disconnected`.
    #[tokio::test]
    async fn test_simple() {
        let client = TokioXmppClient::new("foo@bar", "meh").unwrap();

        // Client instance
        let client_builder = ClientBuilder::new("foo@bar", "meh")
            .set_client(ClientType::Bot, "xmpp-rs")
            .set_website("https://gitlab.com/xmpp-rs/xmpp-rs")
            .set_default_nick("bot")
            .enable_feature(ClientFeature::ContactList);

        // `ClientFeature::Avatars` only exists when the `avatars` feature is
        // enabled; referencing it unconditionally breaks the default build.
        #[cfg(feature = "avatars")]
        let client_builder = client_builder.enable_feature(ClientFeature::Avatars);

        let mut agent: Agent = client_builder.build_impl(client).unwrap();

        // Only the first batch of events is inspected.
        if let Some(events) = agent.wait_for_events().await {
            // Check the length first so an empty batch fails the assertion
            // instead of panicking on the index below.
            assert_eq!(events.len(), 1);
            assert!(matches!(events[0], Event::Disconnected));
        }
    }
}