.gitlab-ci.yml π
@@ -1,4 +1,4 @@
-image: "scorpil/rust:nightly"
+image: "pitkley/rust:nightly"
before_script:
- apt-get update -yqq
lumi created
Update/Change deps: quick-xml, xmpp-parsers, minidom, jid
See merge request xmpp-rs/xmpp-rs!32
.gitlab-ci.yml | 2
Cargo.toml | 8 +-
src/client.rs | 12 +--
src/component.rs | 26 ++++--
src/connection.rs | 34 +++++---
src/error.rs | 18 ++--
src/lib.rs | 2
src/plugins/ibb.rs | 155 +++++++++++++++++++++++------------------
src/plugins/messaging.rs | 16 ++--
src/plugins/roster.rs | 6
src/transport.rs | 128 ++++++++++++++++------------------
11 files changed, 213 insertions(+), 194 deletions(-)
@@ -1,4 +1,4 @@
-image: "scorpil/rust:nightly"
+image: "pitkley/rust:nightly"
before_script:
- apt-get update -yqq
@@ -15,12 +15,12 @@ license = "LGPL-3.0+"
gitlab = { repository = "lumi/xmpp-rs" }
[dependencies]
-xml-rs = "0.4.1"
-xmpp-parsers = "0.7.0"
+quick-xml = "0.10.0"
+xmpp-parsers = "0.9.0"
openssl = "0.9.12"
base64 = "0.6.0"
-minidom = "0.4.1"
-jid = "0.2.1"
+minidom = "0.7.0"
+jid = { version = "0.4", features = ["minidom"] }
sasl = "0.4.0"
sha-1 = "0.4"
chrono = "0.4.0"
@@ -1,4 +1,3 @@
-use xml;
use jid::Jid;
use transport::{Transport, SslTransport};
use error::Error;
@@ -17,7 +16,7 @@ use base64;
use minidom::Element;
-use xml::reader::XmlEvent as ReaderEvent;
+use quick_xml::events::Event as XmlEvent;
use std::sync::{Mutex, Arc};
@@ -156,10 +155,6 @@ impl Client {
self.transport.lock().unwrap().write_element(elem)
}
- fn read_event(&self) -> Result<xml::reader::XmlEvent, Error> {
- self.transport.lock().unwrap().read_event()
- }
-
fn connect(&mut self, mut credentials: SaslCredentials) -> Result<(), Error> {
let features = self.wait_for_features()?;
let ms = &features.sasl_mechanisms.ok_or(Error::SaslError(Some("no SASL mechanisms".to_owned())))?;
@@ -269,9 +264,10 @@ impl Client {
fn wait_for_features(&mut self) -> Result<StreamFeatures, Error> {
// TODO: this is very ugly
loop {
- let e = self.read_event()?;
+ let mut transport = self.transport.lock().unwrap();
+ let e = transport.read_event();
match e {
- ReaderEvent::StartElement { .. } => {
+ Ok(XmlEvent::Start { .. }) => {
break;
},
_ => (),
@@ -1,4 +1,3 @@
-use xml;
use jid::Jid;
use transport::{Transport, PlainTransport};
use error::Error;
@@ -10,8 +9,9 @@ use sha_1::{Sha1, Digest};
use minidom::Element;
-use xml::reader::XmlEvent as ReaderEvent;
+use quick_xml::events::Event as XmlEvent;
+use std::str;
use std::fmt::Write;
use std::sync::{Mutex, Arc};
@@ -131,19 +131,23 @@ impl Component {
self.transport.lock().unwrap().write_element(elem)
}
- fn read_event(&self) -> Result<xml::reader::XmlEvent, Error> {
- self.transport.lock().unwrap().read_event()
- }
-
fn connect(&mut self, secret: String) -> Result<(), Error> {
let mut sid = String::new();
loop {
- let e = self.read_event()?;
+ let mut transport = self.transport.lock().unwrap();
+ let e = transport.read_event()?;
match e {
- ReaderEvent::StartElement { attributes, .. } => {
- for attribute in attributes {
- if attribute.name.namespace == None && attribute.name.local_name == "id" {
- sid = attribute.value;
+ XmlEvent::Start(ref e) => {
+ for attr_result in e.attributes() {
+ match attr_result {
+ Ok(attr) => {
+ let name = str::from_utf8(attr.key)?;
+ let value = str::from_utf8(&attr.value)?;
+ if name == "id" {
+ sid = value.to_owned();
+ }
+ },
+ _ => panic!()
}
}
break;
@@ -2,7 +2,7 @@ use transport::Transport;
use error::Error;
use ns;
-use xml::writer::XmlEvent as WriterEvent;
+use quick_xml::events::{Event as WriterEvent, BytesStart, BytesEnd};
pub trait Connection {
type InitError;
@@ -23,16 +23,20 @@ impl Connection for C2S {
fn namespace() -> &'static str { ns::CLIENT }
fn init<T: Transport>(transport: &mut T, domain: &str, id: &str) -> Result<(), Error> {
- transport.write_event(WriterEvent::start_element("stream:stream")
- .attr("to", domain)
- .attr("id", id)
- .default_ns(ns::CLIENT)
- .ns("stream", ns::STREAM))?;
+ let name = "stream:stream";
+ let mut elem = BytesStart::borrowed(name.as_bytes(), name.len());
+ elem.push_attribute(("to", domain));
+ elem.push_attribute(("id", id));
+ elem.push_attribute(("xmlns", ns::CLIENT));
+ elem.push_attribute(("xmlns:stream", ns::STREAM));
+ transport.write_event(WriterEvent::Start(elem))?;
Ok(())
}
fn close<T: Transport>(transport: &mut T) -> Result<(), Error> {
- transport.write_event(WriterEvent::end_element())?;
+ let name = "stream:stream";
+ let elem = BytesEnd::borrowed(name.as_bytes());
+ transport.write_event(WriterEvent::End(elem))?;
Ok(())
}
}
@@ -46,16 +50,20 @@ impl Connection for Component2S {
fn namespace() -> &'static str { ns::COMPONENT_ACCEPT }
fn init<T: Transport>(transport: &mut T, domain: &str, id: &str) -> Result<(), Error> {
- transport.write_event(WriterEvent::start_element("stream:stream")
- .attr("to", domain)
- .attr("id", id)
- .default_ns(ns::COMPONENT_ACCEPT)
- .ns("stream", ns::STREAM))?;
+ let name = "stream:stream";
+ let mut elem = BytesStart::borrowed(name.as_bytes(), name.len());
+ elem.push_attribute(("to", domain));
+ elem.push_attribute(("id", id));
+ elem.push_attribute(("xmlns", ns::COMPONENT_ACCEPT));
+ elem.push_attribute(("xmlns:stream", ns::STREAM));
+ transport.write_event(WriterEvent::Start(elem))?;
Ok(())
}
fn close<T: Transport>(transport: &mut T) -> Result<(), Error> {
- transport.write_event(WriterEvent::end_element())?;
+ let name = "stream:stream";
+ let elem = BytesEnd::borrowed(name.as_bytes());
+ transport.write_event(WriterEvent::End(elem))?;
Ok(())
}
}
@@ -5,12 +5,12 @@ use std::fmt::Error as FormatError;
use std::io;
use std::net::TcpStream;
+use std::str::Utf8Error;
use openssl::ssl::HandshakeError;
use openssl::error::ErrorStack;
-use xml::reader::Error as XmlError;
-use xml::writer::Error as EmitterError;
+use quick_xml::errors::Error as XmlError;
use minidom::Error as MinidomError;
@@ -22,7 +22,6 @@ use components::sasl_error::SaslError;
#[derive(Debug)]
pub enum Error {
XmlError(XmlError),
- EmitterError(EmitterError),
IoError(io::Error),
HandshakeError(HandshakeError<TcpStream>),
OpenSslErrorStack(ErrorStack),
@@ -31,6 +30,7 @@ pub enum Error {
SaslError(Option<String>),
XmppSaslError(SaslError),
FormatError(FormatError),
+ Utf8Error(Utf8Error),
StreamError,
EndOfDocument,
}
@@ -41,12 +41,6 @@ impl From<XmlError> for Error {
}
}
-impl From<EmitterError> for Error {
- fn from(err: EmitterError) -> Error {
- Error::EmitterError(err)
- }
-}
-
impl From<io::Error> for Error {
fn from(err: io::Error) -> Error {
Error::IoError(err)
@@ -82,3 +76,9 @@ impl From<FormatError> for Error {
Error::FormatError(err)
}
}
+
+impl From<Utf8Error> for Error {
+ fn from(err: Utf8Error) -> Error {
+ Error::Utf8Error(err)
+ }
+}
@@ -1,4 +1,4 @@
-extern crate xml;
+extern crate quick_xml;
extern crate xmpp_parsers;
extern crate openssl;
extern crate minidom;
@@ -10,7 +10,7 @@ use jid::Jid;
use plugins::stanza::Iq;
use plugins::disco::DiscoPlugin;
use xmpp_parsers::iq::{IqType, IqSetPayload};
-use xmpp_parsers::ibb::{IBB, Stanza};
+use xmpp_parsers::ibb::{Open, Data, Close, Stanza};
use xmpp_parsers::stanza_error::{StanzaError, ErrorType, DefinedCondition};
use xmpp_parsers::ns;
@@ -86,74 +86,79 @@ impl IbbPlugin {
}
}
- fn handle_ibb(&self, from: Jid, ibb: IBB) -> Result<(), StanzaError> {
+ fn handle_ibb_open(&self, from: Jid, open: Open) -> Result<(), StanzaError> {
let mut sessions = self.sessions.lock().unwrap();
- match ibb {
- IBB::Open { block_size, sid, stanza } => {
- match sessions.entry((from.clone(), sid.clone())) {
- Entry::Vacant(_) => Ok(()),
- Entry::Occupied(_) => Err(generate_error(
- ErrorType::Cancel,
- DefinedCondition::NotAcceptable,
- "This session is already open."
- )),
- }?;
- let session = Session {
- stanza,
- block_size,
- cur_seq: 65535u16,
- };
- sessions.insert((from, sid), session.clone());
- self.proxy.dispatch(IbbOpen {
- session: session,
- });
- },
- IBB::Data { seq, sid, data } => {
- let entry = match sessions.entry((from, sid)) {
- Entry::Occupied(entry) => Ok(entry),
- Entry::Vacant(_) => Err(generate_error(
- ErrorType::Cancel,
- DefinedCondition::ItemNotFound,
- "This session doesnβt exist."
- )),
- }?;
- let mut session = entry.into_mut();
- if session.stanza != Stanza::Iq {
- return Err(generate_error(
- ErrorType::Cancel,
- DefinedCondition::NotAcceptable,
- "Wrong stanza type."
- ))
- }
- let cur_seq = session.cur_seq.wrapping_add(1);
- if seq != cur_seq {
- return Err(generate_error(
- ErrorType::Cancel,
- DefinedCondition::NotAcceptable,
- "Wrong seq number."
- ))
- }
- session.cur_seq = cur_seq;
- self.proxy.dispatch(IbbData {
- session: session.clone(),
- data,
- });
- },
- IBB::Close { sid } => {
- let entry = match sessions.entry((from, sid)) {
- Entry::Occupied(entry) => Ok(entry),
- Entry::Vacant(_) => Err(generate_error(
- ErrorType::Cancel,
- DefinedCondition::ItemNotFound,
- "This session doesnβt exist."
- )),
- }?;
- let session = entry.remove();
- self.proxy.dispatch(IbbClose {
- session,
- });
- },
+ let Open { block_size, sid, stanza } = open;
+ match sessions.entry((from.clone(), sid.clone())) {
+ Entry::Vacant(_) => Ok(()),
+ Entry::Occupied(_) => Err(generate_error(
+ ErrorType::Cancel,
+ DefinedCondition::NotAcceptable,
+ "This session is already open."
+ )),
+ }?;
+ let session = Session {
+ stanza,
+ block_size,
+ cur_seq: 65535u16,
+ };
+ sessions.insert((from, sid), session.clone());
+ self.proxy.dispatch(IbbOpen {
+ session: session,
+ });
+ Ok(())
+ }
+
+ fn handle_ibb_data(&self, from: Jid, data: Data) -> Result<(), StanzaError> {
+ let mut sessions = self.sessions.lock().unwrap();
+ let Data { seq, sid, data } = data;
+ let entry = match sessions.entry((from, sid)) {
+ Entry::Occupied(entry) => Ok(entry),
+ Entry::Vacant(_) => Err(generate_error(
+ ErrorType::Cancel,
+ DefinedCondition::ItemNotFound,
+ "This session doesnβt exist."
+ )),
+ }?;
+ let session = entry.into_mut();
+ if session.stanza != Stanza::Iq {
+ return Err(generate_error(
+ ErrorType::Cancel,
+ DefinedCondition::NotAcceptable,
+ "Wrong stanza type."
+ ))
+ }
+ let cur_seq = session.cur_seq.wrapping_add(1);
+ if seq != cur_seq {
+ return Err(generate_error(
+ ErrorType::Cancel,
+ DefinedCondition::NotAcceptable,
+ "Wrong seq number."
+ ))
}
+ session.cur_seq = cur_seq;
+ self.proxy.dispatch(IbbData {
+ session: session.clone(),
+ data,
+ });
+ Ok(())
+ }
+
+ fn handle_ibb_close(&self, from: Jid, close: Close) -> Result<(), StanzaError> {
+ let mut sessions = self.sessions.lock().unwrap();
+ let Close { sid } = close;
+ let entry = match sessions.entry((from, sid)) {
+ Entry::Occupied(entry) => Ok(entry),
+ Entry::Vacant(_) => Err(generate_error(
+ ErrorType::Cancel,
+ DefinedCondition::ItemNotFound,
+ "This session doesnβt exist."
+ )),
+ }?;
+ let session = entry.remove();
+ self.proxy.dispatch(IbbClose {
+ session,
+ });
Ok(())
}
@@ -164,8 +169,20 @@ impl IbbPlugin {
let id = iq.id.unwrap();
// TODO: use an intermediate plugin to parse this payload.
let payload = match IqSetPayload::try_from(payload) {
- Ok(IqSetPayload::IBB(ibb)) => {
- match self.handle_ibb(from.clone(), ibb) {
+ Ok(IqSetPayload::IbbOpen(open)) => {
+ match self.handle_ibb_open(from.clone(), open) {
+ Ok(_) => IqType::Result(None),
+ Err(error) => IqType::Error(error),
+ }
+ },
+ Ok(IqSetPayload::IbbData(data)) => {
+ match self.handle_ibb_data(from.clone(), data) {
+ Ok(_) => IqType::Result(None),
+ Err(error) => IqType::Error(error),
+ }
+ },
+ Ok(IqSetPayload::IbbClose(close)) => {
+ match self.handle_ibb_close(from.clone(), close) {
Ok(_) => IqType::Result(None),
Err(error) => IqType::Error(error),
}
@@ -7,9 +7,9 @@ use error::Error;
use jid::Jid;
use plugins::stanza::Message;
-use xmpp_parsers::message::{MessagePayload, MessageType};
+use xmpp_parsers::message::{MessagePayload, MessageType, Body};
use xmpp_parsers::chatstates::ChatState;
-use xmpp_parsers::receipts::Receipt;
+use xmpp_parsers::receipts::{Request, Received};
use xmpp_parsers::stanza_id::StanzaId;
// TODO: use the id (maybe even stanza-id) to identify every message.
@@ -70,7 +70,7 @@ impl MessagingPlugin {
id: Some(self.proxy.gen_id()),
bodies: {
let mut bodies = BTreeMap::new();
- bodies.insert(String::new(), String::from(body));
+ bodies.insert(String::new(), Body(body.to_owned()));
bodies
},
subjects: BTreeMap::new(),
@@ -98,11 +98,11 @@ impl MessagingPlugin {
chat_state: chat_state,
}),
// XEP-0184
- MessagePayload::Receipt(Receipt::Request) => self.proxy.dispatch(ReceiptRequestEvent {
+ MessagePayload::ReceiptRequest(Request) => self.proxy.dispatch(ReceiptRequestEvent {
from: from.clone(),
}),
// XEP-0184
- MessagePayload::Receipt(Receipt::Received(id)) => self.proxy.dispatch(ReceiptReceivedEvent {
+ MessagePayload::ReceiptReceived(Received {id}) => self.proxy.dispatch(ReceiptReceivedEvent {
from: from.clone(),
id: id.unwrap(),
}),
@@ -118,9 +118,9 @@ impl MessagingPlugin {
if message.bodies.contains_key("") {
self.proxy.dispatch(MessageEvent {
from: from,
- body: message.bodies[""].clone(),
- subject: if message.subjects.contains_key("") { Some(message.subjects[""].clone()) } else { None },
- thread: message.thread.clone(),
+ body: message.bodies[""].clone().0,
+ subject: if message.subjects.contains_key("") { Some(message.subjects[""].clone().0) } else { None },
+ thread: match message.thread.clone() { Some(thread) => Some(thread.0), None => None },
});
}
Propagation::Stop
@@ -77,7 +77,7 @@ impl RosterPlugin {
// TODO: use a better error type.
pub fn send_roster_set(&self, to: Option<Jid>, item: Item) -> Result<(), String> {
- if item.subscription.is_some() && item.subscription != Some(Subscription::Remove) {
+ if item.subscription != Subscription::Remove {
return Err(String::from("Subscription must be either nothing or Remove."));
}
let iq = Iq {
@@ -117,10 +117,10 @@ impl RosterPlugin {
let mut jids = self.jids.lock().unwrap();
let previous = jids.insert(item.jid.clone(), item.clone());
if previous.is_none() {
- assert!(item.subscription != Some(Subscription::Remove));
+ assert!(item.subscription != Subscription::Remove);
self.proxy.dispatch(RosterPush::Added(item));
} else {
- if item.subscription == Some(Subscription::Remove) {
+ if item.subscription == Subscription::Remove {
self.proxy.dispatch(RosterPush::Removed(item));
} else {
self.proxy.dispatch(RosterPush::Modified(item));
@@ -1,11 +1,13 @@
//! Provides transports for the xml streams.
+use std::io::BufReader;
use std::io::prelude::*;
+use std::str;
use std::net::{TcpStream, Shutdown};
-use xml::reader::{EventReader, XmlEvent as XmlReaderEvent};
-use xml::writer::{EventWriter, XmlEvent as XmlWriterEvent, EmitterConfig};
+use quick_xml::reader::{Reader as EventReader};
+use quick_xml::events::Event;
use std::sync::{Arc, Mutex};
@@ -24,11 +26,11 @@ use sasl::common::ChannelBinding;
/// A trait which transports are required to implement.
pub trait Transport {
- /// Writes an `xml::writer::XmlEvent` to the stream.
- fn write_event<'a, E: Into<XmlWriterEvent<'a>>>(&mut self, event: E) -> Result<(), Error>;
+ /// Writes a `quick_xml::events::Event` to the stream.
+ fn write_event<'a, E: Into<Event<'a>>>(&mut self, event: E) -> Result<(), Error>;
- /// Reads an `xml::reader::XmlEvent` from the stream.
- fn read_event(&mut self) -> Result<XmlReaderEvent, Error>;
+ /// Reads a `quick_xml::events::Event` from the stream.
+ fn read_event(&mut self) -> Result<Event, Error>;
/// Writes a `minidom::Element` to the stream.
fn write_element(&mut self, element: &minidom::Element) -> Result<(), Error>;
@@ -48,19 +50,21 @@ pub trait Transport {
/// A plain text transport, completely unencrypted.
pub struct PlainTransport {
inner: Arc<Mutex<TcpStream>>, // TODO: this feels rather ugly
- reader: EventReader<LockedIO<TcpStream>>, // TODO: especially feels ugly because
- // this read would keep the lock
- // held very long (potentially)
- writer: EventWriter<LockedIO<TcpStream>>,
+ // TODO: especially feels ugly because this read would keep the lock held very long
+ // (potentially)
+ reader: EventReader<BufReader<LockedIO<TcpStream>>>,
+ writer: LockedIO<TcpStream>,
+ buf: Vec<u8>,
}
impl Transport for PlainTransport {
- fn write_event<'a, E: Into<XmlWriterEvent<'a>>>(&mut self, event: E) -> Result<(), Error> {
- Ok(self.writer.write(event)?)
+ fn write_event<'a, E: Into<Event<'a>>>(&mut self, event: E) -> Result<(), Error> {
+ self.writer.write(&event.into())?;
+ Ok(())
}
- fn read_event(&mut self) -> Result<XmlReaderEvent, Error> {
- Ok(self.reader.next()?)
+ fn read_event(&mut self) -> Result<Event, Error> {
+ Ok(self.reader.read_event(&mut self.buf)?)
}
fn write_element(&mut self, element: &minidom::Element) -> Result<(), Error> {
@@ -74,13 +78,8 @@ impl Transport for PlainTransport {
fn reset_stream(&mut self) {
let locked_io = LockedIO::from(self.inner.clone());
- self.reader = EventReader::new(locked_io.clone());
- self.writer = EventWriter::new_with_config(locked_io, EmitterConfig {
- line_separator: "".into(),
- perform_indent: false,
- normalize_empty_elements: false,
- .. Default::default()
- });
+ self.reader = EventReader::from_reader(BufReader::new(locked_io.clone()));
+ self.writer = locked_io;
}
fn channel_bind(&self) -> ChannelBinding {
@@ -93,21 +92,16 @@ impl PlainTransport {
/// Connects to a server without any encryption.
pub fn connect(host: &str, port: u16) -> Result<PlainTransport, Error> {
let tcp_stream = TcpStream::connect((host, port))?;
- let parser = EventReader::new(tcp_stream);
- let parser_stream = parser.into_inner();
- let stream = Arc::new(Mutex::new(parser_stream));
+ let stream = Arc::new(Mutex::new(tcp_stream));
let locked_io = LockedIO::from(stream.clone());
- let reader = EventReader::new(locked_io.clone());
- let writer = EventWriter::new_with_config(locked_io, EmitterConfig {
- line_separator: "".into(),
- perform_indent: false,
- normalize_empty_elements: false,
- .. Default::default()
- });
+ let reader = EventReader::from_reader(BufReader::new(locked_io.clone()));
+ let writer = locked_io;
+
Ok(PlainTransport {
inner: stream,
reader: reader,
writer: writer,
+ buf: Vec::new(),
})
}
@@ -123,19 +117,21 @@ impl PlainTransport {
/// A transport which uses STARTTLS.
pub struct SslTransport {
inner: Arc<Mutex<SslStream<TcpStream>>>, // TODO: this feels rather ugly
- reader: EventReader<LockedIO<SslStream<TcpStream>>>, // TODO: especially feels ugly because
- // this read would keep the lock
- // held very long (potentially)
- writer: EventWriter<LockedIO<SslStream<TcpStream>>>,
+ // TODO: especially feels ugly because this read would keep the lock held very long
+ // (potentially)
+ reader: EventReader<BufReader<LockedIO<SslStream<TcpStream>>>>,
+ writer: LockedIO<SslStream<TcpStream>>,
+ buf: Vec<u8>,
}
impl Transport for SslTransport {
- fn write_event<'a, E: Into<XmlWriterEvent<'a>>>(&mut self, event: E) -> Result<(), Error> {
- Ok(self.writer.write(event)?)
+ fn write_event<'a, E: Into<Event<'a>>>(&mut self, event: E) -> Result<(), Error> {
+ self.writer.write(&event.into())?;
+ Ok(())
}
- fn read_event(&mut self) -> Result<XmlReaderEvent, Error> {
- Ok(self.reader.next()?)
+ fn read_event(&mut self) -> Result<Event, Error> {
+ Ok(self.reader.read_event(&mut self.buf)?)
}
fn write_element(&mut self, element: &minidom::Element) -> Result<(), Error> {
@@ -148,13 +144,8 @@ impl Transport for SslTransport {
fn reset_stream(&mut self) {
let locked_io = LockedIO::from(self.inner.clone());
- self.reader = EventReader::new(locked_io.clone());
- self.writer = EventWriter::new_with_config(locked_io, EmitterConfig {
- line_separator: "".into(),
- perform_indent: false,
- normalize_empty_elements: false,
- .. Default::default()
- });
+ self.reader = EventReader::from_reader(BufReader::new(locked_io.clone()));
+ self.writer = locked_io;
}
fn channel_bind(&self) -> ChannelBinding {
@@ -172,23 +163,29 @@ impl SslTransport {
, ns::CLIENT, ns::STREAM, host)?;
write!(stream, "<starttls xmlns='{}'/>"
, ns::TLS)?;
- let mut parser = EventReader::new(stream);
- loop { // TODO: possibly a timeout?
- match parser.next()? {
- XmlReaderEvent::StartElement { name, .. } => {
- if let Some(ns) = name.namespace {
- if ns == ns::TLS && name.local_name == "proceed" {
- break;
- }
- else if ns == ns::STREAM && name.local_name == "error" {
- return Err(Error::StreamError);
+ {
+ let mut parser = EventReader::from_reader(BufReader::new(&stream));
+ let mut buf = Vec::new();
+ let ns_buf = Vec::new();
+ loop { // TODO: possibly a timeout?
+ match parser.read_event(&mut buf)? {
+ Event::Start(ref e) => {
+ let (namespace, local_name) = parser.resolve_namespace(e.name(), &ns_buf);
+ let namespace = namespace.map(str::from_utf8);
+ let local_name = str::from_utf8(local_name)?;
+
+ if let Some(ns) = namespace {
+ if ns == Ok(ns::TLS) && local_name == "proceed" {
+ break;
+ } else if ns == Ok(ns::STREAM) && local_name == "error" {
+ return Err(Error::StreamError);
+ }
}
- }
- },
- _ => {},
+ },
+ _ => (),
+ }
}
}
- let stream = parser.into_inner();
#[cfg(feature = "insecure")]
let ssl_stream = {
let mut ctx = SslContextBuilder::new(SslMethod::tls())?;
@@ -203,17 +200,14 @@ impl SslTransport {
};
let ssl_stream = Arc::new(Mutex::new(ssl_stream));
let locked_io = LockedIO::from(ssl_stream.clone());
- let reader = EventReader::new(locked_io.clone());
- let writer = EventWriter::new_with_config(locked_io, EmitterConfig {
- line_separator: "".into(),
- perform_indent: false,
- normalize_empty_elements: false,
- .. Default::default()
- });
+ let reader = EventReader::from_reader(BufReader::new(locked_io.clone()));
+ let writer = locked_io;
+
Ok(SslTransport {
inner: ssl_stream,
reader: reader,
writer: writer,
+ buf: Vec::new(),
})
}