1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#![deny(bare_trait_objects)]
8
9use futures::stream::StreamExt;
10use reqwest::{
11 header::HeaderMap as ReqwestHeaderMap, Body as ReqwestBody, Client as ReqwestClient,
12};
13use std::cell::RefCell;
14use std::convert::TryFrom;
15use std::path::{Path, PathBuf};
16use std::rc::Rc;
17use tokio::fs::File;
18use tokio_util::codec::{BytesCodec, FramedRead};
19use tokio_xmpp::{AsyncClient as TokioXmppClient, Event as TokioXmppEvent};
20use xmpp_parsers::{
21 bookmarks2::Conference,
22 caps::{compute_disco, hash_caps, Caps},
23 disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity},
24 hashes::Algo,
25 http_upload::{Header as HttpUploadHeader, SlotRequest, SlotResult},
26 iq::{Iq, IqType},
27 message::{Body, Message, MessageType},
28 muc::{
29 user::{MucUser, Status},
30 Muc,
31 },
32 ns,
33 presence::{Presence, Type as PresenceType},
34 pubsub::pubsub::{Items, PubSub},
35 roster::{Item as RosterItem, Roster},
36 stanza_error::{DefinedCondition, ErrorType, StanzaError},
37 BareJid, Element, FullJid, Jid,
38};
39#[macro_use]
40extern crate log;
41
42mod pubsub;
43
44pub type Error = tokio_xmpp::Error;
45
46#[derive(Debug)]
47pub enum ClientType {
48 Bot,
49 Pc,
50}
51
52impl Default for ClientType {
53 fn default() -> Self {
54 ClientType::Bot
55 }
56}
57
58impl ToString for ClientType {
59 fn to_string(&self) -> String {
60 String::from(match self {
61 ClientType::Bot => "bot",
62 ClientType::Pc => "pc",
63 })
64 }
65}
66
67#[derive(PartialEq)]
68pub enum ClientFeature {
69 #[cfg(feature = "avatars")]
70 Avatars,
71 ContactList,
72 JoinRooms,
73}
74
75pub type RoomNick = String;
76
77#[derive(Debug)]
78pub enum Event {
79 Online,
80 Disconnected,
81 ContactAdded(RosterItem),
82 ContactRemoved(RosterItem),
83 ContactChanged(RosterItem),
84 #[cfg(feature = "avatars")]
85 AvatarRetrieved(Jid, String),
86 ChatMessage(BareJid, Body),
87 JoinRoom(BareJid, Conference),
88 LeaveRoom(BareJid),
89 LeaveAllRooms,
90 RoomJoined(BareJid),
91 RoomLeft(BareJid),
92 RoomMessage(BareJid, RoomNick, Body),
93 HttpUploadedFile(String),
94}
95
96#[derive(Default)]
97pub struct ClientBuilder<'a> {
98 jid: &'a str,
99 password: &'a str,
100 website: String,
101 default_nick: String,
102 lang: Vec<String>,
103 disco: (ClientType, String),
104 features: Vec<ClientFeature>,
105}
106
107impl ClientBuilder<'_> {
108 pub fn new<'a>(jid: &'a str, password: &'a str) -> ClientBuilder<'a> {
109 ClientBuilder {
110 jid,
111 password,
112 website: String::from("https://gitlab.com/xmpp-rs/tokio-xmpp"),
113 default_nick: String::from("xmpp-rs"),
114 lang: vec![String::from("en")],
115 disco: (ClientType::default(), String::from("tokio-xmpp")),
116 features: vec![],
117 }
118 }
119
120 pub fn set_client(mut self, type_: ClientType, name: &str) -> Self {
121 self.disco = (type_, String::from(name));
122 self
123 }
124
125 pub fn set_website(mut self, url: &str) -> Self {
126 self.website = String::from(url);
127 self
128 }
129
130 pub fn set_default_nick(mut self, nick: &str) -> Self {
131 self.default_nick = String::from(nick);
132 self
133 }
134
135 pub fn set_lang(mut self, lang: Vec<String>) -> Self {
136 self.lang = lang;
137 self
138 }
139
140 pub fn enable_feature(mut self, feature: ClientFeature) -> Self {
141 self.features.push(feature);
142 self
143 }
144
145 fn make_disco(&self) -> DiscoInfoResult {
146 let identities = vec![Identity::new(
147 "client",
148 self.disco.0.to_string(),
149 "en",
150 self.disco.1.to_string(),
151 )];
152 let mut features = vec![Feature::new(ns::DISCO_INFO)];
153 #[cfg(feature = "avatars")]
154 {
155 if self.features.contains(&ClientFeature::Avatars) {
156 features.push(Feature::new(format!("{}+notify", ns::AVATAR_METADATA)));
157 }
158 }
159 if self.features.contains(&ClientFeature::JoinRooms) {
160 features.push(Feature::new(format!("{}+notify", ns::BOOKMARKS2)));
161 }
162 DiscoInfoResult {
163 node: None,
164 identities,
165 features,
166 extensions: vec![],
167 }
168 }
169
170 pub fn build(self) -> Result<Agent, Error> {
171 let client = TokioXmppClient::new(self.jid, self.password)?;
172 Ok(self.build_impl(client)?)
173 }
174
175 // This function is meant to be used for testing build
176 pub(crate) fn build_impl(self, client: TokioXmppClient) -> Result<Agent, Error> {
177 let disco = self.make_disco();
178 let node = self.website;
179
180 let agent = Agent {
181 client,
182 default_nick: Rc::new(RefCell::new(self.default_nick)),
183 lang: Rc::new(self.lang),
184 disco,
185 node,
186 uploads: Vec::new(),
187 };
188
189 Ok(agent)
190 }
191}
192
193pub struct Agent {
194 client: TokioXmppClient,
195 default_nick: Rc<RefCell<String>>,
196 lang: Rc<Vec<String>>,
197 disco: DiscoInfoResult,
198 node: String,
199 uploads: Vec<(String, Jid, PathBuf)>,
200}
201
202impl Agent {
203 pub async fn disconnect(&mut self) -> Result<(), Error> {
204 self.client.send_end().await
205 }
206
207 pub async fn join_room(
208 &mut self,
209 room: BareJid,
210 nick: Option<String>,
211 password: Option<String>,
212 lang: &str,
213 status: &str,
214 ) {
215 let mut muc = Muc::new();
216 if let Some(password) = password {
217 muc = muc.with_password(password);
218 }
219
220 let nick = nick.unwrap_or_else(|| self.default_nick.borrow().clone());
221 let room_jid = room.with_resource(nick);
222 let mut presence = Presence::new(PresenceType::None).with_to(Jid::Full(room_jid));
223 presence.add_payload(muc);
224 presence.set_status(String::from(lang), String::from(status));
225 let _ = self.client.send_stanza(presence.into()).await;
226 }
227
228 pub async fn send_message(
229 &mut self,
230 recipient: Jid,
231 type_: MessageType,
232 lang: &str,
233 text: &str,
234 ) {
235 let mut message = Message::new(Some(recipient));
236 message.type_ = type_;
237 message
238 .bodies
239 .insert(String::from(lang), Body(String::from(text)));
240 let _ = self.client.send_stanza(message.into()).await;
241 }
242
243 fn make_initial_presence(disco: &DiscoInfoResult, node: &str) -> Presence {
244 let caps_data = compute_disco(disco);
245 let hash = hash_caps(&caps_data, Algo::Sha_1).unwrap();
246 let caps = Caps::new(node, hash);
247
248 let mut presence = Presence::new(PresenceType::None);
249 presence.add_payload(caps);
250 presence
251 }
252
253 async fn handle_iq(&mut self, iq: Iq) -> Vec<Event> {
254 let mut events = vec![];
255 let from = iq
256 .from
257 .clone()
258 .unwrap_or_else(|| self.client.bound_jid().unwrap().clone());
259 if let IqType::Get(payload) = iq.payload {
260 if payload.is("query", ns::DISCO_INFO) {
261 let query = DiscoInfoQuery::try_from(payload);
262 match query {
263 Ok(query) => {
264 let mut disco_info = self.disco.clone();
265 disco_info.node = query.node;
266 let iq = Iq::from_result(iq.id, Some(disco_info))
267 .with_to(iq.from.unwrap())
268 .into();
269 let _ = self.client.send_stanza(iq).await;
270 }
271 Err(err) => {
272 let error = StanzaError::new(
273 ErrorType::Modify,
274 DefinedCondition::BadRequest,
275 "en",
276 &format!("{}", err),
277 );
278 let iq = Iq::from_error(iq.id, error)
279 .with_to(iq.from.unwrap())
280 .into();
281 let _ = self.client.send_stanza(iq).await;
282 }
283 }
284 } else {
285 // We MUST answer unhandled get iqs with a service-unavailable error.
286 let error = StanzaError::new(
287 ErrorType::Cancel,
288 DefinedCondition::ServiceUnavailable,
289 "en",
290 "No handler defined for this kind of iq.",
291 );
292 let iq = Iq::from_error(iq.id, error)
293 .with_to(iq.from.unwrap())
294 .into();
295 let _ = self.client.send_stanza(iq).await;
296 }
297 } else if let IqType::Result(Some(payload)) = iq.payload {
298 // TODO: move private iqs like this one somewhere else, for
299 // security reasons.
300 if payload.is("query", ns::ROSTER) && iq.from.is_none() {
301 let roster = Roster::try_from(payload).unwrap();
302 for item in roster.items.into_iter() {
303 events.push(Event::ContactAdded(item));
304 }
305 } else if payload.is("pubsub", ns::PUBSUB) {
306 let new_events = pubsub::handle_iq_result(&from, payload);
307 events.extend(new_events);
308 } else if payload.is("slot", ns::HTTP_UPLOAD) {
309 let new_events = handle_upload_result(&from, iq.id, payload, self).await;
310 events.extend(new_events);
311 }
312 } else if let IqType::Set(_) = iq.payload {
313 // We MUST answer unhandled set iqs with a service-unavailable error.
314 let error = StanzaError::new(
315 ErrorType::Cancel,
316 DefinedCondition::ServiceUnavailable,
317 "en",
318 "No handler defined for this kind of iq.",
319 );
320 let iq = Iq::from_error(iq.id, error)
321 .with_to(iq.from.unwrap())
322 .into();
323 let _ = self.client.send_stanza(iq).await;
324 }
325
326 events
327 }
328
329 async fn handle_message(&mut self, message: Message) -> Vec<Event> {
330 let mut events = vec![];
331 let from = message.from.clone().unwrap();
332 let langs: Vec<&str> = self.lang.iter().map(String::as_str).collect();
333 match message.get_best_body(langs) {
334 Some((_lang, body)) => match message.type_ {
335 MessageType::Groupchat => {
336 let event = Event::RoomMessage(
337 from.clone().into(),
338 FullJid::try_from(from.clone()).unwrap().resource,
339 body.clone(),
340 );
341 events.push(event)
342 }
343 MessageType::Chat | MessageType::Normal => {
344 let event = Event::ChatMessage(from.clone().into(), body.clone());
345 events.push(event)
346 }
347 _ => (),
348 },
349 None => (),
350 }
351 for child in message.payloads {
352 if child.is("event", ns::PUBSUB_EVENT) {
353 let new_events = pubsub::handle_event(&from, child, self).await;
354 events.extend(new_events);
355 }
356 }
357
358 events
359 }
360
361 async fn handle_presence(&mut self, presence: Presence) -> Vec<Event> {
362 let mut events = vec![];
363 let from: BareJid = match presence.from.clone().unwrap() {
364 Jid::Full(FullJid { node, domain, .. }) => BareJid { node, domain },
365 Jid::Bare(bare) => bare,
366 };
367 for payload in presence.payloads.into_iter() {
368 let muc_user = match MucUser::try_from(payload) {
369 Ok(muc_user) => muc_user,
370 _ => continue,
371 };
372 for status in muc_user.status.into_iter() {
373 if status == Status::SelfPresence {
374 events.push(Event::RoomJoined(from.clone()));
375 break;
376 }
377 }
378 }
379
380 events
381 }
382
383 pub async fn wait_for_events(&mut self) -> Option<Vec<Event>> {
384 if let Some(event) = self.client.next().await {
385 let mut events = Vec::new();
386
387 match event {
388 TokioXmppEvent::Online { resumed: false, .. } => {
389 let presence = Self::make_initial_presence(&self.disco, &self.node).into();
390 let _ = self.client.send_stanza(presence).await;
391 events.push(Event::Online);
392 // TODO: only send this when the ContactList feature is enabled.
393 let iq = Iq::from_get(
394 "roster",
395 Roster {
396 ver: None,
397 items: vec![],
398 },
399 )
400 .into();
401 let _ = self.client.send_stanza(iq).await;
402 // TODO: only send this when the JoinRooms feature is enabled.
403 let iq =
404 Iq::from_get("bookmarks", PubSub::Items(Items::new(ns::BOOKMARKS2))).into();
405 let _ = self.client.send_stanza(iq).await;
406 }
407 TokioXmppEvent::Online { resumed: true, .. } => {}
408 TokioXmppEvent::Disconnected(_) => {
409 events.push(Event::Disconnected);
410 }
411 TokioXmppEvent::Stanza(elem) => {
412 if elem.is("iq", "jabber:client") {
413 let iq = Iq::try_from(elem).unwrap();
414 let new_events = self.handle_iq(iq).await;
415 events.extend(new_events);
416 } else if elem.is("message", "jabber:client") {
417 let message = Message::try_from(elem).unwrap();
418 let new_events = self.handle_message(message).await;
419 events.extend(new_events);
420 } else if elem.is("presence", "jabber:client") {
421 let presence = Presence::try_from(elem).unwrap();
422 let new_events = self.handle_presence(presence).await;
423 events.extend(new_events);
424 } else if elem.is("error", "http://etherx.jabber.org/streams") {
425 println!("Received a fatal stream error: {}", String::from(&elem));
426 } else {
427 panic!("Unknown stanza: {}", String::from(&elem));
428 }
429 }
430 }
431
432 Some(events)
433 } else {
434 None
435 }
436 }
437
438 pub async fn upload_file_with(&mut self, service: &str, path: &Path) {
439 let name = path.file_name().unwrap().to_str().unwrap().to_string();
440 let file = File::open(path).await.unwrap();
441 let size = file.metadata().await.unwrap().len();
442 let slot_request = SlotRequest {
443 filename: name,
444 size: size,
445 content_type: None,
446 };
447 let to = service.parse::<Jid>().unwrap();
448 let request = Iq::from_get("upload1", slot_request).with_to(to.clone());
449 self.uploads
450 .push((String::from("upload1"), to, path.to_path_buf()));
451 self.client.send_stanza(request.into()).await.unwrap();
452 }
453}
454
455async fn handle_upload_result(
456 from: &Jid,
457 iqid: String,
458 elem: Element,
459 agent: &mut Agent,
460) -> impl IntoIterator<Item = Event> {
461 let mut res: Option<(usize, PathBuf)> = None;
462
463 for (i, (id, to, filepath)) in agent.uploads.iter().enumerate() {
464 if to == from && id == &iqid {
465 res = Some((i, filepath.to_path_buf()));
466 break;
467 }
468 }
469
470 if let Some((index, file)) = res {
471 agent.uploads.remove(index);
472 let slot = SlotResult::try_from(elem).unwrap();
473
474 let mut headers = ReqwestHeaderMap::new();
475 for header in slot.put.headers {
476 let (attr, val) = match header {
477 HttpUploadHeader::Authorization(val) => ("Authorization", val),
478 HttpUploadHeader::Cookie(val) => ("Cookie", val),
479 HttpUploadHeader::Expires(val) => ("Expires", val),
480 };
481 headers.insert(attr, val.parse().unwrap());
482 }
483
484 let web = ReqwestClient::new();
485 let stream = FramedRead::new(File::open(file).await.unwrap(), BytesCodec::new());
486 let body = ReqwestBody::wrap_stream(stream);
487 let res = web
488 .put(slot.put.url.as_str())
489 .headers(headers)
490 .body(body)
491 .send()
492 .await
493 .unwrap();
494 if res.status() == 201 {
495 return vec![Event::HttpUploadedFile(slot.get.url)];
496 }
497 }
498
499 return vec![];
500}
501
502#[cfg(test)]
503mod tests {
504 use super::{Agent, ClientBuilder, ClientFeature, ClientType, Event};
505 use tokio_xmpp::AsyncClient as TokioXmppClient;
506
507 #[tokio::test]
508 async fn test_simple() {
509 let client = TokioXmppClient::new("foo@bar", "meh").unwrap();
510
511 // Client instance
512 let client_builder = ClientBuilder::new("foo@bar", "meh")
513 .set_client(ClientType::Bot, "xmpp-rs")
514 .set_website("https://gitlab.com/xmpp-rs/xmpp-rs")
515 .set_default_nick("bot")
516 .enable_feature(ClientFeature::Avatars)
517 .enable_feature(ClientFeature::ContactList);
518
519 let mut agent: Agent = client_builder.build_impl(client).unwrap();
520
521 while let Some(events) = agent.wait_for_events().await {
522 assert!(match events[0] {
523 Event::Disconnected => true,
524 _ => false,
525 });
526 assert_eq!(events.len(), 1);
527 break;
528 }
529 }
530}