90f1e4ec877ec887edd1051ca83493cf84a03804
[raven.git] / apps / gacks / API.py
1 #!/usr/bin/env python
2
3 ##
4 # Gacks Server
5 ##
6
7 import atexit
8 import logging
9 import logging.handlers
10 import json
11 import tempfile
12 import os
13 import sys
14
15 from ravenlib.acl.mysqlacl import MysqlAclManager
16 from sfa.trust.credential import Credential
17
18 from gacksexcep import *
19 from gackscalendar import *
20 from gackscalendar_mysql import *
21 from gacksqueue import *
22 from gacksqueue_mysql import *
23 from gacksreceipt import *
24 from gackslog_mysql import *
25 from gacksresource import *
26 from gacksauth import GacksAuthToken
27 from gacksaccount import GacksAccount
28 from gacksaccountmanager import GacksAccountManager
29 from gacksinvoice import GacksInvoiceManager, STATE_PENDING, STATE_AGGREGATED
30 from gackspolicy import GacksPolicyDirectory
31 from gacksenforce import GacksEnforcer
32 from gacksnodepicker import GacksNodePicker
33 from gacksnodestatus import GacksNodeStatus
34 from gacksid import str_to_gacksid, hrn_matches_gacksid
35 from gackshandler_plc import set_logger_name as gackshandler_plc_set_logger_name
36 from gacksnodepicker import set_logger_name as gacksnodepicker_set_logger_name
37 from gacksbilling import GacksBilling
38
39 from AuthenticatedApi import AuthenticatedApi
40
41 GACKS_CENTRAL_HRN = "plc.arizona.gackscentral"
42 GRM_HRN = "plc.arizona.grm"
43 ASAP_HRN = "plc.arizona.asap"
44
45 LOG_DIR = "/usr/local/gackscentral/var/log"
46 TRUSTED_ROOTS_DIR = "/usr/local/gackscentral/var/trusted_roots"
47
48 UPDATE_CACHE_TIME = 60 # use a 60-second cache time for now (10* 60)
49 UPDATE_DURATION = (24*60*60) # update packets will contain 1 day of data
50
51 def dump_handles(x):
52    strs=[]
53    for handle in x:
54        strs.append(handle.as_string())
55    file("/tmp/foo","w").write("\n".join(strs))
56
57
58
59 ##
60 # GacksServer is a GeniServer that serves component interface requests.
61 #
62
63 class RemoteApi(AuthenticatedApi):
64
65     ##
66     # Create a new GacksServer object.
67
68     def __init__(self, encoding="utf-8", hrn=GACKS_CENTRAL_HRN, trustedRootsDir=TRUSTED_ROOTS_DIR, logDir=LOG_DIR):
69         self.gacksLogger = GacksMysqlLogger(directory = logDir)
70         self.hrn = hrn
71         self.calendar = None
72         self.accounts = None
73         self.invoices = None
74         self.policies = None
75         self.queue = None
76         self.init_logfile()
77         self.init_resources()
78         self.init_calendar(lock=True)
79         self.init_queue(lock=True)
80         self.init_accounts()
81         self.init_invoices()
82         self.init_policies()
83         self.init_acls()
84         self.billing = GacksBilling(self.accounts, self.invoices, self.policies, self.resources)
85         self.picker = GacksNodePicker(resources=self.resources)
86         self.node_status = GacksNodeStatus()
87         self.currentUpdateDicts = []
88         self.currentUpdateHash = None
89         self.currentUpdateTime = 0
90         return AuthenticatedApi.__init__(self, encoding, trustedRootsDir=trustedRootsDir)
91
92     def init_logfile(self):
93         logger = logging.getLogger("gacksapi")
94         logger.setLevel(logging.DEBUG)
95         socketHandler = logging.handlers.SocketHandler('localhost', logging.handlers.DEFAULT_TCP_LOGGING_PORT)
96         logger.addHandler(socketHandler)
97
98         gackshandler_plc_set_logger_name("gacksapi")
99         gacksnodepicker_set_logger_name("gacksapi")
100
101         logger = logging.getLogger("gacksinvoice")
102         logger.setLevel(logging.DEBUG)
103         socketHandler = logging.handlers.SocketHandler('localhost', logging.handlers.DEFAULT_TCP_LOGGING_PORT)
104         logger.addHandler(socketHandler)
105
106     def init_resources(self):
107         self.resources = GacksResourceDirectory("/etc/gacks/resources.d")
108
109     def init_calendar(self, reset=False, lock=False):
110         if self.calendar != None:
111             self.close_calendar()
112
113         self.calendar = GacksMySQLCalendar()  # GacksListCalendar()
114         self.calendar.open()
115
116         if lock:
117             self.calendar.lock()
118
119         if reset:
120             self.calendar.reset()
121
122         if self.calendar.is_empty():
123             self.log_msg("init_calendar", "empty calendar; setting default policy")
124             self.create_resources()
125         #too much junk in the log
126         #else:
127         #    self.log_msg("init_calendar", "existing calendar found")
128
129         if lock:
130             self.calendar.unlock()
131
132     def close_calendar(self):
133         if self.calendar != None:
134             self.calendar.close()
135             self.calendar = None
136
137     def init_queue(self, reset=False, lock=False):
138         if self.queue != None:
139             self.close_queue();
140
141         self.queue = GacksMySQLQueue();
142         self.queue.open();
143
144         if lock:
145             self.queue.lock()
146
147         if reset:
148             self.queue.reset()
149
150         if lock:
151             self.queue.unlock()
152
153     def close_queue(self):
154         if self.queue != None:
155             self.queue.close()
156             self.queue = None
157
158     def init_accounts(self):
159         self.accounts = GacksAccountManager()
160
161     def init_invoices(self):
162         self.invoices = GacksInvoiceManager(accounts=self.accounts)
163
164     def init_policies(self):
165         self.policies = GacksPolicyDirectory("/etc/gacks/policy.d")
166
167     def init_acls(self):
168         # The calendar knows the database parameters, so we'll get them from
169         # there. it's a bit messy though.
170
171         self.acls = MysqlAclManager(self.calendar.dbname, self.calendar.user, self.calendar.get_password(), self.calendar.address)
172
173         rights = {"plc.arizona.gacksadmin": ["reset", "defragment", "garbagecollect"]}
174         self.acls.create_acl_if_not_exist("calendar", rights)
175
176         rights = {"plc.arizona.gacksadmin": ["apply", "add"]}
177         self.acls.create_acl_if_not_exist("invoices", rights)
178
179         rights = {"plc.arizona.gacksadmin": ["get"]}
180         self.acls.create_acl_if_not_exist("reservations", rights)
181
182         rights = {"plc.arizona.gacksadmin": ["get", "list", "create", "update", "delete"]}
183         self.acls.create_acl_if_not_exist("accounts", rights)
184
185     def apilogger(self):
186         return logging.getLogger("gacksapi")
187
188     ##
189     # Override the API call function so that we lock the calendar while the API
190     # is in use.
191     def call(self, source, method, *args):
192         self.apilogger().debug("call: %s, %s" % (method, args))
193         self.calendar.lock()
194         try:
195             result = AuthenticatedApi.call(self, source, method, *args)
196         finally:
197             self.calendar.unlock()
198
199         return result
200
201     ##
202     # Register the server RPCs for Gacks
203
204     def register_functions(self):
205         AuthenticatedApi.register_functions(self)
206         self.register_function(self.get_handle)
207         self.register_function(self.get_update)
208         self.register_function(self.query_exact)
209         self.register_function(self.query_overlap)
210         self.register_function(self.set_allocator)
211         self.register_function(self.set_consumer)
212         self.register_function(self.set_consumer_hrn)
213         self.register_function(self.submit_receipt)
214
215         # picker
216         self.register_function(self.pick)
217
218         # asap
219         self.register_function(self.query_asap)
220         self.register_function(self.add_asap)
221         self.register_function(self.delete_asap)
222         self.register_function(self.admin_asap_run)
223
224         # resources
225         self.register_function(self.get_resources)
226
227         # grm
228         self.register_function(self.grm_delete)
229         self.register_function(self.grm_reserve)
230         self.register_function(self.asap_reserve)
231
232         # account management
233         self.register_function(self.get_account)
234         self.register_function(self.list_accounts)
235         self.register_function(self.create_account)
236         self.register_function(self.update_account)
237         self.register_function(self.delete_account)
238
239         # invoices
240         self.register_function(self.get_invoice)
241         self.register_function(self.add_charge)
242
243         # management interface
244         self.register_function(self.admin_reset)
245         self.register_function(self.admin_defragment)
246         self.register_function(self.admin_garbage_collect)
247         self.register_function(self.admin_apply_invoices)
248         self.register_function(self.admin_get_reservations)
249         self.register_function(self.admin_get_node_status)
250         self.register_function(self.admin_mail_invoices)
251
252     def log(self, func_name, arg_dict, result_name, result_val):
253         if self.gacksLogger:
254             self.gacksLogger.log(func_name, arg_dict, result_name, result_val)
255
256     def log_msg(self, func_name, msg):
257         if self.gacksLogger:
258             self.gacksLogger.log_msg(func_name, msg)
259
260     def create_resources(self):
261         for item in self.resources.resources:
262             r = GacksRecord(item.name, 0, item.qty, 0, INFINITY, [GACKS_CENTRAL_HRN, GRM_HRN], None)
263             self.calendar.insert_record(r)
264
265     def get_update(self, callerHash):
266         if (time.time()-self.currentUpdateTime) > UPDATE_CACHE_TIME:
267             self.currentUpdateTime = time.time()
268             self.currentUpdateDicts = self.calendar.query_overlap(timeStart=self.currentUpdateTime, timeStop=self.currentUpdateTime + UPDATE_DURATION)
269             self.currentUpdateHash = sha.new(str(self.currentUpdateDicts)).hexdigest()
270
271         if (callerHash == self.currentUpdateHash):
272             return ("notmodified", self.currentUpdateHash, [])
273
274         return ("update", self.currentUpdateHash, self.currentUpdateDicts)
275
276     def query_exact(self, id=None, unitStart=0, unitStop=INFINITY, timeStart=0, timeStop=INFINITY, hasAllocator=None, isLastAllocator=None):
277         recs = self.calendar.query_exact(id, unitStart, unitStop, timeStart, timeStop, hasAllocator, isLastAllocator)
278
279         recDicts = []
280         for rec in recs:
281             recDicts.append(rec.as_dict())
282
283         return recDicts
284
285     def query_overlap(self, id=None, unitStart=0, unitStop=INFINITY, timeStart=0, timeStop=INFINITY, hasAllocator=None, isLastAllocator=None):
286         recs = self.calendar.query_overlap(id, unitStart, unitStop, timeStart, timeStop, hasAllocator, isLastAllocator)
287
288         #file("/tmp/foo.txt", "w").write(str((id, unitStart, unitStop, timeStart, timeStop, hasAllocator, isLastAllocator)))
289
290         recDicts = []
291         for rec in recs:
292             recDicts.append(rec.as_dict())
293
294         return recDicts
295
296     def rspec_to_handles(self, rspec):
297         if isinstance(rspec, str):
298             rspec = eval(rspec)
299
300         if not isinstance(rspec, dict):
301             raise GacksBadRspecSyntax(str(rspec))
302
303         # an aggregate rspec is an rspec that contains a bunch of smaller
304         # rspecs. We resolve it recursively.
305
306         if "aggregate" in rspec:
307             handles=[]
308             for part in rspec["aggregate"]:
309                 thisHandles = self.rspec_to_handles(part)
310                 handles.extend(thisHandles)
311             return handles
312
313         id = rspec['id']
314         timeStart = rspec['timeStart']
315         timeStop = rspec['timeStop']
316
317         # this is the easy case -- the rspec specifies a unitStart and unitStop
318         # we simply return a handle with those parameters
319
320         if not ("unitQuantity" in rspec):
321             unitStart = rspec['unitStart']
322             unitStop = rspec['unitStop']
323             return [GacksHandle(id, unitStart, unitStop, timeStart, timeStop)]
324
325         # the hard case -- the rspec specifies a quantity. we need to get all
326         # of the available handles and batch them together until we satisfy
327         # that quantity
328
329         unitStart = int(rspec.get("unitStart", 0))
330         unitStop = int(rspec.get("unitStop", INFINITY))
331         unitQuantity = rspec['unitQuantity']
332         hasAllocator = rspec.get('hasAllocator', None)
333         isLastAllocator = rspec.get('isLastAllocator', None)
334
335         splitPoints = []
336
337         handles = []
338
339         qtyNeeded = int(unitQuantity)
340         records = self.calendar.query_overlap(id, unitStart, unitStop, timeStart, timeStop, hasAllocator, isLastAllocator)
341         for record in records:
342             if qtyNeeded <= 0:
343                 break
344
345             record.dump()
346
347             if not interval_contains(record.timeStart, record.timeStop, timeStart, timeStop):
348                 # the region is not a superset of the rspec that we're trying to fill,
349                 # so we can't use it.
350                 if (record.timeStop > timeStart) and (record.timeStop < timeStop):
351                     # keep track of where we ran into a region boundary
352                     splitPoints.append(record.timeStop)
353                 continue
354
355             # if a unitStart was specified, then make sure we don't violate it
356             if (record.unitStart<unitStart):
357                 record.unitStart = unitStart
358
359             # same with unitStop
360             if (record.unitStop>unitStop) and (record.unitStop!=INFINITY):
361                 record.unitStop = unitStop
362
363             # we might have turned it into an empty record
364             if (record.unitStart>=record.unitStop):
365                 continue
366
367             if record.get_quantity() == INFINITY:
368                 thisQty = record.unitStart + qtyNeeded
369             else:
370                 thisQty = min(record.get_quantity(), qtyNeeded)
371
372             qtyNeeded = qtyNeeded - thisQty
373
374             handle = GacksHandle(id, record.unitStart, record.unitStart + thisQty, timeStart, timeStop)
375             handles.append(handle)
376
377         if (qtyNeeded > 0) and (splitPoints != []):
378             # we couldn't find contiguous regions that satisfied our rspec, so
379             # lets subdivide the rspec into two pieces at a region boundary
380             # and try to resolve each region separately.
381             splitPoints.sort()
382             rspec1 = rspec.copy()
383             rspec2 = rspec.copy()
384             rspec1["timeStop"] = splitPoints[0]
385             rspec2["timeStart"] = splitPoints[0]
386             handles1 = self.rspec_to_handles(rspec1);
387             handles2 = self.rspec_to_handles(rspec2);
388             return handles1 + handles2
389
390         return handles
391
392     def get_handle(self, rspec):
393         arg_dict = locals()
394         handles = self.rspec_to_handles(rspec)
395         result = handles_to_strings(handles)
396
397         self.log("get_handle", arg_dict, "handle_list", result)
398
399         return result
400
401     def set_allocator_internal(self, handles, callerHRN, allocatorHRN, which, append, reqHash):
402         receiptList = []
403         for handle in handles:
404             # find the existing records that overlap the handle
405             existing_recs = self.calendar.query_handles_overlap([handle])
406
407             # XXX an exception thrown here can cause some receipts to be lost,
408             # since receipts will not be returned to the client
409             if not existing_recs:
410                 self.log("set_allocator", arg_dict, "exception", "GacksResourceNotFound:" + handle.as_string())
411                 raise GacksResourceNotFound(handle.as_string())
412
413             receipt = GacksReceipt(subject=handle.as_string(), handle=handle, action="set_allocator", requestHash=reqHash)
414
415             # TODO: Merge existing_recs
416
417             for item in existing_recs:
418                 if not item.contains_allocator(callerHRN):
419                     raise GacksCallerNotAllocator(str(item.as_string()) + " alloc=" + str(item.allocatorHRNs) + " caller=" + str(callerHRN) )
420                 if not item.is_superset(handle):
421                     raise GacksRequestSpansReservations(handle.as_string() + " on " + item.as_string())
422
423             leftovers = []
424             results = []
425             for item in existing_recs:
426                 if item.is_proper_superset(handle):
427                     parts = item.clone().split_subset(handle.unitStart, handle.unitStop, handle.timeStart, handle.timeStop)
428                     results.append(parts[0])
429                     leftovers.extend(parts[1:])
430                 else:
431                     results.append(item)
432
433             for item in existing_recs:
434                 #file("/tmp/gacksapi.txt","a").write("remove: " + item.as_string() + "\n")
435                 self.calendar.remove_record(item)
436
437             for item in leftovers:
438                 #file("/tmp/gacksapi.txt","a").write("insert leftover: " + item.as_string() + "\n")
439                 self.calendar.insert_record(item)
440
441             for item in results:
442                 #file("/tmp/gacksapi.txt","a").write("insert new: " + item.as_string() + "\n")
443                 item.set_allocator(callerHRN, allocatorHRN, which, append)
444                 self.calendar.insert_record(item)
445                 receipt.AddRecord(item)
446
447             receiptList.append(receipt)
448
449         return receiptList
450
451     def authenticateToken(self, token, argList, targetHrn=None):
452         # in case something goes wrong...
453         #file("/tmp/sreq.txt","w").write(str(argList))
454
455         token = GacksAuthToken(xmlrpc_arg=token, argList=argList)
456         token.authenticate(trusted_roots=self.trusted_roots, target_hrn=targetHrn, is_trusted_admin = True)   # XXX FIXME!
457
458         callerGid = token.get_gid_caller()
459         objectGid = token.get_gid_object()
460
461         return (token, callerGid, objectGid)
462
463     def set_allocator(self, authToken_str, handle_strs, allocatorGID_str, which, append,):
464         arg_dict = locals()
465
466         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [handle_strs, allocatorGID_str, which, append])
467
468         if allocatorGID_str:
469             allocatorGID = GID(string = allocatorGID_str)
470             allocatorGID.verify_chain(self.trusted_roots.get_list()) # self.trusted_cert_list)
471             allocatorHRN = allocatorGID.get_hrn()
472         else:
473             # if allocatorGID is None, then we'll treat this request as a truncate
474             allocatorHRN = None
475
476         receiptList = []
477
478         handles = strings_to_handles(handle_strs)
479
480         receiptList = self.set_allocator_internal(handles, objectGID.get_hrn(), allocatorHRN, which, append)
481
482         receiptStrings = []
483         for receipt in receiptList:
484            receipt.encode()
485            # XXX TODO sign the receipt
486            receiptStrings.append(receipt.save_to_string())
487
488         self.log("set_allocator", arg_dict, "receipt_list", receiptStrings)
489
490         return receiptStrings
491
492     def set_consumer_internal(self, handles, consumerHRN, reqHash):
493         receiptList = []
494         for handle in handles:
495             existing_recs = self.calendar.query_handles_overlap([handle])
496
497             if not existing_recs:
498                 raise GacksResourceNotFound(hand.as_string())
499
500             receipt = GacksReceipt(subject=handle.as_string(), handle=handle, action="set_consumer", requestHash=reqHash)
501
502             for rec in existing_recs:
503                 rec.set_consumer(consumerHRN)
504                 self.calendar.update_record(rec)
505                 receipt.AddRecord(rec)
506
507             receiptList.append(receipt)
508
509         return receiptList
510
511     def set_consumer_hrn(self, authToken_str, handle_strs, consumerHRN):
512         arg_dict = locals()
513
514         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [handle_strs, consumerHRN])
515
516         # XXX - are we missing a check to see that objectGID is on the allocator list?
517
518         handles = strings_to_handles(handle_strs)
519         receiptList = self.set_consumer_internal(handles, consumerHRN, authToken.reqHash)
520
521         receiptStrings = []
522         for receipt in receiptList:
523            receipt.encode()
524            # XXX TODO sign the receipt
525            receiptStrings.append(receipt.save_to_string())
526
527         self.log("set_consumer_hrn", arg_dict, "receipt_list", receiptStrings)
528
529         return receiptStrings
530
531
532     def set_consumer(self, authToken_str, handle_strs, cred_str):
533         arg_dict = locals()
534
535         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [handle_strs, cred_str])
536
537         # XXX - are we missing a check to see that objectGID is on the allocator list?
538
539         if cred_str:
540             cred = Credential(string = cred_str)
541
542             cred.verify_chain(self.trusted_roots.get_list())
543
544             # Checking to see if the cred can perform loanresources is equivalent
545             # to checking to see if it has the Bind privilege.
546             if not cred.can_perform("loanresources"):
547                 raise GacksMissingBindPrivilege(cred.get_privileges().save_to_string())
548
549             # Verify that GacksCentral is the callerGID of the credential. We need
550             # this so that the Gacks component can eventually call LoanResources
551             # using this credential.
552             # if callerGID.get_hrn() != self.
553
554             consumerHRN = cred.get_gid_object().get_hrn()
555         else:
556             cred = None
557             consumerHRN = None
558
559         handles = strings_to_handles(handle_strs)
560         receiptList = self.set_consumer_internal(handles, consumerHRN, authToken.reqHash)
561
562         receiptStrings = []
563         for receipt in receiptList:
564            receipt.encode()
565            # XXX TODO sign the receipt
566            receiptStrings.append(receipt.save_to_string())
567
568         self.log("set_consumer", arg_dict, "receipt_list", receiptStrings)
569
570         return receiptStrings
571
572     def submit_receipt(self, receipt_str):
573         receipt = GacksReceipt(string = receipt_str)
574
575         reclist = receipt.get_records()
576         for rec in reclist:
577             existing_recs = self.calendar.query_handles_overlap([rec])
578
579             # XXX this sequence number stuff is broken
580             #for rec in existing_recs:
581             #    if receipt.GetSequence() < rec.GetSequence():
582             #        raise GacksOutOfDateSequence(receipt.GetSubject())
583
584             # remove any old records for this resource/time period
585             for remove_rec in existing_recs:
586                 self.calendar.remove_record(remove_rec)
587
588             # insert the new record in the calendar
589             self.calendar.insert_record(add_rec)
590
591     ##
592     # Resources interface
593
594     def get_resources(self, hash, format="pickled"):
595         if self.resources.hash() == hash:
596             # client already has existing data
597             return (True, hash, None)
598         else:
599             if (format=="pickled"):
600                 data = self.resources.pickled()
601             elif (format=="json"):
602                 data = self.resources.json()
603             elif (format=="dict"):
604                 data = self.resources.as_dict()
605
606             # client has old data, return him the whole thing
607             return (False, self.resources.hash(), data)
608
609     ##
610     # Picker interface
611
612     def pick(self, authToken_str, resourceName, resourceGroup, amount, expand, options={}):
613         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [resourceName, resourceGroup, amount, expand, options])
614
615         # XXX TODO: Make sure caller has the right to add nodes to this slice!
616
617         result = self.picker.pick(objectGID.get_hrn(), resourceName, resourceGroup, amount, expand, options)
618
619         # These are potentially large and not-xml-friendly. We'll pass just the names ("addSetNames", "delSetNames").
620         if "addSet" in result:
621             del result["addSet"];
622         if "delSet" in result:
623             del result["delSet"];
624
625         return result
626
627     ##
628     # ASAP interface
629
630     def asap_job_to_handles(self, job):
631         handles = []
632         for resource in job.get_resources():
633             rspec = {"timeStart": int(time.time() + 60),   # reserve 60 seconds in the future
634                      "timeStop": int(time.time() + 60 + job.get_duration()),
635                      "id": resource.id,
636                      "unitQuantity": resource.qty,
637                      "isLastAllocator": "plc.arizona.grm"}  # XXX - hardcoded hrn of canopus
638             thisHandles = self.rspec_to_handles(rspec)
639             f = file("/tmp/asap_job_to_handles.txt", "w")   # is this leftover debugging?
640             f.write(str(rspec) + "=" + str(thisHandles) + "\n");
641             f.close()
642             if (thisHandles == []):
643                 return []
644             handles.extend(thisHandles)
645         return handles
646
647     def query_asap(self, id=None, allocatorHRN=None, consumerHRN=None):
648         jobs = self.queue.query(id, allocatorHRN, consumerHRN)
649         dicts = []
650         for job in jobs:
651             dicts.append(job.as_dict())
652         return dicts
653
654     def add_asap_internal(self, allocatorHRN, consumerHRN, duration, resourceStr):
655         job = AsapJob(allocatorHRN=allocatorHRN, consumerHRN=consumerHRN, duration=duration, resourceStr=resourceStr)
656         self.queue.add(job)
657
658     def add_asap(self, authToken_str, allocatorHRN, consumerHRN, duration, resourceStr):
659         (authToken,callerGID, objectGID) = self.authenticateToken(authToken_str, [allocatorHRN, consumerHRN, duration, resourceStr])
660         # XXX check here to see whether caller can add asap jobs
661         self.add_asap_internal(allocatorHRN, consumerHRN, duration, resourceStr)
662
663     def delete_asap(self, authToken_str, id=None, allocatorHRN=None, consumerHRN=None):
664         (authToken,callerGID, objectGID) = self.authenticateToken(authToken_str, [id, allocatorHRN, consumerHRN])
665         # XXX check here to see whether caller can delete asap jobs
666         self.queue.delete(id, allocatorHRN, consumerHRN)
667
668     ##
669     # GRM Support
670
671     def get_grm_hrn(self):
672         return "plc.arizona.grm"
673         # return GID(filename = "/usr/local/grm/var/private/keys/grm.gid").get_hrn()
674
675     def grm_delete(self, authToken_str, handle_strs):
676         arg_dict = locals()
677
678         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [handle_strs])
679
680         handles = strings_to_handles(handle_strs)
681
682         self.set_allocator_internal(handles, self.get_grm_hrn(), None, -1, True, authToken.reqHash)
683         self.set_consumer_internal(handles, None, authToken.reqHash)
684
685     def grm_reserve(self, authToken_str, rspec, consumerHRN, noreserve, ignorepolicy):
686         arg_dict = locals()
687
688         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [rspec, consumerHRN, noreserve, ignorepolicy])
689
690         if consumerHRN=="None":
691             consumerHRN=None
692
693         if noreserve=="False":
694             noreserve=False
695
696         if ignorepolicy=="False":
697             ignorepolicy=False
698
699         receipts = []
700         debugInfo = ""
701
702         handles = self.rspec_to_handles(rspec)
703
704         if (handles == []):
705             raise GacksEmptyHandles(str(rspec))
706
707         if not ignorepolicy:
708             enforcer = GacksEnforcer(self.accounts, self.policies, self.calendar, self.resources)
709             enforcer.check(objectGID.get_hrn(), handles)
710
711         if noreserve:
712             receipts = []
713         else:
714             receipts = self.set_allocator_internal(handles, self.get_grm_hrn(), objectGID.get_hrn(), -1, True, authToken.reqHash)
715             if consumerHRN:
716                 cReceipts = self.set_consumer_internal(handles, consumerHRN, authToken.reqHash)
717                 receipts = receipts + cReceipts
718
719         debugInfo += "rspec: " + str(rspec) + "\n"
720         for handle in handles:
721             debugInfo += "handle: " + handle.as_string() + "\n"
722         for receipt in receipts:
723             debugInfo += "receipt: " + receipt.save_to_string() + "\n"
724
725         receiptStrings = []
726         for receipt in receipts:
727            receipt.encode()
728            # XXX TODO sign the receipt
729            receiptStrings.append(receipt.save_to_string())
730
731         self.log("grm_reserve", arg_dict, "receipt_list", receiptStrings)
732
733         return (True, receiptStrings, debugInfo)
734
735     def asap_reserve(self, authToken_str, resources, duration, consumerHRN, noreserve):
736         # XXX this isn't being used
737         # right now just calling add_asap() in it's place, which does the same thing
738         # need to sort out some security concerns for asap
739
740         arg_dict = locals()
741
742         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [resources, duration, consumerHRN, noreserve])
743
744         receipts = []
745         debugInfo = ""
746
747         job = AsapJob()
748
749         for resource in resources.keys():
750             job.add(AsapResource(id=resource, qty=resources[resource]))
751
752         job.set_duration(duration)
753         job.set_allocator(objectGID.get_hrn())
754         job.set_consumer(consumerHRN)
755
756         debugInfo += "duration: " + str(job.get_duration()) + "\n"
757         debugInfo += "resources: " + str(job.get_resources_string()) + "\n"
758         debugInfo += "allocator: " + str(job.get_allocator()) + "\n"
759         debugInfo += "consumer: " + str(job.get_consumer()) + "\n"
760
761         self.add_asap_internal(self.get_grm_hrn(), job.get_allocator(), job.get_consumer(), job.get_duration(), job.get_resources_string())
762
763         self.log("asap_reserve", arg_dict, "receipt_list", [])
764
765         return (True, [], debugInfo)
766
767     ##
768     # Account interface
769
770     def get_account(self, authToken_str, name, kind, create_if_not_exist=False):
771         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [name, kind, create_if_not_exist])
772
773         name = str_to_gacksid(name)
774
775         acct = self.accounts.get_account(name, kind, create_if_not_exist=create_if_not_exist)
776         if acct:
777             acct.apply_inRate()
778             if acct.is_dirty() or acct.created:
779                 acct.commit()
780
781             result = acct.as_dict()
782
783             # fill in some policy fields as they are useful to the clients
784             policy = self.policies.get_policy(acct.level, None)
785             if policy is not None:
786                 result['term'] = policy.term
787                 result['membershipFee'] = policy.membershipFee
788                 result['membershipFeeMonths'] = policy.membershipFeeMonths
789                 upgradeDict = {}
790                 for x in self.policies.get_upgrades(acct.level):
791                     upgradeDict[x.name] = x.as_dict()
792                 result['upgrades'] = upgradeDict
793
794             return result
795         else:
796             return None
797
798     def delete_account(self, authToken_str, name, kind):
799         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [name, kind])
800
801         name = str_to_gacksid(name)
802
803         self.acls.test_acl("accounts", objectGID.get_hrn(), "delete")
804
805         self.accounts.delete_account(name, kind)
806
807         return 1
808
809     def list_accounts(self, authToken_str):
810         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [])
811
812         self.acls.test_acl("accounts", objectGID.get_hrn(), "list")
813
814         records = self.accounts.list_accounts()
815
816         return records
817
818     def create_account(self, authToken_str, args):
819         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [args])
820
821         self.acls.test_acl("accounts", objectGID.get_hrn(), "create")
822
823         name = str_to_gacksid(args["name"])
824         kind = args["kind"]
825
826         acct = self.accounts.get_account(name, kind, create_if_not_exist=True)
827         if not acct.created:
828             raise GacksAccountExists("Account %s %s already exists" % (name, kind))
829
830         del args["name"]
831         del args["kind"]
832         acct.update(args)
833
834         return acct.id
835
836     def update_account(self, authToken_str, args, mode="admin"):
837         """
838             args is a list of fields to update
839
840             mode is "admin", "restricted", or "user"
841                admin - superuser mode, all changes allowed
842                restricted - only allow things a user could do (used by gacksadmin for enduser changes)
843                user - restricted mode, and requires objectGID to match the account being changed
844         """
845
846         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [args, mode])
847
848         name = str_to_gacksid(args["name"])
849         kind = args["kind"]
850
851         if (mode=="admin") or (mode=="restricted"):
852             self.acls.test_acl("accounts", objectGID.get_hrn(), "update")
853         else:
854             if (not hrn_matches_gacksid(objectGID.get_hrn(), name)):
855                 raise GacksNotAuthorized("You don't have permission to update %s using objectGID %s" % (name, objectGID.get_hrn()) )
856
857         if (mode in ["user", "restricted"]):
858             user_allowed_fields = ["name", "kind", "level", "autoRenew", "freezeUnreserved", "billingContacts"];
859             for field in args:
860                 if not (field in user_allowed_fields):
861                     raise GacksFieldNotAllowed("you are not allowed to change %s" % field)
862
863         acct = self.accounts.get_account(name, kind, create_if_not_exist=False)
864
865         if ("level" in args):
866             new_level = args["level"]
867             new_policy = self.policies.get_policy(new_level, None)
868
869             if not new_policy:
870                 raise GacksUnknownPolicy("The service level %s does not exist" % new_level)
871
872             if (mode in ["user", "restricted"]):
873                 if not (acct.level in new_policy.upgradeFrom):
874                     raise GacksUpgradeNotAllowed("You are not allowed to upgrade from %s to %s" % (acct.level, new_level))
875
876         # If the level is changed, then bump serviceStartDate to the current
877         # date, since a new service commitment has started.
878         if ("level" in args) and (args["level"] != acct.level):
879             args["serviceStartDate"] = time.time()
880
881         del args["name"]
882         del args["kind"]
883         acct.update(args)
884
885         return acct.id
886
887     ##
888     # Invoice interface
889
890     def get_invoice(self, authToken_str, name=None, state=None, date_filter = {}, parent_id=None, filter=None, want_summary = False):
891         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [name])
892
893         if filter is not None:
894             # alternate syntax: filter is a dict that contains all relevant filtering information
895             assert(name==None)
896             assert(state==None)
897             assert(date_filter=={} or date_filter==None)
898             assert(parent_id==None)
899             file("/tmp/foo.txt","a").write(str(filter)+"\n")
900
901             # php does this -- empty dict is empty list
902             if (filter==[]):
903                 filter = {}
904
905             if "account" in filter:
906                 filter["account"] = str_to_gacksid(filter["account"])
907
908             invoice = self.invoices.get_invoice_prime(filter, lookup_objects=True)
909         else:
910             file("/tmp/foo.txt","a").write(str((name,state,date_filter,parent_id))+"\n")
911             name = str_to_gacksid(name)
912             invoice = self.invoices.get_invoice(name, state=state, date_filter=date_filter, parent_id=parent_id, lookup_objects=True)
913
914         result = invoice.as_dict()
915
916         if want_summary:
917             result["summary"] = invoice.get_summary()
918
919         return result
920
921     def add_charge(self, authToken_str, account, object, kind_id, date, amount, state=STATE_PENDING, parent_id=-1):
922         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [account, object, kind_id, date, amount, state, parent_id])
923
924         self.acls.test_acl("invoices", objectGID.get_hrn(), "add")
925
926         account = str_to_gacksid(account)
927
928         self.invoices.add_charge(account, object, kind_id, date, amount, state, parent_id)
929         return 0;
930
931     ##
932     # Management interface
933
934     def admin_asap_run(self, authToken_str):
935         jobids_converted = []
936         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [])
937         jobs = self.queue.query()
938         for job in jobs:
939             handles = self.asap_job_to_handles(job)
940             if (handles != []):
941                 self.set_allocator_internal(handles, GRM_HRN, ASAP_HRN, 0, True, "asaphash")
942                 self.set_allocator_internal(handles, ASAP_HRN, job.get_allocator(), 0, True, "asaphash")
943                 self.set_consumer_internal(handles, job.get_consumer(), "asaphash")
944                 jobids_converted.append(job.get_jobid())
945                 self.queue.delete(id=job.get_jobid())
946         return jobids_converted
947
948     def admin_reset(self, authToken_str):
949         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [])
950
951         self.acls.test_acl("calendar", objectGID.get_hrn(), "reset")
952
953         self.init_calendar(reset = True)
954
955         self.log_msg("admin_reset", "reset complete");
956
957         return True
958
959     def admin_garbage_collect(self, authToken_str, timeStop):
960         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [timeStop])
961
962         self.acls.test_acl("calendar", objectGID.get_hrn(), "garbagecollect")
963
964         self.calendar.garbage_collect(timeStop)
965
966         self.log_msg("admin_garbage_collect", "purge complete");
967
968         return True
969
970     def admin_defragment(self, authToken_str):
971         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [])
972
973         self.acls.test_acl("calendar", objectGID.get_hrn(), "defragment")
974
975         # Round down to the nearest hour. Otherwise defragment may cause the
976         # start time to GRM records to not start on the hour, making the
977         # slot unreservable.
978         tDefrag = int(time.time() / 3600) * 3600
979
980         absorb_list = self.calendar.defragment(tDefrag)
981
982         self.log_msg("admin_defragment", "absorb_list=" + str(absorb_list))
983
984         return absorb_list
985
986     def admin_apply_invoices(self, authToken_str):
987         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [])
988
989         # do we need an ACL for invoice modifications?
990         self.acls.test_acl("invoices", objectGID.get_hrn(), "apply")
991
992         self.billing.do_nightly()
993
994         self.log_msg("admin_apply_invoices", "apply complete");
995
996         return True
997
998     def admin_get_reservations(self, authToken_str):
999         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [])
1000
1001         # do we need an ACL for invoice modifications?
1002         self.acls.test_acl("reservations", objectGID.get_hrn(), "get")
1003
1004         resv = json.load(file("/usr/local/gackscentral/var/resv", "r"))
1005         return resv
1006
1007     def admin_get_node_status(self, authToken_str, filter=None):
1008         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [])
1009
1010         self.acls.test_acl("reservations", objectGID.get_hrn(), "get")
1011
1012         nodes = self.node_status.query(filter)
1013         return nodes
1014
1015     def admin_mail_invoices(self, authToken_str, filter={}):
1016         (authToken, callerGID, objectGID) = self.authenticateToken(authToken_str, [])
1017
1018         what_i_did = {}
1019         what_i_did["_filter"] = filter
1020
1021         accounts = self.accounts.get_accounts(filter)
1022         for account in accounts:
1023             if account.billingContacts:
1024                 email_list = [x.strip() for x in account.billingContacts.split(",")]
1025                 if (self.invoices.mail_invoice(account.name, email_list = email_list)):
1026                     what_i_did[account.name] = email_list
1027
1028         return what_i_did
1029
1030
1031