fix check_buckets failing if there were no reservations
[raven.git] / apps / gacks / gacksenforce.py
1 import os
2 import hashlib
3 import time
4
# Length of one bucket accounting step: one hour, expressed in seconds.
BUCKET_QUANTUM = 60 * 60
7
8 from gacksexcep import *
9 from gacksid import *
10 from gackshandle import *
11 from gacksresource import *
12
class GacksEnforcer:
    """Check reservation handles against the policy that applies to an account.

    The individual checks cover reservation duration, end date, per-node and
    concurrent unit limits, and the account's bucket balance.  A violated
    limit is reported by raising the matching GacksEnforcer* exception.
    """

    def __init__(self, accounts, policies, calendar, resources):
        """Remember the collaborators consulted by the checks.

        accounts  -- object providing get_account(name, kind, ...)
        policies  -- object providing get_policy(level, id)
        calendar  -- object providing query_overlap(...)
        resources -- object providing get_resource(id)
        """
        self.accounts = accounts
        self.policies = policies
        self.calendar = calendar
        self.resources = resources

    def check_same_time(self, handles):
        """Raise GacksEnforcerDifferentTimes unless every handle has the same
        timeStart and timeStop as the first one.

        An empty list passes trivially.
        """
        if not handles:
            return
        firstHandle = handles[0]
        for handle in handles[1:]:
            if (handle.timeStart != firstHandle.timeStart) or (handle.timeStop != firstHandle.timeStop):
                raise GacksEnforcerDifferentTimes(firstHandle, handle)

    def check_duration(self, policy, handle):
        """Raise GacksEnforcerDurationTooLong if the handle's duration exceeds
        policy.maxDuration.  A negative maxDuration disables the check."""
        if policy.maxDuration < 0:
            return
        if handle.get_duration() > policy.maxDuration:
            raise GacksEnforcerDurationTooLong(handle, policy.maxDuration)

    def check_end_date(self, policy, handle):
        """Raise GacksEnforcerEndDateBeyondLimit if the handle ends more than
        policy.maxEndDays days from now, or never ends (timeStop is INFINITY).
        A negative maxEndDays disables the check."""
        if policy.maxEndDays < 0:
            return
        # latest permitted stop time: now plus maxEndDays, converted to seconds
        maxTStop = time.time() + (policy.maxEndDays * 24 * 60 * 60)
        if (handle.timeStop == INFINITY) or (handle.timeStop > maxTStop):
            raise GacksEnforcerEndDateBeyondLimit(handle, maxTStop)

    def handles_by_id(self, handles):
        """ return a dict of handle ids to lists of handles with that id """
        d = {}
        for handle in handles:
            d.setdefault(handle.id, []).append(handle)
        return d

    def aggregate_concurrent(self, resource, handles):
        """Group the units covered by handles by the host they map to.

        Every unit in [handle.unitStart, handle.unitStop) is translated with
        resource.inverse_map(), which yields (hostitem, offset).  The result
        maps hostitem.name to the list of distinct host-relative indices
        (unit - offset), in first-seen order.
        """
        buckets = {}
        for handle in handles:
            for i in range(handle.unitStart, handle.unitStop):
                (hostitem, offset) = resource.inverse_map(i)
                cores = buckets.setdefault(hostitem.name, [])
                core_index = i - offset
                # a unit shared by several handles is only counted once
                if core_index not in cores:
                    cores.append(core_index)
        return buckets

    def check_concurrent_limit(self, policy, resource, existing_handles, handles, account):
        """Enforce the per-node and total concurrent unit limits.

        Units of existing_handles and handles are aggregated per host.
        Raises GacksEnforcerTooManyOnNode if any host exceeds
        policy.maxUnitsNode, and GacksEnforcerTooManyConcurrent if the total
        exceeds policy.maxUnitsConcurrent scaled by account.multiplier.
        A negative policy value disables the corresponding limit.
        """
        all_handles = existing_handles + handles

        buckets = self.aggregate_concurrent(resource, all_handles)

        total = 0
        for hostname, bucket in buckets.items():
            if (policy.maxUnitsNode >= 0) and (len(bucket) > policy.maxUnitsNode):
                raise GacksEnforcerTooManyOnNode(hostname, len(bucket))
            total = total + len(bucket)

        # Test the "unlimited" sentinel before scaling it: multiplying a
        # negative sentinel by a zero or negative multiplier would otherwise
        # turn "no limit" into a real limit.
        if policy.maxUnitsConcurrent < 0:
            return

        maxUnitsConcurrent = policy.maxUnitsConcurrent * account.multiplier

        if (maxUnitsConcurrent >= 0) and (total > maxUnitsConcurrent):
            raise GacksEnforcerTooManyConcurrent(total)

    def check_bucket(self, name, policy, resource, handles, account, enforce=True):
        """Simulate the account's bucket over all of the allocator's handles.

        The bucket account for account.name is loaded (and created if it
        does not exist), then the time span covered by the calendar's
        handles for this allocator plus the new handles is walked in
        BUCKET_QUANTUM steps.  At each step the bucket's in-rate is applied
        and the summed quantity of the active handles is withdrawn.

        Returns a list of (time, balance_before_withdrawal, quantity) rows,
        starting with (0, initial balance, 0) and, when there are handles,
        ending with a row for the step after the last reservation.  Returns
        [] when the policy does not use a bucket (negative bucketInRate or
        bucketMax).

        Raises GacksEnforcerBucketUnderflow when enforce is true and a step
        needs more than the bucket holds.
        """
        # if policy doesn't use a bucket, then return
        if policy.bucketInRate < 0 or policy.bucketMax < 0:
            return []

        # load and/or initialize the bucket
        bucketAccount = self.accounts.get_account(account.name, "bucket", create_if_not_exist=True)
        bucketAccount.inRate = policy.bucketInRate
        bucketAccount.maxBalance = policy.bucketMax
        if (bucketAccount.created):
            bucketAccount.balance = policy.bucketInit
            bucketAccount.commit()

        results = [(0, bucketAccount.balance, 0)]

        all_handles = self.calendar.query_overlap(resource, hasAllocator=name)
        all_handles = all_handles + handles

        if not all_handles:
            # there are no handles, so there's no analysis to do
            return results

        # sort them by start time
        all_handles = sorted(all_handles, key=lambda handle: handle.timeStart)

        active_handles = []

        # compute a global start and stop time for all of the user's
        # reservations
        timeStart = min(handle.timeStart for handle in all_handles)
        timeStop = max(handle.timeStop for handle in all_handles)

        # start off at an hour boundary; floor division keeps the result a
        # whole multiple of BUCKET_QUANTUM under both division semantics
        timeStart = (int(timeStart) // BUCKET_QUANTUM) * BUCKET_QUANTUM

        # NOTE(review): if any handle's timeStop is INFINITY this loop would
        # presumably never terminate -- confirm such handles cannot get here.
        t = timeStart
        while t < timeStop:
            # drop the handles that have ended by time t
            active_handles = [handle for handle in active_handles if (t < handle.timeStop)]

            # move the handles that start before the end of this step from
            # the pending list to the active list
            new_all_handles = []
            for handle in all_handles:
                if (handle not in active_handles) and (t + BUCKET_QUANTUM > handle.timeStart):
                    active_handles.append(handle)
                else:
                    new_all_handles.append(handle)
            all_handles = new_all_handles

            # If there are no active handles, then zoom forward to the start
            # time of the next handle on the list, because there's going to
            # be no work to do until we get there.
            if (not active_handles) and all_handles:
                t = all_handles[0].timeStart
                t = (int(t) // BUCKET_QUANTUM) * BUCKET_QUANTUM
                continue

            total = 0
            for handle in active_handles:
                total = total + handle.get_quantity()

            bucketAccount.apply_inRate(t)

            results.append((t, bucketAccount.balance, total))

            if (total > bucketAccount.balance) and (enforce):
                raise GacksEnforcerBucketUnderflow(t, total, bucketAccount.balance)

            bucketAccount.adjust_balance(-total)

            t = t + BUCKET_QUANTUM

        # add a final row to results to show after the last reservation
        bucketAccount.apply_inRate(t)
        results.append((t, bucketAccount.balance, 0))

        return results

    def check_handles(self, name, handles, account, reservationKind):
        """Run every policy check for a group of handles sharing one id.

        handles must be non-empty; the id and the start/stop times are taken
        from the first handle.

        Raises GacksEnforcerNoPolicy, GacksEnforcerNoReservation or
        GacksEnforcerNoResource when the lookups fail, plus whatever the
        individual checks raise.
        """
        resource_id = handles[0].id
        policy = self.policies.get_policy(account.level, resource_id)
        if policy is None:
            raise GacksEnforcerNoPolicy(account.level, resource_id)

        if reservationKind not in policy.reservations:
            raise GacksEnforcerNoReservation(reservationKind, policy.reservations)

        resource = self.resources.get_resource(resource_id)
        if not resource:
            raise GacksEnforcerNoResource(resource_id)

        # NOTE(review): query_overlap is given the id here but the resource
        # object in check_bucket -- confirm that both forms are accepted.
        existing_handles = self.calendar.query_overlap(resource_id, timeStart=handles[0].timeStart, timeStop=handles[0].timeStop, hasAllocator=name)

        # all the handles are the same length, start, and stop time
        self.check_duration(policy, handles[0])
        self.check_end_date(policy, handles[0])

        if resource.kind == GROUPED:
            self.check_concurrent_limit(policy, resource, existing_handles, handles, account)

        self.check_bucket(name, policy, resource, handles, account)

    def check(self, name, handles, reservationKind="calendar"):
        """Entry point: validate handles requested by the allocator `name`.

        Does nothing for an empty list.  Otherwise all handles must share the
        same times, the "user" account for hrn_to_gacksid(name) must exist
        and be in good standing, and each group of handles with the same id
        must pass check_handles().

        Raises GacksEnforcerNoAccount or GacksEnforcerNotGoodStanding, plus
        whatever the individual checks raise.
        """
        if len(handles) == 0:
            return

        self.check_same_time(handles)

        account = self.accounts.get_account(hrn_to_gacksid(name), "user")
        if account is None:
            raise GacksEnforcerNoAccount(name)

        if (not account.goodStanding):
            raise GacksEnforcerNotGoodStanding(name)

        for same_id_handles in self.handles_by_id(handles).values():
            self.check_handles(name, same_id_handles, account, reservationKind)
204