1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#![deny(bare_trait_objects)]
8
9use futures::stream::StreamExt;
10use reqwest::{
11 header::HeaderMap as ReqwestHeaderMap, Body as ReqwestBody, Client as ReqwestClient,
12};
13use std::convert::TryFrom;
14use std::path::{Path, PathBuf};
15use std::sync::{Arc, RwLock};
16use tokio::fs::File;
17use tokio_util::codec::{BytesCodec, FramedRead};
18pub use tokio_xmpp::parsers;
19use tokio_xmpp::parsers::{
20 bookmarks2::Conference,
21 caps::{compute_disco, hash_caps, Caps},
22 disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity},
23 hashes::Algo,
24 http_upload::{Header as HttpUploadHeader, SlotRequest, SlotResult},
25 iq::{Iq, IqType},
26 message::{Body, Message, MessageType},
27 muc::{
28 user::{MucUser, Status},
29 Muc,
30 },
31 ns,
32 presence::{Presence, Type as PresenceType},
33 pubsub::pubsub::{Items, PubSub},
34 roster::{Item as RosterItem, Roster},
35 stanza_error::{DefinedCondition, ErrorType, StanzaError},
36};
37use tokio_xmpp::{AsyncClient as TokioXmppClient, Event as TokioXmppEvent};
38pub use tokio_xmpp::{BareJid, Element, FullJid, Jid};
39#[macro_use]
40extern crate log;
41
42mod pubsub;
43
44pub type Error = tokio_xmpp::Error;
45
/// Kind of client announced in the service-discovery identity.
///
/// The textual form (`"bot"` or `"pc"`) is produced by the
/// [`std::fmt::Display`] implementation below; `to_string()` keeps working
/// through the standard blanket `ToString` implementation.
#[derive(Debug)]
pub enum ClientType {
    /// Rendered as `"bot"`. This is the default.
    Bot,
    /// Rendered as `"pc"`.
    Pc,
}

impl Default for ClientType {
    /// Returns [`ClientType::Bot`].
    fn default() -> Self {
        ClientType::Bot
    }
}

impl std::fmt::Display for ClientType {
    /// Writes the identity type string of this client kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            ClientType::Bot => "bot",
            ClientType::Pc => "pc",
        })
    }
}
66
/// Optional capabilities that can be switched on with
/// `ClientBuilder::enable_feature`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClientFeature {
    /// When enabled, the disco features advertise `+notify` for avatar metadata.
    #[cfg(feature = "avatars")]
    Avatars,
    /// Contact list support. Not consulted when the disco features are built.
    ContactList,
    /// When enabled, the disco features advertise `+notify` for bookmarks.
    JoinRooms,
}
74
/// Optional stanza identifier carried by the message events.
pub type Id = Option<String>;
/// Nickname of a room occupant, taken from the resource part of a room JID.
pub type RoomNick = String;
77
78#[derive(Debug)]
79pub enum Event {
80 Online,
81 Disconnected,
82 ContactAdded(RosterItem),
83 ContactRemoved(RosterItem),
84 ContactChanged(RosterItem),
85 #[cfg(feature = "avatars")]
86 AvatarRetrieved(Jid, String),
87 ChatMessage(Id, BareJid, Body),
88 JoinRoom(BareJid, Conference),
89 LeaveRoom(BareJid),
90 LeaveAllRooms,
91 RoomJoined(BareJid),
92 RoomLeft(BareJid),
93 RoomMessage(Id, BareJid, RoomNick, Body),
94 /// A private message received from a room, containing the message ID, the room's BareJid,
95 /// the sender's nickname, and the message body.
96 RoomPrivateMessage(Id, BareJid, RoomNick, Body),
97 ServiceMessage(Id, BareJid, Body),
98 HttpUploadedFile(String),
99}
100
101pub struct ClientBuilder<'a> {
102 jid: BareJid,
103 password: &'a str,
104 website: String,
105 default_nick: String,
106 lang: Vec<String>,
107 disco: (ClientType, String),
108 features: Vec<ClientFeature>,
109 resource: Option<String>,
110}
111
112impl ClientBuilder<'_> {
113 pub fn new<'a>(jid: BareJid, password: &'a str) -> ClientBuilder<'a> {
114 ClientBuilder {
115 jid,
116 password,
117 website: String::from("https://gitlab.com/xmpp-rs/tokio-xmpp"),
118 default_nick: String::from("xmpp-rs"),
119 lang: vec![String::from("en")],
120 disco: (ClientType::default(), String::from("tokio-xmpp")),
121 features: vec![],
122 resource: None,
123 }
124 }
125
126 /// Optionally set a resource associated to this device on the client
127 pub fn set_resource(mut self, resource: &str) -> Self {
128 self.resource = Some(resource.to_string());
129 self
130 }
131
132 pub fn set_client(mut self, type_: ClientType, name: &str) -> Self {
133 self.disco = (type_, String::from(name));
134 self
135 }
136
137 pub fn set_website(mut self, url: &str) -> Self {
138 self.website = String::from(url);
139 self
140 }
141
142 pub fn set_default_nick(mut self, nick: &str) -> Self {
143 self.default_nick = String::from(nick);
144 self
145 }
146
147 pub fn set_lang(mut self, lang: Vec<String>) -> Self {
148 self.lang = lang;
149 self
150 }
151
152 pub fn enable_feature(mut self, feature: ClientFeature) -> Self {
153 self.features.push(feature);
154 self
155 }
156
157 fn make_disco(&self) -> DiscoInfoResult {
158 let identities = vec![Identity::new(
159 "client",
160 self.disco.0.to_string(),
161 "en",
162 self.disco.1.to_string(),
163 )];
164 let mut features = vec![Feature::new(ns::DISCO_INFO)];
165 #[cfg(feature = "avatars")]
166 {
167 if self.features.contains(&ClientFeature::Avatars) {
168 features.push(Feature::new(format!("{}+notify", ns::AVATAR_METADATA)));
169 }
170 }
171 if self.features.contains(&ClientFeature::JoinRooms) {
172 features.push(Feature::new(format!("{}+notify", ns::BOOKMARKS2)));
173 }
174 DiscoInfoResult {
175 node: None,
176 identities,
177 features,
178 extensions: vec![],
179 }
180 }
181
182 pub fn build(self) -> Agent {
183 let jid: Jid = if let Some(resource) = &self.resource {
184 self.jid.with_resource_str(resource).unwrap().into()
185 } else {
186 self.jid.clone().into()
187 };
188
189 let client = TokioXmppClient::new(jid, self.password);
190 self.build_impl(client)
191 }
192
193 // This function is meant to be used for testing build
194 pub(crate) fn build_impl(self, client: TokioXmppClient) -> Agent {
195 let disco = self.make_disco();
196 let node = self.website;
197
198 Agent {
199 client,
200 default_nick: Arc::new(RwLock::new(self.default_nick)),
201 lang: Arc::new(self.lang),
202 disco,
203 node,
204 uploads: Vec::new(),
205 }
206 }
207}
208
209pub struct Agent {
210 client: TokioXmppClient,
211 default_nick: Arc<RwLock<String>>,
212 lang: Arc<Vec<String>>,
213 disco: DiscoInfoResult,
214 node: String,
215 uploads: Vec<(String, Jid, PathBuf)>,
216}
217
218impl Agent {
219 pub async fn disconnect(&mut self) -> Result<(), Error> {
220 self.client.send_end().await
221 }
222
223 pub async fn join_room(
224 &mut self,
225 room: BareJid,
226 nick: Option<String>,
227 password: Option<String>,
228 lang: &str,
229 status: &str,
230 ) {
231 let mut muc = Muc::new();
232 if let Some(password) = password {
233 muc = muc.with_password(password);
234 }
235
236 let nick = nick.unwrap_or_else(|| self.default_nick.read().unwrap().clone());
237 let room_jid = room.with_resource_str(&nick).unwrap();
238 let mut presence = Presence::new(PresenceType::None).with_to(room_jid);
239 presence.add_payload(muc);
240 presence.set_status(String::from(lang), String::from(status));
241 let _ = self.client.send_stanza(presence.into()).await;
242 }
243
244 /// Send a "leave room" request to the server (specifically, an "unavailable" presence stanza).
245 ///
246 /// The returned future will resolve when the request has been sent,
247 /// not when the room has actually been left.
248 ///
249 /// If successful, a `RoomLeft` event should be received later as a confirmation.
250 ///
251 /// See: https://xmpp.org/extensions/xep-0045.html#exit
252 ///
253 /// Note that this method does NOT remove the room from the auto-join list; the latter
254 /// is more a list of bookmarks that the account knows about and that have a flag set
255 /// to indicate that they should be joined automatically after connecting (see the JoinRoom event).
256 ///
257 /// Regarding the latter, see the these minutes about auto-join behavior:
258 /// https://docs.modernxmpp.org/meetings/2019-01-brussels/#bookmarks
259 ///
260 /// # Arguments
261 ///
262 /// * `room_jid`: The JID of the room to leave.
263 /// * `nickname`: The nickname to use in the room.
264 /// * `lang`: The language of the status message.
265 /// * `status`: The status message to send.
266 pub async fn leave_room(
267 &mut self,
268 room_jid: BareJid,
269 nickname: RoomNick,
270 lang: impl Into<String>,
271 status: impl Into<String>,
272 ) {
273 // XEP-0045 specifies that, to leave a room, the client must send a presence stanza
274 // with type="unavailable".
275 let mut presence = Presence::new(PresenceType::Unavailable).with_to(
276 room_jid
277 .with_resource_str(nickname.as_str())
278 .expect("Invalid room JID after adding resource part."),
279 );
280
281 // Optionally, the client may include a status message in the presence stanza.
282 // TODO: Should this be optional? The XEP says "MAY", but the method signature requires the arguments.
283 // XEP-0045: "The occupant MAY include normal <status/> information in the unavailable presence stanzas"
284 presence.set_status(lang, status);
285
286 // Send the presence stanza.
287 if let Err(e) = self.client.send_stanza(presence.into()).await {
288 // Report any errors to the log.
289 error!("Failed to send leave room presence: {}", e);
290 }
291 }
292
293 pub async fn send_message(
294 &mut self,
295 recipient: Jid,
296 type_: MessageType,
297 lang: &str,
298 text: &str,
299 ) {
300 let mut message = Message::new(Some(recipient));
301 message.type_ = type_;
302 message
303 .bodies
304 .insert(String::from(lang), Body(String::from(text)));
305 let _ = self.client.send_stanza(message.into()).await;
306 }
307
308 pub async fn send_room_private_message(
309 &mut self,
310 room: BareJid,
311 recipient: RoomNick,
312 lang: &str,
313 text: &str,
314 ) {
315 let recipient: Jid = room.with_resource_str(&recipient).unwrap().into();
316 let mut message = Message::new(recipient).with_payload(MucUser::new());
317 message.type_ = MessageType::Chat;
318 message
319 .bodies
320 .insert(String::from(lang), Body(String::from(text)));
321 let _ = self.client.send_stanza(message.into()).await;
322 }
323
324 fn make_initial_presence(disco: &DiscoInfoResult, node: &str) -> Presence {
325 let caps_data = compute_disco(disco);
326 let hash = hash_caps(&caps_data, Algo::Sha_1).unwrap();
327 let caps = Caps::new(node, hash);
328
329 let mut presence = Presence::new(PresenceType::None);
330 presence.add_payload(caps);
331 presence
332 }
333
334 async fn handle_iq(&mut self, iq: Iq) -> Vec<Event> {
335 let mut events = vec![];
336 let from = iq
337 .from
338 .clone()
339 .unwrap_or_else(|| self.client.bound_jid().unwrap().clone());
340 if let IqType::Get(payload) = iq.payload {
341 if payload.is("query", ns::DISCO_INFO) {
342 let query = DiscoInfoQuery::try_from(payload);
343 match query {
344 Ok(query) => {
345 let mut disco_info = self.disco.clone();
346 disco_info.node = query.node;
347 let iq = Iq::from_result(iq.id, Some(disco_info))
348 .with_to(iq.from.unwrap())
349 .into();
350 let _ = self.client.send_stanza(iq).await;
351 }
352 Err(err) => {
353 let error = StanzaError::new(
354 ErrorType::Modify,
355 DefinedCondition::BadRequest,
356 "en",
357 &format!("{}", err),
358 );
359 let iq = Iq::from_error(iq.id, error)
360 .with_to(iq.from.unwrap())
361 .into();
362 let _ = self.client.send_stanza(iq).await;
363 }
364 }
365 } else {
366 // We MUST answer unhandled get iqs with a service-unavailable error.
367 let error = StanzaError::new(
368 ErrorType::Cancel,
369 DefinedCondition::ServiceUnavailable,
370 "en",
371 "No handler defined for this kind of iq.",
372 );
373 let iq = Iq::from_error(iq.id, error)
374 .with_to(iq.from.unwrap())
375 .into();
376 let _ = self.client.send_stanza(iq).await;
377 }
378 } else if let IqType::Result(Some(payload)) = iq.payload {
379 // TODO: move private iqs like this one somewhere else, for
380 // security reasons.
381 if payload.is("query", ns::ROSTER) && Some(from.clone()) == iq.from {
382 let roster = Roster::try_from(payload).unwrap();
383 for item in roster.items.into_iter() {
384 events.push(Event::ContactAdded(item));
385 }
386 } else if payload.is("pubsub", ns::PUBSUB) {
387 let new_events = pubsub::handle_iq_result(&from, payload);
388 events.extend(new_events);
389 } else if payload.is("slot", ns::HTTP_UPLOAD) {
390 let new_events = handle_upload_result(&from, iq.id, payload, self).await;
391 events.extend(new_events);
392 }
393 } else if let IqType::Set(_) = iq.payload {
394 // We MUST answer unhandled set iqs with a service-unavailable error.
395 let error = StanzaError::new(
396 ErrorType::Cancel,
397 DefinedCondition::ServiceUnavailable,
398 "en",
399 "No handler defined for this kind of iq.",
400 );
401 let iq = Iq::from_error(iq.id, error)
402 .with_to(iq.from.unwrap())
403 .into();
404 let _ = self.client.send_stanza(iq).await;
405 }
406
407 events
408 }
409
410 async fn handle_message(&mut self, message: Message) -> Vec<Event> {
411 let mut events = vec![];
412 let from = message.from.clone().unwrap();
413 let langs: Vec<&str> = self.lang.iter().map(String::as_str).collect();
414 match message.get_best_body(langs) {
415 Some((_lang, body)) => match message.type_ {
416 MessageType::Groupchat => {
417 let event = match from.clone() {
418 Jid::Full(full) => Event::RoomMessage(
419 message.id.clone(),
420 from.to_bare(),
421 full.resource_str().to_owned(),
422 body.clone(),
423 ),
424 Jid::Bare(bare) => {
425 Event::ServiceMessage(message.id.clone(), bare, body.clone())
426 }
427 };
428 events.push(event)
429 }
430 MessageType::Chat | MessageType::Normal => {
431 let mut found_special_message = false;
432
433 for payload in &message.payloads {
434 if let Ok(_) = MucUser::try_from(payload.clone()) {
435 let event = match from.clone() {
436 Jid::Bare(bare) => {
437 // TODO: Can a service message be of type Chat/Normal and not Groupchat?
438 warn!("Received misformed MessageType::Chat in muc#user namespace from a bare JID.");
439 Event::ServiceMessage(message.id.clone(), bare, body.clone())
440 }
441 Jid::Full(full) => Event::RoomPrivateMessage(
442 message.id.clone(),
443 full.to_bare(),
444 full.resource_str().to_owned(),
445 body.clone(),
446 ),
447 };
448
449 found_special_message = true;
450 events.push(event);
451 }
452 }
453
454 if !found_special_message {
455 let event =
456 Event::ChatMessage(message.id.clone(), from.to_bare(), body.clone());
457 events.push(event)
458 }
459 }
460 _ => (),
461 },
462 None => (),
463 }
464 for child in message.payloads {
465 if child.is("event", ns::PUBSUB_EVENT) {
466 let new_events = pubsub::handle_event(&from, child, self).await;
467 events.extend(new_events);
468 }
469 }
470
471 events
472 }
473
474 /// Translate a `Presence` stanza into a list of higher-level `Event`s.
475 async fn handle_presence(&mut self, presence: Presence) -> Vec<Event> {
476 // Allocate an empty vector to store the events.
477 let mut events = vec![];
478
479 // Extract the JID of the sender (i.e. the one whose presence is being sent).
480 let from = presence.from.unwrap().to_bare();
481
482 // Search through the payloads for a MUC user status.
483
484 if let Some(muc) = presence
485 .payloads
486 .iter()
487 .filter_map(|p| MucUser::try_from(p.clone()).ok())
488 .next()
489 {
490 // If a MUC user status was found, search through the statuses for a self-presence.
491 if muc.status.iter().any(|s| *s == Status::SelfPresence) {
492 // If a self-presence was found, then the stanza is about the client's own presence.
493
494 match presence.type_ {
495 PresenceType::None => {
496 // According to https://xmpp.org/extensions/xep-0045.html#enter-pres, no type should be seen as "available".
497 events.push(Event::RoomJoined(from.clone()));
498 }
499 PresenceType::Unavailable => {
500 // According to https://xmpp.org/extensions/xep-0045.html#exit, the server will use type "unavailable" to notify the client that it has left the room/
501 events.push(Event::RoomLeft(from.clone()));
502 }
503 _ => unimplemented!("Presence type {:?}", presence.type_), // TODO: What to do here?
504 }
505 }
506 }
507
508 // Return the list of events.
509 events
510 }
511
512 /// Wait for new events.
513 ///
514 /// # Returns
515 ///
516 /// - `Some(events)` if there are new events; multiple may be returned at once.
517 /// - `None` if the underlying stream is closed.
518 pub async fn wait_for_events(&mut self) -> Option<Vec<Event>> {
519 if let Some(event) = self.client.next().await {
520 let mut events = Vec::new();
521
522 match event {
523 TokioXmppEvent::Online { resumed: false, .. } => {
524 let presence = Self::make_initial_presence(&self.disco, &self.node).into();
525 let _ = self.client.send_stanza(presence).await;
526 events.push(Event::Online);
527 // TODO: only send this when the ContactList feature is enabled.
528 let iq = Iq::from_get(
529 "roster",
530 Roster {
531 ver: None,
532 items: vec![],
533 },
534 )
535 .into();
536 let _ = self.client.send_stanza(iq).await;
537 // TODO: only send this when the JoinRooms feature is enabled.
538 let iq =
539 Iq::from_get("bookmarks", PubSub::Items(Items::new(ns::BOOKMARKS2))).into();
540 let _ = self.client.send_stanza(iq).await;
541 }
542 TokioXmppEvent::Online { resumed: true, .. } => {}
543 TokioXmppEvent::Disconnected(_) => {
544 events.push(Event::Disconnected);
545 }
546 TokioXmppEvent::Stanza(elem) => {
547 if elem.is("iq", "jabber:client") {
548 let iq = Iq::try_from(elem).unwrap();
549 let new_events = self.handle_iq(iq).await;
550 events.extend(new_events);
551 } else if elem.is("message", "jabber:client") {
552 let message = Message::try_from(elem).unwrap();
553 let new_events = self.handle_message(message).await;
554 events.extend(new_events);
555 } else if elem.is("presence", "jabber:client") {
556 let presence = Presence::try_from(elem).unwrap();
557 let new_events = self.handle_presence(presence).await;
558 events.extend(new_events);
559 } else if elem.is("error", "http://etherx.jabber.org/streams") {
560 println!("Received a fatal stream error: {}", String::from(&elem));
561 } else {
562 panic!("Unknown stanza: {}", String::from(&elem));
563 }
564 }
565 }
566
567 Some(events)
568 } else {
569 None
570 }
571 }
572
573 pub async fn upload_file_with(&mut self, service: &str, path: &Path) {
574 let name = path.file_name().unwrap().to_str().unwrap().to_string();
575 let file = File::open(path).await.unwrap();
576 let size = file.metadata().await.unwrap().len();
577 let slot_request = SlotRequest {
578 filename: name,
579 size: size,
580 content_type: None,
581 };
582 let to = service.parse::<Jid>().unwrap();
583 let request = Iq::from_get("upload1", slot_request).with_to(to.clone());
584 self.uploads
585 .push((String::from("upload1"), to, path.to_path_buf()));
586 self.client.send_stanza(request.into()).await.unwrap();
587 }
588}
589
590async fn handle_upload_result(
591 from: &Jid,
592 iqid: String,
593 elem: Element,
594 agent: &mut Agent,
595) -> impl IntoIterator<Item = Event> {
596 let mut res: Option<(usize, PathBuf)> = None;
597
598 for (i, (id, to, filepath)) in agent.uploads.iter().enumerate() {
599 if to == from && id == &iqid {
600 res = Some((i, filepath.to_path_buf()));
601 break;
602 }
603 }
604
605 if let Some((index, file)) = res {
606 agent.uploads.remove(index);
607 let slot = SlotResult::try_from(elem).unwrap();
608
609 let mut headers = ReqwestHeaderMap::new();
610 for header in slot.put.headers {
611 let (attr, val) = match header {
612 HttpUploadHeader::Authorization(val) => ("Authorization", val),
613 HttpUploadHeader::Cookie(val) => ("Cookie", val),
614 HttpUploadHeader::Expires(val) => ("Expires", val),
615 };
616 headers.insert(attr, val.parse().unwrap());
617 }
618
619 let web = ReqwestClient::new();
620 let stream = FramedRead::new(File::open(file).await.unwrap(), BytesCodec::new());
621 let body = ReqwestBody::wrap_stream(stream);
622 let res = web
623 .put(slot.put.url.as_str())
624 .headers(headers)
625 .body(body)
626 .send()
627 .await
628 .unwrap();
629 if res.status() == 201 {
630 return vec![Event::HttpUploadedFile(slot.get.url)];
631 }
632 }
633
634 return vec![];
635}
636
#[cfg(test)]
mod tests {
    use super::{Agent, BareJid, ClientBuilder, ClientFeature, ClientType, Event};
    use std::str::FromStr;
    use tokio_xmpp::AsyncClient as TokioXmppClient;

    /// Builds an agent around a client for `foo@bar` and checks that the
    /// first batch of events, if any, is exactly one `Disconnected`.
    #[tokio::test]
    async fn test_simple() {
        let account = BareJid::from_str("foo@bar").unwrap();

        let transport = TokioXmppClient::new(account.clone(), "meh");

        // Client instance
        let builder = ClientBuilder::new(account, "meh")
            .set_client(ClientType::Bot, "xmpp-rs")
            .set_website("https://gitlab.com/xmpp-rs/xmpp-rs")
            .set_default_nick("bot")
            .enable_feature(ClientFeature::ContactList);

        #[cfg(feature = "avatars")]
        let builder = builder.enable_feature(ClientFeature::Avatars);

        let mut agent: Agent = builder.build_impl(transport);

        // Only the first batch is inspected.
        if let Some(events) = agent.wait_for_events().await {
            assert!(matches!(events[0], Event::Disconnected));
            assert_eq!(events.len(), 1);
        }
    }
}