1# frozen_string_literal: true
2
3require "digest"
4require "fileutils"
5require "forwardable"
6require "multibases"
7require "multihashes"
8require "roda"
9require "sentry-ruby"
10require "thin"
11
12require_relative "lib/bandwidth_tn_order"
13require_relative "lib/call_attempt_repo"
14require_relative "lib/trust_level_repo"
15require_relative "lib/cdr"
16require_relative "lib/cdr_repo"
17require_relative "lib/oob"
18require_relative "lib/rev_ai"
19require_relative "lib/roda_capture"
20require_relative "lib/roda_em_promise"
21require_relative "lib/rack_fiber"
22require_relative "lib/reachability_repo"
23
24class OGMDownload
25 def initialize(url)
26 @digest = Digest::SHA512.new
27 @f = Tempfile.open("ogm")
28 @req = EM::HttpRequest.new(url, tls: { verify_peer: true })
29 end
30
31 def download
32 http = @req.aget
33 http.stream do |chunk|
34 @digest << chunk
35 @f.write chunk
36 end
37 http.then { @f.close }.catch do |e|
38 @f.close!
39 EMPromise.reject(e)
40 end
41 end
42
43 def cid
44 Multibases.encode(
45 "base58btc",
46 [1, 85].pack("C*") + Multihashes.encode(@digest.digest, "sha2-512")
47 ).pack.to_s
48 end
49
50 def path
51 @f.path
52 end
53end
54
55# rubocop:disable Metrics/ClassLength
56class Web < Roda
57 use Rack::Fiber unless ENV["ENV"] == "test" # Must go first!
58 use Sentry::Rack::CaptureExceptions
59 plugin :json_parser
60 plugin :type_routing
61 plugin :public
62 plugin :render, engine: "slim"
63 plugin RodaCapture
64 plugin RodaEMPromise # Must go last!
65
66 class << self
67 attr_reader :customer_repo, :log, :outbound_transfers
68
69 def run(log, *listen_on)
70 plugin :common_logger, log, method: :info
71 @outbound_transfers = {}
72 Thin::Logging.logger = log
73 Thin::Server.start(
74 *listen_on,
75 freeze.app,
76 signals: false
77 )
78 end
79 end
80
81 extend Forwardable
82 def_delegators :'self.class', :outbound_transfers
83 def_delegators :request, :params
84
85 def log
86 opts[:common_logger]
87 end
88
89 def log_error(e)
90 log.error(
91 "Error raised during #{request.fullpath}: #{e.class}",
92 e,
93 loggable_params
94 )
95 if e.is_a?(::Exception)
96 Sentry.capture_exception(e)
97 else
98 Sentry.capture_message(e.to_s)
99 end
100 end
101
102 def loggable_params
103 params.dup.tap do |p|
104 p.delete("to")
105 p.delete("from")
106 end
107 end
108
109 def customer_repo(**kwargs)
110 kwargs[:set_user] = Sentry.method(:set_user) unless kwargs[:set_user]
111 opts[:customer_repo] || CustomerRepo.new(**kwargs)
112 end
113
114 def trust_level_repo(**kwargs)
115 opts[:trust_level_repo] || TrustLevelRepo.new(**kwargs)
116 end
117
118 def reachability_repo(**kwargs)
119 opts[:reachability_repo] || ReachabilityRepo::Voice.new(**kwargs)
120 end
121
122 def find_by_tel_with_fallback(sgx_repo:, **kwargs)
123 customer_repo(sgx_repo: sgx_repo).find_by_tel(params["to"]).catch { |e|
124 next EMPromise.reject(e) if e.is_a?(CustomerRepo::NotFound)
125
126 log_error(e)
127 customer_repo(
128 sgx_repo: TrivialBackendSgxRepo.new(**kwargs)
129 ).find_by_tel(params["to"])
130 }
131 end
132
133 def call_attempt_repo
134 opts[:call_attempt_repo] || CallAttemptRepo.new
135 end
136
137 def cdr_repo
138 opts[:cdr_repo] || CDRRepo.new
139 end
140
141 def rev_ai
142 RevAi.new(logger: log.child(loggable_params))
143 end
144
145 TEL_CANDIDATES = {
146 "Restricted" => "14",
147 "anonymous" => "15",
148 "Anonymous" => "16",
149 "unavailable" => "17",
150 "Unavailable" => "18"
151 }.freeze
152
153 def sanitize_tel_candidate(candidate)
154 if candidate.length < 3
155 "13;phone-context=anonymous.phone-context.soprani.ca"
156 elsif candidate[0] == "+" && /\A\d+\z/.match(candidate[1..-1])
157 candidate
158 else
159 "#{TEL_CANDIDATES.fetch(candidate, '19')}" \
160 ";phone-context=anonymous.phone-context.soprani.ca"
161 end
162 end
163
164 def from_jid
165 Blather::JID.new(
166 sanitize_tel_candidate(params["from"]),
167 CONFIG[:component][:jid]
168 )
169 end
170
171 def inbound_calls_path(suffix, customer_id=nil, call_id: nil)
172 [
173 "/inbound/calls/#{call_id || params['callId']}",
174 suffix
175 ].compact.join("/") +
176 (customer_id ? "?customer_id=#{customer_id}" : "")
177 end
178
179 def url(path)
180 "#{request.base_url}#{path}"
181 end
182
183 def modify_call(call_id)
184 body = Bandwidth::UpdateCall.new
185 yield body
186 BANDWIDTH_VOICE.update_call(
187 CONFIG[:creds][:account],
188 call_id,
189 body,
190 {}
191 )
192 rescue Bandwidth::ApiError
193 # If call does not exist, don't need to hang up or send to voicemail
194 # Other side must have hung up already
195 raise $! unless [404, 409].include?($!.code)
196 end
197
198 def start_transcription(customer, call_id, media_url)
199 return unless customer.transcription_enabled
200
201 rev_ai.language_id(
202 media_url,
203 url(inbound_calls_path("voicemail/language_id", call_id: call_id)),
204 from_jid: from_jid,
205 customer_id: customer.customer_id
206 )
207 end
208
209 def call_inputs(customer, from, call_id)
210 EMPromise.all([
211 customer.customer_id, customer.fwd,
212 call_attempt_repo.find_inbound(customer, from, call_id: call_id)
213 ])
214 end
215
216 def json_call(customer, fwd, call_attempt)
217 raise "Not allowed" unless params["secret"] == CONFIG[:component][:secret]
218
219 customer.ogm(params["from"]).then do |ogm|
220 call_attempt.as_json.merge(
221 fwd: fwd,
222 ogm: ogm
223 ).to_json
224 end
225 end
226
227 def transfer_complete_url(customer_id, call_id, tries)
228 url(
229 inbound_calls_path(:transfer_complete, customer_id, call_id: call_id)
230 ) + (tries ? "&tries=#{tries}" : "")
231 end
232
233 def create_call(customer, from, call_id, application_id, tries: nil)
234 call_inputs(customer, from, call_id).then do |(customer_id, fwd, ca)|
235 request.json { json_call(customer, fwd, ca) }
236 ca.create_call(fwd, CONFIG[:creds][:account]) do |cc|
237 cc.from = from
238 cc.application_id = application_id
239 cc.answer_url = url inbound_calls_path(nil, customer_id)
240 cc.disconnect_url = transfer_complete_url(customer_id, call_id, tries)
241 end
242 end
243 end
244
245 def inbound_from
246 if params["from"] && params["from"] =~ /\A\+?\d+\Z/
247 params["from"]
248 else
249 log.info "Inbound call with unusual from: #{params['from']}"
250 TEL_CANDIDATES.fetch(params["from"], "19")
251 end
252 end
253
254 def hangup
255 request.json { {}.to_json }
256
257 render :hangup
258 end
259
260 route do |r|
261 r.get "healthcheck" do
262 "OK"
263 end
264
265 r.on "inbound" do
266 r.on "calls" do
267 r.post "status" do
268 if params["eventType"] == "disconnect"
269 if (outbound_leg = outbound_transfers.delete(params["callId"]))
270 modify_call(outbound_leg) do |call|
271 call.state = "completed"
272 end
273 end
274
275 customer_repo.find_by_tel(params["to"]).then do |customer|
276 cdr_repo.put(CDR.for_inbound(customer.customer_id, params))
277 end
278 end
279 "OK"
280 end
281
282 r.on :call_id do |call_id|
283 r.post "transfer_complete" do
284 outbound_leg = outbound_transfers.delete(call_id)
285 if params["cause"] == "hangup" && params["tag"] == "connected"
286 log.info "Normal hangup, now end #{call_id}", loggable_params
287 modify_call(call_id) { |call| call.state = "completed" }
288 elsif !outbound_leg
289 log.debug "Inbound disconnected", loggable_params
290 elsif params["cause"] == "error" && params["tries"].to_i < 15
291 log.info "2nd leg error, retry", loggable_params
292 customer_repo(
293 sgx_repo: Bwmsgsv2Repo.new
294 ).find(params["customer_id"]).then { |customer|
295 create_call(
296 customer, params["from"], call_id, params["applicationId"],
297 tries: params["tries"].to_i + 1
298 ).then { |call|
299 outbound_transfers[params["callId"]] = call
300 }.catch(&log.method(:error))
301 }
302 else
303 log.debug "Go to voicemail", loggable_params
304 modify_call(call_id) do |call|
305 call.redirect_url = url inbound_calls_path(:voicemail)
306 end
307 end
308 ""
309 end
310
311 r.on "voicemail" do
312 r.post "audio" do
313 duration = Time.parse(params["endTime"]) -
314 Time.parse(params["startTime"])
315 next "OK<5" unless duration > 5
316
317 jmp_media_url = params["mediaUrl"].sub(
318 /\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
319 "https://jmp.chat"
320 )
321
322 find_by_tel_with_fallback(
323 sgx_repo: Bwmsgsv2Repo.new,
324 transcription_enabled: false
325 ).then do |customer|
326 start_transcription(customer, call_id, jmp_media_url)
327
328 m = Blather::Stanza::Message.new
329 m.chat_state = nil
330 m.from = from_jid
331 m.subject = "New Voicemail"
332 m << OOB.new(jmp_media_url)
333 customer.stanza_to(m)
334
335 "OK"
336 end
337 end
338
339 r.post "language_id" do
340 rev_ai.language_id_result(params).then { |result|
341 rev_ai.stt(
342 result["top_language"],
343 result.dig("metadata", "media_url"),
344 url(inbound_calls_path(
345 "voicemail/transcription",
346 call_id: call_id
347 )),
348 **result["metadata"].transform_keys(&:to_sym)
349 ).then { "OK" }
350 }.catch_only(RevAi::Failed) { |e|
351 log_error(e)
352 "Failure logged"
353 }
354 end
355
356 r.post "transcription" do
357 rev_ai.stt_result(params, request.url).then { |result|
358 next "OK" if result["text"].to_s.empty?
359
360 customer_repo.find(
361 result.dig("metadata", "customer_id")
362 ).then do |customer|
363 m = Blather::Stanza::Message.new
364 m.chat_state = nil
365 m.from = result.dig("metadata", "from_jid")
366 m.subject = "Voicemail Transcription"
367 m.body = result["text"]
368 customer.stanza_to(m)
369
370 "OK"
371 end
372 }.catch_only(RevAi::Failed) { |e|
373 log_error(e)
374 "Failure logged"
375 }
376 end
377
378 r.post do
379 find_by_tel_with_fallback(
380 sgx_repo: Bwmsgsv2Repo.new,
381 ogm_url: nil
382 ).then { |c|
383 c.ogm(params["from"])
384 }.then { |ogm|
385 next hangup unless ogm
386
387 render :voicemail, locals: { ogm: ogm }
388 }.catch_only(CustomerRepo::NotFound) {
389 render "inbound/no_customer"
390 }
391 end
392 end
393
394 r.post do
395 customer_repo(
396 sgx_repo: Bwmsgsv2Repo.new
397 ).find(params.fetch("customer_id")).then do |customer|
398 call_attempt_repo.find_inbound(
399 customer,
400 params["from"],
401 call_id: call_id,
402 digits: params["digits"]
403 ).then { |ca| render(*ca.to_render) }
404 end
405 end
406 end
407
408 r.post do
409 customer_repo(
410 sgx_repo: Bwmsgsv2Repo.new
411 ).find_by_tel(params["to"]).then { |customer|
412 reachability_repo.find(customer, params["from"]).then do |reach|
413 reach.filter(if_yes: ->(_) { hangup }) do
414 create_call(
415 customer,
416 inbound_from,
417 params["callId"],
418 params["applicationId"]
419 ).then { |call|
420 next EMPromise.reject(:voicemail) unless call
421
422 outbound_transfers[params["callId"]] = call
423 render :ring, locals: { duration: 300 }
424 }
425 end
426 end
427 }.catch_only(CustomerFwd::InfiniteTimeout) { |e|
428 render :forward, locals: { fwd: e.fwd, from: params["from"] }
429 }.catch { |e|
430 log_error(e) unless e == :voicemail
431 r.json { { error: e.to_s }.to_json }
432 render :redirect, locals: { to: inbound_calls_path(:voicemail) }
433 }
434 end
435 end
436 end
437
438 r.on "outbound" do
439 r.on "calls" do
440 r.post "status" do
441 log.info "#{params['eventType']} #{params['callId']}", loggable_params
442 if params["eventType"] == "disconnect"
443 from = params["from"].sub(/^(?:\+|c)/, "")
444
445 customer_repo.find_by_format(from).then { |customer|
446 trust_level_repo.find(customer).then { |tl| [customer, tl] }
447 }.then { |(customer, trust_level)|
448 next "OK" unless trust_level.write_cdr?
449
450 customer_id = customer.customer_id
451 call_attempt_repo.ending_call(customer_id, params["callId"])
452 cdr_repo
453 .put(CDR.for_outbound(customer_id, params))
454 .catch(&method(:log_error))
455
456 "OK"
457 }
458 else
459 "OK"
460 end
461 end
462
463 r.post do
464 from = params["from"].sub(/^(?:\+|c)/, "")
465 customer_repo(
466 sgx_repo: Bwmsgsv2Repo.new
467 ).find_by_format(from).then { |c|
468 call_attempt_repo.find_outbound(
469 c,
470 params["to"],
471 call_id: params["callId"],
472 digits: params["digits"]
473 ).then do |ca|
474 r.json { ca.to_json }
475
476 call_attempt_repo.starting_call(c, params["callId"]).then do
477 render(*ca.to_render)
478 end
479 end
480 }.catch_only(CustomerRepo::NotFound) {
481 log.info "No customer for outbound call", params
482 render "outbound/no_customer"
483 }
484 end
485 end
486 end
487
488 r.on "ogm" do
489 r.post "start" do
490 render :record_ogm, locals: { customer_id: params["customer_id"] }
491 end
492
493 r.post do
494 jmp_media_url = params["mediaUrl"].sub(
495 /\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
496 "https://jmp.chat"
497 )
498 ogm = OGMDownload.new(jmp_media_url)
499 ogm.download.then do
500 FileUtils.mv(ogm.path, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
501 File.chmod(0o644, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
502 customer_repo.find(params["customer_id"]).then do |customer|
503 customer.set_ogm_url("#{CONFIG[:ogm_web_root]}/#{ogm.cid}.mp3")
504 end
505 end
506 end
507 end
508
509 r.on "orders" do
510 token = r.env["HTTP_AUTHORIZATION"].to_s.sub(/\ABearer\s+/, "")
511
512 r.json do
513 if (orderer = CONFIG[:bulk_order_tokens][token.to_sym])
514 r.get :order_id do |order_id|
515 BandwidthTNOrder.get(order_id).then do |order|
516 if order.status == :complete
517 Transaction.new(
518 customer_id: orderer[:customer_id],
519 transaction_id: order.id,
520 amount: order.tels.length * -1.75,
521 note: "Bulk order",
522 ignore_duplicate: true
523 ).insert.then do
524 {
525 id: order.id,
526 status: order.status,
527 tels: order.tels
528 }.to_json
529 end
530 else
531 { id: order.id, status: order.status }.to_json
532 end
533 end
534 end
535
536 r.on "tels" do
537 r.on :tel, method: :delete do |tel|
538 tn_repo = BandwidthTnRepo.new
539 tn = tn_repo.find(tel)
540 if tn&.dig(:sip_peer, :peer_id).to_s == orderer[:peer_id]
541 tn_repo.disconnect(tel, orderer[:customer_id])
542 { status: "disconnected" }.to_json
543 else
544 response.status = 401
545 { error: "Number not found" }.to_json
546 end
547 end
548 end
549
550 r.post do
551 customer_repo.find(orderer[:customer_id]).then do |customer|
552 if customer.balance >= 1.75 * params["quantity"].to_i
553 BandwidthTNOrder.create_custom(
554 name: "Bulk order",
555 customer_order_id: orderer[:customer_id],
556 peer_id: orderer[:peer_id],
557 state_search_and_order_type: {
558 quantity: params["quantity"].to_i,
559 state: ["WA", "CA", "TX", "IL", "NY", "FL"].sample
560 }
561 ).then do |order|
562 { id: order.id }.to_json
563 end
564 else
565 response.status = 402
566 { error: "Balance too low" }.to_json
567 end
568 end
569 end
570 else
571 response.status = 401
572 { error: "Bad token" }.to_json
573 end
574 end
575 end
576
577 r.public
578 end
579end
580# rubocop:enable Metrics/ClassLength