import repository from arizona
[raven.git] / lib / ravenlib / pubsub / receiver.py
1 import pickle
2 import socket
3 import sys
4 import threading
5 import time
6
7 DEFAULT_RENEW_INTERVAL = 5*60
8
9 class PubSubReceiver(threading.Thread):
10     def __init__(self, hostAddr, localPort = None, key=None):
11         threading.Thread.__init__(self)
12         self.hostAddr = hostAddr
13         self.renewInterval = DEFAULT_RENEW_INTERVAL
14         self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
15         if (localPort != None):
16             self.socket.bind(('', localPort))
17         self.socket.settimeout(30)
18         self.defaultPort = 4321
19         self.renew_dict = {}
20         self.key = key
21         self.start()
22
23     def msg_received(self, msg):
24         print >> sys.stdout, "msg_received"
25         pass
26
27     def renew(self):
28         self.update(command = "renew")
29
30     def update(self, dict=None, command="update", sendNow=True):
31         if dict:
32             self.renew_dict = dict.copy()
33         self.renew_dict["command"] = command
34         self.renew_dict["hostname"] = socket.gethostname()
35         if sendNow:
36             if self.hostAddr == None:
37                 print >> sys.stdout, "Hostaddr is None. Skipping update."
38             else:
39                 self.socket.sendto(pickle.dumps(self.renew_dict), 0, self.hostAddr)
40
41     def try_receive(self):
42         try:
43             (data, remoteAddr) =  self.socket.recvfrom(64000)
44         except socket.timeout:
45             return
46         except Exception, e:
47             # We can catch an EINTR when another thread (i.e. tempest) does
48             # an os.popen. EINTR does not appear to be a subclass of OSError
49             # or IOError, and the only way I've found to catch it is by
50             # trapping a general exception and checking the first part of the
51             # tuple.
52             if e[0] == 4:
53                 return
54             raise e
55
56         if "\0" in data:
57            (pickled_data, pickled_sig) = data.split("\0", 2)
58            data = pickle.loads(pickled_data)
59            sig = pickle.loads(pickled_sig)
60         else:
61            pickled_data = data
62            data = pickle.loads(pickled_data)
63            pickled_sig = None
64            sig = None
65
66         if self.key:
67             if not sig:
68                 print >> sys.stderr, "No signature in packet. Dropping."
69                 return
70
71             if not "signature" in sig:
72                 print >> sys.stderr, "Signature malformed in packet. Dropping."
73                 return
74
75             if not self.key.verify(pickled_data, sig["signature"]):
76                 print >> sys.stderr, "Signature verification failed. Dropping."
77                 return
78
79         self.msg_received(data)
80
81     def run(self):
82         print >> sys.stdout, "PubSubReceiver running"
83         lastRenewTime = 0
84         while True:
85             if (time.time() - lastRenewTime > self.renewInterval):
86                 self.renew()
87                 lastRenewTime = time.time()
88             self.try_receive()
89
90     def set_host(self, hostAddr):
91         if (hostAddr==None) or (isinstance(hostAddr, tuple)):
92             self.hostAddr = hostAddr
93         elif isinstance(hostAddr, str):
94             parts = hostAddr.split(":")
95             if len(parts) >= 2:
96                 self.hostAddr = (parts[0], int(parts[1]))
97             else:
98                 self.hostAddr = (parts[0], self.defaultPort)
99         else:
100             print >> sys.stderr, "Unsure how to understand", hostAddr
101
102         print >> sys.stdout, "Host addr set to", self.hostAddr
103
104         self.renew()
105
106     def get_host(self):
107         return self.hostAddr
108