1# frozen_string_literal: true
2
3require "digest"
4require "forwardable"
5require "multibases"
6require "multihashes"
7require "roda"
8require "thin"
9require "sentry-ruby"
10
11require_relative "lib/call_attempt_repo"
12require_relative "lib/cdr"
13require_relative "lib/oob"
14require_relative "lib/rev_ai"
15require_relative "lib/roda_capture"
16require_relative "lib/roda_em_promise"
17require_relative "lib/rack_fiber"
18
19class OGMDownload
20 def initialize(url)
21 @digest = Digest::SHA512.new
22 @f = Tempfile.open("ogm")
23 @req = EM::HttpRequest.new(url, tls: { verify_peer: true })
24 end
25
26 def download
27 http = @req.aget
28 http.stream do |chunk|
29 @digest << chunk
30 @f.write chunk
31 end
32 http.then { @f.close }.catch do |e|
33 @f.close!
34 EMPromise.reject(e)
35 end
36 end
37
38 def cid
39 Multibases.encode(
40 "base58btc",
41 [1, 85].pack("C*") + Multihashes.encode(@digest.digest, "sha2-512")
42 ).pack.to_s
43 end
44
45 def path
46 @f.path
47 end
48end
49
50# rubocop:disable Metrics/ClassLength
51class Web < Roda
52 use Rack::Fiber unless ENV["ENV"] == "test" # Must go first!
53 use Sentry::Rack::CaptureExceptions
54 plugin :json_parser
55 plugin :type_routing
56 plugin :public
57 plugin :render, engine: "slim"
58 plugin RodaCapture
59 plugin RodaEMPromise # Must go last!
60
61 class << self
62 attr_reader :customer_repo, :log, :outbound_transfers
63
64 def run(log, *listen_on)
65 plugin :common_logger, log, method: :info
66 @outbound_transfers = {}
67 Thin::Logging.logger = log
68 Thin::Server.start(
69 *listen_on,
70 freeze.app,
71 signals: false
72 )
73 end
74 end
75
76 extend Forwardable
77 def_delegators :'self.class', :outbound_transfers
78 def_delegators :request, :params
79
80 def log
81 opts[:common_logger]
82 end
83
84 def log_error(e)
85 log.error(
86 "Error raised during #{request.fullpath}: #{e.class}",
87 e,
88 loggable_params
89 )
90 if e.is_a?(::Exception)
91 Sentry.capture_exception(e)
92 else
93 Sentry.capture_message(e.to_s)
94 end
95 end
96
97 def loggable_params
98 params.dup.tap do |p|
99 p.delete("to")
100 p.delete("from")
101 end
102 end
103
104 def customer_repo(**kwargs)
105 kwargs[:set_user] = Sentry.method(:set_user) unless kwargs[:set_user]
106 opts[:customer_repo] || CustomerRepo.new(**kwargs)
107 end
108
109 def call_attempt_repo
110 opts[:call_attempt_repo] || CallAttemptRepo.new
111 end
112
113 def rev_ai
114 RevAi.new
115 end
116
117 TEL_CANDIDATES = {
118 "Restricted" => "14",
119 "anonymous" => "15",
120 "Anonymous" => "16",
121 "unavailable" => "17",
122 "Unavailable" => "18"
123 }.freeze
124
125 def sanitize_tel_candidate(candidate)
126 if candidate.length < 3
127 "13;phone-context=anonymous.phone-context.soprani.ca"
128 elsif candidate[0] == "+" && /\A\d+\z/.match(candidate[1..-1])
129 candidate
130 else
131 "#{TEL_CANDIDATES.fetch(candidate, '19')}" \
132 ";phone-context=anonymous.phone-context.soprani.ca"
133 end
134 end
135
136 def from_jid
137 Blather::JID.new(
138 sanitize_tel_candidate(params["from"]),
139 CONFIG[:component][:jid]
140 )
141 end
142
143 def inbound_calls_path(suffix, customer_id=nil, call_id: nil)
144 [
145 "/inbound/calls/#{call_id || params['callId']}",
146 suffix
147 ].compact.join("/") +
148 (customer_id ? "?customer_id=#{customer_id}" : "")
149 end
150
151 def url(path)
152 "#{request.base_url}#{path}"
153 end
154
155 def modify_call(call_id)
156 body = Bandwidth::ApiModifyCallRequest.new
157 yield body
158 BANDWIDTH_VOICE.modify_call(
159 CONFIG[:creds][:account],
160 call_id,
161 body: body
162 )
163 rescue Bandwidth::ApiErrorResponseException
164 # If call does not exist, don't need to hang up or send to voicemail
165 # Other side must have hung up already
166 raise $! unless $!.response_code.to_s == "404"
167 end
168
169 def do_alternate_transcription(customer, call_id)
170 return unless customer.alternate_transcription_enabled
171
172 rev_ai.language_id(
173 jmp_media_url,
174 url(inbound_calls_path(
175 "voicemail/rev_ai/language_id", call_id: call_id
176 )),
177 from_jid: from_jid,
178 customer_id: customer.customer_id
179 )
180 end
181
182 route do |r|
183 r.on "inbound" do
184 r.on "calls" do
185 r.post "status" do
186 if params["eventType"] == "disconnect"
187 if (outbound_leg = outbound_transfers.delete(params["callId"]))
188 modify_call(outbound_leg) do |call|
189 call.state = "completed"
190 end
191 end
192
193 customer_repo.find_by_tel(params["to"]).then do |customer|
194 CDR.for_inbound(customer.customer_id, params).save
195 end
196 end
197 "OK"
198 end
199
200 r.on :call_id do |call_id|
201 r.post "transfer_complete" do
202 outbound_leg = outbound_transfers.delete(call_id)
203 if params["cause"] == "hangup" && params["tag"] == "connected"
204 log.info "Normal hangup, now end #{call_id}", loggable_params
205 modify_call(call_id) { |call| call.state = "completed" }
206 elsif !outbound_leg
207 log.debug "Inbound disconnected", loggable_params
208 else
209 log.debug "Go to voicemail", loggable_params
210 modify_call(call_id) do |call|
211 call.redirect_url = url inbound_calls_path(:voicemail)
212 end
213 end
214 ""
215 end
216
217 r.on "voicemail" do
218 r.post "audio" do
219 duration = Time.parse(params["endTime"]) -
220 Time.parse(params["startTime"])
221 next "OK<5" unless duration > 5
222
223 jmp_media_url = params["mediaUrl"].sub(
224 /\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
225 "https://jmp.chat"
226 )
227
228 customer_repo(
229 sgx_repo: Bwmsgsv2Repo.new
230 ).find_by_tel(params["to"]).then do |customer|
231 do_alternate_transcription(customer, call_id)
232
233 m = Blather::Stanza::Message.new
234 m.chat_state = nil
235 m.from = from_jid
236 m.subject = "New Voicemail"
237 m.body = jmp_media_url
238 m << OOB.new(jmp_media_url, desc: "Voicemail Recording")
239 customer.stanza_to(m)
240
241 "OK"
242 end
243 end
244
245 r.post "transcription" do
246 duration = Time.parse(params["endTime"]) -
247 Time.parse(params["startTime"])
248 next "OK<5" unless duration > 5
249
250 customer_repo.find_by_tel(params["to"]).then do |customer|
251 m = Blather::Stanza::Message.new
252 m.chat_state = nil
253 m.from = from_jid
254 m.subject = "Voicemail Transcription"
255 m.body = BANDWIDTH_VOICE.get_recording_transcription(
256 params["accountId"], params["callId"], params["recordingId"]
257 ).data.transcripts[0].text
258 customer.stanza_to(m)
259
260 "OK"
261 end
262 end
263
264 r.on "rev_ai" do
265 r.post "language_id" do
266 rev_ai.language_id_result(params).then do |result|
267 rev_ai.stt(
268 result["top_language"],
269 result.dig("metadata", "media_url"),
270 url(inbound_calls_path(
271 "voicemail/rev_ai/transcription",
272 call_id: call_id
273 )),
274 **result["metadata"].transform_keys(&:to_sym)
275 ).then { "OK" }
276 end
277 end
278
279 r.post "transcription" do
280 rev_ai.stt_result(params).then do |result|
281 customer_repo.find(
282 result.dig("metadata", "customer_id")
283 ).then do |customer|
284 m = Blather::Stanza::Message.new
285 m.chat_state = nil
286 m.from = result.dig("metadata", "from_jid")
287 m.subject = "Voicemail Transcription"
288 m.body = "Alternate Transcription: #{result['text']}"
289 customer.stanza_to(m)
290
291 "OK"
292 end
293 end
294 end
295 end
296
297 r.post do
298 customer_repo(sgx_repo: Bwmsgsv2Repo.new)
299 .find_by_tel(params["to"])
300 .then { |c|
301 EMPromise.all([c, c.ogm(params["from"])])
302 }.then do |(customer, ogm)|
303 render :voicemail, locals: {
304 ogm: ogm,
305 transcription_enabled: customer.transcription_enabled
306 }
307 end
308 end
309 end
310
311 r.post do
312 customer_repo(
313 sgx_repo: Bwmsgsv2Repo.new
314 ).find(params.fetch("customer_id")).then do |customer|
315 call_attempt_repo.find_inbound(
316 customer,
317 params["from"],
318 call_id: call_id,
319 digits: params["digits"]
320 ).then { |ca| render(*ca.to_render) }
321 end
322 end
323 end
324
325 r.post do
326 customer_repo(
327 sgx_repo: Bwmsgsv2Repo.new
328 ).find_by_tel(params["to"]).then { |customer|
329 EMPromise.all([
330 customer.customer_id, customer.fwd,
331 call_attempt_repo.find_inbound(
332 customer, params["from"], call_id: params["callId"]
333 )
334 ])
335 }.then { |(customer_id, fwd, ca)|
336 call = ca.create_call(fwd, CONFIG[:creds][:account]) { |cc|
337 cc.from = params["from"]
338 cc.application_id = params["applicationId"]
339 cc.answer_url = url inbound_calls_path(nil, customer_id)
340 cc.disconnect_url = url inbound_calls_path(:transfer_complete)
341 }
342
343 next EMPromise.reject(:voicemail) unless call
344
345 outbound_transfers[params["callId"]] = call
346 render :ring, locals: { duration: 300 }
347 }.catch { |e|
348 log_error(e) unless e == :voicemail
349 render :redirect, locals: { to: inbound_calls_path(:voicemail) }
350 }
351 end
352 end
353 end
354
355 r.on "outbound" do
356 r.on "calls" do
357 r.post "status" do
358 log.info "#{params['eventType']} #{params['callId']}", loggable_params
359 if params["eventType"] == "disconnect"
360 call_attempt_repo.ending_call(c, params["callId"])
361 CDR.for_outbound(params).save.catch(&method(:log_error))
362 end
363 "OK"
364 end
365
366 r.post do
367 from = params["from"].sub(/^\+1/, "")
368 customer_repo(
369 sgx_repo: Bwmsgsv2Repo.new
370 ).find_by_format(from).then do |c|
371 call_attempt_repo.find_outbound(
372 c,
373 params["to"],
374 call_id: params["callId"],
375 digits: params["digits"]
376 ).then do |ca|
377 r.json { ca.to_json }
378
379 call_attempt_repo.starting_call(c, params["callId"])
380 render(*ca.to_render)
381 end
382 end
383 end
384 end
385 end
386
387 r.on "ogm" do
388 r.post "start" do
389 render :record_ogm, locals: { customer_id: params["customer_id"] }
390 end
391
392 r.post do
393 jmp_media_url = params["mediaUrl"].sub(
394 /\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
395 "https://jmp.chat"
396 )
397 ogm = OGMDownload.new(jmp_media_url)
398 ogm.download.then do
399 File.rename(ogm.path, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
400 File.chmod(0o644, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
401 customer_repo.find(params["customer_id"]).then do |customer|
402 customer.set_ogm_url("#{CONFIG[:ogm_web_root]}/#{ogm.cid}.mp3")
403 end
404 end
405 end
406 end
407
408 r.public
409 end
410end
411# rubocop:enable Metrics/ClassLength