1#!/usr/bin/env ruby
2# frozen_string_literal: true
3
4require "json"
5require "optparse"
6require "redis"
7require "shellwords"
8require "time"
9
# Connection settings; each can be overridden through the environment.
REDIS_URL = ENV.fetch("REDIS_URL", "redis://127.0.0.1:6379/0")
STREAM_KEY = ENV.fetch("STREAM_KEY", "messages")

# Number of stream entries requested per range query. Parsed strictly:
# String#to_i would quietly turn "abc" into 0 and "1oo" into 1, so a typo in
# the environment is reported instead of producing a useless batch size.
BATCH_SIZE = begin
  Integer(ENV.fetch("BATCH_SIZE", "100"), 10)
rescue ArgumentError
  0
end
abort "error: BATCH_SIZE must be a positive integer" unless BATCH_SIZE.positive?
13
# ANSI color helpers for terminal output.
#
# Coloring is decided once, when the module is loaded: it is on only when
# stdout is a TTY and the NO_COLOR environment variable is unset or empty.
module Color
  RESET = "\e[0m"
  RED = "\e[31m"
  GREEN = "\e[32m"
  YELLOW = "\e[33m"
  BLUE = "\e[34m"
  CYAN = "\e[36m"

  # Color used for each known event name.
  EVENTS = {
    "in" => GREEN,
    "out" => BLUE,
    "thru" => CYAN,
    "delivered" => CYAN,
    "failed" => RED,
    "resend" => YELLOW
  }.freeze

  # The NO_COLOR convention only counts a non-empty value, so `NO_COLOR=`
  # must not switch colors off.
  @enabled = $stdout.tty? && ENV.fetch("NO_COLOR", "").empty?

  # Wrap +text+ in +color+ followed by RESET when coloring is enabled;
  # otherwise return +text+ unchanged.
  def self.paint(text, color)
    @enabled ? "#{color}#{text}#{RESET}" : text
  end

  # Paint an event name with its color from EVENTS. Names without an entry
  # are returned untouched so no stray RESET sequence is emitted for them.
  def self.event(name)
    color = EVENTS[name]
    color ? paint(name, color) : name
  end
end
41
# Canonicalise a phone number to the "+1" followed by ten digits form.
#
# Returns +value+ itself when it already has that form, "+1" prepended when
# it is exactly ten digits, and nil for anything else.
def normalise_tel(value)
  return value if /\A\+1\d{10}\z/ === value
  return "+1#{value}" if /\A\d{10}\z/ === value

  nil
end
48
# Look up the phone number registered for +jid+.
#
# Reads the whole Redis list "catapult_cred-<jid>" and normalises its last
# element. Aborts the process with an error message when the list is empty
# or when that last element is not a recognisable phone number.
def resolve_jid(redis, jid)
  stored = redis.lrange("catapult_cred-#{jid}", 0, -1)
  abort "error: no registration found for JID: #{jid}" if stored.empty?

  tel = normalise_tel(stored.last)
  abort "error: no valid phone number for JID: #{jid}" unless tel

  tel
end
55
# Turn an --owner argument into a normalised phone number.
#
# A value that already looks like a phone number is normalised directly;
# anything else is treated as a JID and looked up through +redis+.
def resolve_owner(redis, value)
  tel = normalise_tel(value)
  return tel if tel

  resolve_jid(redis, value)
end
59
# Convert a human-readable timestamp into a Redis stream ID of the form
# "<milliseconds>-0".
#
# Time.parse is tried first. When it rejects the string, the external
# `date -d` command is asked instead (presumably to accept phrases Time.parse
# cannot handle — confirm which `date` implementations are targeted). The
# process aborts when that command fails as well.
def to_stream_id(str)
  epoch_seconds = begin
    Time.parse(str).to_i
  rescue ArgumentError
    nil
  end

  unless epoch_seconds
    date_output = `date -d #{Shellwords.escape(str)} +%s 2>/dev/null`
    abort "error: cannot parse timestamp: #{str}" unless $?.success?
    epoch_seconds = date_output.chomp.to_i
  end

  # Whole seconds are scaled to milliseconds; the sequence part is always 0.
  "#{epoch_seconds * 1000}-0"
end
70
# Decode the JSON text stored in an event's "media_urls" field.
#
# Returns an empty array when the field is missing or is not valid JSON.
# Valid JSON that is not a collection (for example `null` or a bare number)
# has no #empty?, so it is wrapped with Array() — which maps nil to [] —
# to keep the caller's emptiness check from raising.
def decode_media_urls(raw)
  parsed = JSON.parse(raw || "[]")
  parsed.respond_to?(:empty?) ? parsed : Array(parsed)
rescue JSON::ParserError
  []
end

# Render one stream entry's field hash as a single space-separated line.
#
# The line starts with the timestamp ("—" when absent) and the colored event
# name, followed by whichever optional fields are present. "failed" events
# always get an "err=" part; "resend" events add the original IDs if present.
#
# fields - Hash with String keys, as read from the stream entry.
#
# Returns the formatted String.
def format_event(fields)
  event = fields["event"]
  parts = [fields.fetch("timestamp", "—"), Color.event(event)]

  %w[owner from to].each do |key|
    parts << "#{key}=#{fields[key]}" if fields[key]
  end

  body = fields["body"]
  parts << "body=#{body.inspect}" if body && !body.empty?

  media = decode_media_urls(fields["media_urls"])
  parts << "media=#{media.inspect}" unless media.empty?

  parts << "stanza=#{fields['stanza_id']}" if fields["stanza_id"]
  parts << "bw=#{fields['bandwidth_id']}" if fields["bandwidth_id"]

  case event
  when "failed"
    parts << "err=#{fields['error_code']}"
    parts << fields["error_description"].inspect if fields["error_description"]
  when "resend"
    parts << "orig_stream=#{fields['original_stream_id']}" if fields["original_stream_id"]
    parts << "orig_bw=#{fields['original_bandwidth_id']}" if fields["original_bandwidth_id"]
  end

  parts.join(" ")
end
102
# Decide whether an event's fields pass every active filter.
#
# filters[:events] is a list of accepted event names; an empty list accepts
# all. The :owner, :from, :bandwidth_id and :stanza_id filters, when set,
# must equal the same-named field exactly.
def matches?(fields, filters)
  wanted_events = filters[:events]
  return false unless wanted_events.none? || wanted_events.include?(fields["event"])

  %i[owner from bandwidth_id stanza_id].all? do |key|
    expected = filters[key]
    !expected || fields[key.to_s] == expected
  end
end
111
# Step a Redis stream ID back by one position for reverse pagination.
#
# With a non-zero sequence the sequence is decremented. Otherwise the
# millisecond part is decremented and returned on its own, without a
# sequence (this appears to rely on Redis completing a bare millisecond ID
# to its highest sequence when it is used as an upper bound — see the XRANGE
# documentation). Returns nil for "0-0", i.e. the very start of the stream.
def prev_stream_id(stream_id)
  millis, sequence = stream_id.split("-").map(&:to_i)
  return "#{millis}-#{sequence - 1}" if sequence.positive?
  return (millis - 1).to_s if millis.positive?

  nil
end
122
123# --- Option parsing ---
124
# Parsed command-line settings. :events collects the event-type switches;
# every other key is only present when its switch was given.
options = { events: [] }

parser = OptionParser.new
parser.banner = [
  "Usage: view-events --recent N [filters...]",
  " view-events --start TIMESTAMP [--end TIMESTAMP] [filters...]"
].join("\n")

parser.separator ""
parser.separator "Time selection (one required):"
parser.on("--recent N", Integer, "Show last N matching events") { |value| options[:recent] = value }
parser.on("--start TIMESTAMP", "Start of time range") { |value| options[:start] = value }
parser.on("--end TIMESTAMP", "End of time range (default: now)") { |value| options[:end] = value }

parser.separator ""
parser.separator "Event type filters (combinable, none = all):"
# One flag per event type; each use appends that type to options[:events].
%w[in out thru delivered failed resend].each do |event_name|
  parser.on("--#{event_name}", "Show #{event_name} events") { options[:events] << event_name }
end

parser.separator ""
parser.separator "Other filters:"
# Plain value switches: the argument is stored verbatim under the given key.
[
  [:owner, "--owner VALUE", "Phone number or JID"],
  [:from, "--from TEL", "Filter by sender"],
  [:bandwidth_id, "--bandwidth-id ID", "Filter by Bandwidth msg ID"],
  [:stanza_id, "--stanza-id ID", "Filter by XMPP stanza ID"]
].each do |key, switch, description|
  parser.on(switch, description) { |value| options[key] = value }
end
150
# Parse ARGV in place. An unknown switch or a bad argument (for example a
# non-integer --recent) is reported as a one-line error rather than an
# uncaught exception with a backtrace.
begin
  parser.parse!
rescue OptionParser::ParseError => e
  abort "error: #{e.message}"
end

# Exactly one of --recent / --start selects what to read.
abort parser.to_s unless options[:recent] || options[:start]
abort "error: --recent and --start are mutually exclusive" if options[:recent] && options[:start]

# The --recent paging loop only checks its limit after collecting a match,
# so a zero or negative N would still print one event; reject it up front.
abort "error: --recent must be a positive integer" if options[:recent] && !options[:recent].positive?
155
156# --- Main ---
157
redis = Redis.new(url: REDIS_URL)

# --owner accepts a phone number or a JID; either way it ends up normalised.
if options[:owner]
  options[:owner] = resolve_owner(redis, options[:owner])
end

# --from must be a phone number; anything else ends the run with an error.
if options[:from]
  options[:from] = normalise_tel(options[:from]) ||
                   abort("error: invalid phone number for --from")
end

# Only these keys take part in event matching.
filters = options.slice(*%i[events owner from bandwidth_id stanza_id])

# Number of events printed; used for the final "nothing found" notice.
count = 0
169
if options[:recent]
  # --recent: walk the stream backwards from --end (or the newest entry),
  # one batch at a time, until enough matching events have been collected.
  limit = options[:recent]
  newest_first = []
  upper = options[:end] ? to_stream_id(options[:end]) : "+"

  loop do
    batch = redis.xrevrange(STREAM_KEY, upper, "-", count: BATCH_SIZE)
    break if batch.empty?

    batch.each do |(_entry_id, fields)|
      next unless matches?(fields, filters)

      newest_first << fields
      break if newest_first.length >= limit
    end

    # Stop once the limit is reached or a short batch shows the stream is
    # exhausted; otherwise continue just below the oldest entry seen.
    break if newest_first.length >= limit || batch.size < BATCH_SIZE

    upper = prev_stream_id(batch.last.first)
    break unless upper
  end

  # Entries were gathered newest-first; print them oldest-first.
  newest_first.reverse_each { |fields| puts format_event(fields) }
  count = newest_first.length
else
  # --start: walk forwards from --start to --end (or the newest entry),
  # printing matches as each batch arrives.
  lower = to_stream_id(options[:start])
  upper = options[:end] ? to_stream_id(options[:end]) : "+"

  loop do
    batch = redis.xrange(STREAM_KEY, lower, upper, count: BATCH_SIZE)
    break if batch.empty?

    batch.each do |(_entry_id, fields)|
      if matches?(fields, filters)
        puts format_event(fields)
        count += 1
      end
    end

    break if batch.size < BATCH_SIZE

    # Resume immediately after the last entry of this batch.
    millis, sequence = batch.last.first.split("-").map(&:to_i)
    lower = "#{millis}-#{sequence + 1}"
  end
end

warn "no matching events found" unless count.positive?