web.rb

  1# frozen_string_literal: true
  2
  3require "digest"
  4require "fileutils"
  5require "forwardable"
  6require "multibases"
  7require "multihashes"
  8require "roda"
  9require "sentry-ruby"
 10require "thin"
 11
 12require_relative "lib/bandwidth_tn_order"
 13require_relative "lib/call_attempt_repo"
 14require_relative "lib/trust_level_repo"
 15require_relative "lib/cdr"
 16require_relative "lib/cdr_repo"
 17require_relative "lib/oob"
 18require_relative "lib/rev_ai"
 19require_relative "lib/roda_capture"
 20require_relative "lib/roda_em_promise"
 21require_relative "lib/rack_fiber"
 22require_relative "lib/reachability_repo"
 23
 24class OGMDownload
 25	def initialize(url)
 26		@digest = Digest::SHA512.new
 27		@f = Tempfile.open("ogm")
 28		@req = EM::HttpRequest.new(url, tls: { verify_peer: true })
 29	end
 30
 31	def download
 32		http = @req.aget
 33		http.stream do |chunk|
 34			@digest << chunk
 35			@f.write chunk
 36		end
 37		http.then { @f.close }.catch do |e|
 38			@f.close!
 39			EMPromise.reject(e)
 40		end
 41	end
 42
 43	def cid
 44		Multibases.encode(
 45			"base58btc",
 46			[1, 85].pack("C*") + Multihashes.encode(@digest.digest, "sha2-512")
 47		).pack.to_s
 48	end
 49
 50	def path
 51		@f.path
 52	end
 53end
 54
 55# rubocop:disable Metrics/ClassLength
 56class Web < Roda
 57	use Rack::Fiber unless ENV["ENV"] == "test" # Must go first!
 58	use Sentry::Rack::CaptureExceptions
 59	plugin :json_parser
 60	plugin :type_routing
 61	plugin :public
 62	plugin :render, engine: "slim"
 63	plugin RodaCapture
 64	plugin RodaEMPromise # Must go last!
 65
	class << self
		attr_reader :customer_repo, :log, :outbound_transfers

		# Installs request logging, resets the outbound transfer table, then
		# freezes the app and serves it with Thin.
		#
		# @param log logger used for both request logging and Thin itself
		# @param listen_on arguments forwarded to Thin::Server.start ahead of
		#   the Rack app
		def run(log, *listen_on)
			plugin :common_logger, log, method: :info
			@outbound_transfers = {}
			Thin::Logging.logger = log
			server_args = [*listen_on, freeze.app]
			Thin::Server.start(*server_args, signals: false)
		end
	end
 80
 81	extend Forwardable
 82	def_delegators :'self.class', :outbound_transfers
 83	def_delegators :request, :params
 84
	# Logger stored in the app options under :common_logger.
	def log
		logger = opts[:common_logger]
		logger
	end
 88
	# Logs a failure for the current request and reports it to Sentry.
	#
	# @param e the failure; real exceptions are captured as exceptions, any
	#   other value (e.g. a symbol used as a rejection reason) is captured as
	#   a message via #to_s
	def log_error(e)
		summary = "Error raised during #{request.fullpath}: #{e.class}"
		log.error(summary, e, loggable_params)
		return Sentry.capture_exception(e) if e.is_a?(::Exception)

		Sentry.capture_message(e.to_s)
	end
101
102	def loggable_params
103		params.dup.tap do |p|
104			p.delete("to")
105			p.delete("from")
106		end
107	end
108
109	def customer_repo(**kwargs)
110		kwargs[:set_user] = Sentry.method(:set_user) unless kwargs[:set_user]
111		opts[:customer_repo] || CustomerRepo.new(**kwargs)
112	end
113
114	def trust_level_repo(**kwargs)
115		opts[:trust_level_repo] || TrustLevelRepo.new(**kwargs)
116	end
117
118	def reachability_repo(**kwargs)
119		opts[:reachability_repo] || ReachabilityRepo::Voice.new(**kwargs)
120	end
121
122	def find_by_tel_with_fallback(sgx_repo:, **kwargs)
123		customer_repo(sgx_repo: sgx_repo).find_by_tel(params["to"]).catch { |e|
124			next EMPromise.reject(e) if e.is_a?(CustomerRepo::NotFound)
125
126			log_error(e)
127			customer_repo(
128				sgx_repo: TrivialBackendSgxRepo.new(**kwargs)
129			).find_by_tel(params["to"])
130		}
131	end
132
133	def call_attempt_repo
134		opts[:call_attempt_repo] || CallAttemptRepo.new
135	end
136
137	def cdr_repo
138		opts[:cdr_repo] || CDRRepo.new
139	end
140
141	def rev_ai
142		RevAi.new(logger: log.child(loggable_params))
143	end
144
145	TEL_CANDIDATES = {
146		"Restricted" => "14",
147		"anonymous" => "15",
148		"Anonymous" => "16",
149		"unavailable" => "17",
150		"Unavailable" => "18"
151	}.freeze
152
153	def sanitize_tel_candidate(candidate)
154		if candidate.length < 3
155			"13;phone-context=anonymous.phone-context.soprani.ca"
156		elsif candidate[0] == "+" && /\A\d+\z/.match(candidate[1..-1])
157			candidate
158		else
159			"#{TEL_CANDIDATES.fetch(candidate, '19')}" \
160				";phone-context=anonymous.phone-context.soprani.ca"
161		end
162	end
163
164	def from_jid
165		Blather::JID.new(
166			sanitize_tel_candidate(params["from"]),
167			CONFIG[:component][:jid]
168		)
169	end
170
171	def inbound_calls_path(suffix, customer_id=nil, call_id: nil)
172		[
173			"/inbound/calls/#{call_id || params['callId']}",
174			suffix
175		].compact.join("/") +
176			(customer_id ? "?customer_id=#{customer_id}" : "")
177	end
178
179	def url(path)
180		"#{request.base_url}#{path}"
181	end
182
183	def modify_call(call_id)
184		body = Bandwidth::UpdateCall.new
185		yield body
186		BANDWIDTH_VOICE.update_call(
187			CONFIG[:creds][:account],
188			call_id,
189			body,
190			{}
191		)
192	rescue Bandwidth::ApiError
193		# If call does not exist, don't need to hang up or send to voicemail
194		# Other side must have hung up already
195		raise $! unless [404, 409].include?($!.code)
196	end
197
198	def start_transcription(customer, call_id, media_url)
199		return unless customer.transcription_enabled
200
201		rev_ai.language_id(
202			media_url,
203			url(inbound_calls_path("voicemail/language_id", call_id: call_id)),
204			from_jid: from_jid,
205			customer_id: customer.customer_id
206		)
207	end
208
209	def call_inputs(customer, from, call_id)
210		EMPromise.all([
211			customer.customer_id, customer.fwd,
212			call_attempt_repo.find_inbound(customer, from, call_id: call_id)
213		])
214	end
215
216	def json_call(customer, fwd, call_attempt)
217		raise "Not allowed" unless params["secret"] == CONFIG[:component][:secret]
218
219		customer.ogm(params["from"]).then do |ogm|
220			call_attempt.as_json.merge(
221				fwd: fwd,
222				ogm: ogm
223			).to_json
224		end
225	end
226
227	def transfer_complete_url(customer_id, call_id, tries)
228		url(
229			inbound_calls_path(:transfer_complete, customer_id, call_id: call_id)
230		) + (tries ? "&tries=#{tries}" : "")
231	end
232
233	def create_call(customer, from, call_id, application_id, tries: nil)
234		call_inputs(customer, from, call_id).then do |(customer_id, fwd, ca)|
235			request.json { json_call(customer, fwd, ca) }
236			ca.create_call(fwd, CONFIG[:creds][:account]) do |cc|
237				cc.from = from
238				cc.application_id = application_id
239				cc.answer_url = url inbound_calls_path(nil, customer_id)
240				cc.disconnect_url = transfer_complete_url(customer_id, call_id, tries)
241			end
242		end
243	end
244
245	def inbound_from
246		if params["from"] && params["from"] =~ /\A\+?\d+\Z/
247			params["from"]
248		else
249			log.info "Inbound call with unusual from: #{params['from']}"
250			TEL_CANDIDATES.fetch(params["from"], "19")
251		end
252	end
253
254	def hangup
255		request.json { {}.to_json }
256
257		render :hangup
258	end
259
260	route do |r|
261		r.get "healthcheck" do
262			"OK"
263		end
264
265		r.on "inbound" do
266			r.on "calls" do
267				r.post "status" do
268					if params["eventType"] == "disconnect"
269						if (outbound_leg = outbound_transfers.delete(params["callId"]))
270							modify_call(outbound_leg) do |call|
271								call.state = "completed"
272							end
273						end
274
275						customer_repo.find_by_tel(params["to"]).then do |customer|
276							cdr_repo.put(CDR.for_inbound(customer.customer_id, params))
277						end
278					end
279					"OK"
280				end
281
282				r.on :call_id do |call_id|
283					r.post "transfer_complete" do
284						outbound_leg = outbound_transfers.delete(call_id)
285						if params["cause"] == "hangup" && params["tag"] == "connected"
286							log.info "Normal hangup, now end #{call_id}", loggable_params
287							modify_call(call_id) { |call| call.state = "completed" }
288						elsif !outbound_leg
289							log.debug "Inbound disconnected", loggable_params
290						elsif params["cause"] == "error" && params["tries"].to_i < 15
291							log.info "2nd leg error, retry", loggable_params
292							customer_repo(
293								sgx_repo: Bwmsgsv2Repo.new
294							).find(params["customer_id"]).then { |customer|
295								create_call(
296									customer, params["from"], call_id, params["applicationId"],
297									tries: params["tries"].to_i + 1
298								).then { |call|
299									outbound_transfers[params["callId"]] = call
300								}.catch(&log.method(:error))
301							}
302						else
303							log.debug "Go to voicemail", loggable_params
304							modify_call(call_id) do |call|
305								call.redirect_url = url inbound_calls_path(:voicemail)
306							end
307						end
308						""
309					end
310
311					r.on "voicemail" do
312						r.post "audio" do
313							duration = Time.parse(params["endTime"]) -
314							           Time.parse(params["startTime"])
315							next "OK<5" unless duration > 5
316
317							jmp_media_url = params["mediaUrl"].sub(
318								/\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
319								"https://jmp.chat"
320							)
321
322							find_by_tel_with_fallback(
323								sgx_repo: Bwmsgsv2Repo.new,
324								transcription_enabled: false
325							).then do |customer|
326								start_transcription(customer, call_id, jmp_media_url)
327
328								m = Blather::Stanza::Message.new
329								m.chat_state = nil
330								m.from = from_jid
331								m.subject = "New Voicemail"
332								m << OOB.new(jmp_media_url)
333								customer.stanza_to(m)
334
335								"OK"
336							end
337						end
338
339						r.post "language_id" do
340							rev_ai.language_id_result(params).then { |result|
341								rev_ai.stt(
342									result["top_language"],
343									result.dig("metadata", "media_url"),
344									url(inbound_calls_path(
345										"voicemail/transcription",
346										call_id: call_id
347									)),
348									**result["metadata"].transform_keys(&:to_sym)
349								).then { "OK" }
350							}.catch_only(RevAi::Failed) { |e|
351								log_error(e)
352								"Failure logged"
353							}
354						end
355
356						r.post "transcription" do
357							rev_ai.stt_result(params, request.url).then { |result|
358								next "OK" if result["text"].to_s.empty?
359
360								customer_repo.find(
361									result.dig("metadata", "customer_id")
362								).then do |customer|
363									m = Blather::Stanza::Message.new
364									m.chat_state = nil
365									m.from = result.dig("metadata", "from_jid")
366									m.subject = "Voicemail Transcription"
367									m.body = result["text"]
368									customer.stanza_to(m)
369
370									"OK"
371								end
372							}.catch_only(RevAi::Failed) { |e|
373								log_error(e)
374								"Failure logged"
375							}
376						end
377
378						r.post do
379							find_by_tel_with_fallback(
380								sgx_repo: Bwmsgsv2Repo.new,
381								ogm_url: nil
382							).then { |c|
383								c.ogm(params["from"])
384							}.then { |ogm|
385								next hangup unless ogm
386
387								render :voicemail, locals: { ogm: ogm }
388							}.catch_only(CustomerRepo::NotFound) {
389								render "inbound/no_customer"
390							}
391						end
392					end
393
394					r.post do
395						customer_repo(
396							sgx_repo: Bwmsgsv2Repo.new
397						).find(params.fetch("customer_id")).then do |customer|
398							call_attempt_repo.find_inbound(
399								customer,
400								params["from"],
401								call_id: call_id,
402								digits: params["digits"]
403							).then { |ca| render(*ca.to_render) }
404						end
405					end
406				end
407
408				r.post do
409					customer_repo(
410						sgx_repo: Bwmsgsv2Repo.new
411					).find_by_tel(params["to"]).then { |customer|
412						reachability_repo.find(customer, params["from"]).then do |reach|
413							reach.filter(if_yes: ->(_) { hangup }) do
414								create_call(
415									customer,
416									inbound_from,
417									params["callId"],
418									params["applicationId"]
419								).then { |call|
420									next EMPromise.reject(:voicemail) unless call
421
422									outbound_transfers[params["callId"]] = call
423									render :ring, locals: { duration: 300 }
424								}
425							end
426						end
427					}.catch_only(CustomerFwd::InfiniteTimeout) { |e|
428						render :forward, locals: { fwd: e.fwd, from: params["from"] }
429					}.catch { |e|
430						log_error(e) unless e == :voicemail
431						r.json { { error: e.to_s }.to_json }
432						render :redirect, locals: { to: inbound_calls_path(:voicemail) }
433					}
434				end
435			end
436		end
437
438		r.on "outbound" do
439			r.on "calls" do
440				r.post "status" do
441					log.info "#{params['eventType']} #{params['callId']}", loggable_params
442					if params["eventType"] == "disconnect"
443						from = params["from"].sub(/^(?:\+|c)/, "")
444
445						customer_repo.find_by_format(from).then { |customer|
446							trust_level_repo.find(customer).then { |tl| [customer, tl] }
447						}.then { |(customer, trust_level)|
448							next "OK" unless trust_level.write_cdr?
449
450							customer_id = customer.customer_id
451							call_attempt_repo.ending_call(customer_id, params["callId"])
452							cdr_repo
453								.put(CDR.for_outbound(customer_id, params))
454								.catch(&method(:log_error))
455
456							"OK"
457						}
458					else
459						"OK"
460					end
461				end
462
463				r.post do
464					from = params["from"].sub(/^(?:\+|c)/, "")
465					customer_repo(
466						sgx_repo: Bwmsgsv2Repo.new
467					).find_by_format(from).then { |c|
468						call_attempt_repo.find_outbound(
469							c,
470							params["to"],
471							call_id: params["callId"],
472							digits: params["digits"]
473						).then do |ca|
474							r.json { ca.to_json }
475
476							call_attempt_repo.starting_call(c, params["callId"]).then do
477								render(*ca.to_render)
478							end
479						end
480					}.catch_only(CustomerRepo::NotFound) {
481						log.info "No customer for outbound call", params
482						render "outbound/no_customer"
483					}
484				end
485			end
486		end
487
488		r.on "ogm" do
489			r.post "start" do
490				render :record_ogm, locals: { customer_id: params["customer_id"] }
491			end
492
493			r.post do
494				jmp_media_url = params["mediaUrl"].sub(
495					/\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
496					"https://jmp.chat"
497				)
498				ogm = OGMDownload.new(jmp_media_url)
499				ogm.download.then do
500					FileUtils.mv(ogm.path, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
501					File.chmod(0o644, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
502					customer_repo.find(params["customer_id"]).then do |customer|
503						customer.set_ogm_url("#{CONFIG[:ogm_web_root]}/#{ogm.cid}.mp3")
504					end
505				end
506			end
507		end
508
509		r.on "orders" do
510			token = r.env["HTTP_AUTHORIZATION"].to_s.sub(/\ABearer\s+/, "")
511
512			r.json do
513				if (orderer = CONFIG[:bulk_order_tokens][token.to_sym])
514					r.get :order_id do |order_id|
515						BandwidthTNOrder.get(order_id).then do |order|
516							if order.status == :complete
517								Transaction.new(
518									customer_id: orderer[:customer_id],
519									transaction_id: order.id,
520									amount: order.tels.length * -1.75,
521									note: "Bulk order",
522									ignore_duplicate: true
523								).insert.then do
524									{
525										id: order.id,
526										status: order.status,
527										tels: order.tels
528									}.to_json
529								end
530							else
531								{ id: order.id, status: order.status }.to_json
532							end
533						end
534					end
535
536					r.on "tels" do
537						r.on :tel, method: :delete do |tel|
538							tn_repo = BandwidthTnRepo.new
539							tn = tn_repo.find(tel)
540							if tn&.dig(:sip_peer, :peer_id).to_s == orderer[:peer_id]
541								tn_repo.disconnect(tel, orderer[:customer_id])
542								{ status: "disconnected" }.to_json
543							else
544								response.status = 401
545								{ error: "Number not found" }.to_json
546							end
547						end
548					end
549
550					r.post do
551						customer_repo.find(orderer[:customer_id]).then do |customer|
552							if customer.balance >= 1.75 * params["quantity"].to_i
553								BandwidthTNOrder.create_custom(
554									name: "Bulk order",
555									customer_order_id: orderer[:customer_id],
556									peer_id: orderer[:peer_id],
557									state_search_and_order_type: {
558										quantity: params["quantity"].to_i,
559										state: ["WA", "CA", "TX", "IL", "NY", "FL"].sample
560									}
561								).then do |order|
562									{ id: order.id }.to_json
563								end
564							else
565								response.status = 402
566								{ error: "Balance too low" }.to_json
567							end
568						end
569					end
570				else
571					response.status = 401
572					{ error: "Bad token" }.to_json
573				end
574			end
575		end
576
577		r.public
578	end
579end
580# rubocop:enable Metrics/ClassLength