1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#![deny(bare_trait_objects)]
8
9use futures::stream::StreamExt;
10use reqwest::{
11 header::HeaderMap as ReqwestHeaderMap, Body as ReqwestBody, Client as ReqwestClient,
12};
13use std::cell::RefCell;
14use std::convert::TryFrom;
15use std::path::{Path, PathBuf};
16use std::rc::Rc;
17use tokio::fs::File;
18use tokio_util::codec::{BytesCodec, FramedRead};
19use tokio_xmpp::{AsyncClient as TokioXmppClient, Event as TokioXmppEvent};
20use xmpp_parsers::{
21 bookmarks2::Conference,
22 caps::{compute_disco, hash_caps, Caps},
23 disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity},
24 hashes::Algo,
25 http_upload::{Header as HttpUploadHeader, SlotRequest, SlotResult},
26 iq::{Iq, IqType},
27 message::{Body, Message, MessageType},
28 muc::{
29 user::{MucUser, Status},
30 Muc,
31 },
32 ns,
33 presence::{Presence, Type as PresenceType},
34 pubsub::pubsub::{Items, PubSub},
35 roster::{Item as RosterItem, Roster},
36 stanza_error::{DefinedCondition, ErrorType, StanzaError},
37 BareJid, Element, FullJid, Jid,
38};
39#[macro_use]
40extern crate log;
41
42mod pubsub;
43
44pub type Error = tokio_xmpp::Error;
45
/// The kind of client advertised in the service discovery identity.
///
/// The textual form (see the [`std::fmt::Display`] implementation) is what
/// gets used as the identity type: `"bot"` or `"pc"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClientType {
    /// Advertised as `"bot"`; this is the default.
    Bot,
    /// Advertised as `"pc"`.
    Pc,
}

impl Default for ClientType {
    fn default() -> Self {
        ClientType::Bot
    }
}

// Implementing `Display` rather than `ToString` directly keeps `to_string()`
// available (through the blanket impl) while also allowing `format!("{}", …)`.
impl std::fmt::Display for ClientType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            ClientType::Bot => "bot",
            ClientType::Pc => "pc",
        })
    }
}
66
/// Optional features that can be switched on through
/// `ClientBuilder::enable_feature`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClientFeature {
    /// Advertises interest in avatar metadata notifications.
    #[cfg(feature = "avatars")]
    Avatars,
    /// Contact list (roster) support.
    // NOTE(review): nothing in this file checks this variant yet; the roster
    // is currently requested unconditionally once online.
    ContactList,
    /// Advertises interest in bookmarks notifications.
    JoinRooms,
}
74
75pub type Id = Option<String>;
76pub type RoomNick = String;
77
/// Events reported to the user of the library by `Agent::wait_for_events`.
///
/// Several variants are not produced anywhere in this file; they are
/// presumably emitted by the `pubsub` module or reserved for later use.
#[derive(Debug)]
pub enum Event {
    /// The stream came online (not a resumption) and the initial presence
    /// has been sent.
    Online,
    /// The underlying client reported a disconnection.
    Disconnected,
    /// One item of a received roster result.
    ContactAdded(RosterItem),
    /// Not produced in this file.
    ContactRemoved(RosterItem),
    /// Not produced in this file.
    ContactChanged(RosterItem),
    /// Not produced in this file — presumably emitted by the `pubsub` module.
    #[cfg(feature = "avatars")]
    AvatarRetrieved(Jid, String),
    /// A `chat` or `normal` message with a body: message id, bare JID of the
    /// sender, and the body selected for the configured languages.
    ChatMessage(Id, BareJid, Body),
    /// Not produced in this file — presumably emitted by the `pubsub` module.
    JoinRoom(BareJid, Conference),
    /// Not produced in this file — presumably emitted by the `pubsub` module.
    LeaveRoom(BareJid),
    /// Not produced in this file — presumably emitted by the `pubsub` module.
    LeaveAllRooms,
    /// A presence carrying the MUC "self-presence" status was received from
    /// this bare JID.
    RoomJoined(BareJid),
    /// Not produced in this file.
    RoomLeft(BareJid),
    /// A `groupchat` message sent from a full JID: message id, bare JID of
    /// the sender, resource of the sender (used as the nick), and body.
    RoomMessage(Id, BareJid, RoomNick, Body),
    /// A `groupchat` message sent from a bare JID: message id, sender, body.
    ServiceMessage(Id, BareJid, Body),
    /// A file was uploaded (the HTTP PUT answered 201); carries the GET URL
    /// of the slot.
    HttpUploadedFile(String),
}
97
98#[derive(Default)]
99pub struct ClientBuilder<'a> {
100 jid: &'a str,
101 password: &'a str,
102 website: String,
103 default_nick: String,
104 lang: Vec<String>,
105 disco: (ClientType, String),
106 features: Vec<ClientFeature>,
107}
108
109impl ClientBuilder<'_> {
110 pub fn new<'a>(jid: &'a str, password: &'a str) -> ClientBuilder<'a> {
111 ClientBuilder {
112 jid,
113 password,
114 website: String::from("https://gitlab.com/xmpp-rs/tokio-xmpp"),
115 default_nick: String::from("xmpp-rs"),
116 lang: vec![String::from("en")],
117 disco: (ClientType::default(), String::from("tokio-xmpp")),
118 features: vec![],
119 }
120 }
121
122 pub fn set_client(mut self, type_: ClientType, name: &str) -> Self {
123 self.disco = (type_, String::from(name));
124 self
125 }
126
127 pub fn set_website(mut self, url: &str) -> Self {
128 self.website = String::from(url);
129 self
130 }
131
132 pub fn set_default_nick(mut self, nick: &str) -> Self {
133 self.default_nick = String::from(nick);
134 self
135 }
136
137 pub fn set_lang(mut self, lang: Vec<String>) -> Self {
138 self.lang = lang;
139 self
140 }
141
142 pub fn enable_feature(mut self, feature: ClientFeature) -> Self {
143 self.features.push(feature);
144 self
145 }
146
147 fn make_disco(&self) -> DiscoInfoResult {
148 let identities = vec![Identity::new(
149 "client",
150 self.disco.0.to_string(),
151 "en",
152 self.disco.1.to_string(),
153 )];
154 let mut features = vec![Feature::new(ns::DISCO_INFO)];
155 #[cfg(feature = "avatars")]
156 {
157 if self.features.contains(&ClientFeature::Avatars) {
158 features.push(Feature::new(format!("{}+notify", ns::AVATAR_METADATA)));
159 }
160 }
161 if self.features.contains(&ClientFeature::JoinRooms) {
162 features.push(Feature::new(format!("{}+notify", ns::BOOKMARKS2)));
163 }
164 DiscoInfoResult {
165 node: None,
166 identities,
167 features,
168 extensions: vec![],
169 }
170 }
171
172 pub fn build(self) -> Result<Agent, Error> {
173 let client = TokioXmppClient::new(self.jid, self.password)?;
174 Ok(self.build_impl(client)?)
175 }
176
177 // This function is meant to be used for testing build
178 pub(crate) fn build_impl(self, client: TokioXmppClient) -> Result<Agent, Error> {
179 let disco = self.make_disco();
180 let node = self.website;
181
182 let agent = Agent {
183 client,
184 default_nick: Rc::new(RefCell::new(self.default_nick)),
185 lang: Rc::new(self.lang),
186 disco,
187 node,
188 uploads: Vec::new(),
189 };
190
191 Ok(agent)
192 }
193}
194
/// High-level XMPP client: wraps the underlying tokio-xmpp client and turns
/// what it receives into [`Event`]s.
pub struct Agent {
    /// Underlying client used to send and receive stanzas.
    client: TokioXmppClient,
    /// Nick used by `join_room` when the caller does not give one.
    default_nick: Rc<RefCell<String>>,
    /// Preferred languages, used to select the body of incoming messages.
    lang: Rc<Vec<String>>,
    /// disco#info result served to peers and hashed for entity capabilities.
    disco: DiscoInfoResult,
    /// Entity capabilities node sent in the initial presence.
    node: String,
    /// Slot requests still waiting for an answer: iq id, upload service JID
    /// and path of the file to upload.
    uploads: Vec<(String, Jid, PathBuf)>,
}
203
204impl Agent {
205 pub async fn disconnect(&mut self) -> Result<(), Error> {
206 self.client.send_end().await
207 }
208
209 pub async fn join_room(
210 &mut self,
211 room: BareJid,
212 nick: Option<String>,
213 password: Option<String>,
214 lang: &str,
215 status: &str,
216 ) {
217 let mut muc = Muc::new();
218 if let Some(password) = password {
219 muc = muc.with_password(password);
220 }
221
222 let nick = nick.unwrap_or_else(|| self.default_nick.borrow().clone());
223 let room_jid = room.with_resource(nick);
224 let mut presence = Presence::new(PresenceType::None).with_to(Jid::Full(room_jid));
225 presence.add_payload(muc);
226 presence.set_status(String::from(lang), String::from(status));
227 let _ = self.client.send_stanza(presence.into()).await;
228 }
229
230 pub async fn send_message(
231 &mut self,
232 recipient: Jid,
233 type_: MessageType,
234 lang: &str,
235 text: &str,
236 ) {
237 let mut message = Message::new(Some(recipient));
238 message.type_ = type_;
239 message
240 .bodies
241 .insert(String::from(lang), Body(String::from(text)));
242 let _ = self.client.send_stanza(message.into()).await;
243 }
244
245 fn make_initial_presence(disco: &DiscoInfoResult, node: &str) -> Presence {
246 let caps_data = compute_disco(disco);
247 let hash = hash_caps(&caps_data, Algo::Sha_1).unwrap();
248 let caps = Caps::new(node, hash);
249
250 let mut presence = Presence::new(PresenceType::None);
251 presence.add_payload(caps);
252 presence
253 }
254
255 async fn handle_iq(&mut self, iq: Iq) -> Vec<Event> {
256 let mut events = vec![];
257 let from = iq
258 .from
259 .clone()
260 .unwrap_or_else(|| self.client.bound_jid().unwrap().clone());
261 if let IqType::Get(payload) = iq.payload {
262 if payload.is("query", ns::DISCO_INFO) {
263 let query = DiscoInfoQuery::try_from(payload);
264 match query {
265 Ok(query) => {
266 let mut disco_info = self.disco.clone();
267 disco_info.node = query.node;
268 let iq = Iq::from_result(iq.id, Some(disco_info))
269 .with_to(iq.from.unwrap())
270 .into();
271 let _ = self.client.send_stanza(iq).await;
272 }
273 Err(err) => {
274 let error = StanzaError::new(
275 ErrorType::Modify,
276 DefinedCondition::BadRequest,
277 "en",
278 &format!("{}", err),
279 );
280 let iq = Iq::from_error(iq.id, error)
281 .with_to(iq.from.unwrap())
282 .into();
283 let _ = self.client.send_stanza(iq).await;
284 }
285 }
286 } else {
287 // We MUST answer unhandled get iqs with a service-unavailable error.
288 let error = StanzaError::new(
289 ErrorType::Cancel,
290 DefinedCondition::ServiceUnavailable,
291 "en",
292 "No handler defined for this kind of iq.",
293 );
294 let iq = Iq::from_error(iq.id, error)
295 .with_to(iq.from.unwrap())
296 .into();
297 let _ = self.client.send_stanza(iq).await;
298 }
299 } else if let IqType::Result(Some(payload)) = iq.payload {
300 // TODO: move private iqs like this one somewhere else, for
301 // security reasons.
302 if payload.is("query", ns::ROSTER) && iq.from.is_none() {
303 let roster = Roster::try_from(payload).unwrap();
304 for item in roster.items.into_iter() {
305 events.push(Event::ContactAdded(item));
306 }
307 } else if payload.is("pubsub", ns::PUBSUB) {
308 let new_events = pubsub::handle_iq_result(&from, payload);
309 events.extend(new_events);
310 } else if payload.is("slot", ns::HTTP_UPLOAD) {
311 let new_events = handle_upload_result(&from, iq.id, payload, self).await;
312 events.extend(new_events);
313 }
314 } else if let IqType::Set(_) = iq.payload {
315 // We MUST answer unhandled set iqs with a service-unavailable error.
316 let error = StanzaError::new(
317 ErrorType::Cancel,
318 DefinedCondition::ServiceUnavailable,
319 "en",
320 "No handler defined for this kind of iq.",
321 );
322 let iq = Iq::from_error(iq.id, error)
323 .with_to(iq.from.unwrap())
324 .into();
325 let _ = self.client.send_stanza(iq).await;
326 }
327
328 events
329 }
330
331 async fn handle_message(&mut self, message: Message) -> Vec<Event> {
332 let mut events = vec![];
333 let from = message.from.clone().unwrap();
334 let langs: Vec<&str> = self.lang.iter().map(String::as_str).collect();
335 match message.get_best_body(langs) {
336 Some((_lang, body)) => match message.type_ {
337 MessageType::Groupchat => {
338 let event = match from.clone() {
339 Jid::Full(full) => Event::RoomMessage(
340 message.id.clone(),
341 from.clone().into(),
342 full.resource,
343 body.clone(),
344 ),
345 Jid::Bare(bare) => {
346 Event::ServiceMessage(message.id.clone(), bare, body.clone())
347 }
348 };
349 events.push(event)
350 }
351 MessageType::Chat | MessageType::Normal => {
352 let event =
353 Event::ChatMessage(message.id.clone(), from.clone().into(), body.clone());
354 events.push(event)
355 }
356 _ => (),
357 },
358 None => (),
359 }
360 for child in message.payloads {
361 if child.is("event", ns::PUBSUB_EVENT) {
362 let new_events = pubsub::handle_event(&from, child, self).await;
363 events.extend(new_events);
364 }
365 }
366
367 events
368 }
369
370 async fn handle_presence(&mut self, presence: Presence) -> Vec<Event> {
371 let mut events = vec![];
372 let from: BareJid = match presence.from.clone().unwrap() {
373 Jid::Full(FullJid { node, domain, .. }) => BareJid { node, domain },
374 Jid::Bare(bare) => bare,
375 };
376 for payload in presence.payloads.into_iter() {
377 let muc_user = match MucUser::try_from(payload) {
378 Ok(muc_user) => muc_user,
379 _ => continue,
380 };
381 for status in muc_user.status.into_iter() {
382 if status == Status::SelfPresence {
383 events.push(Event::RoomJoined(from.clone()));
384 break;
385 }
386 }
387 }
388
389 events
390 }
391
392 pub async fn wait_for_events(&mut self) -> Option<Vec<Event>> {
393 if let Some(event) = self.client.next().await {
394 let mut events = Vec::new();
395
396 match event {
397 TokioXmppEvent::Online { resumed: false, .. } => {
398 let presence = Self::make_initial_presence(&self.disco, &self.node).into();
399 let _ = self.client.send_stanza(presence).await;
400 events.push(Event::Online);
401 // TODO: only send this when the ContactList feature is enabled.
402 let iq = Iq::from_get(
403 "roster",
404 Roster {
405 ver: None,
406 items: vec![],
407 },
408 )
409 .into();
410 let _ = self.client.send_stanza(iq).await;
411 // TODO: only send this when the JoinRooms feature is enabled.
412 let iq =
413 Iq::from_get("bookmarks", PubSub::Items(Items::new(ns::BOOKMARKS2))).into();
414 let _ = self.client.send_stanza(iq).await;
415 }
416 TokioXmppEvent::Online { resumed: true, .. } => {}
417 TokioXmppEvent::Disconnected(_) => {
418 events.push(Event::Disconnected);
419 }
420 TokioXmppEvent::Stanza(elem) => {
421 if elem.is("iq", "jabber:client") {
422 let iq = Iq::try_from(elem).unwrap();
423 let new_events = self.handle_iq(iq).await;
424 events.extend(new_events);
425 } else if elem.is("message", "jabber:client") {
426 let message = Message::try_from(elem).unwrap();
427 let new_events = self.handle_message(message).await;
428 events.extend(new_events);
429 } else if elem.is("presence", "jabber:client") {
430 let presence = Presence::try_from(elem).unwrap();
431 let new_events = self.handle_presence(presence).await;
432 events.extend(new_events);
433 } else if elem.is("error", "http://etherx.jabber.org/streams") {
434 println!("Received a fatal stream error: {}", String::from(&elem));
435 } else {
436 panic!("Unknown stanza: {}", String::from(&elem));
437 }
438 }
439 }
440
441 Some(events)
442 } else {
443 None
444 }
445 }
446
447 pub async fn upload_file_with(&mut self, service: &str, path: &Path) {
448 let name = path.file_name().unwrap().to_str().unwrap().to_string();
449 let file = File::open(path).await.unwrap();
450 let size = file.metadata().await.unwrap().len();
451 let slot_request = SlotRequest {
452 filename: name,
453 size: size,
454 content_type: None,
455 };
456 let to = service.parse::<Jid>().unwrap();
457 let request = Iq::from_get("upload1", slot_request).with_to(to.clone());
458 self.uploads
459 .push((String::from("upload1"), to, path.to_path_buf()));
460 self.client.send_stanza(request.into()).await.unwrap();
461 }
462}
463
464async fn handle_upload_result(
465 from: &Jid,
466 iqid: String,
467 elem: Element,
468 agent: &mut Agent,
469) -> impl IntoIterator<Item = Event> {
470 let mut res: Option<(usize, PathBuf)> = None;
471
472 for (i, (id, to, filepath)) in agent.uploads.iter().enumerate() {
473 if to == from && id == &iqid {
474 res = Some((i, filepath.to_path_buf()));
475 break;
476 }
477 }
478
479 if let Some((index, file)) = res {
480 agent.uploads.remove(index);
481 let slot = SlotResult::try_from(elem).unwrap();
482
483 let mut headers = ReqwestHeaderMap::new();
484 for header in slot.put.headers {
485 let (attr, val) = match header {
486 HttpUploadHeader::Authorization(val) => ("Authorization", val),
487 HttpUploadHeader::Cookie(val) => ("Cookie", val),
488 HttpUploadHeader::Expires(val) => ("Expires", val),
489 };
490 headers.insert(attr, val.parse().unwrap());
491 }
492
493 let web = ReqwestClient::new();
494 let stream = FramedRead::new(File::open(file).await.unwrap(), BytesCodec::new());
495 let body = ReqwestBody::wrap_stream(stream);
496 let res = web
497 .put(slot.put.url.as_str())
498 .headers(headers)
499 .body(body)
500 .send()
501 .await
502 .unwrap();
503 if res.status() == 201 {
504 return vec![Event::HttpUploadedFile(slot.get.url)];
505 }
506 }
507
508 return vec![];
509}
510
#[cfg(test)]
mod tests {
    use super::{Agent, ClientBuilder, ClientFeature, ClientType, Event};
    use tokio_xmpp::AsyncClient as TokioXmppClient;

    /// Builds an agent around a client for a bogus account and checks that,
    /// if anything is reported at all, it is a single `Disconnected` event.
    #[tokio::test]
    async fn test_simple() {
        let client = TokioXmppClient::new("foo@bar", "meh").unwrap();

        // Client instance
        let client_builder = ClientBuilder::new("foo@bar", "meh")
            .set_client(ClientType::Bot, "xmpp-rs")
            .set_website("https://gitlab.com/xmpp-rs/xmpp-rs")
            .set_default_nick("bot")
            .enable_feature(ClientFeature::ContactList);
        // This variant only exists when the `avatars` feature is compiled in.
        #[cfg(feature = "avatars")]
        let client_builder = client_builder.enable_feature(ClientFeature::Avatars);

        let mut agent: Agent = client_builder.build_impl(client).unwrap();

        if let Some(events) = agent.wait_for_events().await {
            // Check the length first so that an empty list fails the
            // assertion rather than the indexing below.
            assert_eq!(events.len(), 1);
            assert!(match events[0] {
                Event::Disconnected => true,
                _ => false,
            });
        }
    }
}