sgx-bwmsgsv2.rb

   1#!/usr/bin/env ruby
   2# frozen_string_literal: true
   3
   4# Copyright (C) 2017-2020  Denver Gingerich <denver@ossguy.com>
   5# Copyright (C) 2017  Stephen Paul Weber <singpolyma@singpolyma.net>
   6#
   7# This file is part of sgx-bwmsgsv2.
   8#
   9# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
  10# the terms of the GNU Affero General Public License as published by the Free
  11# Software Foundation, either version 3 of the License, or (at your option) any
  12# later version.
  13#
  14# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
  15# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  16# FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more
  17# details.
  18#
  19# You should have received a copy of the GNU Affero General Public License along
  20# with sgx-bwmsgsv2.  If not, see <http://www.gnu.org/licenses/>.
  21
  22require 'delegate'
  23require 'blather/client/dsl'
  24require 'em-hiredis'
  25require 'em-http-request'
  26require 'json'
  27require 'multibases'
  28require 'multihashes'
  29require 'securerandom'
  30require "sentry-ruby"
  31require 'time'
  32require 'uri'
  33require 'webrick'
  34
  35require 'goliath/api'
  36require 'goliath/server'
  37require "ougai"
  38
  39require 'em_promise'
  40require 'em-synchrony'
  41
  42require_relative 'lib/bandwidth_error'
  43require_relative 'lib/bandwidth_tn_options'
  44require_relative 'lib/message_event'
  45require_relative 'lib/registration_repo'
  46
  47Sentry.init
  48
  49require_relative 'lib/background_log'
  50
  51$stdout.sync = true
  52LOG = Ougai::Logger.new(ENV["ENV"] == "test" ? $stdout : BackgroundLog.new($stdout))
  53LOG.level = ENV.fetch("LOG_LEVEL", "info")
  54LOG.formatter = Ougai::Formatters::Readable.new(
  55	nil,
  56	nil,
  57	plain: !$stdout.isatty
  58)
  59Blather.logger = LOG
  60EM::Hiredis.logger = LOG
  61
  62Sentry.init do |config|
  63	config.logger = LOG
  64	config.breadcrumbs_logger = [:sentry_logger]
  65end
  66
  67BADWORD_LIST = [
  68	"marijuana",
  69	"psilocybin",
  70	"cannabis",
  71	"cocaine",
  72	"heroin",
  73	"meth",
  74	"methamphetamine",
  75	"methamphetamines",
  76	"cigarette",
  77	"tobacco",
  78	"cbd",
  79	"thc",
  80	"morphine",
  81	"incall",
  82	"in-call",
  83	"outcall",
  84	"out-call",
  85	"shrooms",
  86	"lsd",
  87	"kratom",
  88	"mdma",
  89	"addy",
  90	"xanz",
  91	"cialis",
  92	"viagra",
  93	"bbfs",
  94	"fentanyl",
  95	"opium",
  96	"golden teacher",
  97	"bbbj",
  98	"canna",
  99	"fuck",
 100	"xanax",
 101	"zarareturns",
 102	"zarareturns.com",
 103	"plantation",
 104].freeze
 105
 106BADWORDS = Regexp.union(
 107	BADWORD_LIST.map { |w| /\b#{Regexp.escape(w)}\b/ }
 108)
 109
 110WHISPER_NUMBER = /\A\+?1?2266669977\z/.freeze
 111
 112# List of supported MIME types from Bandwidth - https://support.bandwidth.com/hc/en-us/articles/360014128994-What-MMS-file-types-are-supported-
 113MMS_MIME_TYPES = [
 114	"application/json",
 115	"application/ogg",
 116	"application/pdf",
 117	"application/rtf",
 118	"application/zip",
 119	"application/x-tar",
 120	"application/xml",
 121	"application/gzip",
 122	"application/x-bzip2",
 123	"application/x-gzip",
 124	"application/smil",
 125	"application/javascript",
 126	"audio/mp4",
 127	"audio/mpeg",
 128	"audio/ogg",
 129	"audio/flac",
 130	"audio/webm",
 131	"audio/wav",
 132	"audio/amr",
 133	"audio/3gpp",
 134	"image/bmp",
 135	"image/gif",
 136	"image/jpeg",
 137	"image/pjpeg",
 138	"image/png",
 139	"image/svg+xml",
 140	"image/tiff",
 141	"image/webp",
 142	"image/x-icon",
 143	"text/css",
 144	"text/csv",
 145	"text/calendar",
 146	"text/plain",
 147	"text/javascript",
 148	"text/vcard",
 149	"text/vnd.wap.wml",
 150	"text/xml",
 151	"video/avi",
 152	"video/mp4",
 153	"video/mpeg",
 154	"video/ogg",
 155	"video/quicktime",
 156	"video/webm",
 157	"video/x-ms-wmv",
 158	"video/x-flv"
 159]
 160
 161# 1 MB
 162MAX_MEDIA_SIZE = 1000000
 163
 164def panic(e)
 165	EMPromise.resolve(nil).then {
 166		if e.is_a?(Exception)
 167			Sentry.capture_exception(e, hint: { background: false })
 168		else
 169			Sentry.capture_message(e.to_s, hint: { background: false })
 170		end
 171		LOG.fatal("Shutting down gateway", e)
 172		SGXbwmsgsv2.shutdown
 173		LOG.info "Gateway has terminated"
 174		EM.stop
 175	}
 176end
 177
 178EM.error_handler(&method(:panic))
 179
 180def extract_shortcode(dest)
 181	num, context = dest.split(';', 2)
 182	num if context == 'phone-context=ca-us.phone-context.soprani.ca'
 183end
 184
 185def anonymous_tel?(dest)
 186	dest.split(';', 2)[1] == 'phone-context=anonymous.phone-context.soprani.ca'
 187end
 188
 189class SGXClient < Blather::Client
 190	def handle_data(stanza)
 191		promise = EMPromise.resolve(nil).then {
 192			with_sentry(stanza) do |scope|
 193				super
 194			rescue StandardError => e
 195				handle_error(scope, stanza, e)
 196			end
 197		}.catch { |e| panic(e) }
 198		promise.sync if ENV["ENV"] == "test"
 199		promise
 200	end
 201
 202	# Override the default call_handler to syncify during testing.
 203	def call_handler(handler, guards, stanza)
 204		result = if guards.first.respond_to?(:to_str)
 205			found = stanza.find(*guards)
 206			throw :pass if found.empty?
 207
 208			handler.call(stanza, found)
 209		else
 210			throw :pass if guarded?(guards, stanza)
 211
 212			handler.call(stanza)
 213		end
 214
 215		# Up to here, identical to upstream impl
 216
 217		return result unless result.is_a?(Promise)
 218
 219		result.sync if ENV["ENV"] == "test"
 220		result
 221	end
 222
 223protected
 224
 225	def with_sentry(stanza)
 226		Sentry.clone_hub_to_current_thread
 227
 228		Sentry.with_scope do |scope|
 229			setup_scope(stanza, scope)
 230			yield scope
 231		ensure
 232			scope.get_transaction&.then do |tx|
 233				tx.set_status("ok") unless tx.status
 234				tx.finish
 235			end
 236		end
 237	end
 238
 239	def setup_scope(stanza, scope)
 240		name = stanza.respond_to?(:node) ? stanza.node : stanza.name
 241		scope.clear_breadcrumbs
 242		scope.set_transaction_name(name)
 243		scope.set_user(jid: stanza.from&.stripped.to_s)
 244
 245		transaction = Sentry.start_transaction(
 246			name: name,
 247			op: "blather.handle_data"
 248		)
 249		scope.set_span(transaction) if transaction
 250	end
 251
 252	def handle_error(scope, stanza, e)
 253		LOG.error("Error during #{scope.transaction_name}", e)
 254		Sentry.capture_exception(e) unless e.is_a?(Sentry::Error)
 255		scope.get_transaction&.set_status("internal_error")
 256		return if e.respond_to?(:replied?) && e.replied?
 257
 258		SGXbwmsgsv2.write_to_stream stanza.as_error("internal-server-error", :cancel)
 259	end
 260end
 261
 262# TODO: keep in sync with jmp-acct_bot.rb, and eventually put in common location
 263module CatapultSettingFlagBits
 264	VOICEMAIL_TRANSCRIPTION_DISABLED = 0
 265	MMS_ON_OOB_URL = 1
 266end
 267
 268module SGXbwmsgsv2
 269	extend Blather::DSL
 270
 271	@registration_repo = RegistrationRepo.new
 272	@client = SGXClient.new
 273	@gateway_features = [
 274		"http://jabber.org/protocol/disco#info",
 275		"http://jabber.org/protocol/address/",
 276		"jabber:iq:register",
 277		"http://jabber.org/protocol/commands"
 278	]
 279
	# Starts the module's client by delegating to its #run.
	def self.run
		# TODO: read/save ARGV[7] creds to local variables
		client.run
	end
 284
	# Public entry point so code outside this module can send stanzas through
	# the module's client as well.
	def self.write(stanza)
		client.write(stanza)
	end
 289
	# Sends a media attachment to an XMPP recipient as an XEP-0066 (OOB) URL,
	# with a urn:xmpp:hints store hint attached.
	#
	# from      - value assigned to the outgoing message's from
	# to        - recipient; the caller must guarantee this is a bare JID
	# media_url - media URL; Bandwidth v2 user media URLs are rewritten to
	#             ARGV[6] + escaped recipient + "/" + media path
	# desc      - optional text for the OOB desc element
	# subject   - optional message subject
	# m         - optional message to copy instead of building a new one
	#
	# Any exception raised along the way is handed to panic.
	def self.send_media(from, to, media_url, desc=nil, subject=nil, m=nil)
		LOG.debug("Original media URL", url: media_url)
		if media_url.start_with?('https://messaging.bandwidth.com/api/v2/users/')
			# we assume the URL looks like (always the case so far):
			#  https://messaging.bandwidth.com/api/v2/users/[u]/media/[path]
			# so the ninth '/'-separated field is everything after "media/"
			media_path = media_url.split('/', 9)[8]
			media_url = ARGV[6] + WEBrick::HTTPUtils.escape(to) + '/' + media_path
			LOG.debug("Proxied media URL", url: media_url)
		end

		msg = m ? m.copy : Blather::Stanza::Message.new(to)
		msg.from = from
		msg.subject = subject if subject

		doc = msg.document
		# builds an element named tag holding a single text node
		text_element = lambda { |tag, content|
			element = Nokogiri::XML::Node.new(tag, doc)
			element.add_child(Nokogiri::XML::Text.new(content, doc))
			element
		}

		# provide URL in XEP-0066 (OOB) fashion
		oob = Nokogiri::XML::Node.new('x', doc)
		oob['xmlns'] = 'jabber:x:oob'
		oob.add_child(text_element.call('url', media_url))
		oob.add_child(text_element.call('desc', desc)) if desc
		msg.add_child(oob)

		store = Nokogiri::XML::Node.new('store', doc)
		store['xmlns'] = 'urn:xmpp:hints'
		msg.add_child(store)

		write(msg)
	rescue Exception => e
		panic(e)
	end
 333
 334	setup ARGV[0], ARGV[1], ARGV[2], ARGV[3], nil, nil, async: true
 335
	# Delivers a message directly to another user of this gateway instead of
	# going through Bandwidth, records a pass-through event, and sends a
	# delivery receipt back to the original sender.
	#
	# m         - the incoming message (its to/from are rewritten in place)
	# users_num - the sender's number, used as the node of the new from JID
	# jid       - JID of the destination user
	#
	# Returns true.
	def self.pass_on_message(m, users_num, jid)
		# Everything derived from the original addressing must be captured
		# before m.to / m.from are overwritten below.
		dest_num = m.to.node

		# setup delivery receipt; similar to a reply
		receipt = ReceiptMessage.new(m.from.stripped)
		receipt.from = m.to

		# pass original message (before sending receipt)
		m.to = jid
		m.from = "#{users_num}@#{ARGV[0]}"
		write_to_stream m

		# Emit pass-through event. Thru events don't capture a timestamp because
		# XMPP stanzas don't carry timestamps for realtime messages, and the
		# Redis stream ID provides the emit time.
		oob_url = m.at("oob|x > oob|url", oob: "jabber:x:oob")&.text
		event = MessageEvent::Thru.new(
			owner: users_num,
			from: users_num,
			to: [dest_num],
			stanza_id: m.id.to_s,
			body: m.body.to_s,
			media_urls: [oob_url].compact
		)
		event.emit(REDIS)

		# send a delivery receipt back to the sender
		# TODO: send only when requested per XEP-0184
		# TODO: pass receipts from target if supported

		# TODO: put in member/instance variable
		receipt['id'] = SecureRandom.uuid
		received = Nokogiri::XML::Node.new('received', receipt.document)
		received['xmlns'] = 'urn:xmpp:receipts'
		received['id'] = m.id
		receipt.add_child(received)
		write_to_stream receipt

		true
	end
 378
	# Issues an asynchronous HTTP request to the Bandwidth API.
	#
	# token, secret - passed as the request's Authorization pair
	# m             - HTTP verb (:get, :post, ...); dispatched as "a#{m}"
	# pth           - path such as "api/v2/users/#{user_id}/[endpoint_name]";
	#                 paths starting with "api/v2/users" are prefixed with the
	#                 messaging host, anything else is used as given
	# body          - optional request body
	# head          - extra request headers merged over Authorization
	# code          - list of HTTP status codes treated as success
	# respond_with  - :body, :headers, or anything else for the raw response
	#
	# Returns a promise for the selected part of the response; it rejects
	# with BandwidthError.for(status, body) on any other status code.
	def self.call_catapult(
		token, secret, m, pth, body=nil,
		head={}, code=[200], respond_with=:body
	)
		# TODO: need to make a separate thing for voice.bw.c eventually
		url_prefix =
			pth.start_with?('api/v2/users') ? 'https://messaging.bandwidth.com/' : ''

		request = EM::HttpRequest.new(
			URI.parse(url_prefix + pth), tls: {verify_peer: true}
		)
		headers = {'Authorization' => [token, secret]}.merge(head)

		request.public_send("a#{m}", head: headers, body: body).then { |http|
			status = http.response_header.status
			unless code.include?(status)
				next EMPromise.reject(BandwidthError.for(status, http.response))
			end

			case respond_with
			when :body then http.response
			when :headers then http.response_header
			else http
			end
		}
	end
 418
	# Sends stanza +s+ through Bandwidth, turning an XEP-0066 (OOB) URL into an
	# MMS attachment when that is possible and into a plain link in the text
	# otherwise.
	#
	# s        - the outgoing message stanza
	# num_dest - destination number (String) or list of numbers
	# user_id, token, secret - Bandwidth credentials, passed to to_catapult
	# usern    - the sending user's number
	#
	# The URL is sent as a link (not as media) when the destination is a
	# String that does not start with +1/1, when a HEAD request reports a
	# Content-Length above MAX_MEDIA_SIZE or a type outside MMS_MIME_TYPES,
	# or when the HEAD request or the MMS send itself fails.
	#
	# Returns whatever to_catapult returns (a promise).
	def self.to_catapult_possible_oob(s, num_dest, user_id, token, secret,
		usern)
		un = s.at("oob|x > oob|url", oob: "jabber:x:oob")
		unless un
			LOG.debug "No OOB URL node, processing as normal"
			return to_catapult(s, nil, num_dest, user_id, token,
				secret, usern)
		end
		LOG.debug "Found OOB URL node, checking MMS eligibility"

		url = un.text
		body = s.respond_to?(:body) ? s.body.to_s : ''

		# Sends the message with the URL as part of the text. Always starts
		# from the original body: a failed MMS attempt may already have
		# stripped the URL out of s.body, and it must be put back so the
		# link is not silently lost.
		send_as_link = lambda {
			if !body.include?(url)
				s.body = body.empty? ? url : "#{body}\n#{url}"
			elsif s.body.to_s != body
				s.body = body
			end
			to_catapult(s, nil, num_dest, user_id, token, secret, usern)
		}

		if num_dest.is_a?(String) && num_dest !~ /\A\+?1/
			return send_as_link.call
		end

		EM::HttpRequest.new(URI.parse(url), tls: {verify_peer: true}).ahead.then { |http|
			headers = http.response_header
			# Content-Type may carry parameters ("image/jpeg; charset=binary")
			# and is case-insensitive; compare only the bare media type.
			mime = headers["CONTENT_TYPE"].to_s.split(';', 2).first.to_s.strip.downcase

			# If content is too large, or MIME type is not supported, place
			# the link inside the body and do not send MMS.
			if headers["CONTENT_LENGTH"].to_i > MAX_MEDIA_SIZE ||
			   !MMS_MIME_TYPES.include?(mime)
				send_as_link.call
			else
				# Within MAX_MEDIA_SIZE and a supported type: attach as media.
				# some clients send URI in both body and the OOB url so delete
				s.body = body.sub(/\s*#{Regexp.escape(url)}\s*$/, '')

				LOG.debug("OOB MMS details", url: url, body: body.to_s.strip)
				to_catapult(s, url, num_dest, user_id, token, secret, usern)
			end
		}.catch { |e|
			LOG.error("Error sending MMS, falling back to sending link", error: e)
			send_as_link.call
		}
	end
 461
	# Submits one outgoing message to the Bandwidth messages endpoint and emits
	# a MessageEvent::Out once it has been accepted.
	#
	# s        - the originating stanza (id and sender resource go in the tag)
	# murl     - media URL to attach, or nil
	# num_dest - destination number (String) or list of numbers
	# user_id, token, secret - Bandwidth credentials
	# usern    - the sending user's number
	#
	# Returns a promise for the raw response body. It rejects with a
	# [type, condition] or [type, condition, text] array when the message is
	# empty, matches BADWORDS, contains U+2063, is sent from WHISPER_NUMBER,
	# or when the API call (or the event emit) fails.
	def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern)
		body = s.respond_to?(:body) ? s.body.to_s : ''
		if murl.to_s.empty? && body.strip.empty?
			return EMPromise.reject(
				[:modify, 'policy-violation']
			)
		end

		if body.downcase.match?(BADWORDS)
			return EMPromise.reject([
				:wait,
				'recipient-unavailable',
				'Single message blocked by carrier content policy, see https://blog.jmp.chat/b/sms-censorship for details'
			])
		end

		if body.match?(/\u2063/)
			return EMPromise.reject([
				:wait,
				'recipient-unavailable',
				'Please contact JMP support about your message'
			])
		end

		if usern.match?(WHISPER_NUMBER)
			return EMPromise.reject([
				:cancel,
				'recipient-unavailable',
				'Please register with a backend'
			])
		end

		# Long texts to a single +1 number are uploaded as a text file and sent
		# as media instead. num_dest is an Array for group sends, and
		# Array has no =~ on Ruby 3.2+, so check the type before matching.
		segment_size = body.ascii_only? ? 160 : 70
		single_plus_one_dest = num_dest.is_a?(String) && num_dest.match?(/\A\+?1/)
		if !murl && ENV["MMS_PATH"] && single_plus_one_dest && body.length > segment_size * 3
			file = Multibases.pack(
				'base58btc',
				Multihashes.encode(Digest::SHA256.digest(body), "sha2-256")
			).to_s
			File.open("#{ENV['MMS_PATH']}/#{file}", "w") { |fh| fh.write body }
			murl = "#{ENV['MMS_URL']}/#{file}.txt"
			body = ""
		end

		extra = {}
		extra[:media] = murl if murl

		call_catapult(
			token,
			secret,
			:post,
			"api/v2/users/#{user_id}/messages",
			JSON.dump(extra.merge(
				from: usern,
				to:   num_dest,
				text: body,
				applicationId:  ARGV[4],
				tag:
					# callbacks need id and resourcepart
					WEBrick::HTTPUtils.escape(s.id.to_s) +
					' ' +
					WEBrick::HTTPUtils.escape(
						s.from.resource.to_s
					)
			)),
			{'Content-Type' => 'application/json'},
			[201, 202]
		).then { |response|
			# The message has already been accepted at this point, so an
			# unparseable or unexpectedly shaped response must not turn into an
			# error reply; fall back to an empty hash instead.
			parsed =
				begin
					JSON.parse(response)
				rescue StandardError
					{}
				end
			parsed = {} unless parsed.is_a?(Hash)

			MessageEvent::Out.new(
				timestamp: parsed["time"] || Time.now,
				owner: usern,
				from: usern,
				to: Array(num_dest),
				stanza_id: s.id.to_s,
				bandwidth_id: parsed["id"],
				body: body,
				media_urls: [murl].compact
			).emit(REDIS)
			response
		}.catch { |e|
			EMPromise.reject(
				[:cancel, 'internal-server-error', e.message]
			)
		}
	end
 547
	# Works out the destination number(s) for message +m+.
	#
	# When m is addressed to the gateway domain itself (ARGV[0]) the numbers
	# come from the children of its "addresses" element, see
	# https://wiki.soprani.ca/SGX/GroupMMS; the result is then a plain Array
	# with one entry per child, or a rejected promise (item-not-found) when
	# there is no such element.
	# NOTE(review): a child without a uri attribute contributes an empty
	# string - confirm callers cope with that.
	#
	# Otherwise the node part of m.to is the destination and the result is a
	# promise for the number string, rejected with 'gone' for anonymous
	# senders and 'item-not-found' for anything else unsupported.
	def self.validate_num(m)
		if m.to == ARGV[0]
			an = m.children.find { |v| v.element_name == "addresses" }
			return EMPromise.reject([:cancel, 'item-not-found']) unless an

			LOG.debug "Found addresses node, iterating"

			return an.children.map { |address|
				num = ''
				type = ''
				address.attributes.each do |attr_name, attr_value|
					case attr_name
					when 'type'
						# TODO: error if the value is not 'to'
						type = attr_value.to_s
					when 'uri'
						# TODO: error unless the value starts with 'sms:'
						# drop the first four characters (the expected "sms:")
						num = attr_value.to_s[4..-1]
						# TODO: confirm num validates
						# TODO: else, error - unexpected name
					end
				end
				if num.empty? || type.empty?
					# TODO: error
				end
				num
			}
		end

		# if not sent to SGX domain, then assume destination is in 'to'
		EMPromise.resolve(m.to.node.to_s).then { |num_dest|
			if num_dest.match?(/\A\+?[0-9]+(?:;.*)?\Z/)
				next num_dest if num_dest.start_with?('+')

				shortcode = extract_shortcode(num_dest)
				next shortcode if shortcode
			end

			# TODO: text re num not (yet) supported/implemented
			condition = anonymous_tel?(num_dest) ? 'gone' : 'item-not-found'
			EMPromise.reject([:cancel, condition])
		}
	end
 603
	# Looks up the stored registration for +jid+.
	#
	# Returns a promise for the credential list; it rejects with
	# [:auth, 'registration-required'] when fewer than four values are stored.
	def self.fetch_catapult_cred_for(jid)
		@registration_repo.find(jid).then do |creds|
			# TODO: add text re credentials not registered
			next EMPromise.reject([:auth, 'registration-required']) if creds.length < 4

			creds
		end
	end
 616
	# Error-typed messages are only logged; the handler returns true and
	# sends nothing in response.
	message :error? do |m|
		# TODO: report it somewhere/somehow - eat for now so no err loop
		LOG.warn("Eating error stanza", stanza: m.inspect)
		true
	end
 622
	# Handles messages that carry a body or an OOB URL: resolves destination
	# and sender credentials, then either hands the message straight to the
	# destination user (when that user's first stored credential does not
	# start with 'u-') or sends it through Bandwidth. Rejections shaped like
	# [type, condition(, text)] are answered as stanza errors.
	message(->(m) { m.body || m.at("oob|x > oob|url", oob: "jabber:x:oob") }) do |m|
		EMPromise.all([
			validate_num(m),
			fetch_catapult_cred_for(m.from)
		]).then { |(num_dest, creds)|
			@registration_repo.find_jid(num_dest).then { |jid|
				[jid, num_dest, *creds]
			}
		}.then { |(jid, num_dest, *creds)|
			next [jid, num_dest, *creds, nil] unless jid

			@registration_repo.find(jid).then { |other_user|
				[jid, num_dest, *creds, other_user.first]
			}
		}.then { |(jid, num_dest, *creds, other_user)|
			# if destination user is in the system pass on directly
			if other_user && !other_user.start_with?('u-')
				pass_on_message(m, creds.last, jid)
			else
				to_catapult_possible_oob(m, num_dest, *creds)
			end
		}.catch { |e|
			if e.is_a?(Array) && [2, 3].include?(e.length)
				write_to_stream m.as_error(e[1], e[0], e[2])
			else
				EMPromise.reject(e)
			end
		}
	end
 654
	# Disco identities advertised for number JIDs: a single client/sms entry.
	def self.user_cap_identities
		sms_client = {category: 'client', type: 'sms'}
		[sms_client]
	end
 658
	# Disco features advertised for number JIDs.
	# TODO: must re-add stuff so can do ad-hoc commands
	def self.user_cap_features
		%w[urn:xmpp:receipts]
	end
 663
	# Adds +feature+ to the gateway's advertised feature list, keeping the
	# list free of duplicates. Returns what Array#uniq! returns: nil when the
	# feature was new, the list itself when a duplicate had to be removed.
	def self.add_gateway_feature(feature)
		@gateway_features.push(feature).uniq!
	end
 668
	# Answers every subscription request with three stanzas, in this order:
	# an approval, a capabilities presence from the "/sgx" resource, and a
	# subscription request back to the requester's bare JID.
	subscription :request? do |p|
		# subscriptions are allowed from anyone - send reply immediately
		approval = Blather::Stanza::Presence.new
		approval.to = p.from
		approval.from = p.to
		approval.type = :subscribed
		write_to_stream approval

		# send a presence immediately; not automatically probed for it
		# TODO: refactor so no "presence :probe? do |p|" duplicate below
		caps = Blather::Stanza::Capabilities.new
		# TODO: use a better node URI (?)
		caps.node = 'http://catapult.sgx.soprani.ca/'
		caps.identities = user_cap_identities
		caps.features = user_cap_features

		announcement = caps.c
		announcement.to = p.from
		announcement.from = "#{p.to}/sgx"
		write_to_stream announcement

		# need to subscribe back so Conversations displays images inline
		subscribe_back = Blather::Stanza::Presence.new
		subscribe_back.to = p.from.to_s.split('/', 2).first
		subscribe_back.from = p.to.to_s.split('/', 2).first
		subscribe_back.type = :subscribe
		write_to_stream subscribe_back

		true
	end
 702
	# Answers a presence probe with a capabilities presence sent from the
	# probed JID's "/sgx" resource.
	presence :probe? do |p|
		caps = Blather::Stanza::Capabilities.new
		# TODO: use a better node URI (?)
		caps.node = 'http://catapult.sgx.soprani.ca/'
		caps.identities = user_cap_identities
		caps.features = user_cap_features

		answer = caps.c
		answer.to = p.from
		answer.from = "#{p.to}/sgx"
		write_to_stream answer

		true
	end
 718
	# Lists the ad-hoc commands available on the gateway JID. The only entry,
	# 'set-port-out-pin', is offered when BandwidthTNOptions reports the
	# requester as eligible; otherwise the list is empty. Rejections shaped
	# like [type, condition(, text)] are answered as stanza errors.
	disco_items(
		to: Blather::JID.new(ARGV[0]),
		node: "http://jabber.org/protocol/commands"
	) do |i|
		fetch_catapult_cred_for(i.from).then { |creds|
			BandwidthTNOptions.tn_eligible_for_port_out_pin?(creds).then { |eligible|
				reply = i.reply
				reply.node = 'http://jabber.org/protocol/commands'
				reply.items =
					if eligible
						[
							Blather::Stanza::DiscoItems::Item.new(
								i.to,
								'set-port-out-pin',
								'Set Port-Out PIN'
							)
						]
					else
						[]
					end

				write_to_stream reply
			}
		}.catch { |e|
			if e.is_a?(Array) && [2, 3].include?(e.length)
				write_to_stream i.as_error(e[1], e[0], e[2])
			else
				EMPromise.reject(e)
			end
		}
	end
 750
	# Answers disco#info queries: number JIDs (those with a node part) get the
	# per-user identities/features, the bare gateway JID gets the gateway
	# identity and feature list. Non-get queries are logged and ignored.
	iq '/iq/ns:query', ns:	'http://jabber.org/protocol/disco#info' do |i|
		# TODO: return error if i.type is :set - if it is :reply or
		#  :error it should be ignored (as the below does currently);
		#  review specification to see how to handle other type values
		unless i.type == :get
			LOG.warn("Ignoring non-get disco IQ", type: i.type.to_s, stanza: i.inspect)
			next
		end

		for_number_jid = i.to.node

		msg = i.reply
		msg.node = i.node

		# respond to capabilities request for an sgx-bwmsgsv2 number JID
		if for_number_jid
			# TODO: confirm the node URL is expected using below
			#puts "XR[node]: #{xpath_result[0]['node']}"
			msg.identities = user_cap_identities
			msg.features = user_cap_features

			write_to_stream msg
			next
		end

		# respond to capabilities request for sgx-bwmsgsv2 itself
		msg.identities = [{
			name: 'Soprani.ca Gateway to XMPP - Bandwidth API V2',
			type: 'sms', category: 'gateway'
		}]
		msg.features = @gateway_features
		write_to_stream msg

		true
	end
 786
	# Stores +creds+ for the sender of +i+ and acknowledges the IQ. A
	# RegistrationRepo::Conflict becomes a [:cancel, 'conflict', text]
	# rejection instead.
	def self.check_then_register(i, *creds)
		stored = @registration_repo.put(i.from, *creds)

		stored.catch_only(RegistrationRepo::Conflict) { |e|
			EMPromise.reject([:cancel, 'conflict', e.message])
		}.then {
			write_to_stream i.reply
		}
	end
 796
	# Extracts [nick, username, password, phone] from a registration IQ,
	# reading the jabber:x:data form when the query contains one and the
	# plain registration fields otherwise. Missing form fields yield nil.
	def self.creds_from_registration_query(i)
		unless i.query.find_first("./ns:x", ns: "jabber:x:data")
			return [i.nick, i.username, i.password, i.phone]
		end

		%w[nick username password phone].map { |var| i.form.field(var)&.value }
	end
 809
	# Handles a registration "set" IQ.
	#
	# A removal request deletes the stored registration, acknowledges the IQ
	# and rejects with :done so the rest of the chain is skipped. Otherwise
	# the supplied credentials are checked by listing the user's media via
	# the Bandwidth API and then stored through check_then_register.
	#
	# Returns a promise. Phone numbers not starting with '+' reject with
	# item-not-found; BandwidthError is mapped to not-authorized (401),
	# item-not-found (404) or not-acceptable (anything else).
	def self.process_registration(i)
		EMPromise.resolve(nil).then {
			next creds_from_registration_query(i) unless i.remove?

			@registration_repo.delete(i.from).then do
				write_to_stream i.reply
				EMPromise.reject(:done)
			end
		}.then { |user_id, api_token, api_secret, phone_num|
			unless phone_num && phone_num[0] == '+'
				# TODO: add text re number not (yet) supported
				next EMPromise.reject([:cancel, 'item-not-found'])
			end

			[user_id, api_token, api_secret, phone_num]
		}.then { |user_id, api_token, api_secret, phone_num|
			# TODO: find way to verify #{phone_num}, too
			verification = call_catapult(
				api_token,
				api_secret,
				:get,
				"api/v2/users/#{user_id}/media",
				nil,
				{'Content-Type' => 'application/json'}
			)

			verification.then { |response|
				# parsed only to make sure the response is valid JSON
				JSON.parse(response)
				# TODO: confirm response is array - could be empty

				LOG.debug("Registration verify response", response: response.to_s[0..999])

				check_then_register(i, user_id, api_token, api_secret, phone_num)
			}
		}.catch_only(BandwidthError) { |e|
			type, condition =
				case e.code
				when 401 then [:auth, 'not-authorized']
				when 404 then [:cancel, 'item-not-found']
				else [:modify, 'not-acceptable']
				end
			EMPromise.reject([type, condition, e.to_s])
		}
	end
 861
	# Fills in the registration reply +orig+ (plain fields plus data form).
	#
	# orig            - the reply stanza to populate; returned afterwards
	# existing_number - the already registered phone number, if any; it
	#                   marks the reply as registered and pre-fills "phone"
	def self.registration_form(orig, existing_number=nil)
		# Text shared by both sets of instructions.
		number_hint = "page as well as the Phone Number\nin your "\
			"account you want to use (ie. '+12345678901')"
		source_notice = ".\n\nThe source code for this gateway is at "\
			"https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
			"\nCopyright (C) 2017-2020  Denver Gingerich "\
			"and others, licensed under AGPLv3+."
		phone = existing_number.to_s

		orig.registered = !!existing_number

		# TODO: update "User Id" x2 below (to "accountId"?), and others?
		orig.instructions = "Enter the information from your Account #{number_hint}"\
			".\nUser Id is nick, API Token is username, "\
			"API Secret is password, Phone Number is phone#{source_notice}"
		orig.nick = ""
		orig.username = ""
		orig.password = ""
		orig.phone = phone

		# builds one required form field description
		field = lambda { |label, var, type: :"text-single", **extra|
			{required: true, type: type, label: label, var: var}.merge(extra)
		}
		orig.form.fields = [
			field.call('User Id', 'nick'),
			field.call('API Token', 'username'),
			field.call('API Secret', 'password', type: :"text-private"),
			field.call('Phone Number', 'phone', value: phone)
		]
		orig.form.title = 'Register for '\
			'Soprani.ca Gateway to XMPP - Bandwidth API V2'
		orig.form.instructions =
			"Enter the details from your Account #{number_hint}#{source_notice}"

		orig
	end
 911
	# In-band registration: "set" processes the submitted registration, "get"
	# returns the registration form (pre-filled with the last stored value
	# for the sender's bare JID), anything else is ignored. Rejections shaped
	# like [type, condition(, text)] become stanza errors, :done is swallowed
	# and every other failure ends in panic.
	ibr do |i|
		outcome =
			case i.type
			when :set
				process_registration(i)
			when :get
				@registration_repo.find(i.from.stripped).then { |creds|
					write_to_stream registration_form(i.reply, creds.last)
				}
			else
				# Unknown IQ, ignore for now
				EMPromise.reject(:done)
			end

		outcome.catch { |e|
			if e.is_a?(Array) && [2, 3].include?(e.length)
				write_to_stream i.as_error(e[1], e[0], e[2])
			elsif e != :done
				EMPromise.reject(e)
			end
		}.catch(&method(:panic))
	end
 933
	# First step of the 'set-port-out-pin' ad-hoc command: replies with a form
	# asking for the PIN and its confirmation under a new session id.
	# Rejections shaped like [type, condition(, text)] become stanza errors;
	# any other failure ends in panic.
	command :execute?, node: "set-port-out-pin", sessionid: nil do |iq|
		# Ensure user is registered, but discard their credentials
		# because we're just showing them a form.
		fetch_catapult_cred_for(iq.from).then { |_creds|
			reply = iq.reply
			reply.node = 'set-port-out-pin'
			reply.sessionid = SecureRandom.uuid
			reply.status = :executing

			form = Blather::Stanza::X.find_or_create(reply.command)
			form.type = "form"
			form.fields = [
				['pin', 'Port-Out PIN'],
				['confirm_pin', 'Confirm PIN']
			].map { |var, label|
				{var: var, type: 'text-private', label: label, required: true}
			}

			reply.command.add_child(form)
			reply.allowed_actions = [:complete]

			write_to_stream reply
		}.catch { |e|
			if e.is_a?(Array) && [2, 3].include?(e.length)
				write_to_stream iq.as_error(e[1], e[0], e[2])
			else
				EMPromise.reject(e)
			end
		}.catch(&method(:panic))
	end
 972
	# Final step of the 'set-port-out-pin' ad-hoc command: validates the
	# submitted PIN, sets it through BandwidthTNOptions and completes the
	# command with an info or error note. Invalid input is answered with a
	# bad-request stanza error before any credentials are looked up.
	command :complete?, node: "set-port-out-pin", sessionid: /./ do |iq|
		pin = iq.form.field('pin')&.value
		confirm_pin = iq.form.field('confirm_pin')&.value

		# first validation rule that fails, if any
		problem =
			if pin.nil? || confirm_pin.nil?
				'PIN fields are required'
			elsif pin != confirm_pin
				'PIN confirmation does not match'
			elsif pin !~ /\A[a-zA-Z0-9]{4,10}\z/
				'PIN must be 4-10 alphanumeric characters'
			end

		if problem
			write_to_stream iq.as_error('bad-request', :modify, problem)
			next
		end

		# completes the command with a note of the given type
		finish = lambda { |note_type, note_text|
			reply = iq.reply
			reply.node = 'set-port-out-pin'
			reply.sessionid = iq.sessionid
			reply.status = :completed
			reply.note_type = note_type
			reply.note_text = note_text

			write_to_stream reply
		}

		fetch_catapult_cred_for(iq.from).then { |creds|
			BandwidthTNOptions.set_port_out_pin(creds, pin).then {
				finish.call(:info, 'Port-out PIN has been set successfully.')
			}.catch { |e|
				has_message = e.respond_to?(:message)
				error_msg =
					if has_message && e.message.include?('not valid')
						"Invalid phone number format. "\
						"Please check your registered phone number."
					elsif has_message && e.message.include?('ErrorCode')
						"Bandwidth API error: #{e.message}"
					else
						"Failed to set port-out PIN. Please try again later."
					end

				finish.call(:error, error_msg)
			}
		}.catch { |e|
			if e.is_a?(Array) && [2, 3].include?(e.length)
				write_to_stream iq.as_error(e[1], e[0], e[2])
			else
				EMPromise.reject(e)
			end
		}.catch(&method(:panic))
	end
1040
1041	iq type: [:get, :set] do |iq|
1042		write_to_stream(Blather::StanzaError.new(
1043			iq,
1044			'feature-not-implemented',
1045			:cancel
1046		))
1047
1048		true
1049	end
1050end
1051
# A bare message stanza used to carry delivery receipts.
class ReceiptMessage < Blather::Stanza
	# Builds a message stanza addressed to +to+ (which may be nil).
	def self.new(to=nil)
		super(:message).tap { |node| node.to = to }
	end
end
1059
# Goliath endpoint receiving messaging webhooks. Each accepted callback is
# translated into an XMPP stanza written through SGXbwmsgsv2 and/or a
# MessageEvent emitted to REDIS.
class WebhookHandler < Goliath::API
	use Sentry::Rack::CaptureExceptions
	use Goliath::Rack::Params

	# Media URLs ending in one of these suffixes are neither forwarded to the
	# user nor recorded in the emitted inbound event.
	SKIPPED_MEDIA_SUFFIXES = ['.smil', '.txt', '.xml'].freeze

	# Handles one webhook request.
	#
	# Only the first entry of the posted `_json` list is processed.
	#
	# @param env the Goliath request environment
	# @return [Array] Rack-style triple [status, headers, body]:
	#   200 "OK" when the callback was handled or deliberately ignored
	#   (empty params, non-/ URI, non-POST method, unknown outbound type),
	#   400 when required fields are missing, 403 when no JID is registered
	#   for the user's number, 500 when a StandardError was raised (the error
	#   is reported to Sentry).
	def response(env)
		@registration_repo = RegistrationRepo.new
		# TODO: add timestamp grab here, and MUST include ./tai version

		LOG.debug("Webhook request env", env: env.reject { |k| k == 'params' })

		if params.empty?
			LOG.warn "Empty webhook params"
			return [200, {}, "OK"]
		end

		if env['REQUEST_URI'] != '/'
			LOG.warn("Non-/ request", uri: env['REQUEST_URI'], method: env['REQUEST_METHOD'])
			return [200, {}, "OK"]
		end

		if env['REQUEST_METHOD'] != 'POST'
			LOG.warn("Non-POST request", uri: env['REQUEST_URI'], method: env['REQUEST_METHOD'])
			return [200, {}, "OK"]
		end

		# TODO: process each message in list, not just first one
		jparams = params.dig('_json', 0, 'message')
		type = params.dig('_json', 0, 'type')

		return [400, {}, "Missing params\n"] unless jparams && type

		# users_num is the number looked up in the registration repo;
		# others_num is the remote party.
		users_num, others_num =
			if type == 'message-failed' # NOTE: This implies direction == 'out'
				[jparams['from'], params['_json'][0]['to']]
			elsif jparams['direction'] == 'in'
				[jparams['owner'], jparams['from']]
			elsif jparams['direction'] == 'out'
				[jparams['from'], jparams['owner']] # NOTE: for outbound, 'from' == 'owner'
			else
				LOG.error("Unexpected message direction", direction: jparams['direction'])
				[jparams['from'], jparams['owner']]
			end

		return [400, {}, "Missing params\n"] unless users_num && others_num
		return [400, {}, "Missing params\n"] unless jparams['to'].is_a?(Array)
		return [400, {}, "Missing params\n"] if jparams['to'].empty?

		LOG.info(
			"Webhook message",
			message_id: jparams['id'],
			event_type: type,
			time: jparams['time'],
			direction: jparams['direction'],
			delivery_state: jparams['deliveryState'],
			error_code: jparams['errorCode'],
			description: jparams['description'],
			tag: jparams['tag'],
			media: jparams['media']
		)

		if others_num[0] != '+'
			# TODO: check that others_num actually a shortcode first
			others_num +=
				';phone-context=ca-us.phone-context.soprani.ca'
		end

		bare_jid = @registration_repo.find_jid(users_num).sync

		if !bare_jid
			LOG.warn("JID not found for number, BW API misconfigured?", users_num: users_num)

			return [403, {}, "Customer not found\n"]
		end

		msg = nil
		case jparams['direction']
		when 'in'
			text = ''
			case type
			when 'message-received'
				# TODO: handle group chat, and fix above
				text = jparams['text']

				if text.to_s.empty? && Array(jparams['media']).empty?
					return [400, {}, "Missing params\n"]
				end

				if jparams['to'].length > 1
					msg = group_message(
						bare_jid, text, jparams['to'],
						[users_num, others_num]
					)
				end

				media_urls = forwardable_media(jparams)
				media_urls.each do |media_url|
					SGXbwmsgsv2.send_media(
						others_num + '@' + ARGV[0],
						bare_jid, media_url,
						nil, nil, msg
					)
				end

				# Nothing is written from here when there is no text, or when
				# a group message carried media (msg was handed to send_media
				# above); only the event is recorded. A nil text is treated
				# like an empty one so media-only messages without a 'text'
				# key still emit their event.
				if text.to_s.empty? || (media_urls.any? && jparams['to'].length > 1)
					emit_inbound(env, jparams, media_urls)
					return [200, {}, "OK"]
				end
			else
				text = "unknown type (#{type})" \
					" with text: #{jparams['text']}"

				# TODO: log/notify of this properly
				LOG.warn("Unknown inbound message type", text: text)
			end

			# If text is not empty, but there isn't a msg,
			# we need to construct a msg to convey that text
			unless msg || text.to_s.empty?
				msg = Blather::Stanza::Message.new(
					bare_jid,
					# Strip control codes.
					# This only happened, or at least caused a problem, once,
					# but it seems sensible to say that we shouldn't be getting
					# control codes from the PSTN.
					text.gsub(/[\u0000-\u0008\u000b-\u001f]/, "")
				)
				msg.document.encoding = "utf-8"
				msg.chat_state = nil
			end
		else # per prior switch, this is:  jparams['direction'] == 'out'
			# The tag is "<escaped stanza id> <escaped resourcepart>".
			tag_parts = jparams['tag'].split(/ /, 2)
			id = WEBrick::HTTPUtils.unescape(tag_parts[0])
			resourcepart = WEBrick::HTTPUtils.unescape(tag_parts[1])

			# TODO: remove this hack
			if jparams['to'].length > 1
				case type
				when 'message-failed'
					emit_failed(jparams, id)
				when 'message-delivered'
					emit_delivered(jparams, id)
				end
				LOG.warn("Group message, skipping receipt", users_num: users_num)
				return [200, {}, "OK"]
			end

			case type
			when 'message-failed'
				# create a bare message like the one user sent
				msg = Blather::Stanza::Message.new(
					others_num + '@' + ARGV[0])
				msg.from = bare_jid + '/' + resourcepart
				msg['id'] = id

				# TODO: add 'errorCode' and/or 'description' val
				# create an error reply to the bare message
				msg = msg.as_error(
					'recipient-unavailable',
					:wait,
					params['_json'][0]['description']
				)

			when 'message-delivered'

				msg = ReceiptMessage.new(bare_jid)

				# TODO: put in member/instance variable
				msg['id'] = SecureRandom.uuid

				# TODO: send only when requested per XEP-0184
				rcvd = Nokogiri::XML::Node.new(
					'received',
					msg.document
				)
				rcvd['xmlns'] = 'urn:xmpp:receipts'
				rcvd['id'] = id
				msg.add_child(rcvd)

				# TODO: make prettier: this should be done above
				others_num = params['_json'][0]['to']
			else
				# TODO: notify somehow of unknown state receivd?
				LOG.warn("Unknown outbound message type", id: id, type: type)
				return [200, {}, "OK"]
			end

			# Keeping this due to the `msg.from=` shuffle just below
			LOG.debug("Outbound callback response", stanza: msg.inspect)
		end

		# For message-failed, msg.from was already set before `as_error`
		# replaced msg with an error object; NOTE(review): that object
		# presumably has no `from=`, hence the respond_to? guard - confirm.
		if msg
			msg.from = others_num + '@' + ARGV[0] if msg.respond_to?(:from=)
			SGXbwmsgsv2.write(msg)
		end

		# Emit event to messages stream
		case [jparams['direction'], type]
		when ['in', 'message-received']
			emit_inbound(env, jparams, forwardable_media(jparams))
		when ['out', 'message-failed']
			emit_failed(jparams, stanza_id_from_tag(jparams['tag']))
		when ['out', 'message-delivered']
			emit_delivered(jparams, stanza_id_from_tag(jparams['tag']))
		end

		[200, {}, "OK"]
	rescue StandardError => e
		# Only StandardError is converted into a 500; other exceptions
		# (signals, exit requests, ...) are left to propagate.
		Sentry.capture_exception(e)
		[500, {}, "Error"]
	end

private

	# Media URLs of the message minus those ending in SKIPPED_MEDIA_SUFFIXES.
	def forwardable_media(jparams)
		Array(jparams['media']).reject { |url|
			url.end_with?(*SKIPPED_MEDIA_SUFFIXES)
		}
	end

	# Returns the unescaped first space-separated component of the tag,
	# which the outbound handling uses as the stanza id.
	def stanza_id_from_tag(tag)
		WEBrick::HTTPUtils.unescape(tag.split(/ /, 2)[0])
	end

	# Builds the stanza for an inbound message with several recipients: it is
	# addressed to the domain of bare_jid and carries an <addresses/> element
	# (http://jabber.org/protocol/address) listing bare_jid plus an sms: URI
	# for every recipient not contained in `excluded`.
	def group_message(bare_jid, text, recipients, excluded)
		msg = Blather::Stanza::Message.new(
			Blather::JID.new(bare_jid).domain
		)
		msg.body = text unless text&.empty?

		addrs = Nokogiri::XML::Node.new('addresses', msg.document)
		addrs['xmlns'] = 'http://jabber.org/protocol/address'

		own = Nokogiri::XML::Node.new('address', msg.document)
		own['type'] = 'to'
		own['jid'] = bare_jid
		addrs.add_child(own)

		# Don't send to the same person twice,
		# and don't send to the person who sent it
		recipients.reject(&excluded.method(:include?)).each do |receiver|
			addrn = Nokogiri::XML::Node.new('address', msg.document)
			addrn['type'] = 'to'
			addrn['uri'] = "sms:#{receiver}"
			addrn['delivered'] = 'true'
			addrs.add_child(addrn)
		end

		msg.add_child(addrs)
		msg
	end

	# Emits the event for an inbound message: a ResendIn when the request
	# carries a non-empty X-Jmp-Resend-Of header value
	# (env['HTTP_X_JMP_RESEND_OF']), otherwise an In with the given media.
	def emit_inbound(env, jparams, media_urls)
		resend_of = env['HTTP_X_JMP_RESEND_OF']
		event =
			if resend_of.to_s.empty?
				MessageEvent::In.new(
					timestamp: jparams['time'],
					from: jparams['from'],
					to: jparams['to'],
					owner: jparams['owner'],
					bandwidth_id: jparams['id'],
					body: jparams['text'].to_s,
					media_urls: media_urls
				)
			else
				MessageEvent::ResendIn.new(
					original_stream_id: resend_of,
					original_bandwidth_id: jparams['id'],
					owner: jparams['owner']
				)
			end
		event.emit(REDIS)
	end

	# Emits a Failed event for the outbound message with the given stanza id.
	def emit_failed(jparams, stanza_id)
		MessageEvent::Failed.new(
			timestamp: jparams['time'],
			stanza_id: stanza_id,
			bandwidth_id: jparams['id'],
			error_code: jparams['errorCode'].to_s,
			error_description: jparams['description'].to_s
		).emit(REDIS)
	end

	# Emits a Delivered event for the outbound message with the given
	# stanza id.
	def emit_delivered(jparams, stanza_id)
		MessageEvent::Delivered.new(
			timestamp: jparams['time'],
			stanza_id: stanza_id,
			bandwidth_id: jparams['id']
		).emit(REDIS)
	end
end
1368
# Gateway launcher. Registered with at_exit, so it runs when the interpreter
# is exiting; skipped entirely when ENV['ENV'] == 'test'.
#
# Expects exactly seven command-line arguments (see the usage text below),
# then starts the EventMachine reactor with the Redis connection, the XMPP
# component and the Goliath webhook server.
at_exit do
	LOG.info("Soprani.ca/SMS Gateway for XMPP - Bandwidth API V2",
		version: `git rev-parse HEAD`.chomp)

	if ARGV.size != 7
		warn "Usage: sgx-bwmsgsv2.rb <component_jid> "\
			"<component_password> <server_hostname> "\
			"<server_port> <application_id> "\
			"<http_listen_port> <mms_proxy_prefix_url>"
		# A wrong invocation is a failure: report it with a non-zero status.
		exit 1
	end

	LOG.info "Starting"

	EM.run do
		REDIS = EM::Hiredis.connect

		SGXbwmsgsv2.run

		# required when using Prosody otherwise disconnects on 6-hour inactivity
		EM.add_periodic_timer(3600) do
			msg = Blather::Stanza::Iq::Ping.new(:get, 'localhost')
			msg.from = ARGV[0] # <component_jid>
			SGXbwmsgsv2.write(msg)
		end

		# ARGV[5] is <http_listen_port>
		server = Goliath::Server.new('0.0.0.0', ARGV[5].to_i)
		server.api = WebhookHandler.new
		server.app = Goliath::Rack::Builder.build(server.api.class, server.api)
		server.logger = LOG
		server.start do
			%w[INT TERM].each do |sig|
				trap(sig) do
					# The shutdown work is handed to EM.defer instead of
					# being run directly inside the signal handler.
					EM.defer do
						LOG.info "Shutting down gateway"
						SGXbwmsgsv2.shutdown

						LOG.info "Gateway has terminated"
						EM.stop
					end
				end
			end
		end
	end
end unless ENV['ENV'] == 'test'