@@ -47,6 +47,13 @@ puts "LOG %d.%09d: starting...\n\n" % [t.to_i, t.nsec]
redis = Redis.new(:driver => :hiredis)
+pending_len = redis.llen('pending_messages-' + ARGV[0])
+if pending_len != 0
+ puts 'Translator terminated due to non-empty pending queue of size ' +
+ pending_len.to_s
+ exit 1
+end
+
while true
timestamps_plus_json_blob = redis.brpoplpush('incoming_messages-' +
ARGV[0], 'pending_messages-' + ARGV[0])
@@ -57,6 +64,7 @@ while true
puts "LOG %d.%09d, %s: msg [TODO: ID] sent on %s - incrementing\n" %
[t.to_i, t.nsec, tai_timestamp, tai_yyyymmdd]
+ # WARNING: since not atomic with setex() below, translator is singleton
day_msg_count = redis.incr(
"archived_message-#{ARGV[0]}-#{tai_yyyymmdd}-total"
)
@@ -65,13 +73,11 @@ while true
puts "LOG %d.%09d: total msgs for %s-%s now at %s\n" %
[t.to_i, t.nsec, tai_yyyymmdd, ARGV[0], day_msg_count]
- # TODO: do something with day_msg_count
-
# TODO: print less stuff in here
puts "TODO - got some stuff: " + timestamps_plus_json_blob
puts "TODO - daymsgcount: '#{day_msg_count}'"
- # add some timestamps to timestamps_plus_json_blob (our own)
+ # TODO: add some timestamps to timestamps_plus_json_blob (our own)
# TODO: fix so sending timestamps_plus_json_blob to SGX, and via POST
response = Net::HTTP.get_response(
@@ -86,17 +92,41 @@ while true
# if got HTTP 200, then:
- # TODO: "archived_message-#{ARGV[0]}-#{tai_yyyymmdd}-#{day_msg_count}"?
- # if exists, then fail, and leave thing on pending queue (not atomic)
+ exists = redis.exists(
+ "archived_message-#{ARGV[0]}-#{tai_yyyymmdd}-#{day_msg_count}"
+ )
+ if exists
+ puts 'Translator terminated since archive message at index ' +
+ day_msg_count.to_s + ' for day ' + tai_yyyymmdd.to_s +
+ " unexpectedly exists already, with retval '#{exists}'"
+ exit 2
+ end
+ # WARNING: since not atomic with incr() above, translator is singleton
rv1 = redis.setex(
"archived_message-#{ARGV[0]}-#{tai_yyyymmdd}-#{day_msg_count}",
259200, timestamps_plus_json_blob
)
- puts "TODO - rv1: " + rv1
- # TODO: confirm rv1 == 'OK'
+ if rv1 != 'OK'
+ puts 'Translator terminated since archive message at index ' +
+ day_msg_count.to_s + ' for day ' + tai_yyyymmdd.to_s +
+ " could not be saved, with return value '#{rv1}'"
+ exit 3
+ end
+
+ pending_len = redis.llen('pending_messages-' + ARGV[0])
+ if pending_len != 1
+ puts 'Translator terminated since pending queue (at index ' +
+ day_msg_count.to_s + ' for day ' + tai_yyyymmdd.to_s +
+ ') has unexpected non-1 length of ' + pending_len.to_s
+ exit 4
+ end
- # confirm pending key has one element and e == timestamps_plus_json_blob
- # if so, then rpop pending_messages-#{ARGV[0]} and discard
- # if not, then email or other notify, and probably exit with failure
+ pending_item = redis.rpop('pending_messages-' + ARGV[0])
+ if timestamps_plus_json_blob != pending_item
+ puts 'Translator terminated since archived item (at index ' +
+ day_msg_count.to_s + ' for day ' + tai_yyyymmdd.to_s +
+ ") does not match pending item '#{pending_item}'"
+ exit 5
+ end
end