web.rb

  1# frozen_string_literal: true
  2
  3require "digest"
  4require "forwardable"
  5require "multibases"
  6require "multihashes"
  7require "roda"
  8require "thin"
  9require "sentry-ruby"
 10
 11require_relative "lib/call_attempt_repo"
 12require_relative "lib/cdr"
 13require_relative "lib/oob"
 14require_relative "lib/rev_ai"
 15require_relative "lib/roda_capture"
 16require_relative "lib/roda_em_promise"
 17require_relative "lib/rack_fiber"
 18
 19class OGMDownload
 20	def initialize(url)
 21		@digest = Digest::SHA512.new
 22		@f = Tempfile.open("ogm")
 23		@req = EM::HttpRequest.new(url, tls: { verify_peer: true })
 24	end
 25
 26	def download
 27		http = @req.aget
 28		http.stream do |chunk|
 29			@digest << chunk
 30			@f.write chunk
 31		end
 32		http.then { @f.close }.catch do |e|
 33			@f.close!
 34			EMPromise.reject(e)
 35		end
 36	end
 37
 38	def cid
 39		Multibases.encode(
 40			"base58btc",
 41			[1, 85].pack("C*") + Multihashes.encode(@digest.digest, "sha2-512")
 42		).pack.to_s
 43	end
 44
 45	def path
 46		@f.path
 47	end
 48end
 49
 50# rubocop:disable Metrics/ClassLength
 51class Web < Roda
 52	use Rack::Fiber unless ENV["ENV"] == "test" # Must go first!
 53	use Sentry::Rack::CaptureExceptions
 54	plugin :json_parser
 55	plugin :type_routing
 56	plugin :public
 57	plugin :render, engine: "slim"
 58	plugin RodaCapture
 59	plugin RodaEMPromise # Must go last!
 60
 61	class << self
 62		attr_reader :customer_repo, :log, :outbound_transfers
 63
 64		def run(log, *listen_on)
 65			plugin :common_logger, log, method: :info
 66			@outbound_transfers = {}
 67			Thin::Logging.logger = log
 68			Thin::Server.start(
 69				*listen_on,
 70				freeze.app,
 71				signals: false
 72			)
 73		end
 74	end
 75
 76	extend Forwardable
 77	def_delegators :'self.class', :outbound_transfers
 78	def_delegators :request, :params
 79
 80	def log
 81		opts[:common_logger]
 82	end
 83
 84	def log_error(e)
 85		log.error(
 86			"Error raised during #{request.fullpath}: #{e.class}",
 87			e,
 88			loggable_params
 89		)
 90		if e.is_a?(::Exception)
 91			Sentry.capture_exception(e)
 92		else
 93			Sentry.capture_message(e.to_s)
 94		end
 95	end
 96
 97	def loggable_params
 98		params.dup.tap do |p|
 99			p.delete("to")
100			p.delete("from")
101		end
102	end
103
104	def customer_repo(**kwargs)
105		kwargs[:set_user] = Sentry.method(:set_user) unless kwargs[:set_user]
106		opts[:customer_repo] || CustomerRepo.new(**kwargs)
107	end
108
109	def call_attempt_repo
110		opts[:call_attempt_repo] || CallAttemptRepo.new
111	end
112
113	def rev_ai
114		RevAi.new
115	end
116
117	TEL_CANDIDATES = {
118		"Restricted" => "14",
119		"anonymous" => "15",
120		"Anonymous" => "16",
121		"unavailable" => "17",
122		"Unavailable" => "18"
123	}.freeze
124
125	def sanitize_tel_candidate(candidate)
126		if candidate.length < 3
127			"13;phone-context=anonymous.phone-context.soprani.ca"
128		elsif candidate[0] == "+" && /\A\d+\z/.match(candidate[1..-1])
129			candidate
130		else
131			"#{TEL_CANDIDATES.fetch(candidate, '19')}" \
132				";phone-context=anonymous.phone-context.soprani.ca"
133		end
134	end
135
136	def from_jid
137		Blather::JID.new(
138			sanitize_tel_candidate(params["from"]),
139			CONFIG[:component][:jid]
140		)
141	end
142
143	def inbound_calls_path(suffix, customer_id=nil, call_id: nil)
144		[
145			"/inbound/calls/#{call_id || params['callId']}",
146			suffix
147		].compact.join("/") +
148			(customer_id ? "?customer_id=#{customer_id}" : "")
149	end
150
151	def url(path)
152		"#{request.base_url}#{path}"
153	end
154
155	def modify_call(call_id)
156		body = Bandwidth::ApiModifyCallRequest.new
157		yield body
158		BANDWIDTH_VOICE.modify_call(
159			CONFIG[:creds][:account],
160			call_id,
161			body: body
162		)
163	rescue Bandwidth::ApiErrorResponseException
164		# If call does not exist, don't need to hang up or send to voicemail
165		# Other side must have hung up already
166		raise $! unless $!.response_code.to_s == "404"
167	end
168
169	def start_transcription(customer, call_id, media_url)
170		return unless customer.transcription_enabled
171
172		rev_ai.language_id(
173			media_url,
174			url(inbound_calls_path("voicemail/language_id", call_id: call_id)),
175			from_jid: from_jid,
176			customer_id: customer.customer_id
177		)
178	end
179
180	route do |r|
181		r.on "inbound" do
182			r.on "calls" do
183				r.post "status" do
184					if params["eventType"] == "disconnect"
185						if (outbound_leg = outbound_transfers.delete(params["callId"]))
186							modify_call(outbound_leg) do |call|
187								call.state = "completed"
188							end
189						end
190
191						customer_repo.find_by_tel(params["to"]).then do |customer|
192							CDR.for_inbound(customer.customer_id, params).save
193						end
194					end
195					"OK"
196				end
197
198				r.on :call_id do |call_id|
199					r.post "transfer_complete" do
200						outbound_leg = outbound_transfers.delete(call_id)
201						if params["cause"] == "hangup" && params["tag"] == "connected"
202							log.info "Normal hangup, now end #{call_id}", loggable_params
203							modify_call(call_id) { |call| call.state = "completed" }
204						elsif !outbound_leg
205							log.debug "Inbound disconnected", loggable_params
206						else
207							log.debug "Go to voicemail", loggable_params
208							modify_call(call_id) do |call|
209								call.redirect_url = url inbound_calls_path(:voicemail)
210							end
211						end
212						""
213					end
214
215					r.on "voicemail" do
216						r.post "audio" do
217							duration = Time.parse(params["endTime"]) -
218							           Time.parse(params["startTime"])
219							next "OK<5" unless duration > 5
220
221							jmp_media_url = params["mediaUrl"].sub(
222								/\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
223								"https://jmp.chat"
224							)
225
226							customer_repo(
227								sgx_repo: Bwmsgsv2Repo.new
228							).find_by_tel(params["to"]).then do |customer|
229								start_transcription(customer, call_id, jmp_media_url)
230
231								m = Blather::Stanza::Message.new
232								m.chat_state = nil
233								m.from = from_jid
234								m.subject = "New Voicemail"
235								m.body = jmp_media_url
236								m << OOB.new(jmp_media_url, desc: "Voicemail Recording")
237								customer.stanza_to(m)
238
239								"OK"
240							end
241						end
242
243						r.post "language_id" do
244							rev_ai.language_id_result(params).then do |result|
245								rev_ai.stt(
246									result["top_language"],
247									result.dig("metadata", "media_url"),
248									url(inbound_calls_path(
249										"voicemail/transcription",
250										call_id: call_id
251									)),
252									**result["metadata"].transform_keys(&:to_sym)
253								).then { "OK" }
254							end
255						end
256
257						r.post "transcription" do
258							rev_ai.stt_result(params).then do |result|
259								next "OK" if result["text"].empty?
260
261								customer_repo.find(
262									result.dig("metadata", "customer_id")
263								).then do |customer|
264									m = Blather::Stanza::Message.new
265									m.chat_state = nil
266									m.from = result.dig("metadata", "from_jid")
267									m.subject = "Voicemail Transcription"
268									m.body = result["text"]
269									customer.stanza_to(m)
270
271									"OK"
272								end
273							end
274						end
275
276						r.post do
277							customer_repo(sgx_repo: Bwmsgsv2Repo.new)
278								.find_by_tel(params["to"])
279								.then { |c|
280									EMPromise.all([c, c.ogm(params["from"])])
281								}.then do |(customer, ogm)|
282									render :voicemail, locals: {
283										ogm: ogm,
284										transcription_enabled: customer.transcription_enabled
285									}
286								end
287						end
288					end
289
290					r.post do
291						customer_repo(
292							sgx_repo: Bwmsgsv2Repo.new
293						).find(params.fetch("customer_id")).then do |customer|
294							call_attempt_repo.find_inbound(
295								customer,
296								params["from"],
297								call_id: call_id,
298								digits: params["digits"]
299							).then { |ca| render(*ca.to_render) }
300						end
301					end
302				end
303
304				r.post do
305					customer_repo(
306						sgx_repo: Bwmsgsv2Repo.new
307					).find_by_tel(params["to"]).then { |customer|
308						EMPromise.all([
309							customer.customer_id, customer.fwd,
310							call_attempt_repo.find_inbound(
311								customer, params["from"], call_id: params["callId"]
312							)
313						])
314					}.then { |(customer_id, fwd, ca)|
315						call = ca.create_call(fwd, CONFIG[:creds][:account]) { |cc|
316							cc.from = params["from"]
317							cc.application_id = params["applicationId"]
318							cc.answer_url = url inbound_calls_path(nil, customer_id)
319							cc.disconnect_url = url inbound_calls_path(:transfer_complete)
320						}
321
322						next EMPromise.reject(:voicemail) unless call
323
324						outbound_transfers[params["callId"]] = call
325						render :ring, locals: { duration: 300 }
326					}.catch { |e|
327						log_error(e) unless e == :voicemail
328						render :redirect, locals: { to: inbound_calls_path(:voicemail) }
329					}
330				end
331			end
332		end
333
334		r.on "outbound" do
335			r.on "calls" do
336				r.post "status" do
337					log.info "#{params['eventType']} #{params['callId']}", loggable_params
338					if params["eventType"] == "disconnect"
339						call_attempt_repo.ending_call(c, params["callId"])
340						CDR.for_outbound(params).save.catch(&method(:log_error))
341					end
342					"OK"
343				end
344
345				r.post do
346					from = params["from"].sub(/^\+1/, "")
347					customer_repo(
348						sgx_repo: Bwmsgsv2Repo.new
349					).find_by_format(from).then do |c|
350						call_attempt_repo.find_outbound(
351							c,
352							params["to"],
353							call_id: params["callId"],
354							digits: params["digits"]
355						).then do |ca|
356							r.json { ca.to_json }
357
358							call_attempt_repo.starting_call(c, params["callId"])
359							render(*ca.to_render)
360						end
361					end
362				end
363			end
364		end
365
366		r.on "ogm" do
367			r.post "start" do
368				render :record_ogm, locals: { customer_id: params["customer_id"] }
369			end
370
371			r.post do
372				jmp_media_url = params["mediaUrl"].sub(
373					/\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
374					"https://jmp.chat"
375				)
376				ogm = OGMDownload.new(jmp_media_url)
377				ogm.download.then do
378					File.rename(ogm.path, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
379					File.chmod(0o644, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
380					customer_repo.find(params["customer_id"]).then do |customer|
381						customer.set_ogm_url("#{CONFIG[:ogm_web_root]}/#{ogm.cid}.mp3")
382					end
383				end
384			end
385		end
386
387		r.public
388	end
389end
390# rubocop:enable Metrics/ClassLength