forward errors to users when a msg gets blocked

Phillip Davis created

Change summary

.rubocop.yml                    |   3 
bin/notify_inbound_failures_job | 178 +++++++++++++++++++++++++++++++++++
2 files changed, 180 insertions(+), 1 deletion(-)

Detailed changes

.rubocop.yml 🔗

@@ -32,6 +32,7 @@ Metrics/CyclomaticComplexity:
 
 Metrics/MethodLength:
   Max: 200
+  CountAsOne: ["hash"]
 
 Metrics/ModuleLength:
   Max: 1000
@@ -66,7 +67,7 @@ Style/FormatString:
 # Offense count: 1
 Style/IfInsideElse:
   Exclude:
-    - 'sgx-bwmsgsv2.rb'
+    - "sgx-bwmsgsv2.rb"
 
 Layout/LeadingCommentSpace:
   Enabled: false

bin/notify_inbound_failures_job 🔗

@@ -0,0 +1,178 @@
+#!/usr/bin/ruby
+# frozen_string_literal: true
+
+require "em-http/middleware/json_response"
+require "sentry-ruby"
+require "em-hiredis"
+require "em_promise"
+require "em-http-request"
+require "optparse"
+
+OPTIONS = {}.tap { |options|
+	OptionParser.new do |opt|
+		opt.on("--webhook-endpoint WEBHOOK_ENDPOINT") do |o|
+			options[:webhook_endpoint] = o
+		end
+		opt.on("--bw-acct-id BANDWIDTH_ACCOUNT_ID") do |o|
+			options[:account_id] = o
+		end
+		opt.on("--bw-password BANDWIDTH_PASSWORD") do |o|
+			options[:password] = o
+		end
+		opt.on("--bw-username BANDWIDTH_USERNAME") do |o|
+			options[:username] = o
+		end
+	end.parse!
+}.freeze
+
+$stdout.sync = true
+
+Sentry.init do |config|
+	config.background_worker_threads = 0
+end
+
+def save_last_msg(redis, info)
+	redis.mset(
+		"bw_last_message_failed_id", info.id,
+		"bw_last_message_failed_time", info.date
+	)
+end
+
+class BlockedMessage
+	def initialize(msg)
+		@msg = msg
+	end
+
+	def notification_text
+		"This is an automated notification from JMP " \
+		"letting you know that a message from #{@msg['sourceTn']} " \
+		"was blocked by the carrier or otherwise failed " \
+		"to be delivered at #{@msg['receiveTime']}."
+	end
+
+	def notification_body
+		{
+			direction: "in",
+			owner: @msg["destinationTn"],
+			from: @msg["sourceTn"],
+			errorCode: @msg["errorCode"],
+			to: [@msg["destinationTn"]],
+			deliveryState: "FAILED",
+			id: @msg["message_id"],
+			text: notification_text
+		}
+	end
+
+	def to_h
+		[{
+			time: @msg["receiveTime"],
+			type: "sms",
+			description: "Notification of failed or blocked message",
+			message: notification_body
+		}].to_json
+	end
+
+	def notify!
+		EM::HttpRequest.new(
+			OPTIONS[:webhook_endpoint],
+			tls: {verify_peer: true}
+		).post(
+			body: to_h,
+			head: {"Content-Type"=>"application/json"}
+		)
+	end
+end
+
+class BandwidthMessages
+	attr_accessor :query
+
+	BW_HEAD = {
+		"Authorization" => [OPTIONS[:username], OPTIONS[:password]],
+		"Content-Type" => "application/json"
+	}.freeze
+
+	def initialize(query, starting_id: nil, starting_date: nil)
+		@query = query
+		@starting_id = starting_id
+		@starting_date = starting_date
+	end
+
+	def each(&block)
+		req.then { |res|
+			EMPromise.all(res.response["messages"].map { |msg|
+				block.call(BlockedMessage.new(msg))
+			}).then { res }
+		}.then { |res|
+			for_next_page(res).each(&block)
+		}
+	end
+
+	URL = "https://messaging.bandwidth.com/api/v2/users/" \
+		"#{OPTIONS[:account_id]}" \
+		"/messages"
+
+	def req
+		req = EM::HttpRequest.new(URL, tls: {verify_peer: true})
+		req.use(EM::Middleware::JSONResponse).get(
+			head: BW_HEAD,
+			query: @query
+		)
+	end
+
+	def for_next_page(res)
+		page_token = res.response.dig("pageInfo", "nextPageToken")
+		if page_token
+			clone.tap { |s| s.query.merge!(pageToken: page_token) }
+		elsif (last_msg = res.response&.[]("messages")&.last)
+			Done.new(last_msg["messageId"], last_msg["receiveTime"])
+		else
+			Done.new(@starting_id, @starting_date)
+		end
+	end
+
+	Done = Struct.new(:id, :date) do
+		def each
+			EMPromise.resolve(self)
+		end
+	end
+end
+
+def panic(e, hub=nil)
+	warn "Error raised during event loop: #{e.class}", e, e.backtrace.join("\n")
+	if e.is_a?(Exception)
+		(hub || Sentry).capture_exception(e, hint: {background: false})
+	else
+		(hub || Sentry).capture_message(e.to_s, hint: {background: false})
+	end
+	exit 1
+end
+
+QUERY = {
+	messageStatus: "FAILED",
+	messageDirection: "INBOUND",
+	sort: "receiveTime:asc"
+}.freeze
+
+EM.run do
+	redis = EM::Hiredis.connect
+
+	EMPromise.resolve(
+		redis.mget("bw_last_msg_failed_date", "bw_last_msg_failed_id")
+	).then { |(last_msg_date, last_msg_id)|
+		## Full disclosure, string format came from ChatGPT
+		thirty_mins_ago = (Time.now - 30 * 60).strftime("%Y-%m-%dT%H:%M:%S.%LZ")
+		last_msg_date ||= thirty_mins_ago
+		last_msg_date = [last_msg_date, thirty_mins_ago].max
+
+		BandwidthMessages.new(
+			QUERY.merge({fromDateTime: last_msg_date}),
+			starting_id: last_msg_id, starting_date: last_msg_date
+		).each(&:notify!)
+	}.then { |info|
+		save_last_msg(redis, info)
+	}.then {
+		EM.stop
+	}.catch { |err|
+		panic(err)
+	}
+end