Emit ResendIn event when replaying inbound messages

Amolith created

Add a ResendIn message event type that records the original stream ID
and bandwidth ID without duplicating body or media fields. The webhook
handler emits this event instead of a normal In event when the
X-JMP-Resend header is present.

Change summary

lib/message_event.rb   | 26 ++++++++++++++++++++++++++
sgx-bwmsgsv2.rb        | 32 ++++++++++++++++++++------------
test/test_component.rb | 37 +++++++++++++++++++++++++++++++++++--
3 files changed, 81 insertions(+), 14 deletions(-)

Detailed changes

lib/message_event.rb 🔗

@@ -139,6 +139,32 @@ module MessageEvent
 		end
 	end
 
+	class ResendIn < Base
+		def initialize(original_stream_id:, owner:, original_bandwidth_id: nil, **kwargs)
+			raise ArgumentError, "original_stream_id must be a String" unless original_stream_id.is_a?(String)
+			raise ArgumentError, "owner must be a valid US telephone number" unless ValidTel === owner
+			if original_bandwidth_id && !original_bandwidth_id.is_a?(String)
+				raise ArgumentError, "original_bandwidth_id must be a String"
+			end
+
+			@original_stream_id = original_stream_id
+			@owner = owner
+			@original_bandwidth_id = original_bandwidth_id
+			super(event: "resend", **kwargs)
+		end
+
+		attr_reader :original_stream_id, :owner, :original_bandwidth_id
+
+		def to_redis_fields
+			fields = super.merge(
+				"original_stream_id" => @original_stream_id,
+				"owner" => @owner
+			)
+			fields["original_bandwidth_id"] = @original_bandwidth_id if @original_bandwidth_id
+			fields
+		end
+	end
+
 	class Failed < Base
 		def initialize(stanza_id:, bandwidth_id:, error_code:, error_description:, **kwargs)
 			raise ArgumentError, "stanza_id must be a String" unless stanza_id.is_a?(String)

sgx-bwmsgsv2.rb 🔗

@@ -1198,18 +1198,26 @@ class WebhookHandler < Goliath::API
 		# Emit event to messages stream
 		case [jparams['direction'], type]
 		when ['in', 'message-received']
-			media_urls = Array(jparams['media']).reject { |u|
-				u.end_with?('.smil', '.txt', '.xml')
-			}
-			MessageEvent::In.new(
-				timestamp: jparams['time'],
-				from: jparams['from'],
-				to: jparams['to'] << users_num,
-				owner: jparams['owner'],
-				bandwidth_id: jparams['id'],
-				body: jparams['text'].to_s,
-				media_urls: media_urls
-			).emit(REDIS)
+			if !env['HTTP_X_JMP_RESEND_OF'].to_s.empty?
+				MessageEvent::ResendIn.new(
+					original_stream_id: env['HTTP_X_JMP_RESEND_OF'],
+					original_bandwidth_id: jparams['id'],
+					owner: jparams['owner']
+				).emit(REDIS)
+			else
+				media_urls = Array(jparams['media']).reject { |u|
+					u.end_with?('.smil', '.txt', '.xml')
+				}
+				MessageEvent::In.new(
+					timestamp: jparams['time'],
+					from: jparams['from'],
+					to: jparams['to'] << users_num,
+					owner: jparams['owner'],
+					bandwidth_id: jparams['id'],
+					body: jparams['text'].to_s,
+					media_urls: media_urls
+				).emit(REDIS)
+			end
 		when ['out', 'message-failed']
 			tag_parts = jparams['tag'].split(/ /, 2)
 			stanza_id = WEBrick::HTTPUtils.unescape(tag_parts[0])

test/test_component.rb 🔗

@@ -582,7 +582,7 @@ class ComponentTest < Minitest::Test
 	end
 	em :test_passthrough_message_emits_to_stream
 
-	def invoke_webhook(payload)
+	def invoke_webhook(payload, extra_env: {})
 		with_stubs([
 			[
 				SGXbwmsgsv2,
@@ -595,7 +595,7 @@ class ComponentTest < Minitest::Test
 				"REQUEST_URI" => "/",
 				"REQUEST_METHOD" => "POST",
 				"params" => {"_json" => [payload]}
-			}
+			}.merge(extra_env)
 			handler.instance_variable_set(:@env, env)
 			def handler.params
 				@env["params"]
@@ -730,6 +730,39 @@ class ComponentTest < Minitest::Test
 	end
 	em :test_message_failed_emits_to_stream
 
+	def test_resend_emits_resend_event_instead_of_in
+		payload = {
+			"type" => "message-received",
+			"message" => {
+				"id" => "bw-in-resend-001",
+				"direction" => "in",
+				"owner" => "+15550000000",
+				"from" => "+15551234567",
+				"to" => ["+15550000000"],
+				"time" => "2025-01-13T10:00:00Z",
+				"text" => "Resent message"
+			}
+		}
+
+		invoke_webhook(
+			payload,
+			extra_env: { "HTTP_X_JMP_RESEND_OF" => "1736762400000-0" }
+		)
+
+		entries = REDIS.stream_entries("messages").sync
+		assert_equal 1, entries.length
+
+		event = entries.first[:fields]
+		assert_equal "resend", event["event"]
+		assert_equal "bwmsgsv2", event["source"]
+		assert_equal "+15550000000", event["owner"]
+		assert_equal "1736762400000-0", event["original_stream_id"]
+		assert_equal "bw-in-resend-001", event["original_bandwidth_id"]
+		refute event.key?("from"), "Resend events should not duplicate message fields"
+		refute event.key?("body"), "Resend events should not duplicate message fields"
+	end
+	em :test_resend_emits_resend_event_instead_of_in
+
 	def test_sentry_captures_handler_exception
 		captured_exceptions = []
 		repo = SGXbwmsgsv2.instance_variable_get(:@registration_repo)