#!/usr/bin/env ruby # frozen_string_literal: true # Resend inbound messages from the Redis stream to a customer's JID by replaying # them through the webhook handler. # # Usage: # resend-inbounds --start TIMESTAMP --end TIMESTAMP --number TEL [--dry-run] # # TIMESTAMP is anything GNU date -d accepts: # - ISO 8601: 2025-06-01T00:00:00Z # - Relative: "3 hours ago" # - Date only: 2025-06-01 # - Unix: @1717200000 # # TEL is the JMP customer's phone number, with or without +1 prefix. 5551234567 # and +15551234567 are equivalent. # # --dry-run prints matching messages without POSTing. # # Environment: # REDIS_URL Redis connection URL (default: redis://127.0.0.1:6379/0) # STREAM_KEY Stream key name (default: messages) # BATCH_SIZE Entries per XRANGE (default: 100) # WEBHOOK_URL Webhook endpoint (default: http://127.0.0.1:50209) # # Requires: GNU coreutils date (for relative timestamp parsing) # # Examples: # resend-inbounds --start "2025-06-01" --end "2025-06-02" --number +15551234567 # resend-inbounds --start "3 hours ago" --number 5551234567 --dry-run require "json" require "net/http" require "optparse" require "redis" require "shellwords" require "time" REDIS_URL = ENV.fetch("REDIS_URL", "redis://127.0.0.1:6379/0") STREAM_KEY = ENV.fetch("STREAM_KEY", "messages") BATCH_SIZE = ENV.fetch("BATCH_SIZE", "100").to_i WEBHOOK_URL = ENV.fetch("WEBHOOK_URL", "http://127.0.0.1:50209") def to_ms(str) # Try Ruby's Time.parse first, fall back to GNU date -d for relative # timestamps like "3 hours ago". epoch = begin Time.parse(str).to_i rescue ArgumentError out = `date -d #{str.shellescape} +%s 2>/dev/null`.chomp abort "error: cannot parse timestamp: #{str}" unless $?.success? out.to_i end epoch * 1000 end def normalise_tel(number) case number when /\A\+1\d{10}\z/ then number when /\A\d{10}\z/ then "+1#{number}" else abort "error: invalid phone number: #{number}" end end def build_payload(stream_id, fields) to = JSON.parse(fields["to"]) rescue [fields["owner"]] media_parsed = JSON.parse(fields["media_urls"]) rescue [] media = fields["media_urls"] ? media_parsed : [] JSON.dump([{ "type" => "message-received", "message" => { "id" => fields["bandwidth_id"], "direction" => "in", "owner" => fields["owner"], "from" => fields["from"], "to" => to, "time" => fields["timestamp"], "text" => fields["body"].to_s, "media" => media } }]) end start_ms = nil end_ms = "+" owner = nil dry_run = false OptionParser.new do |opts| opts.banner = "Usage: resend-inbounds --start TIMESTAMP --number TEL [options]" opts.on("--start TIMESTAMP", "Start of time range (required)") { |v| start_ms = to_ms(v).to_s } opts.on("--end TIMESTAMP", "End of time range (default: now)") { |v| end_ms = to_ms(v).to_s } opts.on("--number TEL", "Customer phone number (required)") { |v| owner = normalise_tel(v) } opts.on("--dry-run", "Print matches without POSTing") { dry_run = true } end.parse! abort "error: --start is required" unless start_ms abort "error: --number is required" unless owner redis = Redis.new(url: REDIS_URL) uri = URI(WEBHOOK_URL) unless dry_run cursor = start_ms matched = 0 resent = 0 failed = 0 consecutive_failures = 0 max_consecutive_failures = 5 http = unless dry_run h = Net::HTTP.new(uri.host, uri.port) h.use_ssl = uri.scheme == "https" h.start end begin loop do entries = redis.xrange(STREAM_KEY, cursor, end_ms, count: BATCH_SIZE) break if entries.empty? entries.each do |id, fields| next unless fields["event"] == "in" && fields["owner"] == owner matched += 1 ts = fields["timestamp"] || "unknown" body_preview = (fields["body"] || "")[0, 80] if dry_run puts "[dry-run] #{id} from=#{fields['from']} ts=#{ts} body=#{body_preview}" next end begin req = Net::HTTP::Post.new(uri) req["Content-Type"] = "application/json" req["X-JMP-Resend-Of"] = id req.body = build_payload(id, fields) resp = http.request(req) if resp.code == "200" puts "resent #{id} from=#{fields['from']} ts=#{ts} body=#{body_preview}" resent += 1 consecutive_failures = 0 else warn "FAILED (HTTP #{resp.code}) #{id} from=#{fields['from']} ts=#{ts}" failed += 1 consecutive_failures += 1 end rescue StandardError => e warn "ERROR #{id} #{e.class}: #{e.message}" failed += 1 consecutive_failures += 1 end if consecutive_failures >= max_consecutive_failures abort "aborting: #{consecutive_failures} consecutive failures, #{max_consecutive_failures} allowed" end end last_id = entries.last[0] ms, seq = last_id.split("-").map(&:to_i) cursor = "#{ms}-#{seq + 1}" break if entries.size < BATCH_SIZE end ensure http&.finish end if matched.zero? puts "no inbound messages found for #{owner} in the given range" elsif dry_run puts "\n#{matched} inbound message(s) matched (dry run, nothing sent)" else puts "\n#{matched} matched, #{resent} resent, #{failed} failed" end