xmpp: HTTP File Upload

Maxime “pep” Buquet created

Implement XEP-0363 HTTP File Upload in xmpp-rs.

The current interface is as is because of a limitation of our libraries.
It is not possible to await on an IQ to get a result, so we have to
workaround it by storing data on the Agent and fetching it back when we
get the result.

The client will have to first call `upload_file_with` and then listen on
Event::HttpUploadedFile, which are decoupled actions, instead of
awaiting on upload_file_with and getting the URL as a result directly.

`upload_file_with` doesn't yet find the upload service by itself for the
same reason as above.

Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>

Change summary

xmpp/Cargo.toml |  4 ++
xmpp/src/lib.rs | 64 ++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 66 insertions(+), 2 deletions(-)

Detailed changes

xmpp/Cargo.toml 🔗

@@ -17,8 +17,10 @@ edition = "2018"
 tokio-xmpp = "3.0.0"
 xmpp-parsers = "0.19"
 futures = "0.3"
-tokio = "1"
+tokio = { version = "1", features = ["full"] }
 log = "0.4"
+reqwest = { version = "0.11.8", features = ["stream"] }
+tokio-util = { version = "0.6.9", features = ["codec"] }
 
 [dev-dependencies]
 env_logger = "0.8"

xmpp/src/lib.rs 🔗

@@ -7,15 +7,20 @@
 #![deny(bare_trait_objects)]
 
 use futures::stream::StreamExt;
+use reqwest::{Body as ReqwestBody, Client as ReqwestClient};
 use std::cell::RefCell;
 use std::convert::TryFrom;
+use std::path::{Path, PathBuf};
 use std::rc::Rc;
+use tokio::fs::File;
+use tokio_util::codec::{BytesCodec, FramedRead};
 use tokio_xmpp::{AsyncClient as TokioXmppClient, Event as TokioXmppEvent};
 use xmpp_parsers::{
     bookmarks2::Conference,
     caps::{compute_disco, hash_caps, Caps},
     disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity},
     hashes::Algo,
+    http_upload::{SlotRequest, SlotResult},
     iq::{Iq, IqType},
     message::{Body, Message, MessageType},
     muc::{
@@ -27,7 +32,7 @@ use xmpp_parsers::{
     pubsub::pubsub::{Items, PubSub},
     roster::{Item as RosterItem, Roster},
     stanza_error::{DefinedCondition, ErrorType, StanzaError},
-    BareJid, FullJid, Jid,
+    BareJid, Element, FullJid, Jid,
 };
 #[macro_use]
 extern crate log;
@@ -83,6 +88,7 @@ pub enum Event {
     RoomJoined(BareJid),
     RoomLeft(BareJid),
     RoomMessage(BareJid, RoomNick, Body),
+    HttpUploadedFile(String),
 }
 
 #[derive(Default)]
@@ -175,6 +181,7 @@ impl ClientBuilder<'_> {
             lang: Rc::new(self.lang),
             disco,
             node,
+            uploads: Vec::new(),
         };
 
         Ok(agent)
@@ -187,6 +194,7 @@ pub struct Agent {
     lang: Rc<Vec<String>>,
     disco: DiscoInfoResult,
     node: String,
+    uploads: Vec<(String, Jid, PathBuf)>,
 }
 
 impl Agent {
@@ -291,6 +299,9 @@ impl Agent {
             } else if payload.is("pubsub", ns::PUBSUB) {
                 let new_events = pubsub::handle_iq_result(&from, payload);
                 events.extend(new_events);
+            } else if payload.is("slot", ns::HTTP_UPLOAD) {
+                let new_events = handle_upload_result(&from, iq.id, payload, self).await;
+                events.extend(new_events);
             }
         } else if let IqType::Set(_) = iq.payload {
             // We MUST answer unhandled set iqs with a service-unavailable error.
@@ -417,6 +428,57 @@ impl Agent {
             None
         }
     }
+
+    pub async fn upload_file_with(&mut self, service: &str, path: &Path) {
+        let name = path.file_name().unwrap().to_str().unwrap().to_string();
+        let file = File::open(path).await.unwrap();
+        let size = file.metadata().await.unwrap().len();
+        let slot_request = SlotRequest {
+            filename: name,
+            size: size,
+            content_type: None,
+        };
+        let to = service.parse::<Jid>().unwrap();
+        let request = Iq::from_get("upload1", slot_request).with_to(to.clone());
+        self.uploads
+            .push((String::from("upload1"), to, path.to_path_buf()));
+        self.client.send_stanza(request.into()).await.unwrap();
+    }
+}
+
+async fn handle_upload_result(
+    from: &Jid,
+    iqid: String,
+    elem: Element,
+    agent: &mut Agent,
+) -> impl IntoIterator<Item = Event> {
+    let mut res: Option<(usize, PathBuf)> = None;
+
+    for (i, (id, to, filepath)) in agent.uploads.iter().enumerate() {
+        if to == from && id == &iqid {
+            res = Some((i, filepath.to_path_buf()));
+            break;
+        }
+    }
+
+    if let Some((index, file)) = res {
+        agent.uploads.remove(index);
+        let slot = SlotResult::try_from(elem).unwrap();
+        let web = ReqwestClient::new();
+        let stream = FramedRead::new(File::open(file).await.unwrap(), BytesCodec::new());
+        let body = ReqwestBody::wrap_stream(stream);
+        let res = web
+            .put(slot.put.url.as_str())
+            .body(body)
+            .send()
+            .await
+            .unwrap();
+        if res.status() == 201 {
+            return vec![Event::HttpUploadedFile(slot.get.url)];
+        }
+    }
+
+    return vec![];
 }
 
 #[cfg(test)]