report chargeprocessor idle state in log; join chargeprocessor queue in main loop
smbaker [Tue, 13 Nov 2012 23:04:33 +0000 (15:04 -0800)]
apps/gacks/gackscollector.py

index 168fe64..cc5d7aa 100644 (file)
@@ -237,7 +237,8 @@ class ChargeProcessor(threading.Thread):
             if num_charges == 0 or processed_charges > 0:
                 self.charge_file_manager.log_processed_file(host, charge_file)
 
-
+    def join(self):
+        self.host_queue.join()
 
     def run(self):
         self.log.writeTS("ChargeProcessor: starting " )
@@ -253,11 +254,16 @@ class ChargeProcessor(threading.Thread):
         while True:
             try:
                 if self.host_queue.empty():
-                    time.sleep(delay)
-                else:
-                    host = self.host_queue.get()
-                    self.process_charges(host)
-                    self.host_queue.task_done()
+                    self.log.writeTS("ChargeProcessor: Queue is empty")
+                    while self.host_queue.empty():
+                        time.sleep(delay)
+                    self.log.writeTS("ChargeProcessor: Queue is non-empty")
+
+                # invariant: host_queue is not empty
+
+                host = self.host_queue.get()
+                self.process_charges(host)
+                self.host_queue.task_done()
             except:
                 self.log.writeTS(traceback.format_exc())
 
@@ -390,10 +396,16 @@ class GacksCollector(Daemon):
                 self.populate_hosts_queue()
 
                 tStart = time.time()
+                # wait for the collectors to finish
                 self.queue.join()
+
+                self.log.writeTS("GacksCollector: mainloop - waiting on processor" )
+
+                # wait for the charge processors to finish
+                self.charge_procesor.join()
+
                 tStop = time.time()
                 tElapsed = int(tStop - tStart)
-
                 self.log.writeTS("GacksCollector: mainloop - queue processing took %d seconds" % tElapsed)
 
                 delay = run_interval - tElapsed
@@ -423,5 +435,5 @@ if __name__ == '__main__':
         sys.exit(0)
     else:
         print "usage: %s start|stop|restart" % sys.argv[0]
-        sys.exit(2)    
-        
+        sys.exit(2)
+