sgx_jmp.rb

  1# frozen_string_literal: true
  2
  3require "pg/em/connection_pool"
  4require "bigdecimal"
  5require "blather/client/dsl" # Require this first to not auto-include
  6require "blather/client"
  7require "braintree"
  8require "date"
  9require "dhall"
 10require "em-hiredis"
 11require "em_promise"
 12require "ougai"
 13require "ruby-bandwidth-iris"
 14require "sentry-ruby"
 15require "statsd-instrument"
 16
 17$stdout.sync = true
 18LOG = Ougai::Logger.new($stdout)
 19LOG.level = ENV.fetch("LOG_LEVEL", "info")
 20LOG.formatter = Ougai::Formatters::Readable.new(
 21	nil,
 22	nil,
 23	plain: !$stdout.isatty
 24)
 25Blather.logger = LOG
 26EM::Hiredis.logger = LOG
 27StatsD.logger = LOG
 28LOG.info "Starting"
 29
 30Sentry.init do |config|
 31	config.logger = LOG
 32	config.breadcrumbs_logger = [:sentry_logger]
 33end
 34
 35module SentryOugai
 36	class SentryLogger
 37		include Sentry::Breadcrumb::SentryLogger
 38		include Singleton
 39	end
 40
 41	def _log(severity, message=nil, ex=nil, data=nil, &block)
 42		super
 43		SentryLogger.instance.add_breadcrumb(severity, message || ex.to_s, &block)
 44	end
 45end
 46LOG.extend SentryOugai
 47
 48CONFIG =
 49	Dhall::Coder
 50	.new(safe: Dhall::Coder::JSON_LIKE + [Symbol, Proc])
 51	.load(
 52		"(#{ARGV[0]}) : #{__dir__}/config-schema.dhall",
 53		transform_keys: ->(k) { k&.to_sym }
 54	)
 55
 56singleton_class.class_eval do
 57	include Blather::DSL
 58	Blather::DSL.append_features(self)
 59end
 60
 61require_relative "lib/polyfill"
 62require_relative "lib/alt_top_up_form"
 63require_relative "lib/add_bitcoin_address"
 64require_relative "lib/backend_sgx"
 65require_relative "lib/bandwidth_tn_order"
 66require_relative "lib/btc_sell_prices"
 67require_relative "lib/buy_account_credit_form"
 68require_relative "lib/command"
 69require_relative "lib/command_list"
 70require_relative "lib/customer"
 71require_relative "lib/customer_info_form"
 72require_relative "lib/customer_repo"
 73require_relative "lib/electrum"
 74require_relative "lib/expiring_lock"
 75require_relative "lib/em"
 76require_relative "lib/low_balance"
 77require_relative "lib/payment_methods"
 78require_relative "lib/registration"
 79require_relative "lib/transaction"
 80require_relative "lib/web_register_manager"
 81require_relative "lib/session_manager"
 82require_relative "lib/statsd"
 83
 84ELECTRUM = Electrum.new(**CONFIG[:electrum])
 85EM::Hiredis::Client.load_scripts_from("./redis_lua")
 86
 87Faraday.default_adapter = :em_synchrony
 88BandwidthIris::Client.global_options = {
 89	account_id: CONFIG[:creds][:account],
 90	username: CONFIG[:creds][:username],
 91	password: CONFIG[:creds][:password]
 92}
 93
 94def new_sentry_hub(stanza, name: nil)
 95	hub = Sentry.get_current_hub&.new_from_top
 96	raise "Sentry.init has not been called" unless hub
 97
 98	hub.push_scope
 99	hub.current_scope.clear_breadcrumbs
100	hub.current_scope.set_transaction_name(name) if name
101	hub.current_scope.set_user(jid: stanza.from.stripped.to_s)
102	hub
103end
104
# Raised when the requester is not authorised for an operation.
class AuthError < StandardError
end
106
# Braintree is not async, so every gateway call is wrapped in EM.promise_defer
# for now.
class AsyncBraintree
	# Builds the underlying Braintree::Gateway from the given credentials and
	# points its logger at LOG. Extra keyword arguments are ignored.
	def initialize(environment:, merchant_id:, public_key:, private_key:, **)
		credentials = {
			environment: environment,
			merchant_id: merchant_id,
			public_key: public_key,
			private_key: private_key
		}
		@gateway = Braintree::Gateway.new(**credentials)
		@gateway.config.logger = LOG
	end

	# We respond to whatever the wrapped gateway responds to.
	def respond_to_missing?(m, *)
		@gateway.respond_to?(m)
	end

	# Forwards any gateway method to the wrapped gateway inside
	# EM.promise_defer and returns a PromiseChain for the result.
	def method_missing(m, *args)
		if respond_to_missing?(m, *args)
			EM.promise_defer(klass: PromiseChain) { @gateway.public_send(m, *args) }
		else
			super
		end
	end

	# Promise whose unknown methods are queued up to be sent to the eventual
	# value, so calls can be chained before the result is available.
	class PromiseChain < EMPromise
		def respond_to_missing?(*)
			false # We don't actually know what we respond to...
		end

		# Returns a promise for calling +m+ on the resolved value.
		def method_missing(m, *args)
			if respond_to_missing?(m, *args)
				super
			else
				self.then { |resolved| resolved.public_send(m, *args) }
			end
		end
	end
end

BRAINTREE = AsyncBraintree.new(**CONFIG[:braintree])
144
# Last-resort error handler: logs the error as fatal, reports it to Sentry
# (or to the given hub) in the foreground, then exits with status 1.
#
# @param e the error; Exceptions are reported with capture_exception,
#   anything else is stringified and reported with capture_message
# @param hub optional Sentry hub to report through instead of Sentry itself
def panic(e, hub=nil)
	logger = Thread.current[:log] || LOG
	logger.fatal("Error raised during event loop: #{e.class}", e)

	reporter = hub || Sentry
	hint = { background: false }
	if e.is_a?(::Exception)
		reporter.capture_exception(e, hint: hint)
	else
		reporter.capture_message(e.to_s, hint: hint)
	end
	exit 1
end
157
# Hand errors reported through EventMachine's error handler to panic.
EM.error_handler { |error| panic(error) }
159
# Waits for the next notification on +db+, looks up the customer whose id is
# in the notification's :extra field, sends that customer's low-balance
# notice, then starts waiting again. Any failure in the chain goes to panic.
#
# @param db a connection that responds to wait_for_notify_defer
def poll_for_notify(db)
	db.wait_for_notify_defer
		.then { |notification| CustomerRepo.new.find(notification[:extra]) }
		.then { |customer| LowBalance.for(customer) }
		.then { |low_balance| low_balance.notify! }
		.then { poll_for_notify(db) }
		.catch { |error| panic(error) }
end
167
# Once the XMPP connection is ready: publish the shared service constants,
# start listening for low-balance notifications, and schedule an hourly ping.
when_ready do
	LOG.info "Ready"
	BLATHER = self
	REDIS = EM::Hiredis.connect
	WEB_REGISTER_MANAGER = WebRegisterManager.new
	BTC_SELL_PRICES = BTCSellPrices.new(REDIS, CONFIG[:oxr_app_id])
	DB = PG::EM::ConnectionPool.new(dbname: "jmp") { |pg|
		pg.type_map_for_results = PG::BasicTypeMapForResults.new(pg)
		pg.type_map_for_queries = PG::BasicTypeMapForQueries.new(pg)
	}

	# NOTE(review): poll_for_notify keeps using this connection after the
	# hold block returns -- confirm the pool tolerates that.
	DB.hold { |listener|
		listener.query("LISTEN low_balance")
		# Re-announce every customer that is already below the threshold.
		already_low =
			listener.query("SELECT customer_id FROM balances WHERE balance < 5")
		already_low.each { |row|
			listener.query("SELECT pg_notify('low_balance', $1)", row.values)
		}
		poll_for_notify(listener)
	}

	# Ping the server from the component's JID every 3600 seconds.
	EM.add_periodic_timer(3600) {
		self << Blather::Stanza::Iq::Ping.new(
			:get, CONFIG[:server][:host]
		).tap { |ping| ping.from = CONFIG[:component][:jid] }
	}
end
193
# Connection parameters for the component.
# workqueue_count MUST be 0 or else Blather uses threads!
setup(
	CONFIG[:component][:jid], CONFIG[:component][:secret],
	CONFIG[:server][:host], CONFIG[:server][:port],
	nil, nil,
	workqueue_count: 0
)
204
# Messages with a body addressed to the old account@ bot get a fixed
# deprecation notice in reply.
message to: /\Aaccount@/, body: /./ do |m|
	StatsD.increment("deprecated_account_bot")

	notice = m.reply
	notice.body = "This bot is deprecated. Please talk to xmpp:cheogram.com"
	self << notice
end
212
# IQ results/errors sent by the SGX to a customer_ address are first offered
# to IQ_MANAGER; when it fulfils a pending request, processing stops here.
#
# The SGX domain is escaped before being interpolated so that "." and other
# regex metacharacters in it only match themselves.
before(
	:iq,
	type: [:error, :result],
	to: /\Acustomer_/,
	from: /(\A|@)#{Regexp.escape(CONFIG[:sgx])}(\/|\Z)/
) { |iq| halt if IQ_MANAGER.fulfill(iq) }
219
# Any stanza the SGX sends to a customer_ address is delivered to that
# customer; the customer id is the node of the "to" JID minus the
# "customer_" prefix. Processing always halts after the lookup is started.
#
# The SGX domain is escaped before being interpolated so that "." and other
# regex metacharacters in it only match themselves.
before(
	nil,
	to: /\Acustomer_/,
	from: /(\A|@)#{Regexp.escape(CONFIG[:sgx])}(\/|\Z)/
) do |s|
	StatsD.increment("stanza_customer")

	sentry_hub = new_sentry_hub(s, name: "stanza_customer")
	CustomerRepo.new.find(
		s.to.node.delete_prefix("customer_")
	).then { |customer|
		sentry_hub.current_scope.set_user(
			id: customer.customer_id,
			jid: s.from.stripped.to_s
		)
		customer.stanza_to(s)
	}.catch { |e| panic(e, sentry_hub) }
	halt
end
235
ADDRESSES_NS = "http://jabber.org/protocol/address"

# Messages from the SGX addressed to the component itself: find the first
# address element whose jid starts with "customer_", rewrite the stanza so
# it comes from the component and goes to that customer's real JID, and
# send it on. Messages without such an address are passed to later handlers.
#
# Both configured JIDs are escaped before being interpolated so that "." and
# other regex metacharacters in them only match themselves.
message(
	to: /\A#{Regexp.escape(CONFIG[:component][:jid])}\Z/,
	from: /(\A|@)#{Regexp.escape(CONFIG[:sgx])}(\/|\Z)/
) do |m|
	StatsD.increment("inbound_group_text")
	sentry_hub = new_sentry_hub(m, name: "message")

	address = m.find("ns:addresses", ns: ADDRESSES_NS).first
		&.find("ns:address", ns: ADDRESSES_NS)
		&.find { |el| el["jid"].to_s.start_with?("customer_") }
	pass unless address

	CustomerRepo.new.find(
		Blather::JID.new(address["jid"].to_s).node.delete_prefix("customer_")
	).then { |customer|
		m.from = m.from.with(domain: CONFIG[:component][:jid])
		m.to = m.to.with(domain: customer.jid.domain)
		address["jid"] = customer.jid.to_s
		BLATHER << m
	}.catch { |e| panic(e, sentry_hub) }
end
258
# Ignore groupchat messages
# Especially if we have the component join MUC for notifications
message type: :groupchat do
	true
end
262
# Decides whether a message counts towards usage.
#
# @param m a message stanza responding to body and find
# @return true when the body is non-empty; otherwise the first x element in
#   the OOB namespace, or nil when there is none
def billable_message(m)
	text = m.body
	return true if text && !text.empty?

	m.find("ns:x", ns: OOB.registered_ns).first
end
266
# Catch-all message handler: looks up the sending customer, counts the
# message when it is billable, forwards it on the customer's behalf, and
# notifies the admin room when the customer's usage exceeds 500 messages.
# NOTE(review): this handler has no guards; confirm whether message handlers
# registered after it can still run under Blather's dispatch order.
message do |m|
	StatsD.increment("message")

	sentry_hub = new_sentry_hub(m, name: "message")
	today = Time.now.utc.to_date
	CustomerRepo.new.find_by_jid(m.from.stripped).then { |customer|
		sentry_hub.current_scope.set_user(
			id: customer.customer_id, jid: m.from.stripped.to_s
		)
		usage_bump = billable_message(m) ? customer.incr_message_usage : nil
		EMPromise.all([usage_bump, customer.stanza_from(m)]).then { customer }
	}.then { |customer|
		customer.message_usage((today..(today - 30))).then { |usage|
			if usage > 500
				lock = ExpiringLock.new("jmp_usage_notify-#{customer.customer_id}")
				lock.with do
					BLATHER.join(CONFIG[:notify_admin], "sgx-jmp")
					BLATHER.say(
						CONFIG[:notify_admin],
						"#{customer.customer_id} has used #{usage} messages since #{today - 30}",
						:groupchat
					)
				end
			end
		}
	}.catch { |e| panic(e, sentry_hub) }
end
295
# Error messages are counted and logged together with the stanza.
message(:error?) { |failed|
	StatsD.increment("message_error")

	LOG.error("MESSAGE ERROR", stanza: failed)
}
301
# Session manager for outstanding IQs, keyed by stanza id.
IQ_MANAGER = SessionManager.new(self, :id)
# Session manager for command sessions, keyed by sessionid, with a one hour
# timeout; a stanza for which cancel? is true is treated as an error.
COMMAND_MANAGER = SessionManager.new(
	self,
	:sessionid,
	timeout: 60 * 60,
	error_if: ->(stanza) { stanza.cancel? }
)
309
# disco#info addressed to the component JID itself: advertise the gateway
# identity, the supported features, and a server-info form made of the
# FORM_TYPE field followed by the configured xep0157 fields.
disco_info to: Blather::JID.new(CONFIG[:component][:jid]) do |iq|
	gateway_identity = { name: "JMP.chat", type: "sms", category: "gateway" }
	form_type_field = {
		var: "FORM_TYPE",
		type: "hidden",
		value: "http://jabber.org/network/serverinfo"
	}

	reply = iq.reply
	reply.identities = [gateway_identity]
	reply.features = [
		"http://jabber.org/protocol/disco#info",
		"http://jabber.org/protocol/commands"
	]

	server_info = Blather::Stanza::X.find_or_create(reply.query)
	server_info.type = "result"
	server_info.fields = [form_type_field] + CONFIG[:xep0157]
	self << reply
end
332
# Every other disco#info query is answered with a client identity that
# advertises only message receipts.
disco_info do |iq|
	client_identity = { name: "JMP.chat", type: "sms", category: "client" }

	reply = iq.reply
	reply.identities = [client_identity]
	reply.features = ["urn:xmpp:receipts"]
	self << reply
end
345
# Lists the ad-hoc commands available to the requester. When the customer
# lookup fails, the list is computed for nil instead.
disco_items node: "http://jabber.org/protocol/commands" do |iq|
	StatsD.increment("command_list")

	sentry_hub = new_sentry_hub(iq, name: iq.node)
	reply = iq.reply
	reply.node = "http://jabber.org/protocol/commands"

	CustomerRepo.new.find_by_jid(iq.from.stripped)
		.catch { nil }
		.then { |customer| CommandList.for(customer) }
		.then { |commands|
			reply.items = commands.map { |entry|
				Blather::Stanza::DiscoItems::Item.new(
					iq.to, entry[:node], entry[:name]
				)
			}
			self << reply
		}
		.catch { |e| panic(e, sentry_hub) }
end
368
# External service discovery: reply with a single SIP service whose host is
# the configured sip_host.
iq "/iq/ns:services", ns: "urn:xmpp:extdisco:2" do |iq|
	StatsD.increment("extdisco")

	services_doc = Nokogiri::XML::Builder.new {
		services(xmlns: "urn:xmpp:extdisco:2") do
			service(type: "sip", host: CONFIG[:sip_host])
		end
	}.doc

	reply = iq.reply
	reply << services_doc.root

	self << reply
end
384
# Registration command, listed for everyone. When Command.customer fails, a
# customer is created for the requester's bare JID. The
# "registration.completed" counter is bumped both on normal completion and
# when the flow ends with a FinalStanza (which is then re-rejected).
Command.new(
	"jabber:iq:register",
	"Register",
	list_for: ->(*) { true }
) {
	Command.customer.catch {
		Sentry.add_breadcrumb(Sentry::Breadcrumb.new(message: "Customer.create"))
		execution = Command.execution
		execution.customer_repo.create(execution.iq.from.stripped)
	}.then { |customer|
		Sentry.add_breadcrumb(Sentry::Breadcrumb.new(message: "Registration.for"))
		Registration.for(customer, WEB_REGISTER_MANAGER).then { |registration|
			registration.write
		}
	}.then {
		StatsD.increment("registration.completed")
	}.catch_only(Command::Execution::FinalStanza) { |final|
		StatsD.increment("registration.completed")
		EMPromise.reject(final)
	}
}.register(self).then(&CommandList.method(:register))
403
# Commands that just pass through to the SGX
#
# Each entry maps a command node to [display name, optional keyword options].
# The options hash is passed with ** so that it reaches Command.new as
# keyword arguments; splatting it positionally only worked through the
# implicit hash-to-keywords conversion that Ruby 3 removed.
{
	"number-display" => ["Display JMP Number"],
	"configure-calls" => ["Configure Calls"],
	"record-voicemail-greeting" => [
		"Record Voicemail Greeting",
		list_for: ->(fwd: nil, **) { !!fwd }
	]
}.each do |node, (name, options)|
	Command.new(node, name, **(options || {})) {
		Command.customer.then do |customer|
			customer.stanza_from(Command.execution.iq)
		end
	}.register(self, guards: [node: node]).then(&CommandList.method(:register))
end
419
# Replies with the credit-card management URL for the current customer,
# both as note text and as an out-of-band URL element.
# The description previously read "credits cards"; that typo in the
# user-facing text is corrected here.
Command.new(
	"credit cards",
	"Credit Card Settings and Management"
) {
	Command.customer.then do |customer|
		# Backslashes in the JID are percent-encoded before building the URL.
		url = CONFIG[:credit_card_url].call(
			customer.jid.to_s.gsub("\\", "%5C"),
			customer.customer_id
		)
		desc = "Manage credit cards and settings"
		Command.finish("#{desc}: #{url}") do |reply|
			oob = OOB.find_or_create(reply.command)
			oob.url = url
			oob.desc = desc
		end
	end
}.register(self).then(&CommandList.method(:register))
437
# Credit-card top up, listed only when the customer has payment methods.
# Shows the credit form, runs the sale from the submitted form, records the
# transaction, and reports the result. Amount validation failures end the
# command with the validation message as an error.
Command.new(
	"top up",
	"Buy Account Credit by Credit Card",
	list_for: ->(payment_methods: [], **) { !payment_methods.empty? },
	format_error: ->(e) { "Failed to buy credit, system said: #{e.message}" }
) {
	Command.customer.then { |customer|
		BuyAccountCreditForm.for(customer).then { |credit_form|
			Command.reply { |reply|
				reply.allowed_actions = [:complete]
				credit_form.add_to_form(reply.form)
			}.then { |submission|
				sale_args = credit_form.parse(submission.form)
				Transaction.sale(customer, **sale_args)
			}
		}
	}.then { |transaction|
		transaction.insert.then {
			Command.finish("#{transaction} added to your account balance.")
		}
	}.catch_only(BuyAccountCreditForm::AmountValidationError) { |invalid|
		Command.finish(invalid.message, type: :error)
	}
}.register(self).then(&CommandList.method(:register))
461
# Alternative top-up methods, listed only for customers with a currency.
# Shows the alt top-up form and hands the submission to AddBitcoinAddress.
Command.new(
	"alt top up",
	"Buy Account Credit by Bitcoin, Mail, or Interac eTransfer",
	list_for: ->(customer:, **) { !!customer&.currency }
) {
	Command.customer.then { |customer|
		EMPromise.all([AltTopUpForm.for(customer), customer])
	}.then { |(alt_form, customer)|
		Command.reply { |reply|
			reply.allowed_actions = [:complete]
			reply.command << alt_form.form
		}.then { |submission|
			AddBitcoinAddress.for(submission, alt_form, customer).write
		}
	}
}.register(self).then(&CommandList.method(:register))
478
# Resets the customer's SIP account and finishes with the account's form.
Command.new(
	"reset sip account",
	"Create or Reset SIP Account"
) {
	Command.customer.then { |customer|
		customer.reset_sip_account
	}.then { |sip_account|
		Command.finish { |reply| reply.command << sip_account.form }
	}
}.register(self).then(&CommandList.method(:register))
489
# Finishes with the customer's usage report form for the range running from
# today to the same day one month earlier.
Command.new(
	"usage",
	"Show Monthly Usage"
) {
	report_for = Range.new(Date.today, Date.today << 1)

	Command.customer.then { |customer|
		customer.usage_report(report_for)
	}.then { |usage_report|
		Command.finish { |reply| reply.command << usage_report.form }
	}
}.register(self).then(&CommandList.method(:register))
504
# Shows the customer's unused invite codes as a one-column table, or a note
# when there are none.
Command.new(
	"invite codes",
	"Refer a friend for free credit"
) {
	Command.customer.then { |customer|
		customer.unused_invites
	}.then { |invites|
		next Command.finish(
			"You have no more invites right now, try again later."
		) if invites.empty?

		Command.finish { |reply|
			result_form = reply.form
			result_form.type = :result
			result_form.title = "Unused Invite Codes"
			result_form.instructions =
				"Each of these codes is single use and gives the person using " \
				"them a free month of JMP service. You will receive credit " \
				"equivalent to one month of free service if they later become " \
				"a paying customer."
			code_rows = invites.map { |code| [code] }
			FormTable.new(code_rows, code: "Invite Code").add_to_form(reply.form)
		}
	}
}.register(self).then(&CommandList.method(:register))
529
# First step of the "web-register" command. Only the configured
# web_register sender may use it. The submitted jid and tel are validated,
# a push-register command is sent to the configured target, and the "from"
# value of its result is stored against the telephone number before the
# command is completed. Synchronous errors are reported to Sentry only.
command :execute?, node: "web-register", sessionid: nil do |iq|
	StatsD.increment("command", tags: ["node:#{iq.node}"])

	sentry_hub = new_sentry_hub(iq, name: iq.node)

	begin
		jid = iq.form.field("jid")&.value.to_s.strip
		tel = iq.form.field("tel")&.value.to_s.strip
		sentry_hub.current_scope.set_user(jid: jid, tel: tel)
		if iq.from.stripped != CONFIG[:web_register][:from]
			BLATHER << iq.as_error("forbidden", :auth)
		elsif jid.empty? || !tel.match?(/\A\+\d+\Z/)
			reply_with_note(iq, "Invalid JID or telephone number.", type: :error)
		else
			push_register = Blather::Stanza::Iq::Command.new.tap { |cmd|
				cmd.to = CONFIG[:web_register][:to]
				cmd.node = "push-register"
				cmd.form.fields = [var: "to", value: jid]
				cmd.form.type = "submit"
			}
			IQ_MANAGER.write(push_register).then { |result|
				registered_from = result.form.field("from")&.value.to_s.strip
				WEB_REGISTER_MANAGER.set(registered_from, tel)
			}.then {
				BLATHER << iq.reply.tap { |reply| reply.status = :completed }
			}.catch { |e| panic(e, sentry_hub) }
		end
	rescue StandardError => e
		sentry_hub.capture_exception(e)
	end
end
559
# Account info, listed for everyone: finishes with a result form titled
# "Account Info" whose fields come from the customer's info.
Command.new(
	"info",
	"Show Account Info",
	list_for: ->(*) { true }
) {
	Command.customer.then { |customer|
		customer.info
	}.then { |info|
		Command.finish { |reply|
			info_form = Blather::Stanza::X.new(:result)
			info_form.title = "Account Info"
			info_form.fields = info.fields
			reply.command << info_form
		}
	}
}.register(self).then(&CommandList.method(:register))
574
# Admin-only lookup of another customer's info. Non-admin callers get an
# AuthError. The admin first picks a customer through the picker form, then
# receives a result form titled "Customer Info" with that customer's
# admin info fields.
Command.new(
	"customer info",
	"Show Customer Info",
	list_for: ->(customer: nil, **) { customer&.admin? }
) {
	Command.customer.then { |customer|
		raise AuthError, "You are not an admin" unless customer&.admin?

		customer_info = CustomerInfoForm.new
		Command.reply { |reply|
			reply.allowed_actions = [:next]
			reply.command << customer_info.picker_form
		}.then { |response|
			customer_info.find_customer(response)
		}.then { |target_customer|
			target_customer.admin_info.then { |info|
				Command.finish { |reply|
					info_form = Blather::Stanza::X.new(:result)
					info_form.title = "Customer Info"
					info_form.fields = info.fields
					reply.command << info_form
				}
			}
		}
	}
}.register(self).then(&CommandList.method(:register))
601
# Any command stanza carrying a sessionid is offered to COMMAND_MANAGER.
command(sessionid: /./) { |iq| COMMAND_MANAGER.fulfill(iq) }
605
# Remaining IQ results and errors are offered to IQ_MANAGER.
iq(type: [:result, :error]) { |iq| IQ_MANAGER.fulfill(iq) }
609
# Any get/set IQ nothing else handled is counted and answered with
# feature-not-implemented.
iq(type: [:get, :set]) { |unhandled|
	StatsD.increment("unknown_iq")

	self << Blather::StanzaError.new(
		unhandled, "feature-not-implemented", :cancel
	)
}