view-events

  1#!/usr/bin/env ruby
  2# frozen_string_literal: true
  3
  4require "json"
  5require "optparse"
  6require "redis"
  7require "shellwords"
  8require "time"
  9
 10REDIS_URL  = ENV.fetch("REDIS_URL",  "redis://127.0.0.1:6379/0")
 11STREAM_KEY = ENV.fetch("STREAM_KEY", "messages")
 12BATCH_SIZE = ENV.fetch("BATCH_SIZE", "100").to_i
 13
 14module Color
 15	RESET  = "\e[0m"
 16	RED    = "\e[31m"
 17	GREEN  = "\e[32m"
 18	YELLOW = "\e[33m"
 19	BLUE   = "\e[34m"
 20	CYAN   = "\e[36m"
 21
 22	EVENTS = {
 23		"in"        => GREEN,
 24		"out"       => BLUE,
 25		"thru"      => CYAN,
 26		"delivered" => CYAN,
 27		"failed"    => RED,
 28		"resend"    => YELLOW
 29	}.freeze
 30
 31	@enabled = $stdout.tty? && !ENV.key?("NO_COLOR")
 32
 33	def self.paint(text, color)
 34		@enabled ? "#{color}#{text}#{RESET}" : text
 35	end
 36
 37	def self.event(name)
 38		paint(name, EVENTS.fetch(name, ""))
 39	end
 40end
 41
 42def normalise_tel(value)
 43	case value
 44	when /\A\+1\d{10}\z/ then value
 45	when /\A\d{10}\z/    then "+1#{value}"
 46	end
 47end
 48
 49def resolve_jid(redis, jid)
 50	creds = redis.lrange("catapult_cred-#{jid}", 0, -1)
 51	abort "error: no registration found for JID: #{jid}" if creds.empty?
 52	normalise_tel(creds.last) or
 53		abort "error: no valid phone number for JID: #{jid}"
 54end
 55
 56def resolve_owner(redis, value)
 57	normalise_tel(value) || resolve_jid(redis, value)
 58end
 59
 60def to_stream_id(str)
 61	epoch = begin
 62		Time.parse(str).to_i
 63	rescue ArgumentError
 64		out = `date -d #{str.shellescape} +%s 2>/dev/null`.chomp
 65		abort "error: cannot parse timestamp: #{str}" unless $?.success?
 66		out.to_i
 67	end
 68	"#{epoch * 1000}-0"
 69end
 70
 71def format_event(fields, json: False, **)
 72	return JSON.dump(fields) if json
 73
 74	event = fields["event"]
 75	parts = [fields.fetch("timestamp", "—"), Color.event(event)]
 76
 77	parts << "owner=#{fields['owner']}"           if fields["owner"]
 78	parts << "from=#{fields['from']}"             if fields["from"]
 79	parts << "to=#{fields['to']}"                 if fields["to"]
 80	parts << "body=#{fields['body'].inspect}"     if fields["body"] && !fields["body"].empty?
 81
 82	media = begin
 83		JSON.parse(fields["media_urls"] || "[]")
 84	rescue JSON::ParserError
 85		[]
 86	end
 87	parts << "media=#{media.inspect}" unless media.empty?
 88
 89	parts << "stanza=#{fields['stanza_id']}"      if fields["stanza_id"]
 90	parts << "bw=#{fields['bandwidth_id']}"       if fields["bandwidth_id"]
 91
 92	if event == "failed"
 93		parts << "err=#{fields['error_code']}"
 94		parts << fields["error_description"].inspect if fields["error_description"]
 95	end
 96
 97	if event == "resend"
 98		parts << "orig_stream=#{fields['original_stream_id']}"   if fields["original_stream_id"]
 99		parts << "orig_bw=#{fields['original_bandwidth_id']}"    if fields["original_bandwidth_id"]
100	end
101
102	parts.join("  ")
103end
104
# Decides whether a stream entry passes every active filter.
#
# filters[:events] is a list of accepted event types (an empty list accepts
# all). Each of :owner, :from, :bandwidth_id and :stanza_id, when set, must
# equal the entry's field of the same name.
def matches?(fields, filters)
	wanted_events = filters[:events]
	return false unless wanted_events.none? || wanted_events.include?(fields["event"])

	%i[owner from bandwidth_id stanza_id].all? do |key|
		expected = filters[key]
		!expected || fields[key.to_s] == expected
	end
end
113
# Steps a Redis stream ID back by one position for reverse pagination.
#
# With a non-zero sequence part only the sequence is decremented. Otherwise
# the millisecond part is decremented and returned on its own, without a
# sequence; the result is meant to be used as the upper bound of the next
# reverse range query (Redis is documented to complete a missing sequence on
# a range's upper bound with the highest possible value). Returns nil once
# the ID is already 0-0, i.e. the very beginning of the stream.
def prev_stream_id(stream_id)
	millis, sequence = stream_id.split("-").map(&:to_i)
	return "#{millis}-#{sequence - 1}" if sequence.positive?

	(millis - 1).to_s if millis.positive?
end
124
# --- Option parsing ---

# Parsed command-line state. :events accumulates one entry per event-type
# flag given; the remaining keys are only present when their flag was used.
options = { events: [] }

parser = OptionParser.new do |opts|
	opts.banner = "Usage: view-events --recent N [filters...]\n" \
	              "       view-events --start TIMESTAMP [--end TIMESTAMP] [filters...]"

	opts.separator ""
	opts.separator "Time selection (one required):"
	opts.on("--recent N", Integer, "Show last N matching events")    { |n| options[:recent] = n }
	opts.on("--start TIMESTAMP", "Start of time range")              { |v| options[:start] = v }
	opts.on("--end TIMESTAMP", "End of time range (default: now)")   { |v| options[:end] = v }

	opts.separator ""
	opts.separator "Event type filters (combinable, none = all):"
	%w[in out thru delivered failed resend].each do |event|
		opts.on("--#{event}", "Show #{event} events") { options[:events] << event }
	end

	opts.separator ""
	opts.separator "Other filters:"
	opts.on("--owner VALUE", "Phone number or JID")            { |v| options[:owner] = v }
	opts.on("--from TEL", "Filter by sender")                  { |v| options[:from] = v }
	opts.on("--bandwidth-id ID", "Filter by Bandwidth msg ID") { |v| options[:bandwidth_id] = v }
	opts.on("--stanza-id ID", "Filter by XMPP stanza ID")      { |v| options[:stanza_id] = v }
	opts.on("-j", "--json", "Format output as JSON")           { |v| options[:json] = v }
end

# Unknown flags, missing arguments and non-integer --recent values are
# reported in the same "error: ..." style as the rest of the script instead
# of surfacing as an uncaught exception with a backtrace.
begin
	parser.parse!
rescue OptionParser::ParseError => e
	abort "error: #{e.message}"
end

abort parser.to_s unless options[:recent] || options[:start]
abort "error: --recent and --start are mutually exclusive" if options[:recent] && options[:start]
# The reverse scan only checks its target after recording a match, so a zero
# or negative N would still print one event; reject such values up front.
abort "error: --recent must be a positive integer" if options[:recent] && !options[:recent].positive?
158
# --- Main ---

redis = Redis.new(url: REDIS_URL)

# Replace the raw --owner / --from arguments with normalised phone numbers so
# they can be compared directly against the stream fields.
options[:owner] = resolve_owner(redis, options[:owner]) if options[:owner]

if options[:from]
	sender = normalise_tel(options[:from])
	abort "error: invalid phone number for --from" unless sender
	options[:from] = sender
end

filters = options.slice(:events, :owner, :from, :bandwidth_id, :stanza_id)
count = 0

if options[:recent]
	# Walk the stream backwards in batches, starting at --end (or the newest
	# entry), until enough matches are collected or the stream is exhausted.
	wanted = options[:recent]
	newest_first = []
	cursor = options[:end] ? to_stream_id(options[:end]) : "+"

	while cursor
		batch = redis.xrevrange(STREAM_KEY, cursor, "-", count: BATCH_SIZE)
		break if batch.empty?

		batch.each do |_id, fields|
			next unless matches?(fields, filters)

			newest_first << fields
			break if newest_first.length >= wanted
		end

		# A short batch means there is nothing older left to fetch.
		break if newest_first.length >= wanted || batch.size < BATCH_SIZE

		cursor = prev_stream_id(batch.last[0])
	end

	# Matches were gathered newest-first; print them in chronological order.
	newest_first.reverse_each { |fields| puts format_event(fields, **options) }
	count = newest_first.length
else
	# Walk forwards from --start to --end (or the newest entry) in batches,
	# printing matches as they are encountered.
	cursor = to_stream_id(options[:start])
	upper = options[:end] ? to_stream_id(options[:end]) : "+"

	loop do
		batch = redis.xrange(STREAM_KEY, cursor, upper, count: BATCH_SIZE)
		break if batch.empty?

		batch.each do |_id, fields|
			if matches?(fields, filters)
				puts format_event(fields, **options)
				count += 1
			end
		end

		break if batch.size < BATCH_SIZE

		# Resume just after the last entry seen by bumping its sequence part.
		ms, seq = batch.last[0].split("-").map(&:to_i)
		cursor = "#{ms}-#{seq + 1}"
	end
end

warn "no matching events found" if count.zero?
218
219warn "no matching events found" if count.zero?