import repository from arizona
[raven.git] / apps / digdug / digdugclient.py
1 #! /usr/bin/env python
2
3 """
4 <Program Name>
5
6 <Started>
7
8 <Author>
9
10 <Purpose>
11 """
12
13 from optparse import OptionParser
14 import os
15 import socket
16 import SocketServer
17 import sys
18 import signal
19 import traceback
20 import threading
21 import time
22
23 import ravenlib.daemon
24 import ravenlib.report
25 import ravenlib.stats
26 from ravenlib.pubsub.receiver import PubSubReceiver
27 from ravenlib.xmlrpc.server import UDSServerBase
28
# Default value of the -p/--port command-line option.
DEFAULT_PORT = 4322

# if we don't hear from server within 5 minutes, figure something bad has
# happened.
STALE_THRESHOLD = 5*60

# digdugclientversion.py is installed automatically by the postinstall rpm.
# The fallbacks are assigned first so that a missing or incomplete version
# module still leaves both names defined (get_kind() and main() read them).
version = "unknown"
release = "unknown"
try:
    from digdugclientversion import *
except Exception:
    # Best effort: any failure to load the generated module keeps the
    # "unknown" fallbacks above.
    pass
41
class BadVersionException(Exception):
    """Raised when the client's channel version differs from the server's."""

    def __init__(self, clientVersion, serverVersion):
        pieces = ["Client data is different version than server data (",
                  str(clientVersion), " != ", str(serverVersion), ")"]
        self.msg = "".join(pieces)

    def __str__(self):
        # The quoted (repr) form of the message.
        return "%r" % (self.msg,)
48
class StaleException(Exception):
    """Raised when no server message has been applied for too long."""

    def __init__(self, elapsed):
        # elapsed: seconds since the last update was received.
        self.msg = "".join(["Client is stale (", str(elapsed), " seconds)"])

    def __str__(self):
        # The quoted (repr) form of the message.
        return "%r" % (self.msg,)
55
class NotSubscribedException(Exception):
    """Raised when a wait is requested before subscribe() has been called."""

    _MESSAGE = "Digdugclient is not subscribed to a digdugserver"

    def __init__(self):
        self.msg = self._MESSAGE

    def __str__(self):
        # The quoted (repr) form of the message.
        return "%r" % (self.msg,)
62
class DigDugReceiver(PubSubReceiver):
    """PubSubReceiver that hands every incoming message to an attached client."""

    def __init__(self, hostAddr, localPort=None):
        # Assigned before the base initializer runs, as originally ordered.
        # NOTE(review): seqno/version are not read anywhere in this file;
        # presumably PubSubReceiver uses them -- confirm before removing.
        self.seqno = -1
        self.version = -1
        self.client = None
        PubSubReceiver.__init__(self, hostAddr, localPort)

    def set_client(self, x):
        """Attach the object whose msg_received() gets incoming messages."""
        self.client = x

    def msg_received(self, msg):
        """Log the message's sequence number, then forward it to the client."""
        sequence = msg.get("seqno", 0)
        ravenlib.report.debug("received message seqno=" + str(sequence))

        target = self.client
        if not target:
            # No client attached yet: the message is only logged.
            return
        target.msg_received(msg)
76         if self.client:
77             self.client.msg_received(msg)
78
class DigDugClient(UDSServerBase):
    """Local XML-RPC endpoint of the digdug client.

    Serves on "uds:/tmp/digdugclient" with threaded=True. For each channel it
    keeps the states/version set locally (pushed to the server through
    ``self.receiver.update()``) next to the states/version last reported by
    the server (applied in ``msg_received()``).

    Because requests may be served concurrently, every method below that
    touches ``self.data`` or ``self.serverData`` holds ``self.mutex``.
    """

    def __init__(self, receiver):
        UDSServerBase.__init__(self, "uds:/tmp/digdugclient", threaded=True)
        # Client-side state; handed to receiver.update() for the server.
        self.data = {"channels": {}, "wantChannels": []}
        # Last raw message received from the server.
        self.msg = {}
        self.receiver = receiver
        # Non-reentrant: never call a locking method while holding it.
        self.mutex = threading.Lock()
        # When the last server message was applied (see _is_stale()).
        self.receive_timestamp = time.time()
        # Per-channel copy of what the server sent; never sent back.
        self.serverData = {}
        # Set by subscribe(); waitfor_multi() refuses to wait until then.
        self.subscribed = False

        if receiver:
            receiver.set_client(self)

    def register_functions(self):
        """Expose the public client API over XML-RPC."""
        UDSServerBase.register_functions(self)
        self.server.register_function(self.get_kind)
        self.server.register_function(self.get_channels)
        self.server.register_function(self.get_channel)
        self.server.register_function(self.set_state)
        self.server.register_function(self.set_version)
        self.server.register_function(self.set_user_data)
        self.server.register_function(self.clear_user_data)
        self.server.register_function(self.subscribe)
        self.server.register_function(self.verify_version)
        self.server.register_function(self.waitfor)
        self.server.register_function(self.waitfor_multi)

    def _lock(self):
        self.mutex.acquire()

    def _unlock(self):
        self.mutex.release()

    def _get_channel(self, name):
        """Return the state dict for channel ``name``, creating it if absent.

        Callers must hold the mutex.
        """
        channels = self.data["channels"]
        if name not in channels:
            channels[name] = {"server_states": [], "client_states": [],
                              "server_version": 0, "version": 0}
        return channels[name]

    def _signal_owl(self):
        """Ask owl to run immediately by writing its trigger file (best effort)."""
        try:
            f = open("/tmp/owl_run_now", "wt")
            try:
                f.write("signaled by digdug")
            finally:
                f.close()
        except EnvironmentError:
            ravenlib.report.info("exception while creating owl signal file")

    def _server_state_changed(self, channel):
        ravenlib.report.info("server_state: " + channel + ": " + ",".join(self.data["channels"][channel]["server_states"]))
        self._signal_owl()

    def _client_state_changed(self, channel):
        ravenlib.report.info("client_state: " + channel + ": " + ",".join(self.data["channels"][channel]["client_states"]))
        self.receiver.update(self.data)
        self._signal_owl()

    def _client_version_changed(self, channel):
        ravenlib.report.info("client version updated: " + channel)
        self.receiver.update(self.data)
        self._signal_owl()

    def _elapsed_since_update(self):
        """Seconds since the last server message was applied."""
        return time.time() - self.receive_timestamp

    def _is_stale(self, threshold=STALE_THRESHOLD):
        return self._elapsed_since_update() > threshold

    def _apply_msg(self, msg):
        """Merge a server message into the per-channel state.

        Caller must hold the mutex.
        """
        self.receive_timestamp = time.time()
        channels = msg.get("channels", {})
        ravenlib.report.debug("_apply_msg: " + str(msg))
        for channel in channels:
            incoming = channels[channel]
            new_server_states = incoming.get("server_states", [])
            new_server_version = incoming.get("version", 0)

            chan = self._get_channel(channel)

            if new_server_states != chan.get("server_states", []):
                chan["server_states"] = new_server_states
                self._server_state_changed(channel)

            if new_server_version != chan.get("server_version", 0):
                ravenlib.report.info("server_version: " + channel + ": " + str(new_server_version))
                chan["server_version"] = new_server_version

            # serverData gets a copy of everything the server told us. It is
            # kept apart from self.data because that is sent back to the
            # server; this data is unidirectional, server->client.
            self.serverData[channel] = incoming.copy()

    def msg_received(self, msg):
        """Record and apply a message from the server (called by the receiver)."""
        self._lock()
        try:
            self.msg = msg
            self._apply_msg(msg)
        finally:
            self._unlock()

    def get_kind(self):
        return ("digdugclient", version, release)

    def get_channel(self, name):
        """Return a shallow copy of channel ``name`` plus status fields.

        The extra keys are "stale", "elapsed_since_update" and "serverData".
        "serverData" is None if the server never sent this channel.
        """
        self._lock()
        try:
            result = self._get_channel(name).copy()
            result["stale"] = self._is_stale()
            result["elapsed_since_update"] = self._elapsed_since_update()
            result["serverData"] = self.serverData.get(name)
        finally:
            self._unlock()
        return result

    def get_channels(self):
        """Return the list of known channel names."""
        self._lock()
        try:
            # list() snapshots the names while the lock is held; on Python 3
            # keys() would be a live view, which XML-RPC cannot marshal.
            names = list(self.data["channels"].keys())
        finally:
            self._unlock()
        return names

    def set_state(self, name, new_states):
        """Set the client states of channel ``name``; push them if they changed."""
        ravenlib.report.info("set_state: channel=" + name + " new_states=" + ",".join(new_states))

        self._lock()
        try:
            chan = self._get_channel(name)
            if chan.get("client_states", []) != new_states:
                chan["client_states"] = new_states
                self._client_state_changed(name)
        finally:
            self._unlock()

        return "ok"

    def set_version(self, name, version):
        """Set the client version of channel ``name``."""
        self._lock()
        try:
            chan = self._get_channel(name)
            chan["version"] = version
            # NOTE(review): set_modified() is not defined in this file;
            # presumably provided by UDSServerBase -- confirm it exists.
            self.set_modified()
        finally:
            self._unlock()

        return "ok"

    def verify_version(self, channel):
        """Return "ok" if the client is fresh and its version matches the server's.

        Raises StaleException (checked first) or BadVersionException.
        """
        self._lock()
        try:
            chan = self._get_channel(channel)
            client_version = chan.get("version", 0)
            server_version = chan.get("server_version", 0)
        finally:
            self._unlock()

        if self._is_stale():
            raise StaleException(self._elapsed_since_update())

        if client_version != server_version:
            raise BadVersionException(client_version, server_version)

        return "ok"

    def waitfor(self, channel, state, count, new_version=False, timeout=0):
        """Wait for a single server ``state`` on ``channel``.

        Thin wrapper around waitfor_multi(); it returns the same result dict
        and raises the same exceptions.
        """
        return self.waitfor_multi(channel, [state], count, new_version, timeout)

    def waitfor_multi(self, channel, states, count, new_version=False, timeout=0, raise_badversion=True):
        """Poll (once per second) until ``channel`` is in one of ``states``.

        A state written as '@name' matches server state 'name'. While the
        server is in such a state, the client version may catch up with the
        server version, exactly as if ``new_version`` were True. Once
        triggered, this stays in effect for the rest of the wait.

        :param count: accepted for API compatibility; not used.
        :param new_version: adopt the server's version on mismatch instead of
            failing; also suppresses the staleness check.
        :param timeout: seconds to wait; 0 means wait forever.
        :param raise_badversion: raise on a version mismatch; if False, keep
            polling instead.
        :returns: {"result": "ok", "count": 1, "state": <state>} or
            {"result": "timeout", "count": 1}.
        :raises NotSubscribedException: subscribe() has not been called.
        :raises BadVersionException: versions differ and raise_badversion.
        :raises StaleException: no server message within STALE_THRESHOLD
            and new_version is not in effect.
        """
        if not self.subscribed:
            raise NotSubscribedException()

        tStart = time.time()
        while True:
            self._lock()
            try:
                chan = self._get_channel(channel)
                server_states = chan["server_states"]

                # if we're in one of the '@' states, then allow the version
                # change
                for state in states:
                    if state.startswith('@') and state[1:] in server_states:
                        new_version = True

                # if new_version is set, bring the client version up to the
                # server version as necessary
                client_version = chan.get("version", 0)
                server_version = chan.get("server_version", 0)
                if new_version and client_version != server_version:
                    client_version = server_version
                    chan["version"] = server_version
                    self._client_version_changed(channel)
            finally:
                self._unlock()

            if client_version != server_version:
                if raise_badversion:
                    raise BadVersionException(client_version, server_version)
            else:
                for state in states:
                    state = state.lstrip('@')
                    if state in server_states:
                        return {"result": "ok", "count": 1, "state": state}

            if (timeout > 0) and ((time.time() - tStart) > timeout):
                return {"result": "timeout", "count": 1}

            # only throw stale exceptions if new_version is not set
            if (not new_version) and self._is_stale():
                raise StaleException(self._elapsed_since_update())

            time.sleep(1)

    def subscribe(self, hostAddr, channels=None):
        """Point the receiver at ``hostAddr`` and request ``channels``.

        Channels accumulate across calls. If we were already subscribed, the
        updated channel list is pushed to the server immediately.
        Returns "ok" (like the other XML-RPC methods; a None return cannot be
        marshalled unless the XML-RPC server enables allow_none).
        """
        if channels is None:
            channels = []
        ravenlib.report.info("subscribe host=" + hostAddr + " channels=" + str(channels))

        self._lock()
        try:
            wantChannels = self.data["wantChannels"]
            for channel in channels:
                if channel not in wantChannels:
                    ravenlib.report.info("adding " + channel + " to wantChannels list")
                    wantChannels.append(channel)
            self.receiver.update(self.data, sendNow=False)
        finally:
            self._unlock()

        # Not under the lock: what set_host() does is not visible here.
        self.receiver.set_host(hostAddr)

        if self.subscribed:
            # if we were already subscribed, then do the update again, this
            # time with sendNow=True, so that we inform the server of the new
            # channels we are interested in.
            self._lock()
            try:
                self.receiver.update(self.data, sendNow=True)
            finally:
                self._unlock()

        self.subscribed = True
        return "ok"

    def add_user_data(self, channel, name, value):
        """Set userdata[name] = value on ``channel`` and push the state."""
        self._lock()
        try:
            chan = self._get_channel(channel)
            chan.setdefault("userdata", {})[name] = value
            self.receiver.update(self.data)
        finally:
            self._unlock()

    def set_user_data(self, channel, dict):
        """Replace the userdata of ``channel`` with ``dict`` and push the state.

        (The parameter name shadows the builtin but is kept for compatibility.)
        """
        self._lock()
        try:
            self._get_channel(channel)["userdata"] = dict
            self.receiver.update(self.data)
        finally:
            self._unlock()
        return "ok"

    def clear_user_data(self, channel, name=None):
        """Remove userdata entry ``name`` from ``channel`` (all entries if None)."""
        self._lock()
        try:
            chan = self._get_channel(channel)
            if name is None:
                # name None means to globally delete everything
                chan["userdata"] = {}
            else:
                chan.setdefault("userdata", {}).pop(name, None)
            self.receiver.update(self.data)
        finally:
            self._unlock()
        return "ok"
346         self.receiver.update(self.data)
347
348
def create_parser():
    """Build the command-line parser for digdugclient.

    Options: -p/--port (int, default DEFAULT_PORT), -l/--localport (int),
    -s/--sync (do not daemonize), -v/--verbose, and --version. Interspersed
    arguments are disabled, so option parsing stops at the first positional
    argument.
    """
    parser = OptionParser(usage="digdugclient [options]",
                          description="Starts the digdug client")

    parser.add_option("-p", "--port", dest="port",
                      help="port to send to", action="store", type="int",
                      default=DEFAULT_PORT)
    parser.add_option("-l", "--localport", dest="localport",
                      help="local port to bind to", action="store",
                      type="int", default=None)
    parser.add_option("-s", "--sync", dest="sync",
                      help="sync mode (no daemon)", action="store_true",
                      default=False)
    parser.add_option("-v", "--verbose", dest="verbose",
                      help="verbose mode", action="store_true", default=False)
    parser.add_option("--version", dest="print_version",
                      help="print version number and exit",
                      action="store_true", default=False)

    parser.disable_interspersed_args()

    return parser
368
def dumpstacks(signal, frame, path="/var/log/digdugstacktrace"):
    """Write the current stack of every thread to ``path`` (overwriting it).

    Installed as the SIGHUP handler by main(). ``signal`` and ``frame`` are
    the standard handler arguments and are unused; ``signal`` shadows the
    module of the same name inside this function only. ``path`` is optional,
    so the handler signature stays compatible.
    """
    code = []
    for threadId, stack in sys._current_frames().items():
        code.append("\n# Thread: %d" % (threadId))
        for filename, lineno, name, line in traceback.extract_stack(stack):
            code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
            if line:
                code.append("  %s" % (line.strip()))

    # Close explicitly so the dump is flushed as soon as the handler
    # finishes, rather than whenever the file object is collected.
    out = open(path, "w")
    try:
        out.write("\n".join(code))
    finally:
        out.close()
379 \r
def main():
    """Parse options, optionally daemonize, and serve the digdug client API.

    Installs dumpstacks() as the SIGHUP handler and publishes the parsed
    options in the module-level ``glo_options``. With --version it prints
    "<version>-<release>" and exits. Otherwise it blocks in
    DigDugClient.run().
    """
    global glo_options

    signal.signal(signal.SIGHUP, dumpstacks)

    glo_options = create_parser().parse_args()[0]

    if glo_options.print_version:
        print(version + "-" + release)
        sys.exit(0)

    if glo_options.verbose:
        ravenlib.report.getLogger().setLevel(ravenlib.report.DEBUG)

    ravenlib.report.info("Starting")

    # --sync keeps the process in the foreground.
    if not glo_options.sync:
        ravenlib.daemon.make_daemon("digdugclient")

    # No server address yet; DigDugClient.subscribe() supplies it later via
    # receiver.set_host(). NOTE(review): --port is parsed but not used here.
    receiver = DigDugReceiver(hostAddr=None, localPort=glo_options.localport)

    DigDugClient(receiver).run()
404
def error_exc(msg, exc, path="/var/log/digdugexception.log"):
    """Append ``msg``, ``exc`` and the current traceback to ``path``.

    Must be called while an exception is being handled: the traceback comes
    from traceback.format_exc(). Each record looks like:

        Msg: Exception
         | tb line 1
         | tb line 2
         ...
    """
    lines = [msg + ": " + str(exc)] + traceback.format_exc().splitlines()

    # Close explicitly so the record reaches disk before any re-raise
    # terminates the process.
    log = open(path, "a")
    try:
        log.write("\n | ".join(lines) + "\n")
    finally:
        log.close()
414
def _log_main_exception(exc):
    """Log ``exc`` through error_exc(), falling back to a console message.

    This lives in its own function on purpose. Under Python 2, an exception
    caught in the same frame replaces the one a later bare ``raise``
    re-raises. Handling a logging failure here keeps main()'s exception
    intact for the caller, because the caller's exception state is restored
    when this frame exits.
    """
    try:
        error_exc("Exception in main", exc)
    except Exception:
        # Uh oh, logging was broken. Just send a message to the console.
        print("Exception while trying to log an exception!")


def main_exc_wrapper():
    """Run main(); log any exception to the exception log, then re-raise it."""
    try:
        main()
    except Exception:
        _log_main_exception(sys.exc_info()[1])

        # re-raise it so the stack dump, etc goes to the console
        raise


if __name__ == "__main__":
    main_exc_wrapper()