cancel_expired_customers

  1#!/usr/bin/ruby
  2# frozen_string_literal: true
  3
  4require "date"
  5require "dhall"
  6require "em_promise"
  7require "pg"
  8require "ruby-bandwidth-iris"
  9require "set"
 10
 11require_relative "../lib/blather_notify"
 12require_relative "../lib/to_form"
 13
 14CONFIG = Dhall.load(<<-DHALL).sync
 15	(#{ARGV[0]}) : {
 16		sgx_jmp: Text,
 17		creds: {
 18			account: Text,
 19			username: Text,
 20			password: Text
 21		},
 22		notify_using: {
 23			jid: Text,
 24			password: Text,
 25			target: Text -> Text,
 26			body: Text -> Text -> Text
 27		},
 28		old_port_ins: List Text
 29	}
 30DHALL
 31
 32Faraday.default_adapter = :em_synchrony
 33BandwidthIris::Client.global_options = {
 34	account_id: CONFIG[:creds][:account],
 35	username: CONFIG[:creds][:username],
 36	password: CONFIG[:creds][:password]
 37}
 38
 39using ToForm
 40
 41db = PG.connect(dbname: "jmp")
 42db.type_map_for_results = PG::BasicTypeMapForResults.new(db)
 43db.type_map_for_queries = PG::BasicTypeMapForQueries.new(db)
 44
 45BlatherNotify.start(
 46	CONFIG[:notify_using][:jid],
 47	CONFIG[:notify_using][:password]
 48)
 49
 50def format(item)
 51	if item.respond_to?(:note) && item.note && item.note.text != ""
 52		item.note.text
 53	elsif item.respond_to?(:to_xml)
 54		item.to_xml
 55	else
 56		item.inspect
 57	end
 58end
 59
 60ported_in_promise = Promise.new
 61
 62EM.schedule do
 63	Fiber.new {
 64		begin
 65			tns = Set.new(CONFIG[:old_port_ins])
 66			page = BandwidthIris::PortIn.list(
 67				page: 1,
 68				size: 1000,
 69				status: :complete
 70			)
 71			while page
 72				page.each_slice(250) do |orders|
 73					EMPromise.all(
 74						orders.map { |order|
 75							EMPromise.resolve(nil).then { order.tns }
 76						}
 77					).sync.each { |chunk| tns += chunk.map { |tn| "+1#{tn}" } }
 78				end
 79				page = page.next
 80			end
 81			raise "ported_in looks wrong" if tns.length < 250
 82
 83			ported_in_promise.fulfill(tns)
 84		rescue StandardError
 85			ported_in_promise.reject($!)
 86		end
 87	}.resume
 88end
 89
 90class ExpiringCustomer
 91	def initialize(customer_id)
 92		@customer_id = customer_id
 93	end
 94
 95	def info
 96		BlatherNotify.execute(
 97			"customer info",
 98			{ q: @customer_id }.to_form(:submit)
 99		).then do |iq|
100			@sessionid = iq.sessionid
101			unless iq.form.field("customer_id")
102				raise "#{@customer_id} not found"
103			end
104
105			@info = iq
106		end
107	end
108
109	def next
110		raise "Call info first" unless @sessionid && @info
111		return EMPromise.reject(:skip) unless @info.form.field("tel")
112
113		BlatherNotify.write_with_promise(BlatherNotify.command(
114			"customer info",
115			@sessionid
116		))
117	end
118
119	def cancel_account
120		raise "Call info first" unless @sessionid
121
122		BlatherNotify.write_with_promise(BlatherNotify.command(
123			"customer info",
124			@sessionid,
125			action: :complete,
126			form: { action: "cancel_account" }.to_form(:submit)
127		))
128	end
129end
130
# Receives :done once every row has been processed, or an Exception
# describing the first failure. The script blocks on it at the bottom.
one = Queue.new

# Hand a failure to the queue, wrapping non-Exception values in a
# RuntimeError whose message is the formatted value.
report_failure = lambda { |err|
	one << (err.is_a?(Exception) ? err : RuntimeError.new(format(err)))
}

# Plans that expired more than one month ago.
expired_sql = <<-SQL
		SELECT customer_id, expires_at FROM customer_plans
		WHERE expires_at < LOCALTIMESTAMP - INTERVAL '1 month'
		SQL

ported_in_promise.then { |ported_in|
	expired_rows = db.exec(expired_sql)
	all_done = -> { one << :done }

	# Work through the rows with a concurrency of 3.
	EM::Iterator.new(expired_rows, 3).each(nil, all_done) do |row, iter|
		customer = ExpiringCustomer.new(row["customer_id"])

		lookup = customer.info.then { |iq|
			tel = iq.form.field("tel")&.value&.to_s
			# Leave the customer alone when their number is in the ported-in
			# set and the plan expired less than 12 months ago.
			recently_ported =
				ported_in.include?(tel) &&
				row["expires_at"] > (Date.today << 12).to_time

			if recently_ported
				puts "#{row['customer_id']} ported in, skipping"
				EMPromise.reject(:skip)
			else
				customer.next
			end
		}

		cancellation = lookup.then { customer.cancel_account }

		cancellation.then { |result|
			puts format(result)
			iter.next
		}.catch do |err|
			if err == :skip
				# A skipped customer is not a failure; move on to the next row.
				iter.next
			else
				report_failure.call(err)
			end
		end
	end
}.catch do |err|
	report_failure.call(err)
end

# Wait for completion or the first failure, and re-raise a failure.
result = one.pop
raise result if result.is_a?(Exception)