1# frozen_string_literal: true
2
3class AdminActionRepo
4 class NotFound < StandardError; end
5
6 def initialize(redis: REDIS)
7 @redis = redis
8 end
9
10 def build(klass:, direction:, **kwargs)
11 dir = AdminAction::Direction.for(direction)
12 dir.new(AdminAction.const_get(klass).new(**kwargs))
13 end
14
15 # I'm using hash subset test for pred
16 # So if you give me any keys I'll find only things where those keys are
17 # present and set to that value
18 def find(limit, max="+", **pred)
19 return EMPromise.resolve([]) unless limit.positive?
20
21 xrevrange(
22 "admin_actions", max: max, min: "-", count: limit
23 ).then { |new_max, results|
24 next [] if results.empty?
25
26 selected = results.select { |_id, values| pred < values }
27 .map { |id, values| build(id: id, **rename_class(values)) }
28
29 find(limit - selected.length, "(#{new_max}", **pred)
30 .then { |r| selected + r }
31 }
32 end
33
34 def create(action)
35 push_to_redis(**action.to_h).then { |id|
36 action.with(id: id)
37 }
38 end
39
40protected
41
42 def rename_class(hash)
43 hash.transform_keys { |k| k == :class ? :klass : k }
44 end
45
46 # Turn value into a hash, paper over redis version issue, return earliest ID
47 def xrevrange(stream, min:, max:, count:)
48 min = next_id(min[1..-1]) if min.start_with?("(")
49 max = previous_id(max[1..-1]) if max.start_with?("(")
50
51 @redis.xrevrange(stream, max, min, "COUNT", count).then { |result|
52 next ["+", []] if result.empty?
53
54 [
55 result.last.first, # Reverse order, so this is the lowest ID
56 result.map { |id, values| [id, Hash[*values].transform_keys(&:to_sym)] }
57 ]
58 }
59 end
60
61 # Versions of REDIS after 6.2 can just do "(#{current_id}" to make an
62 # exclusive version
63 def previous_id(current_id)
64 time, seq = current_id.split("-")
65 if seq == "0"
66 "#{time.to_i - 1}-18446744073709551615"
67 else
68 "#{time}-#{seq.to_i - 1}"
69 end
70 end
71
72 # Versions of REDIS after 6.2 can just do "(#{current_id}" to make an
73 # exclusive version
74 def next_id(current_id)
75 time, seq = current_id.split("-")
76 if seq == "18446744073709551615"
77 "#{time.to_i + 1}-0"
78 else
79 "#{time}-#{seq.to_i + 1}"
80 end
81 end
82
83 def push_to_redis(**kwargs)
84 @redis.xadd("admin_actions", "*", *kwargs.flatten)
85 end
86end