[raven.git] / 2.0 / plush / storkplushd
#! /usr/bin/env python

# Jude Nelson
# Parts copied from pacmand
#
# Synchronizes repository metafile
#
#  [option, long option, variable, action, data, default, metavar, description]
"""arizonaconfig
    options=[
            ["-C", "--configfile", "configfile", "store", "string", "/usr/local/stork/etc/stork.conf", "FILE", "use a different config file (/usr/local/stork/etc/stork.conf is the default)"],
            ["", "--push-interval", "push_interval", "store", "int", 10, None, "Interval (in seconds) between each metafile push."],
            ["", "--check-interval", "check_interval", "store", "int", 5, None, "Interval (in seconds) between checking PLuSH process status."],
            ["", "--plush-port", "plush_port", "store", "int", 4000, None, "Port on which to open PLuSH."],
            ["", "--sync", "do_sync", "store_true", None, False, None, "do not daemonize"],
            ["", "--nest-list", "nest_list", "store", "string", "arizona_nest@planetlab1.arizona-gigapop.net", "nest_list", "Comma-separated list of nest slices to which to send metadata, each in the form of slice-name@planetlab-host"],
            ["", "--planetlab-user", "planetlab_user", "store", "string", "jnelson@email.arizona.edu", "planetlab_user", "PlanetLab user account for PLuSH to use to access the nests (NOTE: you need to set this up separately)"],
            ["", "--metafile-path", "metafile_path", "store", "string", "metafile", "metafile_path", "Path to the metafile to distribute to each Stork nest"],
            ["", "--plush-cwd", "plush_cwd", "store", "string", "/usr/local/bin/plush", "plush_cwd", "PLuSH installation directory"],
            ["", "--plush-command", "plush_command", "store", "string", "/usr/bin/plush", "plush_command", "The command that executes PLuSH"],
            ["", "--remote-path", "remote_path", "store", "string", None, "remote_path", "Path on each nest to store the metafile"],
            ["", "--client-slices", "client_slices", "store", "string", None, "client_slices", "Comma-separated list of client slices that will talk to the nest slice"],
            ["", "--nodes-per-thread", "nodes_per_thread", "store", "int", 50, "nodes_per_thread", "Number of nodes to assign to one data-pushing thread"],
            ["", "--metafile-url", "metafile_url", "store", "string", "http://quake2.cs.arizona.edu/packageinfo/metafile", "metafile_url", "URL to metafile as seen by remote hosts"],
            ["", "--nest-metafile-url", "nest_metafile_url", "store", "string", "http://localhost:6648/quake2.cs.arizona.edu/packageinfo/metafile", "nest_metafile_url", "URL to metafile on nest as seen by each client"],
            ["", "--client-metafile-path", "client_metafile_path", "store", "string", "/usr/local/stork/var/packageinfo/quake2.cs.arizona.edu/packageinfo/metafile", "client_metafile_path", "Path to the metafile on the client slice"]
            ]
     includes=[]
"""

import sys,os,signal,time
#should be in the same directory as the other scripts when used...
#sys.path += ["../python/refactor"]
sys.path += ["/usr/local/stork/bin"]
import arizonaconfig, arizonareport, arizonacomm
import arizonageneral
import threading
import thread
import xmlrpclib

# default values
glo_push_interval = 10
check_interval = 5
plush_port = 4000
plush_xmlrpc_port = 4001 # == plush_port + 1, as determined by the PLuSH source code

# names of files PLuSH expects to be in the same working directory as the executable
allsite_name = "allsites.xml"
experiment_name = "get-update.xml"
directory_name = "directory.xml"
helper_scripts_name = "helper-scripts"
metafile_name = "metafile.tar"

# change this as needed...
planetlab_user = "jnelson@email.arizona.edu"

# hash table of nest status
glo_nest_connection_status = {}

# global debug flag
_debug = True

# debug macro
def debug(msg):
   if _debug == True:
      print "DEBUG: " + msg



# verify a nest connection by searching the plush logfile for a successful connection.
# hostname is the planetlab slice@hostname:portnum string
# logfile path should refer to plush_logfile.txt (wherever it is).
# returns the last line number in the logfile where the verification is found; returns -1 if not found.
def verify_connection( hostname, logfile_path ):
   logfile = open( logfile_path, "r" )
   linecount = 0
   ret = -1
   for line in logfile.readlines():
      # look for "Connected to $hostname"
      if line.find( "Connected to " + hostname ) != -1:
         ret = linecount

      linecount = linecount + 1
   #for

   logfile.close()
   return ret
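
# Example (hypothetical log line): if plush-logfile.txt contained
#    Connected to arizona_nest@planetlab1.arizona-gigapop.net
# at (zero-indexed) line 37, verify_connection() would return 37; the last match wins.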



# get the connection status of a nest.
# returns 0 if the nest is not connected.
# returns a positive line number (from the plush logfile) if the nest is connected.
def get_connection_status( nest ):
   global glo_nest_connection_status

   if nest not in glo_nest_connection_status:
      glo_nest_connection_status[nest] = 0
      return 0

   return glo_nest_connection_status[nest]


# set the connection status of a nest
# status is 0 to indicate no connection
# status is positive to indicate connection
# (status is usually the line number in the plush log file
# where plush indicates it has connected to the nest)
def set_connection_status( nest, status ):
   global glo_nest_connection_status

   glo_nest_connection_status[nest] = status


# invalidate all connections, forcing all nests to reconnect
def invalidate_all_connections():
   global glo_nest_connection_status

   glo_nest_connection_status = {}



# have plush connect to a nest, given a plush xmlrpc.Server instance, the nest name, and
# the path to the plush logfile.  nest has the format of slice_name@node_name:port_num
# Return true if the nest is successfully connected; false if not
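# (e.g. the default nest "arizona_nest@planetlab1.arizona-gigapop.net", optionally
# suffixed with ":port_num")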
def plush_connect( plush_server, nest, attempt_count, logfile_path, timeout, plush_info ):
   client_list = plush_info['client_list']

   if get_connection_status( nest ) != 0:
      # we're already connected
      return True

   while attempt_count > 0:
      print "[storkplushd] attempting to connect to " + nest
      plush_server.plush.connectHost( nest )
      time.sleep( timeout )

      result = verify_connection( nest, logfile_path )

      # result is the line number in the logfile where the nest was indicated to have connected last.
      # Higher result values mean later connections.  Since we store connection status as the line number
      # in the logfile where the node was last connected, a change in the line number indicates a successful
      # connection.
      if result > get_connection_status( nest ):
         # connection successful!
         set_connection_status( nest, result )
         print "[storkplushd]: " + nest + " connection registered"

         # start up storksyncd on the remote nest, such that it monitors the file we'll be pushing to it
         os.system("scp /usr/local/stork/bin/storksyncd " + nest.split(":")[0] + ":~/")
         plush_server.plush.runCommandVec("python ~/storksyncd --poll-metafile=" + plush_info['remote_path'] + " --poll-timeout=" + str(2*glo_push_interval) + " --metafile-url=" + plush_info['metafile_url'] + " --sync &", nest)

         # get hostname of this nest
         nest_hostname = nest.split("@")[1].split(":")[0]

         # start up storksyncd on each nest client slice
         for client in client_list:
            print "[storkplushd]: Starting storksyncd on " + client + "@" + nest_hostname
            os.system("scp /usr/local/stork/bin/storksyncd " + client + "@" + nest_hostname + ":~/")
            os.system("ssh " + client + "@" + nest_hostname + " python /home/" + client + "/storksyncd --poll-metafile=" + plush_info['client_metafile_path'] + " --poll-timeout=" + str(4*glo_push_interval) + " --metafile-url=" + plush_info['nest_metafile_url'])

         print "[storkplushd]: started storksyncd on " + nest.split(":")[0]
         return True

      attempt_count = attempt_count - 1
      if attempt_count > 0:
         print "[storkplushd]: no luck connecting to " + nest + ", retrying..."

   print "[storkplushd]: giving up on " + nest
   return False



# start the PLuSH controller
def start_plush( plush_dir, plush_command, plush_portnum, nest_list, logfile_path ):
   invalidate_all_connections()

   # start the process.
   # NOTE: plush itself listens on the global plush_port; plush_portnum is the
   # XML-RPC port (plush_port + 1, as determined by the PLuSH source code).
   debug("plush command: " + plush_dir + "/" + "plush -P " + str(plush_port) )

   current_dir = os.getcwd()
   # os.chdir( plush_dir )
   # os.system( "plush -P " + str(plush_port) + " &")
   # os.chdir( current_dir )
   #plush_thread = PLuSH_thread( "cd " + plush_dir + " && ./plush -P " + str(plush_port) + " && cd -")
   plush_thread = PLuSH_thread( plush_dir + "/plush -P " + str(plush_port) )

   print "[storkplushd]: PLuSH started; giving it some time to discover nest slices..."

   # give plush some time to start up
   time.sleep(20)

   # open a connection to PLuSH
   plush_server = xmlrpclib.Server("http://localhost:" + str(plush_portnum) + "/")
   debug("connecting to XMLRPC PLuSH server at http://localhost:" + str(plush_portnum) + "/")
   debug("Test connection: " + plush_server.plush.test() )

   print "[storkplushd]: Initialized PLuSH"


# is PLuSH running?
def is_plush_running():

   cmd = os.popen("ps aux")
   proc_list = cmd.readlines()
   running = False

   cmd.close()

   for proc in proc_list:
      plush_proc = proc.find("plush -P")
      if plush_proc >= 0:
         running = True
         break

   return running
# end


# simple method to write a file, given a string (used to create PLuSH XML files)
def make_file( filename, data_str ):
   try:
      file = open( filename, "w" )
      file.write( data_str )
      file.close()
      return True
   except:
      print "[storkplushd]: could not create file " + filename
      return False


# thread that forks and executes PLuSH itself
# (for some reason, invoking "plush -P 4000 &" from os.system() or the shell
# causes PLuSH to die a few seconds later)
class PLuSH_thread(threading.Thread):

   # just give the thread the PLuSH command to start
   def __init__(self, plush_command):
      threading.Thread.__init__(self)
      self.plush_command = plush_command

      if is_plush_running() == False:
         self.start()
   #end

   # on running, execute the command and sit idly
   def run(self):
      os.system( self.plush_command )
      while True:
         time.sleep(10)
         if is_plush_running() == False:
            invalidate_all_connections()
            break   # die if PLuSH dies
      #end
   #end
#end


# generate a function call (as a string to be exec'd) with a variable number of arguments.
# each argument is quoted with repr() so that the generated call is valid Python.
def gen_call( method_name, arg_list ):

   arg_str = ''
   for arg in arg_list:
      arg_str = arg_str + "," + repr(arg)

   arg_str = arg_str[1:]

   return method_name + "(" + arg_str + ")"
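
# e.g. gen_call("plush_server.plush.runCommandVec", ["ls /tmp", "slice@host"]) returns
# the string: plush_server.plush.runCommandVec('ls /tmp','slice@host')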

# push data to a nest, given an xmlrpc.Server instance initialized to talk to plush, a
# comma-separated list of nests, a list of ASCII strings to put into the remote file, and
# a path to the remote file.
# return true on success, false on failure
def push_data( plush_server, nests, lines, file_path_remote):

   # can't do anything if plush isn't running
   if is_plush_running() == False:
      return False

   debug("push_data: pushing data to " + nests + ":" + file_path_remote)

   tmp_buff_file = "/tmp/.b"

   # try to clear out the file (if it exists)
   try:
      cmd1 = gen_call( "plush_server.plush.runCommandVec", ["test -f " + file_path_remote + " && sudo rm -rf " + file_path_remote] + nests.split(',') )
      cmd2 = gen_call( "plush_server.plush.runCommandVec", ["test -f " + tmp_buff_file + " && sudo rm -rf " + tmp_buff_file] + nests.split(',') )
      exec( cmd1 )
      exec( cmd2 )

   except:
      print "[storkplushd]: could not send file to " + nests
      return False

   # echo the lines to the file remotely
   line_count = 0
   for line in lines:
      line = line.rstrip("\n")
      try:
         cmd = gen_call( "plush_server.plush.runCommandVec", ["sudo echo " + line + " >> " + tmp_buff_file] + nests.split(',') )
         exec( cmd )
         line_count = line_count + 1
      except:
         print "[storkplushd]: error in transmitting file data to " + file_path_remote + " on nests " + nests
         return False

   try:
      cmd = gen_call( "plush_server.plush.runCommandVec", ["sudo mv " + tmp_buff_file + " " + file_path_remote] + nests.split(',') )
      exec( cmd )
   except:
      print "[storkplushd]: error in updating file " + file_path_remote + " on nest " + nests
      return False

   print "[storkplushd]: sent " + str(line_count) + " lines to " + nests
   return True
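
# The push happens in three remote steps on each listed nest: remove any stale copy of the
# destination file, append each metafile line to a temporary buffer (/tmp/.b), then mv the
# buffer into place.  Illustrative call (the pid suffix on the remote path is a placeholder):
#
#    push_data( plush_server, "arizona_nest@planetlab1.arizona-gigapop.net",
#               open("metafile").readlines(), "/tmp/metafile.1234" )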




# Manage node connections--try to connect to every nest
class ConnectionManager(threading.Thread):

   # start up
   def __init__(self, nest_list, plush_info, verboseness):
      threading.Thread.__init__(self)
      self.verboseness = verboseness
      self.plush_info = plush_info
      self.plush_server = xmlrpclib.Server( plush_info['plush_url'] )
      self.nests = nest_list
      self.nests_csv = ''
      for nest in nest_list:
         self.nests_csv = self.nests_csv + ',' + nest

      self.nests_csv = self.nests_csv[1:]

      self.start()

   # continuously try to talk to nests
   def run(self):
      while True:
         for nest in self.nests:
            if get_connection_status( nest ) == 0:
               try:
                  plush_connect( self.plush_server, nest, 1, self.plush_info['plush_logfile_path'], 20, self.plush_info)
                  # time.sleep(3)  # give a slight delay so we don't run out of socket file descriptors for large numbers of nests
               except:
                  print "[storkplushd] got exception when connecting to " + nest + ", so I'll come back to it later"
                  self.plush_server = xmlrpclib.Server( self.plush_info['plush_url'] )
                  pass

         # don't spin when every nest is already connected
         time.sleep(1)


# Continuously push out metadata to each connected nest
class DataPusher(threading.Thread):

   # start up
   def __init__(self, timeout, nests, plush_info, verboseness):
      threading.Thread.__init__(self)
      self.verboseness = verboseness
      self.plush_info = plush_info
      self.plush_server = xmlrpclib.Server( plush_info['plush_url'] )
      self.timeout_delta = timeout
      self.nests = nests
      self.timeout = time.time() + self.timeout_delta
      self.start()

   # continuously attempt to push metadata to each connected node when it times out
   def run(self):
      while True:
         # sleep a little each pass so we don't spin between pushes
         time.sleep(1)

         # read the metadata
         data = []
         try:
            metadata_file = open(self.plush_info['metafile_path'], "r")
            data = metadata_file.readlines()
            metadata_file.close()
         except:
            print "[storkplushd] could not read from " + self.plush_info['metafile_path']
            continue

         if time.time() > self.timeout:
            self.timeout = time.time() + self.timeout_delta
            push_data( self.plush_server, ",".join(self.nests), data, self.plush_info['remote_path'] )

#         for nest in self.nests:
#            if get_connection_status( nest ) == 0:
#               continue  # don't worry about non-connected nests--that's not our job
#
#            if time.time() > self.timeouts[nest]:
#               # we've timed out--send this nest the metadata
#               self.timeouts[nest] = self.timeout_delta + time.time()
#               push_data( self.plush_server, nest, data, self.plush_info['remote_path'] )


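# Each DataPusher owns one partition of the nest list and re-sends the metafile to every
# nest in its partition once per --push-interval seconds (10 by default).
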
# check the PLuSH process every so often and re-spawn it as needed
class PLuSH_Poller(threading.Thread):

   def __init__(self, timeout, plush_info, verboseness):
      """
      <Purpose>
         Initialize the thread to begin monitoring the PLuSH process
      """
      threading.Thread.__init__(self)
      self.timeout = time.time() + timeout    # poll every few seconds
      self.timeout_delta = timeout
      self.plush_info = plush_info
      self.verboseness = verboseness

      print "[storkplushd]: will check PLuSH process every " + str(timeout) + " seconds."
      self.start()

   def run(self):
      """
      <Purpose>
         Check to see if PLuSH is still running every so often.
      """
      while True:
         # are we expired?
         if self.timeout < time.time():
            # new timeout...
            self.timeout = time.time() + self.timeout_delta

            # if it's not running, then spawn it
            if is_plush_running() == False:
               start_plush( self.plush_info['plush_dir'], self.plush_info['plush_command'], self.plush_info['plush_portnum'], self.plush_info['nest_list'], self.plush_info['plush_logfile_path'] )

         else:
            # sleep some to save CPU
            time.sleep( self.timeout_delta )
         # end
      # end
   # end
# end

def handler_sighup(signum, frame):
    """
    <Purpose>
       Intercepts the "hangup" signal, but doesn't do anything.
       Simply causes the sleep to return.
    """
    pass


def Main():
    global sync
    global verbose
    global pacman_update_event
    global plush_command
    global plush_port
    global glo_push_interval
    global check_interval

    args = arizonaconfig.init_options("storkplushd", version="2.0", configfile_optvar="configfile")

    loc_push_interval = arizonaconfig.get_option("push_interval")
    loc_check_interval = arizonaconfig.get_option("check_interval")
    # (experiment_path, nest_directory_path, and helper_scripts_path are not declared in
    #  this file's arizonaconfig block)
    loc_experiment_path = arizonaconfig.get_option("experiment_path")
    loc_nest_directory_path = arizonaconfig.get_option("nest_directory_path")
    loc_plush_port = arizonaconfig.get_option("plush_port")

    loc_scripts_path = arizonaconfig.get_option("helper_scripts_path")
    loc_metafile_path = arizonaconfig.get_option("metafile_path")
    loc_plush_cwd = arizonaconfig.get_option("plush_cwd")
    loc_plush_command = arizonaconfig.get_option("plush_command")
    loc_nodes_per_thread = arizonaconfig.get_option("nodes_per_thread")
    loc_client_slices = arizonaconfig.get_option("client_slices")
    loc_nest_metafile_url = arizonaconfig.get_option("nest_metafile_url")
    loc_metafile_url = arizonaconfig.get_option("metafile_url")
    loc_client_metafile_path = arizonaconfig.get_option("client_metafile_path")

    loc_remote_path = arizonaconfig.get_option("remote_path")

    do_sync = arizonaconfig.get_option("do_sync")
    verbose = arizonaconfig.get_option("verbose")
    nest_list = arizonaconfig.get_option("nest_list")
    loc_planetlab_user = arizonaconfig.get_option("planetlab_user")

    # fall back to the module-level defaults for anything left unset
    if loc_push_interval == None:
       loc_push_interval = glo_push_interval
    glo_push_interval = loc_push_interval

    if loc_check_interval == None:
       loc_check_interval = check_interval

    if loc_plush_port == None:
       loc_plush_port = plush_port
    plush_port = loc_plush_port   # keep the global in sync; start_plush() uses it for "plush -P"

    if loc_planetlab_user == None:
       loc_planetlab_user = planetlab_user

    if loc_remote_path == None:
       loc_remote_path = "/tmp/metafile." + str(os.getpid())

    loc_plush_url = "http://localhost:" + str(loc_plush_port + 1) + "/"

    # calculate a list of nests
    nest_array_tmp = nest_list.split(",")
    nest_array = []
    for nest in nest_array_tmp:
       nest = nest.strip()
       nest_array.append( nest )

    # calculate list of client slices (there may be none)
    client_array = []
    if loc_client_slices != None:
       for client in loc_client_slices.split(","):
          client_array.append( client.strip() )

    # we haven't connected to anyone yet, so zero out our connection list
    for nest in nest_array:
       glo_nest_connection_status[nest] = 0

    # accumulate PLuSH info into a dictionary
    plush_info = {
         'metafile_path':loc_metafile_path,
         'experiment_path':loc_experiment_path,
         'plush_url':loc_plush_url,
         'plush_dir':loc_plush_cwd,
         'plush_portnum':loc_plush_port + 1,
         'plush_command':loc_plush_command,
         'nest_list':nest_array,
         'plush_logfile_path':"plush-logfile.txt",
         'remote_path':loc_remote_path,
         'client_list':client_array,
         'nest_metafile_url':loc_nest_metafile_url,
         'metafile_url':loc_metafile_url,
         'client_metafile_path':loc_client_metafile_path
    }
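
    # With the default options, plush_url works out to "http://localhost:4001/" (the PLuSH
    # XML-RPC port, i.e. --plush-port + 1) and remote_path to "/tmp/metafile.<pid>".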

    # set the hangup signal handler
    signal.signal(signal.SIGHUP, handler_sighup)

    if do_sync == False:
       # run as a daemon
       arizonageneral.make_daemon("storkplushd")

    # start PLuSH running!
    start_plush( plush_info['plush_dir'], plush_info['plush_command'], plush_info['plush_portnum'], plush_info['nest_list'], plush_info['plush_logfile_path'] )

    # start monitoring plush
    plush_mon = PLuSH_Poller( loc_check_interval, plush_info, 1 )

    # start managing connections (named so as not to shadow the plush_connect() function)
    plush_conn_mgr = ConnectionManager( nest_array, plush_info, 1 )

    # partition the nest array round-robin and give each partition its own DataPusher
    num_partitions = int(len(nest_array) / loc_nodes_per_thread) + 1
    nest_partitions = []
    for i in range(0, num_partitions):
       nest_partitions.append([])

    curr_partition = 0
    for nest in nest_array:
       nest_partitions[curr_partition].append(nest)
       curr_partition = (curr_partition + 1) % num_partitions

    for partition in nest_partitions:
       pusher = DataPusher( loc_push_interval, partition, plush_info, 1 )

    print "[storkplushd]: Started " + str(num_partitions) + " threads to push data"
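
    # e.g. 120 nests with --nodes-per-thread=50 gives int(120/50)+1 = 3 partitions of
    # 40 nests each, i.e. 3 DataPusher threads (plus the poller and connection manager).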

# older partitioning logic, kept disabled inside this string for reference:
"""
    # start pushing metadata
    # Partition the nest array into lists with loc_nodes_per_thread threads per
    # connection manager, and supply two DataPusher instances for each partition.
    push_nest_count = 0
    connect_nest_count = 0
    push_nest_partition = []
    connect_nest_partition = []
    push_thread_count = 0
    connect_thread_count = 0
    for nest in nest_array:
       if connect_nest_count < loc_nodes_per_thread:
          connect_nest_partition.append(nest)
          connect_nest_count = connect_nest_count + 1

       if push_nest_count < 0.5 * loc_nodes_per_thread:
          push_nest_partition.append(nest)
          push_nest_count = push_nest_count + 1

       if push_nest_count >= 0.5 * loc_nodes_per_thread:
          plush_data_pusher = DataPusher( loc_push_interval, push_nest_partition, plush_info, 1 )
          push_nest_count = 0
          push_nest_partition = []
          push_thread_count = push_thread_count + 1

       if connect_nest_count >= loc_nodes_per_thread:
          plush_connection_manager = ConnectionManager( connect_nest_partition, plush_info, 1 )
          connect_nest_count = 0
          connect_nest_partition = []

    plush_data_pusher = DataPusher( loc_push_interval, push_nest_partition, plush_info, 1 )
    plush_connection_manager = ConnectionManager( connect_nest_partition, plush_info, 1 )

    push_thread_count = push_thread_count + 1
    connect_thread_count = connect_thread_count + 1

    print "[storkplushd]: started " + str(connect_thread_count) + " thread(s) for connection management; " + str(push_thread_count) + " for pushing data."
"""
# wait to die


if __name__ == "__main__":
    Main()