sgx_jmp.rb

  1# frozen_string_literal: true
  2
  3require "pg/em"
  4require "bigdecimal"
  5require "blather/client/dsl" # Require this first to not auto-include
  6require "blather/client"
  7require "braintree"
  8require "date"
  9require "dhall"
 10require "em-hiredis"
 11require "em_promise"
 12require "ruby-bandwidth-iris"
 13require "sentry-ruby"
 14
 15Sentry.init
 16
 17CONFIG =
 18	Dhall::Coder
 19	.new(safe: Dhall::Coder::JSON_LIKE + [Symbol, Proc])
 20	.load(ARGV[0], transform_keys: ->(k) { k&.to_sym })
 21
 22singleton_class.class_eval do
 23	include Blather::DSL
 24	Blather::DSL.append_features(self)
 25end
 26
 27require_relative "lib/backend_sgx"
 28require_relative "lib/bandwidth_tn_order"
 29require_relative "lib/btc_sell_prices"
 30require_relative "lib/buy_account_credit_form"
 31require_relative "lib/command_list"
 32require_relative "lib/customer"
 33require_relative "lib/electrum"
 34require_relative "lib/em"
 35require_relative "lib/payment_methods"
 36require_relative "lib/registration"
 37require_relative "lib/transaction"
 38require_relative "lib/web_register_manager"
 39
 40ELECTRUM = Electrum.new(**CONFIG[:electrum])
 41
 42Faraday.default_adapter = :em_synchrony
 43BandwidthIris::Client.global_options = {
 44	account_id: CONFIG[:creds][:account],
 45	username: CONFIG[:creds][:username],
 46	password: CONFIG[:creds][:password]
 47}
 48
 49def new_sentry_hub(stanza, name: nil)
 50	hub = Sentry.get_current_hub&.new_from_top
 51	raise "Sentry.init has not been called" unless hub
 52
 53	hub.push_scope
 54	hub.current_scope.clear_breadcrumbs
 55	hub.current_scope.set_transaction_name(name) if name
 56	hub.current_scope.set_user(jid: stanza.from.stripped.to_s)
 57	hub
 58end
 59
 60# Braintree is not async, so wrap in EM.defer for now
 61class AsyncBraintree
 62	def initialize(environment:, merchant_id:, public_key:, private_key:, **)
 63		@gateway = Braintree::Gateway.new(
 64			environment: environment,
 65			merchant_id: merchant_id,
 66			public_key: public_key,
 67			private_key: private_key
 68		)
 69	end
 70
 71	def respond_to_missing?(m, *)
 72		@gateway.respond_to?(m)
 73	end
 74
 75	def method_missing(m, *args)
 76		return super unless respond_to_missing?(m, *args)
 77
 78		EM.promise_defer(klass: PromiseChain) do
 79			@gateway.public_send(m, *args)
 80		end
 81	end
 82
 83	class PromiseChain < EMPromise
 84		def respond_to_missing?(*)
 85			false # We don't actually know what we respond to...
 86		end
 87
 88		def method_missing(m, *args)
 89			return super if respond_to_missing?(m, *args)
 90			self.then { |o| o.public_send(m, *args) }
 91		end
 92	end
 93end
 94
 95BRAINTREE = AsyncBraintree.new(**CONFIG[:braintree])
 96
 97def panic(e, hub=nil)
 98	m = e.respond_to?(:message) ? e.message : e
 99	warn "Error raised during event loop: #{e.class}: #{m}"
100	warn e.backtrace if e.respond_to?(:backtrace)
101	if e.is_a?(::Exception)
102		(hub || Sentry).capture_exception(e, hint: { background: false })
103	else
104		(hub || Sentry).capture_message(e.to_s, hint: { background: false })
105	end
106	exit 1
107end
108
# Anything raised inside the event loop is handed to panic.
EM.error_handler { |e| panic(e) }
110
# Once the stream is ready, publish the shared handles as constants and
# start a ping to the server every hour.
when_ready do
	BLATHER = self
	REDIS = EM::Hiredis.connect
	BTC_SELL_PRICES = BTCSellPrices.new(REDIS, CONFIG[:oxr_app_id])
	DB = PG::EM::Client.new(dbname: "jmp").tap do |db|
		db.type_map_for_results = PG::BasicTypeMapForResults.new(db)
		db.type_map_for_queries = PG::BasicTypeMapForQueries.new(db)
	end

	EM.add_periodic_timer(60 * 60) do
		self << Blather::Stanza::Iq::Ping.new(
			:get,
			CONFIG[:server][:host]
		).tap { |ping| ping.from = CONFIG[:component][:jid] }
	end
end
125
# workqueue_count MUST be 0 or else Blather uses threads!
# The component JID/secret and server host/port come from the config; the
# two nil positional arguments are left unset (presumably Blather's certs
# and connect timeout -- confirm against Blather's setup signature).
setup(
	*CONFIG[:component].values_at(:jid, :secret),
	*CONFIG[:server].values_at(:host, :port),
	nil,
	nil,
	workqueue_count: 0
)
136
# Messages addressed to account@... get a fixed deprecation notice back.
message to: /\Aaccount@/ do |m|
	notice = m.reply
	notice.body = "This bot is deprecated. Please talk to xmpp:cheogram.com"
	self << notice
end
142
# Stanzas sent by the SGX to a customer_<id> address are delivered to that
# customer, and handler processing stops there.
#
# The configured SGX name is escaped before being embedded in the `from`
# guard: unescaped, every "." in the domain would match any character and a
# look-alike domain could pass this origin check.
before(
	nil,
	to: /\Acustomer_/,
	from: /(\A|@)#{Regexp.escape(CONFIG[:sgx].to_s)}(\/|\Z)/
) do |s|
	sentry_hub = new_sentry_hub(s, name: "stanza_customer")
	Customer.for_customer_id(
		s.to.node.delete_prefix("customer_")
	).then { |customer|
		sentry_hub.current_scope.set_user(
			id: customer.customer_id,
			jid: s.from.stripped.to_s
		)
		customer.stanza_to(s)
	}.catch { |e| panic(e, sentry_hub) }
	# Do not let the later handlers see this stanza.
	halt
end
156
# Message from a customer: bump today's outbound message count in Redis,
# drop count entries older than a year, and pass the stanza on through the
# customer.
message do |m|
	sentry_hub = new_sentry_hub(m, name: "message")
	Customer.for_jid(m.from.stripped).then { |customer|
		sentry_hub.current_scope.set_user(
			id: customer.customer_id,
			jid: m.from.stripped.to_s
		)
		counter_key = "jmp_customer_outbound_messages-#{customer.customer_id}"
		today = Time.now.utc.to_date
		# Store message counts per day for 1 year
		cutoff = (today << 12).strftime("%Y%m%d")
		EMPromise.all([
			REDIS.zremrangebylex(counter_key, "-", "[#{cutoff}"),
			REDIS.zincrby(counter_key, 1, today.strftime("%Y%m%d")),
			customer.stanza_from(m)
		])
	}.catch { |e| panic(e, sentry_hub) }
end
181
# Log error-type messages to stdout.
# NOTE(review): this is registered after the catch-all `message` handler;
# confirm Blather's handler ordering still lets it run.
message :error? do |m|
	$stdout.puts(format("MESSAGE ERROR: %s", m.inspect))
end
185
# Matches outgoing stanzas with their eventual responses.
#
# A session is keyed by the peer's bare JID plus an identifying attribute of
# the stanza (whatever method name was given as id_msg). Each session is an
# EMPromise that is fulfilled by a response, rejected with an error stanza,
# or rejected with :timeout when nothing arrives in time.
class SessionManager
	# @param blather object that stanzas are written to with <<
	# @param id_msg name of the stanza method that yields the session id
	# @param timeout seconds to wait before rejecting with :timeout
	def initialize(blather, id_msg, timeout: 5)
		@blather = blather
		@sessions = {}
		@id_msg = id_msg
		@timeout = timeout
	end

	# Returns the pending promise for the session this outgoing stanza
	# belongs to, starting a new session (and its timeout) if none exists.
	def promise_for(stanza)
		id = session_id(stanza.to, stanza)
		@sessions.fetch(id) { start_session(id) }
	end

	# Sends the stanza and returns the promise for its response.
	def write(stanza)
		promise = promise_for(stanza)
		@blather << stanza
		promise
	end

	# Settles the session an incoming stanza answers: error stanzas reject
	# the promise, anything else fulfills it. Stanzas that match no pending
	# session are ignored (nil is returned).
	def fulfill(stanza)
		promise = @sessions.delete(session_id(stanza.from, stanza))
		return unless promise

		stanza.error? ? promise.reject(stanza) : promise.fulfill(stanza)
	end

private

	# Session key: bare JID of the peer, a slash, then the stanza's id.
	def session_id(peer, stanza)
		"#{peer.stripped}/#{stanza.public_send(@id_msg)}"
	end

	# Registers a new pending promise under id and arms its timeout.
	def start_session(id)
		promise = @sessions[id] = EMPromise.new
		EM.add_timer(@timeout) do
			# Only expire the session this timer was armed for. If it was
			# already settled and the id has been reused since, the newer
			# session has its own timer and must be left alone.
			if @sessions[id].equal?(promise)
				@sessions.delete(id)
				promise.reject(:timeout)
			end
		end
		promise
	end
end
220
# IQ responses are matched on the stanza id (default timeout).
IQ_MANAGER = SessionManager.new(self, :id)
# Command sessions are matched on the sessionid and kept for one hour.
COMMAND_MANAGER = SessionManager.new(self, :sessionid, timeout: 3600)
# Shared with the register and web-register command handlers below.
web_register_manager = WebRegisterManager.new
224
# Service discovery info for the component itself: one gateway/sms identity
# plus a serverinfo result form whose fields come from CONFIG[:xep0157].
disco_info to: Blather::JID.new(CONFIG[:component][:jid]) do |iq|
	form_type = {
		var: "FORM_TYPE",
		type: "hidden",
		value: "http://jabber.org/network/serverinfo"
	}

	self << iq.reply.tap { |reply|
		reply.identities = [
			{ name: "JMP.chat", type: "sms", category: "gateway" }
		]
		form = Blather::Stanza::X.find_or_create(reply.query)
		form.type = "result"
		form.fields = [form_type].concat(CONFIG[:xep0157])
	}
end
243
# Lists the ad-hoc commands available to the requesting JID.
disco_items node: "http://jabber.org/protocol/commands" do |iq|
	sentry_hub = new_sentry_hub(iq, name: iq.node)
	reply = iq.reply

	CommandList.for(iq.from.stripped).then { |commands|
		# One disco item per command, addressed to the JID that was queried.
		reply.items = commands.map { |entry|
			Blather::Stanza::DiscoItems::Item.new(
				iq.to,
				entry[:node],
				entry[:name]
			)
		}
		self << reply
	}.catch { |e| panic(e, sentry_hub) }
end
259
# Start of the registration command: look the customer up (creating one if
# the lookup fails), then hand over to the matching Registration step.
command :execute?, node: "jabber:iq:register", sessionid: nil do |iq|
	sentry_hub = new_sentry_hub(iq, name: iq.node)
	bare_jid = iq.from.stripped
	breadcrumb = lambda { |message|
		sentry_hub.add_breadcrumb(Sentry::Breadcrumb.new(message: message))
	}

	# Starting from a resolved promise turns a synchronous raise in the
	# lookup into a rejection.
	EMPromise.resolve(nil).then {
		Customer.for_jid(bare_jid)
	}.catch {
		# NOTE(review): any lookup failure, not only "no such customer",
		# leads to a create attempt -- confirm that is intended.
		breadcrumb.call("Customer.create")
		Customer.create(bare_jid)
	}.then { |customer|
		sentry_hub.current_scope.set_user(
			id: customer.customer_id,
			jid: bare_jid.to_s
		)
		breadcrumb.call("Registration.for")
		Registration.for(
			iq,
			customer,
			web_register_manager
		).then { |registration| registration.write }
	}.catch_only(Blather::Stanza) { |reply|
		# A stanza used as the rejection value is sent back as the reply.
		self << reply
	}.catch { |e| panic(e, sentry_hub) }
end
286
# Completes a command with a single note and sends the reply.
#
# @param iq the command stanza being answered
# @param text the note text
# @param type the note type (:info unless given)
# @return whatever writing the reply with << returns
def reply_with_note(iq, text, type: :info)
	note = iq.reply.tap do |r|
		r.status = :completed
		r.note_type = type
		r.note_text = text
	end

	self << note
end
295
# Commands that just pass through to the SGX
command node: %w[
	number-display
	configure-calls
	record-voicemail-greeting
] do |iq|
	sentry_hub = new_sentry_hub(iq, name: iq.node)
	sender = iq.from.stripped

	Customer.for_jid(sender).then { |customer|
		sentry_hub.current_scope.set_user(
			id: customer.customer_id,
			jid: sender.to_s
		)

		customer.stanza_from(iq)
	}.catch { |e| panic(e, sentry_hub) }
end
312
# "buy credit" command: send the customer the credit form, wait for it to
# be submitted, charge the chosen amount and record the transaction.
command :execute?, node: "buy credit", sessionid: nil do |iq|
	sentry_hub = new_sentry_hub(iq, name: iq.node)
	reply = iq.reply
	reply.allowed_actions = [:complete]

	Customer.for_jid(iq.from.stripped).then { |customer|
		# Tag Sentry reports with the customer, as the other handlers do.
		sentry_hub.current_scope.set_user(
			id: customer.customer_id,
			jid: iq.from.stripped.to_s
		)
		BuyAccountCreditForm.for(customer).then do |credit_form|
			credit_form.add_to_form(reply.form)
			# Resolves with the customer's next stanza in this session.
			COMMAND_MANAGER.write(reply).then { |iq2| [customer, credit_form, iq2] }
		end
	}.then { |(customer, credit_form, iq2)|
		iq = iq2 # This allows the catch to use it also
		Transaction.sale(customer, **credit_form.parse(iq2.form))
	}.then { |transaction|
		transaction.insert.then { transaction.amount }
	}.then { |amount|
		reply_with_note(iq, "$#{'%.2f' % amount} added to your account balance.")
	}.catch_only(BuyAccountCreditForm::AmountValidationError) { |e|
		# Invalid amounts are reported to the customer only, not to Sentry.
		reply_with_note(iq, e.message, type: :error)
	}.catch { |e|
		sentry_hub.capture_exception(e)
		text = "Failed to buy credit, system said: #{e.message}"
		reply_with_note(iq, text, type: :error)
	}.catch { |e| panic(e, sentry_hub) } # only if the error reply itself fails
end
338
# "reset sip account" command: reset the customer's SIP account and reply
# with the resulting account's form.
command :execute?, node: "reset sip account", sessionid: nil do |iq|
	sentry_hub = new_sentry_hub(iq, name: iq.node)
	sender = iq.from.stripped

	Customer.for_jid(sender).then { |customer|
		sentry_hub.current_scope.set_user(
			id: customer.customer_id,
			jid: sender.to_s
		)
		customer.reset_sip_account
	}.then { |sip_account|
		BLATHER << iq.reply.tap { |reply| reply.command << sip_account.form }
	}.catch { |e| panic(e, sentry_hub) }
end
353
# "usage" command: reply with the customer's usage report form.
command :execute?, node: "usage", sessionid: nil do |iq|
	sentry_hub = new_sentry_hub(iq, name: iq.node)
	# NOTE(review): the range begins today and ends one month earlier;
	# confirm Customer#usage_report expects its bounds in this order.
	report_for = Range.new(Date.today, Date.today << 1)

	Customer.for_jid(iq.from.stripped).then { |customer|
		sentry_hub.current_scope.set_user(
			id: customer.customer_id,
			jid: iq.from.stripped.to_s
		)

		customer.usage_report(report_for)
	}.then { |usage_report|
		BLATHER << iq.reply.tap { |reply|
			reply.status = :completed
			reply.command << usage_report.form
		}
	}.catch { |e| panic(e, sentry_hub) }
end
372
# "web-register" command: only the configured web_register sender may use
# it. It asks the configured target to "push-register" the given JID, then
# remembers the telephone number chosen for the JID that comes back.
command :execute?, node: "web-register", sessionid: nil do |iq|
	sentry_hub = new_sentry_hub(iq, name: iq.node)

	begin
		jid = iq.form.field("jid")&.value.to_s.strip
		tel = iq.form.field("tel")&.value.to_s.strip
		# Must be this handler's own hub: there is no local named `hub`, and
		# the resulting NameError used to be swallowed by the rescue below,
		# so the command never answered.
		sentry_hub.current_scope.set_user(jid: jid, tel: tel)
		if iq.from.stripped != CONFIG[:web_register][:from]
			BLATHER << iq.as_error("forbidden", :auth)
		elsif jid == "" || tel !~ /\A\+\d+\Z/
			reply_with_note(iq, "Invalid JID or telephone number.", type: :error)
		else
			IQ_MANAGER.write(Blather::Stanza::Iq::Command.new.tap { |cmd|
				cmd.to = CONFIG[:web_register][:to]
				cmd.from = CONFIG[:component][:jid]
				cmd.node = "push-register"
				cmd.form.fields = [var: "to", value: jid]
				cmd.form.type = "submit"
			}).then { |result|
				# The result's "from" field names the JID to register under.
				final_jid = result.form.field("from")&.value.to_s.strip
				web_register_manager[final_jid] = tel
				BLATHER << iq.reply.tap { |reply| reply.status = :completed }
			}.catch { |e| panic(e, sentry_hub) }
		end
	rescue StandardError => e
		# Synchronous failures are reported to Sentry; no reply is sent.
		sentry_hub.capture_exception(e)
	end
end
401
# Command stanzas that carry a session id settle the matching command
# session.
command sessionid: /./ do |stanza|
	COMMAND_MANAGER.fulfill(stanza)
end

# IQ results and errors settle the matching IQ session.
iq type: %i[result error] do |stanza|
	IQ_MANAGER.fulfill(stanza)
end

# get/set IQs that reach this handler are answered with
# feature-not-implemented.
iq type: %i[get set] do |stanza|
	self << Blather::StanzaError.new(stanza, "feature-not-implemented", :cancel)
end