sim_job

  1#!/usr/bin/ruby
  2# frozen_string_literal: true
  3
  4require "bigdecimal"
  5require "pg/em/connection_pool"
  6require "eventmachine"
  7require "em_promise"
  8require "em-hiredis"
  9require "dhall"
 10require "ougai"
 11require "sentry-ruby"
 12
 13$stdout.sync = true
 14LOG = Ougai::Logger.new($stdout)
 15LOG.level = ENV.fetch("LOG_LEVEL", "info")
 16LOG.formatter = Ougai::Formatters::Readable.new(
 17	nil,
 18	nil,
 19	plain: !$stdout.isatty
 20)
 21
 22def log
 23	LOG
 24end
 25
 26Sentry.init do |config|
 27	config.background_worker_threads = 0
 28end
 29
 30SCHEMA = "{
 31	xmpp: { jid: Text, password: Text },
 32	component: { jid: Text },
 33	keepgo: Optional { access_token: Text, api_key: Text },
 34	sims: {
 35		CAD: { per_gb: Natural, annual: Natural },
 36		USD: { per_gb: Natural, annual: Natural }
 37	},
 38	plans: List {
 39		currency: < CAD | USD >,
 40		messages: < limited: { included: Natural, price: Natural } | unlimited >,
 41		minutes: < limited: { included: Natural, price: Natural } | unlimited >,
 42		monthly_price : Natural,
 43		name : Text,
 44		allow_register: Bool,
 45		subaccount_discount: Natural
 46	},
 47	braintree: {
 48		environment : Text,
 49		merchant_accounts : { CAD : Text, USD : Text },
 50		merchant_id : Text,
 51		private_key : Text,
 52		public_key : Text
 53	}
 54}"
 55
 56raise "Need a Dhall config" unless ARGV[0]
 57
 58CONFIG = Dhall::Coder
 59	.new(safe: Dhall::Coder::JSON_LIKE + [Symbol, Proc])
 60	.load("#{ARGV.first} : #{SCHEMA}", transform_keys: :to_sym)
 61
 62CONFIG[:keep_area_codes_in] = {}
 63CONFIG[:creds] = {}
 64
 65require_relative "../lib/async_braintree"
 66require_relative "../lib/blather_notify"
 67require_relative "../lib/customer_repo"
 68require_relative "../lib/expiring_lock"
 69require_relative "../lib/low_balance"
 70require_relative "../lib/postgres"
 71require_relative "../lib/sim_repo"
 72require_relative "../lib/transaction"
 73
 74BRAINTREE = AsyncBraintree.new(**CONFIG[:braintree])
 75CUSTOMER_REPO = CustomerRepo.new
 76SIM_REPO = SIMRepo.new
 77
 78class JobCustomer < SimpleDelegator
 79	def billing_customer
 80		super.then(&self.class.method(:new))
 81	end
 82
 83	def stanza_to(stanza)
 84		stanza = stanza.dup
 85		stanza.from = nil # It's a client connection, use default
 86		stanza.to = Blather::JID.new(
 87			"customer_#{customer_id}", CONFIG[:component][:jid]
 88		).with(resource: stanza.to&.resource)
 89		block_given? ? yield(stanza) : (BlatherNotify << stanza)
 90	end
 91end
 92
 93module SimAction
 94	attr_accessor :customer
 95
 96	def initialize(sim, customer: nil)
 97		@sim = sim
 98		@customer = customer
 99	end
100
101	def iccid
102		@sim.iccid
103	end
104
105	def customer_id
106		customer.customer_id
107	end
108
109	def refill_price
110		(BigDecimal(CONFIG[:sims][customer.currency][:per_gb]) / 100) * 5
111	end
112
113	def refill_and_bill(data, price, note)
114		SIM_REPO.refill(@sim, amount_mb: data).then { |keepgo_tx|
115			raise "SIM refill failed: #{iccid}" unless keepgo_tx["ack"] == "success"
116
117			Transaction.new(
118				customer_id: customer_id,
119				transaction_id: keepgo_tx["transaction_id"],
120				amount: -price, note: note
121			).insert
122		}.then do
123			LOG.info "Refilled #{customer.customer_id} #{iccid}"
124		end
125	end
126
127	def monthly_limit
128		REDIS.get(
129			"jmp_customer_monthly_data_limit-#{customer_id}"
130		).then do |limit|
131			BigDecimal(limit || refill_price)
132		end
133	end
134
135	def amount_spent
136		promise = DB.query_defer(<<~SQL, [customer_id])
137			SELECT COALESCE(SUM(amount), 0) AS a FROM transactions WHERE
138			customer_id=$1 AND transaction_id LIKE 'AB_59576_%' AND
139			created_at >= DATE_TRUNC('month', LOCALTIMESTAMP)
140		SQL
141		promise.then { |rows| -rows.first["a"] }
142	end
143end
144
145class SimTopUp
146	include SimAction
147
148	def low_balance
149		LowBalance.for(customer, refill_price).then(&:notify!).then do |result|
150			@customer = customer.with_balance(customer.balance + result)
151			next call if result.positive?
152
153			LOG.info "Low balance #{customer.customer_id} #{iccid}"
154		end
155	end
156
157	def call
158		EMPromise.all([amount_spent, monthly_limit]).then do |(spent, limit)|
159			if spent < limit
160				next low_balance if customer.balance < refill_price
161
162				refill_and_bill(5120, refill_price, "5GB Data Topup for #{iccid}")
163			else
164				SimWarn.new(@sim, customer: customer).call
165			end
166		end
167	end
168end
169
170class SimWarn
171	include SimAction
172
173	def notify
174		ExpiringLock.new("jmp_customer_sim_warn-#{customer.customer_id}").with do
175			m = Blather::Stanza::Message.new
176			m.body = "Your SIM #{iccid} only has " \
177				"#{(@sim.remaining_usage_kb / 1024).to_i} MB left"
178			customer.stanza_to(m)
179		end
180	end
181
182	def call
183		EMPromise.all([amount_spent, monthly_limit]).then do |(spent, limit)|
184			next unless spent >= limit || low_balance_and_not_auto_top_up
185
186			notify
187			LOG.info "Data warning #{customer.customer_id} #{@sim.iccid}"
188		end
189	end
190
191	def low_balance_and_not_auto_top_up
192		customer.balance < refill_price && !customer.auto_top_up_amount&.positive?
193	end
194end
195
196class SimAnnual
197	include SimAction
198
199	def notify
200		ExpiringLock.new("jmp_customer_sim_annual-#{customer.customer_id}").with do
201			m = Blather::Stanza::Message.new
202			m.body = "Your SIM #{iccid} only has #{@sim.remaining_days} days left"
203			customer.stanza_to(m)
204		end
205		LOG.info "Annual warning #{customer.customer_id} #{@sim.iccid}"
206	end
207
208	def annual_price
209		BigDecimal(CONFIG[:sims][customer.currency][:annual]) / 100
210	end
211
212	def call
213		if customer.balance >= annual_price
214			refill_and_bill(1024, annual_price, "Annual fee for #{iccid}")
215		else
216			LowBalance.for(customer, annual_price).then(&:notify!).then do |result|
217				next call if result.positive?
218
219				notify
220			end
221		end
222	end
223end
224
225def fetch_customers(cids)
226	# This is gross N+1 against the DB, but also does a buch of Redis work
227	# We expect the set to be very small for the forseeable future,
228	# hundreds at most
229	EMPromise.all(
230		Set.new(cids).to_a.compact.map { |id|
231			CUSTOMER_REPO.find(id).catch_only(CustomerRepo::NotFound) { nil }
232		}
233	).then do |customers|
234		Hash[customers.compact.map { |c| [c.customer_id, JobCustomer.new(c)] }]
235	end
236end
237
238SIM_QUERY = "SELECT iccid, customer_id FROM sims WHERE iccid = ANY ($1)"
239def load_customers!(sims)
240	DB.query_defer(SIM_QUERY, [sims.keys]).then { |rows|
241		fetch_customers(rows.map { |row| row["customer_id"] }).then do |customers|
242			rows.each do |row|
243				sims[row["iccid"]]&.customer = customers[row["customer_id"]]
244			end
245
246			sims
247		end
248	}
249end
250
251def decide_sim_actions(sims)
252	sims.each_with_object({}) { |sim, h|
253		if sim.remaining_days < 31
254			h[sim.iccid] = SimAnnual.new(sim)
255		elsif sim.remaining_usage_kb < 100000
256			h[sim.iccid] = SimTopUp.new(sim)
257		elsif sim.remaining_usage_kb < 250000
258			h[sim.iccid] = SimWarn.new(sim)
259		end
260	}.compact
261end
262
263EM.run do
264	REDIS = EM::Hiredis.connect
265	DB = Postgres.connect(dbname: "jmp")
266
267	BlatherNotify.start(
268		CONFIG[:xmpp][:jid],
269		CONFIG[:xmpp][:password]
270	).then { SIM_REPO.all }.then { |sims|
271		load_customers!(decide_sim_actions(sims))
272	}.then { |items|
273		items = items.values.select { |item| item.customer&.currency }
274		EMPromise.all(items.map(&:call))
275	}.catch { |e|
276		LOG.error e
277
278		if e.is_a?(::Exception)
279			Sentry.capture_exception(e)
280		else
281			Sentry.capture_message(e.to_s)
282		end
283	}.then { EM.stop }
284end