sgx_jmp.rb

  1# frozen_string_literal: true
  2
  3require "pg/em/connection_pool"
  4require "bigdecimal"
  5require "blather/client/dsl" # Require this first to not auto-include
  6require "blather/client"
  7require "braintree"
  8require "date"
  9require "dhall"
 10require "em-hiredis"
 11require "em_promise"
 12require "ruby-bandwidth-iris"
 13require "sentry-ruby"
 14require "statsd-instrument"
 15
 16$stdout.sync = true
 17
 18Sentry.init
 19
 20CONFIG =
 21	Dhall::Coder
 22	.new(safe: Dhall::Coder::JSON_LIKE + [Symbol, Proc])
 23	.load(ARGV[0], transform_keys: ->(k) { k&.to_sym })
 24
 25singleton_class.class_eval do
 26	include Blather::DSL
 27	Blather::DSL.append_features(self)
 28end
 29
 30require_relative "lib/alt_top_up_form"
 31require_relative "lib/add_bitcoin_address"
 32require_relative "lib/backend_sgx"
 33require_relative "lib/bandwidth_tn_order"
 34require_relative "lib/btc_sell_prices"
 35require_relative "lib/buy_account_credit_form"
 36require_relative "lib/command_list"
 37require_relative "lib/customer"
 38require_relative "lib/electrum"
 39require_relative "lib/error_to_send"
 40require_relative "lib/em"
 41require_relative "lib/payment_methods"
 42require_relative "lib/registration"
 43require_relative "lib/transaction"
 44require_relative "lib/web_register_manager"
 45require_relative "lib/statsd"
 46
 47ELECTRUM = Electrum.new(**CONFIG[:electrum])
 48EM::Hiredis::Client.load_scripts_from("./redis_lua")
 49
 50Faraday.default_adapter = :em_synchrony
 51BandwidthIris::Client.global_options = {
 52	account_id: CONFIG[:creds][:account],
 53	username: CONFIG[:creds][:username],
 54	password: CONFIG[:creds][:password]
 55}
 56
 57def new_sentry_hub(stanza, name: nil)
 58	hub = Sentry.get_current_hub&.new_from_top
 59	raise "Sentry.init has not been called" unless hub
 60
 61	hub.push_scope
 62	hub.current_scope.clear_breadcrumbs
 63	hub.current_scope.set_transaction_name(name) if name
 64	hub.current_scope.set_user(jid: stanza.from.stripped.to_s)
 65	hub
 66end
 67
 68# Braintree is not async, so wrap in EM.defer for now
 69class AsyncBraintree
 70	def initialize(environment:, merchant_id:, public_key:, private_key:, **)
 71		@gateway = Braintree::Gateway.new(
 72			environment: environment,
 73			merchant_id: merchant_id,
 74			public_key: public_key,
 75			private_key: private_key
 76		)
 77	end
 78
 79	def respond_to_missing?(m, *)
 80		@gateway.respond_to?(m)
 81	end
 82
 83	def method_missing(m, *args)
 84		return super unless respond_to_missing?(m, *args)
 85
 86		EM.promise_defer(klass: PromiseChain) do
 87			@gateway.public_send(m, *args)
 88		end
 89	end
 90
 91	class PromiseChain < EMPromise
 92		def respond_to_missing?(*)
 93			false # We don't actually know what we respond to...
 94		end
 95
 96		def method_missing(m, *args)
 97			return super if respond_to_missing?(m, *args)
 98			self.then { |o| o.public_send(m, *args) }
 99		end
100	end
101end
102
103BRAINTREE = AsyncBraintree.new(**CONFIG[:braintree])
104
105def panic(e, hub=nil)
106	m = e.respond_to?(:message) ? e.message : e
107	warn "Error raised during event loop: #{e.class}: #{m}"
108	warn e.backtrace if e.respond_to?(:backtrace)
109	if e.is_a?(::Exception)
110		(hub || Sentry).capture_exception(e, hint: { background: false })
111	else
112		(hub || Sentry).capture_message(e.to_s, hint: { background: false })
113	end
114	exit 1
115end
116
117EM.error_handler(&method(:panic))
118
119when_ready do
120	BLATHER = self
121	REDIS = EM::Hiredis.connect
122	BTC_SELL_PRICES = BTCSellPrices.new(REDIS, CONFIG[:oxr_app_id])
123	DB = PG::EM::ConnectionPool.new(dbname: "jmp") do |conn|
124		conn.type_map_for_results = PG::BasicTypeMapForResults.new(conn)
125		conn.type_map_for_queries = PG::BasicTypeMapForQueries.new(conn)
126	end
127
128	EM.add_periodic_timer(3600) do
129		ping = Blather::Stanza::Iq::Ping.new(:get, CONFIG[:server][:host])
130		ping.from = CONFIG[:component][:jid]
131		self << ping
132	end
133end
134
135# workqueue_count MUST be 0 or else Blather uses threads!
136setup(
137	CONFIG[:component][:jid],
138	CONFIG[:component][:secret],
139	CONFIG[:server][:host],
140	CONFIG[:server][:port],
141	nil,
142	nil,
143	workqueue_count: 0
144)
145
146message to: /\Aaccount@/, body: /./ do |m|
147	StatsD.increment("deprecated_account_bot")
148
149	self << m.reply.tap do |out|
150		out.body = "This bot is deprecated. Please talk to xmpp:cheogram.com"
151	end
152end
153
154before(
155	:iq,
156	type: [:error, :result],
157	to: /\Acustomer_/,
158	from: /(\A|@)#{CONFIG[:sgx]}(\/|\Z)/
159) { |iq| halt if IQ_MANAGER.fulfill(iq) }
160
161before nil, to: /\Acustomer_/, from: /(\A|@)#{CONFIG[:sgx]}(\/|\Z)/ do |s|
162	StatsD.increment("stanza_customer")
163
164	sentry_hub = new_sentry_hub(s, name: "stanza_customer")
165	Customer.for_customer_id(
166		s.to.node.delete_prefix("customer_")
167	).then { |customer|
168		sentry_hub.current_scope.set_user(
169			id: customer.customer_id,
170			jid: s.from.stripped.to_s
171		)
172		customer.stanza_to(s)
173	}.catch { |e| panic(e, sentry_hub) }
174	halt
175end
176
177ADDRESSES_NS = "http://jabber.org/protocol/address"
178message(
179	to: /\A#{CONFIG[:component][:jid]}\Z/,
180	from: /(\A|@)#{CONFIG[:sgx]}(\/|\Z)/
181) do |m|
182	StatsD.increment("inbound_group_text")
183	sentry_hub = new_sentry_hub(m, name: "message")
184
185	address = m.find("ns:addresses", ns: ADDRESSES_NS).first
186		&.find("ns:address", ns: ADDRESSES_NS)
187		&.find { |el| el["jid"].to_s.start_with?("customer_") }
188	pass unless address
189
190	Customer.for_customer_id(
191		Blather::JID.new(address["jid"].to_s).node.delete_prefix("customer_")
192	).then(&:jid).then { |customer_jid|
193		m.from = m.from.with(domain: CONFIG[:component][:jid])
194		m.to = m.to.with(domain: customer_jid.domain)
195		address["jid"] = customer_jid.to_s
196		BLATHER << m
197	}.catch { |e| panic(e, sentry_hub) }
198end
199
200# Ignore groupchat messages
201# Especially if we have the component join MUC for notifications
202message(type: :groupchat) { true }
203
204def billable_message(m)
205	(m.body && !m.body.empty?) || m.find("ns:x", ns: OOB.registered_ns).first
206end
207
208message do |m|
209	StatsD.increment("message")
210
211	sentry_hub = new_sentry_hub(m, name: "message")
212	today = Time.now.utc.to_date
213	Customer.for_jid(m.from.stripped).then { |customer|
214		sentry_hub.current_scope.set_user(
215			id: customer.customer_id, jid: m.from.stripped.to_s
216		)
217		EMPromise.all([
218			customer, (customer.incr_message_usage if billable_message(m)),
219			REDIS.exists("jmp_usage_notify-#{customer.customer_id}"),
220			customer.stanza_from(m)
221		])
222	}.then { |(customer, _, already, _)|
223		next if already == 1
224
225		customer.message_usage((today..(today - 30))).then do |usage|
226			next unless usage > 500
227
228			BLATHER.join(CONFIG[:notify_admin], "sgx-jmp")
229			BLATHER.say(
230				CONFIG[:notify_admin],
231				"#{customer.customer_id} has used #{usage} messages since #{today - 30}",
232				:groupchat
233			)
234			REDIS.set("jmp_usage_notify-#{customer.customer_id}", ex: 60 * 60 * 24)
235		end
236	}.catch { |e| panic(e, sentry_hub) }
237end
238
239message :error? do |m|
240	StatsD.increment("message_error")
241
242	puts "MESSAGE ERROR: #{m.inspect}"
243end
244
245class SessionManager
246	def initialize(blather, id_msg, timeout: 5, error_if: nil)
247		@blather = blather
248		@sessions = {}
249		@id_msg = id_msg
250		@timeout = timeout
251		@error_if = error_if
252	end
253
254	def promise_for(stanza)
255		id = "#{stanza.to.stripped}/#{stanza.public_send(@id_msg)}"
256		@sessions.fetch(id) do
257			@sessions[id] = EMPromise.new
258			EM.add_timer(@timeout) do
259				@sessions.delete(id)&.reject(:timeout)
260			end
261			@sessions[id]
262		end
263	end
264
265	def write(stanza)
266		promise = promise_for(stanza)
267		@blather << stanza
268		promise
269	end
270
271	def fulfill(stanza)
272		id = "#{stanza.from.stripped}/#{stanza.public_send(@id_msg)}"
273		if stanza.error? || @error_if&.call(stanza)
274			@sessions.delete(id)&.reject(stanza)
275		else
276			@sessions.delete(id)&.fulfill(stanza)
277		end
278	end
279end
280
281IQ_MANAGER = SessionManager.new(self, :id)
282COMMAND_MANAGER = SessionManager.new(
283	self,
284	:sessionid,
285	timeout: 60 * 60,
286	error_if: ->(s) { s.cancel? }
287)
288web_register_manager = WebRegisterManager.new
289
290disco_info to: Blather::JID.new(CONFIG[:component][:jid]) do |iq|
291	reply = iq.reply
292	reply.identities = [{
293		name: "JMP.chat",
294		type: "sms",
295		category: "gateway"
296	}]
297	reply.features = [
298		"http://jabber.org/protocol/disco#info",
299		"http://jabber.org/protocol/commands"
300	]
301	form = Blather::Stanza::X.find_or_create(reply.query)
302	form.type = "result"
303	form.fields = [
304		{
305			var: "FORM_TYPE",
306			type: "hidden",
307			value: "http://jabber.org/network/serverinfo"
308		}
309	] + CONFIG[:xep0157]
310	self << reply
311end
312
313disco_info do |iq|
314	reply = iq.reply
315	reply.identities = [{
316		name: "JMP.chat",
317		type: "sms",
318		category: "client"
319	}]
320	reply.features = [
321		"urn:xmpp:receipts"
322	]
323	self << reply
324end
325
326disco_items node: "http://jabber.org/protocol/commands" do |iq|
327	StatsD.increment("command_list")
328
329	sentry_hub = new_sentry_hub(iq, name: iq.node)
330	reply = iq.reply
331
332	CommandList.for(iq.from.stripped).then { |list|
333		reply.items = list.map do |item|
334			Blather::Stanza::DiscoItems::Item.new(
335				iq.to,
336				item[:node],
337				item[:name]
338			)
339		end
340		self << reply
341	}.catch { |e| panic(e, sentry_hub) }
342end
343
344iq "/iq/ns:services", ns: "urn:xmpp:extdisco:2" do |iq|
345	StatsD.increment("extdisco")
346
347	reply = iq.reply
348	reply << Nokogiri::XML::Builder.new {
349		services(xmlns: "urn:xmpp:extdisco:2") do
350			service(
351				type: "sip",
352				host: CONFIG[:sip_host]
353			)
354		end
355	}.doc.root
356
357	self << reply
358end
359
360command :execute?, node: "jabber:iq:register", sessionid: nil do |iq|
361	StatsD.increment("command", tags: ["node:#{iq.node}"])
362
363	sentry_hub = new_sentry_hub(iq, name: iq.node)
364	EMPromise.resolve(nil).then {
365		Customer.for_jid(iq.from.stripped)
366	}.catch {
367		sentry_hub.add_breadcrumb(Sentry::Breadcrumb.new(
368			message: "Customer.create"
369		))
370		Customer.create(iq.from.stripped)
371	}.then { |customer|
372		sentry_hub.current_scope.set_user(
373			id: customer.customer_id,
374			jid: iq.from.stripped.to_s
375		)
376		sentry_hub.add_breadcrumb(Sentry::Breadcrumb.new(
377			message: "Registration.for"
378		))
379		Registration.for(
380			iq,
381			customer,
382			web_register_manager
383		).then(&:write).then { StatsD.increment("registration.completed") }
384	}.catch_only(ErrorToSend) { |e|
385		self << e.stanza
386	}.catch { |e| panic(e, sentry_hub) }
387end
388
389def reply_with_note(iq, text, type: :info)
390	reply = iq.reply
391	reply.status = :completed
392	reply.note_type = type
393	reply.note_text = text
394
395	self << reply
396end
397
398# Commands that just pass through to the SGX
399command node: [
400	"number-display",
401	"configure-calls",
402	"record-voicemail-greeting"
403] do |iq|
404	StatsD.increment("command", tags: ["node:#{iq.node}"])
405
406	sentry_hub = new_sentry_hub(iq, name: iq.node)
407	Customer.for_jid(iq.from.stripped).then { |customer|
408		sentry_hub.current_scope.set_user(
409			id: customer.customer_id,
410			jid: iq.from.stripped.to_s
411		)
412
413		customer.stanza_from(iq)
414	}.catch { |e| panic(e, sentry_hub) }
415end
416
417command :execute?, node: "credit cards", sessionid: nil do |iq|
418	StatsD.increment("command", tags: ["node:#{iq.node}"])
419
420	sentry_hub = new_sentry_hub(iq, name: iq.node)
421	reply = iq.reply
422	reply.status = :completed
423
424	Customer.for_jid(iq.from.stripped).then { |customer|
425		oob = OOB.find_or_create(reply.command)
426		oob.url = CONFIG[:credit_card_url].call(
427			reply.to.stripped.to_s.gsub("\\", "%5C"),
428			customer.customer_id
429		)
430		oob.desc = "Manage credits cards and settings"
431
432		reply.note_type = :info
433		reply.note_text = "#{oob.desc}: #{oob.url}"
434
435		self << reply
436	}.catch { |e| panic(e, sentry_hub) }
437end
438
439command :execute?, node: "top up", sessionid: nil do |iq|
440	StatsD.increment("command", tags: ["node:#{iq.node}"])
441
442	sentry_hub = new_sentry_hub(iq, name: iq.node)
443	reply = iq.reply
444	reply.allowed_actions = [:complete]
445
446	Customer.for_jid(iq.from.stripped).then { |customer|
447		BuyAccountCreditForm.for(customer).then do |credit_form|
448			credit_form.add_to_form(reply.form)
449			COMMAND_MANAGER.write(reply).then { |iq2| [customer, credit_form, iq2] }
450		end
451	}.then { |(customer, credit_form, iq2)|
452		iq = iq2 # This allows the catch to use it also
453		Transaction.sale(customer, **credit_form.parse(iq2.form))
454	}.then { |transaction|
455		transaction.insert.then do
456			reply_with_note(iq, "#{transaction} added to your account balance.")
457		end
458	}.catch_only(BuyAccountCreditForm::AmountValidationError) { |e|
459		reply_with_note(iq, e.message, type: :error)
460	}.catch { |e|
461		sentry_hub.capture_exception(e)
462		text = "Failed to buy credit, system said: #{e.message}"
463		reply_with_note(iq, text, type: :error)
464	}.catch { |e| panic(e, sentry_hub) }
465end
466
467command :execute?, node: "alt top up", sessionid: nil do |iq|
468	StatsD.increment("command", tags: ["node:#{iq.node}"])
469
470	sentry_hub = new_sentry_hub(iq, name: iq.node)
471	reply = iq.reply
472	reply.status = :executing
473	reply.allowed_actions = [:complete]
474
475	Customer.for_jid(iq.from.stripped).then { |customer|
476		sentry_hub.current_scope.set_user(
477			id: customer.customer_id,
478			jid: iq.from.stripped.to_s
479		)
480
481		EMPromise.all([AltTopUpForm.for(customer), customer])
482	}.then { |(alt_form, customer)|
483		reply.command << alt_form.form
484
485		COMMAND_MANAGER.write(reply).then do |iq2|
486			AddBitcoinAddress.for(iq2, alt_form, customer).write
487		end
488	}.catch { |e| panic(e, sentry_hub) }
489end
490
491command :execute?, node: "reset sip account", sessionid: nil do |iq|
492	StatsD.increment("command", tags: ["node:#{iq.node}"])
493
494	sentry_hub = new_sentry_hub(iq, name: iq.node)
495	Customer.for_jid(iq.from.stripped).then { |customer|
496		sentry_hub.current_scope.set_user(
497			id: customer.customer_id,
498			jid: iq.from.stripped.to_s
499		)
500		customer.reset_sip_account
501	}.then { |sip_account|
502		reply = iq.reply
503		reply.command << sip_account.form
504		BLATHER << reply
505	}.catch { |e| panic(e, sentry_hub) }
506end
507
508command :execute?, node: "usage", sessionid: nil do |iq|
509	StatsD.increment("command", tags: ["node:#{iq.node}"])
510
511	sentry_hub = new_sentry_hub(iq, name: iq.node)
512	report_for = (Date.today..(Date.today << 1))
513
514	Customer.for_jid(iq.from.stripped).then { |customer|
515		sentry_hub.current_scope.set_user(
516			id: customer.customer_id,
517			jid: iq.from.stripped.to_s
518		)
519
520		customer.usage_report(report_for)
521	}.then { |usage_report|
522		reply = iq.reply
523		reply.status = :completed
524		reply.command << usage_report.form
525		BLATHER << reply
526	}.catch { |e| panic(e, sentry_hub) }
527end
528
529command :execute?, node: "web-register", sessionid: nil do |iq|
530	StatsD.increment("command", tags: ["node:#{iq.node}"])
531
532	sentry_hub = new_sentry_hub(iq, name: iq.node)
533
534	begin
535		jid = iq.form.field("jid")&.value.to_s.strip
536		tel = iq.form.field("tel")&.value.to_s.strip
537		sentry_hub.current_scope.set_user(jid: jid, tel: tel)
538		if iq.from.stripped != CONFIG[:web_register][:from]
539			BLATHER << iq.as_error("forbidden", :auth)
540		elsif jid == "" || tel !~ /\A\+\d+\Z/
541			reply_with_note(iq, "Invalid JID or telephone number.", type: :error)
542		else
543			IQ_MANAGER.write(Blather::Stanza::Iq::Command.new.tap { |cmd|
544				cmd.to = CONFIG[:web_register][:to]
545				cmd.node = "push-register"
546				cmd.form.fields = [var: "to", value: jid]
547				cmd.form.type = "submit"
548			}).then { |result|
549				final_jid = result.form.field("from")&.value.to_s.strip
550				web_register_manager[final_jid] = tel
551				BLATHER << iq.reply.tap { |reply| reply.status = :completed }
552			}.catch { |e| panic(e, sentry_hub) }
553		end
554	rescue StandardError => e
555		sentry_hub.capture_exception(e)
556	end
557end
558
559command sessionid: /./ do |iq|
560	COMMAND_MANAGER.fulfill(iq)
561end
562
563iq type: [:result, :error] do |iq|
564	IQ_MANAGER.fulfill(iq)
565end
566
567iq type: [:get, :set] do |iq|
568	StatsD.increment("unknown_iq")
569
570	self << Blather::StanzaError.new(iq, "feature-not-implemented", :cancel)
571end