import repository from arizona
[raven.git] / apps / digdug / digdugserver.py
1 #! /usr/bin/env python
2
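"""
<Program Name>
  digdugserver.py

<Started>

<Author>

<Purpose>
  Starts the digdug server: a pubsub server that publishes per-channel
  state to its subscribers, plus a control server (DigDugServer) through
  which that state can be inspected and changed.
"""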
12
13 from ConfigParser import ConfigParser
14 from optparse import OptionParser
15 import os
16 import socket
17 import sys
18 import threading
19 import time
20 import SocketServer
21
22 import ravenlib.daemon
23 import ravenlib.report
24 from ravenlib.pubsub.sender import PubSubSender, PubSubServer
25 from ravenlib.xmlrpc.server import UDSServerBase
26
# Parsed command-line options and configuration file; both are filled in
# by main() / read_config_file() at startup.
glo_options = None
glo_config = None

CONFIG_FILE = "/etc/digdugserver.conf"

DEFAULT_CONTROLADDR = "uds:/tmp/digdugserver"
DEFAULT_PORT = 4322

# digdugserverversion.py is installed automatically by the postinstall rpm.
# Set the fallbacks first so that both names are always bound, even when the
# version module is missing or defines only one of them.
version = "unknown"
release = "unknown"
try:
    from digdugserverversion import *
except Exception:
    # Best effort: running without the rpm-installed version file is fine.
    # Unlike a bare "except:", this lets SystemExit/KeyboardInterrupt through.
    pass
41
class DigDugServer(UDSServerBase):
    """
    <Purpose>
       Control server for digdug.  Keeps a dictionary of named channels in
       self.msg["channels"] and hands the whole message to the pubsub sender
       whenever a channel changes.  The methods listed in register_functions()
       are registered on self.server so that control clients can call them.

    <Locking>
       self.mutex guards self.msg.  The base class is created with
       threaded=True, so requests are presumably handled concurrently; every
       method that reads or creates a channel takes the mutex first.  The
       mutex is a plain (non-reentrant) threading.Lock, so code that already
       holds it must call _get_channel() rather than get_channel().
    """

    def __init__(self, controladdr, sender):
        """
        controladdr -- address of the control socket; passed unchanged to
                       UDSServerBase.
        sender      -- pubsub sender used to publish self.msg and to look up
                       subscribers.
        """
        ravenlib.report.info("Binding control socket to " + str(controladdr))
        UDSServerBase.__init__(self, controladdr, threaded=True)
        self.msg = {"channels": {}}
        self.sender = sender
        self.mutex = threading.Lock()

    def register_functions(self):
        """Register the base-class functions plus the digdug control calls."""
        UDSServerBase.register_functions(self)
        exported = (
            self.get_kind,
            self.get_channel,
            self.get_channels,
            self.get_clients,
            self.purge_unsubscribed,
            self.set_state,
            self.set_version,
            self.set_variables,
            self.inc_version,
            self.waitfor,
            self.set_membership,
        )
        for function in exported:
            self.server.register_function(function)

    def _lock(self):
        """Acquire the mutex that guards self.msg."""
        self.mutex.acquire()

    def _unlock(self):
        """Release the mutex that guards self.msg."""
        self.mutex.release()

    def _get_channel(self, name):
        """
        Return the dictionary for channel `name`, creating it if necessary.

        A new channel starts with no server states and a version equal to
        the current time in whole seconds.  The caller must hold the mutex.
        """
        channels = self.msg["channels"]
        if name not in channels:
            channels[name] = {"server_states": [], "version": int(time.time())}
        return channels[name]

    def set_modified(self):
        """Give the current message to the sender and send it immediately."""
        ravenlib.report.debug("msg=" + str(self.msg))
        self.sender.set_message(self.msg)
        self.sender.send_now()

    def get_kind(self):
        """Return ("digdugserver", version, release)."""
        return ("digdugserver", version, release)

    def get_channel(self, name):
        """Return the dictionary for channel `name`, creating it if needed."""
        # _get_channel() may insert a new channel, so take the same mutex
        # that set_variables() and inc_version() use.
        self._lock()
        try:
            return self._get_channel(name)
        finally:
            self._unlock()

    def get_channels(self):
        """Return a list of the names of all known channels."""
        self._lock()
        try:
            # list() makes this a snapshot on every Python version
            return list(self.msg["channels"].keys())
        finally:
            self._unlock()

    def get_clients(self, channels=None):
        """
        Return the sender's subscriber dictionary.

        Every client entry gets an "age" field: the number of whole seconds
        since its "timestamp" (a missing timestamp counts as 0).  If
        `channels` is non-empty, only clients that have a "wantChannels"
        entry containing at least one of those channels are returned.
        """
        clients = self.sender.get_subscribers()

        # add an age field, since timestamps are uninteresting outside the
        # server
        now = time.time()
        for client in clients.values():
            client["age"] = int(now - client.get("timestamp", 0))

        if channels:
            # filter the caller's view
            filtered_clients = {}
            for key, client in clients.items():
                if "wantChannels" not in client:
                    continue
                wanted = client["wantChannels"]
                if any(channel in wanted for channel in channels):
                    filtered_clients[key] = client
            clients = filtered_clients

        return clients

    def purge_unsubscribed(self):
        """Ask the sender to purge unsubscribed clients.  Returns "ok"."""
        self.sender.purge_unsubscribed()
        return "ok"

    def purge_userdata(self, channel=None):
        """Placeholder; does nothing yet and is not registered.  Returns "ok"."""
        # to be completed...
        return "ok"

    def _describe_variable(self, key, value):
        """
        Return the short description of `value` used when logging a change.

        The "membership" variable is shown as a comma-separated list of
        hostnames; everything else as str(value) cut off after 40 characters.
        """
        if key == "membership":
            try:
                hostnames = [member.get("hostname", "") for member in value]
                return "hostnames = " + ",".join(hostnames)
            except (AttributeError, TypeError):
                # Not a list of dictionaries with string hostnames.  Logging
                # must not fail a call that has already changed the channel,
                # so fall back to the generic description below.
                pass

        desc = str(value)
        if len(desc) > 40:
            desc = desc[:40] + "..."
        return "values = " + desc

    def set_variables(self, channel, variables_dict):
        """
        Store the given variables in `channel`.

        Only variables that are new or whose value differs are written.  If
        anything changed, the message is published once and each changed
        variable is logged.  Returns "ok".
        """
        modified = {}

        self._lock()
        try:
            channel_vars = self._get_channel(channel)

            for key, value in variables_dict.items():
                if (key not in channel_vars) or (channel_vars[key] != value):
                    channel_vars[key] = value
                    modified[key] = value

            if modified:
                self.set_modified()
        finally:
            self._unlock()

        # pretty-printing of what variables were changed
        for key, value in modified.items():
            ravenlib.report.info("set var " + key + " on " + channel + ": " + self._describe_variable(key, value))

        return "ok"

    def set_membership(self, channel, new_membership=None):
        """Set the "membership" variable of `channel` (default: empty list)."""
        if new_membership is None:
            # a fresh list per call; the value is stored in the channel
            new_membership = []
        return self.set_variables(channel, {"membership": new_membership})

    def set_state(self, channel, new_states=None):
        """Set the "server_states" variable of `channel` (default: empty list)."""
        if new_states is None:
            # a fresh list per call; the value is stored in the channel
            new_states = []
        return self.set_variables(channel, {"server_states": new_states})

    def set_version(self, channel, version):
        """Set the "version" variable of `channel`."""
        return self.set_variables(channel, {"version": version})

    def inc_version(self, name):
        """Add one to the version of channel `name` and publish.  Returns "ok"."""
        self._lock()
        try:
            channel_vars = self._get_channel(name)
            channel_vars["version"] = channel_vars.get("version", 0) + 1
            ravenlib.report.info("inc_version: channel=" + name + " new_version=" + str(channel_vars["version"]))
            self.set_modified()
        finally:
            self._unlock()

        return "ok"

    def waitfor(self, channel, state, count=1, new_version=False, timeout=0):
        """
        Block until at least `count` subscribers report `state` on `channel`.

        Subscribers are polled once a second.  A subscriber whose version for
        the channel differs from the server's version is listed in
        "badversion_list" and is not counted, whatever states it reports.

        channel     -- channel name
        state       -- state to look for in each client's "client_states"
        count       -- number of matching clients required
        new_version -- accepted but not used by this implementation
        timeout     -- seconds to wait; 0 (or less) means wait indefinitely

        Returns a dictionary with keys "result" ("ok" or "timeout"), "count",
        "list" and "badversion_list".
        """
        start_time = time.time()
        while True:
            # _get_channel() may create the channel, so hold the mutex for
            # the lookup.  It is released before the sender lock is taken,
            # so the two locks are never nested here.
            self._lock()
            try:
                server_version = self._get_channel(channel).get("version", 0)
            finally:
                self._unlock()

            found_list = []
            badversion_list = []
            self.sender.lock()
            try:
                subscribers = self.sender.subscribers
                for subkey in subscribers:
                    client = subscribers[subkey]
                    client_channels = client.get("channels", {})
                    if channel not in client_channels:
                        continue

                    channel_info = client_channels[channel]
                    client_version = channel_info.get("version", 0)
                    states = channel_info.get("client_states", [])
                    if client_version != server_version:
                        badversion_list.append(
                            {"addr": client["addr"],
                             "hostname": client.get("hostname", "unknown"),
                             "client_version": client_version,
                             "server_version": server_version})
                    elif state in states:
                        found_list.append(
                            {"addr": client["addr"],
                             "hostname": client.get("hostname", "unknown")})
            finally:
                self.sender.unlock()

            if len(found_list) >= count:
                return {"result": "ok", "count": len(found_list), "list": found_list, "badversion_list": badversion_list}

            if (timeout > 0) and ((time.time() - start_time) > timeout):
                return {"result": "timeout", "count": len(found_list), "list": found_list, "badversion_list": badversion_list}

            time.sleep(1)
220
def get_config_opt(name, default):
    """
    Return option `name` from the DEFAULT section of glo_config, or
    `default` when the option is not set there.
    """
    if not glo_config.has_option("DEFAULT", name):
        return default
    return glo_config.get("DEFAULT", name)
226
def read_config_file():
    """
    Create a fresh ConfigParser in glo_config and, if CONFIG_FILE exists as
    a regular file, load it.  A missing file leaves the parser empty.
    """
    global glo_config

    glo_config = ConfigParser()
    if not os.path.isfile(CONFIG_FILE):
        # nothing to load; callers fall back to their defaults
        return
    glo_config.read(CONFIG_FILE)
233
def create_parser():
    """
    Build and return the command-line parser for digdugserver.

    The defaults for --port and --controladdr come from the configuration
    file via get_config_opt(), so read_config_file() must have been called
    first.  Interspersed arguments are disabled: option parsing stops at the
    first positional argument.
    """
    parser = OptionParser(usage="digdugserver [options]",
                          description="Starts the digdug server")

    parser.add_option("-p", "--port", dest="port",
                      help="port to listen on", action="store", type="int",
                      default=int(get_config_opt("listenport", DEFAULT_PORT)))
    parser.add_option("-s", "--sync", dest="sync",
                      help="sync mode (no daemon)", action="store_true",
                      default=False)
    parser.add_option("-v", "--verbose", dest="verbose",
                      help="verbose mode", action="store_true", default=False)
    parser.add_option("-c", "--controladdr", dest="controladdr",
                      help="control address", action="store",
                      default=get_config_opt("controladdr", DEFAULT_CONTROLADDR))
    # long option only; the empty string stands in for the short form
    parser.add_option("", "--version", dest="print_version",
                      help="print version number and exit",
                      action="store_true", default=False)

    parser.disable_interspersed_args()

    return parser
253
def main():
    """
    Program entry point: read the configuration file, parse the command
    line, then start the pubsub server and run the control server.

    With --version, prints "<version>-<release>" and exits with status 0.
    Unless --sync is given, the process turns itself into a daemon before
    any server is created.
    """
    global glo_options

    read_config_file()

    glo_options, args = create_parser().parse_args()

    if glo_options.print_version:
        print(version + "-" + release)
        sys.exit(0)

    if glo_options.verbose:
        ravenlib.report.getLogger().setLevel(ravenlib.report.DEBUG)

    ravenlib.report.info("Starting")
    run_as_daemon = not glo_options.sync
    if run_as_daemon:
        ravenlib.daemon.make_daemon("digdugserver")

    sender = PubSubSender()
    # bound to a local so the server object stays referenced while we run
    pubsub_server = PubSubServer(("", glo_options.port), sender)

    control_server = DigDugServer(glo_options.controladdr, sender)
    control_server.run()
279
280 if __name__ == "__main__":
281     main()