1# frozen_string_literal: true
2
3require "digest"
4require "forwardable"
5require "multibases"
6require "multihashes"
7require "roda"
8require "thin"
9require "sentry-ruby"
10
11require_relative "lib/call_attempt_repo"
12require_relative "lib/cdr"
13require_relative "lib/oob"
14require_relative "lib/rev_ai"
15require_relative "lib/roda_capture"
16require_relative "lib/roda_em_promise"
17require_relative "lib/rack_fiber"
18
# Fetches a recording from a URL into a temporary file, feeding every
# received chunk through SHA-512 so the finished file can be named by a
# content identifier derived from its bytes.
class OGMDownload
  # @param url [String] location of the recording to fetch
  def initialize(url)
    @digest = Digest::SHA512.new
    @f = Tempfile.open("ogm")
    @req = EM::HttpRequest.new(url, tls: { verify_peer: true })
  end

  # Starts the asynchronous GET. Each streamed chunk is hashed and appended
  # to the temp file.
  #
  # @return the promise chain: on success the temp file is closed (and kept);
  #   on failure it is closed and unlinked and the error is re-rejected
  def download
    response = @req.aget
    response.stream { |bytes|
      @digest << bytes
      @f.write bytes
    }

    stored = response.then { @f.close }
    stored.catch { |err|
      @f.close!
      EMPromise.reject(err)
    }
  end

  # Content identifier for the bytes hashed so far.
  #
  # @return [String] base58btc multibase text of the two prefix bytes
  #   followed by the sha2-512 multihash
  def cid
    # NOTE(review): bytes 1 and 85 (0x55) are presumably the CIDv1 version
    # and "raw" codec markers -- confirm against the CID spec.
    prefix = [1, 85].pack("C*")
    multihash = Multihashes.encode(@digest.digest, "sha2-512")
    encoded = Multibases.encode("base58btc", prefix + multihash)
    encoded.pack.to_s
  end

  # @return [String] filesystem path of the temp file holding the download
  def path
    @f.path
  end
end
49
50# rubocop:disable Metrics/ClassLength
# Roda application receiving voice-call HTTP callbacks: inbound call
# forwarding and voicemail, outbound call setup, and recording of outgoing
# messages (OGM). Route handlers return promises which the RodaEMPromise
# plugin is expected to resolve into responses.
class Web < Roda
  use Rack::Fiber unless ENV["ENV"] == "test" # Must go first!
  use Sentry::Rack::CaptureExceptions
  plugin :json_parser
  plugin :type_routing
  plugin :public
  plugin :render, engine: "slim"
  plugin RodaCapture
  plugin RodaEMPromise # Must go last!

  class << self
    attr_reader :customer_repo, :log, :outbound_transfers

    # Configures logging, resets the in-memory transfer table and starts
    # the Thin server with the frozen app.
    #
    # @param log logger used for request logging and by Thin
    # @param listen_on arguments forwarded to Thin::Server.start
    def run(log, *listen_on)
      plugin :common_logger, log, method: :info
      @outbound_transfers = {}
      Thin::Logging.logger = log
      Thin::Server.start(
        *listen_on,
        freeze.app,
        signals: false
      )
    end
  end

  extend Forwardable
  def_delegators :'self.class', :outbound_transfers
  def_delegators :request, :params

  # Prefix of recording URLs on the Bandwidth voice API. Dots are escaped so
  # that only the literal host matches.
  BANDWIDTH_MEDIA_PREFIX =
    %r{\Ahttps://voice\.bandwidth\.com/api/v2/accounts/\d+}.freeze

  # Suffix appended to placeholder numbers used for callers without a
  # usable caller ID.
  ANONYMOUS_PHONE_CONTEXT =
    ";phone-context=anonymous.phone-context.soprani.ca"

  # Known non-numeric caller-ID strings mapped to the short code used in
  # their place; anything unrecognised gets "19", too-short values "13".
  TEL_CANDIDATES = {
    "Restricted" => "14",
    "anonymous" => "15",
    "Anonymous" => "16",
    "unavailable" => "17",
    "Unavailable" => "18"
  }.freeze

  # @return the logger registered with the :common_logger plugin
  def log
    opts[:common_logger]
  end

  # Logs a failure for the current request and reports it to Sentry.
  #
  # @param e [Exception, Object] the error; non-exceptions (e.g. symbols
  #   used as rejection reasons) are reported as messages
  def log_error(e)
    log.error(
      "Error raised during #{request.fullpath}: #{e.class}",
      e,
      loggable_params
    )
    if e.is_a?(::Exception)
      Sentry.capture_exception(e)
    else
      Sentry.capture_message(e.to_s)
    end
  end

  # @return a copy of the request params with "to" and "from" removed, so
  #   that phone numbers are not written to logs
  def loggable_params
    params.dup.tap do |safe|
      safe.delete("to")
      safe.delete("from")
    end
  end

  # @param kwargs options forwarded to CustomerRepo.new; :set_user defaults
  #   to Sentry.set_user
  # @return the repo injected through opts, or a freshly built CustomerRepo
  def customer_repo(**kwargs)
    kwargs[:set_user] ||= Sentry.method(:set_user)
    opts[:customer_repo] || CustomerRepo.new(**kwargs)
  end

  # @return the repo injected through opts, or a new CallAttemptRepo
  def call_attempt_repo
    opts[:call_attempt_repo] || CallAttemptRepo.new
  end

  # @return [RevAi] a new client for the transcription service
  def rev_ai
    RevAi.new
  end

  # Turns a caller-ID value into something usable as a JID node.
  #
  # @param candidate [String, nil] the raw "from" value
  # @return [String] the value itself when it is "+" followed only by
  #   digits; otherwise a short placeholder code with the anonymous
  #   phone-context suffix. A missing value is treated like a too-short one.
  def sanitize_tel_candidate(candidate)
    candidate = candidate.to_s
    if candidate.length < 3
      "13#{ANONYMOUS_PHONE_CONTEXT}"
    elsif candidate.match?(/\A\+\d+\z/)
      candidate
    else
      TEL_CANDIDATES.fetch(candidate, "19") + ANONYMOUS_PHONE_CONTEXT
    end
  end

  # @return [Blather::JID] JID built from the sanitized caller number and
  #   the configured component JID
  def from_jid
    Blather::JID.new(
      sanitize_tel_candidate(params["from"]),
      CONFIG[:component][:jid]
    )
  end

  # Rewrites the "mediaUrl" param from the Bandwidth API prefix to the
  # jmp.chat host.
  #
  # NOTE(review): a mediaUrl that does not start with the expected prefix is
  # returned unchanged -- confirm callers of these routes are trusted.
  #
  # @return [String] the rewritten URL
  def jmp_media_url
    params["mediaUrl"].sub(BANDWIDTH_MEDIA_PREFIX, "https://jmp.chat")
  end

  # Builds a path under /inbound/calls/<id>.
  #
  # @param suffix [String, Symbol, nil] trailing path segment(s), if any
  # @param customer_id optional value for a customer_id query parameter
  # @param call_id call identifier; defaults to the "callId" param
  # @return [String]
  def inbound_calls_path(suffix, customer_id=nil, call_id: nil)
    [
      "/inbound/calls/#{call_id || params['callId']}",
      suffix
    ].compact.join("/") +
      (customer_id ? "?customer_id=#{customer_id}" : "")
  end

  # @param path [String] absolute path on this app
  # @return [String] the path prefixed with the current request's base URL
  def url(path)
    "#{request.base_url}#{path}"
  end

  # Sends a modify-call request for call_id, letting the caller's block
  # fill in the request body first.
  #
  # @param call_id identifier of the call to modify
  # @yield [body] the Bandwidth::ApiModifyCallRequest to populate
  # @return the API result, or nil when the API answered 404
  # @raise [Bandwidth::ApiErrorResponseException] for any non-404 API error
  def modify_call(call_id)
    body = Bandwidth::ApiModifyCallRequest.new
    yield body
    BANDWIDTH_VOICE.modify_call(
      CONFIG[:creds][:account],
      call_id,
      body: body
    )
  rescue Bandwidth::ApiErrorResponseException => e
    # If call does not exist, don't need to hang up or send to voicemail
    # Other side must have hung up already
    raise unless e.response_code.to_s == "404"
  end

  # Kicks off language identification for a voicemail recording, with the
  # result to be delivered to the voicemail/language_id route.
  #
  # @return nil when the customer has transcription disabled
  def start_transcription(customer, call_id, media_url)
    return unless customer.transcription_enabled

    rev_ai.language_id(
      media_url,
      url(inbound_calls_path("voicemail/language_id", call_id: call_id)),
      from_jid: from_jid,
      customer_id: customer.customer_id
    )
  end

  route do |r|
    r.on "inbound" do
      r.on "calls" do
        r.post "status" do
          if params["eventType"] == "disconnect"
            # Inbound leg ended: complete any outbound leg still tracked
            if (outbound_leg = outbound_transfers.delete(params["callId"]))
              modify_call(outbound_leg) do |call|
                call.state = "completed"
              end
            end

            customer_repo.find_by_tel(params["to"]).then do |customer|
              CDR.for_inbound(customer.customer_id, params).save
            end
          end
          "OK"
        end

        r.on :call_id do |call_id|
          r.post "transfer_complete" do
            outbound_leg = outbound_transfers.delete(call_id)
            if params["cause"] == "hangup" && params["tag"] == "connected"
              log.info "Normal hangup, now end #{call_id}", loggable_params
              modify_call(call_id) { |call| call.state = "completed" }
            elsif !outbound_leg
              log.debug "Inbound disconnected", loggable_params
            else
              log.debug "Go to voicemail", loggable_params
              modify_call(call_id) do |call|
                call.redirect_url = url inbound_calls_path(:voicemail)
              end
            end
            ""
          end

          r.on "voicemail" do
            r.post "audio" do
              duration = Time.parse(params["endTime"]) -
                         Time.parse(params["startTime"])
              # Recordings of five seconds or less are ignored
              next "OK<5" unless duration > 5

              media_url = jmp_media_url

              customer_repo(
                sgx_repo: Bwmsgsv2Repo.new
              ).find_by_tel(params["to"]).then do |customer|
                start_transcription(customer, call_id, media_url)

                m = Blather::Stanza::Message.new
                m.chat_state = nil
                m.from = from_jid
                m.subject = "New Voicemail"
                m.body = media_url
                m << OOB.new(media_url, desc: "Voicemail Recording")
                customer.stanza_to(m)

                "OK"
              end
            end

            r.post "language_id" do
              rev_ai.language_id_result(params).then do |result|
                rev_ai.stt(
                  result["top_language"],
                  result.dig("metadata", "media_url"),
                  url(inbound_calls_path(
                    "voicemail/transcription",
                    call_id: call_id
                  )),
                  **result["metadata"].transform_keys(&:to_sym)
                ).then { "OK" }
              end
            end

            r.post "transcription" do
              rev_ai.stt_result(params).then do |result|
                next "OK" if result["text"].empty?

                customer_repo.find(
                  result.dig("metadata", "customer_id")
                ).then do |customer|
                  m = Blather::Stanza::Message.new
                  m.chat_state = nil
                  m.from = result.dig("metadata", "from_jid")
                  m.subject = "Voicemail Transcription"
                  m.body = result["text"]
                  customer.stanza_to(m)

                  "OK"
                end
              end
            end

            r.post do
              customer_repo(sgx_repo: Bwmsgsv2Repo.new)
                .find_by_tel(params["to"])
                .then { |c|
                  EMPromise.all([c, c.ogm(params["from"])])
                }.then do |(customer, ogm)|
                  render :voicemail, locals: {
                    ogm: ogm,
                    transcription_enabled: customer.transcription_enabled
                  }
                end
            end
          end

          r.post do
            customer_repo(
              sgx_repo: Bwmsgsv2Repo.new
            ).find(params.fetch("customer_id")).then do |customer|
              call_attempt_repo.find_inbound(
                customer,
                params["from"],
                call_id: call_id,
                digits: params["digits"]
              ).then { |ca| render(*ca.to_render) }
            end
          end
        end

        r.post do
          customer_repo(
            sgx_repo: Bwmsgsv2Repo.new
          ).find_by_tel(params["to"]).then { |customer|
            EMPromise.all([
              customer.customer_id, customer.fwd,
              call_attempt_repo.find_inbound(
                customer, params["from"], call_id: params["callId"]
              )
            ])
          }.then { |(customer_id, fwd, ca)|
            call = ca.create_call(fwd, CONFIG[:creds][:account]) { |cc|
              cc.from = params["from"]
              cc.application_id = params["applicationId"]
              cc.answer_url = url inbound_calls_path(nil, customer_id)
              cc.disconnect_url = url inbound_calls_path(:transfer_complete)
            }

            # No outbound leg could be created: fall through to voicemail
            next EMPromise.reject(:voicemail) unless call

            outbound_transfers[params["callId"]] = call
            render :ring, locals: { duration: 300 }
          }.catch { |e|
            log_error(e) unless e == :voicemail
            render :redirect, locals: { to: inbound_calls_path(:voicemail) }
          }
        end
      end
    end

    r.on "outbound" do
      r.on "calls" do
        r.post "status" do
          log.info "#{params['eventType']} #{params['callId']}", loggable_params
          if params["eventType"] == "disconnect"
            from = params["from"].sub(/\A\+1/, "")

            # Resolve the customer the same way the call-setup route below
            # does, so the call attempt can be marked as ended
            customer_repo.find_by_format(from).then { |customer|
              call_attempt_repo.ending_call(customer, params["callId"])
            }.catch(&method(:log_error))
            CDR.for_outbound(params).save.catch(&method(:log_error))
          end
          "OK"
        end

        r.post do
          from = params["from"].sub(/\A\+1/, "")
          customer_repo(
            sgx_repo: Bwmsgsv2Repo.new
          ).find_by_format(from).then do |c|
            call_attempt_repo.find_outbound(
              c,
              params["to"],
              call_id: params["callId"],
              digits: params["digits"]
            ).then do |ca|
              r.json { ca.to_json }

              call_attempt_repo.starting_call(c, params["callId"])
              render(*ca.to_render)
            end
          end
        end
      end
    end

    r.on "ogm" do
      r.post "start" do
        render :record_ogm, locals: { customer_id: params["customer_id"] }
      end

      r.post do
        ogm = OGMDownload.new(jmp_media_url)
        ogm.download.then do
          cid = ogm.cid
          stored_at = "#{CONFIG[:ogm_path]}/#{cid}"
          File.rename(ogm.path, stored_at)
          File.chmod(0o644, stored_at)
          customer_repo.find(params["customer_id"]).then do |customer|
            customer.set_ogm_url("#{CONFIG[:ogm_web_root]}/#{cid}.mp3")
          end
        end
      end
    end

    r.public
  end
end
390# rubocop:enable Metrics/ClassLength