本文整理汇总了Python中pulp.server.managers.factory.initialize函数的典型用法代码示例。如果您正苦于以下问题:Python initialize函数的具体用法?Python initialize怎么用?Python initialize使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了initialize函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: setUp
def setUp(self):
    """
    Prepare the fixtures used by the 0004 migration tests.

    Loads the migration module, calls factory.initialize() and
    api.initialize(False), registers the group and category type definitions,
    and saves a source and a destination repository, each with a
    'yum_importer' importer.
    """
    super(Migration0004Tests, self).setUp()

    # The module name starts with a digit, so it cannot be loaded with a plain
    # import statement.
    self.migration = _import_all_the_way(
        'pulp_rpm.plugins.migrations.0004_pkg_group_category_repoid')

    factory.initialize()
    api.initialize(False)
    types_db.update_database([TYPE_DEF_GROUP, TYPE_DEF_CATEGORY])

    # Repositories needed by the tests
    self.source_repo_id = 'source-repo'  # where units were copied from with the bad code
    self.dest_repo_id = 'dest-repo'  # where bad units were copied to
    fixture_repo_ids = (self.source_repo_id, self.dest_repo_id)

    # Both repositories are saved before either importer, source first.
    for repo_id in fixture_repo_ids:
        model.Repository(repo_id=repo_id).save()
    for repo_id in fixture_repo_ids:
        model.Importer(repo_id, 'yum_importer', {}).save()
开发者ID:FlorianHeigl,项目名称:pulp_rpm,代码行数:25,代码来源:test_0004_migrate.py
示例2: test_syntactic_sugar_methods
def test_syntactic_sugar_methods(self):
    """
    Checks that every convenience accessor on the factory hands back an
    instance of the expected manager class.
    """
    def check(manager, expected_class):
        # Single place for the isinstance assertion used on every accessor.
        self.assertTrue(isinstance(manager, expected_class))

    # Setup
    factory.initialize()

    # Test
    check(factory.authentication_manager(), AuthenticationManager)
    check(factory.cert_generation_manager(), CertGenerationManager)
    check(factory.certificate_manager(), CertificateManager)
    check(factory.password_manager(), PasswordManager)
    check(factory.permission_manager(), PermissionManager)
    check(factory.permission_query_manager(), PermissionQueryManager)
    check(factory.role_manager(), RoleManager)
    check(factory.role_query_manager(), RoleQueryManager)
    check(factory.user_manager(), UserManager)
    check(factory.user_query_manager(), UserQueryManager)
    check(factory.repo_manager(), RepoManager)
    check(factory.repo_unit_association_manager(), RepoUnitAssociationManager)
    check(factory.repo_publish_manager(), RepoPublishManager)
    check(factory.repo_query_manager(), RepoQueryManager)
    check(factory.repo_sync_manager(), RepoSyncManager)
    check(factory.content_manager(), ContentManager)
    check(factory.content_query_manager(), ContentQueryManager)
    check(factory.content_upload_manager(), ContentUploadManager)
    check(factory.consumer_manager(), ConsumerManager)
    check(factory.topic_publish_manager(), TopicPublishManager)
开发者ID:credativ,项目名称:pulp,代码行数:29,代码来源:test_factory.py
示例3: setUp
def setUp(self):
    """
    Install the mock plugins, initialize the manager factory and create four
    repositories with 'mock-distributor' distributors attached, then build
    the conduit under test.
    """
    super(RepoConfigConduitTests, self).setUp()
    mock_plugins.install()
    manager_factory.initialize()

    self.repo_manager = manager_factory.repo_manager()
    self.distributor_manager = manager_factory.repo_distributor_manager()

    # Populate the database: (repo id, ((distributor config, distributor id), ...)).
    # Each repo is created immediately before its own distributors are added.
    fixtures = (
        ('repo-1', (({"relative_url": "/a/bc/d"}, 'dist-1'),
                    ({"relative_url": "/a/c"}, 'dist-2'))),
        ('repo-2', (({"relative_url": "/a/bc/e"}, 'dist-3'),)),
        ('repo-3', (({}, 'dist-4'),)),
        ('repo-4', (({"relative_url": "/repo-5"}, 'dist-5'),)),
    )
    for repo_id, distributors in fixtures:
        self.repo_manager.create_repo(repo_id)
        for distributor_config, distributor_id in distributors:
            self.distributor_manager.add_distributor(
                repo_id, 'mock-distributor', distributor_config, True,
                distributor_id=distributor_id)

    self.conduit = RepoConfigConduit('rpm')
开发者ID:fdammeke,项目名称:pulp,代码行数:25,代码来源:test_repo_config_conduit.py
示例4: setUp
def setUp(self):
    """
    Initialize the manager factory and build the AddUnitMixin under test for
    a fixed repo id and importer id.
    """
    manager_factory.initialize()

    self.repo_id, self.importer_id = 'add-repo', 'add-importer'
    self.mixin = mixins.AddUnitMixin(self.repo_id, self.importer_id)
开发者ID:alanoe,项目名称:pulp,代码行数:7,代码来源:test_mixins.py
示例5: _initialize_pulp
def _initialize_pulp():
    """
    Run the one-time start-up sequence for the server.

    The call is a no-op once a previous call has completed successfully. If
    any step raises, the module stays marked as uninitialized so the failure
    is not silently skipped on the next call.
    """
    # XXX ORDERING COUNTS
    # This initialization order is very sensitive, and each touches a number of
    # sub-systems in pulp. If you get this wrong, you will have pulp tripping
    # over itself on start up. If you do not know where to add something, ASK!
    global _IS_INITIALIZED, STACK_TRACER

    if _IS_INITIALIZED:
        return

    # check our db version and other support
    check_version()

    # pulp generic content initialization
    manager_factory.initialize()
    plugin_api.initialize()

    # new async dispatch initialization
    dispatch_factory.initialize()

    # ensure necessary infrastructure
    ensure_builtin_roles()
    ensure_admin()

    # agent services
    AgentServices.start()

    # setup debugging, if configured
    if config.config.getboolean('server', 'debugging_mode'):
        STACK_TRACER = StacktraceDumper()
        STACK_TRACER.start()

    # Flip the flag only after every step above has succeeded; setting it up
    # front would make a failed start-up look complete to later callers.
    _IS_INITIALIZED = True
开发者ID:stpierre,项目名称:pulp,代码行数:31,代码来源:application.py
示例6: setUp
def setUp(self):
    """
    Install the mock plugins, initialize the manager factory and register
    seven 'mock-distributor' distributors while the Repository queryset is
    patched out, then build the conduit under test.
    """
    super(RepoConfigConduitTests, self).setUp()
    mock_plugins.install()
    manager_factory.initialize()

    # (repo id, distributor config, distributor id), in registration order.
    distributor_fixtures = (
        ('repo-1', {"relative_url": "/a/bc/d"}, 'dist-1'),
        ('repo-1', {"relative_url": "/a/c"}, 'dist-2'),
        ('repo-2', {"relative_url": "/a/bc/e"}, 'dist-3'),
        ('repo-3', {}, 'dist-4'),
        ('repo-4', {"relative_url": "repo-5"}, 'dist-5'),
        ('repo-5', {"relative_url": "a/bcd/e"}, 'dist-1'),
        ('repo-6', {"relative_url": "a/bcde/f/"}, 'dist-1'),
    )

    with mock.patch('pulp.server.controllers.distributor.model.Repository.objects'):
        # Populate the database with a repo with units
        for repo_id, distributor_config, distributor_id in distributor_fixtures:
            dist_controller.add_distributor(
                repo_id, 'mock-distributor', distributor_config, True,
                distributor_id=distributor_id)

    # NOTE(review): the original indentation was lost; the conduit is assumed
    # to be created after the patch context exits - confirm.
    self.conduit = RepoConfigConduit('rpm')
开发者ID:alanoe,项目名称:pulp,代码行数:29,代码来源:test_repo_config.py
示例7: main
def main():
    """
    Populate ldap server with some test data

    Adds users 'pulpuser1' through 'pulpuser99', then looks up and
    authenticates 'pulpuser1'. The LDAP connection is closed even when one of
    those steps fails.
    """
    print("See populate.log for descriptive output.")
    factory.initialize()
    connection.initialize()

    ldapserv = LDAPConnection(admin='cn=Directory Manager',
                              password='password',
                              server='ldap://ldap-server-hostname:389',
                              tls=False)
    ldapserv.connect()
    try:
        # Named user_number rather than id so the builtin is not shadowed.
        for user_number in range(1, 100):
            userid = 'pulpuser%s' % user_number
            lattr = LDAPAttribute()
            lattr.setObjectclass('Person')
            lattr.setObjectclass('organizationalPerson')
            lattr.setObjectclass('inetorgperson')
            lattr.setCN(userid)
            lattr.setSN(userid)
            lattr.setOU('Candlepin')
            lattr.setuserId(userid)
            lattr.setuserPassword('redhat')
            lattr.setDescription('pulp ldap test user')
            # The previous literal had been mangled by e-mail obfuscation and
            # was not a valid %-format string, so it raised ValueError.
            # NOTE(review): the domain is reconstructed from the dc=redhat,dc=com
            # base DN used below - confirm.
            lattr.setMail('%s@redhat.com' % userid)
            lattr.setDN("uid=%s,dc=rdu,dc=redhat,dc=com" % userid)
            attr, dn = lattr.buildBody()
            ldapserv.add_users(dn, attrs=attr)

        ldapserv.lookup_user("dc=rdu,dc=redhat,dc=com", "pulpuser1")
        ldapserv.authenticate_user("dc=rdu,dc=redhat,dc=com", "pulpuser1", "redhat")
    finally:
        # Always release the connection, including on a failed add or lookup.
        ldapserv.disconnect()
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:32,代码来源:populate_users.py
示例8: setUp
def setUp(self):
    """
    Install the mock plugins, initialize the manager factory and register
    seven "mock-distributor" distributors through the distributor manager
    while the Repository queryset is patched out, then build the conduit
    under test.
    """
    super(RepoConfigConduitTests, self).setUp()
    mock_plugins.install()
    manager_factory.initialize()
    self.distributor_manager = manager_factory.repo_distributor_manager()

    # (repo id, distributor config, distributor id), in registration order.
    distributor_fixtures = (
        ("repo-1", {"relative_url": "/a/bc/d"}, "dist-1"),
        ("repo-1", {"relative_url": "/a/c"}, "dist-2"),
        ("repo-2", {"relative_url": "/a/bc/e"}, "dist-3"),
        ("repo-3", {}, "dist-4"),
        ("repo-4", {"relative_url": "repo-5"}, "dist-5"),
        ("repo-5", {"relative_url": "a/bcd/e"}, "dist-1"),
        ("repo-6", {"relative_url": "a/bcde/f/"}, "dist-1"),
    )

    with mock.patch("pulp.server.managers.repo.distributor.model.Repository.objects"):
        # Populate the database with a repo with units
        for repo_id, distributor_config, distributor_id in distributor_fixtures:
            self.distributor_manager.add_distributor(
                repo_id, "mock-distributor", distributor_config, True,
                distributor_id=distributor_id
            )

    # NOTE(review): the original indentation was lost; the conduit is assumed
    # to be created after the patch context exits - confirm.
    self.conduit = RepoConfigConduit("rpm")
开发者ID:nbetm,项目名称:pulp,代码行数:30,代码来源:test_repo_config.py
示例9: _auto_manage_db
def _auto_manage_db(options):
    """
    Load the content types, make sure the super user role and the admin user
    exist, and then run the database migrations.

    :param options: The command line parameters from the user.
    :return: os.EX_OK
    """
    logger.info(_('Loading content types.'))
    load_content_types()
    logger.info(_('Content types loaded.'))

    logger.info(_('Ensuring the admin role and user are in place.'))
    # Due to the silliness of the factory, we have to initialize it because the UserManager and
    # RoleManager are going to try to use it.
    factory.initialize()
    role_manager = RoleManager()
    role_manager.ensure_super_user_role()
    user_manager = UserManager()
    user_manager.ensure_admin()
    logger.info(_('Admin role and user are in place.'))

    logger.info(_('Beginning database migrations.'))
    migrate_database(options)
    logger.info(_('Database migrations complete.'))

    return os.EX_OK
开发者ID:CUXIDUMDUM,项目名称:pulp,代码行数:32,代码来源:manage.py
示例10: setUp
def setUp(self):
    """
    Prepare the fixtures used by the 0004 migration tests.

    Loads the migration module, initializes the manager factory, registers
    the group and category type definitions, and inserts a source and a
    destination repository, each with a 'yum_importer' importer.
    """
    super(Migration0004Tests, self).setUp()

    # The module name starts with a digit, so it cannot be loaded with a plain
    # import statement.
    self.migration = _import_all_the_way(
        'pulp_rpm.plugins.migrations.0004_pkg_group_category_repoid')

    factory.initialize()
    types_db.update_database([TYPE_DEF_GROUP, TYPE_DEF_CATEGORY])

    # Repositories needed by the tests
    self.source_repo_id = 'source-repo'  # where units were copied from with the bad code
    self.dest_repo_id = 'dest-repo'  # where bad units were copied to
    fixture_repo_ids = (self.source_repo_id, self.dest_repo_id)

    # Both repositories are inserted before either importer, source first.
    for repo_id in fixture_repo_ids:
        repo = Repo(repo_id, '')
        Repo.get_collection().insert(repo, safe=True)
    for repo_id in fixture_repo_ids:
        importer = RepoImporter(repo_id, 'yum_importer', 'yum_importer', {})
        RepoImporter.get_collection().insert(importer, safe=True)
开发者ID:AndreaGiardini,项目名称:pulp_rpm,代码行数:25,代码来源:test_0004_migrate.py
示例11: _initialize_pulp
def _initialize_pulp():
# This initialization order is very sensitive, and each touches a number of
# sub-systems in pulp. If you get this wrong, you will have pulp tripping
# over itself on start up.
global _IS_INITIALIZED, STACK_TRACER
if _IS_INITIALIZED:
return
# Verify the database has been migrated to the correct version. This is
# very likely a reason the server will fail to start.
try:
migration_models.check_package_versions()
except Exception:
msg = 'The database has not been migrated to the current version. '
msg += 'Run pulp-manage-db and restart the application.'
raise InitializationException(msg), None, sys.exc_info()[2]
# Load plugins and resolve against types. This is also a likely candidate
# for causing the server to fail to start.
try:
plugin_api.initialize()
except Exception:
msg = 'One or more plugins failed to initialize. If a new type has '
msg += 'been added, run pulp-manage-db to load the type into the '
msg += 'database and restart the application.'
raise InitializationException(msg), None, sys.exc_info()[2]
# There's a significantly smaller chance the following calls will fail.
# The previous two are likely user errors, but the remainder represent
# something gone horribly wrong. As such, I'm not going to account for each
# and instead simply let the exception itself bubble up.
# Load the mappings of manager type to managers
manager_factory.initialize()
# Initialize the tasking subsystem
dispatch_factory.initialize()
# Ensure the minimal auth configuration
role_manager = manager_factory.role_manager()
role_manager.ensure_super_user_role()
user_manager = manager_factory.user_manager()
user_manager.ensure_admin()
# database document reaper
reaper.initialize()
# agent services
AgentServices.start()
# Setup debugging, if configured
if config.config.getboolean('server', 'debugging_mode'):
STACK_TRACER = StacktraceDumper()
STACK_TRACER.start()
# If we got this far, it was successful, so flip the flag
_IS_INITIALIZED = True
开发者ID:jessegonzalez,项目名称:pulp,代码行数:59,代码来源:application.py
示例12: test_fire
def test_fire(self, mock_get_collection, mock_getbool, mock_smtp, mock_publish):
    # verify that the event system will trigger listeners of this type
    mock_get_collection.return_value.find.return_value = [self.event_doc]
    event = data.Event(data.TYPE_REPO_SYNC_FINISHED, 'stuff')
    factory.initialize()
    factory.event_fire_manager()._do_fire(event)

    # verify that the mail event handler was called and processed something.
    # assertTrue(count, 2) treats 2 as the failure message and passes for any
    # non-zero count, so the expected number of sends is compared explicitly.
    self.assertEqual(mock_smtp.return_value.sendmail.call_count, 2)
开发者ID:preethit,项目名称:pulp-1,代码行数:9,代码来源:test_email_notifier.py
示例13: setUpClass
def setUpClass(cls):
    """
    Create the /tmp/pulp working directory, reload the test override
    configuration with logging restarted around it, initialize the manager
    factory and point distribution storage at the temporary directory.
    """
    # Imported locally so the block does not depend on a module-level import.
    import errno

    # Create the directory and tolerate "already exists" instead of checking
    # first; the check-then-create form can fail when another process creates
    # the directory in between.
    try:
        os.makedirs('/tmp/pulp')
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise

    stop_logging()
    config_filename = os.path.join(TEST_DATA_DIR, 'test-override-pulp.conf')
    config.config.read(config_filename)
    start_logging()
    manager_factory.initialize()
    constants.DISTRIBUTION_STORAGE_PATH = TEMP_DISTRO_STORAGE_DIR
开发者ID:ATIX-AG,项目名称:pulp_rpm,代码行数:9,代码来源:rpm_support_base.py
示例14: _auto_manage_db
def _auto_manage_db(options):
    """
    Find and apply all available database migrations, and install or update all available content
    types.

    :param options: The command line parameters from the user.
    :return: os.EX_OK, or 1 when a dry run found that the admin role or the admin user
             would have had to be created
    """
    unperformed_migrations = False

    message = _('Loading content types.')
    _logger.info(message)
    # Note that if dry_run is False, None is always returned
    old_content_types = load_content_types(dry_run=options.dry_run)
    if old_content_types:
        for content_type in old_content_types:
            # Translate only the constant text and append the id afterwards;
            # passing the already-concatenated string to _() produces a message
            # id that can never be found in a translation catalogue.
            message = _(
                'Would have created or updated the following type definition: ') + content_type.id
            _logger.info(message)
    message = _('Content types loaded.')
    _logger.info(message)

    message = _('Ensuring the admin role and user are in place.')
    _logger.info(message)
    # Due to the silliness of the factory, we have to initialize it because the UserManager and
    # RoleManager are going to try to use it.
    factory.initialize()
    role_manager = RoleManager()
    if options.dry_run:
        # A dry run only reports what is missing; nothing is created.
        if not role_manager.get_role(SUPER_USER_ROLE):
            unperformed_migrations = True
            message = _('Would have created the admin role.')
            _logger.info(message)
    else:
        role_manager.ensure_super_user_role()

    user_manager = managers.UserManager()
    if options.dry_run:
        if not user_manager.get_admins():
            unperformed_migrations = True
            message = _('Would have created the default admin user.')
            _logger.info(message)
    else:
        user_manager.ensure_admin()
    message = _('Admin role and user are in place.')
    _logger.info(message)

    message = _('Beginning database migrations.')
    _logger.info(message)
    migrate_database(options)
    message = _('Database migrations complete.')
    _logger.info(message)

    if unperformed_migrations:
        return 1
    return os.EX_OK
开发者ID:alexxa,项目名称:pulp,代码行数:56,代码来源:manage.py
示例15: setUpClass
def setUpClass(cls):
    """
    Create the /tmp/pulp working directory, reload the test override
    configuration with logging restarted around it, then initialize the
    database connection (using the configured database name) and the manager
    factory.
    """
    # Imported locally so the block does not depend on a module-level import.
    import errno

    # Create the directory and tolerate "already exists" instead of checking
    # first; the check-then-create form can fail when another process creates
    # the directory in between.
    try:
        os.makedirs('/tmp/pulp')
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise

    stop_logging()
    config_filename = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data',
                                   'test-override-pulp.conf')
    config.config.read(config_filename)
    start_logging()
    name = config.config.get('database', 'name')
    connection.initialize(name)
    manager_factory.initialize()
开发者ID:ehelms,项目名称:pulp,代码行数:10,代码来源:rpm_support_base.py
示例16: setUp
def setUp(self, mock_repo_qs):
    """
    Install the mock plugins, initialize the manager factory, register one
    'mock-distributor' distributor on 'repo-1' and build the publish conduit
    for it.

    :param mock_repo_qs: mock supplied by a patch decorator; not used here
    """
    super(RepoPublishConduitTests, self).setUp()
    mock_plugins.install()
    manager_factory.initialize()

    repo_id, distributor_id = 'repo-1', 'dist-1'

    # Populate the database with a repo with units
    dist_controller.add_distributor(
        repo_id, 'mock-distributor', {}, True, distributor_id=distributor_id)

    self.conduit = RepoPublishConduit(repo_id, distributor_id)
开发者ID:maxamillion,项目名称:pulp,代码行数:10,代码来源:test_repo_publish.py
示例17: setUp
def setUp(self):
    """
    Initialize the manager factory and build the AddUnitMixin under test from
    fixed repo, importer and association-owner identifiers.
    """
    manager_factory.initialize()

    self.repo_id, self.importer_id = 'add-repo', 'add-importer'
    self.association_owner_type, self.association_owner_id = 'importer', 'imp-id'

    self.mixin = mixins.AddUnitMixin(
        self.repo_id,
        self.importer_id,
        self.association_owner_type,
        self.association_owner_id)
开发者ID:preethit,项目名称:pulp-1,代码行数:10,代码来源:test_conduit_mixins.py
示例18: setUpClass
def setUpClass(cls):
    """
    Create the /tmp/pulp working directory, reload the test override
    configuration with logging restarted around it, initialize the database
    connection (using the configured database name) and the manager factory,
    and point distribution storage at the temporary directory.
    """
    # Imported locally so the block does not depend on a module-level import.
    import errno

    # Create the directory and tolerate "already exists" instead of checking
    # first; the check-then-create form can fail when another process creates
    # the directory in between.
    try:
        os.makedirs('/tmp/pulp')
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise

    stop_logging()
    config_filename = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                                   '../../../pulp_rpm/test/unit/data', 'test-override-pulp.conf')
    config.config.read(config_filename)
    start_logging()
    name = config.config.get('database', 'name')
    connection.initialize(name)
    manager_factory.initialize()
    constants.DISTRIBUTION_STORAGE_PATH = TEMP_DISTRO_STORAGE_DIR
开发者ID:preethit,项目名称:pulp_rpm,代码行数:12,代码来源:rpm_support_base.py
示例19: setUp
def setUp(self):
    """
    Install the mock plugins and build the importer manager plus two
    ImporterScratchPadMixin instances for 'repo-1': `conduit` is created
    before the manager factory is initialized, `mixin` after it.
    """
    super(ImporterScratchPadMixinTests, self).setUp()
    mock_plugins.install()

    self.importer_manager = RepoImporterManager()

    repo_id = 'repo-1'
    self.repo_id = repo_id
    self.conduit = mixins.ImporterScratchPadMixin(repo_id, 'test-importer')

    manager_factory.initialize()

    importer_id = 'isp-importer'
    self.importer_id = importer_id
    self.mixin = mixins.ImporterScratchPadMixin(repo_id, importer_id)
开发者ID:jeremycline,项目名称:pulp,代码行数:13,代码来源:test_mixins.py
示例20: test_get_manager
def test_get_manager(self):
    """
    Looks up the manager registered under factory.TYPE_REPO and checks that a
    RepoManager instance comes back.
    """
    # Setup
    factory.initialize()

    # Test
    repo_manager = factory.get_manager(factory.TYPE_REPO)

    # Verify
    was_found = repo_manager is not None
    self.assertTrue(was_found)
    has_expected_type = isinstance(repo_manager, RepoManager)
    self.assertTrue(has_expected_type)
开发者ID:credativ,项目名称:pulp,代码行数:14,代码来源:test_factory.py
注:本文中的pulp.server.managers.factory.initialize函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论