1#!/usr/bin/env ruby
2# frozen_string_literal: true
3
4# Resend inbound messages from the Redis stream to a customer's JID by replaying
5# them through the webhook handler.
6#
7# Usage:
8# resend-inbounds --start TIMESTAMP --end TIMESTAMP --number TEL [--dry-run]
9#
10# TIMESTAMP is anything GNU date -d accepts:
11# - ISO 8601: 2025-06-01T00:00:00Z
12# - Relative: "3 hours ago"
13# - Date only: 2025-06-01
14# - Unix: @1717200000
15#
16# TEL is the JMP customer's phone number, with or without +1 prefix. 5551234567
17# and +15551234567 are equivalent.
18#
19# --dry-run prints matching messages without POSTing.
20#
21# Environment:
22# REDIS_URL Redis connection URL (default: redis://127.0.0.1:6379/0)
23# STREAM_KEY Stream key name (default: messages)
24# BATCH_SIZE Entries per XRANGE (default: 100)
25# WEBHOOK_URL Webhook endpoint (default: http://127.0.0.1:50209)
26#
27# Requires: GNU coreutils date (for relative timestamp parsing)
28#
29# Examples:
30# resend-inbounds --start "2025-06-01" --end "2025-06-02" --number +15551234567
31# resend-inbounds --start "3 hours ago" --number 5551234567 --dry-run
32
33require "json"
34require "net/http"
35require "optparse"
36require "redis"
37require "shellwords"
38require "time"
39
40REDIS_URL = ENV.fetch("REDIS_URL", "redis://127.0.0.1:6379/0")
41STREAM_KEY = ENV.fetch("STREAM_KEY", "messages")
42BATCH_SIZE = ENV.fetch("BATCH_SIZE", "100").to_i
43WEBHOOK_URL = ENV.fetch("WEBHOOK_URL", "http://127.0.0.1:50209")
44
45def to_ms(str)
46 # Try Ruby's Time.parse first, fall back to GNU date -d for relative
47 # timestamps like "3 hours ago".
48 epoch = begin
49 Time.parse(str).to_i
50 rescue ArgumentError
51 out = `date -d #{str.shellescape} +%s 2>/dev/null`.chomp
52 abort "error: cannot parse timestamp: #{str}" unless $?.success?
53 out.to_i
54 end
55 epoch * 1000
56end
57
58def normalise_tel(number)
59 case number
60 when /\A\+1\d{10}\z/ then number
61 when /\A\d{10}\z/ then "+1#{number}"
62 else abort "error: invalid phone number: #{number}"
63 end
64end
65
66def build_payload(stream_id, fields)
67 to = JSON.parse(fields["to"]) rescue [fields["owner"]]
68 media_parsed = JSON.parse(fields["media_urls"]) rescue []
69 media = fields["media_urls"] ? media_parsed : []
70
71 JSON.dump([{
72 "type" => "message-received",
73 "message" => {
74 "id" => fields["bandwidth_id"],
75 "direction" => "in",
76 "owner" => fields["owner"],
77 "from" => fields["from"],
78 "to" => to,
79 "time" => fields["timestamp"],
80 "text" => fields["body"].to_s,
81 "media" => media
82 }
83 }])
84end
85
86start_ms = nil
87end_ms = "+"
88owner = nil
89dry_run = false
90
91OptionParser.new do |opts|
92 opts.banner = "Usage: resend-inbounds --start TIMESTAMP --number TEL [options]"
93 opts.on("--start TIMESTAMP", "Start of time range (required)") { |v| start_ms = to_ms(v).to_s }
94 opts.on("--end TIMESTAMP", "End of time range (default: now)") { |v| end_ms = to_ms(v).to_s }
95 opts.on("--number TEL", "Customer phone number (required)") { |v| owner = normalise_tel(v) }
96 opts.on("--dry-run", "Print matches without POSTing") { dry_run = true }
97end.parse!
98
99abort "error: --start is required" unless start_ms
100abort "error: --number is required" unless owner
101
102redis = Redis.new(url: REDIS_URL)
103uri = URI(WEBHOOK_URL) unless dry_run
104cursor = start_ms
105matched = 0
106resent = 0
107failed = 0
108consecutive_failures = 0
109max_consecutive_failures = 5
110
111http = unless dry_run
112 h = Net::HTTP.new(uri.host, uri.port)
113 h.use_ssl = uri.scheme == "https"
114 h.start
115end
116
117begin
118 loop do
119 entries = redis.xrange(STREAM_KEY, cursor, end_ms, count: BATCH_SIZE)
120 break if entries.empty?
121
122 entries.each do |id, fields|
123 next unless fields["event"] == "in" && fields["owner"] == owner
124
125 matched += 1
126 ts = fields["timestamp"] || "unknown"
127 body_preview = (fields["body"] || "")[0, 80]
128
129 if dry_run
130 puts "[dry-run] #{id} from=#{fields['from']} ts=#{ts} body=#{body_preview}"
131 next
132 end
133
134 begin
135 req = Net::HTTP::Post.new(uri)
136 req["Content-Type"] = "application/json"
137 req["X-JMP-Resend-Of"] = id
138 req.body = build_payload(id, fields)
139 resp = http.request(req)
140
141 if resp.code == "200"
142 puts "resent #{id} from=#{fields['from']} ts=#{ts} body=#{body_preview}"
143 resent += 1
144 consecutive_failures = 0
145 else
146 warn "FAILED (HTTP #{resp.code}) #{id} from=#{fields['from']} ts=#{ts}"
147 failed += 1
148 consecutive_failures += 1
149 end
150 rescue StandardError => e
151 warn "ERROR #{id} #{e.class}: #{e.message}"
152 failed += 1
153 consecutive_failures += 1
154 end
155
156 if consecutive_failures >= max_consecutive_failures
157 abort "aborting: #{consecutive_failures} consecutive failures, #{max_consecutive_failures} allowed"
158 end
159 end
160
161 last_id = entries.last[0]
162 ms, seq = last_id.split("-").map(&:to_i)
163 cursor = "#{ms}-#{seq + 1}"
164 break if entries.size < BATCH_SIZE
165 end
166ensure
167 http&.finish
168end
169
170if matched.zero?
171 puts "no inbound messages found for #{owner} in the given range"
172elsif dry_run
173 puts "\n#{matched} inbound message(s) matched (dry run, nothing sent)"
174else
175 puts "\n#{matched} matched, #{resent} resent, #{failed} failed"
176end