import repository from arizona
[raven.git] / apps / gacks / API.py
1 #!/usr/bin/env python
2
3 ##
4 # Gacks Server
5 ##
6
7 import atexit
8 import tempfile
9 import os
10 import sys
11
12 from ravenlib.acl.mysqlacl import MysqlAclManager
13 from sfa.trust.credential import Credential
14
15 from gacksexcep import *
16 from gackscalendar import *
17 from gackscalendar_mysql import *
18 from gacksqueue import *
19 from gacksqueue_mysql import *
20 from gacksreceipt import *
21 from gackslog_mysql import *
22 from gacksresource import *
23 from gacksauth import GacksAuthToken
24 from gacksaccount import GacksAccount
25 from gacksaccountmanager import GacksAccountManager
26 from gackspolicy import GacksPolicyDirectory
27 from gacksenforce import GacksEnforcer
28 from gacksnodepicker import GacksNodePicker
29
30 from AuthenticatedApi import AuthenticatedApi
31
32 GACKS_CENTRAL_HRN = "plc.arizona.gackscentral"
33 GRM_HRN = "plc.arizona.grm"
34 ASAP_HRN = "plc.arizona.asap"
35
36 LOG_DIR = "/usr/local/gackscentral/var/log"
37 TRUSTED_ROOTS_DIR = "/usr/local/gackscentral/var/trusted_roots"
38
39 UPDATE_CACHE_TIME = 60 # use a 60-second cache time for now (10* 60)
40 UPDATE_DURATION = (24*60*60) # update packets will contain 1 day of data
41
42 def dump_handles(x):
43    strs=[]
44    for handle in x:
45        strs.append(handle.as_string())
46    file("/tmp/foo","w").write("\n".join(strs))
47
48
49
50 ##
51 # GacksServer is a GeniServer that serves component interface requests.
52 #
53
54 class RemoteApi(AuthenticatedApi):
55
56     ##
57     # Create a new GacksServer object.
58
59     def __init__(self, encoding="utf-8", hrn=GACKS_CENTRAL_HRN, trustedRootsDir=TRUSTED_ROOTS_DIR, logDir=LOG_DIR):
60         self.gacksLogger = GacksMysqlLogger(directory = logDir)
61         self.hrn = hrn
62         self.calendar = None
63         self.accounts = None
64         self.policies = None
65         self.queue = None
66         self.init_resources()
67         self.init_calendar(lock=True)
68         self.init_queue(lock=True)
69         self.init_accounts()
70         self.init_policies()
71         self.init_acls()
72         self.picker = GacksNodePicker(resources=self.resources)
73         self.currentUpdateDicts = []
74         self.currentUpdateHash = None
75         self.currentUpdateTime = 0
76         return AuthenticatedApi.__init__(self, encoding, trustedRootsDir=trustedRootsDir)
77
78     def init_resources(self):
79         self.resources = GacksResourceDirectory("/etc/gacks/resources.d")
80
81     def init_calendar(self, reset=False, lock=False):
82         if self.calendar != None:
83             self.close_calendar()
84
85         self.calendar = GacksMySQLCalendar()  # GacksListCalendar()
86         self.calendar.open()
87
88         if lock:
89             self.calendar.lock()
90
91         if reset:
92             self.calendar.reset()
93
94         if self.calendar.is_empty():
95             self.log_msg("init_calendar", "empty calendar; setting default policy")
96             self.create_resources()
97         #too much junk in the log
98         #else:
99         #    self.log_msg("init_calendar", "existing calendar found")
100
101         if lock:
102             self.calendar.unlock()
103
104     def close_calendar(self):
105         if self.calendar != None:
106             self.calendar.close()
107             self.calendar = None
108
109     def init_queue(self, reset=False, lock=False):
110         if self.queue != None:
111             self.close_queue();
112
113         self.queue = GacksMySQLQueue();
114         self.queue.open();
115
116         if lock:
117             self.queue.lock()
118
119         if reset:
120             self.queue.reset()
121
122         if lock:
123             self.queue.unlock()
124
125     def close_queue(self):
126         if self.queue != None:
127             self.queue.close()
128             self.queue = None
129
130     def init_accounts(self):
131         self.accounts = GacksAccountManager()
132
133     def init_policies(self):
134         self.policies = GacksPolicyDirectory("/etc/gacks/policy.d")
135
136     def init_acls(self):
137         # The calendar knows the database parameters, so we'll get them from
138         # there. it's a bit messy though.
139
140         self.acls = MysqlAclManager(self.calendar.dbname, self.calendar.user, self.calendar.get_password(), self.calendar.address)
141
142         rights = {"plc.arizona.gacksadmin": ["reset", "defragment", "garbagecollect"]}
143         self.acls.create_acl_if_not_exist("calendar", rights)
144
145     ##
146     # Override the API call function so that we lock the calendar while the API
147     # is in use.
148     def call(self, source, method, *args):
149         self.calendar.lock()
150         try:
151             result = AuthenticatedApi.call(self, source, method, *args)
152         finally:
153             self.calendar.unlock()
154
155         return result
156
157     ##
158     # Register the server RPCs for Gacks
159
160     def register_functions(self):
161         AuthenticatedApi.register_functions(self)
162         self.register_function(self.get_handle)
163         self.register_function(self.get_update)
164         self.register_function(self.query_exact)
165         self.register_function(self.query_overlap)
166         self.register_function(self.set_allocator)
167         self.register_function(self.set_consumer)
168         self.register_function(self.set_consumer_hrn)
169         self.register_function(self.submit_receipt)
170
171         # picker
172         self.register_function(self.pick)
173
174         # asap
175         self.register_function(self.query_asap)
176         self.register_function(self.add_asap)
177         self.register_function(self.delete_asap)
178         self.register_function(self.admin_asap_run)
179
180         # resources
181         self.register_function(self.get_resources)
182
183         # grm
184         self.register_function(self.grm_delete)
185         self.register_function(self.grm_reserve)
186         self.register_function(self.asap_reserve)
187
188         # account management
189         self.register_function(self.get_account)
190
191         # management interface
192         self.register_function(self.admin_reset)
193         self.register_function(self.admin_defragment)
194         self.register_function(self.admin_garbage_collect)
195
196     def log(self, func_name, arg_dict, result_name, result_val):
197         if self.gacksLogger:
198             self.gacksLogger.log(func_name, arg_dict, result_name, result_val)
199
200     def log_msg(self, func_name, msg):
201         if self.gacksLogger:
202             self.gacksLogger.log_msg(func_name, msg)
203
204     def create_resources(self):
205         for item in self.resources.resources:
206             r = GacksRecord(item.name, 0, item.qty, 0, INFINITY, [GACKS_CENTRAL_HRN, GRM_HRN], None)
207             self.calendar.insert_record(r)
208
209     def get_update(self, callerHash):
210         if (time.time()-self.currentUpdateTime) > UPDATE_CACHE_TIME:
211             self.currentUpdateTime = time.time()
212             self.currentUpdateDicts = self.calendar.query_overlap(timeStart=self.currentUpdateTime, timeStop=self.currentUpdateTime + UPDATE_DURATION)
213             self.currentUpdateHash = sha.new(str(self.currentUpdateDicts)).hexdigest()
214
215         if (callerHash == self.currentUpdateHash):
216             return ("notmodified", self.currentUpdateHash, [])
217
218         return ("update", self.currentUpdateHash, self.currentUpdateDicts)
219
220     def query_exact(self, id=None, unitStart=0, unitStop=INFINITY, timeStart=0, timeStop=INFINITY, hasAllocator=None, isLastAllocator=None):
221         recs = self.calendar.query_exact(id, unitStart, unitStop, timeStart, timeStop, hasAllocator, isLastAllocator)
222
223         recDicts = []
224         for rec in recs:
225             recDicts.append(rec.as_dict())
226
227         return recDicts
228
229     def query_overlap(self, id=None, unitStart=0, unitStop=INFINITY, timeStart=0, timeStop=INFINITY, hasAllocator=None, isLastAllocator=None):
230         recs = self.calendar.query_overlap(id, unitStart, unitStop, timeStart, timeStop, hasAllocator, isLastAllocator)
231
232         recDicts = []
233         for rec in recs:
234             recDicts.append(rec.as_dict())
235
236         return recDicts
237
238     def rspec_to_handles(self, rspec):
239         if isinstance(rspec, str):
240             rspec = eval(rspec)
241
242         if not isinstance(rspec, dict):
243             raise GacksBadRspecSyntax(str(rspec))
244
245         # an aggregate rspec is an rspec that contains a bunch of smaller
246         # rspecs. We resolve it recursively.
247
248         if "aggregate" in rspec:
249             handles=[]
250             for part in rspec["aggregate"]:
251                 thisHandles = self.rspec_to_handles(part)
252                 handles.extend(thisHandles)
253             return handles
254
255         id = rspec['id']
256         timeStart = rspec['timeStart']
257         timeStop = rspec['timeStop']
258
259         # this is the easy case -- the rspec specifies a unitStart and unitStop
260         # we simply return a handle with those parameters
261
262         if not ("unitQuantity" in rspec):
263             unitStart = rspec['unitStart']
264             unitStop = rspec['unitStop']
265             return [GacksHandle(id, unitStart, unitStop, timeStart, timeStop)]
266
267         # the hard case -- the rspec specifies a quantity. we need to get all
268         # of the available handles and batch them together until we satisfy
269         # that quantity
270
271         unitStart = int(rspec.get("unitStart", 0))
272         unitStop = int(rspec.get("unitStop", INFINITY))
273         unitQuantity = rspec['unitQuantity']
274         hasAllocator = rspec.get('hasAllocator', None)
275         isLastAllocator = rspec.get('isLastAllocator', None)
276
277         splitPoints = []
278
279         handles = []
280
281         qtyNeeded = int(unitQuantity)
282         records = self.calendar.query_overlap(id, unitStart, unitStop, timeStart, timeStop, hasAllocator, isLastAllocator)
283         for record in records:
284             if qtyNeeded <= 0:
285                 break
286
287             record.dump()
288
289             if not interval_contains(record.timeStart, record.timeStop, timeStart, timeStop):
290                 # the region is not a superset of the rspec that we're trying to fill,
291                 # so we can't use it.
292                 if (record.timeStop > timeStart) and (record.timeStop < timeStop):
293                     # keep track of where we ran into a region boundary
294                     splitPoints.append(record.timeStop)
295                 continue
296
297             # if a unitStart was specified, then make sure we don't violate it
298             if (record.unitStart<unitStart):
299                 record.unitStart = unitStart
300
301             # same with unitStop
302             if (record.unitStop>unitStop) and (record.unitStop!=INFINITY):
303                 record.unitStop = unitStop
304
305             # we might have turned it into an empty record
306             if (record.unitStart>=record.unitStop):
307                 continue
308
309             if record.get_quantity() == INFINITY:
310                 thisQty = record.unitStart + qtyNeeded
311             else:
312                 thisQty = min(record.get_quantity(), qtyNeeded)
313
314             qtyNeeded = qtyNeeded - thisQty
315
316             handle = GacksHandle(id, record.unitStart, record.unitStart + thisQty, timeStart, timeStop)
317             handles.append(handle)
318
319         if (qtyNeeded > 0) and (splitPoints != []):
320             # we couldn't find contiguous regions that satisfied our rspec, so
321             # lets subdivide the rspec into two pieces at a region boundary
322             # and try to resolve each region separately.
323             splitPoints.sort()
324             rspec1 = rspec.copy()
325             rspec2 = rspec.copy()
326             rspec1["timeStop"] = splitPoints[0]
327             rspec2["timeStart"] = splitPoints[0]
328             handles1 = self.rspec_to_handles(rspec1);
329             handles2 = self.rspec_to_handles(rspec2);
330             return handles1 + handles2
331
332         return handles
333
334     def get_handle(self, rspec):
335         arg_dict = locals()
336         handles = self.rspec_to_handles(rspec)
337         result = handles_to_strings(handles)
338
339         self.log("get_handle", arg_dict, "handle_list", result)
340
341         return result
342
343     def set_allocator_internal(self, handles, callerHRN, allocatorHRN, which, append, reqHash):
344         receiptList = []
345         for handle in handles:
346             # find the existing records that overlap the handle
347             existing_recs = self.calendar.query_handles_overlap([handle])
348
349             # XXX an exception thrown here can cause some receipts to be lost,
350             # since receipts will not be returned to the client
351             if not existing_recs:
352                 self.log("set_allocator", arg_dict, "exception", "GacksResourceNotFound:" + handle.as_string())
353                 raise GacksResourceNotFound(handle.as_string())
354
355             receipt = GacksReceipt(subject=handle.as_string(), handle=handle, action="set_allocator", requestHash=reqHash)
356
357             # TODO: Merge existing_recs
358
359             for item in existing_recs:
360                 if not item.contains_allocator(callerHRN):
361                     raise GacksCallerNotAllocator(str(item.as_string()) + " alloc=" + str(item.allocatorHRNs) + " caller=" + str(callerHRN) )
362                 if not item.is_superset(handle):
363                     raise GacksRequestSpansReservations(handle.as_string() + " on " + item.as_string())
364
365             leftovers = []
366             results = []
367             for item in existing_recs:
368                 if item.is_proper_superset(handle):
369                     parts = item.clone().split_subset(handle.unitStart, handle.unitStop, handle.timeStart, handle.timeStop)
370                     results.append(parts[0])
371                     leftovers.extend(parts[1:])
372                 else:
373                     results.append(item)
374
375             for item in existing_recs:
376                 self.calendar.remove_record(item)
377
378             for item in leftovers:
379                 self.calendar.insert_record(item)
380
381             for item in results:
382                 item.set_allocator(callerHRN, allocatorHRN, which, append)
383                 self.calendar.insert_record(item)
384                 receipt.AddRecord(item)
385
386             receiptList.append(receipt)
387
388         return receiptList
389
390     def authenticateToken(self, token, argList):
391         # in case something goes wrong...
392         #file("/tmp/sreq.txt","w").write(str(argList))
393
394         token = GacksAuthToken(xmlrpc_arg=token, argList=argList)
395         token.authenticate(trusted_roots=self.trusted_roots)
396
397         callerGid = token.get_gid_caller()
398         objectGid = token.get_gid_object()
399
400         return (token, callerGid, objectGid)
401
402     def set_allocator(self, authToken_str, handle_strs, allocatorGID_str, which, append,):
403         arg_dict = locals()
404
405         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [handle_strs, allocatorGID_str, which, append])
406
407         if allocatorGID_str:
408             allocatorGID = GID(string = allocatorGID_str)
409             allocatorGID.verify_chain(self.trusted_roots.get_list()) # self.trusted_cert_list)
410             allocatorHRN = allocatorGID.get_hrn()
411         else:
412             # if allocatorGID is None, then we'll treat this request as a truncate
413             allocatorHRN = None
414
415         receiptList = []
416
417         handles = strings_to_handles(handle_strs)
418
419         receiptList = self.set_allocator_internal(handles, objectGID.get_hrn(), allocatorHRN, which, append)
420
421         receiptStrings = []
422         for receipt in receiptList:
423            receipt.encode()
424            # XXX TODO sign the receipt
425            receiptStrings.append(receipt.save_to_string())
426
427         self.log("set_allocator", arg_dict, "receipt_list", receiptStrings)
428
429         return receiptStrings
430
431     def set_consumer_internal(self, handles, consumerHRN, reqHash):
432         receiptList = []
433         for handle in handles:
434             existing_recs = self.calendar.query_handles_overlap([handle])
435
436             if not existing_recs:
437                 raise GacksResourceNotFound(hand.as_string())
438
439             receipt = GacksReceipt(subject=handle.as_string(), handle=handle, action="set_consumer", requestHash=reqHash)
440
441             for rec in existing_recs:
442                 rec.set_consumer(consumerHRN)
443                 self.calendar.update_record(rec)
444                 receipt.AddRecord(rec)
445
446             receiptList.append(receipt)
447
448         return receiptList
449
450     def set_consumer_hrn(self, authToken_str, handle_strs, consumerHRN):
451         arg_dict = locals()
452
453         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [handle_strs, consumerHRN])
454
455         # XXX - are we missing a check to see that objectGID is on the allocator list?
456
457         handles = strings_to_handles(handle_strs)
458         receiptList = self.set_consumer_internal(handles, consumerHRN, authToken.reqHash)
459
460         receiptStrings = []
461         for receipt in receiptList:
462            receipt.encode()
463            # XXX TODO sign the receipt
464            receiptStrings.append(receipt.save_to_string())
465
466         self.log("set_consumer_hrn", arg_dict, "receipt_list", receiptStrings)
467
468         return receiptStrings
469
470
471     def set_consumer(self, authToken_str, handle_strs, cred_str):
472         arg_dict = locals()
473
474         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [handle_strs, cred_str])
475
476         # XXX - are we missing a check to see that objectGID is on the allocator list?
477
478         if cred_str:
479             cred = Credential(string = cred_str)
480
481             cred.verify_chain(self.trusted_roots.get_list())
482
483             # Checking to see if the cred can perform loanresources is equivalent
484             # to checking to see if it has the Bind privilege.
485             if not cred.can_perform("loanresources"):
486                 raise GacksMissingBindPrivilege(cred.get_privileges().save_to_string())
487
488             # Verify that GacksCentral is the callerGID of the credential. We need
489             # this so that the Gacks component can eventually call LoanResources
490             # using this credential.
491             # if callerGID.get_hrn() != self.
492
493             consumerHRN = cred.get_gid_object().get_hrn()
494         else:
495             cred = None
496             consumerHRN = None
497
498         handles = strings_to_handles(handle_strs)
499         receiptList = self.set_consumer_internal(handles, consumerHRN, authToken.reqHash)
500
501         receiptStrings = []
502         for receipt in receiptList:
503            receipt.encode()
504            # XXX TODO sign the receipt
505            receiptStrings.append(receipt.save_to_string())
506
507         self.log("set_consumer", arg_dict, "receipt_list", receiptStrings)
508
509         return receiptStrings
510
511     def submit_receipt(self, receipt_str):
512         receipt = GacksReceipt(string = receipt_str)
513
514         reclist = receipt.get_records()
515         for rec in reclist:
516             existing_recs = self.calendar.query_handles_overlap([rec])
517
518             # XXX this sequence number stuff is broken
519             #for rec in existing_recs:
520             #    if receipt.GetSequence() < rec.GetSequence():
521             #        raise GacksOutOfDateSequence(receipt.GetSubject())
522
523             # remove any old records for this resource/time period
524             for remove_rec in existing_recs:
525                 self.calendar.remove_record(remove_rec)
526
527             # insert the new record in the calendar
528             self.calendar.insert_record(add_rec)
529
530     ##
531     # Resources interface
532
533     def get_resources(self, hash):
534         if self.resources.hash() == hash:
535             # client already has existing data
536             return (True, hash, None)
537         else:
538             # client has old data, return him the whole thing
539             return (False, self.resources.hash(), self.resources.pickled())
540
541     ##
542     # Picker interface
543
544     def pick(self, authToken_str, resourceName, resourceGroup, amount, expand):
545         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [resourceName, resourceGroup, amount, expand])
546
547         # XXX TODO: Make sure caller has the right to add nodes to this slice!
548
549         (delSet, addSet) = self.picker.pick(objectGID.get_hrn(), resourceName, resourceGroup, amount, expand)
550
551         delSet = [x["name"] for x in delSet]
552         addSet = [x["name"] for x in addSet]
553
554         return (True, delSet, addSet)
555
556     ##
557     # ASAP interface
558
559     def asap_job_to_handles(self, job):
560         handles = []
561         for resource in job.get_resources():
562             rspec = {"timeStart": int(time.time() + 60),   # reserve 60 seconds in the future
563                      "timeStop": int(time.time() + 60 + job.get_duration()),
564                      "id": resource.id,
565                      "unitQuantity": resource.qty,
566                      "isLastAllocator": "plc.arizona.grm"}  # XXX - hardcoded hrn of canopus
567             thisHandles = self.rspec_to_handles(rspec)
568             f = file("/tmp/asap_job_to_handles.txt", "w")   # is this leftover debugging?
569             f.write(str(rspec) + "=" + str(thisHandles) + "\n");
570             f.close()
571             if (thisHandles == []):
572                 return []
573             handles.extend(thisHandles)
574         return handles
575
576     def query_asap(self, id=None, allocatorHRN=None, consumerHRN=None):
577         jobs = self.queue.query(id, allocatorHRN, consumerHRN)
578         dicts = []
579         for job in jobs:
580             dicts.append(job.as_dict())
581         return dicts
582
583     def add_asap_internal(self, allocatorHRN, consumerHRN, duration, resourceStr):
584         job = AsapJob(allocatorHRN=allocatorHRN, consumerHRN=consumerHRN, duration=duration, resourceStr=resourceStr)
585         self.queue.add(job)
586
587     def add_asap(self, authToken_str, allocatorHRN, consumerHRN, duration, resourceStr):
588         (authToken,callerGID, objectGID) = self.authenticateToken(authToken_str, [allocatorHRN, consumerHRN, duration, resourceStr])
589         # XXX check here to see whether caller can add asap jobs
590         self.add_asap_internal(allocatorHRN, consumerHRN, duration, resourceStr)
591
592     def delete_asap(self, authToken_str, id=None, allocatorHRN=None, consumerHRN=None):
593         (authToken,callerGID, objectGID) = self.authenticateToken(authToken_str, [id, allocatorHRN, consumerHRN])
594         # XXX check here to see whether caller can delete asap jobs
595         self.queue.delete(id, allocatorHRN, consumerHRN)
596
597     ##
598     # GRM Support
599
600     def get_grm_hrn(self):
601         return "plc.arizona.grm"
602         # return GID(filename = "/usr/local/grm/var/private/keys/grm.gid").get_hrn()
603
604     def grm_delete(self, authToken_str, handle_strs):
605         arg_dict = locals()
606
607         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [handle_strs])
608
609         handles = strings_to_handles(handle_strs)
610
611         self.set_allocator_internal(handles, self.get_grm_hrn(), None, -1, True, authToken.reqHash)
612         self.set_consumer_internal(handles, None, authToken.reqHash)
613
614     def grm_reserve(self, authToken_str, rspec, consumerHRN, noreserve, ignorepolicy):
615         arg_dict = locals()
616
617         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [rspec, consumerHRN, noreserve, ignorepolicy])
618
619         if consumerHRN=="None":
620             consumerHRN=None
621
622         if noreserve=="False":
623             noreserve=False
624
625         if ignorepolicy=="False":
626             ignorepolicy=False
627
628         receipts = []
629         debugInfo = ""
630
631         handles = self.rspec_to_handles(rspec)
632
633         if (handles == []):
634             raise GacksEmptyHandles(str(rspec))
635
636         if not ignorepolicy:
637             enforcer = GacksEnforcer(self.accounts, self.policies, self.calendar, self.resources)
638             enforcer.check(objectGID.get_hrn(), handles)
639
640         if noreserve:
641             receipts = []
642         else:
643             receipts = self.set_allocator_internal(handles, self.get_grm_hrn(), objectGID.get_hrn(), -1, True, authToken.reqHash)
644             if consumerHRN:
645                 cReceipts = self.set_consumer_internal(handles, consumerHRN, authToken.reqHash)
646                 receipts = receipts + cReceipts
647
648         debugInfo += "rspec: " + str(rspec) + "\n"
649         for handle in handles:
650             debugInfo += "handle: " + handle.as_string() + "\n"
651         for receipt in receipts:
652             debugInfo += "receipt: " + receipt.save_to_string() + "\n"
653
654         receiptStrings = []
655         for receipt in receipts:
656            receipt.encode()
657            # XXX TODO sign the receipt
658            receiptStrings.append(receipt.save_to_string())
659
660         self.log("grm_reserve", arg_dict, "receipt_list", receiptStrings)
661
662         return (True, receiptStrings, debugInfo)
663
664     def asap_reserve(self, authToken_str, resources, duration, consumerHRN, noreserve):
665         # XXX this isn't being used
666         # right now just calling add_asap() in it's place, which does the same thing
667         # need to sort out some security concerns for asap
668
669         arg_dict = locals()
670
671         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [resources, duration, consumerHRN, noreserve])
672
673         receipts = []
674         debugInfo = ""
675
676         job = AsapJob()
677
678         for resource in resources.keys():
679             job.add(AsapResource(id=resource, qty=resources[resource]))
680
681         job.set_duration(duration)
682         job.set_allocator(objectGID.get_hrn())
683         job.set_consumer(consumerHRN)
684
685         debugInfo += "duration: " + str(job.get_duration()) + "\n"
686         debugInfo += "resources: " + str(job.get_resources_string()) + "\n"
687         debugInfo += "allocator: " + str(job.get_allocator()) + "\n"
688         debugInfo += "consumer: " + str(job.get_consumer()) + "\n"
689
690         self.add_asap_internal(self.get_grm_hrn(), job.get_allocator(), job.get_consumer(), job.get_duration(), job.get_resources_string())
691
692         self.log("asap_reserve", arg_dict, "receipt_list", [])
693
694         return (True, [], debugInfo)
695
696     ##
697     # Account interface
698
699     def get_account(self, authToken_str, name, kind):
700         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [name, kind])
701
702         # ... if we want to create accounts automatically ...
703         #acct = self.accounts.get_account(name, kind, create_if_not_exist=True)
704         #if acct.created:
705         #    #this is where we should fill in details about the account
706         #    acct.commit()
707
708         # assuming accounts will be created somewhere else...
709         #   returns None account does not exist
710         acct = self.accounts.get_account(name, kind, create_if_not_exist=False)
711         if acct:
712             acct.apply_inRate()
713             if acct.is_dirty():
714                 acct.commit()
715             return acct.as_dict()
716         else:
717             return None
718
719     ##
720     # Management interface
721
722     def admin_asap_run(self, authToken_str):
723         jobids_converted = []
724         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [])
725         jobs = self.queue.query()
726         for job in jobs:
727             handles = self.asap_job_to_handles(job)
728             if (handles != []):
729                 self.set_allocator_internal(handles, GRM_HRN, ASAP_HRN, 0, True, "asaphash")
730                 self.set_allocator_internal(handles, ASAP_HRN, job.get_allocator(), 0, True, "asaphash")
731                 self.set_consumer_internal(handles, job.get_consumer(), "asaphash")
732                 jobids_converted.append(job.get_jobid())
733                 self.queue.delete(id=job.get_jobid())
734         return jobids_converted
735
736     def admin_reset(self, authToken_str):
737         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [])
738
739         self.acls.test_acl("calendar", objectGID.get_hrn(), "reset")
740
741         self.init_calendar(reset = True)
742
743         self.log_msg("admin_reset", "reset complete");
744
745         return True
746
747     def admin_garbage_collect(self, authToken_str, timeStop):
748         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [timeStop])
749
750         self.acls.test_acl("calendar", objectGID.get_hrn(), "garbagecollect")
751
752         self.calendar.garbage_collect(timeStop)
753
754         self.log_msg("admin_garbage_collect", "purge complete");
755
756         return True
757
758     def admin_defragment(self, authToken_str):
759         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [])
760
761         self.acls.test_acl("calendar", objectGID.get_hrn(), "defragment")
762
763         absorb_list = self.calendar.defragment(int(time.time()))
764
765         self.log_msg("admin_defragment", "absorb_list=" + str(absorb_list))
766
767         return absorb_list
768
769