resend-inbounds

  1#!/usr/bin/env ruby
  2# frozen_string_literal: true
  3
  4# Resend inbound messages from the Redis stream to a customer's JID by replaying
  5# them through the webhook handler.
  6#
  7# Usage:
  8#   resend-inbounds --start TIMESTAMP [--end TIMESTAMP] [targeting] [options]
  9#
 10# Targeting (at least one required):
 11#   --number TEL     Customer phone number (repeatable)
 12#   --file PATH      File with newline-separated phone numbers
 13#   --all            Resend to ALL customers (requires confirmation or --force)
 14#
 15# TIMESTAMP is anything GNU date -d accepts:
 16#   - ISO 8601:  2025-06-01T00:00:00Z
 17#   - Relative:  "3 hours ago"
 18#   - Date only: 2025-06-01
 19#   - Unix:      @1717200000
 20#
 21# TEL is the JMP customer's phone number, with or without +1 prefix. 5551234567
 22# and +15551234567 are equivalent.
 23#
 24# Options:
 25#   --dry-run  Print matching messages without POSTing.
 26#   --force    Skip interactive confirmation for --all.
 27#
 28# Environment:
 29#   REDIS_URL    Redis connection URL   (default: redis://127.0.0.1:6379/0)
 30#   STREAM_KEY   Stream key name        (default: messages)
 31#   BATCH_SIZE   Entries per XRANGE     (default: 100)
 32#   WEBHOOK_URL  Webhook endpoint       (default: http://127.0.0.1:50209)
 33#
 34# Requires: GNU coreutils date (for relative timestamp parsing)
 35#
 36# Examples:
 37#   resend-inbounds --start "2025-06-01" --end "2025-06-02" --number +15551234567
 38#   resend-inbounds --start "3 hours ago" --number 5551234567 --dry-run
 39#   resend-inbounds --start "3 hours ago" --number 5551234567 --number 5559876543
 40#   resend-inbounds --start "2025-06-01" --file numbers.txt
 41#   resend-inbounds --start "2025-06-01" --all --force
 42
 43require "json"
 44require "net/http"
 45require "optparse"
 46require "redis"
 47require "set"
 48require "shellwords"
 49require "time"
 50
 51REDIS_URL   = ENV.fetch("REDIS_URL",   "redis://127.0.0.1:6379/0")
 52STREAM_KEY  = ENV.fetch("STREAM_KEY",  "messages")
 53BATCH_SIZE  = ENV.fetch("BATCH_SIZE",  "100").to_i
 54WEBHOOK_URL = ENV.fetch("WEBHOOK_URL", "http://127.0.0.1:50209")
 55
 56def to_ms(str)
 57	# Try Ruby's Time.parse first, fall back to GNU date -d for relative
 58	# timestamps like "3 hours ago".
 59	epoch = begin
 60		Time.parse(str).to_i
 61	rescue ArgumentError
 62		out = `date -d #{str.shellescape} +%s 2>/dev/null`.chomp
 63		abort "error: cannot parse timestamp: #{str}" unless $?.success?
 64		out.to_i
 65	end
 66	epoch * 1000
 67end
 68
 69def normalise_tel(number, location: nil)
 70	case number
 71	when /\A\+1\d{10}\z/ then number
 72	when /\A\d{10}\z/ then "+1#{number}"
 73	else
 74		context = location ? " (#{location})" : ""
 75		abort "error: invalid phone number: #{number}#{context}"
 76	end
 77end
 78
 79def build_payload(stream_id, fields)
 80	to = JSON.parse(fields["to"]) rescue [fields["owner"]]
 81	media_parsed = JSON.parse(fields["media_urls"]) rescue []
 82	media = fields["media_urls"] ? media_parsed : []
 83
 84	JSON.dump([{
 85		"type" => "message-received",
 86		"message" => {
 87			"id" => fields["bandwidth_id"],
 88			"direction" => "in",
 89			"owner" => fields["owner"],
 90			"from" => fields["from"],
 91			"to" => to,
 92			"time" => fields["timestamp"],
 93			"text" => fields["body"].to_s,
 94			"media" => media
 95		}
 96	}])
 97end
 98
 99def load_numbers_file(path)
100	numbers = Set.new
101	File.readlines(path).each_with_index do |line, idx|
102		line = line.strip
103		next if line.empty? || line.start_with?("#")
104		numbers << normalise_tel(line, location: "line #{idx + 1}")
105	end
106	abort "error: no phone numbers found in #{path}" if numbers.empty?
107	numbers
108rescue Errno::ENOENT
109	abort "error: file not found: #{path}"
110rescue Errno::EACCES
111	abort "error: permission denied: #{path}"
112end
113
# Parse the command-line flags.
#
# argv - Array of argument Strings; recognised options are removed from it.
#
# Returns a Hash with these keys:
#   :start_ms      - String of epoch milliseconds, or nil when --start is absent
#   :end_ms        - String of epoch milliseconds, or "+" (used later as the
#                    upper bound of the XRANGE call) when --end is absent
#   :owners        - Set of normalised numbers from --number / --file
#   :all_customers - true when --all was given
#   :dry_run       - true when --dry-run was given
#   :force         - true when --force was given
#
# Unknown options and missing option arguments abort with an "error:" message
# rather than an OptionParser backtrace.
def parse_cli_options(argv)
	settings = {
		start_ms: nil,
		end_ms: "+",
		owners: Set.new,
		all_customers: false,
		dry_run: false,
		force: false
	}

	parser = OptionParser.new do |opts|
		opts.banner = "Usage: resend-inbounds --start TIMESTAMP [targeting] [options]"
		opts.on("--start TIMESTAMP", "Start of time range (required)") { |v| settings[:start_ms] = to_ms(v).to_s }
		opts.on("--end TIMESTAMP", "End of time range (default: now)") { |v| settings[:end_ms] = to_ms(v).to_s }
		opts.on("--number TEL", "Customer phone number (repeatable)") { |v| settings[:owners] << normalise_tel(v) }
		opts.on("--file PATH", "File with newline-separated phone numbers") { |v| settings[:owners].merge(load_numbers_file(v)) }
		opts.on("--all", "Resend to ALL customers") { settings[:all_customers] = true }
		opts.on("--dry-run", "Print matches without POSTing") { settings[:dry_run] = true }
		opts.on("--force", "Skip interactive confirmation for --all") { settings[:force] = true }
	end
	parser.parse!(argv)
	settings
rescue OptionParser::ParseError => e
	abort "error: #{e.message}"
end

start_ms, end_ms, owners, all_customers, dry_run, force =
	parse_cli_options(ARGV).values_at(:start_ms, :end_ms, :owners, :all_customers, :dry_run, :force)
131
# Validate the option combination: --start is mandatory, and the targeting must
# be either --all on its own or at least one number from --number / --file.
abort "error: --start is required" if !start_ms
abort "error: --all cannot be combined with --number or --file" if all_customers && owners.any?
abort "error: provide --number, --file, or --all" unless all_customers || owners.any?
139
# --all without --force: describe what is about to happen on stderr, refuse to
# continue when stdin is not a terminal, and otherwise require the operator to
# type "y" or "yes".
if all_customers && !force
	verb = dry_run ? "scan" : "resend"
	range_end = end_ms == "+" ? "now" : end_ms
	$stderr.puts(
		"WARNING: This will #{verb} ALL inbound messages in the given range.",
		"  stream: #{STREAM_KEY}",
		"  start:  #{start_ms}",
		"  end:    #{range_end}",
		"  dry_run: #{dry_run}",
		""
	)
	abort "error: refusing --all in non-interactive mode (use --force)" unless $stdin.tty?

	$stderr.print "Enter [y]es to continue, anything else aborts: "
	answer = $stdin.gets&.strip
	abort "aborted" unless %w[yes y].include?(answer)
end
153
# Connections and run state for the replay loop.
redis = Redis.new(url: REDIS_URL)
uri = dry_run ? nil : URI(WEBHOOK_URL) # no webhook target in a dry run
cursor = start_ms                      # lower bound of the next XRANGE call

# Counters reported in the final summary.
matched = resent = failed = 0

# The loop aborts once this many deliveries fail in a row.
consecutive_failures = 0
max_consecutive_failures = 5

matched_owners = Set.new # Explicitly requested owners that had a match

# A single HTTP session, started up front and used for every POST; stays nil
# in a dry run.
http = nil
unless dry_run
	http = Net::HTTP.new(uri.host, uri.port)
	http.use_ssl = (uri.scheme == "https")
	http.start
end
169
# Walk the stream in batches of BATCH_SIZE from the cursor up to end_ms. Every
# entry whose event is "in" or "thru" and whose owner is targeted is either
# printed (dry run) or POSTed to the webhook. The HTTP session is always
# closed on the way out.
begin
	loop do
		batch = redis.xrange(STREAM_KEY, cursor, end_ms, count: BATCH_SIZE)
		break if batch.empty?

		batch.each do |entry_id, entry|
			next unless %w[in thru].include?(entry["event"])

			owner = entry["owner"]
			next unless all_customers || owners.include?(owner)

			matched += 1
			matched_owners.add(owner) unless all_customers
			sender = entry["from"]
			ts = entry["timestamp"] || "unknown"
			preview = entry["body"].to_s[0, 80]

			if dry_run
				puts "[dry-run] #{entry_id}  from=#{sender}  ts=#{ts}  body=#{preview}"
				next
			end

			# true only for an HTTP 200 reply; any other status, or an error
			# raised while sending, counts as a failed delivery.
			delivered =
				begin
					request = Net::HTTP::Post.new(uri)
					request["Content-Type"] = "application/json"
					request["X-JMP-Resend-Of"] = entry_id
					request.body = build_payload(entry_id, entry)
					response = http.request(request)

					if response.code == "200"
						puts "resent #{entry_id}  from=#{sender}  ts=#{ts}  body=#{preview}"
						true
					else
						warn "FAILED (HTTP #{response.code}) #{entry_id}  from=#{sender}  ts=#{ts}"
						false
					end
				rescue StandardError => e
					warn "ERROR #{entry_id}  #{e.class}: #{e.message}"
					false
				end

			if delivered
				resent += 1
				consecutive_failures = 0
			else
				failed += 1
				consecutive_failures += 1
			end

			next if consecutive_failures < max_consecutive_failures

			abort "aborting: #{consecutive_failures} consecutive failures, #{max_consecutive_failures} allowed"
		end

		# Resume just after the last entry seen: same millisecond part, sequence
		# number plus one.
		newest_ms, newest_seq = batch.last.first.split("-").map(&:to_i)
		cursor = "#{newest_ms}-#{newest_seq + 1}"
		# A batch smaller than requested means the range is exhausted.
		break if batch.size < BATCH_SIZE
	end
ensure
	http&.finish
end
224
# Final report: totals for the run, plus any explicitly requested numbers that
# had no matching message.
target_desc =
	if all_customers
		"all customers"
	else
		owners.size == 1 ? owners.first : "#{owners.size} numbers"
	end

if matched.positive?
	totals =
		if dry_run
			"\n#{matched} inbound message(s) matched (dry run, nothing sent)"
		else
			"\n#{matched} matched, #{resent} resent, #{failed} failed"
		end
	puts totals

	unmatched = owners - matched_owners
	puts "numbers with no matches: #{unmatched.sort.join(', ')}" unless unmatched.empty?
else
	puts "no inbound messages found for #{target_desc} in the given range"
end