@@ -0,0 +1,138 @@
+# frozen_string_literal: true
+
+require "blather/client/dsl"
+require "em_promise"
+require "timeout"
+
+module BlatherNotify
+ extend Blather::DSL
+
+ class PubSub
+ class Address
+ attr_reader :node, :server
+
+ def initialize(node:, server:)
+ @node = node
+ @server = server
+ end
+
+ def to_uri
+ "xmpp:#{@server}?;node=#{@node}"
+ end
+ end
+
+ def initialize(blather, addr)
+ @blather = blather
+ @addr = addr
+ end
+
+ def publish(xml)
+ @blather.write_with_promise(
+ Blather::Stanza::PubSub::Publish.new(
+ @addr.server,
+ @addr.node,
+ :set,
+ xml
+ )
+ )
+ end
+ end
+
+ @ready = Queue.new
+
+ when_ready { @ready << :ready }
+
+ def self.start(jid, password, default_pubsub_addr: nil)
+ # workqueue_count MUST be 0 or else Blather uses threads!
+ setup(jid, password, nil, nil, nil, nil, workqueue_count: 0)
+ set_default_pubsub(default_pubsub_addr)
+
+ EM.error_handler(&method(:panic))
+
+ EM.next_tick { client.run }
+
+ block_until_ready
+ end
+
+ def self.block_until_ready
+ if EM.reactor_running?
+ promise = EMPromise.new
+ disconnected { true.tap { EM.next_tick { EM.stop } } }
+ Thread.new { promise.fulfill(@ready.pop) }
+ timeout_promise(promise, timeout: 30)
+ else
+ @thread = Thread.new { EM.run }
+ Timeout.timeout(30) { @ready.pop }
+ at_exit { wait_then_exit }
+ end
+ end
+
+ def self.panic(e)
+ warn e.message
+ warn e.backtrace
+ exit! 2
+ end
+
+ def self.wait_then_exit
+ disconnected { EM.stop }
+ EM.add_timer(30) { EM.stop }
+ shutdown
+ @thread&.join
+ end
+
+ def self.timeout_promise(promise, timeout: 15)
+ timer = EventMachine::Timer.new(timeout) {
+ promise.reject(:timeout)
+ }
+
+ promise.then do
+ timer.cancel
+ end
+ end
+
+ def self.write_with_promise(stanza)
+ promise = EMPromise.new
+ timeout_promise(promise)
+
+ client.write_with_handler(stanza) do |s|
+ if s.error? || s.type == :error
+ promise.reject(s)
+ else
+ promise.fulfill(s)
+ end
+ end
+ promise
+ end
+
+ def self.command(node, sessionid=nil, action: :execute, form: nil)
+ Blather::Stanza::Iq::Command.new.tap do |cmd|
+ cmd.to = CONFIG[:sgx_jmp]
+ cmd.node = node
+ cmd.command[:sessionid] = sessionid if sessionid
+ cmd.action = action
+ cmd.command << form if form
+ end
+ end
+
+ def self.execute(command_node, form=nil)
+ write_with_promise(command(command_node)).then do |iq|
+ next iq unless form
+
+ write_with_promise(command(command_node, iq.sessionid, form: form))
+ end
+ end
+
+ def self.pubsub(addr)
+ PubSub.new(self, addr)
+ end
+
+ def self.set_default_pubsub(addr)
+ @default_pubsub = addr && pubsub(addr)
+ end
+
+ def self.publish(xml)
+ raise "No default pubsub set!" unless @default_pubsub
+
+ @default_pubsub.publish(xml)
+ end
+end
@@ -18,4 +18,16 @@ module FormToH
params.to_h
end
end
+
+ refine ::Hash do
+ def to_fields
+ map { |k, v| { var: k.to_s, value: v.to_s } }
+ end
+
+ def to_form(type)
+ Blather::Stanza::Iq::X.new(type).tap do |form|
+ form.fields = to_fields
+ end
+ end
+ end
end