1# frozen_string_literal: true
2
3require "blather/client/dsl"
4require "em_promise"
5require "timeout"
6
7require "ostruct"
8require "securerandom"
9require_relative "form_to_h"
10
11module BlatherNotify
12 extend Blather::DSL
13
14 class PubSub
15 class Address
16 attr_reader :node, :server
17
18 def initialize(node:, server:)
19 @node = node
20 @server = server
21 end
22
23 def to_uri
24 "xmpp:#{@server}?;node=#{@node}"
25 end
26 end
27
28 def initialize(blather, addr)
29 @blather = blather
30 @addr = addr
31 end
32
33 def publish(xml)
34 @blather.write_with_promise(
35 Blather::Stanza::PubSub::Publish.new(
36 @addr.server,
37 @addr.node,
38 :set,
39 xml
40 )
41 )
42 end
43 end
44
45 class CommandExecution
46 using FormToH
47
48 class FormErrorResponse < RuntimeError; end
49
50 def initialize(blather, server, node)
51 @blather = blather
52 @server = server
53 @node = node
54 @sessionid = nil
55 end
56
57 def fetch_and_submit(**form)
58 get_form.then { |response|
59 @sessionid ||= response.sessionid
60
61 validate_form(form, response.form)
62
63 @blather.write_with_promise(@blather.command(
64 @node, @sessionid, form: form.to_form(:submit), server: @server
65 ))
66 }.then(&method(:check_for_error)).then { |response|
67 @last_response = response
68 OpenStruct.new(response.form.to_h)
69 }
70 end
71
72 protected
73
74 def check_for_error(response)
75 if response.note&.[]("type") == "error"
76 raise FormErrorResponse, response.note.text
77 end
78
79 response
80 end
81
82 def validate_form(to_submit, received)
83 to_submit.each_key do |key|
84 raise "No field #{key}" unless received.field(key.to_s)
85 end
86 end
87
88 def get_form
89 # If we already got a form on the last submit then
90 # assume we should fill that out here.
91 # If not, then move next to find the form
92 if @last_response&.form&.form?
93 EMPromise.resolve(@last_response)
94 else
95 @blather.write_with_promise(@blather.command(
96 @node,
97 @sessionid,
98 server: @server
99 ))
100 end
101 end
102 end
103
104 @ready = Queue.new
105
106 when_ready { @ready << :ready }
107
108 def self.start(jid, password, default_pubsub_addr: nil)
109 # workqueue_count MUST be 0 or else Blather uses threads!
110 setup(jid, password, nil, nil, nil, nil, workqueue_count: 0)
111 set_default_pubsub(default_pubsub_addr)
112
113 EM.error_handler(&method(:panic))
114
115 EM.next_tick { client.run }
116
117 block_until_ready
118 end
119
120 def self.block_until_ready
121 if EM.reactor_running?
122 promise = EMPromise.new
123 disconnected { true.tap { EM.next_tick { EM.stop } } }
124 Thread.new { promise.fulfill(@ready.pop) }
125 timeout_promise(promise, timeout: 30)
126 else
127 @thread = Thread.new { EM.run }
128 Timeout.timeout(30) { @ready.pop }
129 at_exit { wait_then_exit }
130 end
131 end
132
133 def self.panic(e)
134 warn e.message
135 warn e.backtrace
136 exit! 2
137 end
138
139 def self.wait_then_exit
140 disconnected { EM.stop }
141 EM.add_timer(30) { EM.stop }
142 shutdown
143 @thread&.join
144 end
145
146 def self.timeout_promise(promise, timeout: 30)
147 timer = EventMachine::Timer.new(timeout) {
148 promise.reject(:timeout)
149 }
150
151 promise.then do
152 timer.cancel
153 end
154 end
155
156 def self.write_with_promise(stanza)
157 promise = EMPromise.new
158 timeout_promise(promise)
159
160 client.write_with_handler(stanza) do |s|
161 if s.error? || s.type == :error
162 promise.reject(s)
163 else
164 promise.fulfill(s)
165 end
166 end
167 promise
168 end
169
170 def self.command(
171 node, sessionid=nil,
172 server: CONFIG[:sgx_jmp], action: :execute, form: nil
173 )
174 Blather::Stanza::Iq::Command.new.tap do |cmd|
175 cmd.to = server
176 cmd.node = node
177 cmd.command[:sessionid] = sessionid if sessionid
178 cmd.action = action
179 cmd.command << form if form
180 end
181 end
182
183 def self.execute(command_node, form=nil)
184 write_with_promise(command(command_node)).then do |iq|
185 next iq unless form
186
187 write_with_promise(command(command_node, iq.sessionid, form: form))
188 end
189 end
190
191 def self.pubsub(addr)
192 PubSub.new(self, addr)
193 end
194
195 def self.set_default_pubsub(addr)
196 @default_pubsub = addr && pubsub(addr)
197 end
198
199 def self.publish(xml)
200 raise "No default pubsub set!" unless @default_pubsub
201
202 @default_pubsub.publish(xml)
203 end
204
205 def self.command_execution(server, node)
206 CommandExecution.new(self, server, node)
207 end
208end