1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#![deny(bare_trait_objects)]
8
9use futures::stream::StreamExt;
10use reqwest::{
11 header::HeaderMap as ReqwestHeaderMap, Body as ReqwestBody, Client as ReqwestClient,
12};
13use std::path::{Path, PathBuf};
14use std::sync::{Arc, RwLock};
15use tokio::fs::File;
16use tokio_util::codec::{BytesCodec, FramedRead};
17pub use tokio_xmpp::parsers;
18use tokio_xmpp::parsers::{
19 bookmarks2::Conference,
20 caps::{compute_disco, hash_caps, Caps},
21 disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity},
22 hashes::Algo,
23 http_upload::{Header as HttpUploadHeader, SlotRequest, SlotResult},
24 iq::{Iq, IqType},
25 message::{Body, Message, MessageType},
26 muc::{
27 user::{MucUser, Status},
28 Muc,
29 },
30 ns,
31 presence::{Presence, Type as PresenceType},
32 pubsub::pubsub::{Items, PubSub},
33 roster::{Item as RosterItem, Roster},
34 stanza_error::{DefinedCondition, ErrorType, StanzaError},
35};
36use tokio_xmpp::{AsyncClient as TokioXmppClient, Event as TokioXmppEvent};
37pub use tokio_xmpp::{BareJid, Element, FullJid, Jid};
38#[macro_use]
39extern crate log;
40
41mod pubsub;
42
43pub type Error = tokio_xmpp::Error;
44
45#[derive(Debug)]
46pub enum ClientType {
47 Bot,
48 Pc,
49}
50
51impl Default for ClientType {
52 fn default() -> Self {
53 ClientType::Bot
54 }
55}
56
57impl ToString for ClientType {
58 fn to_string(&self) -> String {
59 String::from(match self {
60 ClientType::Bot => "bot",
61 ClientType::Pc => "pc",
62 })
63 }
64}
65
66#[derive(PartialEq)]
67pub enum ClientFeature {
68 #[cfg(feature = "avatars")]
69 Avatars,
70 ContactList,
71 JoinRooms,
72}
73
74pub type Id = Option<String>;
75pub type RoomNick = String;
76
77#[derive(Debug)]
78pub enum Event {
79 Online,
80 Disconnected(Error),
81 ContactAdded(RosterItem),
82 ContactRemoved(RosterItem),
83 ContactChanged(RosterItem),
84 #[cfg(feature = "avatars")]
85 AvatarRetrieved(Jid, String),
86 ChatMessage(Id, BareJid, Body),
87 JoinRoom(BareJid, Conference),
88 LeaveRoom(BareJid),
89 LeaveAllRooms,
90 RoomJoined(BareJid),
91 RoomLeft(BareJid),
92 RoomMessage(Id, BareJid, RoomNick, Body),
93 /// A private message received from a room, containing the message ID, the room's BareJid,
94 /// the sender's nickname, and the message body.
95 RoomPrivateMessage(Id, BareJid, RoomNick, Body),
96 ServiceMessage(Id, BareJid, Body),
97 HttpUploadedFile(String),
98}
99
100pub struct ClientBuilder<'a> {
101 jid: BareJid,
102 password: &'a str,
103 website: String,
104 default_nick: String,
105 lang: Vec<String>,
106 disco: (ClientType, String),
107 features: Vec<ClientFeature>,
108 resource: Option<String>,
109}
110
111impl ClientBuilder<'_> {
112 pub fn new<'a>(jid: BareJid, password: &'a str) -> ClientBuilder<'a> {
113 ClientBuilder {
114 jid,
115 password,
116 website: String::from("https://gitlab.com/xmpp-rs/tokio-xmpp"),
117 default_nick: String::from("xmpp-rs"),
118 lang: vec![String::from("en")],
119 disco: (ClientType::default(), String::from("tokio-xmpp")),
120 features: vec![],
121 resource: None,
122 }
123 }
124
125 /// Optionally set a resource associated to this device on the client
126 pub fn set_resource(mut self, resource: &str) -> Self {
127 self.resource = Some(resource.to_string());
128 self
129 }
130
131 pub fn set_client(mut self, type_: ClientType, name: &str) -> Self {
132 self.disco = (type_, String::from(name));
133 self
134 }
135
136 pub fn set_website(mut self, url: &str) -> Self {
137 self.website = String::from(url);
138 self
139 }
140
141 pub fn set_default_nick(mut self, nick: &str) -> Self {
142 self.default_nick = String::from(nick);
143 self
144 }
145
146 pub fn set_lang(mut self, lang: Vec<String>) -> Self {
147 self.lang = lang;
148 self
149 }
150
151 pub fn enable_feature(mut self, feature: ClientFeature) -> Self {
152 self.features.push(feature);
153 self
154 }
155
156 fn make_disco(&self) -> DiscoInfoResult {
157 let identities = vec![Identity::new(
158 "client",
159 self.disco.0.to_string(),
160 "en",
161 self.disco.1.to_string(),
162 )];
163 let mut features = vec![Feature::new(ns::DISCO_INFO)];
164 #[cfg(feature = "avatars")]
165 {
166 if self.features.contains(&ClientFeature::Avatars) {
167 features.push(Feature::new(format!("{}+notify", ns::AVATAR_METADATA)));
168 }
169 }
170 if self.features.contains(&ClientFeature::JoinRooms) {
171 features.push(Feature::new(format!("{}+notify", ns::BOOKMARKS2)));
172 }
173 DiscoInfoResult {
174 node: None,
175 identities,
176 features,
177 extensions: vec![],
178 }
179 }
180
181 pub fn build(self) -> Agent {
182 let jid: Jid = if let Some(resource) = &self.resource {
183 self.jid.with_resource_str(resource).unwrap().into()
184 } else {
185 self.jid.clone().into()
186 };
187
188 let client = TokioXmppClient::new(jid, self.password);
189 self.build_impl(client)
190 }
191
192 // This function is meant to be used for testing build
193 pub(crate) fn build_impl(self, client: TokioXmppClient) -> Agent {
194 let disco = self.make_disco();
195 let node = self.website;
196
197 Agent {
198 client,
199 default_nick: Arc::new(RwLock::new(self.default_nick)),
200 lang: Arc::new(self.lang),
201 disco,
202 node,
203 uploads: Vec::new(),
204 }
205 }
206}
207
208pub struct Agent {
209 client: TokioXmppClient,
210 default_nick: Arc<RwLock<String>>,
211 lang: Arc<Vec<String>>,
212 disco: DiscoInfoResult,
213 node: String,
214 uploads: Vec<(String, Jid, PathBuf)>,
215}
216
217impl Agent {
218 pub async fn disconnect(&mut self) -> Result<(), Error> {
219 self.client.send_end().await
220 }
221
222 pub async fn join_room(
223 &mut self,
224 room: BareJid,
225 nick: Option<String>,
226 password: Option<String>,
227 lang: &str,
228 status: &str,
229 ) {
230 let mut muc = Muc::new();
231 if let Some(password) = password {
232 muc = muc.with_password(password);
233 }
234
235 let nick = nick.unwrap_or_else(|| self.default_nick.read().unwrap().clone());
236 let room_jid = room.with_resource_str(&nick).unwrap();
237 let mut presence = Presence::new(PresenceType::None).with_to(room_jid);
238 presence.add_payload(muc);
239 presence.set_status(String::from(lang), String::from(status));
240 let _ = self.client.send_stanza(presence.into()).await;
241 }
242
243 /// Send a "leave room" request to the server (specifically, an "unavailable" presence stanza).
244 ///
245 /// The returned future will resolve when the request has been sent,
246 /// not when the room has actually been left.
247 ///
248 /// If successful, a `RoomLeft` event should be received later as a confirmation.
249 ///
250 /// See: https://xmpp.org/extensions/xep-0045.html#exit
251 ///
252 /// Note that this method does NOT remove the room from the auto-join list; the latter
253 /// is more a list of bookmarks that the account knows about and that have a flag set
254 /// to indicate that they should be joined automatically after connecting (see the JoinRoom event).
255 ///
256 /// Regarding the latter, see the these minutes about auto-join behavior:
257 /// https://docs.modernxmpp.org/meetings/2019-01-brussels/#bookmarks
258 ///
259 /// # Arguments
260 ///
261 /// * `room_jid`: The JID of the room to leave.
262 /// * `nickname`: The nickname to use in the room.
263 /// * `lang`: The language of the status message.
264 /// * `status`: The status message to send.
265 pub async fn leave_room(
266 &mut self,
267 room_jid: BareJid,
268 nickname: RoomNick,
269 lang: impl Into<String>,
270 status: impl Into<String>,
271 ) {
272 // XEP-0045 specifies that, to leave a room, the client must send a presence stanza
273 // with type="unavailable".
274 let mut presence = Presence::new(PresenceType::Unavailable).with_to(
275 room_jid
276 .with_resource_str(nickname.as_str())
277 .expect("Invalid room JID after adding resource part."),
278 );
279
280 // Optionally, the client may include a status message in the presence stanza.
281 // TODO: Should this be optional? The XEP says "MAY", but the method signature requires the arguments.
282 // XEP-0045: "The occupant MAY include normal <status/> information in the unavailable presence stanzas"
283 presence.set_status(lang, status);
284
285 // Send the presence stanza.
286 if let Err(e) = self.client.send_stanza(presence.into()).await {
287 // Report any errors to the log.
288 error!("Failed to send leave room presence: {}", e);
289 }
290 }
291
292 pub async fn send_message(
293 &mut self,
294 recipient: Jid,
295 type_: MessageType,
296 lang: &str,
297 text: &str,
298 ) {
299 let mut message = Message::new(Some(recipient));
300 message.type_ = type_;
301 message
302 .bodies
303 .insert(String::from(lang), Body(String::from(text)));
304 let _ = self.client.send_stanza(message.into()).await;
305 }
306
307 pub async fn send_room_private_message(
308 &mut self,
309 room: BareJid,
310 recipient: RoomNick,
311 lang: &str,
312 text: &str,
313 ) {
314 let recipient: Jid = room.with_resource_str(&recipient).unwrap().into();
315 let mut message = Message::new(recipient).with_payload(MucUser::new());
316 message.type_ = MessageType::Chat;
317 message
318 .bodies
319 .insert(String::from(lang), Body(String::from(text)));
320 let _ = self.client.send_stanza(message.into()).await;
321 }
322
323 fn make_initial_presence(disco: &DiscoInfoResult, node: &str) -> Presence {
324 let caps_data = compute_disco(disco);
325 let hash = hash_caps(&caps_data, Algo::Sha_1).unwrap();
326 let caps = Caps::new(node, hash);
327
328 let mut presence = Presence::new(PresenceType::None);
329 presence.add_payload(caps);
330 presence
331 }
332
333 async fn handle_iq(&mut self, iq: Iq) -> Vec<Event> {
334 let mut events = vec![];
335 let from = iq
336 .from
337 .clone()
338 .unwrap_or_else(|| self.client.bound_jid().unwrap().clone());
339 if let IqType::Get(payload) = iq.payload {
340 if payload.is("query", ns::DISCO_INFO) {
341 let query = DiscoInfoQuery::try_from(payload);
342 match query {
343 Ok(query) => {
344 let mut disco_info = self.disco.clone();
345 disco_info.node = query.node;
346 let iq = Iq::from_result(iq.id, Some(disco_info))
347 .with_to(iq.from.unwrap())
348 .into();
349 let _ = self.client.send_stanza(iq).await;
350 }
351 Err(err) => {
352 let error = StanzaError::new(
353 ErrorType::Modify,
354 DefinedCondition::BadRequest,
355 "en",
356 &format!("{}", err),
357 );
358 let iq = Iq::from_error(iq.id, error)
359 .with_to(iq.from.unwrap())
360 .into();
361 let _ = self.client.send_stanza(iq).await;
362 }
363 }
364 } else {
365 // We MUST answer unhandled get iqs with a service-unavailable error.
366 let error = StanzaError::new(
367 ErrorType::Cancel,
368 DefinedCondition::ServiceUnavailable,
369 "en",
370 "No handler defined for this kind of iq.",
371 );
372 let iq = Iq::from_error(iq.id, error)
373 .with_to(iq.from.unwrap())
374 .into();
375 let _ = self.client.send_stanza(iq).await;
376 }
377 } else if let IqType::Result(Some(payload)) = iq.payload {
378 // TODO: move private iqs like this one somewhere else, for
379 // security reasons.
380 if payload.is("query", ns::ROSTER) && Some(from.clone()) == iq.from {
381 let roster = Roster::try_from(payload).unwrap();
382 for item in roster.items.into_iter() {
383 events.push(Event::ContactAdded(item));
384 }
385 } else if payload.is("pubsub", ns::PUBSUB) {
386 let new_events = pubsub::handle_iq_result(&from, payload);
387 events.extend(new_events);
388 } else if payload.is("slot", ns::HTTP_UPLOAD) {
389 let new_events = handle_upload_result(&from, iq.id, payload, self).await;
390 events.extend(new_events);
391 }
392 } else if let IqType::Set(_) = iq.payload {
393 // We MUST answer unhandled set iqs with a service-unavailable error.
394 let error = StanzaError::new(
395 ErrorType::Cancel,
396 DefinedCondition::ServiceUnavailable,
397 "en",
398 "No handler defined for this kind of iq.",
399 );
400 let iq = Iq::from_error(iq.id, error)
401 .with_to(iq.from.unwrap())
402 .into();
403 let _ = self.client.send_stanza(iq).await;
404 }
405
406 events
407 }
408
409 async fn handle_message(&mut self, message: Message) -> Vec<Event> {
410 let mut events = vec![];
411 let from = message.from.clone().unwrap();
412 let langs: Vec<&str> = self.lang.iter().map(String::as_str).collect();
413 match message.get_best_body(langs) {
414 Some((_lang, body)) => match message.type_ {
415 MessageType::Groupchat => {
416 let event = match from.clone() {
417 Jid::Full(full) => Event::RoomMessage(
418 message.id.clone(),
419 from.to_bare(),
420 full.resource_str().to_owned(),
421 body.clone(),
422 ),
423 Jid::Bare(bare) => {
424 Event::ServiceMessage(message.id.clone(), bare, body.clone())
425 }
426 };
427 events.push(event)
428 }
429 MessageType::Chat | MessageType::Normal => {
430 let mut found_special_message = false;
431
432 for payload in &message.payloads {
433 if let Ok(_) = MucUser::try_from(payload.clone()) {
434 let event = match from.clone() {
435 Jid::Bare(bare) => {
436 // TODO: Can a service message be of type Chat/Normal and not Groupchat?
437 warn!("Received misformed MessageType::Chat in muc#user namespace from a bare JID.");
438 Event::ServiceMessage(message.id.clone(), bare, body.clone())
439 }
440 Jid::Full(full) => Event::RoomPrivateMessage(
441 message.id.clone(),
442 full.to_bare(),
443 full.resource_str().to_owned(),
444 body.clone(),
445 ),
446 };
447
448 found_special_message = true;
449 events.push(event);
450 }
451 }
452
453 if !found_special_message {
454 let event =
455 Event::ChatMessage(message.id.clone(), from.to_bare(), body.clone());
456 events.push(event)
457 }
458 }
459 _ => (),
460 },
461 None => (),
462 }
463 for child in message.payloads {
464 if child.is("event", ns::PUBSUB_EVENT) {
465 let new_events = pubsub::handle_event(&from, child, self).await;
466 events.extend(new_events);
467 }
468 }
469
470 events
471 }
472
473 /// Translate a `Presence` stanza into a list of higher-level `Event`s.
474 async fn handle_presence(&mut self, presence: Presence) -> Vec<Event> {
475 // Allocate an empty vector to store the events.
476 let mut events = vec![];
477
478 // Extract the JID of the sender (i.e. the one whose presence is being sent).
479 let from = presence.from.unwrap().to_bare();
480
481 // Search through the payloads for a MUC user status.
482
483 if let Some(muc) = presence
484 .payloads
485 .iter()
486 .filter_map(|p| MucUser::try_from(p.clone()).ok())
487 .next()
488 {
489 // If a MUC user status was found, search through the statuses for a self-presence.
490 if muc.status.iter().any(|s| *s == Status::SelfPresence) {
491 // If a self-presence was found, then the stanza is about the client's own presence.
492
493 match presence.type_ {
494 PresenceType::None => {
495 // According to https://xmpp.org/extensions/xep-0045.html#enter-pres, no type should be seen as "available".
496 events.push(Event::RoomJoined(from.clone()));
497 }
498 PresenceType::Unavailable => {
499 // According to https://xmpp.org/extensions/xep-0045.html#exit, the server will use type "unavailable" to notify the client that it has left the room/
500 events.push(Event::RoomLeft(from.clone()));
501 }
502 _ => unimplemented!("Presence type {:?}", presence.type_), // TODO: What to do here?
503 }
504 }
505 }
506
507 // Return the list of events.
508 events
509 }
510
511 /// Wait for new events.
512 ///
513 /// # Returns
514 ///
515 /// - `Some(events)` if there are new events; multiple may be returned at once.
516 /// - `None` if the underlying stream is closed.
517 pub async fn wait_for_events(&mut self) -> Option<Vec<Event>> {
518 if let Some(event) = self.client.next().await {
519 let mut events = Vec::new();
520
521 match event {
522 TokioXmppEvent::Online { resumed: false, .. } => {
523 let presence = Self::make_initial_presence(&self.disco, &self.node).into();
524 let _ = self.client.send_stanza(presence).await;
525 events.push(Event::Online);
526 // TODO: only send this when the ContactList feature is enabled.
527 let iq = Iq::from_get(
528 "roster",
529 Roster {
530 ver: None,
531 items: vec![],
532 },
533 )
534 .into();
535 let _ = self.client.send_stanza(iq).await;
536 // TODO: only send this when the JoinRooms feature is enabled.
537 let iq =
538 Iq::from_get("bookmarks", PubSub::Items(Items::new(ns::BOOKMARKS2))).into();
539 let _ = self.client.send_stanza(iq).await;
540 }
541 TokioXmppEvent::Online { resumed: true, .. } => {}
542 TokioXmppEvent::Disconnected(e) => {
543 events.push(Event::Disconnected(e));
544 }
545 TokioXmppEvent::Stanza(elem) => {
546 if elem.is("iq", "jabber:client") {
547 let iq = Iq::try_from(elem).unwrap();
548 let new_events = self.handle_iq(iq).await;
549 events.extend(new_events);
550 } else if elem.is("message", "jabber:client") {
551 let message = Message::try_from(elem).unwrap();
552 let new_events = self.handle_message(message).await;
553 events.extend(new_events);
554 } else if elem.is("presence", "jabber:client") {
555 let presence = Presence::try_from(elem).unwrap();
556 let new_events = self.handle_presence(presence).await;
557 events.extend(new_events);
558 } else if elem.is("error", "http://etherx.jabber.org/streams") {
559 println!("Received a fatal stream error: {}", String::from(&elem));
560 } else {
561 panic!("Unknown stanza: {}", String::from(&elem));
562 }
563 }
564 }
565
566 Some(events)
567 } else {
568 None
569 }
570 }
571
572 pub async fn upload_file_with(&mut self, service: &str, path: &Path) {
573 let name = path.file_name().unwrap().to_str().unwrap().to_string();
574 let file = File::open(path).await.unwrap();
575 let size = file.metadata().await.unwrap().len();
576 let slot_request = SlotRequest {
577 filename: name,
578 size: size,
579 content_type: None,
580 };
581 let to = service.parse::<Jid>().unwrap();
582 let request = Iq::from_get("upload1", slot_request).with_to(to.clone());
583 self.uploads
584 .push((String::from("upload1"), to, path.to_path_buf()));
585 self.client.send_stanza(request.into()).await.unwrap();
586 }
587}
588
589async fn handle_upload_result(
590 from: &Jid,
591 iqid: String,
592 elem: Element,
593 agent: &mut Agent,
594) -> impl IntoIterator<Item = Event> {
595 let mut res: Option<(usize, PathBuf)> = None;
596
597 for (i, (id, to, filepath)) in agent.uploads.iter().enumerate() {
598 if to == from && id == &iqid {
599 res = Some((i, filepath.to_path_buf()));
600 break;
601 }
602 }
603
604 if let Some((index, file)) = res {
605 agent.uploads.remove(index);
606 let slot = SlotResult::try_from(elem).unwrap();
607
608 let mut headers = ReqwestHeaderMap::new();
609 for header in slot.put.headers {
610 let (attr, val) = match header {
611 HttpUploadHeader::Authorization(val) => ("Authorization", val),
612 HttpUploadHeader::Cookie(val) => ("Cookie", val),
613 HttpUploadHeader::Expires(val) => ("Expires", val),
614 };
615 headers.insert(attr, val.parse().unwrap());
616 }
617
618 let web = ReqwestClient::new();
619 let stream = FramedRead::new(File::open(file).await.unwrap(), BytesCodec::new());
620 let body = ReqwestBody::wrap_stream(stream);
621 let res = web
622 .put(slot.put.url.as_str())
623 .headers(headers)
624 .body(body)
625 .send()
626 .await
627 .unwrap();
628 if res.status() == 201 {
629 return vec![Event::HttpUploadedFile(slot.get.url)];
630 }
631 }
632
633 return vec![];
634}
635
636#[cfg(test)]
637mod tests {
638 use super::{Agent, BareJid, ClientBuilder, ClientFeature, ClientType, Event};
639 use std::str::FromStr;
640 use tokio_xmpp::AsyncClient as TokioXmppClient;
641
642 #[tokio::test]
643 async fn test_simple() {
644 let jid = BareJid::from_str("foo@bar").unwrap();
645
646 let client = TokioXmppClient::new(jid.clone(), "meh");
647
648 // Client instance
649 let client_builder = ClientBuilder::new(jid, "meh")
650 .set_client(ClientType::Bot, "xmpp-rs")
651 .set_website("https://gitlab.com/xmpp-rs/xmpp-rs")
652 .set_default_nick("bot")
653 .enable_feature(ClientFeature::ContactList);
654
655 #[cfg(feature = "avatars")]
656 let client_builder = client_builder.enable_feature(ClientFeature::Avatars);
657
658 let mut agent: Agent = client_builder.build_impl(client);
659
660 while let Some(events) = agent.wait_for_events().await {
661 assert!(match events[0] {
662 Event::Disconnected(_) => true,
663 _ => false,
664 });
665 assert_eq!(events.len(), 1);
666 break;
667 }
668 }
669}