1#!/usr/bin/env ruby
2#
3# Copyright (C) 2017 Denver Gingerich <denver@ossguy.com>
4# Copyright (C) 2017 Stephen Paul Weber <singpolyma@singpolyma.net>
5#
6# This file is part of sgx-catapult.
7#
8# sgx-catapult is free software: you can redistribute it and/or modify it under
9# the terms of the GNU Affero General Public License as published by the Free
10# Software Foundation, either version 3 of the License, or (at your option) any
11# later version.
12#
13# sgx-catapult is distributed in the hope that it will be useful, but WITHOUT
14# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
16# details.
17#
18# You should have received a copy of the GNU Affero General Public License along
19# with sgx-catapult. If not, see <http://www.gnu.org/licenses/>.
20
21require 'blather/client/dsl'
22require 'em-hiredis'
23require 'em-http-request'
24require 'json'
25require 'securerandom'
26require 'time'
27require 'uri'
28require 'webrick'
29
30require 'goliath/api'
31require 'goliath/server'
32require 'log4r'
33
34require_relative 'em_promise'
35
36$stdout.sync = true
37
38puts "Soprani.ca/SMS Gateway for XMPP - Catapult\n"\
39 "==>> last commit of this version is " + `git rev-parse HEAD` + "\n"
40
41if ARGV.size != 9
42 puts "Usage: sgx-catapult.rb <component_jid> <component_password> "\
43 "<server_hostname> <server_port> "\
44 "<redis_hostname> <redis_port> <delivery_receipt_url> "\
45 "<http_listen_port> <mms_proxy_prefix_url>"
46 exit 0
47end
48
49t = Time.now
50puts "LOG %d.%09d: starting...\n\n" % [t.to_i, t.nsec]
51
52def panic(e)
53 puts "Shutting down gateway due to exception: #{e.message}"
54 puts e.backtrace
55 SGXcatapult.shutdown
56 puts 'Gateway has terminated.'
57 EM.stop
58end
59
60def extract_shortcode(dest)
61 num, context = dest.split(';', 2)
62 num if context && context == 'phone-context=ca-us.phone-context.soprani.ca'
63end
64
65class SGXClient < Blather::Client
66 def register_handler(type, *guards, &block)
67 super(type, *guards) do |stanza|
68 begin
69 v = block.call(stanza)
70 v.catch(&method(:panic)) if v.is_a?(Promise)
71 true # Do not run other handlers unless throw :pass
72 rescue Exception => e
73 panic(e)
74 end
75 end
76 end
77end
78
79module SGXcatapult
80 extend Blather::DSL
81
82 @jingle_sids = {}
83 @jingle_fnames = {}
84 @partial_data = {}
85 @client = SGXClient.new
86
87 def self.run
88 client.run
89 end
90
91 # so classes outside this module can write messages, too
92 def self.write(stanza)
93 client.write(stanza)
94 end
95
96 def self.error_msg(orig, query_node, type, name, text=nil)
97 if not query_node.nil?
98 orig.add_child(query_node)
99 orig.type = :error
100 end
101
102 error = Nokogiri::XML::Node.new 'error', orig.document
103 error['type'] = type
104 orig.add_child(error)
105
106 suberr = Nokogiri::XML::Node.new name, orig.document
107 suberr['xmlns'] = 'urn:ietf:params:xml:ns:xmpp-stanzas'
108 error.add_child(suberr)
109
110 # TODO: add some explanatory xml:lang='en' text (see text param)
111 puts "RESPONSE3: #{orig.inspect}"
112 return orig
113 end
114
115 # workqueue_count MUST be 0 or else Blather uses threads!
116 setup ARGV[0], ARGV[1], ARGV[2], ARGV[3], nil, nil, workqueue_count: 0
117
118 def self.pass_on_message(m, users_num, jid)
119 # setup delivery receipt; similar to a reply
120 rcpt = ReceiptMessage.new(m.from.stripped)
121 rcpt.from = m.to
122
123 # pass original message (before sending receipt)
124 m.to = jid
125 m.from = "#{users_num}@#{ARGV[0]}"
126
127 puts 'XRESPONSE0: ' + m.inspect
128 write_to_stream m
129
130 # send a delivery receipt back to the sender
131 # TODO: send only when requested per XEP-0184
132 # TODO: pass receipts from target if supported
133
134 # TODO: put in member/instance variable
135 rcpt['id'] = SecureRandom.uuid
136 rcvd = Nokogiri::XML::Node.new 'received', rcpt.document
137 rcvd['xmlns'] = 'urn:xmpp:receipts'
138 rcvd['id'] = m.id
139 rcpt.add_child(rcvd)
140
141 puts 'XRESPONSE1: ' + rcpt.inspect
142 write_to_stream rcpt
143 end
144
145 def self.call_catapult(
146 token, secret, m, pth, body=nil, head={}, code=[200]
147 )
148 EM::HttpRequest.new(
149 "https://api.catapult.inetwork.com/#{pth}"
150 ).public_send(
151 m,
152 head: {
153 'Authorization' => [token, secret]
154 }.merge(head),
155 body: body
156 ).then { |http|
157 puts "API response to send: #{http.response} with code"\
158 " response.code #{http.response_header.status}"
159
160 if code.include?(http.response_header.status)
161 http.response
162 else
163 EMPromise.reject(http.response_header.status)
164 end
165 }
166 end
167
168 def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern)
169 extra = if murl
170 {
171 media: murl
172 }
173 else
174 {
175 receiptRequested: 'all',
176 callbackUrl: ARGV[6]
177 }
178 end
179
180 call_catapult(
181 token,
182 secret,
183 :post,
184 "v1/users/#{user_id}/messages",
185 JSON.dump(extra.merge(
186 from: usern,
187 to: num_dest,
188 text: s.respond_to?(:body) ? s.body : '',
189 tag:
190 # callbacks need id and resourcepart
191 WEBrick::HTTPUtils.escape(s.id.to_s) +
192 ' ' +
193 WEBrick::HTTPUtils.escape(
194 s.from.resource.to_s
195 )
196 )),
197 {'Content-Type' => 'application/json'},
198 [201]
199 ).catch {
200 # TODO: add text; mention code number
201 EMPromise.reject(
202 [:cancel, 'internal-server-error']
203 )
204 }
205 end
206
207 def self.validate_num(num)
208 EMPromise.resolve(num.to_s).then { |num_dest|
209 if num_dest =~ /\A\+?[0-9]+(?:;.*)?\Z/
210 next num_dest if num_dest[0] == '+'
211 shortcode = extract_shortcode(num_dest)
212 next shortcode if shortcode
213 end
214 # TODO: text re num not (yet) supportd/implmentd
215 EMPromise.reject([:cancel, 'item-not-found'])
216 }
217 end
218
219 def self.fetch_catapult_cred_for(jid)
220 cred_key = "catapult_cred-#{jid.stripped}"
221 REDIS.lrange(cred_key, 0, 3).then { |creds|
222 if creds.length < 4
223 # TODO: add text re credentials not registered
224 EMPromise.reject(
225 [:auth, 'registration-required']
226 )
227 else
228 creds
229 end
230 }
231 end
232
233 message :chat?, :body do |m|
234 EMPromise.all([
235 validate_num(m.to.node),
236 fetch_catapult_cred_for(m.from)
237 ]).then { |(num_dest, creds)|
238 jid_key = "catapult_jid-#{num_dest}"
239 REDIS.get(jid_key).then { |jid|
240 [jid, num_dest] + creds
241 }
242 }.then { |(jid, num_dest, *creds)|
243 # if destination user is in the system pass on directly
244 if jid
245 pass_on_message(m, creds.last, jid)
246 else
247 to_catapult(m, nil, num_dest, *creds)
248 end
249 }.catch { |e|
250 if e.is_a?(Array) && e.length == 2
251 write_to_stream error_msg(m.reply, m.body, *e)
252 else
253 EMPromise.reject(e)
254 end
255 }
256 end
257
258 def self.user_cap_identities
259 [{category: 'client', type: 'sms'}]
260 end
261
262 def self.user_cap_features
263 [
264 "urn:xmpp:receipts",
265 "urn:xmpp:jingle:1", "urn:xmpp:jingle:transports:ibb:1",
266
267 # TODO: add more efficient file transfer mechanisms
268 #"urn:xmpp:jingle:transports:s5b:1",
269
270 # TODO: MUST add all reasonable vers of file-transfer
271 #"urn:xmpp:jingle:apps:file-transfer:4"
272 "urn:xmpp:jingle:apps:file-transfer:3"
273 ]
274 end
275
276 presence :subscribe? do |p|
277 puts "PRESENCE1: #{p.inspect}"
278
279 # subscriptions are allowed from anyone - send reply immediately
280 msg = Blather::Stanza::Presence.new
281 msg.to = p.from
282 msg.from = p.to
283 msg.type = :subscribed
284
285 puts 'RESPONSE5a: ' + msg.inspect
286 write_to_stream msg
287
288 # send a <presence> immediately; not automatically probed for it
289 # TODO: refactor so no "presence :probe? do |p|" duplicate below
290 caps = Blather::Stanza::Capabilities.new
291 # TODO: user a better node URI (?)
292 caps.node = 'http://catapult.sgx.soprani.ca/'
293 caps.identities = user_cap_identities
294 caps.features = user_cap_features
295
296 msg = caps.c
297 msg.to = p.from
298 msg.from = p.to.to_s + '/sgx'
299
300 puts 'RESPONSE5b: ' + msg.inspect
301 write_to_stream msg
302
303 # need to subscribe back so Conversations displays images inline
304 msg = Blather::Stanza::Presence.new
305 msg.to = p.from.to_s.split('/', 2)[0]
306 msg.from = p.to.to_s.split('/', 2)[0]
307 msg.type = :subscribe
308
309 puts 'RESPONSE5c: ' + msg.inspect
310 write_to_stream msg
311 end
312
313 presence :probe? do |p|
314 puts 'PRESENCE2: ' + p.inspect
315
316 caps = Blather::Stanza::Capabilities.new
317 # TODO: user a better node URI (?)
318 caps.node = 'http://catapult.sgx.soprani.ca/'
319 caps.identities = user_cap_identities
320 caps.features = user_cap_features
321
322 msg = caps.c
323 msg.to = p.from
324 msg.from = p.to.to_s + '/sgx'
325
326 puts 'RESPONSE6: ' + msg.inspect
327 write_to_stream msg
328 end
329
330 iq '/iq/ns:jingle', ns: 'urn:xmpp:jingle:1' do |i, jn|
331 puts "IQj: #{i.inspect}"
332
333 if jn[0]['action'] == 'transport-accept'
334 puts "REPLY0: #{i.reply.inspect}"
335 write_to_stream i.reply
336 next
337 elsif jn[0]['action'] == 'session-terminate'
338 # TODO: unexpected (usually we do this; handle?)
339 puts "TERMINATED"
340 next
341 elsif jn[0]['action'] == 'transport-info'
342 # TODO: unexpected, but should handle in a nice way
343 puts "FAIL!!!"
344 next
345 elsif i.type == :error
346 # TODO: do something, maybe terminating the connection
347 puts 'ERROR!!!'
348 next
349 end
350
351 # TODO: should probably confirm we got session-initiate here
352
353 write_to_stream i.reply
354 puts "RESPONSE8: #{i.reply.inspect}"
355
356 msg = Blather::Stanza::Iq.new :set
357 msg.to = i.from
358 msg.from = i.to
359
360 cn = jn.children.find { |v| v.element_name == "content" }
361 puts 'CN-name: ' + cn['name']
362 puts 'JN-sid: ' + jn[0]['sid']
363
364 ibb_found = false
365 last_sid = ''
366 cn.children.each do |child|
367 if child.element_name == 'transport'
368 puts 'TPORT: ' + child.namespace.href
369 last_sid = child['sid']
370 if 'urn:xmpp:jingle:transports:ibb:1' ==
371 child.namespace.href
372
373 ibb_found = true
374 break
375 end
376 end
377 end
378
379 j = Nokogiri::XML::Node.new 'jingle', msg.document
380 j['xmlns'] = 'urn:xmpp:jingle:1'
381 j['sid'] = jn[0]['sid']
382 msg.add_child(j)
383
384 content = Nokogiri::XML::Node.new 'content', msg.document
385 content['name'] = cn['name']
386 content['creator'] = 'initiator'
387 j.add_child(content)
388
389 transport = Nokogiri::XML::Node.new 'transport', msg.document
390 # TODO: make block-size more variable and/or dependent on sender
391 transport['block-size'] = '4096'
392 transport['xmlns'] = 'urn:xmpp:jingle:transports:ibb:1'
393 if ibb_found
394 transport['sid'] = last_sid
395 j['action'] = 'session-accept'
396 j['responder'] = i.from
397
398 dsc = Nokogiri::XML::Node.new 'description', msg.document
399 dsc['xmlns'] = 'urn:xmpp:jingle:apps:file-transfer:3'
400 content.add_child(dsc)
401 else
402 # for Conversations - it tries s5b even if caps ibb-only
403 transport['sid'] = SecureRandom.uuid
404 j['action'] = 'transport-replace'
405 j['initiator'] = i.from
406 end
407 content.add_child(transport)
408
409 @jingle_sids[transport['sid']] = jn[0]['sid']
410
411 # TODO: save <date> as well? Gajim sends, Conversations does not
412 # TODO: save/validate <size> with eventual full received length
413 fname =
414 cn
415 .children.find { |v| v.element_name == "description" }
416 .children.find { |w| w.element_name == "offer" }
417 .children.find { |x| x.element_name == "file" }
418 .children.find { |y| y.element_name == "name" }
419 @jingle_fnames[transport['sid']] = fname.text
420
421 puts "RESPONSE9: #{msg.inspect}"
422 write_to_stream msg
423 end
424
425 iq '/iq/ns:open', ns: 'http://jabber.org/protocol/ibb' do |i, on|
426 puts "IQo: #{i.inspect}"
427
428 @partial_data[on[0]['sid']] = ''
429 write_to_stream i.reply
430 end
431
432 iq '/iq/ns:data', ns: 'http://jabber.org/protocol/ibb' do |i, dn|
433 @partial_data[dn[0]['sid']] += Base64.decode64(dn[0].text)
434 write_to_stream i.reply
435 end
436
437 iq '/iq/ns:close', ns: 'http://jabber.org/protocol/ibb' do |i, cn|
438 puts "IQc: #{i.inspect}"
439 write_to_stream i.reply
440
441 EMPromise.all([
442 validate_num(i.to.node),
443 fetch_catapult_cred_for(i.from)
444 ]).then { |(num_dest, creds)|
445 # Gajim bug: <close/> has Jingle (not transport) sid; fix later
446 if not @jingle_fnames.key? cn[0]['sid']
447 puts 'ERROR: Not found in filename map: ' + cn[0]['sid']
448
449 next EMPromise.reject(:done)
450 # TODO: in case only Gajim's <data/> bug fixed, add map:
451 #cn[0]['sid'] = @jingle_tsids[cn[0]['sid']]
452 end
453
454 # upload cached data to server (before success reply)
455 media_name =
456 "#{Time.now.utc.iso8601}_#{SecureRandom.uuid}"\
457 "_#{@jingle_fnames[cn[0]['sid']]}"
458 puts 'name to save: ' + media_name
459
460 path = "/v1/users/#{creds.first}/media/#{media_name}"
461
462 EMPromise.all([
463 call_catapult(
464 *creds[1..2],
465 :put,
466 path,
467 @partial_data[cn[0]['sid']]
468 ).catch {
469 EMPromise.reject([
470 :cancel, 'internal-server-error'
471 ])
472 },
473 to_catapult(
474 i,
475 "https://api.catapult.inetwork.com/" +
476 path,
477 num_dest,
478 *creds
479 )
480 ])
481 }.then {
482 @partial_data[cn[0]['sid']] = ''
483
484 # received the complete file so now close the stream
485 msg = Blather::Stanza::Iq.new :set
486 msg.to = i.from
487 msg.from = i.to
488
489 j = Nokogiri::XML::Node.new 'jingle', msg.document
490 j['xmlns'] = 'urn:xmpp:jingle:1'
491 j['action'] = 'session-terminate'
492 j['sid'] = @jingle_sids[cn[0]['sid']]
493 msg.add_child(j)
494
495 r = Nokogiri::XML::Node.new 'reason', msg.document
496 s = Nokogiri::XML::Node.new 'success', msg.document
497 r.add_child(s)
498 j.add_child(r)
499
500 puts 'RESPONSE1: ' + msg.inspect
501 write_to_stream msg
502 }.catch { |e|
503 if e.is_a?(Array) && e.length == 2
504 write_to_stream error_msg(i.reply, nil, *e)
505 elsif e != :done
506 EMPromise.reject(e)
507 end
508 }
509 end
510
511 iq '/iq/ns:query', ns: 'http://jabber.org/protocol/disco#items' do |i|
512 write_to_stream i.reply
513 end
514
515 iq '/iq/ns:query', ns: 'http://jabber.org/protocol/disco#info' do |i|
516 # respond to capabilities request for an sgx-catapult number JID
517 if i.to.node
518 # TODO: confirm the node URL is expected using below
519 #puts "XR[node]: #{xpath_result[0]['node']}"
520
521 msg = i.reply
522 msg.identities = user_cap_identities
523 msg.features = user_cap_features
524
525 puts 'RESPONSE7: ' + msg.inspect
526 write_to_stream msg
527 next
528 end
529
530 # respond to capabilities request for sgx-catapult itself
531 msg = i.reply
532 msg.identities = [{
533 name: 'Soprani.ca Gateway to XMPP - Catapult',
534 type: 'sms', category: 'gateway'
535 }]
536 msg.features = [
537 "jabber:iq:register",
538 "jabber:iq:gateway",
539 "jabber:iq:private",
540 "http://jabber.org/protocol/disco#info",
541 "http://jabber.org/protocol/commands",
542 "http://jabber.org/protocol/muc"
543 ]
544 write_to_stream msg
545 end
546
547 def self.check_then_register(i, *creds)
548 jid_key = "catapult_jid-#{creds.last}"
549 bare_jid = i.from.stripped
550 cred_key = "catapult_cred-#{bare_jid}"
551
552 REDIS.get(jid_key).then { |existing_jid|
553 if existing_jid && existing_jid != bare_jid
554 # TODO: add/log text: credentials exist already
555 EMPromise.reject([:cancel, 'conflict'])
556 end
557 }.then {
558 REDIS.lrange(cred_key, 0, 3)
559 }.then { |existing_creds|
560 # TODO: add/log text: credentials exist already
561 if existing_creds.length == 4 && creds != existing_creds
562 EMPromise.reject([:cancel, 'conflict'])
563 elsif existing_creds.length < 4
564 REDIS.rpush(cred_key, *creds).then { |length|
565 if length != 4
566 EMPromise.reject([
567 :cancel,
568 'internal-server-error'
569 ])
570 end
571 }
572 end
573 }.then {
574 # not necessary if existing_jid non-nil, easier this way
575 REDIS.set(jid_key, bare_jid)
576 }.then { |result|
577 if result != 'OK'
578 # TODO: add txt re push failure
579 EMPromise.reject(
580 [:cancel, 'internal-server-error']
581 )
582 end
583 }.then {
584 write_to_stream i.reply
585 }
586 end
587
588 def self.creds_from_registration_query(qn)
589 xn = qn.children.find { |v| v.element_name == "x" }
590
591 if xn
592 xn.children.each_with_object({}) do |field, h|
593 next if field.element_name != "field"
594 val = field.children.find { |v|
595 v.element_name == "value"
596 }
597
598 case field['var']
599 when 'nick'
600 h[:user_id] = val.text
601 when 'username'
602 h[:api_token] = val.text
603 when 'password'
604 h[:api_secret] = val.text
605 when 'phone'
606 h[:phone_num] = val.text
607 else
608 # TODO: error
609 puts "?: #{field['var']}"
610 end
611 end
612 else
613 qn.children.each_with_object({}) do |field, h|
614 case field.element_name
615 when "nick"
616 h[:user_id] = field.text
617 when "username"
618 h[:api_token] = field.text
619 when "password"
620 h[:api_secret] = field.text
621 when "phone"
622 h[:phone_num] = field.text
623 end
624 end
625 end.values_at(:user_id, :api_token, :api_secret, :phone_num)
626 end
627
628 def self.process_registration(i, qn)
629 EMPromise.resolve(
630 qn.children.find { |v| v.element_name == "remove" }
631 ).then { |rn|
632 if rn
633 puts "received <remove/> - ignoring for now..."
634 EMPromise.reject(:done)
635 else
636 creds_from_registration_query(qn)
637 end
638 }.then { |user_id, api_token, api_secret, phone_num|
639 if phone_num[0] == '+'
640 [user_id, api_token, api_secret, phone_num]
641 else
642 # TODO: add text re number not (yet) supported
643 EMPromise.reject([:cancel, 'item-not-found'])
644 end
645 }.then { |user_id, api_token, api_secret, phone_num|
646 call_catapult(
647 api_token,
648 api_secret,
649 :get,
650 "v1/users/#{user_id}/phoneNumbers/#{phone_num}"
651 ).then { |response|
652 params = JSON.parse(response)
653 if params['numberState'] == 'enabled'
654 check_then_register(
655 i,
656 user_id,
657 api_token,
658 api_secret,
659 phone_num
660 )
661 else
662 # TODO: add text re number disabled
663 EMPromise.reject([:modify, 'not-acceptable'])
664 end
665 }
666 }.catch { |e|
667 EMPromise.reject(case e
668 when 401
669 # TODO: add text re bad credentials
670 [:auth, 'not-authorized']
671 when 404
672 # TODO: add text re number not found or disabled
673 [:cancel, 'item-not-found']
674 when Integer
675 [:modify, 'not-acceptable']
676 else
677 e
678 end)
679 }
680 end
681
682 def self.registration_form(orig, existing_number=nil)
683 msg = Nokogiri::XML::Node.new 'query', orig.document
684 msg['xmlns'] = 'jabber:iq:register'
685
686 if existing_number
687 msg.add_child(
688 Nokogiri::XML::Node.new(
689 'registered', msg.document
690 )
691 )
692 end
693
694 n1 = Nokogiri::XML::Node.new(
695 'instructions', msg.document
696 )
697 n1.content = "Enter the information from your Account "\
698 "page as well as the Phone Number\nin your "\
699 "account you want to use (ie. '+12345678901')"\
700 ".\nUser Id is nick, API Token is username, "\
701 "API Secret is password, Phone Number is phone"\
702 ".\n\nThe source code for this gateway is at "\
703 "https://gitlab.com/ossguy/sgx-catapult ."\
704 "\nCopyright (C) 2017 Denver Gingerich and "\
705 "others, licensed under AGPLv3+."
706 n2 = Nokogiri::XML::Node.new 'nick', msg.document
707 n3 = Nokogiri::XML::Node.new 'username', msg.document
708 n4 = Nokogiri::XML::Node.new 'password', msg.document
709 n5 = Nokogiri::XML::Node.new 'phone', msg.document
710 n5.content = existing_number.to_s
711 msg.add_child(n1)
712 msg.add_child(n2)
713 msg.add_child(n3)
714 msg.add_child(n4)
715 msg.add_child(n5)
716
717 x = Blather::Stanza::X.new :form, [
718 {
719 required: true, type: :"text-single",
720 label: 'User Id', var: 'nick'
721 },
722 {
723 required: true, type: :"text-single",
724 label: 'API Token', var: 'username'
725 },
726 {
727 required: true, type: :"text-private",
728 label: 'API Secret', var: 'password'
729 },
730 {
731 required: true, type: :"text-single",
732 label: 'Phone Number', var: 'phone',
733 value: existing_number.to_s
734 }
735 ]
736 x.title = 'Register for '\
737 'Soprani.ca Gateway to XMPP - Catapult'
738 x.instructions = "Enter the details from your Account "\
739 "page as well as the Phone Number\nin your "\
740 "account you want to use (ie. '+12345678901')"\
741 ".\n\nThe source code for this gateway is at "\
742 "https://gitlab.com/ossguy/sgx-catapult ."\
743 "\nCopyright (C) 2017 Denver Gingerich and "\
744 "others, licensed under AGPLv3+."
745 msg.add_child(x)
746
747 orig.add_child(msg)
748
749 return orig
750 end
751
752 iq '/iq/ns:query', ns: 'jabber:iq:register' do |i, qn|
753 puts "IQ: #{i.inspect}"
754
755 case i.type
756 when :set
757 process_registration(i, qn)
758 when :get
759 bare_jid = i.from.stripped
760 cred_key = "catapult_cred-#{bare_jid}"
761 REDIS.lindex(cred_key, 3).then { |existing_number|
762 reply = registration_form(i.reply, existing_number)
763 puts "RESPONSE2: #{reply.inspect}"
764 write_to_stream reply
765 }
766 else
767 # Unknown IQ, ignore for now
768 EMPromise.reject(:done)
769 end.catch { |e|
770 if e.is_a?(Array) && e.length == 2
771 write_to_stream error_msg(i.reply, qn, *e)
772 elsif e != :done
773 EMPromise.reject(e)
774 end
775 }.catch(&method(:panic))
776 end
777
778 subscription(:request?) do |s|
779 # TODO: are these the best to return? really need '!' here?
780 #write_to_stream s.approve!
781 #write_to_stream s.request!
782 end
783
784 iq :get? do |i|
785 write_to_stream error_msg(i.reply, i.children, 'cancel', 'feature-not-implemented')
786 end
787
788 iq :set? do |i|
789 write_to_stream error_msg(i.reply, i.children, 'cancel', 'feature-not-implemented')
790 end
791end
792
793[:INT, :TERM].each do |sig|
794 trap(sig) {
795 puts 'Shutting down gateway...'
796 SGXcatapult.shutdown
797 puts 'Gateway has terminated.'
798
799 EM.stop
800 }
801end
802
803class ReceiptMessage < Blather::Stanza
804 def self.new(to=nil)
805 node = super :message
806 node.to = to
807 node
808 end
809end
810
811class WebhookHandler < Goliath::API
812 def send_media(from, to, media_url)
813 # we assume media_url is of the form (always the case so far):
814 # https://api.catapult.inetwork.com/v1/users/[uid]/media/[file]
815
816 # the caller must guarantee that 'to' is a bare JID
817 proxy_url = ARGV[8] + to + '/' + media_url.split('/', 8)[7]
818
819 puts 'ORIG_URL: ' + media_url
820 puts 'PROX_URL: ' + proxy_url
821
822 # put URL in the body (so Conversations will still see it)...
823 msg = Blather::Stanza::Message.new(to, proxy_url)
824 msg.from = from
825
826 # ...but also provide URL in XEP-0066 (OOB) fashion
827 # TODO: confirm client supports OOB or don't send this
828 x = Nokogiri::XML::Node.new 'x', msg.document
829 x['xmlns'] = 'jabber:x:oob'
830
831 urln = Nokogiri::XML::Node.new 'url', msg.document
832 urlc = Nokogiri::XML::Text.new proxy_url, msg.document
833
834 urln.add_child(urlc)
835 x.add_child(urln)
836 msg.add_child(x)
837
838 SGXcatapult.write(msg)
839
840 rescue Exception => e
841 puts 'Shutting down gateway due to exception 012: ' + e.message
842 SGXcatapult.shutdown
843 puts 'Gateway has terminated.'
844 EM.stop
845 end
846
847 def response(env)
848 puts 'ENV: ' + env.to_s
849 body = Rack::Request.new(env).body.read
850 params = JSON.parse body
851
852 users_num = ''
853 others_num = ''
854 if params['direction'] == 'in'
855 users_num = params['to']
856 others_num = params['from']
857 elsif params['direction'] == 'out'
858 users_num = params['from']
859 others_num = params['to']
860 else
861 # TODO: exception or similar
862 puts "big problem: '" + params['direction'] + "'" + body
863 return [200, {}, "OK"]
864 end
865
866 puts 'BODY - messageId: ' + params['messageId'] +
867 ', eventType: ' + params['eventType'] +
868 ', time: ' + params['time'] +
869 ', direction: ' + params['direction'] +
870 ', state: ' + params['state'] +
871 ', deliveryState: ' + (params['deliveryState'] ?
872 params['deliveryState'] : 'NONE') +
873 ', deliveryCode: ' + (params['deliveryCode'] ?
874 params['deliveryCode'] : 'NONE') +
875 ', deliveryDesc: ' + (params['deliveryDescription'] ?
876 params['deliveryDescription'] : 'NONE') +
877 ', tag: ' + (params['tag'] ? params['tag'] : 'NONE') +
878 ', media: ' + (params['media'] ? params['media'].to_s :
879 'NONE')
880
881 if others_num[0] != '+'
882 # TODO: check that others_num actually a shortcode first
883 others_num +=
884 ';phone-context=ca-us.phone-context.soprani.ca'
885 end
886
887 jid_key = "catapult_jid-#{users_num}"
888 bare_jid = REDIS.get(jid_key).promise.sync
889
890 if !bare_jid
891 puts "jid_key (#{jid_key}) DNE; Catapult misconfigured?"
892
893 # TODO: likely not appropriate; give error to Catapult?
894 # TODO: add text re credentials not being registered
895 #write_to_stream error_msg(m.reply, m.body, :auth,
896 # 'registration-required')
897 return [200, {}, "OK"]
898 end
899
900 msg = ''
901 case params['direction']
902 when 'in'
903 text = ''
904 case params['eventType']
905 when 'sms'
906 text = params['text']
907 when 'mms'
908 has_media = false
909 params['media'].each do |media_url|
910 if not media_url.end_with?(
911 '.smil', '.txt', '.xml'
912 )
913
914 has_media = true
915 send_media(
916 others_num + '@' +
917 ARGV[0],
918 bare_jid, media_url
919 )
920 end
921 end
922
923 if params['text'].empty?
924 if not has_media
925 text = '[suspected group msg '\
926 'with no text (odd)]'
927 end
928 else
929 text = if has_media
930 # TODO: write/use a caption XEP
931 params['text']
932 else
933 '[suspected group msg '\
934 '(recipient list not '\
935 'available) with '\
936 'following text] ' +
937 params['text']
938 end
939 end
940
941 # ie. if text param non-empty or had no media
942 if not text.empty?
943 msg = Blather::Stanza::Message.new(
944 bare_jid, text)
945 msg.from = others_num + '@' + ARGV[0]
946 SGXcatapult.write(msg)
947 end
948
949 return [200, {}, "OK"]
950 else
951 text = "unknown type (#{params['eventType']})"\
952 " with text: " + params['text']
953
954 # TODO: log/notify of this properly
955 puts text
956 end
957
958 msg = Blather::Stanza::Message.new(bare_jid, text)
959 else # per prior switch, this is: params['direction'] == 'out'
960 tag_parts = params['tag'].split(/ /, 2)
961 id = WEBrick::HTTPUtils.unescape(tag_parts[0])
962 resourcepart = WEBrick::HTTPUtils.unescape(tag_parts[1])
963
964 case params['deliveryState']
965 when 'not-delivered'
966 # create a bare message like the one user sent
967 msg = Blather::Stanza::Message.new(
968 others_num + '@' + ARGV[0])
969 msg.from = bare_jid + '/' + resourcepart
970 msg['id'] = id
971
972 # create an error reply to the bare message
973 msg = Blather::StanzaError.new(
974 msg,
975 'recipient-unavailable',
976 :wait
977 ).to_node
978 when 'delivered'
979 msg = ReceiptMessage.new(bare_jid)
980
981 # TODO: put in member/instance variable
982 msg['id'] = SecureRandom.uuid
983
984 # TODO: send only when requested per XEP-0184
985 rcvd = Nokogiri::XML::Node.new(
986 'received',
987 msg.document
988 )
989 rcvd['xmlns'] = 'urn:xmpp:receipts'
990 rcvd['id'] = id
991 msg.add_child(rcvd)
992 when 'waiting'
993 # can't really do anything with it; nice to know
994 puts "message with id #{id} waiting"
995 return [200, {}, "OK"]
996 else
997 # TODO: notify somehow of unknown state receivd?
998 puts "message with id #{id} has "\
999 "other state #{params['deliveryState']}"
1000 return [200, {}, "OK"]
1001 end
1002
1003 puts "RESPONSE4: #{msg.inspect}"
1004 end
1005
1006 msg.from = others_num + '@' + ARGV[0]
1007 SGXcatapult.write(msg)
1008
1009 [200, {}, "OK"]
1010
1011 rescue Exception => e
1012 puts 'Shutting down gateway due to exception 013: ' + e.message
1013 SGXcatapult.shutdown
1014 puts 'Gateway has terminated.'
1015 EM.stop
1016 end
1017end
1018
1019EM.run do
1020 REDIS = EM::Hiredis.connect("redis://#{ARGV[4]}:#{ARGV[5]}/0")
1021
1022 SGXcatapult.run
1023
1024 # required when using Prosody otherwise disconnects on 6-hour inactivity
1025 EM.add_periodic_timer(3600) do
1026 msg = Blather::Stanza::Iq::Ping.new(:get, 'localhost')
1027 msg.from = ARGV[0]
1028 SGXcatapult.write(msg)
1029 end
1030
1031 server = Goliath::Server.new('0.0.0.0', ARGV[7].to_i)
1032 server.api = WebhookHandler.new
1033 server.app = Goliath::Rack::Builder.build(server.api.class, server.api)
1034 server.logger = Log4r::Logger.new('goliath')
1035 server.logger.add(Log4r::StdoutOutputter.new('console'))
1036 server.logger.level = Log4r::INFO
1037 server.start
1038end