1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#![deny(bare_trait_objects)]
8
9use std::str::FromStr;
10use futures::{Future,Stream, Sink, sync::mpsc};
11use tokio_xmpp::{
12 Client as TokioXmppClient,
13 Event as TokioXmppEvent,
14 Packet,
15};
16use xmpp_parsers::{
17 caps::{compute_disco, hash_caps, Caps},
18 disco::{DiscoInfoQuery, DiscoInfoResult, Feature, Identity},
19 hashes::Algo,
20 iq::{Iq, IqType},
21 message::{Message, MessageType, Body},
22 muc::{
23 Muc,
24 user::{MucUser, Status},
25 },
26 ns,
27 presence::{Presence, Type as PresenceType},
28 pubsub::{
29 event::PubSubEvent,
30 pubsub::PubSub,
31 },
32 roster::{Roster, Item as RosterItem},
33 stanza_error::{StanzaError, ErrorType, DefinedCondition},
34 Jid, JidParseError, TryFrom,
35};
36
37mod avatar;
38
39pub type Error = tokio_xmpp::Error;
40
41#[derive(Debug)]
42pub enum ClientType {
43 Bot,
44 Pc,
45}
46
47impl Default for ClientType {
48 fn default() -> Self {
49 ClientType::Bot
50 }
51}
52
53impl ToString for ClientType {
54 fn to_string(&self) -> String {
55 String::from(
56 match self {
57 ClientType::Bot => "bot",
58 ClientType::Pc => "pc",
59 }
60 )
61 }
62}
63
64#[derive(PartialEq)]
65pub enum ClientFeature {
66 Avatars,
67 ContactList,
68}
69
70pub enum Event {
71 Online,
72 Disconnected,
73 ContactAdded(RosterItem),
74 ContactRemoved(RosterItem),
75 ContactChanged(RosterItem),
76 AvatarRetrieved(Jid, String),
77 RoomJoined(Jid),
78}
79
80#[derive(Default)]
81pub struct ClientBuilder<'a> {
82 jid: &'a str,
83 password: &'a str,
84 website: String,
85 disco: (ClientType, String),
86 features: Vec<ClientFeature>,
87}
88
89impl ClientBuilder<'_> {
90 pub fn new<'a>(jid: &'a str, password: &'a str) -> ClientBuilder<'a> {
91 ClientBuilder {
92 jid,
93 password,
94 website: String::from("https://gitlab.com/xmpp-rs/tokio-xmpp"),
95 disco: (ClientType::default(), String::from("tokio-xmpp")),
96 features: vec![],
97 }
98 }
99
100 pub fn set_client(mut self, type_: ClientType, name: &str) -> Self {
101 self.disco = (type_, String::from(name));
102 self
103 }
104
105 pub fn set_website(mut self, url: &str) -> Self {
106 self.website = String::from(url);
107 self
108 }
109
110 pub fn enable_feature(mut self, feature: ClientFeature) -> Self {
111 self.features.push(feature);
112 self
113 }
114
115 fn make_disco(&self) -> DiscoInfoResult {
116 let identities = vec![Identity::new("client", self.disco.0.to_string(),
117 "en", self.disco.1.to_string())];
118 let mut features = vec![
119 Feature::new(ns::DISCO_INFO),
120 ];
121 if self.features.contains(&ClientFeature::Avatars) {
122 features.push(Feature::new(format!("{}+notify", ns::AVATAR_METADATA)));
123 }
124 DiscoInfoResult {
125 node: None,
126 identities,
127 features,
128 extensions: vec![],
129 }
130 }
131
132 fn make_initial_presence(disco: &DiscoInfoResult, node: &str) -> Presence {
133 let caps_data = compute_disco(disco);
134 let hash = hash_caps(&caps_data, Algo::Sha_1).unwrap();
135 let caps = Caps::new(node, hash);
136
137 let mut presence = Presence::new(PresenceType::None);
138 presence.add_payload(caps);
139 presence
140 }
141
142 pub fn build(
143 self,
144 ) -> Result<(Agent, impl Stream<Item = Event, Error = tokio_xmpp::Error>), JidParseError> {
145 let client = TokioXmppClient::new(self.jid, self.password)?;
146 Ok(self.build_impl(client))
147 }
148
149 // This function is meant to be used for testing build
150 pub(crate) fn build_impl<S>(
151 self,
152 stream: S,
153 ) -> (Agent, impl Stream<Item = Event, Error = tokio_xmpp::Error>)
154 where
155 S: Stream<Item = tokio_xmpp::Event, Error = tokio_xmpp::Error>
156 + Sink<SinkItem = tokio_xmpp::Packet, SinkError = tokio_xmpp::Error>,
157 {
158 let disco = self.make_disco();
159 let node = self.website;
160 let (sender_tx, sender_rx) = mpsc::unbounded();
161
162 let client = stream;
163 let (sink, stream) = client.split();
164
165 let reader = {
166 let mut sender_tx = sender_tx.clone();
167 let jid = self.jid.to_owned();
168 stream.map(move |event| {
169 // Helper function to send an iq error.
170 let mut events = Vec::new();
171 let send_error = |to, id, type_, condition, text: &str| {
172 let error = StanzaError::new(type_, condition, "en", text);
173 let iq = Iq::from_error(id, error)
174 .with_to(to)
175 .into();
176 sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
177 };
178
179 match event {
180 TokioXmppEvent::Online => {
181 let presence = ClientBuilder::make_initial_presence(&disco, &node).into();
182 let packet = Packet::Stanza(presence);
183 sender_tx.unbounded_send(packet)
184 .unwrap();
185 events.push(Event::Online);
186 // TODO: only send this when the ContactList feature is enabled.
187 let iq = Iq::from_get("roster", Roster { ver: None, items: vec![] })
188 .into();
189 sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
190 }
191 TokioXmppEvent::Disconnected => {
192 events.push(Event::Disconnected);
193 }
194 TokioXmppEvent::Stanza(stanza) => {
195 if stanza.is("iq", "jabber:client") {
196 let iq = Iq::try_from(stanza).unwrap();
197 if let IqType::Get(payload) = iq.payload {
198 if payload.is("query", ns::DISCO_INFO) {
199 let query = DiscoInfoQuery::try_from(payload);
200 match query {
201 Ok(query) => {
202 let mut disco_info = disco.clone();
203 disco_info.node = query.node;
204 let iq = Iq::from_result(iq.id, Some(disco_info))
205 .with_to(iq.from.unwrap())
206 .into();
207 sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
208 },
209 Err(err) => {
210 send_error(iq.from.unwrap(), iq.id, ErrorType::Modify, DefinedCondition::BadRequest, &format!("{}", err));
211 },
212 }
213 } else {
214 // We MUST answer unhandled get iqs with a service-unavailable error.
215 send_error(iq.from.unwrap(), iq.id, ErrorType::Cancel, DefinedCondition::ServiceUnavailable, "No handler defined for this kind of iq.");
216 }
217 } else if let IqType::Result(Some(payload)) = iq.payload {
218 // TODO: move private iqs like this one somewhere else, for
219 // security reasons.
220 if payload.is("query", ns::ROSTER) && iq.from.is_none() {
221 let roster = Roster::try_from(payload).unwrap();
222 for item in roster.items.into_iter() {
223 events.push(Event::ContactAdded(item));
224 }
225 } else if payload.is("pubsub", ns::PUBSUB) {
226 let pubsub = PubSub::try_from(payload).unwrap();
227 let from =
228 iq.from.clone().unwrap_or(Jid::from_str(&jid).unwrap());
229 if let PubSub::Items(items) = pubsub {
230 if items.node.0 == ns::AVATAR_DATA {
231 let new_events = avatar::handle_data_pubsub_iq(&from, &items);
232 events.extend(new_events);
233 }
234 }
235 }
236 } else if let IqType::Set(_) = iq.payload {
237 // We MUST answer unhandled set iqs with a service-unavailable error.
238 send_error(iq.from.unwrap(), iq.id, ErrorType::Cancel, DefinedCondition::ServiceUnavailable, "No handler defined for this kind of iq.");
239 }
240 } else if stanza.is("message", "jabber:client") {
241 let message = Message::try_from(stanza).unwrap();
242 let from = message.from.clone().unwrap();
243 for child in message.payloads {
244 if child.is("event", ns::PUBSUB_EVENT) {
245 let event = PubSubEvent::try_from(child).unwrap();
246 if let PubSubEvent::PublishedItems { node, items } = event {
247 if node.0 == ns::AVATAR_METADATA {
248 avatar::handle_metadata_pubsub_event(&from, &mut sender_tx, items);
249 }
250 }
251 }
252 }
253 } else if stanza.is("presence", "jabber:client") {
254 let presence = Presence::try_from(stanza).unwrap();
255 let from = presence.from.clone().unwrap();
256 for payload in presence.payloads.into_iter() {
257 let muc_user = match MucUser::try_from(payload) {
258 Ok(muc_user) => muc_user,
259 _ => continue
260 };
261 for status in muc_user.status.into_iter() {
262 if status == Status::SelfPresence {
263 events.push(Event::RoomJoined(from.clone()));
264 break;
265 }
266 }
267 }
268 } else if stanza.is("error", "http://etherx.jabber.org/streams") {
269 println!("Received a fatal stream error: {}", String::from(&stanza));
270 } else {
271 panic!("Unknown stanza: {}", String::from(&stanza));
272 }
273 }
274 }
275
276 futures::stream::iter_ok(events)
277 })
278 .flatten()
279 };
280
281 let sender = sender_rx
282 .map_err(|e| panic!("Sink error: {:?}", e))
283 .forward(sink)
284 .map(|(rx, mut sink)| {
285 drop(rx);
286 let _ = sink.close();
287 None
288 });
289
290 // TODO is this correct?
291 // Some(Error) means a real error
292 // None means the end of the sender stream and can be ignored
293 let future = reader
294 .map(Some)
295 .select(sender.into_stream())
296 .filter_map(|x| x);
297
298 let agent = Agent { sender_tx };
299
300 (agent, future)
301 }
302}
303
304pub struct Client {
305 sender_tx: mpsc::UnboundedSender<Packet>,
306 stream: Box<dyn Stream<Item = Event, Error = Error>>,
307}
308
309impl Client {
310 pub fn get_agent(&self) -> Agent {
311 Agent {
312 sender_tx: self.sender_tx.clone(),
313 }
314 }
315
316 pub fn listen(self) -> Box<dyn Stream<Item = Event, Error = Error>> {
317 self.stream
318 }
319}
320
321pub struct Agent {
322 sender_tx: mpsc::UnboundedSender<Packet>,
323}
324
325impl Agent {
326 pub fn join_room(&mut self, room: Jid, lang: &str, status: &str) {
327 let mut presence = Presence::new(PresenceType::None)
328 .with_to(Some(room))
329 .with_payloads(vec![Muc::new().into()]);
330 presence.set_status(String::from(lang), String::from(status));
331 let presence = presence.into();
332 self.sender_tx.unbounded_send(Packet::Stanza(presence))
333 .unwrap();
334 }
335
336 pub fn send_message(&mut self, recipient: Jid, type_: MessageType, lang: &str, text: &str) {
337 let mut message = Message::new(Some(recipient));
338 message.type_ = type_;
339 message.bodies.insert(String::from(lang), Body(String::from(text)));
340 let message = message.into();
341 self.sender_tx.unbounded_send(Packet::Stanza(message))
342 .unwrap();
343 }
344}