notify_inbound_failures_job

  1#!/usr/bin/ruby
  2# frozen_string_literal: true
  3
  4require "em-http/middleware/json_response"
  5require "sentry-ruby"
  6require "em-hiredis"
  7require "em_promise"
  8require "em-http-request"
  9require "optparse"
 10
 11OPTIONS = {}.tap { |options|
 12	OptionParser.new do |opt|
 13		opt.on("--webhook-endpoint WEBHOOK_ENDPOINT") do |o|
 14			options[:webhook_endpoint] = o
 15		end
 16		opt.on("--bw-acct-id BANDWIDTH_ACCOUNT_ID") do |o|
 17			options[:account_id] = o
 18		end
 19		opt.on("--bw-password BANDWIDTH_PASSWORD") do |o|
 20			options[:password] = o
 21		end
 22		opt.on("--bw-username BANDWIDTH_USERNAME") do |o|
 23			options[:username] = o
 24		end
 25	end.parse!
 26}.freeze
 27
 28$stdout.sync = true
 29
 30Sentry.init do |config|
 31	config.background_worker_threads = 0
 32end
 33
 34def save_last_msg(redis, info)
 35	redis.mset(
 36		"bw_last_message_failed_id", info.id,
 37		"bw_last_message_failed_time", info.date
 38	)
 39end
 40
 41class BlockedMessage
 42	def initialize(msg)
 43		@msg = msg
 44	end
 45
 46	def notification_text
 47		"This is an automated notification from JMP " \
 48		"letting you know that a message to you " \
 49		"was blocked by the carrier or otherwise failed " \
 50		"to be delivered at #{@msg['receiveTime']}."
 51	end
 52
 53	def notification_body
 54		{
 55			direction: "in",
 56			owner: @msg["destinationTn"],
 57			from: @msg["sourceTn"],
 58			errorCode: @msg["errorCode"],
 59			to: [@msg["destinationTn"]],
 60			deliveryState: "FAILED",
 61			id: @msg["message_id"],
 62			text: notification_text
 63		}
 64	end
 65
 66	def to_h
 67		{
 68			time: @msg["receiveTime"],
 69			type: "sms",
 70			description: "Notification of failed or blocked message",
 71			message: notification_body
 72		}
 73	end
 74
 75	def notify!
 76		EM::HttpRequest.new(
 77			OPTIONS[:webhook_endpoint],
 78			tls: {verify_peer: true}
 79		).post(
 80			body: [to_h].to_json,
 81			head: {"Content-Type"=>"application/json"}
 82		)
 83	end
 84end
 85
 86class BandwidthMessages
 87	attr_accessor :query
 88
 89	BW_HEAD = {
 90		"Authorization" => [OPTIONS[:username], OPTIONS[:password]],
 91		"Content-Type" => "application/json"
 92	}.freeze
 93
 94	def initialize(query, starting_id: nil, starting_date: nil)
 95		@query = query
 96		@starting_id = starting_id
 97		@starting_date = starting_date
 98	end
 99
100	def each(&block)
101		req.then { |res|
102			EMPromise.all(res.response["messages"].map { |msg|
103				block.call(BlockedMessage.new(msg))
104			}).then { res }
105		}.then { |res|
106			for_next_page(res).each(&block)
107		}
108	end
109
110	URL = "https://messaging.bandwidth.com/api/v2/users/" \
111		"#{OPTIONS[:account_id]}" \
112		"/messages"
113
114	def req
115		EM::HttpRequest
116			.new(URL, tls: {verify_peer: true})
117			.tap { |r| r.use(EM::Middleware::JSONResponse) }
118			.get(head: BW_HEAD, query: @query)
119	end
120
121	def for_next_page(res)
122		page_token = res.response.dig("pageInfo", "nextPageToken")
123		if page_token
124			clone.tap { |s| s.query.merge!(pageToken: page_token) }
125		elsif (last_msg = res.response&.[]("messages")&.last)
126			Done.new(last_msg["messageId"], last_msg["receiveTime"])
127		else
128			Done.new(@starting_id, @starting_date)
129		end
130	end
131
132	Done = Struct.new(:id, :date) do
133		def each
134			EMPromise.resolve(self)
135		end
136	end
137end
138
# Reports a fatal failure from the event loop and terminates the process.
#
# The failure is written to stderr, forwarded to Sentry (or to +hub+ when
# one is supplied) and the process then exits with status 1.
#
# @param e [Exception, Object] the failure; anything that is not an
#   Exception is reported through capture_message using its to_s
# @param hub [#capture_exception, #capture_message, nil] alternative
#   reporter used in place of Sentry
def panic(e, hub=nil)
	# A rejection value need not be an Exception, and an exception that was
	# never raised has a nil backtrace. Neither case may stop the report
	# and the exit below from happening.
	trace = e.respond_to?(:backtrace) ? Array(e.backtrace) : []
	warn "Error raised during event loop: #{e.class}", e, *trace

	reporter = hub || Sentry
	if e.is_a?(Exception)
		reporter.capture_exception(e, hint: {background: false})
	else
		reporter.capture_message(e.to_s, hint: {background: false})
	end
	exit 1
end
148
# Search parameters common to every run: failed inbound messages, sorted by
# ascending receive time.
QUERY = {
	messageStatus: "FAILED",
	messageDirection: "INBOUND",
	sort: "receiveTime:asc"
}.freeze

# Entry point. Reads the marker left by the previous run from Redis, sends a
# notification for each failed message found since then, stores the new
# marker and stops the event loop. Any failure along the way goes to panic.
EM.run do
	redis = EM::Hiredis.connect

	EMPromise.resolve(
		redis.mget("bw_last_message_failed_time", "bw_last_message_failed_id")
	).then { |(last_msg_date, last_msg_id)|
		# The format ends in a literal "Z", which labels the stamp as UTC,
		# so the clock has to be read in UTC too. Using local time here would
		# shift the cut-off by the host's UTC offset.
		thirty_mins_ago =
			(Time.now.utc - 30 * 60).strftime("%Y-%m-%dT%H:%M:%S.%LZ")
		# Use the later of the stored date and the thirty-minute cut-off; on
		# a first run there is no stored date and the cut-off is used alone.
		# NOTE(review): the string comparison assumes the stored date uses
		# this same fixed-width UTC format -- confirm against the API.
		last_msg_date = [last_msg_date, thirty_mins_ago].compact.max

		BandwidthMessages.new(
			QUERY.merge({fromDateTime: last_msg_date}),
			starting_id: last_msg_id, starting_date: last_msg_date
		).each(&:notify!)
	}.then { |info|
		save_last_msg(redis, info)
	}.then {
		EM.stop
	}.catch { |err|
		panic(err)
	}
end