web.rb

  1# frozen_string_literal: true
  2
  3require "digest"
  4require "forwardable"
  5require "multibases"
  6require "multihashes"
  7require "roda"
  8require "thin"
  9require "sentry-ruby"
 10
 11require_relative "lib/call_attempt_repo"
 12require_relative "lib/cdr"
 13require_relative "lib/oob"
 14require_relative "lib/rev_ai"
 15require_relative "lib/roda_capture"
 16require_relative "lib/roda_em_promise"
 17require_relative "lib/rack_fiber"
 18
 19class OGMDownload
 20	def initialize(url)
 21		@digest = Digest::SHA512.new
 22		@f = Tempfile.open("ogm")
 23		@req = EM::HttpRequest.new(url, tls: { verify_peer: true })
 24	end
 25
 26	def download
 27		http = @req.aget
 28		http.stream do |chunk|
 29			@digest << chunk
 30			@f.write chunk
 31		end
 32		http.then { @f.close }.catch do |e|
 33			@f.close!
 34			EMPromise.reject(e)
 35		end
 36	end
 37
 38	def cid
 39		Multibases.encode(
 40			"base58btc",
 41			[1, 85].pack("C*") + Multihashes.encode(@digest.digest, "sha2-512")
 42		).pack.to_s
 43	end
 44
 45	def path
 46		@f.path
 47	end
 48end
 49
 50# rubocop:disable Metrics/ClassLength
 51class Web < Roda
 52	use Rack::Fiber unless ENV["ENV"] == "test" # Must go first!
 53	use Sentry::Rack::CaptureExceptions
 54	plugin :json_parser
 55	plugin :type_routing
 56	plugin :public
 57	plugin :render, engine: "slim"
 58	plugin RodaCapture
 59	plugin RodaEMPromise # Must go last!
 60
 61	class << self
 62		attr_reader :customer_repo, :log, :outbound_transfers
 63
 64		def run(log, *listen_on)
 65			plugin :common_logger, log, method: :info
 66			@outbound_transfers = {}
 67			Thin::Logging.logger = log
 68			Thin::Server.start(
 69				*listen_on,
 70				freeze.app,
 71				signals: false
 72			)
 73		end
 74	end
 75
 76	extend Forwardable
 77	def_delegators :'self.class', :outbound_transfers
 78	def_delegators :request, :params
 79
 80	def log
 81		opts[:common_logger]
 82	end
 83
 84	def log_error(e)
 85		log.error(
 86			"Error raised during #{request.fullpath}: #{e.class}",
 87			e,
 88			loggable_params
 89		)
 90		if e.is_a?(::Exception)
 91			Sentry.capture_exception(e)
 92		else
 93			Sentry.capture_message(e.to_s)
 94		end
 95	end
 96
 97	def loggable_params
 98		params.dup.tap do |p|
 99			p.delete("to")
100			p.delete("from")
101		end
102	end
103
104	def customer_repo(**kwargs)
105		kwargs[:set_user] = Sentry.method(:set_user) unless kwargs[:set_user]
106		opts[:customer_repo] || CustomerRepo.new(**kwargs)
107	end
108
109	def find_by_tel_with_fallback(sgx_repo:, **kwargs)
110		customer_repo(sgx_repo: sgx_repo).find_by_tel(params["to"]).catch { |e|
111			next EMPromise.reject(e) if e.is_a?(CustomerRepo::NotFound)
112
113			log_error(e)
114			customer_repo(
115				sgx_repo: TrivialBackendSgxRepo.new(**kwargs)
116			).find_by_tel(params["to"])
117		}
118	end
119
120	def call_attempt_repo
121		opts[:call_attempt_repo] || CallAttemptRepo.new
122	end
123
124	def rev_ai
125		RevAi.new(logger: log.child(loggable_params))
126	end
127
128	TEL_CANDIDATES = {
129		"Restricted" => "14",
130		"anonymous" => "15",
131		"Anonymous" => "16",
132		"unavailable" => "17",
133		"Unavailable" => "18"
134	}.freeze
135
136	def sanitize_tel_candidate(candidate)
137		if candidate.length < 3
138			"13;phone-context=anonymous.phone-context.soprani.ca"
139		elsif candidate[0] == "+" && /\A\d+\z/.match(candidate[1..-1])
140			candidate
141		else
142			"#{TEL_CANDIDATES.fetch(candidate, '19')}" \
143				";phone-context=anonymous.phone-context.soprani.ca"
144		end
145	end
146
147	def from_jid
148		Blather::JID.new(
149			sanitize_tel_candidate(params["from"]),
150			CONFIG[:component][:jid]
151		)
152	end
153
154	def inbound_calls_path(suffix, customer_id=nil, call_id: nil)
155		[
156			"/inbound/calls/#{call_id || params['callId']}",
157			suffix
158		].compact.join("/") +
159			(customer_id ? "?customer_id=#{customer_id}" : "")
160	end
161
162	def url(path)
163		"#{request.base_url}#{path}"
164	end
165
166	def modify_call(call_id)
167		body = Bandwidth::ApiModifyCallRequest.new
168		yield body
169		BANDWIDTH_VOICE.modify_call(
170			CONFIG[:creds][:account],
171			call_id,
172			body: body
173		)
174	rescue Bandwidth::ApiErrorResponseException
175		# If call does not exist, don't need to hang up or send to voicemail
176		# Other side must have hung up already
177		raise $! unless $!.response_code.to_s == "404"
178	end
179
180	def start_transcription(customer, call_id, media_url)
181		return unless customer.transcription_enabled
182
183		rev_ai.language_id(
184			media_url,
185			url(inbound_calls_path("voicemail/language_id", call_id: call_id)),
186			from_jid: from_jid,
187			customer_id: customer.customer_id
188		)
189	end
190
191	route do |r|
192		r.on "inbound" do
193			r.on "calls" do
194				r.post "status" do
195					if params["eventType"] == "disconnect"
196						if (outbound_leg = outbound_transfers.delete(params["callId"]))
197							modify_call(outbound_leg) do |call|
198								call.state = "completed"
199							end
200						end
201
202						customer_repo.find_by_tel(params["to"]).then do |customer|
203							CDR.for_inbound(customer.customer_id, params).save
204						end
205					end
206					"OK"
207				end
208
209				r.on :call_id do |call_id|
210					r.post "transfer_complete" do
211						outbound_leg = outbound_transfers.delete(call_id)
212						if params["cause"] == "hangup" && params["tag"] == "connected"
213							log.info "Normal hangup, now end #{call_id}", loggable_params
214							modify_call(call_id) { |call| call.state = "completed" }
215						elsif !outbound_leg
216							log.debug "Inbound disconnected", loggable_params
217						else
218							log.debug "Go to voicemail", loggable_params
219							modify_call(call_id) do |call|
220								call.redirect_url = url inbound_calls_path(:voicemail)
221							end
222						end
223						""
224					end
225
226					r.on "voicemail" do
227						r.post "audio" do
228							duration = Time.parse(params["endTime"]) -
229							           Time.parse(params["startTime"])
230							next "OK<5" unless duration > 5
231
232							jmp_media_url = params["mediaUrl"].sub(
233								/\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
234								"https://jmp.chat"
235							)
236
237							find_by_tel_with_fallback(
238								sgx_repo: Bwmsgsv2Repo.new,
239								transcription_enabled: false
240							).then do |customer|
241								start_transcription(customer, call_id, jmp_media_url)
242
243								m = Blather::Stanza::Message.new
244								m.chat_state = nil
245								m.from = from_jid
246								m.subject = "New Voicemail"
247								m.body = jmp_media_url
248								m << OOB.new(jmp_media_url, desc: "Voicemail Recording")
249								customer.stanza_to(m)
250
251								"OK"
252							end
253						end
254
255						r.post "language_id" do
256							rev_ai.language_id_result(params).then { |result|
257								rev_ai.stt(
258									result["top_language"],
259									result.dig("metadata", "media_url"),
260									url(inbound_calls_path(
261										"voicemail/transcription",
262										call_id: call_id
263									)),
264									**result["metadata"].transform_keys(&:to_sym)
265								).then { "OK" }
266							}.catch_only(RevAi::Failed) { |e|
267								log_error(e)
268								"Failure logged"
269							}
270						end
271
272						r.post "transcription" do
273							rev_ai.stt_result(params, request.url).then { |result|
274								next "OK" if result["text"].to_s.empty?
275
276								customer_repo.find(
277									result.dig("metadata", "customer_id")
278								).then do |customer|
279									m = Blather::Stanza::Message.new
280									m.chat_state = nil
281									m.from = result.dig("metadata", "from_jid")
282									m.subject = "Voicemail Transcription"
283									m.body = result["text"]
284									customer.stanza_to(m)
285
286									"OK"
287								end
288							}.catch_only(RevAi::Failed) { |e|
289								log_error(e)
290								"Failure logged"
291							}
292						end
293
294						r.post do
295							find_by_tel_with_fallback(
296								sgx_repo: Bwmsgsv2Repo.new,
297								ogm_url: nil
298							).then { |c|
299								c.ogm(params["from"]) if c.fwd.voicemail_enabled?
300							}.then { |ogm|
301								next render :hangup unless ogm
302
303								render :voicemail, locals: { ogm: ogm }
304							}.catch_only(CustomerRepo::NotFound) {
305								render "inbound/no_customer"
306							}
307						end
308					end
309
310					r.post do
311						customer_repo(
312							sgx_repo: Bwmsgsv2Repo.new
313						).find(params.fetch("customer_id")).then do |customer|
314							call_attempt_repo.find_inbound(
315								customer,
316								params["from"],
317								call_id: call_id,
318								digits: params["digits"]
319							).then { |ca| render(*ca.to_render) }
320						end
321					end
322				end
323
324				r.post do
325					customer_repo(
326						sgx_repo: Bwmsgsv2Repo.new
327					).find_by_tel(params["to"]).then { |customer|
328						EMPromise.all([
329							customer.customer_id, customer.fwd,
330							call_attempt_repo.find_inbound(
331								customer, params["from"], call_id: params["callId"]
332							)
333						])
334					}.then { |(customer_id, fwd, ca)|
335						call = ca.create_call(fwd, CONFIG[:creds][:account]) { |cc|
336							cc.from = params["from"]
337							cc.application_id = params["applicationId"]
338							cc.answer_url = url inbound_calls_path(nil, customer_id)
339							cc.disconnect_url = url inbound_calls_path(:transfer_complete)
340						}
341
342						next EMPromise.reject(:voicemail) unless call
343
344						outbound_transfers[params["callId"]] = call
345						render :ring, locals: { duration: 300 }
346					}.catch_only(CustomerFwd::InfiniteTimeout) { |e|
347						render :forward, locals: { fwd: e.fwd, from: params["from"] }
348					}.catch { |e|
349						log_error(e) unless e == :voicemail
350						render :redirect, locals: { to: inbound_calls_path(:voicemail) }
351					}
352				end
353			end
354		end
355
356		r.on "outbound" do
357			r.on "calls" do
358				r.post "status" do
359					log.info "#{params['eventType']} #{params['callId']}", loggable_params
360					if params["eventType"] == "disconnect"
361						call_attempt_repo.ending_call(c, params["callId"])
362						CDR.for_outbound(params).save.catch(&method(:log_error))
363					end
364					"OK"
365				end
366
367				r.post do
368					from = params["from"].sub(/^(?:\+|c)/, "")
369					from = from.sub(/^1/, "") if from.length > 10
370					customer_repo(
371						sgx_repo: Bwmsgsv2Repo.new
372					).find_by_format(from).then { |c|
373						call_attempt_repo.find_outbound(
374							c,
375							params["to"],
376							call_id: params["callId"],
377							digits: params["digits"]
378						).then do |ca|
379							r.json { ca.to_json }
380
381							call_attempt_repo.starting_call(c, params["callId"])
382							render(*ca.to_render)
383						end
384					}.catch_only(CustomerRepo::NotFound) {
385						render "outbound/no_customer"
386					}
387				end
388			end
389		end
390
391		r.on "ogm" do
392			r.post "start" do
393				render :record_ogm, locals: { customer_id: params["customer_id"] }
394			end
395
396			r.post do
397				jmp_media_url = params["mediaUrl"].sub(
398					/\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
399					"https://jmp.chat"
400				)
401				ogm = OGMDownload.new(jmp_media_url)
402				ogm.download.then do
403					File.rename(ogm.path, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
404					File.chmod(0o644, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
405					customer_repo.find(params["customer_id"]).then do |customer|
406						customer.set_ogm_url("#{CONFIG[:ogm_web_root]}/#{ogm.cid}.mp3")
407					end
408				end
409			end
410		end
411
412		r.public
413	end
414end
415# rubocop:enable Metrics/ClassLength