import repository from arizona
[raven.git] / apps / gacks / gacksbackend.py
1 #!/usr/bin/env python
2
3 ##
4 # Gacks Server
5 ##
6
import atexit
import getopt
import os
import random
import sys
import tempfile
import threading
import time
14
# Directory in which daemon() creates the "gacksbackend.demon" file that
# receives the daemonized process's stdout and stderr.
CRASH_LOG_DIR = "/usr/local/gacks/var/log"
# File that daemon() writes the daemonized process id to.
PID_FN = "/var/run/gacksbackend.pid"
17
18 from gacksexcep import *
19 from gacksreceipt import *
20 from gackslog import *
21 from gackscalendar_mysql import *
22 from gacksresource import *
23 from gacksaccountmanager import *
24 from gackspolicy import *
25 from gacksinvoice import *
26 from gacksclient import GacksClient
27 from gackshandler_plc import GacksPLCHandler
28
# Command-line switches; get_options() sets these to True for
# -d/--daemon and -t/--test respectively.
glo_daemon = False
glo_test = False

# Module-wide logger used by GacksBackend and Periodic.
Logger = GacksLogger(name = "gacksbackend", enable_stdout=True)
33
class GacksBackend:
    """Apply the reservations of the current hour and bill their allocators.

    calendar  -- object providing lock(), unlock() and query_overlap()
    resources -- resource directory providing get_resource(id)
    invoices, policies, accounts -- optional; when any of them is None,
        do_accounting() does nothing.
    """

    def __init__(self, calendar, resources, invoices=None, policies=None, accounts=None):
        self.calendar = calendar
        self.resourceDirectory = resources
        self.invoices = invoices
        self.policies = policies
        self.accounts = accounts
        # resource id -> handler instance, filled lazily by compute_reservations()
        self.handlers = {}

    def poll(self):
        """Entry point for the periodic timer: recompute the reservations."""
        self.compute_reservations()

    def do_accounting(self, tResv, resDef, resources):
        """Charge the billable allocator of every reserved record.

        tResv     -- start time of the slot being billed
        resDef    -- resource definition (its kind and name are used)
        resources -- dict: consumerHRN -> reservation dict with a "records" list

        Only resources whose kind is GROUPED are billed; other kinds are
        ignored. Does nothing unless invoices, accounts and policies were
        all supplied to the constructor.
        """
        if self.invoices is None or self.accounts is None or self.policies is None:
            return

        # Build a dictionary that maps each billable allocator HRN to the
        # records it reserved. The billable allocator is the third entry of
        # record.allocatorHRNs; records with fewer entries are not billed.
        allocators = {}
        for consumerHRN in resources:
            for record in resources[consumerHRN]["records"]:
                if len(record.allocatorHRNs) >= 3:
                    allocators.setdefault(record.allocatorHRNs[2], []).append(record)

        if resDef.kind == GROUPED:
            self.do_accounting_grouped(tResv, resDef, allocators)

    def do_accounting_grouped(self, tResv, resDef, allocators):
        """Add one slot charge per reserved unit for each allocator.

        allocators -- dict: allocatorHRN -> list of records

        An allocator without an account or without a policy is reported and
        skipped; the remaining allocators are still billed.
        """
        for allocatorHRN, records in allocators.items():
            # we need the account to know the policy
            account = self.accounts.get_account(allocatorHRN, "user")
            if not account:
                print("no account: %s" % (allocatorHRN,))
                # Skip only this allocator; returning here would silently
                # drop the charges of every allocator not yet processed.
                continue

            # we need the policy to know the cost/unit
            policy = self.policies.get_policy(account.level, resDef.name)
            if not policy:
                print("no policy: %s %s" % (allocatorHRN, account.level))
                continue

            for record in records:
                # NOTE(review): range() excludes unitStop. The fixture in
                # test() (qty 6 for units 0-0, 11-11, 71-73, 111-111)
                # suggests unitStop may be inclusive -- confirm before
                # relying on these charge counts.
                for i in range(record.unitStart, record.unitStop):
                    (hostitem, offset) = resDef.inverse_map(i)
                    # one unit per iteration, priced at the policy's cost
                    self.invoices.add_charge(allocatorHRN, hostitem.name, KIND_SLOT_CHARGE, tResv, 1 * policy.cost)

    def _group_reservations(self, recList):
        """Group calendar records by resource id, then by consumer.

        Returns a dict: id -> {"id", "consumers"} where "consumers" maps
        consumerHRN -> {"id", "consumer", "qty", "records"}. Records without
        a consumer are dropped.
        """
        resources = {}
        for item in recList:
            if item.consumerHRN is None:
                # no consumer -- nothing to reserve
                continue

            resource = resources.setdefault(item.id, {"id": item.id, "consumers": {}})
            reservation = resource["consumers"].setdefault(
                item.consumerHRN,
                {"id": item.id, "consumer": item.consumerHRN, "qty": 0, "records": []})
            reservation["qty"] += item.get_quantity()
            reservation["records"].append(item)
        return resources

    def compute_reservations(self):
        """Query the calendar for the current hour and process what it holds."""
        # Slot starts at the beginning of the hour and runs to the end of the
        # hour. Any reservation that occurs within that time period will allocate
        # the full hour.
        tNow = int(time.time() / 3600) * 3600

        print("query %s %s" % (tNow, tNow + 3600))

        self.calendar.lock()
        try:
            recList = self.calendar.query_overlap(None, 0, INFINITY, tNow, tNow + 3600, None, None)
        finally:
            # Always release the lock: a failing query must not leave the
            # calendar locked for the next poll.
            self.calendar.unlock()
        # NOTE(review): the tag names GacksNodeManager although this class is
        # GacksBackend; left unchanged so existing log searches keep matching.
        Logger.log_msg("GacksNodeManager.compute_reservations", "%d items in resv set" % (len(recList)))

        # Preprocess the reservations to sort them by kind and consumer
        resources = self._group_reservations(recList)

        # Print some debugging info
        for resource in resources.values():
            for reservation in resource["consumers"].values():
                print("%s %s %s" % (reservation["id"], reservation["qty"], reservation["consumer"]))

        for kind, resource in resources.items():
            resourceDefinition = self.resourceDirectory.get_resource(kind)
            if not resourceDefinition:
                print("Failed to lookup resource in resources directory: %s" % (kind,))
                continue

            if kind not in self.handlers:
                if resourceDefinition.handler == "plc":
                    self.handlers[kind] = GacksPLCHandler(resourceDefinition)
                else:
                    print("unknown handler kind: %s" % (resourceDefinition.handler,))

            if kind not in self.handlers:
                print("Don't know how to handle: %s" % (kind,))
            else:
                # resource is a dict: {"id", "consumers"}
                # resource["consumers"] is a dict of dicts: consumerHRN -> {"id", "consumer", "qty", "records"}
                # Applying the reservations is currently disabled:
                pass #self.handlers[kind].applyReservations(resource["consumers"])

            # Accounting runs even when no handler is available for the kind.
            self.do_accounting(tNow, resourceDefinition, resource["consumers"])
143
def test(resourceDir):
    """Manual check: hand a fixed set of core reservations to a PLC handler.

    resourceDir -- resource directory used to look up "plc.vicci.cores"
    """
    print("TEST")
    resource_id = "plc.vicci.cores"
    consumer = "plc.arizona.beta"
    definition = resourceDir.get_resource(resource_id)

    # (first argument, second argument) pairs passed to GacksRecord after the id
    unit_ranges = [(0, 0), (11, 11), (71, 73), (111, 111)]
    records = [GacksRecord(resource_id, start, stop) for (start, stop) in unit_ranges]

    reservations = {
        consumer: {"id": resource_id, "consumer": consumer, "qty": 6, "records": records},
    }
    GacksPLCHandler(definition).applyReservations(reservations)
154
class Periodic:
    """Invoke a callable over and over from a background daemon thread.

    The scheduling loop was borrowed from Sirius.

    target   -- callable invoked without arguments
    interval -- seconds between interval boundaries
    mindelta, maxdelta -- inclusive bounds of the random whole-second offset
        added to each interval boundary
    """

    def __init__(self, target, interval, mindelta, maxdelta):
        self._target = target
        self._interval = interval
        # random.randrange() excludes its upper bound, hence maxdelta + 1
        self._deltarange = mindelta, maxdelta+1
        worker = threading.Thread(target=self.run, args=[target])
        worker.setDaemon(True)
        worker.start()

    def run(self, target):
        """Thread body: call the target, then sleep until the next firing time.

        The target argument is not used; the callable stored by __init__ is
        the one that gets called.
        """
        boundary = int(time.time() / self._interval) * self._interval
        while True:
            # A failing target is logged and must never end the loop.
            try:
                self._target()
            except:
                Logger.log_exception("Periodic.run")

            boundary += self._interval
            deadline = boundary + random.randrange(*self._deltarange)

            # Keep sleeping until the deadline has really passed, even if a
            # sleep is cut short or raises.
            remaining = deadline - time.time()
            while remaining > 0:
                try:
                    time.sleep(remaining)
                except:
                    Logger.log_exception("Periodic.run")
                remaining = deadline - time.time()
185
def daemon():
    """Daemonize the current process.

    Forks twice (the parent exits each time) with a setsid() in between,
    clears the umask, points stdin at os.devnull and stdout/stderr at
    CRASH_LOG_DIR/gacksbackend.demon, then writes the pid to PID_FN.
    Failure to write the pid file is reported on stderr but is not fatal.

    XXX code from princeton plc.py
    """
    if os.fork() != 0:
        os._exit(0)
    os.setsid()
    if os.fork() != 0:
        os._exit(0)
    os.umask(0)

    devnull = os.open(os.devnull, os.O_RDWR)
    os.dup2(devnull, 0)
    # The duplicate on fd 0 is all we need; do not leak the original.
    if devnull > 2:
        os.close(devnull)

    # xxx fixme - this is just to make sure that nothing gets stupidly lost - should use devnull
    crashlog = os.open(os.path.join(CRASH_LOG_DIR, "gacksbackend.demon"),
                       os.O_RDWR | os.O_APPEND | os.O_CREAT, 0o644)
    os.dup2(crashlog, 1)
    os.dup2(crashlog, 2)
    if crashlog > 2:
        os.close(crashlog)

    # Best effort: a missing pid file must not stop the daemon, but the
    # failure is worth a line in the crash log (stderr points there now).
    try:
        with open(PID_FN, "w") as pid_file:
            pid_file.write(str(os.getpid()))
    except (IOError, OSError) as err:
        sys.stderr.write("gacksbackend: cannot write pid file %s: %s\n" % (PID_FN, err))
203
def usage(exit_code=1):
    """Print the command-line help on stdout and exit.

    exit_code -- process exit status; defaults to 1 so that existing callers
        reporting a usage error keep their behavior.
    """
    print("Usage: %s [OPTION]..." % sys.argv[0])
    print("Options:")
    print("     -h, --help              This message")
    print("     -d, --daemon            Daemonize")
    print("     -t, --test              Run the built-in test and exit")
    sys.exit(exit_code)

def get_options():
    """Parse sys.argv and set the glo_daemon / glo_test module globals.

    An unknown option prints the error plus the help and exits with
    status 1; -h/--help prints the help and exits with status 0.
    """
    global glo_daemon, glo_test

    # Get options
    try:
        (opts, argv) = getopt.getopt(sys.argv[1:], "dht", ["daemon", "help", "test"])
    except getopt.GetoptError as err:
        print("Error: " + err.msg)
        usage()

    for (opt, optval) in opts:
        if opt in ("-d", "--daemon"):
            glo_daemon = True
        elif opt in ("-h", "--help"):
            # Asking for help is not an error, so exit with status 0.
            usage(0)
        elif opt in ("-t", "--test"):
            glo_test = True
229
def main():
    """Program entry point: parse options, build the backend, poll forever.

    With the test switch set, runs test() against the resource directory and
    exits with status 0 instead of starting the poll loop.
    """
    get_options()

    if glo_daemon:
        daemon()

    cal = GacksMySQLCalendar()
    cal.open()

    resource_dir = GacksResourceDirectory("/etc/gacks/resources.d")
    policy_dir = GacksPolicyDirectory("/etc/gacks/policy.d")
    account_mgr = GacksAccountManager()
    invoice_mgr = GacksInvoiceManager(accounts=account_mgr)

    if glo_test:
        test(resource_dir)
        sys.exit(0)

    backend = GacksBackend(cal, resource_dir, invoice_mgr, policy_dir, account_mgr)

    # Poll every 30 seconds with no random offset; the work happens on the
    # thread that Periodic starts. (A 10*60 second interval was noted here
    # as an alternative.)
    poller = Periodic(backend.poll, 30, 0, 0)

    # The main thread only has to stay alive.
    while True:
        time.sleep(10)


if __name__ == "__main__":
    main()
259
260
261
262