1#!/usr/bin/env ruby
2# frozen_string_literal: true
3
4# Copyright (C) 2017-2020 Denver Gingerich <denver@ossguy.com>
5# Copyright (C) 2017 Stephen Paul Weber <singpolyma@singpolyma.net>
6#
7# This file is part of sgx-bwmsgsv2.
8#
9# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
10# the terms of the GNU Affero General Public License as published by the Free
11# Software Foundation, either version 3 of the License, or (at your option) any
12# later version.
13#
14# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
15# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17# details.
18#
19# You should have received a copy of the GNU Affero General Public License along
20# with sgx-bwmsgsv2. If not, see <http://www.gnu.org/licenses/>.
21
22require 'blather/client/dsl'
23require 'em-hiredis'
24require 'em-http-request'
25require 'json'
26require 'securerandom'
27require 'time'
28require 'uri'
29require 'webrick'
30
31require 'goliath/api'
32require 'goliath/server'
33require 'log4r'
34
35require 'em_promise'
36
# Last-resort failure handler: prints the exception's message and
# backtrace, calls SGXbwmsgsv2.shutdown, and stops the EventMachine
# reactor.
#
# e - the exception that caused the shutdown
def panic(e)
  reason = e.message
  trace = e.backtrace

  puts "Shutting down gateway due to exception: #{reason}"
  puts trace
  SGXbwmsgsv2.shutdown
  puts 'Gateway has terminated.'
  EM.stop
end
44
# Returns the part of dest before the first ';' when the remainder is
# exactly the ca-us shortcode phone-context parameter; otherwise nil.
def extract_shortcode(dest)
  number, phone_context = dest.split(';', 2)
  return unless phone_context == 'phone-context=ca-us.phone-context.soprani.ca'

  number
end
49
# True when the text after the first ';' in dest is exactly the anonymous
# phone-context parameter.
def anonymous_tel?(dest)
  _number, phone_context = dest.split(';', 2)
  phone_context == 'phone-context=anonymous.phone-context.soprani.ca'
end
53
# Blather client whose handlers all run through wrap_handler, so that an
# exception raised by a handler, or the rejection of a Promise a handler
# returns, ends up in panic.
class SGXClient < Blather::Client
  # Registers a handler like the parent class does, but wrapped.
  def register_handler(type, *guards, &block)
    super(type, *guards) { |*args| wrap_handler(*args, &block) }
  end

  # Registers a wrapped handler at the FRONT of the handler list for type,
  # so it is consulted before handlers registered earlier.
  def register_handler_before(type, *guards, &block)
    check_handler(type, guards)
    wrapped = ->(*args) { wrap_handler(*args, &block) }

    (@handlers[type] ||= []).unshift([guards, wrapped])
  end

protected

  # Runs the handler block.  A returned Promise gets panic attached as its
  # rejection handler; a raised exception goes straight to panic.
  def wrap_handler(*args)
    outcome = yield(*args)
    outcome.catch(&method(:panic)) if outcome.is_a?(Promise)
    true # Do not run other handlers unless throw :pass
  rescue Exception => e
    panic(e)
  end
end
77
# Named integer constants for per-user setting flags.
# NOTE(review): the module name suggests these are bit positions within a
# flags value - nothing in this file reads them, so confirm against users.
#
# TODO: keep in sync with jmp-acct_bot.rb, and eventually put in common location
module CatapultSettingFlagBits
  VOICEMAIL_TRANSCRIPTION_DISABLED = 0
  MMS_ON_OOB_URL = 1
end
83
84module SGXbwmsgsv2
extend Blather::DSL

# client used by the DSL; SGXClient wraps every handler with panic handling
@client = SGXClient.new

# features reported in disco#info replies for the gateway's own JID
# (extended at runtime through add_gateway_feature)
@gateway_features = [
  'http://jabber.org/protocol/disco#info',
  'http://jabber.org/protocol/address/',
  'jabber:iq:register'
]
93
# Starts the component by delegating to client.run.
def self.run
  # TODO: read/save ARGV[7] creds to local variables
  client.run
end
98
# Public wrapper around client.write so that classes outside this module
# can write stanzas, too.
def self.write(stanza)
  client.write stanza
end
103
# Registers block as a handler that is placed ahead of the handlers already
# registered for type (delegates to client.register_handler_before).
def self.before_handler(type, *guards, &block)
  client.register_handler_before type, *guards, &block
end
107
# Sends an XMPP message pointing at a proxied copy of a Bandwidth media URL.
#
# The proxied URL (ARGV[6] + escaped user + '/' + media path) is placed both
# in the message body and in a XEP-0066 (OOB) <x/> element.
#
# from      - value assigned to the message's from
# to        - recipient; the caller must guarantee this is a bare JID
# media_url - we assume this is one of these (always the case so far):
#             https://api.catapult.inetwork.com/v1/users/[uid]/media/[file]
#             https://messaging.bandwidth.com/api/v2/users/[u]/media/[path]
# desc      - optional text for the OOB <desc/> element
# subject   - optional message subject
# m         - optional stanza; when given, a copy of it is sent (with its
#             body replaced) instead of a fresh message
#
# An unrecognized media_url is logged and nothing is sent.  Any exception
# raised here is handed to panic.
def self.send_media(from, to, media_url, desc=nil, subject=nil, m=nil)
  v1_prefix = 'https://api.catapult.inetwork.com/v1/users/'
  v2_prefix = 'https://messaging.bandwidth.com/api/v2/users/'

  if media_url.start_with?(v1_prefix)
    # TODO: MUST fix this TERRIBLE hack
    # there must be a catapult_cred-<usr> key with V1 creds
    usr = 'v1'
    pth = media_url.split('/', 8)[7]
  elsif media_url.start_with?(v2_prefix)
    usr = to
    pth = media_url.split('/', 9)[8]
  else
    puts "ERROR2: unrecognized media_url: '#{media_url}'"
    return
  end

  proxy_url = ARGV[6] + WEBrick::HTTPUtils.escape(usr) + '/' + pth

  puts "ORIG_URL: #{media_url}"
  puts "PROX_URL: #{proxy_url}"

  # put URL in the body (so Conversations will still see it)...
  msg = Blather::Stanza::Message.new(to, proxy_url)
  if m
    msg = m.copy
    msg.body = proxy_url
  end
  msg.from = from
  msg.subject = subject if subject

  new_node = ->(name) { Nokogiri::XML::Node.new(name, msg.document) }
  new_text = ->(text) { Nokogiri::XML::Text.new(text, msg.document) }

  # ...but also provide URL in XEP-0066 (OOB) fashion
  # TODO: confirm client supports OOB or don't send this
  oob = new_node.call('x')
  oob['xmlns'] = 'jabber:x:oob'

  url_node = new_node.call('url')
  url_node.add_child(new_text.call(proxy_url))
  oob.add_child(url_node)

  if desc
    desc_node = new_node.call('desc')
    desc_node.add_child(new_text.call(desc))
    oob.add_child(desc_node)
  end

  msg.add_child(oob)

  write(msg)
rescue Exception => e
  panic(e)
end
171
# Turns the stanza orig into an error stanza, in place, and returns it.
#
# orig       - stanza to convert (normally a reply); its type becomes :error
# query_node - optional node appended after the <error/> element
# type       - value for the <error/> element's type attribute
# name       - element name of the error condition, created in the
#              urn:ietf:params:xml:ns:xmpp-stanzas namespace
# _text      - currently unused (see TODO below)
def self.error_msg(orig, query_node, type, name, _text=nil)
  orig.type = :error

  error = Nokogiri::XML::Node.new('error', orig.document)
  error['type'] = type
  orig.add_child(error)

  condition = Nokogiri::XML::Node.new(name, orig.document)
  condition['xmlns'] = 'urn:ietf:params:xml:ns:xmpp-stanzas'
  error.add_child(condition)

  orig.add_child(query_node) if query_node

  # TODO: add some explanatory xml:lang='en' text (see text param)
  puts "RESPONSE3: #{orig.inspect}"
  orig
end
189
# Configure the Blather DSL from the first four command-line arguments.
# workqueue_count MUST be 0 or else Blather uses threads!
setup(*ARGV.values_at(0, 1, 2, 3), nil, nil, workqueue_count: 0)
192
# Delivers m directly to another user of this gateway, then sends a
# delivery receipt back to the original sender.
#
# m         - the incoming message; it is re-addressed in place
# users_num - the sender's number, used as the node of the new from address
# jid       - destination JID the message is forwarded to
def self.pass_on_message(m, users_num, jid)
  # setup delivery receipt; similar to a reply
  # (built first because m's addresses are overwritten just below)
  rcpt = ReceiptMessage.new(m.from.stripped)
  rcpt.from = m.to

  # pass original message (before sending receipt)
  m.to = jid
  m.from = "#{users_num}@#{ARGV[0]}"

  puts "XRESPONSE0: #{m.inspect}"
  write_to_stream m

  # send a delivery receipt back to the sender
  # TODO: send only when requested per XEP-0184
  # TODO: pass receipts from target if supported

  # TODO: put in member/instance variable
  rcpt['id'] = SecureRandom.uuid
  received = Nokogiri::XML::Node.new('received', rcpt.document)
  received['xmlns'] = 'urn:xmpp:receipts'
  received['id'] = m.id
  rcpt.add_child(received)

  puts "XRESPONSE1: #{rcpt.inspect}"
  write_to_stream rcpt
end
219
# Performs an HTTP request against the Bandwidth API and returns a promise.
#
# token, secret - passed as the Authorization header value [token, secret]
# m             - HTTP verb, sent to the request via public_send (e.g. :get)
# pth           - path that looks like one of:
#                   "api/v2/users/#{user_id}/[endpoint_name]"
#                   "v1/users/#{user_id}/[endpoint_name]"
#                 (any other path gets no host prefix at all)
# body          - request body
# head          - extra headers merged over the Authorization header
# code          - list of acceptable HTTP status codes
# respond_with  - :body, :headers, or anything else for the whole response
#
# The promise resolves with the selected part of the response when the
# status is in code, and rejects with the status otherwise.
def self.call_catapult(
  token, secret, m, pth, body=nil,
  head={}, code=[200], respond_with=:body
)
  # TODO: need to make a separate thing for voice.bw.c eventually
  url_prefix =
    if pth.start_with?('api/v2/users')
      'https://messaging.bandwidth.com/'
    elsif pth.start_with?('v1/users')
      # begin hack for running V2 messages along with V1 voice
      # TODO: set token and secret to vals provided at startup
      # TODO: replace pth's user_id with user_id from ARGV[7]
      #  -> pth = "v1/users/#{new_id}/" + pth.split('/', 4)[3]
      # TODO: else error
      'https://api.catapult.inetwork.com/'
    else
      ''
    end

  headers = {'Authorization' => [token, secret]}.merge(head)
  request = EM::HttpRequest.new(url_prefix + pth)

  request.public_send(m, head: headers, body: body).then { |http|
    status = http.response_header.status
    puts "API response to send: #{http.response} with code" \
      " response.code #{status}"

    next EMPromise.reject(status) unless code.include?(status)

    case respond_with
    when :body then http.response
    when :headers then http.response_header
    else http
    end
  }
end
268
# Sends stanza s through to_catapult, as an MMS when it carries a
# jabber:x:oob <url/> and as a plain message otherwise.
#
# When a URL is present, a trailing copy of it (plus surrounding
# whitespace) is removed from the body before sending.
# Returns whatever to_catapult returns.
def self.to_catapult_possible_oob(s, num_dest, user_id, token, secret,
  usern)
  url_node = s.at("oob|x > oob|url", oob: "jabber:x:oob")
  unless url_node
    puts "MMSOOB: no url node found so process as normal"
    return to_catapult(s, nil, num_dest, user_id, token, secret, usern)
  end
  puts "MMSOOB: found a url node - checking if to make MMS..."

  # TODO: check size of file at un.text and shrink if need

  media_url = url_node.text
  body = s.respond_to?(:body) ? s.body : ''
  # some clients send URI in both body & <url/> so delete
  trailing_url = /\s*#{Regexp.escape(media_url)}\s*$/
  s.body = body.sub(trailing_url, '')

  puts "MMSOOB: url text is '#{media_url}'"
  puts "MMSOOB: the body is '#{body.to_s.strip}'"

  puts "MMSOOB: sending MMS since found OOB & user asked"
  to_catapult(s, media_url, num_dest, user_id, token, secret, usern)
end
291
# Posts an outgoing message to "api/v2/users/<user_id>/messages".
#
# s        - source stanza; its body, id and sender resource are used
# murl     - media URL to attach, or nil
# num_dest - destination, sent as "to"
# usern    - the user's own number, sent as "from"
#
# Returns a promise.  It rejects with [:modify, 'policy-violation'] when
# there is neither media nor non-blank text, and with
# [:cancel, 'internal-server-error'] when the API call fails (anything but
# a 201 response).
def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern)
  body = s.respond_to?(:body) ? s.body : ''
  nothing_to_send = murl.to_s.empty? && body.to_s.strip.empty?
  return EMPromise.reject([:modify, 'policy-violation']) if nothing_to_send

  # callbacks need id and resourcepart
  tag = [s.id, s.from.resource].map { |part|
    WEBrick::HTTPUtils.escape(part.to_s)
  }.join(' ')

  payload = {
    from: usern,
    to: num_dest,
    text: body,
    applicationId: ARGV[4],
    tag: tag
  }
  # keep media as the first key, ahead of the common fields
  payload = {media: murl}.merge(payload) if murl

  call_catapult(
    token,
    secret,
    :post,
    "api/v2/users/#{user_id}/messages",
    JSON.dump(payload),
    {'Content-Type' => 'application/json'},
    [201]
  ).catch {
    # TODO: add text; mention code number
    EMPromise.reject([:cancel, 'internal-server-error'])
  }
end
330
# Works out the destination number(s) of message m.
#
# * Sent to the gateway's own domain (ARGV[0]): the message is expected to
#   carry an <addresses/> child (https://wiki.soprani.ca/SGX/GroupMMS).
#   Returns a plain Array with one entry per child of that element - the
#   "uri" attribute minus its first four characters ("sms:"), or '' when
#   there is no usable uri.  Rejects with [:cancel, 'item-not-found'] when
#   there is no <addresses/> element.
# * Otherwise: returns a promise for the node part of m.to when it is a
#   "+"-prefixed number, or for the bare shortcode when it carries the
#   ca-us phone-context.  Rejects with [:cancel, 'gone'] for anonymous
#   numbers and [:cancel, 'item-not-found'] for everything else.
def self.validate_num(m)
  # if sent to SGX domain use https://wiki.soprani.ca/SGX/GroupMMS
  if m.to == ARGV[0]
    an = m.children.find { |v| v.element_name == "addresses" }
    return EMPromise.reject([:cancel, 'item-not-found']) unless an

    puts "ADRXEP: found an addresses node - iterate addrs.."

    return an.children.map { |e|
      num = ''
      e.attributes.each do |attr_name, attr_value|
        case attr_name
        when 'type'
          # TODO: error if the type is not 'to'
        when 'uri'
          # TODO: error if the uri does not start with 'sms:'
          # Slicing a uri shorter than 4 characters yields nil; coerce
          # it to '' so malformed input cannot raise further down.
          num = attr_value.to_s[4..-1].to_s
          # TODO: confirm num validates
        end
        # TODO: else, error - unexpected name
      end
      # TODO: error if num is empty or no type attribute was present
      num
    }
  end

  # if not sent to SGX domain, then assume destination is in 'to'
  EMPromise.resolve(m.to.node.to_s).then { |num_dest|
    if num_dest =~ /\A\+?[0-9]+(?:;.*)?\Z/
      next num_dest if num_dest[0] == '+'

      shortcode = extract_shortcode(num_dest)
      next shortcode if shortcode
    end

    if anonymous_tel?(num_dest)
      EMPromise.reject([:cancel, 'gone'])
    else
      # TODO: text re num not (yet) supported/implemented
      EMPromise.reject([:cancel, 'item-not-found'])
    end
  }
end
386
# Looks up the first four entries of the "catapult_cred-<bare jid>" list.
#
# Returns a promise for those four values; it rejects with
# [:auth, 'registration-required'] when fewer than four are stored.
def self.fetch_catapult_cred_for(jid)
  REDIS.lrange("catapult_cred-#{jid.stripped}", 0, 3).then { |creds|
    next creds if creds.length >= 4

    # TODO: add text re credentials not registered
    EMPromise.reject([:auth, 'registration-required'])
  }
end
400
# Handler for messages matching the :error? guard: the stanza is only
# logged, never answered.
message :error? do |m|
  # TODO: report it somewhere/somehow - eat for now so no err loop
  puts "EATERROR1: #{m.inspect}"
end
405
# Handler for messages with a body: validates the destination, loads the
# sender's credentials, then either hands the message straight to another
# registered user of this gateway or sends it out through the API.
#
# Rejections shaped like [type, condition] are answered with an error
# stanza; anything else is re-rejected (and so reaches panic through the
# client's handler wrapper).
message :body do |m|
  EMPromise.all([
    validate_num(m),
    fetch_catapult_cred_for(m.from)
  ]).then { |(num_dest, creds)|
    REDIS.get("catapult_jid-#{num_dest}").then { |jid|
      [jid, num_dest] + creds
    }
  }.then { |(jid, num_dest, *creds)|
    if jid
      REDIS.lrange("catapult_cred-#{jid}", 0, 0).then { |other_user|
        # lrange yields a list that is empty when the key is missing.
        # Always append exactly ONE element: otherwise the destructuring
        # below would take the sender's phone number as other_user and
        # creds.last would become the API secret.
        [jid, num_dest] + creds + [other_user.first]
      }
    else
      [jid, num_dest] + creds + [nil]
    end
  }.then { |(jid, num_dest, *creds, other_user)|
    # if destination user is in the system pass on directly
    if other_user && !other_user.start_with?('u-')
      pass_on_message(m, creds.last, jid)
    else
      to_catapult_possible_oob(m, num_dest, *creds)
    end
  }.catch { |e|
    if e.is_a?(Array) && e.length == 2
      write_to_stream error_msg(m.reply, m.body, *e)
    else
      EMPromise.reject(e)
    end
  }
end
439
# Disco identities reported for a user's number JID.
def self.user_cap_identities
  sms_client = {category: 'client', type: 'sms'}
  [sms_client]
end
443
# Disco features reported for a user's number JID.
# TODO: must re-add stuff so can do ad-hoc commands
def self.user_cap_features
  ['urn:xmpp:receipts']
end
448
# Adds feature to the gateway's disco#info feature list, keeping the list
# free of duplicates.  Returns what Array#uniq! returns: nil when the list
# had no duplicates to remove, the list itself otherwise.
def self.add_gateway_feature(feature)
  @gateway_features.push(feature).uniq!
end
453
# Handler for presence subscription requests.  Writes three stanzas, in
# this order: the approval, a capabilities presence from the "/sgx"
# resource, and a subscription request back to the requester.
subscription :request? do |p|
  puts "PRESENCE1: #{p.inspect}"

  # subscriptions are allowed from anyone - send reply immediately
  approval = Blather::Stanza::Presence.new
  approval.to = p.from
  approval.from = p.to
  approval.type = :subscribed

  puts "RESPONSE5a: #{approval.inspect}"
  write_to_stream approval

  # send a <presence> immediately; not automatically probed for it
  # TODO: refactor so no "presence :probe? do |p|" duplicate below
  caps = Blather::Stanza::Capabilities.new
  # TODO: use a better node URI (?)
  caps.node = 'http://catapult.sgx.soprani.ca/'
  caps.identities = user_cap_identities
  caps.features = user_cap_features

  announcement = caps.c
  announcement.to = p.from
  announcement.from = "#{p.to}/sgx"

  puts "RESPONSE5b: #{announcement.inspect}"
  write_to_stream announcement

  # need to subscribe back so Conversations displays images inline
  # (both addresses are cut at the first '/', i.e. reduced to bare form)
  request = Blather::Stanza::Presence.new
  request.to = p.from.to_s.split('/', 2).first
  request.from = p.to.to_s.split('/', 2).first
  request.type = :subscribe

  puts "RESPONSE5c: #{request.inspect}"
  write_to_stream request
end
490
# Handler for presence probes: answers with a capabilities presence sent
# from the probed address's "/sgx" resource.
presence :probe? do |p|
  puts "PRESENCE2: #{p.inspect}"

  caps = Blather::Stanza::Capabilities.new
  # TODO: use a better node URI (?)
  caps.node = 'http://catapult.sgx.soprani.ca/'
  caps.identities = user_cap_identities
  caps.features = user_cap_features

  answer = caps.c
  answer.to = p.from
  answer.from = "#{p.to}/sgx"

  puts "RESPONSE6: #{answer.inspect}"
  write_to_stream answer
end
507
# Handler for disco#info queries.  Only :get requests are answered: with
# the user identities/features when the target JID has a node part (a
# number JID), and with the gateway's own identity and feature list
# otherwise.
iq '/iq/ns:query', ns: 'http://jabber.org/protocol/disco#info' do |i|
  # TODO: return error if i.type is :set - if it is :reply or
  #  :error it should be ignored (as the below does currently);
  #  review specification to see how to handle other type values
  unless i.type == :get
    puts "DISCO iq rcvd, of non-get type \"#{i.type}\"" \
      " for message \"#{i.inspect}\"; ignoring..."
    next
  end

  reply = i.reply
  reply.node = i.node

  # respond to capabilities request for an sgx-bwmsgsv2 number JID
  if i.to.node
    # TODO: confirm the node URL is expected using below
    #puts "XR[node]: #{xpath_result[0]['node']}"

    reply.identities = user_cap_identities
    reply.features = user_cap_features

    puts "RESPONSE7: #{reply.inspect}"
    write_to_stream reply
    next
  end

  # respond to capabilities request for sgx-bwmsgsv2 itself
  reply.identities = [{
    name: 'Soprani.ca Gateway to XMPP - Bandwidth API V2',
    type: 'sms', category: 'gateway'
  }]
  reply.features = @gateway_features
  write_to_stream reply
end
543
# Stores a registration and acknowledges the IQ.
#
# i     - the registration IQ; its sender's bare JID is what gets registered
# creds - the four credential values; the last one is the phone number
#
# Steps, each a link in the returned promise chain:
#   1. reject with [:cancel, 'conflict'] when the number already belongs to
#      a different JID
#   2. reject with [:cancel, 'conflict'] when this JID already has four
#      stored credentials that differ from creds; push creds when fewer
#      than four are stored
#   3. map the number to the JID
#   4. write the IQ result
# Storage failures reject with [:cancel, 'internal-server-error'].
def self.check_then_register(i, *creds)
  jid_key = "catapult_jid-#{creds.last}"
  # Keep the JID as a String: the value read back from REDIS is a String,
  # and String#== is false for a non-String that lacks to_str, so
  # comparing against a JID object would flag the owner's own
  # re-registration as a conflict.
  bare_jid = i.from.stripped.to_s
  cred_key = "catapult_cred-#{bare_jid}"

  REDIS.get(jid_key).then { |existing_jid|
    if existing_jid && existing_jid != bare_jid
      # TODO: add/log text: credentials exist already
      EMPromise.reject([:cancel, 'conflict'])
    end
  }.then {
    REDIS.lrange(cred_key, 0, 3)
  }.then { |existing_creds|
    # TODO: add/log text: credentials exist already
    if existing_creds.length == 4 && creds != existing_creds
      EMPromise.reject([:cancel, 'conflict'])
    elsif existing_creds.length < 4
      REDIS.rpush(cred_key, *creds).then { |length|
        if length != 4
          EMPromise.reject([
            :cancel,
            'internal-server-error'
          ])
        end
      }
    end
  }.then {
    # not necessary if existing_jid non-nil, easier this way
    REDIS.set(jid_key, bare_jid)
  }.then { |result|
    if result != 'OK'
      # TODO: add txt re push failure
      EMPromise.reject(
        [:cancel, 'internal-server-error']
      )
    end
  }.then {
    write_to_stream i.reply
  }
end
584
# Extracts the registration credentials from a jabber:iq:register query.
#
# qn - the <query/> node.  When it has an <x/> child, values are read from
#      that form's <field var="..."><value/></field> entries; otherwise
#      from the query's direct <nick/>, <username/>, <password/> and
#      <phone/> children.
#
# Returns [user_id, api_token, api_secret, phone_num]; an entry is nil when
# the corresponding field is absent or, in the form case, has no <value/>.
def self.creds_from_registration_query(qn)
  keys = {
    'nick' => :user_id,
    'username' => :api_token,
    'password' => :api_secret,
    'phone' => :phone_num
  }
  creds = {}

  xn = qn.children.find { |v| v.element_name == "x" }

  if xn
    xn.children.each do |field|
      next if field.element_name != "field"

      key = keys[field['var']]
      unless key
        # TODO: error
        puts "?: #{field['var']}"
        next
      end

      val = field.children.find { |v| v.element_name == "value" }
      # a <field/> without a <value/> must not raise: this is remote input
      creds[key] = val&.text
    end
  else
    qn.children.each do |field|
      key = keys[field.element_name]
      creds[key] = field.text if key
    end
  end

  creds.values_at(:user_id, :api_token, :api_secret, :phone_num)
end
625
# Handles a jabber:iq:register "set" request.
#
# i  - the registration IQ
# qn - its <query/> node
#
# Returns a promise.  A <remove/> request is ignored (rejects with :done).
# Otherwise the supplied credentials are checked with a GET on the
# account's media list and then stored through check_then_register.
# Rejections: [:cancel, 'item-not-found'] when the phone number is missing
# or does not start with '+', or on HTTP 404; [:auth, 'not-authorized'] on
# HTTP 401; [:modify, 'not-acceptable'] on any other HTTP status; anything
# else is passed through unchanged.
def self.process_registration(i, qn)
  EMPromise.resolve(
    qn.children.find { |v| v.element_name == "remove" }
  ).then { |rn|
    if rn
      puts "received <remove/> - ignoring for now..."
      EMPromise.reject(:done)
    else
      creds_from_registration_query(qn)
    end
  }.then { |user_id, api_token, api_secret, phone_num|
    # phone_num is nil when the request carried no phone field; to_s keeps
    # that case on the normal error path instead of raising
    if phone_num.to_s.start_with?('+')
      [user_id, api_token, api_secret, phone_num]
    else
      # TODO: add text re number not (yet) supported
      EMPromise.reject([:cancel, 'item-not-found'])
    end
  }.then { |user_id, api_token, api_secret, phone_num|
    # TODO: find way to verify #{phone_num}, too
    call_catapult(
      api_token,
      api_secret,
      :get,
      "api/v2/users/#{user_id}/media"
    ).then { |response|
      # result unused: this only checks that the response is valid JSON
      JSON.parse(response)
      # TODO: confirm response is array - could be empty

      puts "register got str #{response.to_s[0..999]}"

      check_then_register(
        i,
        user_id,
        api_token,
        api_secret,
        phone_num
      )
    }
  }.catch { |e|
    EMPromise.reject(case e
    when 401
      # TODO: add text re bad credentials
      [:auth, 'not-authorized']
    when 404
      # TODO: add text re number not found or disabled
      [:cancel, 'item-not-found']
    when Integer
      [:modify, 'not-acceptable']
    else
      e
    end)
  }
end
679
# Appends a jabber:iq:register <query/> describing the registration fields
# to the stanza orig and returns orig.
#
# The query contains the plain <instructions/>, <nick/>, <username/>,
# <password/> and <phone/> elements as well as an equivalent data form.
#
# existing_number - number already registered for the requester, or nil;
#                   when present a <registered/> marker is added and the
#                   phone field is pre-filled with it
def self.registration_form(orig, existing_number=nil)
  query = Nokogiri::XML::Node.new('query', orig.document)
  query['xmlns'] = 'jabber:iq:register'

  if existing_number
    query.add_child(Nokogiri::XML::Node.new('registered', query.document))
  end

  # TODO: update "User Id" x2 below (to "accountId"?), and others?
  instructions = Nokogiri::XML::Node.new('instructions', query.document)
  instructions.content = "Enter the information from your Account "\
    "page as well as the Phone Number\nin your "\
    "account you want to use (ie. '+12345678901')"\
    ".\nUser Id is nick, API Token is username, "\
    "API Secret is password, Phone Number is phone"\
    ".\n\nThe source code for this gateway is at "\
    "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
    "\nCopyright (C) 2017-2020 Denver Gingerich "\
    "and others, licensed under AGPLv3+."
  query.add_child(instructions)

  ['nick', 'username', 'password', 'phone'].each do |field_name|
    field = Nokogiri::XML::Node.new(field_name, query.document)
    field.content = existing_number.to_s if field_name == 'phone'
    query.add_child(field)
  end

  form = Blather::Stanza::X.new :form, [
    {
      required: true, type: :"text-single",
      label: 'User Id', var: 'nick'
    },
    {
      required: true, type: :"text-single",
      label: 'API Token', var: 'username'
    },
    {
      required: true, type: :"text-private",
      label: 'API Secret', var: 'password'
    },
    {
      required: true, type: :"text-single",
      label: 'Phone Number', var: 'phone',
      value: existing_number.to_s
    }
  ]
  form.title = 'Register for '\
    'Soprani.ca Gateway to XMPP - Bandwidth API V2'
  form.instructions = "Enter the details from your Account "\
    "page as well as the Phone Number\nin your "\
    "account you want to use (ie. '+12345678901')"\
    ".\n\nThe source code for this gateway is at "\
    "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
    "\nCopyright (C) 2017-2020 Denver Gingerich "\
    "and others, licensed under AGPLv3+."
  query.add_child(form)

  orig.add_child(query)
  orig
end
750
# Handler for jabber:iq:register queries: "set" processes a registration,
# "get" replies with the registration form (pre-filled with the number
# stored at index 3 of the requester's credential list), other types are
# ignored.
#
# Rejections shaped like [type, condition] become error replies, :done is
# dropped silently, and anything else ends in panic.
iq '/iq/ns:query', ns: 'jabber:iq:register' do |i, qn|
  puts "IQ: #{i.inspect}"

  outcome =
    case i.type
    when :set
      process_registration(i, qn)
    when :get
      cred_key = "catapult_cred-#{i.from.stripped}"
      REDIS.lindex(cred_key, 3).then { |existing_number|
        reply = registration_form(i.reply, existing_number)
        puts "RESPONSE2: #{reply.inspect}"
        write_to_stream reply
      }
    else
      # Unknown IQ, ignore for now
      EMPromise.reject(:done)
    end

  outcome.catch { |e|
    if e.is_a?(Array) && e.length == 2
      write_to_stream error_msg(i.reply, qn, *e)
    elsif e != :done
      EMPromise.reject(e)
    end
  }.catch(&method(:panic))
end
776
# Catch-all for get/set IQs: answers with a feature-not-implemented error
# of type cancel.
iq type: [:get, :set] do |iq|
  not_implemented = Blather::StanzaError.new(
    iq, 'feature-not-implemented', :cancel
  )
  write_to_stream(not_implemented)
end
784end
785
# Bare <message/> stanza used to carry delivery receipts.
class ReceiptMessage < Blather::Stanza
  # Builds a :message stanza addressed to +to+ (which may be nil).
  def self.new(to=nil)
    super(:message).tap { |node| node.to = to }
  end
end
793
# Goliath endpoint that receives Bandwidth messaging callbacks and relays
# them to XMPP through SGXbwmsgsv2.write / SGXbwmsgsv2.send_media.
class WebhookHandler < Goliath::API
  use Goliath::Rack::Params

  # Handles one callback request.
  #
  # Only the first event of the posted list is processed.  Inbound events
  # become messages to the JID registered for the receiving number;
  # outbound events become a delivery receipt ('message-delivered') or an
  # error stanza ('message-failed').
  #
  # Every handled path answers [200, {}, "OK"], including requests that
  # are ignored.  Any exception raised while processing shuts the whole
  # gateway down.
  def response(env)
    # TODO: add timestamp grab here, and MUST include ./tai version

    puts 'ENV: ' + env.reject { |k| k == 'params' }.to_s

    if params.empty?
      puts 'PARAMS empty!'
      return [200, {}, "OK"]
    end

    if env['REQUEST_URI'] != '/'
      puts "BADREQUEST1: non-/ request \"#{env['REQUEST_URI']}\"" \
        ", method \"#{env['REQUEST_METHOD']}\""
      return [200, {}, "OK"]
    end

    if env['REQUEST_METHOD'] != 'POST'
      puts "BADREQUEST2: non-POST request; URI: \"#{env['REQUEST_URI']}\"" \
        ", method \"#{env['REQUEST_METHOD']}\""
      return [200, {}, "OK"]
    end

    # TODO: process each message in list, not just first one
    events = params['_json']
    event = events.first if events.is_a?(Array)
    jparams = event['message'] if event.is_a?(Hash)

    # A body that is not a list of events, or whose first event has no
    # message object, must be ignored rather than raise (which would shut
    # the gateway down).
    unless jparams.is_a?(Hash)
      puts 'BADREQUEST3: no message found in request body'
      return [200, {}, "OK"]
    end

    type = event['type']

    users_num = ''
    others_num = ''
    if jparams['direction'] == 'in'
      users_num = jparams['owner']
      others_num = jparams['from']
    elsif jparams['direction'] == 'out'
      users_num = jparams['from']
      others_num = jparams['owner']
    else
      # TODO: exception or similar
      puts "big prob: '#{jparams['direction']}'"
      return [200, {}, "OK"]
    end

    # interpolation (not String#+) so a missing or non-String field cannot
    # raise while logging
    puts "BODY - messageId: #{jparams['id']}" \
      ", eventType: #{type}" \
      ", time: #{jparams['time']}" \
      ", direction: #{jparams['direction']}" \
      ", deliveryState: #{jparams['deliveryState'] || 'NONE'}" \
      ", errorCode: #{jparams['errorCode'] || 'NONE'}" \
      ", description: #{jparams['description'] || 'NONE'}" \
      ", tag: #{jparams['tag'] || 'NONE'}" \
      ", media: #{jparams['media'] || 'NONE'}"

    if others_num[0] != '+'
      # TODO: check that others_num actually a shortcode first
      others_num +=
        ';phone-context=ca-us.phone-context.soprani.ca'
    end

    jid_key = "catapult_jid-#{users_num}"
    bare_jid = REDIS.get(jid_key).promise.sync

    if !bare_jid
      puts "jid_key (#{jid_key}) DNE; BW API misconfigured?"

      # TODO: likely not appropriate; give error to BW API?
      # TODO: add text re credentials not being registered
      #write_to_stream error_msg(m.reply, m.body, :auth,
      #  'registration-required')
      return [200, {}, "OK"]
    end

    msg = nil
    case jparams['direction']
    when 'in'
      text = ''
      case type
      when 'sms'
        text = jparams['text']
      when 'mms'
        # never set to true in this branch as it stands
        has_media = false

        if jparams['text'].empty?
          if not has_media
            text = '[suspected group msg '\
              'with no text (odd)]'
          end
        else
          text = if has_media
            # TODO: write/use a caption XEP
            jparams['text']
          else
            '[suspected group msg '\
            '(recipient list not '\
            'available) with '\
            'following text] ' +
            jparams['text']
          end
        end

        # ie. if text param non-empty or had no media
        if not text.empty?
          msg = Blather::Stanza::Message.new(
            bare_jid, text)
          msg.from = others_num + '@' + ARGV[0]
          SGXbwmsgsv2.write(msg)
        end

        return [200, {}, "OK"]
      when 'message-received'
        # TODO: handle group chat, and fix above
        text = jparams['text']

        if jparams['to'].length > 1
          msg = Blather::Stanza::Message.new(
            Blather::JID.new(bare_jid).domain,
            text
          )

          addrs = Nokogiri::XML::Node.new(
            'addresses', msg.document)
          addrs['xmlns'] = 'http://jabber.org/' \
            'protocol/address'

          addr1 = Nokogiri::XML::Node.new(
            'address', msg.document)
          addr1['type'] = 'to'
          addr1['jid'] = bare_jid
          addrs.add_child(addr1)

          jparams['to'].each do |receiver|
            if receiver == users_num
              # already there in addr1
              next
            end

            addrn = Nokogiri::XML::Node.new(
              'address', msg.document)
            addrn['type'] = 'to'
            addrn['uri'] = "sms:#{receiver}"
            addrn['delivered'] = 'true'
            addrs.add_child(addrn)
          end

          msg.add_child(addrs)

          # TODO: delete
          puts "RESPONSE9: #{msg.inspect}"
        end

        # every attachment except the .smil/.txt/.xml parts is sent as
        # its own media message
        Array(jparams['media']).each do |media_url|
          unless media_url.end_with?(
            '.smil', '.txt', '.xml'
          )
            SGXbwmsgsv2.send_media(
              others_num + '@' +
              ARGV[0],
              bare_jid, media_url,
              nil, nil, msg
            )
          end
        end
      else
        text = "unknown type (#{type})"\
          " with text: " + jparams['text']

        # TODO: log/notify of this properly
        puts text
      end

      if not msg
        msg = Blather::Stanza::Message.new(bare_jid, text)
      end
    else # per prior switch, this is: jparams['direction'] == 'out'
      # tag was set when sending: escaped stanza id, space, escaped resource
      tag_parts = jparams['tag'].split(/ /, 2)
      id = WEBrick::HTTPUtils.unescape(tag_parts[0])
      resourcepart = WEBrick::HTTPUtils.unescape(tag_parts[1])

      # TODO: remove this hack
      if jparams['to'].length > 1
        puts "WARN! group no rcpt: #{users_num}"
        return [200, {}, "OK"]
      end

      case type
      when 'message-failed'
        # create a bare message like the one user sent
        msg = Blather::Stanza::Message.new(
          others_num + '@' + ARGV[0])
        msg.from = bare_jid + '/' + resourcepart
        msg['id'] = id

        # TODO: add 'errorCode' and/or 'description' val
        # create an error reply to the bare message
        msg = msg.as_error(
          'recipient-unavailable',
          :wait,
          jparams['description']
        )

        # TODO: make prettier: this should be done above
        others_num = event['to']
      when 'message-delivered'

        msg = ReceiptMessage.new(bare_jid)

        # TODO: put in member/instance variable
        msg['id'] = SecureRandom.uuid

        # TODO: send only when requested per XEP-0184
        rcvd = Nokogiri::XML::Node.new(
          'received',
          msg.document
        )
        rcvd['xmlns'] = 'urn:xmpp:receipts'
        rcvd['id'] = id
        msg.add_child(rcvd)

        # TODO: make prettier: this should be done above
        others_num = event['to']
      else
        # TODO: notify somehow of unknown state received?
        puts "message with id #{id} has "\
          "other type #{type}"
        return [200, {}, "OK"]
      end

      puts "RESPONSE4: #{msg.inspect}"
    end

    msg.from = others_num + '@' + ARGV[0]
    SGXbwmsgsv2.write(msg)

    [200, {}, "OK"]
  rescue Exception => e
    puts 'Shutting down gateway due to exception 013: ' + e.message
    SGXbwmsgsv2.shutdown
    puts 'Gateway has terminated.'
    EM.stop
  end
end
1044
# Program entry point, run from at_exit: prints the banner, checks the
# argument count, then starts the EventMachine reactor with the Redis
# connection, the XMPP component and the Goliath webhook server.
at_exit do
  $stdout.sync = true

  puts "Soprani.ca/SMS Gateway for XMPP - Bandwidth API V2\n"\
    "==>> last commit of this version is #{`git rev-parse HEAD`}\n"

  unless [7, 8].include?(ARGV.size)
    puts "Usage: sgx-bwmsgsv2.rb <component_jid> "\
      "<component_password> <server_hostname> "\
      "<server_port> <application_id> "\
      "<http_listen_port> <mms_proxy_prefix_url> [V1_creds_file]"
    exit 0
  end

  started_at = Time.now
  puts format(
    "LOG %d.%09d: starting...\n\n", started_at.to_i, started_at.nsec
  )

  EM.run do
    REDIS = EM::Hiredis.connect

    SGXbwmsgsv2.run

    # required when using Prosody otherwise disconnects on 6-hour inactivity
    EM.add_periodic_timer(3600) do
      ping = Blather::Stanza::Iq::Ping.new(:get, 'localhost')
      ping.from = ARGV[0]
      SGXbwmsgsv2.write(ping)
    end

    # webhook listener on all interfaces, port taken from ARGV[5]
    server = Goliath::Server.new('0.0.0.0', ARGV[5].to_i)
    server.api = WebhookHandler.new
    server.app = Goliath::Rack::Builder.build(server.api.class, server.api)
    server.logger = Log4r::Logger.new('goliath')
    server.logger.add(Log4r::StdoutOutputter.new('console'))
    server.logger.level = Log4r::INFO
    server.start do
      ['INT', 'TERM'].each do |signal_name|
        trap(signal_name) do
          EM.defer do
            puts 'Shutting down gateway...'
            SGXbwmsgsv2.shutdown

            puts 'Gateway has terminated.'
            EM.stop
          end
        end
      end
    end
  end
end