diff --git a/Gemfile b/Gemfile index 66fb75010cbf1924bf27f734c630cb4b6796b012..a2d226e622437348ce074f8109fa343055bc55f1 100644 --- a/Gemfile +++ b/Gemfile @@ -15,7 +15,8 @@ gem 'ruby-bandwidth-iris', '~> 2.7' gem 'goliath' gem 'lazy_object' gem 'value_semantics' -gem 'log4r' +gem 'amazing_print' +gem 'ougai' gem 'multibases' gem 'multihashes' gem 'rack', '< 2' diff --git a/lib/background_log.rb b/lib/background_log.rb new file mode 100644 index 0000000000000000000000000000000000000000..e1f9e8d556dd07e9518246c8ad6bcbf642174944 --- /dev/null +++ b/lib/background_log.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +class BackgroundLog < IO + def initialize(io) + @io = io + @q = Queue.new + thread + at_exit do + @q << :done + thread.join + end + end + + def thread + @thread ||= Thread.new do + loop do + m = @q.pop + break if m == :done + + @io.write m + end + end + end + + def write(s) + @q << s + end +end diff --git a/sgx-bwmsgsv2.rb b/sgx-bwmsgsv2.rb index d6d9caf9998ab0fe0854b5f9b0f30eb7d0df4c33..d64e55168bcd32ce116b27939bb964a78e0e9b7a 100755 --- a/sgx-bwmsgsv2.rb +++ b/sgx-bwmsgsv2.rb @@ -34,7 +34,7 @@ require 'webrick' require 'goliath/api' require 'goliath/server' -require 'log4r' +require "ougai" require 'em_promise' require 'em-synchrony' @@ -46,6 +46,24 @@ require_relative 'lib/registration_repo' Sentry.init +require_relative 'lib/background_log' + +$stdout.sync = true +LOG = Ougai::Logger.new(ENV["ENV"] == "test" ? $stdout : BackgroundLog.new($stdout)) +LOG.level = ENV.fetch("LOG_LEVEL", "info") +LOG.formatter = Ougai::Formatters::Readable.new( + nil, + nil, + plain: !$stdout.isatty +) +Blather.logger = LOG +EM::Hiredis.logger = LOG + +Sentry.init do |config| + config.logger = LOG + config.breadcrumbs_logger = [:sentry_logger] +end + BADWORD_LIST = [ "marijuana", "psilocybin", @@ -140,16 +158,13 @@ MMS_MIME_TYPES = [ def panic(e) if e.is_a?(Exception) - puts "[#{Time.now.iso8601}] Shutting down gateway due to exception: #{e.message}" Sentry.capture_exception(e, hint: { background: false }) - puts e.backtrace else - puts "[#{Time.now.iso8601}] Shutting down gateway due to non-exception:" - p e Sentry.capture_message(e.to_s, hint: { background: false }) end + LOG.fatal("Shutting down gateway", e) SGXbwmsgsv2.shutdown - puts 'Gateway has terminated.' + LOG.info "Gateway has terminated" EM.stop end @@ -228,9 +243,7 @@ protected end def handle_error(scope, stanza, e) - puts "Error raised during #{scope.transaction_name}: #{e.class}" - puts e.message - puts e.backtrace + LOG.error("Error during #{scope.transaction_name}", e) Sentry.capture_exception(e) unless e.is_a?(Sentry::Error) scope.get_transaction&.set_status("internal_error") return if e.respond_to?(:replied?) && e.replied? @@ -275,13 +288,13 @@ module SGXbwmsgsv2 # we assume media_url is one of these (always the case so far): # https://messaging.bandwidth.com/api/v2/users/[u]/media/[path] - puts 'ORIG_URL: ' + media_url + LOG.debug("Original media URL", url: media_url) usr = to if media_url.start_with?('https://messaging.bandwidth.com/api/v2/users/') pth = media_url.split('/', 9)[8] # the caller must guarantee that 'to' is a bare JID media_url = ARGV[6] + WEBrick::HTTPUtils.escape(usr) + '/' + pth - puts 'PROX_URL: ' + media_url + LOG.debug("Proxied media URL", url: media_url) end msg = m ? m.copy : Blather::Stanza::Message.new(to, "") @@ -325,7 +338,6 @@ module SGXbwmsgsv2 m.to = jid m.from = "#{users_num}@#{ARGV[0]}" - puts 'XRESPONSE0: ' + m.inspect write_to_stream m # Emit pass-through event. Thru events don't capture a timestamp because XMPP @@ -352,7 +364,6 @@ module SGXbwmsgsv2 rcvd['id'] = m.id rcpt.add_child(rcvd) - puts 'XRESPONSE1: ' + rcpt.inspect write_to_stream rcpt end @@ -400,11 +411,11 @@ module SGXbwmsgsv2 usern) un = s.at("oob|x > oob|url", oob: "jabber:x:oob") unless un - puts "MMSOOB: no url node found so process as normal" + LOG.debug "No OOB URL node, processing as normal" return to_catapult(s, nil, num_dest, user_id, token, secret, usern) end - puts "MMSOOB: found a url node - checking if to make MMS..." + LOG.debug "Found OOB URL node, checking MMS eligibility" body = s.respond_to?(:body) ? s.body : '' @@ -427,10 +438,7 @@ module SGXbwmsgsv2 # some clients send URI in both body & so delete s.body = body.sub(/\s*#{Regexp.escape(un.text)}\s*$/, '') - puts "MMSOOB: url text is '#{un.text}'" - puts "MMSOOB: the body is '#{body.to_s.strip}'" - - puts "MMSOOB: sending MMS since found OOB & user asked" + LOG.debug("OOB MMS details", url: un.text, body: body.to_s.strip) to_catapult(s, un.text, num_dest, user_id, token, secret, usern) end } @@ -523,7 +531,7 @@ module SGXbwmsgsv2 [:cancel, 'item-not-found'] ) end - puts "ADRXEP: found an addresses node - iterate addrs.." + LOG.debug "Found addresses node, iterating" nums = [] an.children.each do |e| @@ -585,7 +593,7 @@ module SGXbwmsgsv2 message :error? do |m| # TODO: report it somewhere/somehow - eat for now so no err loop - puts "EATERROR1: #{m.inspect}" + LOG.warn("Eating error stanza", stanza: m.inspect) true end @@ -636,15 +644,12 @@ module SGXbwmsgsv2 end subscription :request? do |p| - puts "PRESENCE1: #{p.inspect}" - # subscriptions are allowed from anyone - send reply immediately msg = Blather::Stanza::Presence.new msg.to = p.from msg.from = p.to msg.type = :subscribed - puts 'RESPONSE5a: ' + msg.inspect write_to_stream msg # send a immediately; not automatically probed for it @@ -659,7 +664,6 @@ module SGXbwmsgsv2 msg.to = p.from msg.from = p.to.to_s + '/sgx' - puts 'RESPONSE5b: ' + msg.inspect write_to_stream msg # need to subscribe back so Conversations displays images inline @@ -668,13 +672,10 @@ module SGXbwmsgsv2 msg.from = p.to.to_s.split('/', 2)[0] msg.type = :subscribe - puts 'RESPONSE5c: ' + msg.inspect write_to_stream msg end presence :probe? do |p| - puts 'PRESENCE2: ' + p.inspect - caps = Blather::Stanza::Capabilities.new # TODO: user a better node URI (?) caps.node = 'http://catapult.sgx.soprani.ca/' @@ -685,7 +686,6 @@ module SGXbwmsgsv2 msg.to = p.from msg.from = p.to.to_s + '/sgx' - puts 'RESPONSE6: ' + msg.inspect write_to_stream msg end @@ -710,7 +710,6 @@ module SGXbwmsgsv2 reply.items = [] end - puts 'RESPONSE_CMD_DISCO: ' + reply.inspect write_to_stream reply } }.catch { |e| @@ -727,8 +726,7 @@ module SGXbwmsgsv2 # :error it should be ignored (as the below does currently); # review specification to see how to handle other type values if i.type != :get - puts 'DISCO iq rcvd, of non-get type "' + i.type.to_s + - '" for message "' + i.inspect + '"; ignoring...' + LOG.warn("Ignoring non-get disco IQ", type: i.type.to_s, stanza: i.inspect) next end @@ -742,7 +740,6 @@ module SGXbwmsgsv2 msg.identities = user_cap_identities msg.features = user_cap_features - puts 'RESPONSE7: ' + msg.inspect write_to_stream msg next end @@ -809,7 +806,7 @@ module SGXbwmsgsv2 JSON.parse(response) # TODO: confirm response is array - could be empty - puts "register got str #{response.to_s[0..999]}" + LOG.debug("Registration verify response", response: response.to_s[0..999]) check_then_register( i, @@ -884,8 +881,6 @@ module SGXbwmsgsv2 end ibr do |i| - puts "IQ: #{i.inspect}" - case i.type when :set process_registration(i) @@ -893,7 +888,6 @@ module SGXbwmsgsv2 bare_jid = i.from.stripped @registration_repo.find(bare_jid).then { |creds| reply = registration_form(i.reply, creds.last) - puts "RESPONSE2: #{reply.inspect}" write_to_stream reply } else @@ -937,7 +931,6 @@ module SGXbwmsgsv2 reply.command.add_child(form) reply.allowed_actions = [:complete] - puts "RESPONSE_CMD_FORM: #{reply.inspect}" write_to_stream reply }.catch { |e| if e.is_a?(Array) && [2, 3].include?(e.length) @@ -1041,24 +1034,20 @@ class WebhookHandler < Goliath::API @registration_repo = RegistrationRepo.new # TODO: add timestamp grab here, and MUST include ./tai version - puts 'ENV: ' + env.reject { |k| k == 'params' }.to_s + LOG.debug("Webhook request env", env: env.reject { |k| k == 'params' }) if params.empty? - puts 'PARAMS empty!' + LOG.warn "Empty webhook params" return [200, {}, "OK"] end if env['REQUEST_URI'] != '/' - puts 'BADREQUEST1: non-/ request "' + - env['REQUEST_URI'] + '", method "' + - env['REQUEST_METHOD'] + '"' + LOG.warn("Non-/ request", uri: env['REQUEST_URI'], method: env['REQUEST_METHOD']) return [200, {}, "OK"] end if env['REQUEST_METHOD'] != 'POST' - puts 'BADREQUEST2: non-POST request; URI: "' + - env['REQUEST_URI'] + '", method "' + - env['REQUEST_METHOD'] + '"' + LOG.warn("Non-POST request", uri: env['REQUEST_URI'], method: env['REQUEST_METHOD']) return [200, {}, "OK"] end @@ -1076,7 +1065,7 @@ class WebhookHandler < Goliath::API elsif jparams['direction'] == 'out' [jparams['from'], jparams['owner']] # NOTE: for outbound, 'from' == 'owner' else - puts "big prob: '#{jparams['direction']}'" + LOG.error("Unexpected message direction", direction: jparams['direction']) [jparams['from'], jparams['owner']] end @@ -1084,15 +1073,18 @@ class WebhookHandler < Goliath::API return [400, {}, "Missing params\n"] unless jparams['to'].is_a?(Array) return [400, {}, "Missing params\n"] if jparams['to'].empty? - puts "BODY - messageId: #{jparams['id']}" \ - ", eventType: #{type}" \ - ", time: #{jparams['time']}" \ - ", direction: #{jparams['direction']}" \ - ", deliveryState: #{jparams['deliveryState'] || 'NONE'}" \ - ", errorCode: #{jparams['errorCode'] || 'NONE'}" \ - ", description: #{jparams['description'] || 'NONE'}" \ - ", tag: #{jparams['tag'] || 'NONE'}" \ - ", media: #{jparams['media'] || 'NONE'}" + LOG.info( + "Webhook message", + message_id: jparams['id'], + event_type: type, + time: jparams['time'], + direction: jparams['direction'], + delivery_state: jparams['deliveryState'], + error_code: jparams['errorCode'], + description: jparams['description'], + tag: jparams['tag'], + media: jparams['media'] + ) if others_num[0] != '+' # TODO: check that others_num actually a shortcode first @@ -1103,7 +1095,7 @@ class WebhookHandler < Goliath::API bare_jid = @registration_repo.find_jid(users_num).sync if !bare_jid - puts "jid_key for (#{users_num}) DNE; BW API misconfigured?" + LOG.warn("JID not found for number, BW API misconfigured?", users_num: users_num) return [403, {}, "Customer not found\n"] end @@ -1152,9 +1144,6 @@ class WebhookHandler < Goliath::API end msg.add_child(addrs) - - # TODO: delete - puts "RESPONSE9: #{msg.inspect}" end media_urls = Array(jparams['media']).map { |media_url| @@ -1197,7 +1186,7 @@ class WebhookHandler < Goliath::API " with text: " + jparams['text'] # TODO: log/notify of this properly - puts text + LOG.warn("Unknown inbound message type", text: text) end # If text is not empty, but there isn't a msg, @@ -1237,7 +1226,7 @@ class WebhookHandler < Goliath::API bandwidth_id: jparams['id'] ).emit(REDIS) end - puts "WARN! group no rcpt: #{users_num}" + LOG.warn("Group message, skipping receipt", users_num: users_num) return [200, {}, "OK"] end @@ -1277,12 +1266,12 @@ class WebhookHandler < Goliath::API others_num = params['_json'][0]['to'] else # TODO: notify somehow of unknown state receivd? - puts "message with id #{id} has "\ - "other type #{type}" + LOG.warn("Unknown outbound message type", id: id, type: type) return [200, {}, "OK"] end - puts "RESPONSE4: #{msg.inspect}" + # Keeping this due to the `msg.from=` shuffle just below + LOG.debug("Outbound callback response", stanza: msg.inspect) end # if message-failed, we already set msg.from @@ -1336,27 +1325,23 @@ class WebhookHandler < Goliath::API [200, {}, "OK"] rescue Exception => e Sentry.capture_exception(e) - p e [500, {}, "Error"] end end at_exit do - $stdout.sync = true - - puts "Soprani.ca/SMS Gateway for XMPP - Bandwidth API V2\n"\ - "==>> last commit of this version is " + `git rev-parse HEAD` + "\n" + LOG.info("Soprani.ca/SMS Gateway for XMPP - Bandwidth API V2", + version: `git rev-parse HEAD`.chomp) if ARGV.size != 7 - puts "Usage: sgx-bwmsgsv2.rb "\ + warn "Usage: sgx-bwmsgsv2.rb "\ " "\ " "\ " " exit 0 end - t = Time.now - puts "LOG %d.%09d: starting...\n\n" % [t.to_i, t.nsec] + LOG.info "Starting" EM.run do REDIS = EM::Hiredis.connect @@ -1373,17 +1358,15 @@ at_exit do server = Goliath::Server.new('0.0.0.0', ARGV[5].to_i) server.api = WebhookHandler.new server.app = Goliath::Rack::Builder.build(server.api.class, server.api) - server.logger = Log4r::Logger.new('goliath') - server.logger.add(Log4r::StdoutOutputter.new('console')) - server.logger.level = Log4r::INFO + server.logger = LOG server.start do ["INT", "TERM"].each do |sig| trap(sig) do EM.defer do - puts 'Shutting down gateway...' + LOG.info "Shutting down gateway" SGXbwmsgsv2.shutdown - puts 'Gateway has terminated.' + LOG.info "Gateway has terminated" EM.stop end end