first stab at a design where each incoming API call has its own dbsession
Thierry Parmentelat [Thu, 7 Nov 2013 21:00:58 +0000 (22:00 +0100)]
very incomplete (focusing on PL for now) also needs work for the v2/v3 adapter
probably impovable on many respects
seems to pass the PL tests though

21 files changed:
sfa/client/sfaadmin.py
sfa/dummy/dummyslices.py
sfa/generic/__init__.py
sfa/generic/architecture.txt
sfa/importer/__init__.py
sfa/importer/dummyimporter.py
sfa/importer/iotlabimporter.py
sfa/importer/nitosimporter.py
sfa/importer/openstackimporter.py
sfa/importer/plimporter.py
sfa/managers/driver.py
sfa/managers/registry_manager.py
sfa/managers/v2_to_v3_adapter.py
sfa/methods/GetSelfCredential.py
sfa/planetlab/plaggregate.py
sfa/planetlab/pldriver.py
sfa/planetlab/plslices.py
sfa/server/sfaapi.py
sfa/server/threadedserver.py
sfa/storage/alchemy.py
sfa/storage/model.py

index b7b45d4..82c915c 100755 (executable)
@@ -125,9 +125,8 @@ class RegistryCommands(Commands):
         """Check the correspondance between the GID and the PubKey"""
 
         # db records
-        from sfa.storage.alchemy import dbsession
         from sfa.storage.model import RegRecord
-        db_query = dbsession.query(RegRecord).filter_by(type=type)
+        db_query = self.api.dbsession().query(RegRecord).filter_by(type=type)
         if xrn and not all:
             hrn = Xrn(xrn).get_hrn()
             db_query = db_query.filter_by(hrn=hrn)
@@ -315,10 +314,9 @@ class CertCommands(Commands):
     @args('-o', '--outfile', dest='outfile', metavar='<outfile>', help='output file', default=None)
     def export(self, xrn, type=None, outfile=None):
         """Fetch an object's GID from the Registry"""  
-        from sfa.storage.alchemy import dbsession
         from sfa.storage.model import RegRecord
         hrn = Xrn(xrn).get_hrn()
-        request=dbsession.query(RegRecord).filter_by(hrn=hrn)
+        request=self.api.dbsession().query(RegRecord).filter_by(hrn=hrn)
         if type: request = request.filter_by(type=type)
         record=request.first()
         if record:
index dddf1a6..7785031 100644 (file)
@@ -8,7 +8,6 @@ from sfa.util.xrn import Xrn, get_leaf, get_authority, urn_to_hrn
 
 from sfa.rspecs.rspec import RSpec
 from sfa.storage.model import SliverAllocation
-from sfa.storage.alchemy import dbsession
 
 from sfa.dummy.dummyxrn import DummyXrn, hrn_to_dummy_slicename
 
index 99d15bc..2b067ff 100644 (file)
@@ -67,7 +67,7 @@ class Generic:
         # xxx can probably drop support for managers implemented as modules 
         # which makes it a bit awkward
         manager_class_or_module = self.make_manager(api.interface)
-        driver = self.make_driver (api.config, api.interface)
+        driver = self.make_driver (api)
         ### arrange stuff together
         # add a manager wrapper
         manager_wrap = ManagerWrapper(manager_class_or_module,api.interface,api.config)
@@ -100,7 +100,9 @@ class Generic:
             logger.log_exc_critical(message)
         
     # need interface to select the right driver
-    def make_driver (self, config, interface):
+    def make_driver (self, api):
+        config=api.config
+        interface=api.interface
         flavour = self.flavour
         message="Generic.make_driver for flavour=%s and interface=%s"%(flavour,interface)
         
@@ -111,7 +113,7 @@ class Generic:
         try:
             class_obj = getattr(self,classname)()
             logger.debug("%s : %s"%(message,class_obj))
-            return class_obj(config)
+            return class_obj(api)
         except:
             logger.log_exc_critical(message)
         
index ff63549..cb81c8b 100644 (file)
@@ -22,6 +22,7 @@ configurable in a flavour (e.g. sfa.generic.pl.py)
 api.manager 
 manager.driver
 api.driver (for convenience)
+driver.api
 
 ------
 example
index 35f8acd..d1721da 100644 (file)
@@ -10,7 +10,9 @@ from sfa.util.sfalogging import _SfaLogger
 from sfa.trust.hierarchy import Hierarchy
 #from sfa.trust.trustedroots import TrustedRoots
 from sfa.trust.gid import create_uuid
-from sfa.storage.alchemy import dbsession
+# using global alchemy.session() here is fine 
+# as importer is on standalone one-shot process
+from sfa.storage.alchemy import global_dbsession
 from sfa.storage.model import RegRecord, RegAuthority, RegUser
 from sfa.trust.certificate import convert_public_key, Keypair
 
@@ -35,7 +37,7 @@ class Importer:
    
     # check before creating a RegRecord entry as we run this over and over
     def record_exists (self, type, hrn):
-       return dbsession.query(RegRecord).filter_by(hrn=hrn,type=type).count()!=0 
+       return global_dbsession.query(RegRecord).filter_by(hrn=hrn,type=type).count()!=0 
 
     def create_top_level_auth_records(self, hrn):
         """
@@ -56,8 +58,8 @@ class Importer:
             auth_record = RegAuthority(hrn=hrn, gid=auth_info.get_gid_object(),
                                        authority=get_authority(hrn))
             auth_record.just_created()
-            dbsession.add (auth_record)
-            dbsession.commit()
+            global_dbsession.add (auth_record)
+            global_dbsession.commit()
             self.logger.info("SfaImporter: imported authority (parent) %s " % auth_record)     
    
 
@@ -76,8 +78,8 @@ class Importer:
         user_record = RegUser(hrn=hrn, gid=auth_info.get_gid_object(),
                               authority=get_authority(hrn))
         user_record.just_created()
-        dbsession.add (user_record)
-        dbsession.commit()
+        global_dbsession.add (user_record)
+        global_dbsession.commit()
         self.logger.info("SfaImporter: importing user (slicemanager) %s " % user_record)
 
 
@@ -98,8 +100,8 @@ class Importer:
             interface_record = RegAuthority(type=type, hrn=hrn, gid=gid,
                                             authority=get_authority(hrn))
             interface_record.just_created()
-            dbsession.add (interface_record)
-            dbsession.commit()
+            global_dbsession.add (interface_record)
+            global_dbsession.commit()
             self.logger.info("SfaImporter: imported authority (%s) %s " % (type,interface_record))
  
     def run(self, options=None):
index 5001849..d274b27 100644 (file)
@@ -23,7 +23,9 @@ from sfa.util.xrn import Xrn, get_leaf, get_authority, hrn_to_urn
 from sfa.trust.gid import create_uuid    
 from sfa.trust.certificate import convert_public_key, Keypair
 
-from sfa.storage.alchemy import dbsession
+# using global alchemy.session() here is fine 
+# as importer is on standalone one-shot process
+from sfa.storage.alchemy import global_dbsession
 from sfa.storage.model import RegRecord, RegAuthority, RegSlice, RegNode, RegUser, RegKey
 
 from sfa.dummy.dummyshell import DummyShell    
@@ -99,7 +101,7 @@ class DummyImporter:
         shell = DummyShell (config)
 
         ######## retrieve all existing SFA objects
-        all_records = dbsession.query(RegRecord).all()
+        all_records = global_dbsession.query(RegRecord).all()
 
         # create hash by (type,hrn) 
         # we essentially use this to know if a given record is already known to SFA 
@@ -159,8 +161,8 @@ class DummyImporter:
                                                pointer= -1,
                                                authority=get_authority(site_hrn))
                     site_record.just_created()
-                    dbsession.add(site_record)
-                    dbsession.commit()
+                    global_dbsession.add(site_record)
+                    global_dbsession.commit()
                     self.logger.info("DummyImporter: imported authority (site) : %s" % site_record) 
                     self.remember_record (site_record)
                 except:
@@ -190,8 +192,8 @@ class DummyImporter:
                                                pointer =node['node_id'],
                                                authority=get_authority(node_hrn))
                         node_record.just_created()
-                        dbsession.add(node_record)
-                        dbsession.commit()
+                        global_dbsession.add(node_record)
+                        global_dbsession.commit()
                         self.logger.info("DummyImporter: imported node: %s" % node_record)  
                         self.remember_record (node_record)
                     except:
@@ -249,8 +251,8 @@ class DummyImporter:
                         else:
                             self.logger.warning("No key found for user %s"%user_record)
                         user_record.just_created()
-                        dbsession.add (user_record)
-                        dbsession.commit()
+                        global_dbsession.add (user_record)
+                        global_dbsession.commit()
                         self.logger.info("DummyImporter: imported person: %s" % user_record)
                         self.remember_record ( user_record )
 
@@ -277,7 +279,7 @@ class DummyImporter:
                                 user_record.reg_keys=[ RegKey (pubkey)]
                             self.logger.info("DummyImporter: updated person: %s" % user_record)
                     user_record.email = user['email']
-                    dbsession.commit()
+                    global_dbsession.commit()
                     user_record.stale=False
                 except:
                     self.logger.log_exc("DummyImporter: failed to import user %d %s"%(user['user_id'],user['email']))
@@ -296,8 +298,8 @@ class DummyImporter:
                                                  pointer=slice['slice_id'],
                                                  authority=get_authority(slice_hrn))
                         slice_record.just_created()
-                        dbsession.add(slice_record)
-                        dbsession.commit()
+                        global_dbsession.add(slice_record)
+                        global_dbsession.commit()
                         self.logger.info("DummyImporter: imported slice: %s" % slice_record)  
                         self.remember_record ( slice_record )
                     except:
@@ -309,7 +311,7 @@ class DummyImporter:
                 # record current users affiliated with the slice
                 slice_record.reg_researchers = \
                     [ self.locate_by_type_pointer ('user',user_id) for user_id in slice['user_ids'] ]
-                dbsession.commit()
+                global_dbsession.commit()
                 slice_record.stale=False
 
         ### remove stale records
@@ -328,5 +330,5 @@ class DummyImporter:
                 self.logger.warning("stale not found with %s"%record)
             if stale:
                 self.logger.info("DummyImporter: deleting stale record: %s" % record)
-                dbsession.delete(record)
-                dbsession.commit()
+                global_dbsession.delete(record)
+                global_dbsession.commit()
index 8687437..bfa094b 100644 (file)
@@ -11,7 +11,9 @@ from sfa.iotlab.iotlabpostgres import TestbedAdditionalSfaDB
 from sfa.trust.certificate import Keypair, convert_public_key
 from sfa.trust.gid import create_uuid
 
-from sfa.storage.alchemy import dbsession
+# using global alchemy.session() here is fine 
+# as importer is on standalone one-shot process
+from sfa.storage.alchemy import global_dbsession
 from sfa.storage.model import RegRecord, RegAuthority, RegSlice, RegNode, \
     RegUser, RegKey
 
@@ -43,7 +45,7 @@ class IotlabImporter:
         self.logger = loc_logger
         self.logger.setLevelDebug()
         #retrieve all existing SFA objects
-        self.all_records = dbsession.query(RegRecord).all()
+        self.all_records = global_dbsession.query(RegRecord).all()
 
         # initialize record.stale to True by default,
         # then mark stale=False on the ones that are in use
@@ -213,8 +215,8 @@ class IotlabImporter:
                 try:
 
                     node_record.just_created()
-                    dbsession.add(node_record)
-                    dbsession.commit()
+                    global_dbsession.add(node_record)
+                    global_dbsession.commit()
                     self.logger.info("IotlabImporter: imported node: %s"
                                      % node_record)
                     self.update_just_added_records_dict(node_record)
@@ -259,8 +261,8 @@ class IotlabImporter:
                                      pointer='-1',
                                      authority=get_authority(site_hrn))
                     site_record.just_created()
-                    dbsession.add(site_record)
-                    dbsession.commit()
+                    global_dbsession.add(site_record)
+                    global_dbsession.commit()
                     self.logger.info("IotlabImporter: imported authority \
                                     (site) %s" % site_record)
                     self.update_just_added_records_dict(site_record)
@@ -404,8 +406,8 @@ class IotlabImporter:
 
                         try:
                             user_record.just_created()
-                            dbsession.add (user_record)
-                            dbsession.commit()
+                            global_dbsession.add (user_record)
+                            global_dbsession.commit()
                             self.logger.info("IotlabImporter: imported person \
                                             %s" % (user_record))
                             self.update_just_added_records_dict(user_record)
@@ -440,7 +442,7 @@ class IotlabImporter:
                     user_record.email = person['email']
 
             try:
-                dbsession.commit()
+                global_dbsession.commit()
                 user_record.stale = False
             except SQLAlchemyError:
                 self.logger.log_exc("IotlabImporter: \
@@ -478,8 +480,8 @@ class IotlabImporter:
                                     authority=get_authority(slice_hrn))
             try:
                 slice_record.just_created()
-                dbsession.add(slice_record)
-                dbsession.commit()
+                global_dbsession.add(slice_record)
+                global_dbsession.commit()
 
 
                 self.update_just_added_records_dict(slice_record)
@@ -497,7 +499,7 @@ class IotlabImporter:
 
         slice_record.reg_researchers = [user_record]
         try:
-            dbsession.commit()
+            global_dbsession.commit()
             slice_record.stale = False
         except SQLAlchemyError:
             self.logger.log_exc("IotlabImporter: failed to update slice")
@@ -551,8 +553,8 @@ class IotlabImporter:
                                  % (record))
 
                 try:
-                    dbsession.delete(record)
-                    dbsession.commit()
+                    global_dbsession.delete(record)
+                    global_dbsession.commit()
                 except SQLAlchemyError:
                     self.logger.log_exc("IotlabImporter: failed to delete \
                         stale record %s" % (record))
index 78bccc4..425be77 100644 (file)
@@ -7,7 +7,9 @@ from sfa.util.xrn import Xrn, get_leaf, get_authority, hrn_to_urn
 from sfa.trust.gid import create_uuid    
 from sfa.trust.certificate import convert_public_key, Keypair
 
-from sfa.storage.alchemy import dbsession
+# using global alchemy.session() here is fine 
+# as importer is on standalone one-shot process
+from sfa.storage.alchemy import global_dbsession
 from sfa.storage.model import RegRecord, RegAuthority, RegSlice, RegNode, RegUser, RegKey
 
 from sfa.nitos.nitosshell import NitosShell    
@@ -83,7 +85,7 @@ class NitosImporter:
         shell = NitosShell (config)
 
         ######## retrieve all existing SFA objects
-        all_records = dbsession.query(RegRecord).all()
+        all_records = global_dbsession.query(RegRecord).all()
 
         # create hash by (type,hrn) 
         # we essentially use this to know if a given record is already known to SFA 
@@ -146,8 +148,8 @@ class NitosImporter:
                                                pointer=0,
                                                authority=get_authority(site_hrn))
                     site_record.just_created()
-                    dbsession.add(site_record)
-                    dbsession.commit()
+                    global_dbsession.add(site_record)
+                    global_dbsession.commit()
                     self.logger.info("NitosImporter: imported authority (site) : %s" % site_record) 
                     self.remember_record (site_record)
                 except:
@@ -177,8 +179,8 @@ class NitosImporter:
                                                pointer =node['node_id'],
                                                authority=get_authority(node_hrn))
                         node_record.just_created()
-                        dbsession.add(node_record)
-                        dbsession.commit()
+                        global_dbsession.add(node_record)
+                        global_dbsession.commit()
                         self.logger.info("NitosImporter: imported node: %s" % node_record)  
                         self.remember_record (node_record)
                     except:
@@ -236,8 +238,8 @@ class NitosImporter:
                         else:
                             self.logger.warning("No key found for user %s"%user_record)
                         user_record.just_created()
-                        dbsession.add (user_record)
-                        dbsession.commit()
+                        global_dbsession.add (user_record)
+                        global_dbsession.commit()
                         self.logger.info("NitosImporter: imported user: %s" % user_record)
                         self.remember_record ( user_record )
                     else:
@@ -270,7 +272,7 @@ class NitosImporter:
                             user_record.just_updated()
                             self.logger.info("NitosImporter: updated user: %s" % user_record)
                     user_record.email = user['email']
-                    dbsession.commit()
+                    global_dbsession.commit()
                     user_record.stale=False
                 except:
                     self.logger.log_exc("NitosImporter: failed to import user %s %s"%(user['user_id'],user['email']))
@@ -289,8 +291,8 @@ class NitosImporter:
                                                  pointer=slice['slice_id'],
                                                  authority=get_authority(slice_hrn))
                         slice_record.just_created()
-                        dbsession.add(slice_record)
-                        dbsession.commit()
+                        global_dbsession.add(slice_record)
+                        global_dbsession.commit()
                         self.logger.info("NitosImporter: imported slice: %s" % slice_record)  
                         self.remember_record ( slice_record )
                     except:
@@ -302,7 +304,7 @@ class NitosImporter:
                 # record current users affiliated with the slice
                 slice_record.reg_researchers = \
                       [ self.locate_by_type_pointer ('user',int(user_id)) for user_id in slice['user_ids'] ]
-                dbsession.commit()
+                global_dbsession.commit()
                 slice_record.stale=False
 
 
@@ -322,7 +324,7 @@ class NitosImporter:
                 self.logger.warning("stale not found with %s"%record)
             if stale:
                 self.logger.info("NitosImporter: deleting stale record: %s" % record)
-                dbsession.delete(record)
-                dbsession.commit()
+                global_dbsession.delete(record)
+                global_dbsession.commit()
 
 
index 0cf729c..c8233bd 100644 (file)
@@ -4,7 +4,9 @@ from sfa.util.config import Config
 from sfa.util.xrn import Xrn, get_leaf, get_authority, hrn_to_urn
 from sfa.trust.gid import create_uuid    
 from sfa.trust.certificate import convert_public_key, Keypair
-from sfa.storage.alchemy import dbsession
+# using global alchemy.session() here is fine 
+# as importer is on standalone one-shot process
+from sfa.storage.alchemy import global_dbsession
 from sfa.storage.model import RegRecord, RegAuthority, RegUser, RegSlice, RegNode
 from sfa.openstack.osxrn import OSXrn
 from sfa.openstack.shell import Shell    
@@ -79,8 +81,8 @@ class OpenstackImporter:
                 user_record.hrn=hrn
                 user_record.gid=user_gid
                 user_record.authority=get_authority(hrn)
-                dbsession.add(user_record)
-                dbsession.commit()
+                global_dbsession.add(user_record)
+                global_dbsession.commit()
                 self.logger.info("OpenstackImporter: imported person %s" % user_record)   
 
         return users_dict, user_keys
@@ -112,8 +114,8 @@ class OpenstackImporter:
                 record.hrn=hrn
                 record.gid=gid
                 record.authority=get_authority(hrn)
-                dbsession.add(record)
-                dbsession.commit()
+                global_dbsession.add(record)
+                global_dbsession.commit()
                 self.logger.info("OpenstackImporter: imported authority: %s" % record)
 
             else:
@@ -125,8 +127,8 @@ class OpenstackImporter:
                 record.hrn=hrn
                 record.gid=gid
                 record.authority=get_authority(hrn)
-                dbsession.add(record)
-                dbsession.commit()
+                global_dbsession.add(record)
+                global_dbsession.commit()
                 self.logger.info("OpenstackImporter: imported slice: %s" % record) 
 
         return tenants_dict
@@ -139,7 +141,7 @@ class OpenstackImporter:
         existing_records = {}
         existing_hrns = []
         key_ids = []
-        for record in dbsession.query(RegRecord):
+        for record in global_dbsession.query(RegRecord):
             existing_records[ (record.hrn, record.type,) ] = record
             existing_hrns.append(record.hrn) 
             
@@ -168,8 +170,8 @@ class OpenstackImporter:
         
             record_object = existing_records[ (record_hrn, type) ]
             self.logger.info("OpenstackImporter: removing %s " % record)
-            dbsession.delete(record_object)
-            dbsession.commit()
+            global_dbsession.delete(record_object)
+            global_dbsession.commit()
                                    
         # save pub keys
         self.logger.info('OpenstackImporter: saving current pub keys')
index 41325a8..8a22bee 100644 (file)
@@ -24,7 +24,9 @@ from sfa.util.xrn import Xrn, get_leaf, get_authority, hrn_to_urn
 from sfa.trust.gid import create_uuid    
 from sfa.trust.certificate import convert_public_key, Keypair
 
-from sfa.storage.alchemy import dbsession
+# using global alchemy.session() here is fine 
+# as importer is on standalone one-shot process
+from sfa.storage.alchemy import global_dbsession
 from sfa.storage.model import RegRecord, RegAuthority, RegSlice, RegNode, RegUser, RegKey
 
 from sfa.planetlab.plshell import PlShell    
@@ -115,8 +117,8 @@ class PlImporter:
                                            pointer=site['site_id'],
                                            authority=get_authority(site_hrn))
                 auth_record.just_created()
-                dbsession.add(auth_record)
-                dbsession.commit()
+                global_dbsession.add(auth_record)
+                global_dbsession.commit()
                 self.logger.info("PlImporter: Imported authority (vini site) %s"%auth_record)
                 self.remember_record ( site_record )
 
@@ -127,7 +129,7 @@ class PlImporter:
         shell = PlShell (config)
 
         ######## retrieve all existing SFA objects
-        all_records = dbsession.query(RegRecord).all()
+        all_records = global_dbsession.query(RegRecord).all()
 
         # create hash by (type,hrn) 
         # we essentially use this to know if a given record is already known to SFA 
@@ -209,8 +211,8 @@ class PlImporter:
                                                pointer=site['site_id'],
                                                authority=get_authority(site_hrn))
                     site_record.just_created()
-                    dbsession.add(site_record)
-                    dbsession.commit()
+                    global_dbsession.add(site_record)
+                    global_dbsession.commit()
                     self.logger.info("PlImporter: imported authority (site) : %s" % site_record) 
                     self.remember_record (site_record)
                 except:
@@ -245,8 +247,8 @@ class PlImporter:
                                                pointer =node['node_id'],
                                                authority=get_authority(node_hrn))
                         node_record.just_created()
-                        dbsession.add(node_record)
-                        dbsession.commit()
+                        global_dbsession.add(node_record)
+                        global_dbsession.commit()
                         self.logger.info("PlImporter: imported node: %s" % node_record)  
                         self.remember_record (node_record)
                     except:
@@ -310,8 +312,8 @@ class PlImporter:
                         else:
                             self.logger.warning("No key found for user %s"%user_record)
                         user_record.just_created()
-                        dbsession.add (user_record)
-                        dbsession.commit()
+                        global_dbsession.add (user_record)
+                        global_dbsession.commit()
                         self.logger.info("PlImporter: imported person: %s" % user_record)
                         self.remember_record ( user_record )
                     else:
@@ -359,7 +361,7 @@ class PlImporter:
                             user_record.just_updated()
                             self.logger.info("PlImporter: updated person: %s" % user_record)
                     user_record.email = person['email']
-                    dbsession.commit()
+                    global_dbsession.commit()
                     user_record.stale=False
                     # accumulate PIs - PLCAPI has a limitation that when someone has PI role
                     # this is valid for all sites she is in..
@@ -377,7 +379,7 @@ class PlImporter:
             # could be performed twice with the same person...
             # so hopefully we do not need to eliminate duplicates explicitly here anymore
             site_record.reg_pis = list(set(site_pis))
-            dbsession.commit()
+            global_dbsession.commit()
 
             # import slices
             for slice_id in site['slice_ids']:
@@ -396,8 +398,8 @@ class PlImporter:
                                                  pointer=slice['slice_id'],
                                                  authority=get_authority(slice_hrn))
                         slice_record.just_created()
-                        dbsession.add(slice_record)
-                        dbsession.commit()
+                        global_dbsession.add(slice_record)
+                        global_dbsession.commit()
                         self.logger.info("PlImporter: imported slice: %s" % slice_record)  
                         self.remember_record ( slice_record )
                     except:
@@ -410,7 +412,7 @@ class PlImporter:
                 # record current users affiliated with the slice
                 slice_record.reg_researchers = \
                     [ self.locate_by_type_pointer ('user',user_id) for user_id in slice['person_ids'] ]
-                dbsession.commit()
+                global_dbsession.commit()
                 slice_record.stale=False
 
         ### remove stale records
@@ -432,5 +434,5 @@ class PlImporter:
                 self.logger.warning("stale not found with %s"%record)
             if stale:
                 self.logger.info("PlImporter: deleting stale record: %s" % record)
-                dbsession.delete(record)
-                dbsession.commit()
+                global_dbsession.delete(record)
+                global_dbsession.commit()
index fa25a83..0e8b71d 100644 (file)
@@ -5,9 +5,10 @@
 
 class Driver:
     
-    def __init__ (self, config): 
+    def __init__ (self, api): 
+        self.api = api
         # this is the hrn attached to the running server
-        self.hrn = config.SFA_INTERFACE_HRN
+        self.hrn = api.config.SFA_INTERFACE_HRN
 
     ########################################
     ########## registry oriented
index c24c1f5..f55e69b 100644 (file)
@@ -19,7 +19,6 @@ from sfa.trust.gid import create_uuid
 
 from sfa.storage.model import make_record, RegRecord, RegAuthority, RegUser, RegSlice, RegKey, \
     augment_with_sfa_builtins
-from sfa.storage.alchemy import dbsession
 ### the types that we need to exclude from sqlobjects before being able to dump
 # them on the xmlrpc wire
 from sqlalchemy.orm.collections import InstrumentedList
@@ -41,6 +40,7 @@ class RegistryManager:
                              'peers':peers})
     
     def GetCredential(self, api, xrn, type, caller_xrn=None):
+        dbsession = api.dbsession()
         # convert xrn to hrn     
         if type:
             hrn = urn_to_hrn(xrn)[0]
@@ -110,6 +110,7 @@ class RegistryManager:
     # the default for full, which means 'dig into the testbed as well', should be false
     def Resolve(self, api, xrns, type=None, details=False):
     
+        dbsession = api.dbsession()
         if not isinstance(xrns, types.ListType):
             # try to infer type if not set and we get a single input
             if not type:
@@ -196,6 +197,7 @@ class RegistryManager:
         return records
     
     def List (self, api, xrn, origin_hrn=None, options={}):
+        dbsession=api.dbsession()
         # load all know registry names into a prefix tree and attempt to find
         # the longest matching prefix
         hrn, type = urn_to_hrn(xrn)
@@ -263,18 +265,18 @@ class RegistryManager:
     # subject_record describes the subject of the relationships
     # ref_record contains the target values for the various relationships we need to manage
     # (to begin with, this is just the slice x person (researcher) and authority x person (pi) relationships)
-    def update_driver_relations (self, subject_obj, ref_obj):
+    def update_driver_relations (self, subject_obj, ref_obj, dbsession):
         type=subject_obj.type
         #for (k,v) in subject_obj.__dict__.items(): print k,'=',v
         if type=='slice' and hasattr(ref_obj,'researcher'):
-            self.update_driver_relation(subject_obj, ref_obj.researcher, 'user', 'researcher')
+            self.update_driver_relation(subject_obj, ref_obj.researcher, 'user', 'researcher', dbsession)
         elif type=='authority' and hasattr(ref_obj,'pi'):
-            self.update_driver_relation(subject_obj,ref_obj.pi, 'user', 'pi')
+            self.update_driver_relation(subject_obj,ref_obj.pi, 'user', 'pi', dbsession)
         
     # field_key is the name of one field in the record, typically 'researcher' for a 'slice' record
     # hrns is the list of hrns that should be linked to the subject from now on
     # target_type would be e.g. 'user' in the 'slice' x 'researcher' example
-    def update_driver_relation (self, record_obj, hrns, target_type, relation_name):
+    def update_driver_relation (self, record_obj, hrns, target_type, relation_name, dbsession):
         # locate the linked objects in our db
         subject_type=record_obj.type
         subject_id=record_obj.pointer
@@ -286,6 +288,7 @@ class RegistryManager:
 
     def Register(self, api, record_dict):
     
+        dbsession=api.dbsession()
         hrn, type = record_dict['hrn'], record_dict['type']
         urn = hrn_to_urn(hrn,type)
         # validate the type
@@ -331,11 +334,11 @@ class RegistryManager:
 
             # locate objects for relationships
             pi_hrns = getattr(record,'pi',None)
-            if pi_hrns is not None: record.update_pis (pi_hrns)
+            if pi_hrns is not None: record.update_pis (pi_hrns, dbsession)
 
         elif isinstance (record, RegSlice):
             researcher_hrns = getattr(record,'researcher',None)
-            if researcher_hrns is not None: record.update_researchers (researcher_hrns)
+            if researcher_hrns is not None: record.update_researchers (researcher_hrns, dbsession)
         
         elif isinstance (record, RegUser):
             # create RegKey objects for incoming keys
@@ -351,11 +354,12 @@ class RegistryManager:
         dbsession.commit()
     
         # update membership for researchers, pis, owners, operators
-        self.update_driver_relations (record, record)
+        self.update_driver_relations (record, record, dbsession)
         
         return record.get_gid_object().save_to_string(save_parents=True)
     
     def Update(self, api, record_dict):
+        dbsession=api.dbsession()
         assert ('type' in record_dict)
         new_record=make_record(dict=record_dict)
         (type,hrn) = (new_record.type, new_record.hrn)
@@ -394,11 +398,11 @@ class RegistryManager:
         # update native relations
         if isinstance (record, RegSlice):
             researcher_hrns = getattr(new_record,'researcher',None)
-            if researcher_hrns is not None: record.update_researchers (researcher_hrns)
+            if researcher_hrns is not None: record.update_researchers (researcher_hrns, dbsession)
 
         elif isinstance (record, RegAuthority):
             pi_hrns = getattr(new_record,'pi',None)
-            if pi_hrns is not None: record.update_pis (pi_hrns)
+            if pi_hrns is not None: record.update_pis (pi_hrns, dbsession)
         
         # update the PLC information that was specified with the record
         # xxx oddly enough, without this useless statement, 
@@ -417,12 +421,13 @@ class RegistryManager:
 
         dbsession.commit()
         # update membership for researchers, pis, owners, operators
-        self.update_driver_relations (record, new_record)
+        self.update_driver_relations (record, new_record, dbsession)
         
         return 1 
     
     # expecting an Xrn instance
     def Remove(self, api, xrn, origin_hrn=None):
+        dbsession=api.dbsession()
         hrn=xrn.get_hrn()
         type=xrn.get_type()
         request=dbsession.query(RegRecord).filter_by(hrn=hrn)
@@ -465,6 +470,7 @@ class RegistryManager:
 
     # This is a PLC-specific thing, won't work with other platforms
     def get_key_from_incoming_ip (self, api):
+        dbsession=api.dbsession()
         # verify that the callers's ip address exist in the db and is an interface
         # for a node in the db
         (ip, port) = api.remote_addr
index 15a8cd8..069ef69 100644 (file)
@@ -7,6 +7,7 @@ from sfa.util.xrn import Xrn, urn_to_hrn, hrn_to_urn, get_leaf, get_authority
 from sfa.util.cache import Cache
 from sfa.rspecs.rspec import RSpec
 from sfa.storage.model import SliverAllocation
+# xxx 1-dbsession-per-request
 from sfa.storage.alchemy import dbsession
 
 class V2ToV3Adapter:
index aa53def..f3a9612 100644 (file)
@@ -1,4 +1,3 @@
-
 from sfa.util.faults import RecordNotFound, ConnectionKeyGIDMismatch
 from sfa.util.xrn import urn_to_hrn
 from sfa.util.method import Method
index fdd70e5..4649759 100644 (file)
@@ -21,7 +21,6 @@ from sfa.rspecs.version_manager import VersionManager
 from sfa.planetlab.plxrn import PlXrn, hostname_to_urn, hrn_to_pl_slicename, slicename_to_hrn, xrn_to_ext_slicename, top_auth
 from sfa.planetlab.vlink import get_tc_rate
 from sfa.planetlab.topology import Topology
-from sfa.storage.alchemy import dbsession
 from sfa.storage.model import SliverAllocation
 
 
index 79c61f0..a2ab1c2 100644 (file)
@@ -10,7 +10,6 @@ from sfa.util.xrn import Xrn, hrn_to_urn, get_leaf
 from sfa.util.cache import Cache
 
 # one would think the driver should not need to mess with the SFA db, but..
-from sfa.storage.alchemy import dbsession
 from sfa.storage.model import RegRecord, SliverAllocation
 from sfa.trust.credential import Credential
 
@@ -45,8 +44,9 @@ class PlDriver (Driver):
     # the cache instance is a class member so it survives across incoming requests
     cache = None
 
-    def __init__ (self, config):
-        Driver.__init__ (self, config)
+    def __init__ (self, api):
+        Driver.__init__ (self, api)
+        config=api.config
         self.shell = PlShell (config)
         self.cache=None
         if config.SFA_AGGREGATE_CACHING:
@@ -501,7 +501,7 @@ class PlDriver (Driver):
         
         # get the registry records
         person_list, persons = [], {}
-        person_list = dbsession.query (RegRecord).filter(RegRecord.pointer.in_(person_ids))
+        person_list = self.api.dbsession().query (RegRecord).filter(RegRecord.pointer.in_(person_ids))
         # create a hrns keyed on the sfa record's pointer.
         # Its possible for multiple records to have the same pointer so
         # the dict's value will be a list of hrns.
@@ -680,7 +680,8 @@ class PlDriver (Driver):
         slices.handle_peer(None, None, persons, peer)
         # update sliver allocation states and set them to geni_provisioned
         sliver_ids = [sliver['sliver_id'] for sliver in slivers]
-        SliverAllocation.set_allocations(sliver_ids, 'geni_provisioned')
+        dbsession=self.api.dbsession()
+        SliverAllocation.set_allocations(sliver_ids, 'geni_provisioned',dbsession)
         version_manager = VersionManager()
         rspec_version = version_manager.get_version(options['geni_rspec_version']) 
         return self.describe(urns, rspec_version, options=options)
@@ -719,7 +720,8 @@ class PlDriver (Driver):
                     self.shell.DeleteLeases(leases_ids)
      
                 # delete sliver allocation states
-                SliverAllocation.delete_allocations(sliver_ids)
+                dbsession=self.api.dbsession()
+                SliverAllocation.delete_allocations(sliver_ids,dbsession)
             finally:
                 if peer:
                     self.shell.BindObjectToPeer('slice', slice_id, peer, slice['peer_slice_id'])
index 79b7309..de9791f 100644 (file)
@@ -10,7 +10,6 @@ from sfa.planetlab.vlink import VLink
 from sfa.planetlab.topology import Topology
 from sfa.planetlab.plxrn import PlXrn, hrn_to_pl_slicename, xrn_to_hostname, xrn_to_ext_slicename, hrn_to_ext_loginbase, top_auth
 from sfa.storage.model import SliverAllocation
-from sfa.storage.alchemy import dbsession
 
 MAXINT =  2L**31-1
 
index a0fc7f4..5639b4b 100644 (file)
@@ -14,6 +14,7 @@ from sfa.util.version import version_core
 from sfa.server.xmlrpcapi import XmlrpcApi
 from sfa.client.return_value import ReturnValue
 
+from sfa.storage.alchemy import alchemy
 
 ####################
 class SfaApi (XmlrpcApi): 
@@ -69,6 +70,7 @@ class SfaApi (XmlrpcApi):
         
         # filled later on by generic/Generic
         self.manager=None
+        self._dbsession=None
 
     def server_proxy(self, interface, cred, timeout=30):
         """
@@ -89,7 +91,16 @@ class SfaApi (XmlrpcApi):
         server = interface.server_proxy(key_file, cert_file, timeout)
         return server
                
-        
+    def dbsession(self):
+        if self._dbsession is None:
+            self._dbsession=alchemy.session()
+        return self._dbsession
+
+    def close_dbsession(self):
+        if self._dbsession is None: return
+        alchemy.close_session(self._dbsession)
+        self._dbsession=None
+
     def getCredential(self, minimumExpiration=0):
         """
         Return a valid credential for this interface.
index 7bc434c..9332c5b 100644 (file)
@@ -133,9 +133,10 @@ class SecureXMLRpcRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
         self.send_header("Content-length", str(len(response)))
         self.end_headers()
         self.wfile.write(response)
-
-        # shut down the connection
         self.wfile.flush()
+        # close db connection
+        self.api.close_dbsession()
+        # shut down the connection
         self.connection.shutdown() # Modified here!
 
 ##
index e9f96dd..84c987f 100644 (file)
@@ -49,21 +49,34 @@ class Alchemy:
     def check (self):
         self.engine.execute ("select 1").scalar()
 
-    def session (self):
+    def global_session (self):
         if self._session is None:
             Session=sessionmaker ()
             self._session=Session(bind=self.engine)
+            logger.info('alchemy.global_session created session %s'%self._session)
         return self._session
 
-    def close_session (self):
+    def close_global_session (self):
         if self._session is None: return
+        logger.info('alchemy.close_global_session %s'%self._session)
         self._session.close()
         self._session=None
 
+    # create a dbsession to be managed separately
+    def session (self):
+        Session=sessionmaker()
+        session=Session (bind=self.engine)
+        logger.info('alchemy.session created session %s'%session)
+        return session
+
+    def close_session (self, session):
+        logger.info('alchemy.close_session closed session %s'%session)
+        session.close()
+
 ####################
 from sfa.util.config import Config
 
 alchemy=Alchemy (Config())
 engine=alchemy.engine
-dbsession=alchemy.session()
+global_dbsession=alchemy.global_session()
 
index b095042..1228290 100644 (file)
@@ -191,9 +191,7 @@ class RegAuthority (RegRecord):
     def __repr__ (self):
         return RegRecord.__repr__(self).replace("Record","Authority")
 
-    def update_pis (self, pi_hrns):
-        # don't ruin the import of that file in a client world
-        from sfa.storage.alchemy import dbsession
+    def update_pis (self, pi_hrns, dbsession):
         # strip that in case we have <researcher> words </researcher>
         pi_hrns = [ x.strip() for x in pi_hrns ]
         request = dbsession.query (RegUser).filter(RegUser.hrn.in_(pi_hrns))
@@ -221,9 +219,7 @@ class RegSlice (RegRecord):
     def __repr__ (self):
         return RegRecord.__repr__(self).replace("Record","Slice")
 
-    def update_researchers (self, researcher_hrns):
-        # don't ruin the import of that file in a client world
-        from sfa.storage.alchemy import dbsession
+    def update_researchers (self, researcher_hrns, dbsession):
         # strip that in case we have <researcher> words </researcher>
         researcher_hrns = [ x.strip() for x in researcher_hrns ]
         request = dbsession.query (RegUser).filter(RegUser.hrn.in_(researcher_hrns))
@@ -233,8 +229,9 @@ class RegSlice (RegRecord):
 
     # when dealing with credentials, we need to retrieve the PIs attached to a slice
     def get_pis (self):
-        # don't ruin the import of that file in a client world
-        from sfa.storage.alchemy import dbsession
+        from sqlalchemy.orm import sessionmaker
+        Session=sessionmaker()
+        dbsession=Session.object_session(self)
         from sfa.util.xrn import get_authority
         authority_hrn = get_authority(self.hrn)
         auth_record = dbsession.query(RegAuthority).filter_by(hrn=authority_hrn).first()
@@ -344,8 +341,7 @@ class SliverAllocation(Base,AlchemyObj):
         return state
 
     @staticmethod    
-    def set_allocations(sliver_ids, state):
-        from sfa.storage.alchemy import dbsession
+    def set_allocations(sliver_ids, state, dbsession):
         if not isinstance(sliver_ids, list):
             sliver_ids = [sliver_ids]
         sliver_state_updated = {}
@@ -366,8 +362,7 @@ class SliverAllocation(Base,AlchemyObj):
         dbsession.commit()
 
     @staticmethod
-    def delete_allocations(sliver_ids):
-        from sfa.storage.alchemy import dbsession
+    def delete_allocations(sliver_ids, dbsession):
         if not isinstance(sliver_ids, list):
             sliver_ids = [sliver_ids]
         constraint = SliverAllocation.sliver_id.in_(sliver_ids)
@@ -377,8 +372,10 @@ class SliverAllocation(Base,AlchemyObj):
         dbsession.commit()
     
     def sync(self):
-        from sfa.storage.alchemy import dbsession
         
+        from sqlalchemy.orm import sessionmaker
+        Session=sessionmaker()
+        dbsession=Session.object_session(self)
         constraints = [SliverAllocation.sliver_id==self.sliver_id]
         results = dbsession.query(SliverAllocation).filter(and_(*constraints))
         records = []