r2s-bwmsgsv2.rb

  1#!/usr/bin/env ruby
  2#
  3# Copyright (C) 2020  Denver Gingerich <denver@ossguy.com>
  4#
  5# This file is part of sgx-bwmsgsv2.
  6#
  7# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
  8# the terms of the GNU Affero General Public License as published by the Free
  9# Software Foundation, either version 3 of the License, or (at your option) any
 10# later version.
 11#
 12# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
 13# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 14# FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more
 15# details.
 16#
 17# You should have received a copy of the GNU Affero General Public License along
 18# with sgx-bwmsgsv2.  If not, see <http://www.gnu.org/licenses/>.
 19
 20require 'redis'
 21require 'hiredis'
 22require 'net/http'
 23require 'time'
 24
 25$stdout.sync = true
 26
 27puts "Redis queue to SGX HTTP request translator - for Bandwidth API V2 SGX\n"\
 28	"==>> last commit of this version is " + `git rev-parse HEAD` + "\n"
 29
 30if ARGV.size != 3
 31	# note that <redis_queue_suffix> should match h2r-bwmsgsv2's $queue_name
 32	puts "Usage: r2s-bwmsgsv2.rb <redis_queue_suffix> <sgx_hostname> "\
 33		"<sgx_https_port>"
 34	exit 0
 35end
 36
 37["INT", "TERM"].each do |sig|
 38	trap(sig) do
 39		puts "Translator has terminated due to SIG#{sig}."
 40		exit 0
 41	end
 42end
 43
 44t = Time.now
 45puts "LOG %d.%09d: starting...\n\n" % [t.to_i, t.nsec]
 46
 47redis = Redis.new(:driver => :hiredis)
 48
 49pending_len = redis.llen('pending_messages-' + ARGV[0])
 50if pending_len != 0
 51	puts 'Translator terminated due to non-empty pending queue of size ' +
 52		pending_len.to_s
 53	exit 1
 54end
 55
 56while true
 57	timestamps_plus_json_blob = redis.brpoplpush('incoming_messages-' +
 58		ARGV[0], 'pending_messages-' + ARGV[0])
 59
 60	t = Time.now
 61	ts = "%d.%09d" % [t.to_i, t.nsec]
 62	tai_timestamp = `./tai`.strip
 63	tai_yyyymmdd = Time.at(tai_timestamp.to_i).strftime('%Y%m%d')
 64	puts "LOG %s, %s: handling message sent on %s ...\n" %
 65		[ts, tai_timestamp, tai_yyyymmdd]
 66
 67	# WARNING: since not atomic with setex() below, translator is singleton
 68	day_msg_count = redis.incr(
 69		"archived_message-#{ARGV[0]}-#{tai_yyyymmdd}-total"
 70	)
 71	# TODO: confirm day_msg_count is integer > 0 (otherwise likely an error)
 72
 73	t = Time.now
 74	puts "LOG %d.%09d: total msgs for %s-%s now at %s\n" %
 75		[t.to_i, t.nsec, tai_yyyymmdd, ARGV[0], day_msg_count]
 76
 77	# TODO: print less stuff in here
 78	puts "TODO - got some stuff: " + timestamps_plus_json_blob
 79	puts "TODO - daymsgcount: '#{day_msg_count}'"
 80
 81	t2 = Time.now
 82	ts2 = "%d.%09d" % [t2.to_i, t2.nsec]
 83	tai_timestamp2 = `./tai`.strip
 84
 85	new_json_blob =
 86		'"ts_040_tai-translator_pre_send":' + tai_timestamp2 +
 87		',"ts_040_unix-translator_pre_send":' + ts2 +
 88		',"ts_030_tai-translator_received":' + tai_timestamp +
 89		',"ts_030_unix-translator_received":' + ts +
 90		',' + timestamps_plus_json_blob
 91
 92	# TODO: fix so sending new_json_blob to SGX, and via POST
 93	response = Net::HTTP.get_response(
 94		URI("https://#{ARGV[1]}:#{ARGV[2]}/r2tst.php")
 95	)
 96
 97	puts "TODO - response.code = #{response.code} and body #{response.body}"
 98
 99	# if SGX gives back anything but response.code 200, retry 5 times-ish
100	# if SGX failed to respond after retries, email or other notify
101	#  and back off with even longer retries (can't do anything else really)
102
103	# if got HTTP 200, then:
104
105	exists = redis.exists(
106		"archived_message-#{ARGV[0]}-#{tai_yyyymmdd}-#{day_msg_count}"
107	)
108	if exists
109		puts 'Translator terminated since archive message at index ' +
110			day_msg_count.to_s + ' for day ' + tai_yyyymmdd.to_s +
111			" unexpectedly exists already, with retval '#{exists}'"
112		exit 2
113	end
114
115	# WARNING: since not atomic with incr() above, translator is singleton
116	rv1 = redis.setex(
117		"archived_message-#{ARGV[0]}-#{tai_yyyymmdd}-#{day_msg_count}",
118		259200, new_json_blob
119	)
120	if rv1 != 'OK'
121		puts 'Translator terminated since archive message at index ' +
122			day_msg_count.to_s + ' for day ' + tai_yyyymmdd.to_s +
123			" could not be saved, with return value '#{rv1}'"
124		exit 3
125	end
126
127	pending_len = redis.llen('pending_messages-' + ARGV[0])
128	if pending_len != 1
129		puts 'Translator terminated since pending queue (at index ' +
130			day_msg_count.to_s + ' for day ' + tai_yyyymmdd.to_s +
131			') has unexpected non-1 length of ' + pending_len.to_s
132		exit 4
133	end
134
135	pending_item = redis.rpop('pending_messages-' + ARGV[0])
136	if timestamps_plus_json_blob != pending_item
137		puts 'Translator terminated since archived item (at index ' +
138			day_msg_count.to_s + ' for day ' + tai_yyyymmdd.to_s +
139			") does not match pending item '#{pending_item}'"
140		exit 5
141	end
142end