diff --git a/bin/resend-inbounds b/bin/resend-inbounds new file mode 100755 index 0000000000000000000000000000000000000000..fccc0aaba932d4d0f41f8907f50d373768622b32 --- /dev/null +++ b/bin/resend-inbounds @@ -0,0 +1,176 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +# Resend inbound messages from the Redis stream to a customer's JID by replaying +# them through the webhook handler. +# +# Usage: +# resend-inbounds --start TIMESTAMP --end TIMESTAMP --number TEL [--dry-run] +# +# TIMESTAMP is anything GNU date -d accepts: +# - ISO 8601: 2025-06-01T00:00:00Z +# - Relative: "3 hours ago" +# - Date only: 2025-06-01 +# - Unix: @1717200000 +# +# TEL is the JMP customer's phone number, with or without +1 prefix. 5551234567 +# and +15551234567 are equivalent. +# +# --dry-run prints matching messages without POSTing. +# +# Environment: +# REDIS_URL Redis connection URL (default: redis://127.0.0.1:6379/0) +# STREAM_KEY Stream key name (default: messages) +# BATCH_SIZE Entries per XRANGE (default: 100) +# WEBHOOK_URL Webhook endpoint (default: http://127.0.0.1:50209) +# +# Requires: GNU coreutils date (for relative timestamp parsing) +# +# Examples: +# resend-inbounds --start "2025-06-01" --end "2025-06-02" --number +15551234567 +# resend-inbounds --start "3 hours ago" --number 5551234567 --dry-run + +require "json" +require "net/http" +require "optparse" +require "redis" +require "shellwords" +require "time" + +REDIS_URL = ENV.fetch("REDIS_URL", "redis://127.0.0.1:6379/0") +STREAM_KEY = ENV.fetch("STREAM_KEY", "messages") +BATCH_SIZE = ENV.fetch("BATCH_SIZE", "100").to_i +WEBHOOK_URL = ENV.fetch("WEBHOOK_URL", "http://127.0.0.1:50209") + +def to_ms(str) + # Try Ruby's Time.parse first, fall back to GNU date -d for relative + # timestamps like "3 hours ago". + epoch = begin + Time.parse(str).to_i + rescue ArgumentError + out = `date -d #{str.shellescape} +%s 2>/dev/null`.chomp + abort "error: cannot parse timestamp: #{str}" unless $?.success? + out.to_i + end + epoch * 1000 +end + +def normalise_tel(number) + case number + when /\A\+1\d{10}\z/ then number + when /\A\d{10}\z/ then "+1#{number}" + else abort "error: invalid phone number: #{number}" + end +end + +def build_payload(stream_id, fields) + to = JSON.parse(fields["to"]) rescue [fields["owner"]] + media_parsed = JSON.parse(fields["media_urls"]) rescue [] + media = fields["media_urls"] ? media_parsed : [] + + JSON.dump([{ + "type" => "message-received", + "message" => { + "id" => fields["bandwidth_id"], + "direction" => "in", + "owner" => fields["owner"], + "from" => fields["from"], + "to" => to, + "time" => fields["timestamp"], + "text" => fields["body"].to_s, + "media" => media + } + }]) +end + +start_ms = nil +end_ms = "+" +owner = nil +dry_run = false + +OptionParser.new do |opts| + opts.banner = "Usage: resend-inbounds --start TIMESTAMP --number TEL [options]" + opts.on("--start TIMESTAMP", "Start of time range (required)") { |v| start_ms = to_ms(v).to_s } + opts.on("--end TIMESTAMP", "End of time range (default: now)") { |v| end_ms = to_ms(v).to_s } + opts.on("--number TEL", "Customer phone number (required)") { |v| owner = normalise_tel(v) } + opts.on("--dry-run", "Print matches without POSTing") { dry_run = true } +end.parse! + +abort "error: --start is required" unless start_ms +abort "error: --number is required" unless owner + +redis = Redis.new(url: REDIS_URL) +uri = URI(WEBHOOK_URL) unless dry_run +cursor = start_ms +matched = 0 +resent = 0 +failed = 0 +consecutive_failures = 0 +max_consecutive_failures = 5 + +http = unless dry_run + h = Net::HTTP.new(uri.host, uri.port) + h.use_ssl = uri.scheme == "https" + h.start +end + +begin + loop do + entries = redis.xrange(STREAM_KEY, cursor, end_ms, count: BATCH_SIZE) + break if entries.empty? + + entries.each do |id, fields| + next unless fields["event"] == "in" && fields["owner"] == owner + + matched += 1 + ts = fields["timestamp"] || "unknown" + body_preview = (fields["body"] || "")[0, 80] + + if dry_run + puts "[dry-run] #{id} from=#{fields['from']} ts=#{ts} body=#{body_preview}" + next + end + + begin + req = Net::HTTP::Post.new(uri) + req["Content-Type"] = "application/json" + req["X-JMP-Resend-Of"] = id + req.body = build_payload(id, fields) + resp = http.request(req) + + if resp.code == "200" + puts "resent #{id} from=#{fields['from']} ts=#{ts} body=#{body_preview}" + resent += 1 + consecutive_failures = 0 + else + warn "FAILED (HTTP #{resp.code}) #{id} from=#{fields['from']} ts=#{ts}" + failed += 1 + consecutive_failures += 1 + end + rescue StandardError => e + warn "ERROR #{id} #{e.class}: #{e.message}" + failed += 1 + consecutive_failures += 1 + end + + if consecutive_failures >= max_consecutive_failures + abort "aborting: #{consecutive_failures} consecutive failures, #{max_consecutive_failures} allowed" + end + end + + last_id = entries.last[0] + ms, seq = last_id.split("-").map(&:to_i) + cursor = "#{ms}-#{seq + 1}" + break if entries.size < BATCH_SIZE + end +ensure + http&.finish +end + +if matched.zero? + puts "no inbound messages found for #{owner} in the given range" +elsif dry_run + puts "\n#{matched} inbound message(s) matched (dry run, nothing sent)" +else + puts "\n#{matched} matched, #{resent} resent, #{failed} failed" +end