Add resend-inbounds script for replaying messages from Redis

Amolith created

Change summary

bin/resend-inbounds | 176 +++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 176 insertions(+)

Detailed changes

bin/resend-inbounds 🔗

@@ -0,0 +1,176 @@
+#!/usr/bin/env ruby
+# frozen_string_literal: true
+
+# Resend inbound messages from the Redis stream to a customer's JID by replaying
+# them through the webhook handler.
+#
+# Usage:
+#   resend-inbounds --start TIMESTAMP --end TIMESTAMP --number TEL [--dry-run]
+#
+# TIMESTAMP is anything GNU date -d accepts:
+#   - ISO 8601:  2025-06-01T00:00:00Z
+#   - Relative:  "3 hours ago"
+#   - Date only: 2025-06-01
+#   - Unix:      @1717200000
+#
+# TEL is the JMP customer's phone number, with or without +1 prefix. 5551234567
+# and +15551234567 are equivalent.
+#
+# --dry-run  prints matching messages without POSTing.
+#
+# Environment:
+#   REDIS_URL    Redis connection URL   (default: redis://127.0.0.1:6379/0)
+#   STREAM_KEY   Stream key name        (default: messages)
+#   BATCH_SIZE   Entries per XRANGE     (default: 100)
+#   WEBHOOK_URL  Webhook endpoint       (default: http://127.0.0.1:50209)
+#
+# Requires: GNU coreutils date (for relative timestamp parsing)
+#
+# Examples:
+#   resend-inbounds --start "2025-06-01" --end "2025-06-02" --number +15551234567
+#   resend-inbounds --start "3 hours ago" --number 5551234567 --dry-run
+
+require "json"
+require "net/http"
+require "optparse"
+require "redis"
+require "shellwords"
+require "time"
+
+REDIS_URL   = ENV.fetch("REDIS_URL",   "redis://127.0.0.1:6379/0")
+STREAM_KEY  = ENV.fetch("STREAM_KEY",  "messages")
+BATCH_SIZE  = ENV.fetch("BATCH_SIZE",  "100").to_i
+WEBHOOK_URL = ENV.fetch("WEBHOOK_URL", "http://127.0.0.1:50209")
+
+def to_ms(str)
+	# Try Ruby's Time.parse first, fall back to GNU date -d for relative
+	# timestamps like "3 hours ago".
+	epoch = begin
+		Time.parse(str).to_i
+	rescue ArgumentError
+		out = `date -d #{str.shellescape} +%s 2>/dev/null`.chomp
+		abort "error: cannot parse timestamp: #{str}" unless $?.success?
+		out.to_i
+	end
+	epoch * 1000
+end
+
+def normalise_tel(number)
+	case number
+	when /\A\+1\d{10}\z/ then number
+	when /\A\d{10}\z/ then "+1#{number}"
+	else abort "error: invalid phone number: #{number}"
+	end
+end
+
+def build_payload(stream_id, fields)
+	to = JSON.parse(fields["to"]) rescue [fields["owner"]]
+	media_parsed = JSON.parse(fields["media_urls"]) rescue []
+	media = fields["media_urls"] ? media_parsed : []
+
+	JSON.dump([{
+		"type" => "message-received",
+		"message" => {
+			"id" => fields["bandwidth_id"],
+			"direction" => "in",
+			"owner" => fields["owner"],
+			"from" => fields["from"],
+			"to" => to,
+			"time" => fields["timestamp"],
+			"text" => fields["body"].to_s,
+			"media" => media
+		}
+	}])
+end
+
+start_ms = nil
+end_ms = "+"
+owner = nil
+dry_run = false
+
+OptionParser.new do |opts|
+	opts.banner = "Usage: resend-inbounds --start TIMESTAMP --number TEL [options]"
+	opts.on("--start TIMESTAMP", "Start of time range (required)") { |v| start_ms = to_ms(v).to_s }
+	opts.on("--end TIMESTAMP", "End of time range (default: now)") { |v| end_ms = to_ms(v).to_s }
+	opts.on("--number TEL", "Customer phone number (required)") { |v| owner = normalise_tel(v) }
+	opts.on("--dry-run", "Print matches without POSTing") { dry_run = true }
+end.parse!
+
+abort "error: --start is required" unless start_ms
+abort "error: --number is required" unless owner
+
+redis = Redis.new(url: REDIS_URL)
+uri = URI(WEBHOOK_URL) unless dry_run
+cursor = start_ms
+matched = 0
+resent = 0
+failed = 0
+consecutive_failures = 0
+max_consecutive_failures = 5
+
+http = unless dry_run
+	h = Net::HTTP.new(uri.host, uri.port)
+	h.use_ssl = uri.scheme == "https"
+	h.start
+end
+
+begin
+	loop do
+		entries = redis.xrange(STREAM_KEY, cursor, end_ms, count: BATCH_SIZE)
+		break if entries.empty?
+
+		entries.each do |id, fields|
+			next unless fields["event"] == "in" && fields["owner"] == owner
+
+			matched += 1
+			ts = fields["timestamp"] || "unknown"
+			body_preview = (fields["body"] || "")[0, 80]
+
+			if dry_run
+				puts "[dry-run] #{id}  from=#{fields['from']}  ts=#{ts}  body=#{body_preview}"
+				next
+			end
+
+			begin
+				req = Net::HTTP::Post.new(uri)
+				req["Content-Type"] = "application/json"
+				req["X-JMP-Resend-Of"] = id
+				req.body = build_payload(id, fields)
+				resp = http.request(req)
+
+				if resp.code == "200"
+					puts "resent #{id}  from=#{fields['from']}  ts=#{ts}  body=#{body_preview}"
+					resent += 1
+					consecutive_failures = 0
+				else
+					warn "FAILED (HTTP #{resp.code}) #{id}  from=#{fields['from']}  ts=#{ts}"
+					failed += 1
+					consecutive_failures += 1
+				end
+			rescue StandardError => e
+				warn "ERROR #{id}  #{e.class}: #{e.message}"
+				failed += 1
+				consecutive_failures += 1
+			end
+
+			if consecutive_failures >= max_consecutive_failures
+				abort "aborting: #{consecutive_failures} consecutive failures, #{max_consecutive_failures} allowed"
+			end
+		end
+
+		last_id = entries.last[0]
+		ms, seq = last_id.split("-").map(&:to_i)
+		cursor = "#{ms}-#{seq + 1}"
+		break if entries.size < BATCH_SIZE
+	end
+ensure
+	http&.finish
+end
+
+if matched.zero?
+	puts "no inbound messages found for #{owner} in the given range"
+elsif dry_run
+	puts "\n#{matched} inbound message(s) matched (dry run, nothing sent)"
+else
+	puts "\n#{matched} matched, #{resent} resent, #{failed} failed"
+end