1#!/usr/bin/env ruby
2#
3# Copyright (C) 2017-2020 Denver Gingerich <denver@ossguy.com>
4# Copyright (C) 2017 Stephen Paul Weber <singpolyma@singpolyma.net>
5#
6# This file is part of sgx-bwmsgsv2.
7#
8# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
9# the terms of the GNU Affero General Public License as published by the Free
10# Software Foundation, either version 3 of the License, or (at your option) any
11# later version.
12#
13# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
14# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
16# details.
17#
18# You should have received a copy of the GNU Affero General Public License along
19# with sgx-bwmsgsv2. If not, see <http://www.gnu.org/licenses/>.
20
21require 'blather/client/dsl'
22require 'em-hiredis'
23require 'em-http-request'
24require 'json'
25require 'securerandom'
26require 'time'
27require 'uri'
28require 'webrick'
29
30require 'goliath/api'
31require 'goliath/server'
32require 'log4r'
33
34require 'em_promise'
35
# Last-resort failure handler: prints the exception message and backtrace,
# shuts the gateway's XMPP side down and stops the EventMachine reactor.
#
# e - the exception (or promise rejection) that triggered the shutdown
def panic(e)
  puts "Shutting down gateway due to exception: #{e.message}", e.backtrace
  SGXbwmsgsv2.shutdown
  puts 'Gateway has terminated.'
  EM.stop
end
43
# Returns the number part of dest when dest carries the ca-us shortcode
# phone-context parameter; returns nil for any other destination.
def extract_shortcode(dest)
  digits, _separator, context = dest.partition(';')
  return nil unless context == 'phone-context=ca-us.phone-context.soprani.ca'

  digits
end
48
# True when everything after the first ';' in dest is exactly the
# "anonymous" phone-context parameter.
def anonymous_tel?(dest)
  _number, _separator, context = dest.partition(';')
  context == 'phone-context=anonymous.phone-context.soprani.ca'
end
52
# Blather client whose handlers are all routed through wrap_handler, so that
# an exception raised by a handler, or a rejection of the promise a handler
# returns, ends up in panic.
class SGXClient < Blather::Client
  # Registers block as a handler for type/guards, wrapped by wrap_handler.
  def register_handler(type, *guards, &block)
    super(type, *guards) { |*args| wrap_handler(*args, &block) }
  end

  # Like register_handler, but puts the handler at the front of the list
  # kept for type so that it is placed before the handlers already there.
  def register_handler_before(type, *guards, &block)
    check_handler(type, guards)
    wrapped = ->(*args) { wrap_handler(*args, &block) }

    (@handlers[type] ||= []).unshift([guards, wrapped])
  end

  protected

  # Runs the handler block; attaches panic to a returned Promise.
  def wrap_handler(*args)
    outcome = yield(*args)
    outcome.catch(&method(:panic)) if outcome.is_a?(Promise)
    true # Do not run other handlers unless throw :pass
  rescue Exception => e
    # deliberately broad: every handler failure goes to panic
    panic(e)
  end
end
76
77# TODO: keep in sync with jmp-acct_bot.rb, and eventually put in common location
# Numeric identifiers for per-user setting flags.
# NOTE(review): presumably bit positions inside a stored flags value - confirm
# against jmp-acct_bot.rb, which this module must stay in sync with.
module CatapultSettingFlagBits
  # flag 0: voicemail transcription is disabled
  VOICEMAIL_TRANSCRIPTION_DISABLED = 0
  # flag 1: MMS on OOB URL
  MMS_ON_OOB_URL = 1
end
82
83module SGXbwmsgsv2
84 extend Blather::DSL
85
# the Blather DSL talks to the server through this wrapped client
@client = SGXClient.new
# features reported in disco#info replies for the gateway's own JID;
# add_gateway_feature appends to this list
@gateway_features = %w[
  http://jabber.org/protocol/disco#info
  http://jabber.org/protocol/address/
  jabber:iq:register
]
92
# Starts the gateway's XMPP side by running the module's client.
def self.run
  # TODO: read/save ARGV[7] creds to local variables
  client.run
end
97
98 # so classes outside this module can write messages, too
# Hands stanza to the module's client for writing.
def self.write(stanza)
  client.write(stanza)
end
102
# Registers block as a handler for type/guards ahead of the handlers that
# are already registered (see SGXClient#register_handler_before).
def self.before_handler(type, *guards, &block)
  client.register_handler_before(type, *guards, &block)
end
106
# Sends an XMPP message whose body and XEP-0066 (OOB) <url/> both point at
# the MMS proxy URL for media_url.
#
# from      - value for the stanza's from attribute
# to        - recipient; the caller must guarantee that it is a bare JID
# media_url - media URL as received; must start with one of the two
#             prefixes listed below
# desc      - optional text for the OOB <desc/> element
# subject   - optional message subject
# m         - optional stanza to copy instead of building a fresh message
#
# Returns nil, after logging, when media_url is not recognized or has no
# path after its user segment (previously the latter raised and took the
# whole gateway down through panic).
def self.send_media(from, to, media_url, desc=nil, subject=nil, m=nil)
  # we assume media_url is one of these (always the case so far):
  #  https://api.catapult.inetwork.com/v1/users/[uid]/media/[file]
  #  https://messaging.bandwidth.com/api/v2/users/[u]/media/[path]

  usr = to
  pth =
    if media_url.start_with?(
      'https://api.catapult.inetwork.com/v1/users/'
    )
      # TODO: MUST fix this TERRIBLE hack
      # there must be a catapult_cred-<usr> key with V1 creds
      usr = 'v1'
      media_url.split('/', 8)[7]
    elsif media_url.start_with?(
      'https://messaging.bandwidth.com/api/v2/users/'
    )
      media_url.split('/', 9)[8]
    end

  # nil means an unknown prefix or a URL that stops before the file part
  unless pth
    puts "ERROR2: unrecognized media_url: '#{media_url}'"
    return
  end

  # the caller must guarantee that 'to' is a bare JID
  proxy_url = ARGV[6] + WEBrick::HTTPUtils.escape(usr) + '/' + pth

  puts 'ORIG_URL: ' + media_url
  puts 'PROX_URL: ' + proxy_url

  # put URL in the body (so Conversations will still see it)...
  if m
    msg = m.copy
    msg.body = proxy_url
  else
    msg = Blather::Stanza::Message.new(to, proxy_url)
  end
  msg.from = from
  msg.subject = subject if subject

  # ...but also provide URL in XEP-0066 (OOB) fashion
  # TODO: confirm client supports OOB or don't send this
  x = Nokogiri::XML::Node.new 'x', msg.document
  x['xmlns'] = 'jabber:x:oob'

  urln = Nokogiri::XML::Node.new 'url', msg.document
  urln.add_child(Nokogiri::XML::Text.new(proxy_url, msg.document))
  x.add_child(urln)

  if desc
    descn = Nokogiri::XML::Node.new('desc', msg.document)
    descn.add_child(Nokogiri::XML::Text.new(desc, msg.document))
    x.add_child(descn)
  end

  msg.add_child(x)

  write(msg)
rescue Exception => e
  # deliberately broad: any failure while relaying media stops the gateway
  panic(e)
end
170
# Turns orig into an error stanza and returns it.
#
# orig       - stanza to convert (normally a reply to the failed request)
# query_node - optional node appended after the <error/> element
# type       - value for the <error/> element's type attribute
# name       - element name of the condition placed inside <error/>
# _text      - currently unused (see TODO below)
def self.error_msg(orig, query_node, type, name, _text=nil)
  orig.type = :error

  error = Nokogiri::XML::Node.new('error', orig.document).tap { |node|
    node['type'] = type
  }
  orig.add_child(error)

  condition = Nokogiri::XML::Node.new(name, orig.document).tap { |node|
    node['xmlns'] = 'urn:ietf:params:xml:ns:xmpp-stanzas'
  }
  error.add_child(condition)

  orig.add_child(query_node) if query_node

  # TODO: add some explanatory xml:lang='en' text (see text param)
  puts "RESPONSE3: #{orig.inspect}"
  orig
end
188
189 # workqueue_count MUST be 0 or else Blather uses threads!
# the first four command-line arguments are passed to setup in order
setup(*ARGV.values_at(0, 1, 2, 3), nil, nil, workqueue_count: 0)
191
# Delivers m directly to jid (the destination number belongs to a user of
# this gateway) and then sends a delivery receipt back to the sender.
#
# m         - the message stanza as received; it is re-addressed in place
# users_num - the sender's number, used as the node of the new from address
# jid       - JID the message is passed on to
def self.pass_on_message(m, users_num, jid)
  # setup delivery receipt; similar to a reply - this has to happen before
  # m is re-addressed below
  receipt = ReceiptMessage.new(m.from.stripped)
  receipt.from = m.to

  # pass original message (before sending receipt)
  m.to = jid
  m.from = "#{users_num}@#{ARGV[0]}"

  puts "XRESPONSE0: #{m.inspect}"
  write_to_stream m

  # send a delivery receipt back to the sender
  # TODO: send only when requested per XEP-0184
  # TODO: pass receipts from target if supported

  # TODO: put in member/instance variable
  receipt['id'] = SecureRandom.uuid
  received = Nokogiri::XML::Node.new('received', receipt.document)
  received['xmlns'] = 'urn:xmpp:receipts'
  received['id'] = m.id
  receipt.add_child(received)

  puts "XRESPONSE1: #{receipt.inspect}"
  write_to_stream receipt
end
218
# Performs one HTTP request against the Bandwidth API.
#
# token, secret - sent as the Authorization header value [token, secret]
# m             - name of the EM::HttpRequest method to call (e.g. :get)
# pth           - request path; looks like one of:
#                  "api/v2/users/#{user_id}/[endpoint_name]"
#                  "v1/users/#{user_id}/[endpoint_name]"
# body          - request body, if any
# head          - extra request headers, merged over the Authorization one
# code          - list of response statuses treated as success
# respond_with  - :body, :headers, or anything else for the whole http
#                 object
#
# Returns a promise for the selected part of the response; the promise is
# rejected with the response status when that status is not in code.
def self.call_catapult(
  token, secret, m, pth, body=nil,
  head={}, code=[200], respond_with=:body
)
  # TODO: need to make a separate thing for voice.bw.c eventually
  url_prefix =
    if pth.start_with?('api/v2/users')
      'https://messaging.bandwidth.com/'
    elsif pth.start_with?('v1/users')
      # begin hack for running V2 messages along with V1 voice
      # TODO: set token and secret to vals provided at startup
      # TODO: replace pth's user_id with user_id from ARGV[7]
      #  -> pth = "v1/users/#{new_id}/" + pth.split('/', 4)[3]
      'https://api.catapult.inetwork.com/'
    else
      # TODO: else error
      ''
    end

  headers = {'Authorization' => [token, secret]}.merge(head)

  EM::HttpRequest.new(url_prefix + pth).public_send(
    m, head: headers, body: body
  ).then { |http|
    status = http.response_header.status
    puts "API response to send: #{http.response} with code"\
      " response.code #{status}"

    next EMPromise.reject(status) unless code.include?(status)

    case respond_with
    when :body then http.response
    when :headers then http.response_header
    else http
    end
  }
end
267
# Sends stanza s to num_dest, as an MMS when s carries an XEP-0066 (OOB)
# <url/> and as a plain message otherwise.  The remaining arguments are
# passed straight through to to_catapult.
def self.to_catapult_possible_oob(s, num_dest, user_id, token, secret,
  usern)
  url_node = s.at("oob|x > oob|url", oob: "jabber:x:oob")
  if url_node
    puts "MMSOOB: found a url node - checking if to make MMS..."
  else
    puts "MMSOOB: no url node found so process as normal"
    return to_catapult(s, nil, num_dest, user_id, token,
      secret, usern)
  end

  # TODO: check size of file at un.text and shrink if need

  original_body = s.respond_to?(:body) ? s.body : ''
  # some clients send URI in both body & <url/> so delete
  s.body = original_body.sub(/\s*#{Regexp.escape(url_node.text)}\s*$/, '')

  puts "MMSOOB: url text is '#{url_node.text}'"
  puts "MMSOOB: the body is '#{original_body.to_s.strip}'"

  puts "MMSOOB: sending MMS since found OOB & user asked"
  to_catapult(s, url_node.text, num_dest, user_id, token, secret, usern)
end
290
# Posts a message to the API's messages endpoint for user_id.
#
# s        - the stanza being relayed; its body becomes the message text
# murl     - media URL to attach, or nil for a text-only message
# num_dest - value for the request's "to" field
# usern    - value for the request's "from" field
#
# Returns a promise.  It is rejected with [:modify, 'policy-violation']
# when there is neither media nor text to send, and with
# [:cancel, 'internal-server-error'] when the API call fails.
def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern)
  text = s.respond_to?(:body) ? s.body : ''
  if murl.to_s.empty? && text.to_s.strip.empty?
    return EMPromise.reject([:modify, 'policy-violation'])
  end

  # callbacks need id and resourcepart
  tag = "#{WEBrick::HTTPUtils.escape(s.id.to_s)} "\
    "#{WEBrick::HTTPUtils.escape(s.from.resource.to_s)}"

  # media (when present) stays the first key, as before
  payload = {}
  payload[:media] = murl if murl
  payload[:from] = usern
  payload[:to] = num_dest
  payload[:text] = text
  payload[:applicationId] = ARGV[4]
  payload[:tag] = tag

  call_catapult(
    token,
    secret,
    :post,
    "api/v2/users/#{user_id}/messages",
    JSON.dump(payload),
    {'Content-Type' => 'application/json'},
    [201]
  ).catch {
    # TODO: add text; mention code number
    EMPromise.reject([:cancel, 'internal-server-error'])
  }
end
329
# Works out where message m should be sent.
#
# When m is addressed to the gateway itself (ARGV[0]) the destinations come
# from its <addresses/> child: the method returns an Array with one entry
# per child of that element, holding the text after the first four
# characters of the child's uri attribute ('' when there is no usable uri),
# or a promise rejected with [:cancel, 'item-not-found'] when m has no
# <addresses/> child.
#
# Otherwise the node part of m.to is the destination and a promise is
# returned: resolved with the number for '+' numbers and ca-us shortcodes,
# rejected with [:cancel, 'gone'] for anonymous senders and with
# [:cancel, 'item-not-found'] for anything else.
def self.validate_num(m)
  # if sent to SGX domain use https://wiki.soprani.ca/SGX/GroupMMS
  if m.to == ARGV[0]
    an = m.children.find { |v| v.element_name == "addresses" }
    unless an
      return EMPromise.reject([:cancel, 'item-not-found'])
    end
    puts "ADRXEP: found an addresses node - iterate addrs.."

    return an.children.map { |address|
      num = ''
      address.attributes.each do |attr_name, attr|
        # TODO: error when a 'type' attribute is not 'to'
        # TODO: else, error - unexpected name
        next unless attr_name == 'uri'

        # TODO: error when the uri does not start with 'sms:'
        # TODO: confirm num validates
        # a uri shorter than the 'sms:' prefix slices to nil; treat it as
        # "no number" rather than raising (which stopped the gateway)
        num = attr.to_s[4..-1].to_s
      end
      # TODO: error when num is empty or the address has no type
      num
    }
  end

  # if not sent to SGX domain, then assume destination is in 'to'
  EMPromise.resolve(m.to.node.to_s).then { |num_dest|
    if num_dest =~ /\A\+?[0-9]+(?:;.*)?\Z/
      next num_dest if num_dest[0] == '+'

      shortcode = extract_shortcode(num_dest)
      next shortcode if shortcode
    end

    if anonymous_tel?(num_dest)
      EMPromise.reject([:cancel, 'gone'])
    else
      # TODO: text re num not (yet) supportd/implmentd
      EMPromise.reject([:cancel, 'item-not-found'])
    end
  }
end
384
# Looks up the credential list stored in Redis under
# "catapult_cred-<bare jid>".
#
# Returns a promise for the first four list entries, rejected with
# [:auth, 'registration-required'] when fewer than four are stored.
def self.fetch_catapult_cred_for(jid)
  REDIS.lrange("catapult_cred-#{jid.stripped}", 0, 3).then { |creds|
    # TODO: add text re credentials not registered
    next EMPromise.reject([:auth, 'registration-required']) if creds.length < 4

    creds
  }
end
398
# Error-type messages are only logged; nothing is sent in response.
message :error? do |m|
  # TODO: report it somewhere/somehow - eat for now so no err loop
  puts "EATERROR1: #{m.inspect}"
end
403
# Messages with a body: resolve the destination and the sender's stored
# credentials, then either hand the message straight to another gateway
# user or send it out through the API.
message :body do |m|
  EMPromise.all([
    validate_num(m),
    fetch_catapult_cred_for(m.from)
  ]).then { |(num_dest, creds)|
    # is the destination number registered with this gateway?
    REDIS.get("catapult_jid-#{num_dest}").then { |dest_jid|
      [dest_jid, num_dest] + creds
    }
  }.then { |(dest_jid, num_dest, *creds)|
    if dest_jid
      # append the first stored credential entry of the destination user
      REDIS.lrange("catapult_cred-#{dest_jid}", 0, 0).then { |first_entry|
        [dest_jid, num_dest] + creds + first_entry
      }
    else
      [dest_jid, num_dest] + creds + [nil]
    end
  }.then { |(dest_jid, num_dest, *creds, other_user)|
    # if destination user is in the system pass on directly
    if other_user && !other_user.start_with?('u-')
      pass_on_message(m, creds.last, dest_jid)
    else
      to_catapult_possible_oob(m, num_dest, *creds)
    end
  }.catch { |e|
    # two-element arrays are [type, condition] pairs meant for the sender;
    # anything else is rejected again
    if e.is_a?(Array) && e.length == 2
      write_to_stream error_msg(m.reply, m.body, *e)
    else
      EMPromise.reject(e)
    end
  }
end
437
# Identities used in presence capabilities and disco#info replies for
# number JIDs.
def self.user_cap_identities
  sms_client = {category: 'client', type: 'sms'}
  [sms_client]
end
441
442 # TODO: must re-add stuff so can do ad-hoc commands
# Features used in presence capabilities and disco#info replies for
# number JIDs.
def self.user_cap_features
  %w[urn:xmpp:receipts]
end
446
# Adds feature to the gateway's own disco#info feature list, then removes
# duplicate entries from the list.
def self.add_gateway_feature(feature)
  @gateway_features.push(feature)
  @gateway_features.uniq!
end
451
# Subscription requests: approve, send presence with capabilities, and
# subscribe back to the requester.
subscription :request? do |p|
  puts "PRESENCE1: #{p.inspect}"

  # subscriptions are allowed from anyone - send reply immediately
  approval = Blather::Stanza::Presence.new
  approval.to = p.from
  approval.from = p.to
  approval.type = :subscribed

  puts "RESPONSE5a: #{approval.inspect}"
  write_to_stream approval

  # send a <presence> immediately; not automatically probed for it
  # TODO: refactor so no "presence :probe? do |p|" duplicate below
  caps = Blather::Stanza::Capabilities.new
  # TODO: use a better node URI (?)
  caps.node = 'http://catapult.sgx.soprani.ca/'
  caps.identities = user_cap_identities
  caps.features = user_cap_features

  availability = caps.c
  availability.to = p.from
  availability.from = "#{p.to}/sgx"

  puts "RESPONSE5b: #{availability.inspect}"
  write_to_stream availability

  # need to subscribe back so Conversations displays images inline
  subscribe_back = Blather::Stanza::Presence.new
  subscribe_back.to = p.from.to_s.split('/', 2).first
  subscribe_back.from = p.to.to_s.split('/', 2).first
  subscribe_back.type = :subscribe

  puts "RESPONSE5c: #{subscribe_back.inspect}"
  write_to_stream subscribe_back
end
488
# Presence probes: answer with presence carrying the user capabilities,
# sent from the probed JID's '/sgx' resource.
presence :probe? do |p|
  puts "PRESENCE2: #{p.inspect}"

  caps = Blather::Stanza::Capabilities.new
  # TODO: use a better node URI (?)
  caps.node = 'http://catapult.sgx.soprani.ca/'
  caps.identities = user_cap_identities
  caps.features = user_cap_features

  availability = caps.c
  availability.to = p.from
  availability.from = "#{p.to}/sgx"

  puts "RESPONSE6: #{availability.inspect}"
  write_to_stream availability
end
505
# disco#info queries: number JIDs (those with a node part) get the user
# capabilities, the gateway's own JID gets the gateway identity and
# features.
iq '/iq/ns:query', ns: 'http://jabber.org/protocol/disco#info' do |i|
  # TODO: return error if i.type is :set - if it is :reply or
  #  :error it should be ignored (as the below does currently);
  #  review specification to see how to handle other type values
  unless i.type == :get
    puts "DISCO iq rcvd, of non-get type \"#{i.type}\" for message "\
      "\"#{i.inspect}\"; ignoring..."
    next
  end

  reply = i.reply
  reply.node = i.node

  if i.to.node
    # respond to capabilities request for an sgx-bwmsgsv2 number JID
    # TODO: confirm the node URL is expected using below
    #puts "XR[node]: #{xpath_result[0]['node']}"
    reply.identities = user_cap_identities
    reply.features = user_cap_features

    puts "RESPONSE7: #{reply.inspect}"
  else
    # respond to capabilities request for sgx-bwmsgsv2 itself
    reply.identities = [{
      name: 'Soprani.ca Gateway to XMPP - Bandwidth API V2',
      type: 'sms', category: 'gateway'
    }]
    reply.features = @gateway_features
  end

  write_to_stream reply
end
541
# Stores a registration in Redis and acknowledges the IQ.
#
# i     - the registration IQ; its sender's bare JID is the account
# creds - user id, API token, API secret and phone number, in that order
#
# Keys written: "catapult_cred-<bare jid>" (list of the four creds) and
# "catapult_jid-<phone number>" (the bare JID).
#
# Returns a promise.  It is rejected with [:cancel, 'conflict'] when the
# number belongs to another JID or the JID already has different creds, and
# with [:cancel, 'internal-server-error'] when a Redis write does not give
# the expected result.
def self.check_then_register(i, *creds)
  jid_key = "catapult_jid-#{creds.last}"
  # keep the JID as a String: it is compared with the String Redis returns,
  # and String#== does not treat a JID object as equal.
  # NOTE(review): assumes the JID class has no to_str - confirm in Blather
  bare_jid = i.from.stripped.to_s
  cred_key = "catapult_cred-#{bare_jid}"

  REDIS.get(jid_key).then { |existing_jid|
    if existing_jid && existing_jid != bare_jid
      # TODO: add/log text: credentials exist already
      EMPromise.reject([:cancel, 'conflict'])
    end
  }.then {
    REDIS.lrange(cred_key, 0, 3)
  }.then { |existing_creds|
    # TODO: add/log text: credentials exist already
    if existing_creds.length == 4 && creds != existing_creds
      EMPromise.reject([:cancel, 'conflict'])
    elsif existing_creds.length < 4
      REDIS.rpush(cred_key, *creds).then { |length|
        if length != 4
          EMPromise.reject([:cancel, 'internal-server-error'])
        end
      }
    end
  }.then {
    # not necessary if existing_jid non-nil, easier this way
    REDIS.set(jid_key, bare_jid)
  }.then { |result|
    if result != 'OK'
      # TODO: add txt re push failure
      EMPromise.reject([:cancel, 'internal-server-error'])
    end
  }.then {
    write_to_stream i.reply
  }
end
582
# Extracts the registration values from a jabber:iq:register query node.
#
# When qn has an <x/> child, its <field/> children are read (var
# attribute plus <value/> child); otherwise qn's own children are read by
# element name.  In both layouts nick is the user id, username the API
# token, password the API secret and phone the phone number.
#
# Returns [user_id, api_token, api_secret, phone_num]; an entry is nil when
# the query did not supply it, including a form field that has no <value/>
# child (which used to raise NoMethodError).
def self.creds_from_registration_query(qn)
  keys = {
    'nick' => :user_id,
    'username' => :api_token,
    'password' => :api_secret,
    'phone' => :phone_num
  }

  xn = qn.children.find { |v| v.element_name == "x" }

  found =
    if xn
      xn.children.each_with_object({}) do |field, h|
        next if field.element_name != "field"

        key = keys[field['var']]
        unless key
          # TODO: error
          puts "?: #{field['var']}"
          next
        end

        val = field.children.find { |v| v.element_name == "value" }
        h[key] = val.text if val
      end
    else
      qn.children.each_with_object({}) do |field, h|
        key = keys[field.element_name]
        h[key] = field.text if key
      end
    end

  found.values_at(:user_id, :api_token, :api_secret, :phone_num)
end
622
# Handles a registration <iq type='set'/>.
#
# i  - the IQ stanza
# qn - its jabber:iq:register query node
#
# A <remove/> request is ignored for now (rejection :done).  Otherwise the
# submitted credentials are tried against the API's media listing and, if
# accepted, stored with check_then_register.
#
# Returns a promise.  Rejections: [:cancel, 'item-not-found'] when the
# phone number is missing or does not start with '+' (a missing number used
# to raise NoMethodError and stop the gateway) or the API answers 404;
# [:auth, 'not-authorized'] for a 401; [:modify, 'not-acceptable'] for any
# other status; every other failure is passed on unchanged.
def self.process_registration(i, qn)
  EMPromise.resolve(
    qn.children.find { |v| v.element_name == "remove" }
  ).then { |rn|
    if rn
      puts "received <remove/> - ignoring for now..."
      EMPromise.reject(:done)
    else
      creds_from_registration_query(qn)
    end
  }.then { |user_id, api_token, api_secret, phone_num|
    # phone_num is nil when the query carried no phone entry
    if phone_num.to_s.start_with?('+')
      [user_id, api_token, api_secret, phone_num]
    else
      # TODO: add text re number not (yet) supported
      EMPromise.reject([:cancel, 'item-not-found'])
    end
  }.then { |user_id, api_token, api_secret, phone_num|
    # TODO: find way to verify #{phone_num}, too
    call_catapult(
      api_token,
      api_secret,
      :get,
      "api/v2/users/#{user_id}/media"
    ).then { |response|
      # only checks that the response parses; the result is not used
      JSON.parse(response)
      # TODO: confirm response is array - could be empty

      puts "register got str #{response.to_s[0..999]}"

      check_then_register(
        i,
        user_id,
        api_token,
        api_secret,
        phone_num
      )
    }
  }.catch { |e|
    EMPromise.reject(
      case e
      when 401
        # TODO: add text re bad credentials
        [:auth, 'not-authorized']
      when 404
        # TODO: add text re number not found or disabled
        [:cancel, 'item-not-found']
      when Integer
        [:modify, 'not-acceptable']
      else
        e
      end
    )
  }
end
676
# Appends a jabber:iq:register <query/> to orig and returns orig.
#
# The query holds the plain registration fields (instructions, nick,
# username, password, phone) followed by an equivalent data form.
#
# orig            - stanza the query is added to
# existing_number - phone number already stored for the user, if any; it
#                   pre-fills the phone fields and adds <registered/>
def self.registration_form(orig, existing_number=nil)
  query = Nokogiri::XML::Node.new 'query', orig.document
  query['xmlns'] = 'jabber:iq:register'

  if existing_number
    query.add_child(
      Nokogiri::XML::Node.new('registered', query.document)
    )
  end

  # text shared by both sets of instructions
  source_notice = ".\n\nThe source code for this gateway is at "\
    "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
    "\nCopyright (C) 2017-2020 Denver Gingerich "\
    "and others, licensed under AGPLv3+."

  # TODO: update "User Id" x2 below (to "accountId"?), and others?
  instructions = Nokogiri::XML::Node.new('instructions', query.document)
  instructions.content = "Enter the information from your Account "\
    "page as well as the Phone Number\nin your "\
    "account you want to use (ie. '+12345678901')"\
    ".\nUser Id is nick, API Token is username, "\
    "API Secret is password, Phone Number is phone" + source_notice
  query.add_child(instructions)

  %w[nick username password phone].each do |field_name|
    field = Nokogiri::XML::Node.new field_name, query.document
    field.content = existing_number.to_s if field_name == 'phone'
    query.add_child(field)
  end

  form = Blather::Stanza::X.new :form, [
    {
      required: true, type: :"text-single",
      label: 'User Id', var: 'nick'
    },
    {
      required: true, type: :"text-single",
      label: 'API Token', var: 'username'
    },
    {
      required: true, type: :"text-private",
      label: 'API Secret', var: 'password'
    },
    {
      required: true, type: :"text-single",
      label: 'Phone Number', var: 'phone',
      value: existing_number.to_s
    }
  ]
  form.title = 'Register for '\
    'Soprani.ca Gateway to XMPP - Bandwidth API V2'
  form.instructions = "Enter the details from your Account "\
    "page as well as the Phone Number\nin your "\
    "account you want to use (ie. '+12345678901')" + source_notice
  query.add_child(form)

  orig.add_child(query)

  orig
end
747
# jabber:iq:register queries: 'set' registers, 'get' returns the form.
iq '/iq/ns:query', ns: 'jabber:iq:register' do |i, qn|
  puts "IQ: #{i.inspect}"

  outcome =
    case i.type
    when :set
      process_registration(i, qn)
    when :get
      cred_key = "catapult_cred-#{i.from.stripped}"
      # the fourth stored credential is the registered phone number
      REDIS.lindex(cred_key, 3).then { |existing_number|
        form_reply = registration_form(i.reply, existing_number)
        puts "RESPONSE2: #{form_reply.inspect}"
        write_to_stream form_reply
      }
    else
      # Unknown IQ, ignore for now
      EMPromise.reject(:done)
    end

  outcome.catch { |e|
    # [type, condition] pairs go back to the requester, :done is dropped,
    # everything else is rejected again and ends in panic
    if e.is_a?(Array) && e.length == 2
      write_to_stream error_msg(i.reply, qn, *e)
    elsif e != :done
      EMPromise.reject(e)
    end
  }.catch(&method(:panic))
end
773
# Any other get/set IQ is answered with feature-not-implemented.
iq type: [:get, :set] do |iq|
  not_implemented = Blather::StanzaError.new(
    iq, 'feature-not-implemented', :cancel
  )
  write_to_stream(not_implemented)
end
781end
782
# A bare <message/> stanza addressed to `to`, used as the carrier for
# delivery receipts.
class ReceiptMessage < Blather::Stanza
  def self.new(to=nil)
    super(:message).tap { |node| node.to = to }
  end
end
790
# Goliath endpoint for the API's message callbacks.  Inbound messages are
# relayed to the XMPP user registered for the receiving number; callbacks
# about outbound messages become delivery receipts or error stanzas.
class WebhookHandler < Goliath::API
  use Goliath::Rack::Params

  # Handles one callback request.
  #
  # Requests this handler cannot use (no params, wrong path or method,
  # unexpected payload shape, unknown number, ...) are logged and still
  # answered with 200 "OK".  Any exception stops the whole gateway.
  def response(env)
    # TODO: add timestamp grab here, and MUST include ./tai version

    puts 'ENV: ' + env.reject { |k| k == 'params' }.to_s

    if params.empty?
      puts 'PARAMS empty!'
      return webhook_ok
    end

    if env['REQUEST_URI'] != '/'
      puts "BADREQUEST1: non-/ request \"#{env['REQUEST_URI']}\", "\
        "method \"#{env['REQUEST_METHOD']}\""
      return webhook_ok
    end

    if env['REQUEST_METHOD'] != 'POST'
      puts "BADREQUEST2: non-POST request; URI: \"#{env['REQUEST_URI']}\""\
        ", method \"#{env['REQUEST_METHOD']}\""
      return webhook_ok
    end

    # TODO: process each message in list, not just first one
    event = first_event
    unless event
      # e.g. a form-encoded POST; must not be able to stop the gateway
      puts 'BADREQUEST3: payload is not a list of message events: ' +
        params.inspect[0..999]
      return webhook_ok
    end

    jparams = event['message']
    type = event['type']

    case jparams['direction']
    when 'in'
      users_num = jparams['owner']
      others_num = jparams['from']
    when 'out'
      users_num = jparams['from']
      others_num = jparams['owner']
    else
      # TODO: exception or similar
      puts "big prob: '#{jparams['direction']}'"
      return webhook_ok
    end

    unless users_num.is_a?(String) && others_num.is_a?(String)
      puts 'BADREQUEST4: message without owner/from numbers'
      return webhook_ok
    end

    # interpolation keeps missing (nil) or numeric values from raising
    puts "BODY - messageId: #{jparams['id']}"\
      ", eventType: #{type}"\
      ", time: #{jparams['time']}"\
      ", direction: #{jparams['direction']}"\
      ", deliveryState: #{jparams['deliveryState'] || 'NONE'}"\
      ", errorCode: #{jparams['errorCode'] || 'NONE'}"\
      ", description: #{jparams['description'] || 'NONE'}"\
      ", tag: #{jparams['tag'] || 'NONE'}"\
      ", media: #{jparams['media'] || 'NONE'}"

    if others_num[0] != '+'
      # TODO: check that others_num actually a shortcode first
      others_num += ';phone-context=ca-us.phone-context.soprani.ca'
    end

    jid_key = "catapult_jid-#{users_num}"
    bare_jid = REDIS.get(jid_key).promise.sync

    unless bare_jid
      puts "jid_key (#{jid_key}) DNE; BW API misconfigured?"

      # TODO: likely not appropriate; give error to BW API?
      # TODO: add text re credentials not being registered
      #write_to_stream error_msg(m.reply, m.body, :auth,
      #  'registration-required')
      return webhook_ok
    end

    if jparams['direction'] == 'in'
      msg = inbound_stanza(type, jparams, bare_jid, users_num, others_num)
    else
      msg = outbound_stanza(type, jparams, bare_jid, users_num, others_num)
      return webhook_ok unless msg

      # TODO: make prettier: this should be done above
      others_num = event['to']
      unless others_num.is_a?(String)
        puts 'BADREQUEST5: status callback without a "to" number'
        return webhook_ok
      end
    end

    msg.from = others_num + '@' + ARGV[0]
    SGXbwmsgsv2.write(msg)

    webhook_ok
  rescue Exception => e
    # deliberately broad: any failure here stops the gateway
    puts 'Shutting down gateway due to exception 013: ' + e.message
    puts e.backtrace
    SGXbwmsgsv2.shutdown
    puts 'Gateway has terminated.'
    EM.stop
  end

  private

  # The response returned for every handled request.
  def webhook_ok
    [200, {}, "OK"]
  end

  # First entry of the '_json' param when it is a Hash with a Hash under
  # 'message'; nil for any other payload shape.
  def first_event
    list = params['_json']
    event = list.is_a?(Array) ? list[0] : nil
    return nil unless event.is_a?(Hash) && event['message'].is_a?(Hash)

    event
  end

  # Builds the stanza for a message received by users_num.  Media of
  # 'message-received' events is relayed from here as a side effect.
  def inbound_stanza(type, jparams, bare_jid, users_num, others_num)
    case type
    when 'sms'
      text = jparams['text']
    when 'mms'
      # media is not looked at for this type, so the message is always
      # relayed as text with a note that it may be a group message
      # TODO: write/use a caption XEP
      caption = jparams['text'].to_s
      text =
        if caption.empty?
          '[suspected group msg with no text (odd)]'
        else
          '[suspected group msg (recipient list not available) with '\
            'following text] ' + caption
        end
    when 'message-received'
      # TODO: handle group chat, and fix above
      text = jparams['text']
      recipients = Array(jparams['to'])

      group = nil
      if recipients.length > 1
        group = group_stanza(text, recipients, bare_jid, users_num)
      end
      relay_media(jparams['media'], others_num, bare_jid, group)
      return group if group
    else
      text = "unknown type (#{type}) with text: #{jparams['text']}"

      # TODO: log/notify of this properly
      puts text
    end

    Blather::Stanza::Message.new(bare_jid, text)
  end

  # Message for a multi-recipient text: addressed to the domain of
  # bare_jid, with an <addresses/> element naming the user and every
  # other recipient.
  def group_stanza(text, recipients, bare_jid, users_num)
    msg = Blather::Stanza::Message.new(
      Blather::JID.new(bare_jid).domain, text
    )

    addrs = Nokogiri::XML::Node.new('addresses', msg.document)
    addrs['xmlns'] = 'http://jabber.org/protocol/address'

    addr1 = Nokogiri::XML::Node.new('address', msg.document)
    addr1['type'] = 'to'
    addr1['jid'] = bare_jid
    addrs.add_child(addr1)

    recipients.each do |receiver|
      # already there in addr1
      next if receiver == users_num

      addrn = Nokogiri::XML::Node.new('address', msg.document)
      addrn['type'] = 'to'
      addrn['uri'] = "sms:#{receiver}"
      addrn['delivered'] = 'true'
      addrs.add_child(addrn)
    end

    msg.add_child(addrs)

    # TODO: delete
    puts "RESPONSE9: #{msg.inspect}"
    msg
  end

  # Sends each media URL to the user, except .smil, .txt and .xml ones.
  def relay_media(media, others_num, bare_jid, template)
    Array(media).each do |media_url|
      next if media_url.end_with?('.smil', '.txt', '.xml')

      SGXbwmsgsv2.send_media(
        others_num + '@' + ARGV[0],
        bare_jid, media_url,
        nil, nil, template
      )
    end
  end

  # Builds the stanza for a status callback about a message the user sent:
  # an error for 'message-failed', a receipt for 'message-delivered'.
  # Returns nil (after logging) when nothing should be sent.
  def outbound_stanza(type, jparams, bare_jid, users_num, others_num)
    tag = jparams['tag']
    unless tag.is_a?(String)
      # the tag carries the stanza id and resource (see to_catapult)
      puts "WARN! no tag in status callback: #{users_num}"
      return nil
    end

    raw_id, raw_resource = tag.split(/ /, 2)
    id = WEBrick::HTTPUtils.unescape(raw_id.to_s)
    resourcepart = WEBrick::HTTPUtils.unescape(raw_resource.to_s)

    # TODO: remove this hack
    if Array(jparams['to']).length > 1
      puts "WARN! group no rcpt: #{users_num}"
      return nil
    end

    case type
    when 'message-failed'
      # create a bare message like the one user sent
      msg = Blather::Stanza::Message.new(others_num + '@' + ARGV[0])
      msg.from = bare_jid + '/' + resourcepart
      msg['id'] = id

      # TODO: add 'errorCode' and/or 'description' val
      # create an error reply to the bare message
      msg = msg.as_error(
        'recipient-unavailable',
        :wait,
        jparams['description']
      )
    when 'message-delivered'
      msg = ReceiptMessage.new(bare_jid)

      # TODO: put in member/instance variable
      msg['id'] = SecureRandom.uuid

      # TODO: send only when requested per XEP-0184
      rcvd = Nokogiri::XML::Node.new('received', msg.document)
      rcvd['xmlns'] = 'urn:xmpp:receipts'
      rcvd['id'] = id
      msg.add_child(rcvd)
    else
      # TODO: notify somehow of unknown state receivd?
      puts "message with id #{id} has other type #{type}"
      return nil
    end

    puts "RESPONSE4: #{msg.inspect}"
    msg
  end
end
1042
# Program entry point, run when the interpreter exits the main script:
# checks the arguments, then runs the XMPP component and the webhook server
# inside one EventMachine reactor.
at_exit do
  $stdout.sync = true

  puts "Soprani.ca/SMS Gateway for XMPP - Bandwidth API V2\n"\
    "==>> last commit of this version is #{`git rev-parse HEAD`}\n"

  unless [7, 8].include?(ARGV.size)
    puts "Usage: sgx-bwmsgsv2.rb <component_jid> "\
      "<component_password> <server_hostname> "\
      "<server_port> <application_id> "\
      "<http_listen_port> <mms_proxy_prefix_url> [V1_creds_file]"
    exit 0
  end

  started = Time.now
  puts format("LOG %d.%09d: starting...\n\n", started.to_i, started.nsec)

  EM.run do
    REDIS = EM::Hiredis.connect

    SGXbwmsgsv2.run

    # required when using Prosody otherwise disconnects on 6-hour inactivity
    EM.add_periodic_timer(3600) do
      ping = Blather::Stanza::Iq::Ping.new(:get, 'localhost')
      ping.from = ARGV[0]
      SGXbwmsgsv2.write(ping)
    end

    # webhook listener on the port given as <http_listen_port>
    server = Goliath::Server.new('0.0.0.0', ARGV[5].to_i)
    server.api = WebhookHandler.new
    server.app = Goliath::Rack::Builder.build(server.api.class, server.api)
    server.logger = Log4r::Logger.new('goliath')
    server.logger.add(Log4r::StdoutOutputter.new('console'))
    server.logger.level = Log4r::INFO
    server.start do
      %w[INT TERM].each do |sig|
        trap(sig) do
          EM.defer do
            puts 'Shutting down gateway...'
            SGXbwmsgsv2.shutdown

            puts 'Gateway has terminated.'
            EM.stop
          end
        end
      end
    end
  end
end