Emit messages and statuses to Redis stream

Amolith created

- in: inbound message-received
- out: outbound after successful Bandwidth API call
- thru: pass-through XMPP-to-XMPP messages
- delivered: message-delivered callback
- failed: message-failed callback

Implements: https://todo.sr.ht/~singpolyma/soprani.ca/517
Implements: https://todo.sr.ht/~singpolyma/soprani.ca/518

Change summary

Gemfile                     |   1 
lib/bandwidth_tn_options.rb |   1 
lib/message_event.rb        | 115 ++++++++++++++++++++++
sgx-bwmsgsv2.rb             |  60 +++++++++++
test/test_component.rb      | 197 +++++++++++++++++++++++++++++++++++++++
test/test_helper.rb         |  12 ++
6 files changed, 385 insertions(+), 1 deletion(-)

Detailed changes

Gemfile 🔗

@@ -14,6 +14,7 @@ gem 'faraday-em_synchrony'
 gem 'ruby-bandwidth-iris'
 gem 'goliath'
 gem 'lazy_object'
+gem 'value_semantics'
 gem 'log4r'
 gem 'multibases'
 gem 'multihashes'

lib/bandwidth_tn_options.rb 🔗

@@ -2,6 +2,7 @@
 
 require "em_promise"
 require "ruby-bandwidth-iris"
+require "em-http-request"
 Faraday.default_adapter = :em_synchrony
 
 class BandwidthTNOptions

lib/message_event.rb 🔗

@@ -0,0 +1,115 @@
+# frozen_string_literal: true
+
+require "json"
+require "value_semantics"
+
+module MessageEvent
+	module Emittable
+		def emit(redis=LazyObject.new { REDIS }, stream: "messages")
+			args = to_redis_fields.flat_map { |k, v| [k, v] }
+			redis.xadd(stream, "*", *args).catch do |e|
+				puts "WARN Failed to emit message event: #{e.message}"
+			end
+		end
+	end
+
+	module ValidTel
+		def self.===(value)
+			return value.match?(/\A\+1\d{10}\z/) if value.is_a?(String)
+			return value.to_s.match?(/\A\+1\d{10}\z/) if value.respond_to?(:to_s)
+
+			false
+		end
+	end
+
+	class MessageFields
+		include ValueSemantics.for_attributes {
+			event String
+			timestamp Time, coerce: true
+			from ValidTel
+			to ArrayOf(ValidTel)
+			message_id String
+			body String
+			media_urls ArrayOf(String), coerce: true
+		}
+
+		def self.coerce_timestamp(value)
+			value.is_a?(String) ? Time.parse(value) : value
+		end
+
+		def self.coerce_media_urls(value)
+			value.is_a?(String) ? JSON.parse(value) : value
+		end
+
+		def to_redis_fields
+			{
+				"event" => event,
+				"timestamp" => timestamp.iso8601,
+				"from" => from,
+				"to" => JSON.dump(to),
+				"message_id" => message_id,
+				"has_media" => media_urls.any?.to_s,
+				"body" => body,
+				"media_urls" => JSON.dump(media_urls)
+			}
+		end
+	end
+
+	%w[In Out Thru].each { |n|
+		const_set(n, Class.new(MessageFields) {
+			include Emittable
+
+			const_set(n, lambda { |v|
+				v == n.downcase
+			})
+
+			define_method(:initialize) { |**a|
+				super(**a.merge(event: n.downcase))
+			}
+		})
+	}
+
+	class Delivered
+		include Emittable
+		Delivered = lambda { |v| v == "delivered" }
+		include ValueSemantics.for_attributes {
+			event Delivered
+			timestamp Either(Time, String)
+			message_id String
+		}
+
+		def initialize(**a)
+			super(**a.merge(event: "delivered"))
+		end
+
+		def to_redis_fields = {
+			"event" => event,
+			"timestamp" => timestamp.iso8601,
+			"message_id" => message_id
+		}
+	end
+
+	class Failed
+		include Emittable
+		Failed = lambda { |v| v == "failed" }
+		include ValueSemantics.for_attributes {
+			event Failed
+			timestamp Either(Time, String)
+			message_id String
+			error_code String
+			error_description String
+		}
+
+		def initialize(**a)
+			super(**a.merge(event: "failed"))
+		end
+
+		def to_redis_fields = {
+			"event" => event,
+			"timestamp" => timestamp.is_a?(Time) ? timestamp.iso8601 : timestamp,
+			"message_id" => message_id,
+			"error_code" => error_code,
+			"error_description" => error_description
+		}
+	end
+end

sgx-bwmsgsv2.rb 🔗

@@ -40,6 +40,7 @@ require 'em-synchrony'
 
 require_relative 'lib/bandwidth_error'
 require_relative 'lib/bandwidth_tn_options'
+require_relative 'lib/message_event'
 require_relative 'lib/registration_repo'
 
 Sentry.init
@@ -213,6 +214,9 @@ module SGXbwmsgsv2
 	setup ARGV[0], ARGV[1], ARGV[2], ARGV[3], nil, nil, async: true
 
 	def self.pass_on_message(m, users_num, jid)
+		# Capture destination before modifying m.to
+		dest_num = m.to.node
+
 		# setup delivery receipt; similar to a reply
 		rcpt = ReceiptMessage.new(m.from.stripped)
 		rcpt.from = m.to
@@ -224,6 +228,17 @@ module SGXbwmsgsv2
 		puts 'XRESPONSE0: ' + m.inspect
 		write_to_stream m
 
+		# Emit pass-through event
+		oob_url = m.at("oob|x > oob|url", oob: "jabber:x:oob")&.text
+		MessageEvent::Thru.new(
+			timestamp: Time.now,
+			from: users_num,
+			to: [dest_num],
+			message_id: m.id,
+			body: m.body.to_s,
+			media_urls: [oob_url].compact
+		).emit(REDIS)
+
 		# send a delivery receipt back to the sender
 		# TODO: send only when requested per XEP-0184
 		# TODO: pass receipts from target if supported
@@ -356,7 +371,18 @@ module SGXbwmsgsv2
 			)),
 			{'Content-Type' => 'application/json'},
 			[201]
-		).catch { |e|
+		).then { |response|
+			parsed = JSON.parse(response) rescue {}
+			MessageEvent::Out.new(
+				timestamp: Time.now,
+				from: usern,
+				to: Array(num_dest),
+				message_id: parsed["id"] || s.id,
+				body: body,
+				media_urls: [murl].compact
+			).emit(REDIS)
+			response
+		}.catch { |e|
 			EMPromise.reject(
 				[:cancel, 'internal-server-error', e.message]
 			)
@@ -1113,6 +1139,38 @@ class WebhookHandler < Goliath::API
 		msg.from = others_num + '@' + ARGV[0]
 		SGXbwmsgsv2.write(msg)
 
+		# Emit event to messages stream
+		case [jparams['direction'], type]
+		when ['in', 'message-received']
+			media_urls = Array(jparams['media']).reject { |u|
+				u.end_with?('.smil', '.txt', '.xml')
+			}
+			MessageEvent::In.new(
+				timestamp: jparams['time'],
+				from: jparams['from'],
+				to: jparams['to'],
+				message_id: jparams['id'],
+				body: jparams['text'].to_s,
+				media_urls: media_urls
+			).emit(REDIS)
+		when ['out', 'message-failed']
+			tag_parts = jparams['tag'].split(/ /, 2)
+			msg_id = WEBrick::HTTPUtils.unescape(tag_parts[0])
+			MessageEvent::Failed.new(
+				timestamp: jparams['time'],
+				message_id: msg_id,
+				error_code: jparams['errorCode'].to_s,
+				error_description: jparams['description'].to_s
+			).emit(REDIS)
+		when ['out', 'message-delivered']
+			tag_parts = jparams['tag'].split(/ /, 2)
+			msg_id = WEBrick::HTTPUtils.unescape(tag_parts[0])
+			MessageEvent::Delivered.new(
+				timestamp: jparams['time'],
+				message_id: msg_id
+			).emit(REDIS)
+		end
+
 		[200, {}, "OK"]
 	rescue Exception => e
 		Sentry.capture_exception(e)

test/test_component.rb 🔗

@@ -44,6 +44,16 @@ class ComponentTest < Minitest::Test
 		raise $panic if $panic
 	end
 
+	def wait_for(timeout: 1.0, interval: 0.01)
+		start = Time.now
+		until yield
+			raise "Timeout waiting for condition" if Time.now - start > timeout
+			f = Fiber.current
+			EM.add_timer(interval) { f.resume }
+			Fiber.yield
+		end
+	end
+
 	def test_message_unregistered
 		m = Blather::Stanza::Message.new("+15551234567@component", "a"*4096)
 		m.from = "unknown@example.com"
@@ -513,4 +523,191 @@ class ComponentTest < Minitest::Test
 		end
 	end
 	em :test_port_out_pin_validation
+
+	def test_outbound_message_emits_to_stream
+		stub_request(
+			:post,
+			"https://messaging.bandwidth.com/api/v2/users/account/messages"
+		).with(body: hash_including(
+			from: "+15550000000",
+			to: "+15551234567",
+			text: "Hello world"
+		)).to_return(status: 201, body: JSON.dump(id: "bw-msg-123"))
+
+		m = Blather::Stanza::Message.new("+15551234567@component", "Hello world")
+		m.from = "test@example.com/resource"
+		m['id'] = "stanza-123"
+		process_stanza(m)
+
+		entries = REDIS.stream_entries("messages").sync
+		assert_equal 1, entries.length
+
+		event = entries.first[:fields]
+		assert_equal "out", event["event"]
+		assert_equal "+15550000000", event["from"]
+		assert_equal JSON.dump(["+15551234567"]), event["to"]
+		assert_equal "bw-msg-123", event["message_id"]
+		assert_equal "false", event["has_media"]
+		assert_equal "Hello world", event["body"]
+	end
+	em :test_outbound_message_emits_to_stream
+
+	def test_passthrough_message_emits_to_stream
+		REDIS.set("catapult_jid-+15559999999", "other@example.com")
+		REDIS.set("catapult_cred-other@example.com", [
+			'other_acct', 'other_user', 'other_pw', '+15559999999'
+		])
+
+		m = Blather::Stanza::Message.new("+15559999999@component", "Pass through")
+		m.from = "test@example.com/resource"
+		m['id'] = "passthru-stanza-456"
+		process_stanza(m)
+
+		entries = REDIS.stream_entries("messages").sync
+		assert_equal 1, entries.length
+
+		event = entries.first[:fields]
+		assert_equal "thru", event["event"]
+		assert_equal "+15550000000", event["from"]
+		assert_equal JSON.dump(["+15559999999"]), event["to"]
+		assert_equal "passthru-stanza-456", event["message_id"]
+		assert_equal "false", event["has_media"]
+		assert_equal "Pass through", event["body"]
+	end
+	em :test_passthrough_message_emits_to_stream
+
+	def invoke_webhook(payload)
+		handler = WebhookHandler.new
+		env = {
+			"REQUEST_URI" => "/",
+			"REQUEST_METHOD" => "POST",
+			"params" => {"_json" => [payload]}
+		}
+		handler.instance_variable_set(:@env, env)
+		def handler.params
+			@env["params"]
+		end
+
+		EMPromise.resolve(nil).then {
+			handler.response(env)
+		}.sync
+	end
+
+	def test_inbound_sms_emits_to_stream
+		payload = {
+			"type" => "message-received",
+			"message" => {
+				"id" => "bw-in-123",
+				"direction" => "in",
+				"owner" => "+15550000000",
+				"from" => "+15551234567",
+				"to" => ["+15550000000"],
+				"time" => "2025-01-13T10:00:00Z",
+				"text" => "Hello from outside"
+			}
+		}
+
+		invoke_webhook(payload)
+
+		entries = REDIS.stream_entries("messages").sync
+		assert_equal 1, entries.length
+
+		event = entries.first[:fields]
+		assert_equal "in", event["event"]
+		assert_equal "+15551234567", event["from"]
+		assert_equal JSON.dump(["+15550000000"]), event["to"]
+		assert_equal "bw-in-123", event["message_id"]
+		assert_equal "false", event["has_media"]
+		assert_equal "Hello from outside", event["body"]
+		assert_equal JSON.dump([]), event["media_urls"]
+	end
+	em :test_inbound_sms_emits_to_stream
+
+	def test_inbound_mms_emits_to_stream_and_filters_smil
+		payload = {
+			"type" => "message-received",
+			"message" => {
+				"id" => "bw-mms-456",
+				"direction" => "in",
+				"owner" => "+15550000000",
+				"from" => "+15551234567",
+				"to" => ["+15550000000"],
+				"time" => "2025-01-13T10:05:00Z",
+				"text" => "Check this out",
+				"media" => [
+					"https://example.com/image.jpg",
+					"https://example.com/file.smil",
+					"https://example.com/data.txt",
+					"https://example.com/meta.xml"
+				]
+			}
+		}
+
+		invoke_webhook(payload)
+
+		entries = REDIS.stream_entries("messages").sync
+		assert_equal 1, entries.length
+
+		event = entries.first[:fields]
+		assert_equal "in", event["event"]
+		assert_equal "true", event["has_media"]
+		assert_equal JSON.dump(["https://example.com/image.jpg"]), event["media_urls"]
+	end
+	em :test_inbound_mms_emits_to_stream_and_filters_smil
+
+	def test_message_delivered_emits_to_stream
+		payload = {
+			"type" => "message-delivered",
+			"message" => {
+				"id" => "bw-out-789",
+				"direction" => "out",
+				"owner" => "+15550000000",
+				"from" => "+15550000000",
+				"to" => ["+15551234567"],
+				"time" => "2025-01-13T10:10:00Z",
+				"tag" => "stanza-id-abc%20extra-data"
+			}
+		}
+
+		invoke_webhook(payload)
+
+		entries = REDIS.stream_entries("messages").sync
+		assert_equal 1, entries.length
+
+		event = entries.first[:fields]
+		assert_equal "delivered", event["event"]
+		assert_equal "stanza-id-abc", event["message_id"]
+		assert_equal "2025-01-13T10:10:00Z", event["timestamp"]
+	end
+	em :test_message_delivered_emits_to_stream
+
+	def test_message_failed_emits_to_stream
+		payload = {
+			"type" => "message-failed",
+			"message" => {
+				"id" => "bw-out-999",
+				"direction" => "out",
+				"owner" => "+15550000000",
+				"from" => "+15550000000",
+				"to" => ["+15551234567"],
+				"time" => "2025-01-13T10:15:00Z",
+				"tag" => "failed-stanza-xyz%20extra",
+				"errorCode" => 4720,
+				"description" => "Carrier rejected message"
+			}
+		}
+
+		invoke_webhook(payload)
+
+		entries = REDIS.stream_entries("messages").sync
+		assert_equal 1, entries.length
+
+		event = entries.first[:fields]
+		assert_equal "failed", event["event"]
+		assert_equal "failed-stanza-xyz", event["message_id"]
+		assert_equal "4720", event["error_code"]
+		assert_equal "Carrier rejected message", event["error_description"]
+		assert_equal "2025-01-13T10:15:00Z", event["timestamp"]
+	end
+	em :test_message_failed_emits_to_stream
 end

test/test_helper.rb 🔗

@@ -131,6 +131,18 @@ class FakeRedis
 
 	def discard
 	end
+
+	def xadd(stream, id, *args)
+		@values[stream] ||= []
+		entry_id = id == "*" ? "#{Time.now.to_i}-0" : id
+		fields = Hash[*args]
+		@values[stream] << { id: entry_id, fields: fields }
+		EMPromise.resolve(entry_id)
+	end
+
+	def stream_entries(stream)
+		EMPromise.resolve(@values[stream] || [])
+	end
 end
 
 REDIS = FakeRedis.new