1#!/usr/bin/ruby
2# frozen_string_literal: true
3
4require "bigdecimal"
5require "pg/em/connection_pool"
6require "eventmachine"
7require "em_promise"
8require "em-hiredis"
9require "dhall"
10require "ougai"
11require "sentry-ruby"
12
13$stdout.sync = true
14LOG = Ougai::Logger.new($stdout)
15LOG.level = ENV.fetch("LOG_LEVEL", "info")
16LOG.formatter = Ougai::Formatters::Readable.new(
17 nil,
18 nil,
19 plain: !$stdout.isatty
20)
21
22def log
23 LOG
24end
25
26Sentry.init do |config|
27 config.background_worker_threads = 0
28end
29
30SCHEMA = "{
31 xmpp: { jid: Text, password: Text },
32 component: { jid: Text },
33 keepgo: Optional { access_token: Text, api_key: Text },
34 sims: {
35 CAD: { per_gb: Natural, annual: Natural },
36 USD: { per_gb: Natural, annual: Natural }
37 },
38 plans: List {
39 currency: < CAD | USD >,
40 messages: < limited: { included: Natural, price: Natural } | unlimited >,
41 minutes: < limited: { included: Natural, price: Natural } | unlimited >,
42 monthly_price : Natural,
43 name : Text,
44 allow_register: Bool,
45 subaccount_discount: Natural
46 },
47 braintree: {
48 environment : Text,
49 merchant_accounts : { CAD : Text, USD : Text },
50 merchant_id : Text,
51 private_key : Text,
52 public_key : Text
53 }
54}"
55
56raise "Need a Dhall config" unless ARGV[0]
57
58CONFIG = Dhall::Coder
59 .new(safe: Dhall::Coder::JSON_LIKE + [Symbol, Proc])
60 .load("#{ARGV.first} : #{SCHEMA}", transform_keys: :to_sym)
61
62CONFIG[:keep_area_codes_in] = {}
63CONFIG[:creds] = {}
64
65require_relative "../lib/async_braintree"
66require_relative "../lib/blather_notify"
67require_relative "../lib/customer_repo"
68require_relative "../lib/low_balance"
69require_relative "../lib/postgres"
70require_relative "../lib/sim_repo"
71require_relative "../lib/transaction"
72
73BRAINTREE = AsyncBraintree.new(**CONFIG[:braintree])
74CUSTOMER_REPO = CustomerRepo.new
75SIM_REPO = SIMRepo.new
76
77class JobCustomer < SimpleDelegator
78 def billing_customer
79 super.then(&self.class.method(:new))
80 end
81
82 def stanza_to(stanza)
83 stanza = stanza.dup
84 stanza.from = nil # It's a client connection, use default
85 stanza.to = Blather::JID.new(
86 "customer_#{customer_id}", CONFIG[:component][:jid]
87 ).with(resource: stanza.to&.resource)
88 block_given? ? yield(stanza) : (BlatherNotify << stanza)
89 end
90end
91
92module SimAction
93 attr_accessor :customer
94
95 def initialize(sim, customer: nil)
96 @sim = sim
97 @customer = customer
98 end
99
100 def iccid
101 @sim.iccid
102 end
103
104 def customer_id
105 customer.customer_id
106 end
107
108 def refill_price
109 (BigDecimal(CONFIG[:sims][customer.currency][:per_gb]) / 100) * 5
110 end
111
112 def refill_and_bill(data, price, note)
113 SIM_REPO.refill(@sim, amount_mb: data).then { |keepgo_tx|
114 raise "SIM refill failed: #{iccid}" unless keepgo_tx["ack"] == "success"
115
116 Transaction.new(
117 customer_id: customer_id,
118 transaction_id: keepgo_tx["transaction_id"],
119 amount: -price, note: note
120 ).insert
121 }.then do
122 LOG.info "Refilled #{customer.customer_id} #{iccid}"
123 end
124 end
125
126 def monthly_limit
127 REDIS.get(
128 "jmp_customer_monthly_data_limit-#{customer_id}"
129 ).then do |limit|
130 BigDecimal(limit || refill_price)
131 end
132 end
133
134 def amount_spent
135 promise = DB.query_defer(<<~SQL, [customer_id])
136 SELECT COALESCE(SUM(amount), 0) AS a FROM transactions WHERE
137 customer_id=$1 AND transaction_id LIKE 'AB_59576_%' AND
138 created_at >= DATE_TRUNC('month', LOCALTIMESTAMP)
139 SQL
140 promise.then { |rows| -rows.first["a"] }
141 end
142end
143
144class SimTopUp
145 include SimAction
146
147 def low_balance
148 LowBalance.for(customer, refill_price).then(&:notify!).then do |result|
149 @customer = customer.with_balance(customer.balance + result)
150 next call if result.positive?
151
152 LOG.info "Low balance #{customer.customer_id} #{iccid}"
153 end
154 end
155
156 def call
157 EMPromise.all([amount_spent, monthly_limit]).then do |(spent, limit)|
158 if spent < limit
159 next low_balance if customer.balance < refill_price
160
161 refill_and_bill(5120, refill_price, "5GB Data Topup for #{iccid}")
162 else
163 SimWarn.new(@sim, customer: customer).call
164 end
165 end
166 end
167end
168
169class SimWarn
170 include SimAction
171
172 def notify
173 m = Blather::Stanza::Message.new
174 m.body = "Your SIM #{iccid} only has " \
175 "#{(@sim.remaining_usage_kb / 1024).to_i} MB left"
176 customer.stanza_to(m)
177 end
178
179 def call
180 EMPromise.all([amount_spent, monthly_limit]).then do |(spent, limit)|
181 next unless spent >= limit || low_balance_and_not_auto_top_up
182
183 notify
184 LOG.info "Data warning #{customer.customer_id} #{@sim.iccid}"
185 end
186 end
187
188 def low_balance_and_not_auto_top_up
189 customer.balance < refill_price && !customer.auto_top_up_amount&.positive?
190 end
191end
192
193class SimAnnual
194 include SimAction
195
196 def notify
197 m = Blather::Stanza::Message.new
198 m.body = "Your SIM #{iccid} only has #{@sim.remaining_days} days left"
199 customer.stanza_to(m)
200 end
201
202 def annual_price
203 BigDecimal(CONFIG[:sims][customer.currency][:annual]) / 100
204 end
205
206 def call
207 if customer.balance >= annual_price
208 refill_and_bill(1024, annual_price, "Annual fee for #{iccid}")
209 else
210 LowBalance.for(customer, annual_price).then(&:notify!).then do |result|
211 next call if result.positive?
212
213 notify
214 end
215 end
216 end
217end
218
219def fetch_customers(cids)
220 # This is gross N+1 against the DB, but also does a buch of Redis work
221 # We expect the set to be very small for the forseeable future,
222 # hundreds at most
223 EMPromise.all(
224 Set.new(cids).to_a.compact.map { |id|
225 CUSTOMER_REPO.find(id).catch_only(CustomerRepo::NotFound) { nil }
226 }
227 ).then do |customers|
228 Hash[customers.map { |c| [c.customer_id, JobCustomer.new(c)] }]
229 end
230end
231
232SIM_QUERY = "SELECT iccid, customer_id FROM sims WHERE iccid = ANY ($1)"
233def load_customers!(sims)
234 DB.query_defer(SIM_QUERY, [sims.keys]).then { |rows|
235 fetch_customers(rows.map { |row| row["customer_id"] }).then do |customers|
236 rows.each do |row|
237 sims[row["iccid"]]&.customer = customers[row["customer_id"]]
238 end
239
240 sims
241 end
242 }
243end
244
245def decide_sim_actions(sims)
246 sims.each_with_object({}) { |sim, h|
247 if sim.remaining_days < 31
248 h[sim.iccid] = SimAnnual.new(sim)
249 elsif sim.remaining_usage_kb < 100000
250 h[sim.iccid] = SimTopUp.new(sim)
251 elsif sim.remaining_usage_kb < 250000
252 h[sim.iccid] = SimWarn.new(sim)
253 end
254 }.compact
255end
256
257EM.run do
258 REDIS = EM::Hiredis.connect
259 DB = Postgres.connect(dbname: "jmp")
260
261 BlatherNotify.start(
262 CONFIG[:xmpp][:jid],
263 CONFIG[:xmpp][:password]
264 ).then { SIM_REPO.all }.then { |sims|
265 load_customers!(decide_sim_actions(sims))
266 }.then { |items|
267 items = items.values.select(&:customer)
268 EMPromise.all(items.map(&:call))
269 }.catch { |e|
270 LOG.error e
271
272 if e.is_a?(::Exception)
273 Sentry.capture_exception(e)
274 else
275 Sentry.capture_message(e.to_s)
276 end
277 }.then { EM.stop }
278end