this ain't work

Astro created

Change summary

.gitignore        |   2 
Cargo.toml        |   9 ++
src/lib.rs        | 112 ++++++++++++++++++++++++++++++++++++
src/xmpp_codec.rs | 149 +++++++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 272 insertions(+)

Detailed changes

Cargo.toml 🔗

@@ -0,0 +1,9 @@
+[package]
+name = "tokio-xmpp"
+version = "0.1.0"
+authors = ["Astro <astro@spaceboyz.net>"]
+
+[dependencies]
+futures = "0.1.6"
+tokio-core = "0.1.1"
+RustyXML = "0.1.1"

src/lib.rs 🔗

@@ -0,0 +1,112 @@
+extern crate futures;
+extern crate tokio_core;
+extern crate xml;
+
+use std::net::SocketAddr;
+use std::net::ToSocketAddrs;
+use std::sync::Arc;
+use std::io::ErrorKind;
+use futures::{Future, BoxFuture, Sink, Poll};
+use futures::stream::{Stream, iter};
+use futures::future::result;
+use tokio_core::reactor::Handle;
+use tokio_core::io::Io;
+use tokio_core::net::TcpStream;
+
+mod xmpp_codec;
+use xmpp_codec::*;
+
+
+// type FullClient = sasl::Client<StartTLS<TCPConnection>>
+
+type Event = ();
+type Error = std::io::Error;
+
+struct TCPStream {
+    source: Box<Stream<Item=Event, Error=std::io::Error>>,
+    sink: Arc<Box<futures::stream::SplitSink<tokio_core::io::Framed<tokio_core::net::TcpStream, xmpp_codec::XMPPCodec>>>>,
+}
+
+impl TCPStream {
+    pub fn connect(addr: &SocketAddr, handle: &Handle) -> BoxFuture<Arc<TCPStream>, std::io::Error> {
+        TcpStream::connect(addr, handle)
+            .and_then(|stream| {
+                let (sink, source) = stream.framed(XMPPCodec::new())
+                // .framed(UTF8Codec::new())
+                    .split();
+                
+                sink.send(Packet::StreamStart)
+                    .and_then(|sink| result(Ok((Arc::new(Box::new(sink)), source))))
+            })
+            .and_then(|(sink, source)| {
+                let sink1 = sink.clone();
+                let source = source
+                    .map(|items| iter(items.into_iter().map(Ok)))
+                    .flatten()
+                    .filter_map(move |pkt| Self::process_packet(pkt, &sink1))
+                // .for_each(|ev| {
+                //     match ev {
+                //         Packet::Stanza
+                //         _ => (),
+                //     }
+                //     Ok(println!("xmpp: {:?}", ev))
+                // })
+                // .boxed();
+                    ;
+                result(Ok(Arc::new(TCPStream {
+                    source: Box::new(source),
+                    sink: sink,
+                })))
+            }).boxed()
+            //.map_err(|e| std::io::Error::new(ErrorKind::Other, e));
+    }
+
+    fn process_packet<S>(pkt: Packet, sink: &Arc<S>) -> Option<Event>
+        where S: Sink<SinkItem=Packet, SinkError=std::io::Error> {
+
+        println!("pkt: {:?}", pkt);
+        None
+    }
+}
+
+struct ClientStream {
+    inner: TCPStream,
+}
+
+impl ClientStream {
+    pub fn connect(jid: &str, password: &str, handle: &Handle) -> Box<Future<Item=Self, Error=std::io::Error>> {
+        let addr = "[2a01:4f8:a0:33d0::5]:5222"
+            .to_socket_addrs().unwrap()
+            .next().unwrap();
+        let stream =
+            TCPStream::connect(&addr, handle)
+            .and_then(|stream| {
+                Ok(ClientStream {
+                    inner: stream
+                })
+            });
+        Box::new(stream)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use tokio_core::reactor::Core;
+
+    #[test]
+    fn it_works() {
+        let mut core = Core::new().unwrap();
+        let client = super::ClientStream::connect(
+            "astro@spaceboyz.net",
+            "...",
+            &core.handle()
+        ).and_then(|stream| {
+            stream.inner.source.boxed().for_each(|item| {
+                Ok(println!("stream item: {:?}", item))
+            })
+        }).boxed();
+        core.run(client).unwrap();
+    }
+
+    // TODO: test truncated utf8
+}

src/xmpp_codec.rs 🔗

@@ -0,0 +1,149 @@
+use std;
+use std::str::from_utf8;
+use std::io::{Error, ErrorKind};
+use std::collections::HashMap;
+use tokio_core::io::{Codec, EasyBuf};
+use xml;
+
+const NS_XMLNS: &'static str = "http://www.w3.org/2000/xmlns/";
+const NS_STREAMS: &'static str = "http://etherx.jabber.org/streams";
+const NS_CLIENT: &'static str = "jabber:client";
+
+struct XMPPRoot {
+    builder: xml::ElementBuilder,
+    pub attributes: HashMap<(String, Option<String>), String>,
+}
+
+impl XMPPRoot {
+    fn new(root: xml::StartTag) -> Self {
+        let mut builder = xml::ElementBuilder::new();
+        let mut attributes = HashMap::new();
+        println!("root attributes: {:?}", root.attributes);
+        for (name_ns, value) in root.attributes {
+            match name_ns {
+                (ref name, None) if name == "xmlns" =>
+                    builder.set_default_ns(value),
+                (ref prefix, Some(ref ns)) if ns == NS_XMLNS =>
+                    builder.define_prefix(prefix.to_owned(), value),
+                _ => {
+                    attributes.insert(name_ns, value);
+                },
+            }
+        }
+
+        XMPPRoot {
+            builder: builder,
+            attributes: attributes,
+        }
+    }
+
+    fn handle_event(&mut self, event: Result<xml::Event, xml::ParserError>)
+                    -> Option<Result<xml::Element, xml::BuilderError>> {
+        self.builder.handle_event(event)
+    }
+}
+
+#[derive(Debug)]
+pub enum Packet {
+    Error(Box<std::error::Error>),
+    StreamStart,
+    Stanza(xml::Element),
+    StreamEnd,
+}
+
+pub struct XMPPCodec {
+    parser: xml::Parser,
+    root: Option<XMPPRoot>,
+}
+
+impl XMPPCodec {
+    pub fn new() -> Self {
+        XMPPCodec {
+            parser: xml::Parser::new(),
+            root: None,
+        }
+    }
+}
+
+impl Codec for XMPPCodec {
+    type In = Vec<Packet>;
+    type Out = Packet;
+
+    fn decode(&mut self, buf: &mut EasyBuf) -> Result<Option<Self::In>, Error> {
+        match from_utf8(buf.as_slice()) {
+            Ok(s) =>
+                self.parser.feed_str(s),
+            Err(e) =>
+                return Err(Error::new(ErrorKind::InvalidInput, e)),
+        }
+
+        let mut new_root = None;
+        let mut results = Vec::new();
+        for event in &mut self.parser {
+            match &mut self.root {
+                &mut None => {
+                    // Expecting <stream:stream>
+                    match event {
+                        Ok(xml::Event::ElementStart(start_tag)) => {
+                            new_root = Some(XMPPRoot::new(start_tag));
+                            results.push(Packet::StreamStart);
+                        },
+                        Err(e) =>
+                            results.push(Packet::Error(Box::new(e))),
+                        _ =>
+                            (),
+                    }
+                }
+
+                &mut Some(ref mut root) => {
+                    match root.handle_event(event) {
+                        None => (),
+                        Some(Ok(stanza)) => {
+                            println!("stanza: {}", stanza);
+                            results.push(Packet::Stanza(stanza));
+                        },
+                        Some(Err(e)) =>
+                            results.push(Packet::Error(Box::new(e))),
+                    };
+                },
+            }
+
+            match new_root.take() {
+                None => (),
+                Some(root) => self.root = Some(root),
+            }
+        }
+
+        if results.len() == 0 {
+            Ok(None)
+        } else {
+            Ok(Some(results))
+        }
+    }
+
+    fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> Result<(), Error> {
+        match msg {
+            Packet::StreamStart => {
+                let mut write = |s: &str| {
+                    buf.extend_from_slice(s.as_bytes());
+                };
+
+                write("<?xml version='1.0'?>\n");
+                write("<stream:stream");
+                write(" version='1.0'");
+                write(" to='spaceboyz.net'");
+                write(&format!(" xmlns='{}'", NS_CLIENT));
+                write(&format!(" xmlns:stream='{}'", NS_STREAMS));
+                write(">\n");
+
+                Ok(())
+            },
+            // TODO: Implement all
+            _ => Ok(())
+        }
+    }
+
+    fn decode_eof(&mut self, _buf: &mut EasyBuf) -> Result<Self::In, Error> {
+        Ok(vec!())
+    }
+}