diff --git a/bin/process_pending_btc_transactions b/bin/process_pending_btc_transactions index 06f48f5440f46a28377dcb1d35f34d2ac8953a9b..a7df1391ec0b4679674771d3ed856d24352ce177 100755 --- a/bin/process_pending_btc_transactions +++ b/bin/process_pending_btc_transactions @@ -219,7 +219,10 @@ class Customer end repo = PendingTransactionRepo.new( - "pending_#{CONFIG[:electrum][:currency]}_transactions" + "pending_#{CONFIG[:electrum][:currency]}_transactions", + customer_address_template: lambda { |customer_id| + "jmp_customer_#{CONFIG[:electrum][:currency]}_addresses-#{customer_id}" + } ) repo.error_handler do |e| diff --git a/config.ru b/config.ru index 616528e769363170b8911a1e58c5555dce22f698..69f271ee9407081f81d700e175c1fb0c33096afe 100644 --- a/config.ru +++ b/config.ru @@ -120,55 +120,6 @@ class CreditCardGateway end end -class UnknownTransactions - def self.from(currency, customer_id, address, tx_hashes) - self.for( - currency, - customer_id, - fetch_rows_for(address, tx_hashes).map { |row| - row["transaction_id"] - } - ) - end - - def self.fetch_rows_for(address, tx_hashes) - values = tx_hashes.map { |tx_hash| - "('#{DB.escape_string(tx_hash)}/#{DB.escape_string(address)}')" - } - return [] if values.empty? - - DB.exec_params(<<-SQL) - SELECT transaction_id FROM - (VALUES #{values.join(',')}) AS t(transaction_id) - LEFT JOIN transactions USING (transaction_id) - WHERE transactions.transaction_id IS NULL - SQL - end - - def self.for(currency, customer_id, transaction_ids) - return None.new if transaction_ids.empty? - - new(currency, customer_id, transaction_ids) - end - - def initialize(currency, customer_id, transaction_ids) - @currency = currency - @customer_id = customer_id - @transaction_ids = transaction_ids - end - - def enqueue! - REDIS.hset( - "pending_#{@currency}_transactions", - *@transaction_ids.flat_map { |txid| [txid, @customer_id] } - ) - end - - class None - def enqueue!; end - end -end - class CardVault def self.for(gateway, nonce, amount=nil) if amount&.positive? @@ -253,21 +204,6 @@ class JmpPay < Roda end end - def redis_key_btc_addresses - "jmp_customer_#{electrum.currency}_addresses-#{params['customer_id']}" - end - - def verify_address_customer_id(r) - return if REDIS.sismember(redis_key_btc_addresses, params["address"]) - - warn "Address and customer_id do not match" - r.halt([ - 403, - { "Content-Type" => "text/plain" }, - "Address and customer_id do not match" - ]) - end - def nil_empty(s) s.to_s == "" ? nil : s end @@ -335,16 +271,19 @@ class JmpPay < Roda route do |r| r.on "electrum_notify" do - verify_address_customer_id(r) - - UnknownTransactions.from( - electrum.currency, - params["customer_id"], - params["address"], + tx_hashes = electrum - .getaddresshistory(params["address"]) - .map { |item| item["tx_hash"] } - ).enqueue! + .getaddresshistory(params["address"]) + .map { |item| item["tx_hash"] } + + txids = tx_hashes.map { |tx_hash| + "#{tx_hash}/#{params['address']}" + } + + REDIS.hset( + "pending_#{electrum.currency}_transactions", + *txids.flat_map { |txid| [txid, params["customer_id"]] } + ) "OK" end diff --git a/lib/pending_transaction_repo.rb b/lib/pending_transaction_repo.rb index 63fc97cf3213b3a708919e35eb65260d9dbde409..74ed59c03f02c25b37934e82728909a8bbaaa844 100644 --- a/lib/pending_transaction_repo.rb +++ b/lib/pending_transaction_repo.rb @@ -26,8 +26,16 @@ class PendingTransactionRepo end end - def initialize(key) + def initialize(key, customer_address_template: nil) @key = key + @customer_address_template = customer_address_template + + # Default handler allows all exceptions to be rethrown + @error_handler = ->(*){} + end + + def database + @database ||= LazyObject.new { DB } end def redis @@ -42,18 +50,95 @@ class PendingTransactionRepo @error_handler = block end - def map - redis.hgetall(@key).map { |(txid, customer_id)| - begin - tx_hash, address = txid.split("/", 2) + class ExistingTransactionFilter + def initialize(database) + @database = database + end + + def prepare_sql(txids_and_customer_ids) + values = txids_and_customer_ids.map { |(txid, _customer_id)| + "('#{@database.escape_string(txid)}')" + } + + <<-SQL + SELECT + transaction_id, + transactions.transaction_id IS NOT NULL as exists + FROM + (VALUES #{values.join(',')}) AS t(transaction_id) + LEFT JOIN transactions USING (transaction_id) + SQL + end + + def filter_chunk(txids_and_customer_ids) + exists = + @database + .exec_params(prepare_sql(txids_and_customer_ids)) + .map { |row| [row["transaction_id"], row["exists"]] } + .to_h + + txids_and_customer_ids.reject do |(txid, _customer_id)| + exists[txid] + end + end + end + + class WrongCustomerFilter + def initialize(redis, template) + @redis = redis + @template = template + end + + def query_redis(txids_and_customer_ids) + @redis.pipelined do + txids_and_customer_ids.each do |(txid, customer_id)| + _tx_hash, addr = txid.split("/") + @redis.sismember(@template.call(customer_id), addr) + end + end + end - txn = electrum.gettransaction(tx_hash) + def filter_chunk(txids_and_customer_ids) + results = query_redis(txids_and_customer_ids) - yield [PendingTransaction.new(txn, address), customer_id] - rescue StandardError => e - raise e unless @error_handler.call(e) + txids_and_customer_ids.select do |(txid, customer_id)| + r = results.shift + warn "#{txid} doesn't match customer #{customer_id}" unless r + r end - }.compact + end + end + + def filter(txids_and_customer_ids) + @filters ||= [ + WrongCustomerFilter.new(redis, @customer_address_template), + ExistingTransactionFilter.new(database) + ] + + @filters.reduce(txids_and_customer_ids) do |remaining_txs, f| + next [] if remaining_txs.empty? + + f.filter_chunk(remaining_txs) + end + end + + def build_transaction(txid) + tx_hash, address = txid.split("/", 2) + txn = electrum.gettransaction(tx_hash) + + PendingTransaction.new(txn, address) + end + + def map(chunk_size: 200) + redis.hgetall(@key).each_slice(chunk_size).map { |chunk| + filter(chunk).map { |(txid, customer_id)| + begin + yield [build_transaction(txid), customer_id] + rescue StandardError => e + raise e unless @error_handler.call(e) + end + }.compact + }.flatten end def remove_transaction(pending) diff --git a/test/test_pending_transaction_repo.rb b/test/test_pending_transaction_repo.rb index 33804cc226e0a95b7ea3fab4fbf81f288260dd62..69d4ae3b8c50f14b1574ea1d57af8c1b57391434 100644 --- a/test/test_pending_transaction_repo.rb +++ b/test/test_pending_transaction_repo.rb @@ -7,6 +7,10 @@ class PendingTransactionRepo @redis = Minitest::Mock.new @electrum = Minitest::Mock.new end + + def override_filters(filters) + @filters = filters + end end FakeElectrumTransaction = Struct.new(:tx_hash, :confirmations, :value) { @@ -35,6 +39,7 @@ class TestPendingTransactionRepo < Minitest::Test def test_map repo = PendingTransactionRepo.new("key") repo.setup_mocks + repo.override_filters([]) repo.redis.expect( :hgetall, [["tx/addr", "1234"]], @@ -59,6 +64,7 @@ class TestPendingTransactionRepo < Minitest::Test def test_error_handler repo = PendingTransactionRepo.new("key") repo.setup_mocks + repo.override_filters([]) repo.redis.expect( :hgetall, [["tx/addr", "1234"], ["missing/addr", "1234"]], @@ -95,6 +101,7 @@ class TestPendingTransactionRepo < Minitest::Test def test_other_errors repo = PendingTransactionRepo.new("key") repo.setup_mocks + repo.override_filters([]) repo.redis.expect( :hgetall, [["tx/addr", "1234"], ["error/addr", "1234"]], @@ -103,7 +110,7 @@ class TestPendingTransactionRepo < Minitest::Test def repo.electrum Class.new { def gettransaction(txid) - raise ArgumentError, "Oh no" if txid == "error" + raise "Oh no" if txid == "error" FakeElectrumTransaction.new("tx", 6, 0.5) end @@ -117,7 +124,37 @@ class TestPendingTransactionRepo < Minitest::Test end end - assert_raises(ArgumentError) do + assert_raises(RuntimeError) do + repo.map { |pending, customer_id| + "#{pending.value} #{customer_id}" + } + end + + assert_mock repo.redis + end + + # This is basically the same as test_other_errors but uses the default + # error handler that should re-throw everything + def test_default_errors + repo = PendingTransactionRepo.new("key") + repo.setup_mocks + repo.override_filters([]) + repo.redis.expect( + :hgetall, + [["tx/addr", "1234"], ["error/addr", "1234"]], + ["key"] + ) + def repo.electrum + Class.new { + def gettransaction(txid) + raise "Oh no" if txid == "error" + + FakeElectrumTransaction.new("tx", 6, 0.5) + end + }.new + end + + assert_raises(RuntimeError) do repo.map { |pending, customer_id| "#{pending.value} #{customer_id}" } @@ -142,4 +179,227 @@ class TestPendingTransactionRepo < Minitest::Test assert_mock repo.redis assert_mock repo.electrum end + + def test_chunking + repo = PendingTransactionRepo.new("key") + repo.setup_mocks + mock_filter = Minitest::Mock.new + mock_filter.expect( + :filter_chunk, [["one/a", "1234"], ["two/a", "1234"]], + [[["one/a", "1234"], ["two/a", "1234"]]] + ) + mock_filter.expect( + :filter_chunk, [["three/a", "1234"]], + [[["three/a", "1234"]]] + ) + repo.override_filters([mock_filter]) + repo.redis.expect( + :hgetall, + [["one/a", "1234"], ["two/a", "1234"], ["three/a", "1234"]], + ["key"] + ) + repo.electrum.expect( + :gettransaction, + FakeElectrumTransaction.new("one", 6, 0.5), + ["one"] + ) + repo.electrum.expect( + :gettransaction, + FakeElectrumTransaction.new("two", 6, 0.5), + ["two"] + ) + repo.electrum.expect( + :gettransaction, + FakeElectrumTransaction.new("three", 6, 0.5), + ["three"] + ) + + v = repo.map(chunk_size: 2) { |pending, _customer_id| + pending.tx_hash + } + + assert_equal( + ["one", "two", "three"], v, + "Should have returned result of block" + ) + + assert_mock repo.redis + assert_mock repo.electrum + assert_mock mock_filter + end + + def test_existing_transaction_filter + db_mock = Minitest::Mock.new + filter = PendingTransactionRepo::ExistingTransactionFilter.new(db_mock) + + db_mock.expect(:escape_string, "one/a", ["one/a"]) + db_mock.expect(:escape_string, "two/a", ["two/a"]) + db_mock.expect(:escape_string, "three/b", ["three/b"]) + # I've pretended this made a change here + db_mock.expect(:escape_string, "four/C", ["four/c"]) + + # I don't want to match the regex literally, that seems like a bit much + # so instead I've just matched the parameterized part + db_mock.expect( + :exec_params, + [ + { "transaction_id" => "one/a", "exists" => false }, + { "transaction_id" => "two/a", "exists" => true }, + { "transaction_id" => "three/b", "exists" => true }, + { "transaction_id" => "four/c", "exists" => false } + ], + [/ + \(VALUES + \s + \('one\/a'\),\('two\/a'\), + \('three\/b'\),\('four\/C'\) + \) + /x] + ) + + remaining = filter.filter_chunk([ + ["one/a", "1234"], + ["two/a", "1234"], + ["three/b", "4321"], + ["four/c", "2323"] + ]) + + assert_equal( + [["one/a", "1234"], ["four/c", "2323"]], + remaining, + "should only include unfiltered results" + ) + + assert_mock db_mock + end + + def test_wrong_customer_filter + redis = Object.new + + def redis.pipelined + @stuff = [] + yield + @stuff + end + + def redis.sismember(key, value) + store = { + "test_key_1234" => ["not_a"], + "test_key_4321" => ["b"], + "test_key_2323" => ["c"] + } + @stuff << store[key].include?(value) + end + + filter = PendingTransactionRepo::WrongCustomerFilter.new( + redis, ->(customer_id) { "test_key_#{customer_id}" } + ) + + def filter.warn(s) + @warnings ||= [] + @warnings << s + end + + def filter.sneak_warnings + @warnings + end + + remaining = filter.filter_chunk([ + ["one/a", "1234"], + ["two/a", "1234"], + ["three/b", "4321"], + ["four/c", "2323"] + ]) + + assert_equal( + [["three/b", "4321"], ["four/c", "2323"]], + remaining, + "should only include unfiltered results" + ) + + assert_equal( + [ + "one/a doesn't match customer 1234", + "two/a doesn't match customer 1234" + ], + filter.sneak_warnings, + "should have warned about busted results" + ) + end + + def test_filter_stack + repo = PendingTransactionRepo.new("key") + repo.setup_mocks + mock_filter_one = Minitest::Mock.new + mock_filter_one.expect( + :filter_chunk, [["one/a", "1234"], ["two/a", "1234"]], + [[["one/a", "1234"], ["two/a", "1234"], ["three/a", "1234"]]] + ) + + mock_filter_two = Minitest::Mock.new + mock_filter_two.expect( + :filter_chunk, [["one/a", "1234"]], + [[["one/a", "1234"], ["two/a", "1234"]]] + ) + + repo.override_filters([mock_filter_one, mock_filter_two]) + repo.redis.expect( + :hgetall, + [["one/a", "1234"], ["two/a", "1234"], ["three/a", "1234"]], + ["key"] + ) + repo.electrum.expect( + :gettransaction, + FakeElectrumTransaction.new("one", 6, 0.5), + ["one"] + ) + + v = repo.map { |pending, _customer_id| + pending.tx_hash + } + + assert_equal( + ["one"], v, + "Should have returned result of block" + ) + + assert_mock repo.redis + assert_mock repo.electrum + assert_mock mock_filter_one + assert_mock mock_filter_two + end + + def test_filter_all + repo = PendingTransactionRepo.new("key") + repo.setup_mocks + mock_filter_one = Minitest::Mock.new + mock_filter_one.expect( + :filter_chunk, [], + [[["one/a", "1234"], ["two/a", "1234"], ["three/a", "1234"]]] + ) + + # Shouldn't be called at all + mock_filter_two = Minitest::Mock.new + + repo.override_filters([mock_filter_one, mock_filter_two]) + repo.redis.expect( + :hgetall, + [["one/a", "1234"], ["two/a", "1234"], ["three/a", "1234"]], + ["key"] + ) + + v = repo.map { |pending, _customer_id| + pending.tx_hash + } + + assert_equal( + [], v, + "Should have returned result of block" + ) + + assert_mock repo.redis + assert_mock repo.electrum + assert_mock mock_filter_one + assert_mock mock_filter_two + end end