archive charge files
smbaker [Thu, 15 Nov 2012 07:13:18 +0000 (23:13 -0800)]
apps/gacks/gackscollector.py

index cc5d7aa..5b6d691 100644 (file)
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 import os
+import shutil
 import sys
 import time
 import threading
@@ -60,7 +61,8 @@ class ChargeFileManager:
         if not config:
             config = GacksConfig()
         self.config = config
-        self.incoming_dir = self.config.get('gacks_collector', 'incoming_directory')        
+        self.incoming_dir = self.config.get('gacks_collector', 'incoming_directory')
+        self.archive_dir = self.config.get('gacks_collector', 'archive_directory')
         logdir = self.config.get('gacks', 'logdir')            
         self.log = GacksLogger(directory = logdir, name='chargefilemanager.log')
 
@@ -86,7 +88,7 @@ class ChargeFileManager:
             else:
                 processed_files = []
                 if unprocessed == True:
-                    processed_files = self.get_processed_files(host) 
+                    processed_files = self.get_processed_files(host)
                 valid_name = lambda name: host not in name and name not in processed_files
                 charge_files = self.get_files(charges_dir, name_validator = valid_name)
         except:
@@ -123,7 +125,20 @@ class ChargeFileManager:
                     processed_files.append(parts[-1])
             f.close()
         return processed_files
-            
+
+    def archive_processed_file(self, host, filename):
+        """Move a processed charge file into the archive directory.
+
+        The destination is <self.archive_dir>/<YYYY-MM>/<host>, where the
+        month component comes from the current local time. The directory is
+        created on demand.
+
+        Archiving is best effort: if the move fails, the file is deleted
+        instead; if the delete fails as well, the failure is only logged.
+
+        host     -- host name, used as the last component of the archive path
+        filename -- path of the charge file to archive
+
+        Raises OSError if the archive directory cannot be created.
+        """
+        archive_dir = os.sep.join([self.archive_dir, time.strftime("%Y-%m"), host])
+        # Create unconditionally instead of exists()-then-makedirs(): another
+        # thread or process may create the directory between the check and
+        # the call. Only re-raise when the directory really is not there.
+        try:
+            os.makedirs(archive_dir)
+        except OSError:
+            if not os.path.isdir(archive_dir):
+                raise
+        try:
+            shutil.move(filename, archive_dir)
+        except Exception:
+            # Catch Exception rather than everything so that
+            # KeyboardInterrupt/SystemExit still propagate.
+            try:
+                self.log.writeTS("ChargeFileManager: failed to move %s to %s. deleting" % (filename, archive_dir))
+                os.remove(filename)
+            except Exception:
+                self.log.writeTS("ChargeFileManager: failed to delete %s" % filename)
+
     def log_processed_file(self, host, filename):
         processed_charges_log = GacksLogger(directory=self.incoming_dir, name='%s.processed' % host)
         processed_charges_log.writeTS("%s" % (os.path.basename(filename)))
@@ -200,6 +215,7 @@ class ChargeProcessor(threading.Thread):
                 self._process_charge(account.name, host, 1, charge["date"], float(resv_amount) * float(policy.cost) / 100.0, float(resv_amount))
 
     def process_charges(self, host):
+        self.log.writeTS("ChargeProcessor: processing host: %s" % host)
         charge_files = self.charge_file_manager.get_charge_files(host)
         for charge_file in charge_files:
             self.log.writeTS("ChargeProcessor: processing charge file: %s" % charge_file)
@@ -236,6 +252,14 @@ class ChargeProcessor(threading.Thread):
             # successfully procssed at least 1 charge from the file
             if num_charges == 0 or processed_charges > 0:
                 self.charge_file_manager.log_processed_file(host, charge_file)
+                self.charge_file_manager.archive_processed_file(host, charge_file)
+
+        # for cleaning up stuff that was left over before archiving was turned on
+        #charge_files = self.charge_file_manager.get_charge_files(host, unprocessed=False)
+        #for charge_file in charge_files:
+        #    if os.path.exists(charge_file):
+        #        self.log.writeTS("Archiving: %s" % charge_file)
+        #        self.charge_file_manager.archive_processed_file(host, charge_file)
 
     def join(self):
         self.host_queue.join()
@@ -284,7 +308,7 @@ class CollectWorker(threading.Thread):
         self.index = index
 
     def collect_logs(self, host, host_dir=None, dest_dir=None):
-        rsync = '/usr/bin/rsync -acvqlH --timeout 120 -e "ssh -i %s -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null"' %\
+        rsync = '/usr/bin/rsync -acvqlH --timeout 120 --remove-source-files -e "ssh -i %s -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null"' %\
                  (self.identity_key)
         locals()['rsync'] = rsync
         user = self.identity
@@ -402,7 +426,7 @@ class GacksCollector(Daemon):
                 self.log.writeTS("GacksCollector: mainloop - waiting on processor" )
 
                 # wait for the charge processors to finish
-                self.charge_procesor.join()
+                self.charge_processor.join()
 
                 tStop = time.time()
                 tElapsed = int(tStop - tStart)