sgx_endstream.rb

  1#!/usr/bin/ruby
  2# frozen_string_literal: true
  3
  4require "blather/client/dsl"
  5require "dhall"
  6require "em-hiredis"
  7require "em-http"
  8require "em_promise"
  9require "json"
 10require "ougai"
 11require "sentry-ruby"
 12
 13require_relative "lib/addresses"
 14require_relative "lib/blather_client"
 15require_relative "lib/em"
 16require_relative "lib/event_emitter"
 17require_relative "lib/incoming_mms"
 18require_relative "lib/oob"
 19require_relative "lib/outgoing_mms"
 20require_relative "lib/proxied_jid"
 21require_relative "lib/registration_repo"
 22Dir["#{File.dirname(__FILE__)}/lib/blather_ext/**/*.rb"]
 23	.sort
 24	.each(&Kernel.method(:require))
 25
# Mix the Blather DSL and EventEmitter into the top-level object's singleton
# class, so the DSL calls further down this script (setup, when_ready,
# message, ...) can be written as bare calls.
singleton_class.class_eval do
	include Blather::DSL
	include EventEmitter

	# NOTE(review): append_features is invoked explicitly in addition to the
	# include above; presumably required by how the DSL module (or an
	# extension under lib/blather_ext) hooks itself in -- confirm.
	Blather::DSL.append_features(self)
end
 32
 33$stdout.sync = true
 34LOG = Ougai::Logger.new($stdout)
 35LOG.level = ENV.fetch("LOG_LEVEL", "info")
 36LOG.formatter = Ougai::Formatters::Readable.new(
 37	nil,
 38	nil,
 39	plain: !$stdout.isatty
 40)
 41Blather.logger = LOG
 42
 43def log
 44	Thread.current[:log] || LOG
 45end
 46
 47Sentry.init do |config|
 48	config.logger = LOG
 49	config.breadcrumbs_logger = [:sentry_logger]
 50end
 51
 52CONFIG = Dhall::Coder
 53	.new(safe: Dhall::Coder::JSON_LIKE + [Symbol, Proc])
 54	.load(
 55		"(#{ARGV[0]}) : #{__dir__}/config-schema.dhall",
 56		transform_keys: ->(k) { k&.to_sym }
 57	)
 58
 59def panic(e, hub=nil)
 60	log.fatal(
 61		"Error raised during event loop: #{e.class}",
 62		e
 63	)
 64	if e.is_a?(Exception)
 65		(hub || Sentry).capture_exception(e, hint: { background: false })
 66	else
 67		(hub || Sentry).capture_message(e.to_s, hint: { background: false })
 68	end
 69	exit 1
 70end
 71
 72EM.error_handler(&method(:panic))
 73
 74@client = BlatherClient.new
 75
 76setup(
 77	CONFIG[:component][:jid],
 78	CONFIG[:component][:secret],
 79	CONFIG[:server][:host],
 80	CONFIG[:server][:port],
 81	nil,
 82	nil,
 83	async: true
 84)
 85
 86@connected = false
 87@shutdown_requested = false
 88
 89when_ready do
 90	@connected = true
 91	log.info "Ready"
 92	REDIS = EM::Hiredis.connect
 93	REGISTRATION_REPO = RegistrationRepo.new(redis: REDIS)
 94end
 95
 96disconnected do
 97	next if @shutdown_requested
 98
 99	if @connected
100		log.fatal(
101			"XMPP connection lost",
102			server: "#{CONFIG[:server][:host]}:#{CONFIG[:server][:port]}",
103			component: CONFIG[:component][:jid]
104		)
105	else
106		log.fatal(
107			"Failed to establish XMPP connection",
108			server: "#{CONFIG[:server][:host]}:#{CONFIG[:server][:port]}",
109			component: CONFIG[:component][:jid]
110		)
111	end
112
113	EM.stop
114end
115
# disco#info addressed to the component JID itself: answer with a
# gateway/sms identity and the features listed below.
disco_info to: Blather::JID.new(CONFIG[:component][:jid]) do |iq|
	self << iq.reply.tap { |info|
		info.identities = [{
			name: "SGX Endstream",
			type: "sms",
			category: "gateway"
		}]
		info.features = [
			"http://jabber.org/protocol/disco#info",
			"http://jabber.org/protocol/address",
			"jabber:iq:register"
		]
	}
end
130
# disco#info addressed to a JID whose node is a phone number (optional "+"
# followed by digits): answer with a client/sms identity that advertises
# delivery receipts.
disco_info to: /\A\+?\d+@/ do |iq|
	self << iq.reply.tap { |info|
		info.identities = [{
			name: "SGX Endstream",
			type: "sms",
			category: "client"
		}]
		info.features = ["urn:xmpp:receipts"]
	}
end
143
# Registration form request: look up the requester's stored credentials and
# reply with the form. The form is flagged as registered and carries the
# stored phone value when one exists; username and password are always
# returned blank.
ibr type: :get do |iq|
	REGISTRATION_REPO.find(iq.from).then do |creds|
		stored_phone = creds&.first

		form = iq.reply
		form.registered = true if stored_phone
		form.phone = stored_phone.to_s
		form.username = ""
		form.password = ""
		self << form
	end
end
154
# Registration submission. Unless the request is a removal, the phone
# number, username and password are validated first and a bad-request error
# is returned on failure. Then the existing registration is deleted and,
# when this is not a removal and the delete reported code zero, the new
# registration is stored. The final status decides the reply: a plain result
# on code zero, otherwise a bad-request error carrying the status text.
ibr type: :set do |iq|
	unless iq.remove?
		# Anchored with \z rather than \Z: \Z also matches just before a
		# trailing newline, which would let "+15551234567\n" pass validation
		# and be stored with the newline.
		unless iq.phone.to_s.match?(/\A\+?1\d{10}\z/)
			self << iq.as_error("bad-request", :modify, "Invalid phone number")
			next
		end

		if iq.username.to_s.empty? || iq.password.to_s.empty?
			self << iq.as_error(
				"bad-request",
				:modify,
				"Username and password are required"
			)
			next
		end
	end

	REGISTRATION_REPO.delete(iq.from).then { |status|
		# Pass a failed delete (or a pure removal) straight to the reply step.
		next status if !status[:code].to_i.zero? || iq.remove?

		REGISTRATION_REPO.put(iq.from, iq.phone, iq.username, iq.password)
	}.then { |status|
		self << if status[:code].to_i.zero?
			iq.reply
		else
			iq.as_error("bad-request", :modify, status[:text])
		end
	}
end
184
# Messages arriving from sms.chat.1pcom.net.
#
# The sender is rewritten to a tel-style JID on this component and the
# recipient is unproxied. When the body is a JSON object with a "response"
# key, it is logged as an SMS status, converted into a
# recipient-unavailable error for the id named in the response, and a
# failed event is emitted. Every other message is forwarded as an incoming
# SMS and an incoming event is emitted.
#
# The dots in the domain are escaped so that only this exact domain matches;
# unescaped, "." matches any character and e.g. "sms.chat-1pcom.net" would
# be accepted too.
# NOTE(review): if the guard is matched against the full JID string
# (including resource), a resource ending in this domain would also match --
# confirm how the `from:` guard stringifies the JID.
message from: /@sms\.chat\.1pcom\.net\z/ do |m|
	# Ordinary SMS bodies are plain text, so a parse failure (or a missing
	# body) is the normal case and simply means "not a status report".
	json = begin
		JSON.parse(m.body)
	rescue JSON::JSONError, TypeError
		nil
	end

	# Nodes longer than 7 characters are treated as full numbers and get a
	# "+" prefix; shorter ones get the phone-context suffix (presumably
	# short codes -- confirm).
	tel = if m.from.node.length > 7
		"+#{m.from.node}"
	else
		"#{m.from.node};phone-context=ca-us.phone-context.soprani.ca"
	end
	m = m.dup.tap { _1.subject = nil } # They send a generic subject
	m.from = Blather::JID.new(tel, CONFIG[:component][:jid])
	m.to = ProxiedJID.new(m.to).unproxied

	if json.is_a?(Hash) && (resp = json["response"])
		log.info("SMS Status", json)
		m = m.reply.tap { _1.body = "" }.tap { _1.id = resp["id"] }.as_error(
			"recipient-unavailable",
			:cancel,
			"#{resp['text']} (#{resp['code']} #{resp['subcode']} #{resp['dlrid']})"
		)
		emit_failed_event(
			endstream_id: resp["id"],
			error_code: "#{resp['code']} #{resp['subcode']}",
			error_description: resp["text"]
		)
	else
		emit_incoming_event(m.to, from: tel, body: m.body, endstream_id: m.id)
	end

	self << m
end
215
# Messages arriving from mms.chat.1pcom.net. The body is parsed as JSON; an
# object carrying "GlobalStatus" is only logged as an MMS status. Anything
# else is converted through IncomingMMS into a stanza that is sent on with
# the original id and a tel-style sender on this component, after which an
# incoming event (including media URLs) is emitted.
#
# The dots in the domain are escaped so that only this exact domain matches;
# unescaped, "." matches any character and e.g. "mms.chat-1pcom.net" would
# be accepted too.
# NOTE(review): a body that is missing or not valid JSON raises out of this
# handler (there is no rescue here, unlike the SMS handler) -- confirm that
# is intended.
message from: /@mms\.chat\.1pcom\.net\z/ do |m|
	json = JSON.parse(m.body)
	if json.is_a?(Hash) && json["GlobalStatus"]
		log.info("MMS Status", json["disposition"])
		next
	end

	IncomingMMS.for(m.to, json).then { |incoming|
		# Nodes longer than 7 characters are treated as full numbers and get
		# a "+" prefix; shorter ones get the phone-context suffix.
		tel = if m.from.node.length > 7
			"+#{m.from.node}"
		else
			"#{m.from.node};phone-context=ca-us.phone-context.soprani.ca"
		end

		to_send = incoming.to_stanza
		to_send.id = m.id
		to_send.from = Blather::JID.new(tel, CONFIG[:component][:jid])
		self << to_send

		emit_incoming_event(
			incoming.unproxied_to,
			from: tel,
			body: incoming.body_text,
			endstream_id: m.id,
			media_urls: incoming.media_urls
		)
	}
end
244
# Error-typed messages are consumed here and not relayed: endstream doesn't
# want them.
message(type: :error) { true }
249
# Send a message out as MMS and emit the matching outgoing event.
#
# @param m [Blather::Stanza::Message] the message to convert and send
def send_outgoing_mms(m)
	stanza_id = m.id
	attachments = m.oobs

	self << OutgoingMMS.for(m).to_stanza(id: stanza_id, from: m.from)

	# The first OOB URL is already reported under media_urls, so it is cut
	# out of the body text of the event.
	first_url = attachments.first&.url.to_s
	emit_outgoing_event(
		m.from,
		to: m.recipients,
		body: m.body.to_s.sub(first_url, ""),
		stanza_id: stanza_id,
		media_urls: attachments.map(&:url)
	)
end
264
# Messages to the component JID that satisfy the :addresses guard are sent
# out as MMS.
message :addresses, to: Blather::JID.new(CONFIG[:component][:jid]) do |stanza|
	send_outgoing_mms(stanza)
end

# Messages to a phone-number JID that carry at least one OOB element are
# sent out as MMS.
carries_oob = ->(stanza) { !stanza.oobs.empty? }
message carries_oob, to: /\A\+?\d+@/ do |stanza|
	# TODO: if too big or bad mime, send sms
	send_outgoing_mms(stanza)
end
273
# Whether the message body exceeds roughly three SMS segments, counting 160
# characters per segment for ASCII-only text and 70 otherwise.
#
# @param m [#body] message whose body is a String
# @return [Boolean]
def too_long_for_sms?(m)
	text = m.body
	segment_size = text.ascii_only? ? 160 : 70
	text.length > segment_size * 3
end
278
# Messages with a body to a phone-number JID whose text is too long for SMS
# are sent out as MMS instead.
exceeds_sms_length = ->(stanza) { too_long_for_sms?(stanza) }
message :body, exceeds_sms_length, to: /\A\+?\d+@/ do |stanza|
	send_outgoing_mms(stanza)
end
282
# Messages with a body addressed to a phone-number JID, or to a node with
# the ca-us phone-context suffix, are relayed as SMS: the stanza is
# re-addressed to the bare number at sms.chat.1pcom.net, sent from the
# proxied form of the sender's JID, and an outgoing event is emitted.
message(
	:body,
	to: /(?:\A\+?\d+@)|(?:;phone-context=ca-us\.phone-context\.soprani\.ca@)/
) do |m|
	# Capture the original addressing and content before the stanza is
	# rewritten below.
	sender = m.from
	recipient_node = m.to.node
	event_details = { to: [recipient_node], body: m.body, stanza_id: m.id }

	# Strip the leading "+" and any phone-context suffix from the node.
	sms_node = recipient_node.sub(/\A\+/, "").sub(/;phone-context=.*\Z/, "")
	m.to = Blather::JID.new(sms_node, "sms.chat.1pcom.net")
	m.from = ProxiedJID.proxy(sender, CONFIG[:component][:jid])
	self << m

	emit_outgoing_event(sender, **event_details)
end
306
# IQs of type get or set that reach this handler are answered with a
# service-unavailable error.
iq type: %i[get set] do |request|
	self << request.as_error("service-unavailable", :cancel)
end
310
# On SIGINT or SIGTERM, record that the shutdown was requested (the
# disconnected handler checks this flag) and stop the event loop.
%i[INT TERM].each do |signal|
	trap(signal) do
		@shutdown_requested = true
		EM.stop
	end
end

# Start the EventMachine reactor and run the XMPP client inside it.
EM.run { client.run }