1# frozen_string_literal: true
2
3require "digest"
4require "fileutils"
5require "forwardable"
6require "multibases"
7require "multihashes"
8require "roda"
9require "sentry-ruby"
10require "thin"
11
12require_relative "lib/call_attempt_repo"
13require_relative "lib/cdr"
14require_relative "lib/cdr_repo"
15require_relative "lib/oob"
16require_relative "lib/rev_ai"
17require_relative "lib/roda_capture"
18require_relative "lib/roda_em_promise"
19require_relative "lib/rack_fiber"
20require_relative "lib/reachability_repo"
21
# Streams a remote recording into a temp file while hashing it with SHA-512,
# so the finished file can be addressed by a content-derived identifier.
class OGMDownload
  # @param url [String] location the recording is fetched from
  def initialize(url)
    @sha512 = Digest::SHA512.new
    @tempfile = Tempfile.open("ogm")
    @request = EM::HttpRequest.new(url, tls: { verify_peer: true })
  end

  # Starts the asynchronous GET. Every received chunk is hashed and appended
  # to the temp file.
  #
  # @return the promise chain: on success the temp file is closed (and kept);
  #   on failure it is closed and unlinked and the rejection is passed on
  def download
    response = @request.aget
    response.stream { |chunk| absorb(chunk) }
    response.then { @tempfile.close }.catch { |error| discard(error) }
  end

  # Content identifier for the bytes received so far.
  #
  # @return [String] base58btc multibase text of the bytes 1, 85 followed by
  #   the sha2-512 multihash
  def cid
    multihash = Multihashes.encode(@sha512.digest, "sha2-512")
    # presumably CID version 1 + "raw" codec (0x55) -- confirm against the
    # multiformats tables
    cid_bytes = [1, 85].pack("C*") + multihash
    Multibases.encode("base58btc", cid_bytes).pack.to_s
  end

  # @return [String] filesystem path of the temp file
  def path
    @tempfile.path
  end

  private

  # Feed one body chunk to both the digest and the temp file.
  def absorb(chunk)
    @sha512 << chunk
    @tempfile.write chunk
  end

  # Drop the partial download and keep the promise chain rejected.
  def discard(error)
    @tempfile.close!
    EMPromise.reject(error)
  end
end
52
53# rubocop:disable Metrics/ClassLength
54class Web < Roda
  # Rack middleware. Rack::Fiber is skipped when ENV["ENV"] is "test".
  use Rack::Fiber unless ENV["ENV"] == "test" # Must go first!
  use Sentry::Rack::CaptureExceptions
  # Roda plugins. Registration order matters: see the trailing note on
  # RodaEMPromise.
  plugin :json_parser
  plugin :type_routing
  plugin :public
  plugin :render, engine: "slim"
  plugin RodaCapture
  plugin RodaEMPromise # Must go last!
63
64 class << self
65 attr_reader :customer_repo, :log, :outbound_transfers
66
67 def run(log, *listen_on)
68 plugin :common_logger, log, method: :info
69 @outbound_transfers = {}
70 Thin::Logging.logger = log
71 Thin::Server.start(
72 *listen_on,
73 freeze.app,
74 signals: false
75 )
76 end
77 end
78
  extend Forwardable
  # Instances read the class-level outbound_transfers table (assigned in
  # Web.run) through this delegator.
  def_delegators :'self.class', :outbound_transfers
  # Lets handlers and helpers write `params` instead of `request.params`.
  def_delegators :request, :params
82
83 def log
84 opts[:common_logger]
85 end
86
87 def log_error(e)
88 log.error(
89 "Error raised during #{request.fullpath}: #{e.class}",
90 e,
91 loggable_params
92 )
93 if e.is_a?(::Exception)
94 Sentry.capture_exception(e)
95 else
96 Sentry.capture_message(e.to_s)
97 end
98 end
99
100 def loggable_params
101 params.dup.tap do |p|
102 p.delete("to")
103 p.delete("from")
104 end
105 end
106
107 def customer_repo(**kwargs)
108 kwargs[:set_user] = Sentry.method(:set_user) unless kwargs[:set_user]
109 opts[:customer_repo] || CustomerRepo.new(**kwargs)
110 end
111
112 def reachability_repo(**kwargs)
113 opts[:reachability_repo] || ReachabilityRepo::Voice.new(**kwargs)
114 end
115
116 def find_by_tel_with_fallback(sgx_repo:, **kwargs)
117 customer_repo(sgx_repo: sgx_repo).find_by_tel(params["to"]).catch { |e|
118 next EMPromise.reject(e) if e.is_a?(CustomerRepo::NotFound)
119
120 log_error(e)
121 customer_repo(
122 sgx_repo: TrivialBackendSgxRepo.new(**kwargs)
123 ).find_by_tel(params["to"])
124 }
125 end
126
127 def call_attempt_repo
128 opts[:call_attempt_repo] || CallAttemptRepo.new
129 end
130
131 def cdr_repo
132 opts[:cdr_repo] || CDRRepo.new
133 end
134
135 def rev_ai
136 RevAi.new(logger: log.child(loggable_params))
137 end
138
139 TEL_CANDIDATES = {
140 "Restricted" => "14",
141 "anonymous" => "15",
142 "Anonymous" => "16",
143 "unavailable" => "17",
144 "Unavailable" => "18"
145 }.freeze
146
147 def sanitize_tel_candidate(candidate)
148 if candidate.length < 3
149 "13;phone-context=anonymous.phone-context.soprani.ca"
150 elsif candidate[0] == "+" && /\A\d+\z/.match(candidate[1..-1])
151 candidate
152 else
153 "#{TEL_CANDIDATES.fetch(candidate, '19')}" \
154 ";phone-context=anonymous.phone-context.soprani.ca"
155 end
156 end
157
158 def from_jid
159 Blather::JID.new(
160 sanitize_tel_candidate(params["from"]),
161 CONFIG[:component][:jid]
162 )
163 end
164
165 def inbound_calls_path(suffix, customer_id=nil, call_id: nil)
166 [
167 "/inbound/calls/#{call_id || params['callId']}",
168 suffix
169 ].compact.join("/") +
170 (customer_id ? "?customer_id=#{customer_id}" : "")
171 end
172
173 def url(path)
174 "#{request.base_url}#{path}"
175 end
176
177 def modify_call(call_id)
178 body = Bandwidth::ApiModifyCallRequest.new
179 yield body
180 BANDWIDTH_VOICE.modify_call(
181 CONFIG[:creds][:account],
182 call_id,
183 body: body
184 )
185 rescue Bandwidth::APIException
186 # If call does not exist, don't need to hang up or send to voicemail
187 # Other side must have hung up already
188 raise $! unless [404, 409].include?($!.response_code)
189 end
190
191 def start_transcription(customer, call_id, media_url)
192 return unless customer.transcription_enabled
193
194 rev_ai.language_id(
195 media_url,
196 url(inbound_calls_path("voicemail/language_id", call_id: call_id)),
197 from_jid: from_jid,
198 customer_id: customer.customer_id
199 )
200 end
201
202 def call_inputs(customer, from, call_id)
203 EMPromise.all([
204 customer.customer_id, customer.fwd,
205 call_attempt_repo.find_inbound(customer, from, call_id: call_id)
206 ])
207 end
208
209 def json_call(customer, fwd, call_attempt)
210 raise "Not allowed" unless params["secret"] == CONFIG[:component][:secret]
211
212 customer.ogm(params["from"]).then do |ogm|
213 call_attempt.as_json.merge(
214 fwd: fwd,
215 ogm: ogm
216 ).to_json
217 end
218 end
219
220 def transfer_complete_url(customer_id, call_id, tries)
221 url(
222 inbound_calls_path(:transfer_complete, customer_id, call_id: call_id)
223 ) + (tries ? "&tries=#{tries}" : "")
224 end
225
226 def create_call(customer, from, call_id, application_id, tries: nil)
227 call_inputs(customer, from, call_id).then do |(customer_id, fwd, ca)|
228 request.json { json_call(customer, fwd, ca) }
229 ca.create_call(fwd, CONFIG[:creds][:account]) do |cc|
230 cc.from = from
231 cc.application_id = application_id
232 cc.answer_url = url inbound_calls_path(nil, customer_id)
233 cc.disconnect_url = transfer_complete_url(customer_id, call_id, tries)
234 end
235 end
236 end
237
238 def inbound_from
239 if params["from"] && params["from"] =~ /\A\+?\d+\Z/
240 params["from"]
241 else
242 log.info "Inbound call with unusual from: #{params['from']}"
243 TEL_CANDIDATES.fetch(params["from"], "19")
244 end
245 end
246
247 def hangup
248 request.json { {}.to_json }
249
250 render :hangup
251 end
252
253 route do |r|
254 r.get "healthcheck" do
255 "OK"
256 end
257
258 r.on "inbound" do
259 r.on "calls" do
260 r.post "status" do
261 if params["eventType"] == "disconnect"
262 if (outbound_leg = outbound_transfers.delete(params["callId"]))
263 modify_call(outbound_leg) do |call|
264 call.state = "completed"
265 end
266 end
267
268 customer_repo.find_by_tel(params["to"]).then do |customer|
269 cdr_repo.put(CDR.for_inbound(customer.customer_id, params))
270 end
271 end
272 "OK"
273 end
274
275 r.on :call_id do |call_id|
276 r.post "transfer_complete" do
277 outbound_leg = outbound_transfers.delete(call_id)
278 if params["cause"] == "hangup" && params["tag"] == "connected"
279 log.info "Normal hangup, now end #{call_id}", loggable_params
280 modify_call(call_id) { |call| call.state = "completed" }
281 elsif !outbound_leg
282 log.debug "Inbound disconnected", loggable_params
283 elsif params["cause"] == "error" && params["tries"].to_i < 15
284 log.info "2nd leg error, retry", loggable_params
285 customer_repo(
286 sgx_repo: Bwmsgsv2Repo.new
287 ).find(params["customer_id"]).then { |customer|
288 create_call(
289 customer, params["from"], call_id, params["applicationId"],
290 tries: params["tries"].to_i + 1
291 ).then { |call|
292 outbound_transfers[params["callId"]] = call
293 }.catch(&log.method(:error))
294 }
295 else
296 log.debug "Go to voicemail", loggable_params
297 modify_call(call_id) do |call|
298 call.redirect_url = url inbound_calls_path(:voicemail)
299 end
300 end
301 ""
302 end
303
304 r.on "voicemail" do
305 r.post "audio" do
306 duration = Time.parse(params["endTime"]) -
307 Time.parse(params["startTime"])
308 next "OK<5" unless duration > 5
309
310 jmp_media_url = params["mediaUrl"].sub(
311 /\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
312 "https://jmp.chat"
313 )
314
315 find_by_tel_with_fallback(
316 sgx_repo: Bwmsgsv2Repo.new,
317 transcription_enabled: false
318 ).then do |customer|
319 start_transcription(customer, call_id, jmp_media_url)
320
321 m = Blather::Stanza::Message.new
322 m.chat_state = nil
323 m.from = from_jid
324 m.subject = "New Voicemail"
325 m << OOB.new(jmp_media_url)
326 customer.stanza_to(m)
327
328 "OK"
329 end
330 end
331
332 r.post "language_id" do
333 rev_ai.language_id_result(params).then { |result|
334 rev_ai.stt(
335 result["top_language"],
336 result.dig("metadata", "media_url"),
337 url(inbound_calls_path(
338 "voicemail/transcription",
339 call_id: call_id
340 )),
341 **result["metadata"].transform_keys(&:to_sym)
342 ).then { "OK" }
343 }.catch_only(RevAi::Failed) { |e|
344 log_error(e)
345 "Failure logged"
346 }
347 end
348
349 r.post "transcription" do
350 rev_ai.stt_result(params, request.url).then { |result|
351 next "OK" if result["text"].to_s.empty?
352
353 customer_repo.find(
354 result.dig("metadata", "customer_id")
355 ).then do |customer|
356 m = Blather::Stanza::Message.new
357 m.chat_state = nil
358 m.from = result.dig("metadata", "from_jid")
359 m.subject = "Voicemail Transcription"
360 m.body = result["text"]
361 customer.stanza_to(m)
362
363 "OK"
364 end
365 }.catch_only(RevAi::Failed) { |e|
366 log_error(e)
367 "Failure logged"
368 }
369 end
370
371 r.post do
372 find_by_tel_with_fallback(
373 sgx_repo: Bwmsgsv2Repo.new,
374 ogm_url: nil
375 ).then { |c|
376 c.ogm(params["from"])
377 }.then { |ogm|
378 next hangup unless ogm
379
380 render :voicemail, locals: { ogm: ogm }
381 }.catch_only(CustomerRepo::NotFound) {
382 render "inbound/no_customer"
383 }
384 end
385 end
386
387 r.post do
388 customer_repo(
389 sgx_repo: Bwmsgsv2Repo.new
390 ).find(params.fetch("customer_id")).then do |customer|
391 call_attempt_repo.find_inbound(
392 customer,
393 params["from"],
394 call_id: call_id,
395 digits: params["digits"]
396 ).then { |ca| render(*ca.to_render) }
397 end
398 end
399 end
400
401 r.post do
402 customer_repo(
403 sgx_repo: Bwmsgsv2Repo.new
404 ).find_by_tel(params["to"]).then { |customer|
405 reachability_repo.find(customer, params["from"]).then do |reach|
406 reach.filter(if_yes: ->(_) { hangup }) do
407 create_call(
408 customer,
409 inbound_from,
410 params["callId"],
411 params["applicationId"]
412 ).then { |call|
413 next EMPromise.reject(:voicemail) unless call
414
415 outbound_transfers[params["callId"]] = call
416 render :ring, locals: { duration: 300 }
417 }
418 end
419 end
420 }.catch_only(CustomerFwd::InfiniteTimeout) { |e|
421 render :forward, locals: { fwd: e.fwd, from: params["from"] }
422 }.catch { |e|
423 log_error(e) unless e == :voicemail
424 r.json { { error: e.to_s }.to_json }
425 render :redirect, locals: { to: inbound_calls_path(:voicemail) }
426 }
427 end
428 end
429 end
430
431 r.on "outbound" do
432 r.on "calls" do
433 r.post "status" do
434 log.info "#{params['eventType']} #{params['callId']}", loggable_params
435 if params["eventType"] == "disconnect"
436 customer_id = params["from"].sub(/^(?:\+|c)/, "")
437 call_attempt_repo.ending_call(customer_id, params["callId"])
438 cdr_repo
439 .put(CDR.for_outbound(customer_id, params))
440 .catch(&method(:log_error))
441 end
442 "OK"
443 end
444
445 r.post do
446 from = params["from"].sub(/^(?:\+|c)/, "")
447 customer_repo(
448 sgx_repo: Bwmsgsv2Repo.new
449 ).find_by_format(from).then { |c|
450 call_attempt_repo.find_outbound(
451 c,
452 params["to"],
453 call_id: params["callId"],
454 digits: params["digits"]
455 ).then do |ca|
456 r.json { ca.to_json }
457
458 call_attempt_repo.starting_call(c, params["callId"])
459 render(*ca.to_render)
460 end
461 }.catch_only(CustomerRepo::NotFound) {
462 render "outbound/no_customer"
463 }
464 end
465 end
466 end
467
468 r.on "ogm" do
469 r.post "start" do
470 render :record_ogm, locals: { customer_id: params["customer_id"] }
471 end
472
473 r.post do
474 jmp_media_url = params["mediaUrl"].sub(
475 /\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
476 "https://jmp.chat"
477 )
478 ogm = OGMDownload.new(jmp_media_url)
479 ogm.download.then do
480 FileUtils.mv(ogm.path, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
481 File.chmod(0o644, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
482 customer_repo.find(params["customer_id"]).then do |customer|
483 customer.set_ogm_url("#{CONFIG[:ogm_web_root]}/#{ogm.cid}.mp3")
484 end
485 end
486 end
487 end
488
489 r.public
490 end
491end
492# rubocop:enable Metrics/ClassLength