gackswatcher wip rewrite to use cpuacct
smbaker [Mon, 17 Dec 2012 23:53:34 +0000 (15:53 -0800)]
apps/gacks/gackswatcher.py [changed mode: 0644->0755]

old mode 100644 (file)
new mode 100755 (executable)
index 017ba3f..2dfe541
@@ -45,7 +45,26 @@ def get_cgroup_dir():
                 print "Giving up"
                 sys.exit(-1)
 
+def get_cpuacct_dir():
+    tries = 10
+    while True:
+        if os.path.exists("/dev/cgroup"):
+            return "/dev/cgroup"
+        elif os.path.exists("/sys/fs/cgroup/cpuacct/system/libvirtd.service/libvirt/lxc"):
+            return "/sys/fs/cgroup/cpuacct/system/libvirtd.service/libvirt/lxc"
+        else:
+            # cgroups seems to be missing right after boot on the LXC nodes.
+            print "I don't know where to find cpuacct!"
+            if (tries > 0):
+                print "Trying again in 30 seconds"
+                time.sleep(30)
+                tries = tries - 1
+            else:
+                print "Giving up"
+                sys.exit(-1)
+
 CGROUP_DIR = get_cgroup_dir()
+CPUACCT_DIR = get_cpuacct_dir()
 
 PROC_DIR="/proc"
 WATCHER_DIR="/var/gacks/watcher/incoming"
@@ -119,6 +138,33 @@ def mutex_unlock(lock):
          if os.path.isfile(filename):
             os.remove(filename)
 
+def get_core_list(fn):
+   # Read a cpuset.cpus file and return a list of the cores that are
+   # included.
+
+   data = open(fn,"r").readline()
+   units =[]
+   for part in data.split(","):
+        unitRange = part.split("-")
+        if len(unitRange) == 1:
+            unitRange = (unitRange[0], unitRange[0])
+        for i in range(int(unitRange[0]), int(unitRange[1])+1):
+            if not i in units:
+                units.append(i)
+
+   return units
+
+def get_core_count(unreserved_cores, this_cores):
+    # Count the number of cores that are listed in this_cores, but not in
+    # unreserved_cores.
+
+    count = 0
+    for core in this_cores:
+        if not core in unreserved_cores:
+            count += 1
+    return count
+
+
 class GacksWatcherException(Exception):
     def __init__(self, value):
         self.value = value
@@ -128,73 +174,57 @@ class GacksWatcherException(Exception):
 class GacksWatcherNoDefaultCgroup(GacksWatcherException):
     pass
 
-class Process:
-    def __init__(self, pid=None):
-        self.pid=pid
-        self.cmd=None
-        self.cgroups=[CGROUP_NOGROUP]
-        self.times=[]
-        self.touched=False
-
-    def update_stat(self, line, sampletime):
-        if not (")" in line):
-            return
-
-        (pidcmd, line) = line.split(")",1)
-        if not ("(" in pidcmd):
-            return
-
-        (junk, self.cmd) = pidcmd.split("(",1)
-
-        line=line.strip()
-        parts=line.split(" ")
-
-        utime = int(parts[11])
-        stime = int(parts[12])
-
-        self.update_times(utime, stime, sampletime)
-
-    def update_times(self, utime, stime, sampletime):
-        timerec = (utime, stime, sampletime)
-
-        self.times.insert(0, timerec)
-
-        # two samples is enough
-        if len(self.times)>2:
-            self.times.pop()
-
-    def update_cgroups(self, cgroup_names):
-        if not cgroup_names:
-            self.cgroups = [CGROUP_NOGROUP]
-        else:
-            self.cgroups = uniq_list(cgroup_names)
-
-    def delta_times(self):
-        if (len(self.times)<2):
-            return (0,0,0)
-
-        return (delta_time(self.times[0][0], self.times[1][0]),
-                delta_time(self.times[0][1], self.times[1][1]),
-                delta_time(self.times[0][2], self.times[1][2]))
-
 class Cgroup:
     def __init__(self, name):
         self.name = name
-        self.utime = 0
-        self.stime = 0
+        self.utime = 0        # elapsed
+        self.stime = 0        # elapsed
         self.etime = 0
+        self.last_utime = None
+        self.last_stime = None
         self.charge = 0         # charge based on stime+utime
         self.core_count = 0
         self.resv_charge = 0    # charge based on whether a reservation occurs (0=no resv, 1=reserved this whole time period)
         self.core_charge = 0    # charge based on number of cores reserved (0=no cores)
         self.processes = []
 
-    def add_process(self, process):
-        (utime, stime, etime) = process.delta_times()
-        self.utime += utime
-        self.stime += stime
-        self.etime = etime
-        self.processes.append(process)
+    def update_times(self):
+        self.last_elapsed_utime = 0
+        self.last_elapsed_utime = 0
+        utime = 0
+        stime = 0
+        stat_fn = os.path.join(CPUACCT_DIR, self.name, "cpuacct.stat")
+        if os.path.exists(stat_fn):
+            for line in open(stat_fn).readlines():
+                parts = line.strip().split()
+                if (parts[0] == "user"):
+                   utime = int(parts[1])
+                elif (parts[0] == "system"):
+                   stime = int(parts[1])
+        if (utime!=0 and stime!=0):
+            if (self.last_utime != None):
+                elapsed_utime = (utime - self.last_utime)
+                self.utime = self.utime + elapsed_utime
+            if (self.last_stime != None):
+                elapsed_stime = (stime - self.last_stime)
+                self.stime = self.stime + elapsed_stime
+            self.last_utime = utime
+            self.last_stime = stime
+
+    def update_reservations(self, unreserved_cores):
+        cg_dir = os.path.join(CGROUP_DIR, self.name)
+        if not os.path.isdir(cg_dir):
+            return
+
+        try:
+            this_cores = get_core_list(os.path.join(cg_dir, "cpuset.cpus"))
+        except IOError:
+            # we couldn't read cpuset.cpus; maybe the cgroup disappeared
+            return
+
+        count = get_core_count(unreserved_cores, this_cores)
+
+        self.core_count = count
 
     def merge(self, cgroup, threshold=0, chargeUnit=1):
         if (cgroup.utime + cgroup.stime) >= threshold:
@@ -212,32 +242,7 @@ class Cgroup:
 class CgroupTotals:
     def __init__(self):
         self.cgroups = {}
-
-    def get_core_list(self, fn):
-       # Read a cpuset.cpus file and return a list of the cores that are
-       # included.
-
-       data = open(fn,"r").readline()
-       units =[]
-       for part in data.split(","):
-            unitRange = part.split("-")
-            if len(unitRange) == 1:
-                unitRange = (unitRange[0], unitRange[0])
-            for i in range(int(unitRange[0]), int(unitRange[1])+1):
-                if not i in units:
-                    units.append(i)
-
-       return units
-
-    def get_core_count(self, unreserved_cores, this_cores):
-        # Count the number of cores that are listed in this_cores, but not in
-        # unreserved_cores.
-
-        count = 0
-        for core in this_cores:
-            if not core in unreserved_cores:
-                count += 1
-        return count
+        self.working_cgroups = {}
 
     def get_unreserved_cores(self):
         # We need to get the list of default cpus from somewhere.
@@ -255,10 +260,73 @@ class CgroupTotals:
 
         for fn in things_to_try:
             if os.path.exists(fn) and os.path.isfile(fn):
-                return self.get_core_list(fn)
+                return get_core_list(fn)
 
         raise GacksWatcherNoDefaultCgroup("Did not find a default cgroup file")
 
+    def find_working(self):
+        """ Update self.working_cgroups to contain a list of all VM cgroups
+            in LXC directory. Add new ones, and remove ones that don't
+            exist anymore.
+        """
+        # find new things
+        for fn in os.listdir(CGROUP_DIR):
+            if os.path.exists(os.path.join(CGROUP_DIR, fn, "cpuset.cpus")):
+                if not (fn in self.working_cgroups):
+                    self.working_cgroups[fn] = Cgroup(fn)
+
+        # delete everything that no longer exists
+        for cgroup_name in list(self.working_cgroups.keys()):
+            if not (os.path.exists(os.path.join(CGROUP_DIR, cgroup_name, "cpuset.cpus"))):
+                del self.working_cgroups[cgroup_name]
+
+    def update_working_times(self):
+        """ For each working cgroup, update its elapsed stime and utime """
+        for cgroup_name,cgroup in self.working_cgroups.items():
+            cgroup.update_times()
+
+    def update_working_reservations(self):
+        """ For each working cgroup, compute its core_count attribute. This is
+            the number of cores that is reserved.
+        """
+        try:
+            unreserved_cores = self.get_unreserved_cores()
+        except IOError:
+            print "IOerror in get_unreserved_cores"
+            # error message?
+            return
+        except GacksWatcherNoDefaultCgroup:
+            print "no default cgroup in get_unreserved_cores"
+            # error message?
+            return
+
+        for cgroup_name,cgroup in self.working_cgroups.items():
+            cgroup.update_reservations(unreserved_cores)
+
+    def clear_working(self):
+        """ For each working cgroup, clear the elapsed times and core_count. """
+        for cgroup_name,cgroup in self.working_cgroups.items():
+            cgroup.stime = 0
+            cgroup.mtime = 0
+            cgroup.etime = 0
+            cgroup.core_count = 0
+
+    def charge_working(self, threshold, chargeUnit):
+        for cgroup_name,cgroup in self.working_cgroups.items():
+            self.add(cgroup, threshold, chargeUnit)
+
+    def do_update(self, threshold=0, chargeUnit=1):
+        self.find_working()                        # populate the working set
+        self.update_working_times()                # update the elapsed stime, utime
+        self.update_working_reservations()         # update the core_count
+        self.charge_working(threshold, chargeUnit) # turn ETs and core_count into charges
+        self.clear_working()                       # zero the ETs and core_count for the next pass
+
+    def dump_working(self):
+        print "%-20s %-10s %-10s %-5s" % ("name", "last utime", "last stime", "cores")
+        for cgroup_name,cgroup in self.working_cgroups.items():
+            print "%-20s %10s %10s %5s" % (cgroup.name, str(cgroup.last_utime), str(cgroup.last_stime), str(cgroup.core_count))
+
     def add(self, cgroup, threshold=0, chargeUnit=1):
         name = cgroup.name
         if not (name in self.cgroups):
@@ -280,41 +348,6 @@ class CgroupTotals:
 
         return cglist
 
-    def update_reservations(self, chargeUnit=1):
-        # Count how many cores each cgroup has reserved. If a nonzero number
-        # of cores is reserved then add the cgroup to our list, and add
-        # chargeUnit to its resv_charge.
-
-        try:
-            unreserved_cores = self.get_unreserved_cores()
-        except IOError:
-            print "IOerror in get_unreserved_cores"
-            # error message?
-            return
-        except GacksWatcherNoDefaultCgroup:
-            print "no default cgroup in get_unreserved_cores"
-            # error message?
-            return
-
-        names = os.listdir(CGROUP_DIR)
-        for name in names:
-            cg_dir = os.path.join(CGROUP_DIR, name)
-            if not os.path.isdir(cg_dir):
-                continue
-
-            try:
-                this_cores = self.get_core_list(os.path.join(cg_dir, "cpuset.cpus"))
-            except IOError:
-                # we couldn't read cpuset.cpus; maybe the cgroup disappeared
-                continue
-
-            count = self.get_core_count(unreserved_cores, this_cores)
-
-            if (count > 0):
-                cg = Cgroup(name)
-                cg.core_count = count
-                self.add(cg, 1, chargeUnit)
-
     def dump(self):
         cglist = self.get_groups_sorted_by_charge()
 
@@ -338,144 +371,22 @@ class CgroupTotals:
 
         os.rename(tmp_fn, filename)
 
-class Processes:
-    def __init__(self):
-        self.processes = {}
-
-    def get_timestamp(self):
-        # wall-time is good enough, multiple it by USER_HZ (100)
-        return int(time.time()*100)
-
-    def update_proc(self, name, sampleTime):
-        try:
-            pid = int(name)
-
-            if not (pid in self.processes):
-                self.processes[pid] = Process(pid)
-
-            stat = open(os.path.join(PROC_DIR, os.path.join(name, "stat")), "r").readline()
-
-            self.processes[pid].touched=True
-            self.processes[pid].update_stat(stat, sampleTime)
-
-            # Look in /proc/<pid>/cgroup to see which cgroups this process
-            # belongs to.
-            cgroup_filename = os.path.join(PROC_DIR, os.path.join(name, "cgroup"))
-            if os.path.exists(cgroup_filename):
-                cgroups = open(cgroup_filename, "r").readlines()
-                cgroup_names = []
-                for line in cgroups:
-                    parts = line.strip().split(":")
-                    if len(parts)==3:
-                        cgroup_name=parts[2].lstrip("/")
-                        if (cgroup_name==""):
-                            cgroup_name=CGROUP_BLANKGROUP
-
-                        if cgroup_name.startswith("libvirt/lxc/"):
-                           cgroup_name = cgroup_name[12:]
-                        elif cgroup_name.startswith("system/libvirtd.service"):
-                           cgroup_name = "libvert_service"
-
-                        cgroup_names.append(cgroup_name)
-                self.processes[pid].update_cgroups(cgroup_names)
-
-        except IOError:
-            # maybe the process disappeared while we were trying to read its
-            # files.
-            pass
-
-    def update(self):
-        sampleTime = self.get_timestamp()
-
-        # reset the touch bits so we can see what we updated
-        for pid in self.processes.keys():
-            self.processes[pid].touched = False
-
-        names = os.listdir(PROC_DIR)
-        for name in names:
-            if not name.isdigit():
-                continue
-
-            self.update_proc(name, sampleTime)
-
-        # delete everything we didn't touch
-        for pid in self.processes.keys():
-            if not (self.processes[pid].touched):
-                del self.processes[pid]
-
-    def get_cgroups(self):
-        cgroups={}
-        for pid in self.processes:
-            process = self.processes[pid]
-            (utime, stime, etime) = process.delta_times()
-            if (utime+stime>0):
-                for cgroup_name in process.cgroups:
-                    if not cgroup_name in cgroups:
-                        cgroups[cgroup_name] = Cgroup(cgroup_name)
-                    cgroups[cgroup_name].add_process(process)
-        return cgroups
-
-    def dump(self, threshold=DEFAULT_THRESHOLD):
-        plist = []
-        for pid in self.processes:
-            process = self.processes[pid]
-            (utime, stime, etime) = process.delta_times()
-            if (utime+stime>=threshold):
-                plist.append(process)
-
-        #sort by ctime+utime
-        plist = sorted(plist, key = lambda process: process.delta_times()[0]+process.delta_times()[1], reverse=True)
-
-        print "%-8s %-24s %-24s %-8s %-8s %-8s" % ("pid", "cmd", "cgroups", "utime", "stime", "elp-samp")
-        for process in plist:
-            (utime, stime, etime) = process.delta_times()
-            print "%8d %-24s %-24s %8d %8d %8d" % (process.pid, str(process.cmd), ",".join(process.cgroups), utime, stime, etime)
-
-    def dump_cgroups(self, threshold=DEFAULT_THRESHOLD):
-        cgroups = self.get_cgroups()
-        cglist = []
-
-        for cgroup_name in cgroups:
-            cgroup = cgroups[cgroup_name]
-            if (cgroup.utime+cgroup.stime >= threshold) or (cgroup.core_charge>0):
-                cglist.append(cgroup)
-
-        cglist =sorted(cglist, key = lambda cg: cg.utime+cg.stime, reverse=True)
-
-        print "%-24s %-8s %-8s %-8s %-8s" % ("cgroup", "procs", "utime", "stime", "elp-samp")
-        for cgroup in cglist:
-            print "%-24s %8d %8d %8d %8d" % (cgroup.name, len(cgroup.processes), cgroup.utime, cgroup.stime, cgroup.etime)
+def dump_working(opts):
+    cgroups = CgroupTotals()
 
-def top(opts):
-    processes = Processes()
-    while (1):
-        processes.update()
-        os.system("clear")
-        processes.dump(opts.threshold)
+    cgroups.find_working()                        # populate the working set
+    cgroups.update_working_times()                # update the elapsed stime, utime
+    cgroups.update_working_reservations()         # update the core_count
 
-        time.sleep(opts.delay)
+    cgroups.dump_working()
 
-def topgroups(opts):
-    processes = Processes()
-    while (1):
-        processes.update()
-        os.system("clear")
-        processes.dump_cgroups(opts.threshold)
-
-        time.sleep(opts.delay)
 
 def chargetest(opts):
-    processes = Processes()
     cgroups = CgroupTotals()
     while (1):
-        processes.update()
-        cgroups.update_reservations(1.0/(3600/opts.delay))
-        os.system("clear")
+        cgroups.do_update(opts.threshold, 1.0/(3600/opts.delay))
 
-        cgroups_dict = processes.get_cgroups()
-        for cgroup_name in cgroups_dict:
-            cgroup = cgroups_dict[cgroup_name]
-            cgroups.add(cgroup, opts.threshold, 1.0/(3600/opts.delay))
+        os.system("clear")
 
         cgroups.dump()
 
@@ -494,17 +405,9 @@ def monitor(opts):
     print "sighup handler installed. Send sighup to force report write."
     signal.signal(signal.SIGHUP, handler)
 
-    processes = Processes()
     cgroups = CgroupTotals()
     while (1):
-        processes.update()
-
-        cgroups.update_reservations(1.0/(3600/opts.delay))
-
-        cgroups_dict = processes.get_cgroups()
-        for cgroup_name in cgroups_dict:
-            cgroup = cgroups_dict[cgroup_name]
-            cgroups.add(cgroup, opts.threshold, 1.0/(3600/opts.delay))
+        cgroups.do_update(opts.threshold, 1.0/(3600/opts.delay))
 
         tNow = time.time()
         hour = int(tNow) / opts.reportinterval
@@ -588,7 +491,7 @@ def main():
     (opts, args) = parser.parse_args()
 
     if (len(args)==0):
-        print "specify a command: top, topgroups, chargetest, monitor"
+        print "specify a command: dumpworking, topgroups, chargetest, monitor"
         sys.exit(-1)
 
     if opts.daemon:
@@ -597,8 +500,8 @@ def main():
 
     cmd = args[0]
 
-    if cmd == "top":
-        top(opts)
+    if cmd == "dumpworking":
+        dump_working(opts)
     elif cmd == "topgroups":
         topgroups(opts)
     elif cmd == "chargetest":