diff --git a/.rubocop.yml b/.rubocop.yml index 41481b07ca04fd99056d8c06d984452221235343..4fa0adeeceaa2e351d503c9d1df1557e0fc41839 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -35,7 +35,7 @@ Metrics/ModuleLength: Max: 1000 Metrics/ParameterLists: - Max: 6 + Max: 7 Metrics/PerceivedComplexity: Max: 20 @@ -93,3 +93,15 @@ Style/NegatedIf: Style/RedundantReturn: Enabled: false + +Style/MultilineBlockChain: + Enabled: false + +Style/SpaceAroundEqualsInParameterDefault: + EnforcedStyle: no_space + +Style/IndentArray: + EnforcedStyle: consistent + +Style/FirstParameterIndentation: + EnforcedStyle: consistent diff --git a/Gemfile b/Gemfile index 5006d96e1c8d0d1ab0d09ea0f83b3cfc179af34d..9fe5c18de0dd255875a97aced5421f1196938efe 100644 --- a/Gemfile +++ b/Gemfile @@ -2,8 +2,11 @@ source 'https://rubygems.org' gem 'activesupport', '<5.0.0' gem 'blather' +gem 'em-http-request' gem 'eventmachine', '1.0.0' +gem 'promise.rb' +gem 'em-hiredis' gem 'hiredis', '~> 0.6.0' gem 'redis', '>= 3.2.0' diff --git a/em_promise.rb b/em_promise.rb new file mode 100644 index 0000000000000000000000000000000000000000..cdd84305de927652bd0b738c618348305e9ab4b3 --- /dev/null +++ b/em_promise.rb @@ -0,0 +1,50 @@ +require "eventmachine" +require "promise" + +class EMPromise < Promise + def initialize(deferrable=nil) + super() + fulfill(deferrable) if deferrable + end + + def fulfill(value, bind_defer=true) + if bind_defer && value.is_a?(EM::Deferrable) + value.callback { |x| fulfill(x, false) } + value.errback(&method(:reject)) + else + super(value) + end + end + + def defer + EM.next_tick { yield } + end + + def wait + fiber = Fiber.current + resume = proc do |arg| + defer { fiber.resume(arg) } + end + + self.then(resume, resume) + Fiber.yield + end + + def self.reject(e) + new.tap { |promise| promise.reject(e) } + end +end + +module EventMachine + module Deferrable + def promise + EMPromise.new(self) + end + + [:then, :rescue, :catch].each do |method| + define_method(method) do |*args, &block| + promise.public_send(method, *args, &block) + end + end + end +end diff --git a/sgx-catapult.rb b/sgx-catapult.rb index 68efdd4cfdd3830c035dc2369146c1b23c705b05..7f4ec70857de10d6b693bb71465184ece8f0b835 100755 --- a/sgx-catapult.rb +++ b/sgx-catapult.rb @@ -19,6 +19,8 @@ # with sgx-catapult. If not, see . require 'blather/client/dsl' +require 'em-hiredis' +require 'em-http-request' require 'json' require 'net/http' require 'redis/connection/hiredis' @@ -31,6 +33,8 @@ require 'goliath/api' require 'goliath/server' require 'log4r' +require_relative 'em_promise' + $stdout.sync = true puts "Soprani.ca/SMS Gateway for XMPP - Catapult\n"\ @@ -47,6 +51,19 @@ end t = Time.now puts "LOG %d.%09d: starting...\n\n" % [t.to_i, t.nsec] +def panic(e) + puts "Shutting down gateway due to exception: #{e.message}" + puts e.backtrace + SGXcatapult.shutdown + puts 'Gateway has terminated.' + EM.stop +end + +def extract_shortcode(dest) + num, context = dest.split(';', 2) + num if context && context == 'phone-context=ca-us.phone-context.soprani.ca' +end + module SGXcatapult extend Blather::DSL @@ -63,7 +80,7 @@ module SGXcatapult client.write(stanza) end - def self.error_msg(orig, query_node, type, name, text = nil) + def self.error_msg(orig, query_node, type, name, text=nil) if not query_node.nil? orig.add_child(query_node) orig.type = :error @@ -90,125 +107,140 @@ module SGXcatapult setup ARGV[0], ARGV[1], ARGV[2], ARGV[3] - message :chat?, :body do |m| - begin - num_dest = m.to.to_s.split('@', 2)[0] - - if num_dest[0] != '+' - # check to see if a valid shortcode context is specified - num_and_context = num_dest.split(';', 2) - if num_and_context[1] and num_and_context[1] == - 'phone-context=ca-us.phone-context.soprani.ca' - - # TODO: check if num_dest is fully numeric - num_dest = num_and_context[0] - else - # TODO: text re num not (yet) supportd/implmentd - write_to_stream error_msg( - m.reply, m.body, - :cancel, 'item-not-found' - ) - next - end - end - - bare_jid = m.from.to_s.split('/', 2)[0] - cred_key = "catapult_cred-" + bare_jid - - conn = Hiredis::Connection.new - conn.connect(ARGV[4], ARGV[5].to_i) - - conn.write ["EXISTS", cred_key] - if conn.read == 0 - conn.disconnect - - # TODO: add text re credentials not being registered - write_to_stream error_msg( - m.reply, m.body, :auth, - 'registration-required' - ) - next - end + def self.pass_on_message(m, users_num, jid) + # setup delivery receipt; similar to a reply + rcpt = ReceiptMessage.new(m.from.stripped) + rcpt.from = m.to - conn.write ["LRANGE", cred_key, 0, 3] - user_id, api_token, api_secret, users_num = conn.read + # pass original message (before sending receipt) + m.to = jid + m.from = "#{users_num}@#{ARGV[0]}" - # if the destination user is in the system just pass on directly - jid_key = "catapult_jid-" + num_dest - conn.write ["EXISTS", jid_key] - if conn.read > 0 - # setup delivery receipt; sort of a reply but not quite - rcpt = ReceiptMessage.new(bare_jid) - rcpt.from = m.to + puts 'XRESPONSE0: ' + m.inspect + write_to_stream m - # pass on the original message (before sending receipt) - conn.write ["GET", jid_key] - m.to = conn.read + # send a delivery receipt back to the sender + # TODO: send only when requested per XEP-0184 + # TODO: pass receipts from target if supported - m.from = users_num + '@' + ARGV[0] + # TODO: put in member/instance variable + rcpt['id'] = SecureRandom.uuid + rcvd = Nokogiri::XML::Node.new 'received', rcpt.document + rcvd['xmlns'] = 'urn:xmpp:receipts' + rcvd['id'] = m.id + rcpt.add_child(rcvd) - puts 'XRESPONSE0: ' + m.inspect - write_to_stream m - - # send a delivery receipt back to the sender - # TODO: send only when requested per XEP-0184 - - # TODO: put in member/instance variable - rcpt['id'] = SecureRandom.uuid - rcvd = Nokogiri::XML::Node.new 'received', rcpt.document - rcvd['xmlns'] = 'urn:xmpp:receipts' - rcvd['id'] = m.id - rcpt.add_child(rcvd) + puts 'XRESPONSE1: ' + rcpt.inspect + write_to_stream rcpt + end - puts 'XRESPONSE1: ' + rcpt.inspect - write_to_stream rcpt + def self.call_catapult(token, secret, m, pth, body, head={}, code=[200]) + EM::HttpRequest.new( + "https://api.catapult.inetwork.com/#{pth}" + ).public_send( + m, + head: { + 'Authorization' => [token, secret] + }.merge(head), + body: body + ).then { |http| + puts "API response to send: #{http.response} with code"\ + " response.code #{http.response_header.status}" + + if code.include?(http.response_header.status) + http.response + else + # TODO: add text; mention code number + EMPromise.reject( + [:cancel, 'internal-server-error'] + ) + end + } + end - conn.disconnect - next + def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern) + extra = if murl + { + media: murl + } + else + { + receiptRequested: 'all', + callbackUrl: ARGV[6] + } end - conn.disconnect - - uri = URI.parse('https://api.catapult.inetwork.com') - http = Net::HTTP.new(uri.host, uri.port) - http.use_ssl = true - request = Net::HTTP::Post.new('/v1/users/' + user_id + - '/messages') - request.basic_auth api_token, api_secret - request.add_field('Content-Type', 'application/json') - request.body = JSON.dump( - 'from' => users_num, - 'to' => num_dest, - 'text' => m.body, - 'tag' => - # callbacks need both the id and resourcepart - WEBrick::HTTPUtils.escape(m.id.to_s) + ' ' + - WEBrick::HTTPUtils.escape( - m.from.to_s.split('/', 2)[1].to_s - ), - 'receiptRequested' => 'all', - 'callbackUrl' => ARGV[6] + call_catapult( + token, + secret, + :post, + "v1/users/#{user_id}/messages", + JSON.dump(extra.merge( + from: usern, + to: num_dest, + text: s.respond_to?(:body) ? s.body : '', + tag: + # callbacks need id and resourcepart + WEBrick::HTTPUtils.escape(s.id.to_s) + + ' ' + + WEBrick::HTTPUtils.escape( + s.from.resource.to_s + ) + )), + {'Content-Type' => 'application/json'}, + [201] ) - response = http.request(request) - - puts 'API response to send: ' + response.to_s + ' with code ' + - response.code + ', body "' + response.body + '"' + end - if response.code != '201' - # TODO: add text re unexpected code; mention code number - write_to_stream error_msg( - m.reply, m.body, :cancel, - 'internal-server-error' - ) - next - end + def self.validate_num(num) + EMPromise.resolve(num.to_s).then { |num_dest| + if num_dest =~ /\A\+?[0-9]+(?:;.*)?\Z/ + next num_dest if num_dest[0] == '+' + shortcode = extract_shortcode(num_dest) + next shortcode if shortcode + end + # TODO: text re num not (yet) supportd/implmentd + EMPromise.reject([:cancel, 'item-not-found']) + } + end - rescue Exception => e - puts 'Shutting down gateway due to exception 001: ' + e.message - SGXcatapult.shutdown - puts 'Gateway has terminated.' - EM.stop + def self.fetch_catapult_cred_for(jid) + cred_key = "catapult_cred-#{jid.stripped}" + REDIS.lrange(cred_key, 0, 3).then { |creds| + if creds.length < 4 + # TODO: add text re credentials not registered + EMPromise.reject( + [:auth, 'registration-required'] + ) + else + creds + end + } end + + message :chat?, :body do |m| + EMPromise.all([ + validate_num(m.to.node), + fetch_catapult_cred_for(m.from) + ]).then { |(num_dest, creds)| + jid_key = "catapult_jid-#{num_dest}" + REDIS.get(jid_key).then { |jid| + [jid, num_dest] + creds + } + }.then { |(jid, num_dest, *creds)| + # if destination user is in the system pass on directly + if jid + pass_on_message(m, creds.last, jid) + else + to_catapult(m, nil, num_dest, *creds) + end + }.catch { |e| + if e.is_a?(Array) && e.length == 2 + write_to_stream error_msg(m.reply, m.body, *e) + else + EMPromise.reject(e) + end + }.catch(&method(:panic)) end def self.user_cap_identities @@ -431,158 +463,73 @@ module SGXcatapult end iq '/iq/ns:close', ns: 'http://jabber.org/protocol/ibb' do |i, cn| - begin puts "IQc: #{i.inspect}" write_to_stream i.reply - # TODO: refactor below so that "message :chat?" uses same code - num_dest = i.to.to_s.split('@', 2)[0] - - if num_dest[0] != '+' - # check to see if a valid shortcode context is specified - num_and_context = num_dest.split(';', 2) - if num_and_context[1] and num_and_context[1] == - 'phone-context=ca-us.phone-context.soprani.ca' - - # TODO: check if num_dest is fully numeric - num_dest = num_and_context[0] - else - # TODO: text re num not (yet) supportd/implmentd - write_to_stream error_msg( - i.reply, nil, - :cancel, 'item-not-found' - ) - next + EMPromise.all([ + validate_num(i.to.node), + fetch_catapult_cred_for(i.from) + ]).then { |(num_dest, creds)| + # Gajim bug: has Jingle (not transport) sid; fix later + if not @jingle_fnames.key? cn[0]['sid'] + puts 'ERROR: Not found in filename map: ' + cn[0]['sid'] + + next EMPromise.reject(:done) + # TODO: in case only Gajim's bug fixed, add map: + #cn[0]['sid'] = @jingle_tsids[cn[0]['sid']] end - end - - bare_jid = i.from.to_s.split('/', 2)[0] - cred_key = "catapult_cred-" + bare_jid - - # TODO: connect at start of program instead - conn = Hiredis::Connection.new - conn.connect(ARGV[4], ARGV[5].to_i) - - conn.write ["EXISTS", cred_key] - if conn.read == 0 - conn.disconnect - - # TODO: add text re credentials not being registered - write_to_stream error_msg( - i.reply, nil, :auth, - 'registration-required' - ) - next - end - conn.write ["LRANGE", cred_key, 0, 3] - user_id, api_token, api_secret, users_num = conn.read - conn.disconnect - - # Gajim bug: has Jingle (not transport) sid; fix later - if not @jingle_fnames.key? cn[0]['sid'] - puts 'ERROR: Not found in filename map: ' + cn[0]['sid'] - - next - # TODO: in case only Gajim's bug fixed, add map: - #cn[0]['sid'] = @jingle_tsids[cn[0]['sid']] - end + # upload cached data to server (before success reply) + media_name = + "#{Time.now.utc.iso8601}_#{SecureRandom.uuid}"\ + "_#{@jingle_fnames[cn[0]['sid']]}" + puts 'name to save: ' + media_name - # upload cached data to server (before success reply) - media_name = - "#{Time.now.utc.iso8601}_#{SecureRandom.uuid}"\ - "_#{@jingle_fnames[cn[0]['sid']]}" - puts 'name to save: ' + media_name - - uri = URI.parse('https://api.catapult.inetwork.com') - http = Net::HTTP.new(uri.host, uri.port) - http.use_ssl = true - request = Net::HTTP::Put.new('/v1/users/' + user_id + - '/media/' + media_name) - request.basic_auth api_token, api_secret - request.body = @partial_data[cn[0]['sid']] - response = http.request(request) - - puts 'eAPI response to send: ' + response.to_s + ' with code ' + - response.code + ', body "' + response.body + '"' - - if response.code != '200' - # TODO: add text re unexpected code; mention code number - write_to_stream error_msg( - i.reply, nil, :cancel, - 'internal-server-error' - ) - next - end + path = "/v1/users/#{creds.first}/media/#{media_name}" - uri = URI.parse('https://api.catapult.inetwork.com') - http = Net::HTTP.new(uri.host, uri.port) - http.use_ssl = true - request = Net::HTTP::Post.new('/v1/users/' + user_id + - '/messages') - request.basic_auth api_token, api_secret - request.add_field('Content-Type', 'application/json') - request.body = JSON.dump( - 'from' => users_num, - 'to' => num_dest, - 'text' => '', - 'media' => [ - 'https://api.catapult.inetwork.com/v1/users/' + - user_id + '/media/' + media_name - ], - 'tag' => - # callbacks need both the id and resourcepart - WEBrick::HTTPUtils.escape(i.id.to_s) + ' ' + - WEBrick::HTTPUtils.escape( - i.from.to_s.split('/', 2)[1].to_s + EMPromise.all([ + call_catapult( + *creds[1..2], + :put, + path, + @partial_data[cn[0]['sid']] + ), + to_catapult( + i, + "https://api.catapult.inetwork.com/" + + path, + num_dest, + *creds ) - # TODO: add back when Bandwidth AP supports it (?); now: - # "The ''messages'' resource property - # ''receiptRequested'' is not supported for MMS" - #'receiptRequested' => 'all', - #'callbackUrl' => ARGV[6] - ) - response = http.request(request) - - puts 'mAPI response to send: ' + response.to_s + ' with code ' + - response.code + ', body "' + response.body + '"' - - if response.code != '201' - # TODO: add text re unexpected code; mention code number - write_to_stream error_msg( - i.reply, nil, :cancel, - 'internal-server-error' - ) - next - end - - @partial_data[cn[0]['sid']] = '' - - # received the complete file so now close the stream - msg = Blather::Stanza::Iq.new :set - msg.to = i.from - msg.from = i.to - - j = Nokogiri::XML::Node.new 'jingle', msg.document - j['xmlns'] = 'urn:xmpp:jingle:1' - j['action'] = 'session-terminate' - j['sid'] = @jingle_sids[cn[0]['sid']] - msg.add_child(j) - - r = Nokogiri::XML::Node.new 'reason', msg.document - s = Nokogiri::XML::Node.new 'success', msg.document - r.add_child(s) - j.add_child(r) - - puts 'RESPONSE1: ' + msg.inspect - write_to_stream msg - - rescue Exception => e - puts 'Shutting down gateway due to exception 007: ' + e.message - SGXcatapult.shutdown - puts 'Gateway has terminated.' - EM.stop - end + ]) + }.then { + @partial_data[cn[0]['sid']] = '' + + # received the complete file so now close the stream + msg = Blather::Stanza::Iq.new :set + msg.to = i.from + msg.from = i.to + + j = Nokogiri::XML::Node.new 'jingle', msg.document + j['xmlns'] = 'urn:xmpp:jingle:1' + j['action'] = 'session-terminate' + j['sid'] = @jingle_sids[cn[0]['sid']] + msg.add_child(j) + + r = Nokogiri::XML::Node.new 'reason', msg.document + s = Nokogiri::XML::Node.new 'success', msg.document + r.add_child(s) + j.add_child(r) + + puts 'RESPONSE1: ' + msg.inspect + write_to_stream msg + }.catch { |e| + if e.is_a?(Array) && e.length == 2 + write_to_stream error_msg(i.reply, nil, *e) + elsif e != :done + EMPromise.reject(e) + end + }.catch(&method(:panic)) end iq '/iq/ns:query', ns: 'http://jabber.org/protocol/disco#items' do |i| @@ -950,7 +897,7 @@ end end class ReceiptMessage < Blather::Stanza - def self.new(to = nil) + def self.new(to=nil) node = super :message node.to = to node @@ -1013,21 +960,16 @@ class WebhookHandler < Goliath::API return [200, {}, "OK"] end - jid_key = "catapult_jid-" + users_num - if others_num[0] != '+' # TODO: check that others_num actually a shortcode first others_num += ';phone-context=ca-us.phone-context.soprani.ca' end - conn = Hiredis::Connection.new - conn.connect(ARGV[4], ARGV[5].to_i) - - conn.write ["EXISTS", jid_key] - if conn.read == 0 - conn.disconnect + jid_key = "catapult_jid-#{users_num}" + bare_jid = REDIS.get(jid_key).promise.sync + if !bare_jid puts "jid_key (#{jid_key}) DNE; Catapult misconfigured?" # TODO: likely not appropriate; give error to Catapult? @@ -1037,10 +979,6 @@ class WebhookHandler < Goliath::API return [200, {}, "OK"] end - conn.write ["GET", jid_key] - bare_jid = conn.read - conn.disconnect - msg = '' case params['direction'] when 'in' @@ -1161,6 +1099,8 @@ class WebhookHandler < Goliath::API end EM.run do + REDIS = EM::Hiredis.connect("redis://#{ARGV[4]}:#{ARGV[5]}/0") + SGXcatapult.run # required when using Prosody otherwise disconnects on 6-hour inactivity