From 1944a9946cc175eac66fe8974a3cebf146e224a5 Mon Sep 17 00:00:00 2001 From: Christopher Vollick <0@psycoti.ca> Date: Tue, 10 Jun 2025 15:47:39 -0400 Subject: [PATCH] Enqueue All Electrum Transactions, Filter and Chunk in Process Pending This does two things: The first is that it moves validation of the electrum transactions out of the webhook and into the processing script. The theory here is that we want the actual webhook to be lighter so things like reasserts aren't as hard on the system, etc. So we could have dumped into a queue to process later by doing these checks and then pushing into the pending list when it looks legit. But really the pending list _is_ a queue already. So rather than having another queue and another background process to process it, let's just have the bin we're already running do the checks before bothering electrum! Then the electrum webhook is basically just dumping into the pending list. This means that, on a reassert, every transaction we've ever seen will end up in this queue, but if we had a background that would effectively still be true, it would just be a different pre-queue queue. There's just one problem with this scheme, which is that the current speed of the bin is not enough to actually make it through this much data in any kind of reasonable time. So! I've also introduced batching to this new process that runs the validity filters on an entire chunk at a time, with the hope that we can chew through the vast majority of them with only a few queries to the DB and Redis before bothering Electrum, which is the slower and more fragile of them. This could have been two patches, but I decided to include them together so the old logic and the new logic are both next to each in the same place for easier context. Otherwise there would have been a patch that added a bunch of filters and stuff that would seem like they came out of nowhere at best, and like they were doing unnecessary work because we were already filtering things at worst. Then the next commit would just be deleting that stuff, making the last one necessary. This way you can see where the logic _used_ to live, and compare it to the equivalent logic in the new place. --- bin/process_pending_btc_transactions | 5 +- config.ru | 85 ++------- lib/pending_transaction_repo.rb | 105 +++++++++- test/test_pending_transaction_repo.rb | 264 +++++++++++++++++++++++++- 4 files changed, 373 insertions(+), 86 deletions(-) diff --git a/bin/process_pending_btc_transactions b/bin/process_pending_btc_transactions index 06f48f5440f46a28377dcb1d35f34d2ac8953a9b..a7df1391ec0b4679674771d3ed856d24352ce177 100755 --- a/bin/process_pending_btc_transactions +++ b/bin/process_pending_btc_transactions @@ -219,7 +219,10 @@ class Customer end repo = PendingTransactionRepo.new( - "pending_#{CONFIG[:electrum][:currency]}_transactions" + "pending_#{CONFIG[:electrum][:currency]}_transactions", + customer_address_template: lambda { |customer_id| + "jmp_customer_#{CONFIG[:electrum][:currency]}_addresses-#{customer_id}" + } ) repo.error_handler do |e| diff --git a/config.ru b/config.ru index 616528e769363170b8911a1e58c5555dce22f698..69f271ee9407081f81d700e175c1fb0c33096afe 100644 --- a/config.ru +++ b/config.ru @@ -120,55 +120,6 @@ class CreditCardGateway end end -class UnknownTransactions - def self.from(currency, customer_id, address, tx_hashes) - self.for( - currency, - customer_id, - fetch_rows_for(address, tx_hashes).map { |row| - row["transaction_id"] - } - ) - end - - def self.fetch_rows_for(address, tx_hashes) - values = tx_hashes.map { |tx_hash| - "('#{DB.escape_string(tx_hash)}/#{DB.escape_string(address)}')" - } - return [] if values.empty? - - DB.exec_params(<<-SQL) - SELECT transaction_id FROM - (VALUES #{values.join(',')}) AS t(transaction_id) - LEFT JOIN transactions USING (transaction_id) - WHERE transactions.transaction_id IS NULL - SQL - end - - def self.for(currency, customer_id, transaction_ids) - return None.new if transaction_ids.empty? - - new(currency, customer_id, transaction_ids) - end - - def initialize(currency, customer_id, transaction_ids) - @currency = currency - @customer_id = customer_id - @transaction_ids = transaction_ids - end - - def enqueue! - REDIS.hset( - "pending_#{@currency}_transactions", - *@transaction_ids.flat_map { |txid| [txid, @customer_id] } - ) - end - - class None - def enqueue!; end - end -end - class CardVault def self.for(gateway, nonce, amount=nil) if amount&.positive? @@ -253,21 +204,6 @@ class JmpPay < Roda end end - def redis_key_btc_addresses - "jmp_customer_#{electrum.currency}_addresses-#{params['customer_id']}" - end - - def verify_address_customer_id(r) - return if REDIS.sismember(redis_key_btc_addresses, params["address"]) - - warn "Address and customer_id do not match" - r.halt([ - 403, - { "Content-Type" => "text/plain" }, - "Address and customer_id do not match" - ]) - end - def nil_empty(s) s.to_s == "" ? nil : s end @@ -335,16 +271,19 @@ class JmpPay < Roda route do |r| r.on "electrum_notify" do - verify_address_customer_id(r) - - UnknownTransactions.from( - electrum.currency, - params["customer_id"], - params["address"], + tx_hashes = electrum - .getaddresshistory(params["address"]) - .map { |item| item["tx_hash"] } - ).enqueue! + .getaddresshistory(params["address"]) + .map { |item| item["tx_hash"] } + + txids = tx_hashes.map { |tx_hash| + "#{tx_hash}/#{params['address']}" + } + + REDIS.hset( + "pending_#{electrum.currency}_transactions", + *txids.flat_map { |txid| [txid, params["customer_id"]] } + ) "OK" end diff --git a/lib/pending_transaction_repo.rb b/lib/pending_transaction_repo.rb index 63fc97cf3213b3a708919e35eb65260d9dbde409..74ed59c03f02c25b37934e82728909a8bbaaa844 100644 --- a/lib/pending_transaction_repo.rb +++ b/lib/pending_transaction_repo.rb @@ -26,8 +26,16 @@ class PendingTransactionRepo end end - def initialize(key) + def initialize(key, customer_address_template: nil) @key = key + @customer_address_template = customer_address_template + + # Default handler allows all exceptions to be rethrown + @error_handler = ->(*){} + end + + def database + @database ||= LazyObject.new { DB } end def redis @@ -42,18 +50,95 @@ class PendingTransactionRepo @error_handler = block end - def map - redis.hgetall(@key).map { |(txid, customer_id)| - begin - tx_hash, address = txid.split("/", 2) + class ExistingTransactionFilter + def initialize(database) + @database = database + end + + def prepare_sql(txids_and_customer_ids) + values = txids_and_customer_ids.map { |(txid, _customer_id)| + "('#{@database.escape_string(txid)}')" + } + + <<-SQL + SELECT + transaction_id, + transactions.transaction_id IS NOT NULL as exists + FROM + (VALUES #{values.join(',')}) AS t(transaction_id) + LEFT JOIN transactions USING (transaction_id) + SQL + end + + def filter_chunk(txids_and_customer_ids) + exists = + @database + .exec_params(prepare_sql(txids_and_customer_ids)) + .map { |row| [row["transaction_id"], row["exists"]] } + .to_h + + txids_and_customer_ids.reject do |(txid, _customer_id)| + exists[txid] + end + end + end + + class WrongCustomerFilter + def initialize(redis, template) + @redis = redis + @template = template + end + + def query_redis(txids_and_customer_ids) + @redis.pipelined do + txids_and_customer_ids.each do |(txid, customer_id)| + _tx_hash, addr = txid.split("/") + @redis.sismember(@template.call(customer_id), addr) + end + end + end - txn = electrum.gettransaction(tx_hash) + def filter_chunk(txids_and_customer_ids) + results = query_redis(txids_and_customer_ids) - yield [PendingTransaction.new(txn, address), customer_id] - rescue StandardError => e - raise e unless @error_handler.call(e) + txids_and_customer_ids.select do |(txid, customer_id)| + r = results.shift + warn "#{txid} doesn't match customer #{customer_id}" unless r + r end - }.compact + end + end + + def filter(txids_and_customer_ids) + @filters ||= [ + WrongCustomerFilter.new(redis, @customer_address_template), + ExistingTransactionFilter.new(database) + ] + + @filters.reduce(txids_and_customer_ids) do |remaining_txs, f| + next [] if remaining_txs.empty? + + f.filter_chunk(remaining_txs) + end + end + + def build_transaction(txid) + tx_hash, address = txid.split("/", 2) + txn = electrum.gettransaction(tx_hash) + + PendingTransaction.new(txn, address) + end + + def map(chunk_size: 200) + redis.hgetall(@key).each_slice(chunk_size).map { |chunk| + filter(chunk).map { |(txid, customer_id)| + begin + yield [build_transaction(txid), customer_id] + rescue StandardError => e + raise e unless @error_handler.call(e) + end + }.compact + }.flatten end def remove_transaction(pending) diff --git a/test/test_pending_transaction_repo.rb b/test/test_pending_transaction_repo.rb index 33804cc226e0a95b7ea3fab4fbf81f288260dd62..69d4ae3b8c50f14b1574ea1d57af8c1b57391434 100644 --- a/test/test_pending_transaction_repo.rb +++ b/test/test_pending_transaction_repo.rb @@ -7,6 +7,10 @@ class PendingTransactionRepo @redis = Minitest::Mock.new @electrum = Minitest::Mock.new end + + def override_filters(filters) + @filters = filters + end end FakeElectrumTransaction = Struct.new(:tx_hash, :confirmations, :value) { @@ -35,6 +39,7 @@ class TestPendingTransactionRepo < Minitest::Test def test_map repo = PendingTransactionRepo.new("key") repo.setup_mocks + repo.override_filters([]) repo.redis.expect( :hgetall, [["tx/addr", "1234"]], @@ -59,6 +64,7 @@ class TestPendingTransactionRepo < Minitest::Test def test_error_handler repo = PendingTransactionRepo.new("key") repo.setup_mocks + repo.override_filters([]) repo.redis.expect( :hgetall, [["tx/addr", "1234"], ["missing/addr", "1234"]], @@ -95,6 +101,7 @@ class TestPendingTransactionRepo < Minitest::Test def test_other_errors repo = PendingTransactionRepo.new("key") repo.setup_mocks + repo.override_filters([]) repo.redis.expect( :hgetall, [["tx/addr", "1234"], ["error/addr", "1234"]], @@ -103,7 +110,7 @@ class TestPendingTransactionRepo < Minitest::Test def repo.electrum Class.new { def gettransaction(txid) - raise ArgumentError, "Oh no" if txid == "error" + raise "Oh no" if txid == "error" FakeElectrumTransaction.new("tx", 6, 0.5) end @@ -117,7 +124,37 @@ class TestPendingTransactionRepo < Minitest::Test end end - assert_raises(ArgumentError) do + assert_raises(RuntimeError) do + repo.map { |pending, customer_id| + "#{pending.value} #{customer_id}" + } + end + + assert_mock repo.redis + end + + # This is basically the same as test_other_errors but uses the default + # error handler that should re-throw everything + def test_default_errors + repo = PendingTransactionRepo.new("key") + repo.setup_mocks + repo.override_filters([]) + repo.redis.expect( + :hgetall, + [["tx/addr", "1234"], ["error/addr", "1234"]], + ["key"] + ) + def repo.electrum + Class.new { + def gettransaction(txid) + raise "Oh no" if txid == "error" + + FakeElectrumTransaction.new("tx", 6, 0.5) + end + }.new + end + + assert_raises(RuntimeError) do repo.map { |pending, customer_id| "#{pending.value} #{customer_id}" } @@ -142,4 +179,227 @@ class TestPendingTransactionRepo < Minitest::Test assert_mock repo.redis assert_mock repo.electrum end + + def test_chunking + repo = PendingTransactionRepo.new("key") + repo.setup_mocks + mock_filter = Minitest::Mock.new + mock_filter.expect( + :filter_chunk, [["one/a", "1234"], ["two/a", "1234"]], + [[["one/a", "1234"], ["two/a", "1234"]]] + ) + mock_filter.expect( + :filter_chunk, [["three/a", "1234"]], + [[["three/a", "1234"]]] + ) + repo.override_filters([mock_filter]) + repo.redis.expect( + :hgetall, + [["one/a", "1234"], ["two/a", "1234"], ["three/a", "1234"]], + ["key"] + ) + repo.electrum.expect( + :gettransaction, + FakeElectrumTransaction.new("one", 6, 0.5), + ["one"] + ) + repo.electrum.expect( + :gettransaction, + FakeElectrumTransaction.new("two", 6, 0.5), + ["two"] + ) + repo.electrum.expect( + :gettransaction, + FakeElectrumTransaction.new("three", 6, 0.5), + ["three"] + ) + + v = repo.map(chunk_size: 2) { |pending, _customer_id| + pending.tx_hash + } + + assert_equal( + ["one", "two", "three"], v, + "Should have returned result of block" + ) + + assert_mock repo.redis + assert_mock repo.electrum + assert_mock mock_filter + end + + def test_existing_transaction_filter + db_mock = Minitest::Mock.new + filter = PendingTransactionRepo::ExistingTransactionFilter.new(db_mock) + + db_mock.expect(:escape_string, "one/a", ["one/a"]) + db_mock.expect(:escape_string, "two/a", ["two/a"]) + db_mock.expect(:escape_string, "three/b", ["three/b"]) + # I've pretended this made a change here + db_mock.expect(:escape_string, "four/C", ["four/c"]) + + # I don't want to match the regex literally, that seems like a bit much + # so instead I've just matched the parameterized part + db_mock.expect( + :exec_params, + [ + { "transaction_id" => "one/a", "exists" => false }, + { "transaction_id" => "two/a", "exists" => true }, + { "transaction_id" => "three/b", "exists" => true }, + { "transaction_id" => "four/c", "exists" => false } + ], + [/ + \(VALUES + \s + \('one\/a'\),\('two\/a'\), + \('three\/b'\),\('four\/C'\) + \) + /x] + ) + + remaining = filter.filter_chunk([ + ["one/a", "1234"], + ["two/a", "1234"], + ["three/b", "4321"], + ["four/c", "2323"] + ]) + + assert_equal( + [["one/a", "1234"], ["four/c", "2323"]], + remaining, + "should only include unfiltered results" + ) + + assert_mock db_mock + end + + def test_wrong_customer_filter + redis = Object.new + + def redis.pipelined + @stuff = [] + yield + @stuff + end + + def redis.sismember(key, value) + store = { + "test_key_1234" => ["not_a"], + "test_key_4321" => ["b"], + "test_key_2323" => ["c"] + } + @stuff << store[key].include?(value) + end + + filter = PendingTransactionRepo::WrongCustomerFilter.new( + redis, ->(customer_id) { "test_key_#{customer_id}" } + ) + + def filter.warn(s) + @warnings ||= [] + @warnings << s + end + + def filter.sneak_warnings + @warnings + end + + remaining = filter.filter_chunk([ + ["one/a", "1234"], + ["two/a", "1234"], + ["three/b", "4321"], + ["four/c", "2323"] + ]) + + assert_equal( + [["three/b", "4321"], ["four/c", "2323"]], + remaining, + "should only include unfiltered results" + ) + + assert_equal( + [ + "one/a doesn't match customer 1234", + "two/a doesn't match customer 1234" + ], + filter.sneak_warnings, + "should have warned about busted results" + ) + end + + def test_filter_stack + repo = PendingTransactionRepo.new("key") + repo.setup_mocks + mock_filter_one = Minitest::Mock.new + mock_filter_one.expect( + :filter_chunk, [["one/a", "1234"], ["two/a", "1234"]], + [[["one/a", "1234"], ["two/a", "1234"], ["three/a", "1234"]]] + ) + + mock_filter_two = Minitest::Mock.new + mock_filter_two.expect( + :filter_chunk, [["one/a", "1234"]], + [[["one/a", "1234"], ["two/a", "1234"]]] + ) + + repo.override_filters([mock_filter_one, mock_filter_two]) + repo.redis.expect( + :hgetall, + [["one/a", "1234"], ["two/a", "1234"], ["three/a", "1234"]], + ["key"] + ) + repo.electrum.expect( + :gettransaction, + FakeElectrumTransaction.new("one", 6, 0.5), + ["one"] + ) + + v = repo.map { |pending, _customer_id| + pending.tx_hash + } + + assert_equal( + ["one"], v, + "Should have returned result of block" + ) + + assert_mock repo.redis + assert_mock repo.electrum + assert_mock mock_filter_one + assert_mock mock_filter_two + end + + def test_filter_all + repo = PendingTransactionRepo.new("key") + repo.setup_mocks + mock_filter_one = Minitest::Mock.new + mock_filter_one.expect( + :filter_chunk, [], + [[["one/a", "1234"], ["two/a", "1234"], ["three/a", "1234"]]] + ) + + # Shouldn't be called at all + mock_filter_two = Minitest::Mock.new + + repo.override_filters([mock_filter_one, mock_filter_two]) + repo.redis.expect( + :hgetall, + [["one/a", "1234"], ["two/a", "1234"], ["three/a", "1234"]], + ["key"] + ) + + v = repo.map { |pending, _customer_id| + pending.tx_hash + } + + assert_equal( + [], v, + "Should have returned result of block" + ) + + assert_mock repo.redis + assert_mock repo.electrum + assert_mock mock_filter_one + assert_mock mock_filter_two + end end