diff --git a/lib/message_event.rb b/lib/message_event.rb index 2cff71d01263782b15c345588cc6937974ce576e..6b57a79e117638aa11e05d66374c87176f743f48 100644 --- a/lib/message_event.rb +++ b/lib/message_event.rb @@ -21,28 +21,29 @@ module MessageEvent class Base include Emittable - def initialize(event:, timestamp:, message_id:) + def initialize(event:, timestamp: nil) raise ArgumentError, "event must be a String" unless event.is_a?(String) - raise ArgumentError, "message_id must be a String" unless message_id.is_a?(String) Time.iso8601(timestamp) if timestamp.is_a?(String) @event = event @timestamp = timestamp - @message_id = message_id end - attr_reader :event, :timestamp, :message_id + attr_reader :event, :timestamp # We use to_redis_fields instead of to_h because we want to serialize values # (e.g., Time -> ISO8601 string, arrays -> JSON). A plain to_h would return # raw Ruby objects, which is less useful for parsing from other # projects/languages. def to_redis_fields - { + fields = { "event" => @event, - "timestamp" => @timestamp.is_a?(Time) ? @timestamp.iso8601 : @timestamp, - "message_id" => @message_id + "source" => "bwmsgsv2" } + if @timestamp + fields["timestamp"] = @timestamp.is_a?(Time) ? @timestamp.iso8601 : @timestamp + end + fields end end @@ -66,53 +67,112 @@ module MessageEvent attr_reader :from, :to, :body, :media_urls def to_redis_fields - super.merge( + fields = super.merge( "from" => @from, - "to" => JSON.dump(@to), - "body" => @body, - "media_urls" => JSON.dump(@media_urls) + "to" => JSON.dump(@to) ) + fields["body"] = @body unless @body.nil? || @body.empty? + fields["media_urls"] = JSON.dump(@media_urls) unless @media_urls.empty? + fields end end class In < Message - def initialize(**kwargs) + def initialize(owner:, bandwidth_id:, **kwargs) + raise ArgumentError, "owner must be a valid US telephone number" unless ValidTel === owner + raise ArgumentError, "bandwidth_id must be a String" unless bandwidth_id.is_a?(String) + + @owner = owner + @bandwidth_id = bandwidth_id super(event: "in", **kwargs) end + + attr_reader :owner, :bandwidth_id + + def to_redis_fields + super.merge( + "owner" => @owner, + "bandwidth_id" => @bandwidth_id + ) + end end class Out < Message - def initialize(**kwargs) + def initialize(stanza_id:, bandwidth_id: nil, **kwargs) + raise ArgumentError, "stanza_id must be a String" unless stanza_id.is_a?(String) + if bandwidth_id && !bandwidth_id.is_a?(String) + raise ArgumentError, "bandwidth_id must be a String" + end + + @stanza_id = stanza_id + @bandwidth_id = bandwidth_id super(event: "out", **kwargs) end + + attr_reader :stanza_id, :bandwidth_id + + def to_redis_fields + fields = super.merge("stanza_id" => @stanza_id) + fields["bandwidth_id"] = @bandwidth_id if @bandwidth_id + fields + end end class Thru < Message - def initialize(**kwargs) + def initialize(stanza_id:, **kwargs) + raise ArgumentError, "stanza_id must be a String" unless stanza_id.is_a?(String) + + @stanza_id = stanza_id super(event: "thru", **kwargs) end + + attr_reader :stanza_id + + def to_redis_fields + super.merge("stanza_id" => @stanza_id) + end end class Delivered < Base - def initialize(**kwargs) + def initialize(stanza_id:, bandwidth_id:, **kwargs) + raise ArgumentError, "stanza_id must be a String" unless stanza_id.is_a?(String) + raise ArgumentError, "bandwidth_id must be a String" unless bandwidth_id.is_a?(String) + + @stanza_id = stanza_id + @bandwidth_id = bandwidth_id super(event: "delivered", **kwargs) end + + attr_reader :stanza_id, :bandwidth_id + + def to_redis_fields + super.merge( + "stanza_id" => @stanza_id, + "bandwidth_id" => @bandwidth_id + ) + end end class Failed < Base - def initialize(error_code:, error_description:, **kwargs) + def initialize(stanza_id:, bandwidth_id:, error_code:, error_description:, **kwargs) + raise ArgumentError, "stanza_id must be a String" unless stanza_id.is_a?(String) + raise ArgumentError, "bandwidth_id must be a String" unless bandwidth_id.is_a?(String) raise ArgumentError, "error_code must be a String" unless error_code.is_a?(String) raise ArgumentError, "error_description must be a String" unless error_description.is_a?(String) + @stanza_id = stanza_id + @bandwidth_id = bandwidth_id @error_code = error_code @error_description = error_description super(event: "failed", **kwargs) end - attr_reader :error_code, :error_description + attr_reader :stanza_id, :bandwidth_id, :error_code, :error_description def to_redis_fields super.merge( + "stanza_id" => @stanza_id, + "bandwidth_id" => @bandwidth_id, "error_code" => @error_code, "error_description" => @error_description ) diff --git a/sgx-bwmsgsv2.rb b/sgx-bwmsgsv2.rb index 4bd85ed3b4894c3f7e4c218af803dc06321db68a..032e72d3b8209eb4d1237131e16aa9d3e1bd30bf 100755 --- a/sgx-bwmsgsv2.rb +++ b/sgx-bwmsgsv2.rb @@ -228,13 +228,14 @@ module SGXbwmsgsv2 puts 'XRESPONSE0: ' + m.inspect write_to_stream m - # Emit pass-through event + # Emit pass-through event. Thru events don't capture a timestamp because XMPP + # stanzas don't carry timestamps for realtime messages, and the Redis stream + # ID provides the emit time. oob_url = m.at("oob|x > oob|url", oob: "jabber:x:oob")&.text MessageEvent::Thru.new( - timestamp: Time.now, from: users_num, to: [dest_num], - message_id: m.id, + stanza_id: m.id.to_s, body: m.body.to_s, media_urls: [oob_url].compact ).emit(REDIS) @@ -374,10 +375,11 @@ module SGXbwmsgsv2 ).then { |response| parsed = JSON.parse(response) rescue {} MessageEvent::Out.new( - timestamp: Time.now, + timestamp: parsed["time"] || Time.now, from: usern, to: Array(num_dest), - message_id: parsed["id"] || s.id, + stanza_id: s.id.to_s, + bandwidth_id: parsed["id"], body: body, media_urls: [murl].compact ).emit(REDIS) @@ -1149,25 +1151,28 @@ class WebhookHandler < Goliath::API timestamp: jparams['time'], from: jparams['from'], to: jparams['to'], - message_id: jparams['id'], + owner: jparams['owner'], + bandwidth_id: jparams['id'], body: jparams['text'].to_s, media_urls: media_urls ).emit(REDIS) when ['out', 'message-failed'] tag_parts = jparams['tag'].split(/ /, 2) - msg_id = WEBrick::HTTPUtils.unescape(tag_parts[0]) + stanza_id = WEBrick::HTTPUtils.unescape(tag_parts[0]) MessageEvent::Failed.new( timestamp: jparams['time'], - message_id: msg_id, + stanza_id: stanza_id, + bandwidth_id: jparams['id'], error_code: jparams['errorCode'].to_s, error_description: jparams['description'].to_s ).emit(REDIS) when ['out', 'message-delivered'] tag_parts = jparams['tag'].split(/ /, 2) - msg_id = WEBrick::HTTPUtils.unescape(tag_parts[0]) + stanza_id = WEBrick::HTTPUtils.unescape(tag_parts[0]) MessageEvent::Delivered.new( timestamp: jparams['time'], - message_id: msg_id + stanza_id: stanza_id, + bandwidth_id: jparams['id'] ).emit(REDIS) end diff --git a/test/test_component.rb b/test/test_component.rb index 54887d297597e4673d982179cf46c395998fdd6f..1461354d153c57e36f7a977adafe9f9f7a973cdf 100644 --- a/test/test_component.rb +++ b/test/test_component.rb @@ -536,7 +536,8 @@ class ComponentTest < Minitest::Test assert_equal "out", event["event"] assert_equal "+15550000000", event["from"] assert_equal JSON.dump(["+15551234567"]), event["to"] - assert_equal "bw-msg-123", event["message_id"] + assert_equal "stanza-123", event["stanza_id"] + assert_equal "bw-msg-123", event["bandwidth_id"] assert_equal "Hello world", event["body"] end em :test_outbound_message_emits_to_stream @@ -559,8 +560,9 @@ class ComponentTest < Minitest::Test assert_equal "thru", event["event"] assert_equal "+15550000000", event["from"] assert_equal JSON.dump(["+15559999999"]), event["to"] - assert_equal "passthru-stanza-456", event["message_id"] + assert_equal "passthru-stanza-456", event["stanza_id"] assert_equal "Pass through", event["body"] + refute event.key?("timestamp"), "Thru events should not have a timestamp field" end em :test_passthrough_message_emits_to_stream @@ -604,7 +606,8 @@ class ComponentTest < Minitest::Test assert_equal "in", event["event"] assert_equal "+15551234567", event["from"] assert_equal JSON.dump(["+15550000000"]), event["to"] - assert_equal "bw-in-123", event["message_id"] + assert_equal "bw-in-123", event["bandwidth_id"] + assert_equal "+15550000000", event["owner"] assert_equal "Hello from outside", event["body"] assert_equal JSON.dump([]), event["media_urls"] end @@ -662,7 +665,8 @@ class ComponentTest < Minitest::Test event = entries.first[:fields] assert_equal "delivered", event["event"] - assert_equal "stanza-id-abc", event["message_id"] + assert_equal "stanza-id-abc", event["stanza_id"] + assert_equal "bw-out-789", event["bandwidth_id"] assert_equal "2025-01-13T10:10:00Z", event["timestamp"] end em :test_message_delivered_emits_to_stream @@ -690,7 +694,8 @@ class ComponentTest < Minitest::Test event = entries.first[:fields] assert_equal "failed", event["event"] - assert_equal "failed-stanza-xyz", event["message_id"] + assert_equal "failed-stanza-xyz", event["stanza_id"] + assert_equal "bw-out-999", event["bandwidth_id"] assert_equal "4720", event["error_code"] assert_equal "Carrier rejected message", event["error_description"] assert_equal "2025-01-13T10:15:00Z", event["timestamp"]