sgx_endstream.rb

  1#!/usr/bin/ruby
  2# frozen_string_literal: true
  3
  4require "blather/client/dsl"
  5require "dhall"
  6require "em-hiredis"
  7require "em-http"
  8require "em_promise"
  9require "json"
 10require "ougai"
 11require "sentry-ruby"
 12
 13require_relative "lib/addresses"
 14require_relative "lib/blather_client"
 15require_relative "lib/blather_ext"
 16require_relative "lib/em"
 17require_relative "lib/event_emitter"
 18require_relative "lib/incoming_mms"
 19require_relative "lib/oob"
 20require_relative "lib/outgoing_mms"
 21require_relative "lib/proxied_jid"
 22require_relative "lib/registration_repo"
 23
 24singleton_class.class_eval do
 25	include Blather::DSL
 26	include EventEmitter
 27
 28	Blather::DSL.append_features(self)
 29end
 30
 31$stdout.sync = true
 32LOG = Ougai::Logger.new($stdout)
 33LOG.level = ENV.fetch("LOG_LEVEL", "info")
 34LOG.formatter = Ougai::Formatters::Readable.new(
 35	nil,
 36	nil,
 37	plain: !$stdout.isatty
 38)
 39Blather.logger = LOG
 40
 41def log
 42	Thread.current[:log] || LOG
 43end
 44
 45Sentry.init do |config|
 46	config.logger = LOG
 47	config.breadcrumbs_logger = [:sentry_logger]
 48end
 49
 50CONFIG = Dhall::Coder
 51	.new(safe: Dhall::Coder::JSON_LIKE + [Symbol, Proc])
 52	.load(
 53		"(#{ARGV[0]}) : #{__dir__}/config-schema.dhall",
 54		transform_keys: ->(k) { k&.to_sym }
 55	)
 56
 57def panic(e, hub=nil)
 58	log.fatal(
 59		"Error raised during event loop: #{e.class}",
 60		e
 61	)
 62	if e.is_a?(Exception)
 63		(hub || Sentry).capture_exception(e, hint: { background: false })
 64	else
 65		(hub || Sentry).capture_message(e.to_s, hint: { background: false })
 66	end
 67	exit 1
 68end
 69
 70EM.error_handler(&method(:panic))
 71
 72@client = BlatherClient.new
 73
 74setup(
 75	CONFIG[:component][:jid],
 76	CONFIG[:component][:secret],
 77	CONFIG[:server][:host],
 78	CONFIG[:server][:port],
 79	nil,
 80	nil,
 81	async: true
 82)
 83
 84@connected = false
 85@shutdown_requested = false
 86
 87when_ready do
 88	@connected = true
 89	log.info "Ready"
 90	REDIS = EM::Hiredis.connect
 91	REGISTRATION_REPO = RegistrationRepo.new(redis: REDIS)
 92end
 93
 94disconnected do
 95	next if @shutdown_requested
 96
 97	if @connected
 98		log.fatal(
 99			"XMPP connection lost",
100			server: "#{CONFIG[:server][:host]}:#{CONFIG[:server][:port]}",
101			component: CONFIG[:component][:jid]
102		)
103	else
104		log.fatal(
105			"Failed to establish XMPP connection",
106			server: "#{CONFIG[:server][:host]}:#{CONFIG[:server][:port]}",
107			component: CONFIG[:component][:jid]
108		)
109	end
110
111	EM.stop
112end
113
114disco_info to: Blather::JID.new(CONFIG[:component][:jid]) do |iq|
115	reply = iq.reply
116	reply.identities = [{
117		name: "SGX Endstream",
118		type: "sms",
119		category: "gateway"
120	}]
121	reply.features = [
122		"http://jabber.org/protocol/disco#info",
123		"http://jabber.org/protocol/address",
124		"jabber:iq:register"
125	]
126	self << reply
127end
128
129disco_info to: /\A\+?\d+@/ do |iq|
130	reply = iq.reply
131	reply.identities = [{
132		name: "SGX Endstream",
133		type: "sms",
134		category: "client"
135	}]
136	reply.features = [
137		"urn:xmpp:receipts"
138	]
139	self << reply
140end
141
142ibr type: :get do |iq|
143	REGISTRATION_REPO.find(iq.from).then do |creds|
144		reply = iq.reply
145		reply.registered = true if creds&.first
146		reply.phone = creds&.first.to_s
147		reply.username = ""
148		reply.password = ""
149		self << reply
150	end
151end
152
153ibr type: :set do |iq|
154	unless iq.remove?
155		if iq.phone.to_s !~ /\A\+?1\d{10}\Z/
156			self << iq.as_error("bad-request", :modify, "Invalid phone number")
157			next
158		end
159
160		if iq.username.to_s.empty? || iq.password.to_s.empty?
161			self << iq.as_error(
162				"bad-request",
163				:modify,
164				"Username and password are required"
165			)
166			next
167		end
168	end
169
170	REGISTRATION_REPO.delete(iq.from).then { |status|
171		next status if !status[:code].to_i.zero? || iq.remove?
172
173		REGISTRATION_REPO.put(iq.from, iq.phone, iq.username, iq.password)
174	}.then { |status|
175		self << if status[:code].to_i.zero?
176			iq.reply
177		else
178			iq.as_error("bad-request", :modify, status[:text])
179		end
180	}
181end
182
183message from: /@sms.chat.1pcom.net\Z/ do |m|
184	json = JSON.parse(m.body) rescue nil
185
186	tel = if m.from.node.length > 7
187		"+#{m.from.node}"
188	else
189		"#{m.from.node};phone-context=ca-us.phone-context.soprani.ca"
190	end
191	m = m.dup.tap { _1.subject = nil } # They send a generic subject
192	m.from = Blather::JID.new(tel, CONFIG[:component][:jid])
193	m.to = ProxiedJID.new(m.to).unproxied
194
195	if json.is_a?(Hash) && (resp = json["response"])
196		log.info("SMS Status", json)
197		m = m.reply.tap { _1.body = "" }.tap { _1.id = resp["id"] }.as_error(
198			"recipient-unavailable",
199			:cancel,
200			"#{resp['text']} (#{resp['code']} #{resp['subcode']} #{resp['dlrid']})"
201		)
202		emit_failed_event(
203			endstream_id: resp["id"],
204			error_code: "#{resp['code']} #{resp['subcode']}",
205			error_description: resp["text"]
206		)
207	else
208		emit_incoming_event(m.to, from: tel, body: m.body, endstream_id: m.id)
209	end
210
211	self << m
212end
213
214message from: /@mms.chat.1pcom.net\Z/ do |m|
215	json = JSON.parse(m.body)
216	if json.is_a?(Hash) && json["GlobalStatus"]
217		log.info("MMS Status", json["disposition"])
218		next
219	end
220
221	IncomingMMS.for(m.to, json).then { |incoming|
222		to_send = incoming.to_stanza
223		to_send.id = m.id
224		to_send.from = Blather::JID.new("+#{m.from.node}", CONFIG[:component][:jid])
225		self << to_send
226
227		emit_incoming_event(
228			incoming.unproxied_to,
229			from: "+#{m.from.node}",
230			body: incoming.body_text,
231			endstream_id: m.id,
232			media_urls: incoming.media_urls
233		)
234	}
235end
236
237# Swallow errors, endstream doesn't want them
238message type: :error do
239	true
240end
241
242# @parameter m [Blather::Stanza::Message]
243def send_outgoing_mms(m)
244	oobs = m.oobs
245	id = m.id
246	self << OutgoingMMS.for(m).to_stanza(id: id, from: m.from)
247
248	emit_outgoing_event(
249		m.from,
250		to: m.recipients,
251		body: m.body.to_s.sub(oobs.first&.url.to_s, ""), # OOB's already captured
252		stanza_id: id,
253		media_urls: oobs.map(&:url)
254	)
255end
256
257message :addresses, to: Blather::JID.new(CONFIG[:component][:jid]) do |m|
258	send_outgoing_mms(m)
259end
260
261message ->(m) { !m.oobs.empty? }, to: /\A\+?\d+@/ do |m|
262	# TODO: if too big or bad mime, send sms
263	send_outgoing_mms(m)
264end
265
266def too_long_for_sms?(m)
267	# ~3 segments
268	m.body.length > (m.body.ascii_only? ? 160 : 70) * 3
269end
270
271message :body, method(:too_long_for_sms?).to_proc, to: /\A\+?\d+@/ do |m|
272	send_outgoing_mms(m)
273end
274
275message(
276	:body,
277	to: /(?:\A\+?\d+@)|(?:;phone-context=ca-us\.phone-context\.soprani\.ca@)/
278) do |m|
279	owner_jid = m.from
280	dest = m.to.node
281	body = m.body
282	stanza_id = m.id
283
284	m.to = Blather::JID.new(
285		dest.sub(/\A\+/, "").sub(/;phone-context=.*\Z/, ""),
286		"sms.chat.1pcom.net"
287	)
288	m.from = ProxiedJID.proxy(m.from, CONFIG[:component][:jid])
289	self << m
290
291	emit_outgoing_event(
292		owner_jid,
293		to: [dest],
294		body: body,
295		stanza_id: stanza_id
296	)
297end
298
299iq type: [:get, :set] do |iq|
300	self << iq.as_error("service-unavailable", :cancel)
301end
302
303trap(:INT) do
304	@shutdown_requested = true
305	EM.stop
306end
307
308trap(:TERM) do
309	@shutdown_requested = true
310	EM.stop
311end
312
313EM.run { client.run }