1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#![deny(bare_trait_objects)]
8
9use futures::stream::StreamExt;
10use reqwest::{
11 header::HeaderMap as ReqwestHeaderMap, Body as ReqwestBody, Client as ReqwestClient,
12};
13use std::convert::TryFrom;
14use std::path::{Path, PathBuf};
15use std::sync::{Arc, RwLock};
16use tokio::fs::File;
17use tokio_util::codec::{BytesCodec, FramedRead};
18pub use tokio_xmpp::parsers;
19use tokio_xmpp::parsers::{
20 bookmarks2::Conference,
21 caps::{compute_disco, hash_caps, Caps},
22 disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity},
23 hashes::Algo,
24 http_upload::{Header as HttpUploadHeader, SlotRequest, SlotResult},
25 iq::{Iq, IqType},
26 message::{Body, Message, MessageType},
27 muc::{
28 user::{MucUser, Status},
29 Muc,
30 },
31 ns,
32 presence::{Presence, Type as PresenceType},
33 pubsub::pubsub::{Items, PubSub},
34 roster::{Item as RosterItem, Roster},
35 stanza_error::{DefinedCondition, ErrorType, StanzaError},
36};
37use tokio_xmpp::{AsyncClient as TokioXmppClient, Event as TokioXmppEvent};
38pub use tokio_xmpp::{BareJid, Element, FullJid, Jid};
39#[macro_use]
40extern crate log;
41
42mod pubsub;
43
44pub type Error = tokio_xmpp::Error;
45
46#[derive(Debug)]
47pub enum ClientType {
48 Bot,
49 Pc,
50}
51
52impl Default for ClientType {
53 fn default() -> Self {
54 ClientType::Bot
55 }
56}
57
58impl ToString for ClientType {
59 fn to_string(&self) -> String {
60 String::from(match self {
61 ClientType::Bot => "bot",
62 ClientType::Pc => "pc",
63 })
64 }
65}
66
67#[derive(PartialEq)]
68pub enum ClientFeature {
69 #[cfg(feature = "avatars")]
70 Avatars,
71 ContactList,
72 JoinRooms,
73}
74
75pub type Id = Option<String>;
76pub type RoomNick = String;
77
78#[derive(Debug)]
79pub enum Event {
80 Online,
81 Disconnected,
82 ContactAdded(RosterItem),
83 ContactRemoved(RosterItem),
84 ContactChanged(RosterItem),
85 #[cfg(feature = "avatars")]
86 AvatarRetrieved(Jid, String),
87 ChatMessage(Id, BareJid, Body),
88 JoinRoom(BareJid, Conference),
89 LeaveRoom(BareJid),
90 LeaveAllRooms,
91 RoomJoined(BareJid),
92 RoomLeft(BareJid),
93 RoomMessage(Id, BareJid, RoomNick, Body),
94 /// A private message received from a room, containing the message ID, the room's BareJid,
95 /// the sender's nickname, and the message body.
96 RoomPrivateMessage(Id, BareJid, RoomNick, Body),
97 ServiceMessage(Id, BareJid, Body),
98 HttpUploadedFile(String),
99}
100
101pub struct ClientBuilder<'a> {
102 jid: BareJid,
103 password: &'a str,
104 website: String,
105 default_nick: String,
106 lang: Vec<String>,
107 disco: (ClientType, String),
108 features: Vec<ClientFeature>,
109 resource: Option<String>,
110}
111
112impl ClientBuilder<'_> {
113 pub fn new<'a>(jid: BareJid, password: &'a str) -> ClientBuilder<'a> {
114 ClientBuilder {
115 jid,
116 password,
117 website: String::from("https://gitlab.com/xmpp-rs/tokio-xmpp"),
118 default_nick: String::from("xmpp-rs"),
119 lang: vec![String::from("en")],
120 disco: (ClientType::default(), String::from("tokio-xmpp")),
121 features: vec![],
122 resource: None,
123 }
124 }
125
126 /// Optionally set a resource associated to this device on the client
127 pub fn set_resource(mut self, resource: &str) -> Self {
128 self.resource = Some(resource.to_string());
129 self
130 }
131
132 pub fn set_client(mut self, type_: ClientType, name: &str) -> Self {
133 self.disco = (type_, String::from(name));
134 self
135 }
136
137 pub fn set_website(mut self, url: &str) -> Self {
138 self.website = String::from(url);
139 self
140 }
141
142 pub fn set_default_nick(mut self, nick: &str) -> Self {
143 self.default_nick = String::from(nick);
144 self
145 }
146
147 pub fn set_lang(mut self, lang: Vec<String>) -> Self {
148 self.lang = lang;
149 self
150 }
151
152 pub fn enable_feature(mut self, feature: ClientFeature) -> Self {
153 self.features.push(feature);
154 self
155 }
156
157 fn make_disco(&self) -> DiscoInfoResult {
158 let identities = vec![Identity::new(
159 "client",
160 self.disco.0.to_string(),
161 "en",
162 self.disco.1.to_string(),
163 )];
164 let mut features = vec![Feature::new(ns::DISCO_INFO)];
165 #[cfg(feature = "avatars")]
166 {
167 if self.features.contains(&ClientFeature::Avatars) {
168 features.push(Feature::new(format!("{}+notify", ns::AVATAR_METADATA)));
169 }
170 }
171 if self.features.contains(&ClientFeature::JoinRooms) {
172 features.push(Feature::new(format!("{}+notify", ns::BOOKMARKS2)));
173 }
174 DiscoInfoResult {
175 node: None,
176 identities,
177 features,
178 extensions: vec![],
179 }
180 }
181
182 pub fn build(self) -> Agent {
183 let jid: Jid = if let Some(resource) = &self.resource {
184 self.jid.with_resource_str(resource).unwrap().into()
185 } else {
186 self.jid.clone().into()
187 };
188
189 let client = TokioXmppClient::new(jid, self.password);
190 self.build_impl(client)
191 }
192
193 // This function is meant to be used for testing build
194 pub(crate) fn build_impl(self, client: TokioXmppClient) -> Agent {
195 let disco = self.make_disco();
196 let node = self.website;
197
198 Agent {
199 client,
200 default_nick: Arc::new(RwLock::new(self.default_nick)),
201 lang: Arc::new(self.lang),
202 disco,
203 node,
204 uploads: Vec::new(),
205 }
206 }
207}
208
209pub struct Agent {
210 client: TokioXmppClient,
211 default_nick: Arc<RwLock<String>>,
212 lang: Arc<Vec<String>>,
213 disco: DiscoInfoResult,
214 node: String,
215 uploads: Vec<(String, Jid, PathBuf)>,
216}
217
218impl Agent {
219 pub async fn disconnect(&mut self) -> Result<(), Error> {
220 self.client.send_end().await
221 }
222
223 pub async fn join_room(
224 &mut self,
225 room: BareJid,
226 nick: Option<String>,
227 password: Option<String>,
228 lang: &str,
229 status: &str,
230 ) {
231 let mut muc = Muc::new();
232 if let Some(password) = password {
233 muc = muc.with_password(password);
234 }
235
236 let nick = nick.unwrap_or_else(|| self.default_nick.read().unwrap().clone());
237 let room_jid = room.with_resource_str(&nick).unwrap();
238 let mut presence = Presence::new(PresenceType::None).with_to(room_jid);
239 presence.add_payload(muc);
240 presence.set_status(String::from(lang), String::from(status));
241 let _ = self.client.send_stanza(presence.into()).await;
242 }
243
244 pub async fn send_message(
245 &mut self,
246 recipient: Jid,
247 type_: MessageType,
248 lang: &str,
249 text: &str,
250 ) {
251 let mut message = Message::new(Some(recipient));
252 message.type_ = type_;
253 message
254 .bodies
255 .insert(String::from(lang), Body(String::from(text)));
256 let _ = self.client.send_stanza(message.into()).await;
257 }
258
259 pub async fn send_room_private_message(
260 &mut self,
261 room: BareJid,
262 recipient: RoomNick,
263 lang: &str,
264 text: &str,
265 ) {
266 let recipient: Jid = room.with_resource_str(&recipient).unwrap().into();
267 let mut message = Message::new(recipient).with_payload(MucUser::new());
268 message.type_ = MessageType::Chat;
269 message
270 .bodies
271 .insert(String::from(lang), Body(String::from(text)));
272 let _ = self.client.send_stanza(message.into()).await;
273 }
274
275 fn make_initial_presence(disco: &DiscoInfoResult, node: &str) -> Presence {
276 let caps_data = compute_disco(disco);
277 let hash = hash_caps(&caps_data, Algo::Sha_1).unwrap();
278 let caps = Caps::new(node, hash);
279
280 let mut presence = Presence::new(PresenceType::None);
281 presence.add_payload(caps);
282 presence
283 }
284
285 async fn handle_iq(&mut self, iq: Iq) -> Vec<Event> {
286 let mut events = vec![];
287 let from = iq
288 .from
289 .clone()
290 .unwrap_or_else(|| self.client.bound_jid().unwrap().clone());
291 if let IqType::Get(payload) = iq.payload {
292 if payload.is("query", ns::DISCO_INFO) {
293 let query = DiscoInfoQuery::try_from(payload);
294 match query {
295 Ok(query) => {
296 let mut disco_info = self.disco.clone();
297 disco_info.node = query.node;
298 let iq = Iq::from_result(iq.id, Some(disco_info))
299 .with_to(iq.from.unwrap())
300 .into();
301 let _ = self.client.send_stanza(iq).await;
302 }
303 Err(err) => {
304 let error = StanzaError::new(
305 ErrorType::Modify,
306 DefinedCondition::BadRequest,
307 "en",
308 &format!("{}", err),
309 );
310 let iq = Iq::from_error(iq.id, error)
311 .with_to(iq.from.unwrap())
312 .into();
313 let _ = self.client.send_stanza(iq).await;
314 }
315 }
316 } else {
317 // We MUST answer unhandled get iqs with a service-unavailable error.
318 let error = StanzaError::new(
319 ErrorType::Cancel,
320 DefinedCondition::ServiceUnavailable,
321 "en",
322 "No handler defined for this kind of iq.",
323 );
324 let iq = Iq::from_error(iq.id, error)
325 .with_to(iq.from.unwrap())
326 .into();
327 let _ = self.client.send_stanza(iq).await;
328 }
329 } else if let IqType::Result(Some(payload)) = iq.payload {
330 // TODO: move private iqs like this one somewhere else, for
331 // security reasons.
332 if payload.is("query", ns::ROSTER) && Some(from.clone()) == iq.from {
333 let roster = Roster::try_from(payload).unwrap();
334 for item in roster.items.into_iter() {
335 events.push(Event::ContactAdded(item));
336 }
337 } else if payload.is("pubsub", ns::PUBSUB) {
338 let new_events = pubsub::handle_iq_result(&from, payload);
339 events.extend(new_events);
340 } else if payload.is("slot", ns::HTTP_UPLOAD) {
341 let new_events = handle_upload_result(&from, iq.id, payload, self).await;
342 events.extend(new_events);
343 }
344 } else if let IqType::Set(_) = iq.payload {
345 // We MUST answer unhandled set iqs with a service-unavailable error.
346 let error = StanzaError::new(
347 ErrorType::Cancel,
348 DefinedCondition::ServiceUnavailable,
349 "en",
350 "No handler defined for this kind of iq.",
351 );
352 let iq = Iq::from_error(iq.id, error)
353 .with_to(iq.from.unwrap())
354 .into();
355 let _ = self.client.send_stanza(iq).await;
356 }
357
358 events
359 }
360
361 async fn handle_message(&mut self, message: Message) -> Vec<Event> {
362 let mut events = vec![];
363 let from = message.from.clone().unwrap();
364 let langs: Vec<&str> = self.lang.iter().map(String::as_str).collect();
365 match message.get_best_body(langs) {
366 Some((_lang, body)) => match message.type_ {
367 MessageType::Groupchat => {
368 let event = match from.clone() {
369 Jid::Full(full) => Event::RoomMessage(
370 message.id.clone(),
371 from.to_bare(),
372 full.resource_str().to_owned(),
373 body.clone(),
374 ),
375 Jid::Bare(bare) => {
376 Event::ServiceMessage(message.id.clone(), bare, body.clone())
377 }
378 };
379 events.push(event)
380 }
381 MessageType::Chat | MessageType::Normal => {
382 let mut found_special_message = false;
383
384 for payload in &message.payloads {
385 if let Ok(_) = MucUser::try_from(payload.clone()) {
386 let event = match from.clone() {
387 Jid::Bare(bare) => {
388 // TODO: Can a service message be of type Chat/Normal and not Groupchat?
389 warn!("Received misformed MessageType::Chat in muc#user namespace from a bare JID.");
390 Event::ServiceMessage(message.id.clone(), bare, body.clone())
391 }
392 Jid::Full(full) => Event::RoomPrivateMessage(
393 message.id.clone(),
394 full.to_bare(),
395 full.resource_str().to_owned(),
396 body.clone(),
397 ),
398 };
399
400 found_special_message = true;
401 events.push(event);
402 }
403 }
404
405 if !found_special_message {
406 let event =
407 Event::ChatMessage(message.id.clone(), from.to_bare(), body.clone());
408 events.push(event)
409 }
410 }
411 _ => (),
412 },
413 None => (),
414 }
415 for child in message.payloads {
416 if child.is("event", ns::PUBSUB_EVENT) {
417 let new_events = pubsub::handle_event(&from, child, self).await;
418 events.extend(new_events);
419 }
420 }
421
422 events
423 }
424
425 async fn handle_presence(&mut self, presence: Presence) -> Vec<Event> {
426 let mut events = vec![];
427 let from = presence.from.unwrap().to_bare();
428 for payload in presence.payloads.into_iter() {
429 let muc_user = match MucUser::try_from(payload) {
430 Ok(muc_user) => muc_user,
431 _ => continue,
432 };
433 for status in muc_user.status.into_iter() {
434 if status == Status::SelfPresence {
435 events.push(Event::RoomJoined(from.clone()));
436 break;
437 }
438 }
439 }
440
441 events
442 }
443
444 pub async fn wait_for_events(&mut self) -> Option<Vec<Event>> {
445 if let Some(event) = self.client.next().await {
446 let mut events = Vec::new();
447
448 match event {
449 TokioXmppEvent::Online { resumed: false, .. } => {
450 let presence = Self::make_initial_presence(&self.disco, &self.node).into();
451 let _ = self.client.send_stanza(presence).await;
452 events.push(Event::Online);
453 // TODO: only send this when the ContactList feature is enabled.
454 let iq = Iq::from_get(
455 "roster",
456 Roster {
457 ver: None,
458 items: vec![],
459 },
460 )
461 .into();
462 let _ = self.client.send_stanza(iq).await;
463 // TODO: only send this when the JoinRooms feature is enabled.
464 let iq =
465 Iq::from_get("bookmarks", PubSub::Items(Items::new(ns::BOOKMARKS2))).into();
466 let _ = self.client.send_stanza(iq).await;
467 }
468 TokioXmppEvent::Online { resumed: true, .. } => {}
469 TokioXmppEvent::Disconnected(_) => {
470 events.push(Event::Disconnected);
471 }
472 TokioXmppEvent::Stanza(elem) => {
473 if elem.is("iq", "jabber:client") {
474 let iq = Iq::try_from(elem).unwrap();
475 let new_events = self.handle_iq(iq).await;
476 events.extend(new_events);
477 } else if elem.is("message", "jabber:client") {
478 let message = Message::try_from(elem).unwrap();
479 let new_events = self.handle_message(message).await;
480 events.extend(new_events);
481 } else if elem.is("presence", "jabber:client") {
482 let presence = Presence::try_from(elem).unwrap();
483 let new_events = self.handle_presence(presence).await;
484 events.extend(new_events);
485 } else if elem.is("error", "http://etherx.jabber.org/streams") {
486 println!("Received a fatal stream error: {}", String::from(&elem));
487 } else {
488 panic!("Unknown stanza: {}", String::from(&elem));
489 }
490 }
491 }
492
493 Some(events)
494 } else {
495 None
496 }
497 }
498
499 pub async fn upload_file_with(&mut self, service: &str, path: &Path) {
500 let name = path.file_name().unwrap().to_str().unwrap().to_string();
501 let file = File::open(path).await.unwrap();
502 let size = file.metadata().await.unwrap().len();
503 let slot_request = SlotRequest {
504 filename: name,
505 size: size,
506 content_type: None,
507 };
508 let to = service.parse::<Jid>().unwrap();
509 let request = Iq::from_get("upload1", slot_request).with_to(to.clone());
510 self.uploads
511 .push((String::from("upload1"), to, path.to_path_buf()));
512 self.client.send_stanza(request.into()).await.unwrap();
513 }
514}
515
516async fn handle_upload_result(
517 from: &Jid,
518 iqid: String,
519 elem: Element,
520 agent: &mut Agent,
521) -> impl IntoIterator<Item = Event> {
522 let mut res: Option<(usize, PathBuf)> = None;
523
524 for (i, (id, to, filepath)) in agent.uploads.iter().enumerate() {
525 if to == from && id == &iqid {
526 res = Some((i, filepath.to_path_buf()));
527 break;
528 }
529 }
530
531 if let Some((index, file)) = res {
532 agent.uploads.remove(index);
533 let slot = SlotResult::try_from(elem).unwrap();
534
535 let mut headers = ReqwestHeaderMap::new();
536 for header in slot.put.headers {
537 let (attr, val) = match header {
538 HttpUploadHeader::Authorization(val) => ("Authorization", val),
539 HttpUploadHeader::Cookie(val) => ("Cookie", val),
540 HttpUploadHeader::Expires(val) => ("Expires", val),
541 };
542 headers.insert(attr, val.parse().unwrap());
543 }
544
545 let web = ReqwestClient::new();
546 let stream = FramedRead::new(File::open(file).await.unwrap(), BytesCodec::new());
547 let body = ReqwestBody::wrap_stream(stream);
548 let res = web
549 .put(slot.put.url.as_str())
550 .headers(headers)
551 .body(body)
552 .send()
553 .await
554 .unwrap();
555 if res.status() == 201 {
556 return vec![Event::HttpUploadedFile(slot.get.url)];
557 }
558 }
559
560 return vec![];
561}
562
563#[cfg(test)]
564mod tests {
565 use super::{Agent, BareJid, ClientBuilder, ClientFeature, ClientType, Event};
566 use std::str::FromStr;
567 use tokio_xmpp::AsyncClient as TokioXmppClient;
568
569 #[tokio::test]
570 async fn test_simple() {
571 let jid = BareJid::from_str("foo@bar").unwrap();
572
573 let client = TokioXmppClient::new(jid.clone(), "meh");
574
575 // Client instance
576 let client_builder = ClientBuilder::new(jid, "meh")
577 .set_client(ClientType::Bot, "xmpp-rs")
578 .set_website("https://gitlab.com/xmpp-rs/xmpp-rs")
579 .set_default_nick("bot")
580 .enable_feature(ClientFeature::ContactList);
581
582 #[cfg(feature = "avatars")]
583 let client_builder = client_builder.enable_feature(ClientFeature::Avatars);
584
585 let mut agent: Agent = client_builder.build_impl(client);
586
587 while let Some(events) = agent.wait_for_events().await {
588 assert!(match events[0] {
589 Event::Disconnected => true,
590 _ => false,
591 });
592 assert_eq!(events.len(), 1);
593 break;
594 }
595 }
596}