1use futures::{future, Future, Sink, Stream};
2use std::convert::TryFrom;
3use std::env::args;
4use std::process::exit;
5use tokio::runtime::current_thread::Runtime;
6use tokio_xmpp::{Client, Packet};
7use xmpp_parsers::{Jid, Element};
8use xmpp_parsers::message::{Body, Message, MessageType};
9use xmpp_parsers::presence::{Presence, Show as PresenceShow, Type as PresenceType};
10
11fn main() {
12 let args: Vec<String> = args().collect();
13 if args.len() != 3 {
14 println!("Usage: {} <jid> <password>", args[0]);
15 exit(1);
16 }
17 let jid = &args[1];
18 let password = &args[2];
19
20 // tokio_core context
21 let mut rt = Runtime::new().unwrap();
22 // Client instance
23 let client = Client::new(jid, password).unwrap();
24
25 // Make the two interfaces for sending and receiving independent
26 // of each other so we can move one into a closure.
27 let (sink, stream) = client.split();
28
29 // Create outgoing pipe
30 let (mut tx, rx) = futures::unsync::mpsc::unbounded();
31 rt.spawn(
32 rx.forward(
33 sink.sink_map_err(|_| panic!("Pipe"))
34 )
35 .map(|(rx, mut sink)| {
36 drop(rx);
37 let _ = sink.close();
38 })
39 .map_err(|e| {
40 panic!("Send error: {:?}", e);
41 })
42 );
43
44 // Main loop, processes events
45 let mut wait_for_stream_end = false;
46 let done = stream.for_each(move |event| {
47 if wait_for_stream_end {
48 /* Do nothing */
49 } else if event.is_online() {
50 println!("Online!");
51
52 let presence = make_presence();
53 tx.start_send(Packet::Stanza(presence)).unwrap();
54 } else if let Some(message) = event
55 .into_stanza()
56 .and_then(|stanza| Message::try_from(stanza).ok())
57 {
58 match (message.from, message.bodies.get("")) {
59 (Some(ref from), Some(ref body)) if body.0 == "die" => {
60 println!("Secret die command triggered by {}", from.clone());
61 wait_for_stream_end = true;
62 tx.start_send(Packet::StreamEnd).unwrap();
63 }
64 (Some(ref from), Some(ref body)) => {
65 if message.type_ != MessageType::Error {
66 // This is a message we'll echo
67 let reply = make_reply(from.clone(), &body.0);
68 tx.start_send(Packet::Stanza(reply)).unwrap();
69 }
70 }
71 _ => {}
72 }
73 }
74
75 future::ok(())
76 });
77
78 // Start polling `done`
79 match rt.block_on(done) {
80 Ok(_) => (),
81 Err(e) => {
82 println!("Fatal: {}", e);
83 ()
84 }
85 }
86}
87
88// Construct a <presence/>
89fn make_presence() -> Element {
90 let mut presence = Presence::new(PresenceType::None);
91 presence.show = Some(PresenceShow::Chat);
92 presence
93 .statuses
94 .insert(String::from("en"), String::from("Echoing messages."));
95 presence.into()
96}
97
98// Construct a chat <message/>
99fn make_reply(to: Jid, body: &str) -> Element {
100 let mut message = Message::new(Some(to));
101 message.bodies.insert(String::new(), Body(body.to_owned()));
102 message.into()
103}