#!/usr/bin/ruby
# frozen_string_literal: true

# Fetches FAILED INBOUND messages from the Bandwidth messaging API and POSTs
# one notification per message to a webhook endpoint. The id and receive time
# of the last message seen are stored in Redis so the next run can resume
# from there. The process exits once a single pass has completed.

require "em-http/middleware/json_response"
require "sentry-ruby"
require "em-hiredis"
require "em_promise"
require "em-http-request"
require "json"
require "optparse"

# Command-line options, parsed once at load time and frozen.
OPTIONS = {}.tap { |options|
  OptionParser.new do |opt|
    opt.on("--webhook-endpoint WEBHOOK_ENDPOINT") do |o|
      options[:webhook_endpoint] = o
    end

    opt.on("--bw-acct-id BANDWIDTH_ACCOUNT_ID") do |o|
      options[:account_id] = o
    end

    opt.on("--bw-password BANDWIDTH_PASSWORD") do |o|
      options[:password] = o
    end

    opt.on("--bw-username BANDWIDTH_USERNAME") do |o|
      options[:username] = o
    end
  end.parse!
}.freeze

$stdout.sync = true

Sentry.init do |config|
  config.background_worker_threads = 0
end

# Redis keys holding the resume point. Reader and writer must agree on these;
# they are the keys save_last_msg has always written, so state stored by
# earlier runs is still found.
LAST_MSG_ID_KEY = "bw_last_message_failed_id"
LAST_MSG_DATE_KEY = "bw_last_message_failed_time"

# Persists the resume point.
#
# @param redis [#mset] Redis client
# @param info [#id, #date] id and receive time of the last message seen
# @return whatever redis.mset returns
def save_last_msg(redis, info)
  redis.mset(
    LAST_MSG_ID_KEY, info.id,
    LAST_MSG_DATE_KEY, info.date
  )
end

# Wraps one failed-message record and turns it into a webhook notification.
class BlockedMessage
  # @param msg [Hash] message record with string keys such as "sourceTn",
  #   "destinationTn", "receiveTime", "errorCode" and "messageId"
  def initialize(msg)
    @msg = msg
  end

  # @return [String] human-readable text sent as the notification body
  def notification_text
    "This is an automated notification from JMP " \
      "letting you know that a message from #{@msg['sourceTn']} " \
      "was blocked by the carrier or otherwise failed " \
      "to be delivered at #{@msg['receiveTime']}."
  end

  # @return [Hash] the "message" part of the webhook payload
  def notification_body
    {
      direction: "in",
      owner: @msg["destinationTn"],
      from: @msg["sourceTn"],
      errorCode: @msg["errorCode"],
      to: [@msg["destinationTn"]],
      deliveryState: "FAILED",
      # The record is read with camelCase keys everywhere else in this file
      # (see BandwidthMessages#for_next_page), so the id is "messageId".
      id: @msg["messageId"],
      text: notification_text
    }
  end

  # NOTE: despite the name this returns a JSON string (a one-element array),
  # ready to be used as the POST body.
  #
  # @return [String]
  def to_h
    [{
      time: @msg["receiveTime"],
      type: "sms",
      description: "Notification of failed or blocked message",
      message: notification_body
    }].to_json
  end

  # POSTs the notification to the configured webhook endpoint.
  #
  # @return the in-flight em-http request
  def notify!
    EM::HttpRequest.new(
      OPTIONS[:webhook_endpoint],
      tls: {verify_peer: true}
    ).post(
      body: to_h,
      head: {"Content-Type"=>"application/json"}
    )
  end
end

# Walks the (possibly paginated) list of messages matching a query.
class BandwidthMessages
  attr_accessor :query

  # Credentials are passed as a [username, password] pair.
  # NOTE(review): presumably em-http-request encodes an Array-valued
  # Authorization header as HTTP basic auth -- confirm.
  BW_HEAD = {
    "Authorization" => [OPTIONS[:username], OPTIONS[:password]],
    "Content-Type" => "application/json"
  }.freeze

  URL = "https://messaging.bandwidth.com/api/v2/users/" \
        "#{OPTIONS[:account_id]}" \
        "/messages"

  # Terminal "page": its #each resolves immediately to itself, exposing the
  # id and date of the last message seen.
  Done = Struct.new(:id, :date) do
    def each
      EMPromise.resolve(self)
    end
  end

  # @param query [Hash] query-string parameters for the request
  # @param starting_id [String, nil] id reported if no message is seen
  # @param starting_date [String, nil] date reported if no message is seen
  def initialize(query, starting_id: nil, starting_date: nil)
    @query = query
    @starting_id = starting_id
    @starting_date = starting_date
  end

  # Calls the block with a BlockedMessage for every message on this page and
  # on all following pages.
  #
  # NOTE(review): @starting_id is never used to skip a message that was
  # already notified on a previous run; if fromDateTime is inclusive, the
  # last message of the previous run may be notified again -- confirm.
  #
  # @yieldparam message [BlockedMessage]
  # @return [EMPromise] resolves to a Done carrying the last id and date
  def each(&block)
    req.then { |res|
      EMPromise.all(
        res.response["messages"].map { |msg|
          # Normalise the block's result to a promise so that a deferrable
          # (such as the request returned by notify!) is waited for, the
          # same way EMPromise.resolve is used for the Redis call below.
          EMPromise.resolve(block.call(BlockedMessage.new(msg)))
        }
      ).then { res }
    }.then { |res|
      for_next_page(res).each(&block)
    }
  end

  # Issues the GET request for the current query, with JSON response parsing.
  #
  # @return the in-flight em-http request
  def req
    conn = EM::HttpRequest.new(URL, tls: {verify_peer: true})
    # Register the middleware as its own statement: the return value of
    # #use is not relied upon to be the connection.
    conn.use(EM::Middleware::JSONResponse)
    conn.get(head: BW_HEAD, query: @query)
  end

  # Decides what follows the page in res: another BandwidthMessages when the
  # response carries a next-page token, otherwise a Done.
  #
  # @param res [#response] completed request whose response is parsed JSON
  # @return [BandwidthMessages, Done]
  def for_next_page(res)
    body = res.response
    last_msg = body["messages"]&.last
    # Remember the newest message seen so far, so that an empty later page
    # cannot discard progress made on earlier pages.
    last_id = last_msg ? last_msg["messageId"] : @starting_id
    last_date = last_msg ? last_msg["receiveTime"] : @starting_date

    page_token = body.dig("pageInfo", "nextPageToken")
    return Done.new(last_id, last_date) unless page_token

    # Build a fresh query hash rather than mutating the one this page uses.
    self.class.new(
      query.merge(pageToken: page_token),
      starting_id: last_id,
      starting_date: last_date
    )
  end
end

# Logs the failure to stderr, reports it to Sentry and exits with status 1.
#
# @param e [Exception, Object] the error or rejection value
# @param hub [#capture_exception, #capture_message, nil] reporter to use
#   instead of Sentry
def panic(e, hub=nil)
  reporter = hub || Sentry
  header = "Error raised during event loop: #{e.class}"

  if e.is_a?(Exception)
    # backtrace is nil for an exception that was never raised.
    warn header, e, Array(e.backtrace).join("\n")
    reporter.capture_exception(e, hint: {background: false})
  else
    # A rejection value need not be an exception, so it has no backtrace.
    warn header, e
    reporter.capture_message(e.to_s, hint: {background: false})
  end

  exit 1
end

QUERY = {
  messageStatus: "FAILED",
  messageDirection: "INBOUND",
  sort: "receiveTime:asc"
}.freeze

EM.run do
  redis = EM::Hiredis.connect

  EMPromise.resolve(
    redis.mget(LAST_MSG_DATE_KEY, LAST_MSG_ID_KEY)
  ).then { |(last_msg_date, last_msg_id)|
    # The format string carries a literal "Z" suffix, so the time must be
    # converted to UTC before formatting.
    # (Full disclosure from the original author: the format string came
    # from ChatGPT.)
    thirty_mins_ago =
      (Time.now.utc - 30 * 60).strftime("%Y-%m-%dT%H:%M:%S.%LZ")

    # Never look back further than thirty minutes. The comparison is a plain
    # string comparison.
    # NOTE(review): assumes the stored date uses this same timestamp
    # format, so that string order matches time order -- confirm.
    last_msg_date = [last_msg_date, thirty_mins_ago].compact.max

    BandwidthMessages.new(
      QUERY.merge({fromDateTime: last_msg_date}),
      starting_id: last_msg_id,
      starting_date: last_msg_date
    ).each(&:notify!)
  }.then { |info|
    save_last_msg(redis, info)
  }.then {
    EM.stop
  }.catch { |err|
    panic(err)
  }
end