From ac9a8ebf22c2dd61449c2fe733182b09c35f5bea Mon Sep 17 00:00:00 2001 From: Denver Gingerich Date: Sun, 19 Apr 2020 23:20:20 +0000 Subject: [PATCH] exists?, update TODO, add warnings, manage pending We've taken care of a number of TODO items in this commit, including checking if the archived_message key exists before setting it, adding warnings about how this program should be treated as a singleton until some extra atomicity is added, adding a couple TODO items, and finally have the pending queue being managed correctly (checked at launch time to confirm emptiness, then popped and checked again after processing a message). With this change, all of the queue state is now managed correctly, so the translator can be run without needing to manually futz with the queues between each invocation to get them in the right state. However, there is still more work to do, as the translator does not yet communicate correctly with the SGX (due to some of the remaining TODO items, specifically related to how the HTTP request is sent). --- r2s-bwmsgsv2.rb | 50 +++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/r2s-bwmsgsv2.rb b/r2s-bwmsgsv2.rb index 4122153b3af6f696583ca38d5d82de58865a4a27..a88487e9e95b7d778e788b50be1fde5b0ed5265e 100755 --- a/r2s-bwmsgsv2.rb +++ b/r2s-bwmsgsv2.rb @@ -47,6 +47,13 @@ puts "LOG %d.%09d: starting...\n\n" % [t.to_i, t.nsec] redis = Redis.new(:driver => :hiredis) +pending_len = redis.llen('pending_messages-' + ARGV[0]) +if pending_len != 0 + puts 'Translator terminated due to non-empty pending queue of size ' + + pending_len.to_s + exit 1 +end + while true timestamps_plus_json_blob = redis.brpoplpush('incoming_messages-' + ARGV[0], 'pending_messages-' + ARGV[0]) @@ -57,6 +64,7 @@ while true puts "LOG %d.%09d, %s: msg [TODO: ID] sent on %s - incrementing\n" % [t.to_i, t.nsec, tai_timestamp, tai_yyyymmdd] + # WARNING: since not atomic with setex() below, translator is singleton day_msg_count = redis.incr( "archived_message-#{ARGV[0]}-#{tai_yyyymmdd}-total" ) @@ -65,13 +73,11 @@ while true puts "LOG %d.%09d: total msgs for %s-%s now at %s\n" % [t.to_i, t.nsec, tai_yyyymmdd, ARGV[0], day_msg_count] - # TODO: do something with day_msg_count - # TODO: print less stuff in here puts "TODO - got some stuff: " + timestamps_plus_json_blob puts "TODO - daymsgcount: '#{day_msg_count}'" - # add some timestamps to timestamps_plus_json_blob (our own) + # TODO: add some timestamps to timestamps_plus_json_blob (our own) # TODO: fix so sending timestamps_plus_json_blob to SGX, and via POST response = Net::HTTP.get_response( @@ -86,17 +92,41 @@ while true # if got HTTP 200, then: - # TODO: "archived_message-#{ARGV[0]}-#{tai_yyyymmdd}-#{day_msg_count}"? - # if exists, then fail, and leave thing on pending queue (not atomic) + exists = redis.exists( + "archived_message-#{ARGV[0]}-#{tai_yyyymmdd}-#{day_msg_count}" + ) + if exists + puts 'Translator terminated since archive message at index ' + + day_msg_count.to_s + ' for day ' + tai_yyyymmdd.to_s + + " unexpectedly exists already, with retval '#{exists}'" + exit 2 + end + # WARNING: since not atomic with incr() above, translator is singleton rv1 = redis.setex( "archived_message-#{ARGV[0]}-#{tai_yyyymmdd}-#{day_msg_count}", 259200, timestamps_plus_json_blob ) - puts "TODO - rv1: " + rv1 - # TODO: confirm rv1 == 'OK' + if rv1 != 'OK' + puts 'Translator terminated since archive message at index ' + + day_msg_count.to_s + ' for day ' + tai_yyyymmdd.to_s + + " could not be saved, with return value '#{rv1}'" + exit 3 + end + + pending_len = redis.llen('pending_messages-' + ARGV[0]) + if pending_len != 1 + puts 'Translator terminated since pending queue (at index ' + + day_msg_count.to_s + ' for day ' + tai_yyyymmdd.to_s + + ') has unexpected non-1 length of ' + pending_len.to_s + exit 4 + end - # confirm pending key has one element and e == timestamps_plus_json_blob - # if so, then rpop pending_messages-#{ARGV[0]} and discard - # if not, then email or other notify, and probably exit with failure + pending_item = redis.rpop('pending_messages-' + ARGV[0]) + if timestamps_plus_json_blob != pending_item + puts 'Translator terminated since archived item (at index ' + + day_msg_count.to_s + ' for day ' + tai_yyyymmdd.to_s + + ") does not match pending item '#{pending_item}'" + exit 5 + end end