import repository from arizona
[raven.git] / apps / kong-ui / kong_controller.py
1 #!/usr/bin/env python
2
3 """
4 kong_controller.py
5
6 This file contains the KongController class, responsible for performing tasks
7 requested by the user through the KongUIApplication interface. It handles most
8 if not all tasks asynchronously through the use of the library class KongClient.
9
10 author: Charles Magahern <charles.magahern@arizona.edu>
11 date:   4/12/2011
12 """
13
14 import threading
15 import thread
16 import sys
17 import os
18 import subprocess
19 import string
20
21 import container
22 import kongclient
23
24 (
25     KONG_WORKER_STATUS_IDLE,
26     KONG_WORKER_STATUS_BUSY,
27     KONG_WORKER_STATUS_ABORT
28 ) = range(3)
29
30 # Redirect standard output to accessible objects on creation.
31 class KongOutputBuffer:
32     def __init__(self):
33         self._output_list = []
34         self._output_str  = ""
35     
36     def start(self):
37         sys.stdout = self
38         
39     def end_clean(self):
40         del self._output_list[:]
41         self._output_str = ""
42         sys.stdout = sys.__stdout__
43         
44     def end_flush(self):
45         sys.stdout = sys.__stdout__
46         for item in self._output_list:
47             print item
48         del self._output_list[:]
49         self._output_str = ""
50     
51     def write(self, data):
52         if len(data) > 0 and data[0] != '\n':
53             self._output_list.append(data)
54         self._output_str += data
55         
56     def clear(self):
57         del self._output_list[:]
58         self._output_str = ""
59         
60     def get_contents(self):
61         return self._output_str
62         
63     def get_list_contents(self):
64         return self._output_list
65     
66     @staticmethod
67     def Cleanup():
68         sys.stdout = sys.__stdout__
69
70         
71 class KongController:
72     def __init__(self, wdir=None, delegate=None):
73         self.kong_client     = None                     # KongClient object
74         self.kong_container  = container.container()    # Container object
75         self.working_dir     = None                     # Working directory for the experiment
76         self.delegate        = None                     # Delegate for asynchronous callbacks (UIApplication)
77         self.worker_thread   = None                     # Worker thread for asynchronous fetching operations
78         self.worker_status   = KONG_WORKER_STATUS_IDLE  # Worker thread status (idle, busy, abort)
79         
80         if (wdir == None):
81             self.working_dir = os.getcwd()
82         else:
83             self.working_dir = wdir
84         self.delegate = delegate
85         
86         if (self.check_directory()):    
87             self.kong_container.set_dir(self.working_dir)
88             self.kong_container.load()
89             
90             status_buf = KongOutputBuffer()
91             status_buf.start()
92             kong_dir = self.kong_container.get_kongdir()
93             self.kong_client = kongclient.KongClient(configdir=kong_dir)
94             status = status_buf.get_contents()
95             status_buf.end_clean()
96             
97             if (self.delegate != None):
98                 self.delegate.set_status(status)
99     
100     # Check to see if the directory given during the creation of the class is
101     # a valid raven experiment directory.
102     def check_directory(self):
103         files = os.listdir(self.working_dir)
104         found_conf = False
105         found_kong = False
106         for f in files:
107             if (f == 'raven.conf'):
108                 found_conf = True
109             elif (f == 'kong'):
110                 found_kong = True
111             
112             if (found_conf and found_kong):
113                 break
114         
115         return (found_conf and found_kong)
116     
117     def set_delegate(self, delegate):
118         self.delegate = delegate
119         
120     def set_worker_status(self, status):
121         self.worker_status = status
122     
123     # Checks worker thread's current status to see if it's allowed to run. If so, then it continues,
124     # otherwise it cleans up and exits.
125     def check_worker_status(self):
126         if (self.worker_status == KONG_WORKER_STATUS_ABORT):
127             if (threading.currentThread() == self.worker_thread):
128                 KongOutputBuffer.Cleanup()
129                 if (self.delegate != None):
130                     self.delegate.set_status("Aborted")
131                     self.delegate.append_log("Aborted")
132                     self.delegate.abort_callback()
133                 self.worker_status = KONG_WORKER_STATUS_IDLE
134                 sys.exit()
135     
136     def get_experiment_status(self):
137         def perform_async(self):
138             status = None
139             raw_output = None
140             
141             self.set_worker_status(KONG_WORKER_STATUS_BUSY)
142             
143             # If there's any output from the command, capture it in a buffer to send to
144             # the delegate later.
145             output_buf = KongOutputBuffer()
146             output_buf.start()
147             status = self.kong_client.get_client_status()
148             raw_output = output_buf.get_contents()
149             output_buf.end_clean()
150             
151             self.set_worker_status(KONG_WORKER_STATUS_IDLE)
152             
153             if (self.delegate != None):
154                 self.delegate.get_status_callback({'status' : status, 'raw_output' : raw_output})
155         
156         self.worker_thread = threading.Thread(target=perform_async, args=(self,))
157         self.worker_thread.start()
158         
159     def run_experiment(self, num_machines=1):
160         def perform_async(self, num_machines=1):
161             self.set_worker_status(KONG_WORKER_STATUS_BUSY)
162             self.kong_client.set_minClients(num_machines)
163             def stat_and_clear(buffer):
164                 output = ""
165                 if (self.delegate != None):
166                     output = buffer.get_contents()
167                     status_lines = output.split('\n')
168                     status = ""
169                     for line in status_lines:
170                         if (line[0:5] == 'kong:'):
171                             status = line
172                     
173                     self.delegate.set_status(status)
174                     self.delegate.append_log(output)
175                 buffer.clear()
176         
177             # Begin capturing output into buffer for display in the log panel
178             output_buf = KongOutputBuffer()
179             output_buf.start()
180             
181             # After every state transition, we're giong to check to see if the worker thread
182             # is still allowed to run. We're also going to update the status in the delegate
183             # after each transition, and also append output to the log, if any.
184             self.kong_client.setstate("initialize")
185             stat_and_clear(output_buf)
186             self.check_worker_status()
187             
188             self.kong_client.setstate("prepare")
189             stat_and_clear(output_buf)
190             self.check_worker_status()
191             
192             self.kong_client.setstate("run")
193             stat_and_clear(output_buf)
194             self.check_worker_status()
195             
196             self.kong_client.waitfor("complete")
197             stat_and_clear(output_buf)
198             self.check_worker_status()
199             
200             # Stop capturing output
201             output_buf.end_clean()
202         
203             if (self.delegate != None):
204                 self.delegate.run_callback()
205             self.set_worker_status(KONG_WORKER_STATUS_IDLE)
206         
207         self.worker_thread = threading.Thread(target=perform_async, args=(self, num_machines))
208         self.worker_thread.start()
209     
210     # We don't want to kill the thread immediately, as there may be pending locks or entered
211     # try catch blocks. Instead, set the worker status to ABORT and let the thread operation
212     # check if it has been aborted, and if it has, exit gracefully.
213     def abort_experiment(self):
214         if (self.worker_status == KONG_WORKER_STATUS_BUSY):
215             self.set_worker_status(KONG_WORKER_STATUS_ABORT)
216         
217     def reset_experiment(self):
218         def perform_async(self):
219             raw_output = ""
220             
221             self.set_worker_status(KONG_WORKER_STATUS_BUSY)
222             
223             output_buf = KongOutputBuffer()
224             output_buf.start()
225             self.kong_client.setstate("reset")
226             raw_output += output_buf.get_contents()
227             output_buf.end_clean()
228             
229             if (self.delegate != None):
230                 self.delegate.set_status("kong: setting server state to reset")
231                 self.delegate.reset_callback({'raw_output' : raw_output})
232             self.set_worker_status(KONG_WORKER_STATUS_IDLE)
233             
234         self.worker_thread = threading.Thread(target=perform_async, args=(self,))
235         self.worker_thread.start()
236         
237     def generate_report(self, num_machines=1):
238         def perform_async(self, num_machines=1):
239             output = ""
240             
241             self.set_worker_status(KONG_WORKER_STATUS_BUSY)
242             self.kong_client.set_minClients(num_machines)
243             
244             output_buf = KongOutputBuffer()
245             output_buf.start()
246             self.kong_client.report()
247             output += output_buf.get_contents()
248             output_buf.end_clean()
249             
250             if (self.delegate != None):
251                 self.delegate.set_status("kong is idle")
252                 self.delegate.generate_report_callback(output)
253             self.set_worker_status(KONG_WORKER_STATUS_IDLE)
254             
255         self.worker_thread = threading.Thread(target=perform_async, args=(self, num_machines))
256         self.worker_thread.start()
257     
258
259