pending_transaction_repo.rb

  1# frozen_string_literal: true
  2
  3require "lazy_object"
  4require "forwardable"
  5
  6class PendingTransactionRepo
  7	class PendingTransaction
  8		extend Forwardable
  9		def_delegators :@tx, :tx_hash, :confirmations
 10
 11		def initialize(tx, address)
 12			@tx = tx
 13			@address = address
 14		end
 15
 16		def txid
 17			"#{tx_hash}/#{@address}"
 18		end
 19
 20		def value
 21			@tx.amount_for(@address)
 22		end
 23
 24		def outgoing?
 25			value <= 0
 26		end
 27	end
 28
 29	def initialize(key, customer_address_template: nil, ignored_key: nil)
 30		@key = key
 31		@customer_address_template = customer_address_template
 32		@ignored_key = ignored_key
 33
 34		# Default handler allows all exceptions to be rethrown
 35		@error_handler = ->(*) {}
 36	end
 37
 38	def database
 39		@database ||= LazyObject.new { DB }
 40	end
 41
 42	def redis
 43		@redis ||= LazyObject.new { REDIS }
 44	end
 45
 46	def electrum
 47		@electrum ||= LazyObject.new { ELECTRUM }
 48	end
 49
 50	def error_handler(&block)
 51		@error_handler = block
 52	end
 53
 54	class ExistingTransactionFilter
 55		def initialize(database)
 56			@database = database
 57		end
 58
 59		def prepare_sql(txids_and_customer_ids)
 60			values = txids_and_customer_ids.map { |(txid, _customer_id)|
 61				"('#{@database.escape_string(txid)}')"
 62			}
 63
 64			<<-SQL
 65				SELECT
 66					transaction_id,
 67					transactions.transaction_id IS NOT NULL as exists
 68				FROM
 69					(VALUES #{values.join(',')}) AS t(transaction_id)
 70				LEFT JOIN transactions USING (transaction_id)
 71			SQL
 72		end
 73
 74		def filter_chunk(txids_and_customer_ids)
 75			exists =
 76				@database
 77				.exec_params(prepare_sql(txids_and_customer_ids))
 78				.map { |row| [row["transaction_id"], row["exists"]] }
 79				.to_h
 80
 81			txids_and_customer_ids.reject do |(txid, _customer_id)|
 82				exists[txid]
 83			end
 84		end
 85	end
 86
 87	class IgnoredTransactionFilter
 88		def initialize(redis, key)
 89			@redis = redis
 90			@key = key
 91		end
 92
 93		def filter_chunk(txids_and_customer_ids)
 94			# There's an smismember command that was added to the rubygem in
 95			# v4.5 but prod is currently v4.2.5 and I don't want to have to
 96			# deal with that.  So for now this sucks a little, using pipelining
 97			# instead.
 98			# Better than N full round-trip requests, but not as good as
 99			# smismember probably
100			results = @redis.pipelined {
101				txids_and_customer_ids.each do |(txid, _customer_id)|
102					@redis.sismember(@key, txid)
103				end
104			}
105
106			# Now filter by the results correspondingly
107			# Because it's in order, we pull each true or false off results and
108			# if it's true then we don't need to process the item further
109			txids_and_customer_ids.reject { |_v| results.shift }
110		end
111	end
112
113	class WrongCustomerFilter
114		def initialize(redis, template)
115			@redis = redis
116			@template = template
117		end
118
119		def query_redis(txids_and_customer_ids)
120			@redis.pipelined do
121				txids_and_customer_ids.each do |(txid, customer_id)|
122					_tx_hash, addr = txid.split("/")
123					@redis.sismember(@template.call(customer_id), addr)
124				end
125			end
126		end
127
128		def filter_chunk(txids_and_customer_ids)
129			results = query_redis(txids_and_customer_ids)
130
131			txids_and_customer_ids.select do |(txid, customer_id)|
132				r = results.shift
133				warn "#{txid} doesn't match customer #{customer_id}" unless r
134				r
135			end
136		end
137	end
138
139	def run_filters(txids_and_customer_ids)
140		@filters ||= [
141			IgnoredTransactionFilter.new(redis, @ignored_key),
142			WrongCustomerFilter.new(redis, @customer_address_template),
143			ExistingTransactionFilter.new(database)
144		]
145
146		@filters.reduce(txids_and_customer_ids) do |remaining_txs, f|
147			next [] if remaining_txs.empty?
148
149			f.filter_chunk(remaining_txs)
150		end
151	end
152
153	# run_filters removes the ones we don't care about from the list, but that
154	# list is just in memory. The rest of this removes it from the redis queue
155	# so we're not just skipping over these things every time in the list but
156	# otherwise leaving them there.
157	def filter(txids_and_customer_ids)
158		passed = run_filters(txids_and_customer_ids)
159
160		unless txids_and_customer_ids.length == passed.length
161			redis.hdel(
162				@key,
163				(txids_and_customer_ids - passed).map(&:first)
164			)
165		end
166
167		passed
168	end
169
170	def build_transaction(txid)
171		tx_hash, address = txid.split("/", 2)
172		txn = electrum.gettransaction(tx_hash)
173
174		PendingTransaction.new(txn, address)
175	end
176
177	def map(chunk_size: 200)
178		redis.hgetall(@key).each_slice(chunk_size).map { |chunk|
179			filter(chunk).map { |(txid, customer_id)|
180				begin
181					yield [build_transaction(txid), customer_id]
182				rescue StandardError => e
183					raise e unless @error_handler.call(e)
184				end
185			}.compact
186		}.flatten
187	end
188
189	def remove_transaction(pending)
190		redis.hdel(@key, pending.txid)
191	end
192
193	# We use this to record transactions we've decided not to care about ever
194	# again. These get filtered out during the processing _before_ it gets to
195	# electrum or the user code, so effectively they just don't exist anymore.
196	#
197	# If electrum was more efficient this would be unnecessary because we'd
198	# just ask electrum about the thing and we could decide we don't care
199	# organically. But since it's a bit sensitive, this effectively serves as a
200	# cache of "things we'd end up ignoring anyway"
201	def mark_ignored(pending)
202		redis.sadd(@ignored_key, pending.txid)
203	end
204end