1#!/usr/bin/env ruby
2# frozen_string_literal: true
3
4# Copyright (C) 2017-2020 Denver Gingerich <denver@ossguy.com>
5# Copyright (C) 2017 Stephen Paul Weber <singpolyma@singpolyma.net>
6#
7# This file is part of sgx-bwmsgsv2.
8#
9# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
10# the terms of the GNU Affero General Public License as published by the Free
11# Software Foundation, either version 3 of the License, or (at your option) any
12# later version.
13#
14# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
15# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17# details.
18#
19# You should have received a copy of the GNU Affero General Public License along
20# with sgx-bwmsgsv2. If not, see <http://www.gnu.org/licenses/>.
21
22require 'blather/client/dsl'
23require 'em-hiredis'
24require 'em-http-request'
25require 'json'
26require 'multibases'
27require 'multihashes'
28require 'securerandom'
29require "sentry-ruby"
30require 'time'
31require 'uri'
32require 'webrick'
33
34require 'goliath/api'
35require 'goliath/server'
36require 'log4r'
37
38require 'em_promise'
39require 'em-synchrony'
40
41require_relative 'lib/bandwidth_error'
42require_relative 'lib/bandwidth_tn_options'
43require_relative 'lib/message_event'
44require_relative 'lib/registration_repo'
45
46Sentry.init
47
48# List of supported MIME types from Bandwidth - https://support.bandwidth.com/hc/en-us/articles/360014128994-What-MMS-file-types-are-supported-
49MMS_MIME_TYPES = [
50 "application/json",
51 "application/ogg",
52 "application/pdf",
53 "application/rtf",
54 "application/zip",
55 "application/x-tar",
56 "application/xml",
57 "application/gzip",
58 "application/x-bzip2",
59 "application/x-gzip",
60 "application/smil",
61 "application/javascript",
62 "audio/mp4",
63 "audio/mpeg",
64 "audio/ogg",
65 "audio/flac",
66 "audio/webm",
67 "audio/wav",
68 "audio/amr",
69 "audio/3gpp",
70 "image/bmp",
71 "image/gif",
72 "image/jpeg",
73 "image/pjpeg",
74 "image/png",
75 "image/svg+xml",
76 "image/tiff",
77 "image/webp",
78 "image/x-icon",
79 "text/css",
80 "text/csv",
81 "text/calendar",
82 "text/plain",
83 "text/javascript",
84 "text/vcard",
85 "text/vnd.wap.wml",
86 "text/xml",
87 "video/avi",
88 "video/mp4",
89 "video/mpeg",
90 "video/ogg",
91 "video/quicktime",
92 "video/webm",
93 "video/x-ms-wmv",
94 "video/x-flv"
95]
96
97def panic(e)
98 Sentry.capture_exception(e)
99 puts "Shutting down gateway due to exception: #{e.message}"
100 puts e.backtrace
101 SGXbwmsgsv2.shutdown
102 puts 'Gateway has terminated.'
103 EM.stop
104end
105
106EM.error_handler(&method(:panic))
107
108def extract_shortcode(dest)
109 num, context = dest.split(';', 2)
110 num if context == 'phone-context=ca-us.phone-context.soprani.ca'
111end
112
113def anonymous_tel?(dest)
114 dest.split(';', 2)[1] == 'phone-context=anonymous.phone-context.soprani.ca'
115end
116
117class SGXClient < Blather::Client
118 def handle_data(stanza)
119 promise = EMPromise.resolve(nil).then {
120 with_sentry(stanza) do |scope|
121 super
122 rescue StandardError => e
123 handle_error(scope, stanza, e)
124 end
125 }.catch { |e| panic(e) }
126 promise.sync if ENV["ENV"] == "test"
127 promise
128 end
129
130 # Override the default call_handler to syncify during testing.
131 def call_handler(handler, guards, stanza)
132 result = if guards.first.respond_to?(:to_str)
133 found = stanza.find(*guards)
134 throw :pass if found.empty?
135
136 handler.call(stanza, found)
137 else
138 throw :pass if guarded?(guards, stanza)
139
140 handler.call(stanza)
141 end
142
143 # Up to here, identical to upstream impl
144
145 return result unless result.is_a?(Promise)
146
147 result.sync if ENV["ENV"] == "test"
148 result
149 end
150
151protected
152
153 def with_sentry(stanza)
154 Sentry.clone_hub_to_current_thread
155
156 Sentry.with_scope do |scope|
157 setup_scope(stanza, scope)
158 yield scope
159 ensure
160 scope.get_transaction&.then do |tx|
161 tx.set_status("ok") unless tx.status
162 tx.finish
163 end
164 end
165 end
166
167 def setup_scope(stanza, scope)
168 name = stanza.respond_to?(:node) ? stanza.node : stanza.name
169 scope.clear_breadcrumbs
170 scope.set_transaction_name(name)
171 scope.set_user(jid: stanza.from&.stripped.to_s)
172
173 transaction = Sentry.start_transaction(
174 name: name,
175 op: "blather.handle_data"
176 )
177 scope.set_span(transaction) if transaction
178 end
179
180 def handle_error(scope, stanza, e)
181 puts "Error raised during #{scope.transaction_name}: #{e.class}"
182 puts e.message
183 puts e.backtrace
184 Sentry.capture_exception(e) unless e.is_a?(Sentry::Error)
185 scope.get_transaction&.set_status("internal_error")
186 return if e.respond_to?(:replied?) && e.replied?
187
188 SGXbwmsgsv2.write_to_stream stanza.as_error("internal-server-error", :cancel)
189 end
190end
191
192# TODO: keep in sync with jmp-acct_bot.rb, and eventually put in common location
193module CatapultSettingFlagBits
194 VOICEMAIL_TRANSCRIPTION_DISABLED = 0
195 MMS_ON_OOB_URL = 1
196end
197
198module SGXbwmsgsv2
199 extend Blather::DSL
200
201 @registration_repo = RegistrationRepo.new
202 @client = SGXClient.new
203 @gateway_features = [
204 "http://jabber.org/protocol/disco#info",
205 "http://jabber.org/protocol/address/",
206 "jabber:iq:register",
207 "http://jabber.org/protocol/commands"
208 ]
209
210 def self.run
211 # TODO: read/save ARGV[7] creds to local variables
212 client.run
213 end
214
215 # so classes outside this module can write messages, too
216 def self.write(stanza)
217 client.write(stanza)
218 end
219
220 def self.before_handler(type, *guards, &block)
221 client.register_handler_before(type, *guards, &block)
222 end
223
224 def self.send_media(from, to, media_url, desc=nil, subject=nil, m=nil)
225 # we assume media_url is one of these (always the case so far):
226 # https://messaging.bandwidth.com/api/v2/users/[u]/media/[path]
227
228 puts 'ORIG_URL: ' + media_url
229 usr = to
230 if media_url.start_with?('https://messaging.bandwidth.com/api/v2/users/')
231 pth = media_url.split('/', 9)[8]
232 # the caller must guarantee that 'to' is a bare JID
233 media_url = ARGV[6] + WEBrick::HTTPUtils.escape(usr) + '/' + pth
234 puts 'PROX_URL: ' + media_url
235 end
236
237 msg = m ? m.copy : Blather::Stanza::Message.new(to, "")
238 msg.from = from
239 msg.subject = subject if subject
240
241 # provide URL in XEP-0066 (OOB) fashion
242 x = Nokogiri::XML::Node.new 'x', msg.document
243 x['xmlns'] = 'jabber:x:oob'
244
245 urln = Nokogiri::XML::Node.new 'url', msg.document
246 urlc = Nokogiri::XML::Text.new media_url, msg.document
247 urln.add_child(urlc)
248 x.add_child(urln)
249
250 if desc
251 descn = Nokogiri::XML::Node.new('desc', msg.document)
252 descc = Nokogiri::XML::Text.new(desc, msg.document)
253 descn.add_child(descc)
254 x.add_child(descn)
255 end
256
257 msg.add_child(x)
258
259 write(msg)
260 rescue Exception => e
261 panic(e)
262 end
263
264 setup ARGV[0], ARGV[1], ARGV[2], ARGV[3], nil, nil, async: true
265
266 def self.pass_on_message(m, users_num, jid)
267 # Capture destination before modifying m.to
268 dest_num = m.to.node
269
270 # setup delivery receipt; similar to a reply
271 rcpt = ReceiptMessage.new(m.from.stripped)
272 rcpt.from = m.to
273
274 # pass original message (before sending receipt)
275 m.to = jid
276 m.from = "#{users_num}@#{ARGV[0]}"
277
278 puts 'XRESPONSE0: ' + m.inspect
279 write_to_stream m
280
281 # Emit pass-through event. Thru events don't capture a timestamp because XMPP
282 # stanzas don't carry timestamps for realtime messages, and the Redis stream
283 # ID provides the emit time.
284 oob_url = m.at("oob|x > oob|url", oob: "jabber:x:oob")&.text
285 MessageEvent::Thru.new(
286 owner: users_num,
287 from: users_num,
288 to: [dest_num],
289 stanza_id: m.id.to_s,
290 body: m.body.to_s,
291 media_urls: [oob_url].compact
292 ).emit(REDIS)
293
294 # send a delivery receipt back to the sender
295 # TODO: send only when requested per XEP-0184
296 # TODO: pass receipts from target if supported
297
298 # TODO: put in member/instance variable
299 rcpt['id'] = SecureRandom.uuid
300 rcvd = Nokogiri::XML::Node.new 'received', rcpt.document
301 rcvd['xmlns'] = 'urn:xmpp:receipts'
302 rcvd['id'] = m.id
303 rcpt.add_child(rcvd)
304
305 puts 'XRESPONSE1: ' + rcpt.inspect
306 write_to_stream rcpt
307 end
308
309 def self.call_catapult(
310 token, secret, m, pth, body=nil,
311 head={}, code=[200], respond_with=:body
312 )
313 # pth looks like one of:
314 # "api/v2/users/#{user_id}/[endpoint_name]"
315
316 url_prefix = ''
317
318 # TODO: need to make a separate thing for voice.bw.c eventually
319 if pth.start_with? 'api/v2/users'
320 url_prefix = 'https://messaging.bandwidth.com/'
321 end
322
323 EM::HttpRequest.new(
324 url_prefix + pth
325 ).public_send(
326 "a#{m}",
327 head: {
328 'Authorization' => [token, secret]
329 }.merge(head),
330 body: body
331 ).then { |http|
332 puts "API response to send: #{http.response} with code"\
333 " response.code #{http.response_header.status}"
334
335 if code.include?(http.response_header.status)
336 case respond_with
337 when :body
338 http.response
339 when :headers
340 http.response_header
341 else
342 http
343 end
344 else
345 EMPromise.reject(
346 BandwidthError.for(http.response_header.status, http.response)
347 )
348 end
349 }
350 end
351
352 def self.to_catapult_possible_oob(s, num_dest, user_id, token, secret,
353 usern)
354 un = s.at("oob|x > oob|url", oob: "jabber:x:oob")
355 unless un
356 puts "MMSOOB: no url node found so process as normal"
357 return to_catapult(s, nil, num_dest, user_id, token,
358 secret, usern)
359 end
360 puts "MMSOOB: found a url node - checking if to make MMS..."
361
362 body = s.respond_to?(:body) ? s.body : ''
363 EM::HttpRequest.new(un.text, tls: { verify_peer: true }).ahead.then { |http|
364 # If content is too large, or MIME type is not supported, place the link inside the body and do not send MMS.
365 if http.response_header["CONTENT_LENGTH"].to_i > 3500000 ||
366 !MMS_MIME_TYPES.include?(http.response_header["CONTENT_TYPE"])
367 unless body.include?(un.text)
368 s.body = body.empty? ? un.text : "#{body}\n#{un.text}"
369 end
370 to_catapult(s, nil, num_dest, user_id, token, secret, usern)
371 else # If size is less than ~3.5MB, strip the link from the body and attach media in the body.
372 # some clients send URI in both body & <url/> so delete
373 s.body = body.sub(/\s*#{Regexp.escape(un.text)}\s*$/, '')
374
375 puts "MMSOOB: url text is '#{un.text}'"
376 puts "MMSOOB: the body is '#{body.to_s.strip}'"
377
378 puts "MMSOOB: sending MMS since found OOB & user asked"
379 to_catapult(s, un.text, num_dest, user_id, token, secret, usern)
380 end
381 }
382 end
383
384 def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern)
385 body = s.respond_to?(:body) ? s.body.to_s : ''
386 if murl.to_s.empty? && body.strip.empty?
387 return EMPromise.reject(
388 [:modify, 'policy-violation']
389 )
390 end
391
392 segment_size = body.ascii_only? ? 160 : 70
393 if !murl && ENV["MMS_PATH"] && body.length > segment_size*3
394 file = Multibases.pack(
395 'base58btc',
396 Multihashes.encode(Digest::SHA256.digest(body), "sha2-256")
397 ).to_s
398 File.open("#{ENV['MMS_PATH']}/#{file}", "w") { |fh| fh.write body }
399 murl = "#{ENV['MMS_URL']}/#{file}.txt"
400 body = ""
401 end
402
403 extra = {}
404 extra[:media] = murl if murl
405
406 call_catapult(
407 token,
408 secret,
409 :post,
410 "api/v2/users/#{user_id}/messages",
411 JSON.dump(extra.merge(
412 from: usern,
413 to: num_dest,
414 text: body,
415 applicationId: ARGV[4],
416 tag:
417 # callbacks need id and resourcepart
418 WEBrick::HTTPUtils.escape(s.id.to_s) +
419 ' ' +
420 WEBrick::HTTPUtils.escape(
421 s.from.resource.to_s
422 )
423 )),
424 {'Content-Type' => 'application/json'},
425 [201]
426 ).then { |response|
427 parsed = JSON.parse(response) rescue {}
428 MessageEvent::Out.new(
429 timestamp: parsed["time"] || Time.now,
430 owner: usern,
431 from: usern,
432 to: Array(num_dest),
433 stanza_id: s.id.to_s,
434 bandwidth_id: parsed["id"],
435 body: body,
436 media_urls: [murl].compact
437 ).emit(REDIS)
438 response
439 }.catch { |e|
440 EMPromise.reject(
441 [:cancel, 'internal-server-error', e.message]
442 )
443 }
444 end
445
446 def self.validate_num(m)
447 # if sent to SGX domain use https://wiki.soprani.ca/SGX/GroupMMS
448 if m.to == ARGV[0]
449 an = m.children.find { |v| v.element_name == "addresses" }
450 if not an
451 return EMPromise.reject(
452 [:cancel, 'item-not-found']
453 )
454 end
455 puts "ADRXEP: found an addresses node - iterate addrs.."
456
457 nums = []
458 an.children.each do |e|
459 num = ''
460 type = ''
461 e.attributes.each do |c|
462 if c[0] == 'type'
463 if c[1] != 'to'
464 # TODO: error
465 end
466 type = c[1].to_s
467 elsif c[0] == 'uri'
468 if !c[1].to_s.start_with? 'sms:'
469 # TODO: error
470 end
471 num = c[1].to_s[4..-1]
472 # TODO: confirm num validates
473 # TODO: else, error - unexpected name
474 end
475 end
476 if num.empty? or type.empty?
477 # TODO: error
478 end
479 nums << num
480 end
481 return nums
482 end
483
484 # if not sent to SGX domain, then assume destination is in 'to'
485 EMPromise.resolve(m.to.node.to_s).then { |num_dest|
486 if num_dest =~ /\A\+?[0-9]+(?:;.*)?\Z/
487 next num_dest if num_dest[0] == '+'
488
489 shortcode = extract_shortcode(num_dest)
490 next shortcode if shortcode
491 end
492
493 if anonymous_tel?(num_dest)
494 EMPromise.reject([:cancel, 'gone'])
495 else
496 # TODO: text re num not (yet) supportd/implmentd
497 EMPromise.reject([:cancel, 'item-not-found'])
498 end
499 }
500 end
501
502 def self.fetch_catapult_cred_for(jid)
503 @registration_repo.find(jid).then { |creds|
504 if creds.length < 4
505 # TODO: add text re credentials not registered
506 EMPromise.reject(
507 [:auth, 'registration-required']
508 )
509 else
510 creds
511 end
512 }
513 end
514
515 message :error? do |m|
516 # TODO: report it somewhere/somehow - eat for now so no err loop
517 puts "EATERROR1: #{m.inspect}"
518 end
519
520 message :body do |m|
521 EMPromise.all([
522 validate_num(m),
523 fetch_catapult_cred_for(m.from)
524 ]).then { |(num_dest, creds)|
525 @registration_repo.find_jid(num_dest).then { |jid|
526 [jid, num_dest] + creds
527 }
528 }.then { |(jid, num_dest, *creds)|
529 if jid
530 @registration_repo.find(jid).then { |other_user|
531 [jid, num_dest] + creds + [other_user.first]
532 }
533 else
534 [jid, num_dest] + creds + [nil]
535 end
536 }.then { |(jid, num_dest, *creds, other_user)|
537 # if destination user is in the system pass on directly
538 if other_user and not other_user.start_with? 'u-'
539 pass_on_message(m, creds.last, jid)
540 else
541 to_catapult_possible_oob(m, num_dest, *creds)
542 end
543 }.catch { |e|
544 if e.is_a?(Array) && (e.length == 2 || e.length == 3)
545 write_to_stream m.as_error(e[1], e[0], e[2])
546 else
547 EMPromise.reject(e)
548 end
549 }
550 end
551
552 def self.user_cap_identities
553 [{category: 'client', type: 'sms'}]
554 end
555
556 # TODO: must re-add stuff so can do ad-hoc commands
557 def self.user_cap_features
558 ["urn:xmpp:receipts"]
559 end
560
561 def self.add_gateway_feature(feature)
562 @gateway_features << feature
563 @gateway_features.uniq!
564 end
565
566 subscription :request? do |p|
567 puts "PRESENCE1: #{p.inspect}"
568
569 # subscriptions are allowed from anyone - send reply immediately
570 msg = Blather::Stanza::Presence.new
571 msg.to = p.from
572 msg.from = p.to
573 msg.type = :subscribed
574
575 puts 'RESPONSE5a: ' + msg.inspect
576 write_to_stream msg
577
578 # send a <presence> immediately; not automatically probed for it
579 # TODO: refactor so no "presence :probe? do |p|" duplicate below
580 caps = Blather::Stanza::Capabilities.new
581 # TODO: user a better node URI (?)
582 caps.node = 'http://catapult.sgx.soprani.ca/'
583 caps.identities = user_cap_identities
584 caps.features = user_cap_features
585
586 msg = caps.c
587 msg.to = p.from
588 msg.from = p.to.to_s + '/sgx'
589
590 puts 'RESPONSE5b: ' + msg.inspect
591 write_to_stream msg
592
593 # need to subscribe back so Conversations displays images inline
594 msg = Blather::Stanza::Presence.new
595 msg.to = p.from.to_s.split('/', 2)[0]
596 msg.from = p.to.to_s.split('/', 2)[0]
597 msg.type = :subscribe
598
599 puts 'RESPONSE5c: ' + msg.inspect
600 write_to_stream msg
601 end
602
603 presence :probe? do |p|
604 puts 'PRESENCE2: ' + p.inspect
605
606 caps = Blather::Stanza::Capabilities.new
607 # TODO: user a better node URI (?)
608 caps.node = 'http://catapult.sgx.soprani.ca/'
609 caps.identities = user_cap_identities
610 caps.features = user_cap_features
611
612 msg = caps.c
613 msg.to = p.from
614 msg.from = p.to.to_s + '/sgx'
615
616 puts 'RESPONSE6: ' + msg.inspect
617 write_to_stream msg
618 end
619
620 disco_items(
621 to: Blather::JID.new(ARGV[0]),
622 node: "http://jabber.org/protocol/commands"
623 ) do |i|
624 fetch_catapult_cred_for(i.from).then { |creds|
625 BandwidthTNOptions.tn_eligible_for_port_out_pin?(creds).then { |eligible|
626 reply = i.reply
627 reply.node = 'http://jabber.org/protocol/commands'
628
629 if eligible
630 reply.items = [
631 Blather::Stanza::DiscoItems::Item.new(
632 i.to,
633 'set-port-out-pin',
634 'Set Port-Out PIN'
635 )
636 ]
637 else
638 reply.items = []
639 end
640
641 puts 'RESPONSE_CMD_DISCO: ' + reply.inspect
642 write_to_stream reply
643 }
644 }.catch { |e|
645 if e.is_a?(Array) && [2, 3].include?(e.length)
646 write_to_stream i.as_error(e[1], e[0], e[2])
647 else
648 EMPromise.reject(e)
649 end
650 }
651 end
652
653 iq '/iq/ns:query', ns: 'http://jabber.org/protocol/disco#info' do |i|
654 # TODO: return error if i.type is :set - if it is :reply or
655 # :error it should be ignored (as the below does currently);
656 # review specification to see how to handle other type values
657 if i.type != :get
658 puts 'DISCO iq rcvd, of non-get type "' + i.type.to_s +
659 '" for message "' + i.inspect + '"; ignoring...'
660 next
661 end
662
663 # respond to capabilities request for an sgx-bwmsgsv2 number JID
664 if i.to.node
665 # TODO: confirm the node URL is expected using below
666 #puts "XR[node]: #{xpath_result[0]['node']}"
667
668 msg = i.reply
669 msg.node = i.node
670 msg.identities = user_cap_identities
671 msg.features = user_cap_features
672
673 puts 'RESPONSE7: ' + msg.inspect
674 write_to_stream msg
675 next
676 end
677
678 # respond to capabilities request for sgx-bwmsgsv2 itself
679 msg = i.reply
680 msg.node = i.node
681 msg.identities = [{
682 name: 'Soprani.ca Gateway to XMPP - Bandwidth API V2',
683 type: 'sms', category: 'gateway'
684 }]
685 msg.features = @gateway_features
686 write_to_stream msg
687 end
688
689 def self.check_then_register(i, *creds)
690 @registration_repo
691 .put(i.from, *creds)
692 .catch_only(RegistrationRepo::Conflict) { |e|
693 EMPromise.reject([:cancel, 'conflict', e.message])
694 }.then {
695 write_to_stream i.reply
696 }
697 end
698
699 def self.creds_from_registration_query(i)
700 if i.query.find_first("./ns:x", ns: "jabber:x:data")
701 [
702 i.form.field("nick")&.value,
703 i.form.field("username")&.value,
704 i.form.field("password")&.value,
705 i.form.field("phone")&.value
706 ]
707 else
708 [i.nick, i.username, i.password, i.phone]
709 end
710 end
711
712 def self.process_registration(i)
713 EMPromise.resolve(nil).then {
714 if i.remove?
715 @registration_repo.delete(i.from).then do
716 write_to_stream i.reply
717 EMPromise.reject(:done)
718 end
719 else
720 creds_from_registration_query(i)
721 end
722 }.then { |user_id, api_token, api_secret, phone_num|
723 if phone_num && phone_num[0] == '+'
724 [user_id, api_token, api_secret, phone_num]
725 else
726 # TODO: add text re number not (yet) supported
727 EMPromise.reject([:cancel, 'item-not-found'])
728 end
729 }.then { |user_id, api_token, api_secret, phone_num|
730 # TODO: find way to verify #{phone_num}, too
731 call_catapult(
732 api_token,
733 api_secret,
734 :get,
735 "api/v2/users/#{user_id}/media"
736 ).then { |response|
737 JSON.parse(response)
738 # TODO: confirm response is array - could be empty
739
740 puts "register got str #{response.to_s[0..999]}"
741
742 check_then_register(
743 i,
744 user_id,
745 api_token,
746 api_secret,
747 phone_num
748 )
749 }
750 }.catch_only(BandwidthError) { |e|
751 EMPromise.reject(case e.code
752 when 401
753 # TODO: add text re bad credentials
754 [:auth, 'not-authorized']
755 when 404
756 # TODO: add text re number not found or disabled
757 [:cancel, 'item-not-found']
758 else
759 [:modify, 'not-acceptable']
760 end)
761 }
762 end
763
764 def self.registration_form(orig, existing_number=nil)
765 orig.registered = !!existing_number
766
767 # TODO: update "User Id" x2 below (to "accountId"?), and others?
768 orig.instructions = "Enter the information from your Account "\
769 "page as well as the Phone Number\nin your "\
770 "account you want to use (ie. '+12345678901')"\
771 ".\nUser Id is nick, API Token is username, "\
772 "API Secret is password, Phone Number is phone"\
773 ".\n\nThe source code for this gateway is at "\
774 "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
775 "\nCopyright (C) 2017-2020 Denver Gingerich "\
776 "and others, licensed under AGPLv3+."
777 orig.nick = ""
778 orig.username = ""
779 orig.password = ""
780 orig.phone = existing_number.to_s
781
782 orig.form.fields = [
783 {
784 required: true, type: :"text-single",
785 label: 'User Id', var: 'nick'
786 },
787 {
788 required: true, type: :"text-single",
789 label: 'API Token', var: 'username'
790 },
791 {
792 required: true, type: :"text-private",
793 label: 'API Secret', var: 'password'
794 },
795 {
796 required: true, type: :"text-single",
797 label: 'Phone Number', var: 'phone',
798 value: existing_number.to_s
799 }
800 ]
801 orig.form.title = 'Register for '\
802 'Soprani.ca Gateway to XMPP - Bandwidth API V2'
803 orig.form.instructions = "Enter the details from your Account "\
804 "page as well as the Phone Number\nin your "\
805 "account you want to use (ie. '+12345678901')"\
806 ".\n\nThe source code for this gateway is at "\
807 "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
808 "\nCopyright (C) 2017-2020 Denver Gingerich "\
809 "and others, licensed under AGPLv3+."
810
811 orig
812 end
813
814 ibr do |i|
815 puts "IQ: #{i.inspect}"
816
817 case i.type
818 when :set
819 process_registration(i)
820 when :get
821 bare_jid = i.from.stripped
822 @registration_repo.find(bare_jid).then { |creds|
823 reply = registration_form(i.reply, creds.last)
824 puts "RESPONSE2: #{reply.inspect}"
825 write_to_stream reply
826 }
827 else
828 # Unknown IQ, ignore for now
829 EMPromise.reject(:done)
830 end.catch { |e|
831 if e.is_a?(Array) && (e.length == 2 || e.length == 3)
832 write_to_stream i.as_error(e[1], e[0], e[2])
833 elsif e != :done
834 EMPromise.reject(e)
835 end
836 }.catch(&method(:panic))
837 end
838
839 command :execute?, node: "set-port-out-pin", sessionid: nil do |iq|
840 # Ensure user is registered, but discard their credentials
841 # because we're just showing them a form.
842 fetch_catapult_cred_for(iq.from).then { |_creds|
843 reply = iq.reply
844 reply.node = 'set-port-out-pin'
845 reply.sessionid = SecureRandom.uuid
846 reply.status = :executing
847
848 form = Blather::Stanza::X.find_or_create(reply.command)
849 form.type = "form"
850 form.fields = [
851 {
852 var: 'pin',
853 type: 'text-private',
854 label: 'Port-Out PIN',
855 required: true
856 },
857 {
858 var: 'confirm_pin',
859 type: 'text-private',
860 label: 'Confirm PIN',
861 required: true
862 }
863 ]
864
865 reply.command.add_child(form)
866 reply.allowed_actions = [:complete]
867
868 puts "RESPONSE_CMD_FORM: #{reply.inspect}"
869 write_to_stream reply
870 }.catch { |e|
871 if e.is_a?(Array) && [2, 3].include?(e.length)
872 write_to_stream iq.as_error(e[1], e[0], e[2])
873 else
874 EMPromise.reject(e)
875 end
876 }.catch(&method(:panic))
877 end
878
879 command :complete?, node: "set-port-out-pin", sessionid: /./ do |iq|
880 pin = iq.form.field('pin')&.value
881 confirm_pin = iq.form.field('confirm_pin')&.value
882
883 if pin.nil? || confirm_pin.nil?
884 write_to_stream iq.as_error(
885 'bad-request',
886 :modify,
887 'PIN fields are required'
888 )
889 next
890 end
891
892 if pin != confirm_pin
893 write_to_stream iq.as_error(
894 'bad-request',
895 :modify,
896 'PIN confirmation does not match'
897 )
898 next
899 end
900
901 if pin !~ /\A[a-zA-Z0-9]{4,10}\z/
902 write_to_stream iq.as_error(
903 'bad-request',
904 :modify,
905 'PIN must be 4-10 alphanumeric characters'
906 )
907 next
908 end
909
910 fetch_catapult_cred_for(iq.from).then { |creds|
911 BandwidthTNOptions.set_port_out_pin(creds, pin).then {
912 reply = iq.reply
913 reply.node = 'set-port-out-pin'
914 reply.sessionid = iq.sessionid
915 reply.status = :completed
916 reply.note_type = :info
917 reply.note_text = 'Port-out PIN has been set successfully.'
918
919 write_to_stream reply
920 }.catch { |e|
921 reply = iq.reply
922 reply.node = 'set-port-out-pin'
923 reply.sessionid = iq.sessionid
924 reply.status = :completed
925 reply.note_type = :error
926 error_msg = if e.respond_to?(:message) && e.message.include?('not valid')
927 "Invalid phone number format. "\
928 "Please check your registered phone number."
929 elsif e.respond_to?(:message) && e.message.include?('ErrorCode')
930 "Bandwidth API error: #{e.message}"
931 else
932 "Failed to set port-out PIN. Please try again later."
933 end
934 reply.note_text = error_msg
935
936 write_to_stream reply
937 }
938 }.catch { |e|
939 if e.is_a?(Array) && [2, 3].include?(e.length)
940 write_to_stream iq.as_error(e[1], e[0], e[2])
941 else
942 EMPromise.reject(e)
943 end
944 }.catch(&method(:panic))
945 end
946
947 iq type: [:get, :set] do |iq|
948 write_to_stream(Blather::StanzaError.new(
949 iq,
950 'feature-not-implemented',
951 :cancel
952 ))
953 end
954end
955
956class ReceiptMessage < Blather::Stanza
957 def self.new(to=nil)
958 node = super :message
959 node.to = to
960 node
961 end
962end
963
964class WebhookHandler < Goliath::API
965 use Sentry::Rack::CaptureExceptions
966 use Goliath::Rack::Params
967
968 def response(env)
969 @registration_repo = RegistrationRepo.new
970 # TODO: add timestamp grab here, and MUST include ./tai version
971
972 puts 'ENV: ' + env.reject { |k| k == 'params' }.to_s
973
974 if params.empty?
975 puts 'PARAMS empty!'
976 return [200, {}, "OK"]
977 end
978
979 if env['REQUEST_URI'] != '/'
980 puts 'BADREQUEST1: non-/ request "' +
981 env['REQUEST_URI'] + '", method "' +
982 env['REQUEST_METHOD'] + '"'
983 return [200, {}, "OK"]
984 end
985
986 if env['REQUEST_METHOD'] != 'POST'
987 puts 'BADREQUEST2: non-POST request; URI: "' +
988 env['REQUEST_URI'] + '", method "' +
989 env['REQUEST_METHOD'] + '"'
990 return [200, {}, "OK"]
991 end
992
993 # TODO: process each message in list, not just first one
994 jparams = params.dig('_json', 0, 'message')
995 type = params.dig('_json', 0, 'type')
996
997 return [400, {}, "Missing params\n"] unless jparams && type
998
999 users_num, others_num = if jparams['direction'] == 'in'
1000 [jparams['owner'], jparams['from']]
1001 elsif jparams['direction'] == 'out'
1002 [jparams['from'], jparams['owner']]
1003 else
1004 puts "big prob: '#{jparams['direction']}'"
1005 return [400, {}, "OK"]
1006 end
1007
1008 jparams['to'].reject! { |num|
1009 num == users_num || num == others_num
1010 }
1011
1012 return [400, {}, "Missing params\n"] unless users_num && others_num
1013 return [400, {}, "Missing params\n"] unless jparams['to'].is_a?(Array)
1014
1015 puts "BODY - messageId: #{jparams['id']}" \
1016 ", eventType: #{type}" \
1017 ", time: #{jparams['time']}" \
1018 ", direction: #{jparams['direction']}" \
1019 ", deliveryState: #{jparams['deliveryState'] || 'NONE'}" \
1020 ", errorCode: #{jparams['errorCode'] || 'NONE'}" \
1021 ", description: #{jparams['description'] || 'NONE'}" \
1022 ", tag: #{jparams['tag'] || 'NONE'}" \
1023 ", media: #{jparams['media'] || 'NONE'}"
1024
1025 if others_num[0] != '+'
1026 # TODO: check that others_num actually a shortcode first
1027 others_num +=
1028 ';phone-context=ca-us.phone-context.soprani.ca'
1029 end
1030
1031 bare_jid = @registration_repo.find_jid(users_num).sync
1032
1033 if !bare_jid
1034 puts "jid_key for (#{users_num}) DNE; BW API misconfigured?"
1035
1036 return [403, {}, "Customer not found\n"]
1037 end
1038
1039 msg = nil
1040 case jparams['direction']
1041 when 'in'
1042 text = ''
1043 case type
1044 when 'sms'
1045 text = jparams['text']
1046 when 'mms'
1047 has_media = false
1048
1049 if jparams['text'].empty?
1050 if not has_media
1051 text = '[suspected group msg '\
1052 'with no text (odd)]'
1053 end
1054 else
1055 text = if has_media
1056 # TODO: write/use a caption XEP
1057 jparams['text']
1058 else
1059 '[suspected group msg '\
1060 '(recipient list not '\
1061 'available) with '\
1062 'following text] ' +
1063 jparams['text']
1064 end
1065 end
1066
1067 # ie. if text param non-empty or had no media
1068 if not text.empty?
1069 msg = Blather::Stanza::Message.new(
1070 bare_jid, text)
1071 msg.from = others_num + '@' + ARGV[0]
1072 SGXbwmsgsv2.write(msg)
1073 end
1074
1075 return [200, {}, "OK"]
1076 when 'message-received'
1077 # TODO: handle group chat, and fix above
1078 text = jparams['text']
1079
1080 if jparams['to'].length > 1
1081 msg = Blather::Stanza::Message.new(
1082 Blather::JID.new(bare_jid).domain,
1083 text
1084 )
1085
1086 addrs = Nokogiri::XML::Node.new(
1087 'addresses', msg.document)
1088 addrs['xmlns'] = 'http://jabber.org/' \
1089 'protocol/address'
1090
1091 addr1 = Nokogiri::XML::Node.new(
1092 'address', msg.document)
1093 addr1['type'] = 'to'
1094 addr1['jid'] = bare_jid
1095 addrs.add_child(addr1)
1096
1097 jparams['to'].each do |receiver|
1098 addrn = Nokogiri::XML::Node.new(
1099 'address', msg.document)
1100 addrn['type'] = 'to'
1101 addrn['uri'] = "sms:#{receiver}"
1102 addrn['delivered'] = 'true'
1103 addrs.add_child(addrn)
1104 end
1105
1106 msg.add_child(addrs)
1107
1108 # TODO: delete
1109 puts "RESPONSE9: #{msg.inspect}"
1110 end
1111
1112 Array(jparams['media']).each do |media_url|
1113 unless media_url.end_with?(
1114 '.smil', '.txt', '.xml'
1115 )
1116 has_media = true
1117 SGXbwmsgsv2.send_media(
1118 others_num + '@' +
1119 ARGV[0],
1120 bare_jid, media_url,
1121 nil, nil, msg
1122 )
1123 end
1124 end
1125 else
1126 text = "unknown type (#{type})"\
1127 " with text: " + jparams['text']
1128
1129 # TODO: log/notify of this properly
1130 puts text
1131 end
1132
1133 if not msg
1134 msg = Blather::Stanza::Message.new(bare_jid, text)
1135 end
1136 else # per prior switch, this is: jparams['direction'] == 'out'
1137 tag_parts = jparams['tag'].split(/ /, 2)
1138 id = WEBrick::HTTPUtils.unescape(tag_parts[0])
1139 resourcepart = WEBrick::HTTPUtils.unescape(tag_parts[1])
1140
1141 # TODO: remove this hack
1142 if jparams['to'].length > 1
1143 puts "WARN! group no rcpt: #{users_num}"
1144 return [200, {}, "OK"]
1145 end
1146
1147 case type
1148 when 'message-failed'
1149 # create a bare message like the one user sent
1150 msg = Blather::Stanza::Message.new(
1151 others_num + '@' + ARGV[0])
1152 msg.from = bare_jid + '/' + resourcepart
1153 msg['id'] = id
1154
1155 # TODO: add 'errorCode' and/or 'description' val
1156 # create an error reply to the bare message
1157 msg = msg.as_error(
1158 'recipient-unavailable',
1159 :wait,
1160 jparams['description']
1161 )
1162
1163 # TODO: make prettier: this should be done above
1164 others_num = params['_json'][0]['to']
1165 when 'message-delivered'
1166
1167 msg = ReceiptMessage.new(bare_jid)
1168
1169 # TODO: put in member/instance variable
1170 msg['id'] = SecureRandom.uuid
1171
1172 # TODO: send only when requested per XEP-0184
1173 rcvd = Nokogiri::XML::Node.new(
1174 'received',
1175 msg.document
1176 )
1177 rcvd['xmlns'] = 'urn:xmpp:receipts'
1178 rcvd['id'] = id
1179 msg.add_child(rcvd)
1180
1181 # TODO: make prettier: this should be done above
1182 others_num = params['_json'][0]['to']
1183 else
1184 # TODO: notify somehow of unknown state receivd?
1185 puts "message with id #{id} has "\
1186 "other type #{type}"
1187 return [200, {}, "OK"]
1188 end
1189
1190 puts "RESPONSE4: #{msg.inspect}"
1191 end
1192
1193 msg.from = others_num + '@' + ARGV[0]
1194 SGXbwmsgsv2.write(msg)
1195
1196 # Emit event to messages stream
1197 case [jparams['direction'], type]
1198 when ['in', 'message-received']
1199 media_urls = Array(jparams['media']).reject { |u|
1200 u.end_with?('.smil', '.txt', '.xml')
1201 }
1202 MessageEvent::In.new(
1203 timestamp: jparams['time'],
1204 from: jparams['from'],
1205 to: jparams['to'],
1206 owner: jparams['owner'],
1207 bandwidth_id: jparams['id'],
1208 body: jparams['text'].to_s,
1209 media_urls: media_urls
1210 ).emit(REDIS)
1211 when ['out', 'message-failed']
1212 tag_parts = jparams['tag'].split(/ /, 2)
1213 stanza_id = WEBrick::HTTPUtils.unescape(tag_parts[0])
1214 MessageEvent::Failed.new(
1215 timestamp: jparams['time'],
1216 stanza_id: stanza_id,
1217 bandwidth_id: jparams['id'],
1218 error_code: jparams['errorCode'].to_s,
1219 error_description: jparams['description'].to_s
1220 ).emit(REDIS)
1221 when ['out', 'message-delivered']
1222 tag_parts = jparams['tag'].split(/ /, 2)
1223 stanza_id = WEBrick::HTTPUtils.unescape(tag_parts[0])
1224 MessageEvent::Delivered.new(
1225 timestamp: jparams['time'],
1226 stanza_id: stanza_id,
1227 bandwidth_id: jparams['id']
1228 ).emit(REDIS)
1229 end
1230
1231 [200, {}, "OK"]
1232 rescue Exception => e
1233 Sentry.capture_exception(e)
1234 puts 'Shutting down gateway due to exception 013: ' + e.message
1235 SGXbwmsgsv2.shutdown
1236 puts 'Gateway has terminated.'
1237 EM.stop
1238 end
1239end
1240
1241at_exit do
1242 $stdout.sync = true
1243
1244 puts "Soprani.ca/SMS Gateway for XMPP - Bandwidth API V2\n"\
1245 "==>> last commit of this version is " + `git rev-parse HEAD` + "\n"
1246
1247 if ARGV.size != 7
1248 puts "Usage: sgx-bwmsgsv2.rb <component_jid> "\
1249 "<component_password> <server_hostname> "\
1250 "<server_port> <application_id> "\
1251 "<http_listen_port> <mms_proxy_prefix_url>"
1252 exit 0
1253 end
1254
1255 t = Time.now
1256 puts "LOG %d.%09d: starting...\n\n" % [t.to_i, t.nsec]
1257
1258 EM.run do
1259 REDIS = EM::Hiredis.connect
1260
1261 SGXbwmsgsv2.run
1262
1263 # required when using Prosody otherwise disconnects on 6-hour inactivity
1264 EM.add_periodic_timer(3600) do
1265 msg = Blather::Stanza::Iq::Ping.new(:get, 'localhost')
1266 msg.from = ARGV[0]
1267 SGXbwmsgsv2.write(msg)
1268 end
1269
1270 server = Goliath::Server.new('0.0.0.0', ARGV[5].to_i)
1271 server.api = WebhookHandler.new
1272 server.app = Goliath::Rack::Builder.build(server.api.class, server.api)
1273 server.logger = Log4r::Logger.new('goliath')
1274 server.logger.add(Log4r::StdoutOutputter.new('console'))
1275 server.logger.level = Log4r::INFO
1276 server.start do
1277 ["INT", "TERM"].each do |sig|
1278 trap(sig) do
1279 EM.defer do
1280 puts 'Shutting down gateway...'
1281 SGXbwmsgsv2.shutdown
1282
1283 puts 'Gateway has terminated.'
1284 EM.stop
1285 end
1286 end
1287 end
1288 end
1289 end
1290end unless ENV['ENV'] == 'test'