import repository from arizona
[raven.git] / apps / ravenpublish / kongclient.py
1 import ConfigParser
2 import os
3 import sys
4 import glob
5
6 import ravenlib.client.digdugapi
7 import ravenlib.tableprint
8 from ravenlib.ravenlog import RavenLog
9
class KongClientError(Exception):
    """Base class for KongClient errors.

    The payload passed to the constructor is kept in ``value``; ``str()`` of
    the exception is the ``repr()`` of that payload.
    """

    def __init__(self, value):
        # Forward the payload to Exception so that e.args is populated.
        # Without this, args is empty, the default repr() drops the payload
        # and unpickling fails because the constructor is re-invoked with
        # no arguments.
        Exception.__init__(self, value)
        self.value = value

    def __str__(self):
        return repr(self.value)
15
class KongClientTerminated(KongClientError):
    """Raised by KongClient.waitfor() when a terminate request is pending."""
18
19
class KongClient(RavenLog):
    """Drives the server-side state of one channel through a DigDugClient.

    The object holds the server address and channel name, lazily opens a
    ``ravenlib.client.digdugapi.DigDugClient`` connection, and offers helpers
    to wait for clients, publish a membership list, change the server state
    and print a status table of the channel's clients.
    """

    def __init__(self, addr=None, channel=None, configdir=None, suppressConnectMessage=False, deferConnect=False):
        """Set up the client and, unless deferred, connect immediately.

        addr                   -- server address handed to DigDugClient
        channel                -- channel name used for every request
        configdir              -- optional directory of *.conf files; values
                                  found there override addr and channel
        suppressConnectMessage -- if True, connect() does not print
        deferConnect           -- if True, do not connect in the constructor;
                                  the first method that needs the connection
                                  will open it
        """
        # RavenLog object is mixed in to give us a configurable Print statement
        RavenLog.__init__(self)

        self.addr = addr
        self.channel = channel
        self.state = None
        self.minClients = 0
        self.maxClients = 0
        self.set_table_output("ascii")
        self.digdug_variables = {}
        self.suppressConnectMessage = suppressConnectMessage
        self.deferConnect = deferConnect

        # connection object and the server identification reported by it
        self.digdug = None
        self.digdug_kind = None
        self.digdug_version = None
        self.digdug_release = None

        # set by terminate(); checked by waitfor() after every poll
        self.terminate_signal = False

        if configdir is not None:
            self.readConfigFiles(configdir)

        # initial states; i.e. things that cause an incversion
        self.initials = ["initialize", "reset"]

        # list of state requirements, keyed by the state being requested:
        #   "server"         -- state the server must currently be in
        #   "client"         -- state clients must reach first (see waitfor)
        #   "set_membership" -- publish the list of ready clients afterwards
        # For example, to set the "run" state, we require the server to be in
        # the "prepare" state and the clients to be in the "ready" state.
        self.requires = {
            "prepare": {"server": "initialize", "client": "initialized"},
            "run": {"server": "prepare", "client": "ready", "set_membership": True},
        }

        if not self.deferConnect:
            self.connect()

    def terminate(self):
        """Ask a running waitfor() to stop; it raises KongClientTerminated."""
        self.terminate_signal = True

    def clone(self):
        """Return a new KongClient for the same addr/channel, not yet connected.

        Create a copy of this object, similar to this one, but using a
        different connection. Useful for passing to threads which want a
        private object. Only the address, channel and the cached server
        kind/version/release are carried over.
        """
        twin = KongClient(self.addr, self.channel, suppressConnectMessage=True, deferConnect=True)
        twin.digdug_kind = self.digdug_kind
        twin.digdug_version = self.digdug_version
        twin.digdug_release = self.digdug_release
        return twin

    def readConfigFiles(self, dir):
        """Load channel and server address from the [kong] section of dir/*.conf.

        A server value without a URL scheme gets "http://" prepended. Values
        that already start with "http:" or "https:" are used unchanged.
        """
        parser = ConfigParser.ConfigParser()
        parser.read(glob.glob(os.path.join(dir, "*.conf")))
        if not parser.has_section("kong"):
            return

        if parser.has_option("kong", "channel"):
            self.channel = parser.get("kong", "channel")

        if parser.has_option("kong", "server"):
            server = parser.get("kong", "server")
            # The original test checked "http:" twice, so an https address
            # was turned into "http://https://...". Accept both schemes.
            if not server.startswith(("http:", "https:")):
                server = "http://" + server
            self.addr = server

    def set_minClients(self, minClients):
        """Set the client count passed to the server in waitfor()."""
        self.minClients = minClients

    def set_maxClients(self, maxClients):
        """Set the cap on the ready list in waitfor(); 0 means no cap."""
        self.maxClients = maxClients

    def set_variable(self, name, value):
        """Queue a variable; all queued variables are sent by setstate()."""
        self.digdug_variables[name] = value

    def set_table_output(self, format, stream=sys.stdout):
        """Select the table formatter ("html", "ascii" or "csv") used by status().

        Any other format string is ignored and the current formatter is kept.
        """
        if format == "html":
            self.table_formatter = ravenlib.tableprint.TableFormatterHtml(stream)
        elif format == "ascii":
            self.table_formatter = ravenlib.tableprint.TableFormatterAscii(stream)
        elif format == "csv":
            self.table_formatter = ravenlib.tableprint.TableFormatterCSV(stream)

    def connect(self):
        """Open the DigDugClient connection if there is none yet.

        On first connect the server kind/version/release are fetched and
        cached on the instance, unless they were already supplied (for
        example by clone()).
        """
        if self.digdug:
            return

        if not self.suppressConnectMessage:
            self.Print("kong: connecting to", self.addr)
        self.digdug = ravenlib.client.digdugapi.DigDugClient(self.addr)

        if not self.digdug_kind:
            # Store on self: the original unpacked into local names, so the
            # attributes stayed None and clone() had nothing to copy.
            (self.digdug_kind, self.digdug_version, self.digdug_release) = self.digdug.get_kind()

    def waitfor(self, state):
        """Poll the server until it reports "ok" for the given client state.

        Each newly seen client is printed once. Returns the client list from
        the final poll, truncated to maxClients when that is non-zero.
        Raises KongClientTerminated if terminate() was called; the flag is
        checked after each poll returns.
        """
        # because of the membership variables, we always want to call waitfor,
        # even if minclients is zero, so that we can get the list of clients
        # that are ready, to pass to set_membership.

        self.connect()

        self.Print("waiting for", self.minClients, "clients to enter client state", state)
        ready_list = []
        # kept as a list: entries are indexed like sequences elsewhere in
        # this class and may not be hashable
        ready_addrs = []
        while True:
            result = self.digdug.waitfor(self.channel, state, count=self.minClients, timeout=5)

            if self.terminate_signal:
                self.Print("    termination request received in waitfor")
                raise KongClientTerminated("terminate in waitfor")

            this_ready_list = result.get("list", [])

            if (self.maxClients > 0) and (len(this_ready_list) > self.maxClients):
                this_ready_list = this_ready_list[:self.maxClients]

            # report every client the first time it shows up
            for ready_dict in this_ready_list:
                client_addr = ready_dict["addr"]
                if client_addr not in ready_addrs:
                    ready_addrs.append(client_addr)
                    ready_list.append(ready_dict)
                    self.Print("  ", state, "(", str(min(len(ready_list), result.get("count",0))) + "):", ready_dict["hostname"])

            # break on success
            if result.get("result") == "ok":
                self.Print("   successful completion of waitfor '" + state + "'")
                return this_ready_list

    def get_channel_data(self):
        """Return the server's data for this channel."""
        self.connect()
        return self.digdug.get_channel(self.channel)

    def get_clients(self):
        """Return the server's client dictionary for this channel."""
        self.connect()
        return self.digdug.get_clients([self.channel])

    def get_client_status(self, states=None, showResults=True, showMeta=True, showStates=True, showUnsubscribed=False):
        """Build one flat dictionary per client, suitable for table printing.

        states           -- if non-empty, clients that have data for this
                            channel are kept only when one of their client
                            states is in this list
        showResults      -- merge the channel's "userdata" into the row
        showMeta         -- add an integer "age" column
        showStates       -- add a "state" column ("/"-joined client states)
        showUnsubscribed -- include unsubscribed clients and add a
                            "subscribed" column
        """
        clients = self.get_clients()
        rows = []
        for client_dict in clients.values():
            if (not showUnsubscribed) and (not client_dict.get("subscribed", False)):
                continue

            addr = client_dict["addr"]
            port = addr[1]
            # use the hostname if available, otherwise the address itself
            hostname = client_dict.get("hostname", addr[0])

            channel_dict = client_dict.get("channels", {}).get(self.channel, None)

            # do some reformatting
            row = {"addr": str(hostname) + ":" + str(port)}

            if showUnsubscribed:
                # only need to show subscribed status if showUnsubscribed
                # is set, otherwise it's true for every displayed item
                row["subscribed"] = client_dict.get("subscribed", "False")

            if showMeta:
                row["age"] = int(client_dict.get("age", 0))

            if channel_dict is not None:
                client_states = channel_dict.get("client_states", [])

                # state filter: skip clients in none of the requested states
                if states and not any(state in client_states for state in states):
                    continue

                if showStates:
                    row["state"] = "/".join(client_states)

                if showResults and ("userdata" in channel_dict):
                    row.update(channel_dict["userdata"])

            rows.append(row)

        return rows

    def status(self, states=None, showResults=True, showMeta=True, showStates=True, showUnsubscribed=False):
        """Print the rows from get_client_status() with the current formatter.

        Takes the same arguments as get_client_status(). The table is sorted
        by "addr".
        """
        rows = self.get_client_status(states, showResults, showMeta, showStates, showUnsubscribed)

        fields = ravenlib.tableprint.build_fields(rows, keyFields=["addr"])
        fields = ravenlib.tableprint.sort_fields(fields, ["addr","subscribed","age","state"])

        ravenlib.tableprint.print_table(rows, fields, self.table_formatter, sortby="addr")

    def results(self):
        """Print the userdata of clients in the "complete" state."""
        self.status(showMeta=False, showStates=False, states=["complete"])

    def set_membership(self, ready_list):
        """Send the membership list built from ready_list to the server.

        Each entry holds the client's hostname plus every "mem_*" key found
        in that client's userdata for this channel. An empty or None
        ready_list publishes an empty membership list.
        """
        self.connect()
        clients = self.digdug.get_clients()
        mem_list = []
        for ready_dict in ready_list or []:
            # key used for the lookup in the server's client dictionary
            addr = ready_dict["addr"][0] + ":" + str(ready_dict["addr"][1])
            mem_dict = {"hostname": ready_dict["hostname"]}

            # get client data so we can extract mem_* userdata from it
            client_dict = clients.get(addr, {})
            if "channels" in client_dict:
                channel_dict = client_dict["channels"].get(self.channel, None)
                if channel_dict:
                    userdata = channel_dict.get("userdata", {})
                    for key in userdata:
                        if key.startswith("mem_"):
                            mem_dict[key] = userdata[key]
            mem_list.append(mem_dict)

        self.Print("   setting membership list to", len(mem_list), "nodes")
        self.digdug.set_membership(self.channel, mem_list)

    def setstate(self, state):
        """Move the server to the given state after checking its requirements.

        For states listed in self.requires this first verifies the current
        server state (exiting the process with status -1 on a mismatch),
        then waits for the required client state and optionally publishes
        the membership list. States listed in self.initials bump the channel
        version. Queued variables are sent just before the state is set.
        """
        self.connect()
        requirement = self.requires.get(state)
        if requirement is not None:
            serverState = requirement.get("server", None)
            if serverState:
                channel_data = self.digdug.get_channel(self.channel)
                digdug_serverStates = channel_data.get("server_states", [])
                if serverState not in digdug_serverStates:
                    self.Print("error: can only transition to state", state, "from state", serverState)
                    self.Print("current state is", digdug_serverStates)
                    sys.exit(-1)

            clientState = requirement.get("client", None)
            if clientState:
                ready_list = self.waitfor(clientState)
                if requirement.get("set_membership", False):
                    self.set_membership(ready_list)

        if state in self.initials:
            self.digdug.inc_version(self.channel)

        self.Print("kong: setting server state to", state)

        self.digdug.set_variables(self.channel, self.digdug_variables)
        self.digdug.set_state(self.channel, [state])

    def run(self):
        """Run initialize -> prepare -> run, then wait for "complete".

        Exits the process with status -1 if minClients is still 0.
        """
        if self.minClients == 0:
            # write() instead of the print statement: same output, and the
            # module stays parseable by both Python 2 and Python 3
            sys.stderr.write("Error: must use -n to specify minimum number of clients\n")
            sys.exit(-1)

        for state in ("initialize", "prepare", "run"):
            self.setstate(state)
        self.waitfor("complete")

        sys.stdout.write("Use 'raven experiment results' to print results\n")

    def reset(self):
        """Set the server state to "reset"."""
        self.setstate("reset")
282