d82fa5cc89172ea59efe9140653d1fbcdb39b674
[raven.git] / apps / gacks / gackswatcher.py
1 #!/usr/bin/env python
2
3 # GacksWatcher - Best Effort Monitoring Daemon
4 #
5 # This daemon is intented to be run with the "monitor" command in normal
6 # opertion. See "--help" for other commands.
7 #
8 # By default, it will subdivide time into 5-minute intervals. If a slice
9 # consumes more than 'threshold' time within an interval, it will be billed
10 # for that interval. Billing is normalized so that a 1.0 represents a full hour
11 # of usage. i.e. if 5-minute intervals were used, then a single 5-minute interval
12 # would result in a charge of 1/12 = 0.083.
13 #
14 # In monitor mode, data will be written out to /var/gacks/watcher/incoming/
15 # When monitoring, data is written out at the start of every new hour.
16 #
17 # When running in a vserver root, make sure to set ctx 1, so we can see the
18 # processes of all the vservers.
19 #
20 # /usr/sbin/chcontext --silent --ctx 1 bash
21
22 import atexit
23 import fcntl
24 import os
25 from optparse import OptionParser
26 import signal
27 import sys
28 import time
29
30 def get_cgroup_dir():
31     tries = 10
32     while True:
33         if os.path.exists("/dev/cgroup"):
34             return "/dev/cgroup"
35         elif os.path.exists("/sys/fs/cgroup/cpuset/libvirt/lxc"):
36             return "/sys/fs/cgroup/cpuset/libvirt/lxc"
37         else:
38             # cgroups seems to be missing right after boot on the LXC nodes.
39             print "I don't know where to find cgroup!"
40             if (tries > 0):
41                 print "Trying again in 30 seconds"
42                 time.sleep(30)
43                 tries = tries - 1
44             else:
45                 print "Giving up"
46                 sys.exit(-1)
47
48 def get_cpuacct_dir():
49     tries = 10
50     while True:
51         if os.path.exists("/dev/cgroup"):
52             return "/dev/cgroup"
53         elif os.path.exists("/sys/fs/cgroup/cpuacct/system/libvirtd.service/libvirt/lxc"):
54             return "/sys/fs/cgroup/cpuacct/system/libvirtd.service/libvirt/lxc"
55         else:
56             # cgroups seems to be missing right after boot on the LXC nodes.
57             print "I don't know where to find cpuacct!"
58             if (tries > 0):
59                 print "Trying again in 30 seconds"
60                 time.sleep(30)
61                 tries = tries - 1
62             else:
63                 print "Giving up"
64                 sys.exit(-1)
65
66 CGROUP_DIR = get_cgroup_dir()
67 CPUACCT_DIR = get_cpuacct_dir()
68
69 PROC_DIR="/proc"
70 WATCHER_DIR="/var/gacks/watcher/incoming"
71
72 CGROUP_NOGROUP="nogroup"
73 CGROUP_BLANKGROUP="blankgroup"
74
75 # 'delay' is the period at which measurements are taken.
76
77 DEFAULT_DELAY = 300
78
79 # 'threshold' is the ammount of utime+stime that a slice must consume within
80 #     a period to be perceived as running. This threshold is expressed in
81 #     hundredths of a second (i.e. 10ms). We choose a default that is .05% of
82 #     the default delay. Thus if a process accumulates 150ms or more of time in
83 #     a 300,000ms interval then we will conclude the process is running during
84 #     that interval.
85
86 DEFAULT_THRESHOLD = int(DEFAULT_DELAY*100 * 0.0005)
87
88 assert(DEFAULT_THRESHOLD >= 1)
89
90 glo_sighup_received = False
91
92 def handler(signum, frame):
93     global glo_sighup_received
94     if (signum==signal.SIGHUP):
95         glo_sighup_received = True
96
97 def uniq_list(lst):
98    keys = {}
99    for e in lst:
100        keys[e] = 1
101    return keys.keys()
102
103 def delta_time(x,y):
104     # x is the most recent (newest) timestamp
105     # y is the least recent (oldest) timestamp
106     # TODO: verify wraparounds
107     if (x>=y):
108         return x-y
109     elif y<=0xFFFFFFFF:
110         # 32-bit wraparound
111         return (x+0xFFFFFFFF)-y
112     else:
113         # 64-bit wraparound
114         return (x+0xFFFFFFFFFFFFFFFF)-y
115
116 def mutex_lock(program, lockdir="/var/lock", unlock_on_exit=True):
117    if not os.path.exists(lockdir):
118       os.makedirs(lockdir)
119
120    lock = file(os.path.join(lockdir, program + ".lock"), "w")
121
122    try:
123       fcntl.flock(lock, fcntl.LOCK_EX | fcntl.LOCK_NB)
124    except IOError:
125       lock.close()
126       return None
127
128    if unlock_on_exit:
129       atexit.register(mutex_unlock, lock)
130    return lock
131
132 def mutex_unlock(lock):
133    if lock != None:
134       if not lock.closed:
135          filename = lock.name
136          fcntl.flock(lock, fcntl.LOCK_UN)
137          lock.close()
138          if os.path.isfile(filename):
139             os.remove(filename)
140
141 def get_core_list(fn):
142    # Read a cpuset.cpus file and return a list of the cores that are
143    # included.
144
145    data = open(fn,"r").readline()
146    units =[]
147    for part in data.split(","):
148         unitRange = part.split("-")
149         if len(unitRange) == 1:
150             unitRange = (unitRange[0], unitRange[0])
151         for i in range(int(unitRange[0]), int(unitRange[1])+1):
152             if not i in units:
153                 units.append(i)
154
155    return units
156
157 def get_core_count(unreserved_cores, this_cores):
158     # Count the number of cores that are listed in this_cores, but not in
159     # unreserved_cores.
160
161     count = 0
162     for core in this_cores:
163         if not core in unreserved_cores:
164             count += 1
165     return count
166
167
168 class GacksWatcherException(Exception):
169     def __init__(self, value):
170         self.value = value
171     def __str__(self):
172         return repr(self.value)
173
174 class GacksWatcherNoDefaultCgroup(GacksWatcherException):
175     pass
176
177 class Cgroup:
178     def __init__(self, name):
179         self.name = name
180         self.utime = 0        # elapsed
181         self.stime = 0        # elapsed
182         self.last_utime = None
183         self.last_stime = None
184         self.charge = 0         # charge based on stime+utime
185         self.core_count = 0
186         self.resv_charge = 0    # charge based on whether a reservation occurs (0=no resv, 1=reserved this whole time period)
187         self.core_charge = 0    # charge based on number of cores reserved (0=no cores)
188         self.processes = []
189
190     def update_times(self):
191         self.last_elapsed_utime = 0
192         self.last_elapsed_utime = 0
193         utime = 0
194         stime = 0
195         stat_fn = os.path.join(CPUACCT_DIR, self.name, "cpuacct.stat")
196         if os.path.exists(stat_fn):
197             for line in open(stat_fn).readlines():
198                 parts = line.strip().split()
199                 if (parts[0] == "user"):
200                    utime = int(parts[1])
201                 elif (parts[0] == "system"):
202                    stime = int(parts[1])
203         if (utime!=0 and stime!=0):
204             if (self.last_utime != None):
205                 elapsed_utime = (utime - self.last_utime)
206                 self.utime = self.utime + elapsed_utime
207             if (self.last_stime != None):
208                 elapsed_stime = (stime - self.last_stime)
209                 self.stime = self.stime + elapsed_stime
210             self.last_utime = utime
211             self.last_stime = stime
212
213     def update_reservations(self, unreserved_cores):
214         cg_dir = os.path.join(CGROUP_DIR, self.name)
215         if not os.path.isdir(cg_dir):
216             return
217
218         try:
219             this_cores = get_core_list(os.path.join(cg_dir, "cpuset.cpus"))
220         except IOError:
221             # we couldn't read cpuset.cpus; maybe the cgroup disappeared
222             return
223
224         count = get_core_count(unreserved_cores, this_cores)
225
226         self.core_count = count
227
228     def merge(self, cgroup, threshold=0, chargeUnit=1):
229         if (cgroup.utime + cgroup.stime) >= threshold:
230             self.utime += cgroup.utime
231             self.stime += cgroup.stime
232             self.charge = self.charge + chargeUnit
233
234         # Core_count doesn't use a threshold. If there's a nonzero core_count
235         # then we always apply it.
236         if (cgroup.core_count > 0):
237             self.resv_charge += chargeUnit
238             self.core_charge += (chargeUnit * cgroup.core_count)
239
240 class CgroupTotals:
241     def __init__(self):
242         self.cgroups = {}
243         self.working_cgroups = {}
244
245     def get_unreserved_cores(self):
246         # We need to get the list of default cpus from somewhere.
247         # If we're using vservers, then the nodemanager will put the default
248         # reservation in /etc/vservers/.defaults/cgroup/cpuset.cpus.
249
250         # On lxc, we don't have the .defaults directory anymore, so for now
251         # let's try looking at some pl slices that we know will get default
252         # reservations.
253
254         things_to_try = ["/etc/vservers/.defaults/cgroup/cpuset.cpus",
255                          "/sys/fs/cgroup/cpuset/libvirt/lxc/pl_drl/cpuset.cpus",
256                          "/sys/fs/cgroup/cpuset/libvirt/lxc/pl_sfacm/cpuset.cpus",
257                          "/sys/fs/cgroup/cpuset/libvirt/lxc/pl_sirius/cpuset.cpus"]
258
259         for fn in things_to_try:
260             if os.path.exists(fn) and os.path.isfile(fn):
261                 return get_core_list(fn)
262
263         raise GacksWatcherNoDefaultCgroup("Did not find a default cgroup file")
264
265     def find_working(self):
266         """ Update self.working_cgroups to contain a list of all VM cgroups
267             in LXC directory. Add new ones, and remove ones that don't
268             exist anymore.
269         """
270         # find new things
271         for fn in os.listdir(CGROUP_DIR):
272             if os.path.exists(os.path.join(CGROUP_DIR, fn, "cpuset.cpus")):
273                 if not (fn in self.working_cgroups):
274                     self.working_cgroups[fn] = Cgroup(fn)
275
276         # delete everything that no longer exists
277         for cgroup_name in list(self.working_cgroups.keys()):
278             if not (os.path.exists(os.path.join(CGROUP_DIR, cgroup_name, "cpuset.cpus"))):
279                 del self.working_cgroups[cgroup_name]
280
281     def update_working_times(self):
282         """ For each working cgroup, update its elapsed stime and utime """
283         for cgroup_name,cgroup in self.working_cgroups.items():
284             cgroup.update_times()
285
286     def update_working_reservations(self):
287         """ For each working cgroup, compute its core_count attribute. This is
288             the number of cores that is reserved.
289         """
290         try:
291             unreserved_cores = self.get_unreserved_cores()
292         except IOError:
293             print "IOerror in get_unreserved_cores"
294             # error message?
295             return
296         except GacksWatcherNoDefaultCgroup:
297             print "no default cgroup in get_unreserved_cores"
298             # error message?
299             return
300
301         for cgroup_name,cgroup in self.working_cgroups.items():
302             cgroup.update_reservations(unreserved_cores)
303
304     def clear_working(self):
305         """ For each working cgroup, clear the elapsed times and core_count. """
306         for cgroup_name,cgroup in self.working_cgroups.items():
307             cgroup.stime = 0
308             cgroup.utime = 0
309             cgroup.core_count = 0
310
311     def charge_working(self, threshold, chargeUnit):
312         for cgroup_name,cgroup in self.working_cgroups.items():
313             self.add(cgroup, threshold, chargeUnit)
314
315     def do_update(self, threshold=0, chargeUnit=1):
316         self.find_working()                        # populate the working set
317         self.update_working_times()                # update the elapsed stime, utime
318         self.update_working_reservations()         # update the core_count
319         self.charge_working(threshold, chargeUnit) # turn ETs and core_count into charges
320         self.clear_working()                       # zero the ETs and core_count for the next pass
321
322     def dump_working(self):
323         print "%-20s %-10s %-10s %-5s" % ("name", "last utime", "last stime", "cores")
324         for cgroup_name,cgroup in self.working_cgroups.items():
325             print "%-20s %10s %10s %5s" % (cgroup.name, str(cgroup.last_utime), str(cgroup.last_stime), str(cgroup.core_count))
326
327     def add(self, cgroup, threshold=0, chargeUnit=1):
328         name = cgroup.name
329         if not (name in self.cgroups):
330             self.cgroups[name] = Cgroup(name)
331
332         self.cgroups[name].merge(cgroup, threshold, chargeUnit)
333
334     def get_groups_sorted_by_charge(self, minCharge=0.0095):
335         # XXX where did I come up with minCharge=0.0095 from?
336
337         cglist = []
338
339         for cgroup_name in self.cgroups:
340             cgroup = self.cgroups[cgroup_name]
341             if (cgroup.charge >= minCharge) or (cgroup.resv_charge >= minCharge):
342                 cglist.append(cgroup)
343
344         cglist = sorted(cglist, key = lambda cg: cg.charge, reverse=True)
345
346         return cglist
347
348     def dump(self):
349         cglist = self.get_groups_sorted_by_charge()
350
351         print "%-24s %-8s %-8s %-8s %-8s %-8s" % ("cgroup", "charge", "utime", "stime", "resv-chg", "core-chg")
352         for cgroup in cglist:
353             print "%-24s %8.2f %8d %8d %8.2f %8.2f" % (cgroup.name, cgroup.charge, cgroup.utime, cgroup.stime, cgroup.resv_charge, cgroup.core_charge)
354
355     def save_summary_file(self, filename, dump=False):
356         cglist = self.get_groups_sorted_by_charge()
357
358         dir = os.path.dirname(os.path.abspath(filename))
359         if not (os.path.exists(dir)):
360             os.makedirs(dir)
361
362         tmp_fn = os.path.join(dir, "gackswatcher.tmp")
363
364         f = open(tmp_fn, "w")
365         for cgroup in cglist:
366             line = "%s %0.2f %0.2f %0.2f" % (cgroup.name, cgroup.charge, cgroup.resv_charge, cgroup.core_charge)
367             f.write(line + "\n")
368             if dump:
369                 print line
370         f.close()
371
372         os.rename(tmp_fn, filename)
373
374 def dump_working(opts):
375     cgroups = CgroupTotals()
376
377     cgroups.find_working()                        # populate the working set
378     cgroups.update_working_times()                # update the elapsed stime, utime
379     cgroups.update_working_reservations()         # update the core_count
380
381     cgroups.dump_working()
382
383
384 def chargetest(opts):
385     cgroups = CgroupTotals()
386     while (1):
387         cgroups.do_update(opts.threshold, 1.0/(3600/opts.delay))
388
389         os.system("clear")
390
391         cgroups.dump()
392
393         time.sleep(opts.delay)
394
395 def monitor(opts):
396     global glo_sighup_received
397
398     if not mutex_lock("gackswatcher"):
399         print >> sys.stderr, "gackswatcher is already running. aborting."
400         sys.exit(-1)
401
402     tNow = time.time()
403     last_hour = int(tNow) / opts.reportinterval
404
405     print "sighup handler installed. Send sighup to force report write."
406     signal.signal(signal.SIGHUP, handler)
407
408     cgroups = CgroupTotals()
409     while (1):
410         cgroups.do_update(opts.threshold, 1.0/(3600/opts.delay))
411
412         tNow = time.time()
413         hour = int(tNow) / opts.reportinterval
414
415         if (hour!=last_hour) or (glo_sighup_received):
416             glo_sighup_received = False
417
418             # Save the totals
419             print "generating report at time", int(tNow), "hour", hour
420             cgroups.save_summary_file(os.path.join(WATCHER_DIR, str(int(tNow))), not opts.daemon)
421
422             # We've written the totals, so start over with a new 'cgroups'
423             # to hold new totals.
424             cgroups = CgroupTotals()
425             last_hour = hour
426
427         # XXX can we miss processes that live for less than opts.delay time?
428
429         time.sleep(opts.delay)
430
431 class MyParser(OptionParser):
432     def format_epilog(self, formatter):
433         return self.epilog
434
435 def create_parser():
436    # Generate command line parser
437    parser = MyParser(usage="gackswatcher [options] command [command_options] [command_args]",
438         description="Gacks best effort monitoring daemon",
439         epilog="""
440 Command: [top | topgroups | chargetest | monitor]
441
442   top - display output that is similar to the unix 'top' command
443
444   topgroups - display a list of cgroups and how much utime/stime is consumed by
445      processes in each cgroup.
446
447   chargetest - display output that looks like what would be sent to Gacks.
448      Processes are charged a fractional unit if the process consumes stime+utime
449      that is greater than threshold during a period.
450
451   monitor - run in monitor mode. Output is saved to /var/gacks/watcher/incoming.
452 """,
453         version="gackswatcher ??")
454
455    parser.add_option('-d', '--daemon', action='store_true', dest='daemon', default=False,
456                       help='run daemonized')
457
458    parser.add_option("-e", "--delay", dest="delay", type="int",
459         help="delay between calculations", metavar="seconds", default=DEFAULT_DELAY)
460
461    parser.add_option("-t", "--threshold", dest="threshold", type="int",
462         help="threshold in .1 second units", metavar="tenths-of-seconds", default=DEFAULT_THRESHOLD)
463
464    parser.add_option("-r", "--reportinterval", dest="reportinterval", type="int",
465         help="interval to generate reports", metavar="seconds", default=3600)
466
467    parser.disable_interspersed_args()
468
469    return parser
470
471 # stolen from nodemanager
472 def daemon():
473     """Daemonize the current process."""
474     if os.fork() != 0: os._exit(0)
475     os.setsid()
476     if os.fork() != 0: os._exit(0)
477     os.chdir('/')
478     os.umask(0022)
479     devnull = os.open(os.devnull, os.O_RDWR)
480     os.dup2(devnull, 0)
481     # xxx fixme - this is just to make sure that nothing gets stupidly lost - should use devnull
482     try:
483         crashlog = os.open('/var/log/gackswatcher.daemon', os.O_RDWR | os.O_APPEND | os.O_CREAT, 0644)
484         os.dup2(crashlog, 1)
485         os.dup2(crashlog, 2)
486     except:
487         print "failed to create crashlog file"
488
489 def main():
490     parser = create_parser()
491     (opts, args) = parser.parse_args()
492
493     if (len(args)==0):
494         print "specify a command: dumpworking, chargetest, monitor"
495         sys.exit(-1)
496
497     if opts.daemon:
498         print "daemonizing"
499         daemon()
500
501     cmd = args[0]
502
503     if cmd == "dumpworking":
504         dump_working(opts)
505     elif cmd == "chargetest":
506         chargetest(opts)
507     elif cmd == "monitor":
508         monitor(opts)
509     else:
510         print "unknown command", cmd
511
512 if __name__=="__main__":
513    main()
514
515