sgx-bwmsgsv2.rb

   1#!/usr/bin/env ruby
   2# frozen_string_literal: true
   3
   4# Copyright (C) 2017-2020  Denver Gingerich <denver@ossguy.com>
   5# Copyright (C) 2017  Stephen Paul Weber <singpolyma@singpolyma.net>
   6#
   7# This file is part of sgx-bwmsgsv2.
   8#
   9# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
  10# the terms of the GNU Affero General Public License as published by the Free
  11# Software Foundation, either version 3 of the License, or (at your option) any
  12# later version.
  13#
  14# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
  15# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  16# FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more
  17# details.
  18#
  19# You should have received a copy of the GNU Affero General Public License along
  20# with sgx-bwmsgsv2.  If not, see <http://www.gnu.org/licenses/>.
  21
  22require 'delegate'
  23require 'blather/client/dsl'
  24require 'em-hiredis'
  25require 'em-http-request'
  26require 'json'
  27require 'multibases'
  28require 'multihashes'
  29require 'securerandom'
  30require "sentry-ruby"
  31require 'time'
  32require 'uri'
  33require 'webrick'
  34
  35require 'goliath/api'
  36require 'goliath/server'
  37require 'log4r'
  38
  39require 'em_promise'
  40require 'em-synchrony'
  41
  42require_relative 'lib/bandwidth_error'
  43require_relative 'lib/bandwidth_tn_options'
  44require_relative 'lib/message_event'
  45require_relative 'lib/registration_repo'
  46
  47Sentry.init
  48
  49# List of supported MIME types from Bandwidth - https://support.bandwidth.com/hc/en-us/articles/360014128994-What-MMS-file-types-are-supported-
  50MMS_MIME_TYPES = [
  51	"application/json",
  52	"application/ogg",
  53	"application/pdf",
  54	"application/rtf",
  55	"application/zip",
  56	"application/x-tar",
  57	"application/xml",
  58	"application/gzip",
  59	"application/x-bzip2",
  60	"application/x-gzip",
  61	"application/smil",
  62	"application/javascript",
  63	"audio/mp4",
  64	"audio/mpeg",
  65	"audio/ogg",
  66	"audio/flac",
  67	"audio/webm",
  68	"audio/wav",
  69	"audio/amr",
  70	"audio/3gpp",
  71	"image/bmp",
  72	"image/gif",
  73	"image/jpeg",
  74	"image/pjpeg",
  75	"image/png",
  76	"image/svg+xml",
  77	"image/tiff",
  78	"image/webp",
  79	"image/x-icon",
  80	"text/css",
  81	"text/csv",
  82	"text/calendar",
  83	"text/plain",
  84	"text/javascript",
  85	"text/vcard",
  86	"text/vnd.wap.wml",
  87	"text/xml",
  88	"video/avi",
  89	"video/mp4",
  90	"video/mpeg",
  91	"video/ogg",
  92	"video/quicktime",
  93	"video/webm",
  94	"video/x-ms-wmv",
  95	"video/x-flv"
  96]
  97
  98def panic(e)
  99	Sentry.capture_exception(e)
 100	puts "Shutting down gateway due to exception: #{e.message}"
 101	puts e.backtrace
 102	SGXbwmsgsv2.shutdown
 103	puts 'Gateway has terminated.'
 104	EM.stop
 105end
 106
 107EM.error_handler(&method(:panic))
 108
 109def extract_shortcode(dest)
 110	num, context = dest.split(';', 2)
 111	num if context == 'phone-context=ca-us.phone-context.soprani.ca'
 112end
 113
 114def anonymous_tel?(dest)
 115	dest.split(';', 2)[1] == 'phone-context=anonymous.phone-context.soprani.ca'
 116end
 117
 118class SGXClient < Blather::Client
 119	def handle_data(stanza)
 120		promise = EMPromise.resolve(nil).then {
 121			with_sentry(stanza) do |scope|
 122				super
 123			rescue StandardError => e
 124				handle_error(scope, stanza, e)
 125			end
 126		}.catch { |e| panic(e) }
 127		promise.sync if ENV["ENV"] == "test"
 128		promise
 129	end
 130
 131	# Override the default call_handler to syncify during testing.
 132	def call_handler(handler, guards, stanza)
 133		result = if guards.first.respond_to?(:to_str)
 134			found = stanza.find(*guards)
 135			throw :pass if found.empty?
 136
 137			handler.call(stanza, found)
 138		else
 139			throw :pass if guarded?(guards, stanza)
 140
 141			handler.call(stanza)
 142		end
 143
 144		# Up to here, identical to upstream impl
 145
 146		return result unless result.is_a?(Promise)
 147
 148		result.sync if ENV["ENV"] == "test"
 149		result
 150	end
 151
 152protected
 153
 154	def with_sentry(stanza)
 155		Sentry.clone_hub_to_current_thread
 156
 157		Sentry.with_scope do |scope|
 158			setup_scope(stanza, scope)
 159			yield scope
 160		ensure
 161			scope.get_transaction&.then do |tx|
 162				tx.set_status("ok") unless tx.status
 163				tx.finish
 164			end
 165		end
 166	end
 167
 168	def setup_scope(stanza, scope)
 169		name = stanza.respond_to?(:node) ? stanza.node : stanza.name
 170		scope.clear_breadcrumbs
 171		scope.set_transaction_name(name)
 172		scope.set_user(jid: stanza.from&.stripped.to_s)
 173
 174		transaction = Sentry.start_transaction(
 175			name: name,
 176			op: "blather.handle_data"
 177		)
 178		scope.set_span(transaction) if transaction
 179	end
 180
 181	def handle_error(scope, stanza, e)
 182		puts "Error raised during #{scope.transaction_name}: #{e.class}"
 183		puts e.message
 184		puts e.backtrace
 185		Sentry.capture_exception(e) unless e.is_a?(Sentry::Error)
 186		scope.get_transaction&.set_status("internal_error")
 187		return if e.respond_to?(:replied?) && e.replied?
 188
 189		SGXbwmsgsv2.write_to_stream stanza.as_error("internal-server-error", :cancel)
 190	end
 191end
 192
 193# TODO: keep in sync with jmp-acct_bot.rb, and eventually put in common location
 194module CatapultSettingFlagBits
 195	VOICEMAIL_TRANSCRIPTION_DISABLED = 0
 196	MMS_ON_OOB_URL = 1
 197end
 198
 199module SGXbwmsgsv2
 200	extend Blather::DSL
 201
 202	@registration_repo = RegistrationRepo.new
 203	@client = SGXClient.new
 204	@gateway_features = [
 205		"http://jabber.org/protocol/disco#info",
 206		"http://jabber.org/protocol/address/",
 207		"jabber:iq:register",
 208		"http://jabber.org/protocol/commands"
 209	]
 210
 211	def self.run
 212		# TODO: read/save ARGV[7] creds to local variables
 213		client.run
 214	end
 215
 216	# so classes outside this module can write messages, too
 217	def self.write(stanza)
 218		client.write(stanza)
 219	end
 220
 221	def self.before_handler(type, *guards, &block)
 222		client.register_handler_before(type, *guards, &block)
 223	end
 224
 225	def self.send_media(from, to, media_url, desc=nil, subject=nil, m=nil)
 226		# we assume media_url is one of these (always the case so far):
 227		#  https://messaging.bandwidth.com/api/v2/users/[u]/media/[path]
 228
 229		puts 'ORIG_URL: ' + media_url
 230		usr = to
 231		if media_url.start_with?('https://messaging.bandwidth.com/api/v2/users/')
 232			pth = media_url.split('/', 9)[8]
 233			# the caller must guarantee that 'to' is a bare JID
 234			media_url = ARGV[6] + WEBrick::HTTPUtils.escape(usr) + '/' + pth
 235			puts 'PROX_URL: ' + media_url
 236		end
 237
 238		msg = m ? m.copy : Blather::Stanza::Message.new(to, "")
 239		msg.from = from
 240		msg.subject = subject if subject
 241
 242		# provide URL in XEP-0066 (OOB) fashion
 243		x = Nokogiri::XML::Node.new 'x', msg.document
 244		x['xmlns'] = 'jabber:x:oob'
 245
 246		urln = Nokogiri::XML::Node.new 'url', msg.document
 247		urlc = Nokogiri::XML::Text.new media_url, msg.document
 248		urln.add_child(urlc)
 249		x.add_child(urln)
 250
 251		if desc
 252			descn = Nokogiri::XML::Node.new('desc', msg.document)
 253			descc = Nokogiri::XML::Text.new(desc, msg.document)
 254			descn.add_child(descc)
 255			x.add_child(descn)
 256		end
 257
 258		msg.add_child(x)
 259
 260		write(msg)
 261	rescue Exception => e
 262		panic(e)
 263	end
 264
 265	setup ARGV[0], ARGV[1], ARGV[2], ARGV[3], nil, nil, async: true
 266
 267	def self.pass_on_message(m, users_num, jid)
 268		# Capture destination before modifying m.to
 269		dest_num = m.to.node
 270
 271		# setup delivery receipt; similar to a reply
 272		rcpt = ReceiptMessage.new(m.from.stripped)
 273		rcpt.from = m.to
 274
 275		# pass original message (before sending receipt)
 276		m.to = jid
 277		m.from = "#{users_num}@#{ARGV[0]}"
 278
 279		puts 'XRESPONSE0: ' + m.inspect
 280		write_to_stream m
 281
 282		# Emit pass-through event. Thru events don't capture a timestamp because XMPP
 283		# stanzas don't carry timestamps for realtime messages, and the Redis stream
 284		# ID provides the emit time.
 285		oob_url = m.at("oob|x > oob|url", oob: "jabber:x:oob")&.text
 286		MessageEvent::Thru.new(
 287			owner: users_num,
 288			from: users_num,
 289			to: [dest_num],
 290			stanza_id: m.id.to_s,
 291			body: m.body.to_s,
 292			media_urls: [oob_url].compact
 293		).emit(REDIS)
 294
 295		# send a delivery receipt back to the sender
 296		# TODO: send only when requested per XEP-0184
 297		# TODO: pass receipts from target if supported
 298
 299		# TODO: put in member/instance variable
 300		rcpt['id'] = SecureRandom.uuid
 301		rcvd = Nokogiri::XML::Node.new 'received', rcpt.document
 302		rcvd['xmlns'] = 'urn:xmpp:receipts'
 303		rcvd['id'] = m.id
 304		rcpt.add_child(rcvd)
 305
 306		puts 'XRESPONSE1: ' + rcpt.inspect
 307		write_to_stream rcpt
 308	end
 309
 310	def self.call_catapult(
 311		token, secret, m, pth, body=nil,
 312		head={}, code=[200], respond_with=:body
 313	)
 314		# pth looks like one of:
 315		#  "api/v2/users/#{user_id}/[endpoint_name]"
 316
 317		url_prefix = ''
 318
 319		# TODO: need to make a separate thing for voice.bw.c eventually
 320		if pth.start_with? 'api/v2/users'
 321			url_prefix = 'https://messaging.bandwidth.com/'
 322		end
 323
 324		EM::HttpRequest.new(
 325			url_prefix + pth
 326		).public_send(
 327			"a#{m}",
 328			head: {
 329				'Authorization' => [token, secret]
 330			}.merge(head),
 331			body: body
 332		).then { |http|
 333			puts "API response to send: #{http.response} with code"\
 334				" response.code #{http.response_header.status}"
 335
 336			if code.include?(http.response_header.status)
 337				case respond_with
 338				when :body
 339					http.response
 340				when :headers
 341					http.response_header
 342				else
 343					http
 344				end
 345			else
 346				EMPromise.reject(
 347					BandwidthError.for(http.response_header.status, http.response)
 348				)
 349			end
 350		}
 351	end
 352
 353	def self.to_catapult_possible_oob(s, num_dest, user_id, token, secret,
 354		usern)
 355		un = s.at("oob|x > oob|url", oob: "jabber:x:oob")
 356		unless un
 357			puts "MMSOOB: no url node found so process as normal"
 358			return to_catapult(s, nil, num_dest, user_id, token,
 359				secret, usern)
 360		end
 361		puts "MMSOOB: found a url node - checking if to make MMS..."
 362
 363		body = s.respond_to?(:body) ? s.body : ''
 364		EM::HttpRequest.new(un.text, tls: { verify_peer: true }).ahead.then { |http|
 365			# If content is too large, or MIME type is not supported, place the link inside the body and do not send MMS.
 366			if http.response_header["CONTENT_LENGTH"].to_i > 3500000 ||
 367			   !MMS_MIME_TYPES.include?(http.response_header["CONTENT_TYPE"])
 368				unless body.include?(un.text)
 369					s.body = body.empty? ? un.text : "#{body}\n#{un.text}"
 370				end
 371				to_catapult(s, nil, num_dest, user_id, token, secret, usern)
 372			else # If size is less than ~3.5MB, strip the link from the body and attach media in the body.
 373				# some clients send URI in both body & <url/> so delete
 374				s.body = body.sub(/\s*#{Regexp.escape(un.text)}\s*$/, '')
 375
 376				puts "MMSOOB: url text is '#{un.text}'"
 377				puts "MMSOOB: the body is '#{body.to_s.strip}'"
 378
 379				puts "MMSOOB: sending MMS since found OOB & user asked"
 380				to_catapult(s, un.text, num_dest, user_id, token, secret, usern)
 381			end
 382		}
 383	end
 384
 385	def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern)
 386		body = s.respond_to?(:body) ? s.body.to_s : ''
 387		if murl.to_s.empty? && body.strip.empty?
 388			return EMPromise.reject(
 389				[:modify, 'policy-violation']
 390			)
 391		end
 392
 393		segment_size = body.ascii_only? ? 160 : 70
 394		if !murl && ENV["MMS_PATH"] && body.length > segment_size*3
 395			file = Multibases.pack(
 396				'base58btc',
 397				Multihashes.encode(Digest::SHA256.digest(body), "sha2-256")
 398			).to_s
 399			File.open("#{ENV['MMS_PATH']}/#{file}", "w") { |fh| fh.write body }
 400			murl = "#{ENV['MMS_URL']}/#{file}.txt"
 401			body = ""
 402		end
 403
 404		extra = {}
 405		extra[:media] = murl if murl
 406
 407		call_catapult(
 408			token,
 409			secret,
 410			:post,
 411			"api/v2/users/#{user_id}/messages",
 412			JSON.dump(extra.merge(
 413				from: usern,
 414				to:   num_dest,
 415				text: body,
 416				applicationId:  ARGV[4],
 417				tag:
 418					# callbacks need id and resourcepart
 419					WEBrick::HTTPUtils.escape(s.id.to_s) +
 420					' ' +
 421					WEBrick::HTTPUtils.escape(
 422						s.from.resource.to_s
 423					)
 424			)),
 425			{'Content-Type' => 'application/json'},
 426			[201]
 427		).then { |response|
 428			parsed = JSON.parse(response) rescue {}
 429			MessageEvent::Out.new(
 430				timestamp: parsed["time"] || Time.now,
 431				owner: usern,
 432				from: usern,
 433				to: Array(num_dest),
 434				stanza_id: s.id.to_s,
 435				bandwidth_id: parsed["id"],
 436				body: body,
 437				media_urls: [murl].compact
 438			).emit(REDIS)
 439			response
 440		}.catch { |e|
 441			EMPromise.reject(
 442				[:cancel, 'internal-server-error', e.message]
 443			)
 444		}
 445	end
 446
 447	def self.validate_num(m)
 448		# if sent to SGX domain use https://wiki.soprani.ca/SGX/GroupMMS
 449		if m.to == ARGV[0]
 450			an = m.children.find { |v| v.element_name == "addresses" }
 451			if not an
 452				return EMPromise.reject(
 453					[:cancel, 'item-not-found']
 454				)
 455			end
 456			puts "ADRXEP: found an addresses node - iterate addrs.."
 457
 458			nums = []
 459			an.children.each do |e|
 460				num = ''
 461				type = ''
 462				e.attributes.each do |c|
 463					if c[0] == 'type'
 464						if c[1] != 'to'
 465							# TODO: error
 466						end
 467						type = c[1].to_s
 468					elsif c[0] == 'uri'
 469						if !c[1].to_s.start_with? 'sms:'
 470							# TODO: error
 471						end
 472						num = c[1].to_s[4..-1]
 473						# TODO: confirm num validates
 474						# TODO: else, error - unexpected name
 475					end
 476				end
 477				if num.empty? or type.empty?
 478					# TODO: error
 479				end
 480				nums << num
 481			end
 482			return nums
 483		end
 484
 485		# if not sent to SGX domain, then assume destination is in 'to'
 486		EMPromise.resolve(m.to.node.to_s).then { |num_dest|
 487			if num_dest =~ /\A\+?[0-9]+(?:;.*)?\Z/
 488				next num_dest if num_dest[0] == '+'
 489
 490				shortcode = extract_shortcode(num_dest)
 491				next shortcode if shortcode
 492			end
 493
 494			if anonymous_tel?(num_dest)
 495				EMPromise.reject([:cancel, 'gone'])
 496			else
 497				# TODO: text re num not (yet) supportd/implmentd
 498				EMPromise.reject([:cancel, 'item-not-found'])
 499			end
 500		}
 501	end
 502
 503	def self.fetch_catapult_cred_for(jid)
 504		@registration_repo.find(jid).then { |creds|
 505			if creds.length < 4
 506				# TODO: add text re credentials not registered
 507				EMPromise.reject(
 508					[:auth, 'registration-required']
 509				)
 510			else
 511				creds
 512			end
 513		}
 514	end
 515
 516	message :error? do |m|
 517		# TODO: report it somewhere/somehow - eat for now so no err loop
 518		puts "EATERROR1: #{m.inspect}"
 519		true
 520	end
 521
 522	message :body do |m|
 523		EMPromise.all([
 524			validate_num(m),
 525			fetch_catapult_cred_for(m.from)
 526		]).then { |(num_dest, creds)|
 527			@registration_repo.find_jid(num_dest).then { |jid|
 528				[jid, num_dest] + creds
 529			}
 530		}.then { |(jid, num_dest, *creds)|
 531			if jid
 532				@registration_repo.find(jid).then { |other_user|
 533					[jid, num_dest] + creds + [other_user.first]
 534				}
 535			else
 536				[jid, num_dest] + creds + [nil]
 537			end
 538		}.then { |(jid, num_dest, *creds, other_user)|
 539			# if destination user is in the system pass on directly
 540			if other_user and not other_user.start_with? 'u-'
 541				pass_on_message(m, creds.last, jid)
 542			else
 543				to_catapult_possible_oob(m, num_dest, *creds)
 544			end
 545		}.catch { |e|
 546			if e.is_a?(Array) && (e.length == 2 || e.length == 3)
 547				write_to_stream m.as_error(e[1], e[0], e[2])
 548			else
 549				EMPromise.reject(e)
 550			end
 551		}
 552	end
 553
 554	def self.user_cap_identities
 555		[{category: 'client', type: 'sms'}]
 556	end
 557
 558	# TODO: must re-add stuff so can do ad-hoc commands
 559	def self.user_cap_features
 560		["urn:xmpp:receipts"]
 561	end
 562
 563	def self.add_gateway_feature(feature)
 564		@gateway_features << feature
 565		@gateway_features.uniq!
 566	end
 567
 568	subscription :request? do |p|
 569		puts "PRESENCE1: #{p.inspect}"
 570
 571		# subscriptions are allowed from anyone - send reply immediately
 572		msg = Blather::Stanza::Presence.new
 573		msg.to = p.from
 574		msg.from = p.to
 575		msg.type = :subscribed
 576
 577		puts 'RESPONSE5a: ' + msg.inspect
 578		write_to_stream msg
 579
 580		# send a <presence> immediately; not automatically probed for it
 581		# TODO: refactor so no "presence :probe? do |p|" duplicate below
 582		caps = Blather::Stanza::Capabilities.new
 583		# TODO: user a better node URI (?)
 584		caps.node = 'http://catapult.sgx.soprani.ca/'
 585		caps.identities = user_cap_identities
 586		caps.features = user_cap_features
 587
 588		msg = caps.c
 589		msg.to = p.from
 590		msg.from = p.to.to_s + '/sgx'
 591
 592		puts 'RESPONSE5b: ' + msg.inspect
 593		write_to_stream msg
 594
 595		# need to subscribe back so Conversations displays images inline
 596		msg = Blather::Stanza::Presence.new
 597		msg.to = p.from.to_s.split('/', 2)[0]
 598		msg.from = p.to.to_s.split('/', 2)[0]
 599		msg.type = :subscribe
 600
 601		puts 'RESPONSE5c: ' + msg.inspect
 602		write_to_stream msg
 603	end
 604
 605	presence :probe? do |p|
 606		puts 'PRESENCE2: ' + p.inspect
 607
 608		caps = Blather::Stanza::Capabilities.new
 609		# TODO: user a better node URI (?)
 610		caps.node = 'http://catapult.sgx.soprani.ca/'
 611		caps.identities = user_cap_identities
 612		caps.features = user_cap_features
 613
 614		msg = caps.c
 615		msg.to = p.from
 616		msg.from = p.to.to_s + '/sgx'
 617
 618		puts 'RESPONSE6: ' + msg.inspect
 619		write_to_stream msg
 620	end
 621
 622	disco_items(
 623		to: Blather::JID.new(ARGV[0]),
 624		node: "http://jabber.org/protocol/commands"
 625	) do |i|
 626		fetch_catapult_cred_for(i.from).then { |creds|
 627			BandwidthTNOptions.tn_eligible_for_port_out_pin?(creds).then { |eligible|
 628				reply = i.reply
 629				reply.node = 'http://jabber.org/protocol/commands'
 630
 631				if eligible
 632					reply.items = [
 633						Blather::Stanza::DiscoItems::Item.new(
 634							i.to,
 635							'set-port-out-pin',
 636							'Set Port-Out PIN'
 637						)
 638					]
 639				else
 640					reply.items = []
 641				end
 642
 643				puts 'RESPONSE_CMD_DISCO: ' + reply.inspect
 644				write_to_stream reply
 645			}
 646		}.catch { |e|
 647			if e.is_a?(Array) && [2, 3].include?(e.length)
 648				write_to_stream i.as_error(e[1], e[0], e[2])
 649			else
 650				EMPromise.reject(e)
 651			end
 652		}
 653	end
 654
 655	iq '/iq/ns:query', ns:	'http://jabber.org/protocol/disco#info' do |i|
 656		# TODO: return error if i.type is :set - if it is :reply or
 657		#  :error it should be ignored (as the below does currently);
 658		#  review specification to see how to handle other type values
 659		if i.type != :get
 660			puts 'DISCO iq rcvd, of non-get type "' + i.type.to_s +
 661				'" for message "' + i.inspect + '"; ignoring...'
 662			next
 663		end
 664
 665		# respond to capabilities request for an sgx-bwmsgsv2 number JID
 666		if i.to.node
 667			# TODO: confirm the node URL is expected using below
 668			#puts "XR[node]: #{xpath_result[0]['node']}"
 669
 670			msg = i.reply
 671			msg.node = i.node
 672			msg.identities = user_cap_identities
 673			msg.features = user_cap_features
 674
 675			puts 'RESPONSE7: ' + msg.inspect
 676			write_to_stream msg
 677			next
 678		end
 679
 680		# respond to capabilities request for sgx-bwmsgsv2 itself
 681		msg = i.reply
 682		msg.node = i.node
 683		msg.identities = [{
 684			name: 'Soprani.ca Gateway to XMPP - Bandwidth API V2',
 685			type: 'sms', category: 'gateway'
 686		}]
 687		msg.features = @gateway_features
 688		write_to_stream msg
 689	end
 690
 691	def self.check_then_register(i, *creds)
 692		@registration_repo
 693			.put(i.from, *creds)
 694			.catch_only(RegistrationRepo::Conflict) { |e|
 695				EMPromise.reject([:cancel, 'conflict', e.message])
 696			}.then {
 697				write_to_stream i.reply
 698			}
 699	end
 700
 701	def self.creds_from_registration_query(i)
 702		if i.query.find_first("./ns:x", ns: "jabber:x:data")
 703			[
 704				i.form.field("nick")&.value,
 705				i.form.field("username")&.value,
 706				i.form.field("password")&.value,
 707				i.form.field("phone")&.value
 708			]
 709		else
 710			[i.nick, i.username, i.password, i.phone]
 711		end
 712	end
 713
 714	def self.process_registration(i)
 715		EMPromise.resolve(nil).then {
 716			if i.remove?
 717				@registration_repo.delete(i.from).then do
 718					write_to_stream i.reply
 719					EMPromise.reject(:done)
 720				end
 721			else
 722				creds_from_registration_query(i)
 723			end
 724		}.then { |user_id, api_token, api_secret, phone_num|
 725			if phone_num && phone_num[0] == '+'
 726				[user_id, api_token, api_secret, phone_num]
 727			else
 728				# TODO: add text re number not (yet) supported
 729				EMPromise.reject([:cancel, 'item-not-found'])
 730			end
 731		}.then { |user_id, api_token, api_secret, phone_num|
 732			# TODO: find way to verify #{phone_num}, too
 733			call_catapult(
 734				api_token,
 735				api_secret,
 736				:get,
 737				"api/v2/users/#{user_id}/media"
 738			).then { |response|
 739				JSON.parse(response)
 740				# TODO: confirm response is array - could be empty
 741
 742				puts "register got str #{response.to_s[0..999]}"
 743
 744				check_then_register(
 745					i,
 746					user_id,
 747					api_token,
 748					api_secret,
 749					phone_num
 750				)
 751			}
 752		}.catch_only(BandwidthError) { |e|
 753			EMPromise.reject(case e.code
 754			when 401
 755				# TODO: add text re bad credentials
 756				[:auth, 'not-authorized']
 757			when 404
 758				# TODO: add text re number not found or disabled
 759				[:cancel, 'item-not-found']
 760			else
 761				[:modify, 'not-acceptable']
 762			end)
 763		}
 764	end
 765
 766	def self.registration_form(orig, existing_number=nil)
 767		orig.registered = !!existing_number
 768
 769		# TODO: update "User Id" x2 below (to "accountId"?), and others?
 770		orig.instructions = "Enter the information from your Account "\
 771			"page as well as the Phone Number\nin your "\
 772			"account you want to use (ie. '+12345678901')"\
 773			".\nUser Id is nick, API Token is username, "\
 774			"API Secret is password, Phone Number is phone"\
 775			".\n\nThe source code for this gateway is at "\
 776			"https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
 777			"\nCopyright (C) 2017-2020  Denver Gingerich "\
 778			"and others, licensed under AGPLv3+."
 779		orig.nick = ""
 780		orig.username = ""
 781		orig.password = ""
 782		orig.phone = existing_number.to_s
 783
 784		orig.form.fields = [
 785			{
 786				required: true, type: :"text-single",
 787				label: 'User Id', var: 'nick'
 788			},
 789			{
 790				required: true, type: :"text-single",
 791				label: 'API Token', var: 'username'
 792			},
 793			{
 794				required: true, type: :"text-private",
 795				label: 'API Secret', var: 'password'
 796			},
 797			{
 798				required: true, type: :"text-single",
 799				label: 'Phone Number', var: 'phone',
 800				value: existing_number.to_s
 801			}
 802		]
 803		orig.form.title = 'Register for '\
 804			'Soprani.ca Gateway to XMPP - Bandwidth API V2'
 805		orig.form.instructions = "Enter the details from your Account "\
 806			"page as well as the Phone Number\nin your "\
 807			"account you want to use (ie. '+12345678901')"\
 808			".\n\nThe source code for this gateway is at "\
 809			"https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
 810			"\nCopyright (C) 2017-2020  Denver Gingerich "\
 811			"and others, licensed under AGPLv3+."
 812
 813		orig
 814	end
 815
 816	ibr do |i|
 817		puts "IQ: #{i.inspect}"
 818
 819		case i.type
 820		when :set
 821			process_registration(i)
 822		when :get
 823			bare_jid = i.from.stripped
 824			@registration_repo.find(bare_jid).then { |creds|
 825				reply = registration_form(i.reply, creds.last)
 826				puts "RESPONSE2: #{reply.inspect}"
 827				write_to_stream reply
 828			}
 829		else
 830			# Unknown IQ, ignore for now
 831			EMPromise.reject(:done)
 832		end.catch { |e|
 833			if e.is_a?(Array) && (e.length == 2 || e.length == 3)
 834				write_to_stream i.as_error(e[1], e[0], e[2])
 835			elsif e != :done
 836				EMPromise.reject(e)
 837			end
 838		}.catch(&method(:panic))
 839	end
 840
 841	command :execute?, node: "set-port-out-pin", sessionid: nil do |iq|
 842		# Ensure user is registered, but discard their credentials
 843		# because we're just showing them a form.
 844		fetch_catapult_cred_for(iq.from).then { |_creds|
 845			reply = iq.reply
 846			reply.node = 'set-port-out-pin'
 847			reply.sessionid = SecureRandom.uuid
 848			reply.status = :executing
 849
 850			form = Blather::Stanza::X.find_or_create(reply.command)
 851			form.type = "form"
 852			form.fields = [
 853				{
 854					var: 'pin',
 855					type: 'text-private',
 856					label: 'Port-Out PIN',
 857					required: true
 858				},
 859				{
 860					var: 'confirm_pin',
 861					type: 'text-private',
 862					label: 'Confirm PIN',
 863					required: true
 864				}
 865			]
 866
 867			reply.command.add_child(form)
 868			reply.allowed_actions = [:complete]
 869
 870			puts "RESPONSE_CMD_FORM: #{reply.inspect}"
 871			write_to_stream reply
 872		}.catch { |e|
 873			if e.is_a?(Array) && [2, 3].include?(e.length)
 874				write_to_stream iq.as_error(e[1], e[0], e[2])
 875			else
 876				EMPromise.reject(e)
 877			end
 878		}.catch(&method(:panic))
 879	end
 880
 881	command :complete?, node: "set-port-out-pin", sessionid: /./ do |iq|
 882		pin = iq.form.field('pin')&.value
 883		confirm_pin = iq.form.field('confirm_pin')&.value
 884
 885		if pin.nil? || confirm_pin.nil?
 886			write_to_stream iq.as_error(
 887				'bad-request',
 888				:modify,
 889				'PIN fields are required'
 890			)
 891			next
 892		end
 893
 894		if pin != confirm_pin
 895			write_to_stream iq.as_error(
 896				'bad-request',
 897				:modify,
 898				'PIN confirmation does not match'
 899			)
 900			next
 901		end
 902
 903		if pin !~ /\A[a-zA-Z0-9]{4,10}\z/
 904			write_to_stream iq.as_error(
 905				'bad-request',
 906				:modify,
 907				'PIN must be 4-10 alphanumeric characters'
 908			)
 909			next
 910		end
 911
 912		fetch_catapult_cred_for(iq.from).then { |creds|
 913			BandwidthTNOptions.set_port_out_pin(creds, pin).then {
 914				reply = iq.reply
 915				reply.node = 'set-port-out-pin'
 916				reply.sessionid = iq.sessionid
 917				reply.status = :completed
 918				reply.note_type = :info
 919				reply.note_text = 'Port-out PIN has been set successfully.'
 920
 921				write_to_stream reply
 922			}.catch { |e|
 923				reply = iq.reply
 924				reply.node = 'set-port-out-pin'
 925				reply.sessionid = iq.sessionid
 926				reply.status = :completed
 927				reply.note_type = :error
 928				error_msg = if e.respond_to?(:message) && e.message.include?('not valid')
 929					"Invalid phone number format. "\
 930					"Please check your registered phone number."
 931				elsif e.respond_to?(:message) && e.message.include?('ErrorCode')
 932					"Bandwidth API error: #{e.message}"
 933				else
 934					"Failed to set port-out PIN. Please try again later."
 935				end
 936				reply.note_text = error_msg
 937
 938				write_to_stream reply
 939			}
 940		}.catch { |e|
 941			if e.is_a?(Array) && [2, 3].include?(e.length)
 942				write_to_stream iq.as_error(e[1], e[0], e[2])
 943			else
 944				EMPromise.reject(e)
 945			end
 946		}.catch(&method(:panic))
 947	end
 948
 949	iq type: [:get, :set] do |iq|
 950		write_to_stream(Blather::StanzaError.new(
 951			iq,
 952			'feature-not-implemented',
 953			:cancel
 954		))
 955	end
 956end
 957
 958class ReceiptMessage < Blather::Stanza
 959	def self.new(to=nil)
 960		node = super :message
 961		node.to = to
 962		node
 963	end
 964end
 965
 966class WebhookHandler < Goliath::API
 967	use Sentry::Rack::CaptureExceptions
 968	use Goliath::Rack::Params
 969
 970	def response(env)
 971		@registration_repo = RegistrationRepo.new
 972		# TODO: add timestamp grab here, and MUST include ./tai version
 973
 974		puts 'ENV: ' + env.reject { |k| k == 'params' }.to_s
 975
 976		if params.empty?
 977			puts 'PARAMS empty!'
 978			return [200, {}, "OK"]
 979		end
 980
 981		if env['REQUEST_URI'] != '/'
 982			puts 'BADREQUEST1: non-/ request "' +
 983				env['REQUEST_URI'] + '", method "' +
 984				env['REQUEST_METHOD'] + '"'
 985			return [200, {}, "OK"]
 986		end
 987
 988		if env['REQUEST_METHOD'] != 'POST'
 989			puts 'BADREQUEST2: non-POST request; URI: "' +
 990				env['REQUEST_URI'] + '", method "' +
 991				env['REQUEST_METHOD'] + '"'
 992			return [200, {}, "OK"]
 993		end
 994
 995		# TODO: process each message in list, not just first one
 996		jparams = params.dig('_json', 0, 'message')
 997		type = params.dig('_json', 0, 'type')
 998
 999		return [400, {}, "Missing params\n"] unless jparams && type
1000
1001		users_num, others_num =
1002		if type == 'message-failed' # NOTE: This implies direction == 'out'
1003			[jparams['from'], params['_json'][0]['to']]
1004		elsif jparams['direction'] == 'in'
1005			[jparams['owner'], jparams['from']]
1006		elsif jparams['direction'] == 'out'
1007			[jparams['from'], jparams['owner']] # NOTE: for outbound, 'from' == 'owner'
1008		else
1009			puts "big prob: '#{jparams['direction']}'"
1010			return [400, {}, "OK"]
1011		end
1012
1013		return [400, {}, "Missing params\n"] unless users_num && others_num
1014		return [400, {}, "Missing params\n"] unless jparams['to'].is_a?(Array)
1015		return [400, {}, "Missing params\n"] if jparams['to'].empty?
1016
1017		puts "BODY - messageId: #{jparams['id']}" \
1018			", eventType: #{type}" \
1019			", time: #{jparams['time']}" \
1020			", direction: #{jparams['direction']}" \
1021			", deliveryState: #{jparams['deliveryState'] || 'NONE'}" \
1022			", errorCode: #{jparams['errorCode'] || 'NONE'}" \
1023			", description: #{jparams['description'] || 'NONE'}" \
1024			", tag: #{jparams['tag'] || 'NONE'}" \
1025			", media: #{jparams['media'] || 'NONE'}"
1026
1027		if others_num[0] != '+'
1028			# TODO: check that others_num actually a shortcode first
1029			others_num +=
1030				';phone-context=ca-us.phone-context.soprani.ca'
1031		end
1032
1033		bare_jid = @registration_repo.find_jid(users_num).sync
1034
1035		if !bare_jid
1036			puts "jid_key for (#{users_num}) DNE; BW API misconfigured?"
1037
1038			return [403, {}, "Customer not found\n"]
1039		end
1040
1041		msg = nil
1042		case jparams['direction']
1043		when 'in'
1044			text = ''
1045			case type
1046			when 'message-received'
1047				# TODO: handle group chat, and fix above
1048				text = jparams['text']
1049
1050				if text.to_s.empty? && Array(jparams['media']).empty?
1051					return [400, {}, "Missing params\n"]
1052				end
1053
1054				if jparams['to'].length > 1
1055					msg = Blather::Stanza::Message.new(
1056						Blather::JID.new(bare_jid).domain
1057					)
1058					msg.body = text unless text&.empty?
1059
1060					addrs = Nokogiri::XML::Node.new(
1061						'addresses', msg.document)
1062					addrs['xmlns'] = 'http://jabber.org/' \
1063						'protocol/address'
1064
1065					addr1 = Nokogiri::XML::Node.new(
1066						'address', msg.document)
1067					addr1['type'] = 'to'
1068					addr1['jid'] = bare_jid
1069					addrs.add_child(addr1)
1070
1071					jparams['to'].reject(
1072						# Don't send to the same person twice,
1073						# and don't send to the person who sent it
1074						&[users_num, others_num].method(:include?)
1075					).each do |receiver|
1076						addrn = Nokogiri::XML::Node.new(
1077							'address', msg.document)
1078						addrn['type'] = 'to'
1079						addrn['uri'] = "sms:#{receiver}"
1080						addrn['delivered'] = 'true'
1081						addrs.add_child(addrn)
1082					end
1083
1084					msg.add_child(addrs)
1085
1086					# TODO: delete
1087					puts "RESPONSE9: #{msg.inspect}"
1088				end
1089
1090				media_urls = Array(jparams['media']).map { |media_url|
1091					unless media_url.end_with?(
1092						'.smil', '.txt', '.xml'
1093					)
1094						SGXbwmsgsv2.send_media(
1095							others_num + '@' +
1096							ARGV[0],
1097							bare_jid, media_url,
1098							nil, nil, msg
1099						)
1100						media_url
1101					end
1102				}.compact
1103
1104				if text&.empty? || (media_urls.any? && jparams['to'].length > 1)
1105					if !env['HTTP_X_JMP_RESEND_OF'].to_s.empty?
1106						MessageEvent::ResendIn.new(
1107							original_stream_id: env['HTTP_X_JMP_RESEND_OF'],
1108							original_bandwidth_id: jparams['id'],
1109							owner: jparams['owner']
1110						).emit(REDIS)
1111					else
1112						MessageEvent::In.new(
1113							timestamp: jparams['time'],
1114							from: jparams['from'],
1115							to: jparams['to'],
1116							owner: jparams['owner'],
1117							bandwidth_id: jparams['id'],
1118							body: jparams['text'].to_s,
1119							media_urls: media_urls
1120						).emit(REDIS)
1121					end
1122
1123					return [200, {}, "OK"]
1124				end
1125			else
1126				text = "unknown type (#{type})"\
1127					" with text: " + jparams['text']
1128
1129				# TODO: log/notify of this properly
1130				puts text
1131			end
1132
1133			# If text is not empty, but there isn't a msg,
1134			# we need to construct a msg to convey that text
1135			unless msg || text.to_s.empty?
1136				msg = Blather::Stanza::Message.new(
1137					bare_jid,
1138					# Strip control codes.
1139					# This only happened, or at least caused a problem, once,
1140					# but it seems sensible to say that we shouldn't be getting
1141					# control codes from the PSTN.
1142					text.gsub(/[\u0000-\u0008\u000b-\u001f]/, "")
1143				)
1144				msg.document.encoding = "utf-8"
1145				msg.chat_state = nil
1146			end
1147		else # per prior switch, this is:  jparams['direction'] == 'out'
1148			tag_parts = jparams['tag'].split(/ /, 2)
1149			id = WEBrick::HTTPUtils.unescape(tag_parts[0])
1150			resourcepart = WEBrick::HTTPUtils.unescape(tag_parts[1])
1151
1152			# TODO: remove this hack
1153			if jparams['to'].length > 1
1154				case type
1155				when 'message-failed'
1156					MessageEvent::Failed.new(
1157						timestamp: jparams['time'],
1158						stanza_id: id,
1159						bandwidth_id: jparams['id'],
1160						error_code: jparams['errorCode'].to_s,
1161						error_description: jparams['description'].to_s
1162					).emit(REDIS)
1163				when 'message-delivered'
1164					MessageEvent::Delivered.new(
1165						timestamp: jparams['time'],
1166						stanza_id: id,
1167						bandwidth_id: jparams['id']
1168					).emit(REDIS)
1169				end
1170				puts "WARN! group no rcpt: #{users_num}"
1171				return [200, {}, "OK"]
1172			end
1173
1174			case type
1175			when 'message-failed'
1176				# create a bare message like the one user sent
1177				msg = Blather::Stanza::Message.new(
1178					others_num + '@' + ARGV[0])
1179				msg.from = bare_jid + '/' + resourcepart
1180				msg['id'] = id
1181
1182				# TODO: add 'errorCode' and/or 'description' val
1183				# create an error reply to the bare message
1184				msg = msg.as_error(
1185					'recipient-unavailable',
1186					:wait,
1187					params['_json'][0]['description']
1188				)
1189
1190			when 'message-delivered'
1191
1192				msg = ReceiptMessage.new(bare_jid)
1193
1194				# TODO: put in member/instance variable
1195				msg['id'] = SecureRandom.uuid
1196
1197				# TODO: send only when requested per XEP-0184
1198				rcvd = Nokogiri::XML::Node.new(
1199					'received',
1200					msg.document
1201				)
1202				rcvd['xmlns'] = 'urn:xmpp:receipts'
1203				rcvd['id'] = id
1204				msg.add_child(rcvd)
1205
1206				# TODO: make prettier: this should be done above
1207				others_num = params['_json'][0]['to']
1208			else
1209				# TODO: notify somehow of unknown state receivd?
1210				puts "message with id #{id} has "\
1211					"other type #{type}"
1212				return [200, {}, "OK"]
1213			end
1214
1215			puts "RESPONSE4: #{msg.inspect}"
1216		end
1217
1218		# if message-failed, we already set msg.from
1219		# moreover, we said `msg = msg.as_error`, and StanzaError
1220		msg.from = others_num + '@' + ARGV[0] if msg.respond_to?(:from=)
1221		SGXbwmsgsv2.write(msg)
1222
1223		# Emit event to messages stream
1224		case [jparams['direction'], type]
1225		when ['in', 'message-received']
1226			if !env['HTTP_X_JMP_RESEND_OF'].to_s.empty?
1227				MessageEvent::ResendIn.new(
1228					original_stream_id: env['HTTP_X_JMP_RESEND_OF'],
1229					original_bandwidth_id: jparams['id'],
1230					owner: jparams['owner']
1231				).emit(REDIS)
1232			else
1233				media_urls = Array(jparams['media']).reject { |u|
1234					u.end_with?('.smil', '.txt', '.xml')
1235				}
1236				MessageEvent::In.new(
1237					timestamp: jparams['time'],
1238					from: jparams['from'],
1239					to: jparams['to'],
1240					owner: jparams['owner'],
1241					bandwidth_id: jparams['id'],
1242					body: jparams['text'].to_s,
1243					media_urls: media_urls
1244				).emit(REDIS)
1245			end
1246		when ['out', 'message-failed']
1247			tag_parts = jparams['tag'].split(/ /, 2)
1248			stanza_id = WEBrick::HTTPUtils.unescape(tag_parts[0])
1249			MessageEvent::Failed.new(
1250				timestamp: jparams['time'],
1251				stanza_id: stanza_id,
1252				bandwidth_id: jparams['id'],
1253				error_code: jparams['errorCode'].to_s,
1254				error_description: jparams['description'].to_s
1255			).emit(REDIS)
1256		when ['out', 'message-delivered']
1257			tag_parts = jparams['tag'].split(/ /, 2)
1258			stanza_id = WEBrick::HTTPUtils.unescape(tag_parts[0])
1259			MessageEvent::Delivered.new(
1260				timestamp: jparams['time'],
1261				stanza_id: stanza_id,
1262				bandwidth_id: jparams['id']
1263			).emit(REDIS)
1264		end
1265
1266		[200, {}, "OK"]
1267	rescue Exception => e
1268		Sentry.capture_exception(e)
1269		puts 'Shutting down gateway due to exception 013: ' + e.message
1270		SGXbwmsgsv2.shutdown
1271		puts 'Gateway has terminated.'
1272		EM.stop unless ENV['ENV'] == 'test'
1273	end
1274end
1275
1276at_exit do
1277	$stdout.sync = true
1278
1279	puts "Soprani.ca/SMS Gateway for XMPP - Bandwidth API V2\n"\
1280		"==>> last commit of this version is " + `git rev-parse HEAD` + "\n"
1281
1282	if ARGV.size != 7
1283		puts "Usage: sgx-bwmsgsv2.rb <component_jid> "\
1284			"<component_password> <server_hostname> "\
1285			"<server_port> <application_id> "\
1286			"<http_listen_port> <mms_proxy_prefix_url>"
1287		exit 0
1288	end
1289
1290	t = Time.now
1291	puts "LOG %d.%09d: starting...\n\n" % [t.to_i, t.nsec]
1292
1293	EM.run do
1294		REDIS = EM::Hiredis.connect
1295
1296		SGXbwmsgsv2.run
1297
1298		# required when using Prosody otherwise disconnects on 6-hour inactivity
1299		EM.add_periodic_timer(3600) do
1300			msg = Blather::Stanza::Iq::Ping.new(:get, 'localhost')
1301			msg.from = ARGV[0]
1302			SGXbwmsgsv2.write(msg)
1303		end
1304
1305		server = Goliath::Server.new('0.0.0.0', ARGV[5].to_i)
1306		server.api = WebhookHandler.new
1307		server.app = Goliath::Rack::Builder.build(server.api.class, server.api)
1308		server.logger = Log4r::Logger.new('goliath')
1309		server.logger.add(Log4r::StdoutOutputter.new('console'))
1310		server.logger.level = Log4r::INFO
1311		server.start do
1312			["INT", "TERM"].each do |sig|
1313				trap(sig) do
1314					EM.defer do
1315						puts 'Shutting down gateway...'
1316						SGXbwmsgsv2.shutdown
1317
1318						puts 'Gateway has terminated.'
1319						EM.stop
1320					end
1321				end
1322			end
1323		end
1324	end
1325end unless ENV['ENV'] == 'test'