1use futures::{future, Future, Sink, Stream};
2use std::convert::TryFrom;
3use std::env::args;
4use std::process::exit;
5use tokio::runtime::current_thread::Runtime;
6use tokio_xmpp::{Client, Packet};
7use xmpp_parsers::{Jid, Element};
8use xmpp_parsers::message::{Body, Message, MessageType};
9use xmpp_parsers::presence::{Presence, Show as PresenceShow, Type as PresenceType};
10
11fn main() {
12 let args: Vec<String> = args().collect();
13 if args.len() != 3 {
14 println!("Usage: {} <jid> <password>", args[0]);
15 exit(1);
16 }
17 let jid = &args[1];
18 let password = &args[2];
19
20 // tokio_core context
21 let mut rt = Runtime::new().unwrap();
22 // Client instance
23 let client = Client::new(jid, password).unwrap();
24
25 // Make the two interfaces for sending and receiving independent
26 // of each other so we can move one into a closure.
27 let (sink, stream) = client.split();
28
29 // Create outgoing pipe
30 let (mut tx, rx) = futures::unsync::mpsc::unbounded();
31 rt.spawn(
32 rx.forward(
33 sink.sink_map_err(|_| panic!("Pipe"))
34 )
35 .map(|(rx, mut sink)| {
36 drop(rx);
37 let _ = sink.close();
38 })
39 .map_err(|e| {
40 panic!("Send error: {:?}", e);
41 })
42 );
43
44 // Main loop, processes events
45 let mut wait_for_stream_end = false;
46 let done = stream.for_each(move |event| {
47 if wait_for_stream_end {
48 /* Do nothing */
49 } else if event.is_online() {
50 let jid = event.get_jid()
51 .map(|jid| format!("{}", jid))
52 .unwrap_or("unknown".to_owned());
53 println!("Online at {}", jid);
54
55 let presence = make_presence();
56 tx.start_send(Packet::Stanza(presence)).unwrap();
57 } else if let Some(message) = event
58 .into_stanza()
59 .and_then(|stanza| Message::try_from(stanza).ok())
60 {
61 match (message.from, message.bodies.get("")) {
62 (Some(ref from), Some(ref body)) if body.0 == "die" => {
63 println!("Secret die command triggered by {}", from);
64 wait_for_stream_end = true;
65 tx.start_send(Packet::StreamEnd).unwrap();
66 }
67 (Some(ref from), Some(ref body)) => {
68 if message.type_ != MessageType::Error {
69 // This is a message we'll echo
70 let reply = make_reply(from.clone(), &body.0);
71 tx.start_send(Packet::Stanza(reply)).unwrap();
72 }
73 }
74 _ => {}
75 }
76 }
77
78 future::ok(())
79 });
80
81 // Start polling `done`
82 match rt.block_on(done) {
83 Ok(_) => (),
84 Err(e) => {
85 println!("Fatal: {}", e);
86 ()
87 }
88 }
89}
90
91// Construct a <presence/>
92fn make_presence() -> Element {
93 let mut presence = Presence::new(PresenceType::None);
94 presence.show = Some(PresenceShow::Chat);
95 presence
96 .statuses
97 .insert(String::from("en"), String::from("Echoing messages."));
98 presence.into()
99}
100
101// Construct a chat <message/>
102fn make_reply(to: Jid, body: &str) -> Element {
103 let mut message = Message::new(Some(to));
104 message.bodies.insert(String::new(), Body(body.to_owned()));
105 message.into()
106}