1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#![deny(bare_trait_objects)]
8
9use futures::stream::StreamExt;
10use reqwest::{
11 header::HeaderMap as ReqwestHeaderMap, Body as ReqwestBody, Client as ReqwestClient,
12};
13use std::convert::TryFrom;
14use std::path::{Path, PathBuf};
15use std::sync::{Arc, RwLock};
16use tokio::fs::File;
17use tokio_util::codec::{BytesCodec, FramedRead};
18use tokio_xmpp::{AsyncClient as TokioXmppClient, Event as TokioXmppEvent};
19use xmpp_parsers::{
20 bookmarks2::Conference,
21 caps::{compute_disco, hash_caps, Caps},
22 disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity},
23 hashes::Algo,
24 http_upload::{Header as HttpUploadHeader, SlotRequest, SlotResult},
25 iq::{Iq, IqType},
26 message::{Body, Message, MessageType},
27 muc::{
28 user::{MucUser, Status},
29 Muc,
30 },
31 ns,
32 presence::{Presence, Type as PresenceType},
33 pubsub::pubsub::{Items, PubSub},
34 roster::{Item as RosterItem, Roster},
35 stanza_error::{DefinedCondition, ErrorType, StanzaError},
36 BareJid, Element, FullJid, Jid,
37};
38#[macro_use]
39extern crate log;
40
41mod pubsub;
42
43pub type Error = tokio_xmpp::Error;
44
/// Kind of client this agent advertises as.
///
/// The textual form (see the [`std::fmt::Display`] impl) is used as the
/// identity type of the "client" category when the disco#info result is
/// built by `ClientBuilder::make_disco`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClientType {
    /// An automated client; rendered as `"bot"`.
    Bot,
    /// A client running on a personal computer; rendered as `"pc"`.
    Pc,
}

impl Default for ClientType {
    /// Agents are bots unless told otherwise.
    fn default() -> Self {
        ClientType::Bot
    }
}

// Implementing `Display` (rather than `ToString` directly) keeps
// `to_string()` working through the blanket impl while also allowing the
// type to be used in `format!`/`write!`.
impl std::fmt::Display for ClientType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            ClientType::Bot => "bot",
            ClientType::Pc => "pc",
        };
        // `pad` honours width/alignment flags given by the caller.
        f.pad(name)
    }
}
65
/// Optional features that can be switched on through
/// `ClientBuilder::enable_feature`.
///
/// The builder consults the enabled features when it assembles the list of
/// disco#info features to advertise.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClientFeature {
    /// Advertises `+notify` interest in avatar metadata.
    /// Only available when the crate is built with the `avatars` feature.
    #[cfg(feature = "avatars")]
    Avatars,
    /// Contact list (roster) support. Not consulted when building the
    /// disco#info result in the visible code.
    ContactList,
    /// Advertises `+notify` interest in bookmarks, used for joining rooms.
    JoinRooms,
}
73
/// Optional stanza identifier carried by message events; it is copied from
/// the `id` of the message the event was built from.
pub type Id = Option<String>;
/// Nickname of a room occupant; filled from the resource part of the
/// sender's full JID when a groupchat message is received.
pub type RoomNick = String;
76
/// High-level events handed to the application by `Agent::wait_for_events`.
#[derive(Debug)]
pub enum Event {
    /// A new (non-resumed) session is online; emitted right after the
    /// initial presence has been sent.
    Online,
    /// The underlying client reported a disconnection.
    Disconnected,
    /// One item of a received roster result.
    ContactAdded(RosterItem),
    /// Not emitted by the code visible in this file.
    ContactRemoved(RosterItem),
    /// Not emitted by the code visible in this file.
    ContactChanged(RosterItem),
    /// Not emitted by the code visible in this file; presumably produced by
    /// the `pubsub` module — TODO confirm.
    #[cfg(feature = "avatars")]
    AvatarRetrieved(Jid, String),
    /// A `chat` or `normal` message with a body: message id, sender (as a
    /// bare JID) and the best-matching body.
    ChatMessage(Id, BareJid, Body),
    /// Not emitted by the code visible in this file; presumably produced by
    /// the `pubsub` module from bookmarks — TODO confirm.
    JoinRoom(BareJid, Conference),
    /// Not emitted by the code visible in this file.
    LeaveRoom(BareJid),
    /// Not emitted by the code visible in this file.
    LeaveAllRooms,
    /// A presence carrying a MUC user payload with the self-presence status
    /// was received from this room.
    RoomJoined(BareJid),
    /// Not emitted by the code visible in this file.
    RoomLeft(BareJid),
    /// A `groupchat` message sent from a full JID: message id, room, nick
    /// of the sender (the resource of the JID) and body.
    RoomMessage(Id, BareJid, RoomNick, Body),
    /// A `groupchat` message sent from a bare JID: message id, sender and
    /// body.
    ServiceMessage(Id, BareJid, Body),
    /// A file was uploaded (the HTTP PUT answered 201); carries the GET URL
    /// of the upload slot.
    HttpUploadedFile(String),
}
96
/// Collects the settings needed to create an [`Agent`].
pub struct ClientBuilder<'a> {
    /// Account to log in with; a resource is appended in `build` when one
    /// was set.
    jid: BareJid,
    /// Password handed to the underlying tokio-xmpp client.
    password: &'a str,
    /// URL that becomes the `node` of the agent (used for entity caps).
    website: String,
    /// Nickname used when joining a room without an explicit one.
    default_nick: String,
    /// Preferred languages, used to select the best body of a message.
    lang: Vec<String>,
    /// Client type and client name advertised in the disco#info identity.
    disco: (ClientType, String),
    /// Optional features enabled through `enable_feature`.
    features: Vec<ClientFeature>,
    /// Optional resource to bind; when `None` the bare JID is used as is.
    resource: Option<String>,
}
107
108impl ClientBuilder<'_> {
109 pub fn new<'a>(jid: BareJid, password: &'a str) -> ClientBuilder<'a> {
110 ClientBuilder {
111 jid,
112 password,
113 website: String::from("https://gitlab.com/xmpp-rs/tokio-xmpp"),
114 default_nick: String::from("xmpp-rs"),
115 lang: vec![String::from("en")],
116 disco: (ClientType::default(), String::from("tokio-xmpp")),
117 features: vec![],
118 resource: None,
119 }
120 }
121
122 /// Optionally set a resource associated to this device on the client
123 pub fn set_resource(mut self, resource: &str) -> Self {
124 self.resource = Some(resource.to_string());
125 self
126 }
127
128 pub fn set_client(mut self, type_: ClientType, name: &str) -> Self {
129 self.disco = (type_, String::from(name));
130 self
131 }
132
133 pub fn set_website(mut self, url: &str) -> Self {
134 self.website = String::from(url);
135 self
136 }
137
138 pub fn set_default_nick(mut self, nick: &str) -> Self {
139 self.default_nick = String::from(nick);
140 self
141 }
142
143 pub fn set_lang(mut self, lang: Vec<String>) -> Self {
144 self.lang = lang;
145 self
146 }
147
148 pub fn enable_feature(mut self, feature: ClientFeature) -> Self {
149 self.features.push(feature);
150 self
151 }
152
153 fn make_disco(&self) -> DiscoInfoResult {
154 let identities = vec![Identity::new(
155 "client",
156 self.disco.0.to_string(),
157 "en",
158 self.disco.1.to_string(),
159 )];
160 let mut features = vec![Feature::new(ns::DISCO_INFO)];
161 #[cfg(feature = "avatars")]
162 {
163 if self.features.contains(&ClientFeature::Avatars) {
164 features.push(Feature::new(format!("{}+notify", ns::AVATAR_METADATA)));
165 }
166 }
167 if self.features.contains(&ClientFeature::JoinRooms) {
168 features.push(Feature::new(format!("{}+notify", ns::BOOKMARKS2)));
169 }
170 DiscoInfoResult {
171 node: None,
172 identities,
173 features,
174 extensions: vec![],
175 }
176 }
177
178 pub fn build(self) -> Agent {
179 let jid: Jid = if let Some(resource) = &self.resource {
180 self.jid.clone().with_resource(resource.to_string()).into()
181 } else {
182 self.jid.clone().into()
183 };
184
185 let client = TokioXmppClient::new(jid, self.password);
186 self.build_impl(client)
187 }
188
189 // This function is meant to be used for testing build
190 pub(crate) fn build_impl(self, client: TokioXmppClient) -> Agent {
191 let disco = self.make_disco();
192 let node = self.website;
193
194 Agent {
195 client,
196 default_nick: Arc::new(RwLock::new(self.default_nick)),
197 lang: Arc::new(self.lang),
198 disco,
199 node,
200 uploads: Vec::new(),
201 }
202 }
203}
204
/// A connected XMPP entity: wraps the tokio-xmpp client and turns incoming
/// stanzas into [`Event`]s.
pub struct Agent {
    /// Underlying client used to send stanzas and to receive stream events.
    client: TokioXmppClient,
    /// Nickname used by `join_room` when the caller does not provide one.
    default_nick: Arc<RwLock<String>>,
    /// Preferred languages, used to pick the best body of incoming messages.
    lang: Arc<Vec<String>>,
    /// disco#info result returned to disco queries and hashed into the
    /// caps of the initial presence.
    disco: DiscoInfoResult,
    /// Node advertised in the caps of the initial presence.
    node: String,
    /// Pending HTTP upload slot requests: (iq id, upload service, file to
    /// upload).
    uploads: Vec<(String, Jid, PathBuf)>,
}
213
214impl Agent {
215 pub async fn disconnect(&mut self) -> Result<(), Error> {
216 self.client.send_end().await
217 }
218
219 pub async fn join_room(
220 &mut self,
221 room: BareJid,
222 nick: Option<String>,
223 password: Option<String>,
224 lang: &str,
225 status: &str,
226 ) {
227 let mut muc = Muc::new();
228 if let Some(password) = password {
229 muc = muc.with_password(password);
230 }
231
232 let nick = nick.unwrap_or_else(|| self.default_nick.read().unwrap().clone());
233 let room_jid = room.with_resource(nick);
234 let mut presence = Presence::new(PresenceType::None).with_to(Jid::Full(room_jid));
235 presence.add_payload(muc);
236 presence.set_status(String::from(lang), String::from(status));
237 let _ = self.client.send_stanza(presence.into()).await;
238 }
239
240 pub async fn send_message(
241 &mut self,
242 recipient: Jid,
243 type_: MessageType,
244 lang: &str,
245 text: &str,
246 ) {
247 let mut message = Message::new(Some(recipient));
248 message.type_ = type_;
249 message
250 .bodies
251 .insert(String::from(lang), Body(String::from(text)));
252 let _ = self.client.send_stanza(message.into()).await;
253 }
254
255 fn make_initial_presence(disco: &DiscoInfoResult, node: &str) -> Presence {
256 let caps_data = compute_disco(disco);
257 let hash = hash_caps(&caps_data, Algo::Sha_1).unwrap();
258 let caps = Caps::new(node, hash);
259
260 let mut presence = Presence::new(PresenceType::None);
261 presence.add_payload(caps);
262 presence
263 }
264
265 async fn handle_iq(&mut self, iq: Iq) -> Vec<Event> {
266 let mut events = vec![];
267 let from = iq
268 .from
269 .clone()
270 .unwrap_or_else(|| self.client.bound_jid().unwrap().clone());
271 if let IqType::Get(payload) = iq.payload {
272 if payload.is("query", ns::DISCO_INFO) {
273 let query = DiscoInfoQuery::try_from(payload);
274 match query {
275 Ok(query) => {
276 let mut disco_info = self.disco.clone();
277 disco_info.node = query.node;
278 let iq = Iq::from_result(iq.id, Some(disco_info))
279 .with_to(iq.from.unwrap())
280 .into();
281 let _ = self.client.send_stanza(iq).await;
282 }
283 Err(err) => {
284 let error = StanzaError::new(
285 ErrorType::Modify,
286 DefinedCondition::BadRequest,
287 "en",
288 &format!("{}", err),
289 );
290 let iq = Iq::from_error(iq.id, error)
291 .with_to(iq.from.unwrap())
292 .into();
293 let _ = self.client.send_stanza(iq).await;
294 }
295 }
296 } else {
297 // We MUST answer unhandled get iqs with a service-unavailable error.
298 let error = StanzaError::new(
299 ErrorType::Cancel,
300 DefinedCondition::ServiceUnavailable,
301 "en",
302 "No handler defined for this kind of iq.",
303 );
304 let iq = Iq::from_error(iq.id, error)
305 .with_to(iq.from.unwrap())
306 .into();
307 let _ = self.client.send_stanza(iq).await;
308 }
309 } else if let IqType::Result(Some(payload)) = iq.payload {
310 // TODO: move private iqs like this one somewhere else, for
311 // security reasons.
312 if payload.is("query", ns::ROSTER) && Some(from.clone()) == iq.from {
313 let roster = Roster::try_from(payload).unwrap();
314 for item in roster.items.into_iter() {
315 events.push(Event::ContactAdded(item));
316 }
317 } else if payload.is("pubsub", ns::PUBSUB) {
318 let new_events = pubsub::handle_iq_result(&from, payload);
319 events.extend(new_events);
320 } else if payload.is("slot", ns::HTTP_UPLOAD) {
321 let new_events = handle_upload_result(&from, iq.id, payload, self).await;
322 events.extend(new_events);
323 }
324 } else if let IqType::Set(_) = iq.payload {
325 // We MUST answer unhandled set iqs with a service-unavailable error.
326 let error = StanzaError::new(
327 ErrorType::Cancel,
328 DefinedCondition::ServiceUnavailable,
329 "en",
330 "No handler defined for this kind of iq.",
331 );
332 let iq = Iq::from_error(iq.id, error)
333 .with_to(iq.from.unwrap())
334 .into();
335 let _ = self.client.send_stanza(iq).await;
336 }
337
338 events
339 }
340
341 async fn handle_message(&mut self, message: Message) -> Vec<Event> {
342 let mut events = vec![];
343 let from = message.from.clone().unwrap();
344 let langs: Vec<&str> = self.lang.iter().map(String::as_str).collect();
345 match message.get_best_body(langs) {
346 Some((_lang, body)) => match message.type_ {
347 MessageType::Groupchat => {
348 let event = match from.clone() {
349 Jid::Full(full) => Event::RoomMessage(
350 message.id.clone(),
351 from.clone().into(),
352 full.resource,
353 body.clone(),
354 ),
355 Jid::Bare(bare) => {
356 Event::ServiceMessage(message.id.clone(), bare, body.clone())
357 }
358 };
359 events.push(event)
360 }
361 MessageType::Chat | MessageType::Normal => {
362 let event =
363 Event::ChatMessage(message.id.clone(), from.clone().into(), body.clone());
364 events.push(event)
365 }
366 _ => (),
367 },
368 None => (),
369 }
370 for child in message.payloads {
371 if child.is("event", ns::PUBSUB_EVENT) {
372 let new_events = pubsub::handle_event(&from, child, self).await;
373 events.extend(new_events);
374 }
375 }
376
377 events
378 }
379
380 async fn handle_presence(&mut self, presence: Presence) -> Vec<Event> {
381 let mut events = vec![];
382 let from: BareJid = match presence.from.clone().unwrap() {
383 Jid::Full(FullJid { node, domain, .. }) => BareJid { node, domain },
384 Jid::Bare(bare) => bare,
385 };
386 for payload in presence.payloads.into_iter() {
387 let muc_user = match MucUser::try_from(payload) {
388 Ok(muc_user) => muc_user,
389 _ => continue,
390 };
391 for status in muc_user.status.into_iter() {
392 if status == Status::SelfPresence {
393 events.push(Event::RoomJoined(from.clone()));
394 break;
395 }
396 }
397 }
398
399 events
400 }
401
402 pub async fn wait_for_events(&mut self) -> Option<Vec<Event>> {
403 if let Some(event) = self.client.next().await {
404 let mut events = Vec::new();
405
406 match event {
407 TokioXmppEvent::Online { resumed: false, .. } => {
408 let presence = Self::make_initial_presence(&self.disco, &self.node).into();
409 let _ = self.client.send_stanza(presence).await;
410 events.push(Event::Online);
411 // TODO: only send this when the ContactList feature is enabled.
412 let iq = Iq::from_get(
413 "roster",
414 Roster {
415 ver: None,
416 items: vec![],
417 },
418 )
419 .into();
420 let _ = self.client.send_stanza(iq).await;
421 // TODO: only send this when the JoinRooms feature is enabled.
422 let iq =
423 Iq::from_get("bookmarks", PubSub::Items(Items::new(ns::BOOKMARKS2))).into();
424 let _ = self.client.send_stanza(iq).await;
425 }
426 TokioXmppEvent::Online { resumed: true, .. } => {}
427 TokioXmppEvent::Disconnected(_) => {
428 events.push(Event::Disconnected);
429 }
430 TokioXmppEvent::Stanza(elem) => {
431 if elem.is("iq", "jabber:client") {
432 let iq = Iq::try_from(elem).unwrap();
433 let new_events = self.handle_iq(iq).await;
434 events.extend(new_events);
435 } else if elem.is("message", "jabber:client") {
436 let message = Message::try_from(elem).unwrap();
437 let new_events = self.handle_message(message).await;
438 events.extend(new_events);
439 } else if elem.is("presence", "jabber:client") {
440 let presence = Presence::try_from(elem).unwrap();
441 let new_events = self.handle_presence(presence).await;
442 events.extend(new_events);
443 } else if elem.is("error", "http://etherx.jabber.org/streams") {
444 println!("Received a fatal stream error: {}", String::from(&elem));
445 } else {
446 panic!("Unknown stanza: {}", String::from(&elem));
447 }
448 }
449 }
450
451 Some(events)
452 } else {
453 None
454 }
455 }
456
457 pub async fn upload_file_with(&mut self, service: &str, path: &Path) {
458 let name = path.file_name().unwrap().to_str().unwrap().to_string();
459 let file = File::open(path).await.unwrap();
460 let size = file.metadata().await.unwrap().len();
461 let slot_request = SlotRequest {
462 filename: name,
463 size: size,
464 content_type: None,
465 };
466 let to = service.parse::<Jid>().unwrap();
467 let request = Iq::from_get("upload1", slot_request).with_to(to.clone());
468 self.uploads
469 .push((String::from("upload1"), to, path.to_path_buf()));
470 self.client.send_stanza(request.into()).await.unwrap();
471 }
472}
473
474async fn handle_upload_result(
475 from: &Jid,
476 iqid: String,
477 elem: Element,
478 agent: &mut Agent,
479) -> impl IntoIterator<Item = Event> {
480 let mut res: Option<(usize, PathBuf)> = None;
481
482 for (i, (id, to, filepath)) in agent.uploads.iter().enumerate() {
483 if to == from && id == &iqid {
484 res = Some((i, filepath.to_path_buf()));
485 break;
486 }
487 }
488
489 if let Some((index, file)) = res {
490 agent.uploads.remove(index);
491 let slot = SlotResult::try_from(elem).unwrap();
492
493 let mut headers = ReqwestHeaderMap::new();
494 for header in slot.put.headers {
495 let (attr, val) = match header {
496 HttpUploadHeader::Authorization(val) => ("Authorization", val),
497 HttpUploadHeader::Cookie(val) => ("Cookie", val),
498 HttpUploadHeader::Expires(val) => ("Expires", val),
499 };
500 headers.insert(attr, val.parse().unwrap());
501 }
502
503 let web = ReqwestClient::new();
504 let stream = FramedRead::new(File::open(file).await.unwrap(), BytesCodec::new());
505 let body = ReqwestBody::wrap_stream(stream);
506 let res = web
507 .put(slot.put.url.as_str())
508 .headers(headers)
509 .body(body)
510 .send()
511 .await
512 .unwrap();
513 if res.status() == 201 {
514 return vec![Event::HttpUploadedFile(slot.get.url)];
515 }
516 }
517
518 return vec![];
519}
520
#[cfg(test)]
mod tests {
    use super::{Agent, BareJid, ClientBuilder, ClientFeature, ClientType, Event};
    use std::str::FromStr;
    use tokio_xmpp::AsyncClient as TokioXmppClient;

    /// Builds an agent around a hand-made client and checks the first batch
    /// of events it reports.
    ///
    /// NOTE(review): presumably `foo@bar` cannot connect, which is why the
    /// first batch is expected to be a lone `Disconnected` — confirm.
    #[tokio::test]
    async fn test_simple() {
        let jid = BareJid::from_str("foo@bar").unwrap();

        let client = TokioXmppClient::new(jid.clone(), "meh");

        // Client instance
        let client_builder = ClientBuilder::new(jid, "meh")
            .set_client(ClientType::Bot, "xmpp-rs")
            .set_website("https://gitlab.com/xmpp-rs/xmpp-rs")
            .set_default_nick("bot")
            .enable_feature(ClientFeature::ContactList);
        // `ClientFeature::Avatars` only exists with the `avatars` feature;
        // gating it keeps the test compiling without that feature.
        #[cfg(feature = "avatars")]
        let client_builder = client_builder.enable_feature(ClientFeature::Avatars);

        let mut agent: Agent = client_builder.build_impl(client);

        // Only the first batch of events is inspected.
        if let Some(events) = agent.wait_for_events().await {
            // Check the length first so that an empty batch fails with a
            // readable assertion instead of an out-of-bounds panic.
            assert_eq!(events.len(), 1);
            assert!(match events[0] {
                Event::Disconnected => true,
                _ => false,
            });
        }
    }
}