sgx-bwmsgsv2.rb

   1#!/usr/bin/env ruby
   2# frozen_string_literal: true
   3
   4# Copyright (C) 2017-2020  Denver Gingerich <denver@ossguy.com>
   5# Copyright (C) 2017  Stephen Paul Weber <singpolyma@singpolyma.net>
   6#
   7# This file is part of sgx-bwmsgsv2.
   8#
   9# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
  10# the terms of the GNU Affero General Public License as published by the Free
  11# Software Foundation, either version 3 of the License, or (at your option) any
  12# later version.
  13#
  14# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
  15# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  16# FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more
  17# details.
  18#
  19# You should have received a copy of the GNU Affero General Public License along
  20# with sgx-bwmsgsv2.  If not, see <http://www.gnu.org/licenses/>.
  21
  22require 'delegate'
  23require 'blather/client/dsl'
  24require 'em-hiredis'
  25require 'em-http-request'
  26require 'json'
  27require 'multibases'
  28require 'multihashes'
  29require 'securerandom'
  30require "sentry-ruby"
  31require 'time'
  32require 'uri'
  33require 'webrick'
  34
  35require 'goliath/api'
  36require 'goliath/server'
  37require "ougai"
  38
  39require 'em_promise'
  40require 'em-synchrony'
  41
  42require_relative 'lib/bandwidth_error'
  43require_relative 'lib/bandwidth_tn_options'
  44require_relative 'lib/message_event'
  45require_relative 'lib/registration_repo'
  46
  47Sentry.init
  48
  49require_relative 'lib/background_log'
  50
  51$stdout.sync = true
  52LOG = Ougai::Logger.new(ENV["ENV"] == "test" ? $stdout : BackgroundLog.new($stdout))
  53LOG.level = ENV.fetch("LOG_LEVEL", "info")
  54LOG.formatter = Ougai::Formatters::Readable.new(
  55	nil,
  56	nil,
  57	plain: !$stdout.isatty
  58)
  59Blather.logger = LOG
  60EM::Hiredis.logger = LOG
  61
  62Sentry.init do |config|
  63	config.logger = LOG
  64	config.breadcrumbs_logger = [:sentry_logger]
  65end
  66
  67BADWORD_LIST = [
  68	"marijuana",
  69	"psilocybin",
  70	"cannabis",
  71	"cocaine",
  72	"heroin",
  73	"meth",
  74	"methamphetamine",
  75	"methamphetamines",
  76	"cigarette",
  77	"tobacco",
  78	"cbd",
  79	"thc",
  80	"morphine",
  81	"incall",
  82	"in-call",
  83	"outcall",
  84	"out-call",
  85	"shrooms",
  86	"lsd",
  87	"kratom",
  88	"mdma",
  89	"addy",
  90	"xanz",
  91	"cialis",
  92	"viagra",
  93	"bbfs",
  94	"fentanyl",
  95	"opium",
  96	"golden teacher",
  97	"bbbj",
  98	"canna",
  99	"fuck",
 100	"xanax",
 101	"zarareturns",
 102	"zarareturns.com",
 103	"plantation",
 104].freeze
 105
 106BADWORDS = Regexp.union(
 107	BADWORD_LIST.map { |w| /\b#{Regexp.escape(w)}\b/ }
 108)
 109
 110# List of supported MIME types from Bandwidth - https://support.bandwidth.com/hc/en-us/articles/360014128994-What-MMS-file-types-are-supported-
 111MMS_MIME_TYPES = [
 112	"application/json",
 113	"application/ogg",
 114	"application/pdf",
 115	"application/rtf",
 116	"application/zip",
 117	"application/x-tar",
 118	"application/xml",
 119	"application/gzip",
 120	"application/x-bzip2",
 121	"application/x-gzip",
 122	"application/smil",
 123	"application/javascript",
 124	"audio/mp4",
 125	"audio/mpeg",
 126	"audio/ogg",
 127	"audio/flac",
 128	"audio/webm",
 129	"audio/wav",
 130	"audio/amr",
 131	"audio/3gpp",
 132	"image/bmp",
 133	"image/gif",
 134	"image/jpeg",
 135	"image/pjpeg",
 136	"image/png",
 137	"image/svg+xml",
 138	"image/tiff",
 139	"image/webp",
 140	"image/x-icon",
 141	"text/css",
 142	"text/csv",
 143	"text/calendar",
 144	"text/plain",
 145	"text/javascript",
 146	"text/vcard",
 147	"text/vnd.wap.wml",
 148	"text/xml",
 149	"video/avi",
 150	"video/mp4",
 151	"video/mpeg",
 152	"video/ogg",
 153	"video/quicktime",
 154	"video/webm",
 155	"video/x-ms-wmv",
 156	"video/x-flv"
 157]
 158
 159def panic(e)
 160	if e.is_a?(Exception)
 161		Sentry.capture_exception(e, hint: { background: false })
 162	else
 163		Sentry.capture_message(e.to_s, hint: { background: false })
 164	end
 165	LOG.fatal("Shutting down gateway", e)
 166	SGXbwmsgsv2.shutdown
 167	LOG.info "Gateway has terminated"
 168	EM.stop
 169end
 170
 171EM.error_handler(&method(:panic))
 172
 173def extract_shortcode(dest)
 174	num, context = dest.split(';', 2)
 175	num if context == 'phone-context=ca-us.phone-context.soprani.ca'
 176end
 177
 178def anonymous_tel?(dest)
 179	dest.split(';', 2)[1] == 'phone-context=anonymous.phone-context.soprani.ca'
 180end
 181
 182class SGXClient < Blather::Client
 183	def handle_data(stanza)
 184		promise = EMPromise.resolve(nil).then {
 185			with_sentry(stanza) do |scope|
 186				super
 187			rescue StandardError => e
 188				handle_error(scope, stanza, e)
 189			end
 190		}.catch { |e| panic(e) }
 191		promise.sync if ENV["ENV"] == "test"
 192		promise
 193	end
 194
 195	# Override the default call_handler to syncify during testing.
 196	def call_handler(handler, guards, stanza)
 197		result = if guards.first.respond_to?(:to_str)
 198			found = stanza.find(*guards)
 199			throw :pass if found.empty?
 200
 201			handler.call(stanza, found)
 202		else
 203			throw :pass if guarded?(guards, stanza)
 204
 205			handler.call(stanza)
 206		end
 207
 208		# Up to here, identical to upstream impl
 209
 210		return result unless result.is_a?(Promise)
 211
 212		result.sync if ENV["ENV"] == "test"
 213		result
 214	end
 215
 216protected
 217
 218	def with_sentry(stanza)
 219		Sentry.clone_hub_to_current_thread
 220
 221		Sentry.with_scope do |scope|
 222			setup_scope(stanza, scope)
 223			yield scope
 224		ensure
 225			scope.get_transaction&.then do |tx|
 226				tx.set_status("ok") unless tx.status
 227				tx.finish
 228			end
 229		end
 230	end
 231
 232	def setup_scope(stanza, scope)
 233		name = stanza.respond_to?(:node) ? stanza.node : stanza.name
 234		scope.clear_breadcrumbs
 235		scope.set_transaction_name(name)
 236		scope.set_user(jid: stanza.from&.stripped.to_s)
 237
 238		transaction = Sentry.start_transaction(
 239			name: name,
 240			op: "blather.handle_data"
 241		)
 242		scope.set_span(transaction) if transaction
 243	end
 244
 245	def handle_error(scope, stanza, e)
 246		LOG.error("Error during #{scope.transaction_name}", e)
 247		Sentry.capture_exception(e) unless e.is_a?(Sentry::Error)
 248		scope.get_transaction&.set_status("internal_error")
 249		return if e.respond_to?(:replied?) && e.replied?
 250
 251		SGXbwmsgsv2.write_to_stream stanza.as_error("internal-server-error", :cancel)
 252	end
 253end
 254
 255# TODO: keep in sync with jmp-acct_bot.rb, and eventually put in common location
 256module CatapultSettingFlagBits
 257	VOICEMAIL_TRANSCRIPTION_DISABLED = 0
 258	MMS_ON_OOB_URL = 1
 259end
 260
 261module SGXbwmsgsv2
 262	extend Blather::DSL
 263
 264	@registration_repo = RegistrationRepo.new
 265	@client = SGXClient.new
 266	@gateway_features = [
 267		"http://jabber.org/protocol/disco#info",
 268		"http://jabber.org/protocol/address/",
 269		"jabber:iq:register",
 270		"http://jabber.org/protocol/commands"
 271	]
 272
 273	def self.run
 274		# TODO: read/save ARGV[7] creds to local variables
 275		client.run
 276	end
 277
 278	# so classes outside this module can write messages, too
 279	def self.write(stanza)
 280		client.write(stanza)
 281	end
 282
 283	def self.before_handler(type, *guards, &block)
 284		client.register_handler_before(type, *guards, &block)
 285	end
 286
 287	def self.send_media(from, to, media_url, desc=nil, subject=nil, m=nil)
 288		# we assume media_url is one of these (always the case so far):
 289		#  https://messaging.bandwidth.com/api/v2/users/[u]/media/[path]
 290
 291		LOG.debug("Original media URL", url: media_url)
 292		usr = to
 293		if media_url.start_with?('https://messaging.bandwidth.com/api/v2/users/')
 294			pth = media_url.split('/', 9)[8]
 295			# the caller must guarantee that 'to' is a bare JID
 296			media_url = ARGV[6] + WEBrick::HTTPUtils.escape(usr) + '/' + pth
 297			LOG.debug("Proxied media URL", url: media_url)
 298		end
 299
 300		msg = m ? m.copy : Blather::Stanza::Message.new(to, "")
 301		msg.from = from
 302		msg.subject = subject if subject
 303
 304		# provide URL in XEP-0066 (OOB) fashion
 305		x = Nokogiri::XML::Node.new 'x', msg.document
 306		x['xmlns'] = 'jabber:x:oob'
 307
 308		urln = Nokogiri::XML::Node.new 'url', msg.document
 309		urlc = Nokogiri::XML::Text.new media_url, msg.document
 310		urln.add_child(urlc)
 311		x.add_child(urln)
 312
 313		if desc
 314			descn = Nokogiri::XML::Node.new('desc', msg.document)
 315			descc = Nokogiri::XML::Text.new(desc, msg.document)
 316			descn.add_child(descc)
 317			x.add_child(descn)
 318		end
 319
 320		msg.add_child(x)
 321
 322		write(msg)
 323	rescue Exception => e
 324		panic(e)
 325	end
 326
 327	setup ARGV[0], ARGV[1], ARGV[2], ARGV[3], nil, nil, async: true
 328
 329	def self.pass_on_message(m, users_num, jid)
 330		# Capture destination before modifying m.to
 331		dest_num = m.to.node
 332
 333		# setup delivery receipt; similar to a reply
 334		rcpt = ReceiptMessage.new(m.from.stripped)
 335		rcpt.from = m.to
 336
 337		# pass original message (before sending receipt)
 338		m.to = jid
 339		m.from = "#{users_num}@#{ARGV[0]}"
 340
 341		write_to_stream m
 342
 343		# Emit pass-through event. Thru events don't capture a timestamp because XMPP
 344		# stanzas don't carry timestamps for realtime messages, and the Redis stream
 345		# ID provides the emit time.
 346		oob_url = m.at("oob|x > oob|url", oob: "jabber:x:oob")&.text
 347		MessageEvent::Thru.new(
 348			owner: users_num,
 349			from: users_num,
 350			to: [dest_num],
 351			stanza_id: m.id.to_s,
 352			body: m.body.to_s,
 353			media_urls: [oob_url].compact
 354		).emit(REDIS)
 355
 356		# send a delivery receipt back to the sender
 357		# TODO: send only when requested per XEP-0184
 358		# TODO: pass receipts from target if supported
 359
 360		# TODO: put in member/instance variable
 361		rcpt['id'] = SecureRandom.uuid
 362		rcvd = Nokogiri::XML::Node.new 'received', rcpt.document
 363		rcvd['xmlns'] = 'urn:xmpp:receipts'
 364		rcvd['id'] = m.id
 365		rcpt.add_child(rcvd)
 366
 367		write_to_stream rcpt
 368	end
 369
 370	def self.call_catapult(
 371		token, secret, m, pth, body=nil,
 372		head={}, code=[200], respond_with=:body
 373	)
 374		# pth looks like one of:
 375		#  "api/v2/users/#{user_id}/[endpoint_name]"
 376
 377		url_prefix = ''
 378
 379		# TODO: need to make a separate thing for voice.bw.c eventually
 380		if pth.start_with? 'api/v2/users'
 381			url_prefix = 'https://messaging.bandwidth.com/'
 382		end
 383
 384		EM::HttpRequest.new(
 385			URI.parse(url_prefix + pth), tls: {verify_peer: true}
 386		).public_send(
 387			"a#{m}",
 388			head: {
 389				'Authorization' => [token, secret]
 390			}.merge(head),
 391			body: body
 392		).then { |http|
 393			if code.include?(http.response_header.status)
 394				case respond_with
 395				when :body
 396					http.response
 397				when :headers
 398					http.response_header
 399				else
 400					http
 401				end
 402			else
 403				EMPromise.reject(
 404					BandwidthError.for(http.response_header.status, http.response)
 405				)
 406			end
 407		}
 408	end
 409
 410	def self.to_catapult_possible_oob(s, num_dest, user_id, token, secret,
 411		usern)
 412		un = s.at("oob|x > oob|url", oob: "jabber:x:oob")
 413		unless un
 414			LOG.debug "No OOB URL node, processing as normal"
 415			return to_catapult(s, nil, num_dest, user_id, token,
 416				secret, usern)
 417		end
 418		LOG.debug "Found OOB URL node, checking MMS eligibility"
 419
 420		body = s.respond_to?(:body) ? s.body.to_s : ''
 421
 422		if num_dest.is_a?(String) && num_dest !~ /^\+?1/
 423			unless body.include?(un.text)
 424				s.body = body.empty? ? un.text : "#{body}\n#{un.text}"
 425			end
 426			return to_catapult(s, nil, num_dest, user_id, token, secret, usern)
 427		end
 428
 429		EM::HttpRequest.new(URI.parse(un.text), tls: {verify_peer: true}).ahead.then { |http|
 430			# If content is too large, or MIME type is not supported, place the link inside the body and do not send MMS.
 431			if http.response_header["CONTENT_LENGTH"].to_i > 3500000 ||
 432			   !MMS_MIME_TYPES.include?(http.response_header["CONTENT_TYPE"])
 433				unless body.include?(un.text)
 434					s.body = body.empty? ? un.text : "#{body}\n#{un.text}"
 435				end
 436				to_catapult(s, nil, num_dest, user_id, token, secret, usern)
 437			else # If size is less than ~3.5MB, strip the link from the body and attach media in the body.
 438				# some clients send URI in both body & <url/> so delete
 439				s.body = body.sub(/\s*#{Regexp.escape(un.text)}\s*$/, '')
 440
 441				LOG.debug("OOB MMS details", url: un.text, body: body.to_s.strip)
 442				to_catapult(s, un.text, num_dest, user_id, token, secret, usern)
 443			end
 444		}
 445	end
 446
 447	def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern)
 448		body = s.respond_to?(:body) ? s.body.to_s : ''
 449		if murl.to_s.empty? && body.strip.empty?
 450			return EMPromise.reject(
 451				[:modify, 'policy-violation']
 452			)
 453		end
 454
 455		if body.downcase.match?(BADWORDS)
 456			return EMPromise.reject([
 457				:wait,
 458				'recipient-unavailable',
 459				'Single message blocked by carrier content policy, see https://blog.jmp.chat/b/sms-censorship for details'
 460			])
 461		end
 462
 463		if body =~ /\u2063/
 464			return EMPromise.reject([
 465				:wait,
 466				'recipient-unavailable',
 467				'Please contact JMP support about your message'
 468			])
 469		end
 470
 471		segment_size = body.ascii_only? ? 160 : 70
 472		if !murl && ENV["MMS_PATH"] && num_dest =~ /^\+?1/ && body.length > segment_size*3
 473			file = Multibases.pack(
 474				'base58btc',
 475				Multihashes.encode(Digest::SHA256.digest(body), "sha2-256")
 476			).to_s
 477			File.open("#{ENV['MMS_PATH']}/#{file}", "w") { |fh| fh.write body }
 478			murl = "#{ENV['MMS_URL']}/#{file}.txt"
 479			body = ""
 480		end
 481
 482		extra = {}
 483		extra[:media] = murl if murl
 484
 485		call_catapult(
 486			token,
 487			secret,
 488			:post,
 489			"api/v2/users/#{user_id}/messages",
 490			JSON.dump(extra.merge(
 491				from: usern,
 492				to:   num_dest,
 493				text: body,
 494				applicationId:  ARGV[4],
 495				tag:
 496					# callbacks need id and resourcepart
 497					WEBrick::HTTPUtils.escape(s.id.to_s) +
 498					' ' +
 499					WEBrick::HTTPUtils.escape(
 500						s.from.resource.to_s
 501					)
 502			)),
 503			{'Content-Type' => 'application/json'},
 504			[201, 202]
 505		).then { |response|
 506			parsed = JSON.parse(response) rescue {}
 507			MessageEvent::Out.new(
 508				timestamp: parsed["time"] || Time.now,
 509				owner: usern,
 510				from: usern,
 511				to: Array(num_dest),
 512				stanza_id: s.id.to_s,
 513				bandwidth_id: parsed["id"],
 514				body: body,
 515				media_urls: [murl].compact
 516			).emit(REDIS)
 517			response
 518		}.catch { |e|
 519			EMPromise.reject(
 520				[:cancel, 'internal-server-error', e.message]
 521			)
 522		}
 523	end
 524
 525	def self.validate_num(m)
 526		# if sent to SGX domain use https://wiki.soprani.ca/SGX/GroupMMS
 527		if m.to == ARGV[0]
 528			an = m.children.find { |v| v.element_name == "addresses" }
 529			if not an
 530				return EMPromise.reject(
 531					[:cancel, 'item-not-found']
 532				)
 533			end
 534			LOG.debug "Found addresses node, iterating"
 535
 536			nums = []
 537			an.children.each do |e|
 538				num = ''
 539				type = ''
 540				e.attributes.each do |c|
 541					if c[0] == 'type'
 542						if c[1] != 'to'
 543							# TODO: error
 544						end
 545						type = c[1].to_s
 546					elsif c[0] == 'uri'
 547						if !c[1].to_s.start_with? 'sms:'
 548							# TODO: error
 549						end
 550						num = c[1].to_s[4..-1]
 551						# TODO: confirm num validates
 552						# TODO: else, error - unexpected name
 553					end
 554				end
 555				if num.empty? or type.empty?
 556					# TODO: error
 557				end
 558				nums << num
 559			end
 560			return nums
 561		end
 562
 563		# if not sent to SGX domain, then assume destination is in 'to'
 564		EMPromise.resolve(m.to.node.to_s).then { |num_dest|
 565			if num_dest =~ /\A\+?[0-9]+(?:;.*)?\Z/
 566				next num_dest if num_dest[0] == '+'
 567
 568				shortcode = extract_shortcode(num_dest)
 569				next shortcode if shortcode
 570			end
 571
 572			if anonymous_tel?(num_dest)
 573				EMPromise.reject([:cancel, 'gone'])
 574			else
 575				# TODO: text re num not (yet) supportd/implmentd
 576				EMPromise.reject([:cancel, 'item-not-found'])
 577			end
 578		}
 579	end
 580
 581	def self.fetch_catapult_cred_for(jid)
 582		@registration_repo.find(jid).then { |creds|
 583			if creds.length < 4
 584				# TODO: add text re credentials not registered
 585				EMPromise.reject(
 586					[:auth, 'registration-required']
 587				)
 588			else
 589				creds
 590			end
 591		}
 592	end
 593
 594	message :error? do |m|
 595		# TODO: report it somewhere/somehow - eat for now so no err loop
 596		LOG.warn("Eating error stanza", stanza: m.inspect)
 597		true
 598	end
 599
 600	message(->(m) { m.body || m.at("oob|x > oob|url", oob: "jabber:x:oob") }) do |m|
 601		EMPromise.all([
 602			validate_num(m),
 603			fetch_catapult_cred_for(m.from)
 604		]).then { |(num_dest, creds)|
 605			@registration_repo.find_jid(num_dest).then { |jid|
 606				[jid, num_dest] + creds
 607			}
 608		}.then { |(jid, num_dest, *creds)|
 609			if jid
 610				@registration_repo.find(jid).then { |other_user|
 611					[jid, num_dest] + creds + [other_user.first]
 612				}
 613			else
 614				[jid, num_dest] + creds + [nil]
 615			end
 616		}.then { |(jid, num_dest, *creds, other_user)|
 617			# if destination user is in the system pass on directly
 618			if other_user and not other_user.start_with? 'u-'
 619				pass_on_message(m, creds.last, jid)
 620			else
 621				to_catapult_possible_oob(m, num_dest, *creds)
 622			end
 623		}.catch { |e|
 624			if e.is_a?(Array) && (e.length == 2 || e.length == 3)
 625				write_to_stream m.as_error(e[1], e[0], e[2])
 626			else
 627				EMPromise.reject(e)
 628			end
 629		}
 630	end
 631
 632	def self.user_cap_identities
 633		[{category: 'client', type: 'sms'}]
 634	end
 635
 636	# TODO: must re-add stuff so can do ad-hoc commands
 637	def self.user_cap_features
 638		["urn:xmpp:receipts"]
 639	end
 640
 641	def self.add_gateway_feature(feature)
 642		@gateway_features << feature
 643		@gateway_features.uniq!
 644	end
 645
 646	subscription :request? do |p|
 647		# subscriptions are allowed from anyone - send reply immediately
 648		msg = Blather::Stanza::Presence.new
 649		msg.to = p.from
 650		msg.from = p.to
 651		msg.type = :subscribed
 652
 653		write_to_stream msg
 654
 655		# send a <presence> immediately; not automatically probed for it
 656		# TODO: refactor so no "presence :probe? do |p|" duplicate below
 657		caps = Blather::Stanza::Capabilities.new
 658		# TODO: user a better node URI (?)
 659		caps.node = 'http://catapult.sgx.soprani.ca/'
 660		caps.identities = user_cap_identities
 661		caps.features = user_cap_features
 662
 663		msg = caps.c
 664		msg.to = p.from
 665		msg.from = p.to.to_s + '/sgx'
 666
 667		write_to_stream msg
 668
 669		# need to subscribe back so Conversations displays images inline
 670		msg = Blather::Stanza::Presence.new
 671		msg.to = p.from.to_s.split('/', 2)[0]
 672		msg.from = p.to.to_s.split('/', 2)[0]
 673		msg.type = :subscribe
 674
 675		write_to_stream msg
 676	end
 677
 678	presence :probe? do |p|
 679		caps = Blather::Stanza::Capabilities.new
 680		# TODO: user a better node URI (?)
 681		caps.node = 'http://catapult.sgx.soprani.ca/'
 682		caps.identities = user_cap_identities
 683		caps.features = user_cap_features
 684
 685		msg = caps.c
 686		msg.to = p.from
 687		msg.from = p.to.to_s + '/sgx'
 688
 689		write_to_stream msg
 690	end
 691
 692	disco_items(
 693		to: Blather::JID.new(ARGV[0]),
 694		node: "http://jabber.org/protocol/commands"
 695	) do |i|
 696		fetch_catapult_cred_for(i.from).then { |creds|
 697			BandwidthTNOptions.tn_eligible_for_port_out_pin?(creds).then { |eligible|
 698				reply = i.reply
 699				reply.node = 'http://jabber.org/protocol/commands'
 700
 701				if eligible
 702					reply.items = [
 703						Blather::Stanza::DiscoItems::Item.new(
 704							i.to,
 705							'set-port-out-pin',
 706							'Set Port-Out PIN'
 707						)
 708					]
 709				else
 710					reply.items = []
 711				end
 712
 713				write_to_stream reply
 714			}
 715		}.catch { |e|
 716			if e.is_a?(Array) && [2, 3].include?(e.length)
 717				write_to_stream i.as_error(e[1], e[0], e[2])
 718			else
 719				EMPromise.reject(e)
 720			end
 721		}
 722	end
 723
 724	iq '/iq/ns:query', ns:	'http://jabber.org/protocol/disco#info' do |i|
 725		# TODO: return error if i.type is :set - if it is :reply or
 726		#  :error it should be ignored (as the below does currently);
 727		#  review specification to see how to handle other type values
 728		if i.type != :get
 729			LOG.warn("Ignoring non-get disco IQ", type: i.type.to_s, stanza: i.inspect)
 730			next
 731		end
 732
 733		# respond to capabilities request for an sgx-bwmsgsv2 number JID
 734		if i.to.node
 735			# TODO: confirm the node URL is expected using below
 736			#puts "XR[node]: #{xpath_result[0]['node']}"
 737
 738			msg = i.reply
 739			msg.node = i.node
 740			msg.identities = user_cap_identities
 741			msg.features = user_cap_features
 742
 743			write_to_stream msg
 744			next
 745		end
 746
 747		# respond to capabilities request for sgx-bwmsgsv2 itself
 748		msg = i.reply
 749		msg.node = i.node
 750		msg.identities = [{
 751			name: 'Soprani.ca Gateway to XMPP - Bandwidth API V2',
 752			type: 'sms', category: 'gateway'
 753		}]
 754		msg.features = @gateway_features
 755		write_to_stream msg
 756	end
 757
 758	def self.check_then_register(i, *creds)
 759		@registration_repo
 760			.put(i.from, *creds)
 761			.catch_only(RegistrationRepo::Conflict) { |e|
 762				EMPromise.reject([:cancel, 'conflict', e.message])
 763			}.then {
 764				write_to_stream i.reply
 765			}
 766	end
 767
 768	def self.creds_from_registration_query(i)
 769		if i.query.find_first("./ns:x", ns: "jabber:x:data")
 770			[
 771				i.form.field("nick")&.value,
 772				i.form.field("username")&.value,
 773				i.form.field("password")&.value,
 774				i.form.field("phone")&.value
 775			]
 776		else
 777			[i.nick, i.username, i.password, i.phone]
 778		end
 779	end
 780
 781	def self.process_registration(i)
 782		EMPromise.resolve(nil).then {
 783			if i.remove?
 784				@registration_repo.delete(i.from).then do
 785					write_to_stream i.reply
 786					EMPromise.reject(:done)
 787				end
 788			else
 789				creds_from_registration_query(i)
 790			end
 791		}.then { |user_id, api_token, api_secret, phone_num|
 792			if phone_num && phone_num[0] == '+'
 793				[user_id, api_token, api_secret, phone_num]
 794			else
 795				# TODO: add text re number not (yet) supported
 796				EMPromise.reject([:cancel, 'item-not-found'])
 797			end
 798		}.then { |user_id, api_token, api_secret, phone_num|
 799			# TODO: find way to verify #{phone_num}, too
 800			call_catapult(
 801				api_token,
 802				api_secret,
 803				:get,
 804				"api/v2/users/#{user_id}/media"
 805			).then { |response|
 806				JSON.parse(response)
 807				# TODO: confirm response is array - could be empty
 808
 809				LOG.debug("Registration verify response", response: response.to_s[0..999])
 810
 811				check_then_register(
 812					i,
 813					user_id,
 814					api_token,
 815					api_secret,
 816					phone_num
 817				)
 818			}
 819		}.catch_only(BandwidthError) { |e|
 820			EMPromise.reject(case e.code
 821			when 401
 822				# TODO: add text re bad credentials
 823				[:auth, 'not-authorized']
 824			when 404
 825				# TODO: add text re number not found or disabled
 826				[:cancel, 'item-not-found']
 827			else
 828				[:modify, 'not-acceptable']
 829			end)
 830		}
 831	end
 832
 833	def self.registration_form(orig, existing_number=nil)
 834		orig.registered = !!existing_number
 835
 836		# TODO: update "User Id" x2 below (to "accountId"?), and others?
 837		orig.instructions = "Enter the information from your Account "\
 838			"page as well as the Phone Number\nin your "\
 839			"account you want to use (ie. '+12345678901')"\
 840			".\nUser Id is nick, API Token is username, "\
 841			"API Secret is password, Phone Number is phone"\
 842			".\n\nThe source code for this gateway is at "\
 843			"https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
 844			"\nCopyright (C) 2017-2020  Denver Gingerich "\
 845			"and others, licensed under AGPLv3+."
 846		orig.nick = ""
 847		orig.username = ""
 848		orig.password = ""
 849		orig.phone = existing_number.to_s
 850
 851		orig.form.fields = [
 852			{
 853				required: true, type: :"text-single",
 854				label: 'User Id', var: 'nick'
 855			},
 856			{
 857				required: true, type: :"text-single",
 858				label: 'API Token', var: 'username'
 859			},
 860			{
 861				required: true, type: :"text-private",
 862				label: 'API Secret', var: 'password'
 863			},
 864			{
 865				required: true, type: :"text-single",
 866				label: 'Phone Number', var: 'phone',
 867				value: existing_number.to_s
 868			}
 869		]
 870		orig.form.title = 'Register for '\
 871			'Soprani.ca Gateway to XMPP - Bandwidth API V2'
 872		orig.form.instructions = "Enter the details from your Account "\
 873			"page as well as the Phone Number\nin your "\
 874			"account you want to use (ie. '+12345678901')"\
 875			".\n\nThe source code for this gateway is at "\
 876			"https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
 877			"\nCopyright (C) 2017-2020  Denver Gingerich "\
 878			"and others, licensed under AGPLv3+."
 879
 880		orig
 881	end
 882
 883	ibr do |i|
 884		case i.type
 885		when :set
 886			process_registration(i)
 887		when :get
 888			bare_jid = i.from.stripped
 889			@registration_repo.find(bare_jid).then { |creds|
 890				reply = registration_form(i.reply, creds.last)
 891				write_to_stream reply
 892			}
 893		else
 894			# Unknown IQ, ignore for now
 895			EMPromise.reject(:done)
 896		end.catch { |e|
 897			if e.is_a?(Array) && (e.length == 2 || e.length == 3)
 898				write_to_stream i.as_error(e[1], e[0], e[2])
 899			elsif e != :done
 900				EMPromise.reject(e)
 901			end
 902		}.catch(&method(:panic))
 903	end
 904
 905	command :execute?, node: "set-port-out-pin", sessionid: nil do |iq|
 906		# Ensure user is registered, but discard their credentials
 907		# because we're just showing them a form.
 908		fetch_catapult_cred_for(iq.from).then { |_creds|
 909			reply = iq.reply
 910			reply.node = 'set-port-out-pin'
 911			reply.sessionid = SecureRandom.uuid
 912			reply.status = :executing
 913
 914			form = Blather::Stanza::X.find_or_create(reply.command)
 915			form.type = "form"
 916			form.fields = [
 917				{
 918					var: 'pin',
 919					type: 'text-private',
 920					label: 'Port-Out PIN',
 921					required: true
 922				},
 923				{
 924					var: 'confirm_pin',
 925					type: 'text-private',
 926					label: 'Confirm PIN',
 927					required: true
 928				}
 929			]
 930
 931			reply.command.add_child(form)
 932			reply.allowed_actions = [:complete]
 933
 934			write_to_stream reply
 935		}.catch { |e|
 936			if e.is_a?(Array) && [2, 3].include?(e.length)
 937				write_to_stream iq.as_error(e[1], e[0], e[2])
 938			else
 939				EMPromise.reject(e)
 940			end
 941		}.catch(&method(:panic))
 942	end
 943
 944	command :complete?, node: "set-port-out-pin", sessionid: /./ do |iq|
 945		pin = iq.form.field('pin')&.value
 946		confirm_pin = iq.form.field('confirm_pin')&.value
 947
 948		if pin.nil? || confirm_pin.nil?
 949			write_to_stream iq.as_error(
 950				'bad-request',
 951				:modify,
 952				'PIN fields are required'
 953			)
 954			next
 955		end
 956
 957		if pin != confirm_pin
 958			write_to_stream iq.as_error(
 959				'bad-request',
 960				:modify,
 961				'PIN confirmation does not match'
 962			)
 963			next
 964		end
 965
 966		if pin !~ /\A[a-zA-Z0-9]{4,10}\z/
 967			write_to_stream iq.as_error(
 968				'bad-request',
 969				:modify,
 970				'PIN must be 4-10 alphanumeric characters'
 971			)
 972			next
 973		end
 974
 975		fetch_catapult_cred_for(iq.from).then { |creds|
 976			BandwidthTNOptions.set_port_out_pin(creds, pin).then {
 977				reply = iq.reply
 978				reply.node = 'set-port-out-pin'
 979				reply.sessionid = iq.sessionid
 980				reply.status = :completed
 981				reply.note_type = :info
 982				reply.note_text = 'Port-out PIN has been set successfully.'
 983
 984				write_to_stream reply
 985			}.catch { |e|
 986				reply = iq.reply
 987				reply.node = 'set-port-out-pin'
 988				reply.sessionid = iq.sessionid
 989				reply.status = :completed
 990				reply.note_type = :error
 991				error_msg = if e.respond_to?(:message) && e.message.include?('not valid')
 992					"Invalid phone number format. "\
 993					"Please check your registered phone number."
 994				elsif e.respond_to?(:message) && e.message.include?('ErrorCode')
 995					"Bandwidth API error: #{e.message}"
 996				else
 997					"Failed to set port-out PIN. Please try again later."
 998				end
 999				reply.note_text = error_msg
1000
1001				write_to_stream reply
1002			}
1003		}.catch { |e|
1004			if e.is_a?(Array) && [2, 3].include?(e.length)
1005				write_to_stream iq.as_error(e[1], e[0], e[2])
1006			else
1007				EMPromise.reject(e)
1008			end
1009		}.catch(&method(:panic))
1010	end
1011
1012	iq type: [:get, :set] do |iq|
1013		write_to_stream(Blather::StanzaError.new(
1014			iq,
1015			'feature-not-implemented',
1016			:cancel
1017		))
1018	end
1019end
1020
1021class ReceiptMessage < Blather::Stanza
1022	def self.new(to=nil)
1023		node = super :message
1024		node.to = to
1025		node
1026	end
1027end
1028
1029class WebhookHandler < Goliath::API
1030	use Sentry::Rack::CaptureExceptions
1031	use Goliath::Rack::Params
1032
1033	def response(env)
1034		@registration_repo = RegistrationRepo.new
1035		# TODO: add timestamp grab here, and MUST include ./tai version
1036
1037		LOG.debug("Webhook request env", env: env.reject { |k| k == 'params' })
1038
1039		if params.empty?
1040			LOG.warn "Empty webhook params"
1041			return [200, {}, "OK"]
1042		end
1043
1044		if env['REQUEST_URI'] != '/'
1045			LOG.warn("Non-/ request", uri: env['REQUEST_URI'], method: env['REQUEST_METHOD'])
1046			return [200, {}, "OK"]
1047		end
1048
1049		if env['REQUEST_METHOD'] != 'POST'
1050			LOG.warn("Non-POST request", uri: env['REQUEST_URI'], method: env['REQUEST_METHOD'])
1051			return [200, {}, "OK"]
1052		end
1053
1054		# TODO: process each message in list, not just first one
1055		jparams = params.dig('_json', 0, 'message')
1056		type = params.dig('_json', 0, 'type')
1057
1058		return [400, {}, "Missing params\n"] unless jparams && type
1059
1060		users_num, others_num =
1061		if type == 'message-failed' # NOTE: This implies direction == 'out'
1062			[jparams['from'], params['_json'][0]['to']]
1063		elsif jparams['direction'] == 'in'
1064			[jparams['owner'], jparams['from']]
1065		elsif jparams['direction'] == 'out'
1066			[jparams['from'], jparams['owner']] # NOTE: for outbound, 'from' == 'owner'
1067		else
1068			LOG.error("Unexpected message direction", direction: jparams['direction'])
1069			[jparams['from'], jparams['owner']]
1070		end
1071
1072		return [400, {}, "Missing params\n"] unless users_num && others_num
1073		return [400, {}, "Missing params\n"] unless jparams['to'].is_a?(Array)
1074		return [400, {}, "Missing params\n"] if jparams['to'].empty?
1075
1076		LOG.info(
1077			"Webhook message",
1078			message_id: jparams['id'],
1079			event_type: type,
1080			time: jparams['time'],
1081			direction: jparams['direction'],
1082			delivery_state: jparams['deliveryState'],
1083			error_code: jparams['errorCode'],
1084			description: jparams['description'],
1085			tag: jparams['tag'],
1086			media: jparams['media']
1087		)
1088
1089		if others_num[0] != '+'
1090			# TODO: check that others_num actually a shortcode first
1091			others_num +=
1092				';phone-context=ca-us.phone-context.soprani.ca'
1093		end
1094
1095		bare_jid = @registration_repo.find_jid(users_num).sync
1096
1097		if !bare_jid
1098			LOG.warn("JID not found for number, BW API misconfigured?", users_num: users_num)
1099
1100			return [403, {}, "Customer not found\n"]
1101		end
1102
1103		msg = nil
1104		case jparams['direction']
1105		when 'in'
1106			text = ''
1107			case type
1108			when 'message-received'
1109				# TODO: handle group chat, and fix above
1110				text = jparams['text']
1111
1112				if text.to_s.empty? && Array(jparams['media']).empty?
1113					return [400, {}, "Missing params\n"]
1114				end
1115
1116				if jparams['to'].length > 1
1117					msg = Blather::Stanza::Message.new(
1118						Blather::JID.new(bare_jid).domain
1119					)
1120					msg.body = text unless text&.empty?
1121
1122					addrs = Nokogiri::XML::Node.new(
1123						'addresses', msg.document)
1124					addrs['xmlns'] = 'http://jabber.org/' \
1125						'protocol/address'
1126
1127					addr1 = Nokogiri::XML::Node.new(
1128						'address', msg.document)
1129					addr1['type'] = 'to'
1130					addr1['jid'] = bare_jid
1131					addrs.add_child(addr1)
1132
1133					jparams['to'].reject(
1134						# Don't send to the same person twice,
1135						# and don't send to the person who sent it
1136						&[users_num, others_num].method(:include?)
1137					).each do |receiver|
1138						addrn = Nokogiri::XML::Node.new(
1139							'address', msg.document)
1140						addrn['type'] = 'to'
1141						addrn['uri'] = "sms:#{receiver}"
1142						addrn['delivered'] = 'true'
1143						addrs.add_child(addrn)
1144					end
1145
1146					msg.add_child(addrs)
1147				end
1148
1149				media_urls = Array(jparams['media']).map { |media_url|
1150					unless media_url.end_with?(
1151						'.smil', '.txt', '.xml'
1152					)
1153						SGXbwmsgsv2.send_media(
1154							others_num + '@' +
1155							ARGV[0],
1156							bare_jid, media_url,
1157							nil, nil, msg
1158						)
1159						media_url
1160					end
1161				}.compact
1162
1163				if text&.empty? || (media_urls.any? && jparams['to'].length > 1)
1164					if !env['HTTP_X_JMP_RESEND_OF'].to_s.empty?
1165						MessageEvent::ResendIn.new(
1166							original_stream_id: env['HTTP_X_JMP_RESEND_OF'],
1167							original_bandwidth_id: jparams['id'],
1168							owner: jparams['owner']
1169						).emit(REDIS)
1170					else
1171						MessageEvent::In.new(
1172							timestamp: jparams['time'],
1173							from: jparams['from'],
1174							to: jparams['to'],
1175							owner: jparams['owner'],
1176							bandwidth_id: jparams['id'],
1177							body: jparams['text'].to_s,
1178							media_urls: media_urls
1179						).emit(REDIS)
1180					end
1181
1182					return [200, {}, "OK"]
1183				end
1184			else
1185				text = "unknown type (#{type})" \
1186					" with text: #{jparams['text']}"
1187
1188				# TODO: log/notify of this properly
1189				LOG.warn("Unknown inbound message type", text: text)
1190			end
1191
1192			# If text is not empty, but there isn't a msg,
1193			# we need to construct a msg to convey that text
1194			unless msg || text.to_s.empty?
1195				msg = Blather::Stanza::Message.new(
1196					bare_jid,
1197					# Strip control codes.
1198					# This only happened, or at least caused a problem, once,
1199					# but it seems sensible to say that we shouldn't be getting
1200					# control codes from the PSTN.
1201					text.gsub(/[\u0000-\u0008\u000b-\u001f]/, "")
1202				)
1203				msg.document.encoding = "utf-8"
1204				msg.chat_state = nil
1205			end
1206		else # per prior switch, this is:  jparams['direction'] == 'out'
1207			tag_parts = jparams['tag'].split(/ /, 2)
1208			id = WEBrick::HTTPUtils.unescape(tag_parts[0])
1209			resourcepart = WEBrick::HTTPUtils.unescape(tag_parts[1])
1210
1211			# TODO: remove this hack
1212			if jparams['to'].length > 1
1213				case type
1214				when 'message-failed'
1215					MessageEvent::Failed.new(
1216						timestamp: jparams['time'],
1217						stanza_id: id,
1218						bandwidth_id: jparams['id'],
1219						error_code: jparams['errorCode'].to_s,
1220						error_description: jparams['description'].to_s
1221					).emit(REDIS)
1222				when 'message-delivered'
1223					MessageEvent::Delivered.new(
1224						timestamp: jparams['time'],
1225						stanza_id: id,
1226						bandwidth_id: jparams['id']
1227					).emit(REDIS)
1228				end
1229				LOG.warn("Group message, skipping receipt", users_num: users_num)
1230				return [200, {}, "OK"]
1231			end
1232
1233			case type
1234			when 'message-failed'
1235				# create a bare message like the one user sent
1236				msg = Blather::Stanza::Message.new(
1237					others_num + '@' + ARGV[0])
1238				msg.from = bare_jid + '/' + resourcepart
1239				msg['id'] = id
1240
1241				# TODO: add 'errorCode' and/or 'description' val
1242				# create an error reply to the bare message
1243				msg = msg.as_error(
1244					'recipient-unavailable',
1245					:wait,
1246					params['_json'][0]['description']
1247				)
1248
1249			when 'message-delivered'
1250
1251				msg = ReceiptMessage.new(bare_jid)
1252
1253				# TODO: put in member/instance variable
1254				msg['id'] = SecureRandom.uuid
1255
1256				# TODO: send only when requested per XEP-0184
1257				rcvd = Nokogiri::XML::Node.new(
1258					'received',
1259					msg.document
1260				)
1261				rcvd['xmlns'] = 'urn:xmpp:receipts'
1262				rcvd['id'] = id
1263				msg.add_child(rcvd)
1264
1265				# TODO: make prettier: this should be done above
1266				others_num = params['_json'][0]['to']
1267			else
1268				# TODO: notify somehow of unknown state receivd?
1269				LOG.warn("Unknown outbound message type", id: id, type: type)
1270				return [200, {}, "OK"]
1271			end
1272
1273			# Keeping this due to the `msg.from=` shuffle just below
1274			LOG.debug("Outbound callback response", stanza: msg.inspect)
1275		end
1276
1277		# if message-failed, we already set msg.from
1278		# moreover, we said `msg = msg.as_error`, and StanzaError
1279		msg.from = others_num + '@' + ARGV[0] if msg.respond_to?(:from=)
1280		SGXbwmsgsv2.write(msg)
1281
1282		# Emit event to messages stream
1283		case [jparams['direction'], type]
1284		when ['in', 'message-received']
1285			if !env['HTTP_X_JMP_RESEND_OF'].to_s.empty?
1286				MessageEvent::ResendIn.new(
1287					original_stream_id: env['HTTP_X_JMP_RESEND_OF'],
1288					original_bandwidth_id: jparams['id'],
1289					owner: jparams['owner']
1290				).emit(REDIS)
1291			else
1292				media_urls = Array(jparams['media']).reject { |u|
1293					u.end_with?('.smil', '.txt', '.xml')
1294				}
1295				MessageEvent::In.new(
1296					timestamp: jparams['time'],
1297					from: jparams['from'],
1298					to: jparams['to'],
1299					owner: jparams['owner'],
1300					bandwidth_id: jparams['id'],
1301					body: jparams['text'].to_s,
1302					media_urls: media_urls
1303				).emit(REDIS)
1304			end
1305		when ['out', 'message-failed']
1306			tag_parts = jparams['tag'].split(/ /, 2)
1307			stanza_id = WEBrick::HTTPUtils.unescape(tag_parts[0])
1308			MessageEvent::Failed.new(
1309				timestamp: jparams['time'],
1310				stanza_id: stanza_id,
1311				bandwidth_id: jparams['id'],
1312				error_code: jparams['errorCode'].to_s,
1313				error_description: jparams['description'].to_s
1314			).emit(REDIS)
1315		when ['out', 'message-delivered']
1316			tag_parts = jparams['tag'].split(/ /, 2)
1317			stanza_id = WEBrick::HTTPUtils.unescape(tag_parts[0])
1318			MessageEvent::Delivered.new(
1319				timestamp: jparams['time'],
1320				stanza_id: stanza_id,
1321				bandwidth_id: jparams['id']
1322			).emit(REDIS)
1323		end
1324
1325		[200, {}, "OK"]
1326	rescue Exception => e
1327		Sentry.capture_exception(e)
1328		[500, {}, "Error"]
1329	end
1330end
1331
1332at_exit do
1333	LOG.info("Soprani.ca/SMS Gateway for XMPP - Bandwidth API V2",
1334		version: `git rev-parse HEAD`.chomp)
1335
1336	if ARGV.size != 7
1337		warn "Usage: sgx-bwmsgsv2.rb <component_jid> "\
1338			"<component_password> <server_hostname> "\
1339			"<server_port> <application_id> "\
1340			"<http_listen_port> <mms_proxy_prefix_url>"
1341		exit 0
1342	end
1343
1344	LOG.info "Starting"
1345
1346	EM.run do
1347		REDIS = EM::Hiredis.connect
1348
1349		SGXbwmsgsv2.run
1350
1351		# required when using Prosody otherwise disconnects on 6-hour inactivity
1352		EM.add_periodic_timer(3600) do
1353			msg = Blather::Stanza::Iq::Ping.new(:get, 'localhost')
1354			msg.from = ARGV[0]
1355			SGXbwmsgsv2.write(msg)
1356		end
1357
1358		server = Goliath::Server.new('0.0.0.0', ARGV[5].to_i)
1359		server.api = WebhookHandler.new
1360		server.app = Goliath::Rack::Builder.build(server.api.class, server.api)
1361		server.logger = LOG
1362		server.start do
1363			["INT", "TERM"].each do |sig|
1364				trap(sig) do
1365					EM.defer do
1366						LOG.info "Shutting down gateway"
1367						SGXbwmsgsv2.shutdown
1368
1369						LOG.info "Gateway has terminated"
1370						EM.stop
1371					end
1372				end
1373			end
1374		end
1375	end
1376end unless ENV['ENV'] == 'test'