1#!/usr/bin/env ruby
2#
3# Copyright (C) 2017-2020 Denver Gingerich <denver@ossguy.com>
4# Copyright (C) 2017 Stephen Paul Weber <singpolyma@singpolyma.net>
5#
6# This file is part of sgx-bwmsgsv2.
7#
8# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
9# the terms of the GNU Affero General Public License as published by the Free
10# Software Foundation, either version 3 of the License, or (at your option) any
11# later version.
12#
13# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
14# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
16# details.
17#
18# You should have received a copy of the GNU Affero General Public License along
19# with sgx-bwmsgsv2. If not, see <http://www.gnu.org/licenses/>.
20
21require 'blather/client/dsl'
22require 'em-hiredis'
23require 'em-http-request'
24require 'json'
25require 'securerandom'
26require 'time'
27require 'uri'
28require 'webrick'
29
30require 'goliath/api'
31require 'goliath/server'
32require 'log4r'
33
34require_relative 'em_promise'
35
# Last-resort failure handler: reports the error on stdout, shuts the
# gateway component down, and stops the EventMachine reactor.
#
# @param e [Exception] the error that triggered the shutdown
# @return whatever EM.stop returns
def panic(e)
  $stdout.puts("Shutting down gateway due to exception: #{e.message}")
  $stdout.puts(e.backtrace)
  SGXbwmsgsv2.shutdown
  $stdout.puts('Gateway has terminated.')
  EM.stop
end
43
# Returns the number part of +dest+ when it is tagged with the ca-us
# shortcode phone-context, and nil otherwise.
#
# @param dest [String] text of the form "<number>[;<context>]"
# @return [String, nil] the part before the first ';', or nil when the
#   context is absent or is not the ca-us shortcode context
def extract_shortcode(dest)
  number, phone_context = dest.split(';', 2)
  return unless phone_context == 'phone-context=ca-us.phone-context.soprani.ca'

  number
end
48
# Tells whether +dest+ carries the "anonymous" phone-context, ie. the
# part after the first ';' is exactly the anonymous context parameter.
#
# @param dest [String] text of the form "<number>[;<context>]"
# @return [Boolean] always true or false (never nil)
def is_anonymous_tel?(dest)
  # only the context part matters; the number part is ignored
  _, context = dest.split(';', 2)
  context == 'phone-context=anonymous.phone-context.soprani.ca'
end
53
# Blather client whose handlers are all routed through wrap_handler, so
# that an exception raised by a handler, or a rejection of a Promise it
# returns, ends up in panic (which shuts the gateway down).
class SGXClient < Blather::Client
  # Registers +block+ as a handler, wrapped by wrap_handler.
  def register_handler(type, *guards, &block)
    super(type, *guards) do |*handler_args|
      wrap_handler(*handler_args, &block)
    end
  end

  # Same as register_handler, but the handler is put at the front of the
  # list for +type+ rather than appended.
  def register_handler_before(type, *guards, &block)
    check_handler(type, guards)
    wrapped = ->(*handler_args) { wrap_handler(*handler_args, &block) }

    (@handlers[type] ||= []).unshift([guards, wrapped])
  end

protected

  # Runs +block+ with +args+.  A returned Promise gets panic attached as
  # its rejection handler; any exception raised by the block goes to
  # panic directly.
  #
  # @return true on the normal path, so no further handlers run
  def wrap_handler(*args, &block)
    begin
      outcome = block.call(*args)
      outcome.catch(&method(:panic)) if outcome.is_a?(Promise)
      true # Do not run other handlers unless throw :pass
    rescue Exception => e
      panic(e)
    end
  end
end
77
78# TODO: keep in sync with jmp-acct_bot.rb, and eventually put in common location
# Bit offsets of per-user setting flags.  In this file MMS_ON_OOB_URL is
# read with REDIS.getbit from the "catapult_setting_flags-<bare JID>" key.
module CatapultSettingFlagBits
  # presumably disables voicemail transcription when set; not used in
  # this file - verify against jmp-acct_bot.rb
  VOICEMAIL_TRANSCRIPTION_DISABLED = 0

  # when the bit is non-zero, a message carrying an OOB URL goes out as MMS
  MMS_ON_OOB_URL = 1
end
83
84module SGXbwmsgsv2
85 extend Blather::DSL
86
87 @client = SGXClient.new
88 @gateway_features = [
89 "http://jabber.org/protocol/disco#info",
90 "http://jabber.org/protocol/address/",
91 "jabber:iq:register"
92 ]
93
94 def self.run
95 # TODO: read/save ARGV[7] creds to local variables
96 client.run
97 end
98
99 # so classes outside this module can write messages, too
100 def self.write(stanza)
101 client.write(stanza)
102 end
103
104 def self.before_handler(type, *guards, &block)
105 client.register_handler_before(type, *guards, &block)
106 end
107
108 def self.send_media(from, to, media_url, desc=nil, subject=nil, m=nil)
109 # we assume media_url is of the form (always the case so far):
110 # https://messaging.bandwidth.com/api/v2/users/[u]/media/[path]
111
112 # the caller must guarantee that 'to' is a bare JID
113 proxy_url = ARGV[6] + WEBrick::HTTPUtils.escape(to) + '/' +
114 media_url.split('/', 9)[8]
115
116 puts 'ORIG_URL: ' + media_url
117 puts 'PROX_URL: ' + proxy_url
118
119 # put URL in the body (so Conversations will still see it)...
120 msg = Blather::Stanza::Message.new(to, proxy_url)
121 if m
122 msg = m.copy
123 msg.body = proxy_url
124 end
125 msg.from = from
126 msg.subject = subject if subject
127
128 # ...but also provide URL in XEP-0066 (OOB) fashion
129 # TODO: confirm client supports OOB or don't send this
130 x = Nokogiri::XML::Node.new 'x', msg.document
131 x['xmlns'] = 'jabber:x:oob'
132
133 urln = Nokogiri::XML::Node.new 'url', msg.document
134 urlc = Nokogiri::XML::Text.new proxy_url, msg.document
135 urln.add_child(urlc)
136 x.add_child(urln)
137
138 if desc
139 descn = Nokogiri::XML::Node.new('desc', msg.document)
140 descc = Nokogiri::XML::Text.new(desc, msg.document)
141 descn.add_child(descc)
142 x.add_child(descn)
143 end
144
145 msg.add_child(x)
146
147 write(msg)
148 rescue Exception => e
149 panic(e)
150 end
151
152 def self.error_msg(orig, query_node, type, name, text=nil)
153 orig.type = :error
154
155 error = Nokogiri::XML::Node.new 'error', orig.document
156 error['type'] = type
157 orig.add_child(error)
158
159 suberr = Nokogiri::XML::Node.new name, orig.document
160 suberr['xmlns'] = 'urn:ietf:params:xml:ns:xmpp-stanzas'
161 error.add_child(suberr)
162
163 orig.add_child(query_node) if query_node
164
165 # TODO: add some explanatory xml:lang='en' text (see text param)
166 puts "RESPONSE3: #{orig.inspect}"
167 return orig
168 end
169
170 # workqueue_count MUST be 0 or else Blather uses threads!
171 setup ARGV[0], ARGV[1], ARGV[2], ARGV[3], nil, nil, workqueue_count: 0
172
173 def self.pass_on_message(m, users_num, jid)
174 # setup delivery receipt; similar to a reply
175 rcpt = ReceiptMessage.new(m.from.stripped)
176 rcpt.from = m.to
177
178 # pass original message (before sending receipt)
179 m.to = jid
180 m.from = "#{users_num}@#{ARGV[0]}"
181
182 puts 'XRESPONSE0: ' + m.inspect
183 write_to_stream m
184
185 # send a delivery receipt back to the sender
186 # TODO: send only when requested per XEP-0184
187 # TODO: pass receipts from target if supported
188
189 # TODO: put in member/instance variable
190 rcpt['id'] = SecureRandom.uuid
191 rcvd = Nokogiri::XML::Node.new 'received', rcpt.document
192 rcvd['xmlns'] = 'urn:xmpp:receipts'
193 rcvd['id'] = m.id
194 rcpt.add_child(rcvd)
195
196 puts 'XRESPONSE1: ' + rcpt.inspect
197 write_to_stream rcpt
198 end
199
200 def self.call_catapult(
201 token, secret, m, pth, body=nil,
202 head={}, code=[200], respond_with=:body
203 )
204 # pth looks like one of:
205 # "api/v2/users/#{user_id}/[endpoint_name]"
206 # "v1/users/#{user_id}/[endpoint_name]"
207
208 url_prefix = ''
209
210 # TODO: need to make a separate thing for voice.bw.c eventually
211 if pth.start_with? 'api/v2/users'
212 url_prefix = 'https://messaging.bandwidth.com/'
213 elsif pth.start_with? 'v1/users'
214 # begin hack for running V2 messages along with V1 voice
215 url_prefix = 'https://api.catapult.inetwork.com/'
216 # TODO: set token and secret to vals provided at startup
217 # TODO: replace pth's user_id with user_id from ARGV[7]
218 # -> pth = "v1/users/#{new_id}/" + pth.split('/', 4)[3]
219 else
220 # TODO: error
221 end
222
223 EM::HttpRequest.new(
224 url_prefix + pth
225 ).public_send(
226 m,
227 head: {
228 'Authorization' => [token, secret]
229 }.merge(head),
230 body: body
231 ).then { |http|
232 puts "API response to send: #{http.response} with code"\
233 " response.code #{http.response_header.status}"
234
235 if code.include?(http.response_header.status)
236 case respond_with
237 when :body
238 http.response
239 when :headers
240 http.response_header
241 else
242 http
243 end
244 else
245 EMPromise.reject(http.response_header.status)
246 end
247 }
248 end
249
250 def self.to_catapult_possible_oob(s, num_dest, user_id, token, secret,
251 usern)
252
253 xn = s.children.find { |v| v.element_name == "x" }
254 if not xn
255 to_catapult(s, nil, num_dest, user_id, token, secret,
256 usern)
257 return
258 end
259 puts "MMSOOB: found an x node - checking for url node..."
260
261 # TODO: also check for xmlns='jabber:x:oob' in <x/> - the below
262 # is probably fine, though, as non-OOB <x><url/></x> unlikely
263
264 un = xn.children.find { |v| v.element_name == "url" }
265 if not un
266 puts "MMSOOB: no url node found so process as normal"
267 to_catapult(s, nil, num_dest, user_id, token, secret,
268 usern)
269 return
270 end
271 puts "MMSOOB: found a url node - checking if to make MMS..."
272
273 # TODO: always enable MMS on OOB URL for V2? need 2 inform users
274 REDIS.getbit("catapult_setting_flags-#{s.from.stripped}",
275 CatapultSettingFlagBits::MMS_ON_OOB_URL).then { |oob_on|
276
277 puts "MMSOOB: found MMS_ON_OOB_URL value is '#{oob_on}'"
278 if 0 == oob_on
279 puts "MMSOOB: MMS_ON_OOB_URL off so no MMS send"
280 to_catapult(s, nil, num_dest, user_id, token,
281 secret, usern)
282 next
283 end
284
285 # TODO: check size of file at un.text and shrink if need
286
287 body = s.respond_to?(:body) ? s.body : ''
288 puts "MMSOOB: url text is '#{un.text}'"
289 puts "MMSOOB: the body is '#{body.to_s.strip}'"
290
291 # some clients send URI in both body & <url/> so delete
292 if un.text == body.to_s.strip
293 puts "MMSOOB: url matches body so deleting body"
294 s.body = ''
295 end
296
297 puts "MMSOOB: sending MMS since found OOB & user asked"
298 to_catapult(s, un.text, num_dest, user_id, token,
299 secret, usern)
300 }
301 end
302
303 def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern)
304 body = s.respond_to?(:body) ? s.body : ''
305 if murl.to_s.empty? && body.to_s.strip.empty?
306 return EMPromise.reject(
307 [:modify, 'policy-violation']
308 )
309 end
310
311 extra = {}
312 if murl
313 extra = { media: murl }
314 end
315
316 call_catapult(
317 token,
318 secret,
319 :post,
320 "api/v2/users/#{user_id}/messages",
321 JSON.dump(extra.merge(
322 from: usern,
323 to: num_dest,
324 text: body,
325 applicationId: ARGV[4],
326 tag:
327 # callbacks need id and resourcepart
328 WEBrick::HTTPUtils.escape(s.id.to_s) +
329 ' ' +
330 WEBrick::HTTPUtils.escape(
331 s.from.resource.to_s
332 )
333 )),
334 {'Content-Type' => 'application/json'},
335 [201]
336 ).catch {
337 # TODO: add text; mention code number
338 EMPromise.reject(
339 [:cancel, 'internal-server-error']
340 )
341 }
342
343 t = Time.now
344 tai_timestamp = `./tai`.strip
345 tai_yyyymmdd = Time.at(tai_timestamp.to_i).strftime('%Y%m%d')
346 puts "SMU %d.%09d, %s: msg for %s sent on %s - incrementing\n" %
347 [t.to_i, t.nsec, tai_timestamp, usern, tai_yyyymmdd]
348
349 REDIS.incr('usage_messages-' + tai_yyyymmdd + '-' +
350 usern).then { |total|
351
352 t = Time.now
353 puts "SMU %d.%09d: total msgs for %s-%s now at %s\n" %
354 [t.to_i, t.nsec, tai_yyyymmdd, usern, total]
355 }
356 end
357
358 def self.validate_num(m)
359 # if sent to SGX domain use https://wiki.soprani.ca/SGX/GroupMMS
360 if m.to == ARGV[0]
361 an = m.children.find { |v| v.element_name == "addresses"
362 }
363 if not an
364 return EMPromise.reject([:cancel,
365 'item-not-found'])
366 end
367 puts "ADRXEP: found an addresses node - iterate addrs.."
368
369 nums = []
370 an.children.each do |e|
371 num = ''
372 type = ''
373 e.attributes.each do |c|
374 if c[0] == 'type'
375 if c[1] != 'to'
376 # TODO: error
377 end
378 type = c[1].to_s
379 elsif c[0] == 'uri'
380 if !c[1].to_s.start_with? 'sms:'
381 # TODO: error
382 end
383 num = c[1].to_s[4..-1]
384 # TODO: confirm num validates
385 else
386 # TODO: error - unexpected name
387 end
388 end
389 if num.empty? or type.empty?
390 # TODO: error
391 end
392 nums << num
393 end
394 return nums
395 end
396
397 # if not sent to SGX domain, then assume destination is in 'to'
398 EMPromise.resolve(m.to.node.to_s).then { |num_dest|
399 if num_dest =~ /\A\+?[0-9]+(?:;.*)?\Z/
400 next num_dest if num_dest[0] == '+'
401 shortcode = extract_shortcode(num_dest)
402 next shortcode if shortcode
403 end
404
405 if is_anonymous_tel?(num_dest)
406 EMPromise.reject([:cancel, 'gone'])
407 else
408 # TODO: text re num not (yet) supportd/implmentd
409 EMPromise.reject([:cancel, 'item-not-found'])
410 end
411 }
412 end
413
414 def self.fetch_catapult_cred_for(jid)
415 cred_key = "catapult_cred-#{jid.stripped}"
416 REDIS.lrange(cred_key, 0, 3).then { |creds|
417 if creds.length < 4
418 # TODO: add text re credentials not registered
419 EMPromise.reject(
420 [:auth, 'registration-required']
421 )
422 else
423 creds
424 end
425 }
426 end
427
428 message :error? do |m|
429 # TODO: report it somewhere/somehow - eat for now so no err loop
430 puts "EATERROR1: #{m.inspect}"
431 end
432
433 message :body do |m|
434 EMPromise.all([
435 validate_num(m),
436 fetch_catapult_cred_for(m.from)
437 ]).then { |(num_dest, creds)|
438 jid_key = "catapult_jid-#{num_dest}"
439 REDIS.get(jid_key).then { |jid|
440 [jid, num_dest] + creds
441 }
442 }.then { |(jid, num_dest, *creds)|
443 # if destination user is in the system pass on directly
444 if jid
445 pass_on_message(m, creds.last, jid)
446 else
447 to_catapult_possible_oob(m, num_dest, *creds)
448 end
449 }.catch { |e|
450 if e.is_a?(Array) && e.length == 2
451 write_to_stream error_msg(m.reply, m.body, *e)
452 else
453 EMPromise.reject(e)
454 end
455 }
456 end
457
458 def self.user_cap_identities
459 [{category: 'client', type: 'sms'}]
460 end
461
462 # TODO: must re-add stuff so can do ad-hoc commands
463 def self.user_cap_features
464 [
465 "urn:xmpp:receipts",
466 ]
467 end
468
469 def self.add_gateway_feature(feature)
470 @gateway_features << feature
471 @gateway_features.uniq!
472 end
473
474 subscription :request? do |p|
475 puts "PRESENCE1: #{p.inspect}"
476
477 # subscriptions are allowed from anyone - send reply immediately
478 msg = Blather::Stanza::Presence.new
479 msg.to = p.from
480 msg.from = p.to
481 msg.type = :subscribed
482
483 puts 'RESPONSE5a: ' + msg.inspect
484 write_to_stream msg
485
486 # send a <presence> immediately; not automatically probed for it
487 # TODO: refactor so no "presence :probe? do |p|" duplicate below
488 caps = Blather::Stanza::Capabilities.new
489 # TODO: user a better node URI (?)
490 caps.node = 'http://catapult.sgx.soprani.ca/'
491 caps.identities = user_cap_identities
492 caps.features = user_cap_features
493
494 msg = caps.c
495 msg.to = p.from
496 msg.from = p.to.to_s + '/sgx'
497
498 puts 'RESPONSE5b: ' + msg.inspect
499 write_to_stream msg
500
501 # need to subscribe back so Conversations displays images inline
502 msg = Blather::Stanza::Presence.new
503 msg.to = p.from.to_s.split('/', 2)[0]
504 msg.from = p.to.to_s.split('/', 2)[0]
505 msg.type = :subscribe
506
507 puts 'RESPONSE5c: ' + msg.inspect
508 write_to_stream msg
509 end
510
511 presence :probe? do |p|
512 puts 'PRESENCE2: ' + p.inspect
513
514 caps = Blather::Stanza::Capabilities.new
515 # TODO: user a better node URI (?)
516 caps.node = 'http://catapult.sgx.soprani.ca/'
517 caps.identities = user_cap_identities
518 caps.features = user_cap_features
519
520 msg = caps.c
521 msg.to = p.from
522 msg.from = p.to.to_s + '/sgx'
523
524 puts 'RESPONSE6: ' + msg.inspect
525 write_to_stream msg
526 end
527
528 iq '/iq/ns:query', ns: 'http://jabber.org/protocol/disco#info' do |i|
529 # respond to capabilities request for an sgx-bwmsgsv2 number JID
530 if i.to.node
531 # TODO: confirm the node URL is expected using below
532 #puts "XR[node]: #{xpath_result[0]['node']}"
533
534 msg = i.reply
535 msg.node = i.node
536 msg.identities = user_cap_identities
537 msg.features = user_cap_features
538
539 puts 'RESPONSE7: ' + msg.inspect
540 write_to_stream msg
541 next
542 end
543
544 # respond to capabilities request for sgx-bwmsgsv2 itself
545 msg = i.reply
546 msg.node = i.node
547 msg.identities = [{
548 name: 'Soprani.ca Gateway to XMPP - Bandwidth API V2',
549 type: 'sms', category: 'gateway'
550 }]
551 msg.features = @gateway_features
552 write_to_stream msg
553 end
554
555 def self.check_then_register(i, *creds)
556 jid_key = "catapult_jid-#{creds.last}"
557 bare_jid = i.from.stripped
558 cred_key = "catapult_cred-#{bare_jid}"
559
560 REDIS.get(jid_key).then { |existing_jid|
561 if existing_jid && existing_jid != bare_jid
562 # TODO: add/log text: credentials exist already
563 EMPromise.reject([:cancel, 'conflict'])
564 end
565 }.then {
566 REDIS.lrange(cred_key, 0, 3)
567 }.then { |existing_creds|
568 # TODO: add/log text: credentials exist already
569 if existing_creds.length == 4 && creds != existing_creds
570 EMPromise.reject([:cancel, 'conflict'])
571 elsif existing_creds.length < 4
572 REDIS.rpush(cred_key, *creds).then { |length|
573 if length != 4
574 EMPromise.reject([
575 :cancel,
576 'internal-server-error'
577 ])
578 end
579 }
580 end
581 }.then {
582 # not necessary if existing_jid non-nil, easier this way
583 REDIS.set(jid_key, bare_jid)
584 }.then { |result|
585 if result != 'OK'
586 # TODO: add txt re push failure
587 EMPromise.reject(
588 [:cancel, 'internal-server-error']
589 )
590 end
591 }.then {
592 write_to_stream i.reply
593 }
594 end
595
596 def self.creds_from_registration_query(qn)
597 xn = qn.children.find { |v| v.element_name == "x" }
598
599 if xn
600 xn.children.each_with_object({}) do |field, h|
601 next if field.element_name != "field"
602 val = field.children.find { |v|
603 v.element_name == "value"
604 }
605
606 case field['var']
607 when 'nick'
608 h[:user_id] = val.text
609 when 'username'
610 h[:api_token] = val.text
611 when 'password'
612 h[:api_secret] = val.text
613 when 'phone'
614 h[:phone_num] = val.text
615 else
616 # TODO: error
617 puts "?: #{field['var']}"
618 end
619 end
620 else
621 qn.children.each_with_object({}) do |field, h|
622 case field.element_name
623 when "nick"
624 h[:user_id] = field.text
625 when "username"
626 h[:api_token] = field.text
627 when "password"
628 h[:api_secret] = field.text
629 when "phone"
630 h[:phone_num] = field.text
631 end
632 end
633 end.values_at(:user_id, :api_token, :api_secret, :phone_num)
634 end
635
636 def self.process_registration(i, qn)
637 EMPromise.resolve(
638 qn.children.find { |v| v.element_name == "remove" }
639 ).then { |rn|
640 if rn
641 puts "received <remove/> - ignoring for now..."
642 EMPromise.reject(:done)
643 else
644 creds_from_registration_query(qn)
645 end
646 }.then { |user_id, api_token, api_secret, phone_num|
647 if phone_num[0] == '+'
648 [user_id, api_token, api_secret, phone_num]
649 else
650 # TODO: add text re number not (yet) supported
651 EMPromise.reject([:cancel, 'item-not-found'])
652 end
653 }.then { |user_id, api_token, api_secret, phone_num|
654 # TODO: find way to verify #{phone_num}, too
655 call_catapult(
656 api_token,
657 api_secret,
658 :get,
659 "api/v2/users/#{user_id}/media"
660 ).then { |response|
661 params = JSON.parse(response)
662 # TODO: confirm params is array - could be empty
663
664 puts "register got str #{response.to_s[0..999]}"
665
666 check_then_register(
667 i,
668 user_id,
669 api_token,
670 api_secret,
671 phone_num
672 )
673 }
674 }.catch { |e|
675 EMPromise.reject(case e
676 when 401
677 # TODO: add text re bad credentials
678 [:auth, 'not-authorized']
679 when 404
680 # TODO: add text re number not found or disabled
681 [:cancel, 'item-not-found']
682 when Integer
683 [:modify, 'not-acceptable']
684 else
685 e
686 end)
687 }
688 end
689
690 def self.registration_form(orig, existing_number=nil)
691 msg = Nokogiri::XML::Node.new 'query', orig.document
692 msg['xmlns'] = 'jabber:iq:register'
693
694 if existing_number
695 msg.add_child(
696 Nokogiri::XML::Node.new(
697 'registered', msg.document
698 )
699 )
700 end
701
702 # TODO: update "User Id" x2 below (to "accountId"?), and others?
703 n1 = Nokogiri::XML::Node.new(
704 'instructions', msg.document
705 )
706 n1.content = "Enter the information from your Account "\
707 "page as well as the Phone Number\nin your "\
708 "account you want to use (ie. '+12345678901')"\
709 ".\nUser Id is nick, API Token is username, "\
710 "API Secret is password, Phone Number is phone"\
711 ".\n\nThe source code for this gateway is at "\
712 "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
713 "\nCopyright (C) 2017-2020 Denver Gingerich "\
714 "and others, licensed under AGPLv3+."
715 n2 = Nokogiri::XML::Node.new 'nick', msg.document
716 n3 = Nokogiri::XML::Node.new 'username', msg.document
717 n4 = Nokogiri::XML::Node.new 'password', msg.document
718 n5 = Nokogiri::XML::Node.new 'phone', msg.document
719 n5.content = existing_number.to_s
720 msg.add_child(n1)
721 msg.add_child(n2)
722 msg.add_child(n3)
723 msg.add_child(n4)
724 msg.add_child(n5)
725
726 x = Blather::Stanza::X.new :form, [
727 {
728 required: true, type: :"text-single",
729 label: 'User Id', var: 'nick'
730 },
731 {
732 required: true, type: :"text-single",
733 label: 'API Token', var: 'username'
734 },
735 {
736 required: true, type: :"text-private",
737 label: 'API Secret', var: 'password'
738 },
739 {
740 required: true, type: :"text-single",
741 label: 'Phone Number', var: 'phone',
742 value: existing_number.to_s
743 }
744 ]
745 x.title = 'Register for '\
746 'Soprani.ca Gateway to XMPP - Bandwidth API V2'
747 x.instructions = "Enter the details from your Account "\
748 "page as well as the Phone Number\nin your "\
749 "account you want to use (ie. '+12345678901')"\
750 ".\n\nThe source code for this gateway is at "\
751 "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
752 "\nCopyright (C) 2017-2020 Denver Gingerich "\
753 "and others, licensed under AGPLv3+."
754 msg.add_child(x)
755
756 orig.add_child(msg)
757
758 return orig
759 end
760
761 iq '/iq/ns:query', ns: 'jabber:iq:register' do |i, qn|
762 puts "IQ: #{i.inspect}"
763
764 case i.type
765 when :set
766 process_registration(i, qn)
767 when :get
768 bare_jid = i.from.stripped
769 cred_key = "catapult_cred-#{bare_jid}"
770 REDIS.lindex(cred_key, 3).then { |existing_number|
771 reply = registration_form(i.reply, existing_number)
772 puts "RESPONSE2: #{reply.inspect}"
773 write_to_stream reply
774 }
775 else
776 # Unknown IQ, ignore for now
777 EMPromise.reject(:done)
778 end.catch { |e|
779 if e.is_a?(Array) && e.length == 2
780 write_to_stream error_msg(i.reply, qn, *e)
781 elsif e != :done
782 EMPromise.reject(e)
783 end
784 }.catch(&method(:panic))
785 end
786
787 iq :get? do |i|
788 write_to_stream error_msg(i.reply, i.children, 'cancel', 'feature-not-implemented')
789 end
790
791 iq :set? do |i|
792 write_to_stream error_msg(i.reply, i.children, 'cancel', 'feature-not-implemented')
793 end
794end
795
# A bare <message/> stanza, used in this file to carry delivery receipts.
class ReceiptMessage < Blather::Stanza
  # @param to recipient address for the new stanza (may be nil)
  # @return the new message node
  def self.new(to=nil)
    super(:message).tap { |node| node.to = to }
  end
end
803
# Goliath endpoint receiving message callbacks and relaying them to the
# registered XMPP user: inbound messages become XMPP messages, outbound
# status events become delivery receipts or error stanzas.
class WebhookHandler < Goliath::API
  use Goliath::Rack::Params

  # Processes one callback request.
  #
  # Only the first event of the posted JSON list is handled.  The user's
  # bare JID is looked up under "catapult_jid-<user's number>"; events
  # for unknown numbers are acknowledged and dropped.
  #
  # Malformed events (unknown direction, absent optional fields, missing
  # tag) are logged and acknowledged rather than raising.  Any exception
  # that does get raised still shuts the gateway down, and is answered
  # with a 500 so the sender gets a well-formed response.
  #
  # @param env the Goliath request environment
  # @return [Array] Rack response triple
  def response(env)
    # TODO: add timestamp grab here, and MUST include ./tai version
    ok = [200, {}, "OK"]

    puts 'ENV: ' + env.reject{ |k| k == 'params' }.to_s

    # TODO: process each message in list, not just first one
    event = params['_json'][0]
    jparams = event['message']
    type = event['type']

    case jparams['direction']
    when 'in'
      users_num = jparams['owner']
      others_num = jparams['from']
    when 'out'
      users_num = jparams['from']
      others_num = jparams['owner']
    else
      # TODO: exception or similar
      puts "big prob: '#{jparams['direction']}'"
      return ok
    end

    # interpolation (not +) so nil or non-String fields cannot raise
    puts "BODY - messageId: #{jparams['id']}"\
      ", eventType: #{type}"\
      ", time: #{jparams['time']}"\
      ", direction: #{jparams['direction']}"\
      ", deliveryState: #{jparams['deliveryState'] || 'NONE'}"\
      ", deliveryCode: #{jparams['deliveryCode'] || 'NONE'}"\
      ", deliveryDesc: #{jparams['deliveryDescription'] || 'NONE'}"\
      ", tag: #{jparams['tag'] || 'NONE'}"\
      ", media: #{jparams['media'] || 'NONE'}"

    if others_num[0] != '+'
      # TODO: check that others_num actually a shortcode first
      others_num +=
        ';phone-context=ca-us.phone-context.soprani.ca'
    end

    jid_key = "catapult_jid-#{users_num}"
    bare_jid = REDIS.get(jid_key).promise.sync

    if !bare_jid
      puts "jid_key (#{jid_key}) DNE; BW API misconfigured?"

      # TODO: likely not appropriate; give error to BW API?
      # TODO: add text re credentials not being registered
      #write_to_stream error_msg(m.reply, m.body, :auth,
      #  'registration-required')
      return ok
    end

    msg = nil
    if jparams['direction'] == 'in'
      text = ''
      case type
      when 'sms'
        text = jparams['text']
      when 'mms'
        # media is never inspected for this event type, so the
        # message is always reported as a suspected group message
        text =
          if jparams['text'].empty?
            '[suspected group msg with no text (odd)]'
          else
            '[suspected group msg (recipient list not '\
              'available) with following text] ' +
              jparams['text']
          end

        msg = Blather::Stanza::Message.new(bare_jid, text)
        msg.from = others_num + '@' + ARGV[0]
        SGXbwmsgsv2.write(msg)

        return ok
      when 'message-received'
        # TODO: handle group chat, and fix above
        text = jparams['text']

        if jparams['to'].length > 1
          msg = Blather::Stanza::Message.new(
            'cheogram.com', text) # TODO

          addrs = Nokogiri::XML::Node.new(
            'addresses', msg.document)
          addrs['xmlns'] = 'http://jabber.org/protocol/address'

          addr1 = Nokogiri::XML::Node.new(
            'address', msg.document)
          addr1['type'] = 'to'
          addr1['jid'] = bare_jid
          addrs.add_child(addr1)

          jparams['to'].each do |receiver|
            # already there in addr1
            next if receiver == users_num

            addrn = Nokogiri::XML::Node.new(
              'address', msg.document)
            addrn['type'] = 'to'
            addrn['uri'] = "sms:#{receiver}"
            addrn['delivered'] = 'true'
            addrs.add_child(addrn)
          end

          msg.add_child(addrs)

          # TODO: delete
          puts "RESPONSE9: #{msg.inspect}"
        end

        # each real attachment is sent as its own message; the
        # .smil/.txt/.xml parts are skipped
        (jparams['media'] || []).each do |media_url|
          next if media_url.end_with?('.smil', '.txt', '.xml')

          SGXbwmsgsv2.send_media(
            others_num + '@' + ARGV[0],
            bare_jid, media_url,
            nil, nil, msg
          )
        end
      else
        text = "unknown type (#{type})"\
          " with text: #{jparams['text']}"

        # TODO: log/notify of this properly
        puts text
      end

      msg ||= Blather::Stanza::Message.new(bare_jid, text)
    else # per prior switch, this is: jparams['direction'] == 'out'
      if jparams['tag'].nil?
        # without the tag there is no stanza id to report against
        puts "WARN! outbound event without tag: #{users_num}"
        return ok
      end

      # the tag is "<escaped stanza id> <escaped resourcepart>"
      tag_parts = jparams['tag'].split(/ /, 2)
      id = WEBrick::HTTPUtils.unescape(tag_parts[0].to_s)
      resourcepart = WEBrick::HTTPUtils.unescape(tag_parts[1].to_s)

      # TODO: remove this hack
      if jparams['to'].length > 1
        puts "WARN! group no rcpt: #{users_num}"
        return ok
      end

      case type
      when 'message-failed'
        # create a bare message like the one user sent
        msg = Blather::Stanza::Message.new(
          others_num + '@' + ARGV[0])
        msg.from = bare_jid + '/' + resourcepart
        msg['id'] = id

        # TODO: add 'errorCode' and/or 'description' val
        # create an error reply to the bare message
        msg = Blather::StanzaError.new(
          msg,
          'recipient-unavailable',
          :wait
        ).to_node

        # TODO: make prettier: this should be done above
        others_num = event['to']
      when 'message-delivered'
        msg = ReceiptMessage.new(bare_jid)

        # TODO: put in member/instance variable
        msg['id'] = SecureRandom.uuid

        # TODO: send only when requested per XEP-0184
        rcvd = Nokogiri::XML::Node.new('received', msg.document)
        rcvd['xmlns'] = 'urn:xmpp:receipts'
        rcvd['id'] = id
        msg.add_child(rcvd)

        # TODO: make prettier: this should be done above
        others_num = event['to']
      else
        # TODO: notify somehow of unknown state receivd?
        puts "message with id #{id} has "\
          "other type #{type}"
        return ok
      end

      puts "RESPONSE4: #{msg.inspect}"
    end

    msg.from = others_num + '@' + ARGV[0]
    SGXbwmsgsv2.write(msg)

    ok

  rescue Exception => e
    puts 'Shutting down gateway due to exception 013: ' + e.message
    SGXbwmsgsv2.shutdown
    puts 'Gateway has terminated.'
    EM.stop
    # still hand back a well-formed response for this request
    [500, {}, "Internal Server Error"]
  end
end
1036
# Program entry point, run when the script finishes loading: prints the
# banner, checks the argument count (7 or 8), then runs the reactor with
# the Redis connection, the XMPP component, an hourly keepalive ping and
# the webhook HTTP server (127.0.0.1, port ARGV[5]).
at_exit do
  $stdout.sync = true

  puts "Soprani.ca/SMS Gateway for XMPP - Bandwidth API V2\n"\
    "==>> last commit of this version is " + `git rev-parse HEAD` + "\n"

  unless [7, 8].include?(ARGV.size)
    puts "Usage: sgx-bwmsgsv2.rb <component_jid> "\
      "<component_password> <server_hostname> "\
      "<server_port> <application_id> "\
      "<http_listen_port> <mms_proxy_prefix_url> [V1_creds_file]"
    exit 0
  end

  started = Time.now
  puts format("LOG %d.%09d: starting...\n\n", started.to_i, started.nsec)

  EM.run do
    REDIS = EM::Hiredis.connect

    SGXbwmsgsv2.run

    # required when using Prosody otherwise disconnects on 6-hour inactivity
    EM.add_periodic_timer(3600) do
      ping = Blather::Stanza::Iq::Ping.new(:get, 'localhost')
      ping.from = ARGV[0]
      SGXbwmsgsv2.write(ping)
    end

    server = Goliath::Server.new('127.0.0.1', ARGV[5].to_i)
    server.api = WebhookHandler.new
    server.app = Goliath::Rack::Builder.build(server.api.class, server.api)
    server.logger = Log4r::Logger.new('goliath')
    server.logger.add(Log4r::StdoutOutputter.new('console'))
    server.logger.level = Log4r::INFO

    server.start do
      # on INT/TERM, shut the component down and stop the reactor
      %w[INT TERM].each do |sig|
        trap(sig) do
          EM.defer do
            puts 'Shutting down gateway...'
            SGXbwmsgsv2.shutdown

            puts 'Gateway has terminated.'
            EM.stop
          end
        end
      end
    end
  end
end