notify_inbound_failures_job

  1#!/usr/bin/ruby
  2# frozen_string_literal: true
  3
  4require "em-http/middleware/json_response"
  5require "sentry-ruby"
  6require "em-hiredis"
  7require "em_promise"
  8require "em-http-request"
  9require "optparse"
 10
 11OPTIONS = {}.tap { |options|
 12	OptionParser.new do |opt|
 13		opt.on("--webhook-endpoint WEBHOOK_ENDPOINT") do |o|
 14			options[:webhook_endpoint] = o
 15		end
 16		opt.on("--bw-acct-id BANDWIDTH_ACCOUNT_ID") do |o|
 17			options[:account_id] = o
 18		end
 19		opt.on("--bw-password BANDWIDTH_PASSWORD") do |o|
 20			options[:password] = o
 21		end
 22		opt.on("--bw-username BANDWIDTH_USERNAME") do |o|
 23			options[:username] = o
 24		end
 25	end.parse!
 26}.freeze
 27
 28$stdout.sync = true
 29
 30Sentry.init do |config|
 31	config.background_worker_threads = 0
 32end
 33
 34def save_last_msg(redis, info)
 35	redis.mset(
 36		"bw_last_message_failed_id", info.id,
 37		"bw_last_message_failed_time", info.date
 38	)
 39end
 40
 41class BlockedMessage
 42	def initialize(msg)
 43		@msg = msg
 44	end
 45
 46	def notification_text
 47		"This is an automated notification from JMP " \
 48		"letting you know that a message from #{@msg['sourceTn']} " \
 49		"was blocked by the carrier or otherwise failed " \
 50		"to be delivered at #{@msg['receiveTime']}."
 51	end
 52
 53	def notification_body
 54		{
 55			direction: "in",
 56			owner: @msg["destinationTn"],
 57			from: @msg["sourceTn"],
 58			errorCode: @msg["errorCode"],
 59			to: [@msg["destinationTn"]],
 60			deliveryState: "FAILED",
 61			id: @msg["message_id"],
 62			text: notification_text
 63		}
 64	end
 65
 66	def to_h
 67		[{
 68			time: @msg["receiveTime"],
 69			type: "sms",
 70			description: "Notification of failed or blocked message",
 71			message: notification_body
 72		}].to_json
 73	end
 74
 75	def notify!
 76		EM::HttpRequest.new(
 77			OPTIONS[:webhook_endpoint],
 78			tls: {verify_peer: true}
 79		).post(
 80			body: to_h,
 81			head: {"Content-Type"=>"application/json"}
 82		)
 83	end
 84end
 85
 86class BandwidthMessages
 87	attr_accessor :query
 88
 89	BW_HEAD = {
 90		"Authorization" => [OPTIONS[:username], OPTIONS[:password]],
 91		"Content-Type" => "application/json"
 92	}.freeze
 93
 94	def initialize(query, starting_id: nil, starting_date: nil)
 95		@query = query
 96		@starting_id = starting_id
 97		@starting_date = starting_date
 98	end
 99
100	def each(&block)
101		req.then { |res|
102			EMPromise.all(res.response["messages"].map { |msg|
103				block.call(BlockedMessage.new(msg))
104			}).then { res }
105		}.then { |res|
106			for_next_page(res).each(&block)
107		}
108	end
109
110	URL = "https://messaging.bandwidth.com/api/v2/users/" \
111		"#{OPTIONS[:account_id]}" \
112		"/messages"
113
114	def req
115		req = EM::HttpRequest.new(URL, tls: {verify_peer: true})
116		req.use(EM::Middleware::JSONResponse).get(
117			head: BW_HEAD,
118			query: @query
119		)
120	end
121
122	def for_next_page(res)
123		page_token = res.response.dig("pageInfo", "nextPageToken")
124		if page_token
125			clone.tap { |s| s.query.merge!(pageToken: page_token) }
126		elsif (last_msg = res.response&.[]("messages")&.last)
127			Done.new(last_msg["messageId"], last_msg["receiveTime"])
128		else
129			Done.new(@starting_id, @starting_date)
130		end
131	end
132
133	Done = Struct.new(:id, :date) do
134		def each
135			EMPromise.resolve(self)
136		end
137	end
138end
139
# Log a fatal error to stderr, report it to Sentry, and exit with status 1.
#
# `e` is not always an exception that was raised: a promise can be rejected
# with any value, and an exception object that was never raised has a nil
# backtrace. The backtrace is therefore printed only when one exists, so
# the error is always reported.
#
# @param e [Exception, Object] the error or rejection value
# @param hub [#capture_exception, #capture_message, nil] reporter to use
#   instead of the global Sentry module
# @return [void] never returns; raises SystemExit
def panic(e, hub=nil)
	trace = e.backtrace if e.respond_to?(:backtrace)
	warn(*[
		"Error raised during event loop: #{e.class}",
		e,
		trace&.join("\n")
	].compact)

	reporter = hub || Sentry
	if e.is_a?(Exception)
		reporter.capture_exception(e, hint: {background: false})
	else
		reporter.capture_message(e.to_s, hint: {background: false})
	end
	exit 1
end
149
# Base search filter: failed inbound messages, sorted by ascending
# receive time.
QUERY = {
	messageStatus: "FAILED",
	messageDirection: "INBOUND",
	sort: "receiveTime:asc"
}.freeze

# Entry point: read the saved checkpoint from Redis, notify the webhook about
# every failed inbound message since then, save the new checkpoint, and stop
# the reactor. Any rejection along the way goes to `panic`.
EM.run do
	redis = EM::Hiredis.connect

	EMPromise.resolve(
		redis.mget("bw_last_msg_failed_date", "bw_last_msg_failed_id")
	).then { |(last_msg_date, last_msg_id)|
		## Full disclosure, string format came from ChatGPT
		# The format ends in a literal "Z" (UTC designator), so the time must
		# be converted to UTC first; a local time would be mislabelled on any
		# host whose zone is not UTC.
		thirty_mins_ago =
			(Time.now.utc - 30 * 60).strftime("%Y-%m-%dT%H:%M:%S.%LZ")
		# Never look back further than 30 minutes, and fall back to that bound
		# when nothing was saved. The timestamps are compared as strings.
		# NOTE(review): assumes the saved date uses this same format; confirm.
		last_msg_date ||= thirty_mins_ago
		last_msg_date = [last_msg_date, thirty_mins_ago].max

		BandwidthMessages.new(
			QUERY.merge({fromDateTime: last_msg_date}),
			starting_id: last_msg_id, starting_date: last_msg_date
		).each(&:notify!)
	}.then { |info|
		save_last_msg(redis, info)
	}.then {
		EM.stop
	}.catch { |err|
		panic(err)
	}
end