Optional alternate transcription with rev.ai

Stephen Paul Weber created

The bitfield bit 1 was used by a different project (sgx-catapult, see:
https://gitlab.com/ossguy/sgx-catapult/-/commit/459d7d1dfe208db1708f1d648b82b38c002ad35a).
This other project no longer uses the bit, and in fact that whole project is
dead and gone, but if you previously ran that project against the same redis
that you now run this project against then please make sure you have zeroed-out
that bit first.

You can verify using this script:

    redis = Redis.new
    redis.keys("catapult_settings_flags-*").each do |k|
      p redis.getbit(k, 1)
    end

Change summary

config-schema.dhall             |  1 
config.dhall.sample             |  1 
lib/backend_sgx.rb              |  1 
lib/bwmsgsv2_repo.rb            | 18 ++++++--
lib/customer.rb                 |  2 
lib/rev_ai.rb                   | 70 +++++++++++++++++++++++++++++++++++
lib/trivial_backend_sgx_repo.rb |  5 +-
test/test_helper.rb             | 12 ++++++
web.rb                          | 64 ++++++++++++++++++++++++++++++-
9 files changed, 163 insertions(+), 11 deletions(-)

Detailed changes

config-schema.dhall 🔗

@@ -37,6 +37,7 @@
       , monthly_price : Natural
       , name : Text
       }
+, rev_ai_token : Text
 , server : { host : Text, port : Natural }
 , sgx : Text
 , sip : { app : Text, realm : Text }

config.dhall.sample 🔗

@@ -80,6 +80,7 @@ in
 	keep_area_codes = ["555"],
 	keep_area_codes_in = { account = "", site_id = "", sip_peer_id = "" },
 	snikket_hosting_api = "",
+	rev_ai_token = "",
 	upstream_domain = "example.net",
 	approved_domains = toMap { `example.com` = Some "customer_id" }
 }

lib/backend_sgx.rb 🔗

@@ -14,6 +14,7 @@ class BackendSgx
 		ogm_url Either(String, nil, NotLoaded)
 		fwd Either(CustomerFwd, nil, NotLoaded)
 		transcription_enabled Either(Bool(), NotLoaded)
+		alternate_transcription_enabled Either(Bool(), NotLoaded)
 		registered? Either(IBR, FalseClass, NotLoaded)
 	end
 

lib/bwmsgsv2_repo.rb 🔗

@@ -8,6 +8,7 @@ require_relative "trivial_backend_sgx_repo"
 
 class Bwmsgsv2Repo
 	VOICEMAIL_TRANSCRIPTION_DISABLED = 0
+	VOICEMAIL_ALT_TRANSCRIPTION = 1
 
 	def initialize(
 		jid: CONFIG[:sgx],
@@ -23,11 +24,12 @@ class Bwmsgsv2Repo
 
 	def get(customer_id)
 		sgx = @trivial_repo.get(customer_id)
-		fetch_raw(sgx.from_jid).then do |(((ogm_url, fwd_time, fwd), trans_d), reg)|
+		fetch_raw(sgx.from_jid).then do |(((ogm_url, fwd_time, fwd), flags), reg)|
 			sgx.with(
 				ogm_url: ogm_url,
 				fwd: CustomerFwd.for(uri: fwd, timeout: fwd_time),
-				transcription_enabled: !trans_d,
+				transcription_enabled: !flags[VOICEMAIL_TRANSCRIPTION_DISABLED],
+				alternate_transcription_enabled: flags[VOICEMAIL_ALT_TRANSCRIPTION],
 				registered?: reg
 			)
 		end
@@ -80,9 +82,15 @@ protected
 				"catapult_fwd_timeout-#{from_jid}",
 				("catapult_fwd-#{tel}" if tel)
 			].compact),
-			@redis.getbit(
-				"catapult_settings_flags-#{from_jid}", VOICEMAIL_TRANSCRIPTION_DISABLED
-			).then { |x| x == 1 }
+			unpack_flags(from_jid)
 		])
 	end
+
+	def unpack_flags(from_jid)
+		@redis.bitfield(
+			"catapult_settings_flags-#{from_jid}",
+			"GET", "u1", VOICEMAIL_TRANSCRIPTION_DISABLED,
+			"GET", "u1", VOICEMAIL_ALT_TRANSCRIPTION
+		).then { |arr| arr.map { |x| x.to_i == 1 } }
+	end
 end

lib/customer.rb 🔗

@@ -27,7 +27,7 @@ class Customer
 	               :message_limit, :auto_top_up_amount, :monthly_overage_limit,
 	               :monthly_price, :save_plan!
 	def_delegators :@sgx, :deregister!, :register!, :registered?, :set_ogm_url,
-	               :fwd, :transcription_enabled
+	               :fwd, :transcription_enabled, :alternate_transcription_enabled
 	def_delegators :@usage, :usage_report, :message_usage, :incr_message_usage
 	def_delegators :@financials, :payment_methods, :btc_addresses,
 	               :add_btc_address, :declines, :mark_decline,

lib/rev_ai.rb 🔗

@@ -0,0 +1,70 @@
+# frozen_string_literal: true
+
+require "em-http"
+require "em_promise"
+require "json"
+
+class RevAi
+	def initialize(token: CONFIG[:rev_ai_token])
+		@token = token
+	end
+
+	def stt(language, media_url, callback_url, **kwargs)
+		req(
+			:post,
+			"https://api.rev.ai/speechtotext/v1/jobs",
+			metadata: { media_url: media_url }.merge(kwargs).to_json,
+			source_config: { url: media_url },
+			notification_config: { url: callback_url },
+			remove_disfluencies: language == "en",
+			skip_diarization: true,
+			language: language
+		)
+	end
+
+	def stt_result(job)
+		job = job["job"]
+		req(
+			:get,
+			"https://api.rev.ai/speechtotext/v1/jobs/#{job['id']}/transcript",
+			accept: "text/plain"
+		).then do |res|
+			text = res.response.split("    ", 3)[2].strip
+			job.merge("text" => text, "metadata" => JSON.parse(job["metadata"]))
+		end
+	end
+
+	def language_id(media_url, callback_url, **kwargs)
+		req(
+			:post,
+			"https://api.rev.ai/languageid/v1/jobs",
+			metadata: { media_url: media_url }.merge(kwargs).to_json,
+			source_config: { url: media_url },
+			notification_config: { url: callback_url }
+		)
+	end
+
+	def language_id_result(job)
+		job = job["job"]
+		req(
+			:get,
+			"https://api.rev.ai/languageid/v1/jobs/#{job['id']}/result"
+		).then do |res|
+			json = JSON.parse(res.response)
+			job.merge(json).merge("metadata" => JSON.parse(job["metadata"]))
+		end
+	end
+
+	def req(m, url, accept: nil, **kwargs)
+		EM::HttpRequest.new(
+			url, tls: { verify_peer: true }
+		).public_send(
+			"a#{m}",
+			head: {
+				"Authorization" => "Bearer #{@token}",
+				"Content-Type" => "application/json",
+				"Accept" => accept
+			}, body: kwargs.to_json
+		)
+	end
+end

lib/trivial_backend_sgx_repo.rb 🔗

@@ -16,12 +16,13 @@ class TrivialBackendSgxRepo
 
 	def get(customer_id)
 		BackendSgx.new(
-			jid: @jid,
-			creds: @creds,
+			jid: @jid, creds: @creds,
 			from_jid: Blather::JID.new("customer_#{customer_id}", @component_jid),
 			ogm_url: NotLoaded.new(:ogm_url),
 			fwd: NotLoaded.new(:fwd_timeout),
 			transcription_enabled: NotLoaded.new(:transcription_enabled),
+			alternate_transcription_enabled:
+				NotLoaded.new(:alternate_transcription_enabled),
 			registered?: NotLoaded.new(:registered?)
 		)
 	end

test/test_helper.rb 🔗

@@ -205,6 +205,18 @@ class FakeRedis
 		get(key).then { |v| v.to_i.to_s(2)[bit].to_i }
 	end
 
+	def bitfield(key, *ops)
+		get(key).then do |v|
+			bits = v.to_i.to_s(2)
+			ops.each_slice(3).map do |(op, encoding, offset)|
+				raise "unsupported bitfield op" unless op == "GET"
+				raise "unsupported bitfield op" unless encoding == "u1"
+
+				bits[offset].to_i
+			end
+		end
+	end
+
 	def hget(key, field)
 		@values.dig(key, field)
 	end

web.rb 🔗

@@ -11,6 +11,7 @@ require "sentry-ruby"
 require_relative "lib/call_attempt_repo"
 require_relative "lib/cdr"
 require_relative "lib/oob"
+require_relative "lib/rev_ai"
 require_relative "lib/roda_capture"
 require_relative "lib/roda_em_promise"
 require_relative "lib/rack_fiber"
@@ -109,6 +110,10 @@ class Web < Roda
 		opts[:call_attempt_repo] || CallAttemptRepo.new
 	end
 
+	def rev_ai
+		RevAi.new
+	end
+
 	TEL_CANDIDATES = {
 		"Restricted" => "14",
 		"anonymous" => "15",
@@ -135,8 +140,11 @@ class Web < Roda
 		)
 	end
 
-	def inbound_calls_path(suffix, customer_id=nil)
-		["/inbound/calls/#{params['callId']}", suffix].compact.join("/") +
+	def inbound_calls_path(suffix, customer_id=nil, call_id: nil)
+		[
+			"/inbound/calls/#{call_id || params['callId']}",
+			suffix
+		].compact.join("/") +
 			(customer_id ? "?customer_id=#{customer_id}" : "")
 	end
 
@@ -158,6 +166,19 @@ class Web < Roda
 		raise $! unless $!.response_code.to_s == "404"
 	end
 
+	def do_alternate_transcription(customer, call_id)
+		return unless customer.alternate_transcription_enabled
+
+		rev_ai.language_id(
+			jmp_media_url,
+			url(inbound_calls_path(
+				"voicemail/rev_ai/language_id", call_id: call_id
+			)),
+			from_jid: from_jid,
+			customer_id: customer.customer_id
+		)
+	end
+
 	route do |r|
 		r.on "inbound" do
 			r.on "calls" do
@@ -204,7 +225,11 @@ class Web < Roda
 								"https://jmp.chat"
 							)
 
-							customer_repo.find_by_tel(params["to"]).then do |customer|
+							customer_repo(
+								sgx_repo: Bwmsgsv2Repo.new
+							).find_by_tel(params["to"]).then do |customer|
+								do_alternate_transcription(customer, call_id)
+
 								m = Blather::Stanza::Message.new
 								m.chat_state = nil
 								m.from = from_jid
@@ -236,6 +261,39 @@ class Web < Roda
 							end
 						end
 
+						r.on "rev_ai" do
+							r.post "language_id" do
+								rev_ai.language_id_result(params).then do |result|
+									rev_ai.stt(
+										result["top_language"],
+										result.dig("metadata", "media_url"),
+										url(inbound_calls_path(
+											"voicemail/rev_ai/transcription",
+											call_id: call_id
+										)),
+										**result["metadata"].transform_keys(&:to_sym)
+									).then { "OK" }
+								end
+							end
+
+							r.post "transcription" do
+								rev_ai.stt_result(params).then do |result|
+									customer_repo.find(
+										result.dig("metadata", "customer_id")
+									).then do |customer|
+										m = Blather::Stanza::Message.new
+										m.chat_state = nil
+										m.from = result.dig("metadata", "from_jid")
+										m.subject = "Voicemail Transcription"
+										m.body = "Alternate Transcription: #{result['text']}"
+										customer.stanza_to(m)
+
+										"OK"
+									end
+								end
+							end
+						end
+
 						r.post do
 							customer_repo(sgx_repo: Bwmsgsv2Repo.new)
 								.find_by_tel(params["to"])