r2s-bwmsgsv2.rb

#!/usr/bin/env ruby
# frozen_string_literal: true

# Copyright (C) 2020  Denver Gingerich <denver@ossguy.com>
#
# This file is part of sgx-bwmsgsv2.
#
# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along
# with sgx-bwmsgsv2.  If not, see <http://www.gnu.org/licenses/>.

 21require 'redis'
 22require 'hiredis'
 23require 'net/http'
 24require 'time'
 25
 26$stdout.sync = true
 27
 28puts "Redis queue to SGX HTTP request translator - for Bandwidth API V2 SGX\n"\
 29	"==>> last commit of this version is " + `git rev-parse HEAD` + "\n"
 30
 31if ARGV.size != 3
 32	# note that <redis_queue_suffix> should match h2r-bwmsgsv2's $queue_name
 33	puts "Usage: r2s-bwmsgsv2.rb <redis_queue_suffix> <sgx_hostname> "\
 34		"<sgx_https_port>"
 35	exit 0
 36end
 37
 38["INT", "TERM"].each do |sig|
 39	trap(sig) do
 40		puts "Translator has terminated due to SIG#{sig}."
 41		exit 0
 42	end
 43end
 44
 45t = Time.now
 46puts "LOG %d.%09d: starting...\n\n" % [t.to_i, t.nsec]
 47
 48redis = Redis.new(driver: :hiredis)
 49
 50pending_len = redis.llen('pending_messages-' + ARGV[0])
 51if pending_len != 0
 52	puts 'Translator terminated due to non-empty pending queue of size ' +
 53		pending_len.to_s
 54	exit 1
 55end
 56
 57loop do
 58	timestamps_plus_json_blob = redis.brpoplpush('incoming_messages-' +
 59		ARGV[0], 'pending_messages-' + ARGV[0])
 60
 61	t = Time.now
 62	ts = "%d.%09d" % [t.to_i, t.nsec]
 63	tai_timestamp = `./tai`.strip
 64	tai_yyyymmdd = Time.at(tai_timestamp.to_i).strftime('%Y%m%d')
 65	puts "LOG %s, %s: handling message sent on %s ...\n" %
 66		[ts, tai_timestamp, tai_yyyymmdd]
 67
 68	# WARNING: since not atomic with setex() below, translator is singleton
 69	day_msg_count = redis.incr(
 70		"archived_message-#{ARGV[0]}-#{tai_yyyymmdd}-total"
 71	)
 72	# TODO: confirm day_msg_count is integer > 0 (otherwise likely an error)
 73	#  unsure how to best error here: bail would cause infinite bounce; eml?
 74
 75	t = Time.now
 76	puts "LOG %d.%09d: total msgs for %s-%s now at %s\n" %
 77		[t.to_i, t.nsec, tai_yyyymmdd, ARGV[0], day_msg_count]
 78
 79	# TODO: print less stuff in here
 80	puts "TODO - got some stuff: " + timestamps_plus_json_blob
 81	puts "TODO - daymsgcount: '#{day_msg_count}'"
 82
 83	t2 = Time.now
 84	ts2 = "%d.%09d" % [t2.to_i, t2.nsec]
 85	tai_timestamp2 = `./tai`.strip
 86
 87	new_json_blob =
 88		'"ts_040_tai-translator_pre_send":' + tai_timestamp2 +
 89		',"ts_040_unix-translator_pre_send":' + ts2 +
 90		',"ts_030_tai-translator_received":' + tai_timestamp +
 91		',"ts_030_unix-translator_received":' + ts +
 92		',' + timestamps_plus_json_blob
 93
 94	uri = URI("http://#{ARGV[1]}:#{ARGV[2]}/")
 95	req = Net::HTTP::Post.new(uri, 'Content-Type' => 'application/json')
 96	req.body = new_json_blob.split('G', 2)[1][2..-1] # only use MSG part
 97	begin
 98		res = Net::HTTP.start(uri.hostname, uri.port) do |http|
 99			http.request(req)
100		end
101	# list of exceptions is from https://stackoverflow.com/a/5370726
102	rescue Timeout::Error, Errno::EINVAL, Errno::ECONNRESET, EOFError,
103	       Net::HTTPBadResponse, Net::HTTPHeaderSyntaxError,
104	       Net::ProtocolError => e
105
106		puts "TODO problem '#{e}' - now we should try again..."
107		# TODO: actually try again
108	else
109		puts "TODO - res: '#{res.code}' '#{res.message}': '#{res.body}'"
110	end
111
112	# TODO: add following functionality:
113	# if SGX gives back anything but response.code 200, retry 5 times-ish
114	# if SGX failed to respond after retries, email or other notify
115	#  and back off with even longer retries (can't do anything else really)
116
117	exists = redis.exists(
118		"archived_message-#{ARGV[0]}-#{tai_yyyymmdd}-#{day_msg_count}"
119	)
120	if exists
121		puts 'Translator terminated since archive message at index ' +
122			day_msg_count.to_s + ' for day ' + tai_yyyymmdd.to_s +
123			" unexpectedly exists already, with retval '#{exists}'"
124		exit 2
125	end
126
127	# WARNING: since not atomic with incr() above, translator is singleton
128	rv1 = redis.setex(
129		"archived_message-#{ARGV[0]}-#{tai_yyyymmdd}-#{day_msg_count}",
130		259200, new_json_blob
131	)
132	if rv1 != 'OK'
133		puts 'Translator terminated since archive message at index ' +
134			day_msg_count.to_s + ' for day ' + tai_yyyymmdd.to_s +
135			" could not be saved, with return value '#{rv1}'"
136		exit 3
137	end
138
139	pending_len = redis.llen('pending_messages-' + ARGV[0])
140	if pending_len != 1
141		puts 'Translator terminated since pending queue (at index ' +
142			day_msg_count.to_s + ' for day ' + tai_yyyymmdd.to_s +
143			') has unexpected non-1 length of ' + pending_len.to_s
144		exit 4
145	end
146
147	pending_item = redis.rpop('pending_messages-' + ARGV[0])
148	if timestamps_plus_json_blob != pending_item
149		puts 'Translator terminated since archived item (at index ' +
150			day_msg_count.to_s + ' for day ' + tai_yyyymmdd.to_s +
151			") does not match pending item '#{pending_item}'"
152		exit 5
153	end
154end