1#!/usr/bin/env ruby
2#
3# Copyright (C) 2017-2020 Denver Gingerich <denver@ossguy.com>
4# Copyright (C) 2017 Stephen Paul Weber <singpolyma@singpolyma.net>
5#
6# This file is part of sgx-bwmsgsv2.
7#
8# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
9# the terms of the GNU Affero General Public License as published by the Free
10# Software Foundation, either version 3 of the License, or (at your option) any
11# later version.
12#
13# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
14# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
16# details.
17#
18# You should have received a copy of the GNU Affero General Public License along
19# with sgx-bwmsgsv2. If not, see <http://www.gnu.org/licenses/>.
20
21require 'blather/client/dsl'
22require 'em-hiredis'
23require 'em-http-request'
24require 'json'
25require 'securerandom'
26require 'time'
27require 'uri'
28require 'webrick'
29
30require 'goliath/api'
31require 'goliath/server'
32require 'log4r'
33
34require_relative 'em_promise'
35
# Last-resort failure handler: reports the exception, shuts the XMPP
# component down and stops the EventMachine reactor.
#
# e - the exception being reported (must respond to #message and #backtrace)
def panic(e)
  failure_line = "Shutting down gateway due to exception: #{e.message}"
  puts failure_line
  puts e.backtrace

  SGXbwmsgsv2.shutdown
  puts 'Gateway has terminated.'

  EM.stop
end
43
# Returns the part of +dest+ before the first ';' when the remainder is
# exactly the ca-us shortcode phone-context; returns nil otherwise.
def extract_shortcode(dest)
  number, _separator, context = dest.partition(';')
  return nil unless context == 'phone-context=ca-us.phone-context.soprani.ca'

  number
end
48
# Tells whether +dest+ carries the anonymous phone-context.
#
# Returns nil when +dest+ has no ';'-separated context at all, otherwise
# true/false depending on whether the context is the anonymous one.
def is_anonymous_tel?(dest)
  context = dest.split(';', 2)[1]
  return nil unless context

  context == 'phone-context=anonymous.phone-context.soprani.ca'
end
53
# Blather client whose handlers are all routed through wrap_handler, so an
# exception raised by a handler (or a rejected Promise returned by one) ends
# up in panic.
class SGXClient < Blather::Client
  # Registers +block+ as a handler, wrapped by wrap_handler.
  def register_handler(type, *guards, &block)
    wrapped = proc { |*stanza_args| wrap_handler(*stanza_args, &block) }
    super(type, *guards, &wrapped)
  end

  # Like register_handler, but puts the wrapped handler at the front of the
  # handler list for +type+.
  def register_handler_before(type, *guards, &block)
    check_handler(type, guards)
    wrapped = ->(*stanza_args) { wrap_handler(*stanza_args, &block) }

    (@handlers[type] ||= []).unshift([guards, wrapped])
  end

protected

  # Runs +block+ with +args+. A returned Promise gets panic attached as its
  # rejection handler; any raised exception goes straight to panic.
  def wrap_handler(*args, &block)
    begin
      outcome = block.call(*args)
      outcome.catch(&method(:panic)) if outcome.is_a?(Promise)
      true # Do not run other handlers unless throw :pass
    rescue Exception => e
      panic(e)
    end
  end
end
77
# Named bit positions for per-user setting flags.
# TODO: keep in sync with jmp-acct_bot.rb, and eventually put in common location
module CatapultSettingFlagBits
  # bit 0
  VOICEMAIL_TRANSCRIPTION_DISABLED = 0

  # bit 1
  MMS_ON_OOB_URL = 1
end
83
84module SGXbwmsgsv2
extend Blather::DSL

# the wrapped client; presumably what Blather::DSL's `client` returns - TODO confirm
@client = SGXClient.new

# features reported in disco#info replies for the gateway itself
@gateway_features = %w[
  http://jabber.org/protocol/disco#info
  http://jabber.org/protocol/address/
  jabber:iq:register
]
93
# Starts the XMPP client.
def self.run
  # TODO: read/save ARGV[7] creds to local variables
  client.run
end
98
# Writes +stanza+ through the client; exposed so code outside this module
# can send stanzas as well.
def self.write(stanza)
  client.write(stanza)
end
103
# Registers +block+ ahead of the already-registered handlers for +type+
# (delegates to SGXClient#register_handler_before).
def self.before_handler(type, *guards, &block)
  client.register_handler_before(type, *guards, &block)
end
107
# Sends a media link to an XMPP user: the proxied URL goes in the message
# body (so Conversations will still see it) and also in a jabber:x:oob
# (XEP-0066) element.
#
# from      - value assigned to the outgoing stanza's from
# to        - recipient; the caller must guarantee this is a bare JID
# media_url - we assume this is one of (always the case so far):
#             https://api.catapult.inetwork.com/v1/users/[uid]/media/[file]
#             https://messaging.bandwidth.com/api/v2/users/[u]/media/[path]
# desc      - optional text for the OOB <desc/> element
# subject   - optional message subject
# m         - optional stanza; when given, a copy of it is used as the base
#             message instead of a fresh one
#
# A URL that is unrecognized, or that has no path after the recognized
# prefix, is logged and skipped.  Any exception is handed to panic.
def self.send_media(from, to, media_url, desc=nil, subject=nil, m=nil)
  usr = to
  pth = nil

  if media_url.start_with?('https://api.catapult.inetwork.com/v1/users/')
    # TODO: MUST fix this TERRIBLE hack
    # there must be a catapult_cred-<usr> key with V1 creds
    usr = 'v1'
    pth = media_url.split('/', 8)[7]
  elsif media_url.start_with?(
    'https://messaging.bandwidth.com/api/v2/users/'
  )
    pth = media_url.split('/', 9)[8]
  end

  # pth is nil both for an unknown prefix and for a URL too short to hold
  # the media path; concatenating nil below would raise and take the whole
  # gateway down through panic, so bail out here instead
  if pth.nil?
    puts "ERROR2: unrecognized media_url: '#{media_url}'"
    return
  end

  proxy_url = ARGV[6] + WEBrick::HTTPUtils.escape(usr) + '/' + pth

  puts 'ORIG_URL: ' + media_url
  puts 'PROX_URL: ' + proxy_url

  # put URL in the body (so Conversations will still see it)...
  if m
    msg = m.copy
    msg.body = proxy_url
  else
    msg = Blather::Stanza::Message.new(to, proxy_url)
  end
  msg.from = from
  msg.subject = subject if subject

  # ...but also provide URL in XEP-0066 (OOB) fashion
  # TODO: confirm client supports OOB or don't send this
  x = Nokogiri::XML::Node.new('x', msg.document)
  x['xmlns'] = 'jabber:x:oob'

  urln = Nokogiri::XML::Node.new('url', msg.document)
  urln.add_child(Nokogiri::XML::Text.new(proxy_url, msg.document))
  x.add_child(urln)

  if desc
    descn = Nokogiri::XML::Node.new('desc', msg.document)
    descn.add_child(Nokogiri::XML::Text.new(desc, msg.document))
    x.add_child(descn)
  end

  msg.add_child(x)

  write(msg)
rescue Exception => e
  panic(e)
end
171
# Turns +orig+ into an error stanza and returns it.
#
# orig       - the stanza to convert (its type is set to :error)
# query_node - optional node appended after the <error/> element
# type       - value for the <error/> element's type attribute
# name       - element name of the stanza error condition
# text       - currently unused
def self.error_msg(orig, query_node, type, name, text=nil)
  orig.type = :error

  error_node = Nokogiri::XML::Node.new('error', orig.document)
  error_node['type'] = type
  orig.add_child(error_node)

  condition = Nokogiri::XML::Node.new(name, orig.document)
  condition['xmlns'] = 'urn:ietf:params:xml:ns:xmpp-stanzas'
  error_node.add_child(condition)

  orig.add_child(query_node) if query_node

  # TODO: add some explanatory xml:lang='en' text (see text param)
  puts "RESPONSE3: #{orig.inspect}"
  orig
end
189
# Component connection settings come from the first four command-line
# arguments (values_at yields nil for any that are missing).
# workqueue_count MUST be 0 or else Blather uses threads!
setup(*ARGV.values_at(0, 1, 2, 3), nil, nil, workqueue_count: 0)
192
# Delivers +m+ directly to another user of this gateway, then sends a
# delivery receipt back to the original sender.
#
# m         - the incoming message stanza (its to/from are rewritten)
# users_num - sender's number; becomes the node of the new from address
# jid       - JID the message is passed on to
def self.pass_on_message(m, users_num, jid)
  # build the receipt first (similar to a reply), since m's addressing is
  # overwritten below
  receipt = ReceiptMessage.new(m.from.stripped)
  receipt.from = m.to

  # pass original message (before sending receipt)
  m.to = jid
  m.from = "#{users_num}@#{ARGV[0]}"

  puts 'XRESPONSE0: ' + m.inspect
  write_to_stream m

  # send a delivery receipt back to the sender
  # TODO: send only when requested per XEP-0184
  # TODO: pass receipts from target if supported

  # TODO: put in member/instance variable
  receipt['id'] = SecureRandom.uuid
  received = Nokogiri::XML::Node.new('received', receipt.document)
  received['xmlns'] = 'urn:xmpp:receipts'
  received['id'] = m.id
  receipt.add_child(received)

  puts 'XRESPONSE1: ' + receipt.inspect
  write_to_stream receipt
end
219
# Issues an HTTP request against the Bandwidth API.
#
# token, secret - sent as the Authorization header pair
# m             - request method name, invoked via public_send (:get, :post)
# pth           - path, which looks like one of:
#                   "api/v2/users/#{user_id}/[endpoint_name]"
#                   "v1/users/#{user_id}/[endpoint_name]"
# body          - optional request body
# head          - extra request headers merged over Authorization
# code          - list of response status codes treated as success
# respond_with  - :body, :headers, or anything else for the whole response
#
# Returns a promise for the selected part of the response; it is rejected
# with the status code when that code is not in +code+.
def self.call_catapult(
  token, secret, m, pth, body=nil,
  head={}, code=[200], respond_with=:body
)
  # TODO: need to make a separate thing for voice.bw.c eventually
  base_url =
    if pth.start_with?('api/v2/users')
      'https://messaging.bandwidth.com/'
    elsif pth.start_with?('v1/users')
      # begin hack for running V2 messages along with V1 voice
      # TODO: set token and secret to vals provided at startup
      # TODO: replace pth's user_id with user_id from ARGV[7]
      # -> pth = "v1/users/#{new_id}/" + pth.split('/', 4)[3]
      'https://api.catapult.inetwork.com/'
    else
      # TODO: error
      ''
    end

  request = EM::HttpRequest.new(base_url + pth)
  headers = { 'Authorization' => [token, secret] }.merge(head)

  request.public_send(m, head: headers, body: body).then do |http|
    status = http.response_header.status
    puts "API response to send: #{http.response} with code"\
      " response.code #{status}"

    next EMPromise.reject(status) unless code.include?(status)

    case respond_with
    when :body then http.response
    when :headers then http.response_header
    else http
    end
  end
end
269
# Sends +s+ through to_catapult, as an MMS when the stanza carries a
# jabber:x:oob <url/> and as a normal message otherwise.
#
# When a URL node is present, a trailing copy of the URL is removed from the
# stanza body (some clients send the URI in both body & <url/>).
def self.to_catapult_possible_oob(s, num_dest, user_id, token, secret,
  usern)

  url_node = s.at("oob|x > oob|url", oob: "jabber:x:oob")
  unless url_node
    puts "MMSOOB: no url node found so process as normal"
    return to_catapult(s, nil, num_dest, user_id, token, secret, usern)
  end
  puts "MMSOOB: found a url node - checking if to make MMS..."

  # TODO: check size of file at un.text and shrink if need

  original_body = s.respond_to?(:body) ? s.body : ''
  trailing_url = /\s*#{Regexp::escape(url_node.text)}\s*$/
  s.body = original_body.sub(trailing_url, '')

  puts "MMSOOB: url text is '#{url_node.text}'"
  # note: this logs the body as it was before the URL was stripped
  puts "MMSOOB: the body is '#{original_body.to_s.strip}'"

  puts "MMSOOB: sending MMS since found OOB & user asked"
  to_catapult(s, url_node.text, num_dest, user_id, token, secret, usern)
end
296
# Sends the message in +s+ to +num_dest+ through the Bandwidth V2 messages
# endpoint and bumps the sender's daily usage counter in Redis.
#
# s        - the message stanza (body, id and from.resource are used)
# murl     - optional media URL; included as `media` when given
# num_dest - destination number(s)
# user_id, token, secret - Bandwidth credentials
# usern    - the sender's number; also part of the usage counter key
#
# Returns a promise.  It is rejected with [:modify, 'policy-violation'] when
# there is neither media nor non-blank text, and with
# [:cancel, 'internal-server-error'] when the API call fails.  Previously the
# API call's promise was dropped and only the counter promise was returned,
# so send failures never reached the caller; both are combined now.
def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern)
  body = s.respond_to?(:body) ? s.body : ''
  if murl.to_s.empty? && body.to_s.strip.empty?
    return EMPromise.reject(
      [:modify, 'policy-violation']
    )
  end

  extra = murl ? { media: murl } : {}

  # callbacks need id and resourcepart
  tag = WEBrick::HTTPUtils.escape(s.id.to_s) + ' ' +
    WEBrick::HTTPUtils.escape(s.from.resource.to_s)

  sent = call_catapult(
    token,
    secret,
    :post,
    "api/v2/users/#{user_id}/messages",
    JSON.dump(extra.merge(
      from: usern,
      to: num_dest,
      text: body,
      applicationId: ARGV[4],
      tag: tag
    )),
    {'Content-Type' => 'application/json'},
    [201]
  ).catch {
    # TODO: add text; mention code number
    EMPromise.reject(
      [:cancel, 'internal-server-error']
    )
  }

  # usage accounting is started right away, independent of the API outcome
  # (unchanged from before)
  t = Time.now
  tai_timestamp = `./tai`.strip
  tai_yyyymmdd = Time.at(tai_timestamp.to_i).strftime('%Y%m%d')
  puts "SMU %d.%09d, %s: msg for %s sent on %s - incrementing\n" %
    [t.to_i, t.nsec, tai_timestamp, usern, tai_yyyymmdd]

  counted = REDIS.incr(
    'usage_messages-' + tai_yyyymmdd + '-' + usern
  ).then { |total|
    t = Time.now
    puts "SMU %d.%09d: total msgs for %s-%s now at %s\n" %
      [t.to_i, t.nsec, tai_yyyymmdd, usern, total]
  }

  # reject if either the send or the counter update fails
  EMPromise.all([sent, counted])
end
351
# Works out the destination number(s) for message +m+.
#
# If +m+ is addressed to the SGX domain itself (ARGV[0]) the destinations
# come from an <addresses/> child (see https://wiki.soprani.ca/SGX/GroupMMS):
# the promise resolves to an Array with one entry per child of that node,
# each being the child's `uri` attribute minus its first four characters
# (the "sms:" prefix), or '' when there is no usable uri.
#
# Otherwise the destination is the node part of m.to: a number starting
# with '+' is returned as is, a shortcode with the ca-us phone-context is
# returned without the context.
#
# Returns a promise in every case.  Rejections: [:cancel, 'item-not-found']
# (no <addresses/> node, or unsupported number) and [:cancel, 'gone']
# (anonymous phone-context).
def self.validate_num(m)
  # if sent to SGX domain use https://wiki.soprani.ca/SGX/GroupMMS
  if m.to == ARGV[0]
    an = m.children.find { |v| v.element_name == "addresses" }
    return EMPromise.reject([:cancel, 'item-not-found']) unless an

    puts "ADRXEP: found an addresses node - iterate addrs.."

    nums = an.children.map do |e|
      num = ''
      e.attributes.each do |attr_name, attr|
        # TODO: error if type attribute is not 'to'
        # TODO: error if uri does not start with 'sms:'
        # TODO: error - unexpected attribute name
        next unless attr_name == 'uri'

        # a uri shorter than 4 chars slices to nil; keep it a String so
        # a malformed address can't raise and shut the gateway down
        # TODO: confirm num validates
        num = attr.to_s[4..-1].to_s
      end
      # TODO: error if num or type is empty
      num
    end

    # wrapped so both branches hand back a promise
    return EMPromise.resolve(nums)
  end

  # if not sent to SGX domain, then assume destination is in 'to'
  EMPromise.resolve(m.to.node.to_s).then { |num_dest|
    # \z (not \Z) so a trailing newline is not accepted as part of a number
    if num_dest =~ /\A\+?[0-9]+(?:;.*)?\z/
      next num_dest if num_dest[0] == '+'

      shortcode = extract_shortcode(num_dest)
      next shortcode if shortcode
    end

    if is_anonymous_tel?(num_dest)
      EMPromise.reject([:cancel, 'gone'])
    else
      # TODO: text re num not (yet) supportd/implmentd
      EMPromise.reject([:cancel, 'item-not-found'])
    end
  }
end
407
# Looks up the first four entries of the Redis list
# "catapult_cred-<bare jid>" for +jid+.
#
# Returns a promise for that list; rejected with
# [:auth, 'registration-required'] when fewer than four entries exist.
def self.fetch_catapult_cred_for(jid)
  REDIS.lrange("catapult_cred-#{jid.stripped}", 0, 3).then do |creds|
    next creds unless creds.length < 4

    # TODO: add text re credentials not registered
    EMPromise.reject(
      [:auth, 'registration-required']
    )
  end
end
421
# Error messages are only logged, never answered, so no error loop starts.
# TODO: report it somewhere/somehow - eat for now so no err loop
message(:error?) { |stanza| puts "EATERROR1: #{stanza.inspect}" }
426
# Handles every message that has a body: validates the destination, loads
# the sender's credentials, then either passes the message directly to
# another user of this gateway or sends it out through the Bandwidth API.
#
# Rejections shaped like [type, condition] are answered with an error
# stanza; anything else is re-rejected for the handler wrapper to deal with.
message :body do |m|
  lookups = [validate_num(m), fetch_catapult_cred_for(m.from)]

  EMPromise.all(lookups).then { |(num_dest, creds)|
    REDIS.get("catapult_jid-#{num_dest}").then { |jid|
      [jid, num_dest] + creds
    }
  }.then { |(jid, num_dest, *creds)|
    next [jid, num_dest] + creds + [nil] unless jid

    REDIS.lrange("catapult_cred-#{jid}", 0, 0).then { |other_creds|
      # always append exactly one element: with an empty list nothing used
      # to be appended, which made the next step take the sender's phone
      # number as other_user and the API secret as creds.last
      [jid, num_dest] + creds + [other_creds.first]
    }
  }.then { |(jid, num_dest, *creds, other_user)|
    # if destination user is in the system pass on directly
    if other_user && !other_user.start_with?('u-')
      pass_on_message(m, creds.last, jid)
    else
      to_catapult_possible_oob(m, num_dest, *creds)
    end
  }.catch { |e|
    if e.is_a?(Array) && e.length == 2
      write_to_stream error_msg(m.reply, m.body, *e)
    else
      EMPromise.reject(e)
    end
  }
end
460
# Identity list reported for a user's (number) JID.
def self.user_cap_identities
  sms_client = { category: 'client', type: 'sms' }
  [sms_client]
end
464
# Feature list reported for a user's (number) JID.
# TODO: must re-add stuff so can do ad-hoc commands
def self.user_cap_features
  %w[urn:xmpp:receipts]
end
471
# Adds +feature+ to the gateway's feature list, dropping duplicates.
#
# Returns whatever Array#uniq! returns: nil when nothing was removed.
def self.add_gateway_feature(feature)
  @gateway_features.push(feature).uniq!
end
476
# Answers a presence subscription request with three stanzas: the
# "subscribed" approval, a capabilities presence from the /sgx resource,
# and a subscription request going back the other way.
subscription :request? do |pres|
  puts "PRESENCE1: #{pres.inspect}"

  # subscriptions are allowed from anyone - send reply immediately
  approval = Blather::Stanza::Presence.new
  approval.to = pres.from
  approval.from = pres.to
  approval.type = :subscribed

  puts 'RESPONSE5a: ' + approval.inspect
  write_to_stream approval

  # send a <presence> immediately; not automatically probed for it
  # TODO: refactor so no "presence :probe? do |p|" duplicate below
  caps = Blather::Stanza::Capabilities.new
  # TODO: use a better node URI (?)
  caps.node = 'http://catapult.sgx.soprani.ca/'
  caps.identities = user_cap_identities
  caps.features = user_cap_features

  caps_presence = caps.c
  caps_presence.to = pres.from
  caps_presence.from = pres.to.to_s + '/sgx'

  puts 'RESPONSE5b: ' + caps_presence.inspect
  write_to_stream caps_presence

  # need to subscribe back so Conversations displays images inline
  subscribe_back = Blather::Stanza::Presence.new
  subscribe_back.to = pres.from.to_s.split('/', 2)[0]
  subscribe_back.from = pres.to.to_s.split('/', 2)[0]
  subscribe_back.type = :subscribe

  puts 'RESPONSE5c: ' + subscribe_back.inspect
  write_to_stream subscribe_back
end
513
# Answers a presence probe with a capabilities presence sent from the /sgx
# resource of the probed JID.
presence :probe? do |probe|
  puts 'PRESENCE2: ' + probe.inspect

  caps = Blather::Stanza::Capabilities.new
  # TODO: use a better node URI (?)
  caps.node = 'http://catapult.sgx.soprani.ca/'
  caps.identities = user_cap_identities
  caps.features = user_cap_features

  answer = caps.c
  answer.to = probe.from
  answer.from = probe.to.to_s + '/sgx'

  puts 'RESPONSE6: ' + answer.inspect
  write_to_stream answer
end
530
# disco#info queries: a query addressed to a JID with a node part gets the
# per-user identities/features, one addressed to the bare component gets
# the gateway's own.
iq '/iq/ns:query', ns: 'http://jabber.org/protocol/disco#info' do |i|
  # respond to capabilities request for an sgx-bwmsgsv2 number JID
  if i.to.node
    # TODO: confirm the node URL is expected using below
    #puts "XR[node]: #{xpath_result[0]['node']}"

    user_reply = i.reply
    user_reply.node = i.node
    user_reply.identities = user_cap_identities
    user_reply.features = user_cap_features

    puts 'RESPONSE7: ' + user_reply.inspect
    write_to_stream user_reply
    next
  end

  # respond to capabilities request for sgx-bwmsgsv2 itself
  gateway_identity = {
    name: 'Soprani.ca Gateway to XMPP - Bandwidth API V2',
    type: 'sms', category: 'gateway'
  }

  gateway_reply = i.reply
  gateway_reply.node = i.node
  gateway_reply.identities = [gateway_identity]
  gateway_reply.features = @gateway_features
  write_to_stream gateway_reply
end
557
# Stores a registration in Redis and acknowledges the IQ.
#
# i     - the registration IQ; its sender's bare JID is the account
# creds - the credential values; the last one is the phone number
#
# Keys used: "catapult_jid-<phone>" (holds the bare JID) and
# "catapult_cred-<bare jid>" (list holding the creds).
#
# Returns a promise.  Rejected with [:cancel, 'conflict'] when the number
# belongs to a different JID or the JID already has different creds, and
# with [:cancel, 'internal-server-error'] when a Redis write does not give
# the expected result.
def self.check_then_register(i, *creds)
  jid_key = "catapult_jid-#{creds.last}"
  bare_jid = i.from.stripped
  cred_key = "catapult_cred-#{bare_jid}"

  REDIS.get(jid_key).then { |existing_jid|
    # compare as strings: the stored value is a String while bare_jid is
    # whatever i.from.stripped returns, and String#!= against a non-String
    # would always report a difference
    if existing_jid && existing_jid.to_s != bare_jid.to_s
      # TODO: add/log text: credentials exist already
      EMPromise.reject([:cancel, 'conflict'])
    end
  }.then {
    REDIS.lrange(cred_key, 0, 3)
  }.then { |existing_creds|
    # TODO: add/log text: credentials exist already
    if existing_creds.length == 4 && creds != existing_creds
      EMPromise.reject([:cancel, 'conflict'])
    elsif existing_creds.length < 4
      REDIS.rpush(cred_key, *creds).then { |length|
        if length != 4
          EMPromise.reject([
            :cancel,
            'internal-server-error'
          ])
        end
      }
    end
  }.then {
    # not necessary if existing_jid non-nil, easier this way
    REDIS.set(jid_key, bare_jid)
  }.then { |result|
    if result != 'OK'
      # TODO: add txt re push failure
      EMPromise.reject(
        [:cancel, 'internal-server-error']
      )
    end
  }.then {
    write_to_stream i.reply
  }
end
598
# Extracts the registration credentials from a jabber:iq:register query.
#
# When the query has an <x/> child, its <field var="..."/> children are
# read (value taken from the field's <value/> child); otherwise the query's
# direct <nick/>, <username/>, <password/> and <phone/> children are read.
#
# qn - the query node
#
# Returns [user_id, api_token, api_secret, phone_num]; an entry is nil when
# its field is absent, or when a form field has no <value/> child (this
# used to raise NoMethodError).
def self.creds_from_registration_query(qn)
  key_for = {
    'nick' => :user_id,
    'username' => :api_token,
    'password' => :api_secret,
    'phone' => :phone_num
  }
  found = {}

  form = qn.children.find { |v| v.element_name == "x" }
  if form
    form.children.each do |field|
      next if field.element_name != "field"

      key = key_for[field['var']]
      unless key
        # TODO: error
        puts "?: #{field['var']}"
        next
      end

      value_node = field.children.find { |v|
        v.element_name == "value"
      }
      found[key] = value_node ? value_node.text : nil
    end
  else
    qn.children.each do |field|
      key = key_for[field.element_name]
      found[key] = field.text if key
    end
  end

  found.values_at(:user_id, :api_token, :api_secret, :phone_num)
end
638
# Handles a jabber:iq:register "set": reads the credentials from +qn+,
# checks them with a request to the account's media endpoint, then stores
# them via check_then_register.
#
# i  - the registration IQ
# qn - its query node
#
# Returns a promise.  Rejections: :done when the query contains <remove/>
# (ignored for now); [:cancel, 'item-not-found'] when the phone number is
# missing or does not start with '+', or the API answers 404;
# [:auth, 'not-authorized'] on 401; [:modify, 'not-acceptable'] on any other
# status code; everything else is passed through unchanged.
def self.process_registration(i, qn)
  EMPromise.resolve(
    qn.children.find { |v| v.element_name == "remove" }
  ).then { |rn|
    if rn
      puts "received <remove/> - ignoring for now..."
      EMPromise.reject(:done)
    else
      creds_from_registration_query(qn)
    end
  }.then { |user_id, api_token, api_secret, phone_num|
    # to_s so a missing phone field is rejected instead of raising on nil
    if phone_num.to_s.start_with?('+')
      [user_id, api_token, api_secret, phone_num]
    else
      # TODO: add text re number not (yet) supported
      EMPromise.reject([:cancel, 'item-not-found'])
    end
  }.then { |user_id, api_token, api_secret, phone_num|
    # TODO: find way to verify #{phone_num}, too
    call_catapult(
      api_token,
      api_secret,
      :get,
      "api/v2/users/#{user_id}/media"
    ).then { |response|
      # result unused; raises if the response is not valid JSON
      # TODO: confirm params is array - could be empty
      JSON.parse(response)

      puts "register got str #{response.to_s[0..999]}"

      check_then_register(
        i,
        user_id,
        api_token,
        api_secret,
        phone_num
      )
    }
  }.catch { |e|
    EMPromise.reject(case e
    when 401
      # TODO: add text re bad credentials
      [:auth, 'not-authorized']
    when 404
      # TODO: add text re number not found or disabled
      [:cancel, 'item-not-found']
    when Integer
      [:modify, 'not-acceptable']
    else
      e
    end)
  }
end
692
# Appends a jabber:iq:register <query/> to +orig+ holding both the plain
# registration fields and the equivalent data form, then returns +orig+.
#
# orig            - the stanza to fill in
# existing_number - when given, a <registered/> marker is added and the
#                   phone field is pre-filled with this number
def self.registration_form(orig, existing_number=nil)
  query = Nokogiri::XML::Node.new('query', orig.document)
  query['xmlns'] = 'jabber:iq:register'

  if existing_number
    query.add_child(
      Nokogiri::XML::Node.new('registered', query.document)
    )
  end

  # TODO: update "User Id" x2 below (to "accountId"?), and others?
  instructions = Nokogiri::XML::Node.new('instructions', query.document)
  instructions.content = "Enter the information from your Account "\
    "page as well as the Phone Number\nin your "\
    "account you want to use (ie. '+12345678901')"\
    ".\nUser Id is nick, API Token is username, "\
    "API Secret is password, Phone Number is phone"\
    ".\n\nThe source code for this gateway is at "\
    "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
    "\nCopyright (C) 2017-2020 Denver Gingerich "\
    "and others, licensed under AGPLv3+."
  query.add_child(instructions)

  # plain fields, in the order they appear in the query
  %w[nick username password phone].each do |field_name|
    field = Nokogiri::XML::Node.new(field_name, query.document)
    field.content = existing_number.to_s if field_name == 'phone'
    query.add_child(field)
  end

  form_fields = [
    {
      required: true, type: :"text-single",
      label: 'User Id', var: 'nick'
    },
    {
      required: true, type: :"text-single",
      label: 'API Token', var: 'username'
    },
    {
      required: true, type: :"text-private",
      label: 'API Secret', var: 'password'
    },
    {
      required: true, type: :"text-single",
      label: 'Phone Number', var: 'phone',
      value: existing_number.to_s
    }
  ]
  form = Blather::Stanza::X.new(:form, form_fields)
  form.title = 'Register for '\
    'Soprani.ca Gateway to XMPP - Bandwidth API V2'
  form.instructions = "Enter the details from your Account "\
    "page as well as the Phone Number\nin your "\
    "account you want to use (ie. '+12345678901')"\
    ".\n\nThe source code for this gateway is at "\
    "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
    "\nCopyright (C) 2017-2020 Denver Gingerich "\
    "and others, licensed under AGPLv3+."
  query.add_child(form)

  orig.add_child(query)
  orig
end
763
# jabber:iq:register queries: "set" runs the registration, "get" replies
# with the registration form (pre-filled with the number stored at index 3
# of the sender's credential list), any other type is ignored.
#
# Rejections shaped like [type, condition] are answered with an error
# stanza, :done is swallowed, and anything else ends up in panic.
iq '/iq/ns:query', ns: 'jabber:iq:register' do |i, qn|
  puts "IQ: #{i.inspect}"

  outcome =
    case i.type
    when :set
      process_registration(i, qn)
    when :get
      cred_key = "catapult_cred-#{i.from.stripped}"
      REDIS.lindex(cred_key, 3).then { |existing_number|
        form_reply = registration_form(i.reply, existing_number)
        puts "RESPONSE2: #{form_reply.inspect}"
        write_to_stream form_reply
      }
    else
      # Unknown IQ, ignore for now
      EMPromise.reject(:done)
    end

  outcome.catch { |e|
    if e.is_a?(Array) && e.length == 2
      write_to_stream error_msg(i.reply, qn, *e)
    elsif e != :done
      EMPromise.reject(e)
    end
  }.catch(&method(:panic))
end
789
# Fallback for "get" IQs: reply with a feature-not-implemented error that
# carries the request's children.
iq :get? do |request|
  refusal = error_msg(
    request.reply, request.children,
    'cancel', 'feature-not-implemented'
  )
  write_to_stream refusal
end
793
# Fallback for "set" IQs: reply with a feature-not-implemented error that
# carries the request's children.
iq :set? do |request|
  refusal = error_msg(
    request.reply, request.children,
    'cancel', 'feature-not-implemented'
  )
  write_to_stream refusal
end
797end
798
# A bare <message/> stanza used for sending delivery receipts.
class ReceiptMessage < Blather::Stanza
  # Builds the stanza and sets its recipient.
  #
  # to - value assigned to the stanza's to (may be nil)
  def self.new(to=nil)
    stanza = super(:message)
    stanza.tap { |receipt| receipt.to = to }
  end
end
806
# Goliath endpoint that receives Bandwidth messaging callbacks and turns
# them into XMPP stanzas written through SGXbwmsgsv2.
class WebhookHandler < Goliath::API
  use Goliath::Rack::Params

  # Processes one callback request.
  #
  # Only the first entry of params['_json'] is looked at.  Incoming
  # messages ('in') become message stanzas (plus media links) for the JID
  # registered for the owning number; outgoing events ('out') become a
  # delivery receipt or an error stanza for the original sender.
  #
  # Returns [200, {}, "OK"] on every path that completes.  Any exception
  # is logged and shuts the gateway down.
  def response(env)
    # TODO: add timestamp grab here, and MUST include ./tai version

    puts 'ENV: ' + env.reject{ |k| k == 'params' }.to_s

    if params.empty?
      puts 'PARAMS empty!'
      return [200, {}, "OK"]
    end

    # TODO: process each message in list, not just first one
    jparams = params['_json'][0]['message']

    type = params['_json'][0]['type']

    users_num = ''
    others_num = ''
    case jparams['direction']
    when 'in'
      users_num = jparams['owner']
      others_num = jparams['from']
    when 'out'
      users_num = jparams['from']
      others_num = jparams['owner']
    else
      # TODO: exception or similar
      # (this used to append an undefined `body` and concatenate a
      # possibly-nil direction, so it raised instead of logging)
      puts "big prob: '#{jparams['direction']}'"
      return [200, {}, "OK"]
    end

    puts 'BODY - messageId: ' + jparams['id'] +
      ', eventType: ' + type +
      ', time: ' + jparams['time'] +
      ', direction: ' + jparams['direction'] +
      #', state: ' + jparams['state'] +
      ', deliveryState: ' + (jparams['deliveryState'] || 'NONE') +
      ', deliveryCode: ' + (jparams['deliveryCode'] || 'NONE') +
      ', deliveryDesc: ' + (jparams['deliveryDescription'] || 'NONE') +
      ', tag: ' + (jparams['tag'] || 'NONE') +
      ', media: ' + (jparams['media'] ? jparams['media'].to_s : 'NONE')

    if others_num[0] != '+'
      # TODO: check that others_num actually a shortcode first
      others_num +=
        ';phone-context=ca-us.phone-context.soprani.ca'
    end

    jid_key = "catapult_jid-#{users_num}"
    bare_jid = REDIS.get(jid_key).promise.sync

    if !bare_jid
      puts "jid_key (#{jid_key}) DNE; BW API misconfigured?"

      # TODO: likely not appropriate; give error to BW API?
      # TODO: add text re credentials not being registered
      #write_to_stream error_msg(m.reply, m.body, :auth,
      #  'registration-required')
      return [200, {}, "OK"]
    end

    msg = nil
    case jparams['direction']
    when 'in'
      text = ''
      case type
      when 'sms'
        text = jparams['text']
      when 'mms'
        # no media handling exists in this branch (the old has_media flag
        # was always false), so the text is always wrapped in a notice
        # TODO: write/use a caption XEP
        text = if jparams['text'].empty?
          '[suspected group msg '\
            'with no text (odd)]'
        else
          '[suspected group msg '\
            '(recipient list not '\
            'available) with '\
            'following text] ' +
            jparams['text']
        end

        msg = Blather::Stanza::Message.new(bare_jid, text)
        msg.from = others_num + '@' + ARGV[0]
        SGXbwmsgsv2.write(msg)

        return [200, {}, "OK"]
      when 'message-received'
        # TODO: handle group chat, and fix above
        text = jparams['text']

        if jparams['to'].length > 1
          msg = Blather::Stanza::Message.new(
            'cheogram.com', text) # TODO

          addrs = Nokogiri::XML::Node.new(
            'addresses', msg.document)
          addrs['xmlns'] = 'http://jabber.org/' +
            'protocol/address'

          addr1 = Nokogiri::XML::Node.new(
            'address', msg.document)
          addr1['type'] = 'to'
          addr1['jid'] = bare_jid
          addrs.add_child(addr1)

          jparams['to'].each do |receiver|
            # already there in addr1
            next if receiver == users_num

            addrn = Nokogiri::XML::Node.new(
              'address', msg.document)
            addrn['type'] = 'to'
            addrn['uri'] = "sms:#{receiver}"
            addrn['delivered'] = 'true'
            addrs.add_child(addrn)
          end

          msg.add_child(addrs)

          # TODO: delete
          puts "RESPONSE9: #{msg.inspect}"
        end

        # every attachment except .smil/.txt/.xml files is sent on as its
        # own media message
        (jparams['media'] || []).each do |media_url|
          next if media_url.end_with?('.smil', '.txt', '.xml')

          SGXbwmsgsv2.send_media(
            others_num + '@' + ARGV[0],
            bare_jid, media_url,
            nil, nil, msg
          )
        end
      else
        text = "unknown type (#{type})"\
          " with text: " + jparams['text']

        # TODO: log/notify of this properly
        puts text
      end

      if not msg
        msg = Blather::Stanza::Message.new(bare_jid, text)
      end
    else # per prior switch, this is: jparams['direction'] == 'out'
      # the tag holds the escaped stanza id and resourcepart, separated by
      # a space
      tag_parts = jparams['tag'].split(/ /, 2)
      id = WEBrick::HTTPUtils.unescape(tag_parts[0])
      resourcepart = WEBrick::HTTPUtils.unescape(tag_parts[1])

      # TODO: remove this hack
      if jparams['to'].length > 1
        puts "WARN! group no rcpt: #{users_num}"
        return [200, {}, "OK"]
      end

      case type
      when 'message-failed'
        # create a bare message like the one user sent
        msg = Blather::Stanza::Message.new(
          others_num + '@' + ARGV[0])
        msg.from = bare_jid + '/' + resourcepart
        msg['id'] = id

        # TODO: add 'errorCode' and/or 'description' val
        # create an error reply to the bare message
        msg = Blather::StanzaError.new(
          msg,
          'recipient-unavailable',
          :wait
        ).to_node

        # TODO: make prettier: this should be done above
        others_num = params['_json'][0]['to']
      when 'message-delivered'
        msg = ReceiptMessage.new(bare_jid)

        # TODO: put in member/instance variable
        msg['id'] = SecureRandom.uuid

        # TODO: send only when requested per XEP-0184
        rcvd = Nokogiri::XML::Node.new(
          'received',
          msg.document
        )
        rcvd['xmlns'] = 'urn:xmpp:receipts'
        rcvd['id'] = id
        msg.add_child(rcvd)

        # TODO: make prettier: this should be done above
        others_num = params['_json'][0]['to']
      else
        # TODO: notify somehow of unknown state receivd?
        puts "message with id #{id} has "\
          "other type #{type}"
        return [200, {}, "OK"]
      end

      puts "RESPONSE4: #{msg.inspect}"
    end

    msg.from = others_num + '@' + ARGV[0]
    SGXbwmsgsv2.write(msg)

    [200, {}, "OK"]

  rescue Exception => e
    puts 'Shutting down gateway due to exception 013: ' + e.message
    SGXbwmsgsv2.shutdown
    puts 'Gateway has terminated.'
    EM.stop
  end
end
1044
# Program entry point, run from an at_exit hook: prints the banner, checks
# the argument count, then starts the EventMachine reactor with the Redis
# connection, the XMPP component, a periodic ping and the webhook server.
at_exit do
  $stdout.sync = true

  puts "Soprani.ca/SMS Gateway for XMPP - Bandwidth API V2\n"\
    "==>> last commit of this version is " + `git rev-parse HEAD` + "\n"

  unless [7, 8].include?(ARGV.size)
    puts "Usage: sgx-bwmsgsv2.rb <component_jid> "\
      "<component_password> <server_hostname> "\
      "<server_port> <application_id> "\
      "<http_listen_port> <mms_proxy_prefix_url> [V1_creds_file]"
    exit 0
  end

  started_at = Time.now
  puts "LOG %d.%09d: starting...\n\n" % [started_at.to_i, started_at.nsec]

  EM.run do
    REDIS = EM::Hiredis.connect

    SGXbwmsgsv2.run

    # required when using Prosody otherwise disconnects on 6-hour inactivity
    EM.add_periodic_timer(3600) do
      ping = Blather::Stanza::Iq::Ping.new(:get, 'localhost')
      ping.from = ARGV[0]
      SGXbwmsgsv2.write(ping)
    end

    console_log = Log4r::Logger.new('goliath')
    console_log.add(Log4r::StdoutOutputter.new('console'))
    console_log.level = Log4r::INFO

    server = Goliath::Server.new('0.0.0.0', ARGV[5].to_i)
    server.api = WebhookHandler.new
    server.app = Goliath::Rack::Builder.build(server.api.class, server.api)
    server.logger = console_log

    server.start do
      # shut down cleanly on INT and TERM
      %w[INT TERM].each do |sig|
        trap(sig) do
          EM.defer do
            puts 'Shutting down gateway...'
            SGXbwmsgsv2.shutdown

            puts 'Gateway has terminated.'
            EM.stop
          end
        end
      end
    end
  end
end