Add event viewer script

Amolith created

Change summary

bin/view-events | 216 +++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 216 insertions(+)

Detailed changes

bin/view-events 🔗

@@ -0,0 +1,216 @@
+#!/usr/bin/env ruby
+# frozen_string_literal: true
+
+require "json"
+require "optparse"
+require "redis"
+require "shellwords"
+require "time"
+
+REDIS_URL  = ENV.fetch("REDIS_URL",  "redis://127.0.0.1:6379/0")
+STREAM_KEY = ENV.fetch("STREAM_KEY", "messages")
+BATCH_SIZE = ENV.fetch("BATCH_SIZE", "100").to_i
+
+module Color
+	RESET  = "\e[0m"
+	RED    = "\e[31m"
+	GREEN  = "\e[32m"
+	YELLOW = "\e[33m"
+	BLUE   = "\e[34m"
+	CYAN   = "\e[36m"
+
+	EVENTS = {
+		"in"        => GREEN,
+		"out"       => BLUE,
+		"thru"      => CYAN,
+		"delivered" => CYAN,
+		"failed"    => RED,
+		"resend"    => YELLOW
+	}.freeze
+
+	@enabled = $stdout.tty? && !ENV.key?("NO_COLOR")
+
+	def self.paint(text, color)
+		@enabled ? "#{color}#{text}#{RESET}" : text
+	end
+
+	def self.event(name)
+		paint(name, EVENTS.fetch(name, ""))
+	end
+end
+
+def normalise_tel(value)
+	case value
+	when /\A\+1\d{10}\z/ then value
+	when /\A\d{10}\z/    then "+1#{value}"
+	end
+end
+
+def resolve_jid(redis, jid)
+	creds = redis.lrange("catapult_cred-#{jid}", 0, -1)
+	abort "error: no registration found for JID: #{jid}" if creds.empty?
+	normalise_tel(creds.last) or
+		abort "error: no valid phone number for JID: #{jid}"
+end
+
+def resolve_owner(redis, value)
+	normalise_tel(value) || resolve_jid(redis, value)
+end
+
+def to_stream_id(str)
+	epoch = begin
+		Time.parse(str).to_i
+	rescue ArgumentError
+		out = `date -d #{str.shellescape} +%s 2>/dev/null`.chomp
+		abort "error: cannot parse timestamp: #{str}" unless $?.success?
+		out.to_i
+	end
+	"#{epoch * 1000}-0"
+end
+
+def format_event(fields)
+	event = fields["event"]
+	parts = [fields.fetch("timestamp", "—"), Color.event(event)]
+
+	parts << "owner=#{fields['owner']}"           if fields["owner"]
+	parts << "from=#{fields['from']}"             if fields["from"]
+	parts << "to=#{fields['to']}"                 if fields["to"]
+	parts << "body=#{fields['body'].inspect}"     if fields["body"] && !fields["body"].empty?
+
+	media = begin
+		JSON.parse(fields["media_urls"] || "[]")
+	rescue JSON::ParserError
+		[]
+	end
+	parts << "media=#{media.inspect}" unless media.empty?
+
+	parts << "stanza=#{fields['stanza_id']}"      if fields["stanza_id"]
+	parts << "bw=#{fields['bandwidth_id']}"       if fields["bandwidth_id"]
+
+	if event == "failed"
+		parts << "err=#{fields['error_code']}"
+		parts << fields["error_description"].inspect if fields["error_description"]
+	end
+
+	if event == "resend"
+		parts << "orig_stream=#{fields['original_stream_id']}"   if fields["original_stream_id"]
+		parts << "orig_bw=#{fields['original_bandwidth_id']}"    if fields["original_bandwidth_id"]
+	end
+
+	parts.join("  ")
+end
+
+def matches?(fields, filters)
+	return false if filters[:events].any? && !filters[:events].include?(fields["event"])
+	return false if filters[:owner] && fields["owner"] != filters[:owner]
+	return false if filters[:from] && fields["from"] != filters[:from]
+	return false if filters[:bandwidth_id] && fields["bandwidth_id"] != filters[:bandwidth_id]
+	return false if filters[:stanza_id] && fields["stanza_id"] != filters[:stanza_id]
+	true
+end
+
+# Decrement a Redis stream ID for reverse pagination. Returns nil if we've
+# reached the very beginning of the stream.
+def prev_stream_id(stream_id)
+	ms, seq = stream_id.split("-").map(&:to_i)
+	if seq.positive?
+		"#{ms}-#{seq - 1}"
+	elsif ms.positive?
+		"#{ms - 1}"
+	end
+end
+
+# --- Option parsing ---
+
+options = { events: [] }
+
+parser = OptionParser.new do |opts|
+	opts.banner = "Usage: view-events --recent N [filters...]\n" \
+	              "       view-events --start TIMESTAMP [--end TIMESTAMP] [filters...]"
+
+	opts.separator ""
+	opts.separator "Time selection (one required):"
+	opts.on("--recent N", Integer, "Show last N matching events")    { |n| options[:recent] = n }
+	opts.on("--start TIMESTAMP", "Start of time range")              { |v| options[:start] = v }
+	opts.on("--end TIMESTAMP", "End of time range (default: now)")   { |v| options[:end] = v }
+
+	opts.separator ""
+	opts.separator "Event type filters (combinable, none = all):"
+	%w[in out thru delivered failed resend].each do |event|
+		opts.on("--#{event}", "Show #{event} events") { options[:events] << event }
+	end
+
+	opts.separator ""
+	opts.separator "Other filters:"
+	opts.on("--owner VALUE", "Phone number or JID")            { |v| options[:owner] = v }
+	opts.on("--from TEL", "Filter by sender")                  { |v| options[:from] = v }
+	opts.on("--bandwidth-id ID", "Filter by Bandwidth msg ID") { |v| options[:bandwidth_id] = v }
+	opts.on("--stanza-id ID", "Filter by XMPP stanza ID")      { |v| options[:stanza_id] = v }
+end
+
+parser.parse!
+
+abort parser.to_s unless options[:recent] || options[:start]
+abort "error: --recent and --start are mutually exclusive" if options[:recent] && options[:start]
+
+# --- Main ---
+
+redis = Redis.new(url: REDIS_URL)
+
+options[:owner] = resolve_owner(redis, options[:owner]) if options[:owner]
+
+if options[:from]
+	options[:from] = normalise_tel(options[:from]) or
+		abort "error: invalid phone number for --from"
+end
+
+filters = options.slice(:events, :owner, :from, :bandwidth_id, :stanza_id)
+count = 0
+
+if options[:recent]
+	target = options[:recent]
+	matched = []
+	cursor = options[:end] ? to_stream_id(options[:end]) : "+"
+
+	loop do
+		entries = redis.xrevrange(STREAM_KEY, cursor, "-", count: BATCH_SIZE)
+		break if entries.empty?
+
+		entries.each do |_id, fields|
+			if matches?(fields, filters)
+				matched.unshift(fields)
+				break if matched.length >= target
+			end
+		end
+
+		break if matched.length >= target
+		cursor = prev_stream_id(entries.last[0])
+		break unless cursor
+		break if entries.size < BATCH_SIZE
+	end
+
+	matched.each { |fields| puts format_event(fields) }
+	count = matched.length
+else
+	start_id = to_stream_id(options[:start])
+	end_id = options[:end] ? to_stream_id(options[:end]) : "+"
+	cursor = start_id
+
+	loop do
+		entries = redis.xrange(STREAM_KEY, cursor, end_id, count: BATCH_SIZE)
+		break if entries.empty?
+
+		entries.each do |_id, fields|
+			next unless matches?(fields, filters)
+			puts format_event(fields)
+			count += 1
+		end
+
+		last_id = entries.last[0]
+		ms, seq = last_id.split("-").map(&:to_i)
+		cursor = "#{ms}-#{seq + 1}"
+		break if entries.size < BATCH_SIZE
+	end
+end
+
+warn "no matching events found" if count.zero?