web.rb

  1# frozen_string_literal: true
  2
  3require "digest"
  4require "forwardable"
  5require "multibases"
  6require "multihashes"
  7require "roda"
  8require "thin"
  9require "sentry-ruby"
 10
 11require_relative "lib/call_attempt_repo"
 12require_relative "lib/cdr"
 13require_relative "lib/oob"
 14require_relative "lib/rev_ai"
 15require_relative "lib/roda_capture"
 16require_relative "lib/roda_em_promise"
 17require_relative "lib/rack_fiber"
 18
 19class OGMDownload
 20	def initialize(url)
 21		@digest = Digest::SHA512.new
 22		@f = Tempfile.open("ogm")
 23		@req = EM::HttpRequest.new(url, tls: { verify_peer: true })
 24	end
 25
 26	def download
 27		http = @req.aget
 28		http.stream do |chunk|
 29			@digest << chunk
 30			@f.write chunk
 31		end
 32		http.then { @f.close }.catch do |e|
 33			@f.close!
 34			EMPromise.reject(e)
 35		end
 36	end
 37
 38	def cid
 39		Multibases.encode(
 40			"base58btc",
 41			[1, 85].pack("C*") + Multihashes.encode(@digest.digest, "sha2-512")
 42		).pack.to_s
 43	end
 44
 45	def path
 46		@f.path
 47	end
 48end
 49
 50# rubocop:disable Metrics/ClassLength
 51class Web < Roda
 52	use Rack::Fiber unless ENV["ENV"] == "test" # Must go first!
 53	use Sentry::Rack::CaptureExceptions
 54	plugin :json_parser
 55	plugin :type_routing
 56	plugin :public
 57	plugin :render, engine: "slim"
 58	plugin RodaCapture
 59	plugin RodaEMPromise # Must go last!
 60
	class << self
		attr_reader :customer_repo, :log, :outbound_transfers

		# Wires up request logging, resets the outbound transfer registry and
		# serves the frozen app with Thin.
		#
		# @param log logger given to the :common_logger plugin and to Thin
		# @param listen_on arguments passed to Thin::Server.start ahead of the
		#   app
		def run(log, *listen_on)
			plugin(:common_logger, log, method: :info)
			@outbound_transfers = {}
			Thin::Logging.logger = log

			rack_app = freeze.app
			Thin::Server.start(*listen_on, rack_app, signals: false)
		end
	end
 75
 76	extend Forwardable
 77	def_delegators :'self.class', :outbound_transfers
 78	def_delegators :request, :params
 79
	# Reads the logger stored in the app options under :common_logger
	# (Web.run loads the :common_logger plugin with it).
	def log
		opts[:common_logger]
	end
 83
	# Logs a failure for the current request and reports it to Sentry.
	#
	# @param e the failure; Exceptions are captured as exceptions, anything
	#   else is reported as a message built from its to_s
	def log_error(e)
		summary = "Error raised during #{request.fullpath}: #{e.class}"
		log.error(summary, e, loggable_params)

		case e
		when ::Exception then Sentry.capture_exception(e)
		else Sentry.capture_message(e.to_s)
		end
	end
 96
	# Copy of the request params with the "to" and "from" entries removed,
	# used wherever params are attached to log lines.
	def loggable_params
		redacted = params.dup
		redacted.delete("to")
		redacted.delete("from")
		redacted
	end
103
104	def customer_repo(**kwargs)
105		kwargs[:set_user] = Sentry.method(:set_user) unless kwargs[:set_user]
106		opts[:customer_repo] || CustomerRepo.new(**kwargs)
107	end
108
109	def call_attempt_repo
110		opts[:call_attempt_repo] || CallAttemptRepo.new
111	end
112
113	def rev_ai
114		RevAi.new
115	end
116
117	TEL_CANDIDATES = {
118		"Restricted" => "14",
119		"anonymous" => "15",
120		"Anonymous" => "16",
121		"unavailable" => "17",
122		"Unavailable" => "18"
123	}.freeze
124
125	def sanitize_tel_candidate(candidate)
126		if candidate.length < 3
127			"13;phone-context=anonymous.phone-context.soprani.ca"
128		elsif candidate[0] == "+" && /\A\d+\z/.match(candidate[1..-1])
129			candidate
130		else
131			"#{TEL_CANDIDATES.fetch(candidate, '19')}" \
132				";phone-context=anonymous.phone-context.soprani.ca"
133		end
134	end
135
136	def from_jid
137		Blather::JID.new(
138			sanitize_tel_candidate(params["from"]),
139			CONFIG[:component][:jid]
140		)
141	end
142
143	def inbound_calls_path(suffix, customer_id=nil, call_id: nil)
144		[
145			"/inbound/calls/#{call_id || params['callId']}",
146			suffix
147		].compact.join("/") +
148			(customer_id ? "?customer_id=#{customer_id}" : "")
149	end
150
151	def url(path)
152		"#{request.base_url}#{path}"
153	end
154
155	def modify_call(call_id)
156		body = Bandwidth::ApiModifyCallRequest.new
157		yield body
158		BANDWIDTH_VOICE.modify_call(
159			CONFIG[:creds][:account],
160			call_id,
161			body: body
162		)
163	rescue Bandwidth::ApiErrorResponseException
164		# If call does not exist, don't need to hang up or send to voicemail
165		# Other side must have hung up already
166		raise $! unless $!.response_code.to_s == "404"
167	end
168
169	def do_alternate_transcription(customer, call_id)
170		return unless customer.alternate_transcription_enabled
171
172		rev_ai.language_id(
173			jmp_media_url,
174			url(inbound_calls_path(
175				"voicemail/rev_ai/language_id", call_id: call_id
176			)),
177			from_jid: from_jid,
178			customer_id: customer.customer_id
179		)
180	end
181
182	route do |r|
183		r.on "inbound" do
184			r.on "calls" do
185				r.post "status" do
186					if params["eventType"] == "disconnect"
187						if (outbound_leg = outbound_transfers.delete(params["callId"]))
188							modify_call(outbound_leg) do |call|
189								call.state = "completed"
190							end
191						end
192
193						customer_repo.find_by_tel(params["to"]).then do |customer|
194							CDR.for_inbound(customer.customer_id, params).save
195						end
196					end
197					"OK"
198				end
199
200				r.on :call_id do |call_id|
201					r.post "transfer_complete" do
202						outbound_leg = outbound_transfers.delete(call_id)
203						if params["cause"] == "hangup" && params["tag"] == "connected"
204							log.info "Normal hangup, now end #{call_id}", loggable_params
205							modify_call(call_id) { |call| call.state = "completed" }
206						elsif !outbound_leg
207							log.debug "Inbound disconnected", loggable_params
208						else
209							log.debug "Go to voicemail", loggable_params
210							modify_call(call_id) do |call|
211								call.redirect_url = url inbound_calls_path(:voicemail)
212							end
213						end
214						""
215					end
216
217					r.on "voicemail" do
218						r.post "audio" do
219							duration = Time.parse(params["endTime"]) -
220							           Time.parse(params["startTime"])
221							next "OK<5" unless duration > 5
222
223							jmp_media_url = params["mediaUrl"].sub(
224								/\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
225								"https://jmp.chat"
226							)
227
228							customer_repo(
229								sgx_repo: Bwmsgsv2Repo.new
230							).find_by_tel(params["to"]).then do |customer|
231								do_alternate_transcription(customer, call_id)
232
233								m = Blather::Stanza::Message.new
234								m.chat_state = nil
235								m.from = from_jid
236								m.subject = "New Voicemail"
237								m.body = jmp_media_url
238								m << OOB.new(jmp_media_url, desc: "Voicemail Recording")
239								customer.stanza_to(m)
240
241								"OK"
242							end
243						end
244
245						r.post "transcription" do
246							duration = Time.parse(params["endTime"]) -
247							           Time.parse(params["startTime"])
248							next "OK<5" unless duration > 5
249
250							customer_repo.find_by_tel(params["to"]).then do |customer|
251								m = Blather::Stanza::Message.new
252								m.chat_state = nil
253								m.from = from_jid
254								m.subject = "Voicemail Transcription"
255								m.body = BANDWIDTH_VOICE.get_recording_transcription(
256									params["accountId"], params["callId"], params["recordingId"]
257								).data.transcripts[0].text
258								customer.stanza_to(m)
259
260								"OK"
261							end
262						end
263
264						r.on "rev_ai" do
265							r.post "language_id" do
266								rev_ai.language_id_result(params).then do |result|
267									rev_ai.stt(
268										result["top_language"],
269										result.dig("metadata", "media_url"),
270										url(inbound_calls_path(
271											"voicemail/rev_ai/transcription",
272											call_id: call_id
273										)),
274										**result["metadata"].transform_keys(&:to_sym)
275									).then { "OK" }
276								end
277							end
278
279							r.post "transcription" do
280								rev_ai.stt_result(params).then do |result|
281									customer_repo.find(
282										result.dig("metadata", "customer_id")
283									).then do |customer|
284										m = Blather::Stanza::Message.new
285										m.chat_state = nil
286										m.from = result.dig("metadata", "from_jid")
287										m.subject = "Voicemail Transcription"
288										m.body = "Alternate Transcription: #{result['text']}"
289										customer.stanza_to(m)
290
291										"OK"
292									end
293								end
294							end
295						end
296
297						r.post do
298							customer_repo(sgx_repo: Bwmsgsv2Repo.new)
299								.find_by_tel(params["to"])
300								.then { |c|
301									EMPromise.all([c, c.ogm(params["from"])])
302								}.then do |(customer, ogm)|
303									render :voicemail, locals: {
304										ogm: ogm,
305										transcription_enabled: customer.transcription_enabled
306									}
307								end
308						end
309					end
310
311					r.post do
312						customer_repo(
313							sgx_repo: Bwmsgsv2Repo.new
314						).find(params.fetch("customer_id")).then do |customer|
315							call_attempt_repo.find_inbound(
316								customer,
317								params["from"],
318								call_id: call_id,
319								digits: params["digits"]
320							).then { |ca| render(*ca.to_render) }
321						end
322					end
323				end
324
325				r.post do
326					customer_repo(
327						sgx_repo: Bwmsgsv2Repo.new
328					).find_by_tel(params["to"]).then { |customer|
329						EMPromise.all([
330							customer.customer_id, customer.fwd,
331							call_attempt_repo.find_inbound(
332								customer, params["from"], call_id: params["callId"]
333							)
334						])
335					}.then { |(customer_id, fwd, ca)|
336						call = ca.create_call(fwd, CONFIG[:creds][:account]) { |cc|
337							cc.from = params["from"]
338							cc.application_id = params["applicationId"]
339							cc.answer_url = url inbound_calls_path(nil, customer_id)
340							cc.disconnect_url = url inbound_calls_path(:transfer_complete)
341						}
342
343						next EMPromise.reject(:voicemail) unless call
344
345						outbound_transfers[params["callId"]] = call
346						render :ring, locals: { duration: 300 }
347					}.catch { |e|
348						log_error(e) unless e == :voicemail
349						render :redirect, locals: { to: inbound_calls_path(:voicemail) }
350					}
351				end
352			end
353		end
354
355		r.on "outbound" do
356			r.on "calls" do
357				r.post "status" do
358					log.info "#{params['eventType']} #{params['callId']}", loggable_params
359					if params["eventType"] == "disconnect"
360						call_attempt_repo.ending_call(c, params["callId"])
361						CDR.for_outbound(params).save.catch(&method(:log_error))
362					end
363					"OK"
364				end
365
366				r.post do
367					from = params["from"].sub(/^\+1/, "")
368					customer_repo(
369						sgx_repo: Bwmsgsv2Repo.new
370					).find_by_format(from).then do |c|
371						call_attempt_repo.find_outbound(
372							c,
373							params["to"],
374							call_id: params["callId"],
375							digits: params["digits"]
376						).then do |ca|
377							r.json { ca.to_json }
378
379							call_attempt_repo.starting_call(c, params["callId"])
380							render(*ca.to_render)
381						end
382					end
383				end
384			end
385		end
386
387		r.on "ogm" do
388			r.post "start" do
389				render :record_ogm, locals: { customer_id: params["customer_id"] }
390			end
391
392			r.post do
393				jmp_media_url = params["mediaUrl"].sub(
394					/\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
395					"https://jmp.chat"
396				)
397				ogm = OGMDownload.new(jmp_media_url)
398				ogm.download.then do
399					File.rename(ogm.path, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
400					File.chmod(0o644, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
401					customer_repo.find(params["customer_id"]).then do |customer|
402						customer.set_ogm_url("#{CONFIG[:ogm_web_root]}/#{ogm.cid}.mp3")
403					end
404				end
405			end
406		end
407
408		r.public
409	end
410end
411# rubocop:enable Metrics/ClassLength