Run billing 3 at a time

Stephen Paul Weber created

All at once kills the box

Change summary

bin/billing_monthly_cronjob | 103 ++++++++++++++++++++++----------------
lib/blather_notify.rb       |  16 ++++-
2 files changed, 71 insertions(+), 48 deletions(-)

Detailed changes

bin/billing_monthly_cronjob 🔗

@@ -30,62 +30,77 @@ BlatherNotify.start(
 	CONFIG[:notify_using][:password]
 )
 
-promises = []
+def format(item)
+	if item.respond_to?(:note) && item.note && item.note.text != ""
+		item.note.text
+	elsif item.respond_to?(:to_xml)
+		item.to_xml
+	else
+		item.inspect
+	end
+end
 
-db.exec(
-	<<-SQL
-	SELECT customer_id
-	FROM customer_plans
-	WHERE expires_at <= LOCALTIMESTAMP + '4 days'
-	SQL
-).each do |row|
-	EM.next_tick do
-		promises << BlatherNotify.execute(
+class ExpiringCustomer
+	def initialize(customer_id)
+		@customer_id = customer_id
+	end
+
+	def info
+		BlatherNotify.execute(
 			"customer info",
-			{ q: row["customer_id"] }.to_form(:submit)
-		).then { |iq|
-			BlatherNotify.write_with_promise(BlatherNotify.command(
-				"customer info",
-				iq.sessionid
-			))
-		}.then do |iq|
-			unless iq.form.field("action")
-				next "#{row["customer_id"]} not found"
+			{ q: @customer_id }.to_form(:submit)
+		).then do |iq|
+			@sessionid = iq.sessionid
+			unless iq.form.field("customer_id")
+				raise "#{@customer_id} not found"
 			end
 
-			BlatherNotify.write_with_promise(BlatherNotify.command(
-				"customer info",
-				iq.sessionid,
-				action: :complete,
-				form: { action: "bill_plan" }.to_form(:submit)
-			))
+			iq
 		end
 	end
-end
 
-one = Queue.new
+	def next
+		raise "Call info first" unless @sessionid
 
-def format(item)
-	if item.respond_to?(:note) && item.note
-		item.note.text
-	elsif item.respond_to?(:to_xml)
-		item.to_xml
-	else
-		item.inspect
+		BlatherNotify.write_with_promise(BlatherNotify.command(
+			"customer info",
+			@sessionid
+		))
+	end
+
+	def bill_plan
+		raise "Call info first" unless @sessionid
+
+		BlatherNotify.write_with_promise(BlatherNotify.command(
+			"customer info",
+			@sessionid,
+			action: :complete,
+			form: { action: "bill_plan" }.to_form(:submit)
+		))
 	end
 end
 
-EM.add_timer(0) do
-	EMPromise.all(promises).then(
-		->(all) { one << all },
-		->(err) { one << RuntimeError.new(format(err)) }
-	)
+one = Queue.new
+
+EM::Iterator.new(db.exec(
+	<<-SQL
+	SELECT customer_id
+	FROM customer_plans
+	WHERE expires_at <= LOCALTIMESTAMP + '4 days'
+	SQL
+), 3).each(nil, -> { one << :done }) do |row, iter|
+	customer = ExpiringCustomer.new(row["customer_id"])
+	customer.info.then {
+		customer.next
+	}.then {
+		customer.bill_plan
+	}.then { |result|
+		puts format(result)
+		iter.next
+	}.catch do |err|
+		one << (err.is_a?(Exception) ? err : RuntimeError.new(format(err)))
+	end
 end
 
 result = one.pop
-
 raise result if result.is_a?(Exception)
-
-result.each do |item|
-	puts format(item)
-end

lib/blather_notify.rb 🔗

@@ -40,14 +40,22 @@ module BlatherNotify
 		@thread.join
 	end
 
-	def self.write_with_promise(stanza)
-		promise = EMPromise.new
-		EM.add_timer(15) do
+	def self.timeout_promise(promise, timeout: 15)
+		timer = EM.add_timer(timeout) {
 			promise.reject(:timeout)
+		}
+
+		promise.then do
+			timer.cancel
 		end
+	end
+
+	def self.write_with_promise(stanza)
+		promise = EMPromise.new
+		timeout_promise(promise)
 
 		client.write_with_handler(stanza) do |s|
-			if s.error?
+			if s.error? || s.type == :error
 				promise.reject(s)
 			else
 				promise.fulfill(s)