diff --git a/.rubocop.yml b/.rubocop.yml index b48200bf9d7fdd4daeb813166ccf21eae5d2ced0..14cbc9da2960114e6085f22fcb7a898c1ca6965d 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -32,6 +32,7 @@ Metrics/CyclomaticComplexity: Metrics/MethodLength: Max: 200 + CountAsOne: ["hash"] Metrics/ModuleLength: Max: 1000 @@ -66,7 +67,7 @@ Style/FormatString: # Offense count: 1 Style/IfInsideElse: Exclude: - - 'sgx-bwmsgsv2.rb' + - "sgx-bwmsgsv2.rb" Layout/LeadingCommentSpace: Enabled: false diff --git a/bin/notify_inbound_failures_job b/bin/notify_inbound_failures_job new file mode 100755 index 0000000000000000000000000000000000000000..763b2c93093cffd97e9c196c68df4a86ec7736af --- /dev/null +++ b/bin/notify_inbound_failures_job @@ -0,0 +1,178 @@ +#!/usr/bin/ruby +# frozen_string_literal: true + +require "em-http/middleware/json_response" +require "sentry-ruby" +require "em-hiredis" +require "em_promise" +require "em-http-request" +require "optparse" + +OPTIONS = {}.tap { |options| + OptionParser.new do |opt| + opt.on("--webhook-endpoint WEBHOOK_ENDPOINT") do |o| + options[:webhook_endpoint] = o + end + opt.on("--bw-acct-id BANDWIDTH_ACCOUNT_ID") do |o| + options[:account_id] = o + end + opt.on("--bw-password BANDWIDTH_PASSWORD") do |o| + options[:password] = o + end + opt.on("--bw-username BANDWIDTH_USERNAME") do |o| + options[:username] = o + end + end.parse! +}.freeze + +$stdout.sync = true + +Sentry.init do |config| + config.background_worker_threads = 0 +end + +def save_last_msg(redis, info) + redis.mset( + "bw_last_message_failed_id", info.id, + "bw_last_message_failed_time", info.date + ) +end + +class BlockedMessage + def initialize(msg) + @msg = msg + end + + def notification_text + "This is an automated notification from JMP " \ + "letting you know that a message from #{@msg['sourceTn']} " \ + "was blocked by the carrier or otherwise failed " \ + "to be delivered at #{@msg['receiveTime']}." + end + + def notification_body + { + direction: "in", + owner: @msg["destinationTn"], + from: @msg["sourceTn"], + errorCode: @msg["errorCode"], + to: [@msg["destinationTn"]], + deliveryState: "FAILED", + id: @msg["message_id"], + text: notification_text + } + end + + def to_h + [{ + time: @msg["receiveTime"], + type: "sms", + description: "Notification of failed or blocked message", + message: notification_body + }].to_json + end + + def notify! + EM::HttpRequest.new( + OPTIONS[:webhook_endpoint], + tls: {verify_peer: true} + ).post( + body: to_h, + head: {"Content-Type"=>"application/json"} + ) + end +end + +class BandwidthMessages + attr_accessor :query + + BW_HEAD = { + "Authorization" => [OPTIONS[:username], OPTIONS[:password]], + "Content-Type" => "application/json" + }.freeze + + def initialize(query, starting_id: nil, starting_date: nil) + @query = query + @starting_id = starting_id + @starting_date = starting_date + end + + def each(&block) + req.then { |res| + EMPromise.all(res.response["messages"].map { |msg| + block.call(BlockedMessage.new(msg)) + }).then { res } + }.then { |res| + for_next_page(res).each(&block) + } + end + + URL = "https://messaging.bandwidth.com/api/v2/users/" \ + "#{OPTIONS[:account_id]}" \ + "/messages" + + def req + req = EM::HttpRequest.new(URL, tls: {verify_peer: true}) + req.use(EM::Middleware::JSONResponse).get( + head: BW_HEAD, + query: @query + ) + end + + def for_next_page(res) + page_token = res.response.dig("pageInfo", "nextPageToken") + if page_token + clone.tap { |s| s.query.merge!(pageToken: page_token) } + elsif (last_msg = res.response&.[]("messages")&.last) + Done.new(last_msg["messageId"], last_msg["receiveTime"]) + else + Done.new(@starting_id, @starting_date) + end + end + + Done = Struct.new(:id, :date) do + def each + EMPromise.resolve(self) + end + end +end + +def panic(e, hub=nil) + warn "Error raised during event loop: #{e.class}", e, e.backtrace.join("\n") + if e.is_a?(Exception) + (hub || Sentry).capture_exception(e, hint: {background: false}) + else + (hub || Sentry).capture_message(e.to_s, hint: {background: false}) + end + exit 1 +end + +QUERY = { + messageStatus: "FAILED", + messageDirection: "INBOUND", + sort: "receiveTime:asc" +}.freeze + +EM.run do + redis = EM::Hiredis.connect + + EMPromise.resolve( + redis.mget("bw_last_msg_failed_date", "bw_last_msg_failed_id") + ).then { |(last_msg_date, last_msg_id)| + ## Full disclosure, string format came from ChatGPT + thirty_mins_ago = (Time.now - 30 * 60).strftime("%Y-%m-%dT%H:%M:%S.%LZ") + last_msg_date ||= thirty_mins_ago + last_msg_date = [last_msg_date, thirty_mins_ago].max + + BandwidthMessages.new( + QUERY.merge({fromDateTime: last_msg_date}), + starting_id: last_msg_id, starting_date: last_msg_date + ).each(&:notify!) + }.then { |info| + save_last_msg(redis, info) + }.then { + EM.stop + }.catch { |err| + panic(err) + } +end