Detailed changes
@@ -0,0 +1,253 @@
+#!/usr/bin/ruby
+# frozen_string_literal: true
+
+require "bigdecimal"
+require "pg/em/connection_pool"
+require "eventmachine"
+require "em_promise"
+require "em-hiredis"
+require "dhall"
+require "sentry-ruby"
+
+Sentry.init do |config|
+ config.background_worker_threads = 0
+end
+
+SCHEMA = "{
+ xmpp: { jid: Text, password: Text },
+ component: { jid: Text },
+ keepgo: Optional { access_token: Text, api_key: Text },
+ sims: {
+ CAD: { per_gb: Natural, annual: Natural },
+ USD: { per_gb: Natural, annual: Natural }
+ },
+ plans: List {
+ currency: < CAD | USD >,
+ messages: < limited: { included: Natural, price: Natural } | unlimited >,
+ minutes: < limited: { included: Natural, price: Natural } | unlimited >,
+ monthly_price : Natural,
+ name : Text
+ }
+}"
+
+raise "Need a Dhall config" unless ARGV[0]
+
+CONFIG = Dhall::Coder
+ .new(safe: Dhall::Coder::JSON_LIKE + [Symbol, Proc])
+ .load("#{ARGV.first} : #{SCHEMA}", transform_keys: :to_sym)
+
+CONFIG[:keep_area_codes_in] = {}
+CONFIG[:creds] = {}
+
+require_relative "../lib/blather_notify"
+require_relative "../lib/customer_repo"
+require_relative "../lib/low_balance"
+require_relative "../lib/postgres"
+require_relative "../lib/sim_repo"
+require_relative "../lib/transaction"
+
+CUSTOMER_REPO = CustomerRepo.new
+SIM_REPO = SIMRepo.new
+
+class JobCustomer < SimpleDelegator
+ def billing_customer
+ super.then(&self.class.method(:new))
+ end
+
+ def stanza_to(stanza)
+ stanza = stanza.dup
+ stanza.from = nil # It's a client connection, use default
+ stanza.to = Blather::JID.new(
+ "customer_#{customer_id}", CONFIG[:component][:jid]
+ ).with(resource: stanza.to&.resource)
+ block_given? ? yield(stanza) : (BlatherNotify << stanza)
+ end
+end
+
+module SimAction
+ attr_accessor :customer
+
+ def initialize(sim)
+ @sim = sim
+ end
+
+ def iccid
+ @sim.iccid
+ end
+
+ def customer_id
+ customer.customer_id
+ end
+
+ def refill_price
+ (BigDecimal(CONFIG[:sims][customer.currency][:per_gb]) / 100) * 5
+ end
+
+ def refill_and_bill(data, price, note)
+ SIM_REPO.refill(@sim, amount_mb: data).then { |keepgo_tx|
+ raise "SIM refill failed" unless keepgo_tx["ack"] == "success"
+
+ Transaction.new(
+ customer_id: customer_id,
+ transaction_id: keepgo_tx["transaction_id"],
+ amount: -price, note: note
+ ).insert
+ }.then do
+ puts "Refilled #{customer.customer_id} #{iccid}"
+ end
+ end
+
+ def monthly_limit
+ REDIS.get(
+ "jmp_customer_monthly_data_limit-#{customer_id}"
+ ).then do |limit|
+ BigDecimal(limit || refill_price)
+ end
+ end
+
+ def amount_spent
+ promise = DB.query_defer(<<~SQL, [customer_id])
+ SELECT COALESCE(SUM(amount), 0) AS a FROM transactions WHERE
+ customer_id=$1 AND transaction_id LIKE 'AB_59576_%' AND
+ created_at >= DATE_TRUNC('month', LOCALTIMESTAMP)
+ SQL
+ promise.then { |rows| rows.first["a"] }
+ end
+end
+
+class SimTopUp
+ include SimAction
+
+ def low_balance
+ LowBalance.for(customer, refill_price).then(&:notify!).then do |result|
+ next call if result.positive?
+
+ puts "Low balance #{customer.customer_id} #{iccid}"
+ end
+ end
+
+ def call
+ EMPromise.all([amount_spent, monthly_limit]).then do |(spent, limit)|
+ if spent < limit
+ next low_balance if customer.balance < refill_price
+
+ refill_and_bill(5120, refill_price, "5GB Data Topup for #{iccid}")
+ else
+ SimWarn.new(@sim).call
+ end
+ end
+ end
+end
+
+class SimWarn
+ include SimAction
+
+ def notify
+ m = Blather::Stanza::Message.new
+ m.body = "Your SIM #{iccid} only has " \
+ "#{(@sim.remaining_usage_kb / 1024).to_i} MB left"
+ customer.stanza_to(m)
+ end
+
+ def call
+ EMPromise.all([amount_spent, monthly_limit]).then do |(spent, limit)|
+ next unless spent >= limit || low_balance_and_not_auto_top_up
+
+ notify
+ puts "Data warning #{customer.customer_id} #{sim.iccid}"
+ end
+ end
+
+ def low_balance_and_not_auto_top_up
+ customer.balance < refill_price && !customer.auto_top_up_amount&.positive?
+ end
+end
+
+class SimAnnual
+ include SimAction
+
+ def notify
+ m = Blather::Stanza::Message.new
+ m.body = "Your SIM #{iccid} only has #{@sim.remaining_days} days left"
+ customer.stanza_to(m)
+ end
+
+ def annual_price
+ BigDecimal(CONFIG[:sims][customer.currency][:annual]) / 100
+ end
+
+ def call
+ if customer.balance >= annual_fee
+ refill_and_bill(1024, annual_price, "Annual fee for #{iccid}")
+ else
+ LowBalance.for(customer, refill_price).then(&:notify!).then do |result|
+ next call if result.positive?
+
+ notify
+ end
+ end
+ end
+
+ def annual_fee
+ customer.currency == :USD ? BigDecimal("5.50") : BigDecimal("7.50")
+ end
+end
+
+def fetch_customers(cids)
+ # This is gross N+1 against the DB, but also does a buch of Redis work
+ # We expect the set to be very small for the forseeable future,
+ # hundreds at most
+ EMPromise.all(
+ Set.new(cids).to_a.compact.map { |id| CUSTOMER_REPO.find(id) }
+ ).then do |customers|
+ Hash[customers.map { |c| [c.customer_id, JobCustomer.new(c)] }]
+ end
+end
+
+SIM_QUERY = "SELECT iccid, customer_id FROM sims WHERE iccid = ANY ($1)"
+def load_customers!(sims)
+ DB.query_defer(SIM_QUERY, [sims.keys]).then { |rows|
+ fetch_customers(rows.map { |row| row["customer_id"] }).then do |customers|
+ rows.each do |row|
+ sims[row["iccid"]]&.customer = customers[row["customer_id"]]
+ end
+
+ sims
+ end
+ }
+end
+
+def decide_sim_actions(sims)
+ sims.each_with_object({}) { |sim, h|
+ if sim.remaining_usage_kb < 100000
+ h[sim.iccid] = SimTopUp.new(sim)
+ elsif sim.remaining_usage_kb < 250000
+ h[sim.iccid] = SimWarn.new(sim)
+ elsif sim.remaining_days < 30
+ h[sim.iccid] = SimAnnual.new(sim)
+ end
+ }.compact
+end
+
+EM.run do
+ REDIS = EM::Hiredis.connect
+ DB = Postgres.connect(dbname: "jmp")
+
+ BlatherNotify.start(
+ CONFIG[:xmpp][:jid],
+ CONFIG[:xmpp][:password]
+ ).then { SIM_REPO.all }.then { |sims|
+ load_customers!(decide_sim_actions(sims))
+ }.then { |items|
+ items = items.values.select(&:customer)
+ EMPromise.all(items.map(&:call))
+ }.catch { |e|
+ p e
+
+ if e.is_a?(::Exception)
+ Sentry.capture_exception(e)
+ else
+ Sentry.capture_message(e.to_s)
+ end
+ }.then { EM.stop }
+end
@@ -15,6 +15,7 @@
, component : { jid : Text, secret : Text }
, credit_card_url : forall (jid : Text) -> forall (customer_id : Text) -> Text
, creds : { account : Text, password : Text, username : Text }
+, direct_sources : List { mapKey : Text, mapValue : Text }
, direct_targets : List { mapKey : Text, mapValue : Text }
, electrum : { rpc_password : Text, rpc_uri : Text, rpc_username : Text }
, electrum_notify_url :
@@ -87,6 +87,9 @@ in
direct_targets = toMap {
`+15551234567` = "support@example.com"
},
+ direct_sources = toMap {
+ `support@example.com` = "+15551234567"
+ },
keep_area_codes = ["555"],
keep_area_codes_in = { account = "", site_id = "", sip_peer_id = "" },
snikket_hosting_api = "",
@@ -52,7 +52,7 @@ class LowBalance
return unless @transaction_amount&.positive?
return unless @transaction_amount > @customer.balance
- "You need an additional " \
+ "\nYou need an additional " \
"$#{'%.2f' % (@transaction_amount - @customer.balance)} "\
"to complete this transaction."
end
@@ -5,7 +5,7 @@ require "value_semantics/monkey_patched"
class SIM
value_semantics do
iccid(/\A\d+\Z/)
- lpa_code(/\ALPA:/)
+ lpa_code Either(/\ALPA:/, nil), default: nil
remaining_usage_kb Integer
remaining_days Integer
notes String
@@ -1,5 +1,8 @@
# frozen_string_literal: true
+require "em-http"
+require "em-synchrony/em-http" # For apost vs post
+require "json"
require "lazy_object"
require "value_semantics/monkey_patched"
@@ -16,11 +19,22 @@ class SIMRepo
"accessToken" => CONFIG[:keepgo][:access_token]
}.freeze
+ def all(start_page=1)
+ req("lines").aget(
+ head: KEEPGO_HEADERS, query: { page: start_page, per_page: 1000 }
+ ).then { |req|
+ result = JSON.parse(req.response)
+ sims = result.dig("sim_cards", "items")&.map(&SIM.method(:extract))
+
+ next sims if result.dig("sim_cards", "last_page") == start_page
+ next [] if !sims || sims.empty?
+
+ all(start_page + 1).then { |next_page| sims + next_page }
+ }
+ end
+
def find(iccid)
- EM::HttpRequest.new(
- "https://myaccount.keepgo.com/api/v2/line/#{iccid}/get_details",
- tls: { verify_peer: true }
- ).aget(head: KEEPGO_HEADERS).then { |req|
+ req("line/#{iccid}/get_details").aget(head: KEEPGO_HEADERS).then { |req|
SIM.extract(JSON.parse(req.response)&.dig("sim_card"))
}
end
@@ -28,10 +42,7 @@ class SIMRepo
def refill(sim, **kwargs)
iccid = sim.is_a?(String) ? sim : sim.iccid
- EM::HttpRequest.new(
- "https://myaccount.keepgo.com/api/v2/line/#{iccid}/refill",
- tls: { verify_peer: true }
- ).apost(
+ req("line/#{iccid}/refill").apost(
head: KEEPGO_HEADERS,
body: kwargs.to_json
).then { |req| JSON.parse(req.response) }
@@ -60,4 +71,13 @@ class SIMRepo
SELECT iccid FROM sims WHERE customer_id IS NULL LIMIT 1
SQL
end
+
+protected
+
+ def req(path)
+ EM::HttpRequest.new(
+ "https://myaccount.keepgo.com/api/v2/#{path}",
+ tls: { verify_peer: true }
+ )
+ end
end
@@ -367,7 +367,13 @@ CONFIG[:direct_targets].each do |(tel, jid)|
BLATHER << m
}
end
+end
+CONFIG[:direct_sources].each do |(jid, tel)|
+ customer_repo = CustomerRepo.new(
+ sgx_repo: TrivialBackendSgxRepo.new(jid: jid),
+ set_user: Sentry.method(:set_user)
+ )
message to: /\Acustomer_/, from: /\A#{Regexp.escape(jid)}\/?/ do |m|
customer_repo.find(m.to.node.delete_prefix("customer_")).then { |customer|
m.from = "#{tel}@sgx-jmp" # stanza_to will fix domain