resend-inbounds

  1#!/usr/bin/env ruby
  2# frozen_string_literal: true
  3
  4# Resend inbound messages from the Redis stream to a customer's JID by replaying
  5# them through the webhook handler.
  6#
  7# Usage:
  8#   resend-inbounds --start TIMESTAMP --end TIMESTAMP --number TEL [--dry-run]
  9#
 10# TIMESTAMP is anything GNU date -d accepts:
 11#   - ISO 8601:  2025-06-01T00:00:00Z
 12#   - Relative:  "3 hours ago"
 13#   - Date only: 2025-06-01
 14#   - Unix:      @1717200000
 15#
 16# TEL is the JMP customer's phone number, with or without +1 prefix. 5551234567
 17# and +15551234567 are equivalent.
 18#
 19# --dry-run  prints matching messages without POSTing.
 20#
 21# Environment:
 22#   REDIS_URL    Redis connection URL   (default: redis://127.0.0.1:6379/0)
 23#   STREAM_KEY   Stream key name        (default: messages)
 24#   BATCH_SIZE   Entries per XRANGE     (default: 100)
 25#   WEBHOOK_URL  Webhook endpoint       (default: http://127.0.0.1:50209)
 26#
 27# Requires: GNU coreutils date (for relative timestamp parsing)
 28#
 29# Examples:
 30#   resend-inbounds --start "2025-06-01" --end "2025-06-02" --number +15551234567
 31#   resend-inbounds --start "3 hours ago" --number 5551234567 --dry-run
 32
 33require "json"
 34require "net/http"
 35require "optparse"
 36require "redis"
 37require "shellwords"
 38require "time"
 39
 40REDIS_URL   = ENV.fetch("REDIS_URL",   "redis://127.0.0.1:6379/0")
 41STREAM_KEY  = ENV.fetch("STREAM_KEY",  "messages")
 42BATCH_SIZE  = ENV.fetch("BATCH_SIZE",  "100").to_i
 43WEBHOOK_URL = ENV.fetch("WEBHOOK_URL", "http://127.0.0.1:50209")
 44
 45def to_ms(str)
 46	# Try Ruby's Time.parse first, fall back to GNU date -d for relative
 47	# timestamps like "3 hours ago".
 48	epoch = begin
 49		Time.parse(str).to_i
 50	rescue ArgumentError
 51		out = `date -d #{str.shellescape} +%s 2>/dev/null`.chomp
 52		abort "error: cannot parse timestamp: #{str}" unless $?.success?
 53		out.to_i
 54	end
 55	epoch * 1000
 56end
 57
 58def normalise_tel(number)
 59	case number
 60	when /\A\+1\d{10}\z/ then number
 61	when /\A\d{10}\z/ then "+1#{number}"
 62	else abort "error: invalid phone number: #{number}"
 63	end
 64end
 65
 66def build_payload(stream_id, fields)
 67	to = JSON.parse(fields["to"]) rescue [fields["owner"]]
 68	media_parsed = JSON.parse(fields["media_urls"]) rescue []
 69	media = fields["media_urls"] ? media_parsed : []
 70
 71	JSON.dump([{
 72		"type" => "message-received",
 73		"message" => {
 74			"id" => fields["bandwidth_id"],
 75			"direction" => "in",
 76			"owner" => fields["owner"],
 77			"from" => fields["from"],
 78			"to" => to,
 79			"time" => fields["timestamp"],
 80			"text" => fields["body"].to_s,
 81			"media" => media
 82		}
 83	}])
 84end
 85
 86start_ms = nil
 87end_ms = "+"
 88owner = nil
 89dry_run = false
 90
 91OptionParser.new do |opts|
 92	opts.banner = "Usage: resend-inbounds --start TIMESTAMP --number TEL [options]"
 93	opts.on("--start TIMESTAMP", "Start of time range (required)") { |v| start_ms = to_ms(v).to_s }
 94	opts.on("--end TIMESTAMP", "End of time range (default: now)") { |v| end_ms = to_ms(v).to_s }
 95	opts.on("--number TEL", "Customer phone number (required)") { |v| owner = normalise_tel(v) }
 96	opts.on("--dry-run", "Print matches without POSTing") { dry_run = true }
 97end.parse!
 98
 99abort "error: --start is required" unless start_ms
100abort "error: --number is required" unless owner
101
102redis = Redis.new(url: REDIS_URL)
103uri = URI(WEBHOOK_URL) unless dry_run
104cursor = start_ms
105matched = 0
106resent = 0
107failed = 0
108consecutive_failures = 0
109max_consecutive_failures = 5
110
111http = unless dry_run
112	h = Net::HTTP.new(uri.host, uri.port)
113	h.use_ssl = uri.scheme == "https"
114	h.start
115end
116
117begin
118	loop do
119		entries = redis.xrange(STREAM_KEY, cursor, end_ms, count: BATCH_SIZE)
120		break if entries.empty?
121
122		entries.each do |id, fields|
123			next unless fields["event"] == "in" && fields["owner"] == owner
124
125			matched += 1
126			ts = fields["timestamp"] || "unknown"
127			body_preview = (fields["body"] || "")[0, 80]
128
129			if dry_run
130				puts "[dry-run] #{id}  from=#{fields['from']}  ts=#{ts}  body=#{body_preview}"
131				next
132			end
133
134			begin
135				req = Net::HTTP::Post.new(uri)
136				req["Content-Type"] = "application/json"
137				req["X-JMP-Resend-Of"] = id
138				req.body = build_payload(id, fields)
139				resp = http.request(req)
140
141				if resp.code == "200"
142					puts "resent #{id}  from=#{fields['from']}  ts=#{ts}  body=#{body_preview}"
143					resent += 1
144					consecutive_failures = 0
145				else
146					warn "FAILED (HTTP #{resp.code}) #{id}  from=#{fields['from']}  ts=#{ts}"
147					failed += 1
148					consecutive_failures += 1
149				end
150			rescue StandardError => e
151				warn "ERROR #{id}  #{e.class}: #{e.message}"
152				failed += 1
153				consecutive_failures += 1
154			end
155
156			if consecutive_failures >= max_consecutive_failures
157				abort "aborting: #{consecutive_failures} consecutive failures, #{max_consecutive_failures} allowed"
158			end
159		end
160
161		last_id = entries.last[0]
162		ms, seq = last_id.split("-").map(&:to_i)
163		cursor = "#{ms}-#{seq + 1}"
164		break if entries.size < BATCH_SIZE
165	end
166ensure
167	http&.finish
168end
169
170if matched.zero?
171	puts "no inbound messages found for #{owner} in the given range"
172elsif dry_run
173	puts "\n#{matched} inbound message(s) matched (dry run, nothing sent)"
174else
175	puts "\n#{matched} matched, #{resent} resent, #{failed} failed"
176end