#!/usr/bin/ruby
# frozen_string_literal: true

# Batch job: loads every SIM from SIM_REPO, decides whether each one needs
# its annual renewal, a data top-up, or just a low-data warning, attaches the
# owning customer, and runs the chosen action.  Stops the reactor when done.
#
# Usage: pass a Dhall config expression (checked against SCHEMA) as ARGV[0].

require "bigdecimal"
require "delegate"
require "pg/em/connection_pool"
require "eventmachine"
require "em_promise"
require "em-hiredis"
require "dhall"
require "ougai"
require "sentry-ruby"

$stdout.sync = true
LOG = Ougai::Logger.new($stdout)
LOG.level = ENV.fetch("LOG_LEVEL", "info")
LOG.formatter = Ougai::Formatters::Readable.new(
  nil,
  nil,
  plain: !$stdout.isatty
)

# Global accessor for the shared logger.
def log
  LOG
end

Sentry.init do |config|
  config.background_worker_threads = 0
end

# Dhall type the config in ARGV[0] is annotated with.
# Built from adjacent literals purely for line length.
SCHEMA = "{ " \
         "xmpp: { jid: Text, password: Text }, " \
         "component: { jid: Text }, " \
         "keepgo: Optional { access_token: Text, api_key: Text }, " \
         "sims: { " \
         "CAD: { per_gb: Natural, annual: Natural }, " \
         "USD: { per_gb: Natural, annual: Natural } " \
         "}, " \
         "plans: List { " \
         "currency: < CAD | USD >, " \
         "messages: < limited: { included: Natural, price: Natural } | unlimited >, " \
         "minutes: < limited: { included: Natural, price: Natural } | unlimited >, " \
         "monthly_price : Natural, " \
         "name : Text, " \
         "allow_register: Bool, " \
         "subaccount_discount: Natural " \
         "}, " \
         "braintree: { " \
         "environment : Text, " \
         "merchant_accounts : { CAD : Text, USD : Text }, " \
         "merchant_id : Text, " \
         "private_key : Text, " \
         "public_key : Text " \
         "} " \
         "}"

raise "Need a Dhall config" unless ARGV[0]

CONFIG =
  Dhall::Coder
  .new(safe: Dhall::Coder::JSON_LIKE + [Symbol, Proc])
  .load("#{ARGV.first} : #{SCHEMA}", transform_keys: :to_sym)

# Keys not present in SCHEMA; set to empty hashes here.
# NOTE(review): presumably read by the libraries required below -- confirm.
CONFIG[:keep_area_codes_in] = {}
CONFIG[:creds] = {}

require_relative "../lib/async_braintree"
require_relative "../lib/blather_notify"
require_relative "../lib/customer_repo"
require_relative "../lib/expiring_lock"
require_relative "../lib/low_balance"
require_relative "../lib/postgres"
require_relative "../lib/sim_repo"
require_relative "../lib/transaction"

BRAINTREE = AsyncBraintree.new(**CONFIG[:braintree])
CUSTOMER_REPO = CustomerRepo.new
SIM_REPO = SIMRepo.new

# Wraps a customer so that stanzas are addressed through the component JID
# and sent over this job's BlatherNotify connection.
class JobCustomer < SimpleDelegator
  # Resolves the wrapped customer's billing customer and re-wraps it in the
  # same class, so the result keeps the overrides below.
  def billing_customer
    super.then(&self.class.method(:new))
  end

  # Addresses a copy of +stanza+ to "customer_<id>@<component jid>", keeping
  # any resource the original recipient had.  Yields the copy when a block is
  # given; otherwise pushes it to BlatherNotify.
  def stanza_to(stanza)
    stanza = stanza.dup
    stanza.from = nil # It's a client connection, use default
    stanza.to = Blather::JID.new(
      "customer_#{customer_id}", CONFIG[:component][:jid]
    ).with(resource: stanza.to&.resource)
    block_given? ? yield(stanza) : (BlatherNotify << stanza)
  end
end

# Behaviour shared by every per-SIM action (SimTopUp, SimWarn, SimAnnual).
# Includers get a constructor taking the SIM and, optionally, its customer;
# the customer may also be assigned later via #customer=.
module SimAction
  attr_accessor :customer

  def initialize(sim, customer: nil)
    @sim = sim
    @customer = customer
  end

  def iccid
    @sim.iccid
  end

  def customer_id
    customer.customer_id
  end

  # Price of one refill: five times the configured per_gb value, divided by
  # 100 (NOTE(review): config values look like cents -- confirm).
  def refill_price
    (BigDecimal(CONFIG[:sims][customer.currency][:per_gb]) / 100) * 5
  end

  # Refills the SIM by +data+ MB, then records a transaction for -price with
  # the given +note+.  Raises when the refill response's "ack" is not
  # "success", so no transaction is inserted for a failed refill.
  def refill_and_bill(data, price, note)
    SIM_REPO.refill(@sim, amount_mb: data).then { |keepgo_tx|
      raise "SIM refill failed: #{iccid}" unless keepgo_tx["ack"] == "success"

      Transaction.new(
        customer_id: customer_id,
        transaction_id: keepgo_tx["transaction_id"],
        amount: -price,
        note: note
      ).insert
    }.then do
      LOG.info "Refilled #{customer.customer_id} #{iccid}"
    end
  end

  # Promise of the customer's monthly data spending limit: the value stored
  # in Redis when there is one, otherwise the price of a single refill.
  def monthly_limit
    REDIS.get(
      "jmp_customer_monthly_data_limit-#{customer_id}"
    ).then do |limit|
      BigDecimal(limit || refill_price)
    end
  end

  # Promise of the negated sum of this customer's transactions since the
  # start of the current month whose transaction_id matches 'AB_59576_%'.
  def amount_spent
    promise = DB.query_defer(<<~SQL, [customer_id])
      SELECT COALESCE(SUM(amount), 0) AS a FROM transactions
      WHERE customer_id=$1 AND transaction_id LIKE 'AB_59576_%' AND
      created_at >= DATE_TRUNC('month', LOCALTIMESTAMP)
    SQL
    promise.then { |rows| -rows.first["a"] }
  end
end

# Adds 5GB to a SIM that is running low, as long as the customer is under
# their monthly limit; otherwise falls back to a warning.
class SimTopUp
  include SimAction

  # Runs the low-balance flow, adds its result to the locally held balance,
  # and retries the top-up when that result is positive.
  def low_balance
    LowBalance.for(customer, refill_price).then(&:notify!).then do |result|
      @customer = customer.with_balance(customer.balance + result)
      next call if result.positive?

      LOG.info "Low balance #{customer.customer_id} #{iccid}"
    end
  end

  def call
    EMPromise.all([amount_spent, monthly_limit]).then do |(spent, limit)|
      if spent < limit
        next low_balance if customer.balance < refill_price

        refill_and_bill(5120, refill_price, "5GB Data Topup for #{iccid}")
      else
        SimWarn.new(@sim, customer: customer).call
      end
    end
  end
end

# Warns the customer that a SIM is low on data when nothing will refill it
# automatically.
class SimWarn
  include SimAction

  # Sends the warning message inside a per-customer ExpiringLock.
  def notify
    ExpiringLock.new("jmp_customer_sim_warn-#{customer.customer_id}").with do
      m = Blather::Stanza::Message.new
      m.body = "Your SIM #{iccid} only has " \
               "#{(@sim.remaining_usage_kb / 1024).to_i} MB left"
      customer.stanza_to(m)
    end
  end

  # Warns only when the monthly limit has been reached, or the balance cannot
  # cover a refill and auto top-up is not enabled.
  def call
    EMPromise.all([amount_spent, monthly_limit]).then do |(spent, limit)|
      next unless spent >= limit || low_balance_and_not_auto_top_up

      notify
      LOG.info "Data warning #{customer.customer_id} #{@sim.iccid}"
    end
  end

  def low_balance_and_not_auto_top_up
    customer.balance < refill_price &&
      !customer.auto_top_up_amount&.positive?
  end
end

# Charges the annual fee for a SIM that is close to expiry, or warns the
# customer when the fee cannot be covered.
class SimAnnual
  include SimAction

  # Sends the expiry warning inside a per-customer ExpiringLock, then logs.
  def notify
    ExpiringLock.new("jmp_customer_sim_annual-#{customer.customer_id}").with do
      m = Blather::Stanza::Message.new
      m.body = "Your SIM #{iccid} only has #{@sim.remaining_days} days left"
      customer.stanza_to(m)
    end
    LOG.info "Annual warning #{customer.customer_id} #{@sim.iccid}"
  end

  # Configured annual value for the customer's currency, divided by 100.
  def annual_price
    BigDecimal(CONFIG[:sims][customer.currency][:annual]) / 100
  end

  def call
    if customer.balance >= annual_price
      refill_and_bill(1024, annual_price, "Annual fee for #{iccid}")
    else
      LowBalance.for(customer, annual_price).then(&:notify!).then do |result|
        # Add the low-balance result to the local balance before retrying
        # (same as SimTopUp#low_balance); otherwise the retry would compare
        # the same stale balance and could never reach refill_and_bill.
        @customer = customer.with_balance(customer.balance + result)
        next call if result.positive?

        notify
      end
    end
  end
end

# Looks up each distinct, non-nil customer id and returns a promise of a
# Hash mapping customer_id to a JobCustomer.  Ids that are not found are
# left out of the result.
def fetch_customers(cids)
  # This is gross N+1 against the DB, but also does a bunch of Redis work
  # We expect the set to be very small for the foreseeable future,
  # hundreds at most
  EMPromise.all(
    cids.compact.uniq.map { |id|
      CUSTOMER_REPO.find(id).catch_only(CustomerRepo::NotFound) { nil }
    }
  ).then do |customers|
    customers.compact.map { |c| [c.customer_id, JobCustomer.new(c)] }.to_h
  end
end

SIM_QUERY = "SELECT iccid, customer_id FROM sims WHERE iccid = ANY ($1)"

# Given a Hash of iccid => action, assigns each action the customer that
# owns its SIM according to the sims table.  Returns a promise of the same
# Hash; actions whose SIM has no row, or whose customer was not found, keep
# a nil customer.
def load_customers!(sims)
  DB.query_defer(SIM_QUERY, [sims.keys]).then { |rows|
    fetch_customers(rows.map { |row| row["customer_id"] }).then do |customers|
      rows.each do |row|
        sims[row["iccid"]]&.customer = customers[row["customer_id"]]
      end

      sims
    end
  }
end

# Chooses at most one action per SIM, in priority order: annual renewal when
# fewer than 31 days remain, top-up below 100000 KB remaining, warning below
# 250000 KB.  SIMs needing nothing are omitted.  Returns iccid => action.
def decide_sim_actions(sims)
  sims.each_with_object({}) do |sim, h|
    if sim.remaining_days < 31
      h[sim.iccid] = SimAnnual.new(sim)
    elsif sim.remaining_usage_kb < 100_000
      h[sim.iccid] = SimTopUp.new(sim)
    elsif sim.remaining_usage_kb < 250_000
      h[sim.iccid] = SimWarn.new(sim)
    end
  end
end

EM.run do
  REDIS = EM::Hiredis.connect
  DB = Postgres.connect(dbname: "jmp")

  BlatherNotify.start(
    CONFIG[:xmpp][:jid],
    CONFIG[:xmpp][:password]
  ).then {
    SIM_REPO.all
  }.then { |sims|
    load_customers!(decide_sim_actions(sims))
  }.then { |items|
    # Skip actions with no customer, or a customer with no currency
    items = items.values.select { |item| item.customer&.currency }
    EMPromise.all(items.map(&:call))
  }.catch { |e|
    LOG.error e

    # Rejections are not always exceptions; report other values as messages
    if e.is_a?(::Exception)
      Sentry.capture_exception(e)
    else
      Sentry.capture_message(e.to_s)
    end
  }.then { EM.stop }
end