Merge branch 'emit-messages-to-event-stream' of https://git.secluded.site/sgx-bwmsgsv2

Stephen Paul Weber created

* 'emit-messages-to-event-stream' of https://git.secluded.site/sgx-bwmsgsv2:
  Separate message_id into bandwidth/stanza IDs, make timestamp optional
  Switch from ValueSemantics to normal OOP Ruby
  Use stream name in example
  Remove unused wait_for
  Pin ruby-bandwidth-iris to ~> 2.7
  Add Redis stream trim example
  Emit messages and statuses to Redis stream

Change summary

Gemfile                     |   3 
README.creole               |   6 +
lib/bandwidth_tn_options.rb |   1 
lib/message_event.rb        | 181 +++++++++++++++++++++++++++++++++++++
sgx-bwmsgsv2.rb             |  65 +++++++++++++
test/test_component.rb      | 188 +++++++++++++++++++++++++++++++++++++++
test/test_helper.rb         |  12 ++
7 files changed, 454 insertions(+), 2 deletions(-)

Detailed changes

Gemfile 🔗

@@ -11,9 +11,10 @@ gem 'em-synchrony'
 gem 'eventmachine'
 gem 'faraday', '~> 1.0'
 gem 'faraday-em_synchrony'
-gem 'ruby-bandwidth-iris'
+gem 'ruby-bandwidth-iris', '~> 2.7'
 gem 'goliath'
 gem 'lazy_object'
+gem 'value_semantics'
 gem 'log4r'
 gem 'multibases'
 gem 'multihashes'

README.creole 🔗

@@ -5,3 +5,9 @@ An XMPP to SMS gateway for Bandwidth's V2 Messaging API, using XEP-0100.  The "S
 Note that the canonical location for this repository is https://gitlab.com/soprani.ca/sgx-bwmsgsv2 .  Please use that location for all pull requests, issue reports, etc.  Other locations are likely out-of-date.
 
 This program expects a binary named "tai" to be in its working directory, which should be a compiled version of https://ossguy.com/tai.c for the platform it is running on.
+
+The gateway emits events to a Redis stream. To trim entries older than 7 days, put something like this in a cronjob:
+
+{{{
+redis-cli -u redis://127.0.0.1:6380/0 XTRIM messages MINID "~" "$(( ($(date +%s) - 7 * 24 * 60 * 60) * 1000 ))-0"
+}}}

lib/bandwidth_tn_options.rb 🔗

@@ -2,6 +2,7 @@
 
 require "em_promise"
 require "ruby-bandwidth-iris"
+require "em-http-request"
 Faraday.default_adapter = :em_synchrony
 
 class BandwidthTNOptions

lib/message_event.rb 🔗

@@ -0,0 +1,181 @@
+# frozen_string_literal: true
+
+require "json"
+
+module MessageEvent
+	module Emittable
+		def emit(redis, stream: "messages")
+			args = to_redis_fields.flat_map { |k, v| [k, v] }
+			redis.xadd(stream, "*", *args).catch do |e|
+				puts "WARN Failed to emit message event: #{e.message}"
+			end
+		end
+	end
+
+	module ValidTel
+		def self.===(value)
+			value.to_s.match?(/\A\+1\d{10}\z/)
+		end
+	end
+
+	class Base
+		include Emittable
+
+		def initialize(event:, timestamp: nil)
+			raise ArgumentError, "event must be a String" unless event.is_a?(String)
+
+			Time.iso8601(timestamp) if timestamp.is_a?(String)
+			@event = event
+			@timestamp = timestamp
+		end
+
+		attr_reader :event, :timestamp
+
+		# We use to_redis_fields instead of to_h because we want to serialize values
+		# (e.g., Time -> ISO8601 string, arrays -> JSON). A plain to_h would return
+		# raw Ruby objects, which is less useful for parsing from other
+		# projects/languages.
+		def to_redis_fields
+			fields = {
+				"event" => @event,
+				"source" => "bwmsgsv2"
+			}
+			if @timestamp
+				fields["timestamp"] = @timestamp.is_a?(Time) ? @timestamp.iso8601 : @timestamp
+			end
+			fields
+		end
+	end
+
+	class Message < Base
+		def initialize(to:, from:, body:, media_urls: [], **kwargs)
+			raise ArgumentError, "from must be a valid US telephone number" unless ValidTel === from
+			raise ArgumentError, "to must be an array" unless to.is_a?(Array)
+			to.each do |tel|
+				raise ArgumentError, "each to must be a valid US telephone number" unless ValidTel === tel
+			end
+			raise ArgumentError, "body must be a String" unless body.is_a?(String)
+			raise ArgumentError, "media_urls must be an Array" unless media_urls.is_a?(Array)
+
+			@from = from
+			@to = to
+			@body = body
+			@media_urls = media_urls
+			super(**kwargs)
+		end
+
+		attr_reader :from, :to, :body, :media_urls
+
+		def to_redis_fields
+			fields = super.merge(
+				"from" => @from,
+				"to" => JSON.dump(@to)
+			)
+			fields["body"] = @body unless @body.nil? || @body.empty?
+			fields["media_urls"] = JSON.dump(@media_urls) unless @media_urls.empty?
+			fields
+		end
+	end
+
+	class In < Message
+		def initialize(owner:, bandwidth_id:, **kwargs)
+			raise ArgumentError, "owner must be a valid US telephone number" unless ValidTel === owner
+			raise ArgumentError, "bandwidth_id must be a String" unless bandwidth_id.is_a?(String)
+
+			@owner = owner
+			@bandwidth_id = bandwidth_id
+			super(event: "in", **kwargs)
+		end
+
+		attr_reader :owner, :bandwidth_id
+
+		def to_redis_fields
+			super.merge(
+				"owner" => @owner,
+				"bandwidth_id" => @bandwidth_id
+			)
+		end
+	end
+
+	class Out < Message
+		def initialize(stanza_id:, bandwidth_id: nil, **kwargs)
+			raise ArgumentError, "stanza_id must be a String" unless stanza_id.is_a?(String)
+			if bandwidth_id && !bandwidth_id.is_a?(String)
+				raise ArgumentError, "bandwidth_id must be a String"
+			end
+
+			@stanza_id = stanza_id
+			@bandwidth_id = bandwidth_id
+			super(event: "out", **kwargs)
+		end
+
+		attr_reader :stanza_id, :bandwidth_id
+
+		def to_redis_fields
+			fields = super.merge("stanza_id" => @stanza_id)
+			fields["bandwidth_id"] = @bandwidth_id if @bandwidth_id
+			fields
+		end
+	end
+
+	class Thru < Message
+		def initialize(stanza_id:, **kwargs)
+			raise ArgumentError, "stanza_id must be a String" unless stanza_id.is_a?(String)
+
+			@stanza_id = stanza_id
+			super(event: "thru", **kwargs)
+		end
+
+		attr_reader :stanza_id
+
+		def to_redis_fields
+			super.merge("stanza_id" => @stanza_id)
+		end
+	end
+
+	class Delivered < Base
+		def initialize(stanza_id:, bandwidth_id:, **kwargs)
+			raise ArgumentError, "stanza_id must be a String" unless stanza_id.is_a?(String)
+			raise ArgumentError, "bandwidth_id must be a String" unless bandwidth_id.is_a?(String)
+
+			@stanza_id = stanza_id
+			@bandwidth_id = bandwidth_id
+			super(event: "delivered", **kwargs)
+		end
+
+		attr_reader :stanza_id, :bandwidth_id
+
+		def to_redis_fields
+			super.merge(
+				"stanza_id" => @stanza_id,
+				"bandwidth_id" => @bandwidth_id
+			)
+		end
+	end
+
+	class Failed < Base
+		def initialize(stanza_id:, bandwidth_id:, error_code:, error_description:, **kwargs)
+			raise ArgumentError, "stanza_id must be a String" unless stanza_id.is_a?(String)
+			raise ArgumentError, "bandwidth_id must be a String" unless bandwidth_id.is_a?(String)
+			raise ArgumentError, "error_code must be a String" unless error_code.is_a?(String)
+			raise ArgumentError, "error_description must be a String" unless error_description.is_a?(String)
+
+			@stanza_id = stanza_id
+			@bandwidth_id = bandwidth_id
+			@error_code = error_code
+			@error_description = error_description
+			super(event: "failed", **kwargs)
+		end
+
+		attr_reader :stanza_id, :bandwidth_id, :error_code, :error_description
+
+		def to_redis_fields
+			super.merge(
+				"stanza_id" => @stanza_id,
+				"bandwidth_id" => @bandwidth_id,
+				"error_code" => @error_code,
+				"error_description" => @error_description
+			)
+		end
+	end
+end

sgx-bwmsgsv2.rb 🔗

@@ -40,6 +40,7 @@ require 'em-synchrony'
 
 require_relative 'lib/bandwidth_error'
 require_relative 'lib/bandwidth_tn_options'
+require_relative 'lib/message_event'
 require_relative 'lib/registration_repo'
 
 Sentry.init
@@ -213,6 +214,9 @@ module SGXbwmsgsv2
 	setup ARGV[0], ARGV[1], ARGV[2], ARGV[3], nil, nil, async: true
 
 	def self.pass_on_message(m, users_num, jid)
+		# Capture destination before modifying m.to
+		dest_num = m.to.node
+
 		# setup delivery receipt; similar to a reply
 		rcpt = ReceiptMessage.new(m.from.stripped)
 		rcpt.from = m.to
@@ -224,6 +228,18 @@ module SGXbwmsgsv2
 		puts 'XRESPONSE0: ' + m.inspect
 		write_to_stream m
 
+		# Emit pass-through event. Thru events don't capture a timestamp because XMPP
+		# stanzas don't carry timestamps for realtime messages, and the Redis stream
+		# ID provides the emit time.
+		oob_url = m.at("oob|x > oob|url", oob: "jabber:x:oob")&.text
+		MessageEvent::Thru.new(
+			from: users_num,
+			to: [dest_num],
+			stanza_id: m.id.to_s,
+			body: m.body.to_s,
+			media_urls: [oob_url].compact
+		).emit(REDIS)
+
 		# send a delivery receipt back to the sender
 		# TODO: send only when requested per XEP-0184
 		# TODO: pass receipts from target if supported
@@ -356,7 +372,19 @@ module SGXbwmsgsv2
 			)),
 			{'Content-Type' => 'application/json'},
 			[201]
-		).catch { |e|
+		).then { |response|
+			parsed = JSON.parse(response) rescue {}
+			MessageEvent::Out.new(
+				timestamp: parsed["time"] || Time.now,
+				from: usern,
+				to: Array(num_dest),
+				stanza_id: s.id.to_s,
+				bandwidth_id: parsed["id"],
+				body: body,
+				media_urls: [murl].compact
+			).emit(REDIS)
+			response
+		}.catch { |e|
 			EMPromise.reject(
 				[:cancel, 'internal-server-error', e.message]
 			)
@@ -1113,6 +1141,41 @@ class WebhookHandler < Goliath::API
 		msg.from = others_num + '@' + ARGV[0]
 		SGXbwmsgsv2.write(msg)
 
+		# Emit event to messages stream
+		case [jparams['direction'], type]
+		when ['in', 'message-received']
+			media_urls = Array(jparams['media']).reject { |u|
+				u.end_with?('.smil', '.txt', '.xml')
+			}
+			MessageEvent::In.new(
+				timestamp: jparams['time'],
+				from: jparams['from'],
+				to: jparams['to'],
+				owner: jparams['owner'],
+				bandwidth_id: jparams['id'],
+				body: jparams['text'].to_s,
+				media_urls: media_urls
+			).emit(REDIS)
+		when ['out', 'message-failed']
+			tag_parts = jparams['tag'].split(/ /, 2)
+			stanza_id = WEBrick::HTTPUtils.unescape(tag_parts[0])
+			MessageEvent::Failed.new(
+				timestamp: jparams['time'],
+				stanza_id: stanza_id,
+				bandwidth_id: jparams['id'],
+				error_code: jparams['errorCode'].to_s,
+				error_description: jparams['description'].to_s
+			).emit(REDIS)
+		when ['out', 'message-delivered']
+			tag_parts = jparams['tag'].split(/ /, 2)
+			stanza_id = WEBrick::HTTPUtils.unescape(tag_parts[0])
+			MessageEvent::Delivered.new(
+				timestamp: jparams['time'],
+				stanza_id: stanza_id,
+				bandwidth_id: jparams['id']
+			).emit(REDIS)
+		end
+
 		[200, {}, "OK"]
 	rescue Exception => e
 		Sentry.capture_exception(e)

test/test_component.rb 🔗

@@ -527,4 +527,192 @@ class ComponentTest < Minitest::Test
 		end
 	end
 	em :test_port_out_pin_validation
+
+	def test_outbound_message_emits_to_stream
+		stub_request(
+			:post,
+			"https://messaging.bandwidth.com/api/v2/users/account/messages"
+		).with(body: hash_including(
+			from: "+15550000000",
+			to: "+15551234567",
+			text: "Hello world"
+		)).to_return(status: 201, body: JSON.dump(id: "bw-msg-123"))
+
+		m = Blather::Stanza::Message.new("+15551234567@component", "Hello world")
+		m.from = "test@example.com/resource"
+		m['id'] = "stanza-123"
+		process_stanza(m)
+
+		entries = REDIS.stream_entries("messages").sync
+		assert_equal 1, entries.length
+
+		event = entries.first[:fields]
+		assert_equal "out", event["event"]
+		assert_equal "+15550000000", event["from"]
+		assert_equal JSON.dump(["+15551234567"]), event["to"]
+		assert_equal "stanza-123", event["stanza_id"]
+		assert_equal "bw-msg-123", event["bandwidth_id"]
+		assert_equal "Hello world", event["body"]
+	end
+	em :test_outbound_message_emits_to_stream
+
+	def test_passthrough_message_emits_to_stream
+		REDIS.set("catapult_jid-+15559999999", "other@example.com")
+		REDIS.set("catapult_cred-other@example.com", [
+			'other_acct', 'other_user', 'other_pw', '+15559999999'
+		])
+
+		m = Blather::Stanza::Message.new("+15559999999@component", "Pass through")
+		m.from = "test@example.com/resource"
+		m['id'] = "passthru-stanza-456"
+		process_stanza(m)
+
+		entries = REDIS.stream_entries("messages").sync
+		assert_equal 1, entries.length
+
+		event = entries.first[:fields]
+		assert_equal "thru", event["event"]
+		assert_equal "+15550000000", event["from"]
+		assert_equal JSON.dump(["+15559999999"]), event["to"]
+		assert_equal "passthru-stanza-456", event["stanza_id"]
+		assert_equal "Pass through", event["body"]
+		refute event.key?("timestamp"), "Thru events should not have a timestamp field"
+	end
+	em :test_passthrough_message_emits_to_stream
+
+	def invoke_webhook(payload)
+		handler = WebhookHandler.new
+		env = {
+			"REQUEST_URI" => "/",
+			"REQUEST_METHOD" => "POST",
+			"params" => {"_json" => [payload]}
+		}
+		handler.instance_variable_set(:@env, env)
+		def handler.params
+			@env["params"]
+		end
+
+		EMPromise.resolve(nil).then {
+			handler.response(env)
+		}.sync
+	end
+
+	def test_inbound_sms_emits_to_stream
+		payload = {
+			"type" => "message-received",
+			"message" => {
+				"id" => "bw-in-123",
+				"direction" => "in",
+				"owner" => "+15550000000",
+				"from" => "+15551234567",
+				"to" => ["+15550000000"],
+				"time" => "2025-01-13T10:00:00Z",
+				"text" => "Hello from outside"
+			}
+		}
+
+		invoke_webhook(payload)
+
+		entries = REDIS.stream_entries("messages").sync
+		assert_equal 1, entries.length
+
+		event = entries.first[:fields]
+		assert_equal "in", event["event"]
+		assert_equal "+15551234567", event["from"]
+		assert_equal JSON.dump(["+15550000000"]), event["to"]
+		assert_equal "bw-in-123", event["bandwidth_id"]
+		assert_equal "+15550000000", event["owner"]
+		assert_equal "Hello from outside", event["body"]
+		assert_equal JSON.dump([]), event["media_urls"]
+	end
+	em :test_inbound_sms_emits_to_stream
+
+	def test_inbound_mms_emits_to_stream_and_filters_smil
+		payload = {
+			"type" => "message-received",
+			"message" => {
+				"id" => "bw-mms-456",
+				"direction" => "in",
+				"owner" => "+15550000000",
+				"from" => "+15551234567",
+				"to" => ["+15550000000"],
+				"time" => "2025-01-13T10:05:00Z",
+				"text" => "Check this out",
+				"media" => [
+					"https://example.com/image.jpg",
+					"https://example.com/file.smil",
+					"https://example.com/data.txt",
+					"https://example.com/meta.xml"
+				]
+			}
+		}
+
+		invoke_webhook(payload)
+
+		entries = REDIS.stream_entries("messages").sync
+		assert_equal 1, entries.length
+
+		event = entries.first[:fields]
+		assert_equal "in", event["event"]
+		assert_equal JSON.dump(["https://example.com/image.jpg"]), event["media_urls"]
+	end
+	em :test_inbound_mms_emits_to_stream_and_filters_smil
+
+	def test_message_delivered_emits_to_stream
+		payload = {
+			"type" => "message-delivered",
+			"message" => {
+				"id" => "bw-out-789",
+				"direction" => "out",
+				"owner" => "+15550000000",
+				"from" => "+15550000000",
+				"to" => ["+15551234567"],
+				"time" => "2025-01-13T10:10:00Z",
+				"tag" => "stanza-id-abc%20extra-data"
+			}
+		}
+
+		invoke_webhook(payload)
+
+		entries = REDIS.stream_entries("messages").sync
+		assert_equal 1, entries.length
+
+		event = entries.first[:fields]
+		assert_equal "delivered", event["event"]
+		assert_equal "stanza-id-abc", event["stanza_id"]
+		assert_equal "bw-out-789", event["bandwidth_id"]
+		assert_equal "2025-01-13T10:10:00Z", event["timestamp"]
+	end
+	em :test_message_delivered_emits_to_stream
+
+	def test_message_failed_emits_to_stream
+		payload = {
+			"type" => "message-failed",
+			"message" => {
+				"id" => "bw-out-999",
+				"direction" => "out",
+				"owner" => "+15550000000",
+				"from" => "+15550000000",
+				"to" => ["+15551234567"],
+				"time" => "2025-01-13T10:15:00Z",
+				"tag" => "failed-stanza-xyz%20extra",
+				"errorCode" => 4720,
+				"description" => "Carrier rejected message"
+			}
+		}
+
+		invoke_webhook(payload)
+
+		entries = REDIS.stream_entries("messages").sync
+		assert_equal 1, entries.length
+
+		event = entries.first[:fields]
+		assert_equal "failed", event["event"]
+		assert_equal "failed-stanza-xyz", event["stanza_id"]
+		assert_equal "bw-out-999", event["bandwidth_id"]
+		assert_equal "4720", event["error_code"]
+		assert_equal "Carrier rejected message", event["error_description"]
+		assert_equal "2025-01-13T10:15:00Z", event["timestamp"]
+	end
+	em :test_message_failed_emits_to_stream
 end

test/test_helper.rb 🔗

@@ -132,6 +132,18 @@ class FakeRedis
 
 	def discard
 	end
+
+	def xadd(stream, id, *args)
+		@values[stream] ||= []
+		entry_id = id == "*" ? "#{Time.now.to_i}-0" : id
+		fields = Hash[*args]
+		@values[stream] << { id: entry_id, fields: fields }
+		EMPromise.resolve(entry_id)
+	end
+
+	def stream_entries(stream)
+		EMPromise.resolve(@values[stream] || [])
+	end
 end
 
 REDIS = FakeRedis.new