1# frozen_string_literal: true
  2
  3require "blather/client/dsl"
  4require "em_promise"
  5require "timeout"
  6
  7require "ostruct"
  8require "securerandom"
  9require_relative "form_to_h"
 10
 11module BlatherNotify
 12	extend Blather::DSL
 13
 14	class PubSub
 15		class Address
 16			attr_reader :node, :server
 17
 18			def initialize(node:, server:)
 19				@node = node
 20				@server = server
 21			end
 22
 23			def to_uri
 24				"xmpp:#{@server}?;node=#{@node}"
 25			end
 26		end
 27
 28		def initialize(blather, addr)
 29			@blather = blather
 30			@addr = addr
 31		end
 32
 33		def publish(xml)
 34			@blather.write_with_promise(
 35				Blather::Stanza::PubSub::Publish.new(
 36					@addr.server,
 37					@addr.node,
 38					:set,
 39					xml
 40				)
 41			)
 42		end
 43	end
 44
 45	class CommandExecution
 46		using FormToH
 47
 48		class FormErrorResponse < RuntimeError; end
 49
 50		def initialize(blather, server, node)
 51			@blather = blather
 52			@server = server
 53			@node = node
 54			@sessionid = nil
 55		end
 56
 57		def fetch_and_submit(**form)
 58			get_form.then { |response|
 59				@sessionid ||= response.sessionid
 60
 61				validate_form(form, response.form)
 62
 63				@blather.write_with_promise(@blather.command(
 64					@node, @sessionid, form: form.to_form(:submit), server: @server
 65				))
 66			}.then(&method(:check_for_error)).then { |response|
 67				@last_response = response
 68				OpenStruct.new(response.form.to_h)
 69			}
 70		end
 71
 72	protected
 73
 74		def check_for_error(response)
 75			if response.note&.[]("type") == "error"
 76				raise FormErrorResponse, response.note.text
 77			end
 78
 79			response
 80		end
 81
 82		def validate_form(to_submit, received)
 83			to_submit.each_key do |key|
 84				raise "No field #{key}" unless received.field(key.to_s)
 85			end
 86		end
 87
 88		def get_form
 89			# If we already got a form on the last submit then
 90			# assume we should fill that out here.
 91			# If not, then move next to find the form
 92			if @last_response&.form&.form?
 93				EMPromise.resolve(@last_response)
 94			else
 95				@blather.write_with_promise(@blather.command(
 96					@node,
 97					@sessionid,
 98					server: @server
 99				))
100			end
101		end
102	end
103
104	@ready = Queue.new
105
106	when_ready { @ready << :ready }
107
108	def self.start(jid, password, default_pubsub_addr: nil)
109		# workqueue_count MUST be 0 or else Blather uses threads!
110		setup(jid, password, nil, nil, nil, nil, workqueue_count: 0)
111		set_default_pubsub(default_pubsub_addr)
112
113		EM.error_handler(&method(:panic))
114
115		EM.next_tick { client.run }
116
117		block_until_ready
118	end
119
120	def self.block_until_ready
121		if EM.reactor_running?
122			promise = EMPromise.new
123			disconnected { true.tap { EM.next_tick { EM.stop } } }
124			Thread.new { promise.fulfill(@ready.pop) }
125			timeout_promise(promise, timeout: 30)
126		else
127			@thread = Thread.new { EM.run }
128			Timeout.timeout(30) { @ready.pop }
129			at_exit { wait_then_exit }
130		end
131	end
132
133	def self.panic(e)
134		warn e.message
135		warn e.backtrace
136		exit! 2
137	end
138
139	def self.wait_then_exit
140		disconnected { EM.stop }
141		EM.add_timer(30) { EM.stop }
142		shutdown
143		@thread&.join
144	end
145
146	def self.timeout_promise(promise, timeout: 30)
147		timer = EventMachine::Timer.new(timeout) {
148			promise.reject(:timeout)
149		}
150
151		promise.then do
152			timer.cancel
153		end
154	end
155
156	def self.write_with_promise(stanza)
157		promise = EMPromise.new
158		timeout_promise(promise)
159
160		client.write_with_handler(stanza) do |s|
161			if s.error? || s.type == :error
162				promise.reject(s)
163			else
164				promise.fulfill(s)
165			end
166		end
167		promise
168	end
169
170	def self.command(
171		node, sessionid=nil,
172		server: CONFIG[:sgx_jmp], action: :execute, form: nil
173	)
174		Blather::Stanza::Iq::Command.new.tap do |cmd|
175			cmd.to = server
176			cmd.node = node
177			cmd.command[:sessionid] = sessionid if sessionid
178			cmd.action = action
179			cmd.command << form if form
180		end
181	end
182
183	def self.execute(command_node, form=nil)
184		write_with_promise(command(command_node)).then do |iq|
185			next iq unless form
186
187			write_with_promise(command(command_node, iq.sessionid, form: form))
188		end
189	end
190
191	def self.pubsub(addr)
192		PubSub.new(self, addr)
193	end
194
195	def self.set_default_pubsub(addr)
196		@default_pubsub = addr && pubsub(addr)
197	end
198
199	def self.publish(xml)
200		raise "No default pubsub set!" unless @default_pubsub
201
202		@default_pubsub.publish(xml)
203	end
204
205	def self.command_execution(server, node)
206		CommandExecution.new(self, server, node)
207	end
208end