1# frozen_string_literal: true
2
3require "pg/em/connection_pool"
4require "bigdecimal"
5require "blather/client/dsl" # Require this first to not auto-include
6require "blather/client"
7require "braintree"
8require "date"
9require "dhall"
10require "em-hiredis"
11require "em_promise"
12require "ruby-bandwidth-iris"
13require "sentry-ruby"
14
# Initialise Sentry first: new_sentry_hub (defined below) raises when there is
# no current hub to derive a per-stanza hub from.
Sentry.init

# Application configuration, loaded through Dhall from the path given as the
# first command-line argument. Keys are converted to symbols, and Symbol and
# Proc values are permitted in addition to the JSON-like types.
CONFIG =
  Dhall::Coder
  .new(safe: Dhall::Coder::JSON_LIKE + [Symbol, Proc])
  .load(ARGV[0], transform_keys: ->(k) { k&.to_sym })

# Mix the Blather DSL into the top-level object's singleton class.
# NOTE(review): presumably done by hand because "blather/client/dsl" is
# required first to suppress the automatic include -- confirm.
singleton_class.class_eval do
  include Blather::DSL
  Blather::DSL.append_features(self)
end
26
27require_relative "lib/alt_top_up_form"
28require_relative "lib/add_bitcoin_address"
29require_relative "lib/backend_sgx"
30require_relative "lib/bandwidth_tn_order"
31require_relative "lib/btc_sell_prices"
32require_relative "lib/buy_account_credit_form"
33require_relative "lib/command_list"
34require_relative "lib/customer"
35require_relative "lib/electrum"
36require_relative "lib/em"
37require_relative "lib/payment_methods"
38require_relative "lib/registration"
39require_relative "lib/transaction"
40require_relative "lib/web_register_manager"
41
# Electrum client built from the :electrum section of the configuration.
ELECTRUM = Electrum.new(**CONFIG[:electrum])

# Make em_synchrony the default Faraday adapter.
Faraday.default_adapter = :em_synchrony
# Global credentials for the Bandwidth Iris client, taken from CONFIG[:creds].
BandwidthIris::Client.global_options = {
  account_id: CONFIG[:creds][:account],
  username: CONFIG[:creds][:username],
  password: CONFIG[:creds][:password]
}
50
# Build a Sentry hub dedicated to the handling of a single stanza.
#
# The hub is derived from the current hub, given a fresh scope with its
# breadcrumbs cleared, and tagged with the sender's bare JID.
#
# @param stanza [#from] stanza being handled; its sender identifies the user
# @param name [String, nil] transaction name to record on the scope, if any
# @return the newly derived hub
# @raise [RuntimeError] when there is no current Sentry hub
def new_sentry_hub(stanza, name: nil)
  derived = Sentry.get_current_hub&.new_from_top ||
            raise("Sentry.init has not been called")

  derived.tap do |hub|
    hub.push_scope
    hub.current_scope.clear_breadcrumbs
    hub.current_scope.set_transaction_name(name) if name
    hub.current_scope.set_user(jid: stanza.from.stripped.to_s)
  end
end
61
# Braintree is not async, so wrap in EM.defer for now
#
# Any method the underlying gateway responds to is run through
# EM.promise_defer and yields a PromiseChain, so further calls can be chained
# on the eventual result.
class AsyncBraintree
  # Only the four named credentials are used; extra keywords are ignored.
  def initialize(environment:, merchant_id:, public_key:, private_key:, **)
    @gateway = Braintree::Gateway.new(
      environment: environment,
      merchant_id: merchant_id,
      public_key: public_key,
      private_key: private_key
    )
  end

  # We respond to exactly what the wrapped gateway responds to.
  def respond_to_missing?(meth, *)
    @gateway.respond_to?(meth)
  end

  # Run the gateway call in a deferred block and hand back a PromiseChain.
  def method_missing(meth, *args)
    if respond_to_missing?(meth, *args)
      EM.promise_defer(klass: PromiseChain) {
        @gateway.public_send(meth, *args)
      }
    else
      super
    end
  end

  # Promise whose unknown methods are forwarded to the eventual value.
  class PromiseChain < EMPromise
    def respond_to_missing?(*)
      false # We don't actually know what we respond to...
    end

    # Queue the call so it is sent to whatever this promise resolves to.
    def method_missing(meth, *args)
      if respond_to_missing?(meth, *args)
        super
      else
        self.then { |resolved| resolved.public_send(meth, *args) }
      end
    end
  end
end
96
# Shared asynchronous Braintree client, configured from CONFIG[:braintree].
BRAINTREE = AsyncBraintree.new(**CONFIG[:braintree])
98
# Last-resort error handler: log the failure to stderr, report it to Sentry
# and terminate the process with status 1.
#
# @param e [Exception, Object] the error; non-exceptions are reported as a
#   plain message using their string form
# @param hub [#capture_exception, #capture_message, nil] Sentry hub to report
#   through; the global Sentry module is used when nil
def panic(e, hub=nil)
  description = e.respond_to?(:message) ? e.message : e
  warn "Error raised during event loop: #{e.class}: #{description}"
  warn e.backtrace if e.respond_to?(:backtrace)

  # background: false is passed on both paths, right before exiting.
  # NOTE(review): presumably so the report is sent synchronously -- confirm.
  reporter = hub || Sentry
  case e
  when ::Exception
    reporter.capture_exception(e, hint: { background: false })
  else
    reporter.capture_message(e.to_s, hint: { background: false })
  end

  exit 1
end
110
# Install panic as EventMachine's error handler.
EM.error_handler(&method(:panic))
112
# Once the connection is ready, create the shared service handles used by the
# handlers below (BLATHER, REDIS, BTC_SELL_PRICES, DB) and start a periodic
# ping.
when_ready do
  BLATHER = self
  REDIS = EM::Hiredis.connect
  BTC_SELL_PRICES = BTCSellPrices.new(REDIS, CONFIG[:oxr_app_id])
  DB = PG::EM::ConnectionPool.new(dbname: "jmp")
  # Use pg's basic type maps for both query results and query parameters.
  DB.type_map_for_results = PG::BasicTypeMapForResults.new(DB)
  DB.type_map_for_queries = PG::BasicTypeMapForQueries.new(DB)

  # Every 3600 seconds, send an XMPP ping from the component JID to the
  # server host. NOTE(review): presumably a keep-alive -- confirm.
  EM.add_periodic_timer(3600) do
    ping = Blather::Stanza::Iq::Ping.new(:get, CONFIG[:server][:host])
    ping.from = CONFIG[:component][:jid]
    self << ping
  end
end
127
# Configure the Blather client with the component JID and secret and the
# server host and port from CONFIG; the next two positional arguments are
# passed as nil.
# workqueue_count MUST be 0 or else Blather uses threads!
setup(
  CONFIG[:component][:jid],
  CONFIG[:component][:secret],
  CONFIG[:server][:host],
  CONFIG[:server][:port],
  nil,
  nil,
  workqueue_count: 0
)
138
# Messages addressed to the old account@ bot get a fixed deprecation notice
# sent back as a reply.
message to: /\Aaccount@/ do |m|
  notice = m.reply
  notice.body = "This bot is deprecated. Please talk to xmpp:cheogram.com"
  self << notice
end
144
# Before any other IQ handling: a result or error IQ addressed to a
# customer_* JID and sent by the SGX may be the answer to a request written
# through IQ_MANAGER. When IQ_MANAGER.fulfill returns a truthy value the
# stanza is halted; otherwise it continues to the later handlers.
#
# The SGX JID is escaped before being interpolated so that characters such as
# "." only match themselves; unescaped, a look-alike domain (any character in
# place of a dot) would satisfy the guard.
before(
  :iq,
  type: [:error, :result],
  to: /\Acustomer_/,
  from: /(\A|@)#{Regexp.escape(CONFIG[:sgx].to_s)}(\/|\Z)/
) { |iq| halt if IQ_MANAGER.fulfill(iq) }
151
# Any stanza the SGX sends to a customer_* JID is routed to that customer:
# the customer id is the node of the "to" JID without its "customer_" prefix,
# and the stanza is handed to customer.stanza_to. Processing of the stanza by
# later handlers is always halted.
#
# The SGX JID is escaped before being interpolated so that characters such as
# "." only match themselves; unescaped, a look-alike domain (any character in
# place of a dot) would satisfy the guard.
before(
  nil,
  to: /\Acustomer_/,
  from: /(\A|@)#{Regexp.escape(CONFIG[:sgx].to_s)}(\/|\Z)/
) do |s|
  sentry_hub = new_sentry_hub(s, name: "stanza_customer")
  Customer.for_customer_id(
    s.to.node.delete_prefix("customer_")
  ).then { |customer|
    sentry_hub.current_scope.set_user(
      id: customer.customer_id,
      jid: s.from.stripped.to_s
    )
    customer.stanza_to(s)
  }.catch { |e| panic(e, sentry_hub) }
  halt
end
165
# Ignore messages to component
# Especially if we have the component join MUC for notifications
#
# The component JID is escaped before being interpolated so that "." in the
# hostname only matches a literal dot.
message(to: /\A#{Regexp.escape(CONFIG[:component][:jid].to_s)}\Z/) {}
169
# Decide whether a message counts towards the sender's usage.
#
# A message is billable when it has a non-empty body or carries an element
# matching "ns:x" in the OOB namespace. A message without any body (nil) is
# treated like an empty one instead of raising.
#
# @param m [#body, #find] the message stanza
# @return [true, Object, nil, false] true for a non-empty body, otherwise the
#   first matching OOB element, or a falsy value when there is neither
def billable_message(m)
  body = m.body
  (body && !body.empty?) || m.find("ns:x", ns: OOB.registered_ns).first
end
173
# Catch-all handler for messages: look up the customer for the sender's bare
# JID, count the message towards their usage when it is billable, pass the
# stanza on via customer.stanza_from, and tell the admin when usage is high.
message do |m|
  sentry_hub = new_sentry_hub(m, name: "message")
  today = Time.now.utc.to_date
  Customer.for_jid(m.from.stripped).then { |customer|
    sentry_hub.current_scope.set_user(
      id: customer.customer_id,
      jid: m.from.stripped.to_s
    )
    EMPromise.all([
      customer,
      (customer.incr_message_usage if billable_message(m)),
      REDIS.exists("jmp_usage_notify-#{customer.customer_id}"),
      customer.stanza_from(m)
    ])
  }.then { |(customer, _, already, _)|
    # A notification marker already exists for this customer.
    next if already == 1

    # NOTE(review): the range is written newest-first (today down to 30 days
    # ago); confirm that is the order message_usage expects.
    customer.message_usage((today..(today - 30))).then do |usage|
      next unless usage > 500

      BLATHER.join(CONFIG[:notify_admin], "sgx-jmp")
      BLATHER.say(
        CONFIG[:notify_admin],
        "#{customer.customer_id} has used #{usage} messages since #{today - 30}"
      )
      # Remember the notification for one day. SETEX takes key, TTL in
      # seconds, then value; the previous SET call passed no value, so the
      # options hash was stored as the value and no expiry was ever applied.
      REDIS.setex(
        "jmp_usage_notify-#{customer.customer_id}",
        60 * 60 * 24,
        "1"
      )
    end
  }.catch { |e| panic(e, sentry_hub) }
end
203
# Print messages that are errors to standard output.
# NOTE(review): registered after the catch-all message handler above; confirm
# that dispatch order lets error messages reach this handler.
message :error? do |failed|
  puts "MESSAGE ERROR: #{failed.inspect}"
end
207
# Tracks stanzas we have sent and are awaiting an answer to, keyed by the
# peer's bare JID plus an identifying attribute of the stanza.
#
# Each pending session is an EMPromise that is fulfilled with the answering
# stanza, rejected with the answering stanza when it is an error, or rejected
# with :timeout when no answer arrives in time.
class SessionManager
  # @param blather [#<<] object the outgoing stanzas are written to
  # @param id_msg [Symbol] stanza method whose value identifies the session
  #   (e.g. :id or :sessionid)
  # @param timeout [Numeric] seconds to wait before rejecting with :timeout
  def initialize(blather, id_msg, timeout: 5)
    @blather = blather
    @sessions = {}
    @id_msg = id_msg
    @timeout = timeout
  end

  # Promise for the answer to an outgoing stanza; reuses the pending promise
  # when one already exists for the same key.
  #
  # @param stanza [#to] outgoing stanza
  # @return [EMPromise]
  def promise_for(stanza)
    id = "#{stanza.to.stripped}/#{stanza.public_send(@id_msg)}"
    @sessions.fetch(id) { start_session(id) }
  end

  # Register a session for the stanza, send it, and return the promise for
  # its answer.
  def write(stanza)
    promise = promise_for(stanza)
    @blather << stanza
    promise
  end

  # Settle the session matching an incoming stanza, if there is one.
  #
  # @param stanza [#from, #error?] incoming stanza
  # @return [Object, nil] nil when no session was waiting for this stanza
  def fulfill(stanza)
    id = "#{stanza.from.stripped}/#{stanza.public_send(@id_msg)}"
    if stanza.error?
      @sessions.delete(id)&.reject(stanza)
    else
      @sessions.delete(id)&.fulfill(stanza)
    end
  end

  private

  # Create and store a new pending session and arm its timeout.
  def start_session(id)
    promise = EMPromise.new
    @sessions[id] = promise
    EM.add_timer(@timeout) do
      # Only expire the session this timer was armed for. The same key can be
      # reused (e.g. successive steps of one command session), and a stale
      # timer must not reject the newer session stored under it.
      if @sessions[id].equal?(promise)
        @sessions.delete(id)
        promise.reject(:timeout)
      end
    end
    promise
  end
end
242
# Pending IQ requests, matched to their answers by stanza id (default
# 5 second timeout).
IQ_MANAGER = SessionManager.new(self, :id)
# Pending ad-hoc command steps, matched by sessionid, with a one hour timeout.
COMMAND_MANAGER = SessionManager.new(self, :sessionid, timeout: 60 * 60)
# Passed to Registration.for and written to by the "web-register" command.
web_register_manager = WebRegisterManager.new
246
# Answer disco#info queries addressed to the component itself with a gateway
# identity and a server-info data form built from CONFIG[:xep0157].
disco_info to: Blather::JID.new(CONFIG[:component][:jid]) do |iq|
  reply = iq.reply
  reply.identities = [{ name: "JMP.chat", type: "sms", category: "gateway" }]

  form_type = {
    var: "FORM_TYPE",
    type: "hidden",
    value: "http://jabber.org/network/serverinfo"
  }
  Blather::Stanza::X.find_or_create(reply.query).tap do |form|
    form.type = "result"
    form.fields = [form_type] + CONFIG[:xep0157]
  end

  self << reply
end
265
# List the ad-hoc commands available to the requester: one disco item per
# entry returned by CommandList.for for the sender's bare JID.
disco_items node: "http://jabber.org/protocol/commands" do |iq|
  sentry_hub = new_sentry_hub(iq, name: iq.node)
  reply = iq.reply

  CommandList.for(iq.from.stripped).then { |commands|
    reply.items = commands.map { |entry|
      Blather::Stanza::DiscoItems::Item.new(iq.to, entry[:node], entry[:name])
    }
    self << reply
  }.catch { |e| panic(e, sentry_hub) }
end
281
# Answer external service discovery (urn:xmpp:extdisco:2) requests with a
# single SIP service whose host is CONFIG[:sip_host].
iq "/iq/ns:services", ns: "urn:xmpp:extdisco:2" do |iq|
  services_doc = Nokogiri::XML::Builder.new {
    services(xmlns: "urn:xmpp:extdisco:2") do
      service(type: "sip", host: CONFIG[:sip_host])
    end
  }.doc

  reply = iq.reply
  reply << services_doc.root
  self << reply
end
295
# First step of the "jabber:iq:register" command (no session yet): find the
# customer for the sender's bare JID, creating one when the lookup is
# rejected, then build the Registration step for them and write it.
command :execute?, node: "jabber:iq:register", sessionid: nil do |iq|
  sentry_hub = new_sentry_hub(iq, name: iq.node)
  # Start from an already-resolved promise so the lookup runs inside the chain.
  # NOTE(review): presumably so that a synchronous raise from
  # Customer.for_jid also reaches the catch below -- confirm.
  EMPromise.resolve(nil).then {
    Customer.for_jid(iq.from.stripped)
  }.catch {
    # Any rejection of the lookup leads to creating a new customer.
    sentry_hub.add_breadcrumb(Sentry::Breadcrumb.new(
      message: "Customer.create"
    ))
    Customer.create(iq.from.stripped)
  }.then { |customer|
    sentry_hub.current_scope.set_user(
      id: customer.customer_id,
      jid: iq.from.stripped.to_s
    )
    sentry_hub.add_breadcrumb(Sentry::Breadcrumb.new(
      message: "Registration.for"
    ))
    Registration.for(
      iq,
      customer,
      web_register_manager
    ).then(&:write)
  }.catch_only(Blather::Stanza) { |reply|
    # A rejection that is itself a stanza is sent out as the reply.
    self << reply
  }.catch { |e| panic(e, sentry_hub) }
end
322
# Finish a command by replying with a completed status and a single note.
#
# @param iq [#reply] the command stanza being answered
# @param text [String] note text
# @param type [Symbol] note type, :info by default
# @return the result of writing the reply to the stream
def reply_with_note(iq, text, type: :info)
  self << iq.reply.tap { |note_reply|
    note_reply.status = :completed
    note_reply.note_type = type
    note_reply.note_text = text
  }
end
331
# Commands that just pass through to the SGX
#
# The customer is looked up from the sender's bare JID and the command stanza
# is handed to customer.stanza_from unchanged.
command node: [
  "number-display",
  "configure-calls",
  "record-voicemail-greeting"
] do |iq|
  sentry_hub = new_sentry_hub(iq, name: iq.node)

  lookup = Customer.for_jid(iq.from.stripped)
  forwarded = lookup.then { |customer|
    sentry_hub.current_scope.set_user(
      id: customer.customer_id,
      jid: iq.from.stripped.to_s
    )

    customer.stanza_from(iq)
  }
  forwarded.catch { |e| panic(e, sentry_hub) }
end
348
# "credit cards" command: reply with a completed command carrying an
# out-of-band link (and the same link as a note) to the page built by
# CONFIG[:credit_card_url] for this JID and customer id.
command :execute?, node: "credit cards", sessionid: nil do |iq|
  sentry_hub = new_sentry_hub(iq, name: iq.node)
  reply = iq.reply
  reply.status = :completed

  Customer.for_jid(iq.from.stripped).then { |customer|
    # Tag the Sentry scope with the customer id, as the other commands do.
    sentry_hub.current_scope.set_user(
      id: customer.customer_id,
      jid: iq.from.stripped.to_s
    )

    oob = OOB.find_or_create(reply.command)
    # Backslashes in the JID are percent-encoded before building the URL.
    oob.url = CONFIG[:credit_card_url].call(
      reply.to.stripped.to_s.gsub("\\", "%5C"),
      customer.customer_id
    )
    oob.desc = "Manage credit cards and settings"

    reply.note_type = :info
    reply.note_text = "#{oob.desc}: #{oob.url}"

    self << reply
  }.catch { |e| panic(e, sentry_hub) }
end
368
# "top up" command: send the customer the buy-credit form, wait for their
# submission, run the sale, record the transaction and confirm with a note.
# Validation failures and other errors are reported back to the user as an
# error note.
command :execute?, node: "top up", sessionid: nil do |iq|
  sentry_hub = new_sentry_hub(iq, name: iq.node)
  reply = iq.reply
  reply.allowed_actions = [:complete]

  Customer.for_jid(iq.from.stripped).then { |customer|
    # Tag the Sentry scope with the customer id, as the other commands do.
    sentry_hub.current_scope.set_user(
      id: customer.customer_id,
      jid: iq.from.stripped.to_s
    )

    BuyAccountCreditForm.for(customer).then do |credit_form|
      credit_form.add_to_form(reply.form)
      COMMAND_MANAGER.write(reply).then { |iq2| [customer, credit_form, iq2] }
    end
  }.then { |(customer, credit_form, iq2)|
    iq = iq2 # This allows the catch to use it also
    Transaction.sale(customer, **credit_form.parse(iq2.form))
  }.then { |transaction|
    transaction.insert.then do
      reply_with_note(iq, "#{transaction} added to your account balance.")
    end
  }.catch_only(BuyAccountCreditForm::AmountValidationError) { |e|
    reply_with_note(iq, e.message, type: :error)
  }.catch { |e|
    # A rejection is not always an exception: SessionManager rejects with
    # :timeout or with the error stanza itself. Report and describe those
    # without calling exception-only methods, so they end in an error note
    # rather than a NoMethodError that falls through to panic.
    if e.is_a?(::Exception)
      sentry_hub.capture_exception(e)
    else
      sentry_hub.capture_message(e.to_s)
    end
    detail = e.respond_to?(:message) ? e.message : e
    text = "Failed to buy credit, system said: #{detail}"
    reply_with_note(iq, text, type: :error)
  }.catch { |e| panic(e, sentry_hub) }
end
394
# "alt top up" command: show the customer the alternative top-up form and,
# once they answer, hand their submission to AddBitcoinAddress.
command :execute?, node: "alt top up", sessionid: nil do |iq|
  sentry_hub = new_sentry_hub(iq, name: iq.node)
  reply = iq.reply
  reply.status = :executing
  reply.allowed_actions = [:complete]

  Customer.for_jid(iq.from.stripped).then { |customer|
    sentry_hub.current_scope.set_user(
      id: customer.customer_id,
      jid: iq.from.stripped.to_s
    )

    # Resolve the form while carrying the customer along to the next step.
    EMPromise.all([AltTopUpForm.for(customer), customer])
  }.then { |(top_up_form, customer)|
    reply.command << top_up_form.form

    awaiting_answer = COMMAND_MANAGER.write(reply)
    awaiting_answer.then { |submission|
      AddBitcoinAddress.for(submission, top_up_form, customer).write
    }
  }.catch { |e| panic(e, sentry_hub) }
end
416
# "reset sip account" command: reset the customer's SIP account and reply
# with the form of the resulting account.
command :execute?, node: "reset sip account", sessionid: nil do |iq|
  sentry_hub = new_sentry_hub(iq, name: iq.node)

  Customer.for_jid(iq.from.stripped).then { |customer|
    sentry_hub.current_scope.set_user(
      id: customer.customer_id,
      jid: iq.from.stripped.to_s
    )
    customer.reset_sip_account
  }.then { |sip_account|
    BLATHER << iq.reply.tap { |reply| reply.command << sip_account.form }
  }.catch { |e| panic(e, sentry_hub) }
end
431
# "usage" command: reply with the customer's usage report form for the range
# from today back to one month ago.
command :execute?, node: "usage", sessionid: nil do |iq|
  sentry_hub = new_sentry_hub(iq, name: iq.node)
  # Written newest-first: today down to the same day one month earlier.
  report_for = (Date.today..(Date.today << 1))

  Customer.for_jid(iq.from.stripped).then { |customer|
    sentry_hub.current_scope.set_user(
      id: customer.customer_id,
      jid: iq.from.stripped.to_s
    )

    customer.usage_report(report_for)
  }.then { |usage_report|
    BLATHER << iq.reply.tap { |reply|
      reply.status = :completed
      reply.command << usage_report.form
    }
  }.catch { |e| panic(e, sentry_hub) }
end
450
# "web-register" command: lets the configured web registration sender
# associate a telephone number with a JID. The request is refused unless it
# comes from CONFIG[:web_register][:from], and rejected with an error note
# when the JID is blank or the number is not "+" followed by digits.
# Otherwise a "push-register" command is sent to CONFIG[:web_register][:to],
# and the JID it answers with is recorded against the number.
command :execute?, node: "web-register", sessionid: nil do |iq|
  sentry_hub = new_sentry_hub(iq, name: iq.node)

  begin
    jid = iq.form.field("jid")&.value.to_s.strip
    tel = iq.form.field("tel")&.value.to_s.strip
    # Must use sentry_hub: there is no `hub` local here, and the resulting
    # NameError was swallowed by the rescue below, so nothing after this
    # line ever ran.
    sentry_hub.current_scope.set_user(jid: jid, tel: tel)
    if iq.from.stripped != CONFIG[:web_register][:from]
      BLATHER << iq.as_error("forbidden", :auth)
    elsif jid == "" || tel !~ /\A\+\d+\Z/
      reply_with_note(iq, "Invalid JID or telephone number.", type: :error)
    else
      IQ_MANAGER.write(Blather::Stanza::Iq::Command.new.tap { |cmd|
        cmd.to = CONFIG[:web_register][:to]
        cmd.from = CONFIG[:component][:jid]
        cmd.node = "push-register"
        cmd.form.fields = [var: "to", value: jid]
        cmd.form.type = "submit"
      }).then { |result|
        final_jid = result.form.field("from")&.value.to_s.strip
        web_register_manager[final_jid] = tel
        BLATHER << iq.reply.tap { |reply| reply.status = :completed }
      }.catch { |e| panic(e, sentry_hub) }
    end
  rescue StandardError => e
    # Synchronous failures are reported to Sentry instead of being raised.
    sentry_hub.capture_exception(e)
  end
end
479
# Any command stanza that carries a sessionid is treated as the answer to a
# step written through COMMAND_MANAGER.
command(sessionid: /./) { |step| COMMAND_MANAGER.fulfill(step) }
483
# Result and error IQs are treated as answers to requests written through
# IQ_MANAGER.
iq(type: [:result, :error]) { |answer| IQ_MANAGER.fulfill(answer) }
487
# Get/set IQs that reach this handler are answered with a
# feature-not-implemented error of type cancel.
iq type: [:get, :set] do |request|
  not_implemented =
    Blather::StanzaError.new(request, "feature-not-implemented", :cancel)
  self << not_implemented
end