diff --git a/bin/view-events b/bin/view-events new file mode 100755 index 0000000000000000000000000000000000000000..51e33f64f63547b0ddb20f86b46ac868fca17df4 --- /dev/null +++ b/bin/view-events @@ -0,0 +1,216 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +require "json" +require "optparse" +require "redis" +require "shellwords" +require "time" + +REDIS_URL = ENV.fetch("REDIS_URL", "redis://127.0.0.1:6379/0") +STREAM_KEY = ENV.fetch("STREAM_KEY", "messages") +BATCH_SIZE = ENV.fetch("BATCH_SIZE", "100").to_i + +module Color + RESET = "\e[0m" + RED = "\e[31m" + GREEN = "\e[32m" + YELLOW = "\e[33m" + BLUE = "\e[34m" + CYAN = "\e[36m" + + EVENTS = { + "in" => GREEN, + "out" => BLUE, + "thru" => CYAN, + "delivered" => CYAN, + "failed" => RED, + "resend" => YELLOW + }.freeze + + @enabled = $stdout.tty? && !ENV.key?("NO_COLOR") + + def self.paint(text, color) + @enabled ? "#{color}#{text}#{RESET}" : text + end + + def self.event(name) + paint(name, EVENTS.fetch(name, "")) + end +end + +def normalise_tel(value) + case value + when /\A\+1\d{10}\z/ then value + when /\A\d{10}\z/ then "+1#{value}" + end +end + +def resolve_jid(redis, jid) + creds = redis.lrange("catapult_cred-#{jid}", 0, -1) + abort "error: no registration found for JID: #{jid}" if creds.empty? + normalise_tel(creds.last) or + abort "error: no valid phone number for JID: #{jid}" +end + +def resolve_owner(redis, value) + normalise_tel(value) || resolve_jid(redis, value) +end + +def to_stream_id(str) + epoch = begin + Time.parse(str).to_i + rescue ArgumentError + out = `date -d #{str.shellescape} +%s 2>/dev/null`.chomp + abort "error: cannot parse timestamp: #{str}" unless $?.success? + out.to_i + end + "#{epoch * 1000}-0" +end + +def format_event(fields) + event = fields["event"] + parts = [fields.fetch("timestamp", "—"), Color.event(event)] + + parts << "owner=#{fields['owner']}" if fields["owner"] + parts << "from=#{fields['from']}" if fields["from"] + parts << "to=#{fields['to']}" if fields["to"] + parts << "body=#{fields['body'].inspect}" if fields["body"] && !fields["body"].empty? + + media = begin + JSON.parse(fields["media_urls"] || "[]") + rescue JSON::ParserError + [] + end + parts << "media=#{media.inspect}" unless media.empty? + + parts << "stanza=#{fields['stanza_id']}" if fields["stanza_id"] + parts << "bw=#{fields['bandwidth_id']}" if fields["bandwidth_id"] + + if event == "failed" + parts << "err=#{fields['error_code']}" + parts << fields["error_description"].inspect if fields["error_description"] + end + + if event == "resend" + parts << "orig_stream=#{fields['original_stream_id']}" if fields["original_stream_id"] + parts << "orig_bw=#{fields['original_bandwidth_id']}" if fields["original_bandwidth_id"] + end + + parts.join(" ") +end + +def matches?(fields, filters) + return false if filters[:events].any? && !filters[:events].include?(fields["event"]) + return false if filters[:owner] && fields["owner"] != filters[:owner] + return false if filters[:from] && fields["from"] != filters[:from] + return false if filters[:bandwidth_id] && fields["bandwidth_id"] != filters[:bandwidth_id] + return false if filters[:stanza_id] && fields["stanza_id"] != filters[:stanza_id] + true +end + +# Decrement a Redis stream ID for reverse pagination. Returns nil if we've +# reached the very beginning of the stream. +def prev_stream_id(stream_id) + ms, seq = stream_id.split("-").map(&:to_i) + if seq.positive? + "#{ms}-#{seq - 1}" + elsif ms.positive? + "#{ms - 1}" + end +end + +# --- Option parsing --- + +options = { events: [] } + +parser = OptionParser.new do |opts| + opts.banner = "Usage: view-events --recent N [filters...]\n" \ + " view-events --start TIMESTAMP [--end TIMESTAMP] [filters...]" + + opts.separator "" + opts.separator "Time selection (one required):" + opts.on("--recent N", Integer, "Show last N matching events") { |n| options[:recent] = n } + opts.on("--start TIMESTAMP", "Start of time range") { |v| options[:start] = v } + opts.on("--end TIMESTAMP", "End of time range (default: now)") { |v| options[:end] = v } + + opts.separator "" + opts.separator "Event type filters (combinable, none = all):" + %w[in out thru delivered failed resend].each do |event| + opts.on("--#{event}", "Show #{event} events") { options[:events] << event } + end + + opts.separator "" + opts.separator "Other filters:" + opts.on("--owner VALUE", "Phone number or JID") { |v| options[:owner] = v } + opts.on("--from TEL", "Filter by sender") { |v| options[:from] = v } + opts.on("--bandwidth-id ID", "Filter by Bandwidth msg ID") { |v| options[:bandwidth_id] = v } + opts.on("--stanza-id ID", "Filter by XMPP stanza ID") { |v| options[:stanza_id] = v } +end + +parser.parse! + +abort parser.to_s unless options[:recent] || options[:start] +abort "error: --recent and --start are mutually exclusive" if options[:recent] && options[:start] + +# --- Main --- + +redis = Redis.new(url: REDIS_URL) + +options[:owner] = resolve_owner(redis, options[:owner]) if options[:owner] + +if options[:from] + options[:from] = normalise_tel(options[:from]) or + abort "error: invalid phone number for --from" +end + +filters = options.slice(:events, :owner, :from, :bandwidth_id, :stanza_id) +count = 0 + +if options[:recent] + target = options[:recent] + matched = [] + cursor = options[:end] ? to_stream_id(options[:end]) : "+" + + loop do + entries = redis.xrevrange(STREAM_KEY, cursor, "-", count: BATCH_SIZE) + break if entries.empty? + + entries.each do |_id, fields| + if matches?(fields, filters) + matched.unshift(fields) + break if matched.length >= target + end + end + + break if matched.length >= target + cursor = prev_stream_id(entries.last[0]) + break unless cursor + break if entries.size < BATCH_SIZE + end + + matched.each { |fields| puts format_event(fields) } + count = matched.length +else + start_id = to_stream_id(options[:start]) + end_id = options[:end] ? to_stream_id(options[:end]) : "+" + cursor = start_id + + loop do + entries = redis.xrange(STREAM_KEY, cursor, end_id, count: BATCH_SIZE) + break if entries.empty? + + entries.each do |_id, fields| + next unless matches?(fields, filters) + puts format_event(fields) + count += 1 + end + + last_id = entries.last[0] + ms, seq = last_id.split("-").map(&:to_i) + cursor = "#{ms}-#{seq + 1}" + break if entries.size < BATCH_SIZE + end +end + +warn "no matching events found" if count.zero?