From 51d5802e2782f49f6ade16e36d0caf599a9020ef Mon Sep 17 00:00:00 2001 From: Amolith Date: Tue, 6 Jan 2026 14:04:09 -0700 Subject: [PATCH] Emit messages and statuses to Redis stream - in: inbound message-received - out: outbound after successful Bandwidth API call - thru: pass-through XMPP-to-XMPP messages - delivered: message-delivered callback - failed: message-failed callback Implements: https://todo.sr.ht/~singpolyma/soprani.ca/517 Implements: https://todo.sr.ht/~singpolyma/soprani.ca/518 --- Gemfile | 1 + lib/bandwidth_tn_options.rb | 1 + lib/message_event.rb | 115 +++++++++++++++++++++ sgx-bwmsgsv2.rb | 60 ++++++++++- test/test_component.rb | 197 ++++++++++++++++++++++++++++++++++++ test/test_helper.rb | 12 +++ 6 files changed, 385 insertions(+), 1 deletion(-) create mode 100644 lib/message_event.rb diff --git a/Gemfile b/Gemfile index 706acf7184ab13ece60d1034b69f7e0c28202b28..2b3a97fe1db901a8d9d5a8e2542324313a1aa65c 100644 --- a/Gemfile +++ b/Gemfile @@ -14,6 +14,7 @@ gem 'faraday-em_synchrony' gem 'ruby-bandwidth-iris' gem 'goliath' gem 'lazy_object' +gem 'value_semantics' gem 'log4r' gem 'multibases' gem 'multihashes' diff --git a/lib/bandwidth_tn_options.rb b/lib/bandwidth_tn_options.rb index d893225133d7911c352c8cec2e486cae64d21dec..d4be7f0dc298c6aac022a484b30e33c32e6fb27b 100644 --- a/lib/bandwidth_tn_options.rb +++ b/lib/bandwidth_tn_options.rb @@ -2,6 +2,7 @@ require "em_promise" require "ruby-bandwidth-iris" +require "em-http-request" Faraday.default_adapter = :em_synchrony class BandwidthTNOptions diff --git a/lib/message_event.rb b/lib/message_event.rb new file mode 100644 index 0000000000000000000000000000000000000000..dbd10adf686f1c63dae91f84f2b0f3768b9c550b --- /dev/null +++ b/lib/message_event.rb @@ -0,0 +1,115 @@ +# frozen_string_literal: true + +require "json" +require "value_semantics" + +module MessageEvent + module Emittable + def emit(redis=LazyObject.new { REDIS }, stream: "messages") + args = to_redis_fields.flat_map { |k, v| [k, v] } + redis.xadd(stream, "*", *args).catch do |e| + puts "WARN Failed to emit message event: #{e.message}" + end + end + end + + module ValidTel + def self.===(value) + return value.match?(/\A\+1\d{10}\z/) if value.is_a?(String) + return value.to_s.match?(/\A\+1\d{10}\z/) if value.respond_to?(:to_s) + + false + end + end + + class MessageFields + include ValueSemantics.for_attributes { + event String + timestamp Time, coerce: true + from ValidTel + to ArrayOf(ValidTel) + message_id String + body String + media_urls ArrayOf(String), coerce: true + } + + def self.coerce_timestamp(value) + value.is_a?(String) ? Time.parse(value) : value + end + + def self.coerce_media_urls(value) + value.is_a?(String) ? JSON.parse(value) : value + end + + def to_redis_fields + { + "event" => event, + "timestamp" => timestamp.iso8601, + "from" => from, + "to" => JSON.dump(to), + "message_id" => message_id, + "has_media" => media_urls.any?.to_s, + "body" => body, + "media_urls" => JSON.dump(media_urls) + } + end + end + + %w[In Out Thru].each { |n| + const_set(n, Class.new(MessageFields) { + include Emittable + + const_set(n, lambda { |v| + v == n.downcase + }) + + define_method(:initialize) { |**a| + super(**a.merge(event: n.downcase)) + } + }) + } + + class Delivered + include Emittable + Delivered = lambda { |v| v == "delivered" } + include ValueSemantics.for_attributes { + event Delivered + timestamp Either(Time, String) + message_id String + } + + def initialize(**a) + super(**a.merge(event: "delivered")) + end + + def to_redis_fields = { + "event" => event, + "timestamp" => timestamp.iso8601, + "message_id" => message_id + } + end + + class Failed + include Emittable + Failed = lambda { |v| v == "failed" } + include ValueSemantics.for_attributes { + event Failed + timestamp Either(Time, String) + message_id String + error_code String + error_description String + } + + def initialize(**a) + super(**a.merge(event: "failed")) + end + + def to_redis_fields = { + "event" => event, + "timestamp" => timestamp.is_a?(Time) ? timestamp.iso8601 : timestamp, + "message_id" => message_id, + "error_code" => error_code, + "error_description" => error_description + } + end +end diff --git a/sgx-bwmsgsv2.rb b/sgx-bwmsgsv2.rb index f5b243007e49472945637083e9027f456ae51e31..4bd85ed3b4894c3f7e4c218af803dc06321db68a 100755 --- a/sgx-bwmsgsv2.rb +++ b/sgx-bwmsgsv2.rb @@ -40,6 +40,7 @@ require 'em-synchrony' require_relative 'lib/bandwidth_error' require_relative 'lib/bandwidth_tn_options' +require_relative 'lib/message_event' require_relative 'lib/registration_repo' Sentry.init @@ -213,6 +214,9 @@ module SGXbwmsgsv2 setup ARGV[0], ARGV[1], ARGV[2], ARGV[3], nil, nil, async: true def self.pass_on_message(m, users_num, jid) + # Capture destination before modifying m.to + dest_num = m.to.node + # setup delivery receipt; similar to a reply rcpt = ReceiptMessage.new(m.from.stripped) rcpt.from = m.to @@ -224,6 +228,17 @@ module SGXbwmsgsv2 puts 'XRESPONSE0: ' + m.inspect write_to_stream m + # Emit pass-through event + oob_url = m.at("oob|x > oob|url", oob: "jabber:x:oob")&.text + MessageEvent::Thru.new( + timestamp: Time.now, + from: users_num, + to: [dest_num], + message_id: m.id, + body: m.body.to_s, + media_urls: [oob_url].compact + ).emit(REDIS) + # send a delivery receipt back to the sender # TODO: send only when requested per XEP-0184 # TODO: pass receipts from target if supported @@ -356,7 +371,18 @@ module SGXbwmsgsv2 )), {'Content-Type' => 'application/json'}, [201] - ).catch { |e| + ).then { |response| + parsed = JSON.parse(response) rescue {} + MessageEvent::Out.new( + timestamp: Time.now, + from: usern, + to: Array(num_dest), + message_id: parsed["id"] || s.id, + body: body, + media_urls: [murl].compact + ).emit(REDIS) + response + }.catch { |e| EMPromise.reject( [:cancel, 'internal-server-error', e.message] ) @@ -1113,6 +1139,38 @@ class WebhookHandler < Goliath::API msg.from = others_num + '@' + ARGV[0] SGXbwmsgsv2.write(msg) + # Emit event to messages stream + case [jparams['direction'], type] + when ['in', 'message-received'] + media_urls = Array(jparams['media']).reject { |u| + u.end_with?('.smil', '.txt', '.xml') + } + MessageEvent::In.new( + timestamp: jparams['time'], + from: jparams['from'], + to: jparams['to'], + message_id: jparams['id'], + body: jparams['text'].to_s, + media_urls: media_urls + ).emit(REDIS) + when ['out', 'message-failed'] + tag_parts = jparams['tag'].split(/ /, 2) + msg_id = WEBrick::HTTPUtils.unescape(tag_parts[0]) + MessageEvent::Failed.new( + timestamp: jparams['time'], + message_id: msg_id, + error_code: jparams['errorCode'].to_s, + error_description: jparams['description'].to_s + ).emit(REDIS) + when ['out', 'message-delivered'] + tag_parts = jparams['tag'].split(/ /, 2) + msg_id = WEBrick::HTTPUtils.unescape(tag_parts[0]) + MessageEvent::Delivered.new( + timestamp: jparams['time'], + message_id: msg_id + ).emit(REDIS) + end + [200, {}, "OK"] rescue Exception => e Sentry.capture_exception(e) diff --git a/test/test_component.rb b/test/test_component.rb index 9ee3d95cd50b270b6f36f072635bf5012366d7ae..9dd6155bbe00c7038096807027a8b70e8accab4a 100644 --- a/test/test_component.rb +++ b/test/test_component.rb @@ -44,6 +44,16 @@ class ComponentTest < Minitest::Test raise $panic if $panic end + def wait_for(timeout: 1.0, interval: 0.01) + start = Time.now + until yield + raise "Timeout waiting for condition" if Time.now - start > timeout + f = Fiber.current + EM.add_timer(interval) { f.resume } + Fiber.yield + end + end + def test_message_unregistered m = Blather::Stanza::Message.new("+15551234567@component", "a"*4096) m.from = "unknown@example.com" @@ -513,4 +523,191 @@ class ComponentTest < Minitest::Test end end em :test_port_out_pin_validation + + def test_outbound_message_emits_to_stream + stub_request( + :post, + "https://messaging.bandwidth.com/api/v2/users/account/messages" + ).with(body: hash_including( + from: "+15550000000", + to: "+15551234567", + text: "Hello world" + )).to_return(status: 201, body: JSON.dump(id: "bw-msg-123")) + + m = Blather::Stanza::Message.new("+15551234567@component", "Hello world") + m.from = "test@example.com/resource" + m['id'] = "stanza-123" + process_stanza(m) + + entries = REDIS.stream_entries("messages").sync + assert_equal 1, entries.length + + event = entries.first[:fields] + assert_equal "out", event["event"] + assert_equal "+15550000000", event["from"] + assert_equal JSON.dump(["+15551234567"]), event["to"] + assert_equal "bw-msg-123", event["message_id"] + assert_equal "false", event["has_media"] + assert_equal "Hello world", event["body"] + end + em :test_outbound_message_emits_to_stream + + def test_passthrough_message_emits_to_stream + REDIS.set("catapult_jid-+15559999999", "other@example.com") + REDIS.set("catapult_cred-other@example.com", [ + 'other_acct', 'other_user', 'other_pw', '+15559999999' + ]) + + m = Blather::Stanza::Message.new("+15559999999@component", "Pass through") + m.from = "test@example.com/resource" + m['id'] = "passthru-stanza-456" + process_stanza(m) + + entries = REDIS.stream_entries("messages").sync + assert_equal 1, entries.length + + event = entries.first[:fields] + assert_equal "thru", event["event"] + assert_equal "+15550000000", event["from"] + assert_equal JSON.dump(["+15559999999"]), event["to"] + assert_equal "passthru-stanza-456", event["message_id"] + assert_equal "false", event["has_media"] + assert_equal "Pass through", event["body"] + end + em :test_passthrough_message_emits_to_stream + + def invoke_webhook(payload) + handler = WebhookHandler.new + env = { + "REQUEST_URI" => "/", + "REQUEST_METHOD" => "POST", + "params" => {"_json" => [payload]} + } + handler.instance_variable_set(:@env, env) + def handler.params + @env["params"] + end + + EMPromise.resolve(nil).then { + handler.response(env) + }.sync + end + + def test_inbound_sms_emits_to_stream + payload = { + "type" => "message-received", + "message" => { + "id" => "bw-in-123", + "direction" => "in", + "owner" => "+15550000000", + "from" => "+15551234567", + "to" => ["+15550000000"], + "time" => "2025-01-13T10:00:00Z", + "text" => "Hello from outside" + } + } + + invoke_webhook(payload) + + entries = REDIS.stream_entries("messages").sync + assert_equal 1, entries.length + + event = entries.first[:fields] + assert_equal "in", event["event"] + assert_equal "+15551234567", event["from"] + assert_equal JSON.dump(["+15550000000"]), event["to"] + assert_equal "bw-in-123", event["message_id"] + assert_equal "false", event["has_media"] + assert_equal "Hello from outside", event["body"] + assert_equal JSON.dump([]), event["media_urls"] + end + em :test_inbound_sms_emits_to_stream + + def test_inbound_mms_emits_to_stream_and_filters_smil + payload = { + "type" => "message-received", + "message" => { + "id" => "bw-mms-456", + "direction" => "in", + "owner" => "+15550000000", + "from" => "+15551234567", + "to" => ["+15550000000"], + "time" => "2025-01-13T10:05:00Z", + "text" => "Check this out", + "media" => [ + "https://example.com/image.jpg", + "https://example.com/file.smil", + "https://example.com/data.txt", + "https://example.com/meta.xml" + ] + } + } + + invoke_webhook(payload) + + entries = REDIS.stream_entries("messages").sync + assert_equal 1, entries.length + + event = entries.first[:fields] + assert_equal "in", event["event"] + assert_equal "true", event["has_media"] + assert_equal JSON.dump(["https://example.com/image.jpg"]), event["media_urls"] + end + em :test_inbound_mms_emits_to_stream_and_filters_smil + + def test_message_delivered_emits_to_stream + payload = { + "type" => "message-delivered", + "message" => { + "id" => "bw-out-789", + "direction" => "out", + "owner" => "+15550000000", + "from" => "+15550000000", + "to" => ["+15551234567"], + "time" => "2025-01-13T10:10:00Z", + "tag" => "stanza-id-abc%20extra-data" + } + } + + invoke_webhook(payload) + + entries = REDIS.stream_entries("messages").sync + assert_equal 1, entries.length + + event = entries.first[:fields] + assert_equal "delivered", event["event"] + assert_equal "stanza-id-abc", event["message_id"] + assert_equal "2025-01-13T10:10:00Z", event["timestamp"] + end + em :test_message_delivered_emits_to_stream + + def test_message_failed_emits_to_stream + payload = { + "type" => "message-failed", + "message" => { + "id" => "bw-out-999", + "direction" => "out", + "owner" => "+15550000000", + "from" => "+15550000000", + "to" => ["+15551234567"], + "time" => "2025-01-13T10:15:00Z", + "tag" => "failed-stanza-xyz%20extra", + "errorCode" => 4720, + "description" => "Carrier rejected message" + } + } + + invoke_webhook(payload) + + entries = REDIS.stream_entries("messages").sync + assert_equal 1, entries.length + + event = entries.first[:fields] + assert_equal "failed", event["event"] + assert_equal "failed-stanza-xyz", event["message_id"] + assert_equal "4720", event["error_code"] + assert_equal "Carrier rejected message", event["error_description"] + assert_equal "2025-01-13T10:15:00Z", event["timestamp"] + end + em :test_message_failed_emits_to_stream end diff --git a/test/test_helper.rb b/test/test_helper.rb index 78191373b3cc4322d5707cfdfdedcf60e90ea1ba..a3154226b5142244e3426fa1038c6b5b766c1523 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -131,6 +131,18 @@ class FakeRedis def discard end + + def xadd(stream, id, *args) + @values[stream] ||= [] + entry_id = id == "*" ? "#{Time.now.to_i}-0" : id + fields = Hash[*args] + @values[stream] << { id: entry_id, fields: fields } + EMPromise.resolve(entry_id) + end + + def stream_entries(stream) + EMPromise.resolve(@values[stream] || []) + end end REDIS = FakeRedis.new