Enqueue All Electrum Transactions, Filter and Chunk in Process Pending

Christopher Vollick created

This does two things:

The first is that it moves validation of the electrum transactions out
of the webhook and into the processing script.

The theory here is that we want the actual webhook to be lighter so
things like reasserts aren't as hard on the system, etc. So we could
have dumped into a queue to process later by doing these checks and then
pushing into the pending list when it looks legit.

But really the pending list _is_ a queue already. So rather than having
another queue and another background process to process it, let's just
have the bin we're already running do the checks before bothering
electrum! Then the electrum webhook is basically just dumping into the
pending list. This means that, on a reassert, every transaction we've
ever seen will end up in this queue, but if we had a background that
would effectively still be true, it would just be a different pre-queue
queue.

There's just one problem with this scheme, which is that the current
speed of the bin is not enough to actually make it through this much
data in any kind of reasonable time. So! I've also introduced batching
to this new process that runs the validity filters on an entire chunk at
a time, with the hope that we can chew through the vast majority of them
with only a few queries to the DB and Redis before bothering Electrum,
which is the slower and more fragile of them.

This could have been two patches, but I decided to include them together
so the old logic and the new logic are both next to each in the same
place for easier context. Otherwise there would have been a patch that
added a bunch of filters and stuff that would seem like they came out of
nowhere at best, and like they were doing unnecessary work because we
were already filtering things at worst. Then the next commit would just
be deleting that stuff, making the last one necessary.

This way you can see where the logic _used_ to live, and compare it to
the equivalent logic in the new place.

Change summary

bin/process_pending_btc_transactions  |   5 
config.ru                             |  85 +--------
lib/pending_transaction_repo.rb       | 105 ++++++++++-
test/test_pending_transaction_repo.rb | 264 ++++++++++++++++++++++++++++
4 files changed, 373 insertions(+), 86 deletions(-)

Detailed changes

bin/process_pending_btc_transactions 🔗

@@ -219,7 +219,10 @@ class Customer
 end
 
 repo = PendingTransactionRepo.new(
-	"pending_#{CONFIG[:electrum][:currency]}_transactions"
+	"pending_#{CONFIG[:electrum][:currency]}_transactions",
+	customer_address_template: lambda { |customer_id|
+		"jmp_customer_#{CONFIG[:electrum][:currency]}_addresses-#{customer_id}"
+	}
 )
 
 repo.error_handler do |e|

config.ru 🔗

@@ -120,55 +120,6 @@ class CreditCardGateway
 	end
 end
 
-class UnknownTransactions
-	def self.from(currency, customer_id, address, tx_hashes)
-		self.for(
-			currency,
-			customer_id,
-			fetch_rows_for(address, tx_hashes).map { |row|
-				row["transaction_id"]
-			}
-		)
-	end
-
-	def self.fetch_rows_for(address, tx_hashes)
-		values = tx_hashes.map { |tx_hash|
-			"('#{DB.escape_string(tx_hash)}/#{DB.escape_string(address)}')"
-		}
-		return [] if values.empty?
-
-		DB.exec_params(<<-SQL)
-			SELECT transaction_id FROM
-				(VALUES #{values.join(',')}) AS t(transaction_id)
-				LEFT JOIN transactions USING (transaction_id)
-			WHERE transactions.transaction_id IS NULL
-		SQL
-	end
-
-	def self.for(currency, customer_id, transaction_ids)
-		return None.new if transaction_ids.empty?
-
-		new(currency, customer_id, transaction_ids)
-	end
-
-	def initialize(currency, customer_id, transaction_ids)
-		@currency = currency
-		@customer_id = customer_id
-		@transaction_ids = transaction_ids
-	end
-
-	def enqueue!
-		REDIS.hset(
-			"pending_#{@currency}_transactions",
-			*@transaction_ids.flat_map { |txid| [txid, @customer_id] }
-		)
-	end
-
-	class None
-		def enqueue!; end
-	end
-end
-
 class CardVault
 	def self.for(gateway, nonce, amount=nil)
 		if amount&.positive?
@@ -253,21 +204,6 @@ class JmpPay < Roda
 		end
 	end
 
-	def redis_key_btc_addresses
-		"jmp_customer_#{electrum.currency}_addresses-#{params['customer_id']}"
-	end
-
-	def verify_address_customer_id(r)
-		return if REDIS.sismember(redis_key_btc_addresses, params["address"])
-
-		warn "Address and customer_id do not match"
-		r.halt([
-			403,
-			{ "Content-Type" => "text/plain" },
-			"Address and customer_id do not match"
-		])
-	end
-
 	def nil_empty(s)
 		s.to_s == "" ? nil : s
 	end
@@ -335,16 +271,19 @@ class JmpPay < Roda
 
 	route do |r|
 		r.on "electrum_notify" do
-			verify_address_customer_id(r)
-
-			UnknownTransactions.from(
-				electrum.currency,
-				params["customer_id"],
-				params["address"],
+			tx_hashes =
 				electrum
-					.getaddresshistory(params["address"])
-					.map { |item| item["tx_hash"] }
-			).enqueue!
+				.getaddresshistory(params["address"])
+				.map { |item| item["tx_hash"] }
+
+			txids = tx_hashes.map { |tx_hash|
+				"#{tx_hash}/#{params['address']}"
+			}
+
+			REDIS.hset(
+				"pending_#{electrum.currency}_transactions",
+				*txids.flat_map { |txid| [txid, params["customer_id"]] }
+			)
 
 			"OK"
 		end

lib/pending_transaction_repo.rb 🔗

@@ -26,8 +26,16 @@ class PendingTransactionRepo
 		end
 	end
 
-	def initialize(key)
+	def initialize(key, customer_address_template: nil)
 		@key = key
+		@customer_address_template = customer_address_template
+
+		# Default handler allows all exceptions to be rethrown
+		@error_handler = ->(*){}
+	end
+
+	def database
+		@database ||= LazyObject.new { DB }
 	end
 
 	def redis
@@ -42,18 +50,95 @@ class PendingTransactionRepo
 		@error_handler = block
 	end
 
-	def map
-		redis.hgetall(@key).map { |(txid, customer_id)|
-			begin
-				tx_hash, address = txid.split("/", 2)
+	class ExistingTransactionFilter
+		def initialize(database)
+			@database = database
+		end
+
+		def prepare_sql(txids_and_customer_ids)
+			values = txids_and_customer_ids.map { |(txid, _customer_id)|
+				"('#{@database.escape_string(txid)}')"
+			}
+
+			<<-SQL
+				SELECT
+					transaction_id,
+					transactions.transaction_id IS NOT NULL as exists
+				FROM
+					(VALUES #{values.join(',')}) AS t(transaction_id)
+				LEFT JOIN transactions USING (transaction_id)
+			SQL
+		end
+
+		def filter_chunk(txids_and_customer_ids)
+			exists =
+				@database
+				.exec_params(prepare_sql(txids_and_customer_ids))
+				.map { |row| [row["transaction_id"], row["exists"]] }
+				.to_h
+
+			txids_and_customer_ids.reject do |(txid, _customer_id)|
+				exists[txid]
+			end
+		end
+	end
+
+	class WrongCustomerFilter
+		def initialize(redis, template)
+			@redis = redis
+			@template = template
+		end
+
+		def query_redis(txids_and_customer_ids)
+			@redis.pipelined do
+				txids_and_customer_ids.each do |(txid, customer_id)|
+					_tx_hash, addr = txid.split("/")
+					@redis.sismember(@template.call(customer_id), addr)
+				end
+			end
+		end
 
-				txn = electrum.gettransaction(tx_hash)
+		def filter_chunk(txids_and_customer_ids)
+			results = query_redis(txids_and_customer_ids)
 
-				yield [PendingTransaction.new(txn, address), customer_id]
-			rescue StandardError => e
-				raise e unless @error_handler.call(e)
+			txids_and_customer_ids.select do |(txid, customer_id)|
+				r = results.shift
+				warn "#{txid} doesn't match customer #{customer_id}" unless r
+				r
 			end
-		}.compact
+		end
+	end
+
+	def filter(txids_and_customer_ids)
+		@filters ||= [
+			WrongCustomerFilter.new(redis, @customer_address_template),
+			ExistingTransactionFilter.new(database)
+		]
+
+		@filters.reduce(txids_and_customer_ids) do |remaining_txs, f|
+			next [] if remaining_txs.empty?
+
+			f.filter_chunk(remaining_txs)
+		end
+	end
+
+	def build_transaction(txid)
+		tx_hash, address = txid.split("/", 2)
+		txn = electrum.gettransaction(tx_hash)
+
+		PendingTransaction.new(txn, address)
+	end
+
+	def map(chunk_size: 200)
+		redis.hgetall(@key).each_slice(chunk_size).map { |chunk|
+			filter(chunk).map { |(txid, customer_id)|
+				begin
+					yield [build_transaction(txid), customer_id]
+				rescue StandardError => e
+					raise e unless @error_handler.call(e)
+				end
+			}.compact
+		}.flatten
 	end
 
 	def remove_transaction(pending)

test/test_pending_transaction_repo.rb 🔗

@@ -7,6 +7,10 @@ class PendingTransactionRepo
 		@redis = Minitest::Mock.new
 		@electrum = Minitest::Mock.new
 	end
+
+	def override_filters(filters)
+		@filters = filters
+	end
 end
 
 FakeElectrumTransaction = Struct.new(:tx_hash, :confirmations, :value) {
@@ -35,6 +39,7 @@ class TestPendingTransactionRepo < Minitest::Test
 	def test_map
 		repo = PendingTransactionRepo.new("key")
 		repo.setup_mocks
+		repo.override_filters([])
 		repo.redis.expect(
 			:hgetall,
 			[["tx/addr", "1234"]],
@@ -59,6 +64,7 @@ class TestPendingTransactionRepo < Minitest::Test
 	def test_error_handler
 		repo = PendingTransactionRepo.new("key")
 		repo.setup_mocks
+		repo.override_filters([])
 		repo.redis.expect(
 			:hgetall,
 			[["tx/addr", "1234"], ["missing/addr", "1234"]],
@@ -95,6 +101,7 @@ class TestPendingTransactionRepo < Minitest::Test
 	def test_other_errors
 		repo = PendingTransactionRepo.new("key")
 		repo.setup_mocks
+		repo.override_filters([])
 		repo.redis.expect(
 			:hgetall,
 			[["tx/addr", "1234"], ["error/addr", "1234"]],
@@ -103,7 +110,7 @@ class TestPendingTransactionRepo < Minitest::Test
 		def repo.electrum
 			Class.new {
 				def gettransaction(txid)
-					raise ArgumentError, "Oh no" if txid == "error"
+					raise "Oh no" if txid == "error"
 
 					FakeElectrumTransaction.new("tx", 6, 0.5)
 				end
@@ -117,7 +124,37 @@ class TestPendingTransactionRepo < Minitest::Test
 			end
 		end
 
-		assert_raises(ArgumentError) do
+		assert_raises(RuntimeError) do
+			repo.map { |pending, customer_id|
+				"#{pending.value} #{customer_id}"
+			}
+		end
+
+		assert_mock repo.redis
+	end
+
+	# This is basically the same as test_other_errors but uses the default
+	# error handler that should re-throw everything
+	def test_default_errors
+		repo = PendingTransactionRepo.new("key")
+		repo.setup_mocks
+		repo.override_filters([])
+		repo.redis.expect(
+			:hgetall,
+			[["tx/addr", "1234"], ["error/addr", "1234"]],
+			["key"]
+		)
+		def repo.electrum
+			Class.new {
+				def gettransaction(txid)
+					raise "Oh no" if txid == "error"
+
+					FakeElectrumTransaction.new("tx", 6, 0.5)
+				end
+			}.new
+		end
+
+		assert_raises(RuntimeError) do
 			repo.map { |pending, customer_id|
 				"#{pending.value} #{customer_id}"
 			}
@@ -142,4 +179,227 @@ class TestPendingTransactionRepo < Minitest::Test
 		assert_mock repo.redis
 		assert_mock repo.electrum
 	end
+
+	def test_chunking
+		repo = PendingTransactionRepo.new("key")
+		repo.setup_mocks
+		mock_filter = Minitest::Mock.new
+		mock_filter.expect(
+			:filter_chunk, [["one/a", "1234"], ["two/a", "1234"]],
+			[[["one/a", "1234"], ["two/a", "1234"]]]
+		)
+		mock_filter.expect(
+			:filter_chunk, [["three/a", "1234"]],
+			[[["three/a", "1234"]]]
+		)
+		repo.override_filters([mock_filter])
+		repo.redis.expect(
+			:hgetall,
+			[["one/a", "1234"], ["two/a", "1234"], ["three/a", "1234"]],
+			["key"]
+		)
+		repo.electrum.expect(
+			:gettransaction,
+			FakeElectrumTransaction.new("one", 6, 0.5),
+			["one"]
+		)
+		repo.electrum.expect(
+			:gettransaction,
+			FakeElectrumTransaction.new("two", 6, 0.5),
+			["two"]
+		)
+		repo.electrum.expect(
+			:gettransaction,
+			FakeElectrumTransaction.new("three", 6, 0.5),
+			["three"]
+		)
+
+		v = repo.map(chunk_size: 2) { |pending, _customer_id|
+			pending.tx_hash
+		}
+
+		assert_equal(
+			["one", "two", "three"], v,
+			"Should have returned result of block"
+		)
+
+		assert_mock repo.redis
+		assert_mock repo.electrum
+		assert_mock mock_filter
+	end
+
+	def test_existing_transaction_filter
+		db_mock = Minitest::Mock.new
+		filter = PendingTransactionRepo::ExistingTransactionFilter.new(db_mock)
+
+		db_mock.expect(:escape_string, "one/a", ["one/a"])
+		db_mock.expect(:escape_string, "two/a", ["two/a"])
+		db_mock.expect(:escape_string, "three/b", ["three/b"])
+		# I've pretended this made a change here
+		db_mock.expect(:escape_string, "four/C", ["four/c"])
+
+		# I don't want to match the regex literally, that seems like a bit much
+		# so instead I've just matched the parameterized part
+		db_mock.expect(
+			:exec_params,
+			[
+				{ "transaction_id" => "one/a", "exists" => false },
+				{ "transaction_id" => "two/a", "exists" => true },
+				{ "transaction_id" => "three/b", "exists" => true },
+				{ "transaction_id" => "four/c", "exists" => false }
+			],
+			[/
+				\(VALUES
+				\s
+				\('one\/a'\),\('two\/a'\),
+				\('three\/b'\),\('four\/C'\)
+				\)
+			/x]
+		)
+
+		remaining = filter.filter_chunk([
+			["one/a", "1234"],
+			["two/a", "1234"],
+			["three/b", "4321"],
+			["four/c", "2323"]
+		])
+
+		assert_equal(
+			[["one/a", "1234"], ["four/c", "2323"]],
+			remaining,
+			"should only include unfiltered results"
+		)
+
+		assert_mock db_mock
+	end
+
+	def test_wrong_customer_filter
+		redis = Object.new
+
+		def redis.pipelined
+			@stuff = []
+			yield
+			@stuff
+		end
+
+		def redis.sismember(key, value)
+			store = {
+				"test_key_1234" => ["not_a"],
+				"test_key_4321" => ["b"],
+				"test_key_2323" => ["c"]
+			}
+			@stuff << store[key].include?(value)
+		end
+
+		filter = PendingTransactionRepo::WrongCustomerFilter.new(
+			redis, ->(customer_id) { "test_key_#{customer_id}" }
+		)
+
+		def filter.warn(s)
+			@warnings ||= []
+			@warnings << s
+		end
+
+		def filter.sneak_warnings
+			@warnings
+		end
+
+		remaining = filter.filter_chunk([
+			["one/a", "1234"],
+			["two/a", "1234"],
+			["three/b", "4321"],
+			["four/c", "2323"]
+		])
+
+		assert_equal(
+			[["three/b", "4321"], ["four/c", "2323"]],
+			remaining,
+			"should only include unfiltered results"
+		)
+
+		assert_equal(
+			[
+				"one/a doesn't match customer 1234",
+				"two/a doesn't match customer 1234"
+			],
+			filter.sneak_warnings,
+			"should have warned about busted results"
+		)
+	end
+
+	def test_filter_stack
+		repo = PendingTransactionRepo.new("key")
+		repo.setup_mocks
+		mock_filter_one = Minitest::Mock.new
+		mock_filter_one.expect(
+			:filter_chunk, [["one/a", "1234"], ["two/a", "1234"]],
+			[[["one/a", "1234"], ["two/a", "1234"], ["three/a", "1234"]]]
+		)
+
+		mock_filter_two = Minitest::Mock.new
+		mock_filter_two.expect(
+			:filter_chunk, [["one/a", "1234"]],
+			[[["one/a", "1234"], ["two/a", "1234"]]]
+		)
+
+		repo.override_filters([mock_filter_one, mock_filter_two])
+		repo.redis.expect(
+			:hgetall,
+			[["one/a", "1234"], ["two/a", "1234"], ["three/a", "1234"]],
+			["key"]
+		)
+		repo.electrum.expect(
+			:gettransaction,
+			FakeElectrumTransaction.new("one", 6, 0.5),
+			["one"]
+		)
+
+		v = repo.map { |pending, _customer_id|
+			pending.tx_hash
+		}
+
+		assert_equal(
+			["one"], v,
+			"Should have returned result of block"
+		)
+
+		assert_mock repo.redis
+		assert_mock repo.electrum
+		assert_mock mock_filter_one
+		assert_mock mock_filter_two
+	end
+
+	def test_filter_all
+		repo = PendingTransactionRepo.new("key")
+		repo.setup_mocks
+		mock_filter_one = Minitest::Mock.new
+		mock_filter_one.expect(
+			:filter_chunk, [],
+			[[["one/a", "1234"], ["two/a", "1234"], ["three/a", "1234"]]]
+		)
+
+		# Shouldn't be called at all
+		mock_filter_two = Minitest::Mock.new
+
+		repo.override_filters([mock_filter_one, mock_filter_two])
+		repo.redis.expect(
+			:hgetall,
+			[["one/a", "1234"], ["two/a", "1234"], ["three/a", "1234"]],
+			["key"]
+		)
+
+		v = repo.map { |pending, _customer_id|
+			pending.tx_hash
+		}
+
+		assert_equal(
+			[], v,
+			"Should have returned result of block"
+		)
+
+		assert_mock repo.redis
+		assert_mock repo.electrum
+		assert_mock mock_filter_one
+		assert_mock mock_filter_two
+	end
 end