Separate message_id into bandwidth/stanza IDs, make timestamp optional

Amolith created

Having both bandwidth and stanza IDs would be useful instead of only
ever capturing one. With both, message_id becomes redundant.

Use provided timestamp everywhere, don't generate a timestamp when one's
not provided and instead just rely on the event ID from redis, because
stream IDs are timestamps.

Change summary

lib/message_event.rb   | 94 ++++++++++++++++++++++++++++++++++++-------
sgx-bwmsgsv2.rb        | 25 +++++++----
test/test_component.rb | 15 ++++--
3 files changed, 102 insertions(+), 32 deletions(-)

Detailed changes

lib/message_event.rb 🔗

@@ -21,28 +21,29 @@ module MessageEvent
 	class Base
 		include Emittable
 
-		def initialize(event:, timestamp:, message_id:)
+		def initialize(event:, timestamp: nil)
 			raise ArgumentError, "event must be a String" unless event.is_a?(String)
-			raise ArgumentError, "message_id must be a String" unless message_id.is_a?(String)
 
 			Time.iso8601(timestamp) if timestamp.is_a?(String)
 			@event = event
 			@timestamp = timestamp
-			@message_id = message_id
 		end
 
-		attr_reader :event, :timestamp, :message_id
+		attr_reader :event, :timestamp
 
 		# We use to_redis_fields instead of to_h because we want to serialize values
 		# (e.g., Time -> ISO8601 string, arrays -> JSON). A plain to_h would return
 		# raw Ruby objects, which is less useful for parsing from other
 		# projects/languages.
 		def to_redis_fields
-			{
+			fields = {
 				"event" => @event,
-				"timestamp" => @timestamp.is_a?(Time) ? @timestamp.iso8601 : @timestamp,
-				"message_id" => @message_id
+				"source" => "bwmsgsv2"
 			}
+			if @timestamp
+				fields["timestamp"] = @timestamp.is_a?(Time) ? @timestamp.iso8601 : @timestamp
+			end
+			fields
 		end
 	end
 
@@ -66,53 +67,112 @@ module MessageEvent
 		attr_reader :from, :to, :body, :media_urls
 
 		def to_redis_fields
-			super.merge(
+			fields = super.merge(
 				"from" => @from,
-				"to" => JSON.dump(@to),
-				"body" => @body,
-				"media_urls" => JSON.dump(@media_urls)
+				"to" => JSON.dump(@to)
 			)
+			fields["body"] = @body unless @body.nil? || @body.empty?
+			fields["media_urls"] = JSON.dump(@media_urls) unless @media_urls.empty?
+			fields
 		end
 	end
 
 	class In < Message
-		def initialize(**kwargs)
+		def initialize(owner:, bandwidth_id:, **kwargs)
+			raise ArgumentError, "owner must be a valid US telephone number" unless ValidTel === owner
+			raise ArgumentError, "bandwidth_id must be a String" unless bandwidth_id.is_a?(String)
+
+			@owner = owner
+			@bandwidth_id = bandwidth_id
 			super(event: "in", **kwargs)
 		end
+
+		attr_reader :owner, :bandwidth_id
+
+		def to_redis_fields
+			super.merge(
+				"owner" => @owner,
+				"bandwidth_id" => @bandwidth_id
+			)
+		end
 	end
 
 	class Out < Message
-		def initialize(**kwargs)
+		def initialize(stanza_id:, bandwidth_id: nil, **kwargs)
+			raise ArgumentError, "stanza_id must be a String" unless stanza_id.is_a?(String)
+			if bandwidth_id && !bandwidth_id.is_a?(String)
+				raise ArgumentError, "bandwidth_id must be a String"
+			end
+
+			@stanza_id = stanza_id
+			@bandwidth_id = bandwidth_id
 			super(event: "out", **kwargs)
 		end
+
+		attr_reader :stanza_id, :bandwidth_id
+
+		def to_redis_fields
+			fields = super.merge("stanza_id" => @stanza_id)
+			fields["bandwidth_id"] = @bandwidth_id if @bandwidth_id
+			fields
+		end
 	end
 
 	class Thru < Message
-		def initialize(**kwargs)
+		def initialize(stanza_id:, **kwargs)
+			raise ArgumentError, "stanza_id must be a String" unless stanza_id.is_a?(String)
+
+			@stanza_id = stanza_id
 			super(event: "thru", **kwargs)
 		end
+
+		attr_reader :stanza_id
+
+		def to_redis_fields
+			super.merge("stanza_id" => @stanza_id)
+		end
 	end
 
 	class Delivered < Base
-		def initialize(**kwargs)
+		def initialize(stanza_id:, bandwidth_id:, **kwargs)
+			raise ArgumentError, "stanza_id must be a String" unless stanza_id.is_a?(String)
+			raise ArgumentError, "bandwidth_id must be a String" unless bandwidth_id.is_a?(String)
+
+			@stanza_id = stanza_id
+			@bandwidth_id = bandwidth_id
 			super(event: "delivered", **kwargs)
 		end
+
+		attr_reader :stanza_id, :bandwidth_id
+
+		def to_redis_fields
+			super.merge(
+				"stanza_id" => @stanza_id,
+				"bandwidth_id" => @bandwidth_id
+			)
+		end
 	end
 
 	class Failed < Base
-		def initialize(error_code:, error_description:, **kwargs)
+		def initialize(stanza_id:, bandwidth_id:, error_code:, error_description:, **kwargs)
+			raise ArgumentError, "stanza_id must be a String" unless stanza_id.is_a?(String)
+			raise ArgumentError, "bandwidth_id must be a String" unless bandwidth_id.is_a?(String)
 			raise ArgumentError, "error_code must be a String" unless error_code.is_a?(String)
 			raise ArgumentError, "error_description must be a String" unless error_description.is_a?(String)
 
+			@stanza_id = stanza_id
+			@bandwidth_id = bandwidth_id
 			@error_code = error_code
 			@error_description = error_description
 			super(event: "failed", **kwargs)
 		end
 
-		attr_reader :error_code, :error_description
+		attr_reader :stanza_id, :bandwidth_id, :error_code, :error_description
 
 		def to_redis_fields
 			super.merge(
+				"stanza_id" => @stanza_id,
+				"bandwidth_id" => @bandwidth_id,
 				"error_code" => @error_code,
 				"error_description" => @error_description
 			)

sgx-bwmsgsv2.rb 🔗

@@ -228,13 +228,14 @@ module SGXbwmsgsv2
 		puts 'XRESPONSE0: ' + m.inspect
 		write_to_stream m
 
-		# Emit pass-through event
+		# Emit pass-through event. Thru events don't capture a timestamp because XMPP
+		# stanzas don't carry timestamps for realtime messages, and the Redis stream
+		# ID provides the emit time.
 		oob_url = m.at("oob|x > oob|url", oob: "jabber:x:oob")&.text
 		MessageEvent::Thru.new(
-			timestamp: Time.now,
 			from: users_num,
 			to: [dest_num],
-			message_id: m.id,
+			stanza_id: m.id.to_s,
 			body: m.body.to_s,
 			media_urls: [oob_url].compact
 		).emit(REDIS)
@@ -374,10 +375,11 @@ module SGXbwmsgsv2
 		).then { |response|
 			parsed = JSON.parse(response) rescue {}
 			MessageEvent::Out.new(
-				timestamp: Time.now,
+				timestamp: parsed["time"] || Time.now,
 				from: usern,
 				to: Array(num_dest),
-				message_id: parsed["id"] || s.id,
+				stanza_id: s.id.to_s,
+				bandwidth_id: parsed["id"],
 				body: body,
 				media_urls: [murl].compact
 			).emit(REDIS)
@@ -1149,25 +1151,28 @@ class WebhookHandler < Goliath::API
 				timestamp: jparams['time'],
 				from: jparams['from'],
 				to: jparams['to'],
-				message_id: jparams['id'],
+				owner: jparams['owner'],
+				bandwidth_id: jparams['id'],
 				body: jparams['text'].to_s,
 				media_urls: media_urls
 			).emit(REDIS)
 		when ['out', 'message-failed']
 			tag_parts = jparams['tag'].split(/ /, 2)
-			msg_id = WEBrick::HTTPUtils.unescape(tag_parts[0])
+			stanza_id = WEBrick::HTTPUtils.unescape(tag_parts[0])
 			MessageEvent::Failed.new(
 				timestamp: jparams['time'],
-				message_id: msg_id,
+				stanza_id: stanza_id,
+				bandwidth_id: jparams['id'],
 				error_code: jparams['errorCode'].to_s,
 				error_description: jparams['description'].to_s
 			).emit(REDIS)
 		when ['out', 'message-delivered']
 			tag_parts = jparams['tag'].split(/ /, 2)
-			msg_id = WEBrick::HTTPUtils.unescape(tag_parts[0])
+			stanza_id = WEBrick::HTTPUtils.unescape(tag_parts[0])
 			MessageEvent::Delivered.new(
 				timestamp: jparams['time'],
-				message_id: msg_id
+				stanza_id: stanza_id,
+				bandwidth_id: jparams['id']
 			).emit(REDIS)
 		end
 

test/test_component.rb 🔗

@@ -536,7 +536,8 @@ class ComponentTest < Minitest::Test
 		assert_equal "out", event["event"]
 		assert_equal "+15550000000", event["from"]
 		assert_equal JSON.dump(["+15551234567"]), event["to"]
-		assert_equal "bw-msg-123", event["message_id"]
+		assert_equal "stanza-123", event["stanza_id"]
+		assert_equal "bw-msg-123", event["bandwidth_id"]
 		assert_equal "Hello world", event["body"]
 	end
 	em :test_outbound_message_emits_to_stream
@@ -559,8 +560,9 @@ class ComponentTest < Minitest::Test
 		assert_equal "thru", event["event"]
 		assert_equal "+15550000000", event["from"]
 		assert_equal JSON.dump(["+15559999999"]), event["to"]
-		assert_equal "passthru-stanza-456", event["message_id"]
+		assert_equal "passthru-stanza-456", event["stanza_id"]
 		assert_equal "Pass through", event["body"]
+		refute event.key?("timestamp"), "Thru events should not have a timestamp field"
 	end
 	em :test_passthrough_message_emits_to_stream
 
@@ -604,7 +606,8 @@ class ComponentTest < Minitest::Test
 		assert_equal "in", event["event"]
 		assert_equal "+15551234567", event["from"]
 		assert_equal JSON.dump(["+15550000000"]), event["to"]
-		assert_equal "bw-in-123", event["message_id"]
+		assert_equal "bw-in-123", event["bandwidth_id"]
+		assert_equal "+15550000000", event["owner"]
 		assert_equal "Hello from outside", event["body"]
 		assert_equal JSON.dump([]), event["media_urls"]
 	end
@@ -662,7 +665,8 @@ class ComponentTest < Minitest::Test
 
 		event = entries.first[:fields]
 		assert_equal "delivered", event["event"]
-		assert_equal "stanza-id-abc", event["message_id"]
+		assert_equal "stanza-id-abc", event["stanza_id"]
+		assert_equal "bw-out-789", event["bandwidth_id"]
 		assert_equal "2025-01-13T10:10:00Z", event["timestamp"]
 	end
 	em :test_message_delivered_emits_to_stream
@@ -690,7 +694,8 @@ class ComponentTest < Minitest::Test
 
 		event = entries.first[:fields]
 		assert_equal "failed", event["event"]
-		assert_equal "failed-stanza-xyz", event["message_id"]
+		assert_equal "failed-stanza-xyz", event["stanza_id"]
+		assert_equal "bw-out-999", event["bandwidth_id"]
 		assert_equal "4720", event["error_code"]
 		assert_equal "Carrier rejected message", event["error_description"]
 		assert_equal "2025-01-13T10:15:00Z", event["timestamp"]