sgx-bwmsgsv2.rb

   1#!/usr/bin/env ruby
   2# frozen_string_literal: true
   3
   4# Copyright (C) 2017-2020  Denver Gingerich <denver@ossguy.com>
   5# Copyright (C) 2017  Stephen Paul Weber <singpolyma@singpolyma.net>
   6#
   7# This file is part of sgx-bwmsgsv2.
   8#
   9# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
  10# the terms of the GNU Affero General Public License as published by the Free
  11# Software Foundation, either version 3 of the License, or (at your option) any
  12# later version.
  13#
  14# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
  15# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  16# FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more
  17# details.
  18#
  19# You should have received a copy of the GNU Affero General Public License along
  20# with sgx-bwmsgsv2.  If not, see <http://www.gnu.org/licenses/>.
  21
  22require 'blather/client/dsl'
  23require 'em-hiredis'
  24require 'em-http-request'
  25require 'json'
  26require 'multibases'
  27require 'multihashes'
  28require 'securerandom'
  29require "sentry-ruby"
  30require 'time'
  31require 'uri'
  32require 'webrick'
  33
  34require 'goliath/api'
  35require 'goliath/server'
  36require 'log4r'
  37
  38require 'em_promise'
  39require 'em-synchrony'
  40
  41require_relative 'lib/bandwidth_error'
  42require_relative 'lib/bandwidth_tn_options'
  43require_relative 'lib/message_event'
  44require_relative 'lib/registration_repo'
  45
  46Sentry.init
  47
  48# List of supported MIME types from Bandwidth - https://support.bandwidth.com/hc/en-us/articles/360014128994-What-MMS-file-types-are-supported-
  49MMS_MIME_TYPES = [
  50	"application/json",
  51	"application/ogg",
  52	"application/pdf",
  53	"application/rtf",
  54	"application/zip",
  55	"application/x-tar",
  56	"application/xml",
  57	"application/gzip",
  58	"application/x-bzip2",
  59	"application/x-gzip",
  60	"application/smil",
  61	"application/javascript",
  62	"audio/mp4",
  63	"audio/mpeg",
  64	"audio/ogg",
  65	"audio/flac",
  66	"audio/webm",
  67	"audio/wav",
  68	"audio/amr",
  69	"audio/3gpp",
  70	"image/bmp",
  71	"image/gif",
  72	"image/jpeg",
  73	"image/pjpeg",
  74	"image/png",
  75	"image/svg+xml",
  76	"image/tiff",
  77	"image/webp",
  78	"image/x-icon",
  79	"text/css",
  80	"text/csv",
  81	"text/calendar",
  82	"text/plain",
  83	"text/javascript",
  84	"text/vcard",
  85	"text/vnd.wap.wml",
  86	"text/xml",
  87	"video/avi",
  88	"video/mp4",
  89	"video/mpeg",
  90	"video/ogg",
  91	"video/quicktime",
  92	"video/webm",
  93	"video/x-ms-wmv",
  94	"video/x-flv"
  95]
  96
# Last-resort failure handler.
#
# Reports +e+ to Sentry, prints its message and backtrace, asks the
# SGXbwmsgsv2 component to shut down and finally stops the EventMachine
# reactor.
#
# @param e [Exception] the error that triggered the shutdown
def panic(e)
	Sentry.capture_exception(e)
	puts "Shutting down gateway due to exception: #{e.message}"
	puts e.backtrace
	SGXbwmsgsv2.shutdown
	puts 'Gateway has terminated.'
	EM.stop
end
 105
 106EM.error_handler(&method(:panic))
 107
 108def extract_shortcode(dest)
 109	num, context = dest.split(';', 2)
 110	num if context == 'phone-context=ca-us.phone-context.soprani.ca'
 111end
 112
 113def anonymous_tel?(dest)
 114	dest.split(';', 2)[1] == 'phone-context=anonymous.phone-context.soprani.ca'
 115end
 116
 117class SGXClient < Blather::Client
 118	def handle_data(stanza)
 119		promise = EMPromise.resolve(nil).then {
 120			with_sentry(stanza) do |scope|
 121				super
 122			rescue StandardError => e
 123				handle_error(scope, stanza, e)
 124			end
 125		}.catch { |e| panic(e) }
 126		promise.sync if ENV["ENV"] == "test"
 127		promise
 128	end
 129
 130	# Override the default call_handler to syncify during testing.
 131	def call_handler(handler, guards, stanza)
 132		result = if guards.first.respond_to?(:to_str)
 133			found = stanza.find(*guards)
 134			throw :pass if found.empty?
 135
 136			handler.call(stanza, found)
 137		else
 138			throw :pass if guarded?(guards, stanza)
 139
 140			handler.call(stanza)
 141		end
 142
 143		# Up to here, identical to upstream impl
 144
 145		return result unless result.is_a?(Promise)
 146
 147		result.sync if ENV["ENV"] == "test"
 148		result
 149	end
 150
 151protected
 152
 153	def with_sentry(stanza)
 154		Sentry.clone_hub_to_current_thread
 155
 156		Sentry.with_scope do |scope|
 157			setup_scope(stanza, scope)
 158			yield scope
 159		ensure
 160			scope.get_transaction&.then do |tx|
 161				tx.set_status("ok") unless tx.status
 162				tx.finish
 163			end
 164		end
 165	end
 166
 167	def setup_scope(stanza, scope)
 168		name = stanza.respond_to?(:node) ? stanza.node : stanza.name
 169		scope.clear_breadcrumbs
 170		scope.set_transaction_name(name)
 171		scope.set_user(jid: stanza.from&.stripped.to_s)
 172
 173		transaction = Sentry.start_transaction(
 174			name: name,
 175			op: "blather.handle_data"
 176		)
 177		scope.set_span(transaction) if transaction
 178	end
 179
 180	def handle_error(scope, stanza, e)
 181		puts "Error raised during #{scope.transaction_name}: #{e.class}"
 182		puts e.message
 183		puts e.backtrace
 184		Sentry.capture_exception(e) unless e.is_a?(Sentry::Error)
 185		scope.get_transaction&.set_status("internal_error")
 186		return if e.respond_to?(:replied?) && e.replied?
 187
 188		SGXbwmsgsv2.write_to_stream stanza.as_error("internal-server-error", :cancel)
 189	end
 190end
 191
 192# TODO: keep in sync with jmp-acct_bot.rb, and eventually put in common location
# Named indexes for per-user setting flags.
# NOTE(review): presumably these are bit positions within a flags integer;
# nothing in this part of the file reads them -- confirm against
# jmp-acct_bot.rb.
module CatapultSettingFlagBits
	VOICEMAIL_TRANSCRIPTION_DISABLED = 0
	MMS_ON_OOB_URL = 1
end
 197
 198module SGXbwmsgsv2
 199	extend Blather::DSL
 200
 201	@registration_repo = RegistrationRepo.new
 202	@client = SGXClient.new
 203	@gateway_features = [
 204		"http://jabber.org/protocol/disco#info",
 205		"http://jabber.org/protocol/address/",
 206		"jabber:iq:register",
 207		"http://jabber.org/protocol/commands"
 208	]
 209
	# Starts the component by running the underlying Blather client.
	def self.run
		# TODO: read/save ARGV[7] creds to local variables
		client.run
	end
 214
	# Writes +stanza+ through the client; exposed so classes outside this
	# module can send messages, too.
	def self.write(stanza)
		client.write(stanza)
	end
 219
	# Registers +block+ on the client via register_handler_before for stanzas
	# of +type+ matching +guards+.
	def self.before_handler(type, *guards, &block)
		client.register_handler_before(type, *guards, &block)
	end
 223
	# Sends +media_url+ to +to+ as an XEP-0066 (OOB) message from +from+.
	#
	# Bandwidth media URLs (always the case so far) look like
	#  https://messaging.bandwidth.com/api/v2/users/[u]/media/[path]
	# and are rewritten to point at the proxy prefix given in ARGV[6].
	#
	# @param to [String] recipient; the caller must guarantee a bare JID
	# @param desc [String, nil] optional OOB <desc/> text
	# @param subject [String, nil] optional message subject
	# @param m [Blather::Stanza::Message, nil] stanza to copy instead of
	#   building an empty message for +to+
	#
	# Any exception raised here is handed to +panic+.
	def self.send_media(from, to, media_url, desc=nil, subject=nil, m=nil)
		puts 'ORIG_URL: ' + media_url
		if media_url.start_with?('https://messaging.bandwidth.com/api/v2/users/')
			# everything after ".../users/[u]/media/" (the 9th '/'-separated part)
			media_path = media_url.split('/', 9)[8]
			media_url = ARGV[6] + WEBrick::HTTPUtils.escape(to) + '/' + media_path
			puts 'PROX_URL: ' + media_url
		end

		msg = m ? m.copy : Blather::Stanza::Message.new(to, "")
		msg.from = from
		msg.subject = subject if subject

		# provide URL in XEP-0066 (OOB) fashion
		oob = Nokogiri::XML::Node.new('x', msg.document)
		oob['xmlns'] = 'jabber:x:oob'

		oob_parts = [['url', media_url]]
		oob_parts << ['desc', desc] if desc
		oob_parts.each do |tag, text|
			element = Nokogiri::XML::Node.new(tag, msg.document)
			element.add_child(Nokogiri::XML::Text.new(text, msg.document))
			oob.add_child(element)
		end

		msg.add_child(oob)

		write(msg)
	rescue Exception => e
		panic(e)
	end
 263
 264	setup ARGV[0], ARGV[1], ARGV[2], ARGV[3], nil, nil, async: true
 265
	# Delivers +m+ straight to another user of this gateway (+jid+), rewriting
	# the sender to the originating user's number, emits a pass-through event
	# and then sends a delivery receipt back to the original sender.
	#
	# @param m [Blather::Stanza::Message] message as received (mutated here)
	# @param users_num [String] the sending user's phone number
	# @param jid [String] JID of the destination user
	def self.pass_on_message(m, users_num, jid)
		# Everything that depends on the original to/from must be read before
		# the stanza is re-addressed below.
		dest_num = m.to.node

		# setup delivery receipt; similar to a reply
		rcpt = ReceiptMessage.new(m.from.stripped)
		rcpt.from = m.to

		# pass original message (before sending receipt)
		m.to = jid
		m.from = "#{users_num}@#{ARGV[0]}"

		puts "XRESPONSE0: #{m.inspect}"
		write_to_stream m

		# Emit pass-through event. Thru events don't capture a timestamp because XMPP
		# stanzas don't carry timestamps for realtime messages, and the Redis stream
		# ID provides the emit time.
		oob_node = m.at("oob|x > oob|url", oob: "jabber:x:oob")
		media = oob_node ? [oob_node.text] : []
		MessageEvent::Thru.new(
			owner: users_num,
			from: users_num,
			to: [dest_num],
			stanza_id: m.id.to_s,
			body: m.body.to_s,
			media_urls: media
		).emit(REDIS)

		# send a delivery receipt back to the sender
		# TODO: send only when requested per XEP-0184
		# TODO: pass receipts from target if supported

		# TODO: put in member/instance variable
		rcpt['id'] = SecureRandom.uuid
		received = Nokogiri::XML::Node.new('received', rcpt.document)
		received['xmlns'] = 'urn:xmpp:receipts'
		received['id'] = m.id
		rcpt.add_child(received)

		puts "XRESPONSE1: #{rcpt.inspect}"
		write_to_stream rcpt
	end
 308
	# Performs an asynchronous Bandwidth API request.
	#
	# @param token [String] API token (HTTP basic-auth user)
	# @param secret [String] API secret (HTTP basic-auth password)
	# @param m [Symbol, String] HTTP verb; dispatched to "a#{m}" on the request
	# @param pth [String] path such as "api/v2/users/#{user_id}/[endpoint_name]"
	# @param body [String, nil] request body
	# @param head [Hash] extra request headers, merged over Authorization
	# @param code [Array<Integer>] status codes treated as success
	# @param respond_with [Symbol] :body for the response body, :headers for
	#   the response headers, anything else for the whole http object
	# @return a promise for the selected part of the response; it rejects with
	#   the BandwidthError built for the status when the status is not in +code+
	def self.call_catapult(
		token, secret, m, pth, body=nil,
		head={}, code=[200], respond_with=:body
	)
		# TODO: need to make a separate thing for voice.bw.c eventually
		url_prefix =
			pth.start_with?('api/v2/users') ? 'https://messaging.bandwidth.com/' : ''

		request = EM::HttpRequest.new(url_prefix + pth)
		headers = {'Authorization' => [token, secret]}.merge(head)

		request.public_send("a#{m}", head: headers, body: body).then { |http|
			status = http.response_header.status
			puts "API response to send: #{http.response} with code"\
				" response.code #{status}"

			unless code.include?(status)
				next EMPromise.reject(BandwidthError.for(status, http.response))
			end

			case respond_with
			when :body then http.response
			when :headers then http.response_header
			else http
			end
		}
	end
 351
	# Sends stanza +s+ to Bandwidth, turning an XEP-0066 (OOB) URL into an MMS
	# attachment when the linked file is small enough and of a supported type.
	#
	# Without an OOB URL the stanza is passed to +to_catapult+ unchanged.
	# Otherwise a HEAD request on the URL decides between two outcomes:
	# * too large or unsupported type: the link is made part of the text body
	#   and no media is attached;
	# * otherwise: the link is stripped from the end of the body and the URL
	#   is sent as media.
	#
	# @return the promise returned by +to_catapult+ (directly or chained after
	#   the HEAD request)
	def self.to_catapult_possible_oob(s, num_dest, user_id, token, secret,
		usern)
		un = s.at("oob|x > oob|url", oob: "jabber:x:oob")
		unless un
			puts "MMSOOB: no url node found so process as normal"
			return to_catapult(s, nil, num_dest, user_id, token,
				secret, usern)
		end
		puts "MMSOOB: found a url node - checking if to make MMS..."

		# to_s: the body may be nil, and include?/sub below need a String
		body = s.respond_to?(:body) ? s.body.to_s : ''
		url = un.text
		EM::HttpRequest.new(url, tls: { verify_peer: true }).ahead.then { |http|
			# A missing Content-Length becomes 0 here, i.e. "small enough".
			size = http.response_header["CONTENT_LENGTH"].to_i
			# Compare only the media type itself: drop any parameters
			# ("; charset=utf-8") and ignore case, otherwise such headers
			# never match the whitelist.
			mime = http.response_header["CONTENT_TYPE"]
				.to_s.split(';', 2).first.to_s.strip.downcase

			# If content is too large, or MIME type is not supported, place the link inside the body and do not send MMS.
			if size > 3500000 || !MMS_MIME_TYPES.include?(mime)
				unless body.include?(url)
					s.body = body.empty? ? url : "#{body}\n#{url}"
				end
				to_catapult(s, nil, num_dest, user_id, token, secret, usern)
			else # If size is less than ~3.5MB, strip the link from the body and attach media in the body.
				# some clients send URI in both body & <url/> so delete
				s.body = body.sub(/\s*#{Regexp.escape(url)}\s*$/, '')

				puts "MMSOOB: url text is '#{url}'"
				puts "MMSOOB: the body is '#{body.to_s.strip}'"

				puts "MMSOOB: sending MMS since found OOB & user asked"
				to_catapult(s, url, num_dest, user_id, token, secret, usern)
			end
		}
	end
 383
	# Posts a message to the Bandwidth messages endpoint and records an
	# outbound MessageEvent for it.
	#
	# @param s stanza being sent; its id and sender resource go into the tag
	# @param murl [String, nil] media URL to attach
	# @param num_dest [String, Array<String>] destination number(s)
	# @param usern [String] the sending user's phone number
	# @return a promise for the raw API response; it rejects with
	#   [:modify, 'policy-violation'] when there is neither text nor media, and
	#   with [:cancel, 'internal-server-error', message] on any later failure
	def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern)
		body = s.respond_to?(:body) ? s.body.to_s : ''
		nothing_to_send = murl.to_s.empty? && body.strip.empty?
		return EMPromise.reject([:modify, 'policy-violation']) if nothing_to_send

		# Text longer than three segments is written to a file under MMS_PATH
		# (named after its hash) and sent as media instead.
		segment_size = body.ascii_only? ? 160 : 70
		if !murl && ENV["MMS_PATH"] && body.length > segment_size*3
			multihash =
				Multihashes.encode(Digest::SHA256.digest(body), "sha2-256")
			file = Multibases.pack('base58btc', multihash).to_s
			File.open("#{ENV['MMS_PATH']}/#{file}", "w") { |fh| fh.write body }
			murl = "#{ENV['MMS_URL']}/#{file}.txt"
			body = ""
		end

		# callbacks need id and resourcepart
		tag = "#{WEBrick::HTTPUtils.escape(s.id.to_s)} " \
			"#{WEBrick::HTTPUtils.escape(s.from.resource.to_s)}"

		payload = {}
		payload[:media] = murl if murl
		payload.merge!(
			from: usern,
			to:   num_dest,
			text: body,
			applicationId:  ARGV[4],
			tag: tag
		)

		sent = call_catapult(
			token,
			secret,
			:post,
			"api/v2/users/#{user_id}/messages",
			JSON.dump(payload),
			{'Content-Type' => 'application/json'},
			[201]
		)

		sent.then { |response|
			# an unparseable response still counts as sent
			parsed = begin
				JSON.parse(response)
			rescue StandardError
				{}
			end
			MessageEvent::Out.new(
				timestamp: parsed["time"] || Time.now,
				owner: usern,
				from: usern,
				to: Array(num_dest),
				stanza_id: s.id.to_s,
				bandwidth_id: parsed["id"],
				body: body,
				media_urls: [murl].compact
			).emit(REDIS)
			response
		}.catch { |e|
			EMPromise.reject([:cancel, 'internal-server-error', e.message])
		}
	end
 445
	# Works out the destination number(s) of message +m+.
	#
	# * Sent to the gateway domain itself (ARGV[0]): the recipients come from
	#   the children of the <addresses/> element, per
	#   https://wiki.soprani.ca/SGX/GroupMMS. Returns an Array of numbers, or
	#   a promise rejected with [:cancel, 'item-not-found'] when there is no
	#   <addresses/> element.
	# * Otherwise the node part of the 'to' JID is the destination. Returns a
	#   promise for the number ("+"-prefixed digits, or a ca-us shortcode);
	#   it rejects with [:cancel, 'gone'] for the anonymous phone-context and
	#   [:cancel, 'item-not-found'] for anything else.
	def self.validate_num(m)
		# if sent to SGX domain use https://wiki.soprani.ca/SGX/GroupMMS
		if m.to == ARGV[0]
			an = m.children.find { |v| v.element_name == "addresses" }
			unless an
				return EMPromise.reject(
					[:cancel, 'item-not-found']
				)
			end
			puts "ADRXEP: found an addresses node - iterate addrs.."

			nums = an.children.map { |e|
				# TODO: error unless type is 'to'
				# TODO: error unless the uri starts with 'sms:'
				# TODO: confirm num validates
				# The first four characters ("sms:") are dropped.
				e.attributes['uri'].to_s[4..-1].to_s
			}
			# Children without a usable uri (whitespace text nodes, jid-only
			# addresses) used to contribute an empty recipient; leave them out.
			return nums.reject(&:empty?)
		end

		# if not sent to SGX domain, then assume destination is in 'to'
		EMPromise.resolve(m.to.node.to_s).then { |num_dest|
			if num_dest =~ /\A\+?[0-9]+(?:;.*)?\Z/
				next num_dest if num_dest[0] == '+'

				shortcode = extract_shortcode(num_dest)
				next shortcode if shortcode
			end

			if anonymous_tel?(num_dest)
				EMPromise.reject([:cancel, 'gone'])
			else
				# TODO: text re num not (yet) supportd/implmentd
				EMPromise.reject([:cancel, 'item-not-found'])
			end
		}
	end
 501
	# Looks up the stored registration for +jid+.
	#
	# @return a promise for the stored credential list; it rejects with
	#   [:auth, 'registration-required'] when fewer than four values are stored
	def self.fetch_catapult_cred_for(jid)
		@registration_repo.find(jid).then { |creds|
			next creds unless creds.length < 4

			# TODO: add text re credentials not registered
			EMPromise.reject([:auth, 'registration-required'])
		}
	end
 514
	# Error-type messages are only logged and never answered.
	message :error? do |m|
		# TODO: report it somewhere/somehow - eat for now so no err loop
		puts "EATERROR1: #{m.inspect}"
	end
 519
	# Handles messages that have a body: validates the destination, loads the
	# sender's credentials, then either hands the message to another user of
	# this gateway or sends it out through Bandwidth. Rejections shaped like
	# [type, condition(, text)] are answered with an error stanza; anything
	# else is passed on as a rejection.
	message :body do |m|
		lookups = EMPromise.all([
			validate_num(m),
			fetch_catapult_cred_for(m.from)
		])

		lookups.then { |(num_dest, creds)|
			@registration_repo.find_jid(num_dest).then { |jid|
				[jid, num_dest, *creds]
			}
		}.then { |(jid, num_dest, *creds)|
			next [jid, num_dest, *creds, nil] unless jid

			@registration_repo.find(jid).then { |other_user|
				[jid, num_dest, *creds, other_user.first]
			}
		}.then { |(jid, num_dest, *creds, other_user)|
			# if destination user is in the system pass on directly
			local_user = other_user && !other_user.start_with?('u-')
			if local_user
				pass_on_message(m, creds.last, jid)
			else
				to_catapult_possible_oob(m, num_dest, *creds)
			end
		}.catch { |e|
			stanza_error = e.is_a?(Array) && [2, 3].include?(e.length)
			next EMPromise.reject(e) unless stanza_error

			write_to_stream m.as_error(e[1], e[0], e[2])
		}
	end
 551
	# Identity list used in capability/disco replies for a number JID.
	#
	# @return [Array<Hash>]
	def self.user_cap_identities
		sms_client = {category: 'client', type: 'sms'}
		[sms_client]
	end
 555
	# Feature list used in capability/disco replies for a number JID.
	# TODO: must re-add stuff so can do ad-hoc commands
	#
	# @return [Array<String>]
	def self.user_cap_features
		receipts = "urn:xmpp:receipts"
		[receipts]
	end
 560
	# Adds +feature+ to the gateway's own feature list in place, keeping the
	# list free of duplicates.
	#
	# @return [Array, nil] the list when a duplicate had to be dropped, nil
	#   otherwise (the result of uniq!)
	def self.add_gateway_feature(feature)
		@gateway_features.push(feature).uniq!
	end
 565
	# Accepts every subscription request, then announces capabilities from the
	# "/sgx" resource and subscribes back to the requester.
	subscription :request? do |p|
		puts "PRESENCE1: #{p.inspect}"

		# subscriptions are allowed from anyone - send reply immediately
		approval = Blather::Stanza::Presence.new
		approval.to = p.from
		approval.from = p.to
		approval.type = :subscribed

		puts "RESPONSE5a: #{approval.inspect}"
		write_to_stream approval

		# send a <presence> immediately; not automatically probed for it
		# TODO: refactor so no "presence :probe? do |p|" duplicate below
		caps = Blather::Stanza::Capabilities.new
		# TODO: user a better node URI (?)
		caps.node = 'http://catapult.sgx.soprani.ca/'
		caps.identities = user_cap_identities
		caps.features = user_cap_features

		announcement = caps.c
		announcement.to = p.from
		announcement.from = "#{p.to}/sgx"

		puts "RESPONSE5b: #{announcement.inspect}"
		write_to_stream announcement

		# need to subscribe back so Conversations displays images inline
		subscribe_back = Blather::Stanza::Presence.new
		subscribe_back.to = p.from.to_s.split('/', 2)[0]
		subscribe_back.from = p.to.to_s.split('/', 2)[0]
		subscribe_back.type = :subscribe

		puts "RESPONSE5c: #{subscribe_back.inspect}"
		write_to_stream subscribe_back
	end
 602
	# Answers a presence probe with the capabilities presence, sent from the
	# probed JID's "/sgx" resource.
	presence :probe? do |p|
		puts "PRESENCE2: #{p.inspect}"

		caps = Blather::Stanza::Capabilities.new
		# TODO: user a better node URI (?)
		caps.node = 'http://catapult.sgx.soprani.ca/'
		caps.identities = user_cap_identities
		caps.features = user_cap_features

		answer = caps.c
		answer.to = p.from
		answer.from = "#{p.to}/sgx"

		puts "RESPONSE6: #{answer.inspect}"
		write_to_stream answer
	end
 619
	# Lists the ad-hoc commands for a registered user: "Set Port-Out PIN" when
	# the user's number is eligible for it, otherwise an empty list.
	# Rejections shaped like [type, condition(, text)] become error replies.
	disco_items(
		to: Blather::JID.new(ARGV[0]),
		node: "http://jabber.org/protocol/commands"
	) do |i|
		fetch_catapult_cred_for(i.from).then { |creds|
			BandwidthTNOptions.tn_eligible_for_port_out_pin?(creds).then { |eligible|
				reply = i.reply
				reply.node = 'http://jabber.org/protocol/commands'
				reply.items = if eligible
					[
						Blather::Stanza::DiscoItems::Item.new(
							i.to,
							'set-port-out-pin',
							'Set Port-Out PIN'
						)
					]
				else
					[]
				end

				puts "RESPONSE_CMD_DISCO: #{reply.inspect}"
				write_to_stream reply
			}
		}.catch { |e|
			stanza_error = e.is_a?(Array) && [2, 3].include?(e.length)
			next EMPromise.reject(e) unless stanza_error

			write_to_stream i.as_error(e[1], e[0], e[2])
		}
	end
 652
	# Answers disco#info queries: with the user capabilities when the target
	# JID has a node part (a number JID), otherwise with the gateway's own
	# identity and feature list. Non-get queries are logged and ignored.
	iq '/iq/ns:query', ns: 'http://jabber.org/protocol/disco#info' do |i|
		# TODO: return error if i.type is :set - if it is :reply or
		#  :error it should be ignored (as the below does currently);
		#  review specification to see how to handle other type values
		unless i.type == :get
			puts "DISCO iq rcvd, of non-get type \"#{i.type}\"" \
				" for message \"#{i.inspect}\"; ignoring..."
			next
		end

		# respond to capabilities request for an sgx-bwmsgsv2 number JID
		if i.to.node
			# TODO: confirm the node URL is expected using below
			#puts "XR[node]: #{xpath_result[0]['node']}"

			user_info = i.reply
			user_info.node = i.node
			user_info.identities = user_cap_identities
			user_info.features = user_cap_features

			puts "RESPONSE7: #{user_info.inspect}"
			write_to_stream user_info
			next
		end

		# respond to capabilities request for sgx-bwmsgsv2 itself
		gateway_identity = {
			name: 'Soprani.ca Gateway to XMPP - Bandwidth API V2',
			type: 'sms', category: 'gateway'
		}
		gateway_info = i.reply
		gateway_info.node = i.node
		gateway_info.identities = [gateway_identity]
		gateway_info.features = @gateway_features
		write_to_stream gateway_info
	end
 688
	# Stores +creds+ for the sender of +i+ and confirms with an empty reply.
	#
	# @return a promise; a RegistrationRepo::Conflict is turned into a
	#   rejection with [:cancel, 'conflict', message]
	def self.check_then_register(i, *creds)
		stored = @registration_repo.put(i.from, *creds)

		stored.catch_only(RegistrationRepo::Conflict) { |conflict|
			EMPromise.reject([:cancel, 'conflict', conflict.message])
		}.then {
			write_to_stream i.reply
		}
	end
 698
	# Reads the four registration values from +i+: out of the jabber:x:data
	# form when the query contains one, otherwise from the plain registration
	# accessors.
	#
	# @return [Array] nick, username, password and phone, in that order (a
	#   missing form field yields nil)
	def self.creds_from_registration_query(i)
		unless i.query.find_first("./ns:x", ns: "jabber:x:data")
			return [i.nick, i.username, i.password, i.phone]
		end

		%w[nick username password phone].map { |var| i.form.field(var)&.value }
	end
 711
	# Handles a registration "set" request.
	#
	# A removal deletes the stored registration, replies, and ends the chain by
	# rejecting with :done. Otherwise the submitted credentials are checked
	# with a Bandwidth API call and then stored.
	#
	# @return a promise; expected failures reject with [type, condition]:
	#   [:cancel, 'item-not-found'] when the phone number does not start with
	#   '+', and for BandwidthError [:auth, 'not-authorized'] on code 401,
	#   [:cancel, 'item-not-found'] on 404, [:modify, 'not-acceptable']
	#   otherwise
	def self.process_registration(i)
		EMPromise.resolve(nil).then {
			next creds_from_registration_query(i) unless i.remove?

			@registration_repo.delete(i.from).then do
				write_to_stream i.reply
				EMPromise.reject(:done)
			end
		}.then { |user_id, api_token, api_secret, phone_num|
			unless phone_num && phone_num[0] == '+'
				# TODO: add text re number not (yet) supported
				next EMPromise.reject([:cancel, 'item-not-found'])
			end

			[user_id, api_token, api_secret, phone_num]
		}.then { |user_id, api_token, api_secret, phone_num|
			# TODO: find way to verify #{phone_num}, too
			media_listing = call_catapult(
				api_token,
				api_secret,
				:get,
				"api/v2/users/#{user_id}/media"
			)

			media_listing.then { |response|
				# parsed only to check that the response is valid JSON
				JSON.parse(response)
				# TODO: confirm response is array - could be empty

				puts "register got str #{response.to_s[0..999]}"

				check_then_register(i, user_id, api_token, api_secret, phone_num)
			}
		}.catch_only(BandwidthError) { |e|
			stanza_error = case e.code
			when 401
				# TODO: add text re bad credentials
				[:auth, 'not-authorized']
			when 404
				# TODO: add text re number not found or disabled
				[:cancel, 'item-not-found']
			else
				[:modify, 'not-acceptable']
			end
			EMPromise.reject(stanza_error)
		}
	end
 763
	# Fills +orig+ (a registration reply) with instructions, blank credential
	# fields and the matching data form, then returns it.
	#
	# @param orig registration stanza to fill in (mutated and returned)
	# @param existing_number [String, nil] number already registered for the
	#   user; marks the reply as registered and pre-fills the phone field
	def self.registration_form(orig, existing_number=nil)
		phone = existing_number.to_s
		account_hint = "page as well as the Phone Number\nin your "\
			"account you want to use (ie. '+12345678901')"
		source_notice = ".\n\nThe source code for this gateway is at "\
			"https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
			"\nCopyright (C) 2017-2020  Denver Gingerich "\
			"and others, licensed under AGPLv3+."

		orig.registered = !!existing_number

		# TODO: update "User Id" x2 below (to "accountId"?), and others?
		orig.instructions = "Enter the information from your Account "\
			"#{account_hint}"\
			".\nUser Id is nick, API Token is username, "\
			"API Secret is password, Phone Number is phone"\
			"#{source_notice}"
		orig.nick = ""
		orig.username = ""
		orig.password = ""
		orig.phone = phone

		field_specs = [
			['User Id', 'nick', :"text-single"],
			['API Token', 'username', :"text-single"],
			['API Secret', 'password', :"text-private"],
			['Phone Number', 'phone', :"text-single"]
		]
		orig.form.fields = field_specs.map { |label, var, type|
			field = {required: true, type: type, label: label, var: var}
			# only the phone field carries a pre-filled value
			field[:value] = phone if var == 'phone'
			field
		}
		orig.form.title = 'Register for '\
			'Soprani.ca Gateway to XMPP - Bandwidth API V2'
		orig.form.instructions = "Enter the details from your Account "\
			"#{account_hint}#{source_notice}"

		orig
	end
 813
	# In-band registration: "set" processes the registration, "get" returns
	# the form (pre-filled with the stored number, if any); other types are
	# ignored. Rejections shaped like [type, condition(, text)] become error
	# replies, :done is dropped silently, and anything else ends in +panic+.
	ibr do |i|
		puts "IQ: #{i.inspect}"

		outcome = case i.type
		when :set
			process_registration(i)
		when :get
			@registration_repo.find(i.from.stripped).then { |creds|
				reply = registration_form(i.reply, creds.last)
				puts "RESPONSE2: #{reply.inspect}"
				write_to_stream reply
			}
		else
			# Unknown IQ, ignore for now
			EMPromise.reject(:done)
		end

		outcome.catch { |e|
			if e.is_a?(Array) && [2, 3].include?(e.length)
				write_to_stream i.as_error(e[1], e[0], e[2])
			elsif e != :done
				EMPromise.reject(e)
			end
		}.catch(&method(:panic))
	end
 838
	# First step of the "set-port-out-pin" command: replies to a registered
	# user with a form asking for the PIN and its confirmation.
	command :execute?, node: "set-port-out-pin", sessionid: nil do |iq|
		# Ensure user is registered, but discard their credentials
		# because we're just showing them a form.
		fetch_catapult_cred_for(iq.from).then { |_creds|
			reply = iq.reply
			reply.node = 'set-port-out-pin'
			reply.sessionid = SecureRandom.uuid
			reply.status = :executing

			pin_fields = [
				['pin', 'Port-Out PIN'],
				['confirm_pin', 'Confirm PIN']
			].map { |var, label|
				{var: var, type: 'text-private', label: label, required: true}
			}

			form = Blather::Stanza::X.find_or_create(reply.command)
			form.type = "form"
			form.fields = pin_fields

			reply.command.add_child(form)
			reply.allowed_actions = [:complete]

			puts "RESPONSE_CMD_FORM: #{reply.inspect}"
			write_to_stream reply
		}.catch { |e|
			stanza_error = e.is_a?(Array) && [2, 3].include?(e.length)
			next EMPromise.reject(e) unless stanza_error

			write_to_stream iq.as_error(e[1], e[0], e[2])
		}.catch(&method(:panic))
	end
 878
	# Final step of the "set-port-out-pin" command: validates the submitted
	# PIN, sets it through BandwidthTNOptions and reports the outcome as a
	# completed command carrying an info or error note.
	command :complete?, node: "set-port-out-pin", sessionid: /./ do |iq|
		pin = iq.form.field('pin')&.value
		confirm_pin = iq.form.field('confirm_pin')&.value

		# The first failing check decides the error text.
		problem = if pin.nil? || confirm_pin.nil?
			'PIN fields are required'
		elsif pin != confirm_pin
			'PIN confirmation does not match'
		elsif pin !~ /\A[a-zA-Z0-9]{4,10}\z/
			'PIN must be 4-10 alphanumeric characters'
		end

		if problem
			write_to_stream iq.as_error('bad-request', :modify, problem)
			next
		end

		# Builds the "completed" reply that carries a note for the user.
		completed_reply = lambda { |note_type, note_text|
			reply = iq.reply
			reply.node = 'set-port-out-pin'
			reply.sessionid = iq.sessionid
			reply.status = :completed
			reply.note_type = note_type
			reply.note_text = note_text
			reply
		}

		fetch_catapult_cred_for(iq.from).then { |creds|
			BandwidthTNOptions.set_port_out_pin(creds, pin).then {
				write_to_stream completed_reply.call(
					:info, 'Port-out PIN has been set successfully.'
				)
			}.catch { |e|
				has_message = e.respond_to?(:message)
				error_msg = if has_message && e.message.include?('not valid')
					"Invalid phone number format. "\
					"Please check your registered phone number."
				elsif has_message && e.message.include?('ErrorCode')
					"Bandwidth API error: #{e.message}"
				else
					"Failed to set port-out PIN. Please try again later."
				end

				write_to_stream completed_reply.call(:error, error_msg)
			}
		}.catch { |e|
			stanza_error = e.is_a?(Array) && [2, 3].include?(e.length)
			next EMPromise.reject(e) unless stanza_error

			write_to_stream iq.as_error(e[1], e[0], e[2])
		}.catch(&method(:panic))
	end
 946
	# Answers get/set IQs with a feature-not-implemented error.
	# NOTE(review): presumably this only sees IQs that the more specific
	# handlers registered above did not handle -- confirm Blather's handler
	# ordering.
	iq type: [:get, :set] do |iq|
		write_to_stream(Blather::StanzaError.new(
			iq,
			'feature-not-implemented',
			:cancel
		))
	end
 954end
 955
 956class ReceiptMessage < Blather::Stanza
 957	def self.new(to=nil)
 958		node = super :message
 959		node.to = to
 960		node
 961	end
 962end
 963
 964class WebhookHandler < Goliath::API
 965	use Sentry::Rack::CaptureExceptions
 966	use Goliath::Rack::Params
 967
 968	def response(env)
 969		@registration_repo = RegistrationRepo.new
 970		# TODO: add timestamp grab here, and MUST include ./tai version
 971
 972		puts 'ENV: ' + env.reject { |k| k == 'params' }.to_s
 973
 974		if params.empty?
 975			puts 'PARAMS empty!'
 976			return [200, {}, "OK"]
 977		end
 978
 979		if env['REQUEST_URI'] != '/'
 980			puts 'BADREQUEST1: non-/ request "' +
 981				env['REQUEST_URI'] + '", method "' +
 982				env['REQUEST_METHOD'] + '"'
 983			return [200, {}, "OK"]
 984		end
 985
 986		if env['REQUEST_METHOD'] != 'POST'
 987			puts 'BADREQUEST2: non-POST request; URI: "' +
 988				env['REQUEST_URI'] + '", method "' +
 989				env['REQUEST_METHOD'] + '"'
 990			return [200, {}, "OK"]
 991		end
 992
 993		# TODO: process each message in list, not just first one
 994		jparams = params.dig('_json', 0, 'message')
 995		type = params.dig('_json', 0, 'type')
 996
 997		return [400, {}, "Missing params\n"] unless jparams && type
 998
 999		users_num, others_num = if jparams['direction'] == 'in'
1000			[jparams['owner'], jparams['from']]
1001		elsif jparams['direction'] == 'out'
1002			[jparams['from'], jparams['owner']]
1003		else
1004			puts "big prob: '#{jparams['direction']}'"
1005			return [400, {}, "OK"]
1006		end
1007
1008		jparams['to'].reject! { |num|
1009			num == users_num || num == others_num
1010		}
1011
1012		return [400, {}, "Missing params\n"] unless users_num && others_num
1013		return [400, {}, "Missing params\n"] unless jparams['to'].is_a?(Array)
1014
1015		puts "BODY - messageId: #{jparams['id']}" \
1016			", eventType: #{type}" \
1017			", time: #{jparams['time']}" \
1018			", direction: #{jparams['direction']}" \
1019			", deliveryState: #{jparams['deliveryState'] || 'NONE'}" \
1020			", errorCode: #{jparams['errorCode'] || 'NONE'}" \
1021			", description: #{jparams['description'] || 'NONE'}" \
1022			", tag: #{jparams['tag'] || 'NONE'}" \
1023			", media: #{jparams['media'] || 'NONE'}"
1024
1025		if others_num[0] != '+'
1026			# TODO: check that others_num actually a shortcode first
1027			others_num +=
1028				';phone-context=ca-us.phone-context.soprani.ca'
1029		end
1030
1031		bare_jid = @registration_repo.find_jid(users_num).sync
1032
1033		if !bare_jid
1034			puts "jid_key for (#{users_num}) DNE; BW API misconfigured?"
1035
1036			return [403, {}, "Customer not found\n"]
1037		end
1038
1039		msg = nil
1040		case jparams['direction']
1041		when 'in'
1042			text = ''
1043			case type
1044			when 'sms'
1045				text = jparams['text']
1046			when 'mms'
1047				has_media = false
1048
1049				if jparams['text'].empty?
1050					if not has_media
1051						text = '[suspected group msg '\
1052							'with no text (odd)]'
1053					end
1054				else
1055					text = if has_media
1056						# TODO: write/use a caption XEP
1057						jparams['text']
1058					else
1059						'[suspected group msg '\
1060						'(recipient list not '\
1061						'available) with '\
1062						'following text] ' +
1063						jparams['text']
1064					end
1065				end
1066
1067				# ie. if text param non-empty or had no media
1068				if not text.empty?
1069					msg = Blather::Stanza::Message.new(
1070						bare_jid, text)
1071					msg.from = others_num + '@' + ARGV[0]
1072					SGXbwmsgsv2.write(msg)
1073				end
1074
1075				return [200, {}, "OK"]
1076			when 'message-received'
1077				# TODO: handle group chat, and fix above
1078				text = jparams['text']
1079
1080				if jparams['to'].length > 1
1081					msg = Blather::Stanza::Message.new(
1082						Blather::JID.new(bare_jid).domain,
1083						text
1084					)
1085
1086					addrs = Nokogiri::XML::Node.new(
1087						'addresses', msg.document)
1088					addrs['xmlns'] = 'http://jabber.org/' \
1089						'protocol/address'
1090
1091					addr1 = Nokogiri::XML::Node.new(
1092						'address', msg.document)
1093					addr1['type'] = 'to'
1094					addr1['jid'] = bare_jid
1095					addrs.add_child(addr1)
1096
1097					jparams['to'].each do |receiver|
1098						addrn = Nokogiri::XML::Node.new(
1099							'address', msg.document)
1100						addrn['type'] = 'to'
1101						addrn['uri'] = "sms:#{receiver}"
1102						addrn['delivered'] = 'true'
1103						addrs.add_child(addrn)
1104					end
1105
1106					msg.add_child(addrs)
1107
1108					# TODO: delete
1109					puts "RESPONSE9: #{msg.inspect}"
1110				end
1111
1112				Array(jparams['media']).each do |media_url|
1113					unless media_url.end_with?(
1114						'.smil', '.txt', '.xml'
1115					)
1116						has_media = true
1117						SGXbwmsgsv2.send_media(
1118							others_num + '@' +
1119							ARGV[0],
1120							bare_jid, media_url,
1121							nil, nil, msg
1122						)
1123					end
1124				end
1125			else
1126				text = "unknown type (#{type})"\
1127					" with text: " + jparams['text']
1128
1129				# TODO: log/notify of this properly
1130				puts text
1131			end
1132
1133			if not msg
1134				msg = Blather::Stanza::Message.new(bare_jid, text)
1135			end
1136		else # per prior switch, this is:  jparams['direction'] == 'out'
1137			tag_parts = jparams['tag'].split(/ /, 2)
1138			id = WEBrick::HTTPUtils.unescape(tag_parts[0])
1139			resourcepart = WEBrick::HTTPUtils.unescape(tag_parts[1])
1140
1141			# TODO: remove this hack
1142			if jparams['to'].length > 1
1143				puts "WARN! group no rcpt: #{users_num}"
1144				return [200, {}, "OK"]
1145			end
1146
1147			case type
1148			when 'message-failed'
1149				# create a bare message like the one user sent
1150				msg = Blather::Stanza::Message.new(
1151					others_num + '@' + ARGV[0])
1152				msg.from = bare_jid + '/' + resourcepart
1153				msg['id'] = id
1154
1155				# TODO: add 'errorCode' and/or 'description' val
1156				# create an error reply to the bare message
1157				msg = msg.as_error(
1158					'recipient-unavailable',
1159					:wait,
1160					jparams['description']
1161				)
1162
1163				# TODO: make prettier: this should be done above
1164				others_num = params['_json'][0]['to']
1165			when 'message-delivered'
1166
1167				msg = ReceiptMessage.new(bare_jid)
1168
1169				# TODO: put in member/instance variable
1170				msg['id'] = SecureRandom.uuid
1171
1172				# TODO: send only when requested per XEP-0184
1173				rcvd = Nokogiri::XML::Node.new(
1174					'received',
1175					msg.document
1176				)
1177				rcvd['xmlns'] = 'urn:xmpp:receipts'
1178				rcvd['id'] = id
1179				msg.add_child(rcvd)
1180
1181				# TODO: make prettier: this should be done above
1182				others_num = params['_json'][0]['to']
1183			else
1184				# TODO: notify somehow of unknown state receivd?
1185				puts "message with id #{id} has "\
1186					"other type #{type}"
1187				return [200, {}, "OK"]
1188			end
1189
1190			puts "RESPONSE4: #{msg.inspect}"
1191		end
1192
1193		msg.from = others_num + '@' + ARGV[0]
1194		SGXbwmsgsv2.write(msg)
1195
1196		# Emit event to messages stream
1197		case [jparams['direction'], type]
1198		when ['in', 'message-received']
1199			media_urls = Array(jparams['media']).reject { |u|
1200				u.end_with?('.smil', '.txt', '.xml')
1201			}
1202			MessageEvent::In.new(
1203				timestamp: jparams['time'],
1204				from: jparams['from'],
1205				to: jparams['to'],
1206				owner: jparams['owner'],
1207				bandwidth_id: jparams['id'],
1208				body: jparams['text'].to_s,
1209				media_urls: media_urls
1210			).emit(REDIS)
1211		when ['out', 'message-failed']
1212			tag_parts = jparams['tag'].split(/ /, 2)
1213			stanza_id = WEBrick::HTTPUtils.unescape(tag_parts[0])
1214			MessageEvent::Failed.new(
1215				timestamp: jparams['time'],
1216				stanza_id: stanza_id,
1217				bandwidth_id: jparams['id'],
1218				error_code: jparams['errorCode'].to_s,
1219				error_description: jparams['description'].to_s
1220			).emit(REDIS)
1221		when ['out', 'message-delivered']
1222			tag_parts = jparams['tag'].split(/ /, 2)
1223			stanza_id = WEBrick::HTTPUtils.unescape(tag_parts[0])
1224			MessageEvent::Delivered.new(
1225				timestamp: jparams['time'],
1226				stanza_id: stanza_id,
1227				bandwidth_id: jparams['id']
1228			).emit(REDIS)
1229		end
1230
1231		[200, {}, "OK"]
1232	rescue Exception => e
1233		Sentry.capture_exception(e)
1234		puts 'Shutting down gateway due to exception 013: ' + e.message
1235		SGXbwmsgsv2.shutdown
1236		puts 'Gateway has terminated.'
1237		EM.stop
1238	end
1239end
1240
# Gateway bootstrap. Registered via at_exit (and skipped entirely when
# ENV['ENV'] is 'test') so that it runs once the rest of the file has loaded.
#
# Steps, in order:
#   1. print a banner that includes the current git HEAD commit
#   2. require exactly 7 command-line arguments, else print usage and exit
#   3. inside the EventMachine reactor: connect to Redis, start the XMPP
#      component, schedule an hourly keep-alive ping, and start the Goliath
#      HTTP server that serves WebhookHandler
#   4. once the HTTP server is started, install INT/TERM handlers that shut
#      the gateway down and stop the reactor
at_exit do
	$stdout.sync = true

	head_commit = `git rev-parse HEAD`
	puts "Soprani.ca/SMS Gateway for XMPP - Bandwidth API V2\n"\
		"==>> last commit of this version is " + head_commit + "\n"

	unless ARGV.size == 7
		puts "Usage: sgx-bwmsgsv2.rb <component_jid> "\
			"<component_password> <server_hostname> "\
			"<server_port> <application_id> "\
			"<http_listen_port> <mms_proxy_prefix_url>"
		exit 0
	end

	started_at = Time.now
	puts format(
		"LOG %d.%09d: starting...\n\n",
		started_at.to_i, started_at.nsec
	)

	EM.run do
		REDIS = EM::Hiredis.connect

		SGXbwmsgsv2.run

		# required when using Prosody otherwise disconnects on 6-hour inactivity
		EM.add_periodic_timer(3600) do
			keepalive = Blather::Stanza::Iq::Ping.new(:get, 'localhost')
			# ARGV[0] is <component_jid> per the usage text above
			keepalive.from = ARGV[0]
			SGXbwmsgsv2.write(keepalive)
		end

		# ARGV[5] is <http_listen_port> per the usage text above
		listen_port = ARGV[5].to_i
		http_server = Goliath::Server.new('0.0.0.0', listen_port)

		webhook_api = WebhookHandler.new
		http_server.api = webhook_api
		http_server.app = Goliath::Rack::Builder.build(
			webhook_api.class, webhook_api
		)

		# Fully configure the logger, then attach it to the server
		goliath_log = Log4r::Logger.new('goliath')
		http_server.logger = goliath_log
		goliath_log.add(Log4r::StdoutOutputter.new('console'))
		goliath_log.level = Log4r::INFO

		# Shared shutdown routine; always run through EM.defer from the
		# signal handlers below rather than directly in the trap context
		stop_gateway = proc do
			puts 'Shutting down gateway...'
			SGXbwmsgsv2.shutdown

			puts 'Gateway has terminated.'
			EM.stop
		end

		http_server.start do
			%w[INT TERM].each do |signal_name|
				trap(signal_name) { EM.defer(&stop_gateway) }
			end
		end
	end
end unless ENV['ENV'] == 'test'