#!/usr/bin/env ruby
# frozen_string_literal: true

# view-events: print events stored in a Redis stream, either the last N
# matching events (--recent) or every matching event in a time range
# (--start / --end). See the option parser below for the available filters.

require "json"
require "optparse"
require "redis"
require "shellwords"
require "time"

REDIS_URL = ENV.fetch("REDIS_URL", "redis://127.0.0.1:6379/0")
STREAM_KEY = ENV.fetch("STREAM_KEY", "messages")
BATCH_SIZE = ENV.fetch("BATCH_SIZE", "100").to_i

# ANSI colouring helpers. Colour is enabled only when stdout is a TTY and the
# NO_COLOR environment variable is not set.
module Color
  RESET = "\e[0m"
  RED = "\e[31m"
  GREEN = "\e[32m"
  YELLOW = "\e[33m"
  BLUE = "\e[34m"
  CYAN = "\e[36m"

  # Event name => colour escape used when printing that event.
  EVENTS = {
    "in" => GREEN,
    "out" => BLUE,
    "thru" => CYAN,
    "delivered" => CYAN,
    "failed" => RED,
    "resend" => YELLOW
  }.freeze

  @enabled = $stdout.tty? && !ENV.key?("NO_COLOR")

  # Wraps +text+ in +color+ followed by RESET when colour is enabled;
  # otherwise returns +text+ unchanged.
  def self.paint(text, color)
    @enabled ? "#{color}#{text}#{RESET}" : text
  end

  # Paints an event name with its colour from EVENTS (no colour for names
  # that are not in the table).
  def self.event(name)
    paint(name, EVENTS.fetch(name, ""))
  end
end

# Normalises a phone number to the "+1XXXXXXXXXX" form.
#
# Accepts either "+1" followed by 10 digits (returned as is) or a bare
# 10-digit number (prefixed with "+1"). Returns nil for anything else.
def normalise_tel(value)
  case value
  when /\A\+1\d{10}\z/ then value
  when /\A\d{10}\z/ then "+1#{value}"
  end
end

# Looks up the phone number registered for +jid+.
#
# Reads the Redis list "catapult_cred-<jid>" and normalises its last element.
# Aborts the process when the list is empty or the last element is not a
# valid phone number.
def resolve_jid(redis, jid)
  creds = redis.lrange("catapult_cred-#{jid}", 0, -1)
  abort "error: no registration found for JID: #{jid}" if creds.empty?

  normalise_tel(creds.last) || abort("error: no valid phone number for JID: #{jid}")
end

# Resolves an --owner value: a valid phone number is used directly; anything
# else is treated as a JID and resolved through Redis.
def resolve_owner(redis, value)
  normalise_tel(value) || resolve_jid(redis, value)
end

# Converts a timestamp string into a Redis stream ID ("<milliseconds>-0").
#
# Time.parse is tried first; when it rejects the string, the system
# `date -d` command is used as a fallback parser. Aborts when neither can
# parse the input.
def to_stream_id(str)
  epoch =
    begin
      Time.parse(str).to_i
    rescue ArgumentError
      # The user-supplied string is shell-escaped before interpolation.
      out = `date -d #{str.shellescape} +%s 2>/dev/null`.chomp
      abort "error: cannot parse timestamp: #{str}" unless $?.success?
      out.to_i
    end

  "#{epoch * 1000}-0"
end

# Renders one event (a Hash of string fields) as a single output line.
#
# With json: true the fields are dumped verbatim as JSON. Otherwise a
# human-readable "key=value" line is built from the fields that are present.
# Extra keyword arguments are accepted and ignored so that the whole options
# hash can be splatted into the call.
def format_event(fields, json: false, **)
  return JSON.dump(fields) if json

  event = fields["event"]
  parts = [fields.fetch("timestamp", "—"), Color.event(event)]
  parts << "owner=#{fields['owner']}" if fields["owner"]
  parts << "from=#{fields['from']}" if fields["from"]
  parts << "to=#{fields['to']}" if fields["to"]
  parts << "body=#{fields['body'].inspect}" if fields["body"] && !fields["body"].empty?

  # media_urls holds a JSON document; unparseable values are treated as empty.
  media =
    begin
      JSON.parse(fields["media_urls"] || "[]")
    rescue JSON::ParserError
      []
    end
  parts << "media=#{media.inspect}" unless media.empty?

  parts << "stanza=#{fields['stanza_id']}" if fields["stanza_id"]
  parts << "bw=#{fields['bandwidth_id']}" if fields["bandwidth_id"]

  if event == "failed"
    parts << "err=#{fields['error_code']}"
    parts << fields["error_description"].inspect if fields["error_description"]
  end

  if event == "resend"
    parts << "orig_stream=#{fields['original_stream_id']}" if fields["original_stream_id"]
    parts << "orig_bw=#{fields['original_bandwidth_id']}" if fields["original_bandwidth_id"]
  end

  parts.join(" ")
end

# Returns true when +fields+ satisfies every active filter in +filters+.
#
# filters[:events] is a list of accepted event names (empty = accept all);
# the remaining filters are exact-match comparisons that apply only when set.
def matches?(fields, filters)
  return false if filters[:events].any? && !filters[:events].include?(fields["event"])
  return false if filters[:owner] && fields["owner"] != filters[:owner]
  return false if filters[:from] && fields["from"] != filters[:from]
  return false if filters[:bandwidth_id] && fields["bandwidth_id"] != filters[:bandwidth_id]
  return false if filters[:stanza_id] && fields["stanza_id"] != filters[:stanza_id]

  true
end

# Decrement a Redis stream ID for reverse pagination. Returns nil if we've
# reached the very beginning of the stream.
def prev_stream_id(stream_id)
  ms, seq = stream_id.split("-").map(&:to_i)
  if seq.positive?
    "#{ms}-#{seq - 1}"
  elsif ms.positive?
    # Bare millisecond ID without a sequence part; this relies on Redis
    # completing the sequence of an incomplete upper-bound ID with the
    # maximum value (see the XRANGE documentation on incomplete IDs).
    "#{ms - 1}"
  end
end

# --- Option parsing ---

options = { events: [] }

parser = OptionParser.new do |opts|
  opts.banner = "Usage: view-events --recent N [filters...]\n" \
                "       view-events --start TIMESTAMP [--end TIMESTAMP] [filters...]"

  opts.separator ""
  opts.separator "Time selection (one required):"
  opts.on("--recent N", Integer, "Show last N matching events") { |n| options[:recent] = n }
  opts.on("--start TIMESTAMP", "Start of time range") { |v| options[:start] = v }
  opts.on("--end TIMESTAMP", "End of time range (default: now)") { |v| options[:end] = v }

  opts.separator ""
  opts.separator "Event type filters (combinable, none = all):"
  %w[in out thru delivered failed resend].each do |event|
    opts.on("--#{event}", "Show #{event} events") { options[:events] << event }
  end

  opts.separator ""
  opts.separator "Other filters:"
  opts.on("--owner VALUE", "Phone number or JID") { |v| options[:owner] = v }
  opts.on("--from TEL", "Filter by sender") { |v| options[:from] = v }
  opts.on("--bandwidth-id ID", "Filter by Bandwidth msg ID") { |v| options[:bandwidth_id] = v }
  opts.on("--stanza-id ID", "Filter by XMPP stanza ID") { |v| options[:stanza_id] = v }
  opts.on("-j", "--json", "Format output as JSON") { |v| options[:json] = v }
end

parser.parse!

abort parser.to_s unless options[:recent] || options[:start]
abort "error: --recent and --start are mutually exclusive" if options[:recent] && options[:start]
# A zero or negative N would otherwise still print the first matching event,
# because the length check below only runs after a match has been stored.
abort "error: --recent must be a positive integer" if options[:recent] && !options[:recent].positive?

# --- Main ---

redis = Redis.new(url: REDIS_URL)

options[:owner] = resolve_owner(redis, options[:owner]) if options[:owner]
if options[:from]
  options[:from] = normalise_tel(options[:from]) || abort("error: invalid phone number for --from")
end

filters = options.slice(:events, :owner, :from, :bandwidth_id, :stanza_id)
count = 0

if options[:recent]
  # Walk the stream newest-to-oldest in batches until N matches are collected
  # or the stream is exhausted.
  target = options[:recent]
  matched = []
  cursor = options[:end] ? to_stream_id(options[:end]) : "+"

  loop do
    entries = redis.xrevrange(STREAM_KEY, cursor, "-", count: BATCH_SIZE)
    break if entries.empty?

    entries.each do |_id, fields|
      next unless matches?(fields, filters)

      matched << fields
      break if matched.length >= target
    end
    break if matched.length >= target

    # Continue strictly before the oldest entry of this batch.
    cursor = prev_stream_id(entries.last[0])
    break unless cursor
    break if entries.size < BATCH_SIZE
  end

  # Matches were collected newest-first; print them in chronological order.
  matched.reverse_each { |fields| puts format_event(fields, **options) }
  count = matched.length
else
  # Walk the requested time range oldest-to-newest in batches.
  start_id = to_stream_id(options[:start])
  end_id = options[:end] ? to_stream_id(options[:end]) : "+"
  cursor = start_id

  loop do
    entries = redis.xrange(STREAM_KEY, cursor, end_id, count: BATCH_SIZE)
    break if entries.empty?

    entries.each do |_id, fields|
      next unless matches?(fields, filters)

      puts format_event(fields, **options)
      count += 1
    end

    # Resume just after the last entry of this batch.
    last_id = entries.last[0]
    ms, seq = last_id.split("-").map(&:to_i)
    cursor = "#{ms}-#{seq + 1}"
    break if entries.size < BATCH_SIZE
  end
end

warn "no matching events found" if count.zero?