import repository from arizona
[raven.git] / apps / gacks / gacksnm.py
1 #!/usr/bin/env python
2
3 ##
# Gacks Node Manager (gacksnm)
5 ##
6
import atexit
import getopt
import os
import random
import sys
import tempfile
import threading
import time

from xmlrpclib import ServerProxy
from sfa.trust.credential import Credential

from gacksexcep import *
from gackscalendar import *
from gackscalendar_list import *
from gacksreceipt import *
from gackslog import *
from gacksclient import GacksClient
24
# Directory holding "gacksnm.demon", the file that daemon() redirects stdout
# and stderr to.
CRASH_LOG_DIR = "/usr/local/gacksnm/var/log"
# File that daemon() writes the pid of the daemonized process to.
PID_FN = "/var/run/gacksnm.pid"

# Shell script rewritten by GacksNodeManager.report_owl_reservations() with
# the current reservation set.
GACKS_OWL_SCRIPT = "/etc/owl/scripts.d/gacksnm"

# Set to True by get_options() when -d/--daemon is given; main() then calls
# daemon() before starting any work.
glo_daemon = False

# Module-wide logger used by the node manager and the periodic worker threads.
Logger = GacksLogger(name = "gacksnm", enable_stdout=True)
33
class GacksNodeManager:
    """Node-side Gacks agent.

    Polls Gacks Central for calendar updates, keeps a local copy of the
    calendar, and converts the reservations that overlap the current time
    into resource loans on the local PlanetLab node manager (reached over
    XML-RPC at http://127.0.0.1:812/).  The same reservation set is also
    written to a shell script for Owl.
    """

    # Resource ids reported to Owl, in the order their lines are written.
    OWL_RESOURCE_IDS = ("cpu", "disk", "network")

    def __init__(self):
        """Set up the (empty) local calendar and query the node manager."""
        self.gacksCentralHost = "stork-repository.cs.arizona.edu" #"198.0.0.136"
        self.gacksCentralURL = "http://" + self.gacksCentralHost + "/GACKSAPI/"
        # Hash of the last update received from Gacks Central; handed back on
        # every poll so the server can answer "notmodified".
        self.currentUpdateHash = None
        self.backend = None
        self.init_calendar()
        self.init_plc()

    def init_calendar(self):
        """Create a fresh, empty in-memory calendar."""
        self.calendar = GacksListCalendar()
        self.calendar.reset()

    def get_key_file(self):
        """Return the path of the private key handed to GacksClient."""
        return "/usr/local/gacksnm/var/private/keys/gacksnm.pkey"

    def update_calendar(self, newItems):
        """Replace the whole calendar content with newItems.

        The calendar lock is held for the duration of the swap and is always
        released, even if inserting a record fails.
        """
        self.calendar.lock()
        try:
            self.calendar.reset()
            for item in newItems:
                self.calendar.insert_record(item)
        finally:
            self.calendar.unlock()

    def poll(self):
        """Ask Gacks Central for an update and apply it if there is one.

        On "update" the new hash is remembered, the calendar is replaced and
        the reservations are recomputed.  On "notmodified" nothing changes.
        Any other result is logged and otherwise ignored.
        """
        client = GacksClient(self.gacksCentralURL, self.get_key_file())
        (result, newHash, newItems) = client.get_update(self.currentUpdateHash)
        if result == "update":
            Logger.log_msg("GacksNodeManager.poll", "received update")
            self.currentUpdateHash = newHash
            self.update_calendar(newItems)
            self.compute_reservations()
        elif result == "notmodified":
            Logger.log_msg("GacksNodeManager.poll", "received notmodified")
        else:
            # Previously dropped silently; make unknown answers visible.
            Logger.log_msg("GacksNodeManager.poll",
                           "received unexpected result: %s" % (result,))

    def hrn_to_slicename(self, hrn):
        """Build a slice name from the 2nd and 3rd dot-separated parts of hrn.

        Example: "plc.arizona.gacks" -> "arizona_gacks".  Anything after the
        third component is ignored.

        Raises IndexError if hrn has fewer than three components.
        """
        parts = hrn.split(".", 3)
        return parts[1] + "_" + parts[2]

    def init_plc(self):
        """Connect to the local node manager and record its resources.

        Reads "cpu_pct" and "net_min_rate" from the "attributes" of the record
        returned by GetRecord() and fills self.resources with the ones that
        are actually reserved on this node.
        """
        self.nodemanager = ServerProxy('http://127.0.0.1:812/')

        rec = self.nodemanager.GetRecord()
        attrs = rec.get("attributes", {})

        # get the resources that PLC has assigned to this node
        cpu_pct = int(attrs.get("cpu_pct", 0))
        net_min_rate = int(attrs.get("net_min_rate", 0))

        # maps canopus resource IDs into planetlab resource IDs
        self.resources = {}
        if cpu_pct > 0:
            self.resources["cpu"] = {"plc_id": "cpu_pct", "qty": cpu_pct}
        if net_min_rate > 1:
            # net_min_rate == 1 on a node with no network reservation; presumably
            # it'll be >1 on a node with network reservation
            self.resources["network"] = {"plc_id": "net_min_rate", "qty": net_min_rate}

        print("nodemanager resources available: %s" % (self.resources,))

        self.backend = "plc"

    def reserve_plc(self, reservations):
        """Convert the gacks reservations into PLC loans and apply them.

        reservations maps an opaque key to a dict with "id", "consumer" and
        "qty".  PLC loans are tuples of the form (slicename, resourcename,
        qty), where qty is the reservation's percentage of the node's own
        quantity for that resource.

        Reservations without a consumer, for a resource this node does not
        have, or with a consumer HRN that cannot be turned into a slice name
        are skipped; a single malformed HRN no longer prevents the loans of
        every other slice from being set.
        """
        loans = []
        for reservation in reservations.values():
            consumer = reservation.get("consumer")
            if consumer is None:
                continue

            nm_resource = self.resources.get(reservation["id"])
            if nm_resource is None:
                continue

            try:
                slicename = self.hrn_to_slicename(consumer)
            except IndexError:
                print("skipping reservation with malformed consumer hrn: %s" % (consumer,))
                continue

            # reservation["qty"] is treated as a percentage of the node's
            # quantity.  NOTE(review): with integer operands Python 2
            # truncates this division -- confirm that is intended.
            qty = nm_resource["qty"] * reservation["qty"] / 100
            loans.append((slicename, nm_resource["plc_id"], qty))
        print("calling setloans: %s" % (loans,))
        self.nodemanager.SetLoans("arizona_gacksnm", loans)

    def compute_reservations(self):
        """Recompute the reservations active right now and apply them.

        Queries the calendar for records overlapping the current time, sums
        the quantities per (resource id, consumer), hands the result to the
        PLC backend when it is active, and reports it to Owl.
        """
        self.calendar.lock()
        try:
            # The lock must be released even if the query raises; otherwise
            # every later poll/recompute would block on the calendar.
            recList = self.calendar.query_overlap(None, 0, INFINITY, time.time(), time.time(), None, None)
        finally:
            self.calendar.unlock()
        Logger.log_msg("GacksNodeManager.compute_reservations", "%d items in resv set" % (len(recList)))

        # build a set of reservations for each (id, consumer) that map to a
        # quantity
        reservations = {}
        for item in recList:
            key = item.id + "#" + str(item.consumerHRN)
            reservation = reservations.setdefault(
                key, {"id": item.id, "consumer": item.consumerHRN, "qty": 0})
            reservation["qty"] = reservation["qty"] + item.get_quantity()

        for reservation in reservations.values():
            print("%s %s %s" % (reservation["id"], reservation["qty"], reservation["consumer"]))

        if self.backend == "plc":
            self.reserve_plc(reservations)

        self.report_owl_reservations(reservations)

    def report_owl_reservations(self, reservations):
        """Rewrite the Owl script so that it echoes the current reservations.

        The script prints a "[gacksnm]" header, one `<id>="consumer:qty, ..."`
        line per reserved resource and one `a<id>="qty"` line per resource
        this node has.  Lines are written in a fixed order (cpu, disk,
        network) so the output does not depend on dict ordering.

        Returns silently when the script cannot be opened (probably because
        owl is not running).
        """
        try:
            f = open(GACKS_OWL_SCRIPT, "w")
        except (IOError, OSError):
            # failed to create the file, probably because owl not running
            return

        try:
            # organize the reservations by id
            resid = {}
            for reservation in reservations.values():
                entry = str(reservation["consumer"]) + ":" + str(reservation["qty"])
                resid.setdefault(reservation["id"], []).append(entry)

            f.write("#!/bin/bash\n")
            f.write("echo [gacksnm]\n")

            # NOTE(review): consumer names are written into a shell script
            # unescaped; confirm they can never contain quotes or `$(...)`.
            for key in self.OWL_RESOURCE_IDS:
                if key in resid:
                    f.write('echo ' + key + '="' + ', '.join(resid[key]) + '"\n')

            for key in self.OWL_RESOURCE_IDS:
                item = self.resources.get(key, None)
                if item:
                    f.write('echo ' + 'a'+key + '="' + str(item["qty"]) + '"\n')
        finally:
            # Close the script even if a write fails part-way through.
            f.close()

        # NOTE(review): mode 0777 leaves an executable script world-writable;
        # kept as-is, but 0755 is probably sufficient.
        os.chmod(GACKS_OWL_SCRIPT, 0o777)
169
170
class Periodic:
    """Periodically make a function call.

    XXX borrowed from Sirius

    Constructing an instance starts a daemon thread that calls target once
    immediately and then once per interval.  Firing times are aligned to
    multiples of interval (in epoch seconds) plus a random whole-second
    offset drawn from [mindelta, maxdelta].  Exceptions raised by target are
    logged and do not stop the schedule.
    """

    def __init__(self, target, interval, mindelta, maxdelta):
        """Validate the schedule and start the worker thread.

        target   -- callable invoked with no arguments
        interval -- seconds between firings; must be positive
        mindelta -- smallest random offset (integer seconds) added to a firing
        maxdelta -- largest random offset (integer seconds), inclusive

        Raises ValueError for a non-positive interval or an empty delta
        range.  Without this check the same mistakes only surfaced inside
        the worker thread, which then died without anyone noticing.
        """
        if interval <= 0:
            raise ValueError("interval must be positive, got %r" % (interval,))
        if maxdelta < mindelta:
            raise ValueError("maxdelta (%r) must not be smaller than mindelta (%r)"
                             % (maxdelta, mindelta))
        self._target = target
        self._interval = interval
        # randrange() excludes its upper bound, hence the +1.
        self._deltarange = mindelta, maxdelta+1
        thr = threading.Thread(target=self.run, args=[target])
        thr.daemon = True
        thr.start()

    def run(self, target):
        """Thread body: call the target forever on the configured schedule.

        The target argument is accepted for compatibility with the thread
        start-up call; the callable actually invoked is the one stored by
        __init__.
        """
        nextintervalstart = int(time.time() / self._interval) * self._interval
        while True:
            try:
                self._target()
            except Exception:
                # Keep the schedule alive whatever the target does, but let
                # SystemExit/KeyboardInterrupt through.
                Logger.log_exception("Periodic.run")
            nextintervalstart += self._interval
            nextfiring = nextintervalstart + random.randrange(*self._deltarange)
            # Sleep until the firing time; go back to sleep if woken early.
            while True:
                t = time.time()
                if t >= nextfiring:
                    break
                try:
                    time.sleep(nextfiring - t)
                except Exception:
                    Logger.log_exception("Periodic.run")
201
def daemon():
    """Daemonize the current process.

    XXX code from princeton plc.py

    Forks twice with a setsid() in between, clears the umask, points stdin
    at the null device and stdout/stderr at "gacksnm.demon" in
    CRASH_LOG_DIR, and finally writes the daemon's pid to PID_FN.  Failing
    to write the pid file is reported on stderr (i.e. in the crash log) but
    does not stop the daemon.
    """
    if os.fork() != 0:
        os._exit(0)
    os.setsid()
    if os.fork() != 0:
        os._exit(0)
    os.umask(0)
    devnull = os.open(os.devnull, os.O_RDWR)
    os.dup2(devnull, 0)
    # xxx fixme - this is just to make sure that nothing gets stupidly lost - should use devnull
    crashlog = os.open(os.path.join(CRASH_LOG_DIR, "gacksnm.demon"), os.O_RDWR | os.O_APPEND | os.O_CREAT, 0o644)
    os.dup2(crashlog, 1)
    os.dup2(crashlog, 2)
    # The duplicates on 0/1/2 are all that is needed; close the originals so
    # they do not leak.  The guard covers the case where an original already
    # landed on a standard descriptor.
    for fd in (devnull, crashlog):
        if fd > 2:
            os.close(fd)
    try:
        pidfile = open(PID_FN, "w")
        try:
            pidfile.write(str(os.getpid()))
        finally:
            pidfile.close()
    except (IOError, OSError) as err:
        # Best effort: run without a pid file, but leave a trace of why.
        sys.stderr.write("daemon: could not write pid file %s: %s\n" % (PID_FN, err))
219
def usage(exit_code=1):
    """Print the command-line help to stdout and exit.

    exit_code -- process exit status; defaults to 1 (the historical
                 behaviour, used for invalid invocations).  Pass 0 when the
                 help text was explicitly requested.
    """
    print("Usage: %s [OPTION]..." % sys.argv[0])
    print("Options:")
    print("     -h, --help              This message")
    print("     -d, --daemon            Daemonize")
    sys.exit(exit_code)


def get_options():
    """Parse sys.argv and set the module-level option flags.

    -d/--daemon sets glo_daemon to True.  -h/--help prints the usage text
    and exits with status 0.  An unrecognised option prints the error
    followed by the usage text and exits with status 1.
    """
    global glo_daemon

    # Get options
    try:
        (opts, _args) = getopt.getopt(sys.argv[1:], "dh", ["daemon", "help"])
    except getopt.GetoptError as err:
        print("Error: " + err.msg)
        usage()

    for (opt, _optval) in opts:
        if opt in ("-d", "--daemon"):
            glo_daemon = True
        elif opt in ("-h", "--help"):
            # Asking for help is not an error: exit with status 0.  The old
            # code exited 1 from usage() and never reached its sys.exit(0).
            usage(0)
243
def main():
    """Program entry point.

    Parses the command line, daemonizes when requested, creates the node
    manager and starts its two periodic jobs.  The calling thread then idles
    forever while the worker threads do the work.
    """
    get_options()

    if glo_daemon:
        daemon()

    nm = GacksNodeManager()

    # (job, period in seconds), started in this order:
    #   poll                 -- fetch updates from GacksCentral (was 10*60)
    #   compute_reservations -- recompute the current reservation set (was 5*60)
    schedule = (
        (nm.poll, 30),
        (nm.compute_reservations, 10),
    )
    timers = [Periodic(job, period, 0, 0) for (job, period) in schedule]

    # main thread can just sit here and do nothing
    while True:
        time.sleep(10)


if __name__ == "__main__":
    main()
264
265
266
267