Cargo.toml 🔗
@@ -6,4 +6,6 @@ authors = ["Astro <astro@spaceboyz.net>"]
[dependencies]
futures = "*"
tokio-core = "*"
+tokio-io = "*"
+bytes = "*"
RustyXML = "*"
Astro created
Cargo.toml | 2 +
src/lib.rs | 4 +-
src/tcp.rs | 19 ++++++-----------
src/xmpp_codec.rs | 53 +++++++++++++++++++++++-------------------------
4 files changed, 36 insertions(+), 42 deletions(-)
@@ -6,4 +6,6 @@ authors = ["Astro <astro@spaceboyz.net>"]
[dependencies]
futures = "*"
tokio-core = "*"
+tokio-io = "*"
+bytes = "*"
RustyXML = "*"
@@ -1,9 +1,9 @@
#[macro_use]
extern crate futures;
extern crate tokio_core;
+extern crate tokio_io;
+extern crate bytes;
extern crate xml;
-extern crate rustls;
-extern crate tokio_rustls;
mod xmpp_codec;
@@ -1,16 +1,12 @@
use std::fmt;
use std::net::SocketAddr;
-use std::net::ToSocketAddrs;
-use std::sync::Arc;
use std::io::{Error, ErrorKind};
-use futures::{Future, BoxFuture, Sink, Poll, Async};
-use futures::stream::{Stream, iter};
+use futures::{Future, Sink, Poll, Async};
+use futures::stream::Stream;
use futures::sink;
use tokio_core::reactor::Handle;
-use tokio_core::io::Io;
+use tokio_io::AsyncRead;
use tokio_core::net::{TcpStream, TcpStreamNew};
-use rustls::ClientConfig;
-use tokio_rustls::ClientConfigExt;
use super::{XMPPStream, XMPPCodec, Packet};
@@ -25,7 +21,6 @@ enum TcpClientState {
SendStart(sink::Send<XMPPStream<TcpStream>>),
RecvStart(Option<XMPPStream<TcpStream>>),
Established,
- Invalid,
}
impl fmt::Debug for TcpClientState {
@@ -35,9 +30,9 @@ impl fmt::Debug for TcpClientState {
TcpClientState::SendStart(_) => "SendStart",
TcpClientState::RecvStart(_) => "RecvStart",
TcpClientState::Established => "Established",
- TcpClientState::Invalid => "Invalid",
};
- write!(fmt, "{}", s)
+ try!(write!(fmt, "{}", s));
+ Ok(())
}
}
@@ -58,7 +53,7 @@ impl Future for TcpClient {
let (new_state, result) = match self.state {
TcpClientState::Connecting(ref mut tcp_stream_new) => {
let tcp_stream = try_ready!(tcp_stream_new.poll());
- let xmpp_stream = tcp_stream.framed(XMPPCodec::new());
+ let xmpp_stream = AsyncRead::framed(tcp_stream, XMPPCodec::new());
let send = xmpp_stream.send(Packet::StreamStart);
let new_state = TcpClientState::SendStart(send);
(new_state, Ok(Async::NotReady))
@@ -82,7 +77,7 @@ impl Future for TcpClient {
let new_state = TcpClientState::Established;
(new_state, Ok(Async::Ready(xmpp_stream)))
},
- TcpClientState::Established | TcpClientState::Invalid =>
+ TcpClientState::Established =>
unreachable!(),
};
@@ -1,9 +1,11 @@
use std;
+use std::fmt::Write;
use std::str::from_utf8;
use std::io::{Error, ErrorKind};
use std::collections::HashMap;
-use tokio_core::io::{Codec, EasyBuf, Framed};
+use tokio_io::codec::{Framed, Encoder, Decoder};
use xml;
+use bytes::*;
const NS_XMLNS: &'static str = "http://www.w3.org/2000/xmlns/";
const NS_STREAMS: &'static str = "http://etherx.jabber.org/streams";
@@ -67,22 +69,20 @@ impl XMPPCodec {
}
}
-impl Codec for XMPPCodec {
- type In = Packet;
- type Out = Packet;
+impl Decoder for XMPPCodec {
+ type Item = Packet;
+ type Error = Error;
- fn decode(&mut self, buf: &mut EasyBuf) -> Result<Option<Self::In>, Error> {
+ fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
println!("XMPPCodec.decode {:?}", buf.len());
- let buf_len = buf.len();
- let chunk = buf.drain_to(buf_len);
- match from_utf8(chunk.as_slice()) {
+ match from_utf8(buf.take().as_ref()) {
Ok(s) =>
self.parser.feed_str(s),
Err(e) =>
return Err(Error::new(ErrorKind::InvalidInput, e)),
}
- let mut new_root = None;
+ let mut new_root: Option<XMPPRoot> = None;
let mut result = None;
for event in &mut self.parser {
match self.root {
@@ -128,29 +128,26 @@ impl Codec for XMPPCodec {
Ok(result)
}
- fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> Result<(), Error> {
- match msg {
+ fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Error> {
+ self.decode(buf)
+ }
+}
+
+impl Encoder for XMPPCodec {
+ type Item = Packet;
+ type Error = Error;
+
+ fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
+ match item {
Packet::StreamStart => {
- let mut write = |s: &str| {
- buf.extend_from_slice(s.as_bytes());
- };
-
- write("<?xml version='1.0'?>\n");
- write("<stream:stream");
- write(" version='1.0'");
- write(" to='spaceboyz.net'");
- write(&format!(" xmlns='{}'", NS_CLIENT));
- write(&format!(" xmlns:stream='{}'", NS_STREAMS));
- write(">\n");
-
- Ok(())
+ write!(dst,
+ "<?xml version='1.0'?>\n
+<stream:stream version='1.0' to='spaceboyz.net' xmlns='{}' xmlns:stream='{}'>\n",
+ NS_CLIENT, NS_STREAMS)
+ .map_err(|_| Error::from(ErrorKind::WriteZero))
},
// TODO: Implement all
_ => Ok(())
}
}
-
- fn decode_eof(&mut self, _buf: &mut EasyBuf) -> Result<Self::In, Error> {
- Err(Error::from(ErrorKind::UnexpectedEof))
- }
}