import repository from arizona
[raven.git] / apps / gacks / gacksenforce.py
1 import os
2 import hashlib
3 import time
4
5 # one hour
6 BUCKET_QUANTUM = (60*60)
7
8 from gacksexcep import *
9 from gackshandle import *
10 from gacksresource import *
11
12 class GacksEnforcer:
13     def __init__(self, accounts, policies, calendar, resources):
14         self.accounts = accounts
15         self.policies = policies
16         self.calendar = calendar
17         self.resources = resources
18
19     def check_same_time(self, handles):
20         firstHandle = handles[0]
21         for handle in handles[1:]:
22             if (handle.timeStart != firstHandle.timeStart) or (handle.timeStop != firstHandle.timeStop):
23                 raise GacksEnforcerDifferentTimes(firstHandle, handle)
24
25     def check_duration(self, policy, handle):
26         if policy.maxDuration < 0:
27             return
28         if handle.get_duration() > policy.maxDuration:
29             raise GacksEnforcerDurationTooLong(handle, policy.maxDuration)
30
31     def check_end_date(self, policy, handle):
32         if policy.maxEndDays < 0:
33             return
34         maxTStop = time.time() + (policy.maxEndDays * 24 * 60 * 60)
35         if (handle.timeStop==INFINITY) or (handle.timeStop>maxTStop):
36             raise GacksEnforcerEndDateBeyondLimit(handle, maxTStop)
37
38     def handles_by_id(self, handles):
39         """ return a dict of handle ids to lists of handles with that id """
40         d = {}
41         for handle in handles:
42             if not handle.id in d:
43                 d[handle.id] = []
44             d[handle.id].append(handle)
45         return d
46
47     def aggregate_concurrent(self, resource, handles):
48         buckets = {}
49         for handle in handles:
50             for i in range(handle.unitStart, handle.unitStop):
51                 (hostitem,offset) = resource.inverse_map(i)
52                 hostname = hostitem.name
53                 if not hostname in buckets:
54                     buckets[hostname] = []
55                 core_index = (i-offset)
56                 if not core_index in buckets[hostname]:
57                     buckets[hostname].append(i-offset)
58                 #print i, hostname, offset, core_index, buckets[hostname]
59         return buckets
60
61     def check_concurrent_limit(self, policy, resource, existing_handles, handles, account):
62         all_handles = existing_handles + handles
63
64         buckets = self.aggregate_concurrent(resource, all_handles)
65
66         total = 0
67         for key in buckets.keys():
68             bucket = buckets[key]
69             #print key, bucket
70             if (policy.maxUnitsNode >= 0) and (len(bucket) > policy.maxUnitsNode):
71                 raise GacksEnforcerTooManyOnNode(key, len(bucket))
72             total = total + len(bucket)
73
74         maxUnitsConcurrent = policy.maxUnitsConcurrent * account.multiplier
75
76         if (maxUnitsConcurrent >= 0) and (total > maxUnitsConcurrent):
77             raise GacksEnforcerTooManyConcurrent(total)
78
79     def check_bucket(self, name, policy, resource, handles, account):
80         # if policy doesn't use a bucket, then return
81         if policy.bucketInRate<0 or policy.bucketMax<0:
82             return
83
84         print "check_bucket"
85
86         # load and/or initialize the bucket
87         bucketAccount = self.accounts.get_account(account.name, "bucket", create_if_not_exist=True)
88         bucketAccount.inRate = policy.bucketInRate
89         bucketAccount.maxBalance = policy.bucketMax
90         if (bucketAccount.created):
91             bucketAccount.balance = policy.bucketInit
92             bucketAccount.commit()
93
94         all_handles = self.calendar.query_overlap(handles[0].id, hasAllocator=name)
95         all_handles = all_handles + handles
96
97         # sort them by start time
98         all_handles = sorted(all_handles, key=lambda handle: handle.timeStart)
99
100         active_handles = []
101
102         # compute a global start and stop time for all of the user's
103         # reservations
104         timeStart=None
105         timeStop=None
106         for handle in all_handles:
107             if (timeStart==None) or (timeStart > handle.timeStart):
108                 timeStart = handle.timeStart
109             if (timeStop==None) or (timeStop < handle.timeStop):
110                 timeStop = handle.timeStop
111
112         # start off at an hour boundary
113         timeStart = (int(timeStart) / BUCKET_QUANTUM) * BUCKET_QUANTUM
114
115         t = timeStart
116         while t < timeStop:
117             active_handles = [handle for handle in active_handles if (t < handle.timeStop)]
118
119             new_all_handles=[]
120             for handle in all_handles:
121                 if (not handle in active_handles) and (t+BUCKET_QUANTUM>handle.timeStart):
122                     active_handles.append(handle)
123                 else:
124                     new_all_handles.append(handle)
125             all_handles = new_all_handles
126
127             # If there are now active handles, then zoom forward to the start time
128             # of the next handle on the list, because there's going to be no
129             # work to do until we get there.
130             if (active_handles == []) and (all_handles != []):
131                 t = all_handles[0].timeStart
132                 t = (int(t) / BUCKET_QUANTUM) * BUCKET_QUANTUM
133                 continue
134
135             total = 0
136             for handle in active_handles:
137                 total = total + handle.get_quantity()
138
139             bucketAccount.apply_inRate(t)
140
141             print time.ctime(t), bucketAccount.balance, total
142             #for handle in active_handles:
143             #    print "  ", time.ctime(handle.timeStart), time.ctime(handle.timeStop)
144
145             if total > bucketAccount.balance:
146                 raise GacksEnforcerBucketUnderflow(t, total, bucketAccount.balance)
147
148             bucketAccount.adjust_balance(-total)
149
150             t = t + BUCKET_QUANTUM
151
152     def check_handles(self, name, handles, account, reservationKind):
153         id = handles[0].id
154         policy = self.policies.get_policy(account.level, id)
155         if (policy==None):
156             raise GacksEnforcerNoPolicy(account.level, id)
157
158         if not reservationKind in policy.reservations:
159             raise GacksEnforcerNoReservation(reservationKind, policy.reservations)
160
161         resource = self.resources.get_resource(id)
162         if not resource:
163             raise GacksEnforcerNoResource(id)
164
165         existing_handles = self.calendar.query_overlap(id, timeStart=handles[0].timeStart, timeStop=handles[0].timeStop, hasAllocator=name)
166
167         # all the handles are the same length, start, and stop time
168         self.check_duration(policy, handles[0])
169         self.check_end_date(policy, handles[0])
170
171         if resource.kind==GROUPED:
172             self.check_concurrent_limit(policy, resource, existing_handles, handles, account)
173
174         self.check_bucket(name, policy, resource, handles, account)
175
176     def check(self, name, handles, reservationKind="calendar"):
177         if len(handles) == 0:
178             return
179
180         self.check_same_time(handles)
181
182         account = self.accounts.get_account(name, "user")
183         if (account == None):
184             raise GacksEnforcerNoAccount(name)
185
186         if (not account.goodStanding):
187             raise GacksEnforcerNotGoodStanding(name)
188
189         id_handles = self.handles_by_id(handles)
190
191         for id in id_handles.keys():
192             self.check_handles(name, id_handles[id], account, reservationKind)
193