#!/usr/bin/env ruby # frozen_string_literal: true # Resend inbound messages from the Redis stream to a customer's JID by replaying # them through the webhook handler. # # Usage: # resend-inbounds --start TIMESTAMP [--end TIMESTAMP] [targeting] [options] # # Targeting (at least one required): # --number TEL Customer phone number (repeatable) # --file PATH File with newline-separated phone numbers # --all Resend to ALL customers (requires confirmation or --force) # # TIMESTAMP is anything GNU date -d accepts: # - ISO 8601: 2025-06-01T00:00:00Z # - Relative: "3 hours ago" # - Date only: 2025-06-01 # - Unix: @1717200000 # # TEL is the JMP customer's phone number, with or without +1 prefix. 5551234567 # and +15551234567 are equivalent. # # Options: # --dry-run Print matching messages without POSTing. # --force Skip interactive confirmation for --all. # # Environment: # REDIS_URL Redis connection URL (default: redis://127.0.0.1:6379/0) # STREAM_KEY Stream key name (default: messages) # BATCH_SIZE Entries per XRANGE (default: 100) # WEBHOOK_URL Webhook endpoint (default: http://127.0.0.1:50209) # # Requires: GNU coreutils date (for relative timestamp parsing) # # Examples: # resend-inbounds --start "2025-06-01" --end "2025-06-02" --number +15551234567 # resend-inbounds --start "3 hours ago" --number 5551234567 --dry-run # resend-inbounds --start "3 hours ago" --number 5551234567 --number 5559876543 # resend-inbounds --start "2025-06-01" --file numbers.txt # resend-inbounds --start "2025-06-01" --all --force require "json" require "net/http" require "optparse" require "redis" require "set" require "shellwords" require "time" REDIS_URL = ENV.fetch("REDIS_URL", "redis://127.0.0.1:6379/0") STREAM_KEY = ENV.fetch("STREAM_KEY", "messages") BATCH_SIZE = ENV.fetch("BATCH_SIZE", "100").to_i WEBHOOK_URL = ENV.fetch("WEBHOOK_URL", "http://127.0.0.1:50209") def to_ms(str) # Try Ruby's Time.parse first, fall back to GNU date -d for relative # timestamps like "3 hours ago". epoch = begin Time.parse(str).to_i rescue ArgumentError out = `date -d #{str.shellescape} +%s 2>/dev/null`.chomp abort "error: cannot parse timestamp: #{str}" unless $?.success? out.to_i end epoch * 1000 end def normalise_tel(number, location: nil) case number when /\A\+1\d{10}\z/ then number when /\A\d{10}\z/ then "+1#{number}" else context = location ? " (#{location})" : "" abort "error: invalid phone number: #{number}#{context}" end end def build_payload(stream_id, fields) to = JSON.parse(fields["to"]) rescue [fields["owner"]] media_parsed = JSON.parse(fields["media_urls"]) rescue [] media = fields["media_urls"] ? media_parsed : [] JSON.dump([{ "type" => "message-received", "message" => { "id" => fields["bandwidth_id"], "direction" => "in", "owner" => fields["owner"], "from" => fields["from"], "to" => to, "time" => fields["timestamp"], "text" => fields["body"].to_s, "media" => media } }]) end def load_numbers_file(path) numbers = Set.new File.readlines(path).each_with_index do |line, idx| line = line.strip next if line.empty? || line.start_with?("#") numbers << normalise_tel(line, location: "line #{idx + 1}") end abort "error: no phone numbers found in #{path}" if numbers.empty? numbers rescue Errno::ENOENT abort "error: file not found: #{path}" rescue Errno::EACCES abort "error: permission denied: #{path}" end start_ms = nil end_ms = "+" owners = Set.new all_customers = false dry_run = false force = false OptionParser.new do |opts| opts.banner = "Usage: resend-inbounds --start TIMESTAMP [targeting] [options]" opts.on("--start TIMESTAMP", "Start of time range (required)") { |v| start_ms = to_ms(v).to_s } opts.on("--end TIMESTAMP", "End of time range (default: now)") { |v| end_ms = to_ms(v).to_s } opts.on("--number TEL", "Customer phone number (repeatable)") { |v| owners << normalise_tel(v) } opts.on("--file PATH", "File with newline-separated phone numbers") { |v| owners.merge(load_numbers_file(v)) } opts.on("--all", "Resend to ALL customers") { all_customers = true } opts.on("--dry-run", "Print matches without POSTing") { dry_run = true } opts.on("--force", "Skip interactive confirmation for --all") { force = true } end.parse! abort "error: --start is required" unless start_ms if all_customers && !owners.empty? abort "error: --all cannot be combined with --number or --file" end if !all_customers && owners.empty? abort "error: provide --number, --file, or --all" end if all_customers && !force action = dry_run ? "scan" : "resend" $stderr.puts "WARNING: This will #{action} ALL inbound messages in the given range." $stderr.puts " stream: #{STREAM_KEY}" $stderr.puts " start: #{start_ms}" $stderr.puts " end: #{end_ms == '+' ? 'now' : end_ms}" $stderr.puts " dry_run: #{dry_run}" $stderr.puts "" abort "error: refusing --all in non-interactive mode (use --force)" unless $stdin.tty? $stderr.print "Enter [y]es to continue, anything else aborts: " confirmation = $stdin.gets&.strip abort "aborted" unless confirmation == "yes" || confirmation == "y" end redis = Redis.new(url: REDIS_URL) uri = URI(WEBHOOK_URL) unless dry_run cursor = start_ms matched = 0 resent = 0 failed = 0 consecutive_failures = 0 max_consecutive_failures = 5 matched_owners = Set.new # Track which explicit owners had matches http = unless dry_run h = Net::HTTP.new(uri.host, uri.port) h.use_ssl = uri.scheme == "https" h.start end begin loop do entries = redis.xrange(STREAM_KEY, cursor, end_ms, count: BATCH_SIZE) break if entries.empty? entries.each do |id, fields| next unless %w[in thru].include?(fields["event"]) next unless all_customers || owners.include?(fields["owner"]) matched += 1 matched_owners << fields["owner"] unless all_customers ts = fields["timestamp"] || "unknown" body_preview = (fields["body"] || "")[0, 80] if dry_run puts "[dry-run] #{id} from=#{fields['from']} ts=#{ts} body=#{body_preview}" next end begin req = Net::HTTP::Post.new(uri) req["Content-Type"] = "application/json" req["X-JMP-Resend-Of"] = id req.body = build_payload(id, fields) resp = http.request(req) if resp.code == "200" puts "resent #{id} from=#{fields['from']} ts=#{ts} body=#{body_preview}" resent += 1 consecutive_failures = 0 else warn "FAILED (HTTP #{resp.code}) #{id} from=#{fields['from']} ts=#{ts}" failed += 1 consecutive_failures += 1 end rescue StandardError => e warn "ERROR #{id} #{e.class}: #{e.message}" failed += 1 consecutive_failures += 1 end if consecutive_failures >= max_consecutive_failures abort "aborting: #{consecutive_failures} consecutive failures, #{max_consecutive_failures} allowed" end end last_id = entries.last[0] ms, seq = last_id.split("-").map(&:to_i) cursor = "#{ms}-#{seq + 1}" break if entries.size < BATCH_SIZE end ensure http&.finish end target_desc = if all_customers "all customers" elsif owners.size == 1 owners.first else "#{owners.size} numbers" end if matched.zero? puts "no inbound messages found for #{target_desc} in the given range" else if dry_run puts "\n#{matched} inbound message(s) matched (dry run, nothing sent)" else puts "\n#{matched} matched, #{resent} resent, #{failed} failed" end no_matches = owners - matched_owners puts "numbers with no matches: #{no_matches.sort.join(', ')}" if no_matches.any? end