fix bug in bucket checker; fix enforcer test case
[raven.git] / apps / gacks / gacksenforce.py
import hashlib
import logging
import os
import time
4
# Width of one bucket-accounting step, in seconds (one hour).
BUCKET_QUANTUM = 60 * 60
7
8 from gacksexcep import *
9 from gacksid import *
10 from gackshandle import *
11 from gacksresource import *
12
class GacksEnforcer:
    """Validate reservation handles against account and policy limits.

    The collaborators are supplied by the caller and are only used through
    the calls visible in this class:

      accounts  -- get_account(gacksid, kind[, create_if_not_exist=True])
      policies  -- get_policy(level, id)
      calendar  -- query_overlap(id, ...)
      resources -- get_resource(id)

    The entry point is check(); every violation is reported by raising one
    of the GacksEnforcer* exceptions.
    """

    def __init__(self, accounts, policies, calendar, resources):
        self.accounts = accounts
        self.policies = policies
        self.calendar = calendar
        self.resources = resources

    def check_same_time(self, handles):
        """Require every handle to share the first handle's time window.

        Raises GacksEnforcerDifferentTimes(first, offending) for the first
        handle whose timeStart or timeStop differs from handles[0].
        An empty list passes trivially.
        """
        if not handles:
            return
        first = handles[0]
        for handle in handles[1:]:
            if (handle.timeStart != first.timeStart) or (handle.timeStop != first.timeStop):
                raise GacksEnforcerDifferentTimes(first, handle)

    def check_duration(self, policy, handle):
        """Raise GacksEnforcerDurationTooLong if handle outlasts the policy.

        A negative policy.maxDuration disables the check.
        """
        if policy.maxDuration < 0:
            return
        if handle.get_duration() > policy.maxDuration:
            raise GacksEnforcerDurationTooLong(handle, policy.maxDuration)

    def check_end_date(self, policy, handle):
        """Raise GacksEnforcerEndDateBeyondLimit if handle ends too far out.

        The latest permitted stop time is "now" plus policy.maxEndDays days.
        A handle whose timeStop equals INFINITY always fails.  A negative
        policy.maxEndDays disables the check.
        """
        if policy.maxEndDays < 0:
            return
        latest_stop = time.time() + (policy.maxEndDays * 24 * 60 * 60)
        if (handle.timeStop == INFINITY) or (handle.timeStop > latest_stop):
            raise GacksEnforcerEndDateBeyondLimit(handle, latest_stop)

    def handles_by_id(self, handles):
        """ return a dict of handle ids to lists of handles with that id """
        grouped = {}
        for handle in handles:
            grouped.setdefault(handle.id, []).append(handle)
        return grouped

    def aggregate_concurrent(self, resource, handles):
        """Group the units covered by handles per host.

        Each unit in [handle.unitStart, handle.unitStop) is translated with
        resource.inverse_map(unit) into a (hostitem, offset) pair.  Returns a
        dict mapping hostitem.name to the list of distinct (unit - offset)
        values seen for that host, in first-seen order.
        """
        buckets = {}
        for handle in handles:
            for unit in range(handle.unitStart, handle.unitStop):
                (hostitem, offset) = resource.inverse_map(unit)
                cores = buckets.setdefault(hostitem.name, [])
                core_index = unit - offset
                # a unit claimed by several handles is only counted once
                if core_index not in cores:
                    cores.append(core_index)
        return buckets

    def check_concurrent_limit(self, policy, resource, existing_handles, handles, account):
        """Enforce the per-node and overall concurrent unit limits.

        existing_handles and handles are combined and aggregated per host.

        Raises GacksEnforcerTooManyOnNode(host, count) when a host holds more
        distinct units than policy.maxUnitsNode, and
        GacksEnforcerTooManyConcurrent(total) when the sum over all hosts
        exceeds policy.maxUnitsConcurrent * account.multiplier.  A negative
        policy value disables the corresponding limit.
        """
        buckets = self.aggregate_concurrent(resource, existing_handles + handles)

        total = 0
        for (hostname, cores) in buckets.items():
            if (policy.maxUnitsNode >= 0) and (len(cores) > policy.maxUnitsNode):
                raise GacksEnforcerTooManyOnNode(hostname, len(cores))
            total = total + len(cores)

        # Test the policy value itself for the "no limit" sentinel.  Testing
        # the scaled product instead would turn an unlimited policy into a
        # zero-unit limit for an account whose multiplier is 0.
        if policy.maxUnitsConcurrent < 0:
            return

        maxUnitsConcurrent = policy.maxUnitsConcurrent * account.multiplier

        # A negative product is still treated as "no limit", as before.
        if (maxUnitsConcurrent >= 0) and (total > maxUnitsConcurrent):
            raise GacksEnforcerTooManyConcurrent(total)

    def check_bucket(self, name, policy, resource, handles, account):
        """Check that the bucket for `name` can pay for its reservations.

        The "bucket" account for name is loaded (created if missing and then
        seeded with policy.bucketInit).  All handles the calendar reports for
        handles[0].id allocated by name, plus the new handles, are replayed
        in BUCKET_QUANTUM steps: at each step the account's apply_inRate(t)
        is called and the summed get_quantity() of the handles active in
        that step is subtracted from the balance.

        resource and account are not used by this check; they are accepted
        so the signature matches the other per-policy checks.

        Raises GacksEnforcerBucketUnderflow(t, total, balance) at the first
        step whose demand exceeds the balance.  Does nothing when the policy
        has a negative bucketInRate or bucketMax, or when handles is empty.
        """
        # if policy doesn't use a bucket, then return
        if policy.bucketInRate < 0 or policy.bucketMax < 0:
            return
        if not handles:
            return

        log = logging.getLogger(__name__)
        log.debug("check_bucket")

        # load and/or initialize the bucket
        bucketAccount = self.accounts.get_account(hrn_to_gacksid(name), "bucket", create_if_not_exist=True)
        bucketAccount.inRate = policy.bucketInRate
        bucketAccount.maxBalance = policy.bucketMax
        if bucketAccount.created:
            bucketAccount.balance = policy.bucketInit
            bucketAccount.commit()

        all_handles = self.calendar.query_overlap(handles[0].id, hasAllocator=name)
        all_handles = all_handles + handles

        # sort them by start time
        all_handles = sorted(all_handles, key=lambda handle: handle.timeStart)

        active_handles = []

        # compute a global start and stop time for all of the user's
        # reservations
        timeStart = None
        timeStop = None
        for handle in all_handles:
            if (timeStart is None) or (timeStart > handle.timeStart):
                timeStart = handle.timeStart
            if (timeStop is None) or (timeStop < handle.timeStop):
                timeStop = handle.timeStop

        # start off at an hour boundary; // keeps this a floor division
        # regardless of the interpreter's "/" semantics
        t = (int(timeStart) // BUCKET_QUANTUM) * BUCKET_QUANTUM

        # NOTE(review): t only advances in finite steps, so if timeStop can be
        # unbounded this loop ends only via the underflow exception -- confirm
        # that such handles are rejected before reaching this check.
        while t < timeStop:
            # retire reservations that have ended by the start of this step
            active_handles = [handle for handle in active_handles if (t < handle.timeStop)]

            # activate pending reservations that begin before this step ends
            still_pending = []
            for handle in all_handles:
                if (handle not in active_handles) and (t + BUCKET_QUANTUM > handle.timeStart):
                    active_handles.append(handle)
                else:
                    still_pending.append(handle)
            all_handles = still_pending

            # If there are no active handles, then zoom forward to the start
            # time of the next handle on the list, because there's going to
            # be no work to do until we get there.
            if (not active_handles) and all_handles:
                t = (int(all_handles[0].timeStart) // BUCKET_QUANTUM) * BUCKET_QUANTUM
                continue

            total = sum(handle.get_quantity() for handle in active_handles)

            bucketAccount.apply_inRate(t)

            log.debug("%s %s %s", time.ctime(t), bucketAccount.balance, total)

            if total > bucketAccount.balance:
                raise GacksEnforcerBucketUnderflow(t, total, bucketAccount.balance)

            bucketAccount.adjust_balance(-total)

            t = t + BUCKET_QUANTUM

    def check_handles(self, name, handles, account, reservationKind):
        """Run every policy check for a group of handles sharing one id.

        handles must be non-empty; check() only passes groups built by
        handles_by_id().  The policy is looked up by (account.level, id) and
        the resource by id.

        Raises GacksEnforcerNoPolicy, GacksEnforcerNoReservation or
        GacksEnforcerNoResource when the lookups fail or reservationKind is
        not listed in policy.reservations, plus whatever the individual
        checks raise.
        """
        handle_id = handles[0].id
        policy = self.policies.get_policy(account.level, handle_id)
        if policy is None:
            raise GacksEnforcerNoPolicy(account.level, handle_id)

        if reservationKind not in policy.reservations:
            raise GacksEnforcerNoReservation(reservationKind, policy.reservations)

        resource = self.resources.get_resource(handle_id)
        if not resource:
            raise GacksEnforcerNoResource(handle_id)

        existing_handles = self.calendar.query_overlap(handle_id, timeStart=handles[0].timeStart, timeStop=handles[0].timeStop, hasAllocator=name)

        # all the handles are the same length, start, and stop time
        self.check_duration(policy, handles[0])
        self.check_end_date(policy, handles[0])

        if resource.kind == GROUPED:
            self.check_concurrent_limit(policy, resource, existing_handles, handles, account)

        self.check_bucket(name, policy, resource, handles, account)

    def check(self, name, handles, reservationKind="calendar"):
        """Validate a reservation request made by `name`.

        Returns None when handles is empty or every check passes.

        Raises GacksEnforcerDifferentTimes when the handles do not share one
        time window, GacksEnforcerNoAccount when name has no "user" account,
        GacksEnforcerNotGoodStanding when that account is not in good
        standing, and otherwise whatever check_handles() raises for any of
        the per-id groups.
        """
        if not handles:
            return

        self.check_same_time(handles)

        account = self.accounts.get_account(hrn_to_gacksid(name), "user")
        if account is None:
            raise GacksEnforcerNoAccount(name)

        if not account.goodStanding:
            raise GacksEnforcerNotGoodStanding(name)

        for group in self.handles_by_id(handles).values():
            self.check_handles(name, group, account, reservationKind)
194