view-events

  1#!/usr/bin/env ruby
  2# frozen_string_literal: true
  3
  4require "json"
  5require "optparse"
  6require "redis"
  7require "shellwords"
  8require "time"
  9
 10REDIS_URL  = ENV.fetch("REDIS_URL",  "redis://127.0.0.1:6379/0")
 11STREAM_KEY = ENV.fetch("STREAM_KEY", "messages")
 12BATCH_SIZE = ENV.fetch("BATCH_SIZE", "100").to_i
 13
 14module Color
 15	RESET  = "\e[0m"
 16	RED    = "\e[31m"
 17	GREEN  = "\e[32m"
 18	YELLOW = "\e[33m"
 19	BLUE   = "\e[34m"
 20	CYAN   = "\e[36m"
 21
 22	EVENTS = {
 23		"in"        => GREEN,
 24		"out"       => BLUE,
 25		"thru"      => CYAN,
 26		"delivered" => CYAN,
 27		"failed"    => RED,
 28		"resend"    => YELLOW
 29	}.freeze
 30
 31	@enabled = $stdout.tty? && !ENV.key?("NO_COLOR")
 32
 33	def self.paint(text, color)
 34		@enabled ? "#{color}#{text}#{RESET}" : text
 35	end
 36
 37	def self.event(name)
 38		paint(name, EVENTS.fetch(name, ""))
 39	end
 40end
 41
 42def normalise_tel(value)
 43	case value
 44	when /\A\+1\d{10}\z/ then value
 45	when /\A\d{10}\z/    then "+1#{value}"
 46	end
 47end
 48
 49def resolve_jid(redis, jid)
 50	creds = redis.lrange("catapult_cred-#{jid}", 0, -1)
 51	abort "error: no registration found for JID: #{jid}" if creds.empty?
 52	normalise_tel(creds.last) or
 53		abort "error: no valid phone number for JID: #{jid}"
 54end
 55
 56def resolve_owner(redis, value)
 57	normalise_tel(value) || resolve_jid(redis, value)
 58end
 59
 60def to_stream_id(str)
 61	epoch = begin
 62		Time.parse(str).to_i
 63	rescue ArgumentError
 64		out = `date -d #{str.shellescape} +%s 2>/dev/null`.chomp
 65		abort "error: cannot parse timestamp: #{str}" unless $?.success?
 66		out.to_i
 67	end
 68	"#{epoch * 1000}-0"
 69end
 70
 71def format_event(fields)
 72	event = fields["event"]
 73	parts = [fields.fetch("timestamp", "—"), Color.event(event)]
 74
 75	parts << "owner=#{fields['owner']}"           if fields["owner"]
 76	parts << "from=#{fields['from']}"             if fields["from"]
 77	parts << "to=#{fields['to']}"                 if fields["to"]
 78	parts << "body=#{fields['body'].inspect}"     if fields["body"] && !fields["body"].empty?
 79
 80	media = begin
 81		JSON.parse(fields["media_urls"] || "[]")
 82	rescue JSON::ParserError
 83		[]
 84	end
 85	parts << "media=#{media.inspect}" unless media.empty?
 86
 87	parts << "stanza=#{fields['stanza_id']}"      if fields["stanza_id"]
 88	parts << "bw=#{fields['bandwidth_id']}"       if fields["bandwidth_id"]
 89
 90	if event == "failed"
 91		parts << "err=#{fields['error_code']}"
 92		parts << fields["error_description"].inspect if fields["error_description"]
 93	end
 94
 95	if event == "resend"
 96		parts << "orig_stream=#{fields['original_stream_id']}"   if fields["original_stream_id"]
 97		parts << "orig_bw=#{fields['original_bandwidth_id']}"    if fields["original_bandwidth_id"]
 98	end
 99
100	parts.join("  ")
101end
102
# Decide whether a stream entry passes every active filter.
#
# fields  - Hash of String field names to values for the entry.
# filters - Hash with :events (Array of wanted event names; empty means any)
#           and optional :owner, :from, :bandwidth_id, :stanza_id values that
#           must equal the entry's field of the same name when given.
#
# Returns true when nothing rules the entry out, false otherwise.
def matches?(fields, filters)
	wanted_events = filters[:events]
	return false unless wanted_events.none? || wanted_events.include?(fields["event"])

	%i[owner from bandwidth_id stanza_id].all? do |name|
		expected = filters[name]
		!expected || fields[name.to_s] == expected
	end
end
111
# Step a stream ID ("<ms>-<seq>") back by one position for reverse pagination.
#
# Returns "<ms>-<seq - 1>" while the sequence part can still be decremented.
# Otherwise returns the bare previous millisecond as a String with no sequence
# part (NOTE(review): this relies on Redis completing a bare ID used as an
# upper bound with the highest sequence number; confirm against the XRANGE
# docs). Returns nil for "0-0", i.e. the very beginning of the stream.
def prev_stream_id(stream_id)
	millis, sequence = stream_id.split("-").map(&:to_i)

	return "#{millis}-#{sequence - 1}" if sequence.positive?
	return (millis - 1).to_s if millis.positive?

	nil
end
122
# --- Option parsing ---

# Parsed command-line state. :events collects every event-type flag given;
# the remaining keys are set only when their option appears.
options = { events: [] }

parser = OptionParser.new do |opts|
	opts.banner = "Usage: view-events --recent N [filters...]\n" \
	              "       view-events --start TIMESTAMP [--end TIMESTAMP] [filters...]"

	opts.separator ""
	opts.separator "Time selection (one required):"
	opts.on("--recent N", Integer, "Show last N matching events")    { |n| options[:recent] = n }
	opts.on("--start TIMESTAMP", "Start of time range")              { |v| options[:start] = v }
	opts.on("--end TIMESTAMP", "End of time range (default: now)")   { |v| options[:end] = v }

	opts.separator ""
	opts.separator "Event type filters (combinable, none = all):"
	%w[in out thru delivered failed resend].each do |event|
		opts.on("--#{event}", "Show #{event} events") { options[:events] << event }
	end

	opts.separator ""
	opts.separator "Other filters:"
	opts.on("--owner VALUE", "Phone number or JID")            { |v| options[:owner] = v }
	opts.on("--from TEL", "Filter by sender")                  { |v| options[:from] = v }
	opts.on("--bandwidth-id ID", "Filter by Bandwidth msg ID") { |v| options[:bandwidth_id] = v }
	opts.on("--stanza-id ID", "Filter by XMPP stanza ID")      { |v| options[:stanza_id] = v }
end

# Unknown flags and malformed arguments are reported as a one-line error in
# the same style as the other failures, rather than as an exception backtrace.
begin
	parser.parse!
rescue OptionParser::ParseError => e
	abort "error: #{e.message}"
end

abort parser.to_s unless options[:recent] || options[:start]
abort "error: --recent and --start are mutually exclusive" if options[:recent] && options[:start]

# Zero or a negative count can never select anything; reject it up front
# instead of printing nothing.
abort "error: --recent must be a positive integer" if options[:recent] && !options[:recent].positive?
155
# --- Main ---

redis = Redis.new(url: REDIS_URL)

# Replace the raw --owner / --from arguments with normalised phone numbers so
# they can be compared directly against entry fields.
if (owner = options[:owner])
	options[:owner] = resolve_owner(redis, owner)
end

if (sender = options[:from])
	tel = normalise_tel(sender)
	abort "error: invalid phone number for --from" unless tel
	options[:from] = tel
end

filters = options.slice(:events, :owner, :from, :bandwidth_id, :stanza_id)
count = 0

if (target = options[:recent])
	# Recent mode: page backwards with XREVRANGE from --end (or the stream tail)
	# until enough matches are collected or the stream runs out.
	collected = []
	cursor = options[:end] ? to_stream_id(options[:end]) : "+"

	until cursor.nil?
		entries = redis.xrevrange(STREAM_KEY, cursor, "-", count: BATCH_SIZE)
		break if entries.empty?

		entries.each do |(_id, fields)|
			next unless matches?(fields, filters)

			collected << fields
			break if collected.length >= target
		end

		break if collected.length >= target

		cursor = prev_stream_id(entries.last.first)
		# A short batch means there is nothing older left to fetch.
		break if entries.size < BATCH_SIZE
	end

	# Matches were gathered in fetch order; print them reversed.
	collected.reverse_each { |fields| puts format_event(fields) }
	count = collected.length
else
	# Range mode: page forwards with XRANGE from --start to --end, printing
	# matches as they arrive.
	cursor = to_stream_id(options[:start])
	end_id = options[:end] ? to_stream_id(options[:end]) : "+"

	loop do
		entries = redis.xrange(STREAM_KEY, cursor, end_id, count: BATCH_SIZE)
		break if entries.empty?

		entries.each do |(_id, fields)|
			if matches?(fields, filters)
				puts format_event(fields)
				count += 1
			end
		end

		# Resume just after the last entry seen by bumping its sequence part.
		ms, seq = entries.last.first.split("-").map(&:to_i)
		cursor = "#{ms}-#{seq + 1}"
		break if entries.size < BATCH_SIZE
	end
end

warn "no matching events found" if count.zero?