1# frozen_string_literal: true
2
3require "digest"
4require "forwardable"
5require "multibases"
6require "multihashes"
7require "roda"
8require "thin"
9require "sentry-ruby"
10
11require_relative "lib/call_attempt_repo"
12require_relative "lib/cdr"
13require_relative "lib/cdr_repo"
14require_relative "lib/oob"
15require_relative "lib/rev_ai"
16require_relative "lib/roda_capture"
17require_relative "lib/roda_em_promise"
18require_relative "lib/rack_fiber"
19
# Streams a remote recording into a temporary file while hashing it with
# SHA-512, so the finished file can be stored under a content-derived name.
class OGMDownload
  # @param url [String] location of the recording to fetch
  def initialize(url)
    @digest = Digest::SHA512.new
    # The payload is audio, i.e. raw bytes: open the tempfile in binary mode
    # so no newline or encoding conversion can be applied to written chunks.
    @f = Tempfile.open("ogm", binmode: true)
    @req = EM::HttpRequest.new(url, tls: { verify_peer: true })
  end

  # Starts the request and feeds every received chunk to both the digest and
  # the tempfile.
  #
  # @return the promise chain for the request: on success the tempfile is
  #   closed (and kept on disk, see #path); on failure the tempfile is closed
  #   and unlinked and the original error is re-rejected
  def download
    http = @req.aget
    http.stream do |chunk|
      @digest << chunk
      @f.write chunk
    end
    http.then { @f.close }.catch do |e|
      @f.close!
      EMPromise.reject(e)
    end
  end

  # Content identifier for the bytes hashed so far: the two bytes 1 and 85
  # followed by the sha2-512 multihash of the digest, base58btc-encoded.
  # NOTE(review): 1/85 presumably mean "CID version 1" and the "raw"
  # multicodec (0x55) -- confirm against the CID specification.
  #
  # @return [String]
  def cid
    Multibases.encode(
      "base58btc",
      [1, 85].pack("C*") + Multihashes.encode(@digest.digest, "sha2-512")
    ).pack.to_s
  end

  # @return [String] filesystem path of the tempfile holding the download
  def path
    @f.path
  end
end
50
51# rubocop:disable Metrics/ClassLength
# Roda application serving the HTTP callbacks used for voice calls:
#
# * /inbound/calls  -- inbound call handling (transfer to the customer's
#   forward, voicemail recording, language id and transcription results)
# * /outbound/calls -- outbound call handling and status events
# * /ogm            -- recording and storing a customer's outgoing message
#
# Handlers generally return promises; the RodaEMPromise plugin is loaded to
# deal with those (its implementation is not visible in this file).
class Web < Roda
  use Rack::Fiber unless ENV["ENV"] == "test" # Must go first!
  use Sentry::Rack::CaptureExceptions
  plugin :json_parser
  plugin :type_routing
  plugin :public
  plugin :render, engine: "slim"
  plugin RodaCapture
  plugin RodaEMPromise # Must go last!

  class << self
    attr_reader :customer_repo, :log, :outbound_transfers

    # Configures logging and starts serving the (frozen) app with Thin.
    #
    # @param log logger used for request logging and by Thin
    # @param listen_on arguments forwarded to Thin::Server.start ahead of
    #   the app (e.g. address/port)
    def run(log, *listen_on)
      plugin :common_logger, log, method: :info
      # Maps an inbound call id to the outbound leg created for it
      @outbound_transfers = {}
      Thin::Logging.logger = log
      Thin::Server.start(
        *listen_on,
        freeze.app,
        signals: false
      )
    end
  end

  extend Forwardable
  def_delegators :'self.class', :outbound_transfers
  def_delegators :request, :params

  # Suffix appended to every non-E.164 caller identity
  ANONYMOUS_PHONE_CONTEXT =
    ";phone-context=anonymous.phone-context.soprani.ca"

  # Prefix of recording URLs received in webhooks that gets replaced by
  # https://jmp.chat (dots escaped so only the real hostname matches)
  BANDWIDTH_MEDIA_PREFIX =
    %r{\Ahttps://voice\.bandwidth\.com/api/v2/accounts/\d+}

  # Local-part prefixes used for the well-known "no caller id" strings
  TEL_CANDIDATES = {
    "Restricted" => "14",
    "anonymous" => "15",
    "Anonymous" => "16",
    "unavailable" => "17",
    "Unavailable" => "18"
  }.freeze

  # Logger stored in the app options under :common_logger
  # (presumably the one registered by Web.run)
  def log
    opts[:common_logger]
  end

  # Logs a failure for the current request and reports it to Sentry.
  #
  # @param e an Exception, or any other rejection value (reported to Sentry
  #   as a message via #to_s)
  def log_error(e)
    log.error(
      "Error raised during #{request.fullpath}: #{e.class}",
      e,
      loggable_params
    )
    if e.is_a?(::Exception)
      Sentry.capture_exception(e)
    else
      Sentry.capture_message(e.to_s)
    end
  end

  # Copy of the request params without the "to" and "from" keys, so that
  # phone numbers do not end up in log output.
  def loggable_params
    params.dup.tap do |p|
      p.delete("to")
      p.delete("from")
    end
  end

  # Customer repository: the one injected through opts[:customer_repo] when
  # present, otherwise a new CustomerRepo built from kwargs.
  # :set_user defaults to Sentry.set_user when not supplied.
  def customer_repo(**kwargs)
    kwargs[:set_user] ||= Sentry.method(:set_user)
    opts[:customer_repo] || CustomerRepo.new(**kwargs)
  end

  # Looks up the customer owning params["to"] using sgx_repo. A NotFound
  # rejection is passed through; any other failure is logged and the lookup
  # is retried against a TrivialBackendSgxRepo built from kwargs.
  def find_by_tel_with_fallback(sgx_repo:, **kwargs)
    customer_repo(sgx_repo: sgx_repo).find_by_tel(params["to"]).catch { |e|
      next EMPromise.reject(e) if e.is_a?(CustomerRepo::NotFound)

      log_error(e)
      customer_repo(
        sgx_repo: TrivialBackendSgxRepo.new(**kwargs)
      ).find_by_tel(params["to"])
    }
  end

  # Looks up the customer placing an outbound call from params["from"]:
  # a leading "+" or "c" is dropped, then a leading "1" when more than ten
  # characters remain. Shared by the outbound call and status handlers.
  def outbound_customer
    from = params["from"].sub(/\A(?:\+|c)/, "")
    from = from.sub(/\A1/, "") if from.length > 10
    customer_repo(sgx_repo: Bwmsgsv2Repo.new).find_by_format(from)
  end

  # Injected opts[:call_attempt_repo], or a fresh CallAttemptRepo
  def call_attempt_repo
    opts[:call_attempt_repo] || CallAttemptRepo.new
  end

  # Injected opts[:cdr_repo], or a fresh CDRRepo
  def cdr_repo
    opts[:cdr_repo] || CDRRepo.new
  end

  # RevAi client whose logger carries the loggable request params
  def rev_ai
    RevAi.new(logger: log.child(loggable_params))
  end

  # Turns the caller identity from a webhook into a JID local part.
  #
  # * fewer than 3 characters (including a missing value) => "13;..."
  # * "+" followed only by digits => returned unchanged
  # * anything else => code from TEL_CANDIDATES ("19" if unknown) ";..."
  #
  # @param candidate [String, nil]
  # @return [String]
  def sanitize_tel_candidate(candidate)
    # A webhook without caller information must not raise on nil
    candidate = candidate.to_s
    if candidate.length < 3
      "13#{ANONYMOUS_PHONE_CONTEXT}"
    elsif /\A\+\d+\z/.match?(candidate)
      candidate
    else
      "#{TEL_CANDIDATES.fetch(candidate, '19')}#{ANONYMOUS_PHONE_CONTEXT}"
    end
  end

  # JID representing the caller (params["from"]) at the component's domain
  def from_jid
    Blather::JID.new(
      sanitize_tel_candidate(params["from"]),
      CONFIG[:component][:jid]
    )
  end

  # Builds a path under /inbound/calls/<call id>.
  #
  # @param suffix path segment appended after the call id (nil for none)
  # @param customer_id when given, added as the customer_id query parameter
  # @param call_id defaults to params["callId"]
  def inbound_calls_path(suffix, customer_id=nil, call_id: nil)
    [
      "/inbound/calls/#{call_id || params['callId']}",
      suffix
    ].compact.join("/") +
      (customer_id ? "?customer_id=#{customer_id}" : "")
  end

  # Absolute URL for a path on this app, based on the current request
  def url(path)
    "#{request.base_url}#{path}"
  end

  # Rewrites a recording URL from a webhook so it points at
  # https://jmp.chat instead of the Bandwidth account API.
  # NOTE(review): a URL that does not start with the expected prefix is
  # returned unchanged -- confirm callers are fine fetching such URLs.
  def proxied_media_url(media_url)
    media_url.sub(BANDWIDTH_MEDIA_PREFIX, "https://jmp.chat")
  end

  # Sends a modify-call request for call_id. The request body is yielded to
  # the caller's block so it can be filled in first.
  #
  # @raise [Bandwidth::ApiErrorResponseException] unless the response code
  #   is 404
  def modify_call(call_id)
    body = Bandwidth::ApiModifyCallRequest.new
    yield body
    BANDWIDTH_VOICE.modify_call(
      CONFIG[:creds][:account],
      call_id,
      body: body
    )
  rescue Bandwidth::ApiErrorResponseException => e
    # If call does not exist, don't need to hang up or send to voicemail
    # Other side must have hung up already
    raise unless e.response_code.to_s == "404"
  end

  # Requests language identification of a voicemail recording when the
  # customer has transcription enabled; the result is delivered to this
  # app's voicemail/language_id route for the same call.
  def start_transcription(customer, call_id, media_url)
    return unless customer.transcription_enabled

    rev_ai.language_id(
      media_url,
      url(inbound_calls_path("voicemail/language_id", call_id: call_id)),
      from_jid: from_jid,
      customer_id: customer.customer_id
    )
  end

  route do |r|
    r.on "inbound" do
      r.on "calls" do
        # Status events for inbound calls
        r.post "status" do
          if params["eventType"] == "disconnect"
            # Inbound side is gone: end any outbound leg still recorded
            if (outbound_leg = outbound_transfers.delete(params["callId"]))
              modify_call(outbound_leg) do |call|
                call.state = "completed"
              end
            end

            # The response does not wait for the CDR, so failures must be
            # logged here or they would go unnoticed
            customer_repo.find_by_tel(params["to"]).then { |customer|
              cdr_repo.put(CDR.for_inbound(customer.customer_id, params))
            }.catch(&method(:log_error))
          end
          "OK"
        end

        r.on :call_id do |call_id|
          # Disconnect callback of the outbound leg created for call_id
          r.post "transfer_complete" do
            outbound_leg = outbound_transfers.delete(call_id)
            if params["cause"] == "hangup" && params["tag"] == "connected"
              log.info "Normal hangup, now end #{call_id}", loggable_params
              modify_call(call_id) { |call| call.state = "completed" }
            elsif !outbound_leg
              # Transfer entry already removed by the inbound disconnect
              log.debug "Inbound disconnected", loggable_params
            else
              log.debug "Go to voicemail", loggable_params
              modify_call(call_id) do |call|
                call.redirect_url = url inbound_calls_path(:voicemail)
              end
            end
            ""
          end

          r.on "voicemail" do
            # A voicemail recording is available
            r.post "audio" do
              duration = Time.parse(params["endTime"]) -
                         Time.parse(params["startTime"])
              # Recordings of five seconds or less are ignored
              next "OK<5" unless duration > 5

              media_url = proxied_media_url(params["mediaUrl"])

              find_by_tel_with_fallback(
                sgx_repo: Bwmsgsv2Repo.new,
                transcription_enabled: false
              ).then do |customer|
                start_transcription(customer, call_id, media_url)

                m = Blather::Stanza::Message.new
                m.chat_state = nil
                m.from = from_jid
                m.subject = "New Voicemail"
                m.body = media_url
                m << OOB.new(media_url, desc: "Voicemail Recording")
                customer.stanza_to(m)

                "OK"
              end
            end

            # Language identification finished: request speech-to-text in
            # the detected language, forwarding the job metadata
            r.post "language_id" do
              rev_ai.language_id_result(params).then { |result|
                rev_ai.stt(
                  result["top_language"],
                  result.dig("metadata", "media_url"),
                  url(inbound_calls_path(
                    "voicemail/transcription",
                    call_id: call_id
                  )),
                  **result["metadata"].transform_keys(&:to_sym)
                ).then { "OK" }
              }.catch_only(RevAi::Failed) { |e|
                log_error(e)
                "Failure logged"
              }
            end

            # Transcription finished: send non-empty text to the customer
            r.post "transcription" do
              rev_ai.stt_result(params, request.url).then { |result|
                next "OK" if result["text"].to_s.empty?

                customer_repo.find(
                  result.dig("metadata", "customer_id")
                ).then do |customer|
                  m = Blather::Stanza::Message.new
                  m.chat_state = nil
                  m.from = result.dig("metadata", "from_jid")
                  m.subject = "Voicemail Transcription"
                  m.body = result["text"]
                  customer.stanza_to(m)

                  "OK"
                end
              }.catch_only(RevAi::Failed) { |e|
                log_error(e)
                "Failure logged"
              }
            end

            # Entry point of voicemail: play the outgoing message and
            # record, or hang up when voicemail is not enabled
            r.post do
              find_by_tel_with_fallback(
                sgx_repo: Bwmsgsv2Repo.new,
                ogm_url: nil
              ).then { |c|
                c.ogm(params["from"]) if c.fwd.voicemail_enabled?
              }.then { |ogm|
                next render :hangup unless ogm

                render :voicemail, locals: { ogm: ogm }
              }.catch_only(CustomerRepo::NotFound) {
                render "inbound/no_customer"
              }
            end
          end

          # Callback for an existing inbound call carrying customer_id
          r.post do
            customer_repo(
              sgx_repo: Bwmsgsv2Repo.new
            ).find(params.fetch("customer_id")).then do |customer|
              call_attempt_repo.find_inbound(
                customer,
                params["from"],
                call_id: call_id,
                digits: params["digits"]
              ).then { |ca| render(*ca.to_render) }
            end
          end
        end

        # New inbound call: create the outbound leg to the customer's
        # forward and ring; any failure sends the caller to voicemail
        r.post do
          customer_repo(
            sgx_repo: Bwmsgsv2Repo.new
          ).find_by_tel(params["to"]).then { |customer|
            EMPromise.all([
              customer.customer_id, customer.fwd,
              call_attempt_repo.find_inbound(
                customer, params["from"], call_id: params["callId"]
              )
            ])
          }.then { |(customer_id, fwd, ca)|
            call = ca.create_call(fwd, CONFIG[:creds][:account]) { |cc|
              cc.from = params["from"]
              cc.application_id = params["applicationId"]
              cc.answer_url = url inbound_calls_path(nil, customer_id)
              cc.disconnect_url = url inbound_calls_path(:transfer_complete)
            }

            # No outbound leg was created: go straight to voicemail
            next EMPromise.reject(:voicemail) unless call

            outbound_transfers[params["callId"]] = call
            render :ring, locals: { duration: 300 }
          }.catch_only(CustomerFwd::InfiniteTimeout) { |e|
            render :forward, locals: { fwd: e.fwd, from: params["from"] }
          }.catch { |e|
            # :voicemail is the deliberate rejection above, not an error
            log_error(e) unless e == :voicemail
            render :redirect, locals: { to: inbound_calls_path(:voicemail) }
          }
        end
      end
    end

    r.on "outbound" do
      r.on "calls" do
        # Status events for outbound calls
        r.post "status" do
          log.info "#{params['eventType']} #{params['callId']}", loggable_params
          if params["eventType"] == "disconnect"
            # Resolve the calling customer the same way the call handler
            # below does; a failed lookup is logged and must not prevent
            # the CDR from being written
            outbound_customer.then { |c|
              call_attempt_repo.ending_call(c, params["callId"])
            }.catch(&method(:log_error))
            cdr_repo.put(CDR.for_outbound(params)).catch(&method(:log_error))
          end
          "OK"
        end

        # New outbound call placed by a customer
        r.post do
          outbound_customer.then { |c|
            call_attempt_repo.find_outbound(
              c,
              params["to"],
              call_id: params["callId"],
              digits: params["digits"]
            ).then do |ca|
              r.json { ca.to_json }

              call_attempt_repo.starting_call(c, params["callId"])
              render(*ca.to_render)
            end
          }.catch_only(CustomerRepo::NotFound) {
            render "outbound/no_customer"
          }
        end
      end
    end

    r.on "ogm" do
      r.post "start" do
        render :record_ogm, locals: { customer_id: params["customer_id"] }
      end

      # A new outgoing message was recorded: download it, store it under
      # its content id and point the customer at the stored file
      r.post do
        ogm = OGMDownload.new(proxied_media_url(params["mediaUrl"]))
        ogm.download.then do
          # Download is complete, so the id is stable: compute it once
          cid = ogm.cid
          File.rename(ogm.path, "#{CONFIG[:ogm_path]}/#{cid}")
          File.chmod(0o644, "#{CONFIG[:ogm_path]}/#{cid}")
          customer_repo.find(params["customer_id"]).then do |customer|
            customer.set_ogm_url("#{CONFIG[:ogm_web_root]}/#{cid}.mp3")
          end
        end
      end
    end

    r.public
  end
end
420# rubocop:enable Metrics/ClassLength