1#!/usr/bin/env ruby
2#
3# Copyright (C) 2017-2020 Denver Gingerich <denver@ossguy.com>
4# Copyright (C) 2017 Stephen Paul Weber <singpolyma@singpolyma.net>
5#
6# This file is part of sgx-bwmsgsv2.
7#
8# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
9# the terms of the GNU Affero General Public License as published by the Free
10# Software Foundation, either version 3 of the License, or (at your option) any
11# later version.
12#
13# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
14# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
16# details.
17#
18# You should have received a copy of the GNU Affero General Public License along
19# with sgx-bwmsgsv2. If not, see <http://www.gnu.org/licenses/>.
20
21require 'blather/client/dsl'
22require 'em-hiredis'
23require 'em-http-request'
24require 'json'
25require 'securerandom'
26require 'time'
27require 'uri'
28require 'webrick'
29
30require 'goliath/api'
31require 'goliath/server'
32require 'log4r'
33
34require_relative 'em_promise'
35
36def panic(e)
37 puts "Shutting down gateway due to exception: #{e.message}"
38 puts e.backtrace
39 SGXbwmsgsv2.shutdown
40 puts 'Gateway has terminated.'
41 EM.stop
42end
43
44def extract_shortcode(dest)
45 num, context = dest.split(';', 2)
46 num if context && context == 'phone-context=ca-us.phone-context.soprani.ca'
47end
48
49def is_anonymous_tel?(dest)
50 num, context = dest.split(';', 2)
51 context && context == 'phone-context=anonymous.phone-context.soprani.ca'
52end
53
54class SGXClient < Blather::Client
55 def register_handler(type, *guards, &block)
56 super(type, *guards) { |*args| wrap_handler(*args, &block) }
57 end
58
59 def register_handler_before(type, *guards, &block)
60 check_handler(type, guards)
61 handler = lambda { |*args| wrap_handler(*args, &block) }
62
63 @handlers[type] ||= []
64 @handlers[type].unshift([guards, handler])
65 end
66
67protected
68
69 def wrap_handler(*args, &block)
70 v = block.call(*args)
71 v.catch(&method(:panic)) if v.is_a?(Promise)
72 true # Do not run other handlers unless throw :pass
73 rescue Exception => e
74 panic(e)
75 end
76end
77
78# TODO: keep in sync with jmp-acct_bot.rb, and eventually put in common location
79module CatapultSettingFlagBits
80 VOICEMAIL_TRANSCRIPTION_DISABLED = 0
81 MMS_ON_OOB_URL = 1
82end
83
84module SGXbwmsgsv2
85 extend Blather::DSL
86
87 @client = SGXClient.new
88 @gateway_features = [
89 "http://jabber.org/protocol/disco#info",
90 "http://jabber.org/protocol/address/",
91 "jabber:iq:register"
92 ]
93
94 def self.run
95 # TODO: read/save ARGV[7] creds to local variables
96 client.run
97 end
98
99 # so classes outside this module can write messages, too
100 def self.write(stanza)
101 client.write(stanza)
102 end
103
104 def self.before_handler(type, *guards, &block)
105 client.register_handler_before(type, *guards, &block)
106 end
107
108 def self.send_media(from, to, media_url, desc=nil, subject=nil, m=nil)
109 # we assume media_url is one of these (always the case so far):
110 # https://api.catapult.inetwork.com/v1/users/[uid]/media/[file]
111 # https://messaging.bandwidth.com/api/v2/users/[u]/media/[path]
112
113 usr = to
114 pth = ''
115 if media_url.start_with?(
116 'https://api.catapult.inetwork.com/v1/users/')
117
118 # TODO: MUST fix this TERRIBLE hack
119 # there must be a catapult_cred-<usr> key with V1 creds
120 usr = 'v1'
121
122 pth = media_url.split('/', 8)[7]
123
124 elsif media_url.start_with?(
125 'https://messaging.bandwidth.com/api/v2/users/')
126
127 pth = media_url.split('/', 9)[8]
128 else
129 puts "ERROR2: unrecognized media_url: '#{media_url}'"
130 return
131 end
132
133 # the caller must guarantee that 'to' is a bare JID
134 proxy_url = ARGV[6] + WEBrick::HTTPUtils.escape(usr) + '/' + pth
135
136 puts 'ORIG_URL: ' + media_url
137 puts 'PROX_URL: ' + proxy_url
138
139 # put URL in the body (so Conversations will still see it)...
140 msg = Blather::Stanza::Message.new(to, proxy_url)
141 if m
142 msg = m.copy
143 msg.body = proxy_url
144 end
145 msg.from = from
146 msg.subject = subject if subject
147
148 # ...but also provide URL in XEP-0066 (OOB) fashion
149 # TODO: confirm client supports OOB or don't send this
150 x = Nokogiri::XML::Node.new 'x', msg.document
151 x['xmlns'] = 'jabber:x:oob'
152
153 urln = Nokogiri::XML::Node.new 'url', msg.document
154 urlc = Nokogiri::XML::Text.new proxy_url, msg.document
155 urln.add_child(urlc)
156 x.add_child(urln)
157
158 if desc
159 descn = Nokogiri::XML::Node.new('desc', msg.document)
160 descc = Nokogiri::XML::Text.new(desc, msg.document)
161 descn.add_child(descc)
162 x.add_child(descn)
163 end
164
165 msg.add_child(x)
166
167 write(msg)
168 rescue Exception => e
169 panic(e)
170 end
171
172 def self.error_msg(orig, query_node, type, name, text=nil)
173 orig.type = :error
174
175 error = Nokogiri::XML::Node.new 'error', orig.document
176 error['type'] = type
177 orig.add_child(error)
178
179 suberr = Nokogiri::XML::Node.new name, orig.document
180 suberr['xmlns'] = 'urn:ietf:params:xml:ns:xmpp-stanzas'
181 error.add_child(suberr)
182
183 orig.add_child(query_node) if query_node
184
185 # TODO: add some explanatory xml:lang='en' text (see text param)
186 puts "RESPONSE3: #{orig.inspect}"
187 return orig
188 end
189
190 # workqueue_count MUST be 0 or else Blather uses threads!
191 setup ARGV[0], ARGV[1], ARGV[2], ARGV[3], nil, nil, workqueue_count: 0
192
193 def self.pass_on_message(m, users_num, jid)
194 # setup delivery receipt; similar to a reply
195 rcpt = ReceiptMessage.new(m.from.stripped)
196 rcpt.from = m.to
197
198 # pass original message (before sending receipt)
199 m.to = jid
200 m.from = "#{users_num}@#{ARGV[0]}"
201
202 puts 'XRESPONSE0: ' + m.inspect
203 write_to_stream m
204
205 # send a delivery receipt back to the sender
206 # TODO: send only when requested per XEP-0184
207 # TODO: pass receipts from target if supported
208
209 # TODO: put in member/instance variable
210 rcpt['id'] = SecureRandom.uuid
211 rcvd = Nokogiri::XML::Node.new 'received', rcpt.document
212 rcvd['xmlns'] = 'urn:xmpp:receipts'
213 rcvd['id'] = m.id
214 rcpt.add_child(rcvd)
215
216 puts 'XRESPONSE1: ' + rcpt.inspect
217 write_to_stream rcpt
218 end
219
220 def self.call_catapult(
221 token, secret, m, pth, body=nil,
222 head={}, code=[200], respond_with=:body
223 )
224 # pth looks like one of:
225 # "api/v2/users/#{user_id}/[endpoint_name]"
226 # "v1/users/#{user_id}/[endpoint_name]"
227
228 url_prefix = ''
229
230 # TODO: need to make a separate thing for voice.bw.c eventually
231 if pth.start_with? 'api/v2/users'
232 url_prefix = 'https://messaging.bandwidth.com/'
233 elsif pth.start_with? 'v1/users'
234 # begin hack for running V2 messages along with V1 voice
235 url_prefix = 'https://api.catapult.inetwork.com/'
236 # TODO: set token and secret to vals provided at startup
237 # TODO: replace pth's user_id with user_id from ARGV[7]
238 # -> pth = "v1/users/#{new_id}/" + pth.split('/', 4)[3]
239 else
240 # TODO: error
241 end
242
243 EM::HttpRequest.new(
244 url_prefix + pth
245 ).public_send(
246 m,
247 head: {
248 'Authorization' => [token, secret]
249 }.merge(head),
250 body: body
251 ).then { |http|
252 puts "API response to send: #{http.response} with code"\
253 " response.code #{http.response_header.status}"
254
255 if code.include?(http.response_header.status)
256 case respond_with
257 when :body
258 http.response
259 when :headers
260 http.response_header
261 else
262 http
263 end
264 else
265 EMPromise.reject(http.response_header.status)
266 end
267 }
268 end
269
270 def self.to_catapult_possible_oob(s, num_dest, user_id, token, secret,
271 usern)
272
273 # TODO: fix indentation
274
275 un = s.at("oob|x > oob|url", oob: "jabber:x:oob")
276 if not un
277 puts "MMSOOB: no url node found so process as normal"
278 to_catapult(s, nil, num_dest, user_id, token, secret,
279 usern)
280 return
281 end
282 puts "MMSOOB: found a url node - checking if to make MMS..."
283
284 # TODO: check size of file at un.text and shrink if need
285
286 body = s.respond_to?(:body) ? s.body : ''
287 # some clients send URI in both body & <url/> so delete
288 s.body = body.sub(/\s*#{Regexp::escape(un.text)}\s*$/, '')
289
290 puts "MMSOOB: url text is '#{un.text}'"
291 puts "MMSOOB: the body is '#{body.to_s.strip}'"
292
293 puts "MMSOOB: sending MMS since found OOB & user asked"
294 to_catapult(s, un.text, num_dest, user_id, token,
295 secret, usern)
296 end
297
298 def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern)
299 body = s.respond_to?(:body) ? s.body : ''
300 if murl.to_s.empty? && body.to_s.strip.empty?
301 return EMPromise.reject(
302 [:modify, 'policy-violation']
303 )
304 end
305
306 extra = {}
307 if murl
308 extra = { media: murl }
309 end
310
311 call_catapult(
312 token,
313 secret,
314 :post,
315 "api/v2/users/#{user_id}/messages",
316 JSON.dump(extra.merge(
317 from: usern,
318 to: num_dest,
319 text: body,
320 applicationId: ARGV[4],
321 tag:
322 # callbacks need id and resourcepart
323 WEBrick::HTTPUtils.escape(s.id.to_s) +
324 ' ' +
325 WEBrick::HTTPUtils.escape(
326 s.from.resource.to_s
327 )
328 )),
329 {'Content-Type' => 'application/json'},
330 [201]
331 ).catch {
332 # TODO: add text; mention code number
333 EMPromise.reject(
334 [:cancel, 'internal-server-error']
335 )
336 }
337
338 t = Time.now
339 tai_timestamp = `./tai`.strip
340 tai_yyyymmdd = Time.at(tai_timestamp.to_i).strftime('%Y%m%d')
341 puts "SMU %d.%09d, %s: msg for %s sent on %s - incrementing\n" %
342 [t.to_i, t.nsec, tai_timestamp, usern, tai_yyyymmdd]
343
344 REDIS.incr('usage_messages-' + tai_yyyymmdd + '-' +
345 usern).then { |total|
346
347 t = Time.now
348 puts "SMU %d.%09d: total msgs for %s-%s now at %s\n" %
349 [t.to_i, t.nsec, tai_yyyymmdd, usern, total]
350 }
351 end
352
353 def self.validate_num(m)
354 # if sent to SGX domain use https://wiki.soprani.ca/SGX/GroupMMS
355 if m.to == ARGV[0]
356 an = m.children.find { |v| v.element_name == "addresses"
357 }
358 if not an
359 return EMPromise.reject([:cancel,
360 'item-not-found'])
361 end
362 puts "ADRXEP: found an addresses node - iterate addrs.."
363
364 nums = []
365 an.children.each do |e|
366 num = ''
367 type = ''
368 e.attributes.each do |c|
369 if c[0] == 'type'
370 if c[1] != 'to'
371 # TODO: error
372 end
373 type = c[1].to_s
374 elsif c[0] == 'uri'
375 if !c[1].to_s.start_with? 'sms:'
376 # TODO: error
377 end
378 num = c[1].to_s[4..-1]
379 # TODO: confirm num validates
380 else
381 # TODO: error - unexpected name
382 end
383 end
384 if num.empty? or type.empty?
385 # TODO: error
386 end
387 nums << num
388 end
389 return nums
390 end
391
392 # if not sent to SGX domain, then assume destination is in 'to'
393 EMPromise.resolve(m.to.node.to_s).then { |num_dest|
394 if num_dest =~ /\A\+?[0-9]+(?:;.*)?\Z/
395 next num_dest if num_dest[0] == '+'
396 shortcode = extract_shortcode(num_dest)
397 next shortcode if shortcode
398 end
399
400 if is_anonymous_tel?(num_dest)
401 EMPromise.reject([:cancel, 'gone'])
402 else
403 # TODO: text re num not (yet) supportd/implmentd
404 EMPromise.reject([:cancel, 'item-not-found'])
405 end
406 }
407 end
408
409 def self.fetch_catapult_cred_for(jid)
410 cred_key = "catapult_cred-#{jid.stripped}"
411 REDIS.lrange(cred_key, 0, 3).then { |creds|
412 if creds.length < 4
413 # TODO: add text re credentials not registered
414 EMPromise.reject(
415 [:auth, 'registration-required']
416 )
417 else
418 creds
419 end
420 }
421 end
422
423 message :error? do |m|
424 # TODO: report it somewhere/somehow - eat for now so no err loop
425 puts "EATERROR1: #{m.inspect}"
426 end
427
428 message :body do |m|
429 EMPromise.all([
430 validate_num(m),
431 fetch_catapult_cred_for(m.from)
432 ]).then { |(num_dest, creds)|
433 jid_key = "catapult_jid-#{num_dest}"
434 REDIS.get(jid_key).then { |jid|
435 [jid, num_dest] + creds
436 }
437 }.then { |(jid, num_dest, *creds)|
438 if jid
439 cred_key = "catapult_cred-#{jid}"
440 REDIS.lrange(cred_key, 0, 0).then { |other_user|
441 [jid, num_dest] + creds + other_user
442 }
443 else
444 [jid, num_dest] + creds + [nil]
445 end
446 }.then { |(jid, num_dest, *creds, other_user)|
447 # if destination user is in the system pass on directly
448 if other_user and not other_user.start_with? 'u-'
449 pass_on_message(m, creds.last, jid)
450 else
451 to_catapult_possible_oob(m, num_dest, *creds)
452 end
453 }.catch { |e|
454 if e.is_a?(Array) && e.length == 2
455 write_to_stream error_msg(m.reply, m.body, *e)
456 else
457 EMPromise.reject(e)
458 end
459 }
460 end
461
462 def self.user_cap_identities
463 [{category: 'client', type: 'sms'}]
464 end
465
466 # TODO: must re-add stuff so can do ad-hoc commands
467 def self.user_cap_features
468 [
469 "urn:xmpp:receipts",
470 ]
471 end
472
473 def self.add_gateway_feature(feature)
474 @gateway_features << feature
475 @gateway_features.uniq!
476 end
477
478 subscription :request? do |p|
479 puts "PRESENCE1: #{p.inspect}"
480
481 # subscriptions are allowed from anyone - send reply immediately
482 msg = Blather::Stanza::Presence.new
483 msg.to = p.from
484 msg.from = p.to
485 msg.type = :subscribed
486
487 puts 'RESPONSE5a: ' + msg.inspect
488 write_to_stream msg
489
490 # send a <presence> immediately; not automatically probed for it
491 # TODO: refactor so no "presence :probe? do |p|" duplicate below
492 caps = Blather::Stanza::Capabilities.new
493 # TODO: user a better node URI (?)
494 caps.node = 'http://catapult.sgx.soprani.ca/'
495 caps.identities = user_cap_identities
496 caps.features = user_cap_features
497
498 msg = caps.c
499 msg.to = p.from
500 msg.from = p.to.to_s + '/sgx'
501
502 puts 'RESPONSE5b: ' + msg.inspect
503 write_to_stream msg
504
505 # need to subscribe back so Conversations displays images inline
506 msg = Blather::Stanza::Presence.new
507 msg.to = p.from.to_s.split('/', 2)[0]
508 msg.from = p.to.to_s.split('/', 2)[0]
509 msg.type = :subscribe
510
511 puts 'RESPONSE5c: ' + msg.inspect
512 write_to_stream msg
513 end
514
515 presence :probe? do |p|
516 puts 'PRESENCE2: ' + p.inspect
517
518 caps = Blather::Stanza::Capabilities.new
519 # TODO: user a better node URI (?)
520 caps.node = 'http://catapult.sgx.soprani.ca/'
521 caps.identities = user_cap_identities
522 caps.features = user_cap_features
523
524 msg = caps.c
525 msg.to = p.from
526 msg.from = p.to.to_s + '/sgx'
527
528 puts 'RESPONSE6: ' + msg.inspect
529 write_to_stream msg
530 end
531
532 iq '/iq/ns:query', ns: 'http://jabber.org/protocol/disco#info' do |i|
533 # respond to capabilities request for an sgx-bwmsgsv2 number JID
534 if i.to.node
535 # TODO: confirm the node URL is expected using below
536 #puts "XR[node]: #{xpath_result[0]['node']}"
537
538 msg = i.reply
539 msg.node = i.node
540 msg.identities = user_cap_identities
541 msg.features = user_cap_features
542
543 puts 'RESPONSE7: ' + msg.inspect
544 write_to_stream msg
545 next
546 end
547
548 # respond to capabilities request for sgx-bwmsgsv2 itself
549 msg = i.reply
550 msg.node = i.node
551 msg.identities = [{
552 name: 'Soprani.ca Gateway to XMPP - Bandwidth API V2',
553 type: 'sms', category: 'gateway'
554 }]
555 msg.features = @gateway_features
556 write_to_stream msg
557 end
558
559 def self.check_then_register(i, *creds)
560 jid_key = "catapult_jid-#{creds.last}"
561 bare_jid = i.from.stripped
562 cred_key = "catapult_cred-#{bare_jid}"
563
564 REDIS.get(jid_key).then { |existing_jid|
565 if existing_jid && existing_jid != bare_jid
566 # TODO: add/log text: credentials exist already
567 EMPromise.reject([:cancel, 'conflict'])
568 end
569 }.then {
570 REDIS.lrange(cred_key, 0, 3)
571 }.then { |existing_creds|
572 # TODO: add/log text: credentials exist already
573 if existing_creds.length == 4 && creds != existing_creds
574 EMPromise.reject([:cancel, 'conflict'])
575 elsif existing_creds.length < 4
576 REDIS.rpush(cred_key, *creds).then { |length|
577 if length != 4
578 EMPromise.reject([
579 :cancel,
580 'internal-server-error'
581 ])
582 end
583 }
584 end
585 }.then {
586 # not necessary if existing_jid non-nil, easier this way
587 REDIS.set(jid_key, bare_jid)
588 }.then { |result|
589 if result != 'OK'
590 # TODO: add txt re push failure
591 EMPromise.reject(
592 [:cancel, 'internal-server-error']
593 )
594 end
595 }.then {
596 write_to_stream i.reply
597 }
598 end
599
600 def self.creds_from_registration_query(qn)
601 xn = qn.children.find { |v| v.element_name == "x" }
602
603 if xn
604 xn.children.each_with_object({}) do |field, h|
605 next if field.element_name != "field"
606 val = field.children.find { |v|
607 v.element_name == "value"
608 }
609
610 case field['var']
611 when 'nick'
612 h[:user_id] = val.text
613 when 'username'
614 h[:api_token] = val.text
615 when 'password'
616 h[:api_secret] = val.text
617 when 'phone'
618 h[:phone_num] = val.text
619 else
620 # TODO: error
621 puts "?: #{field['var']}"
622 end
623 end
624 else
625 qn.children.each_with_object({}) do |field, h|
626 case field.element_name
627 when "nick"
628 h[:user_id] = field.text
629 when "username"
630 h[:api_token] = field.text
631 when "password"
632 h[:api_secret] = field.text
633 when "phone"
634 h[:phone_num] = field.text
635 end
636 end
637 end.values_at(:user_id, :api_token, :api_secret, :phone_num)
638 end
639
640 def self.process_registration(i, qn)
641 EMPromise.resolve(
642 qn.children.find { |v| v.element_name == "remove" }
643 ).then { |rn|
644 if rn
645 puts "received <remove/> - ignoring for now..."
646 EMPromise.reject(:done)
647 else
648 creds_from_registration_query(qn)
649 end
650 }.then { |user_id, api_token, api_secret, phone_num|
651 if phone_num[0] == '+'
652 [user_id, api_token, api_secret, phone_num]
653 else
654 # TODO: add text re number not (yet) supported
655 EMPromise.reject([:cancel, 'item-not-found'])
656 end
657 }.then { |user_id, api_token, api_secret, phone_num|
658 # TODO: find way to verify #{phone_num}, too
659 call_catapult(
660 api_token,
661 api_secret,
662 :get,
663 "api/v2/users/#{user_id}/media"
664 ).then { |response|
665 params = JSON.parse(response)
666 # TODO: confirm params is array - could be empty
667
668 puts "register got str #{response.to_s[0..999]}"
669
670 check_then_register(
671 i,
672 user_id,
673 api_token,
674 api_secret,
675 phone_num
676 )
677 }
678 }.catch { |e|
679 EMPromise.reject(case e
680 when 401
681 # TODO: add text re bad credentials
682 [:auth, 'not-authorized']
683 when 404
684 # TODO: add text re number not found or disabled
685 [:cancel, 'item-not-found']
686 when Integer
687 [:modify, 'not-acceptable']
688 else
689 e
690 end)
691 }
692 end
693
694 def self.registration_form(orig, existing_number=nil)
695 msg = Nokogiri::XML::Node.new 'query', orig.document
696 msg['xmlns'] = 'jabber:iq:register'
697
698 if existing_number
699 msg.add_child(
700 Nokogiri::XML::Node.new(
701 'registered', msg.document
702 )
703 )
704 end
705
706 # TODO: update "User Id" x2 below (to "accountId"?), and others?
707 n1 = Nokogiri::XML::Node.new(
708 'instructions', msg.document
709 )
710 n1.content = "Enter the information from your Account "\
711 "page as well as the Phone Number\nin your "\
712 "account you want to use (ie. '+12345678901')"\
713 ".\nUser Id is nick, API Token is username, "\
714 "API Secret is password, Phone Number is phone"\
715 ".\n\nThe source code for this gateway is at "\
716 "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
717 "\nCopyright (C) 2017-2020 Denver Gingerich "\
718 "and others, licensed under AGPLv3+."
719 n2 = Nokogiri::XML::Node.new 'nick', msg.document
720 n3 = Nokogiri::XML::Node.new 'username', msg.document
721 n4 = Nokogiri::XML::Node.new 'password', msg.document
722 n5 = Nokogiri::XML::Node.new 'phone', msg.document
723 n5.content = existing_number.to_s
724 msg.add_child(n1)
725 msg.add_child(n2)
726 msg.add_child(n3)
727 msg.add_child(n4)
728 msg.add_child(n5)
729
730 x = Blather::Stanza::X.new :form, [
731 {
732 required: true, type: :"text-single",
733 label: 'User Id', var: 'nick'
734 },
735 {
736 required: true, type: :"text-single",
737 label: 'API Token', var: 'username'
738 },
739 {
740 required: true, type: :"text-private",
741 label: 'API Secret', var: 'password'
742 },
743 {
744 required: true, type: :"text-single",
745 label: 'Phone Number', var: 'phone',
746 value: existing_number.to_s
747 }
748 ]
749 x.title = 'Register for '\
750 'Soprani.ca Gateway to XMPP - Bandwidth API V2'
751 x.instructions = "Enter the details from your Account "\
752 "page as well as the Phone Number\nin your "\
753 "account you want to use (ie. '+12345678901')"\
754 ".\n\nThe source code for this gateway is at "\
755 "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
756 "\nCopyright (C) 2017-2020 Denver Gingerich "\
757 "and others, licensed under AGPLv3+."
758 msg.add_child(x)
759
760 orig.add_child(msg)
761
762 return orig
763 end
764
765 iq '/iq/ns:query', ns: 'jabber:iq:register' do |i, qn|
766 puts "IQ: #{i.inspect}"
767
768 case i.type
769 when :set
770 process_registration(i, qn)
771 when :get
772 bare_jid = i.from.stripped
773 cred_key = "catapult_cred-#{bare_jid}"
774 REDIS.lindex(cred_key, 3).then { |existing_number|
775 reply = registration_form(i.reply, existing_number)
776 puts "RESPONSE2: #{reply.inspect}"
777 write_to_stream reply
778 }
779 else
780 # Unknown IQ, ignore for now
781 EMPromise.reject(:done)
782 end.catch { |e|
783 if e.is_a?(Array) && e.length == 2
784 write_to_stream error_msg(i.reply, qn, *e)
785 elsif e != :done
786 EMPromise.reject(e)
787 end
788 }.catch(&method(:panic))
789 end
790
791 iq :get? do |i|
792 write_to_stream error_msg(i.reply, i.children, 'cancel', 'feature-not-implemented')
793 end
794
795 iq :set? do |i|
796 write_to_stream error_msg(i.reply, i.children, 'cancel', 'feature-not-implemented')
797 end
798end
799
800class ReceiptMessage < Blather::Stanza
801 def self.new(to=nil)
802 node = super :message
803 node.to = to
804 node
805 end
806end
807
808class WebhookHandler < Goliath::API
809 use Goliath::Rack::Params
810
811 def response(env)
812 # TODO: add timestamp grab here, and MUST include ./tai version
813
814 puts 'ENV: ' + env.reject{ |k| k == 'params' }.to_s
815
816 # TODO: process each message in list, not just first one
817 jparams = params['_json'][0]['message']
818
819 type = params['_json'][0]['type']
820
821 users_num = ''
822 others_num = ''
823 if jparams['direction'] == 'in'
824 users_num = jparams['owner']
825 others_num = jparams['from']
826 elsif jparams['direction'] == 'out'
827 users_num = jparams['from']
828 others_num = jparams['owner']
829 else
830 # TODO: exception or similar
831 puts "big prob: '" + jparams['direction'] + "'" + body
832 return [200, {}, "OK"]
833 end
834
835 puts 'BODY - messageId: ' + jparams['id'] +
836 ', eventType: ' + type +
837 ', time: ' + jparams['time'] +
838 ', direction: ' + jparams['direction'] +
839 #', state: ' + jparams['state'] +
840 ', deliveryState: ' + (jparams['deliveryState'] ?
841 jparams['deliveryState'] : 'NONE') +
842 ', deliveryCode: ' + (jparams['deliveryCode'] ?
843 jparams['deliveryCode'] : 'NONE') +
844 ', deliveryDesc: ' + (jparams['deliveryDescription'] ?
845 jparams['deliveryDescription'] : 'NONE') +
846 ', tag: ' + (jparams['tag'] ? jparams['tag'] : 'NONE') +
847 ', media: ' + (jparams['media'] ?
848 jparams['media'].to_s : 'NONE')
849
850 if others_num[0] != '+'
851 # TODO: check that others_num actually a shortcode first
852 others_num +=
853 ';phone-context=ca-us.phone-context.soprani.ca'
854 end
855
856 jid_key = "catapult_jid-#{users_num}"
857 bare_jid = REDIS.get(jid_key).promise.sync
858
859 if !bare_jid
860 puts "jid_key (#{jid_key}) DNE; BW API misconfigured?"
861
862 # TODO: likely not appropriate; give error to BW API?
863 # TODO: add text re credentials not being registered
864 #write_to_stream error_msg(m.reply, m.body, :auth,
865 # 'registration-required')
866 return [200, {}, "OK"]
867 end
868
869 msg = nil
870 case jparams['direction']
871 when 'in'
872 text = ''
873 case type
874 when 'sms'
875 text = jparams['text']
876 when 'mms'
877 has_media = false
878
879 if jparams['text'].empty?
880 if not has_media
881 text = '[suspected group msg '\
882 'with no text (odd)]'
883 end
884 else
885 text = if has_media
886 # TODO: write/use a caption XEP
887 jparams['text']
888 else
889 '[suspected group msg '\
890 '(recipient list not '\
891 'available) with '\
892 'following text] ' +
893 jparams['text']
894 end
895 end
896
897 # ie. if text param non-empty or had no media
898 if not text.empty?
899 msg = Blather::Stanza::Message.new(
900 bare_jid, text)
901 msg.from = others_num + '@' + ARGV[0]
902 SGXbwmsgsv2.write(msg)
903 end
904
905 return [200, {}, "OK"]
906 when 'message-received'
907 # TODO: handle group chat, and fix above
908 text = jparams['text']
909
910 if jparams['to'].length > 1
911 msg = Blather::Stanza::Message.new(
912 'cheogram.com', text) # TODO
913
914 addrs = Nokogiri::XML::Node.new(
915 'addresses', msg.document)
916 addrs['xmlns'] = 'http://jabber.org/' +
917 'protocol/address'
918
919 addr1 = Nokogiri::XML::Node.new(
920 'address', msg.document)
921 addr1['type'] = 'to'
922 addr1['jid'] = bare_jid
923 addrs.add_child(addr1)
924
925 jparams['to'].each do |receiver|
926 if receiver == users_num
927 # already there in addr1
928 next
929 end
930
931 addrn = Nokogiri::XML::Node.new(
932 'address', msg.document)
933 addrn['type'] = 'to'
934 addrn['uri'] = "sms:#{receiver}"
935 addrn['delivered'] = 'true'
936 addrs.add_child(addrn)
937 end
938
939 msg.add_child(addrs)
940
941 # TODO: delete
942 puts "RESPONSE9: #{msg.inspect}"
943 end
944
945 jparams['media'].each do |media_url|
946 if not media_url.end_with?(
947 '.smil', '.txt', '.xml'
948 )
949
950 has_media = true
951 SGXbwmsgsv2.send_media(
952 others_num + '@' +
953 ARGV[0],
954 bare_jid, media_url,
955 nil, nil, msg
956 )
957 end
958 end unless not jparams['media']
959 else
960 text = "unknown type (#{type})"\
961 " with text: " + jparams['text']
962
963 # TODO: log/notify of this properly
964 puts text
965 end
966
967 if not msg
968 msg = Blather::Stanza::Message.new(bare_jid,
969 text)
970 end
971 else # per prior switch, this is: jparams['direction'] == 'out'
972 tag_parts = jparams['tag'].split(/ /, 2)
973 id = WEBrick::HTTPUtils.unescape(tag_parts[0])
974 resourcepart = WEBrick::HTTPUtils.unescape(tag_parts[1])
975
976 # TODO: remove this hack
977 if jparams['to'].length > 1
978 puts "WARN! group no rcpt: #{users_num}"
979 return [200, {}, "OK"]
980 end
981
982 case type
983 when 'message-failed'
984 # create a bare message like the one user sent
985 msg = Blather::Stanza::Message.new(
986 others_num + '@' + ARGV[0])
987 msg.from = bare_jid + '/' + resourcepart
988 msg['id'] = id
989
990 # TODO: add 'errorCode' and/or 'description' val
991 # create an error reply to the bare message
992 msg = Blather::StanzaError.new(
993 msg,
994 'recipient-unavailable',
995 :wait
996 ).to_node
997
998 # TODO: make prettier: this should be done above
999 others_num = params['_json'][0]['to']
1000 when 'message-delivered'
1001
1002 msg = ReceiptMessage.new(bare_jid)
1003
1004 # TODO: put in member/instance variable
1005 msg['id'] = SecureRandom.uuid
1006
1007 # TODO: send only when requested per XEP-0184
1008 rcvd = Nokogiri::XML::Node.new(
1009 'received',
1010 msg.document
1011 )
1012 rcvd['xmlns'] = 'urn:xmpp:receipts'
1013 rcvd['id'] = id
1014 msg.add_child(rcvd)
1015
1016 # TODO: make prettier: this should be done above
1017 others_num = params['_json'][0]['to']
1018 else
1019 # TODO: notify somehow of unknown state receivd?
1020 puts "message with id #{id} has "\
1021 "other type #{type}"
1022 return [200, {}, "OK"]
1023 end
1024
1025 puts "RESPONSE4: #{msg.inspect}"
1026 end
1027
1028 msg.from = others_num + '@' + ARGV[0]
1029 SGXbwmsgsv2.write(msg)
1030
1031 [200, {}, "OK"]
1032
1033 rescue Exception => e
1034 puts 'Shutting down gateway due to exception 013: ' + e.message
1035 SGXbwmsgsv2.shutdown
1036 puts 'Gateway has terminated.'
1037 EM.stop
1038 end
1039end
1040
1041at_exit do
1042 $stdout.sync = true
1043
1044 puts "Soprani.ca/SMS Gateway for XMPP - Bandwidth API V2\n"\
1045 "==>> last commit of this version is " + `git rev-parse HEAD` + "\n"
1046
1047 if ARGV.size != 7 and ARGV.size != 8
1048 puts "Usage: sgx-bwmsgsv2.rb <component_jid> "\
1049 "<component_password> <server_hostname> "\
1050 "<server_port> <application_id> "\
1051 "<http_listen_port> <mms_proxy_prefix_url> [V1_creds_file]"
1052 exit 0
1053 end
1054
1055 t = Time.now
1056 puts "LOG %d.%09d: starting...\n\n" % [t.to_i, t.nsec]
1057
1058 EM.run do
1059 REDIS = EM::Hiredis.connect
1060
1061 SGXbwmsgsv2.run
1062
1063 # required when using Prosody otherwise disconnects on 6-hour inactivity
1064 EM.add_periodic_timer(3600) do
1065 msg = Blather::Stanza::Iq::Ping.new(:get, 'localhost')
1066 msg.from = ARGV[0]
1067 SGXbwmsgsv2.write(msg)
1068 end
1069
1070 server = Goliath::Server.new('0.0.0.0', ARGV[5].to_i)
1071 server.api = WebhookHandler.new
1072 server.app = Goliath::Rack::Builder.build(server.api.class, server.api)
1073 server.logger = Log4r::Logger.new('goliath')
1074 server.logger.add(Log4r::StdoutOutputter.new('console'))
1075 server.logger.level = Log4r::INFO
1076 server.start do
1077 ["INT", "TERM"].each do |sig|
1078 trap(sig) do
1079 EM.defer do
1080 puts 'Shutting down gateway...'
1081 SGXbwmsgsv2.shutdown
1082
1083 puts 'Gateway has terminated.'
1084 EM.stop
1085 end
1086 end
1087 end
1088 end
1089 end
1090end