import repository from arizona
[raven.git] / publish / smbaker-experiment / build / pubsubexperiment / usr / local / pubsubexperiment / expreceiver.py
1 #!/usr/bin/python
2
3 import os
4 import socket
5 from optparse import OptionParser
6 import sys
7 import threading
8 import time
9 import urllib
10 import urllib2
11 import traceback
12
13 import ravenlib.stats
14 from ravenlib.pubsub.receiver import PubSubReceiver
15
16 # hostname, port for pubsub
17 PUBSUBADDR = ("planetlab2.cs.uoregon.edu", 4322)
18 #PUBSUBADDR = ("quake.cs.arizona.edu", 4322)
19 #PUBSUBADDR = ("localhost", 4322)
20
21 # base URL for OWL
22 BASEURL = "http://quake.cs.arizona.edu/owl_beta/"
23
24 # database name to put our results
25 DBNAME = "pubsubexp"
26
27 # owl parameters for the script/config that will be reporting the results
28 MODULE="psexp"
29 VERSION = "0.7"
30 HEADING="pubsubexp"
31
32 def owlregister(module, fields):
33     url = BASEURL + "register?"
34
35     args = []
36     args.append(('db', DBNAME))
37     args.append(('module', module))
38     args.append(('version', VERSION))
39     args.append(('heading', HEADING))
40     args.append(('fields', ",".join(fields)))
41
42     for f in fields:
43             fname = "%s.%s" % (module, f)
44             type = "string"
45             heading = f
46             headingAlign = "center"
47             align = "right"
48 #            args.append(('field', ",".join((fname,module,type,heading,headingAlign))))   # old owl
49             args.append(('field', ",".join((fname,module,type,heading,headingAlign,align))))   # new owl
50
51     data = urllib.urlencode(args)
52
53 #    print "registering:", data
54
55     try:
56         s = urllib2.urlopen(url, data)
57         print "REGISTER RESULT:", s.read()
58     except:
59         # Note - this would be better done reliably with backoff/retry, like the
60         #  "real" owl client does. As it stands, we may get some 'too many connection'
61         #  errors.
62         print >> sys.stderr, traceback.format_exc()
63
64 def owlregisterall():
65     owlregister(MODULE, ["ver", "cntrecv", "cntwrongversion", "cntmissing", "cntgood", "cntoutoforder"])
66     owlregister("basic", ["host"])
67
68 def owlupdate(dict):
69     url = BASEURL + "update?"
70     data = []
71     data.append( ("db", DBNAME) )
72     for key in dict:
73         data.append( (key, dict[key]) )
74
75     data = urllib.urlencode(data)
76
77 #    print "updating:", data
78
79     try:
80         s = urllib2.urlopen(url, data)
81         print "UPDATE RESULT:", s.read()
82     except:
83         # Note - this would be better done reliably with backoff/retry, like the
84         #  "real" owl client does. As it stands, we may get some 'too many connection'
85         #  errors.
86         print >> sys.stderr, traceback.format_exc()
87
88 class ExperimentReceiver(PubSubReceiver):
89     def __init__(self, hostAddr):
90         PubSubReceiver.__init__(self, hostAddr)
91         # lower the renew interval down to once every two minutes
92         self.renewInterval = 2*60
93
94     def msg_received(self, msg):
95         version = msg.get("version", 0)
96         seqNo = msg.get("seqno", 0)
97         cmd = msg.get("cmd", "nop")
98
99         print "incoming:", msg
100
101         if (cmd == "reset"):
102             # Reset messages are when the sender wants to reset the state of
103             # the receivers for a new experiment. nuke all the counters.
104             print "reset message received"
105             self.version = version
106             self.cntRecv = 0
107             self.cntWrongVersion = 0
108             self.cntMissing = 0
109             self.cntGood = 0
110             self.cntOutOfOrder = 0
111             owlregisterall()
112
113         elif (cmd == "data"):
114             # Data messages are the ones that we're using for testing.
115             # Basically, just check that the sequence numbers are right.
116             self.cntRecv += 1
117             if version != self.version:
118                 print "version mismatch", version, "!=", self.version
119                 self.cntWrongVersion += 1
120             elif seqNo < self.seqNo:
121                 print "seqno order mismatch: cur=", seqNo, "last=", self.seqNo
122                 self.cntOutOfOrder += 1
123             elif seqNo > self.seqNo+1:
124                 print "seqno missing: cur=", seqNo, "last=", self.seqNo
125                 self.cntMissing += (seqNo-(self.seqNo+1))
126             else:
127                 self.cntGood += 1
128
129         elif (cmd == "report"):
130             # Report messages are when the experiment is complete. Send the
131             # results back to owl.
132             print "report message received"
133             results = {MODULE + ".ver": self.version,
134                        MODULE + ".cntrecv": self.cntRecv,
135                        MODULE + ".cntwrongversion": self.cntWrongVersion,
136                        MODULE + ".cntmissing": self.cntMissing,
137                        MODULE + ".cntgood": self.cntGood,
138                        MODULE + ".cntoutoforder": self.cntOutOfOrder,
139                        "basic.host": socket.gethostname()}
140
141             print "sending results:", results
142
143             owlupdate(results)
144
145         elif (cmd == "nop"):
146             # do nothing
147             pass
148
149         else:
150             print "unknown message received:", cmd
151
152         self.seqNo = seqNo
153
154 def make_daemon(program):
155    global loggingfd
156    print "Forking daemon"
157    pid = os.fork()
158
159    # if fork was successful, exit the parent process so it returns
160    try:
161       if pid > 0:
162          os._exit(0)
163    except OSError:
164         print "fork failed"
165         exit(1)
166
167    # Print my pid into /var/run/PROGRAM.pid
168    pid = str(os.getpid())
169    filename = "/var/run/%s.pid" % program
170    try:
171       out_file = open(filename, "w")
172       out_file.write(pid)
173       out_file.close()
174    except IOError:
175        print "error writing %s" % filename
176        # don't exit, we might
177
178    # close any open files
179    sys.stdin.close()
180    sys.stdout.close()
181    sys.stderr.close()
182    for fd in xrange(0,1023):
183            try:
184                os.close(fd)
185            except OSError:
186                pass
187
188    # redirect stdin/out/err to /dev/null
189    sys.stdin = open('/dev/null')       # fd 0
190    sys.stdout = open('/dev/null', 'w') # fd 1
191    sys.stderr = open('/dev/null', 'w') # fd 2
192
193    # disassociate from parent
194    os.chdir("/")
195    os.setsid()
196    os.umask(0)
197
198    return pid
199
200 def Main():
201     parser = OptionParser(usage="expreceiver.py [options]",
202                           description="Options: --daemon")
203
204     parser.add_option("-d", "--daemon", dest="daemon", action="store_true",
205         help="turn into a daemon", default=False)
206
207     (options, args) = parser.parse_args()
208
209     if options.daemon:
210         if os.geteuid() > 0:
211             print "You must be root to use --daemon ..."
212             sys.exit(1)
213         make_daemon("expreceiver")
214
215     received = ExperimentReceiver(PUBSUBADDR)
216
217     # is there a better way to sleep until a KeyboardInterrupt happens?
218     try:
219         while True:
220             time.sleep(1)
221     except:
222         pass
223
224     sys.exit(0)
225
226 if __name__ == "__main__":
227     Main()
228