merge initial EventMachine refactor - passes tests

Denver Gingerich created

This looks very good to me and worked fine in my testing (including
most of the cases that result in errors being returned to the user).
The changes should make sgx-catapult much more performant (especially
as we handle more load) and are generally the right thing to do.

There are a couple of minor fixes that are included in these changes
that would ideally be broken out into separate commits, but I won't
push for those here as the important part is the resulting code.

See merge request !6 for the discussion and details behind this merge.

Change summary

.rubocop.yml    |  14 +
Gemfile         |   3 
em_promise.rb   |  50 +++++
sgx-catapult.rb | 468 ++++++++++++++++++++++----------------------------
4 files changed, 270 insertions(+), 265 deletions(-)

Detailed changes

.rubocop.yml 🔗

@@ -35,7 +35,7 @@ Metrics/ModuleLength:
   Max: 1000
 
 Metrics/ParameterLists:
-  Max: 6
+  Max: 7
 
 Metrics/PerceivedComplexity:
   Max: 20
@@ -93,3 +93,15 @@ Style/NegatedIf:
 
 Style/RedundantReturn:
   Enabled: false
+
+Style/MultilineBlockChain:
+  Enabled: false
+
+Style/SpaceAroundEqualsInParameterDefault:
+  EnforcedStyle: no_space
+
+Style/IndentArray:
+  EnforcedStyle: consistent
+
+Style/FirstParameterIndentation:
+  EnforcedStyle: consistent

Gemfile 🔗

@@ -2,8 +2,11 @@ source 'https://rubygems.org'
 
 gem 'activesupport', '<5.0.0'
 gem 'blather'
+gem 'em-http-request'
 gem 'eventmachine', '1.0.0'
+gem 'promise.rb'
 
+gem 'em-hiredis'
 gem 'hiredis', '~> 0.6.0'
 gem 'redis', '>= 3.2.0'
 

em_promise.rb 🔗

@@ -0,0 +1,50 @@
+require "eventmachine"
+require "promise"
+
+class EMPromise < Promise
+	def initialize(deferrable=nil)
+		super()
+		fulfill(deferrable) if deferrable
+	end
+
+	def fulfill(value, bind_defer=true)
+		if bind_defer && value.is_a?(EM::Deferrable)
+			value.callback { |x| fulfill(x, false) }
+			value.errback(&method(:reject))
+		else
+			super(value)
+		end
+	end
+
+	def defer
+		EM.next_tick { yield }
+	end
+
+	def wait
+		fiber = Fiber.current
+		resume = proc do |arg|
+			defer { fiber.resume(arg) }
+		end
+
+		self.then(resume, resume)
+		Fiber.yield
+	end
+
+	def self.reject(e)
+		new.tap { |promise| promise.reject(e) }
+	end
+end
+
+module EventMachine
+	module Deferrable
+		def promise
+			EMPromise.new(self)
+		end
+
+		[:then, :rescue, :catch].each do |method|
+			define_method(method) do |*args, &block|
+				promise.public_send(method, *args, &block)
+			end
+		end
+	end
+end

sgx-catapult.rb 🔗

@@ -19,6 +19,8 @@
 # with sgx-catapult.  If not, see <http://www.gnu.org/licenses/>.
 
 require 'blather/client/dsl'
+require 'em-hiredis'
+require 'em-http-request'
 require 'json'
 require 'net/http'
 require 'redis/connection/hiredis'
@@ -31,6 +33,8 @@ require 'goliath/api'
 require 'goliath/server'
 require 'log4r'
 
+require_relative 'em_promise'
+
 $stdout.sync = true
 
 puts "Soprani.ca/SMS Gateway for XMPP - Catapult\n"\
@@ -47,6 +51,19 @@ end
 t = Time.now
 puts "LOG %d.%09d: starting...\n\n" % [t.to_i, t.nsec]
 
+def panic(e)
+	puts "Shutting down gateway due to exception: #{e.message}"
+	puts e.backtrace
+	SGXcatapult.shutdown
+	puts 'Gateway has terminated.'
+	EM.stop
+end
+
+def extract_shortcode(dest)
+	num, context = dest.split(';', 2)
+	num if context && context == 'phone-context=ca-us.phone-context.soprani.ca'
+end
+
 module SGXcatapult
 	extend Blather::DSL
 
@@ -63,7 +80,7 @@ module SGXcatapult
 		client.write(stanza)
 	end
 
-	def self.error_msg(orig, query_node, type, name, text = nil)
+	def self.error_msg(orig, query_node, type, name, text=nil)
 		if not query_node.nil?
 			orig.add_child(query_node)
 			orig.type = :error
@@ -90,125 +107,140 @@ module SGXcatapult
 
 	setup ARGV[0], ARGV[1], ARGV[2], ARGV[3]
 
-	message :chat?, :body do |m|
-	begin
-		num_dest = m.to.to_s.split('@', 2)[0]
-
-		if num_dest[0] != '+'
-			# check to see if a valid shortcode context is specified
-			num_and_context = num_dest.split(';', 2)
-			if num_and_context[1] and num_and_context[1] ==
-				'phone-context=ca-us.phone-context.soprani.ca'
-
-				# TODO: check if num_dest is fully numeric
-				num_dest = num_and_context[0]
-			else
-				# TODO: text re num not (yet) supportd/implmentd
-				write_to_stream error_msg(
-					m.reply, m.body,
-					:cancel, 'item-not-found'
-				)
-				next
-			end
-		end
-
-		bare_jid = m.from.to_s.split('/', 2)[0]
-		cred_key = "catapult_cred-" + bare_jid
-
-		conn = Hiredis::Connection.new
-		conn.connect(ARGV[4], ARGV[5].to_i)
-
-		conn.write ["EXISTS", cred_key]
-		if conn.read == 0
-			conn.disconnect
-
-			# TODO: add text re credentials not being registered
-			write_to_stream error_msg(
-				m.reply, m.body, :auth,
-				'registration-required'
-			)
-			next
-		end
+	def self.pass_on_message(m, users_num, jid)
+		# setup delivery receipt; similar to a reply
+		rcpt = ReceiptMessage.new(m.from.stripped)
+		rcpt.from = m.to
 
-		conn.write ["LRANGE", cred_key, 0, 3]
-		user_id, api_token, api_secret, users_num = conn.read
+		# pass original message (before sending receipt)
+		m.to = jid
+		m.from = "#{users_num}@#{ARGV[0]}"
 
-		# if the destination user is in the system just pass on directly
-		jid_key = "catapult_jid-" + num_dest
-		conn.write ["EXISTS", jid_key]
-		if conn.read > 0
-			# setup delivery receipt; sort of a reply but not quite
-			rcpt = ReceiptMessage.new(bare_jid)
-			rcpt.from = m.to
+		puts 'XRESPONSE0: ' + m.inspect
+		write_to_stream m
 
-			# pass on the original message (before sending receipt)
-			conn.write ["GET", jid_key]
-			m.to = conn.read
+		# send a delivery receipt back to the sender
+		# TODO: send only when requested per XEP-0184
+		# TODO: pass receipts from target if supported
 
-			m.from = users_num + '@' + ARGV[0]
+		# TODO: put in member/instance variable
+		rcpt['id'] = SecureRandom.uuid
+		rcvd = Nokogiri::XML::Node.new 'received', rcpt.document
+		rcvd['xmlns'] = 'urn:xmpp:receipts'
+		rcvd['id'] = m.id
+		rcpt.add_child(rcvd)
 
-			puts 'XRESPONSE0: ' + m.inspect
-			write_to_stream m
-
-			# send a delivery receipt back to the sender
-			# TODO: send only when requested per XEP-0184
-
-			# TODO: put in member/instance variable
-			rcpt['id'] = SecureRandom.uuid
-			rcvd = Nokogiri::XML::Node.new 'received', rcpt.document
-			rcvd['xmlns'] = 'urn:xmpp:receipts'
-			rcvd['id'] = m.id
-			rcpt.add_child(rcvd)
+		puts 'XRESPONSE1: ' + rcpt.inspect
+		write_to_stream rcpt
+	end
 
-			puts 'XRESPONSE1: ' + rcpt.inspect
-			write_to_stream rcpt
+	def self.call_catapult(token, secret, m, pth, body, head={}, code=[200])
+		EM::HttpRequest.new(
+			"https://api.catapult.inetwork.com/#{pth}"
+		).public_send(
+			m,
+			head: {
+				'Authorization' => [token, secret]
+			}.merge(head),
+			body: body
+		).then { |http|
+			puts "API response to send: #{http.response} with code"\
+				" response.code #{http.response_header.status}"
+
+			if code.include?(http.response_header.status)
+				http.response
+			else
+				# TODO: add text; mention code number
+				EMPromise.reject(
+					[:cancel, 'internal-server-error']
+				)
+			end
+		}
+	end
 
-			conn.disconnect
-			next
+	def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern)
+		extra = if murl
+			{
+				media: murl
+			}
+		else
+			{
+				receiptRequested: 'all',
+				callbackUrl:      ARGV[6]
+			}
 		end
 
-		conn.disconnect
-
-		uri = URI.parse('https://api.catapult.inetwork.com')
-		http = Net::HTTP.new(uri.host, uri.port)
-		http.use_ssl = true
-		request = Net::HTTP::Post.new('/v1/users/' + user_id +
-			'/messages')
-		request.basic_auth api_token, api_secret
-		request.add_field('Content-Type', 'application/json')
-		request.body = JSON.dump(
-			'from'			=> users_num,
-			'to'			=> num_dest,
-			'text'			=> m.body,
-			'tag'			=>
-				# callbacks need both the id and resourcepart
-				WEBrick::HTTPUtils.escape(m.id.to_s) + ' ' +
-				WEBrick::HTTPUtils.escape(
-					m.from.to_s.split('/', 2)[1].to_s
-				),
-			'receiptRequested'	=> 'all',
-			'callbackUrl'		=> ARGV[6]
+		call_catapult(
+			token,
+			secret,
+			:post,
+			"v1/users/#{user_id}/messages",
+			JSON.dump(extra.merge(
+				from: usern,
+				to:   num_dest,
+				text: s.respond_to?(:body) ? s.body : '',
+				tag:
+					# callbacks need id and resourcepart
+					WEBrick::HTTPUtils.escape(s.id.to_s) +
+					' ' +
+					WEBrick::HTTPUtils.escape(
+						s.from.resource.to_s
+					)
+			)),
+			{'Content-Type' => 'application/json'},
+			[201]
 		)
-		response = http.request(request)
-
-		puts 'API response to send: ' + response.to_s + ' with code ' +
-			response.code + ', body "' + response.body + '"'
+	end
 
-		if response.code != '201'
-			# TODO: add text re unexpected code; mention code number
-			write_to_stream error_msg(
-				m.reply, m.body, :cancel,
-				'internal-server-error'
-			)
-			next
-		end
+	def self.validate_num(num)
+		EMPromise.resolve(num.to_s).then { |num_dest|
+			if num_dest =~ /\A\+?[0-9]+(?:;.*)?\Z/
+				next num_dest if num_dest[0] == '+'
+				shortcode = extract_shortcode(num_dest)
+				next shortcode if shortcode
+			end
+			# TODO: text re num not (yet) supportd/implmentd
+			EMPromise.reject([:cancel, 'item-not-found'])
+		}
+	end
 
-	rescue Exception => e
-		puts 'Shutting down gateway due to exception 001: ' + e.message
-		SGXcatapult.shutdown
-		puts 'Gateway has terminated.'
-		EM.stop
+	def self.fetch_catapult_cred_for(jid)
+		cred_key = "catapult_cred-#{jid.stripped}"
+		REDIS.lrange(cred_key, 0, 3).then { |creds|
+			if creds.length < 4
+				# TODO: add text re credentials not registered
+				EMPromise.reject(
+					[:auth, 'registration-required']
+				)
+			else
+				creds
+			end
+		}
 	end
+
+	message :chat?, :body do |m|
+		EMPromise.all([
+			validate_num(m.to.node),
+			fetch_catapult_cred_for(m.from)
+		]).then { |(num_dest, creds)|
+			jid_key = "catapult_jid-#{num_dest}"
+			REDIS.get(jid_key).then { |jid|
+				[jid, num_dest] + creds
+			}
+		}.then { |(jid, num_dest, *creds)|
+			# if destination user is in the system pass on directly
+			if jid
+				pass_on_message(m, creds.last, jid)
+			else
+				to_catapult(m, nil, num_dest, *creds)
+			end
+		}.catch { |e|
+			if e.is_a?(Array) && e.length == 2
+				write_to_stream error_msg(m.reply, m.body, *e)
+			else
+				EMPromise.reject(e)
+			end
+		}.catch(&method(:panic))
 	end
 
 	def self.user_cap_identities
@@ -431,158 +463,73 @@ module SGXcatapult
 	end
 
 	iq '/iq/ns:close', ns:	'http://jabber.org/protocol/ibb' do |i, cn|
-	begin
 		puts "IQc: #{i.inspect}"
 		write_to_stream i.reply
 
-		# TODO: refactor below so that "message :chat?" uses same code
-		num_dest = i.to.to_s.split('@', 2)[0]
-
-		if num_dest[0] != '+'
-			# check to see if a valid shortcode context is specified
-			num_and_context = num_dest.split(';', 2)
-			if num_and_context[1] and num_and_context[1] ==
-				'phone-context=ca-us.phone-context.soprani.ca'
-
-				# TODO: check if num_dest is fully numeric
-				num_dest = num_and_context[0]
-			else
-				# TODO: text re num not (yet) supportd/implmentd
-				write_to_stream error_msg(
-					i.reply, nil,
-					:cancel, 'item-not-found'
-				)
-				next
+		EMPromise.all([
+			validate_num(i.to.node),
+			fetch_catapult_cred_for(i.from)
+		]).then { |(num_dest, creds)|
+			# Gajim bug: <close/> has Jingle (not transport) sid; fix later
+			if not @jingle_fnames.key? cn[0]['sid']
+				puts 'ERROR: Not found in filename map: ' + cn[0]['sid']
+
+				next EMPromise.reject(:done)
+				# TODO: in case only Gajim's <data/> bug fixed, add map:
+				#cn[0]['sid'] = @jingle_tsids[cn[0]['sid']]
 			end
-		end
-
-		bare_jid = i.from.to_s.split('/', 2)[0]
-		cred_key = "catapult_cred-" + bare_jid
-
-		# TODO: connect at start of program instead
-		conn = Hiredis::Connection.new
-		conn.connect(ARGV[4], ARGV[5].to_i)
-
-		conn.write ["EXISTS", cred_key]
-		if conn.read == 0
-			conn.disconnect
-
-			# TODO: add text re credentials not being registered
-			write_to_stream error_msg(
-				i.reply, nil, :auth,
-				'registration-required'
-			)
-			next
-		end
 
-		conn.write ["LRANGE", cred_key, 0, 3]
-		user_id, api_token, api_secret, users_num = conn.read
-		conn.disconnect
-
-		# Gajim bug: <close/> has Jingle (not transport) sid; fix later
-		if not @jingle_fnames.key? cn[0]['sid']
-			puts 'ERROR: Not found in filename map: ' + cn[0]['sid']
-
-			next
-			# TODO: in case only Gajim's <data/> bug fixed, add map:
-			#cn[0]['sid'] = @jingle_tsids[cn[0]['sid']]
-		end
+			# upload cached data to server (before success reply)
+			media_name =
+				"#{Time.now.utc.iso8601}_#{SecureRandom.uuid}"\
+				"_#{@jingle_fnames[cn[0]['sid']]}"
+			puts 'name to save: ' + media_name
 
-		# upload cached data to server (before success reply)
-		media_name =
-			"#{Time.now.utc.iso8601}_#{SecureRandom.uuid}"\
-			"_#{@jingle_fnames[cn[0]['sid']]}"
-		puts 'name to save: ' + media_name
-
-		uri = URI.parse('https://api.catapult.inetwork.com')
-		http = Net::HTTP.new(uri.host, uri.port)
-		http.use_ssl = true
-		request = Net::HTTP::Put.new('/v1/users/' + user_id +
-			'/media/' + media_name)
-		request.basic_auth api_token, api_secret
-		request.body = @partial_data[cn[0]['sid']]
-		response = http.request(request)
-
-		puts 'eAPI response to send: ' + response.to_s + ' with code ' +
-			response.code + ', body "' + response.body + '"'
-
-		if response.code != '200'
-			# TODO: add text re unexpected code; mention code number
-			write_to_stream error_msg(
-				i.reply, nil, :cancel,
-				'internal-server-error'
-			)
-			next
-		end
+			path = "/v1/users/#{creds.first}/media/#{media_name}"
 
-		uri = URI.parse('https://api.catapult.inetwork.com')
-		http = Net::HTTP.new(uri.host, uri.port)
-		http.use_ssl = true
-		request = Net::HTTP::Post.new('/v1/users/' + user_id +
-			'/messages')
-		request.basic_auth api_token, api_secret
-		request.add_field('Content-Type', 'application/json')
-		request.body = JSON.dump(
-			'from'			=> users_num,
-			'to'			=> num_dest,
-			'text'			=> '',
-			'media'			=> [
-				'https://api.catapult.inetwork.com/v1/users/' +
-				user_id + '/media/' + media_name
-			],
-			'tag'			=>
-				# callbacks need both the id and resourcepart
-				WEBrick::HTTPUtils.escape(i.id.to_s) + ' ' +
-				WEBrick::HTTPUtils.escape(
-					i.from.to_s.split('/', 2)[1].to_s
+			EMPromise.all([
+				call_catapult(
+					*creds[1..2],
+					:put,
+					path,
+					@partial_data[cn[0]['sid']]
+				),
+				to_catapult(
+					i,
+					"https://api.catapult.inetwork.com/" +
+						path,
+					num_dest,
+					*creds
 				)
-			# TODO: add back when Bandwidth AP supports it (?); now:
-			#  "The ''messages'' resource property
-			#  ''receiptRequested'' is not supported for MMS"
-			#'receiptRequested'	=> 'all',
-			#'callbackUrl'		=> ARGV[6]
-		)
-		response = http.request(request)
-
-		puts 'mAPI response to send: ' + response.to_s + ' with code ' +
-			response.code + ', body "' + response.body + '"'
-
-		if response.code != '201'
-			# TODO: add text re unexpected code; mention code number
-			write_to_stream error_msg(
-				i.reply, nil, :cancel,
-				'internal-server-error'
-			)
-			next
-		end
-
-		@partial_data[cn[0]['sid']] = ''
-
-		# received the complete file so now close the stream
-		msg = Blather::Stanza::Iq.new :set
-		msg.to = i.from
-		msg.from = i.to
-
-		j = Nokogiri::XML::Node.new 'jingle', msg.document
-		j['xmlns'] = 'urn:xmpp:jingle:1'
-		j['action'] = 'session-terminate'
-		j['sid'] = @jingle_sids[cn[0]['sid']]
-		msg.add_child(j)
-
-		r = Nokogiri::XML::Node.new 'reason', msg.document
-		s = Nokogiri::XML::Node.new 'success', msg.document
-		r.add_child(s)
-		j.add_child(r)
-
-		puts 'RESPONSE1: ' + msg.inspect
-		write_to_stream msg
-
-	rescue Exception => e
-		puts 'Shutting down gateway due to exception 007: ' + e.message
-		SGXcatapult.shutdown
-		puts 'Gateway has terminated.'
-		EM.stop
-	end
+			])
+		}.then {
+			@partial_data[cn[0]['sid']] = ''
+
+			# received the complete file so now close the stream
+			msg = Blather::Stanza::Iq.new :set
+			msg.to = i.from
+			msg.from = i.to
+
+			j = Nokogiri::XML::Node.new 'jingle', msg.document
+			j['xmlns'] = 'urn:xmpp:jingle:1'
+			j['action'] = 'session-terminate'
+			j['sid'] = @jingle_sids[cn[0]['sid']]
+			msg.add_child(j)
+
+			r = Nokogiri::XML::Node.new 'reason', msg.document
+			s = Nokogiri::XML::Node.new 'success', msg.document
+			r.add_child(s)
+			j.add_child(r)
+
+			puts 'RESPONSE1: ' + msg.inspect
+			write_to_stream msg
+		}.catch { |e|
+			if e.is_a?(Array) && e.length == 2
+				write_to_stream error_msg(i.reply, nil, *e)
+			elsif e != :done
+				EMPromise.reject(e)
+			end
+		}.catch(&method(:panic))
 	end
 
 	iq '/iq/ns:query', ns:	'http://jabber.org/protocol/disco#items' do |i|
@@ -950,7 +897,7 @@ end
 end
 
 class ReceiptMessage < Blather::Stanza
-	def self.new(to = nil)
+	def self.new(to=nil)
 		node = super :message
 		node.to = to
 		node
@@ -1013,21 +960,16 @@ class WebhookHandler < Goliath::API
 			return [200, {}, "OK"]
 		end
 
-		jid_key = "catapult_jid-" + users_num
-
 		if others_num[0] != '+'
 			# TODO: check that others_num actually a shortcode first
 			others_num +=
 				';phone-context=ca-us.phone-context.soprani.ca'
 		end
 
-		conn = Hiredis::Connection.new
-		conn.connect(ARGV[4], ARGV[5].to_i)
-
-		conn.write ["EXISTS", jid_key]
-		if conn.read == 0
-			conn.disconnect
+		jid_key = "catapult_jid-#{users_num}"
+		bare_jid = REDIS.get(jid_key).promise.sync
 
+		if !bare_jid
 			puts "jid_key (#{jid_key}) DNE; Catapult misconfigured?"
 
 			# TODO: likely not appropriate; give error to Catapult?
@@ -1037,10 +979,6 @@ class WebhookHandler < Goliath::API
 			return [200, {}, "OK"]
 		end
 
-		conn.write ["GET", jid_key]
-		bare_jid = conn.read
-		conn.disconnect
-
 		msg = ''
 		case params['direction']
 		when 'in'
@@ -1161,6 +1099,8 @@ class WebhookHandler < Goliath::API
 end
 
 EM.run do
+	REDIS = EM::Hiredis.connect("redis://#{ARGV[4]}:#{ARGV[5]}/0")
+
 	SGXcatapult.run
 
 	# required when using Prosody otherwise disconnects on 6-hour inactivity