Emit message events to a Redis stream

Amolith and Phillip Davis created

Add MessageEvent module (adapted from sgx-bwmsgsv2) and EventEmitter
helpers that record In, Out, and Failed events to a Redis stream via
XADD.

Each emit does a single REGISTRATION_REPO.find lookup to resolve the
owner's phone number from their JID.

Co-authored-by: Phillip Davis <phdavis1027@gmail.com>

Change summary

lib/blather_ext.rb   |  11 ++
lib/event_emitter.rb |  56 +++++++++++
lib/incoming_mms.rb  |   9 +
lib/message_event.rb | 216 +++++++++++++++++++++++++++++++++++++++++++++
sgx_endstream.rb     |  68 +++++++++++---
5 files changed, 345 insertions(+), 15 deletions(-)

Detailed changes

lib/blather_ext.rb 🔗

@@ -6,4 +6,15 @@ module Blather
 			self.class.new(node, domain, resource)
 		end
 	end
+
+	class Stanza
+		# @param message [Blather::Stanza::Message]
+		def recipients(message)
+			if message.addresses
+				message.addresses.map { |a| a.jid.node }
+			else
+				[message.to.node]
+			end
+		end
+	end
 end

lib/event_emitter.rb 🔗

@@ -0,0 +1,56 @@
+# frozen_string_literal: true
+
+require_relative "message_event"
+
+module EventEmitter
+	# @param owner_jid [Blather::JID]
+	def jid_to_owner(owner_jid)
+		REGISTRATION_REPO.find(owner_jid).then { |(tel, *rest)|
+			unless MessageEvent::NanpaTel.match?(tel)
+				log.warn(
+					"Skipping incoming message event: no valid owner mapping",
+					jid: owner_jid.to_s, owner: tel.to_s
+				)
+				next
+			end
+			yield [tel, *rest] if block_given?
+		}
+	end
+
+	def emit_incoming_event(
+		owner_jid,
+		from:, body:, endstream_id:, media_urls: []
+	)
+		jid_to_owner(owner_jid) { |(tel, *)|
+			MessageEvent::In.new(
+				owner: tel, from: from, to: [tel], body: body.to_s,
+				endstream_id: endstream_id, media_urls: media_urls
+			).emit(REDIS)
+		}.catch do |e|
+			log.warn("Failed to emit incoming message event", error: e.message)
+		end
+	end
+
+	def emit_failed_event(endstream_id:, error_code:, error_description:)
+		MessageEvent::Failed.new(
+			endstream_id: endstream_id,
+			error_code: error_code,
+			error_description: error_description
+		).emit(REDIS)
+	end
+
+	def emit_outgoing_event(
+		owner_jid,
+		to:, body:, stanza_id:, media_urls: []
+	)
+		jid_to_owner(owner_jid) { |(owner, *)|
+			MessageEvent::Out.new(
+				owner: owner, from: owner, to: to, body: body.to_s,
+				stanza_id: stanza_id.to_s,
+				media_urls: media_urls
+			).emit(REDIS)
+		}.catch do |e|
+			log.warn("Failed to emit outgoing message event", error: e.message)
+		end
+	end
+end

lib/incoming_mms.rb 🔗

@@ -35,6 +35,15 @@ class IncomingMMS
 		@recip = recip
 	end
 
+	# MMS text parts are separate fetched .txt files; join with paragraph breaks
+	def body_text
+		@txt.empty? ? "" : @txt.join("\n\n")
+	end
+
+	def media_urls
+		@media
+	end
+
 	def unproxied_to
 		ProxiedJID.new(@to).unproxied
 	end

lib/message_event.rb 🔗

@@ -0,0 +1,216 @@
+# frozen_string_literal: true
+
+require "json"
+require "time"
+
+module MessageEvent
+	module Emittable
+		def emit(redis, stream: "messages")
+			args = to_redis_fields.flat_map { |k, v| [k, v] }
+			redis.xadd(stream, "*", *args).catch do |e|
+				puts "WARN Failed to emit message event: #{e.message}"
+			end
+		end
+	end
+
+	def self.strip_prefix(tel)
+		tel.to_s.sub(/\Asms:/, "").strip
+	end
+
+	module ValidTel
+		def self.===(value)
+			match?(value)
+		end
+
+		def self.match?(value)
+			value.to_s.match?(/\A(\+1)?\d+\Z/)
+		end
+	end
+
+	module NanpaTel
+		def self.===(value)
+			match?(value)
+		end
+
+		def self.match?(value)
+			value.to_s.match?(/\A\+1\d{10}\Z/)
+		end
+	end
+
+	class Base
+		include Emittable
+
+		@validators = [
+			lambda {
+				unless @event.is_a?(String)
+					raise ArgumentError, "event must be a String"
+				end
+			}
+		]
+
+		def self.validators
+			parent = superclass.respond_to?(:validators) ? superclass.validators : []
+			parent + (@validators || [])
+		end
+
+		def initialize(event:, timestamp: nil)
+			timestamp = Time.iso8601(timestamp) if timestamp.is_a?(String)
+			@event = event
+			@timestamp = timestamp
+			self.class.validators.each { |validator| instance_exec(&validator) }
+		end
+
+		attr_reader :event, :timestamp
+
+		def to_redis_fields
+			fields = {
+				"event" => @event,
+				"source" => "endstream"
+			}
+			if @timestamp
+				fields["timestamp"] =
+					@timestamp.is_a?(Time) ? @timestamp.iso8601 : @timestamp
+			end
+			fields
+		end
+	end
+
+	class Message < Base
+		@validators = [
+			lambda {
+				unless ValidTel.match?(@owner)
+					raise(
+						ArgumentError,
+						"owner must be a number, optionally prefixed with +"
+					)
+				end
+			},
+			lambda {
+				raise ArgumentError, "to must be an array" unless @to.is_a?(Array)
+			},
+			lambda {
+				unless @to.all? { |t| ValidTel.match?(t) }
+					raise(
+						ArgumentError,
+						"to must be a number, optionally prefixed with +"
+					)
+				end
+			},
+			lambda {
+				unless @body.nil? || @body.is_a?(String)
+					raise ArgumentError, "body must be a String or nil"
+				end
+			},
+			lambda {
+				unless @media_urls.is_a?(Array)
+					raise ArgumentError, "media_urls must be an Array"
+				end
+			}
+		]
+
+		def initialize(to:, from:, body:, owner:, media_urls: [], **kwargs)
+			owner = MessageEvent.strip_prefix(owner)
+			from = MessageEvent.strip_prefix(from)
+			to = to.map { |t| MessageEvent.strip_prefix(t) }
+
+			@from = from
+			@to = to
+			@body = body
+			@media_urls = media_urls
+			@owner = owner
+			super(**kwargs)
+		end
+
+		attr_reader :from, :to, :body, :media_urls
+
+		def to_redis_fields
+			fields = super.merge(
+				"owner" => @owner,
+				"from" => @from,
+				"to" => JSON.dump(@to)
+			)
+			fields["body"] = @body unless @body.nil?
+			fields["media_urls"] = JSON.dump(@media_urls)
+			fields
+		end
+	end
+
+	class In < Message
+		@validators = [
+			lambda {
+				unless @endstream_id.is_a?(String)
+					raise ArgumentError, "endstream_id must be a String"
+				end
+			}
+		]
+
+		def initialize(endstream_id:, **kwargs)
+			@endstream_id = endstream_id
+			super(event: "in", **kwargs)
+		end
+
+		attr_reader :owner, :endstream_id
+
+		def to_redis_fields
+			super.merge("endstream_id" => @endstream_id)
+		end
+	end
+
+	class Failed < Base
+		@validators = [
+			lambda {
+				unless @endstream_id.is_a?(String)
+					raise ArgumentError, "endstream_id must be a String"
+				end
+			},
+			lambda {
+				unless @error_code.is_a?(String)
+					raise ArgumentError, "error_code must be a String"
+				end
+			},
+			lambda {
+				unless @error_description.is_a?(String)
+					raise ArgumentError, "error_description must be a String"
+				end
+			}
+		]
+
+		def initialize(endstream_id:, error_code:, error_description:, **kwargs)
+			@endstream_id = endstream_id
+			@error_code = error_code
+			@error_description = error_description
+			super(event: "failed", **kwargs)
+		end
+
+		attr_reader :endstream_id, :error_code, :error_description
+
+		def to_redis_fields
+			super.merge(
+				"endstream_id" => @endstream_id,
+				"error_code" => @error_code,
+				"error_description" => @error_description
+			)
+		end
+	end
+
+	class Out < Message
+		@validators = [
+			lambda {
+				unless @stanza_id.is_a?(String)
+					raise ArgumentError, "stanza_id must be a String"
+				end
+			}
+		]
+
+		def initialize(stanza_id:, **kwargs)
+			@stanza_id = stanza_id
+			super(event: "out", **kwargs)
+		end
+
+		attr_reader :stanza_id
+
+		def to_redis_fields
+			super.merge("stanza_id" => @stanza_id)
+		end
+	end
+end

sgx_endstream.rb 🔗

@@ -14,6 +14,7 @@ require_relative "lib/addresses"
 require_relative "lib/blather_client"
 require_relative "lib/blather_ext"
 require_relative "lib/em"
+require_relative "lib/event_emitter"
 require_relative "lib/incoming_mms"
 require_relative "lib/oob"
 require_relative "lib/outgoing_mms"
@@ -22,6 +23,7 @@ require_relative "lib/registration_repo"
 
 singleton_class.class_eval do
 	include Blather::DSL
+	include EventEmitter
 
 	Blather::DSL.append_features(self)
 end
@@ -186,24 +188,24 @@ message from: /@sms.chat.1pcom.net\Z/ do |m|
 	else
 		"#{m.from.node};phone-context=ca-us.phone-context.soprani.ca"
 	end
-	m = m.dup
+	m = m.dup.tap { _1.subject = nil } # They send a generic subject
 	m.from = Blather::JID.new(tel, CONFIG[:component][:jid])
 	m.to = ProxiedJID.new(m.to).unproxied
-	m.subject = nil # They send a generic subject for some reason
 
-	if json.is_a?(Hash) && json["response"]
+	if json.is_a?(Hash) && (resp = json["response"])
 		log.info("SMS Status", json)
-		resp = json["response"]
-		m.id = resp["id"]
-		swap = m.from
-		m.from = m.to
-		m.to = swap
-		m.body = ""
-		m = m.as_error(
+		m = m.reply.tap { _1.body = "" }.tap { _1.id = resp["id"] }.as_error(
 			"recipient-unavailable",
 			:cancel,
 			"#{resp['text']} (#{resp['code']} #{resp['subcode']} #{resp['dlrid']})"
 		)
+		emit_failed_event(
+			endstream_id: resp["id"],
+			error_code: "#{resp['code']} #{resp['subcode']}",
+			error_description: resp["text"]
+		)
+	else
+		emit_incoming_event(m.to, from: tel, body: m.body, endstream_id: m.id)
 	end
 
 	self << m
@@ -216,10 +218,19 @@ message from: /@mms.chat.1pcom.net\Z/ do |m|
 		next
 	end
 
-	IncomingMMS.for(m.to, json).then(&:to_stanza).then { |to_send|
+	IncomingMMS.for(m.to, json).then { |incoming|
+		to_send = incoming.to_stanza
 		to_send.id = m.id
 		to_send.from = Blather::JID.new("+#{m.from.node}", CONFIG[:component][:jid])
 		self << to_send
+
+		emit_incoming_event(
+			incoming.unproxied_to,
+			from: "+#{m.from.node}",
+			body: incoming.body_text,
+			endstream_id: m.id,
+			media_urls: incoming.media_urls
+		)
 	}
 end
 
@@ -228,13 +239,28 @@ message type: :error do
 	true
 end
 
+# @parameter m [Blather::Stanza::Message]
+def send_outgoing_mms(m)
+	oobs = m.oobs
+	id = m.id
+	self << OutgoingMMS.for(m).to_stanza(id: id, from: m.from)
+
+	emit_outgoing_event(
+		m.from,
+		to: m.recipients,
+		body: m.body.to_s.sub(oobs.first&.url.to_s, ""), # OOB's already captured
+		stanza_id: id,
+		media_urls: oobs.map(&:url)
+	)
+end
+
 message :addresses, to: Blather::JID.new(CONFIG[:component][:jid]) do |m|
-	self << OutgoingMMS.for(m).to_stanza(id: m.id, from: m.from)
+	send_outgoing_mms(m)
 end
 
 message ->(m) { !m.oobs.empty? }, to: /\A\+?\d+@/ do |m|
 	# TODO: if too big or bad mime, send sms
-	self << OutgoingMMS.for(m).to_stanza(id: m.id, from: m.from)
+	send_outgoing_mms(m)
 end
 
 def too_long_for_sms?(m)
@@ -243,19 +269,31 @@ def too_long_for_sms?(m)
 end
 
 message :body, method(:too_long_for_sms?).to_proc, to: /\A\+?\d+@/ do |m|
-	self << OutgoingMMS.for(m).to_stanza(id: m.id, from: m.from)
+	send_outgoing_mms(m)
 end
 
 message(
 	:body,
 	to: /(?:\A\+?\d+@)|(?:;phone-context=ca-us\.phone-context\.soprani\.ca@)/
 ) do |m|
+	owner_jid = m.from
+	dest = m.to.node
+	body = m.body
+	stanza_id = m.id
+
 	m.to = Blather::JID.new(
-		m.to.node.sub(/\A\+/, "").sub(/;phone-context=.*\Z/, ""),
+		dest.sub(/\A\+/, "").sub(/;phone-context=.*\Z/, ""),
 		"sms.chat.1pcom.net"
 	)
 	m.from = ProxiedJID.proxy(m.from, CONFIG[:component][:jid])
 	self << m
+
+	emit_outgoing_event(
+		owner_jid,
+		to: [dest],
+		body: body,
+		stanza_id: stanza_id
+	)
 end
 
 iq type: [:get, :set] do |iq|