commit 212d9e7e7e80d48eb1f2fa39f5503b5782fd542a Author: Astro Date: Fri Jun 2 00:42:57 2017 +0200 this ain't work diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..a9d37c560c6ab8d4afbf47eda643e8c42e857716 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +target +Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000000000000000000000000000000000000..4e643380825af52a1c9bf9460b5608f7446b2d03 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "tokio-xmpp" +version = "0.1.0" +authors = ["Astro "] + +[dependencies] +futures = "0.1.6" +tokio-core = "0.1.1" +RustyXML = "0.1.1" diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000000000000000000000000000000000000..e9162b02c91e1c8af9754f70e0514c185c925b3e --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,112 @@ +extern crate futures; +extern crate tokio_core; +extern crate xml; + +use std::net::SocketAddr; +use std::net::ToSocketAddrs; +use std::sync::Arc; +use std::io::ErrorKind; +use futures::{Future, BoxFuture, Sink, Poll}; +use futures::stream::{Stream, iter}; +use futures::future::result; +use tokio_core::reactor::Handle; +use tokio_core::io::Io; +use tokio_core::net::TcpStream; + +mod xmpp_codec; +use xmpp_codec::*; + + +// type FullClient = sasl::Client> + +type Event = (); +type Error = std::io::Error; + +struct TCPStream { + source: Box>, + sink: Arc>>>, +} + +impl TCPStream { + pub fn connect(addr: &SocketAddr, handle: &Handle) -> BoxFuture, std::io::Error> { + TcpStream::connect(addr, handle) + .and_then(|stream| { + let (sink, source) = stream.framed(XMPPCodec::new()) + // .framed(UTF8Codec::new()) + .split(); + + sink.send(Packet::StreamStart) + .and_then(|sink| result(Ok((Arc::new(Box::new(sink)), source)))) + }) + .and_then(|(sink, source)| { + let sink1 = sink.clone(); + let source = source + .map(|items| iter(items.into_iter().map(Ok))) + .flatten() + .filter_map(move |pkt| Self::process_packet(pkt, &sink1)) + // .for_each(|ev| { + // match ev { + // Packet::Stanza + // _ => (), + // } + // Ok(println!("xmpp: {:?}", ev)) + // }) + // .boxed(); + ; + result(Ok(Arc::new(TCPStream { + source: Box::new(source), + sink: sink, + }))) + }).boxed() + //.map_err(|e| std::io::Error::new(ErrorKind::Other, e)); + } + + fn process_packet(pkt: Packet, sink: &Arc) -> Option + where S: Sink { + + println!("pkt: {:?}", pkt); + None + } +} + +struct ClientStream { + inner: TCPStream, +} + +impl ClientStream { + pub fn connect(jid: &str, password: &str, handle: &Handle) -> Box> { + let addr = "[2a01:4f8:a0:33d0::5]:5222" + .to_socket_addrs().unwrap() + .next().unwrap(); + let stream = + TCPStream::connect(&addr, handle) + .and_then(|stream| { + Ok(ClientStream { + inner: stream + }) + }); + Box::new(stream) + } +} + +#[cfg(test)] +mod tests { + use tokio_core::reactor::Core; + + #[test] + fn it_works() { + let mut core = Core::new().unwrap(); + let client = super::ClientStream::connect( + "astro@spaceboyz.net", + "...", + &core.handle() + ).and_then(|stream| { + stream.inner.source.boxed().for_each(|item| { + Ok(println!("stream item: {:?}", item)) + }) + }).boxed(); + core.run(client).unwrap(); + } + + // TODO: test truncated utf8 +} diff --git a/src/xmpp_codec.rs b/src/xmpp_codec.rs new file mode 100644 index 0000000000000000000000000000000000000000..847339315fce725d93bebb51b0c194fe86810e1a --- /dev/null +++ b/src/xmpp_codec.rs @@ -0,0 +1,149 @@ +use std; +use std::str::from_utf8; +use std::io::{Error, ErrorKind}; +use std::collections::HashMap; +use tokio_core::io::{Codec, EasyBuf}; +use xml; + +const NS_XMLNS: &'static str = "http://www.w3.org/2000/xmlns/"; +const NS_STREAMS: &'static str = "http://etherx.jabber.org/streams"; +const NS_CLIENT: &'static str = "jabber:client"; + +struct XMPPRoot { + builder: xml::ElementBuilder, + pub attributes: HashMap<(String, Option), String>, +} + +impl XMPPRoot { + fn new(root: xml::StartTag) -> Self { + let mut builder = xml::ElementBuilder::new(); + let mut attributes = HashMap::new(); + println!("root attributes: {:?}", root.attributes); + for (name_ns, value) in root.attributes { + match name_ns { + (ref name, None) if name == "xmlns" => + builder.set_default_ns(value), + (ref prefix, Some(ref ns)) if ns == NS_XMLNS => + builder.define_prefix(prefix.to_owned(), value), + _ => { + attributes.insert(name_ns, value); + }, + } + } + + XMPPRoot { + builder: builder, + attributes: attributes, + } + } + + fn handle_event(&mut self, event: Result) + -> Option> { + self.builder.handle_event(event) + } +} + +#[derive(Debug)] +pub enum Packet { + Error(Box), + StreamStart, + Stanza(xml::Element), + StreamEnd, +} + +pub struct XMPPCodec { + parser: xml::Parser, + root: Option, +} + +impl XMPPCodec { + pub fn new() -> Self { + XMPPCodec { + parser: xml::Parser::new(), + root: None, + } + } +} + +impl Codec for XMPPCodec { + type In = Vec; + type Out = Packet; + + fn decode(&mut self, buf: &mut EasyBuf) -> Result, Error> { + match from_utf8(buf.as_slice()) { + Ok(s) => + self.parser.feed_str(s), + Err(e) => + return Err(Error::new(ErrorKind::InvalidInput, e)), + } + + let mut new_root = None; + let mut results = Vec::new(); + for event in &mut self.parser { + match &mut self.root { + &mut None => { + // Expecting + match event { + Ok(xml::Event::ElementStart(start_tag)) => { + new_root = Some(XMPPRoot::new(start_tag)); + results.push(Packet::StreamStart); + }, + Err(e) => + results.push(Packet::Error(Box::new(e))), + _ => + (), + } + } + + &mut Some(ref mut root) => { + match root.handle_event(event) { + None => (), + Some(Ok(stanza)) => { + println!("stanza: {}", stanza); + results.push(Packet::Stanza(stanza)); + }, + Some(Err(e)) => + results.push(Packet::Error(Box::new(e))), + }; + }, + } + + match new_root.take() { + None => (), + Some(root) => self.root = Some(root), + } + } + + if results.len() == 0 { + Ok(None) + } else { + Ok(Some(results)) + } + } + + fn encode(&mut self, msg: Self::Out, buf: &mut Vec) -> Result<(), Error> { + match msg { + Packet::StreamStart => { + let mut write = |s: &str| { + buf.extend_from_slice(s.as_bytes()); + }; + + write("\n"); + write("\n"); + + Ok(()) + }, + // TODO: Implement all + _ => Ok(()) + } + } + + fn decode_eof(&mut self, _buf: &mut EasyBuf) -> Result { + Ok(vec!()) + } +}