web.rb

  1# frozen_string_literal: true
  2
  3require "digest"
  4require "forwardable"
  5require "multibases"
  6require "multihashes"
  7require "roda"
  8require "thin"
  9require "sentry-ruby"
 10
 11require_relative "lib/call_attempt_repo"
 12require_relative "lib/cdr"
 13require_relative "lib/oob"
 14require_relative "lib/rev_ai"
 15require_relative "lib/roda_capture"
 16require_relative "lib/roda_em_promise"
 17require_relative "lib/rack_fiber"
 18
 19class OGMDownload
 20	def initialize(url)
 21		@digest = Digest::SHA512.new
 22		@f = Tempfile.open("ogm")
 23		@req = EM::HttpRequest.new(url, tls: { verify_peer: true })
 24	end
 25
 26	def download
 27		http = @req.aget
 28		http.stream do |chunk|
 29			@digest << chunk
 30			@f.write chunk
 31		end
 32		http.then { @f.close }.catch do |e|
 33			@f.close!
 34			EMPromise.reject(e)
 35		end
 36	end
 37
 38	def cid
 39		Multibases.encode(
 40			"base58btc",
 41			[1, 85].pack("C*") + Multihashes.encode(@digest.digest, "sha2-512")
 42		).pack.to_s
 43	end
 44
 45	def path
 46		@f.path
 47	end
 48end
 49
 50# rubocop:disable Metrics/ClassLength
 51class Web < Roda
 52	use Rack::Fiber unless ENV["ENV"] == "test" # Must go first!
 53	use Sentry::Rack::CaptureExceptions
 54	plugin :json_parser
 55	plugin :type_routing
 56	plugin :public
 57	plugin :render, engine: "slim"
 58	plugin RodaCapture
 59	plugin RodaEMPromise # Must go last!
 60
 61	class << self
 62		attr_reader :customer_repo, :log, :outbound_transfers
 63
 64		def run(log, *listen_on)
 65			plugin :common_logger, log, method: :info
 66			@outbound_transfers = {}
 67			Thin::Logging.logger = log
 68			Thin::Server.start(
 69				*listen_on,
 70				freeze.app,
 71				signals: false
 72			)
 73		end
 74	end
 75
 76	extend Forwardable
 77	def_delegators :'self.class', :outbound_transfers
 78	def_delegators :request, :params
 79
 80	def log
 81		opts[:common_logger]
 82	end
 83
 84	def log_error(e)
 85		log.error(
 86			"Error raised during #{request.fullpath}: #{e.class}",
 87			e,
 88			loggable_params
 89		)
 90		if e.is_a?(::Exception)
 91			Sentry.capture_exception(e)
 92		else
 93			Sentry.capture_message(e.to_s)
 94		end
 95	end
 96
 97	def loggable_params
 98		params.dup.tap do |p|
 99			p.delete("to")
100			p.delete("from")
101		end
102	end
103
104	def customer_repo(**kwargs)
105		kwargs[:set_user] = Sentry.method(:set_user) unless kwargs[:set_user]
106		opts[:customer_repo] || CustomerRepo.new(**kwargs)
107	end
108
109	def call_attempt_repo
110		opts[:call_attempt_repo] || CallAttemptRepo.new
111	end
112
113	def rev_ai
114		RevAi.new
115	end
116
117	TEL_CANDIDATES = {
118		"Restricted" => "14",
119		"anonymous" => "15",
120		"Anonymous" => "16",
121		"unavailable" => "17",
122		"Unavailable" => "18"
123	}.freeze
124
125	def sanitize_tel_candidate(candidate)
126		if candidate.length < 3
127			"13;phone-context=anonymous.phone-context.soprani.ca"
128		elsif candidate[0] == "+" && /\A\d+\z/.match(candidate[1..-1])
129			candidate
130		else
131			"#{TEL_CANDIDATES.fetch(candidate, '19')}" \
132				";phone-context=anonymous.phone-context.soprani.ca"
133		end
134	end
135
136	def from_jid
137		Blather::JID.new(
138			sanitize_tel_candidate(params["from"]),
139			CONFIG[:component][:jid]
140		)
141	end
142
143	def inbound_calls_path(suffix, customer_id=nil, call_id: nil)
144		[
145			"/inbound/calls/#{call_id || params['callId']}",
146			suffix
147		].compact.join("/") +
148			(customer_id ? "?customer_id=#{customer_id}" : "")
149	end
150
151	def url(path)
152		"#{request.base_url}#{path}"
153	end
154
155	def modify_call(call_id)
156		body = Bandwidth::ApiModifyCallRequest.new
157		yield body
158		BANDWIDTH_VOICE.modify_call(
159			CONFIG[:creds][:account],
160			call_id,
161			body: body
162		)
163	rescue Bandwidth::ApiErrorResponseException
164		# If call does not exist, don't need to hang up or send to voicemail
165		# Other side must have hung up already
166		raise $! unless $!.response_code.to_s == "404"
167	end
168
169	def start_transcription(customer, call_id, media_url)
170		return unless customer.transcription_enabled
171
172		rev_ai.language_id(
173			media_url,
174			url(inbound_calls_path("voicemail/language_id", call_id: call_id)),
175			from_jid: from_jid,
176			customer_id: customer.customer_id
177		)
178	end
179
180	route do |r|
181		r.on "inbound" do
182			r.on "calls" do
183				r.post "status" do
184					if params["eventType"] == "disconnect"
185						if (outbound_leg = outbound_transfers.delete(params["callId"]))
186							modify_call(outbound_leg) do |call|
187								call.state = "completed"
188							end
189						end
190
191						customer_repo.find_by_tel(params["to"]).then do |customer|
192							CDR.for_inbound(customer.customer_id, params).save
193						end
194					end
195					"OK"
196				end
197
198				r.on :call_id do |call_id|
199					r.post "transfer_complete" do
200						outbound_leg = outbound_transfers.delete(call_id)
201						if params["cause"] == "hangup" && params["tag"] == "connected"
202							log.info "Normal hangup, now end #{call_id}", loggable_params
203							modify_call(call_id) { |call| call.state = "completed" }
204						elsif !outbound_leg
205							log.debug "Inbound disconnected", loggable_params
206						else
207							log.debug "Go to voicemail", loggable_params
208							modify_call(call_id) do |call|
209								call.redirect_url = url inbound_calls_path(:voicemail)
210							end
211						end
212						""
213					end
214
215					r.on "voicemail" do
216						r.post "audio" do
217							duration = Time.parse(params["endTime"]) -
218							           Time.parse(params["startTime"])
219							next "OK<5" unless duration > 5
220
221							jmp_media_url = params["mediaUrl"].sub(
222								/\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
223								"https://jmp.chat"
224							)
225
226							customer_repo(
227								sgx_repo: Bwmsgsv2Repo.new
228							).find_by_tel(params["to"]).then do |customer|
229								start_transcription(customer, call_id, jmp_media_url)
230
231								m = Blather::Stanza::Message.new
232								m.chat_state = nil
233								m.from = from_jid
234								m.subject = "New Voicemail"
235								m.body = jmp_media_url
236								m << OOB.new(jmp_media_url, desc: "Voicemail Recording")
237								customer.stanza_to(m)
238
239								"OK"
240							end
241						end
242
243						r.post "language_id" do
244							rev_ai.language_id_result(params).then { |result|
245								rev_ai.stt(
246									result["top_language"],
247									result.dig("metadata", "media_url"),
248									url(inbound_calls_path(
249										"voicemail/transcription",
250										call_id: call_id
251									)),
252									**result["metadata"].transform_keys(&:to_sym)
253								).then { "OK" }
254							}.catch_only(RevAi::Failed) { |e|
255								log_error(e)
256								"Failure logged"
257							}
258						end
259
260						r.post "transcription" do
261							rev_ai.stt_result(params).then { |result|
262								next "OK" if result["text"].empty?
263
264								customer_repo.find(
265									result.dig("metadata", "customer_id")
266								).then do |customer|
267									m = Blather::Stanza::Message.new
268									m.chat_state = nil
269									m.from = result.dig("metadata", "from_jid")
270									m.subject = "Voicemail Transcription"
271									m.body = result["text"]
272									customer.stanza_to(m)
273
274									"OK"
275								end
276							}.catch_only(RevAi::Failed) { |e|
277								log_error(e)
278								"Failure logged"
279							}
280						end
281
282						r.post do
283							customer_repo(sgx_repo: Bwmsgsv2Repo.new)
284								.find_by_tel(params["to"])
285								.then { |c|
286									EMPromise.all([c, c.ogm(params["from"])])
287								}.then do |(customer, ogm)|
288									render :voicemail, locals: {
289										ogm: ogm,
290										transcription_enabled: customer.transcription_enabled
291									}
292								end
293						end
294					end
295
296					r.post do
297						customer_repo(
298							sgx_repo: Bwmsgsv2Repo.new
299						).find(params.fetch("customer_id")).then do |customer|
300							call_attempt_repo.find_inbound(
301								customer,
302								params["from"],
303								call_id: call_id,
304								digits: params["digits"]
305							).then { |ca| render(*ca.to_render) }
306						end
307					end
308				end
309
310				r.post do
311					customer_repo(
312						sgx_repo: Bwmsgsv2Repo.new
313					).find_by_tel(params["to"]).then { |customer|
314						EMPromise.all([
315							customer.customer_id, customer.fwd,
316							call_attempt_repo.find_inbound(
317								customer, params["from"], call_id: params["callId"]
318							)
319						])
320					}.then { |(customer_id, fwd, ca)|
321						call = ca.create_call(fwd, CONFIG[:creds][:account]) { |cc|
322							cc.from = params["from"]
323							cc.application_id = params["applicationId"]
324							cc.answer_url = url inbound_calls_path(nil, customer_id)
325							cc.disconnect_url = url inbound_calls_path(:transfer_complete)
326						}
327
328						next EMPromise.reject(:voicemail) unless call
329
330						outbound_transfers[params["callId"]] = call
331						render :ring, locals: { duration: 300 }
332					}.catch { |e|
333						log_error(e) unless e == :voicemail
334						render :redirect, locals: { to: inbound_calls_path(:voicemail) }
335					}
336				end
337			end
338		end
339
340		r.on "outbound" do
341			r.on "calls" do
342				r.post "status" do
343					log.info "#{params['eventType']} #{params['callId']}", loggable_params
344					if params["eventType"] == "disconnect"
345						call_attempt_repo.ending_call(c, params["callId"])
346						CDR.for_outbound(params).save.catch(&method(:log_error))
347					end
348					"OK"
349				end
350
351				r.post do
352					from = params["from"].sub(/^\+1/, "")
353					customer_repo(
354						sgx_repo: Bwmsgsv2Repo.new
355					).find_by_format(from).then do |c|
356						call_attempt_repo.find_outbound(
357							c,
358							params["to"],
359							call_id: params["callId"],
360							digits: params["digits"]
361						).then do |ca|
362							r.json { ca.to_json }
363
364							call_attempt_repo.starting_call(c, params["callId"])
365							render(*ca.to_render)
366						end
367					end
368				end
369			end
370		end
371
372		r.on "ogm" do
373			r.post "start" do
374				render :record_ogm, locals: { customer_id: params["customer_id"] }
375			end
376
377			r.post do
378				jmp_media_url = params["mediaUrl"].sub(
379					/\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
380					"https://jmp.chat"
381				)
382				ogm = OGMDownload.new(jmp_media_url)
383				ogm.download.then do
384					File.rename(ogm.path, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
385					File.chmod(0o644, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
386					customer_repo.find(params["customer_id"]).then do |customer|
387						customer.set_ogm_url("#{CONFIG[:ogm_web_root]}/#{ogm.cid}.mp3")
388					end
389				end
390			end
391		end
392
393		r.public
394	end
395end
396# rubocop:enable Metrics/ClassLength