sgx-bwmsgsv2.rb

  1#!/usr/bin/env ruby
  2#
  3# Copyright (C) 2017-2020  Denver Gingerich <denver@ossguy.com>
  4# Copyright (C) 2017  Stephen Paul Weber <singpolyma@singpolyma.net>
  5#
  6# This file is part of sgx-bwmsgsv2.
  7#
  8# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
  9# the terms of the GNU Affero General Public License as published by the Free
 10# Software Foundation, either version 3 of the License, or (at your option) any
 11# later version.
 12#
 13# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
 14# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 15# FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more
 16# details.
 17#
 18# You should have received a copy of the GNU Affero General Public License along
 19# with sgx-bwmsgsv2.  If not, see <http://www.gnu.org/licenses/>.
 20
 21require 'blather/client/dsl'
 22require 'em-hiredis'
 23require 'em-http-request'
 24require 'json'
 25require 'securerandom'
 26require 'time'
 27require 'uri'
 28require 'webrick'
 29
 30require 'goliath/api'
 31require 'goliath/server'
 32require 'log4r'
 33
 34require_relative 'em_promise'
 35
 36def panic(e)
 37	puts "Shutting down gateway due to exception: #{e.message}"
 38	puts e.backtrace
 39	SGXbwmsgsv2.shutdown
 40	puts 'Gateway has terminated.'
 41	EM.stop
 42end
 43
 44def extract_shortcode(dest)
 45	num, context = dest.split(';', 2)
 46	num if context && context == 'phone-context=ca-us.phone-context.soprani.ca'
 47end
 48
 49def is_anonymous_tel?(dest)
 50	num, context = dest.split(';', 2)
 51	context && context == 'phone-context=anonymous.phone-context.soprani.ca'
 52end
 53
 54class SGXClient < Blather::Client
 55	def register_handler(type, *guards, &block)
 56		super(type, *guards) { |*args| wrap_handler(*args, &block) }
 57	end
 58
 59	def register_handler_before(type, *guards, &block)
 60		check_handler(type, guards)
 61		handler = lambda { |*args| wrap_handler(*args, &block) }
 62
 63		@handlers[type] ||= []
 64		@handlers[type].unshift([guards, handler])
 65	end
 66
 67protected
 68
 69	def wrap_handler(*args, &block)
 70		v = block.call(*args)
 71		v.catch(&method(:panic)) if v.is_a?(Promise)
 72		true # Do not run other handlers unless throw :pass
 73	rescue Exception => e
 74		panic(e)
 75	end
 76end
 77
 78# TODO: keep in sync with jmp-acct_bot.rb, and eventually put in common location
 79module CatapultSettingFlagBits
 80	VOICEMAIL_TRANSCRIPTION_DISABLED = 0
 81	MMS_ON_OOB_URL = 1
 82end
 83
 84module SGXbwmsgsv2
 85	extend Blather::DSL
 86
 87	@client = SGXClient.new
 88	@gateway_features = [
 89		"http://jabber.org/protocol/disco#info",
 90		"jabber:iq:register"
 91	]
 92
 93	def self.run
 94		client.run
 95	end
 96
 97	# so classes outside this module can write messages, too
 98	def self.write(stanza)
 99		client.write(stanza)
100	end
101
102	def self.before_handler(type, *guards, &block)
103		client.register_handler_before(type, *guards, &block)
104	end
105
106	def self.send_media(from, to, media_url, desc=nil, subject=nil)
107		# we assume media_url is of the form (always the case so far):
108		#  https://api.catapult.inetwork.com/v1/users/[uid]/media/[file]
109
110		# the caller must guarantee that 'to' is a bare JID
111		proxy_url = ARGV[6] + WEBrick::HTTPUtils.escape(to) + '/' +
112			media_url.split('/', 8)[7]
113
114		puts 'ORIG_URL: ' + media_url
115		puts 'PROX_URL: ' + proxy_url
116
117		# put URL in the body (so Conversations will still see it)...
118		msg = Blather::Stanza::Message.new(to, proxy_url)
119		msg.from = from
120		msg.subject = subject if subject
121
122		# ...but also provide URL in XEP-0066 (OOB) fashion
123		# TODO: confirm client supports OOB or don't send this
124		x = Nokogiri::XML::Node.new 'x', msg.document
125		x['xmlns'] = 'jabber:x:oob'
126
127		urln = Nokogiri::XML::Node.new 'url', msg.document
128		urlc = Nokogiri::XML::Text.new proxy_url, msg.document
129		urln.add_child(urlc)
130		x.add_child(urln)
131
132		if desc
133			descn = Nokogiri::XML::Node.new('desc', msg.document)
134			descc = Nokogiri::XML::Text.new(desc, msg.document)
135			descn.add_child(descc)
136			x.add_child(descn)
137		end
138
139		msg.add_child(x)
140
141		write(msg)
142	rescue Exception => e
143		panic(e)
144	end
145
146	def self.error_msg(orig, query_node, type, name, text=nil)
147		orig.type = :error
148
149		error = Nokogiri::XML::Node.new 'error', orig.document
150		error['type'] = type
151		orig.add_child(error)
152
153		suberr = Nokogiri::XML::Node.new name, orig.document
154		suberr['xmlns'] = 'urn:ietf:params:xml:ns:xmpp-stanzas'
155		error.add_child(suberr)
156
157		orig.add_child(query_node) if query_node
158
159		# TODO: add some explanatory xml:lang='en' text (see text param)
160		puts "RESPONSE3: #{orig.inspect}"
161		return orig
162	end
163
164	# workqueue_count MUST be 0 or else Blather uses threads!
165	setup ARGV[0], ARGV[1], ARGV[2], ARGV[3], nil, nil, workqueue_count: 0
166
167	def self.pass_on_message(m, users_num, jid)
168		# setup delivery receipt; similar to a reply
169		rcpt = ReceiptMessage.new(m.from.stripped)
170		rcpt.from = m.to
171
172		# pass original message (before sending receipt)
173		m.to = jid
174		m.from = "#{users_num}@#{ARGV[0]}"
175
176		puts 'XRESPONSE0: ' + m.inspect
177		write_to_stream m
178
179		# send a delivery receipt back to the sender
180		# TODO: send only when requested per XEP-0184
181		# TODO: pass receipts from target if supported
182
183		# TODO: put in member/instance variable
184		rcpt['id'] = SecureRandom.uuid
185		rcvd = Nokogiri::XML::Node.new 'received', rcpt.document
186		rcvd['xmlns'] = 'urn:xmpp:receipts'
187		rcvd['id'] = m.id
188		rcpt.add_child(rcvd)
189
190		puts 'XRESPONSE1: ' + rcpt.inspect
191		write_to_stream rcpt
192	end
193
194	def self.call_catapult(
195		token, secret, m, pth, body=nil,
196		head={}, code=[200], respond_with=:body
197	)
198		# TODO: need to make a separate thing for voice.bw.c eventually
199		EM::HttpRequest.new(
200			"https://messaging.bandwidth.com/#{pth}"
201		).public_send(
202			m,
203			head: {
204				'Authorization' => [token, secret]
205			}.merge(head),
206			body: body
207		).then { |http|
208			puts "API response to send: #{http.response} with code"\
209				" response.code #{http.response_header.status}"
210
211			if code.include?(http.response_header.status)
212				case respond_with
213				when :body
214					http.response
215				when :headers
216					http.response_header
217				else
218					http
219				end
220			else
221				EMPromise.reject(http.response_header.status)
222			end
223		}
224	end
225
226	def self.to_catapult_possible_oob(s, num_dest, user_id, token, secret,
227		usern)
228
229		xn = s.children.find { |v| v.element_name == "x" }
230		if not xn
231			to_catapult(s, nil, num_dest, user_id, token, secret,
232				usern)
233			return
234		end
235		puts "MMSOOB: found an x node - checking for url node..."
236
237		# TODO: also check for xmlns='jabber:x:oob' in <x/> - the below
238		#  is probably fine, though, as non-OOB <x><url/></x> unlikely
239
240		un = xn.children.find { |v| v.element_name == "url" }
241		if not un
242			puts "MMSOOB: no url node found so process as normal"
243			to_catapult(s, nil, num_dest, user_id, token, secret,
244				usern)
245			return
246		end
247		puts "MMSOOB: found a url node - checking if to make MMS..."
248
249		REDIS.getbit("catapult_setting_flags-#{s.from.stripped}",
250			CatapultSettingFlagBits::MMS_ON_OOB_URL).then { |oob_on|
251
252			puts "MMSOOB: found MMS_ON_OOB_URL value is '#{oob_on}'"
253			if 0 == oob_on
254				puts "MMSOOB: MMS_ON_OOB_URL off so no MMS send"
255				to_catapult(s, nil, num_dest, user_id, token,
256					secret, usern)
257				next
258			end
259
260			# TODO: check size of file at un.text and shrink if need
261
262			body = s.respond_to?(:body) ? s.body : ''
263			puts "MMSOOB: url text is '#{un.text}'"
264			puts "MMSOOB: the body is '#{body.to_s.strip}'"
265
266			# some clients send URI in both body & <url/> so delete
267			if un.text == body.to_s.strip
268				puts "MMSOOB: url matches body so deleting body"
269				s.body = ''
270			end
271
272			puts "MMSOOB: sending MMS since found OOB & user asked"
273			to_catapult(s, un.text, num_dest, user_id, token,
274				secret, usern)
275		}
276	end
277
278	def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern)
279		body = s.respond_to?(:body) ? s.body : ''
280		if murl.to_s.empty? && body.to_s.strip.empty?
281			return EMPromise.reject(
282				[:modify, 'policy-violation']
283			)
284		end
285
286		extra = {}
287		if murl
288			extra = { media: murl }
289		end
290
291		call_catapult(
292			token,
293			secret,
294			:post,
295			"api/v2/users/#{user_id}/messages",
296			JSON.dump(extra.merge(
297				from: usern,
298				to:   num_dest,
299				text: body,
300				applicationId:  ARGV[4],
301				tag:
302					# callbacks need id and resourcepart
303					WEBrick::HTTPUtils.escape(s.id.to_s) +
304					' ' +
305					WEBrick::HTTPUtils.escape(
306						s.from.resource.to_s
307					)
308			)),
309			{'Content-Type' => 'application/json'},
310			[201]
311		).catch {
312			# TODO: add text; mention code number
313			EMPromise.reject(
314				[:cancel, 'internal-server-error']
315			)
316		}
317
318		t = Time.now
319		tai_timestamp = `./tai`.strip
320		tai_yyyymmdd = Time.at(tai_timestamp.to_i).strftime('%Y%m%d')
321		puts "SMU %d.%09d, %s: msg for %s sent on %s - incrementing\n" %
322			[t.to_i, t.nsec, tai_timestamp, usern, tai_yyyymmdd]
323
324		REDIS.incr('usage_messages-' + tai_yyyymmdd + '-' +
325			usern).then { |total|
326
327			t = Time.now
328			puts "SMU %d.%09d: total msgs for %s-%s now at %s\n" %
329				[t.to_i, t.nsec, tai_yyyymmdd, usern, total]
330		}
331	end
332
333	def self.validate_num(num)
334		EMPromise.resolve(num.to_s).then { |num_dest|
335			if num_dest =~ /\A\+?[0-9]+(?:;.*)?\Z/
336				next num_dest if num_dest[0] == '+'
337				shortcode = extract_shortcode(num_dest)
338				next shortcode if shortcode
339			end
340
341			if is_anonymous_tel?(num_dest)
342				EMPromise.reject([:cancel, 'gone'])
343			else
344				numbers = num_dest.split(',')
345				if numbers.length > 1
346					# TODO: validate each element of numbers
347
348					if numbers.sort != numbers
349						# TODO: send regular msg re err
350
351						# TODO: sorted JID in gone body
352						next EMPromise.reject([:cancel,
353							'gone'])
354					end
355
356					next numbers
357				end
358
359				# TODO: text re num not (yet) supportd/implmentd
360				EMPromise.reject([:cancel, 'item-not-found'])
361			end
362		}
363	end
364
365	def self.fetch_catapult_cred_for(jid)
366		cred_key = "catapult_cred-#{jid.stripped}"
367		REDIS.lrange(cred_key, 0, 3).then { |creds|
368			if creds.length < 4
369				# TODO: add text re credentials not registered
370				EMPromise.reject(
371					[:auth, 'registration-required']
372				)
373			else
374				creds
375			end
376		}
377	end
378
379	message :error? do |m|
380		# TODO: report it somewhere/somehow - eat for now so no err loop
381		puts "EATERROR1: #{m.inspect}"
382	end
383
384	message :body do |m|
385		EMPromise.all([
386			validate_num(m.to.node),
387			fetch_catapult_cred_for(m.from)
388		]).then { |(num_dest, creds)|
389			jid_key = "catapult_jid-#{num_dest}"
390			REDIS.get(jid_key).then { |jid|
391				[jid, num_dest] + creds
392			}
393		}.then { |(jid, num_dest, *creds)|
394			# if destination user is in the system pass on directly
395			if jid
396				pass_on_message(m, creds.last, jid)
397			else
398				to_catapult_possible_oob(m, num_dest, *creds)
399			end
400		}.catch { |e|
401			if e.is_a?(Array) && e.length == 2
402				write_to_stream error_msg(m.reply, m.body, *e)
403			else
404				EMPromise.reject(e)
405			end
406		}
407	end
408
409	def self.user_cap_identities
410		[{category: 'client', type: 'sms'}]
411	end
412
413	def self.user_cap_features
414		[
415			"urn:xmpp:receipts"
416		]
417	end
418
419	def self.add_gateway_feature(feature)
420		@gateway_features << feature
421		@gateway_features.uniq!
422	end
423
424	subscription :request? do |p|
425		puts "PRESENCE1: #{p.inspect}"
426
427		# subscriptions are allowed from anyone - send reply immediately
428		msg = Blather::Stanza::Presence.new
429		msg.to = p.from
430		msg.from = p.to
431		msg.type = :subscribed
432
433		puts 'RESPONSE5a: ' + msg.inspect
434		write_to_stream msg
435
436		# send a <presence> immediately; not automatically probed for it
437		# TODO: refactor so no "presence :probe? do |p|" duplicate below
438		caps = Blather::Stanza::Capabilities.new
439		# TODO: user a better node URI (?)
440		caps.node = 'http://catapult.sgx.soprani.ca/'
441		caps.identities = user_cap_identities
442		caps.features = user_cap_features
443
444		msg = caps.c
445		msg.to = p.from
446		msg.from = p.to.to_s + '/sgx'
447
448		puts 'RESPONSE5b: ' + msg.inspect
449		write_to_stream msg
450
451		# need to subscribe back so Conversations displays images inline
452		msg = Blather::Stanza::Presence.new
453		msg.to = p.from.to_s.split('/', 2)[0]
454		msg.from = p.to.to_s.split('/', 2)[0]
455		msg.type = :subscribe
456
457		puts 'RESPONSE5c: ' + msg.inspect
458		write_to_stream msg
459	end
460
461	presence :probe? do |p|
462		puts 'PRESENCE2: ' + p.inspect
463
464		caps = Blather::Stanza::Capabilities.new
465		# TODO: user a better node URI (?)
466		caps.node = 'http://catapult.sgx.soprani.ca/'
467		caps.identities = user_cap_identities
468		caps.features = user_cap_features
469
470		msg = caps.c
471		msg.to = p.from
472		msg.from = p.to.to_s + '/sgx'
473
474		puts 'RESPONSE6: ' + msg.inspect
475		write_to_stream msg
476	end
477
478	iq '/iq/ns:query', ns:	'http://jabber.org/protocol/disco#info' do |i|
479		# respond to capabilities request for an sgx-bwmsgsv2 number JID
480		if i.to.node
481			# TODO: confirm the node URL is expected using below
482			#puts "XR[node]: #{xpath_result[0]['node']}"
483
484			msg = i.reply
485			msg.node = i.node
486			msg.identities = user_cap_identities
487			msg.features = user_cap_features
488
489			puts 'RESPONSE7: ' + msg.inspect
490			write_to_stream msg
491			next
492		end
493
494		# respond to capabilities request for sgx-bwmsgsv2 itself
495		msg = i.reply
496		msg.node = i.node
497		msg.identities = [{
498			name: 'Soprani.ca Gateway to XMPP - Bandwidth API V2',
499			type: 'sms', category: 'gateway'
500		}]
501		msg.features = @gateway_features
502		write_to_stream msg
503	end
504
505	def self.check_then_register(i, *creds)
506		jid_key = "catapult_jid-#{creds.last}"
507		bare_jid = i.from.stripped
508		cred_key = "catapult_cred-#{bare_jid}"
509
510		REDIS.get(jid_key).then { |existing_jid|
511			if existing_jid && existing_jid != bare_jid
512				# TODO: add/log text: credentials exist already
513				EMPromise.reject([:cancel, 'conflict'])
514			end
515		}.then {
516			REDIS.lrange(cred_key, 0, 3)
517		}.then { |existing_creds|
518			# TODO: add/log text: credentials exist already
519			if existing_creds.length == 4 && creds != existing_creds
520				EMPromise.reject([:cancel, 'conflict'])
521			elsif existing_creds.length < 4
522				REDIS.rpush(cred_key, *creds).then { |length|
523					if length != 4
524						EMPromise.reject([
525							:cancel,
526							'internal-server-error'
527						])
528					end
529				}
530			end
531		}.then {
532			# not necessary if existing_jid non-nil, easier this way
533			REDIS.set(jid_key, bare_jid)
534		}.then { |result|
535			if result != 'OK'
536				# TODO: add txt re push failure
537				EMPromise.reject(
538					[:cancel, 'internal-server-error']
539				)
540			end
541		}.then {
542			write_to_stream i.reply
543		}
544	end
545
546	def self.creds_from_registration_query(qn)
547		xn = qn.children.find { |v| v.element_name == "x" }
548
549		if xn
550			xn.children.each_with_object({}) do |field, h|
551				next if field.element_name != "field"
552				val = field.children.find { |v|
553					v.element_name == "value"
554				}
555
556				case field['var']
557				when 'nick'
558					h[:user_id] = val.text
559				when 'username'
560					h[:api_token] = val.text
561				when 'password'
562					h[:api_secret] = val.text
563				when 'phone'
564					h[:phone_num] = val.text
565				else
566					# TODO: error
567					puts "?: #{field['var']}"
568				end
569			end
570		else
571			qn.children.each_with_object({}) do |field, h|
572				case field.element_name
573				when "nick"
574					h[:user_id] = field.text
575				when "username"
576					h[:api_token] = field.text
577				when "password"
578					h[:api_secret] = field.text
579				when "phone"
580					h[:phone_num] = field.text
581				end
582			end
583		end.values_at(:user_id, :api_token, :api_secret, :phone_num)
584	end
585
586	def self.process_registration(i, qn)
587		EMPromise.resolve(
588			qn.children.find { |v| v.element_name == "remove" }
589		).then { |rn|
590			if rn
591				puts "received <remove/> - ignoring for now..."
592				EMPromise.reject(:done)
593			else
594				creds_from_registration_query(qn)
595			end
596		}.then { |user_id, api_token, api_secret, phone_num|
597			if phone_num[0] == '+'
598				[user_id, api_token, api_secret, phone_num]
599			else
600				# TODO: add text re number not (yet) supported
601				EMPromise.reject([:cancel, 'item-not-found'])
602			end
603		}.then { |user_id, api_token, api_secret, phone_num|
604			# TODO: find way to verify #{phone_num}, too
605			call_catapult(
606				api_token,
607				api_secret,
608				:get,
609				"api/v2/users/#{user_id}/media"
610			).then { |response|
611				params = JSON.parse(response)
612				# TODO: confirm params is array - could be empty
613
614				puts "register got str #{response.to_s[0..999]}"
615
616				check_then_register(
617					i,
618					user_id,
619					api_token,
620					api_secret,
621					phone_num
622				)
623			}
624		}.catch { |e|
625			EMPromise.reject(case e
626			when 401
627				# TODO: add text re bad credentials
628				[:auth, 'not-authorized']
629			when 404
630				# TODO: add text re number not found or disabled
631				[:cancel, 'item-not-found']
632			when Integer
633				[:modify, 'not-acceptable']
634			else
635				e
636			end)
637		}
638	end
639
640	def self.registration_form(orig, existing_number=nil)
641		msg = Nokogiri::XML::Node.new 'query', orig.document
642		msg['xmlns'] = 'jabber:iq:register'
643
644		if existing_number
645			msg.add_child(
646				Nokogiri::XML::Node.new(
647					'registered', msg.document
648				)
649			)
650		end
651
652		# TODO: update "User Id" x2 below (to "accountId"?), and others?
653		n1 = Nokogiri::XML::Node.new(
654			'instructions', msg.document
655		)
656		n1.content = "Enter the information from your Account "\
657			"page as well as the Phone Number\nin your "\
658			"account you want to use (ie. '+12345678901')"\
659			".\nUser Id is nick, API Token is username, "\
660			"API Secret is password, Phone Number is phone"\
661			".\n\nThe source code for this gateway is at "\
662			"https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
663			"\nCopyright (C) 2017-2020  Denver Gingerich "\
664			"and others, licensed under AGPLv3+."
665		n2 = Nokogiri::XML::Node.new 'nick', msg.document
666		n3 = Nokogiri::XML::Node.new 'username', msg.document
667		n4 = Nokogiri::XML::Node.new 'password', msg.document
668		n5 = Nokogiri::XML::Node.new 'phone', msg.document
669		n5.content = existing_number.to_s
670		msg.add_child(n1)
671		msg.add_child(n2)
672		msg.add_child(n3)
673		msg.add_child(n4)
674		msg.add_child(n5)
675
676		x = Blather::Stanza::X.new :form, [
677			{
678				required: true, type: :"text-single",
679				label: 'User Id', var: 'nick'
680			},
681			{
682				required: true, type: :"text-single",
683				label: 'API Token', var: 'username'
684			},
685			{
686				required: true, type: :"text-private",
687				label: 'API Secret', var: 'password'
688			},
689			{
690				required: true, type: :"text-single",
691				label: 'Phone Number', var: 'phone',
692				value: existing_number.to_s
693			}
694		]
695		x.title = 'Register for '\
696			'Soprani.ca Gateway to XMPP - Bandwidth API V2'
697		x.instructions = "Enter the details from your Account "\
698			"page as well as the Phone Number\nin your "\
699			"account you want to use (ie. '+12345678901')"\
700			".\n\nThe source code for this gateway is at "\
701			"https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
702			"\nCopyright (C) 2017-2020  Denver Gingerich "\
703			"and others, licensed under AGPLv3+."
704		msg.add_child(x)
705
706		orig.add_child(msg)
707
708		return orig
709	end
710
711	iq '/iq/ns:query', ns: 'jabber:iq:register' do |i, qn|
712		puts "IQ: #{i.inspect}"
713
714		case i.type
715		when :set
716			process_registration(i, qn)
717		when :get
718			bare_jid = i.from.stripped
719			cred_key = "catapult_cred-#{bare_jid}"
720			REDIS.lindex(cred_key, 3).then { |existing_number|
721				reply = registration_form(i.reply, existing_number)
722				puts "RESPONSE2: #{reply.inspect}"
723				write_to_stream reply
724			}
725		else
726			# Unknown IQ, ignore for now
727			EMPromise.reject(:done)
728		end.catch { |e|
729			if e.is_a?(Array) && e.length == 2
730				write_to_stream error_msg(i.reply, qn, *e)
731			elsif e != :done
732				EMPromise.reject(e)
733			end
734		}.catch(&method(:panic))
735	end
736
737	iq :get? do |i|
738		write_to_stream error_msg(i.reply, i.children, 'cancel', 'feature-not-implemented')
739	end
740
741	iq :set? do |i|
742		write_to_stream error_msg(i.reply, i.children, 'cancel', 'feature-not-implemented')
743	end
744end
745
746class ReceiptMessage < Blather::Stanza
747	def self.new(to=nil)
748		node = super :message
749		node.to = to
750		node
751	end
752end
753
754class WebhookHandler < Goliath::API
755	use Goliath::Rack::Params
756
757	def response(env)
758		puts 'ENV: ' + env.reject{ |k| k == 'params' }.to_s
759
760		users_num = ''
761		others_num = ''
762		if params['direction'] == 'in'
763			users_num = params['to']
764			others_num = params['from']
765		elsif params['direction'] == 'out'
766			users_num = params['from']
767			others_num = params['to']
768		else
769			# TODO: exception or similar
770			puts "big problem: '" + params['direction'] + "'" + body
771			return [200, {}, "OK"]
772		end
773
774		puts 'BODY - messageId: ' + params['messageId'] +
775			', eventType: ' + params['eventType'] +
776			', time: ' + params['time'] +
777			', direction: ' + params['direction'] +
778			', state: ' + params['state'] +
779			', deliveryState: ' + (params['deliveryState'] ?
780				params['deliveryState'] : 'NONE') +
781			', deliveryCode: ' + (params['deliveryCode'] ?
782				params['deliveryCode'] : 'NONE') +
783			', deliveryDesc: ' + (params['deliveryDescription'] ?
784				params['deliveryDescription'] : 'NONE') +
785			', tag: ' + (params['tag'] ? params['tag'] : 'NONE') +
786			', media: ' + (params['media'] ? params['media'].to_s :
787				'NONE')
788
789		if others_num[0] != '+'
790			# TODO: check that others_num actually a shortcode first
791			others_num +=
792				';phone-context=ca-us.phone-context.soprani.ca'
793		end
794
795		jid_key = "catapult_jid-#{users_num}"
796		bare_jid = REDIS.get(jid_key).promise.sync
797
798		if !bare_jid
799			puts "jid_key (#{jid_key}) DNE; BW API misconfigured?"
800
801			# TODO: likely not appropriate; give error to BW API?
802			# TODO: add text re credentials not being registered
803			#write_to_stream error_msg(m.reply, m.body, :auth,
804			#	'registration-required')
805			return [200, {}, "OK"]
806		end
807
808		msg = ''
809		case params['direction']
810		when 'in'
811			text = ''
812			case params['eventType']
813			when 'sms'
814				text = params['text']
815			when 'mms'
816				has_media = false
817				params['media'].each do |media_url|
818					if not media_url.end_with?(
819						'.smil', '.txt', '.xml'
820					)
821
822						has_media = true
823						SGXbwmsgsv2.send_media(
824							others_num + '@' +
825							ARGV[0],
826							bare_jid, media_url
827						)
828					end
829				end
830
831				if params['text'].empty?
832					if not has_media
833						text = '[suspected group msg '\
834							'with no text (odd)]'
835					end
836				else
837					text = if has_media
838						# TODO: write/use a caption XEP
839						params['text']
840					else
841						'[suspected group msg '\
842						'(recipient list not '\
843						'available) with '\
844						'following text] ' +
845						params['text']
846					end
847				end
848
849				# ie. if text param non-empty or had no media
850				if not text.empty?
851					msg = Blather::Stanza::Message.new(
852						bare_jid, text)
853					msg.from = others_num + '@' + ARGV[0]
854					SGXbwmsgsv2.write(msg)
855				end
856
857				return [200, {}, "OK"]
858			else
859				text = "unknown type (#{params['eventType']})"\
860					" with text: " + params['text']
861
862				# TODO: log/notify of this properly
863				puts text
864			end
865
866			msg = Blather::Stanza::Message.new(bare_jid, text)
867		else # per prior switch, this is:  params['direction'] == 'out'
868			tag_parts = params['tag'].split(/ /, 2)
869			id = WEBrick::HTTPUtils.unescape(tag_parts[0])
870			resourcepart = WEBrick::HTTPUtils.unescape(tag_parts[1])
871
872			case params['deliveryState']
873			when 'not-delivered'
874				# create a bare message like the one user sent
875				msg = Blather::Stanza::Message.new(
876					others_num + '@' + ARGV[0])
877				msg.from = bare_jid + '/' + resourcepart
878				msg['id'] = id
879
880				# create an error reply to the bare message
881				msg = Blather::StanzaError.new(
882					msg,
883					'recipient-unavailable',
884					:wait
885				).to_node
886			when 'delivered'
887				msg = ReceiptMessage.new(bare_jid)
888
889				# TODO: put in member/instance variable
890				msg['id'] = SecureRandom.uuid
891
892				# TODO: send only when requested per XEP-0184
893				rcvd = Nokogiri::XML::Node.new(
894					'received',
895					msg.document
896				)
897				rcvd['xmlns'] = 'urn:xmpp:receipts'
898				rcvd['id'] = id
899				msg.add_child(rcvd)
900			when 'waiting'
901				# can't really do anything with it; nice to know
902				puts "message with id #{id} waiting"
903				return [200, {}, "OK"]
904			else
905				# TODO: notify somehow of unknown state receivd?
906				puts "message with id #{id} has "\
907					"other state #{params['deliveryState']}"
908				return [200, {}, "OK"]
909			end
910
911			puts "RESPONSE4: #{msg.inspect}"
912		end
913
914		msg.from = others_num + '@' + ARGV[0]
915		SGXbwmsgsv2.write(msg)
916
917		[200, {}, "OK"]
918
919	rescue Exception => e
920		puts 'Shutting down gateway due to exception 013: ' + e.message
921		SGXbwmsgsv2.shutdown
922		puts 'Gateway has terminated.'
923		EM.stop
924	end
925end
926
927at_exit do
928	$stdout.sync = true
929
930	puts "Soprani.ca/SMS Gateway for XMPP - Bandwidth API V2\n"\
931		"==>> last commit of this version is " + `git rev-parse HEAD` + "\n"
932
933	if ARGV.size != 7
934		puts "Usage: sgx-bwmsgsv2.rb <component_jid> "\
935			"<component_password> <server_hostname> "\
936			"<server_port> <application_id> "\
937			"<http_listen_port> <mms_proxy_prefix_url>"
938		exit 0
939	end
940
941	t = Time.now
942	puts "LOG %d.%09d: starting...\n\n" % [t.to_i, t.nsec]
943
944	EM.run do
945		REDIS = EM::Hiredis.connect
946
947		SGXbwmsgsv2.run
948
949		# required when using Prosody otherwise disconnects on 6-hour inactivity
950		EM.add_periodic_timer(3600) do
951			msg = Blather::Stanza::Iq::Ping.new(:get, 'localhost')
952			msg.from = ARGV[0]
953			SGXbwmsgsv2.write(msg)
954		end
955
956		server = Goliath::Server.new('0.0.0.0', ARGV[5].to_i)
957		server.api = WebhookHandler.new
958		server.app = Goliath::Rack::Builder.build(server.api.class, server.api)
959		server.logger = Log4r::Logger.new('goliath')
960		server.logger.add(Log4r::StdoutOutputter.new('console'))
961		server.logger.level = Log4r::INFO
962		server.start do
963			["INT", "TERM"].each do |sig|
964				trap(sig) do
965					EM.defer do
966						puts 'Shutting down gateway...'
967						SGXbwmsgsv2.shutdown
968
969						puts 'Gateway has terminated.'
970						EM.stop
971					end
972				end
973			end
974		end
975	end
976end