From 429837b61087e829089101c480639b32de311281 Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Thu, 16 Mar 2017 18:15:41 -0500 Subject: [PATCH 1/4] First HTTP request using em-http --- .rubocop.yml | 6 ++++ Gemfile | 2 ++ em_promise.rb | 40 +++++++++++++++++++++++++ sgx-catapult.rb | 79 ++++++++++++++++++++++++++++--------------------- 4 files changed, 93 insertions(+), 34 deletions(-) create mode 100644 em_promise.rb diff --git a/.rubocop.yml b/.rubocop.yml index 41481b07ca04fd99056d8c06d984452221235343..574c2129dc91242ea4495a1ebb525a14e2efc028 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -93,3 +93,9 @@ Style/NegatedIf: Style/RedundantReturn: Enabled: false + +Style/MultilineBlockChain: + Enabled: false + +Style/SpaceAroundEqualsInParameterDefault: + EnforcedStyle: no_space diff --git a/Gemfile b/Gemfile index 5006d96e1c8d0d1ab0d09ea0f83b3cfc179af34d..1886af41a6dcc06667a0174497d3bfd85a48418e 100644 --- a/Gemfile +++ b/Gemfile @@ -2,7 +2,9 @@ source 'https://rubygems.org' gem 'activesupport', '<5.0.0' gem 'blather' +gem 'em-http-request' gem 'eventmachine', '1.0.0' +gem 'promise.rb' gem 'hiredis', '~> 0.6.0' gem 'redis', '>= 3.2.0' diff --git a/em_promise.rb b/em_promise.rb new file mode 100644 index 0000000000000000000000000000000000000000..96e25425d4a4de9fbfa8289f0fb1a88b8a3f6498 --- /dev/null +++ b/em_promise.rb @@ -0,0 +1,40 @@ +require "eventmachine" +require "promise" + +class EMPromise < Promise + def initialize(deferrable=nil) + super() + fulfill(deferrable) if deferrable + end + + def fulfill(value, bind_defer=true) + if bind_defer && value.is_a?(EM::Deferrable) + value.callback { |x| fulfill(x, false) } + value.errback(&method(:reject)) + else + super(value) + end + end + + def defer + EM.next_tick { yield } + end + + def self.reject(e) + new.tap { |promise| promise.reject(e) } + end +end + +module EventMachine + module Deferrable + def promise + EMPromise.new(self) + end + + [:then, :rescue, :catch].each do |method| + define_method(method) do |*args, &block| + promise.public_send(method, *args, &block) + end + end + end +end diff --git a/sgx-catapult.rb b/sgx-catapult.rb index 68efdd4cfdd3830c035dc2369146c1b23c705b05..d90f43ca4319e473b10b57c7b2fe072d2bbd9931 100755 --- a/sgx-catapult.rb +++ b/sgx-catapult.rb @@ -19,6 +19,7 @@ # with sgx-catapult. If not, see . require 'blather/client/dsl' +require 'em-http-request' require 'json' require 'net/http' require 'redis/connection/hiredis' @@ -31,6 +32,8 @@ require 'goliath/api' require 'goliath/server' require 'log4r' +require_relative 'em_promise' + $stdout.sync = true puts "Soprani.ca/SMS Gateway for XMPP - Catapult\n"\ @@ -47,6 +50,14 @@ end t = Time.now puts "LOG %d.%09d: starting...\n\n" % [t.to_i, t.nsec] +def panic(e) + puts "Shutting down gateway due to exception: #{e.message}" + puts e.backtrace + SGXcatapult.shutdown + puts 'Gateway has terminated.' + EM.stop +end + module SGXcatapult extend Blather::DSL @@ -63,7 +74,7 @@ module SGXcatapult client.write(stanza) end - def self.error_msg(orig, query_node, type, name, text = nil) + def self.error_msg(orig, query_node, type, name, text=nil) if not query_node.nil? orig.add_child(query_node) orig.type = :error @@ -169,39 +180,39 @@ module SGXcatapult conn.disconnect - uri = URI.parse('https://api.catapult.inetwork.com') - http = Net::HTTP.new(uri.host, uri.port) - http.use_ssl = true - request = Net::HTTP::Post.new('/v1/users/' + user_id + - '/messages') - request.basic_auth api_token, api_secret - request.add_field('Content-Type', 'application/json') - request.body = JSON.dump( - 'from' => users_num, - 'to' => num_dest, - 'text' => m.body, - 'tag' => - # callbacks need both the id and resourcepart - WEBrick::HTTPUtils.escape(m.id.to_s) + ' ' + - WEBrick::HTTPUtils.escape( - m.from.to_s.split('/', 2)[1].to_s - ), - 'receiptRequested' => 'all', - 'callbackUrl' => ARGV[6] - ) - response = http.request(request) - - puts 'API response to send: ' + response.to_s + ' with code ' + - response.code + ', body "' + response.body + '"' - - if response.code != '201' - # TODO: add text re unexpected code; mention code number - write_to_stream error_msg( - m.reply, m.body, :cancel, - 'internal-server-error' + EM::HttpRequest.new( + "https://api.catapult.inetwork.com/"\ + "v1/users/#{user_id}/messages" + ).post( + head: { + 'Authorization' => [api_token, api_secret], + 'Content-Type' => 'application/json' + }, + body: JSON.dump( + from: users_num, + to: num_dest, + text: m.body, + tag: + # callbacks need both the id and resourcepart + WEBrick::HTTPUtils.escape(m.id.to_s) + ' ' + + WEBrick::HTTPUtils.escape( + m.from.to_s.split('/', 2)[1].to_s + ), + receiptRequested: 'all', + callbackUrl: ARGV[6] ) - next - end + ).then { |http| + puts "API response to send: #{http.response} with code "\ + "response.code #{http.response_header.status}" + + if http.response_header.status != 201 + # TODO: add text re unexpected code; mention code number + write_to_stream error_msg( + m.reply, m.body, :cancel, + 'internal-server-error' + ) + end + }.catch(&method(:panic)) rescue Exception => e puts 'Shutting down gateway due to exception 001: ' + e.message @@ -950,7 +961,7 @@ end end class ReceiptMessage < Blather::Stanza - def self.new(to = nil) + def self.new(to=nil) node = super :message node.to = to node From a35693b9f499870172fce2d755341568cce64963 Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Thu, 16 Mar 2017 19:34:16 -0500 Subject: [PATCH 2/4] Refactor the whole chat handler --- Gemfile | 1 + sgx-catapult.rb | 172 +++++++++++++++++++++++------------------------- 2 files changed, 83 insertions(+), 90 deletions(-) diff --git a/Gemfile b/Gemfile index 1886af41a6dcc06667a0174497d3bfd85a48418e..9fe5c18de0dd255875a97aced5421f1196938efe 100644 --- a/Gemfile +++ b/Gemfile @@ -6,6 +6,7 @@ gem 'em-http-request' gem 'eventmachine', '1.0.0' gem 'promise.rb' +gem 'em-hiredis' gem 'hiredis', '~> 0.6.0' gem 'redis', '>= 3.2.0' diff --git a/sgx-catapult.rb b/sgx-catapult.rb index d90f43ca4319e473b10b57c7b2fe072d2bbd9931..c24f8686775db036a36e24eeb31daf1f64f3748b 100755 --- a/sgx-catapult.rb +++ b/sgx-catapult.rb @@ -19,6 +19,7 @@ # with sgx-catapult. If not, see . require 'blather/client/dsl' +require 'em-hiredis' require 'em-http-request' require 'json' require 'net/http' @@ -58,6 +59,11 @@ def panic(e) EM.stop end +def extract_shortcode(dest) + num, context = dest.split(';', 2) + num if context && context == 'phone-context=ca-us.phone-context.soprani.ca' +end + module SGXcatapult extend Blather::DSL @@ -101,91 +107,40 @@ module SGXcatapult setup ARGV[0], ARGV[1], ARGV[2], ARGV[3] - message :chat?, :body do |m| - begin - num_dest = m.to.to_s.split('@', 2)[0] - - if num_dest[0] != '+' - # check to see if a valid shortcode context is specified - num_and_context = num_dest.split(';', 2) - if num_and_context[1] and num_and_context[1] == - 'phone-context=ca-us.phone-context.soprani.ca' - - # TODO: check if num_dest is fully numeric - num_dest = num_and_context[0] - else - # TODO: text re num not (yet) supportd/implmentd - write_to_stream error_msg( - m.reply, m.body, - :cancel, 'item-not-found' - ) - next - end - end - - bare_jid = m.from.to_s.split('/', 2)[0] - cred_key = "catapult_cred-" + bare_jid - - conn = Hiredis::Connection.new - conn.connect(ARGV[4], ARGV[5].to_i) - - conn.write ["EXISTS", cred_key] - if conn.read == 0 - conn.disconnect - - # TODO: add text re credentials not being registered - write_to_stream error_msg( - m.reply, m.body, :auth, - 'registration-required' - ) - next - end - - conn.write ["LRANGE", cred_key, 0, 3] - user_id, api_token, api_secret, users_num = conn.read - - # if the destination user is in the system just pass on directly - jid_key = "catapult_jid-" + num_dest - conn.write ["EXISTS", jid_key] - if conn.read > 0 - # setup delivery receipt; sort of a reply but not quite - rcpt = ReceiptMessage.new(bare_jid) - rcpt.from = m.to - - # pass on the original message (before sending receipt) - conn.write ["GET", jid_key] - m.to = conn.read - - m.from = users_num + '@' + ARGV[0] + def self.pass_on_message(m, users_num, jid) + # setup delivery receipt; similar to a reply + rcpt = ReceiptMessage.new(m.from.stripped) + rcpt.from = m.to - puts 'XRESPONSE0: ' + m.inspect - write_to_stream m + # pass original message (before sending receipt) + m.to = jid + m.from = "#{users_num}@#{ARGV[0]}" - # send a delivery receipt back to the sender - # TODO: send only when requested per XEP-0184 + puts 'XRESPONSE0: ' + m.inspect + write_to_stream m - # TODO: put in member/instance variable - rcpt['id'] = SecureRandom.uuid - rcvd = Nokogiri::XML::Node.new 'received', rcpt.document - rcvd['xmlns'] = 'urn:xmpp:receipts' - rcvd['id'] = m.id - rcpt.add_child(rcvd) + # send a delivery receipt back to the sender + # TODO: send only when requested per XEP-0184 + # TODO: pass receipts from target if supported - puts 'XRESPONSE1: ' + rcpt.inspect - write_to_stream rcpt + # TODO: put in member/instance variable + rcpt['id'] = SecureRandom.uuid + rcvd = Nokogiri::XML::Node.new 'received', rcpt.document + rcvd['xmlns'] = 'urn:xmpp:receipts' + rcvd['id'] = m.id + rcpt.add_child(rcvd) - conn.disconnect - next - end - - conn.disconnect + puts 'XRESPONSE1: ' + rcpt.inspect + write_to_stream rcpt + end + def self.to_catapult(m, num_dest, user_id, token, secret, users_num) EM::HttpRequest.new( "https://api.catapult.inetwork.com/"\ "v1/users/#{user_id}/messages" ).post( head: { - 'Authorization' => [api_token, api_secret], + 'Authorization' => [token, secret], 'Content-Type' => 'application/json' }, body: JSON.dump( @@ -193,33 +148,68 @@ module SGXcatapult to: num_dest, text: m.body, tag: - # callbacks need both the id and resourcepart - WEBrick::HTTPUtils.escape(m.id.to_s) + ' ' + + # callbacks need id and resourcepart + WEBrick::HTTPUtils.escape(m.id.to_s) + + ' ' + WEBrick::HTTPUtils.escape( - m.from.to_s.split('/', 2)[1].to_s + m.from.resource.to_s ), receiptRequested: 'all', callbackUrl: ARGV[6] ) ).then { |http| - puts "API response to send: #{http.response} with code "\ - "response.code #{http.response_header.status}" + puts "API response to send: #{http.response} with code"\ + " response.code #{http.response_header.status}" if http.response_header.status != 201 - # TODO: add text re unexpected code; mention code number - write_to_stream error_msg( - m.reply, m.body, :cancel, - 'internal-server-error' + # TODO: add text; mention code number + EMPromise.reject( + [:cancel, 'internal-server-error'] ) end - }.catch(&method(:panic)) - - rescue Exception => e - puts 'Shutting down gateway due to exception 001: ' + e.message - SGXcatapult.shutdown - puts 'Gateway has terminated.' - EM.stop + } end + + message :chat?, :body do |m| + EMPromise.resolve(m.to.node.to_s).then { |num_dest| + if num_dest =~ /\A\+?[0-9]+(?:;.*)?\Z/ + next num_dest if num_dest[0] == '+' + shortcode = extract_shortcode(num_dest) + next shortcode if shortcode + end + # TODO: text re num not (yet) supportd/implmentd + EMPromise.reject([:cancel, 'item-not-found']) + }.then { |num_dest| + cred_key = "catapult_cred-#{m.from.stripped}" + REDIS.lrange(cred_key, 0, 3).then { |creds| + [num_dest] + creds + } + }.then { |(num_dest, *creds)| + if creds.length < 4 + # TODO: add text re credentials not registered + EMPromise.reject( + [:auth, 'registration-required'] + ) + else + jid_key = "catapult_jid-#{num_dest}" + REDIS.get(jid_key).then { |jid| + [jid, num_dest] + creds + } + end + }.then { |(jid, num_dest, *creds)| + # if destination user is in the system pass on directly + if jid + pass_on_message(m, creds.last, jid) + else + to_catapult(m, num_dest, *creds) + end + }.catch { |e| + if e.is_a?(Array) && e.length == 2 + write_to_stream error_msg(m.reply, m.body, *e) + else + EMPromise.reject(e) + end + }.catch(&method(:panic)) end def self.user_cap_identities @@ -1172,6 +1162,8 @@ class WebhookHandler < Goliath::API end EM.run do + REDIS = EM::Hiredis.connect("redis://#{ARGV[4]}:#{ARGV[5]}/0") + SGXcatapult.run # required when using Prosody otherwise disconnects on 6-hour inactivity From dd7e711b77c0726c4dcfac7724db0a85fcc0f54a Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Sun, 19 Mar 2017 19:50:10 -0500 Subject: [PATCH 3/4] Refactor the ibb file receive as well --- .rubocop.yml | 8 +- sgx-catapult.rb | 308 ++++++++++++++++++++---------------------------- 2 files changed, 134 insertions(+), 182 deletions(-) diff --git a/.rubocop.yml b/.rubocop.yml index 574c2129dc91242ea4495a1ebb525a14e2efc028..4fa0adeeceaa2e351d503c9d1df1557e0fc41839 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -35,7 +35,7 @@ Metrics/ModuleLength: Max: 1000 Metrics/ParameterLists: - Max: 6 + Max: 7 Metrics/PerceivedComplexity: Max: 20 @@ -99,3 +99,9 @@ Style/MultilineBlockChain: Style/SpaceAroundEqualsInParameterDefault: EnforcedStyle: no_space + +Style/IndentArray: + EnforcedStyle: consistent + +Style/FirstParameterIndentation: + EnforcedStyle: consistent diff --git a/sgx-catapult.rb b/sgx-catapult.rb index c24f8686775db036a36e24eeb31daf1f64f3748b..a3b26f0ab398ac015878c0691929c4089162b0bc 100755 --- a/sgx-catapult.rb +++ b/sgx-catapult.rb @@ -134,34 +134,22 @@ module SGXcatapult write_to_stream rcpt end - def self.to_catapult(m, num_dest, user_id, token, secret, users_num) + def self.call_catapult(token, secret, m, pth, body, head={}, code=[200]) EM::HttpRequest.new( - "https://api.catapult.inetwork.com/"\ - "v1/users/#{user_id}/messages" - ).post( + "https://api.catapult.inetwork.com/#{pth}" + ).public_send( + m, head: { - 'Authorization' => [token, secret], - 'Content-Type' => 'application/json' - }, - body: JSON.dump( - from: users_num, - to: num_dest, - text: m.body, - tag: - # callbacks need id and resourcepart - WEBrick::HTTPUtils.escape(m.id.to_s) + - ' ' + - WEBrick::HTTPUtils.escape( - m.from.resource.to_s - ), - receiptRequested: 'all', - callbackUrl: ARGV[6] - ) + 'Authorization' => [token, secret] + }.merge(head), + body: body ).then { |http| puts "API response to send: #{http.response} with code"\ " response.code #{http.response_header.status}" - if http.response_header.status != 201 + if code.include?(http.response_header.status) + http.response + else # TODO: add text; mention code number EMPromise.reject( [:cancel, 'internal-server-error'] @@ -170,8 +158,42 @@ module SGXcatapult } end - message :chat?, :body do |m| - EMPromise.resolve(m.to.node.to_s).then { |num_dest| + def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern) + extra = if murl + { + media: murl + } + else + { + receiptRequested: 'all', + callbackUrl: ARGV[6] + } + end + + call_catapult( + token, + secret, + :post, + "v1/users/#{user_id}/messages", + JSON.dump(extra.merge( + from: usern, + to: num_dest, + text: s.respond_to?(:body) ? s.body : '', + tag: + # callbacks need id and resourcepart + WEBrick::HTTPUtils.escape(s.id.to_s) + + ' ' + + WEBrick::HTTPUtils.escape( + s.from.resource.to_s + ) + )), + {'Content-Type' => 'application/json'}, + [201] + ) + end + + def self.validate_num(num) + EMPromise.resolve(num.to_s).then { |num_dest| if num_dest =~ /\A\+?[0-9]+(?:;.*)?\Z/ next num_dest if num_dest[0] == '+' shortcode = extract_shortcode(num_dest) @@ -179,29 +201,38 @@ module SGXcatapult end # TODO: text re num not (yet) supportd/implmentd EMPromise.reject([:cancel, 'item-not-found']) - }.then { |num_dest| - cred_key = "catapult_cred-#{m.from.stripped}" - REDIS.lrange(cred_key, 0, 3).then { |creds| - [num_dest] + creds - } - }.then { |(num_dest, *creds)| + } + end + + def self.fetch_catapult_cred_for(jid) + cred_key = "catapult_cred-#{jid.stripped}" + REDIS.lrange(cred_key, 0, 3).then { |creds| if creds.length < 4 # TODO: add text re credentials not registered EMPromise.reject( [:auth, 'registration-required'] ) else - jid_key = "catapult_jid-#{num_dest}" - REDIS.get(jid_key).then { |jid| - [jid, num_dest] + creds - } + creds end + } + end + + message :chat?, :body do |m| + EMPromise.all([ + validate_num(m.to.node), + fetch_catapult_cred_for(m.from) + ]).then { |(num_dest, creds)| + jid_key = "catapult_jid-#{num_dest}" + REDIS.get(jid_key).then { |jid| + [jid, num_dest] + creds + } }.then { |(jid, num_dest, *creds)| # if destination user is in the system pass on directly if jid pass_on_message(m, creds.last, jid) else - to_catapult(m, num_dest, *creds) + to_catapult(m, nil, num_dest, *creds) end }.catch { |e| if e.is_a?(Array) && e.length == 2 @@ -432,158 +463,73 @@ module SGXcatapult end iq '/iq/ns:close', ns: 'http://jabber.org/protocol/ibb' do |i, cn| - begin puts "IQc: #{i.inspect}" write_to_stream i.reply - # TODO: refactor below so that "message :chat?" uses same code - num_dest = i.to.to_s.split('@', 2)[0] - - if num_dest[0] != '+' - # check to see if a valid shortcode context is specified - num_and_context = num_dest.split(';', 2) - if num_and_context[1] and num_and_context[1] == - 'phone-context=ca-us.phone-context.soprani.ca' - - # TODO: check if num_dest is fully numeric - num_dest = num_and_context[0] - else - # TODO: text re num not (yet) supportd/implmentd - write_to_stream error_msg( - i.reply, nil, - :cancel, 'item-not-found' - ) - next + EMPromise.all([ + validate_num(i.to.node), + fetch_catapult_cred_for(i.from) + ]).then { |(num_dest, creds)| + # Gajim bug: has Jingle (not transport) sid; fix later + if not @jingle_fnames.key? cn[0]['sid'] + puts 'ERROR: Not found in filename map: ' + cn[0]['sid'] + + next EMPromise.reject(:done) + # TODO: in case only Gajim's bug fixed, add map: + #cn[0]['sid'] = @jingle_tsids[cn[0]['sid']] end - end - - bare_jid = i.from.to_s.split('/', 2)[0] - cred_key = "catapult_cred-" + bare_jid - - # TODO: connect at start of program instead - conn = Hiredis::Connection.new - conn.connect(ARGV[4], ARGV[5].to_i) - - conn.write ["EXISTS", cred_key] - if conn.read == 0 - conn.disconnect - - # TODO: add text re credentials not being registered - write_to_stream error_msg( - i.reply, nil, :auth, - 'registration-required' - ) - next - end - - conn.write ["LRANGE", cred_key, 0, 3] - user_id, api_token, api_secret, users_num = conn.read - conn.disconnect - - # Gajim bug: has Jingle (not transport) sid; fix later - if not @jingle_fnames.key? cn[0]['sid'] - puts 'ERROR: Not found in filename map: ' + cn[0]['sid'] - - next - # TODO: in case only Gajim's bug fixed, add map: - #cn[0]['sid'] = @jingle_tsids[cn[0]['sid']] - end - - # upload cached data to server (before success reply) - media_name = - "#{Time.now.utc.iso8601}_#{SecureRandom.uuid}"\ - "_#{@jingle_fnames[cn[0]['sid']]}" - puts 'name to save: ' + media_name - - uri = URI.parse('https://api.catapult.inetwork.com') - http = Net::HTTP.new(uri.host, uri.port) - http.use_ssl = true - request = Net::HTTP::Put.new('/v1/users/' + user_id + - '/media/' + media_name) - request.basic_auth api_token, api_secret - request.body = @partial_data[cn[0]['sid']] - response = http.request(request) - - puts 'eAPI response to send: ' + response.to_s + ' with code ' + - response.code + ', body "' + response.body + '"' - - if response.code != '200' - # TODO: add text re unexpected code; mention code number - write_to_stream error_msg( - i.reply, nil, :cancel, - 'internal-server-error' - ) - next - end - uri = URI.parse('https://api.catapult.inetwork.com') - http = Net::HTTP.new(uri.host, uri.port) - http.use_ssl = true - request = Net::HTTP::Post.new('/v1/users/' + user_id + - '/messages') - request.basic_auth api_token, api_secret - request.add_field('Content-Type', 'application/json') - request.body = JSON.dump( - 'from' => users_num, - 'to' => num_dest, - 'text' => '', - 'media' => [ - 'https://api.catapult.inetwork.com/v1/users/' + - user_id + '/media/' + media_name - ], - 'tag' => - # callbacks need both the id and resourcepart - WEBrick::HTTPUtils.escape(i.id.to_s) + ' ' + - WEBrick::HTTPUtils.escape( - i.from.to_s.split('/', 2)[1].to_s + # upload cached data to server (before success reply) + media_name = + "#{Time.now.utc.iso8601}_#{SecureRandom.uuid}"\ + "_#{@jingle_fnames[cn[0]['sid']]}" + puts 'name to save: ' + media_name + + path = "/v1/users/#{creds.first}/media/#{media_name}" + + EMPromise.all([ + call_catapult( + *creds[1..2], + :put, + path, + @partial_data[cn[0]['sid']] + ), + to_catapult( + i, + "https://api.catapult.inetwork.com/" + + path, + num_dest, + *creds ) - # TODO: add back when Bandwidth AP supports it (?); now: - # "The ''messages'' resource property - # ''receiptRequested'' is not supported for MMS" - #'receiptRequested' => 'all', - #'callbackUrl' => ARGV[6] - ) - response = http.request(request) - - puts 'mAPI response to send: ' + response.to_s + ' with code ' + - response.code + ', body "' + response.body + '"' - - if response.code != '201' - # TODO: add text re unexpected code; mention code number - write_to_stream error_msg( - i.reply, nil, :cancel, - 'internal-server-error' - ) - next - end - - @partial_data[cn[0]['sid']] = '' - - # received the complete file so now close the stream - msg = Blather::Stanza::Iq.new :set - msg.to = i.from - msg.from = i.to - - j = Nokogiri::XML::Node.new 'jingle', msg.document - j['xmlns'] = 'urn:xmpp:jingle:1' - j['action'] = 'session-terminate' - j['sid'] = @jingle_sids[cn[0]['sid']] - msg.add_child(j) - - r = Nokogiri::XML::Node.new 'reason', msg.document - s = Nokogiri::XML::Node.new 'success', msg.document - r.add_child(s) - j.add_child(r) - - puts 'RESPONSE1: ' + msg.inspect - write_to_stream msg - - rescue Exception => e - puts 'Shutting down gateway due to exception 007: ' + e.message - SGXcatapult.shutdown - puts 'Gateway has terminated.' - EM.stop - end + ]) + }.then { + @partial_data[cn[0]['sid']] = '' + + # received the complete file so now close the stream + msg = Blather::Stanza::Iq.new :set + msg.to = i.from + msg.from = i.to + + j = Nokogiri::XML::Node.new 'jingle', msg.document + j['xmlns'] = 'urn:xmpp:jingle:1' + j['action'] = 'session-terminate' + j['sid'] = @jingle_sids[cn[0]['sid']] + msg.add_child(j) + + r = Nokogiri::XML::Node.new 'reason', msg.document + s = Nokogiri::XML::Node.new 'success', msg.document + r.add_child(s) + j.add_child(r) + + puts 'RESPONSE1: ' + msg.inspect + write_to_stream msg + }.catch { |e| + if e.is_a?(Array) && e.length == 2 + write_to_stream error_msg(i.reply, nil, *e) + elsif e != :done + EMPromise.reject(e) + end + }.catch(&method(:panic)) end iq '/iq/ns:query', ns: 'http://jabber.org/protocol/disco#items' do |i| From 85aa080d0ce2beed5b9c0be5d439b8d2295488ff Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Sun, 19 Mar 2017 20:14:46 -0500 Subject: [PATCH 4/4] Refactor incoming callback redis code --- em_promise.rb | 10 ++++++++++ sgx-catapult.rb | 15 +++------------ 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/em_promise.rb b/em_promise.rb index 96e25425d4a4de9fbfa8289f0fb1a88b8a3f6498..cdd84305de927652bd0b738c618348305e9ab4b3 100644 --- a/em_promise.rb +++ b/em_promise.rb @@ -20,6 +20,16 @@ class EMPromise < Promise EM.next_tick { yield } end + def wait + fiber = Fiber.current + resume = proc do |arg| + defer { fiber.resume(arg) } + end + + self.then(resume, resume) + Fiber.yield + end + def self.reject(e) new.tap { |promise| promise.reject(e) } end diff --git a/sgx-catapult.rb b/sgx-catapult.rb index a3b26f0ab398ac015878c0691929c4089162b0bc..7f4ec70857de10d6b693bb71465184ece8f0b835 100755 --- a/sgx-catapult.rb +++ b/sgx-catapult.rb @@ -960,21 +960,16 @@ class WebhookHandler < Goliath::API return [200, {}, "OK"] end - jid_key = "catapult_jid-" + users_num - if others_num[0] != '+' # TODO: check that others_num actually a shortcode first others_num += ';phone-context=ca-us.phone-context.soprani.ca' end - conn = Hiredis::Connection.new - conn.connect(ARGV[4], ARGV[5].to_i) - - conn.write ["EXISTS", jid_key] - if conn.read == 0 - conn.disconnect + jid_key = "catapult_jid-#{users_num}" + bare_jid = REDIS.get(jid_key).promise.sync + if !bare_jid puts "jid_key (#{jid_key}) DNE; Catapult misconfigured?" # TODO: likely not appropriate; give error to Catapult? @@ -984,10 +979,6 @@ class WebhookHandler < Goliath::API return [200, {}, "OK"] end - conn.write ["GET", jid_key] - bare_jid = conn.read - conn.disconnect - msg = '' case params['direction'] when 'in'