1use futures::{future, Future, Sink, Stream};
2use std::convert::TryFrom;
3use std::env::args;
4use std::process::exit;
5use tokio::runtime::current_thread::Runtime;
6use tokio_xmpp::{Client, Packet};
7use xmpp_parsers::message::{Body, Message, MessageType};
8use xmpp_parsers::presence::{Presence, Show as PresenceShow, Type as PresenceType};
9use xmpp_parsers::{Element, Jid};
10
11fn main() {
12 let args: Vec<String> = args().collect();
13 if args.len() != 3 {
14 println!("Usage: {} <jid> <password>", args[0]);
15 exit(1);
16 }
17 let jid = &args[1];
18 let password = &args[2];
19
20 // tokio_core context
21 let mut rt = Runtime::new().unwrap();
22 // Client instance
23 let client = Client::new(jid, password).unwrap();
24
25 // Make the two interfaces for sending and receiving independent
26 // of each other so we can move one into a closure.
27 let (sink, stream) = client.split();
28
29 // Create outgoing pipe
30 let (mut tx, rx) = futures::unsync::mpsc::unbounded();
31 rt.spawn(
32 rx.forward(sink.sink_map_err(|_| panic!("Pipe")))
33 .map(|(rx, mut sink)| {
34 drop(rx);
35 let _ = sink.close();
36 })
37 .map_err(|e| {
38 panic!("Send error: {:?}", e);
39 }),
40 );
41
42 // Main loop, processes events
43 let mut wait_for_stream_end = false;
44 let done = stream.for_each(move |event| {
45 if wait_for_stream_end {
46 /* Do nothing */
47 } else if event.is_online() {
48 let jid = event
49 .get_jid()
50 .map(|jid| format!("{}", jid))
51 .unwrap_or("unknown".to_owned());
52 println!("Online at {}", jid);
53
54 let presence = make_presence();
55 tx.start_send(Packet::Stanza(presence)).unwrap();
56 } else if let Some(message) = event
57 .into_stanza()
58 .and_then(|stanza| Message::try_from(stanza).ok())
59 {
60 match (message.from, message.bodies.get("")) {
61 (Some(ref from), Some(ref body)) if body.0 == "die" => {
62 println!("Secret die command triggered by {}", from);
63 wait_for_stream_end = true;
64 tx.start_send(Packet::StreamEnd).unwrap();
65 }
66 (Some(ref from), Some(ref body)) => {
67 if message.type_ != MessageType::Error {
68 // This is a message we'll echo
69 let reply = make_reply(from.clone(), &body.0);
70 tx.start_send(Packet::Stanza(reply)).unwrap();
71 }
72 }
73 _ => {}
74 }
75 }
76
77 future::ok(())
78 });
79
80 // Start polling `done`
81 match rt.block_on(done) {
82 Ok(_) => (),
83 Err(e) => {
84 println!("Fatal: {}", e);
85 ()
86 }
87 }
88}
89
90// Construct a <presence/>
91fn make_presence() -> Element {
92 let mut presence = Presence::new(PresenceType::None);
93 presence.show = Some(PresenceShow::Chat);
94 presence
95 .statuses
96 .insert(String::from("en"), String::from("Echoing messages."));
97 presence.into()
98}
99
100// Construct a chat <message/>
101fn make_reply(to: Jid, body: &str) -> Element {
102 let mut message = Message::new(Some(to));
103 message.bodies.insert(String::new(), Body(body.to_owned()));
104 message.into()
105}