本文整理汇总了Python中pulp.server.managers.factory.principal_manager函数的典型用法代码示例。如果您正苦于以下问题:Python principal_manager函数的具体用法?Python principal_manager怎么用?Python principal_manager使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了principal_manager函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_associate_from_repo_no_criteria
def test_associate_from_repo_no_criteria(self):
    # Associating from one repo to another without criteria must call the
    # importer exactly once with both repositories, no explicit unit list,
    # and a conduit owned by the currently set principal.

    # Setup
    source_repo_id = 'source-repo'
    dest_repo_id = 'dest-repo'
    self.repo_manager.create_repo(source_repo_id)
    self.importer_manager.set_importer(source_repo_id, 'mock-importer', {})
    self.repo_manager.create_repo(dest_repo_id)
    self.importer_manager.set_importer(dest_repo_id, 'mock-importer', {})

    unit_ids = ('unit-1', 'unit-2', 'unit-3')
    for unit_id in unit_ids:
        self.content_manager.add_content_unit('mock-type', unit_id, {'key-1': unit_id})
    for unit_id in unit_ids:
        self.manager.associate_unit_by_id(source_repo_id, 'mock-type', unit_id,
                                          OWNER_TYPE_USER, 'admin')

    fake_user = User('associate-user', '')
    manager_factory.principal_manager().set_principal(principal=fake_user)

    # Always undo the principal override, even when an assertion fails, so a
    # failure here cannot leave the fake user set for later tests.
    try:
        mock_plugins.MOCK_IMPORTER.import_units.return_value = [
            Unit('mock-type', {'k': 'v'}, {}, '')]

        # Test
        results = self.manager.associate_from_repo(source_repo_id, dest_repo_id)
        associated = results['units_successful']

        # Verify
        self.assertEqual(1, len(associated))
        self.assertEqual(associated[0]['type_id'], 'mock-type')
        self.assertEqual(associated[0]['unit_key'], {'k': 'v'})
        self.assertEqual(1, mock_plugins.MOCK_IMPORTER.import_units.call_count)

        args = mock_plugins.MOCK_IMPORTER.import_units.call_args[0]
        kwargs = mock_plugins.MOCK_IMPORTER.import_units.call_args[1]
        self.assertTrue(isinstance(args[0], Repository))  # repository transfer object
        self.assertEqual(args[0].id, 'source-repo')  # repo importing units from
        self.assertEqual(args[1].id, 'dest-repo')  # repo importing units into
        self.assertEqual(None, kwargs['units'])  # units to import
        self.assertTrue(isinstance(args[3], PluginCallConfiguration))  # config

        conduit = args[2]
        self.assertTrue(isinstance(conduit, ImportUnitConduit))
        self.assertEqual(conduit.association_owner_type, OWNER_TYPE_USER)
        self.assertEqual(conduit.association_owner_id, fake_user.login)
    finally:
        # Clean Up
        manager_factory.principal_manager().set_principal(principal=None)
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:54,代码来源:test_unit_association.py
示例2: setUp
def setUp(self):
    """
    Prepare the managers used by the permission tests.

    Ensures the super user role exists and clears any principal that is
    currently set before each test runs.
    """
    super(PermissionManagerTests, self).setUp()

    # string.letters is locale-dependent and does not exist on Python 3;
    # ascii_letters is the same set under the default locale and is stable.
    self.alpha_num = string.ascii_letters + string.digits

    self.role_manager = manager_factory.role_manager()
    self.role_query_manager = manager_factory.role_query_manager()
    self.permission_manager = manager_factory.permission_manager()
    self.permission_query_manager = manager_factory.permission_query_manager()

    self.role_manager.ensure_super_user_role()
    manager_factory.principal_manager().clear_principal()
开发者ID:alanoe,项目名称:pulp,代码行数:12,代码来源:test_cud.py
示例3: _run
def _run(self):
    """
    Execute the call stored in this task's call request.

    Generally the target of a new thread. The request's principal and the
    task context are installed before the call and are always cleared
    afterwards, whether the call succeeds or fails.

    Returns the result of self._succeeded(...) when the call completes, or
    of self._failed(...) when it raises.
    """
    # used for calling _run directly during testing
    principals = managers_factory.principal_manager()
    principals.set_principal(self.call_request.principal)

    # generally set in the wrapper, but not when called directly
    report = self.call_report
    if report.state in dispatch_constants.CALL_READY_STATES:
        report.state = dispatch_constants.CALL_RUNNING_STATE
        report.start_time = datetime.datetime.now(dateutils.utc_tz())

    dispatch_context.CONTEXT.set_task_attributes(self)

    target = self.call_request.call
    # shallow copies: the call receives its own containers, not the request's
    positional = copy.copy(self.call_request.args)
    keyword = copy.copy(self.call_request.kwargs)

    try:
        try:
            outcome = target(*positional, **keyword)
        except:
            error, trace = sys.exc_info()[1:]
            _LOG.exception(error)
            return self._failed(error, trace)
        # reached only when the call itself did not raise
        return self._succeeded(outcome)
    finally:
        principals.clear_principal()
        dispatch_context.CONTEXT.clear_task_attributes()
开发者ID:bartwo,项目名称:pulp,代码行数:35,代码来源:task.py
示例4: _run
def _run(self):
    """
    Execute the call stored in this task's call request.

    Generally the target of a new thread. On success the raw result of the
    call is returned; on failure the result of self._failed(...) is returned.
    In both cases the principal and the task context are cleared first.
    """
    # used for calling _run directly during testing
    principals = managers_factory.principal_manager()

    def _release_context():
        # Undo the principal and task attributes installed below. The original
        # author avoided a finally clause for Python 2.4 compatibility, so the
        # cleanup is invoked explicitly on both exit paths.
        principals.clear_principal()
        dispatch_context.CONTEXT.clear_task_attributes()

    principals.set_principal(self.call_request.principal)

    # usually set in the wrapper, unless called directly
    report = self.call_report
    if report.state in dispatch_constants.CALL_READY_STATES:
        report.state = dispatch_constants.CALL_RUNNING_STATE
        report.start_time = datetime.datetime.now(dateutils.utc_tz())

    dispatch_context.CONTEXT.set_task_attributes(self)

    target = self.call_request.call
    positional = copy.copy(self.call_request.args)
    keyword = copy.copy(self.call_request.kwargs)

    try:
        outcome = target(*positional, **keyword)
    except:
        # NOTE: this is making an assumption here that the call failed to
        # execute, if this isn't the case, or it got far enough, we may be
        # faced with _succeeded or _failed being called again
        error, trace = sys.exc_info()[1:]
        _LOG.exception(error)
        _release_context()
        return self._failed(error, trace)

    _release_context()
    return outcome
开发者ID:pkilambi,项目名称:pulp,代码行数:31,代码来源:task.py
示例5: POST
def POST(self, repo_id):
params = self.params()
criteria = params.get("criteria", None)
if criteria is not None:
try:
criteria = UnitAssociationCriteria.from_client_input(criteria)
except:
logger.error("Error parsing unassociation criteria [%s]" % criteria)
raise exceptions.PulpDataException(), None, sys.exc_info()[2]
tags = [resource_tag(dispatch_constants.RESOURCE_REPOSITORY_TYPE, repo_id), action_tag("unassociate")]
async_result = unassociate_by_criteria.apply_async_with_reservation(
dispatch_constants.RESOURCE_REPOSITORY_TYPE,
repo_id,
[
repo_id,
criteria,
RepoContentUnit.OWNER_TYPE_USER,
manager_factory.principal_manager().get_principal()["login"],
],
tags=tags,
)
raise exceptions.OperationPostponed(async_result)
开发者ID:preethit,项目名称:pulp-1,代码行数:25,代码来源:repositories.py
示例6: test_associate_from_repo_no_criteria
def test_associate_from_repo_no_criteria(self, mock_repo_qs):
    # Associating from one repo to another without criteria must call the
    # importer exactly once with the transfer repos, no explicit unit list,
    # a plugin call configuration and an ImportUnitConduit.
    # mock_repo_qs is supplied by a patch decorator outside this snippet.
    source_repo_id = 'source-repo'
    dest_repo_id = 'dest-repo'
    self.importer_manager.set_importer(source_repo_id, 'mock-importer', {})
    self.importer_manager.set_importer(dest_repo_id, 'mock-importer', {})

    unit_ids = ('unit-1', 'unit-2', 'unit-3')
    for unit_id in unit_ids:
        self.content_manager.add_content_unit('mock-type', unit_id, {'key-1': unit_id})
    for unit_id in unit_ids:
        self.manager.associate_unit_by_id(source_repo_id, 'mock-type', unit_id)

    fake_user = User('associate-user', '')
    manager_factory.principal_manager().set_principal(principal=fake_user)

    # Always undo the principal override, even when an assertion fails, so a
    # failure here cannot leave the fake user set for later tests.
    try:
        mock_plugins.MOCK_IMPORTER.import_units.return_value = [
            Unit('mock-type', {'k': 'v'}, {}, '')]

        # Test
        results = self.manager.associate_from_repo(source_repo_id, dest_repo_id)
        associated = results['units_successful']

        # Verify
        self.assertEqual(1, len(associated))
        self.assertEqual(associated[0]['type_id'], 'mock-type')
        self.assertEqual(associated[0]['unit_key'], {'k': 'v'})
        self.assertEqual(1, mock_plugins.MOCK_IMPORTER.import_units.call_count)

        mock_repo = mock_repo_qs.get_repo_or_missing_resource.return_value
        args = mock_plugins.MOCK_IMPORTER.import_units.call_args[0]
        kwargs = mock_plugins.MOCK_IMPORTER.import_units.call_args[1]
        self.assertEqual(args[0], mock_repo.to_transfer_repo())
        self.assertEqual(args[1], mock_repo.to_transfer_repo())
        self.assertEqual(None, kwargs['units'])  # units to import
        self.assertTrue(isinstance(args[3], PluginCallConfiguration))  # config

        conduit = args[2]
        self.assertTrue(isinstance(conduit, ImportUnitConduit))
    finally:
        # Clean Up
        manager_factory.principal_manager().set_principal(principal=None)
开发者ID:jeremycline,项目名称:pulp,代码行数:45,代码来源:test_unit_association.py
示例7: _test_generate_user_certificate
def _test_generate_user_certificate(self):
    # A certificate generated for the current principal must encode that
    # user's login and id in the CN of its subject.
    # NOTE(review): the leading underscore presumably keeps the test runner
    # from collecting this test - confirm whether that is intentional.

    # Setup
    admin_user = self.user_manager.create_user('test-admin')
    # pretend the user is logged in
    manager_factory.principal_manager().set_principal(admin_user)

    # Test
    cert = self.user_manager.generate_user_certificate()

    # Verify
    self.assertTrue(cert is not None)

    certificate = manager_factory.certificate_manager(content=cert)
    common_name = certificate.subject()['CN']
    decoded_login, decoded_id = self.cert_generation_manager.decode_admin_user(common_name)

    self.assertEqual(decoded_login, admin_user['login'])
    self.assertEqual(decoded_id, admin_user['id'])
开发者ID:ashcrow,项目名称:pulp,代码行数:18,代码来源:test_user_manager.py
示例8: _originator
def _originator(self):
    '''
    Determine the originator to record on a consumer event: the login of the
    principal that is currently set (either the consumer itself or an admin
    user).

    @return: login of the originator value to use in the event
    @rtype: string
    '''
    current_principal = managers_factory.principal_manager().get_principal()
    return current_principal['login']
开发者ID:jeremycline,项目名称:pulp,代码行数:9,代码来源:history.py
示例9: _auth_decorator
def _auth_decorator(self, *args, **kwargs):
    """
    Authenticate and authorize the caller, then invoke the wrapped method.

    Each registered authentication function is tried in order until one
    returns a user id. The caller is then checked against the super-user
    requirement or against the requested operation on the current resource
    path, and the matching principal is set before the wrapped method runs.

    ``method``, ``operation`` and ``super_user_only`` are not defined in this
    function; presumably they come from the enclosing decorator's scope.

    :return: whatever the wrapped method returns
    :raises PulpCodedAuthenticationException: PLP0025 when no authentication
            function recognizes the caller, PLP0026 when the caller is not
            permitted to perform the operation
    """
    # Check Authentication

    # Run through each registered and enabled auth function
    is_consumer = False
    registered_auth_functions = [check_preauthenticated,
                                 password_authentication,
                                 user_cert_authentication,
                                 consumer_cert_authentication,
                                 oauth_authentication]

    user_authenticated = False
    for authenticate_user in registered_auth_functions:
        # oauth is the only function that also reports whether the caller is
        # a consumer
        if authenticate_user == oauth_authentication:
            userid, is_consumer = authenticate_user()
        else:
            userid = authenticate_user()

        if userid is not None:
            user_authenticated = True
            if authenticate_user == consumer_cert_authentication:
                is_consumer = True
            break

    if not user_authenticated:
        raise PulpCodedAuthenticationException(error_code=error_codes.PLP0025)

    # Check Authorization

    principal_manager = factory.principal_manager()
    user_query_manager = factory.user_query_manager()

    if super_user_only and not user_query_manager.is_superuser(userid):
        raise PulpCodedAuthenticationException(error_code=error_codes.PLP0026, user=userid,
                                               operation=OPERATION_NAMES[operation])
    # if the operation is None, don't check authorization
    elif operation is not None:
        if is_consumer:
            if is_consumer_authorized(http.resource_path(), userid, operation):
                # set default principal = SYSTEM
                principal_manager.set_principal()
            else:
                raise PulpCodedAuthenticationException(error_code=error_codes.PLP0026,
                                                       user=userid,
                                                       operation=OPERATION_NAMES[operation])
        elif user_query_manager.is_authorized(http.resource_path(), userid, operation):
            user = user_query_manager.find_by_login(userid)
            principal_manager.set_principal(user)
        else:
            raise PulpCodedAuthenticationException(error_code=error_codes.PLP0026,
                                                   user=userid,
                                                   operation=OPERATION_NAMES[operation])

    # Authentication and authorization succeeded. Call method and then clear
    # principal. The clear runs in a finally clause so the principal is also
    # removed when the method raises instead of returning.
    try:
        return method(self, *args, **kwargs)
    finally:
        principal_manager.clear_principal()
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:56,代码来源:decorators.py
示例10: test_import_uploaded_unit
def test_import_uploaded_unit(self, mock_repo_qs, mock_rebuild):
    # Importing an uploaded unit must pass the repo, type, key, metadata,
    # uploaded file path and an UploadConduit to the importer, return the
    # importer's report, rebuild the unit counts and bump last_unit_added.
    # mock_repo_qs / mock_rebuild come from patch decorators outside this snippet.
    importer_controller.set_importer('repo-u', 'mock-importer', {})

    key = {'key': 'value'}
    metadata = {'k1': 'v1'}
    timestamp_pre_upload = dateutils.now_utc_datetime_with_tzinfo()

    mock_repo = mock_repo_qs.get_repo_or_missing_resource.return_value
    importer_return_report = {'success_flag': True, 'summary': '', 'details': {}}

    # The mock importer and the principal are modified below; restore both
    # even when an assertion fails so later tests start from a clean state.
    try:
        mock_plugins.MOCK_IMPORTER.upload_unit.return_value = importer_return_report

        upload_id = self.upload_manager.initialize_upload()
        file_path = self.upload_manager._upload_file_path(upload_id)

        fake_user = model.User('import-user', '')
        manager_factory.principal_manager().set_principal(principal=fake_user)

        response = self.upload_manager.import_uploaded_unit('repo-u', 'mock-type', key,
                                                            metadata, upload_id)

        # import_uploaded_unit() should have returned our importer_return_report
        self.assertTrue(response is importer_return_report)

        call_args = mock_plugins.MOCK_IMPORTER.upload_unit.call_args[0]
        self.assertTrue(call_args[0] is mock_repo.to_transfer_repo())
        self.assertEqual(call_args[1], 'mock-type')
        self.assertEqual(call_args[2], key)
        self.assertEqual(call_args[3], metadata)
        self.assertEqual(call_args[4], file_path)

        conduit = call_args[5]
        self.assertTrue(isinstance(conduit, UploadConduit))
        self.assertEqual(call_args[5].repo_id, 'repo-u')

        # It is now platform's responsibility to update plugin content unit counts
        self.assertTrue(mock_rebuild.called, "rebuild_content_unit_counts must be called")

        # Make sure that the last_unit_added timestamp was updated
        self.assertTrue(mock_repo.last_unit_added > timestamp_pre_upload)
    finally:
        # Clean up
        mock_plugins.MOCK_IMPORTER.upload_unit.return_value = None
        manager_factory.principal_manager().set_principal(principal=None)
开发者ID:alexxa,项目名称:pulp,代码行数:43,代码来源:test_upload.py
示例11: test_associate_from_repo_no_criteria
def test_associate_from_repo_no_criteria(self):
    # Associating from one repo to another without criteria must call the
    # importer exactly once with both repositories, no explicit unit list,
    # and a conduit owned by the currently set principal.

    # Setup
    source_repo_id = "source-repo"
    dest_repo_id = "dest-repo"
    self.repo_manager.create_repo(source_repo_id)
    self.importer_manager.set_importer(source_repo_id, "mock-importer", {})
    self.repo_manager.create_repo(dest_repo_id)
    self.importer_manager.set_importer(dest_repo_id, "mock-importer", {})

    unit_ids = ("unit-1", "unit-2", "unit-3")
    for unit_id in unit_ids:
        self.content_manager.add_content_unit("mock-type", unit_id, {"key-1": unit_id})
    for unit_id in unit_ids:
        self.manager.associate_unit_by_id(source_repo_id, "mock-type", unit_id,
                                          OWNER_TYPE_USER, "admin")

    fake_user = User("associate-user", "")
    manager_factory.principal_manager().set_principal(principal=fake_user)

    # Always undo the principal override, even when an assertion fails, so a
    # failure here cannot leave the fake user set for later tests.
    try:
        # Test
        self.manager.associate_from_repo(source_repo_id, dest_repo_id)

        # Verify
        self.assertEqual(1, mock_plugins.MOCK_IMPORTER.import_units.call_count)

        args = mock_plugins.MOCK_IMPORTER.import_units.call_args[0]
        kwargs = mock_plugins.MOCK_IMPORTER.import_units.call_args[1]
        self.assertTrue(isinstance(args[0], Repository))  # repository transfer object
        self.assertEqual(args[0].id, "source-repo")  # repo importing units from
        self.assertEqual(args[1].id, "dest-repo")  # repo importing units into
        self.assertEqual(None, kwargs["units"])  # units to import
        self.assertTrue(isinstance(args[3], PluginCallConfiguration))  # config

        conduit = args[2]
        self.assertTrue(isinstance(conduit, ImportUnitConduit))
        self.assertEqual(conduit.association_owner_type, OWNER_TYPE_USER)
        self.assertEqual(conduit.association_owner_id, fake_user.login)
    finally:
        # Clean Up
        manager_factory.principal_manager().set_principal(principal=None)
开发者ID:ryanschneider,项目名称:pulp,代码行数:43,代码来源:test_repo_unit_association_manager.py
示例12: post
def post(self, request):
    """
    Generate an admin certificate for the current principal and return it
    together with its private key.

    :param request: WSGI request object
    :type request: django.core.handlers.wsgi.WSGIRequest

    :return: Response containing cert and key
    :rtype: django.http.HttpResponse
    """
    principal = factory.principal_manager().get_principal()
    cert_manager = factory.cert_generation_manager()
    key, certificate = cert_manager.make_admin_user_cert(principal)
    return generate_json_response({'key': key, 'certificate': certificate})
开发者ID:alanoe,项目名称:pulp,代码行数:13,代码来源:root_actions.py
示例13: test_import_uploaded_unit
def test_import_uploaded_unit(self):
    # Importing an uploaded unit must pass the repo, type, key, metadata,
    # uploaded file path and a user-owned UploadConduit to the importer and
    # return the importer's report unchanged.
    self.repo_manager.create_repo('repo-u')
    self.importer_manager.set_importer('repo-u', 'mock-importer', {})

    key = {'key': 'value'}
    metadata = {'k1': 'v1'}
    importer_return_report = object()

    # The mock importer and the principal are modified below; restore both
    # even when an assertion fails so later tests start from a clean state.
    try:
        mock_plugins.MOCK_IMPORTER.upload_unit.return_value = importer_return_report

        upload_id = self.upload_manager.initialize_upload()
        file_path = self.upload_manager._upload_file_path(upload_id)

        fake_user = User('import-user', '')
        manager_factory.principal_manager().set_principal(principal=fake_user)

        response = self.upload_manager.import_uploaded_unit('repo-u', 'mock-type', key,
                                                            metadata, upload_id)

        # import_uploaded_unit() should have returned our importer_return_report
        self.assertTrue(response is importer_return_report)

        call_args = mock_plugins.MOCK_IMPORTER.upload_unit.call_args[0]
        self.assertTrue(isinstance(call_args[0], Repository))
        self.assertEqual(call_args[1], 'mock-type')
        self.assertEqual(call_args[2], key)
        self.assertEqual(call_args[3], metadata)
        self.assertEqual(call_args[4], file_path)

        conduit = call_args[5]
        self.assertTrue(isinstance(conduit, UploadConduit))
        self.assertEqual(call_args[5].repo_id, 'repo-u')
        self.assertEqual(conduit.association_owner_type, OWNER_TYPE_USER)
        self.assertEqual(conduit.association_owner_id, fake_user.login)
    finally:
        # Clean up
        mock_plugins.MOCK_IMPORTER.upload_unit.return_value = None
        manager_factory.principal_manager().set_principal(principal=None)
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:38,代码来源:test_server_managers_content_upload.py
示例14: test_import_uploaded_unit
def test_import_uploaded_unit(self):
    # Importing an uploaded unit must pass the repo, type, key, metadata,
    # uploaded file path and a user-owned UploadConduit to the importer.

    # Setup
    self.repo_manager.create_repo("repo-u")
    self.importer_manager.set_importer("repo-u", "mock-importer", {})

    key = {"key": "value"}
    metadata = {"k1": "v1"}
    importer_return_report = object()

    # The mock importer and the principal are modified below; restore both
    # even when an assertion fails so later tests start from a clean state.
    try:
        mock_plugins.MOCK_IMPORTER.upload_unit.return_value = importer_return_report

        upload_id = self.upload_manager.initialize_upload()
        file_path = self.upload_manager._upload_file_path(upload_id)

        fake_user = User("import-user", "")
        manager_factory.principal_manager().set_principal(principal=fake_user)

        # Test
        self.upload_manager.import_uploaded_unit("repo-u", "mock-type", key, metadata,
                                                 upload_id)

        # Verify
        call_args = mock_plugins.MOCK_IMPORTER.upload_unit.call_args[0]
        self.assertTrue(isinstance(call_args[0], Repository))
        self.assertEqual(call_args[1], "mock-type")
        self.assertEqual(call_args[2], key)
        self.assertEqual(call_args[3], metadata)
        self.assertEqual(call_args[4], file_path)

        conduit = call_args[5]
        self.assertTrue(isinstance(conduit, UploadConduit))
        self.assertEqual(call_args[5].repo_id, "repo-u")
        self.assertEqual(conduit.association_owner_type, OWNER_TYPE_USER)
        self.assertEqual(conduit.association_owner_id, fake_user.login)
    finally:
        # Clean up
        mock_plugins.MOCK_IMPORTER.upload_unit.return_value = None
        manager_factory.principal_manager().set_principal(principal=None)
开发者ID:ryanschneider,项目名称:pulp,代码行数:37,代码来源:test_content_upload_manager.py
示例15: test_import_uploaded_unit
def test_import_uploaded_unit(self, mock_repo_qs):
    # Importing an uploaded unit must pass the transfer repo, type, key,
    # metadata, uploaded file path and an UploadConduit to the importer and
    # return the importer's report unchanged.
    # mock_repo_qs is supplied by a patch decorator outside this snippet.
    self.importer_manager.set_importer('repo-u', 'mock-importer', {})

    key = {'key': 'value'}
    metadata = {'k1': 'v1'}
    mock_repo = mock_repo_qs.get_repo_or_missing_resource.return_value
    importer_return_report = object()

    # The mock importer and the principal are modified below; restore both
    # even when an assertion fails so later tests start from a clean state.
    try:
        mock_plugins.MOCK_IMPORTER.upload_unit.return_value = importer_return_report

        upload_id = self.upload_manager.initialize_upload()
        file_path = self.upload_manager._upload_file_path(upload_id)

        fake_user = User('import-user', '')
        manager_factory.principal_manager().set_principal(principal=fake_user)

        response = self.upload_manager.import_uploaded_unit('repo-u', 'mock-type', key,
                                                            metadata, upload_id)

        # import_uploaded_unit() should have returned our importer_return_report
        self.assertTrue(response is importer_return_report)

        call_args = mock_plugins.MOCK_IMPORTER.upload_unit.call_args[0]
        self.assertTrue(call_args[0] is mock_repo.to_transfer_repo())
        self.assertEqual(call_args[1], 'mock-type')
        self.assertEqual(call_args[2], key)
        self.assertEqual(call_args[3], metadata)
        self.assertEqual(call_args[4], file_path)

        conduit = call_args[5]
        self.assertTrue(isinstance(conduit, UploadConduit))
        self.assertEqual(call_args[5].repo_id, 'repo-u')
    finally:
        # Clean up
        mock_plugins.MOCK_IMPORTER.upload_unit.return_value = None
        manager_factory.principal_manager().set_principal(principal=None)
开发者ID:jeremycline,项目名称:pulp,代码行数:36,代码来源:test_upload.py
示例16: __init__
def __init__(self,
             call,
             args=None,
             kwargs=None,
             principal=None,
             tags=None,
             resources=None,
             dependencies=None,
             weight=1,
             asynchronous=False,
             archive=False,
             kwarg_blacklist=()):
    """
    Build a call request around a callable and its arguments.

    Every argument is type-checked with an assertion. Falsy optional
    containers are replaced by fresh empty ones, and a falsy principal is
    replaced by the principal currently held by the principal manager.
    A new uuid4-based id is generated; group_id and schedule_id start as None.

    :raises AssertionError: if an argument has an unexpected type, call is
            not callable, or weight is negative
    """
    assert callable(call)
    assert isinstance(args, (NoneType, tuple, list))
    assert isinstance(kwargs, (NoneType, dict))
    assert isinstance(principal, (NoneType, User, dict))
    assert isinstance(tags, (NoneType, list))
    assert isinstance(resources, (NoneType, dict))
    assert isinstance(dependencies, (NoneType, dict))
    assert isinstance(weight, int) and weight > -1
    assert isinstance(asynchronous, bool)
    assert isinstance(archive, bool)
    assert isinstance(kwarg_blacklist, (list, tuple))

    # identity
    self.id = str(uuid.uuid4())
    self.group_id = None
    self.schedule_id = None

    # what to call and with which arguments
    self.call = call
    self.args = args or []
    self.kwargs = kwargs or {}

    # fall back to whoever is the current principal when none was given
    self.principal = principal or managers_factory.principal_manager().get_principal()

    # scheduling metadata
    self.tags = tags or []
    self.resources = resources or {}
    self.dependencies = dependencies or {}
    self.weight = weight
    self.asynchronous = asynchronous
    self.archive = archive
    self.kwarg_blacklist = kwarg_blacklist

    # one independent hook list per life cycle callback, one empty slot per control hook
    hook_count = len(dispatch_constants.CALL_LIFE_CYCLE_CALLBACKS)
    self.execution_hooks = [[] for _ in range(hook_count)]
    self.control_hooks = [None] * len(dispatch_constants.CALL_CONTROL_HOOKS)
开发者ID:bartwo,项目名称:pulp,代码行数:47,代码来源:call.py
示例17: grant_automatic_permissions_for_resource
def grant_automatic_permissions_for_resource(self, resource):
    """
    Grant every operation in authorization.OPERATION_NAMES on a resource to
    the current principal.

    :param resource: resource path to grant permissions to
    :type resource: str

    :raises PulpExecutionException: if the current principal is the system principal
    """
    principal_manager = factory.principal_manager()
    user = principal_manager.get_principal()

    if not principal_manager.is_system_principal():
        self.grant(resource, user['login'], authorization.OPERATION_NAMES)
        return

    template = _('Cannot grant automatic permissions for [%(user)s] on resource [%(resource)s]')
    raise PulpExecutionException(template % {'user': user, 'resource': resource})
开发者ID:credativ,项目名称:pulp,代码行数:17,代码来源:cud.py
示例18: POST
def POST(self, repo_id):
params = self.params()
criteria = params.get('criteria', None)
if criteria is not None:
try:
criteria = UnitAssociationCriteria.from_client_input(criteria)
except:
_logger.error('Error parsing unassociation criteria [%s]' % criteria)
raise exceptions.PulpDataException(), None, sys.exc_info()[2]
task_tags = [tags.resource_tag(tags.RESOURCE_REPOSITORY_TYPE, repo_id),
tags.action_tag('unassociate')]
async_result = unassociate_by_criteria.apply_async_with_reservation(
tags.RESOURCE_REPOSITORY_TYPE, repo_id,
[repo_id, criteria, RepoContentUnit.OWNER_TYPE_USER,
manager_factory.principal_manager().get_principal()['login']], tags=task_tags)
raise exceptions.OperationPostponed(async_result)
开发者ID:beav,项目名称:pulp,代码行数:19,代码来源:repositories.py
示例19: grant_automatic_permissions_for_resource
def grant_automatic_permissions_for_resource(self, resource):
    """
    Grant the CREATE, READ, UPDATE, DELETE and EXECUTE operations on a
    resource to the current principal. Nothing is returned.

    @type resource: str
    @param resource: resource path to grant permissions to

    @raise PulpExecutionException: if the current principal is the system principal
    """
    principal_manager = factory.principal_manager()
    user = principal_manager.get_principal()

    if not principal_manager.is_system_principal():
        crude_operations = [self.CREATE, self.READ, self.UPDATE, self.DELETE, self.EXECUTE]
        self.grant(resource, user['login'], crude_operations)
        return

    template = _('Cannot grant automatic permissions for [%s] on resource [%s]')
    raise PulpExecutionException(template % (user, resource))
开发者ID:bartwo,项目名称:pulp,代码行数:20,代码来源:cud.py
示例20: POST
def POST(self):
    """
    Generate an admin certificate for the current principal and respond with
    the private key and certificate.
    """
    principal = factory.principal_manager().get_principal()
    cert_manager = factory.cert_generation_manager()
    key, certificate = cert_manager.make_admin_user_cert(principal)
    return self.ok({"key": key, "certificate": certificate})
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:5,代码来源:root_actions.py
注:本文中的pulp.server.managers.factory.principal_manager函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论