1#!/usr/bin/env ruby
2# frozen_string_literal: true
3
4# Copyright (C) 2017-2020 Denver Gingerich <denver@ossguy.com>
5# Copyright (C) 2017 Stephen Paul Weber <singpolyma@singpolyma.net>
6#
7# This file is part of sgx-bwmsgsv2.
8#
9# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
10# the terms of the GNU Affero General Public License as published by the Free
11# Software Foundation, either version 3 of the License, or (at your option) any
12# later version.
13#
14# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
15# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17# details.
18#
19# You should have received a copy of the GNU Affero General Public License along
20# with sgx-bwmsgsv2. If not, see <http://www.gnu.org/licenses/>.
21
22require 'delegate'
23require 'blather/client/dsl'
24require 'em-hiredis'
25require 'em-http-request'
26require 'json'
27require 'multibases'
28require 'multihashes'
29require 'securerandom'
30require "sentry-ruby"
31require 'time'
32require 'uri'
33require 'webrick'
34
35require 'goliath/api'
36require 'goliath/server'
37require 'log4r'
38
39require 'em_promise'
40require 'em-synchrony'
41
42require_relative 'lib/bandwidth_error'
43require_relative 'lib/bandwidth_tn_options'
44require_relative 'lib/message_event'
45require_relative 'lib/registration_repo'
46
47Sentry.init
48
49# List of supported MIME types from Bandwidth - https://support.bandwidth.com/hc/en-us/articles/360014128994-What-MMS-file-types-are-supported-
50MMS_MIME_TYPES = [
51 "application/json",
52 "application/ogg",
53 "application/pdf",
54 "application/rtf",
55 "application/zip",
56 "application/x-tar",
57 "application/xml",
58 "application/gzip",
59 "application/x-bzip2",
60 "application/x-gzip",
61 "application/smil",
62 "application/javascript",
63 "audio/mp4",
64 "audio/mpeg",
65 "audio/ogg",
66 "audio/flac",
67 "audio/webm",
68 "audio/wav",
69 "audio/amr",
70 "audio/3gpp",
71 "image/bmp",
72 "image/gif",
73 "image/jpeg",
74 "image/pjpeg",
75 "image/png",
76 "image/svg+xml",
77 "image/tiff",
78 "image/webp",
79 "image/x-icon",
80 "text/css",
81 "text/csv",
82 "text/calendar",
83 "text/plain",
84 "text/javascript",
85 "text/vcard",
86 "text/vnd.wap.wml",
87 "text/xml",
88 "video/avi",
89 "video/mp4",
90 "video/mpeg",
91 "video/ogg",
92 "video/quicktime",
93 "video/webm",
94 "video/x-ms-wmv",
95 "video/x-flv"
96]
97
98def panic(e)
99 Sentry.capture_exception(e)
100 puts "Shutting down gateway due to exception: #{e.message}"
101 puts e.backtrace
102 SGXbwmsgsv2.shutdown
103 puts 'Gateway has terminated.'
104 EM.stop
105end
106
107EM.error_handler(&method(:panic))
108
109def extract_shortcode(dest)
110 num, context = dest.split(';', 2)
111 num if context == 'phone-context=ca-us.phone-context.soprani.ca'
112end
113
114def anonymous_tel?(dest)
115 dest.split(';', 2)[1] == 'phone-context=anonymous.phone-context.soprani.ca'
116end
117
118class SGXClient < Blather::Client
119 def handle_data(stanza)
120 promise = EMPromise.resolve(nil).then {
121 with_sentry(stanza) do |scope|
122 super
123 rescue StandardError => e
124 handle_error(scope, stanza, e)
125 end
126 }.catch { |e| panic(e) }
127 promise.sync if ENV["ENV"] == "test"
128 promise
129 end
130
131 # Override the default call_handler to syncify during testing.
132 def call_handler(handler, guards, stanza)
133 result = if guards.first.respond_to?(:to_str)
134 found = stanza.find(*guards)
135 throw :pass if found.empty?
136
137 handler.call(stanza, found)
138 else
139 throw :pass if guarded?(guards, stanza)
140
141 handler.call(stanza)
142 end
143
144 # Up to here, identical to upstream impl
145
146 return result unless result.is_a?(Promise)
147
148 result.sync if ENV["ENV"] == "test"
149 result
150 end
151
152protected
153
154 def with_sentry(stanza)
155 Sentry.clone_hub_to_current_thread
156
157 Sentry.with_scope do |scope|
158 setup_scope(stanza, scope)
159 yield scope
160 ensure
161 scope.get_transaction&.then do |tx|
162 tx.set_status("ok") unless tx.status
163 tx.finish
164 end
165 end
166 end
167
168 def setup_scope(stanza, scope)
169 name = stanza.respond_to?(:node) ? stanza.node : stanza.name
170 scope.clear_breadcrumbs
171 scope.set_transaction_name(name)
172 scope.set_user(jid: stanza.from&.stripped.to_s)
173
174 transaction = Sentry.start_transaction(
175 name: name,
176 op: "blather.handle_data"
177 )
178 scope.set_span(transaction) if transaction
179 end
180
181 def handle_error(scope, stanza, e)
182 puts "Error raised during #{scope.transaction_name}: #{e.class}"
183 puts e.message
184 puts e.backtrace
185 Sentry.capture_exception(e) unless e.is_a?(Sentry::Error)
186 scope.get_transaction&.set_status("internal_error")
187 return if e.respond_to?(:replied?) && e.replied?
188
189 SGXbwmsgsv2.write_to_stream stanza.as_error("internal-server-error", :cancel)
190 end
191end
192
193# TODO: keep in sync with jmp-acct_bot.rb, and eventually put in common location
194module CatapultSettingFlagBits
195 VOICEMAIL_TRANSCRIPTION_DISABLED = 0
196 MMS_ON_OOB_URL = 1
197end
198
199module SGXbwmsgsv2
200 extend Blather::DSL
201
202 @registration_repo = RegistrationRepo.new
203 @client = SGXClient.new
204 @gateway_features = [
205 "http://jabber.org/protocol/disco#info",
206 "http://jabber.org/protocol/address/",
207 "jabber:iq:register",
208 "http://jabber.org/protocol/commands"
209 ]
210
211 def self.run
212 # TODO: read/save ARGV[7] creds to local variables
213 client.run
214 end
215
216 # so classes outside this module can write messages, too
217 def self.write(stanza)
218 client.write(stanza)
219 end
220
221 def self.before_handler(type, *guards, &block)
222 client.register_handler_before(type, *guards, &block)
223 end
224
225 def self.send_media(from, to, media_url, desc=nil, subject=nil, m=nil)
226 # we assume media_url is one of these (always the case so far):
227 # https://messaging.bandwidth.com/api/v2/users/[u]/media/[path]
228
229 puts 'ORIG_URL: ' + media_url
230 usr = to
231 if media_url.start_with?('https://messaging.bandwidth.com/api/v2/users/')
232 pth = media_url.split('/', 9)[8]
233 # the caller must guarantee that 'to' is a bare JID
234 media_url = ARGV[6] + WEBrick::HTTPUtils.escape(usr) + '/' + pth
235 puts 'PROX_URL: ' + media_url
236 end
237
238 msg = m ? m.copy : Blather::Stanza::Message.new(to, "")
239 msg.from = from
240 msg.subject = subject if subject
241
242 # provide URL in XEP-0066 (OOB) fashion
243 x = Nokogiri::XML::Node.new 'x', msg.document
244 x['xmlns'] = 'jabber:x:oob'
245
246 urln = Nokogiri::XML::Node.new 'url', msg.document
247 urlc = Nokogiri::XML::Text.new media_url, msg.document
248 urln.add_child(urlc)
249 x.add_child(urln)
250
251 if desc
252 descn = Nokogiri::XML::Node.new('desc', msg.document)
253 descc = Nokogiri::XML::Text.new(desc, msg.document)
254 descn.add_child(descc)
255 x.add_child(descn)
256 end
257
258 msg.add_child(x)
259
260 write(msg)
261 rescue Exception => e
262 panic(e)
263 end
264
265 setup ARGV[0], ARGV[1], ARGV[2], ARGV[3], nil, nil, async: true
266
267 def self.pass_on_message(m, users_num, jid)
268 # Capture destination before modifying m.to
269 dest_num = m.to.node
270
271 # setup delivery receipt; similar to a reply
272 rcpt = ReceiptMessage.new(m.from.stripped)
273 rcpt.from = m.to
274
275 # pass original message (before sending receipt)
276 m.to = jid
277 m.from = "#{users_num}@#{ARGV[0]}"
278
279 puts 'XRESPONSE0: ' + m.inspect
280 write_to_stream m
281
282 # Emit pass-through event. Thru events don't capture a timestamp because XMPP
283 # stanzas don't carry timestamps for realtime messages, and the Redis stream
284 # ID provides the emit time.
285 oob_url = m.at("oob|x > oob|url", oob: "jabber:x:oob")&.text
286 MessageEvent::Thru.new(
287 owner: users_num,
288 from: users_num,
289 to: [dest_num],
290 stanza_id: m.id.to_s,
291 body: m.body.to_s,
292 media_urls: [oob_url].compact
293 ).emit(REDIS)
294
295 # send a delivery receipt back to the sender
296 # TODO: send only when requested per XEP-0184
297 # TODO: pass receipts from target if supported
298
299 # TODO: put in member/instance variable
300 rcpt['id'] = SecureRandom.uuid
301 rcvd = Nokogiri::XML::Node.new 'received', rcpt.document
302 rcvd['xmlns'] = 'urn:xmpp:receipts'
303 rcvd['id'] = m.id
304 rcpt.add_child(rcvd)
305
306 puts 'XRESPONSE1: ' + rcpt.inspect
307 write_to_stream rcpt
308 end
309
310 def self.call_catapult(
311 token, secret, m, pth, body=nil,
312 head={}, code=[200], respond_with=:body
313 )
314 # pth looks like one of:
315 # "api/v2/users/#{user_id}/[endpoint_name]"
316
317 url_prefix = ''
318
319 # TODO: need to make a separate thing for voice.bw.c eventually
320 if pth.start_with? 'api/v2/users'
321 url_prefix = 'https://messaging.bandwidth.com/'
322 end
323
324 EM::HttpRequest.new(
325 url_prefix + pth
326 ).public_send(
327 "a#{m}",
328 head: {
329 'Authorization' => [token, secret]
330 }.merge(head),
331 body: body
332 ).then { |http|
333 puts "API response to send: #{http.response} with code"\
334 " response.code #{http.response_header.status}"
335
336 if code.include?(http.response_header.status)
337 case respond_with
338 when :body
339 http.response
340 when :headers
341 http.response_header
342 else
343 http
344 end
345 else
346 EMPromise.reject(
347 BandwidthError.for(http.response_header.status, http.response)
348 )
349 end
350 }
351 end
352
353 def self.to_catapult_possible_oob(s, num_dest, user_id, token, secret,
354 usern)
355 un = s.at("oob|x > oob|url", oob: "jabber:x:oob")
356 unless un
357 puts "MMSOOB: no url node found so process as normal"
358 return to_catapult(s, nil, num_dest, user_id, token,
359 secret, usern)
360 end
361 puts "MMSOOB: found a url node - checking if to make MMS..."
362
363 body = s.respond_to?(:body) ? s.body : ''
364 EM::HttpRequest.new(un.text, tls: { verify_peer: true }).ahead.then { |http|
365 # If content is too large, or MIME type is not supported, place the link inside the body and do not send MMS.
366 if http.response_header["CONTENT_LENGTH"].to_i > 3500000 ||
367 !MMS_MIME_TYPES.include?(http.response_header["CONTENT_TYPE"])
368 unless body.include?(un.text)
369 s.body = body.empty? ? un.text : "#{body}\n#{un.text}"
370 end
371 to_catapult(s, nil, num_dest, user_id, token, secret, usern)
372 else # If size is less than ~3.5MB, strip the link from the body and attach media in the body.
373 # some clients send URI in both body & <url/> so delete
374 s.body = body.sub(/\s*#{Regexp.escape(un.text)}\s*$/, '')
375
376 puts "MMSOOB: url text is '#{un.text}'"
377 puts "MMSOOB: the body is '#{body.to_s.strip}'"
378
379 puts "MMSOOB: sending MMS since found OOB & user asked"
380 to_catapult(s, un.text, num_dest, user_id, token, secret, usern)
381 end
382 }
383 end
384
385 def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern)
386 body = s.respond_to?(:body) ? s.body.to_s : ''
387 if murl.to_s.empty? && body.strip.empty?
388 return EMPromise.reject(
389 [:modify, 'policy-violation']
390 )
391 end
392
393 segment_size = body.ascii_only? ? 160 : 70
394 if !murl && ENV["MMS_PATH"] && body.length > segment_size*3
395 file = Multibases.pack(
396 'base58btc',
397 Multihashes.encode(Digest::SHA256.digest(body), "sha2-256")
398 ).to_s
399 File.open("#{ENV['MMS_PATH']}/#{file}", "w") { |fh| fh.write body }
400 murl = "#{ENV['MMS_URL']}/#{file}.txt"
401 body = ""
402 end
403
404 extra = {}
405 extra[:media] = murl if murl
406
407 call_catapult(
408 token,
409 secret,
410 :post,
411 "api/v2/users/#{user_id}/messages",
412 JSON.dump(extra.merge(
413 from: usern,
414 to: num_dest,
415 text: body,
416 applicationId: ARGV[4],
417 tag:
418 # callbacks need id and resourcepart
419 WEBrick::HTTPUtils.escape(s.id.to_s) +
420 ' ' +
421 WEBrick::HTTPUtils.escape(
422 s.from.resource.to_s
423 )
424 )),
425 {'Content-Type' => 'application/json'},
426 [201]
427 ).then { |response|
428 parsed = JSON.parse(response) rescue {}
429 MessageEvent::Out.new(
430 timestamp: parsed["time"] || Time.now,
431 owner: usern,
432 from: usern,
433 to: Array(num_dest),
434 stanza_id: s.id.to_s,
435 bandwidth_id: parsed["id"],
436 body: body,
437 media_urls: [murl].compact
438 ).emit(REDIS)
439 response
440 }.catch { |e|
441 EMPromise.reject(
442 [:cancel, 'internal-server-error', e.message]
443 )
444 }
445 end
446
447 def self.validate_num(m)
448 # if sent to SGX domain use https://wiki.soprani.ca/SGX/GroupMMS
449 if m.to == ARGV[0]
450 an = m.children.find { |v| v.element_name == "addresses" }
451 if not an
452 return EMPromise.reject(
453 [:cancel, 'item-not-found']
454 )
455 end
456 puts "ADRXEP: found an addresses node - iterate addrs.."
457
458 nums = []
459 an.children.each do |e|
460 num = ''
461 type = ''
462 e.attributes.each do |c|
463 if c[0] == 'type'
464 if c[1] != 'to'
465 # TODO: error
466 end
467 type = c[1].to_s
468 elsif c[0] == 'uri'
469 if !c[1].to_s.start_with? 'sms:'
470 # TODO: error
471 end
472 num = c[1].to_s[4..-1]
473 # TODO: confirm num validates
474 # TODO: else, error - unexpected name
475 end
476 end
477 if num.empty? or type.empty?
478 # TODO: error
479 end
480 nums << num
481 end
482 return nums
483 end
484
485 # if not sent to SGX domain, then assume destination is in 'to'
486 EMPromise.resolve(m.to.node.to_s).then { |num_dest|
487 if num_dest =~ /\A\+?[0-9]+(?:;.*)?\Z/
488 next num_dest if num_dest[0] == '+'
489
490 shortcode = extract_shortcode(num_dest)
491 next shortcode if shortcode
492 end
493
494 if anonymous_tel?(num_dest)
495 EMPromise.reject([:cancel, 'gone'])
496 else
497 # TODO: text re num not (yet) supportd/implmentd
498 EMPromise.reject([:cancel, 'item-not-found'])
499 end
500 }
501 end
502
503 def self.fetch_catapult_cred_for(jid)
504 @registration_repo.find(jid).then { |creds|
505 if creds.length < 4
506 # TODO: add text re credentials not registered
507 EMPromise.reject(
508 [:auth, 'registration-required']
509 )
510 else
511 creds
512 end
513 }
514 end
515
516 message :error? do |m|
517 # TODO: report it somewhere/somehow - eat for now so no err loop
518 puts "EATERROR1: #{m.inspect}"
519 true
520 end
521
522 message :body do |m|
523 EMPromise.all([
524 validate_num(m),
525 fetch_catapult_cred_for(m.from)
526 ]).then { |(num_dest, creds)|
527 @registration_repo.find_jid(num_dest).then { |jid|
528 [jid, num_dest] + creds
529 }
530 }.then { |(jid, num_dest, *creds)|
531 if jid
532 @registration_repo.find(jid).then { |other_user|
533 [jid, num_dest] + creds + [other_user.first]
534 }
535 else
536 [jid, num_dest] + creds + [nil]
537 end
538 }.then { |(jid, num_dest, *creds, other_user)|
539 # if destination user is in the system pass on directly
540 if other_user and not other_user.start_with? 'u-'
541 pass_on_message(m, creds.last, jid)
542 else
543 to_catapult_possible_oob(m, num_dest, *creds)
544 end
545 }.catch { |e|
546 if e.is_a?(Array) && (e.length == 2 || e.length == 3)
547 write_to_stream m.as_error(e[1], e[0], e[2])
548 else
549 EMPromise.reject(e)
550 end
551 }
552 end
553
554 def self.user_cap_identities
555 [{category: 'client', type: 'sms'}]
556 end
557
558 # TODO: must re-add stuff so can do ad-hoc commands
559 def self.user_cap_features
560 ["urn:xmpp:receipts"]
561 end
562
563 def self.add_gateway_feature(feature)
564 @gateway_features << feature
565 @gateway_features.uniq!
566 end
567
568 subscription :request? do |p|
569 puts "PRESENCE1: #{p.inspect}"
570
571 # subscriptions are allowed from anyone - send reply immediately
572 msg = Blather::Stanza::Presence.new
573 msg.to = p.from
574 msg.from = p.to
575 msg.type = :subscribed
576
577 puts 'RESPONSE5a: ' + msg.inspect
578 write_to_stream msg
579
580 # send a <presence> immediately; not automatically probed for it
581 # TODO: refactor so no "presence :probe? do |p|" duplicate below
582 caps = Blather::Stanza::Capabilities.new
583 # TODO: user a better node URI (?)
584 caps.node = 'http://catapult.sgx.soprani.ca/'
585 caps.identities = user_cap_identities
586 caps.features = user_cap_features
587
588 msg = caps.c
589 msg.to = p.from
590 msg.from = p.to.to_s + '/sgx'
591
592 puts 'RESPONSE5b: ' + msg.inspect
593 write_to_stream msg
594
595 # need to subscribe back so Conversations displays images inline
596 msg = Blather::Stanza::Presence.new
597 msg.to = p.from.to_s.split('/', 2)[0]
598 msg.from = p.to.to_s.split('/', 2)[0]
599 msg.type = :subscribe
600
601 puts 'RESPONSE5c: ' + msg.inspect
602 write_to_stream msg
603 end
604
605 presence :probe? do |p|
606 puts 'PRESENCE2: ' + p.inspect
607
608 caps = Blather::Stanza::Capabilities.new
609 # TODO: user a better node URI (?)
610 caps.node = 'http://catapult.sgx.soprani.ca/'
611 caps.identities = user_cap_identities
612 caps.features = user_cap_features
613
614 msg = caps.c
615 msg.to = p.from
616 msg.from = p.to.to_s + '/sgx'
617
618 puts 'RESPONSE6: ' + msg.inspect
619 write_to_stream msg
620 end
621
622 disco_items(
623 to: Blather::JID.new(ARGV[0]),
624 node: "http://jabber.org/protocol/commands"
625 ) do |i|
626 fetch_catapult_cred_for(i.from).then { |creds|
627 BandwidthTNOptions.tn_eligible_for_port_out_pin?(creds).then { |eligible|
628 reply = i.reply
629 reply.node = 'http://jabber.org/protocol/commands'
630
631 if eligible
632 reply.items = [
633 Blather::Stanza::DiscoItems::Item.new(
634 i.to,
635 'set-port-out-pin',
636 'Set Port-Out PIN'
637 )
638 ]
639 else
640 reply.items = []
641 end
642
643 puts 'RESPONSE_CMD_DISCO: ' + reply.inspect
644 write_to_stream reply
645 }
646 }.catch { |e|
647 if e.is_a?(Array) && [2, 3].include?(e.length)
648 write_to_stream i.as_error(e[1], e[0], e[2])
649 else
650 EMPromise.reject(e)
651 end
652 }
653 end
654
655 iq '/iq/ns:query', ns: 'http://jabber.org/protocol/disco#info' do |i|
656 # TODO: return error if i.type is :set - if it is :reply or
657 # :error it should be ignored (as the below does currently);
658 # review specification to see how to handle other type values
659 if i.type != :get
660 puts 'DISCO iq rcvd, of non-get type "' + i.type.to_s +
661 '" for message "' + i.inspect + '"; ignoring...'
662 next
663 end
664
665 # respond to capabilities request for an sgx-bwmsgsv2 number JID
666 if i.to.node
667 # TODO: confirm the node URL is expected using below
668 #puts "XR[node]: #{xpath_result[0]['node']}"
669
670 msg = i.reply
671 msg.node = i.node
672 msg.identities = user_cap_identities
673 msg.features = user_cap_features
674
675 puts 'RESPONSE7: ' + msg.inspect
676 write_to_stream msg
677 next
678 end
679
680 # respond to capabilities request for sgx-bwmsgsv2 itself
681 msg = i.reply
682 msg.node = i.node
683 msg.identities = [{
684 name: 'Soprani.ca Gateway to XMPP - Bandwidth API V2',
685 type: 'sms', category: 'gateway'
686 }]
687 msg.features = @gateway_features
688 write_to_stream msg
689 end
690
691 def self.check_then_register(i, *creds)
692 @registration_repo
693 .put(i.from, *creds)
694 .catch_only(RegistrationRepo::Conflict) { |e|
695 EMPromise.reject([:cancel, 'conflict', e.message])
696 }.then {
697 write_to_stream i.reply
698 }
699 end
700
701 def self.creds_from_registration_query(i)
702 if i.query.find_first("./ns:x", ns: "jabber:x:data")
703 [
704 i.form.field("nick")&.value,
705 i.form.field("username")&.value,
706 i.form.field("password")&.value,
707 i.form.field("phone")&.value
708 ]
709 else
710 [i.nick, i.username, i.password, i.phone]
711 end
712 end
713
714 def self.process_registration(i)
715 EMPromise.resolve(nil).then {
716 if i.remove?
717 @registration_repo.delete(i.from).then do
718 write_to_stream i.reply
719 EMPromise.reject(:done)
720 end
721 else
722 creds_from_registration_query(i)
723 end
724 }.then { |user_id, api_token, api_secret, phone_num|
725 if phone_num && phone_num[0] == '+'
726 [user_id, api_token, api_secret, phone_num]
727 else
728 # TODO: add text re number not (yet) supported
729 EMPromise.reject([:cancel, 'item-not-found'])
730 end
731 }.then { |user_id, api_token, api_secret, phone_num|
732 # TODO: find way to verify #{phone_num}, too
733 call_catapult(
734 api_token,
735 api_secret,
736 :get,
737 "api/v2/users/#{user_id}/media"
738 ).then { |response|
739 JSON.parse(response)
740 # TODO: confirm response is array - could be empty
741
742 puts "register got str #{response.to_s[0..999]}"
743
744 check_then_register(
745 i,
746 user_id,
747 api_token,
748 api_secret,
749 phone_num
750 )
751 }
752 }.catch_only(BandwidthError) { |e|
753 EMPromise.reject(case e.code
754 when 401
755 # TODO: add text re bad credentials
756 [:auth, 'not-authorized']
757 when 404
758 # TODO: add text re number not found or disabled
759 [:cancel, 'item-not-found']
760 else
761 [:modify, 'not-acceptable']
762 end)
763 }
764 end
765
766 def self.registration_form(orig, existing_number=nil)
767 orig.registered = !!existing_number
768
769 # TODO: update "User Id" x2 below (to "accountId"?), and others?
770 orig.instructions = "Enter the information from your Account "\
771 "page as well as the Phone Number\nin your "\
772 "account you want to use (ie. '+12345678901')"\
773 ".\nUser Id is nick, API Token is username, "\
774 "API Secret is password, Phone Number is phone"\
775 ".\n\nThe source code for this gateway is at "\
776 "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
777 "\nCopyright (C) 2017-2020 Denver Gingerich "\
778 "and others, licensed under AGPLv3+."
779 orig.nick = ""
780 orig.username = ""
781 orig.password = ""
782 orig.phone = existing_number.to_s
783
784 orig.form.fields = [
785 {
786 required: true, type: :"text-single",
787 label: 'User Id', var: 'nick'
788 },
789 {
790 required: true, type: :"text-single",
791 label: 'API Token', var: 'username'
792 },
793 {
794 required: true, type: :"text-private",
795 label: 'API Secret', var: 'password'
796 },
797 {
798 required: true, type: :"text-single",
799 label: 'Phone Number', var: 'phone',
800 value: existing_number.to_s
801 }
802 ]
803 orig.form.title = 'Register for '\
804 'Soprani.ca Gateway to XMPP - Bandwidth API V2'
805 orig.form.instructions = "Enter the details from your Account "\
806 "page as well as the Phone Number\nin your "\
807 "account you want to use (ie. '+12345678901')"\
808 ".\n\nThe source code for this gateway is at "\
809 "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
810 "\nCopyright (C) 2017-2020 Denver Gingerich "\
811 "and others, licensed under AGPLv3+."
812
813 orig
814 end
815
816 ibr do |i|
817 puts "IQ: #{i.inspect}"
818
819 case i.type
820 when :set
821 process_registration(i)
822 when :get
823 bare_jid = i.from.stripped
824 @registration_repo.find(bare_jid).then { |creds|
825 reply = registration_form(i.reply, creds.last)
826 puts "RESPONSE2: #{reply.inspect}"
827 write_to_stream reply
828 }
829 else
830 # Unknown IQ, ignore for now
831 EMPromise.reject(:done)
832 end.catch { |e|
833 if e.is_a?(Array) && (e.length == 2 || e.length == 3)
834 write_to_stream i.as_error(e[1], e[0], e[2])
835 elsif e != :done
836 EMPromise.reject(e)
837 end
838 }.catch(&method(:panic))
839 end
840
841 command :execute?, node: "set-port-out-pin", sessionid: nil do |iq|
842 # Ensure user is registered, but discard their credentials
843 # because we're just showing them a form.
844 fetch_catapult_cred_for(iq.from).then { |_creds|
845 reply = iq.reply
846 reply.node = 'set-port-out-pin'
847 reply.sessionid = SecureRandom.uuid
848 reply.status = :executing
849
850 form = Blather::Stanza::X.find_or_create(reply.command)
851 form.type = "form"
852 form.fields = [
853 {
854 var: 'pin',
855 type: 'text-private',
856 label: 'Port-Out PIN',
857 required: true
858 },
859 {
860 var: 'confirm_pin',
861 type: 'text-private',
862 label: 'Confirm PIN',
863 required: true
864 }
865 ]
866
867 reply.command.add_child(form)
868 reply.allowed_actions = [:complete]
869
870 puts "RESPONSE_CMD_FORM: #{reply.inspect}"
871 write_to_stream reply
872 }.catch { |e|
873 if e.is_a?(Array) && [2, 3].include?(e.length)
874 write_to_stream iq.as_error(e[1], e[0], e[2])
875 else
876 EMPromise.reject(e)
877 end
878 }.catch(&method(:panic))
879 end
880
881 command :complete?, node: "set-port-out-pin", sessionid: /./ do |iq|
882 pin = iq.form.field('pin')&.value
883 confirm_pin = iq.form.field('confirm_pin')&.value
884
885 if pin.nil? || confirm_pin.nil?
886 write_to_stream iq.as_error(
887 'bad-request',
888 :modify,
889 'PIN fields are required'
890 )
891 next
892 end
893
894 if pin != confirm_pin
895 write_to_stream iq.as_error(
896 'bad-request',
897 :modify,
898 'PIN confirmation does not match'
899 )
900 next
901 end
902
903 if pin !~ /\A[a-zA-Z0-9]{4,10}\z/
904 write_to_stream iq.as_error(
905 'bad-request',
906 :modify,
907 'PIN must be 4-10 alphanumeric characters'
908 )
909 next
910 end
911
912 fetch_catapult_cred_for(iq.from).then { |creds|
913 BandwidthTNOptions.set_port_out_pin(creds, pin).then {
914 reply = iq.reply
915 reply.node = 'set-port-out-pin'
916 reply.sessionid = iq.sessionid
917 reply.status = :completed
918 reply.note_type = :info
919 reply.note_text = 'Port-out PIN has been set successfully.'
920
921 write_to_stream reply
922 }.catch { |e|
923 reply = iq.reply
924 reply.node = 'set-port-out-pin'
925 reply.sessionid = iq.sessionid
926 reply.status = :completed
927 reply.note_type = :error
928 error_msg = if e.respond_to?(:message) && e.message.include?('not valid')
929 "Invalid phone number format. "\
930 "Please check your registered phone number."
931 elsif e.respond_to?(:message) && e.message.include?('ErrorCode')
932 "Bandwidth API error: #{e.message}"
933 else
934 "Failed to set port-out PIN. Please try again later."
935 end
936 reply.note_text = error_msg
937
938 write_to_stream reply
939 }
940 }.catch { |e|
941 if e.is_a?(Array) && [2, 3].include?(e.length)
942 write_to_stream iq.as_error(e[1], e[0], e[2])
943 else
944 EMPromise.reject(e)
945 end
946 }.catch(&method(:panic))
947 end
948
949 iq type: [:get, :set] do |iq|
950 write_to_stream(Blather::StanzaError.new(
951 iq,
952 'feature-not-implemented',
953 :cancel
954 ))
955 end
956end
957
958class ReceiptMessage < Blather::Stanza
959 def self.new(to=nil)
960 node = super :message
961 node.to = to
962 node
963 end
964end
965
966class WebhookHandler < Goliath::API
967 use Sentry::Rack::CaptureExceptions
968 use Goliath::Rack::Params
969
970 def response(env)
971 @registration_repo = RegistrationRepo.new
972 # TODO: add timestamp grab here, and MUST include ./tai version
973
974 puts 'ENV: ' + env.reject { |k| k == 'params' }.to_s
975
976 if params.empty?
977 puts 'PARAMS empty!'
978 return [200, {}, "OK"]
979 end
980
981 if env['REQUEST_URI'] != '/'
982 puts 'BADREQUEST1: non-/ request "' +
983 env['REQUEST_URI'] + '", method "' +
984 env['REQUEST_METHOD'] + '"'
985 return [200, {}, "OK"]
986 end
987
988 if env['REQUEST_METHOD'] != 'POST'
989 puts 'BADREQUEST2: non-POST request; URI: "' +
990 env['REQUEST_URI'] + '", method "' +
991 env['REQUEST_METHOD'] + '"'
992 return [200, {}, "OK"]
993 end
994
995 # TODO: process each message in list, not just first one
996 jparams = params.dig('_json', 0, 'message')
997 type = params.dig('_json', 0, 'type')
998
999 return [400, {}, "Missing params\n"] unless jparams && type
1000
1001 users_num, others_num =
1002 if type == 'message-failed' # NOTE: This implies direction == 'out'
1003 [jparams['from'], params['_json'][0]['to']]
1004 elsif jparams['direction'] == 'in'
1005 [jparams['owner'], jparams['from']]
1006 elsif jparams['direction'] == 'out'
1007 [jparams['from'], jparams['owner']] # NOTE: for outbound, 'from' == 'owner'
1008 else
1009 puts "big prob: '#{jparams['direction']}'"
1010 return [400, {}, "OK"]
1011 end
1012
1013 return [400, {}, "Missing params\n"] unless users_num && others_num
1014 return [400, {}, "Missing params\n"] unless jparams['to'].is_a?(Array)
1015 return [400, {}, "Missing params\n"] if jparams['to'].empty?
1016
1017 puts "BODY - messageId: #{jparams['id']}" \
1018 ", eventType: #{type}" \
1019 ", time: #{jparams['time']}" \
1020 ", direction: #{jparams['direction']}" \
1021 ", deliveryState: #{jparams['deliveryState'] || 'NONE'}" \
1022 ", errorCode: #{jparams['errorCode'] || 'NONE'}" \
1023 ", description: #{jparams['description'] || 'NONE'}" \
1024 ", tag: #{jparams['tag'] || 'NONE'}" \
1025 ", media: #{jparams['media'] || 'NONE'}"
1026
1027 if others_num[0] != '+'
1028 # TODO: check that others_num actually a shortcode first
1029 others_num +=
1030 ';phone-context=ca-us.phone-context.soprani.ca'
1031 end
1032
1033 bare_jid = @registration_repo.find_jid(users_num).sync
1034
1035 if !bare_jid
1036 puts "jid_key for (#{users_num}) DNE; BW API misconfigured?"
1037
1038 return [403, {}, "Customer not found\n"]
1039 end
1040
1041 msg = nil
1042 case jparams['direction']
1043 when 'in'
1044 text = ''
1045 case type
1046 when 'message-received'
1047 # TODO: handle group chat, and fix above
1048 text = jparams['text']
1049
1050 if text.to_s.empty? && Array(jparams['media']).empty?
1051 return [400, {}, "Missing params\n"]
1052 end
1053
1054 if jparams['to'].length > 1
1055 msg = Blather::Stanza::Message.new(
1056 Blather::JID.new(bare_jid).domain
1057 )
1058 msg.body = text unless text&.empty?
1059
1060 addrs = Nokogiri::XML::Node.new(
1061 'addresses', msg.document)
1062 addrs['xmlns'] = 'http://jabber.org/' \
1063 'protocol/address'
1064
1065 addr1 = Nokogiri::XML::Node.new(
1066 'address', msg.document)
1067 addr1['type'] = 'to'
1068 addr1['jid'] = bare_jid
1069 addrs.add_child(addr1)
1070
1071 jparams['to'].reject(
1072 # Don't send to the same person twice,
1073 # and don't send to the person who sent it
1074 &[users_num, others_num].method(:include?)
1075 ).each do |receiver|
1076 addrn = Nokogiri::XML::Node.new(
1077 'address', msg.document)
1078 addrn['type'] = 'to'
1079 addrn['uri'] = "sms:#{receiver}"
1080 addrn['delivered'] = 'true'
1081 addrs.add_child(addrn)
1082 end
1083
1084 msg.add_child(addrs)
1085
1086 # TODO: delete
1087 puts "RESPONSE9: #{msg.inspect}"
1088 end
1089
1090 media_urls = Array(jparams['media']).map { |media_url|
1091 unless media_url.end_with?(
1092 '.smil', '.txt', '.xml'
1093 )
1094 SGXbwmsgsv2.send_media(
1095 others_num + '@' +
1096 ARGV[0],
1097 bare_jid, media_url,
1098 nil, nil, msg
1099 )
1100 media_url
1101 end
1102 }.compact
1103
1104 if text&.empty? || (media_urls.any? && jparams['to'].length > 1)
1105 if !env['HTTP_X_JMP_RESEND_OF'].to_s.empty?
1106 MessageEvent::ResendIn.new(
1107 original_stream_id: env['HTTP_X_JMP_RESEND_OF'],
1108 original_bandwidth_id: jparams['id'],
1109 owner: jparams['owner']
1110 ).emit(REDIS)
1111 else
1112 MessageEvent::In.new(
1113 timestamp: jparams['time'],
1114 from: jparams['from'],
1115 to: jparams['to'],
1116 owner: jparams['owner'],
1117 bandwidth_id: jparams['id'],
1118 body: jparams['text'].to_s,
1119 media_urls: media_urls
1120 ).emit(REDIS)
1121 end
1122
1123 return [200, {}, "OK"]
1124 end
1125 else
1126 text = "unknown type (#{type})"\
1127 " with text: " + jparams['text']
1128
1129 # TODO: log/notify of this properly
1130 puts text
1131 end
1132
1133 # If text is not empty, but there isn't a msg,
1134 # we need to construct a msg to convey that text
1135 unless msg || text.to_s.empty?
1136 msg = Blather::Stanza::Message.new(
1137 bare_jid,
1138 # Strip control codes.
1139 # This only happened, or at least caused a problem, once,
1140 # but it seems sensible to say that we shouldn't be getting
1141 # control codes from the PSTN.
1142 text.gsub(/[\u0000-\u0008\u000b-\u001f]/, "")
1143 )
1144 msg.document.encoding = "utf-8"
1145 msg.chat_state = nil
1146 end
1147 else # per prior switch, this is: jparams['direction'] == 'out'
1148 tag_parts = jparams['tag'].split(/ /, 2)
1149 id = WEBrick::HTTPUtils.unescape(tag_parts[0])
1150 resourcepart = WEBrick::HTTPUtils.unescape(tag_parts[1])
1151
1152 # TODO: remove this hack
1153 if jparams['to'].length > 1
1154 case type
1155 when 'message-failed'
1156 MessageEvent::Failed.new(
1157 timestamp: jparams['time'],
1158 stanza_id: id,
1159 bandwidth_id: jparams['id'],
1160 error_code: jparams['errorCode'].to_s,
1161 error_description: jparams['description'].to_s
1162 ).emit(REDIS)
1163 when 'message-delivered'
1164 MessageEvent::Delivered.new(
1165 timestamp: jparams['time'],
1166 stanza_id: id,
1167 bandwidth_id: jparams['id']
1168 ).emit(REDIS)
1169 end
1170 puts "WARN! group no rcpt: #{users_num}"
1171 return [200, {}, "OK"]
1172 end
1173
1174 case type
1175 when 'message-failed'
1176 # create a bare message like the one user sent
1177 msg = Blather::Stanza::Message.new(
1178 others_num + '@' + ARGV[0])
1179 msg.from = bare_jid + '/' + resourcepart
1180 msg['id'] = id
1181
1182 # TODO: add 'errorCode' and/or 'description' val
1183 # create an error reply to the bare message
1184 msg = msg.as_error(
1185 'recipient-unavailable',
1186 :wait,
1187 params['_json'][0]['description']
1188 )
1189
1190 when 'message-delivered'
1191
1192 msg = ReceiptMessage.new(bare_jid)
1193
1194 # TODO: put in member/instance variable
1195 msg['id'] = SecureRandom.uuid
1196
1197 # TODO: send only when requested per XEP-0184
1198 rcvd = Nokogiri::XML::Node.new(
1199 'received',
1200 msg.document
1201 )
1202 rcvd['xmlns'] = 'urn:xmpp:receipts'
1203 rcvd['id'] = id
1204 msg.add_child(rcvd)
1205
1206 # TODO: make prettier: this should be done above
1207 others_num = params['_json'][0]['to']
1208 else
1209 # TODO: notify somehow of unknown state receivd?
1210 puts "message with id #{id} has "\
1211 "other type #{type}"
1212 return [200, {}, "OK"]
1213 end
1214
1215 puts "RESPONSE4: #{msg.inspect}"
1216 end
1217
1218 # if message-failed, we already set msg.from
1219 # moreover, we said `msg = msg.as_error`, and StanzaError
1220 msg.from = others_num + '@' + ARGV[0] if msg.respond_to?(:from=)
1221 SGXbwmsgsv2.write(msg)
1222
1223 # Emit event to messages stream
1224 case [jparams['direction'], type]
1225 when ['in', 'message-received']
1226 if !env['HTTP_X_JMP_RESEND_OF'].to_s.empty?
1227 MessageEvent::ResendIn.new(
1228 original_stream_id: env['HTTP_X_JMP_RESEND_OF'],
1229 original_bandwidth_id: jparams['id'],
1230 owner: jparams['owner']
1231 ).emit(REDIS)
1232 else
1233 media_urls = Array(jparams['media']).reject { |u|
1234 u.end_with?('.smil', '.txt', '.xml')
1235 }
1236 MessageEvent::In.new(
1237 timestamp: jparams['time'],
1238 from: jparams['from'],
1239 to: jparams['to'],
1240 owner: jparams['owner'],
1241 bandwidth_id: jparams['id'],
1242 body: jparams['text'].to_s,
1243 media_urls: media_urls
1244 ).emit(REDIS)
1245 end
1246 when ['out', 'message-failed']
1247 tag_parts = jparams['tag'].split(/ /, 2)
1248 stanza_id = WEBrick::HTTPUtils.unescape(tag_parts[0])
1249 MessageEvent::Failed.new(
1250 timestamp: jparams['time'],
1251 stanza_id: stanza_id,
1252 bandwidth_id: jparams['id'],
1253 error_code: jparams['errorCode'].to_s,
1254 error_description: jparams['description'].to_s
1255 ).emit(REDIS)
1256 when ['out', 'message-delivered']
1257 tag_parts = jparams['tag'].split(/ /, 2)
1258 stanza_id = WEBrick::HTTPUtils.unescape(tag_parts[0])
1259 MessageEvent::Delivered.new(
1260 timestamp: jparams['time'],
1261 stanza_id: stanza_id,
1262 bandwidth_id: jparams['id']
1263 ).emit(REDIS)
1264 end
1265
1266 [200, {}, "OK"]
1267 rescue Exception => e
1268 Sentry.capture_exception(e)
1269 puts 'Shutting down gateway due to exception 013: ' + e.message
1270 SGXbwmsgsv2.shutdown
1271 puts 'Gateway has terminated.'
1272 EM.stop unless ENV['ENV'] == 'test'
1273 end
1274end
1275
1276at_exit do
1277 $stdout.sync = true
1278
1279 puts "Soprani.ca/SMS Gateway for XMPP - Bandwidth API V2\n"\
1280 "==>> last commit of this version is " + `git rev-parse HEAD` + "\n"
1281
1282 if ARGV.size != 7
1283 puts "Usage: sgx-bwmsgsv2.rb <component_jid> "\
1284 "<component_password> <server_hostname> "\
1285 "<server_port> <application_id> "\
1286 "<http_listen_port> <mms_proxy_prefix_url>"
1287 exit 0
1288 end
1289
1290 t = Time.now
1291 puts "LOG %d.%09d: starting...\n\n" % [t.to_i, t.nsec]
1292
1293 EM.run do
1294 REDIS = EM::Hiredis.connect
1295
1296 SGXbwmsgsv2.run
1297
1298 # required when using Prosody otherwise disconnects on 6-hour inactivity
1299 EM.add_periodic_timer(3600) do
1300 msg = Blather::Stanza::Iq::Ping.new(:get, 'localhost')
1301 msg.from = ARGV[0]
1302 SGXbwmsgsv2.write(msg)
1303 end
1304
1305 server = Goliath::Server.new('0.0.0.0', ARGV[5].to_i)
1306 server.api = WebhookHandler.new
1307 server.app = Goliath::Rack::Builder.build(server.api.class, server.api)
1308 server.logger = Log4r::Logger.new('goliath')
1309 server.logger.add(Log4r::StdoutOutputter.new('console'))
1310 server.logger.level = Log4r::INFO
1311 server.start do
1312 ["INT", "TERM"].each do |sig|
1313 trap(sig) do
1314 EM.defer do
1315 puts 'Shutting down gateway...'
1316 SGXbwmsgsv2.shutdown
1317
1318 puts 'Gateway has terminated.'
1319 EM.stop
1320 end
1321 end
1322 end
1323 end
1324 end
1325end unless ENV['ENV'] == 'test'