From d358898c16900109780e3464ef11541d93ac138c Mon Sep 17 00:00:00 2001 From: Amolith Date: Mon, 23 Mar 2026 11:13:33 -0600 Subject: [PATCH] Emit message events to a Redis stream Add MessageEvent module (adapted from sgx-bwmsgsv2) and EventEmitter helpers that record In, Out, and Failed events to a Redis stream via XADD. Each emit does a single REGISTRATION_REPO.find lookup to resolve the owner's phone number from their JID. Co-authored-by: Phillip Davis --- lib/blather_ext.rb | 11 +++ lib/event_emitter.rb | 56 +++++++++++ lib/incoming_mms.rb | 9 ++ lib/message_event.rb | 216 +++++++++++++++++++++++++++++++++++++++++++ sgx_endstream.rb | 68 +++++++++++--- 5 files changed, 345 insertions(+), 15 deletions(-) create mode 100644 lib/event_emitter.rb create mode 100644 lib/message_event.rb diff --git a/lib/blather_ext.rb b/lib/blather_ext.rb index eb85cf55185873997d6fefb383d009986e4c265f..f8ca01a17c3ad8b6a8a22bb73dd1af818e40a6c6 100644 --- a/lib/blather_ext.rb +++ b/lib/blather_ext.rb @@ -6,4 +6,15 @@ module Blather self.class.new(node, domain, resource) end end + + class Stanza + # @param message [Blather::Stanza::Message] + def recipients(message) + if message.addresses + message.addresses.map { |a| a.jid.node } + else + [message.to.node] + end + end + end end diff --git a/lib/event_emitter.rb b/lib/event_emitter.rb new file mode 100644 index 0000000000000000000000000000000000000000..d55837b4e24a59d12af8b2ac921780b6db90fff9 --- /dev/null +++ b/lib/event_emitter.rb @@ -0,0 +1,56 @@ +# frozen_string_literal: true + +require_relative "message_event" + +module EventEmitter + # @param owner_jid [Blather::JID] + def jid_to_owner(owner_jid) + REGISTRATION_REPO.find(owner_jid).then { |(tel, *rest)| + unless MessageEvent::NanpaTel.match?(tel) + log.warn( + "Skipping incoming message event: no valid owner mapping", + jid: owner_jid.to_s, owner: tel.to_s + ) + next + end + yield [tel, *rest] if block_given? + } + end + + def emit_incoming_event( + owner_jid, + from:, body:, endstream_id:, media_urls: [] + ) + jid_to_owner(owner_jid) { |(tel, *)| + MessageEvent::In.new( + owner: tel, from: from, to: [tel], body: body.to_s, + endstream_id: endstream_id, media_urls: media_urls + ).emit(REDIS) + }.catch do |e| + log.warn("Failed to emit incoming message event", error: e.message) + end + end + + def emit_failed_event(endstream_id:, error_code:, error_description:) + MessageEvent::Failed.new( + endstream_id: endstream_id, + error_code: error_code, + error_description: error_description + ).emit(REDIS) + end + + def emit_outgoing_event( + owner_jid, + to:, body:, stanza_id:, media_urls: [] + ) + jid_to_owner(owner_jid) { |(owner, *)| + MessageEvent::Out.new( + owner: owner, from: owner, to: to, body: body.to_s, + stanza_id: stanza_id.to_s, + media_urls: media_urls + ).emit(REDIS) + }.catch do |e| + log.warn("Failed to emit outgoing message event", error: e.message) + end + end +end diff --git a/lib/incoming_mms.rb b/lib/incoming_mms.rb index 61b4509b77526f2a699260195077cfbf30241219..77d4b05b94f2a3a0059f7e23761f6139f9285993 100644 --- a/lib/incoming_mms.rb +++ b/lib/incoming_mms.rb @@ -35,6 +35,15 @@ class IncomingMMS @recip = recip end + # MMS text parts are separate fetched .txt files; join with paragraph breaks + def body_text + @txt.empty? ? "" : @txt.join("\n\n") + end + + def media_urls + @media + end + def unproxied_to ProxiedJID.new(@to).unproxied end diff --git a/lib/message_event.rb b/lib/message_event.rb new file mode 100644 index 0000000000000000000000000000000000000000..de2e7516e4a93308c924656bd6e6480748358a9a --- /dev/null +++ b/lib/message_event.rb @@ -0,0 +1,216 @@ +# frozen_string_literal: true + +require "json" +require "time" + +module MessageEvent + module Emittable + def emit(redis, stream: "messages") + args = to_redis_fields.flat_map { |k, v| [k, v] } + redis.xadd(stream, "*", *args).catch do |e| + puts "WARN Failed to emit message event: #{e.message}" + end + end + end + + def self.strip_prefix(tel) + tel.to_s.sub(/\Asms:/, "").strip + end + + module ValidTel + def self.===(value) + match?(value) + end + + def self.match?(value) + value.to_s.match?(/\A(\+1)?\d+\Z/) + end + end + + module NanpaTel + def self.===(value) + match?(value) + end + + def self.match?(value) + value.to_s.match?(/\A\+1\d{10}\Z/) + end + end + + class Base + include Emittable + + @validators = [ + lambda { + unless @event.is_a?(String) + raise ArgumentError, "event must be a String" + end + } + ] + + def self.validators + parent = superclass.respond_to?(:validators) ? superclass.validators : [] + parent + (@validators || []) + end + + def initialize(event:, timestamp: nil) + timestamp = Time.iso8601(timestamp) if timestamp.is_a?(String) + @event = event + @timestamp = timestamp + self.class.validators.each { |validator| instance_exec(&validator) } + end + + attr_reader :event, :timestamp + + def to_redis_fields + fields = { + "event" => @event, + "source" => "endstream" + } + if @timestamp + fields["timestamp"] = + @timestamp.is_a?(Time) ? @timestamp.iso8601 : @timestamp + end + fields + end + end + + class Message < Base + @validators = [ + lambda { + unless ValidTel.match?(@owner) + raise( + ArgumentError, + "owner must be a number, optionally prefixed with +" + ) + end + }, + lambda { + raise ArgumentError, "to must be an array" unless @to.is_a?(Array) + }, + lambda { + unless @to.all? { |t| ValidTel.match?(t) } + raise( + ArgumentError, + "to must be a number, optionally prefixed with +" + ) + end + }, + lambda { + unless @body.nil? || @body.is_a?(String) + raise ArgumentError, "body must be a String or nil" + end + }, + lambda { + unless @media_urls.is_a?(Array) + raise ArgumentError, "media_urls must be an Array" + end + } + ] + + def initialize(to:, from:, body:, owner:, media_urls: [], **kwargs) + owner = MessageEvent.strip_prefix(owner) + from = MessageEvent.strip_prefix(from) + to = to.map { |t| MessageEvent.strip_prefix(t) } + + @from = from + @to = to + @body = body + @media_urls = media_urls + @owner = owner + super(**kwargs) + end + + attr_reader :from, :to, :body, :media_urls + + def to_redis_fields + fields = super.merge( + "owner" => @owner, + "from" => @from, + "to" => JSON.dump(@to) + ) + fields["body"] = @body unless @body.nil? + fields["media_urls"] = JSON.dump(@media_urls) + fields + end + end + + class In < Message + @validators = [ + lambda { + unless @endstream_id.is_a?(String) + raise ArgumentError, "endstream_id must be a String" + end + } + ] + + def initialize(endstream_id:, **kwargs) + @endstream_id = endstream_id + super(event: "in", **kwargs) + end + + attr_reader :owner, :endstream_id + + def to_redis_fields + super.merge("endstream_id" => @endstream_id) + end + end + + class Failed < Base + @validators = [ + lambda { + unless @endstream_id.is_a?(String) + raise ArgumentError, "endstream_id must be a String" + end + }, + lambda { + unless @error_code.is_a?(String) + raise ArgumentError, "error_code must be a String" + end + }, + lambda { + unless @error_description.is_a?(String) + raise ArgumentError, "error_description must be a String" + end + } + ] + + def initialize(endstream_id:, error_code:, error_description:, **kwargs) + @endstream_id = endstream_id + @error_code = error_code + @error_description = error_description + super(event: "failed", **kwargs) + end + + attr_reader :endstream_id, :error_code, :error_description + + def to_redis_fields + super.merge( + "endstream_id" => @endstream_id, + "error_code" => @error_code, + "error_description" => @error_description + ) + end + end + + class Out < Message + @validators = [ + lambda { + unless @stanza_id.is_a?(String) + raise ArgumentError, "stanza_id must be a String" + end + } + ] + + def initialize(stanza_id:, **kwargs) + @stanza_id = stanza_id + super(event: "out", **kwargs) + end + + attr_reader :stanza_id + + def to_redis_fields + super.merge("stanza_id" => @stanza_id) + end + end +end diff --git a/sgx_endstream.rb b/sgx_endstream.rb index 49b41360eb40a1e67cac82dfe5961a919bdcada6..5a1543ae205bece9a5eb9c18e1285dcc667d844b 100755 --- a/sgx_endstream.rb +++ b/sgx_endstream.rb @@ -14,6 +14,7 @@ require_relative "lib/addresses" require_relative "lib/blather_client" require_relative "lib/blather_ext" require_relative "lib/em" +require_relative "lib/event_emitter" require_relative "lib/incoming_mms" require_relative "lib/oob" require_relative "lib/outgoing_mms" @@ -22,6 +23,7 @@ require_relative "lib/registration_repo" singleton_class.class_eval do include Blather::DSL + include EventEmitter Blather::DSL.append_features(self) end @@ -186,24 +188,24 @@ message from: /@sms.chat.1pcom.net\Z/ do |m| else "#{m.from.node};phone-context=ca-us.phone-context.soprani.ca" end - m = m.dup + m = m.dup.tap { _1.subject = nil } # They send a generic subject m.from = Blather::JID.new(tel, CONFIG[:component][:jid]) m.to = ProxiedJID.new(m.to).unproxied - m.subject = nil # They send a generic subject for some reason - if json.is_a?(Hash) && json["response"] + if json.is_a?(Hash) && (resp = json["response"]) log.info("SMS Status", json) - resp = json["response"] - m.id = resp["id"] - swap = m.from - m.from = m.to - m.to = swap - m.body = "" - m = m.as_error( + m = m.reply.tap { _1.body = "" }.tap { _1.id = resp["id"] }.as_error( "recipient-unavailable", :cancel, "#{resp['text']} (#{resp['code']} #{resp['subcode']} #{resp['dlrid']})" ) + emit_failed_event( + endstream_id: resp["id"], + error_code: "#{resp['code']} #{resp['subcode']}", + error_description: resp["text"] + ) + else + emit_incoming_event(m.to, from: tel, body: m.body, endstream_id: m.id) end self << m @@ -216,10 +218,19 @@ message from: /@mms.chat.1pcom.net\Z/ do |m| next end - IncomingMMS.for(m.to, json).then(&:to_stanza).then { |to_send| + IncomingMMS.for(m.to, json).then { |incoming| + to_send = incoming.to_stanza to_send.id = m.id to_send.from = Blather::JID.new("+#{m.from.node}", CONFIG[:component][:jid]) self << to_send + + emit_incoming_event( + incoming.unproxied_to, + from: "+#{m.from.node}", + body: incoming.body_text, + endstream_id: m.id, + media_urls: incoming.media_urls + ) } end @@ -228,13 +239,28 @@ message type: :error do true end +# @parameter m [Blather::Stanza::Message] +def send_outgoing_mms(m) + oobs = m.oobs + id = m.id + self << OutgoingMMS.for(m).to_stanza(id: id, from: m.from) + + emit_outgoing_event( + m.from, + to: m.recipients, + body: m.body.to_s.sub(oobs.first&.url.to_s, ""), # OOB's already captured + stanza_id: id, + media_urls: oobs.map(&:url) + ) +end + message :addresses, to: Blather::JID.new(CONFIG[:component][:jid]) do |m| - self << OutgoingMMS.for(m).to_stanza(id: m.id, from: m.from) + send_outgoing_mms(m) end message ->(m) { !m.oobs.empty? }, to: /\A\+?\d+@/ do |m| # TODO: if too big or bad mime, send sms - self << OutgoingMMS.for(m).to_stanza(id: m.id, from: m.from) + send_outgoing_mms(m) end def too_long_for_sms?(m) @@ -243,19 +269,31 @@ def too_long_for_sms?(m) end message :body, method(:too_long_for_sms?).to_proc, to: /\A\+?\d+@/ do |m| - self << OutgoingMMS.for(m).to_stanza(id: m.id, from: m.from) + send_outgoing_mms(m) end message( :body, to: /(?:\A\+?\d+@)|(?:;phone-context=ca-us\.phone-context\.soprani\.ca@)/ ) do |m| + owner_jid = m.from + dest = m.to.node + body = m.body + stanza_id = m.id + m.to = Blather::JID.new( - m.to.node.sub(/\A\+/, "").sub(/;phone-context=.*\Z/, ""), + dest.sub(/\A\+/, "").sub(/;phone-context=.*\Z/, ""), "sms.chat.1pcom.net" ) m.from = ProxiedJID.proxy(m.from, CONFIG[:component][:jid]) self << m + + emit_outgoing_event( + owner_jid, + to: [dest], + body: body, + stanza_id: stanza_id + ) end iq type: [:get, :set] do |iq|