cancel_expired_customers

  1#!/usr/bin/ruby
  2# frozen_string_literal: true
  3
  4require "date"
  5require "dhall"
  6require "em_promise"
  7require "pg"
  8require "faraday/em_synchrony"
  9require "ruby-bandwidth-iris"
 10require "set"
 11
 12require_relative "../lib/blather_notify"
 13require_relative "../lib/to_form"
 14
 15CONFIG = Dhall.load(<<-DHALL).sync
 16	(#{ARGV[0]}) : {
 17		sgx_jmp: Text,
 18		creds: {
 19			account: Text,
 20			client_id: Text,
 21			client_secret: Text
 22		},
 23		notify_using: {
 24			jid: Text,
 25			password: Text,
 26			target: Text -> Text,
 27			body: Text -> Text -> Text
 28		},
 29		old_port_ins: List Text
 30	}
 31DHALL
 32
 33Faraday.default_adapter = :em_synchrony
 34BandwidthIris::Client.global_options = {
 35	account_id: CONFIG[:creds][:account],
 36	client_id: CONFIG[:creds][:client_id],
 37	client_secret: CONFIG[:creds][:client_secret]
 38}
 39
 40using ToForm
 41
 42db = PG.connect(dbname: "jmp")
 43db.type_map_for_results = PG::BasicTypeMapForResults.new(db)
 44db.type_map_for_queries = PG::BasicTypeMapForQueries.new(db)
 45
 46BlatherNotify.start(
 47	CONFIG[:notify_using][:jid],
 48	CONFIG[:notify_using][:password]
 49)
 50
 51def format(item)
 52	if item.respond_to?(:note) && item.note && item.note.text != ""
 53		item.note.text
 54	elsif item.respond_to?(:to_xml)
 55		item.to_xml
 56	else
 57		item.inspect
 58	end
 59end
 60
 61ported_in_promise = Promise.new
 62
 63EM.schedule do
 64	Fiber.new {
 65		begin
 66			tns = Set.new(CONFIG[:old_port_ins])
 67			page = BandwidthIris::PortIn.list(
 68				page: 1,
 69				size: 1000,
 70				status: :complete
 71			)
 72			while page
 73				page.each_slice(250) do |orders|
 74					EMPromise.all(
 75						orders.map { |order|
 76							EMPromise.resolve(nil).then { order.tns }
 77						}
 78					).sync.each { |chunk| tns += chunk.map { |tn| "+1#{tn}" } }
 79				end
 80				page = page.next
 81			end
 82			raise "ported_in looks wrong" if tns.length < 250
 83
 84			ported_in_promise.fulfill(tns)
 85		rescue StandardError
 86			ported_in_promise.reject($!)
 87		end
 88	}.resume
 89end
 90
 91class ExpiringCustomer
 92	def initialize(customer_id)
 93		@customer_id = customer_id
 94	end
 95
 96	def info
 97		BlatherNotify.execute(
 98			"customer info",
 99			{ q: @customer_id }.to_form(:submit)
100		).then do |iq|
101			@sessionid = iq.sessionid
102			unless iq.form.field("customer_id")
103				raise "#{@customer_id} not found"
104			end
105
106			@info = iq
107		end
108	end
109
110	def next
111		raise "Call info first" unless @sessionid && @info
112		return EMPromise.reject(:skip) unless @info.form.field("tel")
113
114		BlatherNotify.write_with_promise(BlatherNotify.command(
115			"customer info",
116			@sessionid
117		))
118	end
119
120	def cancel_account
121		raise "Call info first" unless @sessionid
122
123		BlatherNotify.write_with_promise(BlatherNotify.command(
124			"customer info",
125			@sessionid,
126			action: :complete,
127			form: { action: "cancel_account" }.to_form(:submit)
128		))
129	end
130end
131
# Stops Snikket instances by running the "stop snikket" command through
# BlatherNotify.
module SnikketInstanceManager
	# Stops every instance in +instance_ids+.
	#
	# nil entries are dropped and a nil argument is treated as an empty list:
	# an aggregate over an outer join yields [nil] for a customer with no
	# instances, and that must not trigger a "stop snikket" with no id.
	#
	# Returns the EMPromise.all over the individual stop commands.
	def self.stop_instances(instance_ids)
		real_ids = Array(instance_ids).compact
		EMPromise.all(real_ids.map { |id| stop_instance(id) })
	end

	# Runs "stop snikket" for a single +instance_id+ and returns whatever
	# BlatherNotify.execute returns.
	def self.stop_instance(instance_id)
		BlatherNotify.execute(
			"stop snikket",
			{ instance_id: instance_id }.to_form(:submit)
		)
	end
end
144
# Hand-off to the `one.pop` at the bottom of the script: receives :done once
# the iterator has finished, or an Exception describing the first failure.
one = Queue.new

# Turns a rejection reason into something raisable: Exceptions pass through,
# anything else is rendered with format and wrapped in a RuntimeError.
to_exception = lambda { |reason|
	reason.is_a?(Exception) ? reason : RuntimeError.new(format(reason))
}

# Plans that expired more than a month ago, with the customer's Snikket
# instance ids aggregated into one array per row.
expired_plans_sql = <<-SQL
		SELECT customer_plans.customer_id,
		       customer_plans.expires_at,
		       array_agg(snikket_instances.instance_id) as instance_ids
		FROM customer_plans
		LEFT JOIN snikket_instances ON customer_plans.customer_id = snikket_instances.customer_id
		WHERE customer_plans.expires_at < LOCALTIMESTAMP - INTERVAL '1 month'
		GROUP BY customer_plans.customer_id, customer_plans.expires_at
SQL

ported_in_promise.then { |ported_in|
	expired_rows = db.exec(expired_plans_sql)

	# Process up to 3 rows at a time; each row must call iter.next to free
	# its slot. A failing row reports to the queue instead.
	EM::Iterator.new(expired_rows, 3).each(nil, -> { one << :done }) do |row, iter|
		customer = ExpiringCustomer.new(row["customer_id"])

		customer.info.then { |iq|
			tel = iq.form.field("tel")&.value&.to_s
			# Ported-in numbers are spared while the plan expired less than
			# 12 months ago.
			spared = ported_in.include?(tel) &&
			         row["expires_at"] > (Date.today << 12).to_time
			next customer.next unless spared

			puts "#{row['customer_id']} ported in, skipping"
			EMPromise.reject(:skip)
		}.then {
			customer.cancel_account
		}.then { |cancelled|
			puts format(cancelled)

			SnikketInstanceManager.stop_instances(row["instance_ids"])
		}.then { |stopped|
			puts "Stopped #{stopped.length} Snikket instance(s)"
			iter.next
		}.catch { |reason|
			if reason == :skip
				iter.next
			else
				one << to_exception.call(reason)
			end
		}
	end
}.catch { |reason| one << to_exception.call(reason) }

# Block until the run finishes or fails, and surface a failure as an exception.
result = one.pop
raise result if result.is_a?(Exception)