Refactor register to use EM

Stephen Paul Weber created

Now everything uses EM and there's no blocking IO!

Also, fixes stalls caused by Blather mixing threads with EM by default:
https://github.com/adhearsion/blather/issues/130
https://github.com/eventmachine/eventmachine/issues/779

Change summary

.rubocop.yml    |   3 
Gemfile         |   5 
sgx-catapult.rb | 477 +++++++++++++++++++++++---------------------------
3 files changed, 221 insertions(+), 264 deletions(-)

Detailed changes

.rubocop.yml 🔗

@@ -49,6 +49,9 @@ Style/AlignParameters:
 Style/BlockDelimiters:
   Enabled: false
 
+Style/CaseIndentation:
+  EnforcedStyle: end
+
 Style/Documentation:
   Enabled: false
 

Gemfile 🔗

@@ -2,14 +2,11 @@ source 'https://rubygems.org'
 
 gem 'activesupport', '<5.0.0'
 gem 'blather'
+gem 'em-hiredis'
 gem 'em-http-request'
 gem 'eventmachine', '1.0.1'
 gem 'promise.rb'
 
-gem 'em-hiredis'
-gem 'hiredis', '~> 0.6.0'
-gem 'redis', '>= 3.2.0'
-
 gem 'goliath'
 gem 'log4r'
 

sgx-catapult.rb 🔗

@@ -22,8 +22,6 @@ require 'blather/client/dsl'
 require 'em-hiredis'
 require 'em-http-request'
 require 'json'
-require 'net/http'
-require 'redis/connection/hiredis'
 require 'securerandom'
 require 'time'
 require 'uri'
@@ -114,7 +112,8 @@ module SGXcatapult
 		return orig
 	end
 
-	setup ARGV[0], ARGV[1], ARGV[2], ARGV[3]
+	# workqueue_count MUST be 0 or else Blather uses threads!
+	setup ARGV[0], ARGV[1], ARGV[2], ARGV[3], nil, nil, workqueue_count: 0
 
 	def self.pass_on_message(m, users_num, jid)
 		# setup delivery receipt; similar to a reply
@@ -143,7 +142,9 @@ module SGXcatapult
 		write_to_stream rcpt
 	end
 
-	def self.call_catapult(token, secret, m, pth, body, head={}, code=[200])
+	def self.call_catapult(
+		token, secret, m, pth, body=nil, head={}, code=[200]
+	)
 		EM::HttpRequest.new(
 			"https://api.catapult.inetwork.com/#{pth}"
 		).public_send(
@@ -159,10 +160,7 @@ module SGXcatapult
 			if code.include?(http.response_header.status)
 				http.response
 			else
-				# TODO: add text; mention code number
-				EMPromise.reject(
-					[:cancel, 'internal-server-error']
-				)
+				EMPromise.reject(http.response_header.status)
 			end
 		}
 	end
@@ -198,7 +196,12 @@ module SGXcatapult
 			)),
 			{'Content-Type' => 'application/json'},
 			[201]
-		)
+		).catch {
+			# TODO: add text; mention code number
+			EMPromise.reject(
+				[:cancel, 'internal-server-error']
+			)
+		}
 	end
 
 	def self.validate_num(num)
@@ -462,7 +465,11 @@ module SGXcatapult
 					:put,
 					path,
 					@partial_data[cn[0]['sid']]
-				),
+				).catch {
+					EMPromise.reject([
+						:cancel, 'internal-server-error'
+					])
+				},
 				to_catapult(
 					i,
 					"https://api.catapult.inetwork.com/" +
@@ -537,285 +544,235 @@ module SGXcatapult
 		write_to_stream msg
 	end
 
-	def self.check_then_register(user_id, api_token, api_secret, phone_num,
-		i, qn)
-
-		jid_key = "catapult_jid-" + phone_num
-
-		bare_jid = i.from.to_s.split('/', 2)[0]
-		cred_key = "catapult_cred-" + bare_jid
-
-		# TODO: pre-validate ARGV[5] is integer
-		conn = Hiredis::Connection.new
-		conn.connect(ARGV[4], ARGV[5].to_i)
-
-		conn.write ["GET", jid_key]
-		existing_jid = conn.read
-
-		if not existing_jid.nil? and existing_jid != bare_jid
-			conn.disconnect
-
-			# TODO: add/log text re credentials exist already
-			write_to_stream error_msg(
-				i.reply, qn, :cancel,
-				'conflict')
-			return false
-		end
-
-		# ASSERT: existing_jid is nil or equal to bare_jid
-
-		conn.write ["EXISTS", cred_key]
-		creds_exist = conn.read
-		if 1 == creds_exist
-			conn.write ["LRANGE", cred_key, 0, 3]
-			if [user_id, api_token, api_secret, phone_num] !=
-				conn.read
+	def self.check_then_register(i, *creds)
+		jid_key = "catapult_jid-#{creds.last}"
+		bare_jid = i.from.stripped
+		cred_key = "catapult_cred-#{bare_jid}"
 
-				conn.disconnect
-
-				# TODO: add/log txt re credentials exist already
-				write_to_stream error_msg(
-					i.reply, qn, :cancel,
-					'conflict')
-				return false
+		REDIS.get(jid_key).then { |existing_jid|
+			if existing_jid && existing_jid != bare_jid
+				# TODO: add/log text: credentials exist already
+				EMPromise.reject([:cancel, 'conflict'])
 			end
-		end
-
-		# ASSERT: cred_key does not exist or its value equals input vals
-
-		# not necessary if existing_jid non-nil, but easier to do anyway
-		conn.write ["SET", jid_key, bare_jid]
-		if conn.read != 'OK'
-			conn.disconnect
-
-			# TODO: catch/relay RuntimeError
-			# TODO: add txt re push failure
-			write_to_stream error_msg(
-				i.reply, qn, :cancel,
-				'internal-server-error')
-			return false
-		end
-
-		if 1 == creds_exist
-			# per above ASSERT, cred_key value equals input already
-			conn.disconnect
+		}.then {
+			REDIS.lrange(cred_key, 0, 3)
+		}.then { |existing_creds|
+			# TODO: add/log text: credentials exist already
+			if existing_creds.length == 4 && creds != existing_creds
+				EMPromise.reject([:cancel, 'conflict'])
+			elsif existing_creds.length < 4
+				REDIS.rpush(cred_key, *creds).then { |length|
+					if length != 4
+						EMPromise.reject([
+							:cancel,
+							'internal-server-error'
+						])
+					end
+				}
+			end
+		}.then {
+			# not necessary if existing_jid non-nil, easier this way
+			REDIS.set(jid_key, bare_jid)
+		}.then { |result|
+			if result != 'OK'
+				# TODO: add txt re push failure
+				EMPromise.reject(
+					[:cancel, 'internal-server-error']
+				)
+			end
+		}.then {
 			write_to_stream i.reply
-			return true
-		end
-
-		conn.write ["RPUSH", cred_key, user_id]
-		conn.write ["RPUSH", cred_key, api_token]
-		conn.write ["RPUSH", cred_key, api_secret]
-		conn.write ["RPUSH", cred_key, phone_num]
+		}
+	end
 
-		# TODO: confirm cred_key list size == 4
+	def self.creds_from_registration_query(qn)
+		xn = qn.children.find { |v| v.element_name == "x" }
 
-		(1..4).each do |n|
-			# TODO: catch/relay RuntimeError
-			result = conn.read
-			if result != n
-				conn.disconnect
+		if xn
+			xn.children.each_with_object({}) do |field, h|
+				next if field.element_name != "field"
+				val = field.children.find { |v|
+					v.element_name == "value"
+				}
 
-				write_to_stream error_msg(
-					i.reply, qn, :cancel,
-					'internal-server-error')
-				return false
+				case field['var']
+				when 'nick'
+					h[:user_id] = val.text
+				when 'username'
+					h[:api_token] = val.text
+				when 'password'
+					h[:api_secret] = val.text
+				when 'phone'
+					h[:phone_num] = val.text
+				else
+					# TODO: error
+					puts "?: #{field['var']}"
+				end
 			end
-		end
-		conn.disconnect
-
-		write_to_stream i.reply
-
-		return true
+		else
+			qn.children.each_with_object({}) do |field, h|
+				case field.element_name
+				when "nick"
+					h[:user_id] = field.text
+				when "username"
+					h[:api_token] = field.text
+				when "password"
+					h[:api_secret] = field.text
+				when "phone"
+					h[:phone_num] = field.text
+				end
+			end
+		end.values_at(:user_id, :api_token, :api_secret, :phone_num)
 	end
 
-	iq '/iq/ns:query', ns: 'jabber:iq:register' do |i, qn|
-		puts "IQ: #{i.inspect}"
-
-		if i.type == :set
-			rn = qn.children.find { |v| v.element_name == "remove" }
-			if not rn.nil?
+	def self.process_registration(i, qn)
+		EMPromise.resolve(
+			qn.children.find { |v| v.element_name == "remove" }
+		).then { |rn|
+			if rn
 				puts "received <remove/> - ignoring for now..."
-				next
-			end
-
-			xn = qn.children.find { |v| v.element_name == "x" }
-
-			user_id = ''
-			api_token = ''
-			api_secret = ''
-			phone_num = ''
-
-			if xn.nil?
-				user_id = qn.children.find { |v|
-					v.element_name == "nick"
-				}
-				api_token = qn.children.find { |v|
-					v.element_name == "username"
-				}
-				api_secret = qn.children.find { |v|
-					v.element_name == "password"
-				}
-				phone_num = qn.children.find { |v|
-					v.element_name == "phone"
-				}
+				EMPromise.reject(:done)
 			else
-				xn.children.each do |field|
-					if field.element_name == "field"
-						val = field.children.find { |v|
-							v.element_name == "value"
-						}
-
-						case field['var']
-						when 'nick'
-							user_id = val.text
-						when 'username'
-							api_token = val.text
-						when 'password'
-							api_secret = val.text
-						when 'phone'
-							phone_num = val.text
-						else
-							# TODO: error
-							puts "?: " +field['var']
-						end
-					end
-				end
+				creds_from_registration_query(qn)
 			end
-
-			if phone_num[0] != '+'
+		}.then { |user_id, api_token, api_secret, phone_num|
+			if phone_num[0] == '+'
+				[user_id, api_token, api_secret, phone_num]
+			else
 				# TODO: add text re number not (yet) supported
-				write_to_stream error_msg(
-					i.reply, qn, :cancel,
-					'item-not-found'
-				)
-				next
+				EMPromise.reject([:cancel, 'item-not-found'])
 			end
-
-			uri = URI.parse('https://api.catapult.inetwork.com')
-			http = Net::HTTP.new(uri.host, uri.port)
-			http.use_ssl = true
-			request = Net::HTTP::Get.new('/v1/users/' + user_id +
-				'/phoneNumbers/' + phone_num)
-			request.basic_auth api_token, api_secret
-			response = http.request(request)
-
-			puts 'API response: ' + response.to_s + ' with code ' +
-				response.code + ', body "' + response.body + '"'
-
-			if response.code == '200'
-				params = JSON.parse response.body
+		}.then { |user_id, api_token, api_secret, phone_num|
+			call_catapult(
+				api_token,
+				api_secret,
+				:get,
+				"v1/users/#{user_id}/phoneNumbers/#{phone_num}"
+			).then { |response|
+				params = JSON.parse(response)
 				if params['numberState'] == 'enabled'
-					if not check_then_register(
-						user_id, api_token, api_secret,
-						phone_num, i, qn
+					check_then_register(
+						i,
+						user_id,
+						api_token,
+						api_secret,
+						phone_num
 					)
-						next
-					end
 				else
 					# TODO: add text re number disabled
-					write_to_stream error_msg(
-						i.reply, qn,
-						:modify, 'not-acceptable'
-					)
+					EMPromise.reject([:modify, 'not-acceptable'])
 				end
-			elsif response.code == '401'
+			}
+		}.catch { |e|
+			EMPromise.reject(case e
+			when 401
 				# TODO: add text re bad credentials
-				write_to_stream error_msg(
-					i.reply, qn, :auth,
-					'not-authorized'
-				)
-			elsif response.code == '404'
+				[:auth, 'not-authorized']
+			when 404
 				# TODO: add text re number not found or disabled
-				write_to_stream error_msg(
-					i.reply, qn, :cancel,
-					'item-not-found'
-				)
+				[:cancel, 'item-not-found']
+			when Integer
+				[:modify, 'not-acceptable']
 			else
-				# TODO: add text re misc error, and mention code
-				write_to_stream error_msg(
-					i.reply, qn, :modify,
-					'not-acceptable'
+				e
+			end)
+		}
+	end
+
+	def self.registration_form(orig, existing_number=nil)
+		msg = Nokogiri::XML::Node.new 'query', orig.document
+		msg['xmlns'] = 'jabber:iq:register'
+
+		if existing_number
+			msg.add_child(
+				Nokogiri::XML::Node.new(
+					'registered', msg.document
 				)
-			end
+			)
+		end
 
-		elsif i.type == :get
-			orig = i.reply
+		n1 = Nokogiri::XML::Node.new(
+			'instructions', msg.document
+		)
+		n1.content = "Enter the information from your Account "\
+			"page as well as the Phone Number\nin your "\
+			"account you want to use (ie. '+12345678901')"\
+			".\nUser Id is nick, API Token is username, "\
+			"API Secret is password, Phone Number is phone"\
+			".\n\nThe source code for this gateway is at "\
+			"https://gitlab.com/ossguy/sgx-catapult ."\
+			"\nCopyright (C) 2017  Denver Gingerich and "\
+			"others, licensed under AGPLv3+."
+		n2 = Nokogiri::XML::Node.new 'nick', msg.document
+		n3 = Nokogiri::XML::Node.new 'username', msg.document
+		n4 = Nokogiri::XML::Node.new 'password', msg.document
+		n5 = Nokogiri::XML::Node.new 'phone', msg.document
+		n5.content = existing_number.to_s
+		msg.add_child(n1)
+		msg.add_child(n2)
+		msg.add_child(n3)
+		msg.add_child(n4)
+		msg.add_child(n5)
+
+		x = Blather::Stanza::X.new :form, [
+			{
+				required: true, type: :"text-single",
+				label: 'User Id', var: 'nick'
+			},
+			{
+				required: true, type: :"text-single",
+				label: 'API Token', var: 'username'
+			},
+			{
+				required: true, type: :"text-private",
+				label: 'API Secret', var: 'password'
+			},
+			{
+				required: true, type: :"text-single",
+				label: 'Phone Number', var: 'phone',
+				value: existing_number.to_s
+			}
+		]
+		x.title = 'Register for '\
+			'Soprani.ca Gateway to XMPP - Catapult'
+		x.instructions = "Enter the details from your Account "\
+			"page as well as the Phone Number\nin your "\
+			"account you want to use (ie. '+12345678901')"\
+			".\n\nThe source code for this gateway is at "\
+			"https://gitlab.com/ossguy/sgx-catapult ."\
+			"\nCopyright (C) 2017  Denver Gingerich and "\
+			"others, licensed under AGPLv3+."
+		msg.add_child(x)
 
-			bare_jid = i.from.to_s.split('/', 2)[0]
-			cred_key = "catapult_cred-" + bare_jid
+		orig.add_child(msg)
 
-			conn = Hiredis::Connection.new
-			conn.connect(ARGV[4], ARGV[5].to_i)
-			conn.write(["LINDEX", cred_key, 3])
-			existing_number = conn.read
-			conn.disconnect
+		return orig
+	end
 
-			msg = Nokogiri::XML::Node.new 'query', orig.document
-			msg['xmlns'] = 'jabber:iq:register'
+	iq '/iq/ns:query', ns: 'jabber:iq:register' do |i, qn|
+		puts "IQ: #{i.inspect}"
 
-			if existing_number
-				msg.add_child(
-					Nokogiri::XML::Node.new('registered', msg.document)
-				)
+		case i.type
+		when :set
+			process_registration(i, qn)
+		when :get
+			bare_jid = i.from.stripped
+			cred_key = "catapult_cred-#{bare_jid}"
+			REDIS.lindex(cred_key, 3).then { |existing_number|
+				reply = registration_form(i.reply, existing_number)
+				puts "RESPONSE2: #{reply.inspect}"
+				write_to_stream reply
+			}
+		else
+			# Unknown IQ, ignore for now
+			EMPromise.reject(:done)
+		end.catch { |e|
+			if e.is_a?(Array) && e.length == 2
+				write_to_stream error_msg(i.reply, qn, *e)
+			elsif e != :done
+				EMPromise.reject(e)
 			end
-
-			n1 = Nokogiri::XML::Node.new 'instructions', msg.document
-			n1.content= "Enter the information from your Account "\
-				"page as well as the Phone Number\nin your "\
-				"account you want to use (ie. '+12345678901')"\
-				".\nUser Id is nick, API Token is username, "\
-				"API Secret is password, Phone Number is phone"\
-				".\n\nThe source code for this gateway is at "\
-				"https://gitlab.com/ossguy/sgx-catapult ."\
-				"\nCopyright (C) 2017  Denver Gingerich and "\
-				"others, licensed under AGPLv3+."
-			n2 = Nokogiri::XML::Node.new 'nick', msg.document
-			n3 = Nokogiri::XML::Node.new 'username', msg.document
-			n4 = Nokogiri::XML::Node.new 'password', msg.document
-			n5 = Nokogiri::XML::Node.new 'phone', msg.document
-			n5.content = existing_number.to_s
-			msg.add_child(n1)
-			msg.add_child(n2)
-			msg.add_child(n3)
-			msg.add_child(n4)
-			msg.add_child(n5)
-
-			x = Blather::Stanza::X.new :form, [
-				{
-					required: true, type: :"text-single",
-					label: 'User Id', var: 'nick'
-				},
-				{
-					required: true, type: :"text-single",
-					label: 'API Token', var: 'username'
-				},
-				{
-					required: true, type: :"text-private",
-					label: 'API Secret', var: 'password'
-				},
-				{
-					required: true, type: :"text-single",
-					label: 'Phone Number', var: 'phone',
-					value: existing_number.to_s
-				}
-			]
-			x.title= 'Register for '\
-				'Soprani.ca Gateway to XMPP - Catapult'
-			x.instructions= "Enter the details from your Account "\
-				"page as well as the Phone Number\nin your "\
-				"account you want to use (ie. '+12345678901')"\
-				".\n\nThe source code for this gateway is at "\
-				"https://gitlab.com/ossguy/sgx-catapult ."\
-				"\nCopyright (C) 2017  Denver Gingerich and "\
-				"others, licensed under AGPLv3+."
-			msg.add_child(x)
-
-			orig.add_child(msg)
-			puts "RESPONSE2: #{orig.inspect}"
-			write_to_stream orig
-			puts "SENT"
-		end
+		}.catch(&method(:panic))
 	end
 
 	subscription(:request?) do |s|