1# frozen_string_literal: true
2
3require "lazy_object"
4require "forwardable"
5
# Repository over the pending transactions stored in a Redis hash.
#
# The hash at +key+ maps a transaction id of the form "<tx_hash>/<address>"
# to a customer id. #map walks that hash in chunks, drops entries that the
# filters below reject (and deletes them from the hash), resolves the
# survivors through electrum, and yields them to the caller.
class PendingTransactionRepo
  # An electrum transaction seen from the point of view of a single address.
  class PendingTransaction
    extend Forwardable
    def_delegators :@tx, :tx_hash, :confirmations

    # @param tx the transaction object; must respond to #tx_hash,
    #   #confirmations and #amount_for(address)
    # @param address [String] the address this pending entry refers to
    def initialize(tx, address)
      @tx = tx
      @address = address
    end

    # @return [String] "<tx_hash>/<address>", the same format used as the
    #   field name in the pending hash
    def txid
      "#{tx_hash}/#{@address}"
    end

    # @return the amount the transaction carries for this address, as
    #   reported by the underlying transaction object
    def value
      @tx.amount_for(@address)
    end

    # @return [Boolean] true when the amount for this address is zero or
    #   negative
    def outgoing?
      value <= 0
    end
  end

  # @param key [String] Redis hash holding txid => customer id
  # @param customer_address_template [#call, nil] maps a customer id to the
  #   Redis set key holding that customer's addresses
  # @param ignored_key [String, nil] Redis set of txids to ignore
  #
  # NOTE(review): both keyword arguments default to nil, but run_filters
  # uses them unconditionally; presumably callers always supply them.
  def initialize(key, customer_address_template: nil, ignored_key: nil)
    @key = key
    @customer_address_template = customer_address_template
    @ignored_key = ignored_key

    # Default handler returns nil, so every exception is rethrown by #map
    @error_handler = ->(*) {}
  end

  # Lazily resolved handle on the DB constant.
  def database
    @database ||= LazyObject.new { DB }
  end

  # Lazily resolved handle on the REDIS constant.
  def redis
    @redis ||= LazyObject.new { REDIS }
  end

  # Lazily resolved handle on the ELECTRUM constant.
  def electrum
    @electrum ||= LazyObject.new { ELECTRUM }
  end

  # Installs the block #map calls with any StandardError raised while
  # building or yielding a transaction. A truthy return value swallows the
  # error (the item is dropped from the result); a falsy one re-raises it.
  def error_handler(&block)
    @error_handler = block
  end

  # Drops entries whose txid already has a row in the transactions table.
  class ExistingTransactionFilter
    # Values of the "exists" column that mean "row found". Typed results
    # give true; "t" is accepted as well because an untyped result would
    # return the string "f" for false, which Ruby treats as truthy.
    FOUND_VALUES = [true, "t"].freeze

    def initialize(database)
      @database = database
    end

    # Builds the lookup query with one "$n" placeholder per entry; the
    # txids themselves are passed separately as bind parameters.
    #
    # @param txids_and_customer_ids [Array<Array(String, Object)>]
    # @return [String] SQL text
    def prepare_sql(txids_and_customer_ids)
      placeholders =
        (1..txids_and_customer_ids.length).map { |n| "($#{n})" }

      <<~SQL
        SELECT
          transaction_id,
          transactions.transaction_id IS NOT NULL AS exists
        FROM
          (VALUES #{placeholders.join(',')}) AS t(transaction_id)
          LEFT JOIN transactions USING (transaction_id)
      SQL
    end

    # @param txids_and_customer_ids [Array<Array(String, Object)>]
    # @return [Array] the entries with no matching transactions row
    def filter_chunk(txids_and_customer_ids)
      # An empty VALUES list is not valid SQL, and there is nothing to ask.
      return [] if txids_and_customer_ids.empty?

      txids = txids_and_customer_ids.map(&:first)
      found =
        @database
        .exec_params(prepare_sql(txids_and_customer_ids), txids)
        .map { |row| [row["transaction_id"], FOUND_VALUES.include?(row["exists"])] }
        .to_h

      txids_and_customer_ids.reject { |(txid, _customer_id)| found[txid] }
    end
  end

  # Drops entries whose txid is a member of the "ignored" Redis set.
  class IgnoredTransactionFilter
    def initialize(redis, key)
      @redis = redis
      @key = key
    end

    # @param txids_and_customer_ids [Array<Array(String, Object)>]
    # @return [Array] the entries that are not in the ignored set
    def filter_chunk(txids_and_customer_ids)
      # There's an smismember command that was added to the rubygem in
      # v4.5 but prod is currently v4.2.5, so membership is checked with
      # one pipelined sismember per entry instead. Better than N full
      # round trips, though probably not as good as smismember.
      #
      # Commands go through the yielded object rather than @redis.
      ignored = @redis.pipelined do |pipeline|
        txids_and_customer_ids.each do |(txid, _customer_id)|
          pipeline.sismember(@key, txid)
        end
      end

      # Replies come back in request order, so reply i belongs to entry i.
      txids_and_customer_ids.reject.with_index { |_entry, i| ignored[i] }
    end
  end

  # Keeps only entries whose address belongs to the customer they are
  # filed under, according to the per-customer Redis address set.
  class WrongCustomerFilter
    def initialize(redis, template)
      @redis = redis
      @template = template
    end

    # Issues one pipelined sismember per entry against the customer's
    # address set (key produced by the template).
    #
    # @return [Array] one membership reply per entry, in entry order
    def query_redis(txids_and_customer_ids)
      @redis.pipelined do |pipeline|
        txids_and_customer_ids.each do |(txid, customer_id)|
          # Limit of 2 matches how build_transaction splits a txid.
          _tx_hash, addr = txid.split("/", 2)
          pipeline.sismember(@template.call(customer_id), addr)
        end
      end
    end

    # @param txids_and_customer_ids [Array<Array(String, Object)>]
    # @return [Array] the entries whose address matched their customer;
    #   a warning is emitted for each one that did not
    def filter_chunk(txids_and_customer_ids)
      matches = query_redis(txids_and_customer_ids)

      txids_and_customer_ids.select.with_index do |(txid, customer_id), i|
        matched = matches[i]
        warn "#{txid} doesn't match customer #{customer_id}" unless matched
        matched
      end
    end
  end

  # Runs the chunk through every filter in order, stopping the real work
  # as soon as nothing is left.
  #
  # @return [Array] the entries that survived all filters
  def run_filters(txids_and_customer_ids)
    @filters ||= [
      IgnoredTransactionFilter.new(redis, @ignored_key),
      WrongCustomerFilter.new(redis, @customer_address_template),
      ExistingTransactionFilter.new(database)
    ]

    @filters.reduce(txids_and_customer_ids) do |remaining, current_filter|
      next [] if remaining.empty?

      current_filter.filter_chunk(remaining)
    end
  end

  # run_filters removes the entries we don't care about from the list, but
  # that list is just in memory. This also deletes the rejected entries
  # from the Redis hash so they are not re-read and skipped on every pass.
  #
  # @return [Array] the entries that survived all filters
  def filter(txids_and_customer_ids)
    passed = run_filters(txids_and_customer_ids)
    rejected = txids_and_customer_ids - passed

    redis.hdel(@key, rejected.map(&:first)) unless rejected.empty?

    passed
  end

  # Fetches the transaction named in the txid from electrum and pairs it
  # with the address part of the txid.
  #
  # @param txid [String] "<tx_hash>/<address>"
  # @return [PendingTransaction]
  def build_transaction(txid)
    tx_hash, address = txid.split("/", 2)
    txn = electrum.gettransaction(tx_hash)

    PendingTransaction.new(txn, address)
  end

  # Yields [PendingTransaction, customer_id] for every pending entry that
  # survives the filters, reading the hash in slices of +chunk_size+.
  #
  # Errors raised while building or yielding an item are passed to the
  # error handler; see #error_handler for how its return value is used.
  #
  # @param chunk_size [Integer] number of entries filtered per batch
  # @return [Array] the non-nil block results, in hash order. Only the
  #   per-chunk grouping is flattened; arrays returned by the block are
  #   kept intact.
  def map(chunk_size: 200)
    redis.hgetall(@key).each_slice(chunk_size).flat_map do |chunk|
      chunk_results = filter(chunk).map do |(txid, customer_id)|
        begin
          yield [build_transaction(txid), customer_id]
        rescue StandardError => e
          # A handled error leaves nil here, which compact removes below.
          raise unless @error_handler.call(e)
        end
      end

      chunk_results.compact
    end
  end

  # Deletes the pending entry for this transaction from the Redis hash.
  def remove_transaction(pending)
    redis.hdel(@key, pending.txid)
  end

  # We use this to record transactions we've decided not to care about ever
  # again. These get filtered out during processing _before_ they reach
  # electrum or the user code, so effectively they just don't exist anymore.
  #
  # If electrum was more efficient this would be unnecessary because we'd
  # just ask electrum about the thing and decide we don't care organically.
  # But since it's a bit sensitive, this effectively serves as a cache of
  # "things we'd end up ignoring anyway".
  def mark_ignored(pending)
    redis.sadd(@ignored_key, pending.txid)
  end
end