blather_notify.rb

  1# frozen_string_literal: true
  2
  3require "blather/client/dsl"
  4require "em_promise"
  5require "timeout"
  6
  7module BlatherNotify
  8	extend Blather::DSL
  9
 10	class PubSub
 11		class Address
 12			attr_reader :node, :server
 13
 14			def initialize(node:, server:)
 15				@node = node
 16				@server = server
 17			end
 18
 19			def to_uri
 20				"xmpp:#{@server}?;node=#{@node}"
 21			end
 22		end
 23
 24		def initialize(blather, addr)
 25			@blather = blather
 26			@addr = addr
 27		end
 28
 29		def publish(xml)
 30			@blather.write_with_promise(
 31				Blather::Stanza::PubSub::Publish.new(
 32					@addr.server,
 33					@addr.node,
 34					:set,
 35					xml
 36				)
 37			)
 38		end
 39	end
 40
 41	@ready = Queue.new
 42
 43	when_ready { @ready << :ready }
 44
 45	def self.start(jid, password, default_pubsub_addr: nil)
 46		# workqueue_count MUST be 0 or else Blather uses threads!
 47		setup(jid, password, nil, nil, nil, nil, workqueue_count: 0)
 48		set_default_pubsub(default_pubsub_addr)
 49
 50		EM.error_handler(&method(:panic))
 51
 52		@thread = Thread.new {
 53			EM.run do
 54				client.run
 55			end
 56		}
 57
 58		Timeout.timeout(30) { @ready.pop }
 59		at_exit { wait_then_exit }
 60	end
 61
 62	def self.panic(e)
 63		warn e.message
 64		warn e.backtrace
 65		exit! 2
 66	end
 67
 68	def self.wait_then_exit
 69		disconnected { EM.stop }
 70		EM.add_timer(30) { EM.stop }
 71		shutdown
 72		@thread.join
 73	end
 74
 75	def self.timeout_promise(promise, timeout: 15)
 76		timer = EM.add_timer(timeout) {
 77			promise.reject(:timeout)
 78		}
 79
 80		promise.then do
 81			timer.cancel
 82		end
 83	end
 84
 85	def self.write_with_promise(stanza)
 86		promise = EMPromise.new
 87		timeout_promise(promise)
 88
 89		client.write_with_handler(stanza) do |s|
 90			if s.error? || s.type == :error
 91				promise.reject(s)
 92			else
 93				promise.fulfill(s)
 94			end
 95		end
 96		promise
 97	end
 98
 99	def self.command(node, sessionid=nil, action: :execute, form: nil)
100		Blather::Stanza::Iq::Command.new.tap do |cmd|
101			cmd.to = CONFIG[:sgx_jmp]
102			cmd.node = node
103			cmd.command[:sessionid] = sessionid if sessionid
104			cmd.action = action
105			cmd.command << form if form
106		end
107	end
108
109	def self.execute(command_node, form=nil)
110		write_with_promise(command(command_node)).then do |iq|
111			next iq unless form
112
113			write_with_promise(command(command_node, iq.sessionid, form: form))
114		end
115	end
116
117	def self.pubsub(addr)
118		PubSub.new(self, addr)
119	end
120
121	def self.set_default_pubsub(addr)
122		@default_pubsub = addr && pubsub(addr)
123	end
124
125	def self.publish(xml)
126		raise "No default pubsub set!" unless @default_pubsub
127
128		@default_pubsub.publish(xml)
129	end
130end