1#!/usr/bin/ruby
2# frozen_string_literal: true
3
4require "em-http/middleware/json_response"
5require "sentry-ruby"
6require "em-hiredis"
7require "em_promise"
8require "em-http-request"
9require "optparse"
10
# Command-line settings, parsed once at load time and then frozen.
# Each switch takes one mandatory argument and is stored under the symbol
# key listed beside it; switches not given on the command line leave their
# key absent. Parsing consumes the recognised switches from ARGV.
OPTIONS = {}.tap { |parsed|
  switches = {
    webhook_endpoint: "--webhook-endpoint WEBHOOK_ENDPOINT",
    account_id: "--bw-acct-id BANDWIDTH_ACCOUNT_ID",
    password: "--bw-password BANDWIDTH_PASSWORD",
    username: "--bw-username BANDWIDTH_USERNAME"
  }

  parser = OptionParser.new
  switches.each do |key, switch|
    parser.on(switch) { |value| parsed[key] = value }
  end
  parser.parse!
}.freeze
27
# Write stdout through immediately instead of buffering it.
$stdout.sync = true

# Sentry is configured with zero background worker threads.
# NOTE(review): presumably so events are sent inline before the process
# exits -- confirm against the sentry-ruby configuration docs.
Sentry.init { |config| config.background_worker_threads = 0 }
33
# Persists the cursor of the last handled message so a later run can
# resume from it.
#
# @param redis [#mset] client receiving alternating key/value arguments
# @param info [#id, #date] id and timestamp of the last handled message
# @return whatever redis.mset returns
def save_last_msg(redis, info)
  cursor = [
    "bw_last_message_failed_id", info.id,
    "bw_last_message_failed_time", info.date
  ]
  redis.mset(*cursor)
end
40
# Wraps one message record from the Bandwidth message search and turns it
# into the webhook payload announcing that the message was not delivered.
class BlockedMessage
  # @param msg [Hash] message record, read with string keys
  #   ("receiveTime", "destinationTn", "sourceTn", "errorCode", "messageId")
  def initialize(msg)
    @msg = msg
  end

  # @return [String] human-readable notice including the receive time
  def notification_text
    "This is an automated notification from JMP " \
    "letting you know that a message to you " \
    "was blocked by the carrier or otherwise failed " \
    "to be delivered at #{@msg['receiveTime']}."
  end

  # @return [Hash] the "message" portion of the webhook payload
  def notification_body
    {
      direction: "in",
      owner: @msg["destinationTn"],
      from: @msg["sourceTn"],
      errorCode: @msg["errorCode"],
      to: [@msg["destinationTn"]],
      deliveryState: "FAILED",
      # The record's other fields are camelCase and the pagination code reads
      # "messageId" from these same records; the snake_case key is kept only
      # as a fallback for callers that built the record by hand.
      id: @msg["messageId"] || @msg["message_id"],
      text: notification_text
    }
  end

  # @return [Hash] the complete webhook event for this message
  def to_h
    {
      time: @msg["receiveTime"],
      type: "sms",
      description: "Notification of failed or blocked message",
      message: notification_body
    }
  end

  # POSTs the event, wrapped in a one-element JSON array, to the webhook
  # endpoint given on the command line.
  #
  # @return the object returned by EM::HttpRequest#post
  def notify!
    EM::HttpRequest.new(
      OPTIONS[:webhook_endpoint],
      tls: {verify_peer: true}
    ).post(
      body: [to_h].to_json,
      head: {"Content-Type"=>"application/json"}
    )
  end
end
85
# Walks the Bandwidth message search results for the configured account one
# page at a time, handing each record to the caller as a BlockedMessage.
class BandwidthMessages
  attr_accessor :query

  # Request headers. The Authorization value is passed to em-http as a
  # [username, password] pair.
  # NOTE(review): presumably em-http turns the pair into HTTP basic auth --
  # confirm against the em-http-request docs.
  BW_HEAD = {
    "Authorization" => [OPTIONS[:username], OPTIONS[:password]],
    "Content-Type" => "application/json"
  }.freeze

  # Message search endpoint for the account given on the command line.
  URL = "https://messaging.bandwidth.com/api/v2/users/" \
        "#{OPTIONS[:account_id]}" \
        "/messages"

  # Terminal stand-in for "the next page": holds the id and receive time of
  # the last message seen. Its #each ignores any block and resolves
  # immediately with the struct itself, which ends the chain in
  # BandwidthMessages#each.
  Done = Struct.new(:id, :date) do
    def each
      EMPromise.resolve(self)
    end
  end

  # @param query [Hash] query-string parameters for the search request
  # @param starting_id [String, nil] id reported when no message is found
  # @param starting_date [String, nil] date reported when no message is found
  def initialize(query, starting_id: nil, starting_date: nil)
    @query = query
    @starting_id = starting_id
    @starting_date = starting_date
  end

  # Fetches this page, calls the block with a BlockedMessage for every
  # record in it, waits for all block results, then continues with the
  # following page.
  #
  # @yieldparam message [BlockedMessage]
  # @return a promise that ends up resolved with a Done once no further
  #   page token is returned
  def each(&block)
    req.then { |res|
      EMPromise.all(res.response["messages"].map { |msg|
        block.call(BlockedMessage.new(msg))
      }).then { res }
    }.then { |res|
      for_next_page(res).each(&block)
    }
  end

  # Issues the GET for the current query; the JSONResponse middleware is
  # installed on the connection before the request is sent.
  def req
    EM::HttpRequest
      .new(URL, tls: {verify_peer: true})
      .tap { |r| r.use(EM::Middleware::JSONResponse) }
      .get(head: BW_HEAD, query: @query)
  end

  # Decides what follows the page in +res+.
  #
  # @param res [#response] finished request whose #response is the parsed body
  # @return [BandwidthMessages, Done] a reader for the next page when the
  #   body carries a nextPageToken, otherwise a Done holding the cursor
  def for_next_page(res)
    body = res.response
    last_msg = body&.[]("messages")&.last
    # Cursor after this page: its last message, or the inherited cursor when
    # the page held no messages.
    last_id = last_msg ? last_msg["messageId"] : @starting_id
    last_date = last_msg ? last_msg["receiveTime"] : @starting_date

    page_token = body&.dig("pageInfo", "nextPageToken")
    return Done.new(last_id, last_date) unless page_token

    # Build the next reader from a merged copy of the query so this
    # instance's hash is left untouched, and pass the cursor along so an
    # empty later page cannot discard messages already handled here.
    self.class.new(
      @query.merge(pageToken: page_token),
      starting_id: last_id, starting_date: last_date
    )
  end
end
138
# Reports a fatal error raised inside the event loop and terminates the
# process with exit status 1.
#
# Prints the error class, the error itself and its backtrace (when it has
# one) to stderr, then reports it: exceptions via capture_exception,
# anything else via capture_message with its string form.
#
# @param e [Exception, Object] the failure; may be a non-exception value
# @param hub [#capture_exception, #capture_message, nil] reporter to use;
#   Sentry itself when nil
# @raise [SystemExit] always, through `exit 1`
def panic(e, hub=nil)
  lines = ["Error raised during event loop: #{e.class}", e]
  # Only exceptions that were actually raised have a backtrace: it is nil
  # for a never-raised exception, and other objects have no such method.
  trace = e.backtrace if e.respond_to?(:backtrace)
  lines << trace.join("\n") if trace
  warn(*lines)

  reporter = hub || Sentry
  if e.is_a?(Exception)
    reporter.capture_exception(e, hint: {background: false})
  else
    reporter.capture_message(e.to_s, hint: {background: false})
  end
  exit 1
end
148
# Base search filter: inbound messages with FAILED status, sorted by
# receive time ascending.
QUERY = {
  messageStatus: "FAILED",
  messageDirection: "INBOUND",
  sort: "receiveTime:asc"
}.freeze

# Entry point: load the stored cursor from redis, notify about every
# matching message received since then, store the new cursor and stop the
# event loop. Any rejection along the chain goes to panic.
EM.run do
  redis = EM::Hiredis.connect

  EMPromise.resolve(
    redis.mget("bw_last_message_failed_time", "bw_last_message_failed_id")
  ).then { |(last_msg_date, last_msg_id)|
    # The format ends in a literal "Z" (the UTC designator), so the clock
    # has to be read in UTC for the timestamp to be correct on hosts whose
    # local zone is not UTC.
    thirty_mins_ago =
      (Time.now.utc - 30 * 60).strftime("%Y-%m-%dT%H:%M:%S.%LZ")
    # Search from the stored cursor but never further back than 30 minutes.
    # This is a plain string comparison.
    # NOTE(review): assumes the stored date uses this same timestamp
    # format -- confirm against what the API returns as receiveTime.
    last_msg_date = [last_msg_date || thirty_mins_ago, thirty_mins_ago].max

    BandwidthMessages.new(
      QUERY.merge({fromDateTime: last_msg_date}),
      starting_id: last_msg_id, starting_date: last_msg_date
    ).each(&:notify!)
  }.then { |info|
    save_last_msg(redis, info)
  }.then {
    EM.stop
  }.catch { |err|
    panic(err)
  }
end