1# frozen_string_literal: true
2
3require "pg/em"
4require "bigdecimal"
5require "blather/client/dsl" # Require this first to not auto-include
6require "blather/client"
7require "braintree"
8require "date"
9require "dhall"
10require "em-hiredis"
11require "em_promise"
12require "ruby-bandwidth-iris"
13require "sentry-ruby"
14
15Sentry.init
16
17CONFIG =
18 Dhall::Coder
19 .new(safe: Dhall::Coder::JSON_LIKE + [Symbol, Proc])
20 .load(ARGV[0], transform_keys: ->(k) { k&.to_sym })
21
22singleton_class.class_eval do
23 include Blather::DSL
24 Blather::DSL.append_features(self)
25end
26
27require_relative "lib/backend_sgx"
28require_relative "lib/bandwidth_tn_order"
29require_relative "lib/btc_sell_prices"
30require_relative "lib/buy_account_credit_form"
31require_relative "lib/command_list"
32require_relative "lib/customer"
33require_relative "lib/electrum"
34require_relative "lib/em"
35require_relative "lib/payment_methods"
36require_relative "lib/registration"
37require_relative "lib/transaction"
38require_relative "lib/web_register_manager"
39
40ELECTRUM = Electrum.new(**CONFIG[:electrum])
41
42Faraday.default_adapter = :em_synchrony
43BandwidthIris::Client.global_options = {
44 account_id: CONFIG[:creds][:account],
45 username: CONFIG[:creds][:username],
46 password: CONFIG[:creds][:password]
47}
48
49def new_sentry_hub(stanza, name: nil)
50 hub = Sentry.get_current_hub&.new_from_top
51 raise "Sentry.init has not been called" unless hub
52
53 hub.push_scope
54 hub.current_scope.clear_breadcrumbs
55 hub.current_scope.set_transaction_name(name) if name
56 hub.current_scope.set_user(jid: stanza.from.stripped.to_s)
57 hub
58end
59
60# Braintree is not async, so wrap in EM.defer for now
61class AsyncBraintree
62 def initialize(environment:, merchant_id:, public_key:, private_key:, **)
63 @gateway = Braintree::Gateway.new(
64 environment: environment,
65 merchant_id: merchant_id,
66 public_key: public_key,
67 private_key: private_key
68 )
69 end
70
71 def respond_to_missing?(m, *)
72 @gateway.respond_to?(m)
73 end
74
75 def method_missing(m, *args)
76 return super unless respond_to_missing?(m, *args)
77
78 EM.promise_defer(klass: PromiseChain) do
79 @gateway.public_send(m, *args)
80 end
81 end
82
83 class PromiseChain < EMPromise
84 def respond_to_missing?(*)
85 false # We don't actually know what we respond to...
86 end
87
88 def method_missing(m, *args)
89 return super if respond_to_missing?(m, *args)
90 self.then { |o| o.public_send(m, *args) }
91 end
92 end
93end
94
95BRAINTREE = AsyncBraintree.new(**CONFIG[:braintree])
96
97def panic(e, hub=nil)
98 m = e.respond_to?(:message) ? e.message : e
99 warn "Error raised during event loop: #{e.class}: #{m}"
100 warn e.backtrace if e.respond_to?(:backtrace)
101 if e.is_a?(::Exception)
102 (hub || Sentry).capture_exception(e, hint: { background: false })
103 else
104 (hub || Sentry).capture_message(e.to_s, hint: { background: false })
105 end
106 exit 1
107end
108
109EM.error_handler(&method(:panic))
110
111when_ready do
112 BLATHER = self
113 REDIS = EM::Hiredis.connect
114 BTC_SELL_PRICES = BTCSellPrices.new(REDIS, CONFIG[:oxr_app_id])
115 DB = PG::EM::Client.new(dbname: "jmp")
116 DB.type_map_for_results = PG::BasicTypeMapForResults.new(DB)
117 DB.type_map_for_queries = PG::BasicTypeMapForQueries.new(DB)
118
119 EM.add_periodic_timer(3600) do
120 ping = Blather::Stanza::Iq::Ping.new(:get, CONFIG[:server][:host])
121 ping.from = CONFIG[:component][:jid]
122 self << ping
123 end
124end
125
126# workqueue_count MUST be 0 or else Blather uses threads!
127setup(
128 CONFIG[:component][:jid],
129 CONFIG[:component][:secret],
130 CONFIG[:server][:host],
131 CONFIG[:server][:port],
132 nil,
133 nil,
134 workqueue_count: 0
135)
136
137message to: /\Aaccount@/ do |m|
138 self << m.reply.tap do |out|
139 out.body = "This bot is deprecated. Please talk to xmpp:cheogram.com"
140 end
141end
142
143before nil, to: /\Acustomer_/, from: /(\A|@)#{CONFIG[:sgx]}(\/|\Z)/ do |s|
144 sentry_hub = new_sentry_hub(s, name: "stanza_customer")
145 Customer.for_customer_id(
146 s.to.node.delete_prefix("customer_")
147 ).then { |customer|
148 sentry_hub.current_scope.set_user(
149 id: customer.customer_id,
150 jid: s.from.stripped.to_s
151 )
152 customer.stanza_to(s)
153 }.catch { |e| panic(e, sentry_hub) }
154 halt
155end
156
157message do |m|
158 sentry_hub = new_sentry_hub(m, name: "message")
159 Customer.for_jid(m.from.stripped).then { |customer|
160 sentry_hub.current_scope.set_user(
161 id: customer.customer_id,
162 jid: m.from.stripped.to_s
163 )
164 today = Time.now.utc.to_date
165 EMPromise.all([
166 REDIS.zremrangebylex(
167 "jmp_customer_outbound_messages-#{customer.customer_id}",
168 "-",
169 # Store message counts per day for 1 year
170 "[#{(today << 12).strftime('%Y%m%d')}"
171 ),
172 REDIS.zincrby(
173 "jmp_customer_outbound_messages-#{customer.customer_id}",
174 1,
175 today.strftime("%Y%m%d")
176 ),
177 customer.stanza_from(m)
178 ])
179 }.catch { |e| panic(e, sentry_hub) }
180end
181
182message :error? do |m|
183 puts "MESSAGE ERROR: #{m.inspect}"
184end
185
186class SessionManager
187 def initialize(blather, id_msg, timeout: 5)
188 @blather = blather
189 @sessions = {}
190 @id_msg = id_msg
191 @timeout = timeout
192 end
193
194 def promise_for(stanza)
195 id = "#{stanza.to.stripped}/#{stanza.public_send(@id_msg)}"
196 @sessions.fetch(id) do
197 @sessions[id] = EMPromise.new
198 EM.add_timer(@timeout) do
199 @sessions.delete(id)&.reject(:timeout)
200 end
201 @sessions[id]
202 end
203 end
204
205 def write(stanza)
206 promise = promise_for(stanza)
207 @blather << stanza
208 promise
209 end
210
211 def fulfill(stanza)
212 id = "#{stanza.from.stripped}/#{stanza.public_send(@id_msg)}"
213 if stanza.error?
214 @sessions.delete(id)&.reject(stanza)
215 else
216 @sessions.delete(id)&.fulfill(stanza)
217 end
218 end
219end
220
221IQ_MANAGER = SessionManager.new(self, :id)
222COMMAND_MANAGER = SessionManager.new(self, :sessionid, timeout: 60 * 60)
223web_register_manager = WebRegisterManager.new
224
225disco_info to: Blather::JID.new(CONFIG[:component][:jid]) do |iq|
226 reply = iq.reply
227 reply.identities = [{
228 name: "JMP.chat",
229 type: "sms",
230 category: "gateway"
231 }]
232 form = Blather::Stanza::X.find_or_create(reply.query)
233 form.type = "result"
234 form.fields = [
235 {
236 var: "FORM_TYPE",
237 type: "hidden",
238 value: "http://jabber.org/network/serverinfo"
239 }
240 ] + CONFIG[:xep0157]
241 self << reply
242end
243
244disco_items node: "http://jabber.org/protocol/commands" do |iq|
245 sentry_hub = new_sentry_hub(iq, name: iq.node)
246 reply = iq.reply
247
248 CommandList.for(iq.from.stripped).then { |list|
249 reply.items = list.map do |item|
250 Blather::Stanza::DiscoItems::Item.new(
251 iq.to,
252 item[:node],
253 item[:name]
254 )
255 end
256 self << reply
257 }.catch { |e| panic(e, sentry_hub) }
258end
259
260iq "/iq/ns:services", ns: "urn:xmpp:extdisco:2" do |iq|
261 reply = iq.reply
262 reply << Nokogiri::XML::Builder.new {
263 services(xmlns: "urn:xmpp:extdisco:2") do
264 service(
265 type: "sip",
266 host: CONFIG[:sip_host]
267 )
268 end
269 }.doc.root
270
271 self << reply
272end
273
274command :execute?, node: "jabber:iq:register", sessionid: nil do |iq|
275 sentry_hub = new_sentry_hub(iq, name: iq.node)
276 EMPromise.resolve(nil).then {
277 Customer.for_jid(iq.from.stripped)
278 }.catch {
279 sentry_hub.add_breadcrumb(Sentry::Breadcrumb.new(
280 message: "Customer.create"
281 ))
282 Customer.create(iq.from.stripped)
283 }.then { |customer|
284 sentry_hub.current_scope.set_user(
285 id: customer.customer_id,
286 jid: iq.from.stripped.to_s
287 )
288 sentry_hub.add_breadcrumb(Sentry::Breadcrumb.new(
289 message: "Registration.for"
290 ))
291 Registration.for(
292 iq,
293 customer,
294 web_register_manager
295 ).then(&:write)
296 }.catch_only(Blather::Stanza) { |reply|
297 self << reply
298 }.catch { |e| panic(e, sentry_hub) }
299end
300
301def reply_with_note(iq, text, type: :info)
302 reply = iq.reply
303 reply.status = :completed
304 reply.note_type = type
305 reply.note_text = text
306
307 self << reply
308end
309
310# Commands that just pass through to the SGX
311command node: [
312 "number-display",
313 "configure-calls",
314 "record-voicemail-greeting"
315] do |iq|
316 sentry_hub = new_sentry_hub(iq, name: iq.node)
317 Customer.for_jid(iq.from.stripped).then { |customer|
318 sentry_hub.current_scope.set_user(
319 id: customer.customer_id,
320 jid: iq.from.stripped.to_s
321 )
322
323 customer.stanza_from(iq)
324 }.catch { |e| panic(e, sentry_hub) }
325end
326
327command :execute?, node: "buy credit", sessionid: nil do |iq|
328 sentry_hub = new_sentry_hub(iq, name: iq.node)
329 reply = iq.reply
330 reply.allowed_actions = [:complete]
331
332 Customer.for_jid(iq.from.stripped).then { |customer|
333 BuyAccountCreditForm.for(customer).then do |credit_form|
334 credit_form.add_to_form(reply.form)
335 COMMAND_MANAGER.write(reply).then { |iq2| [customer, credit_form, iq2] }
336 end
337 }.then { |(customer, credit_form, iq2)|
338 iq = iq2 # This allows the catch to use it also
339 Transaction.sale(customer, **credit_form.parse(iq2.form))
340 }.then { |transaction|
341 transaction.insert.then { transaction.amount }
342 }.then { |amount|
343 reply_with_note(iq, "$#{'%.2f' % amount} added to your account balance.")
344 }.catch_only(BuyAccountCreditForm::AmountValidationError) { |e|
345 reply_with_note(iq, e.message, type: :error)
346 }.catch { |e|
347 sentry_hub.capture_exception(e)
348 text = "Failed to buy credit, system said: #{e.message}"
349 reply_with_note(iq, text, type: :error)
350 }.catch { |e| panic(e, sentry_hub) }
351end
352
353command :execute?, node: "reset sip account", sessionid: nil do |iq|
354 sentry_hub = new_sentry_hub(iq, name: iq.node)
355 Customer.for_jid(iq.from.stripped).then { |customer|
356 sentry_hub.current_scope.set_user(
357 id: customer.customer_id,
358 jid: iq.from.stripped.to_s
359 )
360 customer.reset_sip_account
361 }.then { |sip_account|
362 reply = iq.reply
363 reply.command << sip_account.form
364 BLATHER << reply
365 }.catch { |e| panic(e, sentry_hub) }
366end
367
368command :execute?, node: "usage", sessionid: nil do |iq|
369 sentry_hub = new_sentry_hub(iq, name: iq.node)
370 report_for = (Date.today..(Date.today << 1))
371
372 Customer.for_jid(iq.from.stripped).then { |customer|
373 sentry_hub.current_scope.set_user(
374 id: customer.customer_id,
375 jid: iq.from.stripped.to_s
376 )
377
378 customer.usage_report(report_for)
379 }.then { |usage_report|
380 reply = iq.reply
381 reply.status = :completed
382 reply.command << usage_report.form
383 BLATHER << reply
384 }.catch { |e| panic(e, sentry_hub) }
385end
386
387command :execute?, node: "web-register", sessionid: nil do |iq|
388 sentry_hub = new_sentry_hub(iq, name: iq.node)
389
390 begin
391 jid = iq.form.field("jid")&.value.to_s.strip
392 tel = iq.form.field("tel")&.value.to_s.strip
393 hub.current_scope.set_user(jid: jid, tel: tel)
394 if iq.from.stripped != CONFIG[:web_register][:from]
395 BLATHER << iq.as_error("forbidden", :auth)
396 elsif jid == "" || tel !~ /\A\+\d+\Z/
397 reply_with_note(iq, "Invalid JID or telephone number.", type: :error)
398 else
399 IQ_MANAGER.write(Blather::Stanza::Iq::Command.new.tap { |cmd|
400 cmd.to = CONFIG[:web_register][:to]
401 cmd.from = CONFIG[:component][:jid]
402 cmd.node = "push-register"
403 cmd.form.fields = [var: "to", value: jid]
404 cmd.form.type = "submit"
405 }).then { |result|
406 final_jid = result.form.field("from")&.value.to_s.strip
407 web_register_manager[final_jid] = tel
408 BLATHER << iq.reply.tap { |reply| reply.status = :completed }
409 }.catch { |e| panic(e, sentry_hub) }
410 end
411 rescue StandardError => e
412 sentry_hub.capture_exception(e)
413 end
414end
415
416command sessionid: /./ do |iq|
417 COMMAND_MANAGER.fulfill(iq)
418end
419
420iq type: [:result, :error] do |iq|
421 IQ_MANAGER.fulfill(iq)
422end
423
424iq type: [:get, :set] do |iq|
425 self << Blather::StanzaError.new(iq, "feature-not-implemented", :cancel)
426end