1#!/usr/bin/ruby
2# frozen_string_literal: true
3
4require "bigdecimal"
5require "pg/em/connection_pool"
6require "eventmachine"
7require "em_promise"
8require "em-hiredis"
9require "dhall"
10require "ougai"
11require "sentry-ruby"
12
# Flush every write to stdout immediately instead of buffering it.
$stdout.sync = true

# Process-wide logger writing to stdout. The level comes from the LOG_LEVEL
# environment variable ("info" when unset); the readable formatter is put in
# plain mode whenever stdout is not a TTY.
LOG = Ougai::Logger.new($stdout).tap do |logger|
  logger.level = ENV.fetch("LOG_LEVEL", "info")
  logger.formatter = Ougai::Formatters::Readable.new(
    nil, nil, plain: !$stdout.isatty
  )
end
21
# Accessor for the process-wide LOG logger, for code that calls `log`
# rather than referencing the constant directly.
def log
  ::LOG
end
25
# Initialise Sentry with zero background worker threads.
# NOTE(review): per the sentry-ruby docs this makes event delivery
# synchronous -- confirm for the installed gem version.
Sentry.init { |config| config.background_worker_threads = 0 }
29
# Dhall record type that the configuration expression given on the command
# line is annotated with when CONFIG is loaded. It lists the xmpp, component,
# keepgo, sims, plans and braintree sections the config must provide.
SCHEMA = "{
 xmpp: { jid: Text, password: Text },
 component: { jid: Text },
 keepgo: Optional { access_token: Text, api_key: Text },
 sims: {
 CAD: { per_gb: Natural, annual: Natural },
 USD: { per_gb: Natural, annual: Natural }
 },
 plans: List {
 currency: < CAD | USD >,
 messages: < limited: { included: Natural, price: Natural } | unlimited >,
 minutes: < limited: { included: Natural, price: Natural } | unlimited >,
 monthly_price : Natural,
 name : Text,
 allow_register: Bool,
 subaccount_discount: Natural
 },
 braintree: {
 environment : Text,
 merchant_accounts : { CAD : Text, USD : Text },
 merchant_id : Text,
 private_key : Text,
 public_key : Text
 }
}"
55
# The first command-line argument is the Dhall expression to load; it is
# required.
config_expr = ARGV.first
raise "Need a Dhall config" unless config_expr

# Evaluate the expression annotated with SCHEMA and decode it with
# symbol keys.
coder = Dhall::Coder.new(safe: Dhall::Coder::JSON_LIKE + [Symbol, Proc])
CONFIG = coder.load("#{config_expr} : #{SCHEMA}", transform_keys: :to_sym)

# Keys that are not part of SCHEMA, initialised empty.
# NOTE(review): presumably read by the libraries required below -- confirm.
CONFIG[:keep_area_codes_in] = {}
CONFIG[:creds] = {}
64
65require_relative "../lib/async_braintree"
66require_relative "../lib/blather_notify"
67require_relative "../lib/customer_repo"
68require_relative "../lib/expiring_lock"
69require_relative "../lib/low_balance"
70require_relative "../lib/postgres"
71require_relative "../lib/sim_repo"
72require_relative "../lib/transaction"
73
# Process-wide service objects. BRAINTREE is built from the braintree section
# of CONFIG; it is not referenced elsewhere in this script.
# NOTE(review): presumably the required libraries look BRAINTREE up by
# name -- confirm before removing.
BRAINTREE = AsyncBraintree.new(**CONFIG[:braintree])
# Repository used by fetch_customers to look customers up by id.
CUSTOMER_REPO = CustomerRepo.new
# Repository used to list SIMs and to refill them.
SIM_REPO = SIMRepo.new
77
# Decorates a customer object for use inside this job: everything is
# delegated to the wrapped customer except the two methods below.
class JobCustomer < SimpleDelegator
  # Returns the wrapped customer's billing customer, itself wrapped in the
  # same decorator class.
  def billing_customer
    super.then { |billing| self.class.new(billing) }
  end

  # Re-addresses a copy of +stanza+ to this customer at the configured
  # component domain, keeping the resource of the original recipient (if
  # any). The copy is yielded to the block when one is given; otherwise it
  # is pushed onto BlatherNotify. The caller's stanza is left untouched.
  def stanza_to(stanza)
    outgoing = stanza.dup
    outgoing.from = nil # It's a client connection, use default
    resource = outgoing.to&.resource
    outgoing.to = Blather::JID.new(
      "customer_#{customer_id}", CONFIG[:component][:jid]
    ).with(resource: resource)

    return yield(outgoing) if block_given?

    BlatherNotify << outgoing
  end
end
92
# Behaviour shared by the per-SIM action classes in this file (SimTopUp,
# SimWarn, SimAnnual). An action wraps one SIM and, once assigned, the
# customer that owns it.
module SimAction
  attr_accessor :customer

  # sim:: the SIM record this action is about
  # customer:: owning customer; may be assigned later through #customer=
  def initialize(sim, customer: nil)
    @customer = customer
    @sim = sim
  end

  # ICCID of the wrapped SIM.
  def iccid
    @sim.iccid
  end

  # Id of the owning customer.
  def customer_id
    customer.customer_id
  end

  # Price of one refill: five times the configured per-GB price for the
  # customer's currency, with the configured value divided by 100.
  # NOTE(review): the division suggests config prices are in cents -- confirm.
  def refill_price
    per_gb = CONFIG[:sims][customer.currency][:per_gb]
    BigDecimal(per_gb) / 100 * 5
  end

  # Adds +data+ MB to the SIM and, once the provider acknowledges success,
  # records a transaction of -price with the given note, then logs.
  # Raises inside the promise chain when the refill is not acknowledged.
  def refill_and_bill(data, price, note)
    refilled = SIM_REPO.refill(@sim, amount_mb: data)

    billed = refilled.then do |keepgo_tx|
      raise "SIM refill failed: #{iccid}" unless keepgo_tx["ack"] == "success"

      Transaction.new(
        customer_id: customer_id,
        transaction_id: keepgo_tx["transaction_id"],
        amount: -price, note: note
      ).insert
    end

    billed.then { LOG.info "Refilled #{customer.customer_id} #{iccid}" }
  end

  # Promise of the customer's monthly data spending limit: the value stored
  # in Redis for this customer, or one refill's price when nothing is stored.
  def monthly_limit
    key = "jmp_customer_monthly_data_limit-#{customer_id}"
    REDIS.get(key).then { |stored| BigDecimal(stored || refill_price) }
  end

  # Promise of the negated sum of this customer's transactions since the
  # start of the current month whose id matches the LIKE pattern below.
  def amount_spent
    sql = <<~SQL
      SELECT COALESCE(SUM(amount), 0) AS a FROM transactions WHERE
      customer_id=$1 AND transaction_id LIKE 'AB_59576_%' AND
      created_at >= DATE_TRUNC('month', LOCALTIMESTAMP)
    SQL

    DB.query_defer(sql, [customer_id]).then do |rows|
      total = rows.first["a"]
      -total
    end
  end
end
144
# Action for a SIM that is nearly out of data: refill it and bill the
# customer, provided their spending this month is under their limit.
class SimTopUp
  include SimAction

  # Runs the low-balance flow for one refill's price. The result is added to
  # our view of the customer's balance; a positive result restarts #call,
  # anything else is only logged.
  def low_balance
    LowBalance.for(customer, refill_price).then(&:notify!).then do |added|
      @customer = customer.with_balance(customer.balance + added)

      if added.positive?
        call
      else
        LOG.info "Low balance #{customer.customer_id} #{iccid}"
      end
    end
  end

  # Returns a promise. When this month's spending has reached the limit the
  # work is handed to SimWarn; when the balance cannot cover a refill the
  # low-balance flow runs; otherwise 5120 MB are added and billed.
  def call
    EMPromise.all([amount_spent, monthly_limit]).then do |(spent, limit)|
      next SimWarn.new(@sim, customer: customer).call unless spent < limit
      next low_balance if customer.balance < refill_price

      refill_and_bill(5120, refill_price, "5GB Data Topup for #{iccid}")
    end
  end
end
169
# Action for a SIM that is getting low on data: tell the customer how much
# is left when no automatic refill is going to happen.
class SimWarn
  include SimAction

  # Sends the "only has N MB left" message to the customer, inside an
  # ExpiringLock keyed by customer id.
  # NOTE(review): the lock presumably limits how often the warning is
  # sent -- confirm in ExpiringLock.
  def notify
    lock = ExpiringLock.new("jmp_customer_sim_warn-#{customer.customer_id}")
    lock.with do
      warning = Blather::Stanza::Message.new
      remaining_mb = (@sim.remaining_usage_kb / 1024).to_i
      warning.body = "Your SIM #{iccid} only has #{remaining_mb} MB left"
      customer.stanza_to(warning)
    end
  end

  # Returns a promise. Warns and logs when this month's spending has reached
  # the limit, or when the balance is short and auto top-up is off; does
  # nothing otherwise.
  def call
    EMPromise.all([amount_spent, monthly_limit]).then do |(spent, limit)|
      if spent >= limit || low_balance_and_not_auto_top_up
        notify
        LOG.info "Data warning #{customer.customer_id} #{@sim.iccid}"
      end
    end
  end

  # True when the balance cannot cover a refill and the customer has no
  # positive auto top-up amount.
  def low_balance_and_not_auto_top_up
    return false unless customer.balance < refill_price

    !customer.auto_top_up_amount&.positive?
  end
end
195
# Action for a SIM with few days left (decide_sim_actions picks this class
# when remaining_days is below 31): charge the annual fee and refill, or warn
# the customer when they cannot pay.
class SimAnnual
  include SimAction

  # Sends the "only has N days left" message to the customer, inside an
  # ExpiringLock keyed by customer id, then logs the warning.
  # NOTE(review): the lock presumably limits how often the message is
  # sent -- confirm in ExpiringLock.
  def notify
    ExpiringLock.new("jmp_customer_sim_annual-#{customer.customer_id}").with do
      m = Blather::Stanza::Message.new
      m.body = "Your SIM #{iccid} only has #{@sim.remaining_days} days left"
      customer.stanza_to(m)
    end
    LOG.info "Annual warning #{customer.customer_id} #{@sim.iccid}"
  end

  # Annual fee for the customer's currency: the configured value divided
  # by 100.
  # NOTE(review): the division suggests config prices are in cents -- confirm.
  def annual_price
    BigDecimal(CONFIG[:sims][customer.currency][:annual]) / 100
  end

  # Returns a promise. When the balance covers the annual fee, 1024 MB are
  # added and the fee is billed. Otherwise the low-balance flow runs for the
  # fee: a positive result is added to our view of the balance and the
  # action is retried; any other result only sends the warning.
  def call
    if customer.balance >= annual_price
      refill_and_bill(1024, annual_price, "Annual fee for #{iccid}")
    else
      LowBalance.for(customer, annual_price).then(&:notify!).then do |result|
        next notify unless result.positive?

        # Fold the top-up into the balance before retrying, the same way
        # SimTopUp#low_balance does. Without this the retry sees the stale
        # balance, fails the check above again and re-enters the low-balance
        # flow instead of billing.
        @customer = customer.with_balance(customer.balance + result)
        call
      end
    end
  end
end
224
# Looks up every distinct, non-nil id in +cids+ and returns a promise of a
# Hash mapping customer_id to a JobCustomer. Ids the repository reports as
# not found are left out of the result.
def fetch_customers(cids)
  # This is a gross N+1 against the DB, but it also does a bunch of Redis
  # work. We expect the set to be very small for the foreseeable future,
  # hundreds at most.
  lookups = cids.to_a.compact.uniq.map do |id|
    CUSTOMER_REPO.find(id).catch_only(CustomerRepo::NotFound) { nil }
  end

  EMPromise.all(lookups).then do |customers|
    customers.compact.to_h { |c| [c.customer_id, JobCustomer.new(c)] }
  end
end
237
# Selects the owning customer_id for each of the given ICCIDs.
SIM_QUERY = "SELECT iccid, customer_id FROM sims WHERE iccid = ANY ($1)"

# Takes a Hash of iccid => action and assigns each action its customer,
# in place. Returns a promise of the same Hash. Rows whose iccid is not in
# the Hash are skipped; an action whose customer could not be fetched is
# assigned nil.
def load_customers!(sims)
  DB.query_defer(SIM_QUERY, [sims.keys]).then do |rows|
    customer_ids = rows.map { |row| row["customer_id"] }

    fetch_customers(customer_ids).then do |customers|
      rows.each do |row|
        action = sims[row["iccid"]]
        next if action.nil?

        action.customer = customers[row["customer_id"]]
      end

      sims
    end
  end
end
250
# Picks the action, if any, each SIM needs and returns a Hash of
# iccid => action. Checked in order: under 31 days remaining -> SimAnnual;
# under 100000 KB remaining -> SimTopUp; under 250000 KB remaining ->
# SimWarn. SIMs matching none of these are left out.
def decide_sim_actions(sims)
  actions = {}

  sims.each do |sim|
    action_class =
      if sim.remaining_days < 31
        SimAnnual
      elsif sim.remaining_usage_kb < 100_000
        SimTopUp
      elsif sim.remaining_usage_kb < 250_000
        SimWarn
      end
    next unless action_class

    actions[sim.iccid] = action_class.new(sim)
  end

  actions
end
262
# Entry point: connect to Redis, Postgres and XMPP, work out which SIMs need
# an action, run every action whose customer has a currency, then stop the
# reactor. Failures anywhere in the chain are logged and reported to Sentry.
EM.run do
  REDIS = EM::Hiredis.connect
  DB = Postgres.connect(dbname: "jmp")

  xmpp = CONFIG[:xmpp]
  job = BlatherNotify.start(xmpp[:jid], xmpp[:password])
    .then { SIM_REPO.all }
    .then { |sims| load_customers!(decide_sim_actions(sims)) }
    .then { |actions|
      # Skip actions with no customer attached or no currency on it.
      runnable = actions.values.select { |action| action.customer&.currency }
      EMPromise.all(runnable.map(&:call))
    }

  job.catch { |e|
    LOG.error e

    # The rejection value is not necessarily an exception object, so
    # anything else is reported to Sentry as a plain message.
    if e.is_a?(::Exception)
      Sentry.capture_exception(e)
    else
      Sentry.capture_message(e.to_s)
    end
  }.then { EM.stop }
end