xmpp_codec.rs

  1use std;
  2use std::fmt::Write;
  3use std::str::from_utf8;
  4use std::io::{Error, ErrorKind};
  5use std::collections::HashMap;
  6use tokio_io::{AsyncRead, AsyncWrite};
  7use tokio_io::codec::{Framed, Encoder, Decoder};
  8use xml;
  9use bytes::*;
 10
 11const NS_XMLNS: &'static str = "http://www.w3.org/2000/xmlns/";
 12const NS_STREAMS: &'static str = "http://etherx.jabber.org/streams";
 13const NS_CLIENT: &'static str = "jabber:client";
 14
 15struct XMPPRoot {
 16    builder: xml::ElementBuilder,
 17    pub attributes: HashMap<(String, Option<String>), String>,
 18}
 19
 20impl XMPPRoot {
 21    fn new(root: xml::StartTag) -> Self {
 22        let mut builder = xml::ElementBuilder::new();
 23        let mut attributes = HashMap::new();
 24        println!("root attributes: {:?}", root.attributes);
 25        for (name_ns, value) in root.attributes {
 26            match name_ns {
 27                (ref name, None) if name == "xmlns" =>
 28                    builder.set_default_ns(value),
 29                (ref prefix, Some(ref ns)) if ns == NS_XMLNS =>
 30                    builder.define_prefix(prefix.to_owned(), value),
 31                _ => {
 32                    attributes.insert(name_ns, value);
 33                },
 34            }
 35        }
 36
 37        XMPPRoot {
 38            builder: builder,
 39            attributes: attributes,
 40        }
 41    }
 42
 43    fn handle_event(&mut self, event: Result<xml::Event, xml::ParserError>)
 44                    -> Option<Result<xml::Element, xml::BuilderError>> {
 45        self.builder.handle_event(event)
 46    }
 47}
 48
 49#[derive(Debug)]
 50pub enum Packet {
 51    Error(Box<std::error::Error>),
 52    StreamStart,
 53    Stanza(xml::Element),
 54    StreamEnd,
 55}
 56
 57pub type XMPPStream<T> = Framed<T, XMPPCodec>;
 58
 59pub struct XMPPCodec {
 60    parser: xml::Parser,
 61    root: Option<XMPPRoot>,
 62}
 63
 64impl XMPPCodec {
 65    pub fn new() -> Self {
 66        XMPPCodec {
 67            parser: xml::Parser::new(),
 68            root: None,
 69        }
 70    }
 71
 72    pub fn frame_stream<S>(stream: S) -> Framed<S, XMPPCodec>
 73        where S: AsyncRead + AsyncWrite
 74    {
 75        AsyncRead::framed(stream, XMPPCodec::new())
 76    }
 77}
 78
 79impl Decoder for XMPPCodec {
 80    type Item = Packet;
 81    type Error = Error;
 82
 83    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
 84        println!("XMPPCodec.decode {:?}", buf.len());
 85        match from_utf8(buf.take().as_ref()) {
 86            Ok(s) =>
 87                self.parser.feed_str(s),
 88            Err(e) =>
 89                return Err(Error::new(ErrorKind::InvalidInput, e)),
 90        }
 91
 92        let mut new_root: Option<XMPPRoot> = None;
 93        let mut result = None;
 94        for event in &mut self.parser {
 95            match self.root {
 96                None => {
 97                    // Expecting <stream:stream>
 98                    match event {
 99                        Ok(xml::Event::ElementStart(start_tag)) => {
100                            self.root = Some(XMPPRoot::new(start_tag));
101                            result = Some(Packet::StreamStart);
102                            break
103                        },
104                        Err(e) => {
105                            result = Some(Packet::Error(Box::new(e)));
106                            break
107                        },
108                        _ =>
109                            (),
110                    }
111                }
112
113                Some(ref mut root) => {
114                    match root.handle_event(event) {
115                        None => (),
116                        Some(Ok(stanza)) => {
117                            println!("stanza: {}", stanza);
118                            result = Some(Packet::Stanza(stanza));
119                            break
120                        },
121                        Some(Err(e)) => {
122                            result = Some(Packet::Error(Box::new(e)));
123                            break
124                        }
125                    };
126                },
127            }
128
129            match new_root.take() {
130                None => (),
131                Some(root) => self.root = Some(root),
132            }
133        }
134
135        Ok(result)
136    }
137
138    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Error> {
139        self.decode(buf)
140    }
141}
142
143impl Encoder for XMPPCodec {
144    type Item = Packet;
145    type Error = Error;
146
147    fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
148        match item {
149            Packet::StreamStart => {
150                write!(dst,
151                       "<?xml version='1.0'?>\n
152<stream:stream version='1.0' to='spaceboyz.net' xmlns='{}' xmlns:stream='{}'>\n",
153                       NS_CLIENT, NS_STREAMS)
154                    .map_err(|_| Error::from(ErrorKind::WriteZero))
155            },
156            Packet::Stanza(stanza) =>
157                write!(dst, "{}", stanza)
158                .map_err(|_| Error::from(ErrorKind::InvalidInput)),
159            // TODO: Implement all
160            _ => Ok(())
161        }
162    }
163}