cancel_expired_customers

  1#!/usr/bin/ruby
  2# frozen_string_literal: true
  3
  4require "date"
  5require "dhall"
  6require "em_promise"
  7require "pg"
  8require "ruby-bandwidth-iris"
  9require "set"
 10
 11require_relative "../lib/blather_notify"
 12require_relative "../lib/to_form"
 13
 14CONFIG = Dhall.load(<<-DHALL).sync
 15	(#{ARGV[0]}) : {
 16		sgx_jmp: Text,
 17		creds: {
 18			account: Text,
 19			username: Text,
 20			password: Text
 21		},
 22		notify_using: {
 23			jid: Text,
 24			password: Text,
 25			target: Text -> Text,
 26			body: Text -> Text -> Text
 27		},
 28		old_port_ins: List Text
 29	}
 30DHALL
 31
 32Faraday.default_adapter = :em_synchrony
 33BandwidthIris::Client.global_options = {
 34	account_id: CONFIG[:creds][:account],
 35	username: CONFIG[:creds][:username],
 36	password: CONFIG[:creds][:password]
 37}
 38
 39using ToForm
 40
 41db = PG.connect(dbname: "jmp")
 42db.type_map_for_results = PG::BasicTypeMapForResults.new(db)
 43db.type_map_for_queries = PG::BasicTypeMapForQueries.new(db)
 44
 45BlatherNotify.start(
 46	CONFIG[:notify_using][:jid],
 47	CONFIG[:notify_using][:password]
 48)
 49
 50def format(item)
 51	if item.respond_to?(:note) && item.note && item.note.text != ""
 52		item.note.text
 53	elsif item.respond_to?(:to_xml)
 54		item.to_xml
 55	else
 56		item.inspect
 57	end
 58end
 59
 60ported_in_promise = Promise.new
 61
 62EM.schedule do
 63	Fiber.new {
 64		begin
 65			tns = Set.new(CONFIG[:old_port_ins])
 66			page = BandwidthIris::PortIn.list(
 67				page: 1,
 68				size: 1000,
 69				status: :complete
 70			)
 71			while page
 72				page.each_slice(250) do |orders|
 73					EMPromise.all(
 74						orders.map { |order|
 75							EMPromise.resolve(nil).then { order.tns }
 76						}
 77					).sync.each { |chunk| tns += chunk.map { |tn| "+1#{tn}" } }
 78				end
 79				page = page.next
 80			end
 81			raise "ported_in looks wrong" if tns.length < 250
 82
 83			ported_in_promise.fulfill(tns)
 84		rescue StandardError
 85			ported_in_promise.reject($!)
 86		end
 87	}.resume
 88end
 89
 90class ExpiringCustomer
 91	def initialize(customer_id)
 92		@customer_id = customer_id
 93	end
 94
 95	def info
 96		BlatherNotify.execute(
 97			"customer info",
 98			{ q: @customer_id }.to_form(:submit)
 99		).then do |iq|
100			@sessionid = iq.sessionid
101			unless iq.form.field("customer_id")
102				raise "#{@customer_id} not found"
103			end
104
105			@info = iq
106		end
107	end
108
109	def next
110		raise "Call info first" unless @sessionid && @info
111		return EMPromise.reject(:skip) unless @info.form.field("tel")
112
113		BlatherNotify.write_with_promise(BlatherNotify.command(
114			"customer info",
115			@sessionid
116		))
117	end
118
119	def cancel_account
120		raise "Call info first" unless @sessionid
121
122		BlatherNotify.write_with_promise(BlatherNotify.command(
123			"customer info",
124			@sessionid,
125			action: :complete,
126			form: { action: "cancel_account" }.to_form(:submit)
127		))
128	end
129end
130
131module SnikketInstanceManager
132	def self.stop_instances(instance_ids)
133		EMPromise.all(instance_ids.map { |id| stop_instance(id) })
134	end
135
136	def self.stop_instance(instance_id)
137		BlatherNotify.execute(
138			"stop snikket",
139			{ instance_id: instance_id }.to_form(:submit)
140		)
141	end
142end
143
144one = Queue.new
145
146ported_in_promise.then { |ported_in|
147	EM::Iterator.new(db.exec(
148		<<-SQL
149		SELECT customer_plans.customer_id,
150		       customer_plans.expires_at,
151		       array_agg(snikket_instances.instance_id) as instance_ids
152		FROM customer_plans
153		LEFT JOIN snikket_instances ON customer_plans.customer_id = snikket_instances.customer_id
154		WHERE customer_plans.expires_at < LOCALTIMESTAMP - INTERVAL '1 month'
155		GROUP BY customer_plans.customer_id, customer_plans.expires_at
156		SQL
157	), 3).each(nil, -> { one << :done }) do |row, iter|
158		customer = ExpiringCustomer.new(row["customer_id"])
159		customer.info.then { |iq|
160			if ported_in.include?(iq.form.field("tel")&.value&.to_s) &&
161			   row["expires_at"] > (Date.today << 12).to_time
162				puts "#{row['customer_id']} ported in, skipping"
163				EMPromise.reject(:skip)
164			else
165				customer.next
166			end
167		}.then {
168			customer.cancel_account
169		}.then { |result|
170			puts format(result)
171
172			SnikketInstanceManager.stop_instances(row["instance_ids"])
173		}.then { |stopped_instances|
174			puts "Stopped #{stopped_instances.length} Snikket instance(s)"
175			iter.next
176		}.catch do |err|
177			next iter.next if err == :skip
178
179			one << (err.is_a?(Exception) ? err : RuntimeError.new(format(err)))
180		end
181	end
182}.catch do |err|
183	one << (err.is_a?(Exception) ? err : RuntimeError.new(format(err)))
184end
185
186result = one.pop
187raise result if result.is_a?(Exception)