cancel_expired_customers

  1#!/usr/bin/ruby
  2# frozen_string_literal: true
  3
  4require "date"
  5require "dhall"
  6require "em_promise"
  7require "pg"
  8require "ruby-bandwidth-iris"
  9require "set"
 10
 11require_relative "../lib/blather_notify"
 12require_relative "../lib/to_form"
 13
 14CONFIG = Dhall.load(<<-DHALL).sync
 15	(#{ARGV[0]}) : {
 16		sgx_jmp: Text,
 17		creds: {
 18			account: Text,
 19			username: Text,
 20			password: Text
 21		},
 22		notify_using: {
 23			jid: Text,
 24			password: Text,
 25			target: Text -> Text,
 26			body: Text -> Text -> Text
 27		}
 28	}
 29DHALL
 30
 31Faraday.default_adapter = :em_synchrony
 32BandwidthIris::Client.global_options = {
 33	account_id: CONFIG[:creds][:account],
 34	username: CONFIG[:creds][:username],
 35	password: CONFIG[:creds][:password]
 36}
 37
 38using ToForm
 39
 40db = PG.connect(dbname: "jmp")
 41db.type_map_for_results = PG::BasicTypeMapForResults.new(db)
 42db.type_map_for_queries = PG::BasicTypeMapForQueries.new(db)
 43
 44BlatherNotify.start(
 45	CONFIG[:notify_using][:jid],
 46	CONFIG[:notify_using][:password]
 47)
 48
 49def format(item)
 50	if item.respond_to?(:note) && item.note && item.note.text != ""
 51		item.note.text
 52	elsif item.respond_to?(:to_xml)
 53		item.to_xml
 54	else
 55		item.inspect
 56	end
 57end
 58
 59ported_in_promise = Promise.new
 60
 61EM.schedule do
 62	Fiber.new {
 63		begin
 64			tns = Set.new
 65			page = BandwidthIris::PortIn.list(
 66				page: 1,
 67				size: 1000,
 68				status: :complete
 69			)
 70			while page
 71				page.each_slice(250) do |orders|
 72					EMPromise.all(
 73						orders.map { |order|
 74							EMPromise.resolve(nil).then { order.tns }
 75						}
 76					).sync.each { |chunk| tns += chunk.map { |tn| "+1#{tn}" } }
 77				end
 78				page = page.next
 79			end
 80			raise "ported_in looks wrong" if tns.length < 250
 81
 82			ported_in_promise.fulfill(tns)
 83		rescue StandardError
 84			ported_in_promise.reject($!)
 85		end
 86	}.resume
 87end
 88
 89class ExpiringCustomer
 90	def initialize(customer_id)
 91		@customer_id = customer_id
 92	end
 93
 94	def info
 95		BlatherNotify.execute(
 96			"customer info",
 97			{ q: @customer_id }.to_form(:submit)
 98		).then do |iq|
 99			@sessionid = iq.sessionid
100			unless iq.form.field("customer_id")
101				raise "#{@customer_id} not found"
102			end
103
104			@info = iq
105		end
106	end
107
108	def next
109		raise "Call info first" unless @sessionid && @info
110		return EMPromise.reject(:skip) unless @info.form.field("tel")
111
112		BlatherNotify.write_with_promise(BlatherNotify.command(
113			"customer info",
114			@sessionid
115		))
116	end
117
118	def cancel_account
119		raise "Call info first" unless @sessionid
120
121		BlatherNotify.write_with_promise(BlatherNotify.command(
122			"customer info",
123			@sessionid,
124			action: :complete,
125			form: { action: "cancel_account" }.to_form(:submit)
126		))
127	end
128end
129
130one = Queue.new
131
132ported_in_promise.then { |ported_in|
133	EM::Iterator.new(db.exec(
134		<<-SQL
135		SELECT customer_id, expires_at FROM customer_plans
136		WHERE expires_at < LOCALTIMESTAMP - INTERVAL '1 month'
137		SQL
138	), 3).each(nil, -> { one << :done }) do |row, iter|
139		customer = ExpiringCustomer.new(row["customer_id"])
140		customer.info.then { |iq|
141			if ported_in.include?(iq.form.field("tel")&.value&.to_s) &&
142			   row["expires_at"] > (Date.today << 12).to_time
143				puts "#{row['customer_id']} ported in, skipping"
144				EMPromise.reject(:skip)
145			else
146				customer.next
147			end
148		}.then {
149			customer.cancel_account
150		}.then { |result|
151			puts format(result)
152			iter.next
153		}.catch do |err|
154			next iter.next if err == :skip
155
156			one << (err.is_a?(Exception) ? err : RuntimeError.new(format(err)))
157		end
158	end
159}.catch do |err|
160	one << (err.is_a?(Exception) ? err : RuntimeError.new(format(err)))
161end
162
163result = one.pop
164raise result if result.is_a?(Exception)