1#!/usr/bin/env ruby
2# frozen_string_literal: true
3
4# Resend inbound messages from the Redis stream to a customer's JID by replaying
5# them through the webhook handler.
6#
7# Usage:
8# resend-inbounds --start TIMESTAMP [--end TIMESTAMP] [targeting] [options]
9#
10# Targeting (at least one required):
11# --number TEL Customer phone number (repeatable)
12# --file PATH File with newline-separated phone numbers
13# --all Resend to ALL customers (requires confirmation or --force)
14#
15# TIMESTAMP is anything GNU date -d accepts:
16# - ISO 8601: 2025-06-01T00:00:00Z
17# - Relative: "3 hours ago"
18# - Date only: 2025-06-01
19# - Unix: @1717200000
20#
21# TEL is the JMP customer's phone number, with or without +1 prefix. 5551234567
22# and +15551234567 are equivalent.
23#
24# Options:
25# --dry-run Print matching messages without POSTing.
26# --force Skip interactive confirmation for --all.
27#
28# Environment:
29# REDIS_URL Redis connection URL (default: redis://127.0.0.1:6379/0)
30# STREAM_KEY Stream key name (default: messages)
31# BATCH_SIZE Entries per XRANGE (default: 100)
32# WEBHOOK_URL Webhook endpoint (default: http://127.0.0.1:50209)
33#
34# Requires: GNU coreutils date (for relative timestamp parsing)
35#
36# Examples:
37# resend-inbounds --start "2025-06-01" --end "2025-06-02" --number +15551234567
38# resend-inbounds --start "3 hours ago" --number 5551234567 --dry-run
39# resend-inbounds --start "3 hours ago" --number 5551234567 --number 5559876543
40# resend-inbounds --start "2025-06-01" --file numbers.txt
41# resend-inbounds --start "2025-06-01" --all --force
42
43require "json"
44require "net/http"
45require "optparse"
46require "redis"
47require "set"
48require "shellwords"
49require "time"
50
# Runtime configuration, each value overridable through the environment.
REDIS_URL = ENV.fetch("REDIS_URL", "redis://127.0.0.1:6379/0")
STREAM_KEY = ENV.fetch("STREAM_KEY", "messages")

# Number of entries requested per XRANGE call. Parsed strictly: String#to_i
# would silently turn a typo (or "0") into a COUNT of 0, which makes the scan
# look like "no messages found" instead of reporting the misconfiguration.
BATCH_SIZE = begin
  batch_size = Integer(ENV.fetch("BATCH_SIZE", "100"), 10)
  raise ArgumentError, "BATCH_SIZE must be positive" unless batch_size.positive?

  batch_size
rescue ArgumentError
  abort "error: BATCH_SIZE must be a positive integer (got #{ENV['BATCH_SIZE'].inspect})"
end

WEBHOOK_URL = ENV.fetch("WEBHOOK_URL", "http://127.0.0.1:50209")
55
# Convert a user-supplied TIMESTAMP into epoch milliseconds.
#
# Unambiguous forms are handled in Ruby: "@SECONDS" Unix epochs, date-only
# "YYYY-MM-DD" (local midnight) and ISO 8601 date-times. Anything else, such
# as "3 hours ago", is delegated to GNU `date -d`.
#
# Time.parse is deliberately avoided: it accepts partial matches, so a
# relative phrase like "3 hours ago" can be read as 03:00 today instead of
# raising and reaching the `date -d` fallback.
#
# @param str [String] timestamp as given on the command line
# @return [Integer] milliseconds since the Unix epoch
# Aborts the process when the timestamp cannot be parsed.
def to_ms(str)
  epoch = strict_epoch_seconds(str)
  if epoch.nil?
    out = `date -d #{str.shellescape} +%s 2>/dev/null`.chomp
    abort "error: cannot parse timestamp: #{str}" unless $?.success?
    epoch = out.to_i
  end
  epoch * 1000
end

# Parse the strictly formatted timestamp forms into epoch seconds; returns nil
# when str matches none of them so the caller can fall back to `date -d`.
def strict_epoch_seconds(str)
  case str
  when /\A@(-?\d+)\z/ then Regexp.last_match(1).to_i
  when /\A\d{4}-\d{2}-\d{2}\z/ then Time.strptime(str, "%Y-%m-%d").to_i
  else Time.iso8601(str).to_i
  end
rescue ArgumentError
  nil
end
68
# Canonicalise a customer phone number to the "+1XXXXXXXXXX" form.
#
# @param number [String] either "+1" followed by ten digits, or ten bare digits
# @param location [String, nil] optional context (e.g. "line 3") appended to
#   the error message
# @return [String] the number with a "+1" prefix
# Aborts the process when the number matches neither accepted shape.
def normalise_tel(number, location: nil)
  return number if /\A\+1\d{10}\z/ =~ number
  return "+1#{number}" if /\A\d{10}\z/ =~ number

  suffix = location ? " (#{location})" : ""
  abort "error: invalid phone number: #{number}#{suffix}"
end
78
# Build the JSON request body that replays one stream entry as a
# "message-received" webhook event.
#
# The "to" and "media_urls" fields are expected to hold JSON-encoded arrays.
# When one is missing, malformed, or decodes to something other than an array,
# "to" falls back to the owner alone and "media" to an empty list.
#
# @param stream_id [String] ID of the stream entry (currently unused; kept so
#   existing callers keep working)
# @param fields [Hash{String => String}] field/value pairs of the stream entry
# @return [String] JSON document: a one-element array holding the event
def build_payload(stream_id, fields)
  to = parse_json_array(fields["to"]) || [fields["owner"]]
  media = parse_json_array(fields["media_urls"]) || []

  JSON.dump([{
    "type" => "message-received",
    "message" => {
      "id" => fields["bandwidth_id"],
      "direction" => "in",
      "owner" => fields["owner"],
      "from" => fields["from"],
      "to" => to,
      "time" => fields["timestamp"],
      "text" => fields["body"].to_s,
      "media" => media
    }
  }])
end

# Decode raw as a JSON array; returns nil when raw is not a string, is not
# valid JSON, or decodes to a non-array value.
def parse_json_array(raw)
  return nil unless raw.is_a?(String)

  parsed = JSON.parse(raw)
  parsed if parsed.is_a?(Array)
rescue JSON::ParserError, EncodingError
  nil
end
98
# Read phone numbers from a text file, one per line.
#
# Lines are stripped; blank lines and lines starting with "#" are skipped.
# Every remaining line goes through normalise_tel, tagged with its 1-based
# line number so an invalid entry can be located.
#
# @param path [String] file to read
# @return [Set<String>] the normalised numbers, de-duplicated
# Aborts the process when the file is missing, unreadable, or holds no numbers.
def load_numbers_file(path)
  stripped_lines = File.readlines(path).map(&:strip)

  numbers = Set.new
  stripped_lines.each.with_index(1) do |entry, lineno|
    next if entry.empty? || entry.start_with?("#")

    numbers.add(normalise_tel(entry, location: "line #{lineno}"))
  end

  abort "error: no phone numbers found in #{path}" if numbers.empty?
  numbers
rescue Errno::ENOENT
  abort "error: file not found: #{path}"
rescue Errno::EACCES
  abort "error: permission denied: #{path}"
end
113
# Command-line state, filled in by the option parser below.
start_ms = nil # lower XRANGE bound: epoch milliseconds as a string (required)
end_ms = "+" # upper XRANGE bound; "+" is passed to XRANGE unless --end is given
owners = Set.new # normalised numbers collected from --number and --file
all_customers = false
dry_run = false
force = false

parser = OptionParser.new do |opts|
  opts.banner = "Usage: resend-inbounds --start TIMESTAMP [targeting] [options]"
  opts.on("--start TIMESTAMP", "Start of time range (required)") { |v| start_ms = to_ms(v).to_s }
  opts.on("--end TIMESTAMP", "End of time range (default: now)") { |v| end_ms = to_ms(v).to_s }
  opts.on("--number TEL", "Customer phone number (repeatable)") { |v| owners << normalise_tel(v) }
  opts.on("--file PATH", "File with newline-separated phone numbers") { |v| owners.merge(load_numbers_file(v)) }
  opts.on("--all", "Resend to ALL customers") { all_customers = true }
  opts.on("--dry-run", "Print matches without POSTing") { dry_run = true }
  opts.on("--force", "Skip interactive confirmation for --all") { force = true }
end

# Report unknown options and missing arguments in the same "error: ..." style
# as every other failure instead of letting the exception print a backtrace.
begin
  parser.parse!
rescue OptionParser::ParseError => e
  abort "error: #{e.message}"
end

abort "error: --start is required" unless start_ms
# An inverted range can never match anything; say so rather than reporting
# "no inbound messages found".
if end_ms != "+" && end_ms.to_i < start_ms.to_i
  abort "error: --end must not be earlier than --start"
end
if all_customers && !owners.empty?
  abort "error: --all cannot be combined with --number or --file"
end
if !all_customers && owners.empty?
  abort "error: provide --number, --file, or --all"
end
139
# --all without --force needs an explicit go-ahead: show what is about to
# happen on stderr, refuse outright when stdin is not a terminal, and carry on
# only after a literal "y" or "yes".
needs_confirmation = all_customers && !force
if needs_confirmation
  verb = dry_run ? "scan" : "resend"
  range_end = end_ms == "+" ? "now" : end_ms
  $stderr.puts(
    "WARNING: This will #{verb} ALL inbound messages in the given range.",
    " stream: #{STREAM_KEY}",
    " start: #{start_ms}",
    " end: #{range_end}",
    " dry_run: #{dry_run}",
    ""
  )
  abort "error: refusing --all in non-interactive mode (use --force)" unless $stdin.tty?

  $stderr.print "Enter [y]es to continue, anything else aborts: "
  # gets returns nil at EOF, which is treated like any other non-yes answer.
  answer = $stdin.gets&.strip
  abort "aborted" unless %w[yes y].include?(answer)
end
153
# Replay loop: page through the stream with XRANGE, pick the inbound entries
# addressed to the targeted owners, and either print them (dry run) or POST
# each one to the webhook over a single kept-open HTTP connection.
stream = Redis.new(url: REDIS_URL)
matched = 0
resent = 0
failed = 0
matched_owners = Set.new # explicitly targeted owners that had at least one match
failure_streak = 0
failure_limit = 5
next_id = start_ms

# The webhook connection only exists when something will actually be sent.
endpoint = nil
conn = nil
unless dry_run
  endpoint = URI(WEBHOOK_URL)
  conn = Net::HTTP.new(endpoint.host, endpoint.port)
  conn.use_ssl = endpoint.scheme == "https"
  conn.start
end

begin
  loop do
    batch = stream.xrange(STREAM_KEY, next_id, end_ms, count: BATCH_SIZE)
    break if batch.empty?

    batch.each do |entry_id, entry|
      inbound = %w[in thru].include?(entry["event"])
      next unless inbound && (all_customers || owners.include?(entry["owner"]))

      matched += 1
      matched_owners << entry["owner"] unless all_customers
      sender = entry["from"]
      stamp = entry["timestamp"] || "unknown"
      preview = (entry["body"] || "")[0, 80]

      if dry_run
        puts "[dry-run] #{entry_id} from=#{sender} ts=#{stamp} body=#{preview}"
        next
      end

      begin
        post = Net::HTTP::Post.new(endpoint)
        post["Content-Type"] = "application/json"
        post["X-JMP-Resend-Of"] = entry_id
        post.body = build_payload(entry_id, entry)
        status = conn.request(post).code

        # Only an exact 200 counts as delivered; it also resets the streak.
        case status
        when "200"
          puts "resent #{entry_id} from=#{sender} ts=#{stamp} body=#{preview}"
          resent += 1
          failure_streak = 0
        else
          warn "FAILED (HTTP #{status}) #{entry_id} from=#{sender} ts=#{stamp}"
          failed += 1
          failure_streak += 1
        end
      rescue StandardError => e
        warn "ERROR #{entry_id} #{e.class}: #{e.message}"
        failed += 1
        failure_streak += 1
      end

      # Give up once too many sends in a row have failed.
      next if failure_streak < failure_limit

      abort "aborting: #{failure_streak} consecutive failures, #{failure_limit} allowed"
    end

    # Resume just after the last entry seen: same millisecond part, sequence + 1.
    newest_ms, newest_seq = batch.last.first.split("-").map(&:to_i)
    next_id = "#{newest_ms}-#{newest_seq + 1}"
    # A short page means the range is exhausted.
    break if batch.size < BATCH_SIZE
  end
ensure
  conn&.finish
end
224
# Final report: totals for the run plus, when specific numbers were targeted,
# the ones that never matched any entry.
target_desc =
  case
  when all_customers then "all customers"
  when owners.size == 1 then owners.first
  else "#{owners.size} numbers"
  end

if matched.nonzero?
  totals =
    if dry_run
      "#{matched} inbound message(s) matched (dry run, nothing sent)"
    else
      "#{matched} matched, #{resent} resent, #{failed} failed"
    end
  puts "\n#{totals}"

  unmatched = owners - matched_owners
  puts "numbers with no matches: #{unmatched.sort.join(', ')}" unless unmatched.empty?
else
  puts "no inbound messages found for #{target_desc} in the given range"
end