diff --git a/.rubocop.yml b/.rubocop.yml index c719739bcdeaaf6c3424c1e54012a51f5caa8117..0b470abd7d329869ce538688266e031d9e17ea5f 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -20,6 +20,7 @@ Metrics/BlockLength: - route - "on" - json + - "OptionParser.new" Exclude: - test/* - lib/tasks/* diff --git a/bin/porting b/bin/porting index 8313e6b881caeb8cd8ce5ab011e01e51dd54d7e9..8f19c90098ed0160a672969c6c198727c3cfb8ad 100755 --- a/bin/porting +++ b/bin/porting @@ -4,6 +4,7 @@ require "date" require "dhall" require "em-hiredis" +require "pg/em/connection_pool" require "em-http" require "em_promise" require "json" @@ -16,9 +17,14 @@ require "time" @verbosity = 0 @real_data = true @dry_run = false +@sources = [] +@filters = {} OptionParser.new do |opts| - opts.banner = "Usage: porting [-vvf] DHALL_CONFIG" + opts.banner = + "Usage: porting [-vn] " \ + "[-c CUSTOMER_ID] " \ + "-s DHALL_CONFIG" opts.on( "-v", "--verbose", @@ -27,10 +33,6 @@ OptionParser.new do |opts| @verbosity += 1 end - opts.on("-f", "--fake", "Run with fake ports rather than fetching") do - @real_data = false - end - opts.on( "-n", "--dry-run", "Figure out what state they're in, but don't take action" @@ -42,15 +44,32 @@ OptionParser.new do |opts| puts opts exit end + + opts.on( + "-sSOURCE", "--source=SOURCE", "Source of ports (required, repeatable)" + ) do |source| + @sources.append source + end + + opts.on( + "-cCUSTOMER_ID", "--customer-id=CUSTOMER_ID", + "Filter by customer ID (db source only)" + ) do |customer_id| + @filters[:customer_id] = customer_id + end end.parse! +@sources = ["db", "bandwidth"] if @sources.empty? + SCHEMA = "{ bandwidth : { account: Text, username: Text, password: Text }, xmpp: { jid: Text, password: Text }, notification: { endpoint: Text, source_number: Text }, pubsub: { server: Text, node: Text }, testing_tel: Text, - admin_server: Text + admin_server: Text, + sgx: Text, + sgx_creds: List { mapKey: Text, mapValue: {} } }" raise "Need a Dhall config" unless ARGV[0] @@ -62,7 +81,9 @@ CONFIG = Dhall::Coder require_relative "../lib/blather_notify" require_relative "../lib/expiring_lock" require_relative "../lib/form_to_h" -require_relative "../lib/porting_step" +require_relative "../lib/porting_step_repo" +require_relative "../lib/port_repo" +require_relative "../lib/postgres" Faraday.default_adapter = :em_synchrony BandwidthIris::Client.global_options = { @@ -71,131 +92,6 @@ BandwidthIris::Client.global_options = { password: CONFIG[:bandwidth][:password] } -class FullManual < PortingStepRepo::Outputs - def info(id, key, msg) - puts "[#{id}] INFO(#{key}): #{msg}" - end - - def warn(id, key, msg) - puts "[#{id}] WARN(#{key}): #{msg}" - end - - def error(id, key, e_or_msg) - puts "[#{id}] ERRR(#{key}): #{e_or_msg}" - return unless e_or_msg.respond_to?(:backtrace) - - e_or_msg.backtrace.each do |b| - puts "[#{id}] ERRR(#{key}): #{b}" - end - end - - def to_customer(id, key, tel, msg) - puts "[#{id}] CUST(#{key}, #{tel}): #{msg}" - end -end - -class ObservedAuto < FullManual - def initialize(endpoint, source_number) - @endpoint = endpoint - @src = source_number - end - - def to_customer(id, key, tel, msg) - ExpiringLock.new(lock_key(id, key)).with do - EM::HttpRequest - .new(@endpoint) - .apost( - head: { "Content-Type" => "application/json" }, - body: format_msg(tel, msg) - ) - end - end - -protected - - def lock_key(id, key) - "jmp_port_customer_msg_#{key}-#{id}" - end - - def format_msg(tel, msg) - [{ - time: DateTime.now.iso8601, - type: "message-received", - to: tel, - description: "Incoming message received", - message: actual_message(tel, msg) - }].to_json - end - - def actual_message(tel, msg) - { - id: SecureRandom.uuid, - owner: tel, - applicationId: SecureRandom.uuid, - time: DateTime.now.iso8601, - segmentCount: 1, - direction: "in", - to: [tel], from: @src, - text: msg - } - end -end - -class FullAutomatic < ObservedAuto - using FormToH - - def initialize(pubsub_addr, endpoint, source_number) - @pubsub = BlatherNotify.pubsub(pubsub_addr) - - Sentry.init do |config| - config.background_worker_threads = 0 - end - - super(endpoint, source_number) - end - - # No one's watch; swallow informational messages - def info(*); end - - def warn(id, key, msg) - ExpiringLock.new(warn_lock_key(id, key), expiry: 60 * 15).with do - entrykey = "#{id}:#{key}" - @pubsub.publish("#{entrykey}": error_entry("Port Warning", msg, entrykey)) - end - end - - def error(id, key, e_or_msg) - Sentry.with_scope do |scope| - scope.set_context("port", { id: id, action: key }) - - if e_or_msg.is_a?(::Exception) - Sentry.capture_exception(e_or_msg) - else - Sentry.capture_message(e_or_msg.to_s) - end - end - end - -protected - - def error_entry(title, text, id) - Nokogiri::XML::Builder.new { |xml| - xml.entry(xmlns: "http://www.w3.org/2005/Atom") do - xml.updated DateTime.now.iso8601 - xml.id id - xml.title title - xml.content text.to_s, type: "text" - xml.author { xml.name "porting" } - xml.generator "porting", version: "1.0" - end - }.doc.root - end - - def warn_lock_key(id, key) - "jmp_port_warn_msg_#{key}-#{id}" - end -end - @output = case @verbosity when 0 @@ -213,57 +109,33 @@ end FullManual.new end -ports = if @real_data - BandwidthIris::PortIn.list( - page: 1, - size: 50, - start_date: Date.today - 1, - end_date: Date.today - ) || [] -else - MP = Struct.new( - :order_id, - :processing_status, - :actual_foc_date, - :last_modified_date, - :customer_order_id, - :billing_telephone_number - ) - - minutes = 1.0 / (24 * 60) - - [ - # This should be ignored - MP.new("T01", "SUBMITTED", nil, DateTime.now - 1, "ignored", "9998887777"), - MP.new( - "T02", "COMPLETE", DateTime.now - 60 * minutes, - DateTime.now - 55 * minutes, "0001", "2223334444" - ) - ] -end +PORT_SOURCES = @sources.map(&:downcase).map { |source| + case source + when "bandwidth" + PortRepo::Bandwidth.new(@output, dry_run: @dry_run) + when "db" + PortRepo::Db.new(@output, dry_run: @dry_run, filters: @filters) + when "fake" + PortRepo::Fake.new(@output, dry_run: @dry_run) + else + puts <<~ERR + Invalid port source (-s / --source): #{source}. + Valid options: fake, db, endstream + ERR + # EINVAL + exit! 22 + end +} EM.run do REDIS = EM::Hiredis.connect + DB = Postgres.connect(dbname: "jmp") BlatherNotify.start( CONFIG[:xmpp][:jid], CONFIG[:xmpp][:password] ).then { - ports.reduce(EMPromise.resolve(nil)) { |promise, port| - promise.then do - @output.info(port.order_id, :start, "Here we go") - PortingStepRepo.new(output: @output).find(port).then { |s| - @output.info(port.order_id, :class, s.class) - s - }.then { |s| - if @dry_run - @output.info("DRY", :dry, "Not taking action") - else - s.perform_next_step - end - } - end - } + EMPromise.all(PORT_SOURCES.map(&:process)) }.catch { |e| @output.error("ROOT", :catch, e) }.then { BlatherNotify.shutdown } diff --git a/lib/db_port.rb b/lib/db_port.rb new file mode 100644 index 0000000000000000000000000000000000000000..a8177da8ac16fbb2cd47b79b81684fca721cc3d6 --- /dev/null +++ b/lib/db_port.rb @@ -0,0 +1,34 @@ +# frozen_string_literal: true + +require "value_semantics/monkey_patched" +require_relative "./validators" + +class DbPort + module Status + SUBMITTED = "submitted" + COMPLETE = "complete" + FOC = "foc" + end + + value_semantics do + id String + processing_status PortProcessingStatus + actual_foc_date Either(DateTime, nil), default: nil, coerce: true + updated_at DateTime, coerce: true + tel ValidTelString + customer_id String + backend_sgx ValidBackendSgx + end + + def self.coerce_actual_foc_date(time) + time&.to_datetime + end + + def self.coerce_updated_at(raw) + raw.to_datetime + end + + def self.from(**kwargs) + new(**kwargs) + end +end diff --git a/lib/port_repo.rb b/lib/port_repo.rb new file mode 100644 index 0000000000000000000000000000000000000000..e738fde788581f3f835ed7b5eaa9c8fdcae0add0 --- /dev/null +++ b/lib/port_repo.rb @@ -0,0 +1,146 @@ +# frozen_string_literal: true + +require "em_promise" +require "delegate" +require "ruby-bandwidth-iris" +require "lazy_object" + +require_relative "db_port" + +class PortRepo + def initialize(output, dry_run: false) + @output = output + @dry_run = dry_run + @step_repo = PortingStepRepo.new(output: output) + end + + def list + raise NotImplementedError, "subclass must implement" + end + + def process + list.then { |ports| + EMPromise.all(ports.map(&method(:tick))) + } + end + + class Bandwidth < self + class PortWithBackend < SimpleDelegator + def backend_sgx + CONFIG[:sgx] + end + + def id + self[:order_id] + end + + def customer_id + self[:customer_order_id] + end + + def tel + self[:billing_telephone_number] + end + + def updated_at + self[:last_modified_date] + end + end + + def list + (BandwidthIris::PortIn.list( + page: 1, + size: 50, + start_date: Date.today - 1, + end_date: Date.today + ) || []).map(&PortWithBackend.method(:new)) + end + end + + class Db < self + def initialize( + output, + dry_run: false, + db: LazyObject.new { DB }, + filters: {} + ) + super(output, dry_run: dry_run) + @db = db + @filters = filters + end + + def list(**kwargs) + rows(**@filters.merge(kwargs)).then { |r| + r.map { |row| + DbPort.from(**row.transform_keys(&:to_sym)) + } + } + end + + protected + + def rows( + max_results: 50, + offset: 0, + start_date: Date.today - 1, + end_date: Date.today + 1, + customer_id: nil + ) + conditions = ["actual_foc_date >= $1", "actual_foc_date <= $2"] + params = [start_date, end_date] + + if customer_id + conditions << "customer_id = $#{params.length + 1}" + params << customer_id + end + + @db.exec_defer(<<~SQL, params + [max_results, offset]) + SELECT * FROM ports + WHERE #{conditions.join(' AND ')} + LIMIT $#{params.length + 1} + OFFSET $#{params.length + 2} + SQL + end + end + + class Fake < Bandwidth + FakePort = Struct.new( + :id, + :processing_status, + :actual_foc_date, + :updated_at, + :customer_id, + :tel, + :backend_sgx + ) + + def list + minutes = 1.0 / (24 * 60) + + [ + # This should be ignored + FakePort.new( + "T01", "SUBMITTED", nil, DateTime.now - 1, + "ignored", "9998887777", "testroute" + ), + FakePort.new( + "T02", "COMPLETE", DateTime.now - (60 * minutes), + DateTime.now - (55 * minutes), "0001", "2223334444", "testroute" + ) + ] + end + end + +protected + + def tick(port) + @step_repo.find(port).then { |step| + @output.info(port.id, :class, step.class) + if @dry_run + @output.info("DRY", :dry, "Not taking action") + else + step.perform_next_step + end + } + end +end diff --git a/lib/porting_step.rb b/lib/porting_step.rb deleted file mode 100644 index 3d3e93779f1f68ff83c3e5f31d9bcbdf1e7419f7..0000000000000000000000000000000000000000 --- a/lib/porting_step.rb +++ /dev/null @@ -1,311 +0,0 @@ -# frozen_string_literal: true - -require "date" -require "em_promise" -require "lazy_object" -require "value_semantics/monkey_patched" - -require_relative "blather_notify" - -class Tel - def initialize(str) - @tel = if str.is_a? Tel - str.to_s - else - "+1#{str.sub(/^\+?1?/, '')}" - end - end - - def to_s - @tel - end - - def ==(other) - to_s == other.to_s - end -end - -class PortingStepRepo - # Any thing that debounces messages must happen inside this. - # The porting logic will pass everything in every time. - class Outputs - def info(port_id, kind, msg); end - - def warn(port_id, kind, msg); end - - def error(port_id, kind, e_or_msg); end - - def to_customer(port_id, kind, tel, msg); end - end - - value_semantics do - redis Anything(), default: LazyObject.new { REDIS } - blather_notify Anything(), default: BlatherNotify - admin_server Anything(), default: CONFIG[:admin_server] - testing_tel Anything(), default: CONFIG[:testing_tel] - output Outputs, default: Outputs.new - end - - def find(port) - if_processing(port) do - case port.processing_status - when "FOC" - FOC.new(**to_h).find(port) - when "COMPLETE" - Complete.new(**to_h).find(port) - else - Wait.new(port, output: output) - end - end - end - - def if_processing(port) - redis.exists("jmp_port_freeze-#{port.order_id}").then do |v| - next Frozen.new(port, output: output) if v == 1 - - yield - end - end - - class FOC < self - # This is how long we'll wait for a port to move from FOC to COMPLETED - # It's in fractional days because DateTime - GRACE_PERIOD = 15.0 / (24 * 60) - - def find(port) - Alert.for( - port, - grace_period: GRACE_PERIOD, - output: output, key: :late_foc, - msg: "⚠ Port is still in FOC state a while past FOC", - real_step: Wait.new(port, output: output) - ) - end - end - - class Complete < self - # If it's been 35 minutes and the number isn't reachable, a human - # should get involved - GRACE_PERIOD = 35.0 / (24 * 60) - - def find(port) - if_not_complete(port) do - exe = blather_notify.command_execution(admin_server, "customer info") - AdminCommand.new(exe: exe, **to_h).find(port).then do |step| - Alert.for( - port, - grace_period: GRACE_PERIOD, output: output, - key: :late_finish, msg: msg(port), real_step: step - ) - end - end - end - - def if_not_complete(port) - redis.exists("jmp_port_complete-#{port.order_id}").then do |v| - next Done.new(port, output: output) if v == 1 - - yield - end - end - - def msg(port) - "⚠ Port still hasn't finished. We'll keep trying unless you set redis "\ - "key `jmp_port_freeze-#{port.order_id}`" - end - - class AdminCommand < self - def initialize(exe:, **kwargs) - @exe = exe - super(**kwargs) - end - - def to_h - super.merge(exe: @exe) - end - - def find(port) - @exe.fetch_and_submit(q: port.customer_order_id).then do |form| - tel = Tel.new(port.billing_telephone_number) - if tel == Tel.new(form.tel) - GoodNumber.new(**to_h).find(port) - else - WrongNumber.new(right_number: tel, execution: @exe) - end - end - end - - class GoodNumber < self - def find(port) - Reachability.new(type: "voice", **to_h).test(port) do - Reachability.new(type: "sms", **to_h).test(port) do - FinishUp.new(port, redis: redis, output: output) - end - end - end - - class Reachability < self - def initialize(type:, **args) - @type = type - super(**args) - end - - def test(port) - execute_command(port).then do |response| - if response.count == "0" - RunTest.new( - type: @type, tel: port.billing_telephone_number, - **to_h.slice(:blather_notify, :testing_tel, :admin_server) - ) - else - yield - end - end - end - - class RunTest - # This is here for tests - attr_reader :type - - def initialize( - type:, tel:, blather_notify:, testing_tel:, admin_server: - ) - @type = type - @tel = tel - @blather_notify = blather_notify - @testing_tel = testing_tel - @admin_server = admin_server - end - - def perform_next_step - @blather_notify.command_execution(@admin_server, "reachability") - .fetch_and_submit( - tel: @tel, type: @type, reachability_tel: @testing_tel - ) - end - end - - protected - - def execute_command(port) - blather_notify.command_execution(admin_server, "reachability") - .fetch_and_submit(tel: port.billing_telephone_number, type: @type) - end - end - - class FinishUp - MESSAGE = "Hi! This is JMP support - your number has "\ - "successfully transferred in to JMP! All calls/messages "\ - "will now use your transferred-in number - your old JMP "\ - "number has been disabled. Let us know if you have any "\ - "questions and thanks for using JMP!" - - def initialize(port, redis:, output:) - @port = port - @redis = redis - @output = output - end - - def set_key - @redis.set( - "jmp_port_complete-#{@port.order_id}", - DateTime.now.iso8601, - "EX", - 60 * 60 * 24 * 2 ### 2 Days should be enough to not see it listed - ) - end - - def perform_next_step - set_key.then do - EMPromise.all([ - @output.info(@port.order_id, :complete, "Port Complete!"), - @output.to_customer( - @port.order_id, :complete, - Tel.new(@port.billing_telephone_number), MESSAGE - ) - ]) - end - end - end - end - - class WrongNumber - def initialize(right_number:, execution:) - @right_number = right_number - @exe = execution - end - - def perform_next_step - @exe.fetch_and_submit(action: "number_change").then do |_form| - @exe.fetch_and_submit(new_tel: @right_number, should_delete: "true") - end - end - end - end - end - - # This doesn't do anything and just waits for something to happen later - class Wait - def initialize(port, output:) - @port = port - @output = output - end - - def perform_next_step - @output.info(@port.order_id, :wait, "Waiting...") - end - end - - # This also doesn't do anything but is more content about it - class Done - def initialize(port, output:) - @port = port - @output = output - end - - def perform_next_step - @output.info(@port.order_id, :done, "Done.") - end - end - - # This also also doesn't do anything but is intentional - class Frozen - def initialize(port, output:) - @port = port - @output = output - end - - def perform_next_step - @output.info(@port.order_id, :frozen, "Frozen.") - end - end - - # This class sends and error to the human to check things out - class Alert - def self.for(port, grace_period:, real_step:, **args) - if (DateTime.now - port.actual_foc_date) > grace_period - new(port, real_step: real_step, **args) - else - real_step - end - end - - # For tests - attr_reader :key - attr_reader :real_step - - def initialize(port, real_step:, output:, msg:, key:) - @port = port - @real_step = real_step - @output = output - @msg = msg - @key = key - end - - def perform_next_step - @output.warn(@port.order_id, @key, @msg).then { - @real_step.perform_next_step - } - end - end -end diff --git a/lib/porting_step_repo.rb b/lib/porting_step_repo.rb new file mode 100644 index 0000000000000000000000000000000000000000..508cf608856eddbcf464e497065c13cd828f23b0 --- /dev/null +++ b/lib/porting_step_repo.rb @@ -0,0 +1,511 @@ +# frozen_string_literal: true + +require "date" +require "value_semantics/monkey_patched" +require "em_promise" + +require_relative "blather_notify" +require_relative "utils" + +class PortingStepRepo + class Tel + def initialize(str) + @tel = if str.is_a? Tel + str.to_s + else + "+1#{str.sub(/^\+?1?/, '')}" + end + end + + def to_s + @tel + end + + def ==(other) + to_s == other.to_s + end + end + + # Any thing that debounces messages must happen inside this. + # The porting logic will pass everything in every time. + class Outputs + def info(port_id, kind, msg); end + + def warn(port_id, kind, msg); end + + def error(port_id, kind, e_or_msg); end + + def to_customer(port_id, kind, tel, msg); end + end + + value_semantics do + redis Anything(), default: LazyObject.new { REDIS } + blather_notify Anything(), default: BlatherNotify + admin_server Anything(), default: CONFIG[:admin_server] + testing_tel Anything(), default: CONFIG[:testing_tel] + output Outputs, default: Outputs.new + end + + def find(port) + if_processing(port) do + case port.processing_status + when "FOC" + FOC.new(**to_h).find(port) + when "COMPLETE" + Complete.new(**to_h).find(port) + else + Wait.new(port, output: output) + end + end + end + + def if_processing(port) + redis.exists("jmp_port_freeze-#{port.id}").then do |v| + next Frozen.new(port, output: output) if v == 1 + + yield + end + end + + class FOC < self + # This is how long we'll wait for a port to move from FOC to COMPLETED + # It's in fractional days because DateTime + GRACE_PERIOD = 15.0 / (24 * 60) + + def find(port) + Alert.for( + port, + grace_period: GRACE_PERIOD, + output: output, key: :late_foc, + msg: "⚠ Port is still in FOC state a while past FOC", + real_step: Wait.new(port, output: output) + ) + end + end + + class Complete < self + # If it's been 35 minutes and the number isn't reachable, a human + # should get involved + GRACE_PERIOD = 35.0 / (24 * 60) + + def find(port) + if_not_complete(port) do + exe = blather_notify.command_execution(admin_server, "customer info") + AdminCommand.new(exe: exe, **to_h).find(port).then do |step| + Alert.for( + port, + grace_period: GRACE_PERIOD, output: output, + key: :late_finish, msg: msg(port), real_step: step + ) + end + end + end + + def if_not_complete(port) + redis.exists("jmp_port_complete-#{port.id}").then do |v| + next Done.new(port, output: output) if v == 1 + + yield + end + end + + def msg(port) + "⚠ Port still hasn't finished. We'll keep trying unless you set redis " \ + "key `jmp_port_freeze-#{port.id}`" + end + + class NoCustomer < self + attr_reader :port + + NO_GRACE_PERIOD = 0 + + def initialize(port:, **kwargs) + @port = port + super(**kwargs) + end + + def self.for(port:, **kwargs) + Alert.for( + port, + grace_period: NO_GRACE_PERIOD, + real_step: new(port: port, **kwargs), + output: kwargs[:output], + msg: msg(port), + key: :port_for_unknown_customer + ) + end + + def self.msg(port) + "⚠ Freezing port #{port.id} for unknown customer: #{port.customer_id}." + end + + def perform_next_step + redis.set("jmp_port_freeze-#{port.id}", 1) + end + end + + class AdminCommand < self + def initialize(exe:, **kwargs) + @exe = exe + super(**kwargs) + end + + def to_h + super.merge(exe: @exe) + end + + def find(port) + @exe.fetch_and_submit(q: port.customer_id).then do |form| + next NoCustomer.for(port: port, **to_h.except(:exe)) unless form.tel + + tel = Tel.new(port.tel) + good = GoodNumber.new(**to_h).find(port) + wrong = WrongNumber.new( + right_number: tel, execution: @exe, new_backend: port.backend_sgx + ) + tel == Tel.new(form.tel) ? good : wrong + end + end + + class GoodNumber < self + def find(port) + Reachability.new(type: "voice", **to_h).test(port) do + Reachability.new(type: "sms", **to_h).test(port) do + FinishUp.new(port, redis: redis, output: output) + end + end + rescue StandardError => e + ReachabilityFailure.for(error: e, port: port, **to_h) + end + + class ReachabilityFailure < self + attr_reader :error + + NO_GRACE_PERIOD = 0 + + # @param error [StandardError] + def self.for(port:, error:, **kwargs) + Alert.for( + port, + grace_period: NO_GRACE_PERIOD, + real_step: nil, + output: kwargs[:output], + msg: msg(port, error), + key: :reachability_failure + ) + end + + def self.msg(port, error) + "⚠ Error checking #{port.id} reachability: #{error}" + end + end + + class Reachability < self + def initialize(type:, **args) + @type = type + super(**args) + end + + def test(port) + execute_command(port).then { |response| + next yield unless response.count == "0" + + args = to_h.slice( + :blather_notify, :testing_tel, :admin_server, :output + ) + RunTest.new(type: @type, tel: port.tel, port: port, **args) + }.catch do |e| + ReachabilityFailure.for(port: port, error: e, output: output) + end + end + + class RunTest + # This is here for tests + attr_reader :type + + def initialize( + type:, tel:, port:, output:, + blather_notify:, testing_tel:, admin_server: + ) + @type = type + @tel = tel + @port = port + @output = output + @blather_notify = blather_notify + @testing_tel = testing_tel + @admin_server = admin_server + end + + def perform_next_step + @blather_notify.command_execution(@admin_server, "reachability") + .fetch_and_submit( + tel: @tel, type: @type, reachability_tel: @testing_tel + ).catch do |e| + ReachabilityFailure.for( + port: @port, error: e, output: @output + ).perform_next_step + end + end + end + + protected + + def execute_command(port) + blather_notify.command_execution(admin_server, "reachability") + .fetch_and_submit(tel: port.tel, type: @type) + end + end + + class FinishUp + MESSAGE = "Hi! This is JMP support - your number has " \ + "successfully transferred in to JMP! All calls/messages " \ + "will now use your transferred-in number - your old JMP " \ + "number has been disabled. Let us know if you have any " \ + "questions and thanks for using JMP!" + + def initialize(port, redis:, output:) + @port = port + @redis = redis + @output = output + end + + def set_key + @redis.set( + "jmp_port_complete-#{@port.id}", + DateTime.now.iso8601, + "EX", + 60 * 60 * 24 * 2 ### 2 Days should be enough to not see it listed + ) + end + + def perform_next_step + set_key.then do + EMPromise.all([ + @output.info(@port.id, :complete, "Port Complete!"), + @output.to_customer( + @port.id, :complete, + Tel.new(@port.tel), MESSAGE + ) + ]) + end + end + end + end + + class WrongNumber + attr_reader :new_backend + + def initialize( + right_number:, + execution:, + new_backend: + ) + @new_backend = new_backend + @right_number = right_number + @exe = execution + end + + def perform_next_step + @exe.fetch_and_submit(action: "number_change").then do |_form| + @exe.fetch_and_submit( + new_backend: @new_backend, + new_tel: @right_number, + should_delete: "true" + ) + end + end + end + end + end + + # This doesn't do anything and just waits for something to happen later + class Wait + def initialize(port, output:) + @port = port + @output = output + end + + def perform_next_step + @output.info(@port.id, :wait, "Waiting...") + end + end + + # This also doesn't do anything but is more content about it + class Done + def initialize(port, output:) + @port = port + @output = output + end + + def perform_next_step + @output.info(@port.id, :done, "Done.") + end + end + + # This also also doesn't do anything but is intentional + class Frozen + def initialize(port, output:) + @port = port + @output = output + end + + def perform_next_step + @output.info(@port.id, :frozen, "Frozen.") + end + end + + # This class sends and error to the human to check things out + class Alert + # @param [PortingStepRepo, NilClass] real_step If `nil`, just notify + def self.for(port, grace_period:, real_step:, **args) + if (DateTime.now - port.actual_foc_date.to_datetime) > grace_period + new(port, real_step: real_step, **args) + else + real_step + end + end + + # For tests + attr_reader :key + attr_reader :real_step + + def initialize(port, real_step:, output:, msg:, key:) + @port = port + @real_step = real_step + @output = output + @msg = msg + @key = key + end + + def perform_next_step + @output.warn(@port.id, @key, @msg).then { + @real_step&.perform_next_step + } + end + end +end + +class FullManual < PortingStepRepo::Outputs + def info(id, key, msg) + puts "[#{id}] INFO(#{key}): #{msg}" + end + + def warn(id, key, msg) + puts "[#{id}] WARN(#{key}): #{msg}" + end + + def error(id, key, e_or_msg) + puts "[#{id}] ERRR(#{key}): #{e_or_msg}" + return unless e_or_msg.respond_to?(:backtrace) + + e_or_msg.backtrace.each do |b| + puts "[#{id}] ERRR(#{key}): #{b}" + end + end + + def to_customer(id, key, tel, msg) + puts "[#{id}] CUST(#{key}, #{tel}): #{msg}" + end +end + +class ObservedAuto < FullManual + def initialize(endpoint, source_number) + @endpoint = endpoint + @src = source_number + end + + def to_customer(id, key, tel, msg) + ExpiringLock.new(lock_key(id, key)).with do + EM::HttpRequest + .new(@endpoint) + .apost( + head: { "Content-Type" => "application/json" }, + body: format_msg(tel, msg) + ) + end + end + +protected + + def lock_key(id, key) + "jmp_port_customer_msg_#{key}-#{id}" + end + + def format_msg(tel, msg) + [{ + time: DateTime.now.iso8601, + type: "message-received", + to: tel, + description: "Incoming message received", + message: actual_message(tel, msg) + }].to_json + end + + def actual_message(tel, msg) + { + id: SecureRandom.uuid, + owner: tel, + applicationId: SecureRandom.uuid, + time: DateTime.now.iso8601, + segmentCount: 1, + direction: "in", + to: [tel], from: @src, + text: msg + } + end +end + +class FullAutomatic < ObservedAuto + using FormToH + + def initialize(pubsub_addr, endpoint, source_number) + @pubsub = BlatherNotify.pubsub(pubsub_addr) + + Sentry.init do |config| + config.background_worker_threads = 0 + end + + super(endpoint, source_number) + end + + # No one's watch; swallow informational messages + def info(*); end + + def warn(id, key, msg) + ExpiringLock.new(warn_lock_key(id, key), expiry: 60 * 15).with do + entrykey = "#{id}:#{key}" + @pubsub.publish("#{entrykey}": error_entry("Port Warning", msg, entrykey)) + end + end + + def error(id, key, e_or_msg) + Sentry.with_scope do |scope| + scope.set_context("port", { id: id, action: key }) + + if e_or_msg.is_a?(::Exception) + Sentry.capture_exception(e_or_msg) + else + Sentry.capture_message(e_or_msg.to_s) + end + end + end + +protected + + def error_entry(title, text, id) + Nokogiri::XML::Builder.new { |xml| + xml.entry(xmlns: "http://www.w3.org/2005/Atom") do + xml.updated DateTime.now.iso8601 + xml.id id + xml.title title + xml.content text.to_s, type: "text" + xml.author { xml.name "porting" } + xml.generator "porting", version: "1.0" + end + }.doc.root + end + + def warn_lock_key(id, key) + "jmp_port_warn_msg_#{key}-#{id}" + end +end diff --git a/lib/utils.rb b/lib/utils.rb index 68868415d97584af51784a619a495a82384289f9..4df154d6ae30ab2dab22142a82bac8de993ff354 100644 --- a/lib/utils.rb +++ b/lib/utils.rb @@ -15,3 +15,11 @@ class NilClass nil end end + +class Hash + def except(*keys) + h = dup + keys.each { |key| h.delete(key) } + h + end +end diff --git a/lib/validators.rb b/lib/validators.rb new file mode 100644 index 0000000000000000000000000000000000000000..c65a5c6887ac1c19f20a64931a128fccab9a6e02 --- /dev/null +++ b/lib/validators.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +module ValidBackendSgx + def self.===(value) + ([CONFIG[:sgx]] + CONFIG[:sgx_creds].keys).include?(value) + end +end + +module ValidTelString + def self.===(value) + value.is_a?(String) && value =~ /\A\+1(\d{3})(\d{3})(\d+)\Z/ + end +end + +module PortProcessingStatus + def self.===(value) + value.is_a?(String) && + ["complete", "submitted", "foc"].include?(value.downcase) + end +end diff --git a/schemas b/schemas index eb6d23cc183e8cebc2c61b0498ddc0593f24fb86..d1c2a6e6fe23408f6f38513f4040518d95fe565c 160000 --- a/schemas +++ b/schemas @@ -1 +1 @@ -Subproject commit eb6d23cc183e8cebc2c61b0498ddc0593f24fb86 +Subproject commit d1c2a6e6fe23408f6f38513f4040518d95fe565c diff --git a/test/test_helper.rb b/test/test_helper.rb index e65696a9cceebbd16ee934876700fbca64730b5b..3539f9ba0fcdf798dd1da593a5d294e67922fb07 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -502,3 +502,32 @@ module Minitest end end end + +require "lazy_object" +require "porting_step_repo" + +class MockOutputs < PortingStepRepo::Outputs + def initialize(mock) + @mock = mock + end + + def info(id, key, msg) + @mock.info(id, key, msg) + end + + def warn(id, key, msg) + @mock.warn(id, key, msg) + end + + def error(id, key, e_or_msg) + @mock.error(id, key, e_or_msg) + end + + def to_customer(id, key, tel, msg) + @mock.to_customer(id, key, tel, msg) + end + + def verify + @mock.verify + end +end diff --git a/test/test_port_repo.rb b/test/test_port_repo.rb new file mode 100644 index 0000000000000000000000000000000000000000..d4cbd6ed888dcf8fd4eb34126c629e523061c5e5 --- /dev/null +++ b/test/test_port_repo.rb @@ -0,0 +1,106 @@ +# frozen_string_literal: true + +require "date" +require "test_helper" +require "port_repo" +require "db_port" + +class FakeDbForPorts < FakeDB + def initialize(rows=[]) + super({}) + @rows = rows + end + + def exec_defer(_, _) + EMPromise.resolve(@rows) + end +end + +class PortRepoDbTest < Minitest::Test + def setup + @output = FullManual.new + end + + def test_list_returns_db_ports + rows = [ + { + "id" => "port1", + "processing_status" => "complete", + "actual_foc_date" => DateTime.now, + "updated_at" => DateTime.now, + "tel" => "+12225551234", + "customer_id" => "cust1", + "backend_sgx" => "sgx" + } + ] + db = FakeDbForPorts.new(rows) + repo = PortRepo::Db.new(@output, db: db) + + ports = repo.list.sync + + assert_equal 1, ports.length + assert_kind_of DbPort, ports[0] + assert_equal "port1", ports[0].id + assert_equal "complete", ports[0].processing_status + end + em :test_list_returns_db_ports + + def test_list_empty_when_no_rows + db = FakeDbForPorts.new([]) + repo = PortRepo::Db.new(@output, db: db) + + ports = repo.list.sync + + assert_equal 0, ports.length + end + em :test_list_empty_when_no_rows +end + +class PortRepoBandwidthTest < Minitest::Test + def setup + @output = FullManual.new + end + + def test_list_returns_empty_when_no_ports + stub_request( + :get, + /https:\/\/dashboard\.bandwidth\.com\/v1\.0\/accounts\/.*\/portins/ + ).to_return(status: 204, body: "") + + repo = PortRepo::Bandwidth.new(@output) + ports = repo.list + + assert_equal 0, ports.length + end + + def test_list_wraps_ports_in_port_with_backend + stub_request( + :get, + /https:\/\/dashboard\.bandwidth\.com\/v1\.0\/accounts\/.*\/portins/ + ).to_return(status: 200, body: <<~XML) + + 1 + + + abc123 + cust456 + 5551234567 + 2025-01-13T12:00:00.000Z + 2025-01-12T12:00:00.000Z + COMPLETE + + + XML + + repo = PortRepo::Bandwidth.new(@output) + ports = repo.list + + assert_equal 1, ports.length + port = ports[0] + assert_kind_of PortRepo::Bandwidth::PortWithBackend, port + assert_equal "abc123", port.id + assert_equal "cust456", port.customer_id + assert_equal "5551234567", port.tel + assert_equal CONFIG[:sgx], port.backend_sgx + end +end diff --git a/test/test_porting_step.rb b/test/test_porting_step.rb index f172fe687f9f3d60c204c2a56663cb27d5bcac7c..8972a124b0bb3ec0acdb1a85a408e012b84ae735 100644 --- a/test/test_porting_step.rb +++ b/test/test_porting_step.rb @@ -7,7 +7,8 @@ require "test_helper" require "customer_info" require "form_template" require "form_to_h" -require "porting_step" +require "porting_step_repo" +require "port_repo" MINS = 1.0 / (24 * 60) @@ -38,7 +39,7 @@ class BlatherNotifyMock < Minitest::Mock args.each_slice(2) do |(submission, result)| expect( :fetch_and_submit, - EMPromise.resolve(to_response(result)), + to_promise(result), **submission ) end @@ -46,6 +47,14 @@ class BlatherNotifyMock < Minitest::Mock using FormToH + def to_promise(result) + if result.is_a?(Exception) + EMPromise.reject(result) + else + EMPromise.resolve(to_response(result)) + end + end + def to_response(form) OpenStruct.new(form.to_h) end @@ -87,12 +96,13 @@ end class PortingStepTest < Minitest::Test Port = Struct.new( - :order_id, + :id, :processing_status, :actual_foc_date, - :last_modified_date, - :customer_order_id, - :billing_telephone_number + :updated_at, + :customer_id, + :tel, + :backend_sgx ) def test_ignore_submitted_ports @@ -105,7 +115,8 @@ class PortingStepTest < Minitest::Test nil, DateTime.now - 1 * MINS, "ignored", - "9998887777" + "9998887777", + Blather::JID.new("testroute") )).sync assert_kind_of PortingStepRepo::Wait, step @@ -122,7 +133,8 @@ class PortingStepTest < Minitest::Test DateTime.now - 5 * MINS, DateTime.now - 1 * MINS, "ignored", - "9998887777" + "9998887777", + Blather::JID.new("testroute") )).sync assert_kind_of PortingStepRepo::Wait, step @@ -139,7 +151,8 @@ class PortingStepTest < Minitest::Test DateTime.now - 25 * MINS, DateTime.now - 1 * MINS, "ignored", - "9998887777" + "9998887777", + Blather::JID.new("testroute") )).sync assert_kind_of PortingStepRepo::Alert, step @@ -159,7 +172,8 @@ class PortingStepTest < Minitest::Test DateTime.now - 25 * MINS, DateTime.now - 1 * MINS, "completed", - "9998887777" + "9998887777", + Blather::JID.new("testroute") )).sync assert_kind_of PortingStepRepo::Done, step @@ -188,15 +202,50 @@ class PortingStepTest < Minitest::Test DateTime.now - 25 * MINS, DateTime.now - 1 * MINS, "starting", - "9998887777" + "9998887777", + Blather::JID.new("testroute") )).sync assert_kind_of PortingStepRepo::Complete::AdminCommand::WrongNumber, step + assert_equal Blather::JID.new("testroute"), step.new_backend assert_mock redis assert_mock notify end em :test_change_number + def test_unknown_customer + redis = Minitest::Mock.new + redis.expect(:exists, EMPromise.resolve(0), ["jmp_port_freeze-01"]) + redis.expect(:exists, "0", ["jmp_port_complete-01"]) + + notify = BlatherNotifyMock.new + notify.expect_execution( + "sgx", "customer info", + { q: "unknown_customer" }, admin_info("unknown_customer", nil).form + ) + + step = PortingStepRepo.new( + redis: redis, + blather_notify: notify, + admin_server: "sgx" + ).find(Port.new( + "01", + "COMPLETE", + DateTime.now - 25 * MINS, + DateTime.now - 1 * MINS, + "unknown_customer", + "9998887777", + Blather::JID.new("testroute") + )).sync + + assert_kind_of PortingStepRepo::Alert, step + assert_equal :port_for_unknown_customer, step.key + assert_kind_of PortingStepRepo::Complete::NoCustomer, step.real_step + assert_mock redis + assert_mock notify + end + em :test_unknown_customer + def test_first_reachability redis = Minitest::Mock.new redis.expect(:exists, EMPromise.resolve(0), ["jmp_port_freeze-01"]) @@ -224,7 +273,8 @@ class PortingStepTest < Minitest::Test DateTime.now - 25 * MINS, DateTime.now - 1 * MINS, "starting", - "9998887777" + "9998887777", + Blather::JID.new("testroute") )).sync assert_kind_of( @@ -271,7 +321,8 @@ class PortingStepTest < Minitest::Test DateTime.now - 25 * MINS, DateTime.now - 1 * MINS, "starting", - "9998887777" + "9998887777", + Blather::JID.new("testroute") )).sync assert_kind_of( @@ -318,7 +369,8 @@ class PortingStepTest < Minitest::Test DateTime.now - 25 * MINS, DateTime.now - 1 * MINS, "starting", - "9998887777" + "9998887777", + Blather::JID.new("testroute") )).sync assert_kind_of( @@ -357,7 +409,8 @@ class PortingStepTest < Minitest::Test DateTime.now - 55 * MINS, DateTime.now - 50 * MINS, "starting", - "9998887777" + "9998887777", + Blather::JID.new("testroute") )).sync assert_kind_of PortingStepRepo::Alert, step @@ -381,7 +434,8 @@ class PortingStepTest < Minitest::Test nil, DateTime.now - 1 * MINS, "ignored", - "9998887777" + "9998887777", + Blather::JID.new("testroute") ), Port.new( "01", @@ -389,7 +443,8 @@ class PortingStepTest < Minitest::Test DateTime.now - 300 * MINS, DateTime.now - 300 * MINS, "ignored", - "9998887777" + "9998887777", + Blather::JID.new("testroute") ), Port.new( "01", @@ -397,7 +452,8 @@ class PortingStepTest < Minitest::Test DateTime.now - 10 * MINS, DateTime.now - 10 * MINS, "ignored", - "9998887777" + "9998887777", + Blather::JID.new("testroute") ), Port.new( "01", @@ -405,7 +461,8 @@ class PortingStepTest < Minitest::Test DateTime.now - 300 * MINS, DateTime.now - 300 * MINS, "ignored", - "9998887777" + "9998887777", + Blather::JID.new("testroute") ) ].each do |port| redis = Minitest::Mock.new @@ -416,4 +473,110 @@ class PortingStepTest < Minitest::Test end end em :test_ignore_frozen_ports + + def test_reachability_error_during_find + redis = Minitest::Mock.new + redis.expect(:exists, EMPromise.resolve(0), ["jmp_port_freeze-01"]) + redis.expect(:exists, "0", ["jmp_port_complete-01"]) + + notify = BlatherNotifyMock.new + notify.expect_execution( + "sgx", "customer info", + { q: "starting" }, admin_info("starting", "+19998887777").form + ) + + notify.expect_execution( + "sgx", "reachability", + { tel: "9998887777", type: "voice" }, + RuntimeError.new("Sender not in whitelist") + ) + + step = PortingStepRepo.new( + redis: redis, + blather_notify: notify, + admin_server: "sgx" + ).find(Port.new( + "01", + "COMPLETE", + DateTime.now - 25 * MINS, + DateTime.now - 1 * MINS, + "starting", + "9998887777", + Blather::JID.new("testroute") + )).sync + + assert_kind_of PortingStepRepo::Alert, step + assert_equal :reachability_failure, step.key + assert_nil step.real_step + assert_mock redis + assert_mock notify + end + em :test_reachability_error_during_find + + def test_reachability_error_during_run_test + redis = Minitest::Mock.new + redis.expect(:exists, EMPromise.resolve(0), ["jmp_port_freeze-01"]) + redis.expect(:exists, "0", ["jmp_port_complete-01"]) + + notify = BlatherNotifyMock.new + notify.expect_execution( + "sgx", "customer info", + { q: "starting" }, admin_info("starting", "+19998887777").form + ) + + notify.expect_execution( + "sgx", "reachability", + { tel: "9998887777", type: "voice" }, + FormTemplate.render("reachability_result", count: 0) + ) + + notify.expect_execution( + "sgx", "reachability", + { tel: "9998887777", type: "voice", reachability_tel: "+15551234567" }, + RuntimeError.new("Sender not in whitelist") + ) + + output_mock = Minitest::Mock.new + output_mock.expect( + :warn, + EMPromise.resolve(nil), + [ + "01", + :reachability_failure, + Matching.new { |m| m =~ /Error checking.*reachability/ } + ] + ) + output = MockOutputs.new(output_mock) + + port = Port.new( + "01", + "COMPLETE", + DateTime.now - 25 * MINS, + DateTime.now - 1 * MINS, + "starting", + "9998887777", + Blather::JID.new("testroute") + ) + + step = PortingStepRepo.new( + redis: redis, + blather_notify: notify, + admin_server: "sgx", + output: output, + testing_tel: "+15551234567" + ).find(port).sync + + assert_kind_of( + PortingStepRepo::Complete::AdminCommand::GoodNumber:: + Reachability::RunTest, + step + ) + + step.perform_next_step.sync + + assert_mock redis + assert_mock notify + assert_mock output_mock + end + em :test_reachability_error_during_run_test end