sim_job

  1#!/usr/bin/ruby
  2# frozen_string_literal: true
  3
  4require "bigdecimal"
  5require "pg/em/connection_pool"
  6require "eventmachine"
  7require "em_promise"
  8require "em-hiredis"
  9require "dhall"
 10require "ougai"
 11require "sentry-ruby"
 12
 13$stdout.sync = true
 14LOG = Ougai::Logger.new($stdout)
 15LOG.level = ENV.fetch("LOG_LEVEL", "info")
 16LOG.formatter = Ougai::Formatters::Readable.new(
 17	nil,
 18	nil,
 19	plain: !$stdout.isatty
 20)
 21
 22def log
 23	LOG
 24end
 25
 26Sentry.init do |config|
 27	config.background_worker_threads = 0
 28end
 29
# Dhall type that the config expression given on the command line is
# annotated with before being loaded (see the CONFIG assignment), so a config
# that does not match this record type is rejected at load time.
# Sections: XMPP client credentials, the component JID, optional KeepGo API
# credentials, per-currency SIM prices, the plan list and Braintree settings.
SCHEMA = "{
	xmpp: { jid: Text, password: Text },
	component: { jid: Text },
	keepgo: Optional { access_token: Text, api_key: Text },
	sims: {
		CAD: { per_gb: Natural, annual: Natural },
		USD: { per_gb: Natural, annual: Natural }
	},
	plans: List {
		currency: < CAD | USD >,
		messages: < limited: { included: Natural, price: Natural } | unlimited >,
		minutes: < limited: { included: Natural, price: Natural } | unlimited >,
		monthly_price : Natural,
		name : Text,
		allow_register: Bool,
		subaccount_discount: Natural
	},
	braintree: {
		environment : Text,
		merchant_accounts : { CAD : Text, USD : Text },
		merchant_id : Text,
		private_key : Text,
		public_key : Text
	}
}"
 55
 56raise "Need a Dhall config" unless ARGV[0]
 57
 58CONFIG = Dhall::Coder
 59	.new(safe: Dhall::Coder::JSON_LIKE + [Symbol, Proc])
 60	.load("#{ARGV.first} : #{SCHEMA}", transform_keys: :to_sym)
 61
 62CONFIG[:keep_area_codes_in] = {}
 63CONFIG[:creds] = {}
 64
 65require_relative "../lib/async_braintree"
 66require_relative "../lib/blather_notify"
 67require_relative "../lib/customer_repo"
 68require_relative "../lib/low_balance"
 69require_relative "../lib/postgres"
 70require_relative "../lib/sim_repo"
 71require_relative "../lib/transaction"
 72
# Shared service objects used by the job and by the required lib/ code.
# Braintree client built from the `braintree` section of CONFIG.
BRAINTREE = AsyncBraintree.new(**CONFIG[:braintree])
# Repository used by fetch_customers to look customers up by id.
CUSTOMER_REPO = CustomerRepo.new
# Repository used to list SIMs (`all`) and to refill them (`refill`).
SIM_REPO = SIMRepo.new
 76
 77class JobCustomer < SimpleDelegator
 78	def billing_customer
 79		super.then(&self.class.method(:new))
 80	end
 81
 82	def stanza_to(stanza)
 83		stanza = stanza.dup
 84		stanza.from = nil # It's a client connection, use default
 85		stanza.to = Blather::JID.new(
 86			"customer_#{customer_id}", CONFIG[:component][:jid]
 87		).with(resource: stanza.to&.resource)
 88		block_given? ? yield(stanza) : (BlatherNotify << stanza)
 89	end
 90end
 91
 92module SimAction
 93	attr_accessor :customer
 94
 95	def initialize(sim, customer: nil)
 96		@sim = sim
 97		@customer = customer
 98	end
 99
100	def iccid
101		@sim.iccid
102	end
103
104	def customer_id
105		customer.customer_id
106	end
107
108	def refill_price
109		(BigDecimal(CONFIG[:sims][customer.currency][:per_gb]) / 100) * 5
110	end
111
112	def refill_and_bill(data, price, note)
113		SIM_REPO.refill(@sim, amount_mb: data).then { |keepgo_tx|
114			raise "SIM refill failed: #{iccid}" unless keepgo_tx["ack"] == "success"
115
116			Transaction.new(
117				customer_id: customer_id,
118				transaction_id: keepgo_tx["transaction_id"],
119				amount: -price, note: note
120			).insert
121		}.then do
122			LOG.info "Refilled #{customer.customer_id} #{iccid}"
123		end
124	end
125
126	def monthly_limit
127		REDIS.get(
128			"jmp_customer_monthly_data_limit-#{customer_id}"
129		).then do |limit|
130			BigDecimal(limit || refill_price)
131		end
132	end
133
134	def amount_spent
135		promise = DB.query_defer(<<~SQL, [customer_id])
136			SELECT COALESCE(SUM(amount), 0) AS a FROM transactions WHERE
137			customer_id=$1 AND transaction_id LIKE 'AB_59576_%' AND
138			created_at >= DATE_TRUNC('month', LOCALTIMESTAMP)
139		SQL
140		promise.then { |rows| -rows.first["a"] }
141	end
142end
143
# Action for a SIM that is running out of data: buys another 5 GB unless the
# customer has reached their monthly limit or cannot pay for it.
class SimTopUp
	include SimAction

	# Handles a balance that cannot cover a refill: runs the low-balance flow
	# for the refill price, adds whatever it returns to the in-memory balance,
	# and retries the top-up if anything was added. Otherwise only logs.
	#
	# @return promise for the retried call, or for the log call's result
	def low_balance
		top_up = LowBalance.for(customer, refill_price).then(&:notify!)
		top_up.then do |added|
			@customer = customer.with_balance(customer.balance + added)
			if added.positive?
				call
			else
				LOG.info "Low balance #{customer.customer_id} #{iccid}"
			end
		end
	end

	# Decides what to do with the SIM based on this month's spend:
	# * monthly limit reached -> fall back to a SimWarn for the same SIM
	# * balance below the refill price -> low_balance
	# * otherwise -> refill 5120 MB and bill the refill price
	#
	# @return promise for the chosen action
	def call
		EMPromise.all([amount_spent, monthly_limit]).then do |(spent, limit)|
			next SimWarn.new(@sim, customer: customer).call unless spent < limit

			if customer.balance < refill_price
				low_balance
			else
				refill_and_bill(5120, refill_price, "5GB Data Topup for #{iccid}")
			end
		end
	end
end
168
# Action for a SIM that is getting low on data but will not be topped up
# automatically: tells the customer how much data is left.
class SimWarn
	include SimAction

	# Sends the customer a message with the remaining data, computed as the
	# SIM's remaining_usage_kb divided by 1024 and truncated to an integer.
	def notify
		megabytes_left = (@sim.remaining_usage_kb / 1024).to_i
		message = Blather::Stanza::Message.new
		message.body = "Your SIM #{iccid} only has #{megabytes_left} MB left"
		customer.stanza_to(message)
	end

	# Warns only when no automatic top-up is going to happen: either the
	# monthly limit has been reached, or the balance is too low and the
	# customer has no positive auto top-up amount.
	#
	# @return promise resolving to nil when no warning was sent
	def call
		EMPromise.all([amount_spent, monthly_limit]).then do |(spent, limit)|
			if spent >= limit || low_balance_and_not_auto_top_up
				notify
				LOG.info "Data warning #{customer.customer_id} #{@sim.iccid}"
			end
		end
	end

	# @return [Boolean] true when the balance cannot cover a refill and the
	#   customer's auto top-up amount is missing or not positive
	def low_balance_and_not_auto_top_up
		return false unless customer.balance < refill_price

		!customer.auto_top_up_amount&.positive?
	end
end
192
# Action for a SIM that is close to expiry: bills the annual fee and extends
# the SIM, or warns the customer when the fee cannot be paid.
class SimAnnual
	include SimAction

	# Tells the customer how many days the SIM has left.
	def notify
		m = Blather::Stanza::Message.new
		m.body = "Your SIM #{iccid} only has #{@sim.remaining_days} days left"
		customer.stanza_to(m)
	end

	# Annual fee for the customer's currency: the configured value divided by
	# 100 (presumably cents to currency units -- confirm).
	#
	# @return [BigDecimal]
	def annual_price
		BigDecimal(CONFIG[:sims][customer.currency][:annual]) / 100
	end

	# Bills the annual fee (refilling 1024 MB) when the balance covers it.
	# Otherwise runs the low-balance flow for the fee; if that added funds,
	# the in-memory balance is refreshed and the action retried, else the
	# customer is warned about the remaining days.
	#
	# @return promise for the billing, the retry, or the warning
	def call
		if customer.balance >= annual_price
			refill_and_bill(1024, annual_price, "Annual fee for #{iccid}")
		else
			LowBalance.for(customer, annual_price).then(&:notify!).then do |result|
				if result.positive?
					# Same as SimTopUp#low_balance: account for the added funds
					# before retrying. With the stale balance the retry would
					# take this low-balance branch again and never bill.
					self.customer = customer.with_balance(customer.balance + result)
					next call
				end

				notify
			end
		end
	end
end
218
# Looks up every distinct, non-nil customer id and returns the customers that
# exist, each wrapped in a JobCustomer and keyed by customer_id.
#
# Ids for which the repository fails with CustomerRepo::NotFound are left out
# of the result instead of failing the whole batch.
#
# @param cids [Enumerable] customer ids; may contain nils and duplicates
# @return promise resolving to a Hash of customer_id => JobCustomer
def fetch_customers(cids)
	# This is gross N+1 against the DB, but also does a bunch of Redis work
	# We expect the set to be very small for the foreseeable future,
	# hundreds at most
	lookups = cids.to_a.compact.uniq.map { |id|
		CUSTOMER_REPO.find(id).catch_only(CustomerRepo::NotFound) { nil }
	}

	EMPromise.all(lookups).then do |customers|
		# Not-found lookups resolve to nil (see above); drop them before
		# calling customer_id on each entry.
		customers.compact.each_with_object({}) do |c, by_id|
			by_id[c.customer_id] = JobCustomer.new(c)
		end
	end
end
231
# Finds the owning customer id for each of a set of ICCIDs.
SIM_QUERY = "SELECT iccid, customer_id FROM sims WHERE iccid = ANY ($1)"

# Attaches the owning customer to each action in +sims+ (mutating them).
#
# Actions whose ICCID has no row, no customer id, or no customer found for
# that id keep a nil customer.
#
# @param sims [Hash] ICCID => action object responding to customer=
# @return promise resolving to the same +sims+ hash
def load_customers!(sims)
	DB.query_defer(SIM_QUERY, [sims.keys]).then do |rows|
		owner_ids = rows.map { |row| row["customer_id"] }

		fetch_customers(owner_ids).then do |customers|
			rows.each do |row|
				action = sims[row["iccid"]]
				next if action.nil?

				action.customer = customers[row["customer_id"]]
			end

			sims
		end
	end
end
244
# Picks the action, if any, that each SIM needs. The first matching rule wins:
# * fewer than 31 days remaining -> SimAnnual
# * less than 100000 KB of data remaining -> SimTopUp
# * less than 250000 KB of data remaining -> SimWarn
# SIMs matching none of the rules are left out of the result.
#
# @param sims [Enumerable] SIM records responding to iccid, remaining_days
#   and remaining_usage_kb
# @return [Hash] ICCID => action instance (customer not yet assigned)
def decide_sim_actions(sims)
	actions = {}

	sims.each do |sim|
		action_class =
			if sim.remaining_days < 31
				SimAnnual
			elsif sim.remaining_usage_kb < 100000
				SimTopUp
			elsif sim.remaining_usage_kb < 250000
				SimWarn
			end
		next if action_class.nil?

		actions[sim.iccid] = action_class.new(sim)
	end

	actions
end
256
# Main entry point: inside the EventMachine reactor, connect to Redis,
# Postgres and XMPP, work out which SIMs need an action, attach their owners,
# run every action that has an owner, then stop the reactor.
EM.run do
	REDIS = EM::Hiredis.connect
	DB = Postgres.connect(dbname: "jmp")

	BlatherNotify
		.start(CONFIG[:xmpp][:jid], CONFIG[:xmpp][:password])
		.then { SIM_REPO.all }
		.then { |sims| load_customers!(decide_sim_actions(sims)) }
		.then { |actions|
			# Actions for SIMs with no known owner cannot bill or notify anyone.
			runnable = actions.values.select(&:customer)
			EMPromise.all(runnable.map(&:call))
		}
		.catch { |failure|
			LOG.error failure

			# Rejections are not necessarily exceptions; report whatever we got.
			if failure.is_a?(::Exception)
				Sentry.capture_exception(failure)
			else
				Sentry.capture_message(failure.to_s)
			end
		}
		.then { EM.stop } # reached on success and after a handled failure
end