1#!/usr/bin/env ruby
2# frozen_string_literal: true
3
4# Copyright (C) 2017-2020 Denver Gingerich <denver@ossguy.com>
5# Copyright (C) 2017 Stephen Paul Weber <singpolyma@singpolyma.net>
6#
7# This file is part of sgx-bwmsgsv2.
8#
9# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
10# the terms of the GNU Affero General Public License as published by the Free
11# Software Foundation, either version 3 of the License, or (at your option) any
12# later version.
13#
14# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
15# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17# details.
18#
19# You should have received a copy of the GNU Affero General Public License along
20# with sgx-bwmsgsv2. If not, see <http://www.gnu.org/licenses/>.
21
22require 'delegate'
23require 'blather/client/dsl'
24require 'em-hiredis'
25require 'em-http-request'
26require 'json'
27require 'multibases'
28require 'multihashes'
29require 'securerandom'
30require "sentry-ruby"
31require 'time'
32require 'uri'
33require 'webrick'
34
35require 'goliath/api'
36require 'goliath/server'
37require "ougai"
38
39require 'em_promise'
40require 'em-synchrony'
41
42require_relative 'lib/bandwidth_error'
43require_relative 'lib/bandwidth_tn_options'
44require_relative 'lib/message_event'
45require_relative 'lib/registration_repo'
46
# Initialise Sentry with its default configuration before the background
# logger is loaded.
# NOTE(review): presumably lib/background_log expects Sentry to be initialised
# at require time - confirm before reordering.
Sentry.init

require_relative 'lib/background_log'

# Flush stdout on every write so log lines are not held back in a buffer.
$stdout.sync = true
# In the test environment log directly to stdout; otherwise wrap stdout in
# BackgroundLog.
LOG = Ougai::Logger.new(ENV["ENV"] == "test" ? $stdout : BackgroundLog.new($stdout))
# Log level comes from LOG_LEVEL and defaults to "info".
LOG.level = ENV.fetch("LOG_LEVEL", "info")
# Use the plain (uncoloured) readable format when stdout is not a terminal.
LOG.formatter = Ougai::Formatters::Readable.new(
  nil,
  nil,
  plain: !$stdout.isatty
)
# Route the XMPP and Redis libraries' logging through the same logger.
Blather.logger = LOG
EM::Hiredis.logger = LOG

# Initialise Sentry again now that LOG exists, so Sentry logs through LOG and
# records log lines as breadcrumbs.
Sentry.init do |config|
  config.logger = LOG
  config.breadcrumbs_logger = [:sentry_logger]
end
66
# Lower-case words and phrases that cause an outbound message to be rejected
# (see the BADWORDS check in SGXbwmsgsv2.to_catapult).
BADWORD_LIST = [
  "marijuana",
  "psilocybin",
  "cannabis",
  "cocaine",
  "heroin",
  "meth",
  "methamphetamine",
  "methamphetamines",
  "cigarette",
  "tobacco",
  "cbd",
  "thc",
  "morphine",
  "incall",
  "in-call",
  "outcall",
  "out-call",
  "shrooms",
  "lsd",
  "kratom",
  "mdma",
  "addy",
  "xanz",
  "cialis",
  "viagra",
  "bbfs",
  "fentanyl",
  "opium",
  "golden teacher",
  "bbbj",
  "canna",
  "fuck",
  "xanax",
  "zarareturns",
  "zarareturns.com",
  "plantation",
].freeze

# One regexp matching any entry of BADWORD_LIST as a whole word (each entry is
# escaped and wrapped in \b word boundaries). It is matched against the
# downcased message body, so the list entries must stay lower-case.
BADWORDS = Regexp.union(
  BADWORD_LIST.map { |w| /\b#{Regexp.escape(w)}\b/ }
)

# Matches the number 2266669977 with an optional leading "+" and/or "1".
# SGXbwmsgsv2.to_catapult refuses to send when the user's own number matches.
WHISPER_NUMBER = /\A\+?1?2266669977\z/.freeze
111
# List of supported MIME types from Bandwidth - https://support.bandwidth.com/hc/en-us/articles/360014128994-What-MMS-file-types-are-supported-
# Frozen so the shared constant cannot be mutated at runtime (consistent with
# BADWORD_LIST).
MMS_MIME_TYPES = [
  "application/json",
  "application/ogg",
  "application/pdf",
  "application/rtf",
  "application/zip",
  "application/x-tar",
  "application/xml",
  "application/gzip",
  "application/x-bzip2",
  "application/x-gzip",
  "application/smil",
  "application/javascript",
  "audio/mp4",
  "audio/mpeg",
  "audio/ogg",
  "audio/flac",
  "audio/webm",
  "audio/wav",
  "audio/amr",
  "audio/3gpp",
  "image/bmp",
  "image/gif",
  "image/jpeg",
  "image/pjpeg",
  "image/png",
  "image/svg+xml",
  "image/tiff",
  "image/webp",
  "image/x-icon",
  "text/css",
  "text/csv",
  "text/calendar",
  "text/plain",
  "text/javascript",
  "text/vcard",
  "text/vnd.wap.wml",
  "text/xml",
  "video/avi",
  "video/mp4",
  "video/mpeg",
  "video/ogg",
  "video/quicktime",
  "video/webm",
  "video/x-ms-wmv",
  "video/x-flv"
].freeze

# 1 MB (decimal, i.e. 1,000,000 bytes). Media whose Content-Length exceeds
# this is sent as a link in the message text instead of as an MMS attachment.
MAX_MEDIA_SIZE = 1_000_000
163
# Last-resort failure handler: report +e+ to Sentry, log it as fatal, shut the
# gateway down and stop the EventMachine reactor.
#
# e - an Exception, or any other object (reported via its #to_s).
#
# Returns the promise that performs the shutdown steps.
def panic(e)
  EMPromise.resolve(nil).then do
    sentry_hint = { background: false }
    case e
    when Exception
      Sentry.capture_exception(e, hint: sentry_hint)
    else
      Sentry.capture_message(e.to_s, hint: sentry_hint)
    end

    LOG.fatal("Shutting down gateway", e)
    SGXbwmsgsv2.shutdown
    LOG.info "Gateway has terminated"
    EM.stop
  end
end

# Errors that escape into the reactor loop are treated as fatal as well.
EM.error_handler(&method(:panic))
179
# Returns the number part of +dest+ when everything after the first ";" is
# exactly the ca-us short code phone-context; returns nil otherwise.
def extract_shortcode(dest)
  number, _separator, context = dest.partition(';')
  return unless context == 'phone-context=ca-us.phone-context.soprani.ca'

  number
end
184
# True when everything after the first ";" in +dest+ is exactly the anonymous
# phone-context parameter.
def anonymous_tel?(dest)
  context = dest.partition(';').last
  context == 'phone-context=anonymous.phone-context.soprani.ca'
end
188
# Blather client that wraps stanza handling in a promise, attaches a Sentry
# scope/transaction to every incoming stanza, and converts unexpected errors
# into an XMPP error reply.
class SGXClient < Blather::Client
  # Handle one incoming stanza inside a promise and a Sentry scope.
  #
  # StandardErrors raised by the normal handling are passed to handle_error;
  # a rejection of the wrapping promise itself goes to panic.
  # Returns the promise (after waiting for it when ENV["ENV"] is "test").
  def handle_data(stanza)
    promise = EMPromise.resolve(nil).then {
      with_sentry(stanza) do |scope|
        super
      rescue StandardError => e
        handle_error(scope, stanza, e)
      end
    }.catch { |e| panic(e) }
    promise.sync if ENV["ENV"] == "test"
    promise
  end

  # Override the default call_handler to syncify during testing.
  #
  # Throws :pass when the guards do not match the stanza; otherwise returns
  # the handler's result.
  def call_handler(handler, guards, stanza)
    result = if guards.first.respond_to?(:to_str)
      # String guard: the guards are passed to stanza.find and the matches
      # are handed to the handler
      found = stanza.find(*guards)
      throw :pass if found.empty?

      handler.call(stanza, found)
    else
      throw :pass if guarded?(guards, stanza)

      handler.call(stanza)
    end

    # Up to here, identical to upstream impl

    return result unless result.is_a?(Promise)

    # In tests, wait for the handler's promise before returning it
    result.sync if ENV["ENV"] == "test"
    result
  end

protected

  # Run the block with a Sentry scope prepared for +stanza+ and make sure the
  # scope's transaction (if any) is finished afterwards.
  def with_sentry(stanza)
    Sentry.clone_hub_to_current_thread

    Sentry.with_scope do |scope|
      setup_scope(stanza, scope)
      yield scope
    ensure
      scope.get_transaction&.then do |tx|
        # Only default the status to "ok" when nothing has set one already
        tx.set_status("ok") unless tx.status
        tx.finish
      end
    end
  end

  # Name the scope after the stanza (its node when it has one, otherwise its
  # element name), record the sender's bare JID as the user, and start a
  # transaction for the handling of this stanza.
  def setup_scope(stanza, scope)
    name = stanza.respond_to?(:node) ? stanza.node : stanza.name
    scope.clear_breadcrumbs
    scope.set_transaction_name(name)
    scope.set_user(jid: stanza.from&.stripped.to_s)

    transaction = Sentry.start_transaction(
      name: name,
      op: "blather.handle_data"
    )
    scope.set_span(transaction) if transaction
  end

  # Log and report +e+, mark the transaction as failed and, unless the error
  # says it has already been replied to, answer the stanza with
  # internal-server-error.
  def handle_error(scope, stanza, e)
    LOG.error("Error during #{scope.transaction_name}", e)
    Sentry.capture_exception(e) unless e.is_a?(Sentry::Error)
    scope.get_transaction&.set_status("internal_error")
    return if e.respond_to?(:replied?) && e.replied?

    SGXbwmsgsv2.write_to_stream stanza.as_error("internal-server-error", :cancel)
  end
end
261
# TODO: keep in sync with jmp-acct_bot.rb, and eventually put in common location
#
# Named flag constants for per-user settings.
# NOTE(review): the values look like bit positions within a settings bitfield
# rather than masks - confirm against jmp-acct_bot.rb. Nothing in the part of
# this file reviewed here reads them.
module CatapultSettingFlagBits
  VOICEMAIL_TRANSCRIPTION_DISABLED = 0
  MMS_ON_OOB_URL = 1
end
267
268module SGXbwmsgsv2
269 extend Blather::DSL
270
# Registration lookups (JID <-> credentials/number) used by the handlers below.
@registration_repo = RegistrationRepo.new
# Use the Sentry-aware client subclass instead of the stock Blather client.
@client = SGXClient.new
# Features advertised in disco#info replies for the gateway itself; extended
# at runtime through add_gateway_feature.
# NOTE(review): the address feature here ends in "/" while the <addresses/>
# namespace used elsewhere in this file is "http://jabber.org/protocol/address"
# - confirm which form clients expect before changing either.
@gateway_features = [
  "http://jabber.org/protocol/disco#info",
  "http://jabber.org/protocol/address/",
  "jabber:iq:register",
  "http://jabber.org/protocol/commands"
]
279
# Start the XMPP client by delegating to the DSL client's #run.
def self.run
  # TODO: read/save ARGV[7] creds to local variables
  client.run
end
284
# so classes outside this module can write messages, too
#
# stanza - the stanza to hand to the client's #write.
def self.write(stanza)
  client.write(stanza)
end
289
# Send a media URL to an XMPP recipient as an XEP-0066 (OOB) message carrying
# a urn:xmpp:hints <store/> hint.
#
# from      - value for the outgoing stanza's "from"
# to        - recipient; the caller must guarantee it is a bare JID
# media_url - URL of the media; Bandwidth media URLs are rewritten to the
#             proxy prefix given in ARGV[6]
# desc      - optional text for the OOB <desc/> element
# subject   - optional message subject
# m         - optional stanza to copy and decorate instead of building a new
#             message addressed to +to+
#
# Any exception raised while building or writing the stanza goes to panic.
def self.send_media(from, to, media_url, desc=nil, subject=nil, m=nil)
  # we assume media_url is one of these (always the case so far):
  #  https://messaging.bandwidth.com/api/v2/users/[u]/media/[path]
  LOG.debug("Original media URL", url: media_url)

  if media_url.start_with?('https://messaging.bandwidth.com/api/v2/users/')
    # ninth "/"-separated field, i.e. everything after ".../media/"
    media_path = media_url.split('/', 9)[8]
    # the caller must guarantee that 'to' is a bare JID
    media_url = ARGV[6] + WEBrick::HTTPUtils.escape(to) + '/' + media_path
    LOG.debug("Proxied media URL", url: media_url)
  end

  msg = m ? m.copy : Blather::Stanza::Message.new(to)
  msg.from = from
  msg.subject = subject if subject

  # provide URL in XEP-0066 (OOB) fashion
  oob = Nokogiri::XML::Node.new('x', msg.document)
  oob['xmlns'] = 'jabber:x:oob'

  url_node = Nokogiri::XML::Node.new('url', msg.document)
  url_node.add_child(Nokogiri::XML::Text.new(media_url, msg.document))
  oob.add_child(url_node)

  if desc
    desc_node = Nokogiri::XML::Node.new('desc', msg.document)
    desc_node.add_child(Nokogiri::XML::Text.new(desc, msg.document))
    oob.add_child(desc_node)
  end

  msg.add_child(oob)

  store_hint = Nokogiri::XML::Node.new('store', msg.document)
  store_hint['xmlns'] = 'urn:xmpp:hints'
  msg.add_child(store_hint)

  write(msg)
rescue Exception => e
  panic(e)
end
333
# Configure the Blather DSL client from the command line. ARGV[0] is also used
# throughout this file as the gateway's own domain.
# NOTE(review): ARGV[1..3] are presumably the component secret, host and port
# - confirm against Blather::DSL#setup.
setup ARGV[0], ARGV[1], ARGV[2], ARGV[3], nil, nil, async: true
335
# Deliver a message directly to another user of this gateway instead of
# sending it through Bandwidth, record a pass-through event, and send a
# delivery receipt back to the sender.
#
# m         - the incoming message stanza (its to/from are rewritten in place)
# users_num - the sender's phone number; becomes the node of the new "from"
# jid       - JID of the destination user
#
# Returns true.
def self.pass_on_message(m, users_num, jid)
  # Remember the dialled number now; m.to is overwritten below
  dest_num = m.to.node

  # the receipt goes the opposite way, much like a reply, so it must be
  # addressed from the original to/from values
  receipt = ReceiptMessage.new(m.from.stripped)
  receipt.from = m.to

  # forward the original message first; the receipt follows afterwards
  m.to = jid
  m.from = "#{users_num}@#{ARGV[0]}"
  write_to_stream(m)

  # Emit pass-through event. Thru events don't capture a timestamp because XMPP
  # stanzas don't carry timestamps for realtime messages, and the Redis stream
  # ID provides the emit time.
  oob_url = m.at("oob|x > oob|url", oob: "jabber:x:oob")&.text
  thru_event = MessageEvent::Thru.new(
    owner: users_num,
    from: users_num,
    to: [dest_num],
    stanza_id: m.id.to_s,
    body: m.body.to_s,
    media_urls: [oob_url].compact
  )
  thru_event.emit(REDIS)

  # send a delivery receipt back to the sender
  # TODO: send only when requested per XEP-0184
  # TODO: pass receipts from target if supported

  # TODO: put in member/instance variable
  receipt['id'] = SecureRandom.uuid
  received = Nokogiri::XML::Node.new('received', receipt.document)
  received['xmlns'] = 'urn:xmpp:receipts'
  received['id'] = m.id
  receipt.add_child(received)
  write_to_stream(receipt)

  true
end
378
# Perform an asynchronous HTTP request against the Bandwidth API.
#
# token, secret - credentials, sent as the Authorization header pair
# m             - HTTP method name (e.g. :get, :post); dispatched to the
#                 matching "a<method>" call on the request object
# pth           - request path; looks like one of:
#                   "api/v2/users/#{user_id}/[endpoint_name]"
# body          - optional request body
# head          - extra request headers, merged over the Authorization header
# code          - list of HTTP statuses treated as success
# respond_with  - :body for the response body, :headers for the response
#                 header object, anything else for the whole response object
#
# Returns a promise; it is rejected with a BandwidthError when the status is
# not listed in +code+.
def self.call_catapult(
  token, secret, m, pth, body=nil,
  head={}, code=[200], respond_with=:body
)
  # TODO: need to make a separate thing for voice.bw.c eventually
  url_prefix =
    pth.start_with?('api/v2/users') ? 'https://messaging.bandwidth.com/' : ''

  request = EM::HttpRequest.new(
    URI.parse(url_prefix + pth), tls: {verify_peer: true}
  )
  request_headers = {'Authorization' => [token, secret]}.merge(head)

  request.public_send(
    "a#{m}", head: request_headers, body: body
  ).then do |http|
    unless code.include?(http.response_header.status)
      next EMPromise.reject(
        BandwidthError.for(http.response_header.status, http.response)
      )
    end

    case respond_with
    when :body then http.response
    when :headers then http.response_header
    else http
    end
  end
end
418
# Internal helper: make sure +link+ appears in the stanza's message text.
#
# Reads the stanza's *current* body (not a copy taken earlier), so a link that
# was stripped from the body for an MMS attempt is put back when that attempt
# falls back to sending the link as text. The body is left untouched when it
# already contains the link.
def self.ensure_link_in_body(s, link)
  current = s.respond_to?(:body) ? s.body.to_s : ''
  return if current.include?(link)

  s.body = current.empty? ? link : "#{current}\n#{link}"
end

# Internal helper: decide from the headers of a HEAD response whether the
# media may be attached as MMS.
#
# header - hash-like response headers keyed "CONTENT_LENGTH"/"CONTENT_TYPE"
#
# Returns true when the declared length does not exceed MAX_MEDIA_SIZE and the
# media type is listed in MMS_MIME_TYPES. Parameters such as "; charset=utf-8"
# and letter case are ignored when comparing the media type.
def self.mms_eligible?(header)
  return false if header["CONTENT_LENGTH"].to_i > MAX_MEDIA_SIZE

  media_type =
    header["CONTENT_TYPE"].to_s.split(';', 2).first.to_s.strip.downcase
  MMS_MIME_TYPES.include?(media_type)
end

# Send a stanza to Bandwidth, attaching its XEP-0066 OOB URL as MMS media when
# that is possible and otherwise including the URL in the message text.
#
# s        - the message stanza (its body may be rewritten)
# num_dest - destination number (String) or list of numbers
# user_id, token, secret, usern - the sender's credentials and phone number,
#            passed through to to_catapult
#
# Returns whatever to_catapult returns (a promise).
def self.to_catapult_possible_oob(s, num_dest, user_id, token, secret,
  usern)
  un = s.at("oob|x > oob|url", oob: "jabber:x:oob")
  unless un
    LOG.debug "No OOB URL node, processing as normal"
    return to_catapult(s, nil, num_dest, user_id, token,
      secret, usern)
  end
  LOG.debug "Found OOB URL node, checking MMS eligibility"

  link = un.text
  body = s.respond_to?(:body) ? s.body.to_s : ''

  # Shared fallback: no attachment, the link travels in the text instead
  send_as_link = lambda {
    ensure_link_in_body(s, link)
    to_catapult(s, nil, num_dest, user_id, token, secret, usern)
  }

  # Single destinations that do not start with (+)1 always get the link
  if num_dest.is_a?(String) && num_dest !~ /^\+?1/
    return send_as_link.call
  end

  EM::HttpRequest.new(URI.parse(link), tls: {verify_peer: true}).ahead.then { |http|
    # If content is too large, or MIME type is not supported, place the
    # link inside the body and do not send MMS.
    next send_as_link.call unless mms_eligible?(http.response_header)

    # Small enough and of a supported type: attach as media.
    # some clients send URI in both body & <url/> so delete
    s.body = body.sub(/\s*#{Regexp.escape(link)}\s*$/, '')

    LOG.debug("OOB MMS details", url: link, body: body.to_s.strip)
    to_catapult(s, link, num_dest, user_id, token, secret, usern)
  }.catch { |e|
    LOG.error("Error sending MMS, falling back to sending link", error: e)
    send_as_link.call
  }
end
461
# Internal helper: should a long text-only message be uploaded as a text file
# and sent as MMS media instead of as a multi-segment SMS?
#
# True only when there is no media URL yet, ENV["MMS_PATH"] is set, the
# destination is a single number starting with (+)1, and the text is longer
# than three segments (160 characters per segment for ASCII-only text, 70
# otherwise). A list of destinations (group message) never qualifies; the type
# check also avoids calling =~ on an Array, which raises NoMethodError on
# Ruby 3.2 and later.
def self.long_text_needs_mms?(murl, num_dest, body)
  return false if murl || !ENV["MMS_PATH"]
  return false unless num_dest.is_a?(String) && num_dest.match?(/^\+?1/)

  segment_size = body.ascii_only? ? 160 : 70
  body.length > segment_size * 3
end

# Send one message through the Bandwidth messaging API and record an Out event.
#
# s        - the message stanza (provides body, id and sender resource)
# murl     - media URL to attach, or nil
# num_dest - destination number (String) or list of numbers
# user_id, token, secret - Bandwidth credentials
# usern    - the sender's phone number
#
# Returns a promise. Rejections are arrays of [error type, condition] or
# [error type, condition, text], the shape the stanza handlers turn into XMPP
# error replies.
def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern)
  body = s.respond_to?(:body) ? s.body.to_s : ''
  # Nothing to send at all: no media and no visible text
  if murl.to_s.empty? && body.strip.empty?
    return EMPromise.reject(
      [:modify, 'policy-violation']
    )
  end

  if body.downcase.match?(BADWORDS)
    return EMPromise.reject([
      :wait,
      'recipient-unavailable',
      'Single message blocked by carrier content policy, see https://blog.jmp.chat/b/sms-censorship for details'
    ])
  end

  # U+2063 INVISIBLE SEPARATOR anywhere in the text
  if body =~ /\u2063/
    return EMPromise.reject([
      :wait,
      'recipient-unavailable',
      'Please contact JMP support about your message'
    ])
  end

  if usern.match?(WHISPER_NUMBER)
    return EMPromise.reject([
      :cancel,
      'recipient-unavailable',
      'Please register with a backend'
    ])
  end

  if long_text_needs_mms?(murl, num_dest, body)
    # Name the file after the multihash of its content
    file = Multibases.pack(
      'base58btc',
      Multihashes.encode(Digest::SHA256.digest(body), "sha2-256")
    ).to_s
    # NOTE(review): the file is written without an extension while the URL
    # below ends in ".txt" - presumably the web server maps one to the
    # other; confirm.
    File.open("#{ENV['MMS_PATH']}/#{file}", "w") { |fh| fh.write body }
    murl = "#{ENV['MMS_URL']}/#{file}.txt"
    body = ""
  end

  extra = {}
  extra[:media] = murl if murl

  call_catapult(
    token,
    secret,
    :post,
    "api/v2/users/#{user_id}/messages",
    JSON.dump(extra.merge(
      from: usern,
      to: num_dest,
      text: body,
      applicationId: ARGV[4],
      tag:
        # callbacks need id and resourcepart
        WEBrick::HTTPUtils.escape(s.id.to_s) +
        ' ' +
        WEBrick::HTTPUtils.escape(
          s.from.resource.to_s
        )
    )),
    {'Content-Type' => 'application/json'},
    [201, 202]
  ).then { |response|
    # The event is still emitted when the response body cannot be parsed
    parsed = begin
      JSON.parse(response)
    rescue StandardError
      {}
    end
    MessageEvent::Out.new(
      timestamp: parsed["time"] || Time.now,
      owner: usern,
      from: usern,
      to: Array(num_dest),
      stanza_id: s.id.to_s,
      bandwidth_id: parsed["id"],
      body: body,
      media_urls: [murl].compact
    ).emit(REDIS)
    response
  }.catch { |e|
    EMPromise.reject(
      [:cancel, 'internal-server-error', e.message]
    )
  }
end
547
# Work out the destination number(s) for an outgoing message stanza.
#
# Addressed to the gateway domain itself (ARGV[0]): the numbers are read from
# the "sms:" URIs of the stanza's <addresses/> child and returned as a plain
# Array (not a promise); a missing <addresses/> child yields a rejected
# promise.
#
# Addressed to a number JID: returns a promise for the node when it is
# "+"-prefixed digits, or for the short code when the node carries the ca-us
# phone-context; otherwise the promise is rejected with [:cancel, 'gone'] for
# the anonymous phone-context and [:cancel, 'item-not-found'] for anything
# else.
def self.validate_num(m)
  # if sent to SGX domain use https://wiki.soprani.ca/SGX/GroupMMS
  if m.to == ARGV[0]
    an = m.children.find { |v| v.element_name == "addresses" }
    if not an
      return EMPromise.reject(
        [:cancel, 'item-not-found']
      )
    end
    LOG.debug "Found addresses node, iterating"

    nums = []
    # NOTE(review): every child node contributes an entry, including ones
    # without a "uri" attribute (which add an empty string) - the TODOs
    # below mark where validation is still missing.
    an.children.each do |e|
      num = ''
      type = ''
      # each attribute is yielded as a [name, attribute] pair
      e.attributes.each do |c|
        if c[0] == 'type'
          if c[1] != 'to'
            # TODO: error
          end
          type = c[1].to_s
        elsif c[0] == 'uri'
          if !c[1].to_s.start_with? 'sms:'
            # TODO: error
          end
          # drop the first four characters (the "sms:" prefix)
          num = c[1].to_s[4..-1]
          # TODO: confirm num validates
          # TODO: else, error - unexpected name
        end
      end
      if num.empty? or type.empty?
        # TODO: error
      end
      nums << num
    end
    return nums
  end

  # if not sent to SGX domain, then assume destination is in 'to'
  EMPromise.resolve(m.to.node.to_s).then { |num_dest|
    # digits with an optional leading "+" and optional ";parameters"
    if num_dest =~ /\A\+?[0-9]+(?:;.*)?\Z/
      next num_dest if num_dest[0] == '+'

      shortcode = extract_shortcode(num_dest)
      next shortcode if shortcode
    end

    if anonymous_tel?(num_dest)
      EMPromise.reject([:cancel, 'gone'])
    else
      # TODO: text re num not (yet) supported/implemented
      EMPromise.reject([:cancel, 'item-not-found'])
    end
  }
end
603
# Look up the stored registration for +jid+.
#
# Returns a promise for the stored values; it is rejected with
# [:auth, 'registration-required'] when fewer than four values are stored.
def self.fetch_catapult_cred_for(jid)
  @registration_repo.find(jid).then do |creds|
    # TODO: add text re credentials not registered
    next EMPromise.reject([:auth, 'registration-required']) if creds.length < 4

    creds
  end
end
616
# Error-typed messages are logged and swallowed; returning true marks the
# stanza as handled.
message :error? do |error_stanza|
  # TODO: report it somewhere/somehow - eat for now so no err loop
  LOG.warn("Eating error stanza", stanza: error_stanza.inspect)
  true
end
622
# Outgoing messages: anything with a body or an OOB URL. The destination and
# the sender's credentials are resolved, then the message is either passed
# straight to another user of this gateway or sent out through Bandwidth.
# Rejections shaped [type, condition(, text)] become XMPP error replies;
# anything else is re-rejected.
message(->(m) { m.body || m.at("oob|x > oob|url", oob: "jabber:x:oob") }) do |m|
  lookups = [
    validate_num(m),
    fetch_catapult_cred_for(m.from)
  ]

  EMPromise.all(lookups).then { |(num_dest, creds)|
    # Is the destination number registered with this gateway?
    @registration_repo.find_jid(num_dest).then { |jid|
      [jid, num_dest] + creds
    }
  }.then { |(jid, num_dest, *creds)|
    next [jid, num_dest] + creds + [nil] unless jid

    @registration_repo.find(jid).then { |other_user|
      [jid, num_dest] + creds + [other_user.first]
    }
  }.then { |(jid, num_dest, *creds, other_user)|
    # if destination user is in the system pass on directly
    deliver_locally = other_user && !other_user.start_with?('u-')
    if deliver_locally
      pass_on_message(m, creds.last, jid)
    else
      to_catapult_possible_oob(m, num_dest, *creds)
    end
  }.catch { |e|
    stanza_error = e.is_a?(Array) && (e.length == 2 || e.length == 3)
    next EMPromise.reject(e) unless stanza_error

    write_to_stream m.as_error(e[1], e[0], e[2])
  }
end
654
# Identities advertised for number JIDs: a single SMS client identity.
def self.user_cap_identities
  sms_client = {category: 'client', type: 'sms'}
  [sms_client]
end
658
# Features advertised for number JIDs.
# TODO: must re-add stuff so can do ad-hoc commands
def self.user_cap_features
  features = []
  features << "urn:xmpp:receipts"
  features
end
663
# Add +feature+ to the features advertised for the gateway itself; duplicates
# are removed.
#
# Returns the current feature list. (Previously the result of uniq! leaked
# out, which is nil whenever no duplicate had to be removed.)
def self.add_gateway_feature(feature)
  @gateway_features << feature
  @gateway_features.uniq!
  @gateway_features
end
668
# Presence subscription requests: approve at once, announce capabilities, and
# subscribe back to the requester.
subscription :request? do |p|
  # subscriptions are allowed from anyone - send reply immediately
  approval = Blather::Stanza::Presence.new
  approval.to = p.from
  approval.from = p.to
  approval.type = :subscribed
  write_to_stream(approval)

  # send a <presence> immediately; not automatically probed for it
  # TODO: refactor so no "presence :probe? do |p|" duplicate below
  caps = Blather::Stanza::Capabilities.new
  # TODO: use a better node URI (?)
  caps.node = 'http://catapult.sgx.soprani.ca/'
  caps.identities = user_cap_identities
  caps.features = user_cap_features

  announcement = caps.c
  announcement.to = p.from
  announcement.from = "#{p.to}/sgx"
  write_to_stream(announcement)

  # need to subscribe back so Conversations displays images inline
  # (both addresses are cut at the first "/")
  subscribe_back = Blather::Stanza::Presence.new
  subscribe_back.to = p.from.to_s.split('/', 2)[0]
  subscribe_back.from = p.to.to_s.split('/', 2)[0]
  subscribe_back.type = :subscribe
  write_to_stream(subscribe_back)

  true
end
702
# Presence probes: answer with a capabilities presence sent from the probed
# JID's "/sgx" resource.
presence :probe? do |p|
  caps = Blather::Stanza::Capabilities.new
  # TODO: use a better node URI (?)
  caps.node = 'http://catapult.sgx.soprani.ca/'
  caps.identities = user_cap_identities
  caps.features = user_cap_features

  answer = caps.c
  answer.to = p.from
  answer.from = "#{p.to}/sgx"
  write_to_stream(answer)

  true
end
718
# disco#items for the gateway's ad-hoc command node: list the
# "set-port-out-pin" command only for registered users whose number is
# eligible for a port-out PIN. Rejections shaped [type, condition(, text)]
# become XMPP error replies; anything else is re-rejected.
disco_items(
  to: Blather::JID.new(ARGV[0]),
  node: "http://jabber.org/protocol/commands"
) do |i|
  fetch_catapult_cred_for(i.from).then { |creds|
    BandwidthTNOptions.tn_eligible_for_port_out_pin?(creds)
  }.then { |eligible|
    reply = i.reply
    reply.node = 'http://jabber.org/protocol/commands'

    available = []
    if eligible
      available << Blather::Stanza::DiscoItems::Item.new(
        i.to,
        'set-port-out-pin',
        'Set Port-Out PIN'
      )
    end
    reply.items = available

    write_to_stream reply
  }.catch { |e|
    stanza_error = e.is_a?(Array) && [2, 3].include?(e.length)
    next EMPromise.reject(e) unless stanza_error

    write_to_stream i.as_error(e[1], e[0], e[2])
  }
end
750
# disco#info queries, both for number JIDs and for the gateway itself.
#
# A truthy block result tells Blather the stanza has been handled. The
# number-JID branch used to end with a bare `next` (nil) after replying, which
# let the catch-all iq handler at the end of this module answer the same
# request a second time with feature-not-implemented; it now returns true
# like the gateway branch does.
iq '/iq/ns:query', ns: 'http://jabber.org/protocol/disco#info' do |i|
  # TODO: return error if i.type is :set - if it is :reply or
  #  :error it should be ignored (as the below does currently);
  #  review specification to see how to handle other type values
  if i.type != :get
    LOG.warn("Ignoring non-get disco IQ", type: i.type.to_s, stanza: i.inspect)
    # deliberately not marked as handled, so later handlers still see it
    next
  end

  # respond to capabilities request for an sgx-bwmsgsv2 number JID
  if i.to.node
    # TODO: confirm the node URL is expected using below
    #puts "XR[node]: #{xpath_result[0]['node']}"

    msg = i.reply
    msg.node = i.node
    msg.identities = user_cap_identities
    msg.features = user_cap_features

    write_to_stream msg
    next true
  end

  # respond to capabilities request for sgx-bwmsgsv2 itself
  msg = i.reply
  msg.node = i.node
  msg.identities = [{
    name: 'Soprani.ca Gateway to XMPP - Bandwidth API V2',
    type: 'sms', category: 'gateway'
  }]
  msg.features = @gateway_features
  write_to_stream msg

  true
end
786
# Store +creds+ for the sender of +i+ and acknowledge the registration IQ.
#
# A RegistrationRepo::Conflict becomes a rejection with
# [:cancel, 'conflict', message]. Returns a promise.
def self.check_then_register(i, *creds)
  stored = @registration_repo.put(i.from, *creds)
  checked = stored.catch_only(RegistrationRepo::Conflict) { |e|
    EMPromise.reject([:cancel, 'conflict', e.message])
  }
  checked.then { write_to_stream i.reply }
end
796
# Read the four registration values from a registration IQ.
#
# When the query carries a jabber:x:data form the values come from its
# "nick", "username", "password" and "phone" fields (nil for a missing
# field); otherwise from the plain register accessors.
#
# Returns [nick, username, password, phone].
def self.creds_from_registration_query(i)
  data_form = i.query.find_first("./ns:x", ns: "jabber:x:data")
  return [i.nick, i.username, i.password, i.phone] unless data_form

  %w[nick username password phone].map { |var| i.form.field(var)&.value }
end
809
# Handle a registration "set" IQ: either remove the sender's registration or
# verify the supplied credentials against Bandwidth and store them.
#
# Returns a promise. Expected outcomes are signalled through rejections:
# :done once a removal has been acknowledged, or [type, condition(, text)]
# arrays that the caller turns into XMPP error replies.
def self.process_registration(i)
  EMPromise.resolve(nil).then {
    if i.remove?
      @registration_repo.delete(i.from).then do
        write_to_stream i.reply
        # Nothing more to do; :done skips the remaining steps of the chain
        EMPromise.reject(:done)
      end
    else
      creds_from_registration_query(i)
    end
  }.then { |user_id, api_token, api_secret, phone_num|
    # Only numbers starting with "+" are accepted
    if phone_num && phone_num[0] == '+'
      [user_id, api_token, api_secret, phone_num]
    else
      # TODO: add text re number not (yet) supported
      EMPromise.reject([:cancel, 'item-not-found'])
    end
  }.then { |user_id, api_token, api_secret, phone_num|
    # Verify the credentials by listing the account's media
    # TODO: find way to verify #{phone_num}, too
    call_catapult(
      api_token,
      api_secret,
      :get,
      "api/v2/users/#{user_id}/media",
      nil,
      {'Content-Type' => 'application/json'}
    ).then { |response|
      # The parsed value is discarded; parsing only checks that the
      # response is valid JSON (it raises otherwise)
      JSON.parse(response)
      # TODO: confirm response is array - could be empty

      LOG.debug("Registration verify response", response: response.to_s[0..999])

      check_then_register(
        i,
        user_id,
        api_token,
        api_secret,
        phone_num
      )
    }
  }.catch_only(BandwidthError) { |e|
    # Translate Bandwidth failures into XMPP error tuples
    EMPromise.reject(case e.code
    when 401
      [:auth, 'not-authorized', e.to_s]
    when 404
      [:cancel, 'item-not-found', e.to_s]
    else
      [:modify, 'not-acceptable', e.to_s]
    end)
  }
end
861
# Fill a registration reply with both the plain register fields and a data
# form asking for the Bandwidth credentials.
#
# orig            - the reply stanza to fill in (modified and returned)
# existing_number - the already-registered phone number, or nil when the user
#                   is not registered yet
def self.registration_form(orig, existing_number=nil)
  phone = existing_number.to_s
  account_hint = "page as well as the Phone Number\nin your "\
                 "account you want to use (ie. '+12345678901')"
  field_mapping = ".\nUser Id is nick, API Token is username, "\
                  "API Secret is password, Phone Number is phone"
  source_notice = ".\n\nThe source code for this gateway is at "\
                  "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
                  "\nCopyright (C) 2017-2020 Denver Gingerich "\
                  "and others, licensed under AGPLv3+."

  orig.registered = !!existing_number

  # TODO: update "User Id" x2 below (to "accountId"?), and others?
  orig.instructions =
    "Enter the information from your Account #{account_hint}" \
    "#{field_mapping}#{source_notice}"
  orig.nick = ""
  orig.username = ""
  orig.password = ""
  orig.phone = phone

  form = orig.form
  form.fields = [
    {required: true, type: :"text-single", label: 'User Id', var: 'nick'},
    {required: true, type: :"text-single", label: 'API Token', var: 'username'},
    {required: true, type: :"text-private", label: 'API Secret', var: 'password'},
    {
      required: true, type: :"text-single",
      label: 'Phone Number', var: 'phone', value: phone
    }
  ]
  orig.form.title = 'Register for '\
                    'Soprani.ca Gateway to XMPP - Bandwidth API V2'
  orig.form.instructions =
    "Enter the details from your Account #{account_hint}#{source_notice}"

  orig
end
911
# In-band registration: "set" registers or removes, "get" returns the
# registration form, anything else is ignored. Rejections shaped
# [type, condition(, text)] become XMPP error replies, :done is swallowed, and
# any other failure goes to panic.
ibr do |i|
  outcome =
    case i.type
    when :set
      process_registration(i)
    when :get
      @registration_repo.find(i.from.stripped).then { |creds|
        write_to_stream registration_form(i.reply, creds.last)
      }
    else
      # Unknown IQ, ignore for now
      EMPromise.reject(:done)
    end

  outcome.catch { |e|
    if e.is_a?(Array) && [2, 3].include?(e.length)
      write_to_stream i.as_error(e[1], e[0], e[2])
    elsif e != :done
      EMPromise.reject(e)
    end
  }.catch(&method(:panic))
end
933
# First step of the "set-port-out-pin" ad-hoc command: reply with a form that
# asks for the PIN twice. Rejections shaped [type, condition(, text)] become
# XMPP error replies; any other failure goes to panic.
command :execute?, node: "set-port-out-pin", sessionid: nil do |iq|
  pin_field = lambda { |var, label|
    {var: var, type: 'text-private', label: label, required: true}
  }

  # Ensure user is registered, but discard their credentials
  # because we're just showing them a form.
  fetch_catapult_cred_for(iq.from).then { |_creds|
    reply = iq.reply
    reply.node = 'set-port-out-pin'
    reply.sessionid = SecureRandom.uuid
    reply.status = :executing

    form = Blather::Stanza::X.find_or_create(reply.command)
    form.type = "form"
    form.fields = [
      pin_field.call('pin', 'Port-Out PIN'),
      pin_field.call('confirm_pin', 'Confirm PIN')
    ]

    reply.command.add_child(form)
    reply.allowed_actions = [:complete]

    write_to_stream reply
  }.catch { |e|
    stanza_error = e.is_a?(Array) && [2, 3].include?(e.length)
    next EMPromise.reject(e) unless stanza_error

    write_to_stream iq.as_error(e[1], e[0], e[2])
  }.catch(&method(:panic))
end
972
# Final step of the "set-port-out-pin" ad-hoc command: validate the submitted
# PIN and set it through BandwidthTNOptions.
#
# Every validation failure is answered with a bad-request error and then
# returns true, so Blather treats the stanza as handled. The previous bare
# `next` returned nil, which let the catch-all iq handler at the end of this
# module send a second (feature-not-implemented) error for the same request.
#
# A failure while setting the PIN is reported as a completed command with an
# error note; rejections shaped [type, condition(, text)] become XMPP error
# replies; any other failure goes to panic.
command :complete?, node: "set-port-out-pin", sessionid: /./ do |iq|
  pin = iq.form.field('pin')&.value
  confirm_pin = iq.form.field('confirm_pin')&.value

  if pin.nil? || confirm_pin.nil?
    write_to_stream iq.as_error(
      'bad-request',
      :modify,
      'PIN fields are required'
    )
    next true
  end

  if pin != confirm_pin
    write_to_stream iq.as_error(
      'bad-request',
      :modify,
      'PIN confirmation does not match'
    )
    next true
  end

  if pin !~ /\A[a-zA-Z0-9]{4,10}\z/
    write_to_stream iq.as_error(
      'bad-request',
      :modify,
      'PIN must be 4-10 alphanumeric characters'
    )
    next true
  end

  fetch_catapult_cred_for(iq.from).then { |creds|
    BandwidthTNOptions.set_port_out_pin(creds, pin).then {
      reply = iq.reply
      reply.node = 'set-port-out-pin'
      reply.sessionid = iq.sessionid
      reply.status = :completed
      reply.note_type = :info
      reply.note_text = 'Port-out PIN has been set successfully.'

      write_to_stream reply
    }.catch { |e|
      reply = iq.reply
      reply.node = 'set-port-out-pin'
      reply.sessionid = iq.sessionid
      reply.status = :completed
      reply.note_type = :error
      # Pick the user-facing text from what the error message contains
      error_msg = if e.respond_to?(:message) && e.message.include?('not valid')
        "Invalid phone number format. "\
        "Please check your registered phone number."
      elsif e.respond_to?(:message) && e.message.include?('ErrorCode')
        "Bandwidth API error: #{e.message}"
      else
        "Failed to set port-out PIN. Please try again later."
      end
      reply.note_text = error_msg

      write_to_stream reply
    }
  }.catch { |e|
    if e.is_a?(Array) && [2, 3].include?(e.length)
      write_to_stream iq.as_error(e[1], e[0], e[2])
    else
      EMPromise.reject(e)
    end
  }.catch(&method(:panic))
end
1040
# Catch-all for get/set IQs that no earlier handler dealt with: answer with
# feature-not-implemented.
iq type: [:get, :set] do |iq|
  not_implemented = Blather::StanzaError.new(
    iq,
    'feature-not-implemented',
    :cancel
  )
  write_to_stream(not_implemented)

  true
end
1050end
1051
# Bare <message/> stanza used for delivery receipts.
class ReceiptMessage < Blather::Stanza
  # Build a message stanza addressed to +to+ (which may be nil).
  def self.new(to=nil)
    super(:message).tap { |stanza| stanza.to = to }
  end
end
1059
1060class WebhookHandler < Goliath::API
1061 use Sentry::Rack::CaptureExceptions
1062 use Goliath::Rack::Params
1063
1064 def response(env)
1065 @registration_repo = RegistrationRepo.new
1066 # TODO: add timestamp grab here, and MUST include ./tai version
1067
1068 LOG.debug("Webhook request env", env: env.reject { |k| k == 'params' })
1069
1070 if params.empty?
1071 LOG.warn "Empty webhook params"
1072 return [200, {}, "OK"]
1073 end
1074
1075 if env['REQUEST_URI'] != '/'
1076 LOG.warn("Non-/ request", uri: env['REQUEST_URI'], method: env['REQUEST_METHOD'])
1077 return [200, {}, "OK"]
1078 end
1079
1080 if env['REQUEST_METHOD'] != 'POST'
1081 LOG.warn("Non-POST request", uri: env['REQUEST_URI'], method: env['REQUEST_METHOD'])
1082 return [200, {}, "OK"]
1083 end
1084
1085 # TODO: process each message in list, not just first one
1086 jparams = params.dig('_json', 0, 'message')
1087 type = params.dig('_json', 0, 'type')
1088
1089 return [400, {}, "Missing params\n"] unless jparams && type
1090
1091 users_num, others_num =
1092 if type == 'message-failed' # NOTE: This implies direction == 'out'
1093 [jparams['from'], params['_json'][0]['to']]
1094 elsif jparams['direction'] == 'in'
1095 [jparams['owner'], jparams['from']]
1096 elsif jparams['direction'] == 'out'
1097 [jparams['from'], jparams['owner']] # NOTE: for outbound, 'from' == 'owner'
1098 else
1099 LOG.error("Unexpected message direction", direction: jparams['direction'])
1100 [jparams['from'], jparams['owner']]
1101 end
1102
1103 return [400, {}, "Missing params\n"] unless users_num && others_num
1104 return [400, {}, "Missing params\n"] unless jparams['to'].is_a?(Array)
1105 return [400, {}, "Missing params\n"] if jparams['to'].empty?
1106
1107 LOG.info(
1108 "Webhook message",
1109 message_id: jparams['id'],
1110 event_type: type,
1111 time: jparams['time'],
1112 direction: jparams['direction'],
1113 delivery_state: jparams['deliveryState'],
1114 error_code: jparams['errorCode'],
1115 description: jparams['description'],
1116 tag: jparams['tag'],
1117 media: jparams['media']
1118 )
1119
1120 if others_num[0] != '+'
1121 # TODO: check that others_num actually a shortcode first
1122 others_num +=
1123 ';phone-context=ca-us.phone-context.soprani.ca'
1124 end
1125
1126 bare_jid = @registration_repo.find_jid(users_num).sync
1127
1128 if !bare_jid
1129 LOG.warn("JID not found for number, BW API misconfigured?", users_num: users_num)
1130
1131 return [403, {}, "Customer not found\n"]
1132 end
1133
1134 msg = nil
1135 case jparams['direction']
1136 when 'in'
1137 text = ''
1138 case type
1139 when 'message-received'
1140 # TODO: handle group chat, and fix above
1141 text = jparams['text']
1142
1143 if text.to_s.empty? && Array(jparams['media']).empty?
1144 return [400, {}, "Missing params\n"]
1145 end
1146
1147 if jparams['to'].length > 1
1148 msg = Blather::Stanza::Message.new(
1149 Blather::JID.new(bare_jid).domain
1150 )
1151 msg.body = text unless text&.empty?
1152
1153 addrs = Nokogiri::XML::Node.new(
1154 'addresses', msg.document)
1155 addrs['xmlns'] = 'http://jabber.org/' \
1156 'protocol/address'
1157
1158 addr1 = Nokogiri::XML::Node.new(
1159 'address', msg.document)
1160 addr1['type'] = 'to'
1161 addr1['jid'] = bare_jid
1162 addrs.add_child(addr1)
1163
1164 jparams['to'].reject(
1165 # Don't send to the same person twice,
1166 # and don't send to the person who sent it
1167 &[users_num, others_num].method(:include?)
1168 ).each do |receiver|
1169 addrn = Nokogiri::XML::Node.new(
1170 'address', msg.document)
1171 addrn['type'] = 'to'
1172 addrn['uri'] = "sms:#{receiver}"
1173 addrn['delivered'] = 'true'
1174 addrs.add_child(addrn)
1175 end
1176
1177 msg.add_child(addrs)
1178 end
1179
1180 media_urls = Array(jparams['media']).map { |media_url|
1181 unless media_url.end_with?(
1182 '.smil', '.txt', '.xml'
1183 )
1184 SGXbwmsgsv2.send_media(
1185 others_num + '@' +
1186 ARGV[0],
1187 bare_jid, media_url,
1188 nil, nil, msg
1189 )
1190 media_url
1191 end
1192 }.compact
1193
1194 if text&.empty? || (media_urls.any? && jparams['to'].length > 1)
1195 if !env['HTTP_X_JMP_RESEND_OF'].to_s.empty?
1196 MessageEvent::ResendIn.new(
1197 original_stream_id: env['HTTP_X_JMP_RESEND_OF'],
1198 original_bandwidth_id: jparams['id'],
1199 owner: jparams['owner']
1200 ).emit(REDIS)
1201 else
1202 MessageEvent::In.new(
1203 timestamp: jparams['time'],
1204 from: jparams['from'],
1205 to: jparams['to'],
1206 owner: jparams['owner'],
1207 bandwidth_id: jparams['id'],
1208 body: jparams['text'].to_s,
1209 media_urls: media_urls
1210 ).emit(REDIS)
1211 end
1212
1213 return [200, {}, "OK"]
1214 end
1215
1216 if !text || text.empty? || (jparams['to'].length > 1 && media_urls.any?)
1217 return [200, {}, "OK"]
1218 end
1219 else
1220 text = "unknown type (#{type})" \
1221 " with text: #{jparams['text']}"
1222
1223 # TODO: log/notify of this properly
1224 LOG.warn("Unknown inbound message type", text: text)
1225 end
1226
1227 # If text is not empty, but there isn't a msg,
1228 # we need to construct a msg to convey that text
1229 unless msg || text.to_s.empty?
1230 msg = Blather::Stanza::Message.new(
1231 bare_jid,
1232 # Strip control codes.
1233 # This only happened, or at least caused a problem, once,
1234 # but it seems sensible to say that we shouldn't be getting
1235 # control codes from the PSTN.
1236 text.gsub(/[\u0000-\u0008\u000b-\u001f]/, "")
1237 )
1238 msg.document.encoding = "utf-8"
1239 msg.chat_state = nil
1240 end
1241 else # per prior switch, this is: jparams['direction'] == 'out'
1242 tag_parts = jparams['tag'].split(/ /, 2)
1243 id = WEBrick::HTTPUtils.unescape(tag_parts[0])
1244 resourcepart = WEBrick::HTTPUtils.unescape(tag_parts[1])
1245
1246 # TODO: remove this hack
1247 if jparams['to'].length > 1
1248 case type
1249 when 'message-failed'
1250 MessageEvent::Failed.new(
1251 timestamp: jparams['time'],
1252 stanza_id: id,
1253 bandwidth_id: jparams['id'],
1254 error_code: jparams['errorCode'].to_s,
1255 error_description: jparams['description'].to_s
1256 ).emit(REDIS)
1257 when 'message-delivered'
1258 MessageEvent::Delivered.new(
1259 timestamp: jparams['time'],
1260 stanza_id: id,
1261 bandwidth_id: jparams['id']
1262 ).emit(REDIS)
1263 end
1264 LOG.warn("Group message, skipping receipt", users_num: users_num)
1265 return [200, {}, "OK"]
1266 end
1267
1268 case type
1269 when 'message-failed'
1270 # create a bare message like the one user sent
1271 msg = Blather::Stanza::Message.new(
1272 others_num + '@' + ARGV[0])
1273 msg.from = bare_jid + '/' + resourcepart
1274 msg['id'] = id
1275
1276 # TODO: add 'errorCode' and/or 'description' val
1277 # create an error reply to the bare message
1278 msg = msg.as_error(
1279 'recipient-unavailable',
1280 :wait,
1281 params['_json'][0]['description']
1282 )
1283
1284 when 'message-delivered'
1285
1286 msg = ReceiptMessage.new(bare_jid)
1287
1288 # TODO: put in member/instance variable
1289 msg['id'] = SecureRandom.uuid
1290
1291 # TODO: send only when requested per XEP-0184
1292 rcvd = Nokogiri::XML::Node.new(
1293 'received',
1294 msg.document
1295 )
1296 rcvd['xmlns'] = 'urn:xmpp:receipts'
1297 rcvd['id'] = id
1298 msg.add_child(rcvd)
1299
1300 # TODO: make prettier: this should be done above
1301 others_num = params['_json'][0]['to']
1302 else
				# TODO: notify somehow of unknown state received?
1304 LOG.warn("Unknown outbound message type", id: id, type: type)
1305 return [200, {}, "OK"]
1306 end
1307
1308 # Keeping this due to the `msg.from=` shuffle just below
1309 LOG.debug("Outbound callback response", stanza: msg.inspect)
1310 end
1311
		# if message-failed, we already set msg.from
		# moreover, we said `msg = msg.as_error`, and the resulting StanzaError
		# may not respond to `from=` (hence the respond_to? guard below)
1314 if msg
1315 msg.from = others_num + '@' + ARGV[0] if msg.respond_to?(:from=)
1316 SGXbwmsgsv2.write(msg)
1317 end
1318
1319 # Emit event to messages stream
1320 case [jparams['direction'], type]
1321 when ['in', 'message-received']
1322 if !env['HTTP_X_JMP_RESEND_OF'].to_s.empty?
1323 MessageEvent::ResendIn.new(
1324 original_stream_id: env['HTTP_X_JMP_RESEND_OF'],
1325 original_bandwidth_id: jparams['id'],
1326 owner: jparams['owner']
1327 ).emit(REDIS)
1328 else
1329 media_urls = Array(jparams['media']).reject { |u|
1330 u.end_with?('.smil', '.txt', '.xml')
1331 }
1332 MessageEvent::In.new(
1333 timestamp: jparams['time'],
1334 from: jparams['from'],
1335 to: jparams['to'],
1336 owner: jparams['owner'],
1337 bandwidth_id: jparams['id'],
1338 body: jparams['text'].to_s,
1339 media_urls: media_urls
1340 ).emit(REDIS)
1341 end
1342 when ['out', 'message-failed']
1343 tag_parts = jparams['tag'].split(/ /, 2)
1344 stanza_id = WEBrick::HTTPUtils.unescape(tag_parts[0])
1345 MessageEvent::Failed.new(
1346 timestamp: jparams['time'],
1347 stanza_id: stanza_id,
1348 bandwidth_id: jparams['id'],
1349 error_code: jparams['errorCode'].to_s,
1350 error_description: jparams['description'].to_s
1351 ).emit(REDIS)
1352 when ['out', 'message-delivered']
1353 tag_parts = jparams['tag'].split(/ /, 2)
1354 stanza_id = WEBrick::HTTPUtils.unescape(tag_parts[0])
1355 MessageEvent::Delivered.new(
1356 timestamp: jparams['time'],
1357 stanza_id: stanza_id,
1358 bandwidth_id: jparams['id']
1359 ).emit(REDIS)
1360 end
1361
1362 [200, {}, "OK"]
1363 rescue Exception => e
1364 Sentry.capture_exception(e)
1365 [500, {}, "Error"]
1366 end
1367end
1368
# Program entry point. Registered with at_exit, so it runs when the
# interpreter is about to exit (i.e. after the rest of the script has been
# evaluated). Not registered at all when ENV['ENV'] is "test".
#
# Exactly seven command-line arguments are required (see the usage text
# below). This block itself reads ARGV[0] (component JID, used as the sender
# of the keep-alive ping) and ARGV[5] (port the webhook HTTP server listens
# on).
#
# Exits with status 1 when the argument count is wrong.
at_exit do
	LOG.info(
		"Soprani.ca/SMS Gateway for XMPP - Bandwidth API V2",
		version: `git rev-parse HEAD`.chomp
	)

	if ARGV.size != 7
		warn "Usage: sgx-bwmsgsv2.rb <component_jid> "\
			"<component_password> <server_hostname> "\
			"<server_port> <application_id> "\
			"<http_listen_port> <mms_proxy_prefix_url>"
		# A usage error is a failure; report it to the caller (shell,
		# supervisor, ...) with a non-zero status instead of success.
		exit 1
	end

	LOG.info "Starting"

	EM.run do
		# Shared Redis connection, created inside the reactor and exposed
		# as a constant for the rest of the program.
		REDIS = EM::Hiredis.connect

		SGXbwmsgsv2.run

		# required when using Prosody otherwise disconnects on 6-hour inactivity
		EM.add_periodic_timer(3600) do
			msg = Blather::Stanza::Iq::Ping.new(:get, 'localhost')
			msg.from = ARGV[0]
			SGXbwmsgsv2.write(msg)
		end

		# HTTP server receiving the webhook callbacks, on all interfaces.
		server = Goliath::Server.new('0.0.0.0', ARGV[5].to_i)
		server.api = WebhookHandler.new
		server.app = Goliath::Rack::Builder.build(server.api.class, server.api)
		server.logger = LOG
		server.start do
			["INT", "TERM"].each do |sig|
				trap(sig) do
					# Hand the shutdown work to EM.defer rather than
					# doing it directly inside the signal handler.
					EM.defer do
						LOG.info "Shutting down gateway"
						SGXbwmsgsv2.shutdown

						LOG.info "Gateway has terminated"
						EM.stop
					end
				end
			end
		end
	end
end unless ENV['ENV'] == 'test'