In the previous PR we moved some of the logic out of the electrum
webhook handler, but it was still hitting electrum to turn addresses
into transaction hashes. We wanted to do even less in the electrum
webhook path, so this just dumps the raw info into a queue and then
returns. It doesn't even check that the customer ID and address match;
that's handled offline now, in bulk.
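For reference, the webhook handler's whole job is now roughly this (the
handler name and the lpush-vs-rpush choice are my sketch, but the key
name and the "<address>/<customer_id>" value format are what the
hydrate script below expects):

    # Sketch only: enqueue the raw webhook payload and return immediately,
    # with no electrum call and no customer/address check in this path.
    def handle_electrum_webhook(currency, address, customer_id)
      REDIS.lpush("exciting_#{currency}_addrs", "#{address}/#{customer_id}")
    end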
Then, we have this new hydrate script which reads from that dumb queue,
hits electrum, and then pushes to the slightly-smarter "queue-like"
hash for the process_pending_btc_transactions script to handle.
There's a fundamental difference between the two that I probably
wouldn't have built if I'd started from scratch, but I don't hate it
now that we're here. This hydrate script is meant to be run as a
daemon, blocking on redis and maintaining a PID file for monit to
monitor, whereas the process_pending script is run by cron and is meant
to handle a batch of orders all together.
So this one can just chew through every address as it comes in, or, if
we're doing a reassert, it can truck along on its merry way working
through a huge backlog, and then the process_pending script will let a
batch accumulate and process it all at once. And I think that's okay.
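The process_pending side isn't in this diff, but for context the hash
it reads is keyed "<tx_hash>/<address>" with the customer_id as the
value, so its bulk pass is shaped roughly like this (a sketch under
that assumption; the real script does more per transaction than this):

    # Sketch of the cron-side bulk pass over the "queue-like" hash
    pending = REDIS.hgetall("pending_#{ELECTRUM.currency}_transactions")
    pending.each do |field, customer_id|
      tx_hash, address = field.split("/", 2)
      # ... check confirmations, credit the customer, and so on ...
      REDIS.hdel("pending_#{ELECTRUM.currency}_transactions", field)
    end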
I may have overdesigned the pidfile stuff, but meh, it was fun.
One of the main things this does versus the current model is to
linearize what would otherwise be a parallel process. Previously we'd
have perhaps multiple webhooks coming in together during a reassert,
say, and they'd all be hitting electrum and blocking on electrum.
Whereas this basically makes it so only one address can be hitting
electrum at a time. And that's on purpose, in the hope that it's a
little gentler on the poor thing.
But if we did want to parallelize later, the easiest way would just be
to run this 3 times with 3 different pidfiles (they double as
lockfiles), which would ensure there were always 3 running and would
work with the existing code as-is.
@@ -0,0 +1,54 @@
+#!/usr/bin/ruby
+# frozen_string_literal: true
+
+# This reads the queue of "changed addresses" from electrum webhooks in the
+# form "<address>/<customer_id>" and turns those into <tx_hash>/<address> =>
+# customer_id keys for the bin/process_pending_btc_transactions script to run
+# through and process more fully
+#
+# Usage: bin/process_pending_btc_transactions '{
+# pidfile : Text,
+# electrum : env:ELECTRUM_CONFIG,
+# }'
+
+require "dhall"
+
+CONFIG =
+ Dhall::Coder
+ .new(safe: Dhall::Coder::JSON_LIKE + [Symbol, Proc])
+ .load(ARGV[0], transform_keys: :to_sym)
+
+require "redis"
+require_relative "../lib/electrum"
+require_relative "../lib/pidfile"
+
+REDIS = Redis.new
+ELECTRUM = Electrum.new(**CONFIG[:electrum])
+
+PIDFile.new(CONFIG[:pidfile]).lock do |pid|
+ loop do
+ key, value = REDIS.brpop("exciting_#{ELECTRUM.currency}_addrs", 600)
+
+ # Rewrite the pidfile so monit can see we're still alive
+ pid.refresh
+
+ # brpop allows blocking on multiple keys, and key tells us which one
+ # signalled but in this case we know which one, so just make sure it's
+ # not nil which means the timeout fired
+ next unless key
+
+ address, customer_id = value.split("/", 2)
+
+ txids =
+ ELECTRUM
+ .getaddresshistory(address)
+ .map { |item| "#{item['tx_hash']}/#{address}" }
+
+ next if txids.empty?
+
+ REDIS.hset(
+ "pending_#{ELECTRUM.currency}_transactions",
+ *txids.flat_map { |txid| [txid, customer_id] }
+ )
+ end
+end
@@ -0,0 +1,56 @@
+# frozen_string_literal: true
+
+class PIDFile
+ class NonExclusive < StandardError
+    def initialize
+ super("PID file exists and is locked by another process")
+ end
+ end
+
+ class OpenPIDFile
+ def initialize(handle, pid)
+ @handle = handle
+ @pid = pid
+ end
+
+ def refresh
+ # This rewrites the file to move the modified time up
+ # I _very likely_ don't have to rewrite the contents, because
+ # hopefully no one else has dumped crap into my file since that
+ # last time, and I could just write nothing...
+ #
+ # But since I need to do this at least once, it's not worse to do
+ # it later too, it's so few cycles really, and it also heals the
+ # file if something weird happened...
+ @handle.rewind
+ @handle.write("#{@pid}\n")
+ @handle.flush
+ @handle.truncate(@handle.pos)
+ end
+ end
+
+ def initialize(filename, pid=Process.pid)
+ @pid = pid
+ @filename = filename
+ end
+
+ def lock
+ File.open(@filename, File::RDWR | File::CREAT, 0o644) do |f|
+ raise NonExclusive unless f.flock(File::LOCK_EX | File::LOCK_NB)
+
+ begin
+ open_pid = OpenPIDFile.new(f, @pid)
+
+ # Start us off by writing to the file at least once
+ open_pid.refresh
+
+ # Then run the block, which can optionally refresh if they like
+ yield open_pid
+ ensure
+        # Clean up the pidfile on shutdown
+ # But only if we obtained the lock
+ File.delete(@filename)
+ end
+ end
+ end
+end