1#!/usr/bin/env ruby
2# frozen_string_literal: true
3
4# Copyright (C) 2017-2020 Denver Gingerich <denver@ossguy.com>
5# Copyright (C) 2017 Stephen Paul Weber <singpolyma@singpolyma.net>
6#
7# This file is part of sgx-bwmsgsv2.
8#
9# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
10# the terms of the GNU Affero General Public License as published by the Free
11# Software Foundation, either version 3 of the License, or (at your option) any
12# later version.
13#
14# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
15# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17# details.
18#
19# You should have received a copy of the GNU Affero General Public License along
20# with sgx-bwmsgsv2. If not, see <http://www.gnu.org/licenses/>.
21
22require 'delegate'
23require 'blather/client/dsl'
24require 'em-hiredis'
25require 'em-http-request'
26require 'json'
27require 'multibases'
28require 'multihashes'
29require 'securerandom'
30require "sentry-ruby"
31require 'time'
32require 'uri'
33require 'webrick'
34
35require 'goliath/api'
36require 'goliath/server'
37require "ougai"
38
39require 'em_promise'
40require 'em-synchrony'
41
42require_relative 'lib/bandwidth_error'
43require_relative 'lib/bandwidth_tn_options'
44require_relative 'lib/message_event'
45require_relative 'lib/registration_repo'
46
# Initialise Sentry with its defaults before loading the background logger.
# NOTE(review): presumably lib/background_log expects Sentry to be initialised
# already -- confirm before merging the two Sentry.init calls.
Sentry.init

require_relative 'lib/background_log'

# Flush every write immediately so log lines are not held in a buffer.
$stdout.sync = true

# In the test environment log straight to stdout; otherwise wrap stdout in
# BackgroundLog.
log_device = ENV["ENV"] == "test" ? $stdout : BackgroundLog.new($stdout)

LOG = Ougai::Logger.new(log_device)
LOG.level = ENV.fetch("LOG_LEVEL", "info")
# "plain" output is selected whenever stdout is not a terminal.
LOG.formatter = Ougai::Formatters::Readable.new(nil, nil, plain: !$stdout.isatty)

# Route the libraries' own logging through the shared logger.
Blather.logger = LOG
EM::Hiredis.logger = LOG

# Second initialisation: now that LOG exists, hand it to Sentry and enable the
# logger-based breadcrumbs.
Sentry.init { |config|
  config.logger = LOG
  config.breadcrumbs_logger = [:sentry_logger]
}
66
# Terms that cause an outbound message to be rejected. They are written in
# lower case because to_catapult matches them against the down-cased body.
BADWORD_LIST = [
  "marijuana", "psilocybin", "cannabis", "cocaine", "heroin", "meth",
  "methamphetamine", "methamphetamines", "cigarette", "tobacco", "cbd", "thc",
  "morphine", "incall", "in-call", "outcall", "out-call", "shrooms", "lsd",
  "kratom", "mdma", "addy", "xanz", "cialis", "viagra", "bbfs", "fentanyl",
  "opium", "golden teacher", "bbbj", "canna", "fuck", "xanax", "zarareturns",
  "zarareturns.com", "plantation"
].freeze

# One alternation over all terms; each term is escaped and wrapped in word
# boundaries so that e.g. "meth" does not match inside "method".
BADWORDS = Regexp.union(
  BADWORD_LIST.map { |term| Regexp.new("\\b#{Regexp.escape(term)}\\b") }
)
109
# List of supported MIME types from Bandwidth - https://support.bandwidth.com/hc/en-us/articles/360014128994-What-MMS-file-types-are-supported-
#
# Used as a read-only allow-list (see to_catapult_possible_oob), so the array
# is frozen to prevent accidental mutation of this shared constant.
MMS_MIME_TYPES = %w[
  application/json
  application/ogg
  application/pdf
  application/rtf
  application/zip
  application/x-tar
  application/xml
  application/gzip
  application/x-bzip2
  application/x-gzip
  application/smil
  application/javascript
  audio/mp4
  audio/mpeg
  audio/ogg
  audio/flac
  audio/webm
  audio/wav
  audio/amr
  audio/3gpp
  image/bmp
  image/gif
  image/jpeg
  image/pjpeg
  image/png
  image/svg+xml
  image/tiff
  image/webp
  image/x-icon
  text/css
  text/csv
  text/calendar
  text/plain
  text/javascript
  text/vcard
  text/vnd.wap.wml
  text/xml
  video/avi
  video/mp4
  video/mpeg
  video/ogg
  video/quicktime
  video/webm
  video/x-ms-wmv
  video/x-flv
].freeze
158
# Last-resort failure handler: report to Sentry, log, shut the gateway down
# and stop the EventMachine reactor.
#
# e - the failure; either an Exception or any other object (reported via to_s).
def panic(e)
  sentry_hint = { background: false }

  case e
  when Exception
    Sentry.capture_exception(e, hint: sentry_hint)
  else
    Sentry.capture_message(e.to_s, hint: sentry_hint)
  end

  LOG.fatal("Shutting down gateway", e)
  SGXbwmsgsv2.shutdown
  LOG.info "Gateway has terminated"
  EM.stop
end

# Any error that reaches the reactor unhandled takes the gateway down.
EM.error_handler(&method(:panic))
172
# Returns the number part of "<num>;phone-context=ca-us.phone-context.soprani.ca",
# or nil when dest does not carry exactly that phone-context suffix.
def extract_shortcode(dest)
  number, suffix = dest.split(';', 2)
  return unless suffix == 'phone-context=ca-us.phone-context.soprani.ca'

  number
end
177
# True when everything after the first ';' in dest is exactly the anonymous
# phone-context marker.
def anonymous_tel?(dest)
  _number, suffix = dest.split(';', 2)
  suffix == 'phone-context=anonymous.phone-context.soprani.ca'
end
181
# Blather client that runs stanza handling inside a promise and a Sentry
# scope, and that waits for promises synchronously when ENV["ENV"] is "test".
class SGXClient < Blather::Client
  # Handles one incoming stanza via the superclass implementation.
  # StandardErrors raised while handling go to handle_error; a rejection of
  # the wrapping promise goes to panic. Returns the promise.
  def handle_data(stanza)
    handled = EMPromise.resolve(nil).then {
      with_sentry(stanza) do |scope|
        begin
          super
        rescue StandardError => e
          handle_error(scope, stanza, e)
        end
      end
    }.catch { |e| panic(e) }

    handled.sync if ENV["ENV"] == "test"
    handled
  end

  # Override the default call_handler to syncify during testing.
  def call_handler(handler, guards, stanza)
    outcome =
      if guards.first.respond_to?(:to_str)
        # String guards are passed to stanza.find; no match means "pass".
        matches = stanza.find(*guards)
        throw :pass if matches.empty?

        handler.call(stanza, matches)
      else
        throw :pass if guarded?(guards, stanza)

        handler.call(stanza)
      end

    # Up to here, identical to upstream impl

    outcome.sync if outcome.is_a?(Promise) && ENV["ENV"] == "test"
    outcome
  end

protected

  # Yields a fresh Sentry scope prepared for this stanza and, whatever
  # happens inside the block, finishes the scope's transaction afterwards
  # (marking it "ok" if no status was recorded).
  def with_sentry(stanza)
    Sentry.clone_hub_to_current_thread

    Sentry.with_scope do |scope|
      begin
        setup_scope(stanza, scope)
        yield scope
      ensure
        transaction = scope.get_transaction
        if transaction
          transaction.set_status("ok") unless transaction.status
          transaction.finish
        end
      end
    end
  end

  # Resets the scope's breadcrumbs, names it after the stanza, records the
  # sender's bare JID as the user, and starts a transaction for the stanza.
  def setup_scope(stanza, scope)
    label =
      if stanza.respond_to?(:node)
        stanza.node
      else
        stanza.name
      end

    scope.clear_breadcrumbs
    scope.set_transaction_name(label)
    scope.set_user(jid: stanza.from&.stripped.to_s)

    transaction = Sentry.start_transaction(name: label, op: "blather.handle_data")
    scope.set_span(transaction) if transaction
  end

  # Logs and reports an error raised while handling a stanza, marks the
  # transaction as failed, and replies with internal-server-error unless the
  # error says a reply was already sent.
  def handle_error(scope, stanza, e)
    LOG.error("Error during #{scope.transaction_name}", e)
    Sentry.capture_exception(e) unless e.is_a?(Sentry::Error)
    scope.get_transaction&.set_status("internal_error")

    already_replied = e.respond_to?(:replied?) && e.replied?
    return if already_replied

    SGXbwmsgsv2.write_to_stream stanza.as_error("internal-server-error", :cancel)
  end
end
254
# TODO: keep in sync with jmp-acct_bot.rb, and eventually put in common location
#
# Numeric identifiers for per-user setting flags.
# NOTE(review): the module name suggests these are bit positions in a flags
# value -- confirm against the consumer (nothing in this file reads them).
module CatapultSettingFlagBits
  # Flag 0: voicemail transcription is disabled.
  VOICEMAIL_TRANSCRIPTION_DISABLED = 0

  # Flag 1: MMS on OOB URL.
  MMS_ON_OOB_URL = 1
end
260
261module SGXbwmsgsv2
# Pull the Blather DSL (message, iq, presence, ... handlers) into the module.
extend Blather::DSL

# Module-level state shared by the handlers and helpers below.
@registration_repo = RegistrationRepo.new
@client = SGXClient.new

# Features advertised for the gateway itself in disco#info replies; extended
# at runtime through add_gateway_feature.
@gateway_features = %w[
  http://jabber.org/protocol/disco#info
  http://jabber.org/protocol/address/
  jabber:iq:register
  http://jabber.org/protocol/commands
]
272
# Starts the gateway by running the underlying client; returns whatever the
# client's run returns.
def self.run
  # TODO: read/save ARGV[7] creds to local variables
  client.run
end
277
# Public entry point for writing a stanza through the gateway's client, so
# classes outside this module can write messages, too.
def self.write(stanza)
  client.write(stanza)
end
282
# Registers a handler through the client's register_handler_before.
#
# type   - handler type, forwarded as-is
# guards - zero or more guards, forwarded as-is
# block  - the handler body
def self.before_handler(type, *guards, &block)
  client.register_handler_before(type, *guards, &block)
end
286
# Sends a message carrying media_url as an XEP-0066 (OOB) <x/> element.
#
# from      - value assigned to the stanza's from
# to        - recipient; the caller must guarantee this is a bare JID
# media_url - media location; Bandwidth media URLs are rewritten to go
#             through the proxy prefix given in ARGV[6]
# desc      - optional text for the OOB <desc/> element
# subject   - optional message subject
# m         - optional stanza to copy instead of building a new message
#
# Any exception raised here (including non-StandardError ones, because of
# `rescue Exception`) is handed to panic, which shuts the gateway down.
def self.send_media(from, to, media_url, desc=nil, subject=nil, m=nil)
  # we assume media_url is one of these (always the case so far):
  # https://messaging.bandwidth.com/api/v2/users/[u]/media/[path]

  LOG.debug("Original media URL", url: media_url)

  bandwidth_prefix = 'https://messaging.bandwidth.com/api/v2/users/'
  if media_url.start_with?(bandwidth_prefix)
    # Everything after the eighth '/' is the media path.
    media_path = media_url.split('/', 9)[8]
    media_url = ARGV[6] + WEBrick::HTTPUtils.escape(to) + '/' + media_path
    LOG.debug("Proxied media URL", url: media_url)
  end

  stanza = m ? m.copy : Blather::Stanza::Message.new(to, "")
  stanza.from = from
  stanza.subject = subject if subject

  # Builds <tag>text</tag> inside the stanza's document.
  text_element = lambda { |tag, text|
    element = Nokogiri::XML::Node.new(tag, stanza.document)
    element.add_child(Nokogiri::XML::Text.new(text, stanza.document))
    element
  }

  # provide URL in XEP-0066 (OOB) fashion
  oob = Nokogiri::XML::Node.new('x', stanza.document)
  oob['xmlns'] = 'jabber:x:oob'
  oob.add_child(text_element.call('url', media_url))
  oob.add_child(text_element.call('desc', desc)) if desc

  stanza.add_child(oob)

  write(stanza)
rescue Exception => e
  panic(e)
end
326
# Configure the client from the first four command-line arguments; the two
# nil arguments and async: true are passed exactly as before.
setup(*ARGV.values_at(0, 1, 2, 3), nil, nil, async: true)
328
# Delivers a message directly to another user of this gateway and then sends
# the original sender a delivery receipt.
#
# m         - the incoming message stanza (its to/from are rewritten in place)
# users_num - the sender's phone number; becomes the node of the new from
# jid       - the JID the message is forwarded to
def self.pass_on_message(m, users_num, jid)
  # Read everything that depends on the original addressing before it is
  # overwritten below.
  dest_num = m.to.node

  # setup delivery receipt; similar to a reply
  receipt = ReceiptMessage.new(m.from.stripped)
  receipt.from = m.to

  # pass original message (before sending receipt)
  m.to = jid
  m.from = "#{users_num}@#{ARGV[0]}"
  write_to_stream m

  # Emit pass-through event. Thru events don't capture a timestamp because XMPP
  # stanzas don't carry timestamps for realtime messages, and the Redis stream
  # ID provides the emit time.
  oob_node = m.at("oob|x > oob|url", oob: "jabber:x:oob")
  MessageEvent::Thru.new(
    owner: users_num,
    from: users_num,
    to: [dest_num],
    stanza_id: m.id.to_s,
    body: m.body.to_s,
    media_urls: Array(oob_node&.text)
  ).emit(REDIS)

  # send a delivery receipt back to the sender
  # TODO: send only when requested per XEP-0184
  # TODO: pass receipts from target if supported

  # TODO: put in member/instance variable
  receipt['id'] = SecureRandom.uuid
  received = Nokogiri::XML::Node.new('received', receipt.document)
  received['xmlns'] = 'urn:xmpp:receipts'
  received['id'] = m.id
  receipt.add_child(received)

  write_to_stream receipt
end
369
# Performs an asynchronous HTTP request against the Bandwidth API.
#
# token, secret - sent as the Authorization header value [token, secret]
# m             - HTTP verb; the request is made via the "a<verb>" method
#                 (e.g. :post -> apost)
# pth           - path such as "api/v2/users/#{user_id}/[endpoint_name]"
# body          - optional request body
# head          - extra headers, merged over the Authorization header
# code          - list of status codes treated as success
# respond_with  - :body (response body), :headers (response header) or
#                 anything else (the whole http object)
#
# Returns a promise; on an unexpected status it rejects with
# BandwidthError.for(status, response).
def self.call_catapult(
  token, secret, m, pth, body=nil,
  head={}, code=[200], respond_with=:body
)
  # TODO: need to make a separate thing for voice.bw.c eventually
  base_url = pth.start_with?('api/v2/users') ? 'https://messaging.bandwidth.com/' : ''

  request = EM::HttpRequest.new(
    URI.parse(base_url + pth), tls: {verify_peer: true}
  )
  headers = {'Authorization' => [token, secret]}.merge(head)

  request.public_send("a#{m}", head: headers, body: body).then { |http|
    status = http.response_header.status
    unless code.include?(status)
      next EMPromise.reject(BandwidthError.for(status, http.response))
    end

    case respond_with
    when :body then http.response
    when :headers then http.response_header
    else http
    end
  }
end
409
# Sends stanza s to Bandwidth, deciding whether an attached OOB URL should go
# out as MMS media or as a plain link in the text.
#
# Without an OOB URL the stanza is sent unchanged. With one, the link is put
# into the body when the destination is a String not starting with 1 / +1, or
# when a HEAD request shows the content is over 3,500,000 bytes or its
# Content-Type is not in MMS_MIME_TYPES. Otherwise the URL is stripped from
# the end of the body and sent as media.
#
# Returns whatever to_catapult returns (wrapped in the HEAD-request promise
# on that path).
def self.to_catapult_possible_oob(s, num_dest, user_id, token, secret,
  usern)
  un = s.at("oob|x > oob|url", oob: "jabber:x:oob")
  unless un
    LOG.debug "No OOB URL node, processing as normal"
    return to_catapult(s, nil, num_dest, user_id, token,
      secret, usern)
  end
  LOG.debug "Found OOB URL node, checking MMS eligibility"

  body = s.respond_to?(:body) ? s.body.to_s : ''
  url = un.text

  # Ensures the URL appears in the body, then sends without media.
  send_as_link = lambda {
    unless body.include?(url)
      s.body = body.empty? ? url : "#{body}\n#{url}"
    end
    to_catapult(s, nil, num_dest, user_id, token, secret, usern)
  }

  return send_as_link.call if num_dest.is_a?(String) && num_dest !~ /^\+?1/

  EM::HttpRequest.new(URI.parse(url), tls: {verify_peer: true}).ahead.then { |http|
    headers = http.response_header
    # If content is too large, or MIME type is not supported, place the link inside the body and do not send MMS.
    # NOTE(review): the Content-Type value is compared verbatim, so a value
    # with parameters (e.g. "; charset=...") would not match -- confirm intent.
    oversized = headers["CONTENT_LENGTH"].to_i > 3_500_000
    if oversized || !MMS_MIME_TYPES.include?(headers["CONTENT_TYPE"])
      send_as_link.call
    else # If size is less than ~3.5MB, strip the link from the body and attach media in the body.
      # some clients send URI in both body & <url/> so delete
      s.body = body.sub(/\s*#{Regexp.escape(url)}\s*$/, '')

      LOG.debug("OOB MMS details", url: url, body: body.to_s.strip)
      to_catapult(s, url, num_dest, user_id, token, secret, usern)
    end
  }
end
446
# Sends stanza s to Bandwidth as an SMS/MMS and emits a MessageEvent::Out.
#
# s        - the stanza; its body (if it responds to body), id and
#            from.resource are used
# murl     - media URL to attach, or nil
# num_dest - destination number (String) or list of numbers (Array, group)
# user_id, token, secret - Bandwidth credentials
# usern    - the sending user's number
#
# Returns a promise resolving to the raw API response. Rejects with
# [type, condition(, text)] arrays: policy-violation for an empty message,
# recipient-unavailable for blocked content, internal-server-error when the
# API call (or the event emission after it) fails.
def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern)
  body = s.respond_to?(:body) ? s.body.to_s : ''
  if murl.to_s.empty? && body.strip.empty?
    return EMPromise.reject(
      [:modify, 'policy-violation']
    )
  end

  if body.downcase.match?(BADWORDS)
    return EMPromise.reject([
      :wait,
      'recipient-unavailable',
      'Single message blocked by carrier content policy, see https://blog.jmp.chat/b/sms-censorship for details'
    ])
  end

  # U+2063 (invisible separator) anywhere in the body blocks the message.
  if body.include?("\u2063")
    return EMPromise.reject([
      :wait,
      'recipient-unavailable',
      'Please contact JMP support about your message'
    ])
  end

  segment_size = body.ascii_only? ? 160 : 70
  # num_dest is an Array for group messages. Arrays do not implement =~ on
  # Ruby >= 3.2 (Object#=~ was removed), so only pattern-match Strings; an
  # Array is never treated as a +1 destination, same as before.
  nanp_dest = num_dest.is_a?(String) && num_dest.match?(/^\+?1/)
  if !murl && ENV["MMS_PATH"] && nanp_dest && body.length > segment_size * 3
    # Long text to a +1 number: store the text as a file named after its
    # SHA-256 multihash and send that file as media instead.
    file = Multibases.pack(
      'base58btc',
      Multihashes.encode(Digest::SHA256.digest(body), "sha2-256")
    ).to_s
    # NOTE(review): the file is written without an extension while the URL
    # ends in ".txt" -- presumably the web server maps one to the other;
    # confirm.
    File.open("#{ENV['MMS_PATH']}/#{file}", "w") { |fh| fh.write body }
    murl = "#{ENV['MMS_URL']}/#{file}.txt"
    body = ""
  end

  extra = {}
  extra[:media] = murl if murl

  # callbacks need id and resourcepart
  tag = WEBrick::HTTPUtils.escape(s.id.to_s) +
    ' ' +
    WEBrick::HTTPUtils.escape(s.from.resource.to_s)

  call_catapult(
    token,
    secret,
    :post,
    "api/v2/users/#{user_id}/messages",
    JSON.dump(extra.merge(
      from: usern,
      to: num_dest,
      text: body,
      applicationId: ARGV[4],
      tag: tag
    )),
    {'Content-Type' => 'application/json'},
    [201, 202]
  ).then { |response|
    # The message is already accepted at this point, so an unparseable or
    # non-object response only costs us the id/time metadata.
    parsed =
      begin
        JSON.parse(response)
      rescue JSON::ParserError, TypeError
        {}
      end
    parsed = {} unless parsed.is_a?(Hash)

    MessageEvent::Out.new(
      timestamp: parsed["time"] || Time.now,
      owner: usern,
      from: usern,
      to: Array(num_dest),
      stanza_id: s.id.to_s,
      bandwidth_id: parsed["id"],
      body: body,
      media_urls: [murl].compact
    ).emit(REDIS)
    response
  }.catch { |e|
    EMPromise.reject(
      [:cancel, 'internal-server-error', e.message]
    )
  }
end
524
# Works out the destination number(s) of an outgoing message.
#
# A message addressed to the gateway domain itself (ARGV[0]) is a group
# message per https://wiki.soprani.ca/SGX/GroupMMS: the numbers come from the
# "uri" attributes of the children of its <addresses/> element. In that case
# an Array of strings is returned directly (not wrapped in a promise), or a
# rejected promise when there is no <addresses/> element.
#
# Otherwise the node of m.to is the destination and a promise is returned:
# it resolves to a "+"-prefixed number or a shortcode, and rejects with
# [:cancel, 'gone'] for anonymous senders or [:cancel, 'item-not-found'].
def self.validate_num(m)
  # if sent to SGX domain use https://wiki.soprani.ca/SGX/GroupMMS
  if m.to == ARGV[0]
    addresses = m.children.find { |child| child.element_name == "addresses" }
    return EMPromise.reject([:cancel, 'item-not-found']) unless addresses

    LOG.debug "Found addresses node, iterating"

    return addresses.children.map { |address|
      num = ''
      type = ''
      address.attributes.each do |attr_name, attr|
        case attr_name
        when 'type'
          # TODO: error if the type is not 'to'
          type = attr.to_s
        when 'uri'
          # TODO: error if the uri does not start with 'sms:'
          # Drop the first four characters (the "sms:" prefix).
          num = attr.to_s[4..-1]
          # TODO: confirm num validates
          # TODO: else, error - unexpected name
        end
      end
      if num.empty? or type.empty?
        # TODO: error
      end
      num
    }
  end

  # if not sent to SGX domain, then assume destination is in 'to'
  EMPromise.resolve(m.to.node.to_s).then { |num_dest|
    if num_dest.match?(/\A\+?[0-9]+(?:;.*)?\Z/)
      next num_dest if num_dest.start_with?('+')

      shortcode = extract_shortcode(num_dest)
      next shortcode if shortcode
    end

    # TODO: text re num not (yet) supportd/implmentd
    condition = anonymous_tel?(num_dest) ? 'gone' : 'item-not-found'
    EMPromise.reject([:cancel, condition])
  }
end
580
# Looks up the stored registration for jid.
#
# Returns a promise resolving to the credentials list when it has at least
# four entries, and rejecting with [:auth, 'registration-required'] otherwise.
def self.fetch_catapult_cred_for(jid)
  @registration_repo.find(jid).then { |creds|
    next creds if creds.length >= 4

    # TODO: add text re credentials not registered
    EMPromise.reject([:auth, 'registration-required'])
  }
end
593
# Error-typed messages are logged and swallowed. The explicit `true` return
# value is kept on purpose so the stanza counts as handled.
message(:error?) do |error_stanza|
  # TODO: report it somewhere/somehow - eat for now so no err loop
  LOG.warn("Eating error stanza", stanza: error_stanza.inspect)
  true
end
599
# Outgoing messages: anything with a body or an OOB URL.
#
# Resolves the destination and the sender's credentials, checks whether the
# destination number belongs to a user registered with this gateway, and then
# either passes the message on directly or sends it through Bandwidth.
# Rejections shaped like [type, condition(, text)] are turned into error
# replies; anything else stays rejected.
has_body_or_oob = ->(m) { m.body || m.at("oob|x > oob|url", oob: "jabber:x:oob") }

message(has_body_or_oob) do |m|
  EMPromise.all([
    validate_num(m),
    fetch_catapult_cred_for(m.from)
  ]).then { |(num_dest, creds)|
    @registration_repo.find_jid(num_dest).then { |jid|
      [jid, num_dest, *creds]
    }
  }.then { |(jid, num_dest, *creds)|
    next [jid, num_dest, *creds, nil] unless jid

    @registration_repo.find(jid).then { |other_user|
      [jid, num_dest, *creds, other_user.first]
    }
  }.then { |(jid, num_dest, *creds, other_user)|
    # if destination user is in the system pass on directly
    if other_user && !other_user.start_with?('u-')
      pass_on_message(m, creds.last, jid)
    else
      to_catapult_possible_oob(m, num_dest, *creds)
    end
  }.catch { |e|
    next EMPromise.reject(e) unless e.is_a?(Array) && [2, 3].include?(e.length)

    write_to_stream m.as_error(e[1], e[0], e[2])
  }
end
631
# Identities advertised for user (phone number) JIDs: a single SMS client.
def self.user_cap_identities
  sms_client = {category: 'client', type: 'sms'}
  [sms_client]
end
635
# Features advertised for user (phone number) JIDs.
# TODO: must re-add stuff so can do ad-hoc commands
def self.user_cap_features
  %w[urn:xmpp:receipts]
end
640
# Adds feature to the gateway's advertised feature list, keeping the list
# free of duplicates. The list is modified in place.
def self.add_gateway_feature(feature)
  @gateway_features.push(feature)
  @gateway_features.uniq!
end
645
# Subscription requests: approve, announce capabilities, and subscribe back.
subscription :request? do |request|
  # subscriptions are allowed from anyone - send reply immediately
  approval = Blather::Stanza::Presence.new
  approval.to = request.from
  approval.from = request.to
  approval.type = :subscribed
  write_to_stream approval

  # send a <presence> immediately; not automatically probed for it
  # TODO: refactor so no "presence :probe? do |p|" duplicate below
  caps = Blather::Stanza::Capabilities.new
  # TODO: user a better node URI (?)
  caps.node = 'http://catapult.sgx.soprani.ca/'
  caps.identities = user_cap_identities
  caps.features = user_cap_features

  caps_presence = caps.c
  caps_presence.to = request.from
  caps_presence.from = "#{request.to}/sgx"
  write_to_stream caps_presence

  # need to subscribe back so Conversations displays images inline
  # (both addresses are cut at the first '/', i.e. reduced to bare JIDs)
  subscribe_back = Blather::Stanza::Presence.new
  subscribe_back.to = request.from.to_s.split('/', 2).first
  subscribe_back.from = request.to.to_s.split('/', 2).first
  subscribe_back.type = :subscribe
  write_to_stream subscribe_back
end
677
# Presence probes: answer with a capabilities presence from the "/sgx"
# resource of the probed JID.
presence :probe? do |probe|
  caps = Blather::Stanza::Capabilities.new
  # TODO: user a better node URI (?)
  caps.node = 'http://catapult.sgx.soprani.ca/'
  caps.identities = user_cap_identities
  caps.features = user_cap_features

  caps_presence = caps.c
  caps_presence.to = probe.from
  caps_presence.from = "#{probe.to}/sgx"

  write_to_stream caps_presence
end
691
# Ad-hoc command listing for the gateway JID. Registered users whose number
# is eligible get the "set-port-out-pin" command; everyone else registered
# gets an empty list. Unregistered users get the error produced by
# fetch_catapult_cred_for.
disco_items(
  to: Blather::JID.new(ARGV[0]),
  node: "http://jabber.org/protocol/commands"
) do |i|
  fetch_catapult_cred_for(i.from).then { |creds|
    BandwidthTNOptions.tn_eligible_for_port_out_pin?(creds).then { |eligible|
      reply = i.reply
      reply.node = 'http://jabber.org/protocol/commands'
      reply.items =
        if eligible
          [
            Blather::Stanza::DiscoItems::Item.new(
              i.to,
              'set-port-out-pin',
              'Set Port-Out PIN'
            )
          ]
        else
          []
        end

      write_to_stream reply
    }
  }.catch { |e|
    next EMPromise.reject(e) unless e.is_a?(Array) && [2, 3].include?(e.length)

    write_to_stream i.as_error(e[1], e[0], e[2])
  }
end
723
# disco#info queries, for both user (number) JIDs and the gateway itself.
#
# NOTE(review): the non-get and user-JID paths end with a bare `next`, so the
# block returns nil there -- presumably that lets later iq handlers see the
# stanza as well; confirm before changing either return value.
iq '/iq/ns:query', ns: 'http://jabber.org/protocol/disco#info' do |i|
  # TODO: return error if i.type is :set - if it is :reply or
  # :error it should be ignored (as the below does currently);
  # review specification to see how to handle other type values
  unless i.type == :get
    LOG.warn("Ignoring non-get disco IQ", type: i.type.to_s, stanza: i.inspect)
    next
  end

  reply = i.reply
  reply.node = i.node

  # respond to capabilities request for an sgx-bwmsgsv2 number JID
  if i.to.node
    # TODO: confirm the node URL is expected using below
    #puts "XR[node]: #{xpath_result[0]['node']}"
    reply.identities = user_cap_identities
    reply.features = user_cap_features

    write_to_stream reply
    next
  end

  # respond to capabilities request for sgx-bwmsgsv2 itself
  reply.identities = [{
    name: 'Soprani.ca Gateway to XMPP - Bandwidth API V2',
    type: 'sms', category: 'gateway'
  }]
  reply.features = @gateway_features
  write_to_stream reply
end
757
# Stores creds for the sender of i and acknowledges the request.
#
# A RegistrationRepo::Conflict from the repository becomes a
# [:cancel, 'conflict', message] rejection; on success the plain reply to i
# is written to the stream.
def self.check_then_register(i, *creds)
  stored = @registration_repo.put(i.from, *creds)

  stored.catch_only(RegistrationRepo::Conflict) { |conflict|
    EMPromise.reject([:cancel, 'conflict', conflict.message])
  }.then {
    write_to_stream i.reply
  }
end
767
# Extracts [nick, username, password, phone] from a registration query.
#
# When the query contains a jabber:x:data form the values come from the form
# fields of those names (nil for a missing field); otherwise they come from
# the plain registration accessors on i.
def self.creds_from_registration_query(i)
  unless i.query.find_first("./ns:x", ns: "jabber:x:data")
    return [i.nick, i.username, i.password, i.phone]
  end

  %w[nick username password phone].map { |var| i.form.field(var)&.value }
end
780
# Handles a registration "set" request.
#
# A removal request deletes the registration, acknowledges it and rejects
# with :done to skip the rest of the chain. Otherwise the submitted
# credentials are checked (the phone number must start with "+"), verified
# with a GET on the account's media endpoint, and stored.
#
# Returns a promise. BandwidthError failures are mapped to stanza error
# arrays: 401 -> not-authorized, 404 -> item-not-found, else not-acceptable.
def self.process_registration(i)
  EMPromise.resolve(nil).then {
    next creds_from_registration_query(i) unless i.remove?

    @registration_repo.delete(i.from).then do
      write_to_stream i.reply
      EMPromise.reject(:done)
    end
  }.then { |user_id, api_token, api_secret, phone_num|
    unless phone_num && phone_num[0] == '+'
      # TODO: add text re number not (yet) supported
      next EMPromise.reject([:cancel, 'item-not-found'])
    end

    [user_id, api_token, api_secret, phone_num]
  }.then { |user_id, api_token, api_secret, phone_num|
    # TODO: find way to verify #{phone_num}, too
    verification = call_catapult(
      api_token,
      api_secret,
      :get,
      "api/v2/users/#{user_id}/media"
    )

    verification.then { |response|
      # Parsed only to make sure the response is valid JSON.
      JSON.parse(response)
      # TODO: confirm response is array - could be empty

      LOG.debug("Registration verify response", response: response.to_s[0..999])

      check_then_register(i, user_id, api_token, api_secret, phone_num)
    }
  }.catch_only(BandwidthError) { |e|
    stanza_error =
      case e.code
      when 401
        # TODO: add text re bad credentials
        [:auth, 'not-authorized']
      when 404
        # TODO: add text re number not found or disabled
        [:cancel, 'item-not-found']
      else
        [:modify, 'not-acceptable']
      end

    EMPromise.reject(stanza_error)
  }
end
832
# Fills in a registration reply with both the plain fields and the data form.
#
# orig            - the reply stanza to populate (modified in place)
# existing_number - the phone number already registered, if any; it marks the
#                   reply as registered and pre-fills the phone field
#
# Returns orig.
def self.registration_form(orig, existing_number=nil)
  phone = existing_number.to_s

  # Shared tail of both instruction texts (the leading "" yields a blank line).
  source_notice = [
    "",
    "The source code for this gateway is at " \
      "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 .",
    "Copyright (C) 2017-2020 Denver Gingerich and others, licensed under AGPLv3+."
  ]
  number_hint = "in your account you want to use (ie. '+12345678901')."

  orig.registered = !!existing_number

  # TODO: update "User Id" x2 below (to "accountId"?), and others?
  orig.instructions = [
    "Enter the information from your Account page as well as the Phone Number",
    number_hint,
    "User Id is nick, API Token is username, " \
      "API Secret is password, Phone Number is phone.",
    *source_notice
  ].join("\n")
  orig.nick = ""
  orig.username = ""
  orig.password = ""
  orig.phone = phone

  orig.form.fields = [
    {required: true, type: :"text-single", label: 'User Id', var: 'nick'},
    {required: true, type: :"text-single", label: 'API Token', var: 'username'},
    {required: true, type: :"text-private", label: 'API Secret', var: 'password'},
    {
      required: true, type: :"text-single",
      label: 'Phone Number', var: 'phone',
      value: phone
    }
  ]
  orig.form.title = 'Register for Soprani.ca Gateway to XMPP - Bandwidth API V2'
  orig.form.instructions = [
    "Enter the details from your Account page as well as the Phone Number",
    number_hint,
    *source_notice
  ].join("\n")

  orig
end
882
# In-band registration: "set" submits or removes a registration, "get"
# returns the registration form (pre-filled with the last stored credential
# entry). Other types are ignored.
ibr do |i|
  outcome =
    case i.type
    when :set
      process_registration(i)
    when :get
      @registration_repo.find(i.from.stripped).then { |creds|
        write_to_stream registration_form(i.reply, creds.last)
      }
    else
      # Unknown IQ, ignore for now
      EMPromise.reject(:done)
    end

  outcome.catch { |e|
    if e.is_a?(Array) && [2, 3].include?(e.length)
      # [type, condition(, text)] rejections become error replies.
      write_to_stream i.as_error(e[1], e[0], e[2])
    elsif e != :done
      # :done only means "nothing more to do"; everything else stays an error.
      EMPromise.reject(e)
    end
  }.catch(&method(:panic))
end
904
# First step of the "set-port-out-pin" ad-hoc command: reply with a form
# asking for the PIN twice, under a fresh session id.
command :execute?, node: "set-port-out-pin", sessionid: nil do |iq|
  # Ensure user is registered, but discard their credentials
  # because we're just showing them a form.
  fetch_catapult_cred_for(iq.from).then { |_creds|
    reply = iq.reply
    reply.node = 'set-port-out-pin'
    reply.sessionid = SecureRandom.uuid
    reply.status = :executing

    pin_fields = [
      ['pin', 'Port-Out PIN'],
      ['confirm_pin', 'Confirm PIN']
    ].map { |var, label|
      {var: var, type: 'text-private', label: label, required: true}
    }

    form = Blather::Stanza::X.find_or_create(reply.command)
    form.type = "form"
    form.fields = pin_fields

    reply.command.add_child(form)
    reply.allowed_actions = [:complete]

    write_to_stream reply
  }.catch { |e|
    next EMPromise.reject(e) unless e.is_a?(Array) && [2, 3].include?(e.length)

    write_to_stream iq.as_error(e[1], e[0], e[2])
  }.catch(&method(:panic))
end
943
# Final step of the "set-port-out-pin" ad-hoc command: validate the submitted
# PIN and set it through BandwidthTNOptions.
#
# Invalid input is answered with a bad-request error and a bare `next` (the
# block returns nil on that path, as before). API failures complete the
# command with an error note instead of an error stanza.
command :complete?, node: "set-port-out-pin", sessionid: /./ do |iq|
  pin = iq.form.field('pin')&.value
  confirm_pin = iq.form.field('confirm_pin')&.value

  # First failing check wins; nil means the input is acceptable.
  rejection =
    if pin.nil? || confirm_pin.nil?
      'PIN fields are required'
    elsif pin != confirm_pin
      'PIN confirmation does not match'
    elsif pin !~ /\A[a-zA-Z0-9]{4,10}\z/
      'PIN must be 4-10 alphanumeric characters'
    end

  if rejection
    write_to_stream iq.as_error('bad-request', :modify, rejection)
    next
  end

  # Completes the command session with a note of the given type.
  finish = lambda { |note_type, note_text|
    reply = iq.reply
    reply.node = 'set-port-out-pin'
    reply.sessionid = iq.sessionid
    reply.status = :completed
    reply.note_type = note_type
    reply.note_text = note_text

    write_to_stream reply
  }

  fetch_catapult_cred_for(iq.from).then { |creds|
    BandwidthTNOptions.set_port_out_pin(creds, pin).then {
      finish.call(:info, 'Port-out PIN has been set successfully.')
    }.catch { |e|
      has_message = e.respond_to?(:message)
      note =
        if has_message && e.message.include?('not valid')
          "Invalid phone number format. Please check your registered phone number."
        elsif has_message && e.message.include?('ErrorCode')
          "Bandwidth API error: #{e.message}"
        else
          "Failed to set port-out PIN. Please try again later."
        end

      finish.call(:error, note)
    }
  }.catch { |e|
    next EMPromise.reject(e) unless e.is_a?(Array) && [2, 3].include?(e.length)

    write_to_stream iq.as_error(e[1], e[0], e[2])
  }.catch(&method(:panic))
end
1011
# Catch-all for get/set IQs: answer with a feature-not-implemented
# (cancel) error.
iq type: %i[get set] do |iq|
  not_implemented = Blather::StanzaError.new(
    iq,
    'feature-not-implemented',
    :cancel
  )
  write_to_stream(not_implemented)
end
1019end
1020
# A bare <message/> stanza used to carry delivery receipts.
class ReceiptMessage < Blather::Stanza
  # Builds the message node and addresses it.
  #
  # to - optional recipient, assigned to the node's to
  def self.new(to=nil)
    super(:message).tap { |node| node.to = to }
  end
end
1028
1029class WebhookHandler < Goliath::API
1030 use Sentry::Rack::CaptureExceptions
1031 use Goliath::Rack::Params
1032
1033 def response(env)
1034 @registration_repo = RegistrationRepo.new
1035 # TODO: add timestamp grab here, and MUST include ./tai version
1036
1037 LOG.debug("Webhook request env", env: env.reject { |k| k == 'params' })
1038
1039 if params.empty?
1040 LOG.warn "Empty webhook params"
1041 return [200, {}, "OK"]
1042 end
1043
1044 if env['REQUEST_URI'] != '/'
1045 LOG.warn("Non-/ request", uri: env['REQUEST_URI'], method: env['REQUEST_METHOD'])
1046 return [200, {}, "OK"]
1047 end
1048
1049 if env['REQUEST_METHOD'] != 'POST'
1050 LOG.warn("Non-POST request", uri: env['REQUEST_URI'], method: env['REQUEST_METHOD'])
1051 return [200, {}, "OK"]
1052 end
1053
1054 # TODO: process each message in list, not just first one
1055 jparams = params.dig('_json', 0, 'message')
1056 type = params.dig('_json', 0, 'type')
1057
1058 return [400, {}, "Missing params\n"] unless jparams && type
1059
1060 users_num, others_num =
1061 if type == 'message-failed' # NOTE: This implies direction == 'out'
1062 [jparams['from'], params['_json'][0]['to']]
1063 elsif jparams['direction'] == 'in'
1064 [jparams['owner'], jparams['from']]
1065 elsif jparams['direction'] == 'out'
1066 [jparams['from'], jparams['owner']] # NOTE: for outbound, 'from' == 'owner'
1067 else
1068 LOG.error("Unexpected message direction", direction: jparams['direction'])
1069 [jparams['from'], jparams['owner']]
1070 end
1071
1072 return [400, {}, "Missing params\n"] unless users_num && others_num
1073 return [400, {}, "Missing params\n"] unless jparams['to'].is_a?(Array)
1074 return [400, {}, "Missing params\n"] if jparams['to'].empty?
1075
1076 LOG.info(
1077 "Webhook message",
1078 message_id: jparams['id'],
1079 event_type: type,
1080 time: jparams['time'],
1081 direction: jparams['direction'],
1082 delivery_state: jparams['deliveryState'],
1083 error_code: jparams['errorCode'],
1084 description: jparams['description'],
1085 tag: jparams['tag'],
1086 media: jparams['media']
1087 )
1088
1089 if others_num[0] != '+'
1090 # TODO: check that others_num actually a shortcode first
1091 others_num +=
1092 ';phone-context=ca-us.phone-context.soprani.ca'
1093 end
1094
1095 bare_jid = @registration_repo.find_jid(users_num).sync
1096
1097 if !bare_jid
1098 LOG.warn("JID not found for number, BW API misconfigured?", users_num: users_num)
1099
1100 return [403, {}, "Customer not found\n"]
1101 end
1102
1103 msg = nil
1104 case jparams['direction']
1105 when 'in'
1106 text = ''
1107 case type
1108 when 'message-received'
1109 # TODO: handle group chat, and fix above
1110 text = jparams['text']
1111
1112 if text.to_s.empty? && Array(jparams['media']).empty?
1113 return [400, {}, "Missing params\n"]
1114 end
1115
1116 if jparams['to'].length > 1
1117 msg = Blather::Stanza::Message.new(
1118 Blather::JID.new(bare_jid).domain
1119 )
1120 msg.body = text unless text&.empty?
1121
1122 addrs = Nokogiri::XML::Node.new(
1123 'addresses', msg.document)
1124 addrs['xmlns'] = 'http://jabber.org/' \
1125 'protocol/address'
1126
1127 addr1 = Nokogiri::XML::Node.new(
1128 'address', msg.document)
1129 addr1['type'] = 'to'
1130 addr1['jid'] = bare_jid
1131 addrs.add_child(addr1)
1132
1133 jparams['to'].reject(
1134 # Don't send to the same person twice,
1135 # and don't send to the person who sent it
1136 &[users_num, others_num].method(:include?)
1137 ).each do |receiver|
1138 addrn = Nokogiri::XML::Node.new(
1139 'address', msg.document)
1140 addrn['type'] = 'to'
1141 addrn['uri'] = "sms:#{receiver}"
1142 addrn['delivered'] = 'true'
1143 addrs.add_child(addrn)
1144 end
1145
1146 msg.add_child(addrs)
1147 end
1148
1149 media_urls = Array(jparams['media']).map { |media_url|
1150 unless media_url.end_with?(
1151 '.smil', '.txt', '.xml'
1152 )
1153 SGXbwmsgsv2.send_media(
1154 others_num + '@' +
1155 ARGV[0],
1156 bare_jid, media_url,
1157 nil, nil, msg
1158 )
1159 media_url
1160 end
1161 }.compact
1162
1163 if text&.empty? || (media_urls.any? && jparams['to'].length > 1)
1164 if !env['HTTP_X_JMP_RESEND_OF'].to_s.empty?
1165 MessageEvent::ResendIn.new(
1166 original_stream_id: env['HTTP_X_JMP_RESEND_OF'],
1167 original_bandwidth_id: jparams['id'],
1168 owner: jparams['owner']
1169 ).emit(REDIS)
1170 else
1171 MessageEvent::In.new(
1172 timestamp: jparams['time'],
1173 from: jparams['from'],
1174 to: jparams['to'],
1175 owner: jparams['owner'],
1176 bandwidth_id: jparams['id'],
1177 body: jparams['text'].to_s,
1178 media_urls: media_urls
1179 ).emit(REDIS)
1180 end
1181
1182 return [200, {}, "OK"]
1183 end
1184 else
1185 text = "unknown type (#{type})" \
1186 " with text: #{jparams['text']}"
1187
1188 # TODO: log/notify of this properly
1189 LOG.warn("Unknown inbound message type", text: text)
1190 end
1191
1192 # If text is not empty, but there isn't a msg,
1193 # we need to construct a msg to convey that text
1194 unless msg || text.to_s.empty?
1195 msg = Blather::Stanza::Message.new(
1196 bare_jid,
1197 # Strip control codes.
1198 # This only happened, or at least caused a problem, once,
1199 # but it seems sensible to say that we shouldn't be getting
1200 # control codes from the PSTN.
1201 text.gsub(/[\u0000-\u0008\u000b-\u001f]/, "")
1202 )
1203 msg.document.encoding = "utf-8"
1204 msg.chat_state = nil
1205 end
1206 else # per prior switch, this is: jparams['direction'] == 'out'
1207 tag_parts = jparams['tag'].split(/ /, 2)
1208 id = WEBrick::HTTPUtils.unescape(tag_parts[0])
1209 resourcepart = WEBrick::HTTPUtils.unescape(tag_parts[1])
1210
1211 # TODO: remove this hack
1212 if jparams['to'].length > 1
1213 case type
1214 when 'message-failed'
1215 MessageEvent::Failed.new(
1216 timestamp: jparams['time'],
1217 stanza_id: id,
1218 bandwidth_id: jparams['id'],
1219 error_code: jparams['errorCode'].to_s,
1220 error_description: jparams['description'].to_s
1221 ).emit(REDIS)
1222 when 'message-delivered'
1223 MessageEvent::Delivered.new(
1224 timestamp: jparams['time'],
1225 stanza_id: id,
1226 bandwidth_id: jparams['id']
1227 ).emit(REDIS)
1228 end
1229 LOG.warn("Group message, skipping receipt", users_num: users_num)
1230 return [200, {}, "OK"]
1231 end
1232
1233 case type
1234 when 'message-failed'
1235 # create a bare message like the one user sent
1236 msg = Blather::Stanza::Message.new(
1237 others_num + '@' + ARGV[0])
1238 msg.from = bare_jid + '/' + resourcepart
1239 msg['id'] = id
1240
1241 # TODO: add 'errorCode' and/or 'description' val
1242 # create an error reply to the bare message
1243 msg = msg.as_error(
1244 'recipient-unavailable',
1245 :wait,
1246 params['_json'][0]['description']
1247 )
1248
1249 when 'message-delivered'
1250
1251 msg = ReceiptMessage.new(bare_jid)
1252
1253 # TODO: put in member/instance variable
1254 msg['id'] = SecureRandom.uuid
1255
1256 # TODO: send only when requested per XEP-0184
1257 rcvd = Nokogiri::XML::Node.new(
1258 'received',
1259 msg.document
1260 )
1261 rcvd['xmlns'] = 'urn:xmpp:receipts'
1262 rcvd['id'] = id
1263 msg.add_child(rcvd)
1264
1265 # TODO: make prettier: this should be done above
1266 others_num = params['_json'][0]['to']
1267 else
1268 # TODO: notify somehow of unknown state receivd?
1269 LOG.warn("Unknown outbound message type", id: id, type: type)
1270 return [200, {}, "OK"]
1271 end
1272
1273 # Keeping this due to the `msg.from=` shuffle just below
1274 LOG.debug("Outbound callback response", stanza: msg.inspect)
1275 end
1276
1277 # if message-failed, we already set msg.from
1278 # moreover, we said `msg = msg.as_error`, and StanzaError
1279 msg.from = others_num + '@' + ARGV[0] if msg.respond_to?(:from=)
1280 SGXbwmsgsv2.write(msg)
1281
1282 # Emit event to messages stream
1283 case [jparams['direction'], type]
1284 when ['in', 'message-received']
1285 if !env['HTTP_X_JMP_RESEND_OF'].to_s.empty?
1286 MessageEvent::ResendIn.new(
1287 original_stream_id: env['HTTP_X_JMP_RESEND_OF'],
1288 original_bandwidth_id: jparams['id'],
1289 owner: jparams['owner']
1290 ).emit(REDIS)
1291 else
1292 media_urls = Array(jparams['media']).reject { |u|
1293 u.end_with?('.smil', '.txt', '.xml')
1294 }
1295 MessageEvent::In.new(
1296 timestamp: jparams['time'],
1297 from: jparams['from'],
1298 to: jparams['to'],
1299 owner: jparams['owner'],
1300 bandwidth_id: jparams['id'],
1301 body: jparams['text'].to_s,
1302 media_urls: media_urls
1303 ).emit(REDIS)
1304 end
1305 when ['out', 'message-failed']
1306 tag_parts = jparams['tag'].split(/ /, 2)
1307 stanza_id = WEBrick::HTTPUtils.unescape(tag_parts[0])
1308 MessageEvent::Failed.new(
1309 timestamp: jparams['time'],
1310 stanza_id: stanza_id,
1311 bandwidth_id: jparams['id'],
1312 error_code: jparams['errorCode'].to_s,
1313 error_description: jparams['description'].to_s
1314 ).emit(REDIS)
1315 when ['out', 'message-delivered']
1316 tag_parts = jparams['tag'].split(/ /, 2)
1317 stanza_id = WEBrick::HTTPUtils.unescape(tag_parts[0])
1318 MessageEvent::Delivered.new(
1319 timestamp: jparams['time'],
1320 stanza_id: stanza_id,
1321 bandwidth_id: jparams['id']
1322 ).emit(REDIS)
1323 end
1324
1325 [200, {}, "OK"]
1326 rescue Exception => e
1327 Sentry.capture_exception(e)
1328 [500, {}, "Error"]
1329 end
1330end
1331
# Process entry point, registered as an at_exit hook so that it runs when the
# interpreter starts exiting, i.e. after the main script has finished loading.
# The hook is not registered at all when ENV['ENV'] is 'test'.
#
# Exactly seven command-line arguments are required (see the usage text
# below). Of those, this block itself reads ARGV[0] (used as the sender of the
# keepalive ping) and ARGV[5] (the port the webhook HTTP server listens on).
at_exit do
  LOG.info(
    "Soprani.ca/SMS Gateway for XMPP - Bandwidth API V2",
    version: `git rev-parse HEAD`.chomp
  )

  if ARGV.size != 7
    warn "Usage: sgx-bwmsgsv2.rb <component_jid> "\
      "<component_password> <server_hostname> "\
      "<server_port> <application_id> "\
      "<http_listen_port> <mms_proxy_prefix_url>"
    # A bad invocation is a failure: exit non-zero so that shells, scripts
    # and process supervisors do not mistake it for a clean run.
    exit 1
  end

  LOG.info "Starting"

  EM.run do
    # Shared Redis connection; referenced as REDIS by the rest of the file.
    REDIS = EM::Hiredis.connect

    SGXbwmsgsv2.run

    # required when using Prosody otherwise disconnects on 6-hour inactivity
    EM.add_periodic_timer(3600) do
      msg = Blather::Stanza::Iq::Ping.new(:get, 'localhost')
      msg.from = ARGV[0]
      SGXbwmsgsv2.write(msg)
    end

    # HTTP server receiving the webhook callbacks, bound on all interfaces.
    server = Goliath::Server.new('0.0.0.0', ARGV[5].to_i)
    server.api = WebhookHandler.new
    server.app = Goliath::Rack::Builder.build(server.api.class, server.api)
    server.logger = LOG
    server.start do
      ["INT", "TERM"].each do |sig|
        trap(sig) do
          # NOTE(review): the shutdown work is handed to EM.defer instead of
          # running directly in the signal handler, presumably because
          # logging is not safe from trap context - confirm.
          EM.defer do
            LOG.info "Shutting down gateway"
            SGXbwmsgsv2.shutdown

            LOG.info "Gateway has terminated"
            EM.stop
          end
        end
      end
    end
  end
end unless ENV['ENV'] == 'test'