00aed7b43800eaf87e620d52dd17f077abcf5a32
[raven.git] / apps / gacks / API.py
1 #!/usr/bin/env python
2
3 ##
4 # Gacks Server
5 ##
6
7 import atexit
8 import logging
9 import logging.handlers
10 import json
11 import tempfile
12 import os
13 import sys
14
15 from ravenlib.acl.mysqlacl import MysqlAclManager
16 from sfa.trust.credential import Credential
17
18 from gacksexcep import *
19 from gackscalendar import *
20 from gackscalendar_mysql import *
21 from gacksqueue import *
22 from gacksqueue_mysql import *
23 from gacksreceipt import *
24 from gackslog_mysql import *
25 from gacksresource import *
26 from gacksauth import GacksAuthToken
27 from gacksaccount import GacksAccount
28 from gacksaccountmanager import GacksAccountManager
29 from gacksinvoice import GacksInvoiceManager, STATE_PENDING, STATE_AGGREGATED
30 from gackspolicy import GacksPolicyDirectory
31 from gacksenforce import GacksEnforcer
32 from gacksnodepicker import GacksNodePicker
33 from gacksnodestatus import GacksNodeStatus
34 from gacksid import str_to_gacksid, hrn_matches_gacksid, str_to_hrn
35 from gackshandler_plc import set_logger_name as gackshandler_plc_set_logger_name
36 from gacksnodepicker import set_logger_name as gacksnodepicker_set_logger_name
37 from gacksbilling import GacksBilling
38
39 from AuthenticatedApi import AuthenticatedApi
40
# HRNs (hierarchical names) of the services referred to by this module.
GACKS_CENTRAL_HRN = "plc.arizona.gackscentral"
GRM_HRN = "plc.arizona.grm"
ASAP_HRN = "plc.arizona.asap"

# Default directories handed to RemoteApi.__init__.
LOG_DIR = "/usr/local/gackscentral/var/log"
TRUSTED_ROOTS_DIR = "/usr/local/gackscentral/var/trusted_roots"

# get_update() rebuilds its cached snapshot once it is older than this many
# seconds. 60 is used for now; the original note also mentions (10 * 60).
UPDATE_CACHE_TIME = 60

# Time span, in seconds, covered by one update packet: one day of data.
UPDATE_DURATION = 24 * 60 * 60
50
def dump_handles(x, path="/tmp/foo"):
    """Debugging aid: write the string form of each handle to a file.

    x    -- iterable of objects that provide as_string()
    path -- file to overwrite; defaults to the historical "/tmp/foo"

    The strings are joined with newlines (no trailing newline). The file is
    opened with open() inside a with-block so it is closed deterministically;
    the previous code used the Python-2-only file() builtin and never closed
    the handle explicitly.
    """
    lines = [handle.as_string() for handle in x]
    with open(path, "w") as out:
        out.write("\n".join(lines))
56
57
58
59 ##
60 # GacksServer is a GeniServer that serves component interface requests.
61 #
62
63 class RemoteApi(AuthenticatedApi):
64
65     ##
66     # Create a new GacksServer object.
67
68     def __init__(self, encoding="utf-8", hrn=GACKS_CENTRAL_HRN, trustedRootsDir=TRUSTED_ROOTS_DIR, logDir=LOG_DIR):
69         self.gacksLogger = GacksMysqlLogger(directory = logDir)
70         self.hrn = hrn
71         self.calendar = None
72         self.accounts = None
73         self.invoices = None
74         self.policies = None
75         self.queue = None
76         self.init_logfile()
77         self.init_resources()
78         self.init_calendar(lock=True)
79         self.init_queue(lock=True)
80         self.init_accounts()
81         self.init_invoices()
82         self.init_policies()
83         self.init_acls()
84         self.billing = GacksBilling(self.accounts, self.invoices, self.policies, self.resources)
85         self.picker = GacksNodePicker(resources=self.resources)
86         self.node_status = GacksNodeStatus()
87         self.currentUpdateDicts = []
88         self.currentUpdateHash = None
89         self.currentUpdateTime = 0
90         return AuthenticatedApi.__init__(self, encoding, trustedRootsDir=trustedRootsDir)
91
92     def init_logfile(self):
93         logger = logging.getLogger("gacksapi")
94         logger.setLevel(logging.DEBUG)
95         socketHandler = logging.handlers.SocketHandler('localhost', logging.handlers.DEFAULT_TCP_LOGGING_PORT)
96         logger.addHandler(socketHandler)
97
98         gackshandler_plc_set_logger_name("gacksapi")
99         gacksnodepicker_set_logger_name("gacksapi")
100
101         logger = logging.getLogger("gacksinvoice")
102         logger.setLevel(logging.DEBUG)
103         socketHandler = logging.handlers.SocketHandler('localhost', logging.handlers.DEFAULT_TCP_LOGGING_PORT)
104         logger.addHandler(socketHandler)
105
106     def init_resources(self):
107         self.resources = GacksResourceDirectory("/etc/gacks/resources.d")
108
109     def init_calendar(self, reset=False, lock=False):
110         if self.calendar != None:
111             self.close_calendar()
112
113         self.calendar = GacksMySQLCalendar()  # GacksListCalendar()
114         self.calendar.open()
115
116         if lock:
117             self.calendar.lock()
118
119         if reset:
120             self.calendar.reset()
121
122         if self.calendar.is_empty():
123             self.log_msg("init_calendar", "empty calendar; setting default policy")
124             self.create_resources()
125         #too much junk in the log
126         #else:
127         #    self.log_msg("init_calendar", "existing calendar found")
128
129         if lock:
130             self.calendar.unlock()
131
132     def close_calendar(self):
133         if self.calendar != None:
134             self.calendar.close()
135             self.calendar = None
136
137     def init_queue(self, reset=False, lock=False):
138         if self.queue != None:
139             self.close_queue();
140
141         self.queue = GacksMySQLQueue();
142         self.queue.open();
143
144         if lock:
145             self.queue.lock()
146
147         if reset:
148             self.queue.reset()
149
150         if lock:
151             self.queue.unlock()
152
153     def close_queue(self):
154         if self.queue != None:
155             self.queue.close()
156             self.queue = None
157
158     def init_accounts(self):
159         self.accounts = GacksAccountManager()
160
161     def init_invoices(self):
162         self.invoices = GacksInvoiceManager(accounts=self.accounts)
163
164     def init_policies(self):
165         self.policies = GacksPolicyDirectory("/etc/gacks/policy.d")
166
167     def init_acls(self):
168         # The calendar knows the database parameters, so we'll get them from
169         # there. it's a bit messy though.
170
171         self.acls = MysqlAclManager(self.calendar.dbname, self.calendar.user, self.calendar.get_password(), self.calendar.address)
172
173         rights = {"plc.arizona.gacksadmin": ["reset", "defragment", "garbagecollect"]}
174         self.acls.create_acl_if_not_exist("calendar", rights)
175
176         rights = {"plc.arizona.gacksadmin": ["apply", "add"]}
177         self.acls.create_acl_if_not_exist("invoices", rights)
178
179         rights = {"plc.arizona.gacksadmin": ["get"]}
180         self.acls.create_acl_if_not_exist("reservations", rights)
181
182         rights = {"plc.arizona.gacksadmin": ["get", "list", "create", "update", "delete"]}
183         self.acls.create_acl_if_not_exist("accounts", rights)
184
185     def apilogger(self):
186         return logging.getLogger("gacksapi")
187
188     ##
189     # Override the API call function so that we lock the calendar while the API
190     # is in use.
191     def call(self, source, method, *args):
192         self.apilogger().debug("call: %s, %s" % (method, args))
193         self.calendar.lock()
194         try:
195             result = AuthenticatedApi.call(self, source, method, *args)
196         finally:
197             self.calendar.unlock()
198
199         return result
200
201     ##
202     # Register the server RPCs for Gacks
203
204     def register_functions(self):
205         AuthenticatedApi.register_functions(self)
206         self.register_function(self.get_handle)
207         self.register_function(self.get_update)
208         self.register_function(self.query_exact)
209         self.register_function(self.query_overlap)
210         self.register_function(self.set_allocator)
211         self.register_function(self.set_consumer)
212         self.register_function(self.set_consumer_hrn)
213         self.register_function(self.submit_receipt)
214
215         # picker
216         self.register_function(self.pick)
217
218         # asap
219         self.register_function(self.query_asap)
220         self.register_function(self.add_asap)
221         self.register_function(self.delete_asap)
222         self.register_function(self.admin_asap_run)
223
224         # resources
225         self.register_function(self.get_resources)
226
227         # grm
228         self.register_function(self.grm_delete)
229         self.register_function(self.grm_reserve)
230         self.register_function(self.asap_reserve)
231
232         # account management
233         self.register_function(self.get_account)
234         self.register_function(self.list_accounts)
235         self.register_function(self.create_account)
236         self.register_function(self.update_account)
237         self.register_function(self.delete_account)
238
239         # invoices
240         self.register_function(self.get_invoice)
241         self.register_function(self.add_charge)
242
243         # management interface
244         self.register_function(self.admin_reset)
245         self.register_function(self.admin_defragment)
246         self.register_function(self.admin_garbage_collect)
247         self.register_function(self.admin_apply_invoices)
248         self.register_function(self.admin_get_reservations)
249         self.register_function(self.admin_get_node_status)
250         self.register_function(self.admin_mail_invoices)
251
252     def log(self, func_name, arg_dict, result_name, result_val):
253         if self.gacksLogger:
254             self.gacksLogger.log(func_name, arg_dict, result_name, result_val)
255
256     def log_msg(self, func_name, msg):
257         if self.gacksLogger:
258             self.gacksLogger.log_msg(func_name, msg)
259
260     def create_resources(self):
261         for item in self.resources.resources:
262             r = GacksRecord(item.name, 0, item.qty, 0, INFINITY, [GACKS_CENTRAL_HRN, GRM_HRN], None)
263             self.calendar.insert_record(r)
264
265     def get_update(self, callerHash):
266         if (time.time()-self.currentUpdateTime) > UPDATE_CACHE_TIME:
267             self.currentUpdateTime = time.time()
268             self.currentUpdateDicts = self.calendar.query_overlap(timeStart=self.currentUpdateTime, timeStop=self.currentUpdateTime + UPDATE_DURATION)
269             self.currentUpdateHash = sha.new(str(self.currentUpdateDicts)).hexdigest()
270
271         if (callerHash == self.currentUpdateHash):
272             return ("notmodified", self.currentUpdateHash, [])
273
274         return ("update", self.currentUpdateHash, self.currentUpdateDicts)
275
276     def query_exact(self, id=None, unitStart=0, unitStop=INFINITY, timeStart=0, timeStop=INFINITY, hasAllocator=None, isLastAllocator=None):
277         recs = self.calendar.query_exact(id, unitStart, unitStop, timeStart, timeStop, hasAllocator, isLastAllocator)
278
279         recDicts = []
280         for rec in recs:
281             recDicts.append(rec.as_dict())
282
283         return recDicts
284
285     def query_overlap(self, id=None, unitStart=0, unitStop=INFINITY, timeStart=0, timeStop=INFINITY, hasAllocator=None, isLastAllocator=None):
286         recs = self.calendar.query_overlap(id, unitStart, unitStop, timeStart, timeStop, hasAllocator, isLastAllocator)
287
288         #file("/tmp/foo.txt", "w").write(str((id, unitStart, unitStop, timeStart, timeStop, hasAllocator, isLastAllocator)))
289
290         recDicts = []
291         for rec in recs:
292             recDicts.append(rec.as_dict())
293
294         return recDicts
295
296     def rspec_to_handles(self, rspec):
297         if isinstance(rspec, str):
298             rspec = eval(rspec)
299
300         if not isinstance(rspec, dict):
301             raise GacksBadRspecSyntax(str(rspec))
302
303         # an aggregate rspec is an rspec that contains a bunch of smaller
304         # rspecs. We resolve it recursively.
305
306         if "aggregate" in rspec:
307             handles=[]
308             for part in rspec["aggregate"]:
309                 thisHandles = self.rspec_to_handles(part)
310                 handles.extend(thisHandles)
311             return handles
312
313         id = rspec['id']
314         timeStart = rspec['timeStart']
315         timeStop = rspec['timeStop']
316
317         # this is the easy case -- the rspec specifies a unitStart and unitStop
318         # we simply return a handle with those parameters
319
320         if not ("unitQuantity" in rspec):
321             unitStart = rspec['unitStart']
322             unitStop = rspec['unitStop']
323             return [GacksHandle(id, unitStart, unitStop, timeStart, timeStop)]
324
325         # the hard case -- the rspec specifies a quantity. we need to get all
326         # of the available handles and batch them together until we satisfy
327         # that quantity
328
329         unitStart = int(rspec.get("unitStart", 0))
330         unitStop = int(rspec.get("unitStop", INFINITY))
331         unitQuantity = rspec['unitQuantity']
332         hasAllocator = rspec.get('hasAllocator', None)
333         isLastAllocator = rspec.get('isLastAllocator', None)
334
335         splitPoints = []
336
337         handles = []
338
339         qtyNeeded = int(unitQuantity)
340         records = self.calendar.query_overlap(id, unitStart, unitStop, timeStart, timeStop, hasAllocator, isLastAllocator)
341         for record in records:
342             if qtyNeeded <= 0:
343                 break
344
345             record.dump()
346
347             if not interval_contains(record.timeStart, record.timeStop, timeStart, timeStop):
348                 # the region is not a superset of the rspec that we're trying to fill,
349                 # so we can't use it.
350                 if (record.timeStop > timeStart) and (record.timeStop < timeStop):
351                     # keep track of where we ran into a region boundary
352                     splitPoints.append(record.timeStop)
353                 continue
354
355             # if a unitStart was specified, then make sure we don't violate it
356             if (record.unitStart<unitStart):
357                 record.unitStart = unitStart
358
359             # same with unitStop
360             if (record.unitStop>unitStop) and (record.unitStop!=INFINITY):
361                 record.unitStop = unitStop
362
363             # we might have turned it into an empty record
364             if (record.unitStart>=record.unitStop):
365                 continue
366
367             if record.get_quantity() == INFINITY:
368                 thisQty = record.unitStart + qtyNeeded
369             else:
370                 thisQty = min(record.get_quantity(), qtyNeeded)
371
372             qtyNeeded = qtyNeeded - thisQty
373
374             handle = GacksHandle(id, record.unitStart, record.unitStart + thisQty, timeStart, timeStop)
375             handles.append(handle)
376
377         if (qtyNeeded > 0) and (splitPoints != []):
378             # we couldn't find contiguous regions that satisfied our rspec, so
379             # lets subdivide the rspec into two pieces at a region boundary
380             # and try to resolve each region separately.
381             splitPoints.sort()
382             rspec1 = rspec.copy()
383             rspec2 = rspec.copy()
384             rspec1["timeStop"] = splitPoints[0]
385             rspec2["timeStart"] = splitPoints[0]
386             handles1 = self.rspec_to_handles(rspec1);
387             handles2 = self.rspec_to_handles(rspec2);
388             return handles1 + handles2
389
390         return handles
391
392     def get_handle(self, rspec):
393         arg_dict = locals()
394         handles = self.rspec_to_handles(rspec)
395         result = handles_to_strings(handles)
396
397         self.log("get_handle", arg_dict, "handle_list", result)
398
399         return result
400
401     def set_allocator_internal(self, handles, callerHRN, allocatorHRN, which, append, reqHash):
402         receiptList = []
403         for handle in handles:
404             # find the existing records that overlap the handle
405             existing_recs = self.calendar.query_handles_overlap([handle])
406
407             # XXX an exception thrown here can cause some receipts to be lost,
408             # since receipts will not be returned to the client
409             if not existing_recs:
410                 self.log("set_allocator", arg_dict, "exception", "GacksResourceNotFound:" + handle.as_string())
411                 raise GacksResourceNotFound(handle.as_string())
412
413             receipt = GacksReceipt(subject=handle.as_string(), handle=handle, action="set_allocator", requestHash=reqHash)
414
415             # TODO: Merge existing_recs
416
417             for item in existing_recs:
418                 if not item.contains_allocator(callerHRN):
419                     raise GacksCallerNotAllocator(str(item.as_string()) + " alloc=" + str(item.allocatorHRNs) + " caller=" + str(callerHRN) )
420                 if not item.is_superset(handle):
421                     raise GacksRequestSpansReservations(handle.as_string() + " on " + item.as_string())
422
423             leftovers = []
424             results = []
425             for item in existing_recs:
426                 if item.is_proper_superset(handle):
427                     parts = item.clone().split_subset(handle.unitStart, handle.unitStop, handle.timeStart, handle.timeStop)
428                     results.append(parts[0])
429                     leftovers.extend(parts[1:])
430                 else:
431                     results.append(item)
432
433             for item in existing_recs:
434                 #file("/tmp/gacksapi.txt","a").write("remove: " + item.as_string() + "\n")
435                 self.calendar.remove_record(item)
436
437             for item in leftovers:
438                 #file("/tmp/gacksapi.txt","a").write("insert leftover: " + item.as_string() + "\n")
439                 self.calendar.insert_record(item)
440
441             for item in results:
442                 #file("/tmp/gacksapi.txt","a").write("insert new: " + item.as_string() + "\n")
443                 item.set_allocator(callerHRN, allocatorHRN, which, append)
444                 self.calendar.insert_record(item)
445                 receipt.AddRecord(item)
446
447             receiptList.append(receipt)
448
449         return receiptList
450
451     def authenticateToken(self, token, argList, targetHrn=None):
452         # in case something goes wrong...
453         #file("/tmp/sreq.txt","w").write(str(argList))
454
455         token = GacksAuthToken(xmlrpc_arg=token, argList=argList)
456         token.authenticate(trusted_roots=self.trusted_roots, target_hrn=targetHrn, is_trusted_admin = True)   # XXX FIXME!
457
458         callerGid = token.get_gid_caller()
459         objectGid = token.get_gid_object()
460
461         return (token, callerGid, objectGid)
462
463     def set_allocator(self, authToken_str, handle_strs, allocatorGID_str, which, append,):
464         arg_dict = locals()
465
466         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [handle_strs, allocatorGID_str, which, append])
467
468         if allocatorGID_str:
469             allocatorGID = GID(string = allocatorGID_str)
470             allocatorGID.verify_chain(self.trusted_roots.get_list()) # self.trusted_cert_list)
471             allocatorHRN = allocatorGID.get_hrn()
472         else:
473             # if allocatorGID is None, then we'll treat this request as a truncate
474             allocatorHRN = None
475
476         receiptList = []
477
478         handles = strings_to_handles(handle_strs)
479
480         receiptList = self.set_allocator_internal(handles, objectGID.get_hrn(), allocatorHRN, which, append)
481
482         receiptStrings = []
483         for receipt in receiptList:
484            receipt.encode()
485            # XXX TODO sign the receipt
486            receiptStrings.append(receipt.save_to_string())
487
488         self.log("set_allocator", arg_dict, "receipt_list", receiptStrings)
489
490         return receiptStrings
491
492     def set_consumer_internal(self, handles, consumerHRN, reqHash):
493         receiptList = []
494         for handle in handles:
495             existing_recs = self.calendar.query_handles_overlap([handle])
496
497             if not existing_recs:
498                 raise GacksResourceNotFound(hand.as_string())
499
500             receipt = GacksReceipt(subject=handle.as_string(), handle=handle, action="set_consumer", requestHash=reqHash)
501
502             for rec in existing_recs:
503                 rec.set_consumer(consumerHRN)
504                 self.calendar.update_record(rec)
505                 receipt.AddRecord(rec)
506
507             receiptList.append(receipt)
508
509         return receiptList
510
511     def set_consumer_hrn(self, authToken_str, handle_strs, consumerHRN):
512         arg_dict = locals()
513
514         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [handle_strs, consumerHRN])
515
516         # XXX - are we missing a check to see that objectGID is on the allocator list?
517
518         handles = strings_to_handles(handle_strs)
519         receiptList = self.set_consumer_internal(handles, consumerHRN, authToken.reqHash)
520
521         receiptStrings = []
522         for receipt in receiptList:
523            receipt.encode()
524            # XXX TODO sign the receipt
525            receiptStrings.append(receipt.save_to_string())
526
527         self.log("set_consumer_hrn", arg_dict, "receipt_list", receiptStrings)
528
529         return receiptStrings
530
531
532     def set_consumer(self, authToken_str, handle_strs, cred_str):
533         arg_dict = locals()
534
535         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [handle_strs, cred_str])
536
537         # XXX - are we missing a check to see that objectGID is on the allocator list?
538
539         if cred_str:
540             cred = Credential(string = cred_str)
541
542             cred.verify_chain(self.trusted_roots.get_list())
543
544             # Checking to see if the cred can perform loanresources is equivalent
545             # to checking to see if it has the Bind privilege.
546             if not cred.can_perform("loanresources"):
547                 raise GacksMissingBindPrivilege(cred.get_privileges().save_to_string())
548
549             # Verify that GacksCentral is the callerGID of the credential. We need
550             # this so that the Gacks component can eventually call LoanResources
551             # using this credential.
552             # if callerGID.get_hrn() != self.
553
554             consumerHRN = cred.get_gid_object().get_hrn()
555         else:
556             cred = None
557             consumerHRN = None
558
559         handles = strings_to_handles(handle_strs)
560         receiptList = self.set_consumer_internal(handles, consumerHRN, authToken.reqHash)
561
562         receiptStrings = []
563         for receipt in receiptList:
564            receipt.encode()
565            # XXX TODO sign the receipt
566            receiptStrings.append(receipt.save_to_string())
567
568         self.log("set_consumer", arg_dict, "receipt_list", receiptStrings)
569
570         return receiptStrings
571
572     def submit_receipt(self, receipt_str):
573         receipt = GacksReceipt(string = receipt_str)
574
575         reclist = receipt.get_records()
576         for rec in reclist:
577             existing_recs = self.calendar.query_handles_overlap([rec])
578
579             # XXX this sequence number stuff is broken
580             #for rec in existing_recs:
581             #    if receipt.GetSequence() < rec.GetSequence():
582             #        raise GacksOutOfDateSequence(receipt.GetSubject())
583
584             # remove any old records for this resource/time period
585             for remove_rec in existing_recs:
586                 self.calendar.remove_record(remove_rec)
587
588             # insert the new record in the calendar
589             self.calendar.insert_record(add_rec)
590
591     ##
592     # Resources interface
593
594     def get_resources(self, hash, format="pickled"):
595         if self.resources.hash() == hash:
596             # client already has existing data
597             return (True, hash, None)
598         else:
599             if (format=="pickled"):
600                 data = self.resources.pickled()
601             elif (format=="json"):
602                 data = self.resources.json()
603             elif (format=="dict"):
604                 data = self.resources.as_dict()
605
606             # client has old data, return him the whole thing
607             return (False, self.resources.hash(), data)
608
609     ##
610     # Picker interface
611
612     def pick(self, authToken_str, resourceName, resourceGroup, amount, expand, options={}):
613         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [resourceName, resourceGroup, amount, expand, options])
614
615         # XXX TODO: Make sure caller has the right to add nodes to this slice!
616
617         result = self.picker.pick(objectGID.get_hrn(), resourceName, resourceGroup, amount, expand, options)
618
619         # These are potentially large and not-xml-friendly. We'll pass just the names ("addSetNames", "delSetNames").
620         if "addSet" in result:
621             del result["addSet"];
622         if "delSet" in result:
623             del result["delSet"];
624
625         return result
626
627     ##
628     # ASAP interface
629
630     def asap_job_to_handles(self, job):
631         handles = []
632         for resource in job.get_resources():
633             rspec = {"timeStart": int(time.time() + 60),   # reserve 60 seconds in the future
634                      "timeStop": int(time.time() + 60 + job.get_duration()),
635                      "id": resource.id,
636                      "unitQuantity": resource.qty,
637                      "isLastAllocator": "plc.arizona.grm"}  # XXX - hardcoded hrn of canopus
638             thisHandles = self.rspec_to_handles(rspec)
639             f = file("/tmp/asap_job_to_handles.txt", "w")   # is this leftover debugging?
640             f.write(str(rspec) + "=" + str(thisHandles) + "\n");
641             f.close()
642             if (thisHandles == []):
643                 return []
644             handles.extend(thisHandles)
645         return handles
646
647     def query_asap(self, id=None, allocatorHRN=None, consumerHRN=None):
648         jobs = self.queue.query(id, allocatorHRN, consumerHRN)
649         dicts = []
650         for job in jobs:
651             dicts.append(job.as_dict())
652         return dicts
653
654     def add_asap_internal(self, allocatorHRN, consumerHRN, duration, resourceStr):
655         job = AsapJob(allocatorHRN=allocatorHRN, consumerHRN=consumerHRN, duration=duration, resourceStr=resourceStr)
656         self.queue.add(job)
657
658     def add_asap(self, authToken_str, allocatorHRN, consumerHRN, duration, resourceStr):
659         (authToken,callerGID, objectGID) = self.authenticateToken(authToken_str, [allocatorHRN, consumerHRN, duration, resourceStr])
660         # XXX check here to see whether caller can add asap jobs
661         self.add_asap_internal(allocatorHRN, consumerHRN, duration, resourceStr)
662
663     def delete_asap(self, authToken_str, id=None, allocatorHRN=None, consumerHRN=None):
664         (authToken,callerGID, objectGID) = self.authenticateToken(authToken_str, [id, allocatorHRN, consumerHRN])
665         # XXX check here to see whether caller can delete asap jobs
666         self.queue.delete(id, allocatorHRN, consumerHRN)
667
668     ##
669     # GRM Support
670
671     def get_grm_hrn(self):
672         return "plc.arizona.grm"
673         # return GID(filename = "/usr/local/grm/var/private/keys/grm.gid").get_hrn()
674
675     def grm_delete(self, authToken_str, handle_strs):
676         arg_dict = locals()
677
678         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [handle_strs])
679
680         handles = strings_to_handles(handle_strs)
681
682         self.set_allocator_internal(handles, self.get_grm_hrn(), None, -1, True, authToken.reqHash)
683         self.set_consumer_internal(handles, None, authToken.reqHash)
684
685     def grm_reserve(self, authToken_str, rspec, consumerHRN, noreserve, ignorepolicy):
686         arg_dict = locals()
687
688         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [rspec, consumerHRN, noreserve, ignorepolicy])
689
690         if consumerHRN=="None":
691             consumerHRN=None
692
693         if noreserve=="False":
694             noreserve=False
695
696         if ignorepolicy=="False":
697             ignorepolicy=False
698
699         receipts = []
700         debugInfo = ""
701
702         handles = self.rspec_to_handles(rspec)
703
704         if (handles == []):
705             raise GacksEmptyHandles(str(rspec))
706
707         if not ignorepolicy:
708             enforcer = GacksEnforcer(self.accounts, self.policies, self.calendar, self.resources)
709             enforcer.check(objectGID.get_hrn(), handles)
710
711         if noreserve:
712             receipts = []
713         else:
714             receipts = self.set_allocator_internal(handles, self.get_grm_hrn(), objectGID.get_hrn(), -1, True, authToken.reqHash)
715             if consumerHRN:
716                 cReceipts = self.set_consumer_internal(handles, consumerHRN, authToken.reqHash)
717                 receipts = receipts + cReceipts
718
719         debugInfo += "rspec: " + str(rspec) + "\n"
720         for handle in handles:
721             debugInfo += "handle: " + handle.as_string() + "\n"
722         for receipt in receipts:
723             debugInfo += "receipt: " + receipt.save_to_string() + "\n"
724
725         receiptStrings = []
726         for receipt in receipts:
727            receipt.encode()
728            # XXX TODO sign the receipt
729            receiptStrings.append(receipt.save_to_string())
730
731         self.log("grm_reserve", arg_dict, "receipt_list", receiptStrings)
732
733         return (True, receiptStrings, debugInfo)
734
735     def asap_reserve(self, authToken_str, resources, duration, consumerHRN, noreserve):
736         # XXX this isn't being used
737         # right now just calling add_asap() in it's place, which does the same thing
738         # need to sort out some security concerns for asap
739
740         arg_dict = locals()
741
742         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [resources, duration, consumerHRN, noreserve])
743
744         receipts = []
745         debugInfo = ""
746
747         job = AsapJob()
748
749         for resource in resources.keys():
750             job.add(AsapResource(id=resource, qty=resources[resource]))
751
752         job.set_duration(duration)
753         job.set_allocator(objectGID.get_hrn())
754         job.set_consumer(consumerHRN)
755
756         debugInfo += "duration: " + str(job.get_duration()) + "\n"
757         debugInfo += "resources: " + str(job.get_resources_string()) + "\n"
758         debugInfo += "allocator: " + str(job.get_allocator()) + "\n"
759         debugInfo += "consumer: " + str(job.get_consumer()) + "\n"
760
761         self.add_asap_internal(self.get_grm_hrn(), job.get_allocator(), job.get_consumer(), job.get_duration(), job.get_resources_string())
762
763         self.log("asap_reserve", arg_dict, "receipt_list", [])
764
765         return (True, [], debugInfo)
766
767     ##
768     # Account interface
769
770     def get_account(self, authToken_str, name, kind, create_if_not_exist=False):
771         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [name, kind, create_if_not_exist])
772
773         want_bucket = False
774         if kind=="user_bucket":
775             # little bit of a hack. "user_bucket" means to fetch the user account
776             # and the bucket analysis
777             kind="user"
778             want_bucket = True
779
780         orig_name = name
781         name = str_to_gacksid(name)
782
783         acct = self.accounts.get_account(name, kind, create_if_not_exist=create_if_not_exist)
784         if acct is not None:
785             acct.apply_inRate()
786             if acct.is_dirty() or acct.created:
787                 acct.commit()
788
789             result = acct.as_dict()
790
791             # fill in some policy fields as they are useful to the clients
792             policy = self.policies.get_policy(acct.level, "plc.vicci.cores")
793             if policy is None:
794                 # there wasn't a policy associated with plc.vicci.cores. Try to find any policy.
795                 # this shouldn't happen
796                 policy = self.policies.get_policy(acct.level, None)
797             if policy is not None:
798                 result['term'] = policy.term
799                 result['membershipFee'] = policy.membershipFee
800                 result['membershipFeeMonths'] = policy.membershipFeeMonths
801                 upgradeDict = {}
802                 for x in self.policies.get_upgrades(acct.level):
803                     upgradeDict[x.name] = x.as_dict()
804                 result['upgrades'] = upgradeDict
805
806                 if want_bucket:
807                     hrn = str_to_hrn(orig_name)
808                     bucket_acct = self.accounts.get_account(name, "bucket")
809                     if bucket_acct is not None:
810                         enforcer = GacksEnforcer(self.accounts, self.policies, self.calendar, self.resources)
811                         result['bucketAnalysis'] = enforcer.check_bucket(hrn, policy, "plc.vicci.cores", [], acct, enforce=False)
812
813             return result
814         else:
815             return None
816
817     def delete_account(self, authToken_str, name, kind):
818         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [name, kind])
819
820         name = str_to_gacksid(name)
821
822         self.acls.test_acl("accounts", objectGID.get_hrn(), "delete")
823
824         self.accounts.delete_account(name, kind)
825
826         return 1
827
828     def list_accounts(self, authToken_str):
829         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [])
830
831         self.acls.test_acl("accounts", objectGID.get_hrn(), "list")
832
833         records = self.accounts.list_accounts()
834
835         return records
836
837     def create_account(self, authToken_str, args):
838         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [args])
839
840         self.acls.test_acl("accounts", objectGID.get_hrn(), "create")
841
842         name = str_to_gacksid(args["name"])
843         kind = args["kind"]
844
845         acct = self.accounts.get_account(name, kind, create_if_not_exist=True)
846         if not acct.created:
847             raise GacksAccountExists("Account %s %s already exists" % (name, kind))
848
849         del args["name"]
850         del args["kind"]
851         acct.update(args)
852
853         return acct.id
854
855     def update_account(self, authToken_str, args, mode="admin"):
856         """
857             args is a list of fields to update
858
859             mode is "admin", "restricted", or "user"
860                admin - superuser mode, all changes allowed
861                restricted - only allow things a user could do (used by gacksadmin for enduser changes)
862                user - restricted mode, and requires objectGID to match the account being changed
863         """
864
865         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [args, mode])
866
867         name = str_to_gacksid(args["name"])
868         kind = args["kind"]
869
870         if (mode=="admin") or (mode=="restricted"):
871             self.acls.test_acl("accounts", objectGID.get_hrn(), "update")
872         else:
873             if (not hrn_matches_gacksid(objectGID.get_hrn(), name)):
874                 raise GacksNotAuthorized("You don't have permission to update %s using objectGID %s" % (name, objectGID.get_hrn()) )
875
876         if (mode in ["user", "restricted"]):
877             user_allowed_fields = ["name", "kind", "level", "autoRenew", "freezeUnreserved", "billingContacts"];
878             for field in args:
879                 if not (field in user_allowed_fields):
880                     raise GacksFieldNotAllowed("you are not allowed to change %s" % field)
881
882         acct = self.accounts.get_account(name, kind, create_if_not_exist=False)
883
884         if ("level" in args):
885             new_level = args["level"]
886             new_policy = self.policies.get_policy(new_level, None)
887
888             if not new_policy:
889                 raise GacksUnknownPolicy("The service level %s does not exist" % new_level)
890
891             if (mode in ["user", "restricted"]):
892                 if not (acct.level in new_policy.upgradeFrom):
893                     raise GacksUpgradeNotAllowed("You are not allowed to upgrade from %s to %s" % (acct.level, new_level))
894
895         # If the level is changed, then bump serviceStartDate to the current
896         # date, since a new service commitment has started.
897         if ("level" in args) and (args["level"] != acct.level):
898             args["serviceStartDate"] = time.time()
899
900         del args["name"]
901         del args["kind"]
902         acct.update(args)
903
904         return acct.id
905
906     ##
907     # Invoice interface
908
909     def get_invoice(self, authToken_str, name=None, state=None, date_filter = {}, parent_id=None, filter=None, want_summary = False):
910         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [name])
911
912         if filter is not None:
913             # alternate syntax: filter is a dict that contains all relevant filtering information
914             assert(name==None)
915             assert(state==None)
916             assert(date_filter=={} or date_filter==None)
917             assert(parent_id==None)
918             file("/tmp/foo.txt","a").write(str(filter)+"\n")
919
920             # php does this -- empty dict is empty list
921             if (filter==[]):
922                 filter = {}
923
924             if "account" in filter:
925                 filter["account"] = str_to_gacksid(filter["account"])
926
927             invoice = self.invoices.get_invoice_prime(filter, lookup_objects=True)
928         else:
929             file("/tmp/foo.txt","a").write(str((name,state,date_filter,parent_id))+"\n")
930             name = str_to_gacksid(name)
931             invoice = self.invoices.get_invoice(name, state=state, date_filter=date_filter, parent_id=parent_id, lookup_objects=True)
932
933         result = invoice.as_dict()
934
935         if want_summary:
936             result["summary"] = invoice.get_summary()
937
938         return result
939
940     def add_charge(self, authToken_str, account, object, kind_id, date, amount, state=STATE_PENDING, parent_id=-1):
941         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [account, object, kind_id, date, amount, state, parent_id])
942
943         self.acls.test_acl("invoices", objectGID.get_hrn(), "add")
944
945         account = str_to_gacksid(account)
946
947         self.invoices.add_charge(account, object, kind_id, date, amount, state, parent_id)
948         return 0;
949
950     ##
951     # Management interface
952
953     def admin_asap_run(self, authToken_str):
954         jobids_converted = []
955         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [])
956         jobs = self.queue.query()
957         for job in jobs:
958             handles = self.asap_job_to_handles(job)
959             if (handles != []):
960                 self.set_allocator_internal(handles, GRM_HRN, ASAP_HRN, 0, True, "asaphash")
961                 self.set_allocator_internal(handles, ASAP_HRN, job.get_allocator(), 0, True, "asaphash")
962                 self.set_consumer_internal(handles, job.get_consumer(), "asaphash")
963                 jobids_converted.append(job.get_jobid())
964                 self.queue.delete(id=job.get_jobid())
965         return jobids_converted
966
967     def admin_reset(self, authToken_str):
968         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [])
969
970         self.acls.test_acl("calendar", objectGID.get_hrn(), "reset")
971
972         self.init_calendar(reset = True)
973
974         self.log_msg("admin_reset", "reset complete");
975
976         return True
977
978     def admin_garbage_collect(self, authToken_str, timeStop):
979         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [timeStop])
980
981         self.acls.test_acl("calendar", objectGID.get_hrn(), "garbagecollect")
982
983         self.calendar.garbage_collect(timeStop)
984
985         self.log_msg("admin_garbage_collect", "purge complete");
986
987         return True
988
989     def admin_defragment(self, authToken_str):
990         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [])
991
992         self.acls.test_acl("calendar", objectGID.get_hrn(), "defragment")
993
994         # Round down to the nearest hour. Otherwise defragment may cause the
995         # start time to GRM records to not start on the hour, making the
996         # slot unreservable.
997         tDefrag = int(time.time() / 3600) * 3600
998
999         absorb_list = self.calendar.defragment(tDefrag)
1000
1001         self.log_msg("admin_defragment", "absorb_list=" + str(absorb_list))
1002
1003         return absorb_list
1004
1005     def admin_apply_invoices(self, authToken_str):
1006         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [])
1007
1008         # do we need an ACL for invoice modifications?
1009         self.acls.test_acl("invoices", objectGID.get_hrn(), "apply")
1010
1011         self.billing.do_nightly()
1012
1013         self.log_msg("admin_apply_invoices", "apply complete");
1014
1015         return True
1016
1017     def admin_get_reservations(self, authToken_str):
1018         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [])
1019
1020         # do we need an ACL for invoice modifications?
1021         self.acls.test_acl("reservations", objectGID.get_hrn(), "get")
1022
1023         resv = json.load(file("/usr/local/gackscentral/var/resv", "r"))
1024         return resv
1025
1026     def admin_get_node_status(self, authToken_str, filter=None):
1027         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [])
1028
1029         self.acls.test_acl("reservations", objectGID.get_hrn(), "get")
1030
1031         nodes = self.node_status.query(filter)
1032         return nodes
1033
1034     def admin_mail_invoices(self, authToken_str, filter={}):
1035         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [])
1036
1037         return self.billing.mail_invoices(filter, email_list=["smbaker@gmail.com"])
1038
1039
1040