sgx_jmp.rb

  1# frozen_string_literal: true
  2
  3require "pg/em"
  4require "bigdecimal"
  5require "blather/client/dsl" # Require this first to not auto-include
  6require "blather/client"
  7require "braintree"
  8require "date"
  9require "dhall"
 10require "em-hiredis"
 11require "em_promise"
 12require "ruby-bandwidth-iris"
 13require "sentry-ruby"
 14
 15Sentry.init
 16
 17CONFIG =
 18	Dhall::Coder
 19	.new(safe: Dhall::Coder::JSON_LIKE + [Symbol, Proc])
 20	.load(ARGV[0], transform_keys: ->(k) { k&.to_sym })
 21
 22singleton_class.class_eval do
 23	include Blather::DSL
 24	Blather::DSL.append_features(self)
 25end
 26
 27require_relative "lib/backend_sgx"
 28require_relative "lib/bandwidth_tn_order"
 29require_relative "lib/btc_sell_prices"
 30require_relative "lib/buy_account_credit_form"
 31require_relative "lib/command_list"
 32require_relative "lib/customer"
 33require_relative "lib/electrum"
 34require_relative "lib/em"
 35require_relative "lib/payment_methods"
 36require_relative "lib/registration"
 37require_relative "lib/transaction"
 38require_relative "lib/web_register_manager"
 39
 40ELECTRUM = Electrum.new(**CONFIG[:electrum])
 41
 42Faraday.default_adapter = :em_synchrony
 43BandwidthIris::Client.global_options = {
 44	account_id: CONFIG[:creds][:account],
 45	username: CONFIG[:creds][:username],
 46	password: CONFIG[:creds][:password]
 47}
 48
 49def new_sentry_hub(stanza, name: nil)
 50	hub = Sentry.get_current_hub&.new_from_top
 51	raise "Sentry.init has not been called" unless hub
 52
 53	hub.push_scope
 54	hub.current_scope.clear_breadcrumbs
 55	hub.current_scope.set_transaction_name(name) if name
 56	hub.current_scope.set_user(jid: stanza.from.stripped.to_s)
 57	hub
 58end
 59
 60# Braintree is not async, so wrap in EM.defer for now
 61class AsyncBraintree
 62	def initialize(environment:, merchant_id:, public_key:, private_key:, **)
 63		@gateway = Braintree::Gateway.new(
 64			environment: environment,
 65			merchant_id: merchant_id,
 66			public_key: public_key,
 67			private_key: private_key
 68		)
 69	end
 70
 71	def respond_to_missing?(m, *)
 72		@gateway.respond_to?(m)
 73	end
 74
 75	def method_missing(m, *args)
 76		return super unless respond_to_missing?(m, *args)
 77
 78		EM.promise_defer(klass: PromiseChain) do
 79			@gateway.public_send(m, *args)
 80		end
 81	end
 82
 83	class PromiseChain < EMPromise
 84		def respond_to_missing?(*)
 85			false # We don't actually know what we respond to...
 86		end
 87
 88		def method_missing(m, *args)
 89			return super if respond_to_missing?(m, *args)
 90			self.then { |o| o.public_send(m, *args) }
 91		end
 92	end
 93end
 94
 95BRAINTREE = AsyncBraintree.new(**CONFIG[:braintree])
 96
 97def panic(e, hub=nil)
 98	m = e.respond_to?(:message) ? e.message : e
 99	warn "Error raised during event loop: #{e.class}: #{m}"
100	warn e.backtrace if e.respond_to?(:backtrace)
101	if e.is_a?(::Exception)
102		(hub || Sentry).capture_exception(e, hint: { background: false })
103	else
104		(hub || Sentry).capture_message(e.to_s, hint: { background: false })
105	end
106	exit 1
107end
108
109EM.error_handler(&method(:panic))
110
111when_ready do
112	BLATHER = self
113	REDIS = EM::Hiredis.connect
114	BTC_SELL_PRICES = BTCSellPrices.new(REDIS, CONFIG[:oxr_app_id])
115	DB = PG::EM::Client.new(dbname: "jmp")
116	DB.type_map_for_results = PG::BasicTypeMapForResults.new(DB)
117	DB.type_map_for_queries = PG::BasicTypeMapForQueries.new(DB)
118
119	EM.add_periodic_timer(3600) do
120		ping = Blather::Stanza::Iq::Ping.new(:get, CONFIG[:server][:host])
121		ping.from = CONFIG[:component][:jid]
122		self << ping
123	end
124end
125
126# workqueue_count MUST be 0 or else Blather uses threads!
127setup(
128	CONFIG[:component][:jid],
129	CONFIG[:component][:secret],
130	CONFIG[:server][:host],
131	CONFIG[:server][:port],
132	nil,
133	nil,
134	workqueue_count: 0
135)
136
137message to: /\Aaccount@/ do |m|
138	self << m.reply.tap do |out|
139		out.body = "This bot is deprecated. Please talk to xmpp:cheogram.com"
140	end
141end
142
143before nil, to: /\Acustomer_/, from: /(\A|@)#{CONFIG[:sgx]}(\/|\Z)/ do |s|
144	sentry_hub = new_sentry_hub(s, name: "stanza_customer")
145	Customer.for_customer_id(
146		s.to.node.delete_prefix("customer_")
147	).then { |customer|
148		sentry_hub.current_scope.set_user(
149			id: customer.customer_id,
150			jid: s.from.stripped.to_s
151		)
152		customer.stanza_to(s)
153	}.catch { |e| panic(e, sentry_hub) }
154	halt
155end
156
157message do |m|
158	sentry_hub = new_sentry_hub(m, name: "message")
159	Customer.for_jid(m.from.stripped).then { |customer|
160		sentry_hub.current_scope.set_user(
161			id: customer.customer_id,
162			jid: m.from.stripped.to_s
163		)
164		today = Time.now.utc.to_date
165		EMPromise.all([
166			REDIS.zremrangebylex(
167				"jmp_customer_outbound_messages-#{customer.customer_id}",
168				"-",
169				# Store message counts per day for 1 year
170				"[#{(today << 12).strftime('%Y%m%d')}"
171			),
172			REDIS.zincrby(
173				"jmp_customer_outbound_messages-#{customer.customer_id}",
174				1,
175				today.strftime("%Y%m%d")
176			),
177			customer.stanza_from(m)
178		])
179	}.catch { |e| panic(e, sentry_hub) }
180end
181
182message :error? do |m|
183	puts "MESSAGE ERROR: #{m.inspect}"
184end
185
186class SessionManager
187	def initialize(blather, id_msg, timeout: 5)
188		@blather = blather
189		@sessions = {}
190		@id_msg = id_msg
191		@timeout = timeout
192	end
193
194	def promise_for(stanza)
195		id = "#{stanza.to.stripped}/#{stanza.public_send(@id_msg)}"
196		@sessions.fetch(id) do
197			@sessions[id] = EMPromise.new
198			EM.add_timer(@timeout) do
199				@sessions.delete(id)&.reject(:timeout)
200			end
201			@sessions[id]
202		end
203	end
204
205	def write(stanza)
206		promise = promise_for(stanza)
207		@blather << stanza
208		promise
209	end
210
211	def fulfill(stanza)
212		id = "#{stanza.from.stripped}/#{stanza.public_send(@id_msg)}"
213		if stanza.error?
214			@sessions.delete(id)&.reject(stanza)
215		else
216			@sessions.delete(id)&.fulfill(stanza)
217		end
218	end
219end
220
221IQ_MANAGER = SessionManager.new(self, :id)
222COMMAND_MANAGER = SessionManager.new(self, :sessionid, timeout: 60 * 60)
223web_register_manager = WebRegisterManager.new
224
225disco_info to: Blather::JID.new(CONFIG[:component][:jid]) do |iq|
226	reply = iq.reply
227	reply.identities = [{
228		name: "JMP.chat",
229		type: "sms",
230		category: "gateway"
231	}]
232	form = Blather::Stanza::X.find_or_create(reply.query)
233	form.type = "result"
234	form.fields = [
235		{
236			var: "FORM_TYPE",
237			type: "hidden",
238			value: "http://jabber.org/network/serverinfo"
239		}
240	] + CONFIG[:xep0157]
241	self << reply
242end
243
244disco_items node: "http://jabber.org/protocol/commands" do |iq|
245	sentry_hub = new_sentry_hub(iq, name: iq.node)
246	reply = iq.reply
247
248	CommandList.for(iq.from.stripped).then { |list|
249		reply.items = list.map do |item|
250			Blather::Stanza::DiscoItems::Item.new(
251				iq.to,
252				item[:node],
253				item[:name]
254			)
255		end
256		self << reply
257	}.catch { |e| panic(e, sentry_hub) }
258end
259
260iq "/iq/ns:services", ns: "urn:xmpp:extdisco:2" do |iq|
261	reply = iq.reply
262	reply << Nokogiri::XML::Builder.new {
263		services(xmlns: "urn:xmpp:extdisco:2") do
264			service(
265				type: "sip",
266				host: CONFIG[:sip_host]
267			)
268		end
269	}.doc.root
270
271	self << reply
272end
273
274command :execute?, node: "jabber:iq:register", sessionid: nil do |iq|
275	sentry_hub = new_sentry_hub(iq, name: iq.node)
276	EMPromise.resolve(nil).then {
277		Customer.for_jid(iq.from.stripped)
278	}.catch {
279		sentry_hub.add_breadcrumb(Sentry::Breadcrumb.new(
280			message: "Customer.create"
281		))
282		Customer.create(iq.from.stripped)
283	}.then { |customer|
284		sentry_hub.current_scope.set_user(
285			id: customer.customer_id,
286			jid: iq.from.stripped.to_s
287		)
288		sentry_hub.add_breadcrumb(Sentry::Breadcrumb.new(
289			message: "Registration.for"
290		))
291		Registration.for(
292			iq,
293			customer,
294			web_register_manager
295		).then(&:write)
296	}.catch_only(Blather::Stanza) { |reply|
297		self << reply
298	}.catch { |e| panic(e, sentry_hub) }
299end
300
301def reply_with_note(iq, text, type: :info)
302	reply = iq.reply
303	reply.status = :completed
304	reply.note_type = type
305	reply.note_text = text
306
307	self << reply
308end
309
310# Commands that just pass through to the SGX
311command node: [
312	"number-display",
313	"configure-calls",
314	"record-voicemail-greeting"
315] do |iq|
316	sentry_hub = new_sentry_hub(iq, name: iq.node)
317	Customer.for_jid(iq.from.stripped).then { |customer|
318		sentry_hub.current_scope.set_user(
319			id: customer.customer_id,
320			jid: iq.from.stripped.to_s
321		)
322
323		customer.stanza_from(iq)
324	}.catch { |e| panic(e, sentry_hub) }
325end
326
327command :execute?, node: "buy credit", sessionid: nil do |iq|
328	sentry_hub = new_sentry_hub(iq, name: iq.node)
329	reply = iq.reply
330	reply.allowed_actions = [:complete]
331
332	Customer.for_jid(iq.from.stripped).then { |customer|
333		BuyAccountCreditForm.for(customer).then do |credit_form|
334			credit_form.add_to_form(reply.form)
335			COMMAND_MANAGER.write(reply).then { |iq2| [customer, credit_form, iq2] }
336		end
337	}.then { |(customer, credit_form, iq2)|
338		iq = iq2 # This allows the catch to use it also
339		Transaction.sale(customer, **credit_form.parse(iq2.form))
340	}.then { |transaction|
341		transaction.insert.then { transaction.amount }
342	}.then { |amount|
343		reply_with_note(iq, "$#{'%.2f' % amount} added to your account balance.")
344	}.catch_only(BuyAccountCreditForm::AmountValidationError) { |e|
345		reply_with_note(iq, e.message, type: :error)
346	}.catch { |e|
347		sentry_hub.capture_exception(e)
348		text = "Failed to buy credit, system said: #{e.message}"
349		reply_with_note(iq, text, type: :error)
350	}.catch { |e| panic(e, sentry_hub) }
351end
352
353command :execute?, node: "reset sip account", sessionid: nil do |iq|
354	sentry_hub = new_sentry_hub(iq, name: iq.node)
355	Customer.for_jid(iq.from.stripped).then { |customer|
356		sentry_hub.current_scope.set_user(
357			id: customer.customer_id,
358			jid: iq.from.stripped.to_s
359		)
360		customer.reset_sip_account
361	}.then { |sip_account|
362		reply = iq.reply
363		reply.command << sip_account.form
364		BLATHER << reply
365	}.catch { |e| panic(e, sentry_hub) }
366end
367
368command :execute?, node: "usage", sessionid: nil do |iq|
369	sentry_hub = new_sentry_hub(iq, name: iq.node)
370	report_for = (Date.today..(Date.today << 1))
371
372	Customer.for_jid(iq.from.stripped).then { |customer|
373		sentry_hub.current_scope.set_user(
374			id: customer.customer_id,
375			jid: iq.from.stripped.to_s
376		)
377
378		customer.usage_report(report_for)
379	}.then { |usage_report|
380		reply = iq.reply
381		reply.status = :completed
382		reply.command << usage_report.form
383		BLATHER << reply
384	}.catch { |e| panic(e, sentry_hub) }
385end
386
387command :execute?, node: "web-register", sessionid: nil do |iq|
388	sentry_hub = new_sentry_hub(iq, name: iq.node)
389
390	begin
391		jid = iq.form.field("jid")&.value.to_s.strip
392		tel = iq.form.field("tel")&.value.to_s.strip
393		hub.current_scope.set_user(jid: jid, tel: tel)
394		if iq.from.stripped != CONFIG[:web_register][:from]
395			BLATHER << iq.as_error("forbidden", :auth)
396		elsif jid == "" || tel !~ /\A\+\d+\Z/
397			reply_with_note(iq, "Invalid JID or telephone number.", type: :error)
398		else
399			IQ_MANAGER.write(Blather::Stanza::Iq::Command.new.tap { |cmd|
400				cmd.to = CONFIG[:web_register][:to]
401				cmd.from = CONFIG[:component][:jid]
402				cmd.node = "push-register"
403				cmd.form.fields = [var: "to", value: jid]
404				cmd.form.type = "submit"
405			}).then { |result|
406				final_jid = result.form.field("from")&.value.to_s.strip
407				web_register_manager[final_jid] = tel
408				BLATHER << iq.reply.tap { |reply| reply.status = :completed }
409			}.catch { |e| panic(e, sentry_hub) }
410		end
411	rescue StandardError => e
412		sentry_hub.capture_exception(e)
413	end
414end
415
416command sessionid: /./ do |iq|
417	COMMAND_MANAGER.fulfill(iq)
418end
419
420iq type: [:result, :error] do |iq|
421	IQ_MANAGER.fulfill(iq)
422end
423
424iq type: [:get, :set] do |iq|
425	self << Blather::StanzaError.new(iq, "feature-not-implemented", :cancel)
426end