1#!/usr/bin/env ruby
2#
3# Copyright (C) 2017-2020 Denver Gingerich <denver@ossguy.com>
4# Copyright (C) 2017 Stephen Paul Weber <singpolyma@singpolyma.net>
5#
6# This file is part of sgx-bwmsgsv2.
7#
8# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
9# the terms of the GNU Affero General Public License as published by the Free
10# Software Foundation, either version 3 of the License, or (at your option) any
11# later version.
12#
13# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
14# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
16# details.
17#
18# You should have received a copy of the GNU Affero General Public License along
19# with sgx-bwmsgsv2. If not, see <http://www.gnu.org/licenses/>.
20
21require 'blather/client/dsl'
22require 'em-hiredis'
23require 'em-http-request'
24require 'json'
25require 'securerandom'
26require 'time'
27require 'uri'
28require 'webrick'
29
30require 'goliath/api'
31require 'goliath/server'
32require 'log4r'
33
34require_relative 'em_promise'
35
36def panic(e)
37 puts "Shutting down gateway due to exception: #{e.message}"
38 puts e.backtrace
39 SGXbwmsgsv2.shutdown
40 puts 'Gateway has terminated.'
41 EM.stop
42end
43
44def extract_shortcode(dest)
45 num, context = dest.split(';', 2)
46 num if context && context == 'phone-context=ca-us.phone-context.soprani.ca'
47end
48
49def is_anonymous_tel?(dest)
50 num, context = dest.split(';', 2)
51 context && context == 'phone-context=anonymous.phone-context.soprani.ca'
52end
53
54class SGXClient < Blather::Client
55 def register_handler(type, *guards, &block)
56 super(type, *guards) { |*args| wrap_handler(*args, &block) }
57 end
58
59 def register_handler_before(type, *guards, &block)
60 check_handler(type, guards)
61 handler = lambda { |*args| wrap_handler(*args, &block) }
62
63 @handlers[type] ||= []
64 @handlers[type].unshift([guards, handler])
65 end
66
67protected
68
69 def wrap_handler(*args, &block)
70 v = block.call(*args)
71 v.catch(&method(:panic)) if v.is_a?(Promise)
72 true # Do not run other handlers unless throw :pass
73 rescue Exception => e
74 panic(e)
75 end
76end
77
78# TODO: keep in sync with jmp-acct_bot.rb, and eventually put in common location
79module CatapultSettingFlagBits
80 VOICEMAIL_TRANSCRIPTION_DISABLED = 0
81 MMS_ON_OOB_URL = 1
82end
83
84module SGXbwmsgsv2
85 extend Blather::DSL
86
87 @client = SGXClient.new
88 @gateway_features = [
89 "http://jabber.org/protocol/disco#info",
90 "http://jabber.org/protocol/address/",
91 "jabber:iq:register"
92 ]
93
94 def self.run
95 # TODO: read/save ARGV[7] creds to local variables
96 client.run
97 end
98
99 # so classes outside this module can write messages, too
100 def self.write(stanza)
101 client.write(stanza)
102 end
103
104 def self.before_handler(type, *guards, &block)
105 client.register_handler_before(type, *guards, &block)
106 end
107
108 def self.send_media(from, to, media_url, desc=nil, subject=nil, m=nil)
109 # we assume media_url is one of these (always the case so far):
110 # https://api.catapult.inetwork.com/v1/users/[uid]/media/[file]
111 # https://messaging.bandwidth.com/api/v2/users/[u]/media/[path]
112
113 usr = to
114 pth = ''
115 if media_url.start_with?(
116 'https://api.catapult.inetwork.com/v1/users/')
117
118 # TODO: MUST fix this TERRIBLE hack
119 # there must be a catapult_cred-<usr> key with V1 creds
120 usr = 'v1'
121
122 pth = media_url.split('/', 8)[7]
123
124 elsif media_url.start_with?(
125 'https://messaging.bandwidth.com/api/v2/users/')
126
127 pth = media_url.split('/', 9)[8]
128 else
129 puts "ERROR2: unrecognized media_url: '#{media_url}'"
130 return
131 end
132
133 # the caller must guarantee that 'to' is a bare JID
134 proxy_url = ARGV[6] + WEBrick::HTTPUtils.escape(usr) + '/' + pth
135
136 puts 'ORIG_URL: ' + media_url
137 puts 'PROX_URL: ' + proxy_url
138
139 # put URL in the body (so Conversations will still see it)...
140 msg = Blather::Stanza::Message.new(to, proxy_url)
141 if m
142 msg = m.copy
143 msg.body = proxy_url
144 end
145 msg.from = from
146 msg.subject = subject if subject
147
148 # ...but also provide URL in XEP-0066 (OOB) fashion
149 # TODO: confirm client supports OOB or don't send this
150 x = Nokogiri::XML::Node.new 'x', msg.document
151 x['xmlns'] = 'jabber:x:oob'
152
153 urln = Nokogiri::XML::Node.new 'url', msg.document
154 urlc = Nokogiri::XML::Text.new proxy_url, msg.document
155 urln.add_child(urlc)
156 x.add_child(urln)
157
158 if desc
159 descn = Nokogiri::XML::Node.new('desc', msg.document)
160 descc = Nokogiri::XML::Text.new(desc, msg.document)
161 descn.add_child(descc)
162 x.add_child(descn)
163 end
164
165 msg.add_child(x)
166
167 write(msg)
168 rescue Exception => e
169 panic(e)
170 end
171
172 def self.error_msg(orig, query_node, type, name, text=nil)
173 orig.type = :error
174
175 error = Nokogiri::XML::Node.new 'error', orig.document
176 error['type'] = type
177 orig.add_child(error)
178
179 suberr = Nokogiri::XML::Node.new name, orig.document
180 suberr['xmlns'] = 'urn:ietf:params:xml:ns:xmpp-stanzas'
181 error.add_child(suberr)
182
183 orig.add_child(query_node) if query_node
184
185 # TODO: add some explanatory xml:lang='en' text (see text param)
186 puts "RESPONSE3: #{orig.inspect}"
187 return orig
188 end
189
190 # workqueue_count MUST be 0 or else Blather uses threads!
191 setup ARGV[0], ARGV[1], ARGV[2], ARGV[3], nil, nil, workqueue_count: 0
192
193 def self.pass_on_message(m, users_num, jid)
194 # setup delivery receipt; similar to a reply
195 rcpt = ReceiptMessage.new(m.from.stripped)
196 rcpt.from = m.to
197
198 # pass original message (before sending receipt)
199 m.to = jid
200 m.from = "#{users_num}@#{ARGV[0]}"
201
202 puts 'XRESPONSE0: ' + m.inspect
203 write_to_stream m
204
205 # send a delivery receipt back to the sender
206 # TODO: send only when requested per XEP-0184
207 # TODO: pass receipts from target if supported
208
209 # TODO: put in member/instance variable
210 rcpt['id'] = SecureRandom.uuid
211 rcvd = Nokogiri::XML::Node.new 'received', rcpt.document
212 rcvd['xmlns'] = 'urn:xmpp:receipts'
213 rcvd['id'] = m.id
214 rcpt.add_child(rcvd)
215
216 puts 'XRESPONSE1: ' + rcpt.inspect
217 write_to_stream rcpt
218 end
219
220 def self.call_catapult(
221 token, secret, m, pth, body=nil,
222 head={}, code=[200], respond_with=:body
223 )
224 # pth looks like one of:
225 # "api/v2/users/#{user_id}/[endpoint_name]"
226 # "v1/users/#{user_id}/[endpoint_name]"
227
228 url_prefix = ''
229
230 # TODO: need to make a separate thing for voice.bw.c eventually
231 if pth.start_with? 'api/v2/users'
232 url_prefix = 'https://messaging.bandwidth.com/'
233 elsif pth.start_with? 'v1/users'
234 # begin hack for running V2 messages along with V1 voice
235 url_prefix = 'https://api.catapult.inetwork.com/'
236 # TODO: set token and secret to vals provided at startup
237 # TODO: replace pth's user_id with user_id from ARGV[7]
238 # -> pth = "v1/users/#{new_id}/" + pth.split('/', 4)[3]
239 else
240 # TODO: error
241 end
242
243 EM::HttpRequest.new(
244 url_prefix + pth
245 ).public_send(
246 m,
247 head: {
248 'Authorization' => [token, secret]
249 }.merge(head),
250 body: body
251 ).then { |http|
252 puts "API response to send: #{http.response} with code"\
253 " response.code #{http.response_header.status}"
254
255 if code.include?(http.response_header.status)
256 case respond_with
257 when :body
258 http.response
259 when :headers
260 http.response_header
261 else
262 http
263 end
264 else
265 EMPromise.reject(http.response_header.status)
266 end
267 }
268 end
269
270 def self.to_catapult_possible_oob(s, num_dest, user_id, token, secret,
271 usern)
272
273 # TODO: fix indentation
274
275 un = s.at("oob|x > oob|url", oob: "jabber:x:oob")
276 if not un
277 puts "MMSOOB: no url node found so process as normal"
278 return to_catapult(s, nil, num_dest, user_id, token,
279 secret, usern)
280 end
281 puts "MMSOOB: found a url node - checking if to make MMS..."
282
283 # TODO: check size of file at un.text and shrink if need
284
285 body = s.respond_to?(:body) ? s.body : ''
286 # some clients send URI in both body & <url/> so delete
287 s.body = body.sub(/\s*#{Regexp::escape(un.text)}\s*$/, '')
288
289 puts "MMSOOB: url text is '#{un.text}'"
290 puts "MMSOOB: the body is '#{body.to_s.strip}'"
291
292 puts "MMSOOB: sending MMS since found OOB & user asked"
293 to_catapult(s, un.text, num_dest, user_id, token,
294 secret, usern)
295 end
296
297 def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern)
298 body = s.respond_to?(:body) ? s.body : ''
299 if murl.to_s.empty? && body.to_s.strip.empty?
300 return EMPromise.reject(
301 [:modify, 'policy-violation']
302 )
303 end
304
305 extra = {}
306 if murl
307 extra = { media: murl }
308 end
309
310 call_catapult(
311 token,
312 secret,
313 :post,
314 "api/v2/users/#{user_id}/messages",
315 JSON.dump(extra.merge(
316 from: usern,
317 to: num_dest,
318 text: body,
319 applicationId: ARGV[4],
320 tag:
321 # callbacks need id and resourcepart
322 WEBrick::HTTPUtils.escape(s.id.to_s) +
323 ' ' +
324 WEBrick::HTTPUtils.escape(
325 s.from.resource.to_s
326 )
327 )),
328 {'Content-Type' => 'application/json'},
329 [201]
330 ).catch {
331 # TODO: add text; mention code number
332 EMPromise.reject(
333 [:cancel, 'internal-server-error']
334 )
335 }
336
337 t = Time.now
338 tai_timestamp = `./tai`.strip
339 tai_yyyymmdd = Time.at(tai_timestamp.to_i).strftime('%Y%m%d')
340 puts "SMU %d.%09d, %s: msg for %s sent on %s - incrementing\n" %
341 [t.to_i, t.nsec, tai_timestamp, usern, tai_yyyymmdd]
342
343 REDIS.incr('usage_messages-' + tai_yyyymmdd + '-' +
344 usern).then { |total|
345
346 t = Time.now
347 puts "SMU %d.%09d: total msgs for %s-%s now at %s\n" %
348 [t.to_i, t.nsec, tai_yyyymmdd, usern, total]
349 }
350 end
351
352 def self.validate_num(m)
353 # if sent to SGX domain use https://wiki.soprani.ca/SGX/GroupMMS
354 if m.to == ARGV[0]
355 an = m.children.find { |v| v.element_name == "addresses"
356 }
357 if not an
358 return EMPromise.reject([:cancel,
359 'item-not-found'])
360 end
361 puts "ADRXEP: found an addresses node - iterate addrs.."
362
363 nums = []
364 an.children.each do |e|
365 num = ''
366 type = ''
367 e.attributes.each do |c|
368 if c[0] == 'type'
369 if c[1] != 'to'
370 # TODO: error
371 end
372 type = c[1].to_s
373 elsif c[0] == 'uri'
374 if !c[1].to_s.start_with? 'sms:'
375 # TODO: error
376 end
377 num = c[1].to_s[4..-1]
378 # TODO: confirm num validates
379 else
380 # TODO: error - unexpected name
381 end
382 end
383 if num.empty? or type.empty?
384 # TODO: error
385 end
386 nums << num
387 end
388 return nums
389 end
390
391 # if not sent to SGX domain, then assume destination is in 'to'
392 EMPromise.resolve(m.to.node.to_s).then { |num_dest|
393 if num_dest =~ /\A\+?[0-9]+(?:;.*)?\Z/
394 next num_dest if num_dest[0] == '+'
395 shortcode = extract_shortcode(num_dest)
396 next shortcode if shortcode
397 end
398
399 if is_anonymous_tel?(num_dest)
400 EMPromise.reject([:cancel, 'gone'])
401 else
402 # TODO: text re num not (yet) supportd/implmentd
403 EMPromise.reject([:cancel, 'item-not-found'])
404 end
405 }
406 end
407
408 def self.fetch_catapult_cred_for(jid)
409 cred_key = "catapult_cred-#{jid.stripped}"
410 REDIS.lrange(cred_key, 0, 3).then { |creds|
411 if creds.length < 4
412 # TODO: add text re credentials not registered
413 EMPromise.reject(
414 [:auth, 'registration-required']
415 )
416 else
417 creds
418 end
419 }
420 end
421
422 message :error? do |m|
423 # TODO: report it somewhere/somehow - eat for now so no err loop
424 puts "EATERROR1: #{m.inspect}"
425 end
426
427 message :body do |m|
428 EMPromise.all([
429 validate_num(m),
430 fetch_catapult_cred_for(m.from)
431 ]).then { |(num_dest, creds)|
432 jid_key = "catapult_jid-#{num_dest}"
433 REDIS.get(jid_key).then { |jid|
434 [jid, num_dest] + creds
435 }
436 }.then { |(jid, num_dest, *creds)|
437 if jid
438 cred_key = "catapult_cred-#{jid}"
439 REDIS.lrange(cred_key, 0, 0).then { |other_user|
440 [jid, num_dest] + creds + other_user
441 }
442 else
443 [jid, num_dest] + creds + [nil]
444 end
445 }.then { |(jid, num_dest, *creds, other_user)|
446 # if destination user is in the system pass on directly
447 if other_user and not other_user.start_with? 'u-'
448 pass_on_message(m, creds.last, jid)
449 else
450 to_catapult_possible_oob(m, num_dest, *creds)
451 end
452 }.catch { |e|
453 if e.is_a?(Array) && e.length == 2
454 write_to_stream error_msg(m.reply, m.body, *e)
455 else
456 EMPromise.reject(e)
457 end
458 }
459 end
460
461 def self.user_cap_identities
462 [{category: 'client', type: 'sms'}]
463 end
464
465 # TODO: must re-add stuff so can do ad-hoc commands
466 def self.user_cap_features
467 [
468 "urn:xmpp:receipts",
469 ]
470 end
471
472 def self.add_gateway_feature(feature)
473 @gateway_features << feature
474 @gateway_features.uniq!
475 end
476
477 subscription :request? do |p|
478 puts "PRESENCE1: #{p.inspect}"
479
480 # subscriptions are allowed from anyone - send reply immediately
481 msg = Blather::Stanza::Presence.new
482 msg.to = p.from
483 msg.from = p.to
484 msg.type = :subscribed
485
486 puts 'RESPONSE5a: ' + msg.inspect
487 write_to_stream msg
488
489 # send a <presence> immediately; not automatically probed for it
490 # TODO: refactor so no "presence :probe? do |p|" duplicate below
491 caps = Blather::Stanza::Capabilities.new
492 # TODO: user a better node URI (?)
493 caps.node = 'http://catapult.sgx.soprani.ca/'
494 caps.identities = user_cap_identities
495 caps.features = user_cap_features
496
497 msg = caps.c
498 msg.to = p.from
499 msg.from = p.to.to_s + '/sgx'
500
501 puts 'RESPONSE5b: ' + msg.inspect
502 write_to_stream msg
503
504 # need to subscribe back so Conversations displays images inline
505 msg = Blather::Stanza::Presence.new
506 msg.to = p.from.to_s.split('/', 2)[0]
507 msg.from = p.to.to_s.split('/', 2)[0]
508 msg.type = :subscribe
509
510 puts 'RESPONSE5c: ' + msg.inspect
511 write_to_stream msg
512 end
513
514 presence :probe? do |p|
515 puts 'PRESENCE2: ' + p.inspect
516
517 caps = Blather::Stanza::Capabilities.new
518 # TODO: user a better node URI (?)
519 caps.node = 'http://catapult.sgx.soprani.ca/'
520 caps.identities = user_cap_identities
521 caps.features = user_cap_features
522
523 msg = caps.c
524 msg.to = p.from
525 msg.from = p.to.to_s + '/sgx'
526
527 puts 'RESPONSE6: ' + msg.inspect
528 write_to_stream msg
529 end
530
531 iq '/iq/ns:query', ns: 'http://jabber.org/protocol/disco#info' do |i|
532 # respond to capabilities request for an sgx-bwmsgsv2 number JID
533 if i.to.node
534 # TODO: confirm the node URL is expected using below
535 #puts "XR[node]: #{xpath_result[0]['node']}"
536
537 msg = i.reply
538 msg.node = i.node
539 msg.identities = user_cap_identities
540 msg.features = user_cap_features
541
542 puts 'RESPONSE7: ' + msg.inspect
543 write_to_stream msg
544 next
545 end
546
547 # respond to capabilities request for sgx-bwmsgsv2 itself
548 msg = i.reply
549 msg.node = i.node
550 msg.identities = [{
551 name: 'Soprani.ca Gateway to XMPP - Bandwidth API V2',
552 type: 'sms', category: 'gateway'
553 }]
554 msg.features = @gateway_features
555 write_to_stream msg
556 end
557
558 def self.check_then_register(i, *creds)
559 jid_key = "catapult_jid-#{creds.last}"
560 bare_jid = i.from.stripped
561 cred_key = "catapult_cred-#{bare_jid}"
562
563 REDIS.get(jid_key).then { |existing_jid|
564 if existing_jid && existing_jid != bare_jid
565 # TODO: add/log text: credentials exist already
566 EMPromise.reject([:cancel, 'conflict'])
567 end
568 }.then {
569 REDIS.lrange(cred_key, 0, 3)
570 }.then { |existing_creds|
571 # TODO: add/log text: credentials exist already
572 if existing_creds.length == 4 && creds != existing_creds
573 EMPromise.reject([:cancel, 'conflict'])
574 elsif existing_creds.length < 4
575 REDIS.rpush(cred_key, *creds).then { |length|
576 if length != 4
577 EMPromise.reject([
578 :cancel,
579 'internal-server-error'
580 ])
581 end
582 }
583 end
584 }.then {
585 # not necessary if existing_jid non-nil, easier this way
586 REDIS.set(jid_key, bare_jid)
587 }.then { |result|
588 if result != 'OK'
589 # TODO: add txt re push failure
590 EMPromise.reject(
591 [:cancel, 'internal-server-error']
592 )
593 end
594 }.then {
595 write_to_stream i.reply
596 }
597 end
598
599 def self.creds_from_registration_query(qn)
600 xn = qn.children.find { |v| v.element_name == "x" }
601
602 if xn
603 xn.children.each_with_object({}) do |field, h|
604 next if field.element_name != "field"
605 val = field.children.find { |v|
606 v.element_name == "value"
607 }
608
609 case field['var']
610 when 'nick'
611 h[:user_id] = val.text
612 when 'username'
613 h[:api_token] = val.text
614 when 'password'
615 h[:api_secret] = val.text
616 when 'phone'
617 h[:phone_num] = val.text
618 else
619 # TODO: error
620 puts "?: #{field['var']}"
621 end
622 end
623 else
624 qn.children.each_with_object({}) do |field, h|
625 case field.element_name
626 when "nick"
627 h[:user_id] = field.text
628 when "username"
629 h[:api_token] = field.text
630 when "password"
631 h[:api_secret] = field.text
632 when "phone"
633 h[:phone_num] = field.text
634 end
635 end
636 end.values_at(:user_id, :api_token, :api_secret, :phone_num)
637 end
638
639 def self.process_registration(i, qn)
640 EMPromise.resolve(
641 qn.children.find { |v| v.element_name == "remove" }
642 ).then { |rn|
643 if rn
644 puts "received <remove/> - ignoring for now..."
645 EMPromise.reject(:done)
646 else
647 creds_from_registration_query(qn)
648 end
649 }.then { |user_id, api_token, api_secret, phone_num|
650 if phone_num[0] == '+'
651 [user_id, api_token, api_secret, phone_num]
652 else
653 # TODO: add text re number not (yet) supported
654 EMPromise.reject([:cancel, 'item-not-found'])
655 end
656 }.then { |user_id, api_token, api_secret, phone_num|
657 # TODO: find way to verify #{phone_num}, too
658 call_catapult(
659 api_token,
660 api_secret,
661 :get,
662 "api/v2/users/#{user_id}/media"
663 ).then { |response|
664 params = JSON.parse(response)
665 # TODO: confirm params is array - could be empty
666
667 puts "register got str #{response.to_s[0..999]}"
668
669 check_then_register(
670 i,
671 user_id,
672 api_token,
673 api_secret,
674 phone_num
675 )
676 }
677 }.catch { |e|
678 EMPromise.reject(case e
679 when 401
680 # TODO: add text re bad credentials
681 [:auth, 'not-authorized']
682 when 404
683 # TODO: add text re number not found or disabled
684 [:cancel, 'item-not-found']
685 when Integer
686 [:modify, 'not-acceptable']
687 else
688 e
689 end)
690 }
691 end
692
693 def self.registration_form(orig, existing_number=nil)
694 msg = Nokogiri::XML::Node.new 'query', orig.document
695 msg['xmlns'] = 'jabber:iq:register'
696
697 if existing_number
698 msg.add_child(
699 Nokogiri::XML::Node.new(
700 'registered', msg.document
701 )
702 )
703 end
704
705 # TODO: update "User Id" x2 below (to "accountId"?), and others?
706 n1 = Nokogiri::XML::Node.new(
707 'instructions', msg.document
708 )
709 n1.content = "Enter the information from your Account "\
710 "page as well as the Phone Number\nin your "\
711 "account you want to use (ie. '+12345678901')"\
712 ".\nUser Id is nick, API Token is username, "\
713 "API Secret is password, Phone Number is phone"\
714 ".\n\nThe source code for this gateway is at "\
715 "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
716 "\nCopyright (C) 2017-2020 Denver Gingerich "\
717 "and others, licensed under AGPLv3+."
718 n2 = Nokogiri::XML::Node.new 'nick', msg.document
719 n3 = Nokogiri::XML::Node.new 'username', msg.document
720 n4 = Nokogiri::XML::Node.new 'password', msg.document
721 n5 = Nokogiri::XML::Node.new 'phone', msg.document
722 n5.content = existing_number.to_s
723 msg.add_child(n1)
724 msg.add_child(n2)
725 msg.add_child(n3)
726 msg.add_child(n4)
727 msg.add_child(n5)
728
729 x = Blather::Stanza::X.new :form, [
730 {
731 required: true, type: :"text-single",
732 label: 'User Id', var: 'nick'
733 },
734 {
735 required: true, type: :"text-single",
736 label: 'API Token', var: 'username'
737 },
738 {
739 required: true, type: :"text-private",
740 label: 'API Secret', var: 'password'
741 },
742 {
743 required: true, type: :"text-single",
744 label: 'Phone Number', var: 'phone',
745 value: existing_number.to_s
746 }
747 ]
748 x.title = 'Register for '\
749 'Soprani.ca Gateway to XMPP - Bandwidth API V2'
750 x.instructions = "Enter the details from your Account "\
751 "page as well as the Phone Number\nin your "\
752 "account you want to use (ie. '+12345678901')"\
753 ".\n\nThe source code for this gateway is at "\
754 "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
755 "\nCopyright (C) 2017-2020 Denver Gingerich "\
756 "and others, licensed under AGPLv3+."
757 msg.add_child(x)
758
759 orig.add_child(msg)
760
761 return orig
762 end
763
764 iq '/iq/ns:query', ns: 'jabber:iq:register' do |i, qn|
765 puts "IQ: #{i.inspect}"
766
767 case i.type
768 when :set
769 process_registration(i, qn)
770 when :get
771 bare_jid = i.from.stripped
772 cred_key = "catapult_cred-#{bare_jid}"
773 REDIS.lindex(cred_key, 3).then { |existing_number|
774 reply = registration_form(i.reply, existing_number)
775 puts "RESPONSE2: #{reply.inspect}"
776 write_to_stream reply
777 }
778 else
779 # Unknown IQ, ignore for now
780 EMPromise.reject(:done)
781 end.catch { |e|
782 if e.is_a?(Array) && e.length == 2
783 write_to_stream error_msg(i.reply, qn, *e)
784 elsif e != :done
785 EMPromise.reject(e)
786 end
787 }.catch(&method(:panic))
788 end
789
790 iq :get? do |i|
791 write_to_stream error_msg(i.reply, i.children, 'cancel', 'feature-not-implemented')
792 end
793
794 iq :set? do |i|
795 write_to_stream error_msg(i.reply, i.children, 'cancel', 'feature-not-implemented')
796 end
797end
798
799class ReceiptMessage < Blather::Stanza
800 def self.new(to=nil)
801 node = super :message
802 node.to = to
803 node
804 end
805end
806
807class WebhookHandler < Goliath::API
808 use Goliath::Rack::Params
809
810 def response(env)
811 # TODO: add timestamp grab here, and MUST include ./tai version
812
813 puts 'ENV: ' + env.reject{ |k| k == 'params' }.to_s
814
815 # TODO: process each message in list, not just first one
816 jparams = params['_json'][0]['message']
817
818 type = params['_json'][0]['type']
819
820 users_num = ''
821 others_num = ''
822 if jparams['direction'] == 'in'
823 users_num = jparams['owner']
824 others_num = jparams['from']
825 elsif jparams['direction'] == 'out'
826 users_num = jparams['from']
827 others_num = jparams['owner']
828 else
829 # TODO: exception or similar
830 puts "big prob: '" + jparams['direction'] + "'" + body
831 return [200, {}, "OK"]
832 end
833
834 puts 'BODY - messageId: ' + jparams['id'] +
835 ', eventType: ' + type +
836 ', time: ' + jparams['time'] +
837 ', direction: ' + jparams['direction'] +
838 #', state: ' + jparams['state'] +
839 ', deliveryState: ' + (jparams['deliveryState'] ?
840 jparams['deliveryState'] : 'NONE') +
841 ', deliveryCode: ' + (jparams['deliveryCode'] ?
842 jparams['deliveryCode'] : 'NONE') +
843 ', deliveryDesc: ' + (jparams['deliveryDescription'] ?
844 jparams['deliveryDescription'] : 'NONE') +
845 ', tag: ' + (jparams['tag'] ? jparams['tag'] : 'NONE') +
846 ', media: ' + (jparams['media'] ?
847 jparams['media'].to_s : 'NONE')
848
849 if others_num[0] != '+'
850 # TODO: check that others_num actually a shortcode first
851 others_num +=
852 ';phone-context=ca-us.phone-context.soprani.ca'
853 end
854
855 jid_key = "catapult_jid-#{users_num}"
856 bare_jid = REDIS.get(jid_key).promise.sync
857
858 if !bare_jid
859 puts "jid_key (#{jid_key}) DNE; BW API misconfigured?"
860
861 # TODO: likely not appropriate; give error to BW API?
862 # TODO: add text re credentials not being registered
863 #write_to_stream error_msg(m.reply, m.body, :auth,
864 # 'registration-required')
865 return [200, {}, "OK"]
866 end
867
868 msg = nil
869 case jparams['direction']
870 when 'in'
871 text = ''
872 case type
873 when 'sms'
874 text = jparams['text']
875 when 'mms'
876 has_media = false
877
878 if jparams['text'].empty?
879 if not has_media
880 text = '[suspected group msg '\
881 'with no text (odd)]'
882 end
883 else
884 text = if has_media
885 # TODO: write/use a caption XEP
886 jparams['text']
887 else
888 '[suspected group msg '\
889 '(recipient list not '\
890 'available) with '\
891 'following text] ' +
892 jparams['text']
893 end
894 end
895
896 # ie. if text param non-empty or had no media
897 if not text.empty?
898 msg = Blather::Stanza::Message.new(
899 bare_jid, text)
900 msg.from = others_num + '@' + ARGV[0]
901 SGXbwmsgsv2.write(msg)
902 end
903
904 return [200, {}, "OK"]
905 when 'message-received'
906 # TODO: handle group chat, and fix above
907 text = jparams['text']
908
909 if jparams['to'].length > 1
910 msg = Blather::Stanza::Message.new(
911 'cheogram.com', text) # TODO
912
913 addrs = Nokogiri::XML::Node.new(
914 'addresses', msg.document)
915 addrs['xmlns'] = 'http://jabber.org/' +
916 'protocol/address'
917
918 addr1 = Nokogiri::XML::Node.new(
919 'address', msg.document)
920 addr1['type'] = 'to'
921 addr1['jid'] = bare_jid
922 addrs.add_child(addr1)
923
924 jparams['to'].each do |receiver|
925 if receiver == users_num
926 # already there in addr1
927 next
928 end
929
930 addrn = Nokogiri::XML::Node.new(
931 'address', msg.document)
932 addrn['type'] = 'to'
933 addrn['uri'] = "sms:#{receiver}"
934 addrn['delivered'] = 'true'
935 addrs.add_child(addrn)
936 end
937
938 msg.add_child(addrs)
939
940 # TODO: delete
941 puts "RESPONSE9: #{msg.inspect}"
942 end
943
944 jparams['media'].each do |media_url|
945 if not media_url.end_with?(
946 '.smil', '.txt', '.xml'
947 )
948
949 has_media = true
950 SGXbwmsgsv2.send_media(
951 others_num + '@' +
952 ARGV[0],
953 bare_jid, media_url,
954 nil, nil, msg
955 )
956 end
957 end unless not jparams['media']
958 else
959 text = "unknown type (#{type})"\
960 " with text: " + jparams['text']
961
962 # TODO: log/notify of this properly
963 puts text
964 end
965
966 if not msg
967 msg = Blather::Stanza::Message.new(bare_jid,
968 text)
969 end
970 else # per prior switch, this is: jparams['direction'] == 'out'
971 tag_parts = jparams['tag'].split(/ /, 2)
972 id = WEBrick::HTTPUtils.unescape(tag_parts[0])
973 resourcepart = WEBrick::HTTPUtils.unescape(tag_parts[1])
974
975 # TODO: remove this hack
976 if jparams['to'].length > 1
977 puts "WARN! group no rcpt: #{users_num}"
978 return [200, {}, "OK"]
979 end
980
981 case type
982 when 'message-failed'
983 # create a bare message like the one user sent
984 msg = Blather::Stanza::Message.new(
985 others_num + '@' + ARGV[0])
986 msg.from = bare_jid + '/' + resourcepart
987 msg['id'] = id
988
989 # TODO: add 'errorCode' and/or 'description' val
990 # create an error reply to the bare message
991 msg = Blather::StanzaError.new(
992 msg,
993 'recipient-unavailable',
994 :wait
995 ).to_node
996
997 # TODO: make prettier: this should be done above
998 others_num = params['_json'][0]['to']
999 when 'message-delivered'
1000
1001 msg = ReceiptMessage.new(bare_jid)
1002
1003 # TODO: put in member/instance variable
1004 msg['id'] = SecureRandom.uuid
1005
1006 # TODO: send only when requested per XEP-0184
1007 rcvd = Nokogiri::XML::Node.new(
1008 'received',
1009 msg.document
1010 )
1011 rcvd['xmlns'] = 'urn:xmpp:receipts'
1012 rcvd['id'] = id
1013 msg.add_child(rcvd)
1014
1015 # TODO: make prettier: this should be done above
1016 others_num = params['_json'][0]['to']
1017 else
1018 # TODO: notify somehow of unknown state receivd?
1019 puts "message with id #{id} has "\
1020 "other type #{type}"
1021 return [200, {}, "OK"]
1022 end
1023
1024 puts "RESPONSE4: #{msg.inspect}"
1025 end
1026
1027 msg.from = others_num + '@' + ARGV[0]
1028 SGXbwmsgsv2.write(msg)
1029
1030 [200, {}, "OK"]
1031
1032 rescue Exception => e
1033 puts 'Shutting down gateway due to exception 013: ' + e.message
1034 SGXbwmsgsv2.shutdown
1035 puts 'Gateway has terminated.'
1036 EM.stop
1037 end
1038end
1039
1040at_exit do
1041 $stdout.sync = true
1042
1043 puts "Soprani.ca/SMS Gateway for XMPP - Bandwidth API V2\n"\
1044 "==>> last commit of this version is " + `git rev-parse HEAD` + "\n"
1045
1046 if ARGV.size != 7 and ARGV.size != 8
1047 puts "Usage: sgx-bwmsgsv2.rb <component_jid> "\
1048 "<component_password> <server_hostname> "\
1049 "<server_port> <application_id> "\
1050 "<http_listen_port> <mms_proxy_prefix_url> [V1_creds_file]"
1051 exit 0
1052 end
1053
1054 t = Time.now
1055 puts "LOG %d.%09d: starting...\n\n" % [t.to_i, t.nsec]
1056
1057 EM.run do
1058 REDIS = EM::Hiredis.connect
1059
1060 SGXbwmsgsv2.run
1061
1062 # required when using Prosody otherwise disconnects on 6-hour inactivity
1063 EM.add_periodic_timer(3600) do
1064 msg = Blather::Stanza::Iq::Ping.new(:get, 'localhost')
1065 msg.from = ARGV[0]
1066 SGXbwmsgsv2.write(msg)
1067 end
1068
1069 server = Goliath::Server.new('0.0.0.0', ARGV[5].to_i)
1070 server.api = WebhookHandler.new
1071 server.app = Goliath::Rack::Builder.build(server.api.class, server.api)
1072 server.logger = Log4r::Logger.new('goliath')
1073 server.logger.add(Log4r::StdoutOutputter.new('console'))
1074 server.logger.level = Log4r::INFO
1075 server.start do
1076 ["INT", "TERM"].each do |sig|
1077 trap(sig) do
1078 EM.defer do
1079 puts 'Shutting down gateway...'
1080 SGXbwmsgsv2.shutdown
1081
1082 puts 'Gateway has terminated.'
1083 EM.stop
1084 end
1085 end
1086 end
1087 end
1088 end
1089end