blather_notify.rb

  1# frozen_string_literal: true
  2
  3require "blather/client/dsl"
  4require "em_promise"
  5require "timeout"
  6
  7module BlatherNotify
  8	extend Blather::DSL
  9
	# Publishes payloads to a single pubsub node through the writer it is
	# constructed with.
	class PubSub
		# Pairs a pubsub node name with the server that hosts it.
		class Address
			attr_reader :node, :server

			# @param node the pubsub node name
			# @param server the pubsub service the node lives on
			def initialize(node:, server:)
				@server = server
				@node = node
			end

			# @return [String] the address in "xmpp:<server>?;node=<node>" form
			def to_uri
				"xmpp:#{server}?;node=#{node}"
			end
		end

		# @param blather object responding to +write_with_promise+
		# @param addr object responding to +server+ and +node+
		def initialize(blather, addr)
			@addr = addr
			@blather = blather
		end

		# Build a Publish stanza from the address's server and node, the type
		# +:set+ and +xml+, then hand it to the writer.
		#
		# @param xml the payload to publish
		# @return whatever the writer's +write_with_promise+ returns
		def publish(xml)
			stanza = Blather::Stanza::PubSub::Publish.new(
				@addr.server, @addr.node, :set, xml
			)
			@blather.write_with_promise(stanza)
		end
	end
 40
	# Hand-off queue: the when_ready callback pushes :ready onto it and
	# block_until_ready pops from it.
	@ready = Queue.new

	when_ready do
		@ready.push(:ready)
	end
 44
	# Configure the client, install the panic error handler, schedule the
	# client to run on the next reactor tick, then wait for readiness.
	#
	# @param jid passed through to +setup+
	# @param password passed through to +setup+
	# @param default_pubsub_addr passed to +set_default_pubsub+ (may be nil)
	# @return whatever +block_until_ready+ returns
	def self.start(jid, password, default_pubsub_addr: nil)
		# workqueue_count MUST be 0 or else Blather uses threads!
		setup(jid, password, nil, nil, nil, nil, workqueue_count: 0)
		set_default_pubsub(default_pubsub_addr)

		EM.error_handler { |e| panic(e) }
		EM.next_tick do
			client.run
		end

		block_until_ready
	end
 56
	# Wait for the :ready signal pushed onto @ready.
	#
	# When no reactor is running, one is started on a background thread, the
	# caller blocks for up to 30 seconds (Timeout::Error otherwise), and an
	# at_exit hook calling wait_then_exit is registered; the at_exit return
	# value is returned.
	#
	# When a reactor is already running, nothing blocks: a helper thread pops
	# the queue and fulfills a promise, which is handed to timeout_promise
	# with a 30 second limit, and that call's result is returned.
	def self.block_until_ready
		unless EM.reactor_running?
			@thread = Thread.new { EM.run }
			Timeout.timeout(30) { @ready.pop }
			return at_exit { wait_then_exit }
		end

		ready_promise = EMPromise.new
		# The handler schedules the reactor stop for the next tick and itself
		# evaluates to true.
		disconnected do
			EM.next_tick { EM.stop }
			true
		end
		Thread.new { ready_promise.fulfill(@ready.pop) }
		timeout_promise(ready_promise, timeout: 30)
	end
 69
	# Print the error's message and backtrace via +warn+, then terminate the
	# process immediately with exit status 2.
	#
	# @param e object responding to +message+ and +backtrace+
	def self.panic(e)
		[e.message, e.backtrace].each { |output| warn(output) }
		exit!(2)
	end
 75
	# Ask for a shutdown and wait for the reactor thread to finish.
	#
	# EM.stop is registered both as the disconnected handler and on a 30
	# second timer before +shutdown+ is called; the thread started by
	# block_until_ready (if any) is then joined.
	def self.wait_then_exit
		stop_reactor = proc { EM.stop }

		disconnected(&stop_reactor)
		EM.add_timer(30, &stop_reactor)
		shutdown
		@thread.join if @thread
	end
 82
	# Reject +promise+ with +:timeout+ if it has not settled by the time an
	# EventMachine timer of +timeout+ fires.
	#
	# The timer is cancelled as soon as the promise settles either way, so a
	# promise rejected for some other reason does not leave a live timer
	# holding on to it until the interval elapses.
	#
	# @param promise object responding to +then+ and +reject+
	# @param timeout interval handed to EventMachine::Timer (default 15)
	# @return the value of +promise.then { timer.cancel }+
	def self.timeout_promise(promise, timeout: 15)
		timer = EventMachine::Timer.new(timeout) {
			promise.reject(:timeout)
		}

		# Side branch only: cancel the timer on rejection. Its result is
		# discarded so the promise returned below is not affected by it.
		promise.then(nil, proc { timer.cancel })

		promise.then do
			timer.cancel
		end
	end
 92
	# Send +stanza+ through the client and expose the reply as a promise.
	#
	# The promise is passed to timeout_promise (default limit) before the
	# stanza is written. A reply for which +error?+ is true or whose type is
	# +:error+ rejects the promise; any other reply fulfills it.
	#
	# @param stanza the stanza to write
	# @return [EMPromise] settled with the reply stanza
	def self.write_with_promise(stanza)
		EMPromise.new.tap do |promise|
			timeout_promise(promise)

			client.write_with_handler(stanza) do |reply|
				failed = reply.error? || reply.type == :error
				failed ? promise.reject(reply) : promise.fulfill(reply)
			end
		end
	end
106
107	def self.command(node, sessionid=nil, action: :execute, form: nil)
108		Blather::Stanza::Iq::Command.new.tap do |cmd|
109			cmd.to = CONFIG[:sgx_jmp]
110			cmd.node = node
111			cmd.command[:sessionid] = sessionid if sessionid
112			cmd.action = action
113			cmd.command << form if form
114		end
115	end
116
117	def self.execute(command_node, form=nil)
118		write_with_promise(command(command_node)).then do |iq|
119			next iq unless form
120
121			write_with_promise(command(command_node, iq.sessionid, form: form))
122		end
123	end
124
125	def self.pubsub(addr)
126		PubSub.new(self, addr)
127	end
128
129	def self.set_default_pubsub(addr)
130		@default_pubsub = addr && pubsub(addr)
131	end
132
133	def self.publish(xml)
134		raise "No default pubsub set!" unless @default_pubsub
135
136		@default_pubsub.publish(xml)
137	end
138end