report any expired sites & nodes
[monitor.git] / automate / vxargs.py
1 #!/usr/bin/env python
2
3 # DHARMA Project
4 # Copyright (C) 2003-2004 Yun Mao, University of Pennsylvania
5
6 # This library is free software; you can redistribute it and/or
7 # modify it under the terms of version 2.1 of the GNU Lesser General Public
8 # License as published by the Free Software Foundation.
9
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18
19
20 """
21 vxargs: Visualized xargs with redirected output
22
23 """
24 version = "0.3.1"
25 import os, sys, time, signal, errno
26 import curses, random
27 import getopt
28
29 update_rate = 1
30
31 final_stats = {}
32 gsl = None
33 stopping = 0
34
35 def getListFromFile(f):
36     """I'll ignore the line starting with #
37
38     @param f: file object of the host list file
39     @return: a list of hostnames (or IPs)
40     """
41     hostlist = []
42     for line in f:
43         if line[0]!='#':
44             if line.strip():
45                 hostlist.append([line.strip(),''])
46         elif hostlist and hostlist[-1][1]=='':
47             hostlist[-1][1] = line.strip()[1:]
48     return hostlist
49
50 def get_last_line(fn):
51     #equ to tail -n1 fn
52     try:
53         lines = open(fn,'r').readlines()
54         if len(lines)>0:
55             return (0, lines[-1].strip())
56     except IOError:
57         pass
58     return (1,'')
59
60 class Slot:
61     def __init__(self, outdir, num, screen, timeout, name, count):
62         self.outdir = outdir
63         self.slotnum = num
64         self.screen = screen
65         self.comment = ""
66         self.startTime = time.time()
67         self.timeout = timeout
68         self.name = name
69         self.count = count
70
71     def drawLine(self, comment='', done = False):
72         if self.screen is None: return
73         if comment == '':
74             comment = self.comment
75         else:
76             self.comment = comment
77         stdscr = self.screen
78         elapsed = time.time()-self.startTime
79         try:
80             y,x = stdscr.getmaxyx()
81             spaces = ' '*x
82             stdscr.addstr(self.slotnum+2, 0, spaces) #title occupies two lines
83             if done:
84                 stdscr.addstr(self.slotnum+2,0, comment[:x])
85             else:
86                 #construct the string
87                 output = "(%3ds)%3d: %s " % ( round(elapsed), self.count, self.name )
88                 spaceleft = x - len(output)
89                 if self.outdir and spaceleft>1:
90                      outfn = os.path.join(self.outdir, '%s.out' % self.name)
91                      errfn = os.path.join(self.outdir, '%s.err' % self.name)
92                      lout = get_last_line(outfn)
93                      lerr = get_last_line(errfn)
94                      if lerr[0]==0 and lerr[1]:
95                          output += lerr[1]
96                      elif lout[0]==0 and lout[1]:
97                          output += lout[1]
98                      else:
99                          output += comment
100                 else:
101                     output += comment
102                 stdscr.addstr(self.slotnum+2, 0, output[:x] )
103             stdscr.refresh()
104         except curses.error: #some of them will be out of screen, ignore it
105             pass
106     def update(self, pid):
107         self.drawLine()
108         if self.timeout >0:
109             self.kill(pid)
110
111     def kill(self, pid):
112         overtime = time.time()-self.startTime - self.timeout
113         try:
114             if overtime > 3: #expired more than 3 seconds, send -9
115                 os.kill(-pid, signal.SIGKILL)
116             elif overtime > 2: #expired more than 2 seconds, send -15
117                 os.kill(-pid, signal.SIGTERM)
118             elif overtime >= 0:
119                 os.kill(-pid, signal.SIGINT)
120         except OSError, e:
121             if e.errno != errno.ESRCH: # No such process
122                 raise e
123
124     def stop(self, pid):
125         """stop current pid b/c we caught SIGINT twice
126         """
127         self.startTime = time.time() - self.timeout 
128         self.kill(pid)
129     
130 class Slots:
131     pids = {}
132     def __init__(self, max, screen, timeout, outdir):
133         self.maxChild = max
134         self.slots = range(self.maxChild)
135         self.screen = screen
136         self.t = timeout
137         self.outdir = outdir
138         
139     def getSlot(self, name, count):
140         if not self.slots:
141             #it's empty, wait until other jobs finish
142             slot =  self.waitJobs().slotnum
143         else:
144             slot = self.slots[0]
145             self.slots.remove(slot)
146         return Slot(self.outdir, slot, self.screen, self.t, name, count)
147     
148     def mapPID(self, pid, slot):
149         """@param slot: slot object
150         """
151         self.pids[pid] = slot
152
153     def waitJobs(self):
154         while 1:
155             try:
156                 pid, status = os.wait()
157                 break
158             except OSError, e:
159                 if e.errno == errno.ECHILD: #no child processes
160                     raise RuntimeError('no child processes when waiting')
161         slot = self.pids[pid]
162         if self.outdir:
163             open(os.path.join(self.outdir, '%s.status' % slot.name),'w').write('%d' % (status>>8))
164             if (status & 0xFF) !=0:
165                 open(os.path.join(self.outdir, 'killed_list'),'a').write('%s\n' % (slot.name))
166             if status >>8:
167                 open(os.path.join(self.outdir, 'abnormal_list'),'a').write('%s\n' % (slot.name))
168         del self.pids[pid]
169         s = status >> 8
170         if final_stats.has_key(s):
171             final_stats[s]+= 1
172         else:
173             final_stats[s]=1
174         return slot
175     def update(self):
176         for k,v in self.pids.items():
177             v.update(k)
178     def timeout(self):
179         self.update()
180         signal.alarm(update_rate)
181         
182     def drawTitle(self, stuff):
183         if self.screen:
184             y,x = self.screen.getmaxyx()
185             spaces = ' '*(x*2)
186             self.screen.addstr(0,0,  spaces)
187             self.screen.addstr(0,0, stuff[:x*2])
188             self.screen.refresh()
189         else:
190             print stuff
191     def stop(self):
192         if stopping ==1:
193             msg = 'Stopping -- Waiting current jobs done. Press Ctrl-C again to kill current jobs.'
194         else:
195             msg = 'Stopping -- Killing current jobs'
196         self.drawTitle(msg)
197         if stopping >1:
198             for k,v in self.pids.items():
199                 v.stop(k)
200         return
201
202 def handler(signum, frame_unused):
203     global gsl
204     if signum==signal.SIGALRM:
205         gsl.timeout()
206     if signum==signal.SIGINT:
207         global stopping
208         stopping += 1
209         gsl.stop()
210
211 def generateCommands(cmd_line, args):
212     return [per_arg.replace('{}', args[0]) for per_arg in cmd_line]
213         
214 def spawn(cmdline, outfn, errfn, setpgrp = False):
215    """A cleverer spawn that lets you redirect stdout and stderr to
216    outfn and errfn.  Returns pid of child.
217    You can't do this with os.spawn, sadly.
218    """
219    pid = os.fork()
220    if pid==0: #child
221        out = open(outfn, 'w')
222        os.dup2(out.fileno() ,sys.stdout.fileno())
223        err = open(errfn, 'w')
224        os.dup2(err.fileno(), sys.stderr.fileno())
225        if setpgrp:
226            os.setpgrp()
227        try:
228            os.execvp(cmdline[0], cmdline)
229        except OSError,e:
230            print >> sys.stderr, "error before execution:",e
231            sys.exit(255)
232    #father process
233    return pid
234
235 def start(win, max_child, hlist, outdir, randomize, command_line, timeout):
236
237     total = len(hlist)
238
239     if randomize:
240         random.shuffle(hlist)
241
242     signal.signal(signal.SIGALRM, handler)
243     signal.signal(signal.SIGINT, handler)
244     signal.alarm(update_rate)
245
246     sl = Slots(max_child, win, timeout, outdir)
247     global gsl
248     global stopping
249     gsl = sl
250     count = 0
251     for i in hlist:
252         slot = sl.getSlot(i[0], count)
253         if stopping>0:
254             slot.drawLine('Done', done=True)
255             break
256             
257         count += 1
258         slot.drawLine(i[1])
259         x = generateCommands(command_line, i)
260         
261         sl.drawTitle("%d/%d:%s" %(count, total,' '.join(x)))
262         
263         outpath = '/dev/null'
264         errpath = '/dev/null'
265         if outdir:
266             outpath = os.path.join(outdir, '%s.out'%i[0])
267             errpath = os.path.join(outdir, '%s.err'%i[0])
268
269         pid = spawn(x, outpath, errpath, setpgrp = True)
270         sl.mapPID(pid, slot)
271
272     while sl.pids:
273         try:
274             slot = sl.waitJobs()
275         except RuntimeError:
276             print >> sys.stderr, 'Warning: lost tracking of %d jobs' % len(sl.pids)
277             return
278         slot.drawLine('Done', done = True) #Done
279
280 def get_output(outdir, argument_list, out= True, err=False, status=False):
281     """
282
283     For post processing the output dir.
284
285     @param out: decide whether to process *.out files
286     @param err: decide whether to process *.err files
287     @param status: decide whether to process *.status files
288     
289     @return: (out, err, status): out is a hash table, in which the
290     keys are the arguments, and the values are the string of the
291     output, if available. err is similar. the values of hash table
292     status is the value of exit status in int.
293     
294     """
295     if not out and not err and not status:
296         raise RuntimeError("one of out, err and status has to be True")
297     
298     result = ({},{},{})
299     mapping = ('out','err','status')
300     p = []
301     if out: p.append(0)
302     if err: p.append(1)
303     if status: p.append(2)
304     for arg in argument_list:
305         basefn = os.path.join(outdir, arg)
306         for i in p:
307             fn = '.'.join( (basefn, mapping[i]) ) #basefn.ext
308             try:
309                 lines = open(fn).readlines()
310                 result[i][arg]=''.join(lines)
311             except IOError:
312                 pass
313     if not status: return result
314     int_status = {}
315     for k,v in result[2].items():
316         try:
317             int_status[k] = int(v.strip())
318         except ValueError:
319             pass
320     return result[0], result[1], int_status
321
322 def main():
323     options = 'hP:ra:o:yt:pn'
324     long_opts = ['help','max-procs=','randomize','args=','output=','noprompt','timeout=','plain', 'version','no-exec']
325     try:
326         opts,args = getopt.getopt(sys.argv[1:], options,long_opts)
327     except getopt.GetoptError:
328         print "Unknown options"
329         usage()
330         sys.exit(1)
331     #set default values
332     ask_prompt = True
333     maxchild = 30
334     randomize = False
335     hostfile = sys.stdin
336     outdir = ''
337     timeout = 0
338     plain = False
339     no_exec = False
340     if os.environ.has_key('VXARGS_OUTDIR'):
341         outdir = os.environ['VXARGS_OUTDIR']
342     for o,a in opts:
343         if o in ['--version']:
344             print "vxargs version",version
345             print "Copyright (c) 2004 Yun Mao (maoy@cis.upenn.edu)"
346             print "Freely distributed under GNU LGPL License"
347             sys.exit(1)
348         elif o in ['-h','--help']:
349             usage()
350             sys.exit(1)
351         elif o in ['-r','--randomize']:
352             randomize = True
353         elif o in ['-P','--max-procs']:
354             maxchild = int(a)
355         elif o in ['-a','--args']:
356             try:
357                 hostfile = open(a,'r')
358             except IOError, e:
359                 print "argument file %s has error: %s" % ( a, str(e) )
360                 sys.exit(3)
361         elif o in ['-o','--output']:
362             outdir = a
363             if a =='/dev/null': outdir = ''
364         elif o in ['-y','--noprompt']:
365             ask_prompt = False
366         elif o in ['-t','--timeout']:
367             timeout = int(a)
368         elif o in ['-p','--plain']:
369             plain = True
370         elif o in ['-n','--no-exec']:
371             no_exec = True
372         else:
373             print 'Unknown options'
374             usage()
375             sys.exit(1)
376     if len(args)<1:
377         print "No command given."
378         usage()
379         sys.exit(1)
380     #now test outdir
381     if outdir:
382         if os.path.exists(outdir):
383             if not os.path.isdir(outdir):
384                 print "%s exists and is not a dir, won't continue" % outdir
385                 sys.exit(3)
386             elif no_exec:
387                 print "%s is the destination dir and would be destroyed." % (outdir)
388             elif ask_prompt:
389                 if hostfile == sys.stdin:
390                     print "You must specify --noprompt (-y) option if no --args (-a) or --no-exec (-n) is given. Doing so will destroy folder %s." % (outdir)
391                     sys.exit(3)
392                 else:
393                     result = raw_input("%s exists. Continue will destroy everything in it. Are you sure? (y/n) " % (outdir))
394                     if result not in ['y','Y']:
395                         sys.exit(3)
396             os.system('rm -f %s' % (os.path.join(outdir,'*')))
397         else:
398             if not no_exec:
399                 os.system('mkdir -p %s' % outdir)
400     
401     hlist = getListFromFile(hostfile)
402     if no_exec:
403         for i in hlist:
404             real_cmdline = generateCommands(args, i)
405             print ' '.join(real_cmdline)
406         sys.exit(0)
407         
408     if plain: # no fancy output
409         return start(None, maxchild, hlist, outdir, randomize, args, timeout)
410     else:
411         # use fancy curses-based animation
412         try:
413             curses.wrapper(start, maxchild, hlist, outdir, randomize, args, timeout)
414         except curses.error:
415             sys.exit(4)
416     #post execution, output some stats
417     total = 0
418     for k,v in final_stats.items():
419         print "exit code %d: %d job(s)" % (k,v)
420         total += v
421     print "total number of jobs:", total
422 def usage():
423     print """\
424 NAME
425
426   vxargs - build and execute command lines from an argument list file
427   with visualization and parallelism, and output redirection.
428    
429 DESCRIPTION
430
431   vxargs reads a list of arguments from a txt file or standard input,
432   delimited by newlines, and executes the command one or more times
433   with initial arguments in which {} is substituted by the arguments
434   read from the file or standard input. The current executing commands
435   and progress will be dynamically updated on the screen. Stdout and
436   stderr of each command will be redirected to separate files. A list
437   of all processes with a non-zero exit status is generated in file
438   abnormal_list. A list of all timeout processes is generated in file
439   killed_list.
440   
441 SYNOPSIS
442
443   vxargs [OPTIONS] command [initial-arguments]
444
445 OPTIONS
446
447   --help
448     Print a summary of the options to vxargs and exit.
449
450   --max-procs=max-procs, -P max-procs
451     Run up to max-procs processes at a time; the default is 30.
452
453   --randomize, -r [OPTIONAL]
454     Randomize the host list before all execution.
455
456   --args=filename, -a filename
457     The arguments file. If unspecified, the arguments will be read
458     from standard input, and -y option must be specified.
459     
460   --output=outdir, -o outdir
461     output directory for stdout and stderr files
462     The default value is specified by the environment variable VXARGS_OUTDIR.
463     If it is unspecified, both stdout and stderr will be redirected
464     to /dev/null.
465     Note that if the directory existed before execution, everything
466     inside will be wiped.
467
468   --timeout=timeout, -t timeout
469     The maximal time in second for each command to execute. timeout=0
470     means infinite.  0 (i.e. infinite) is the default value. When the time is up,
471     vxargs will send signal SIGINT to the process. If the process does not
472     stop after 2 seconds, vxargs will send SIGTERM signal, and send SIGKILL
473     if it still keeps running after 3 seconds.
474
475   --noprompt, -y
476     Wipe out the outdir without confirmation.
477
478   --no-exec, -n
479     Print the commands that would be executed, but do not execute them.
480
481   --plain, -p
482     Don't use curses-based output, but plain output to stdout
483     instead. It will be less exciting, but will do the same job
484     effectively. It is useful if one wants to start vxargs from cron
485     or by another program that doesn't want to see the output.
486     By default, vxargs uses the curses-based output.
487
488   --version
489     Display current version and copyright information.
490     
491 EXAMPLES:
492   Suppose the iplist.txt file has following content:
493 $ cat iplist.txt
494 216.165.109.79
495 #planetx.scs.cs.nyu.edu
496 158.130.6.254
497 #planetlab1.cis.upenn.edu
498 158.130.6.253
499 #planetlab2.cis.upenn.edu
500 128.232.103.203
501 #planetlab3.xeno.cl.cam.ac.uk
502
503 Note that lines starting with '#' will be interpreted as comment for
504 the previous lines, which is optional, for visualization purpose only.
505
506 $ vxargs -a iplist.txt -o /tmp/result -P 10 ssh upenn_dharma@{} "hostname;uptime"
507
508 ...[ UI output]...
509
510 $ cat /tmp/result/*
511 planetlab3.xeno.cl.cam.ac.uk
512  03:13:21 up 4 days, 14:36,  0 users,  load average: 0.36, 0.44, 0.44
513 planetlab2.cis.upenn.edu
514  03:13:20  up 26 days, 16:19,  0 users,  load average: 8.11, 7.41, 7.41
515 planetlab1.cis.upenn.edu
516  03:13:19  up 22 days, 20:02,  0 users,  load average: 13.60, 12.55, 12.59
517 ssh: connect to host 216.165.109.79 port 22: Connection timed out
518 $
519
520 other examples:
521 cat iplist.txt | vxargs -o /tmp/result rsync -az -e ssh --delete mirror $SLICE@{}:
522
523 vxargs -a iplist.txt -o /tmp/result ssh {} killall -9 java
524
525 For more information, please visit http://dharma.cis.upenn.edu/planetlab/vxargs/
526 """
527 if __name__=='__main__':
528     main()
529