sgx-catapult.rb

   1#!/usr/bin/env ruby
   2#
   3# Copyright (C) 2017-2018  Denver Gingerich <denver@ossguy.com>
   4# Copyright (C) 2017  Stephen Paul Weber <singpolyma@singpolyma.net>
   5#
   6# This file is part of sgx-catapult.
   7#
   8# sgx-catapult is free software: you can redistribute it and/or modify it under
   9# the terms of the GNU Affero General Public License as published by the Free
  10# Software Foundation, either version 3 of the License, or (at your option) any
  11# later version.
  12#
  13# sgx-catapult is distributed in the hope that it will be useful, but WITHOUT
  14# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  15# FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more
  16# details.
  17#
  18# You should have received a copy of the GNU Affero General Public License along
  19# with sgx-catapult.  If not, see <http://www.gnu.org/licenses/>.
  20
  21require 'blather/client/dsl'
  22require 'em-hiredis'
  23require 'em-http-request'
  24require 'json'
  25require 'securerandom'
  26require 'time'
  27require 'uri'
  28require 'webrick'
  29
  30require 'goliath/api'
  31require 'goliath/server'
  32require 'log4r'
  33
  34require_relative 'em_promise'
  35
  36def panic(e)
  37	puts "Shutting down gateway due to exception: #{e.message}"
  38	puts e.backtrace
  39	SGXcatapult.shutdown
  40	puts 'Gateway has terminated.'
  41	EM.stop
  42end
  43
  44def extract_shortcode(dest)
  45	num, context = dest.split(';', 2)
  46	num if context && context == 'phone-context=ca-us.phone-context.soprani.ca'
  47end
  48
  49def is_anonymous_tel?(dest)
  50	num, context = dest.split(';', 2)
  51	context && context == 'phone-context=anonymous.phone-context.soprani.ca'
  52end
  53
  54class SGXClient < Blather::Client
  55	def register_handler(type, *guards, &block)
  56		super(type, *guards) { |*args| wrap_handler(*args, &block) }
  57	end
  58
  59	def register_handler_before(type, *guards, &block)
  60		check_handler(type, guards)
  61		handler = lambda { |*args| wrap_handler(*args, &block) }
  62
  63		@handlers[type] ||= []
  64		@handlers[type].unshift([guards, handler])
  65	end
  66
  67protected
  68
  69	def wrap_handler(*args, &block)
  70		v = block.call(*args)
  71		v.catch(&method(:panic)) if v.is_a?(Promise)
  72		true # Do not run other handlers unless throw :pass
  73	rescue Exception => e
  74		panic(e)
  75	end
  76end
  77
  78module SGXcatapult
  79	extend Blather::DSL
  80
  81	@jingle_sids = {}
  82	@jingle_fnames = {}
  83	@partial_data = {}
  84	@client = SGXClient.new
  85	@gateway_features = [
  86		"http://jabber.org/protocol/disco#info",
  87		"jabber:iq:register"
  88	]
  89
	# Starts this module's XMPP client by delegating to client.run.
	def self.run
		client.run
	end
  93
	# Sends +stanza+ through this module's XMPP client.
	# so classes outside this module can write messages, too
	def self.write(stanza)
		client.write(stanza)
	end
  98
	# Registers +block+ for +type+ stanzas matching +guards+ through the
	# client's register_handler_before.
	def self.before_handler(type, *guards, &block)
		client.register_handler_before(type, *guards, &block)
	end
 102
	# Tells the XMPP user +to+ about a media file received from +from+.
	#
	# The message body is a proxy URL built from ARGV[6], the escaped +to+
	# and the file name taken from +media_url+; the same URL is attached as
	# a jabber:x:oob element, with +desc+ included when given. +subject+, if
	# given, becomes the message subject. Any exception ends in panic().
	def self.send_media(from, to, media_url, desc=nil, subject=nil)
		# we assume media_url is of the form (always the case so far):
		#  https://api.catapult.inetwork.com/v1/users/[uid]/media/[file]
		# so the 8th '/'-separated piece is the file name

		# the caller must guarantee that 'to' is a bare JID
		proxy_url = ARGV[6] + WEBrick::HTTPUtils.escape(to) + '/' +
			media_url.split('/', 8)[7]

		puts "ORIG_URL: #{media_url}"
		puts "PROX_URL: #{proxy_url}"

		# put URL in the body (so Conversations will still see it)...
		stanza = Blather::Stanza::Message.new(to, proxy_url)
		stanza.from = from
		stanza.subject = subject if subject

		# ...but also provide URL in XEP-0066 (OOB) fashion
		# TODO: confirm client supports OOB or don't send this
		oob = Nokogiri::XML::Node.new('x', stanza.document)
		oob['xmlns'] = 'jabber:x:oob'

		oob_parts = [['url', proxy_url]]
		oob_parts << ['desc', desc] if desc
		oob_parts.each do |tag, content|
			element = Nokogiri::XML::Node.new(tag, stanza.document)
			element.add_child(
				Nokogiri::XML::Text.new(content, stanza.document)
			)
			oob.add_child(element)
		end

		stanza.add_child(oob)

		write(stanza)
	rescue Exception => failure
		panic(failure)
	end
 142
	# Turns the stanza +orig+ into an error stanza and returns it.
	#
	# An <error type=+type+/> child holding a +name+ element (in the
	# xmpp-stanzas namespace) is appended, followed by +query_node+ when one
	# is given. +text+ is accepted but not used yet.
	def self.error_msg(orig, query_node, type, name, text=nil)
		orig.type = :error

		wrapper = Nokogiri::XML::Node.new('error', orig.document)
		wrapper['type'] = type
		orig.add_child(wrapper)

		condition = Nokogiri::XML::Node.new(name, orig.document)
		condition['xmlns'] = 'urn:ietf:params:xml:ns:xmpp-stanzas'
		wrapper.add_child(condition)

		orig.add_child(query_node) if query_node

		# TODO: add some explanatory xml:lang='en' text (see text param)
		puts "RESPONSE3: #{orig.inspect}"
		orig
	end
 160
	# Connection settings come from the command line; per the usage text
	# printed at startup, ARGV[0..3] are the component JID, component
	# password, server hostname and server port.
	# workqueue_count MUST be 0 or else Blather uses threads!
	setup ARGV[0], ARGV[1], ARGV[2], ARGV[3], nil, nil, workqueue_count: 0
 163
	# Delivers message +m+ directly to +jid+ (a user of this gateway),
	# rewriting its sender to "+users_num+@<component>", and then sends a
	# XEP-0184 delivery receipt back to the original sender.
	def self.pass_on_message(m, users_num, jid)
		# the receipt is addressed from the stanza's original to/from, so
		# it has to be prepared before the message is re-addressed below
		receipt = ReceiptMessage.new(m.from.stripped)
		receipt.from = m.to

		# pass original message (before sending receipt)
		m.to = jid
		m.from = "#{users_num}@#{ARGV[0]}"

		puts "XRESPONSE0: #{m.inspect}"
		write_to_stream m

		# send a delivery receipt back to the sender
		# TODO: send only when requested per XEP-0184
		# TODO: pass receipts from target if supported

		# TODO: put in member/instance variable
		receipt['id'] = SecureRandom.uuid
		received = Nokogiri::XML::Node.new('received', receipt.document)
		received['xmlns'] = 'urn:xmpp:receipts'
		received['id'] = m.id
		receipt.add_child(received)

		puts "XRESPONSE1: #{receipt.inspect}"
		write_to_stream receipt
	end
 190
	# Issues an HTTP request against the Catapult API.
	#
	# +token+/+secret+ go into the Authorization header, +m+ is the HTTP
	# method name (e.g. :get, :post, :put), +pth+ is appended to the API
	# base URL, and +body+/+head+ are the request body and extra headers.
	#
	# The result is chained with #then: when the response status is listed
	# in +code+ it yields the response body (+respond_with+ :body), the
	# response headers (:headers) or the whole client object (anything
	# else); otherwise it is rejected with the status code.
	def self.call_catapult(
		token, secret, m, pth, body=nil,
		head={}, code=[200], respond_with=:body
	)
		request = EM::HttpRequest.new(
			"https://api.catapult.inetwork.com/#{pth}"
		)
		headers = {'Authorization' => [token, secret]}.merge(head)

		request.public_send(m, head: headers, body: body).then do |http|
			status = http.response_header.status
			puts "API response to send: #{http.response} with code"\
				" response.code #{status}"

			next EMPromise.reject(status) unless code.include?(status)

			case respond_with
			when :body then http.response
			when :headers then http.response_header
			else http
			end
		end
	end
 221
	# Sends stanza +s+ to +num_dest+ through the Catapult messages API on
	# behalf of the user owning number +usern+, and bumps that user's daily
	# usage counter in Redis.
	#
	# +murl+ is a media URL to attach (nil for a plain text message, in
	# which case delivery receipts are requested at ARGV[4]). +user_id+,
	# +token+ and +secret+ are the user's Catapult credentials.
	#
	# Returns a promise. It is rejected with [:modify, 'policy-violation']
	# when there is neither media nor a non-blank body, and with
	# [:cancel, 'internal-server-error'] when the API call fails. The
	# original code dropped the API promise and returned only the Redis one,
	# so send failures were never reported to the caller; both are now
	# combined, so a failure of either one rejects the result.
	def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern)
		body = s.respond_to?(:body) ? s.body : ''
		if murl.to_s.empty? && body.to_s.strip.empty?
			return EMPromise.reject(
				[:modify, 'policy-violation']
			)
		end

		extra = if murl
			{
				media: murl
			}
		else
			{
				receiptRequested: 'all',
				callbackUrl:      ARGV[4]
			}
		end

		sent = call_catapult(
			token,
			secret,
			:post,
			"v1/users/#{user_id}/messages",
			JSON.dump(extra.merge(
				from: usern,
				to:   num_dest,
				text: body,
				tag:
					# callbacks need id and resourcepart
					WEBrick::HTTPUtils.escape(s.id.to_s) +
					' ' +
					WEBrick::HTTPUtils.escape(
						s.from.resource.to_s
					)
			)),
			{'Content-Type' => 'application/json'},
			[201]
		).catch {
			# TODO: add text; mention code number
			EMPromise.reject(
				[:cancel, 'internal-server-error']
			)
		}

		# usage is counted per day, keyed by the date derived from ./tai
		t = Time.now
		tai_timestamp = `./tai`.strip
		tai_yyyymmdd = Time.at(tai_timestamp.to_i).strftime('%Y%m%d')
		puts "SMU %d.%09d, %s: msg for %s sent on %s - incrementing\n" %
			[t.to_i, t.nsec, tai_timestamp, usern, tai_yyyymmdd]

		counted = REDIS.incr('usage_messages-' + tai_yyyymmdd + '-' +
			usern).then { |total|

			t = Time.now
			puts "SMU %d.%09d: total msgs for %s-%s now at %s\n" %
				[t.to_i, t.nsec, tai_yyyymmdd, usern, total]
		}

		EMPromise.all([sent, counted])
	end
 281
	# Checks that +num+ is a destination this gateway can send to.
	#
	# Returns a promise that yields the destination string when it starts
	# with '+', or the bare shortcode when it carries the shortcode
	# phone-context. Otherwise the promise is rejected: [:cancel, 'gone']
	# for anonymous destinations, [:cancel, 'item-not-found'] for the rest.
	def self.validate_num(num)
		EMPromise.resolve(num.to_s).then do |dest|
			plausible = !(dest =~ /\A\+?[0-9]+(?:;.*)?\Z/).nil?
			shortcode = nil

			if plausible && dest[0] == '+'
				dest
			elsif plausible && (shortcode = extract_shortcode(dest))
				shortcode
			elsif is_anonymous_tel?(dest)
				EMPromise.reject([:cancel, 'gone'])
			else
				# TODO: text re num not (yet) supportd/implmentd
				EMPromise.reject([:cancel, 'item-not-found'])
			end
		end
	end
 298
	# Looks up the credentials stored in Redis for the bare form of +jid+.
	#
	# Returns a promise yielding the list read from "catapult_cred-<jid>",
	# or rejected with [:auth, 'registration-required'] when fewer than
	# four entries are stored.
	def self.fetch_catapult_cred_for(jid)
		REDIS.lrange("catapult_cred-#{jid.stripped}", 0, 3).then do |stored|
			next stored unless stored.length < 4

			# TODO: add text re credentials not registered
			EMPromise.reject([:auth, 'registration-required'])
		end
	end
 312
	# Incoming error-type messages are only logged; no reply is generated.
	message :error? do |m|
		# TODO: report it somewhere/somehow - eat for now so no err loop
		puts "EATERROR1: #{m.inspect}"
	end
 317
	# Handles a chat message from a registered user to a phone-number JID.
	#
	# The destination number is validated and the sender's credentials are
	# fetched in parallel. If the destination number is itself registered
	# with this gateway the message is handed over directly; otherwise it
	# goes out through the Catapult API.
	message :body do |m|
		EMPromise.all([
			validate_num(m.to.node),
			fetch_catapult_cred_for(m.from)
		]).then { |(num_dest, creds)|
			# is the destination number registered to a JID here?
			jid_key = "catapult_jid-#{num_dest}"
			REDIS.get(jid_key).then { |jid|
				[jid, num_dest] + creds
			}
		}.then { |(jid, num_dest, *creds)|
			# if destination user is in the system pass on directly
			if jid
				# creds.last is the sender's registered phone number
				pass_on_message(m, creds.last, jid)
			else
				to_catapult(m, nil, num_dest, *creds)
			end
		}.catch { |e|
			# two-element arrays are [error type, condition] pairs meant
			# for the sender; anything else is passed further down
			if e.is_a?(Array) && e.length == 2
				write_to_stream error_msg(m.reply, m.body, *e)
			else
				EMPromise.reject(e)
			end
		}
	end
 342
	# Identities advertised for a phone-number JID in capabilities and
	# disco#info replies.
	def self.user_cap_identities
		[{category: 'client', type: 'sms'}]
	end
 346
	# Features advertised for a phone-number JID in capabilities and
	# disco#info replies: delivery receipts plus Jingle file transfer over
	# in-band bytestreams.
	def self.user_cap_features
		[
			"urn:xmpp:receipts",
			"urn:xmpp:jingle:1", "urn:xmpp:jingle:transports:ibb:1",

			# TODO: add more efficient file transfer mechanisms
			#"urn:xmpp:jingle:transports:s5b:1",

			# TODO: MUST add all reasonable vers of file-transfer
			#"urn:xmpp:jingle:apps:file-transfer:4"
			"urn:xmpp:jingle:apps:file-transfer:3"
		]
	end
 360
	# Adds +feature+ to the list reported in the gateway's own disco#info
	# reply, keeping the list free of duplicates.
	def self.add_gateway_feature(feature)
		@gateway_features << feature
		@gateway_features.uniq!
	end
 365
	# Builds the capabilities <presence/> advertised for a phone-number JID:
	# addressed to +to+ and sent from +from+ with the '/sgx' resource
	# appended. Shared by the subscription-request and probe handlers below,
	# which previously each carried their own copy of this code.
	def self.user_caps_presence(to, from)
		caps = Blather::Stanza::Capabilities.new
		# TODO: user a better node URI (?)
		caps.node = 'http://catapult.sgx.soprani.ca/'
		caps.identities = user_cap_identities
		caps.features = user_cap_features

		msg = caps.c
		msg.to = to
		msg.from = from.to_s + '/sgx'
		msg
	end

	# Accepts every subscription request, announces presence right away and
	# asks for a subscription in the other direction.
	subscription :request? do |p|
		puts "PRESENCE1: #{p.inspect}"

		# subscriptions are allowed from anyone - send reply immediately
		msg = Blather::Stanza::Presence.new
		msg.to = p.from
		msg.from = p.to
		msg.type = :subscribed

		puts 'RESPONSE5a: ' + msg.inspect
		write_to_stream msg

		# send a <presence> immediately; not automatically probed for it
		msg = user_caps_presence(p.from, p.to)

		puts 'RESPONSE5b: ' + msg.inspect
		write_to_stream msg

		# need to subscribe back so Conversations displays images inline
		msg = Blather::Stanza::Presence.new
		msg.to = p.from.to_s.split('/', 2)[0]
		msg.from = p.to.to_s.split('/', 2)[0]
		msg.type = :subscribe

		puts 'RESPONSE5c: ' + msg.inspect
		write_to_stream msg
	end

	# Answers a presence probe with the capabilities presence.
	presence :probe? do |p|
		puts 'PRESENCE2: ' + p.inspect

		msg = user_caps_presence(p.from, p.to)

		puts 'RESPONSE6: ' + msg.inspect
		write_to_stream msg
	end
 419
	# Handles Jingle IQs for incoming file transfers.
	#
	# transport-accept is acknowledged; session-terminate, transport-info
	# and error IQs are only logged. Everything else is treated as a
	# session-initiate: it is acknowledged, then answered with
	# session-accept when the offer includes an IBB transport, or with
	# transport-replace (proposing IBB) when it does not. The transport sid
	# is recorded together with the Jingle sid and the offered file name so
	# the later IBB handlers can find them.
	iq '/iq/ns:jingle', ns: 'urn:xmpp:jingle:1' do |i, jn|
		puts "IQj: #{i.inspect}"

		if jn[0]['action'] == 'transport-accept'
			puts "REPLY0: #{i.reply.inspect}"
			write_to_stream i.reply
			next
		elsif jn[0]['action'] == 'session-terminate'
			# TODO: unexpected (usually we do this; handle?)
			puts "TERMINATED"
			next
		elsif jn[0]['action'] == 'transport-info'
			# TODO: unexpected, but should handle in a nice way
			puts "FAIL!!!"
			next
		elsif i.type == :error
			# TODO: do something, maybe terminating the connection
			puts 'ERROR!!!'
			next
		end

		# TODO: should probably confirm we got session-initiate here

		# acknowledge the IQ itself before sending our own Jingle action
		write_to_stream i.reply
		puts "RESPONSE8: #{i.reply.inspect}"

		msg = Blather::Stanza::Iq.new :set
		msg.to = i.from
		msg.from = i.to

		# NOTE(review): a <jingle/> without <content/> leaves cn nil and
		# the lines below raise - confirm whether that can happen
		cn = jn.children.find { |v| v.element_name == "content" }
		puts 'CN-name: ' + cn['name']
		puts 'JN-sid: ' + jn[0]['sid']

		# look for an IBB transport among the offered ones; last_sid ends
		# up as the sid of the last <transport/> examined
		ibb_found = false
		last_sid = ''
		cn.children.each do |child|
			if child.element_name == 'transport'
				puts 'TPORT: ' + child.namespace.href
				last_sid = child['sid']
				if 'urn:xmpp:jingle:transports:ibb:1' ==
					child.namespace.href

					ibb_found = true
					break
				end
			end
		end

		j = Nokogiri::XML::Node.new 'jingle', msg.document
		j['xmlns'] = 'urn:xmpp:jingle:1'
		j['sid'] = jn[0]['sid']
		msg.add_child(j)

		content = Nokogiri::XML::Node.new 'content', msg.document
		content['name'] = cn['name']
		content['creator'] = 'initiator'
		j.add_child(content)

		transport = Nokogiri::XML::Node.new 'transport', msg.document
		# TODO: make block-size more variable and/or dependent on sender
		transport['block-size'] = '4096'
		transport['xmlns'] = 'urn:xmpp:jingle:transports:ibb:1'
		if ibb_found
			# accept the offered IBB transport as-is
			transport['sid'] = last_sid
			j['action'] = 'session-accept'
			j['responder'] = i.from

			dsc = Nokogiri::XML::Node.new 'description', msg.document
			dsc['xmlns'] = 'urn:xmpp:jingle:apps:file-transfer:3'
			content.add_child(dsc)
		else
			# for Conversations - it tries s5b even if caps ibb-only
			transport['sid'] = SecureRandom.uuid
			j['action'] = 'transport-replace'
			j['initiator'] = i.from
		end
		content.add_child(transport)

		# remember which Jingle session this transport sid belongs to
		@jingle_sids[transport['sid']] = jn[0]['sid']

		# TODO: save <date> as well? Gajim sends, Conversations does not
		# TODO: save/validate <size> with eventual full received length
		# the file name sits at content/description/offer/file/name
		fname =
			cn
			.children.find { |v| v.element_name == "description" }
			.children.find { |w| w.element_name == "offer" }
			.children.find { |x| x.element_name == "file" }
			.children.find { |y| y.element_name == "name" }
		@jingle_fnames[transport['sid']] = fname.text

		puts "RESPONSE9: #{msg.inspect}"
		write_to_stream msg
	end
 514
	# IBB <open/>: starts an empty receive buffer for the given sid and
	# acknowledges the request.
	iq '/iq/ns:open', ns:	'http://jabber.org/protocol/ibb' do |i, on|
		puts "IQo: #{i.inspect}"

		@partial_data[on[0]['sid']] = ''
		write_to_stream i.reply
	end
 521
	# IBB <data/>: decodes the base64 payload, appends it to the receive
	# buffer for the given sid and acknowledges the request.
	#
	# A sid with no buffer (no <open/> was seen for it) used to fail with
	# NoMethodError on nil and take the whole gateway down through panic();
	# it is now answered with an item-not-found error, which is what
	# XEP-0047 prescribes for an unknown session.
	iq '/iq/ns:data', ns:	'http://jabber.org/protocol/ibb' do |i, dn|
		sid = dn[0]['sid']

		unless @partial_data.key?(sid)
			puts "ERROR: <data/> for unknown IBB session: #{sid}"
			write_to_stream error_msg(
				i.reply, nil, :cancel, 'item-not-found'
			)
			next
		end

		@partial_data[sid] += Base64.decode64(dn[0].text)
		write_to_stream i.reply
	end
 526
	# IBB <close/>: the file is complete. The request is acknowledged, the
	# buffered data is uploaded to the sender's Catapult media storage and
	# sent to the destination number as a media message, and finally the
	# Jingle session is terminated with reason <success/>.
	iq '/iq/ns:close', ns:	'http://jabber.org/protocol/ibb' do |i, cn|
		puts "IQc: #{i.inspect}"
		# NOTE(review): this result goes out before the upload is tried, so
		# the error path at the bottom sends a second reply for the same IQ
		write_to_stream i.reply

		EMPromise.all([
			validate_num(i.to.node),
			fetch_catapult_cred_for(i.from)
		]).then { |(num_dest, creds)|
			# Gajim bug: <close/> has Jingle (not transport) sid; fix later
			if not @jingle_fnames.key? cn[0]['sid']
				puts 'ERROR: Not found in filename map: ' + cn[0]['sid']

				# :done ends the chain quietly (see the catch below)
				next EMPromise.reject(:done)
				# TODO: in case only Gajim's <data/> bug fixed, add map:
				#cn[0]['sid'] = @jingle_tsids[cn[0]['sid']]
			end

			# upload cached data to server (before success reply)
			media_name =
				"#{Time.now.utc.iso8601}_#{SecureRandom.uuid}"\
				"_#{@jingle_fnames[cn[0]['sid']]}"
			puts 'name to save: ' + media_name

			# NOTE(review): path starts with '/', and call_catapult as well
			# as the URL built below put another '/' in front of it, giving
			# '//v1/...' - confirm the API accepts that
			path = "/v1/users/#{creds.first}/media/#{media_name}"

			# the upload and the outgoing message are started together
			EMPromise.all([
				call_catapult(
					*creds[1..2],
					:put,
					path,
					@partial_data[cn[0]['sid']]
				).catch {
					EMPromise.reject([
						:cancel, 'internal-server-error'
					])
				},
				to_catapult(
					i,
					"https://api.catapult.inetwork.com/" +
						path,
					num_dest,
					*creds
				)
			])
		}.then {
			# drop the buffered data; the key itself is kept
			@partial_data[cn[0]['sid']] = ''

			# received the complete file so now close the stream
			msg = Blather::Stanza::Iq.new :set
			msg.to = i.from
			msg.from = i.to

			j = Nokogiri::XML::Node.new 'jingle', msg.document
			j['xmlns'] = 'urn:xmpp:jingle:1'
			j['action'] = 'session-terminate'
			j['sid'] = @jingle_sids[cn[0]['sid']]
			msg.add_child(j)

			r = Nokogiri::XML::Node.new 'reason', msg.document
			s = Nokogiri::XML::Node.new 'success', msg.document
			r.add_child(s)
			j.add_child(r)

			puts 'RESPONSE1: ' + msg.inspect
			write_to_stream msg
		}.catch { |e|
			# two-element arrays are [error type, condition] pairs for the
			# sender; :done is swallowed; anything else is passed on
			if e.is_a?(Array) && e.length == 2
				write_to_stream error_msg(i.reply, nil, *e)
			elsif e != :done
				EMPromise.reject(e)
			end
		}
	end
 600
	# Answers disco#info queries. A query addressed to a JID with a node
	# part gets the per-number identities and features; a query addressed
	# to the bare component gets the gateway identity and feature list.
	iq '/iq/ns:query', ns:	'http://jabber.org/protocol/disco#info' do |i|
		# respond to capabilities request for an sgx-catapult number JID
		if i.to.node
			# TODO: confirm the node URL is expected using below
			#puts "XR[node]: #{xpath_result[0]['node']}"

			msg = i.reply
			msg.identities = user_cap_identities
			msg.features = user_cap_features

			puts 'RESPONSE7: ' + msg.inspect
			write_to_stream msg
			next
		end

		# respond to capabilities request for sgx-catapult itself
		msg = i.reply
		msg.identities = [{
			name: 'Soprani.ca Gateway to XMPP - Catapult',
			type: 'sms', category: 'gateway'
		}]
		msg.features = @gateway_features
		write_to_stream msg
	end
 625
	# Stores a registration in Redis and acknowledges the IQ +i+.
	#
	# +creds+ is the credential list whose last element is the phone number.
	# Two keys are involved: "catapult_jid-<number>" (the owning bare JID)
	# and "catapult_cred-<bare jid>" (the credential list).
	#
	# Returns a promise; it is rejected with [:cancel, 'conflict'] when the
	# number belongs to a different JID or the JID already has different
	# credentials, and with [:cancel, 'internal-server-error'] when a Redis
	# write does not give the expected result.
	def self.check_then_register(i, *creds)
		jid_key = "catapult_jid-#{creds.last}"
		bare_jid = i.from.stripped
		cred_key = "catapult_cred-#{bare_jid}"

		REDIS.get(jid_key).then { |existing_jid|
			# NOTE(review): compares the stored string with the object
			# returned by #stripped - confirm the two compare as intended
			if existing_jid && existing_jid != bare_jid
				# TODO: add/log text: credentials exist already
				EMPromise.reject([:cancel, 'conflict'])
			end
		}.then {
			REDIS.lrange(cred_key, 0, 3)
		}.then { |existing_creds|
			# TODO: add/log text: credentials exist already
			if existing_creds.length == 4 && creds != existing_creds
				EMPromise.reject([:cancel, 'conflict'])
			elsif existing_creds.length < 4
				# nothing complete stored yet: append the credentials and
				# expect the list to hold exactly four entries afterwards
				REDIS.rpush(cred_key, *creds).then { |length|
					if length != 4
						EMPromise.reject([
							:cancel,
							'internal-server-error'
						])
					end
				}
			end
		}.then {
			# not necessary if existing_jid non-nil, easier this way
			REDIS.set(jid_key, bare_jid)
		}.then { |result|
			if result != 'OK'
				# TODO: add txt re push failure
				EMPromise.reject(
					[:cancel, 'internal-server-error']
				)
			end
		}.then {
			write_to_stream i.reply
		}
	end
 666
	# Extracts the registration values from the jabber:iq:register query
	# node +qn+.
	#
	# When the query contains an <x/> data form, the values come from its
	# <field/> children (var nick, username, password, phone); otherwise
	# they come from the plain <nick/>, <username/>, <password/> and
	# <phone/> children of the query itself.
	#
	# Returns [user_id, api_token, api_secret, phone_num]; an entry is nil
	# when the corresponding field is absent. A form <field/> that has no
	# <value/> child also yields nil (it used to raise NoMethodError).
	def self.creds_from_registration_query(qn)
		xn = qn.children.find { |v| v.element_name == "x" }

		if xn
			xn.children.each_with_object({}) do |field, h|
				next if field.element_name != "field"
				val = field.children.find { |v|
					v.element_name == "value"
				}
				# tolerate a field that carries no <value/>
				text = val && val.text

				case field['var']
				when 'nick'
					h[:user_id] = text
				when 'username'
					h[:api_token] = text
				when 'password'
					h[:api_secret] = text
				when 'phone'
					h[:phone_num] = text
				else
					# TODO: error
					puts "?: #{field['var']}"
				end
			end
		else
			qn.children.each_with_object({}) do |field, h|
				case field.element_name
				when "nick"
					h[:user_id] = field.text
				when "username"
					h[:api_token] = field.text
				when "password"
					h[:api_secret] = field.text
				when "phone"
					h[:phone_num] = field.text
				end
			end
		end.values_at(:user_id, :api_token, :api_secret, :phone_num)
	end
 706
	# Processes a jabber:iq:register "set" request: IQ +i+ with query +qn+.
	#
	# The submitted credentials are checked against the Catapult API (the
	# phone number must exist in the account and be enabled) and then
	# stored through check_then_register, which also acknowledges the IQ.
	#
	# Returns a promise. Rejection values are :done for a <remove/> request
	# (ignored for now) or an [error type, condition] pair:
	# not-acceptable for incomplete data, a disabled number or an
	# unexpected API status; item-not-found for a number without a leading
	# '+' or API status 404; not-authorized for API status 401. Any other
	# failure is passed through unchanged.
	def self.process_registration(i, qn)
		EMPromise.resolve(
			qn.children.find { |v| v.element_name == "remove" }
		).then { |rn|
			if rn
				puts "received <remove/> - ignoring for now..."
				EMPromise.reject(:done)
			else
				creds_from_registration_query(qn)
			end
		}.then { |user_id, api_token, api_secret, phone_num|
			creds = [user_id, api_token, api_secret, phone_num]
			if creds.any? { |c| c.to_s.empty? }
				# a request lacking any of the four values used to raise
				# on nil below and shut the gateway down; XEP-0077 asks
				# for not-acceptable when information is missing
				EMPromise.reject([:modify, 'not-acceptable'])
			elsif phone_num[0] == '+'
				creds
			else
				# TODO: add text re number not (yet) supported
				EMPromise.reject([:cancel, 'item-not-found'])
			end
		}.then { |user_id, api_token, api_secret, phone_num|
			call_catapult(
				api_token,
				api_secret,
				:get,
				"v1/users/#{user_id}/phoneNumbers/#{phone_num}"
			).then { |response|
				params = JSON.parse(response)
				if params['numberState'] == 'enabled'
					check_then_register(
						i,
						user_id,
						api_token,
						api_secret,
						phone_num
					)
				else
					# TODO: add text re number disabled
					EMPromise.reject([:modify, 'not-acceptable'])
				end
			}
		}.catch { |e|
			# call_catapult rejects with the HTTP status; map it to a
			# stanza error and leave every other value untouched
			EMPromise.reject(case e
			when 401
				# TODO: add text re bad credentials
				[:auth, 'not-authorized']
			when 404
				# TODO: add text re number not found or disabled
				[:cancel, 'item-not-found']
			when Integer
				[:modify, 'not-acceptable']
			else
				e
			end)
		}
	end
 760
	# Fills the IQ reply +orig+ with the registration form and returns it.
	#
	# The jabber:iq:register <query/> carries the plain fields
	# (instructions, nick, username, password, phone) followed by an
	# equivalent data form. When +existing_number+ is given, a
	# <registered/> marker is added and the number is pre-filled in both
	# the plain <phone/> element and the form's phone field.
	def self.registration_form(orig, existing_number=nil)
		msg = Nokogiri::XML::Node.new 'query', orig.document
		msg['xmlns'] = 'jabber:iq:register'

		if existing_number
			msg.add_child(
				Nokogiri::XML::Node.new(
					'registered', msg.document
				)
			)
		end

		# plain (non-form) variant of the registration fields
		n1 = Nokogiri::XML::Node.new(
			'instructions', msg.document
		)
		n1.content = "Enter the information from your Account "\
			"page as well as the Phone Number\nin your "\
			"account you want to use (ie. '+12345678901')"\
			".\nUser Id is nick, API Token is username, "\
			"API Secret is password, Phone Number is phone"\
			".\n\nThe source code for this gateway is at "\
			"https://gitlab.com/ossguy/sgx-catapult ."\
			"\nCopyright (C) 2017-2018  Denver Gingerich "\
			"and others, licensed under AGPLv3+."
		n2 = Nokogiri::XML::Node.new 'nick', msg.document
		n3 = Nokogiri::XML::Node.new 'username', msg.document
		n4 = Nokogiri::XML::Node.new 'password', msg.document
		n5 = Nokogiri::XML::Node.new 'phone', msg.document
		n5.content = existing_number.to_s
		msg.add_child(n1)
		msg.add_child(n2)
		msg.add_child(n3)
		msg.add_child(n4)
		msg.add_child(n5)

		# data form variant of the same four fields
		x = Blather::Stanza::X.new :form, [
			{
				required: true, type: :"text-single",
				label: 'User Id', var: 'nick'
			},
			{
				required: true, type: :"text-single",
				label: 'API Token', var: 'username'
			},
			{
				required: true, type: :"text-private",
				label: 'API Secret', var: 'password'
			},
			{
				required: true, type: :"text-single",
				label: 'Phone Number', var: 'phone',
				value: existing_number.to_s
			}
		]
		x.title = 'Register for '\
			'Soprani.ca Gateway to XMPP - Catapult'
		x.instructions = "Enter the details from your Account "\
			"page as well as the Phone Number\nin your "\
			"account you want to use (ie. '+12345678901')"\
			".\n\nThe source code for this gateway is at "\
			"https://gitlab.com/ossguy/sgx-catapult ."\
			"\nCopyright (C) 2017-2018  Denver Gingerich "\
			"and others, licensed under AGPLv3+."
		msg.add_child(x)

		orig.add_child(msg)

		return orig
	end
 830
	# Handles jabber:iq:register IQs: a "set" runs the registration, a
	# "get" returns the registration form (pre-filled with the number
	# stored for the sender, if any), and other types are ignored.
	iq '/iq/ns:query', ns: 'jabber:iq:register' do |i, qn|
		puts "IQ: #{i.inspect}"

		# every branch yields a promise, so the catch handlers below apply
		# to whichever one was taken
		case i.type
		when :set
			process_registration(i, qn)
		when :get
			bare_jid = i.from.stripped
			cred_key = "catapult_cred-#{bare_jid}"
			# index 3 of the credential list is the phone number
			REDIS.lindex(cred_key, 3).then { |existing_number|
				reply = registration_form(i.reply, existing_number)
				puts "RESPONSE2: #{reply.inspect}"
				write_to_stream reply
			}
		else
			# Unknown IQ, ignore for now
			EMPromise.reject(:done)
		end.catch { |e|
			# [error type, condition] pairs become an error reply, :done
			# is swallowed, and anything else reaches panic
			if e.is_a?(Array) && e.length == 2
				write_to_stream error_msg(i.reply, qn, *e)
			elsif e != :done
				EMPromise.reject(e)
			end
		}.catch(&method(:panic))
	end
 856
	# Catch-all for "get" IQs: answered with feature-not-implemented.
	iq :get? do |i|
		write_to_stream error_msg(i.reply, i.children, 'cancel', 'feature-not-implemented')
	end

	# Catch-all for "set" IQs: answered with feature-not-implemented.
	iq :set? do |i|
		write_to_stream error_msg(i.reply, i.children, 'cancel', 'feature-not-implemented')
	end
 864end
 865
 866class ReceiptMessage < Blather::Stanza
 867	def self.new(to=nil)
 868		node = super :message
 869		node.to = to
 870		node
 871	end
 872end
 873
 874class WebhookHandler < Goliath::API
 875	use Goliath::Rack::Params
 876
 877	def response(env)
 878		puts 'ENV: ' + env.reject{ |k| k == 'params' }.to_s
 879
 880		users_num = ''
 881		others_num = ''
 882		if params['direction'] == 'in'
 883			users_num = params['to']
 884			others_num = params['from']
 885		elsif params['direction'] == 'out'
 886			users_num = params['from']
 887			others_num = params['to']
 888		else
 889			# TODO: exception or similar
 890			puts "big problem: '" + params['direction'] + "'" + body
 891			return [200, {}, "OK"]
 892		end
 893
 894		puts 'BODY - messageId: ' + params['messageId'] +
 895			', eventType: ' + params['eventType'] +
 896			', time: ' + params['time'] +
 897			', direction: ' + params['direction'] +
 898			', state: ' + params['state'] +
 899			', deliveryState: ' + (params['deliveryState'] ?
 900				params['deliveryState'] : 'NONE') +
 901			', deliveryCode: ' + (params['deliveryCode'] ?
 902				params['deliveryCode'] : 'NONE') +
 903			', deliveryDesc: ' + (params['deliveryDescription'] ?
 904				params['deliveryDescription'] : 'NONE') +
 905			', tag: ' + (params['tag'] ? params['tag'] : 'NONE') +
 906			', media: ' + (params['media'] ? params['media'].to_s :
 907				'NONE')
 908
 909		if others_num[0] != '+'
 910			# TODO: check that others_num actually a shortcode first
 911			others_num +=
 912				';phone-context=ca-us.phone-context.soprani.ca'
 913		end
 914
 915		jid_key = "catapult_jid-#{users_num}"
 916		bare_jid = REDIS.get(jid_key).promise.sync
 917
 918		if !bare_jid
 919			puts "jid_key (#{jid_key}) DNE; Catapult misconfigured?"
 920
 921			# TODO: likely not appropriate; give error to Catapult?
 922			# TODO: add text re credentials not being registered
 923			#write_to_stream error_msg(m.reply, m.body, :auth,
 924			#	'registration-required')
 925			return [200, {}, "OK"]
 926		end
 927
 928		msg = ''
 929		case params['direction']
 930		when 'in'
 931			text = ''
 932			case params['eventType']
 933			when 'sms'
 934				text = params['text']
 935			when 'mms'
 936				has_media = false
 937				params['media'].each do |media_url|
 938					if not media_url.end_with?(
 939						'.smil', '.txt', '.xml'
 940					)
 941
 942						has_media = true
 943						SGXcatapult.send_media(
 944							others_num + '@' +
 945							ARGV[0],
 946							bare_jid, media_url
 947						)
 948					end
 949				end
 950
 951				if params['text'].empty?
 952					if not has_media
 953						text = '[suspected group msg '\
 954							'with no text (odd)]'
 955					end
 956				else
 957					text = if has_media
 958						# TODO: write/use a caption XEP
 959						params['text']
 960					else
 961						'[suspected group msg '\
 962						'(recipient list not '\
 963						'available) with '\
 964						'following text] ' +
 965						params['text']
 966					end
 967				end
 968
 969				# ie. if text param non-empty or had no media
 970				if not text.empty?
 971					msg = Blather::Stanza::Message.new(
 972						bare_jid, text)
 973					msg.from = others_num + '@' + ARGV[0]
 974					SGXcatapult.write(msg)
 975				end
 976
 977				return [200, {}, "OK"]
 978			else
 979				text = "unknown type (#{params['eventType']})"\
 980					" with text: " + params['text']
 981
 982				# TODO: log/notify of this properly
 983				puts text
 984			end
 985
 986			msg = Blather::Stanza::Message.new(bare_jid, text)
 987		else # per prior switch, this is:  params['direction'] == 'out'
 988			tag_parts = params['tag'].split(/ /, 2)
 989			id = WEBrick::HTTPUtils.unescape(tag_parts[0])
 990			resourcepart = WEBrick::HTTPUtils.unescape(tag_parts[1])
 991
 992			case params['deliveryState']
 993			when 'not-delivered'
 994				# create a bare message like the one user sent
 995				msg = Blather::Stanza::Message.new(
 996					others_num + '@' + ARGV[0])
 997				msg.from = bare_jid + '/' + resourcepart
 998				msg['id'] = id
 999
1000				# create an error reply to the bare message
1001				msg = Blather::StanzaError.new(
1002					msg,
1003					'recipient-unavailable',
1004					:wait
1005				).to_node
1006			when 'delivered'
1007				msg = ReceiptMessage.new(bare_jid)
1008
1009				# TODO: put in member/instance variable
1010				msg['id'] = SecureRandom.uuid
1011
1012				# TODO: send only when requested per XEP-0184
1013				rcvd = Nokogiri::XML::Node.new(
1014					'received',
1015					msg.document
1016				)
1017				rcvd['xmlns'] = 'urn:xmpp:receipts'
1018				rcvd['id'] = id
1019				msg.add_child(rcvd)
1020			when 'waiting'
1021				# can't really do anything with it; nice to know
1022				puts "message with id #{id} waiting"
1023				return [200, {}, "OK"]
1024			else
1025				# TODO: notify somehow of unknown state receivd?
1026				puts "message with id #{id} has "\
1027					"other state #{params['deliveryState']}"
1028				return [200, {}, "OK"]
1029			end
1030
1031			puts "RESPONSE4: #{msg.inspect}"
1032		end
1033
1034		msg.from = others_num + '@' + ARGV[0]
1035		SGXcatapult.write(msg)
1036
1037		[200, {}, "OK"]
1038
1039	rescue Exception => e
1040		puts 'Shutting down gateway due to exception 013: ' + e.message
1041		SGXcatapult.shutdown
1042		puts 'Gateway has terminated.'
1043		EM.stop
1044	end
1045end
1046
# Program entry point, run from an at_exit hook: prints the banner, checks
# the argument count, then starts the EventMachine reactor with the Redis
# connection, the XMPP component, an hourly ping and the webhook server.
at_exit do
	$stdout.sync = true

	puts "Soprani.ca/SMS Gateway for XMPP - Catapult\n"\
		"==>> last commit of this version is " + `git rev-parse HEAD` + "\n"

	# exactly seven arguments are expected; otherwise show usage and stop
	if ARGV.size != 7
		puts "Usage: sgx-catapult.rb <component_jid> "\
			"<component_password> <server_hostname> "\
			"<server_port> <delivery_receipt_url> "\
			"<http_listen_port> <mms_proxy_prefix_url>"
		exit 0
	end

	t = Time.now
	puts "LOG %d.%09d: starting...\n\n" % [t.to_i, t.nsec]

	EM.run do
		# Redis connection used throughout the file via the REDIS constant
		REDIS = EM::Hiredis.connect

		SGXcatapult.run

		# required when using Prosody otherwise disconnects on 6-hour inactivity
		EM.add_periodic_timer(3600) do
			msg = Blather::Stanza::Iq::Ping.new(:get, 'localhost')
			msg.from = ARGV[0]
			SGXcatapult.write(msg)
		end

		# HTTP server for the Catapult callbacks, listening on ARGV[5]
		server = Goliath::Server.new('0.0.0.0', ARGV[5].to_i)
		server.api = WebhookHandler.new
		server.app = Goliath::Rack::Builder.build(server.api.class, server.api)
		server.logger = Log4r::Logger.new('goliath')
		server.logger.add(Log4r::StdoutOutputter.new('console'))
		server.logger.level = Log4r::INFO
		server.start do
			# on INT/TERM shut the XMPP side down and stop the reactor
			["INT", "TERM"].each do |sig|
				trap(sig) do
					EM.defer do
						puts 'Shutting down gateway...'
						SGXcatapult.shutdown

						puts 'Gateway has terminated.'
						EM.stop
					end
				end
			end
		end
	end
end