本文整理汇总了 Python 中 taskflow.engines.run 函数的典型用法代码示例。如果您正苦于以下问题：Python run 函数的具体用法？Python run 怎么用？Python run 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了run函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_purge_flow_normal
def test_purge_flow_normal(self, mock_creds, mock_dns_client):
    """Run the purge-service flow against patched controllers.

    The test makes no explicit assertions; it passes when
    ``engines.run`` returns without raising.

    :param mock_creds: extra test argument, unused in the body
        (presumably injected by a patch decorator -- TODO confirm).
    :param mock_dns_client: extra test argument, unused in the body.
    """
    service_id = str(uuid.uuid4())
    service_obj = service.Service(
        service_id=service_id,
        name='poppy cdn service',
        domains=[domain.Domain(domain='cdn.poppy.org')],
        origins=[origin.Origin(origin='poppy.org')],
        flavor_id='cdn',
    )
    # Build the mapping directly with a dict comprehension rather than
    # materialising a list of pairs for dict() to consume.
    provider_details = {
        name: detail.to_dict()
        for name, detail in service_obj.provider_details.items()
    }
    kwargs = {
        'project_id': json.dumps(str(uuid.uuid4())),
        'service_id': json.dumps(service_id),
        'provider_details': json.dumps(provider_details),
        'purge_url': 'cdn.poppy.org',
        'hard': json.dumps(True),
        'service_obj': json.dumps(service_obj.to_dict()),
        'context_dict': context_utils.RequestContext().to_dict(),
    }

    service_controller, storage_controller, dns_controller = (
        self.all_controllers()
    )
    with MonkeyPatchControllers(service_controller,
                                dns_controller,
                                storage_controller,
                                memoized_controllers.task_controllers):
        self.patch_purge_flow(service_controller, storage_controller,
                              dns_controller)
        engines.run(purge_service.purge_service(), store=kwargs)
开发者ID:BenjamenMeyer,项目名称:poppy,代码行数:33,代码来源:test_flows.py
示例2: test_delete_flow_normal
def test_delete_flow_normal(self, mock_creds):
    """Run the delete-service flow against patched controllers.

    The test makes no explicit assertions; it passes when
    ``engines.run`` returns without raising.

    :param mock_creds: extra test argument, unused in the body
        (presumably injected by a patch decorator -- TODO confirm).
    """
    service_id = str(uuid.uuid4())
    service_obj = service.Service(
        service_id=service_id,
        name='poppy cdn service',
        domains=[domain.Domain(domain='cdn.poppy.org')],
        origins=[origin.Origin(origin='poppy.org')],
        flavor_id='cdn',
    )
    # Dict comprehension instead of dict() over a temporary list of pairs.
    provider_details = {
        name: detail.to_dict()
        for name, detail in service_obj.provider_details.items()
    }
    kwargs = {
        'project_id': json.dumps(str(uuid.uuid4())),
        'service_id': json.dumps(service_id),
        'time_seconds': [i * self.time_factor
                         for i in range(self.total_retries)],
        'provider_details': json.dumps(provider_details),
    }

    service_controller, storage_controller, dns_controller = (
        self.all_controllers()
    )
    with MonkeyPatchControllers(service_controller,
                                dns_controller,
                                storage_controller,
                                memoized_controllers.task_controllers):
        self.patch_delete_flow(service_controller, storage_controller,
                               dns_controller)
        engines.run(delete_service.delete_service(), store=kwargs)
开发者ID:jc7998,项目名称:poppy,代码行数:31,代码来源:test_flows.py
示例3: test_service_state_flow_normal
def test_service_state_flow_normal(self, mock_creds):
    """Run the enable-service flow, then the disable-service flow.

    Both flows receive the same store. The test makes no explicit
    assertions; it passes when both ``engines.run`` calls return.

    :param mock_creds: extra test argument, unused in the body.
    """
    svc = service.Service(
        service_id=str(uuid.uuid4()),
        name='poppy cdn service',
        domains=[domain.Domain(domain='cdn.poppy.org')],
        origins=[origin.Origin(origin='poppy.org')],
        flavor_id='cdn',
    )
    # NOTE(review): 'state' stays 'enable' for the disable run as well --
    # confirm whether that is intentional.
    flow_store = {
        'project_id': json.dumps(str(uuid.uuid4())),
        'state': 'enable',
        'service_obj': json.dumps(svc.to_dict()),
    }

    service_ctrl, storage_ctrl, dns_ctrl = self.all_controllers()
    patched = MonkeyPatchControllers(
        service_ctrl,
        dns_ctrl,
        storage_ctrl,
        memoized_controllers.task_controllers,
    )
    with patched:
        self.patch_service_state_flow(service_ctrl, storage_ctrl, dns_ctrl)
        engines.run(update_service_state.enable_service(),
                    store=flow_store)
        engines.run(update_service_state.disable_service(),
                    store=flow_store)
开发者ID:jc7998,项目名称:poppy,代码行数:29,代码来源:test_flows.py
示例4: test_delete_flow_dns_exception_with_retry
def test_delete_flow_dns_exception_with_retry(self, mock_creds):
    """Run the delete-service flow with a DNS delete that yields canned results.

    ``dns_controller.delete`` is replaced by a ``Mock`` whose successive
    calls return the items produced by ``self.dns_exceptions()``. The test
    makes no explicit assertions; it passes when ``engines.run`` returns.

    :param mock_creds: extra test argument, unused in the body.
    """
    service_id = str(uuid.uuid4())
    service_obj = service.Service(
        service_id=service_id,
        name="poppy cdn service",
        domains=[domain.Domain(domain="cdn.poppy.org")],
        origins=[origin.Origin(origin="poppy.org")],
        flavor_id="cdn",
    )
    # Dict comprehension instead of dict() over a temporary list of pairs.
    provider_details = {
        name: detail.to_dict()
        for name, detail in service_obj.provider_details.items()
    }
    kwargs = {
        "project_id": json.dumps(str(uuid.uuid4())),
        "service_id": json.dumps(service_id),
        "time_seconds": [
            i * self.time_factor for i in range(self.total_retries)
        ],
        "provider_details": json.dumps(provider_details),
    }

    service_controller, storage_controller, dns_controller = (
        self.all_controllers()
    )
    with MonkeyPatchControllers(
        service_controller,
        dns_controller,
        storage_controller,
        memoized_controllers.task_controllers,
    ):
        self.patch_delete_flow(
            service_controller, storage_controller, dns_controller
        )
        dns_controller.delete = mock.Mock()
        # Use Mock's public ``side_effect`` attribute rather than the
        # private ``_mock_side_effect`` slot; an iterator side effect
        # supplies one item per call.
        dns_controller.delete.side_effect = iter(self.dns_exceptions())
        engines.run(delete_service.delete_service(), store=kwargs)
开发者ID:obulpathi,项目名称:poppy,代码行数:31,代码来源:test_flows.py
示例5: test_create_flow_dns_exception
def test_create_flow_dns_exception(self, mock_creds,
                                   mock_dns_client):
    """Run the create-service flow with a DNS create that reports an error.

    ``dns_controller.create`` is replaced by a ``Mock`` returning an error
    payload for ``cdn_provider``. The test makes no explicit assertions;
    it passes when ``engines.run`` returns.

    :param mock_creds: extra test argument, unused in the body.
    :param mock_dns_client: extra test argument, unused in the body.
    """
    providers = ['cdn_provider']
    kwargs = {
        'providers_list_json': json.dumps(providers),
        'project_id': json.dumps(str(uuid.uuid4())),
        'auth_token': json.dumps(str(uuid.uuid4())),
        'service_id': json.dumps(str(uuid.uuid4())),
        'time_seconds': [i * self.time_factor
                         for i in range(self.total_retries)],
        'context_dict': context_utils.RequestContext().to_dict(),
    }

    service_controller, storage_controller, dns_controller = (
        self.all_controllers()
    )
    with MonkeyPatchControllers(service_controller,
                                dns_controller,
                                storage_controller,
                                memoized_controllers.task_controllers):
        self.patch_create_flow(service_controller, storage_controller,
                               dns_controller)
        dns_controller.create = mock.Mock()
        # Configure the mock through its public ``return_value``
        # attribute instead of the private ``_mock_return_value`` slot.
        dns_controller.create.return_value = {
            'cdn_provider': {
                'error': 'Whoops!',
                'error_class': 'tests.unit.distributed_task'
                               '.taskflow.test_flows.DNSException',
            }
        }
        engines.run(create_service.create_service(), store=kwargs)
开发者ID:BenjamenMeyer,项目名称:poppy,代码行数:32,代码来源:test_flows.py
示例6: test_create_flow_dns_exception
def test_create_flow_dns_exception(self, mock_creds):
    """Run the create-service flow with a DNS create that reports an error.

    ``dns_controller.create`` is replaced by a ``Mock`` returning an error
    payload for ``cdn_provider``. The test makes no explicit assertions;
    it passes when ``engines.run`` returns.

    :param mock_creds: extra test argument, unused in the body.
    """
    providers = ["cdn_provider"]
    kwargs = {
        "providers_list_json": json.dumps(providers),
        "project_id": json.dumps(str(uuid.uuid4())),
        "auth_token": json.dumps(str(uuid.uuid4())),
        "service_id": json.dumps(str(uuid.uuid4())),
        "time_seconds": [
            i * self.time_factor for i in range(self.total_retries)
        ],
    }

    service_controller, storage_controller, dns_controller = (
        self.all_controllers()
    )
    with MonkeyPatchControllers(
        service_controller,
        dns_controller,
        storage_controller,
        memoized_controllers.task_controllers,
    ):
        self.patch_create_flow(
            service_controller, storage_controller, dns_controller
        )
        dns_controller.create = mock.Mock()
        # Configure the mock through its public ``return_value``
        # attribute instead of the private ``_mock_return_value`` slot.
        dns_controller.create.return_value = {
            "cdn_provider": {
                "error": "Whoops!",
                "error_class": (
                    "tests.unit.distributed_task"
                    ".taskflow.test_flows.DNSException"
                ),
            }
        }
        engines.run(create_service.create_service(), store=kwargs)
开发者ID:obulpathi,项目名称:poppy,代码行数:25,代码来源:test_flows.py
示例7: test_delete_flow_dns_exception
def test_delete_flow_dns_exception(self, mock_creds):
    """Run the delete-service flow with a DNS delete that reports an error.

    ``dns_controller.delete`` is replaced by a ``Mock`` returning an error
    payload for ``cdn_provider``. The test makes no explicit assertions;
    it passes when ``engines.run`` returns.

    :param mock_creds: extra test argument, unused in the body.
    """
    service_id = str(uuid.uuid4())
    service_obj = service.Service(
        service_id=service_id,
        name="poppy cdn service",
        domains=[domain.Domain(domain="cdn.poppy.org")],
        origins=[origin.Origin(origin="poppy.org")],
        flavor_id="cdn",
    )
    # Dict comprehension instead of dict() over a temporary list of pairs.
    provider_details = {
        name: detail.to_dict()
        for name, detail in service_obj.provider_details.items()
    }
    kwargs = {
        "project_id": json.dumps(str(uuid.uuid4())),
        "service_id": json.dumps(service_id),
        "time_seconds": [
            i * self.time_factor for i in range(self.total_retries)
        ],
        "provider_details": json.dumps(provider_details),
    }

    service_controller, storage_controller, dns_controller = (
        self.all_controllers()
    )
    with MonkeyPatchControllers(
        service_controller,
        dns_controller,
        storage_controller,
        memoized_controllers.task_controllers,
    ):
        self.patch_delete_flow(
            service_controller, storage_controller, dns_controller
        )
        dns_controller.delete = mock.Mock()
        # Configure the mock through its public ``return_value``
        # attribute instead of the private ``_mock_return_value`` slot.
        dns_controller.delete.return_value = {
            "cdn_provider": {
                "error": "Whoops!",
                "error_class": (
                    "tests.unit.distributed_task"
                    ".taskflow.test_flows.DNSException"
                ),
            }
        }
        engines.run(delete_service.delete_service(), store=kwargs)
开发者ID:obulpathi,项目名称:poppy,代码行数:35,代码来源:test_flows.py
示例8: test_purge_flow_normal
def test_purge_flow_normal(self, mock_creds):
    """Run the purge-service flow against patched controllers.

    The test makes no explicit assertions; it passes when
    ``engines.run`` returns without raising.

    :param mock_creds: extra test argument, unused in the body.
    """
    service_id = str(uuid.uuid4())
    service_obj = service.Service(
        service_id=service_id,
        name="poppy cdn service",
        domains=[domain.Domain(domain="cdn.poppy.org")],
        origins=[origin.Origin(origin="poppy.org")],
        flavor_id="cdn",
    )
    # Dict comprehension instead of dict() over a temporary list of pairs.
    provider_details = {
        name: detail.to_dict()
        for name, detail in service_obj.provider_details.items()
    }
    kwargs = {
        "project_id": json.dumps(str(uuid.uuid4())),
        "service_id": json.dumps(service_id),
        "provider_details": json.dumps(provider_details),
        "purge_url": "cdn.poppy.org",
        "hard": json.dumps(True),
        "service_obj": json.dumps(service_obj.to_dict()),
    }

    service_controller, storage_controller, dns_controller = (
        self.all_controllers()
    )
    with MonkeyPatchControllers(
        service_controller,
        dns_controller,
        storage_controller,
        memoized_controllers.task_controllers,
    ):
        self.patch_purge_flow(
            service_controller, storage_controller, dns_controller
        )
        engines.run(purge_service.purge_service(), store=kwargs)
开发者ID:obulpathi,项目名称:poppy,代码行数:29,代码来源:test_flows.py
示例9: test_service_state_flow_normal
def test_service_state_flow_normal(self, mock_creds):
    """Run the enable-service flow, then the disable-service flow.

    Both flows receive the same store. The test makes no explicit
    assertions; it passes when both ``engines.run`` calls return.

    :param mock_creds: extra test argument, unused in the body.
    """
    svc = service.Service(
        service_id=str(uuid.uuid4()),
        name="poppy cdn service",
        domains=[domain.Domain(domain="cdn.poppy.org")],
        origins=[origin.Origin(origin="poppy.org")],
        flavor_id="cdn",
    )
    # NOTE(review): "state" stays "enable" for the disable run as well --
    # confirm whether that is intentional.
    flow_store = {
        "project_id": json.dumps(str(uuid.uuid4())),
        "state": "enable",
        "service_obj": json.dumps(svc.to_dict()),
    }

    service_ctrl, storage_ctrl, dns_ctrl = self.all_controllers()
    patched = MonkeyPatchControllers(
        service_ctrl,
        dns_ctrl,
        storage_ctrl,
        memoized_controllers.task_controllers,
    )
    with patched:
        self.patch_service_state_flow(service_ctrl, storage_ctrl, dns_ctrl)
        engines.run(update_service_state.enable_service(),
                    store=flow_store)
        engines.run(update_service_state.disable_service(),
                    store=flow_store)
开发者ID:obulpathi,项目名称:poppy,代码行数:27,代码来源:test_flows.py
示例10: test_create_flow_dns_exception_with_retry_and_fail
def test_create_flow_dns_exception_with_retry_and_fail(self, mock_creds):
    """Run the create-service flow with a DNS create that yields canned results.

    ``dns_controller.create`` is replaced by a ``Mock`` whose successive
    calls return the items produced by ``self.dns_exceptions_only()``.
    The test makes no explicit assertions; it passes when ``engines.run``
    returns.

    :param mock_creds: extra test argument, unused in the body.
    """
    providers = ['cdn_provider']
    kwargs = {
        'providers_list_json': json.dumps(providers),
        'project_id': json.dumps(str(uuid.uuid4())),
        'auth_token': json.dumps(str(uuid.uuid4())),
        'service_id': json.dumps(str(uuid.uuid4())),
        'time_seconds': [i * self.time_factor
                         for i in range(self.total_retries)],
        'context_dict': context_utils.RequestContext().to_dict(),
    }

    service_controller, storage_controller, dns_controller = (
        self.all_controllers()
    )
    with MonkeyPatchControllers(service_controller,
                                dns_controller,
                                storage_controller,
                                memoized_controllers.task_controllers):
        self.patch_create_flow(service_controller, storage_controller,
                               dns_controller)
        dns_controller.create = mock.Mock()
        # Use Mock's public ``side_effect`` attribute rather than the
        # private ``_mock_side_effect`` slot; an iterator side effect
        # supplies one item per call.
        dns_controller.create.side_effect = iter(self.dns_exceptions_only())
        engines.run(create_service.create_service(), store=kwargs)
开发者ID:BenjamenMeyer,项目名称:poppy,代码行数:31,代码来源:test_flows.py
示例11: test_recreate_ssl_certificate
def test_recreate_ssl_certificate(self, mock_creds, mock_dns_client):
    """Run the recreate-SSL-certificate flow against patched controllers.

    The test makes no explicit assertions; it passes when
    ``engines.run`` returns without raising.

    :param mock_creds: extra test argument, unused in the body.
    :param mock_dns_client: extra test argument, unused in the body.
    """
    certificate = ssl_certificate.SSLCertificate(
        'cdn', 'mytestsite.com', 'san'
    )
    flow_store = {
        'providers_list_json': json.dumps(['cdn_provider']),
        'project_id': json.dumps(str(uuid.uuid4())),
        'domain_name': 'mytestsite.com',
        'cert_type': 'san',
        'cert_obj_json': json.dumps(certificate.to_dict()),
    }

    service_ctrl, storage_ctrl, dns_ctrl = self.all_controllers()
    patched = MonkeyPatchControllers(
        service_ctrl,
        dns_ctrl,
        storage_ctrl,
        memoized_controllers.task_controllers,
    )
    with patched:
        self.patch_recreate_ssl_certificate_flow(
            service_ctrl, storage_ctrl, dns_ctrl
        )
        flow = recreate_ssl_certificate.recreate_ssl_certificate()
        engines.run(flow, store=flow_store)
开发者ID:BenjamenMeyer,项目名称:poppy,代码行数:26,代码来源:test_flows.py
示例12: test_create_flow_normal
def test_create_flow_normal(self):
    """Run the create-service flow against patched controllers.

    The test makes no explicit assertions; it passes when
    ``engines.run`` returns without raising.
    """
    flow_store = {
        'providers_list_json': json.dumps(['cdn_provider']),
        'project_id': json.dumps(str(uuid.uuid4())),
        'auth_token': json.dumps(str(uuid.uuid4())),
        'service_id': json.dumps(str(uuid.uuid4())),
        'time_seconds': [
            attempt * self.time_factor
            for attempt in range(self.total_retries)
        ],
        'context_dict': context_utils.RequestContext().to_dict(),
    }

    service_ctrl, storage_ctrl, dns_ctrl, ssl_cert_ctrl = (
        self.all_controllers()
    )
    patched = MonkeyPatchControllers(
        service_ctrl,
        dns_ctrl,
        storage_ctrl,
        ssl_cert_ctrl,
        memoized_controllers.task_controllers,
    )
    with patched:
        self.patch_create_flow(service_ctrl, storage_ctrl, dns_ctrl)
        engines.run(create_service.create_service(), store=flow_store)
开发者ID:openstack,项目名称:poppy,代码行数:29,代码来源:test_flows.py
示例13: test_delete_ssl_certificate_normal
def test_delete_ssl_certificate_normal(self):
    """Run the delete-SSL-certificate flow against patched controllers.

    The test makes no explicit assertions; it passes when
    ``engines.run`` returns without raising.
    """
    flow_store = {
        'cert_type': "san",
        'project_id': json.dumps(str(uuid.uuid4())),
        'domain_name': "san.san.com",
        'context_dict': context_utils.RequestContext().to_dict(),
    }

    service_ctrl, storage_ctrl, dns_ctrl, ssl_cert_ctrl = (
        self.all_controllers()
    )
    patched = MonkeyPatchControllers(
        service_ctrl,
        dns_ctrl,
        storage_ctrl,
        ssl_cert_ctrl,
        memoized_controllers.task_controllers,
    )
    with patched:
        # NOTE(review): this delete test applies the *create* certificate
        # patch helper -- confirm that is the intended helper.
        self.patch_create_ssl_certificate_flow(
            service_ctrl, storage_ctrl, dns_ctrl
        )
        flow = delete_ssl_certificate.delete_ssl_certificate()
        engines.run(flow, store=flow_store)
开发者ID:yunhaia,项目名称:poppy,代码行数:31,代码来源:test_flows.py
示例14: test_create_ssl_certificate_normal
def test_create_ssl_certificate_normal(self):
    """Run the create-SSL-certificate flow against patched controllers.

    The test makes no explicit assertions; it passes when
    ``engines.run`` returns without raising.
    """
    certificate = ssl_certificate.SSLCertificate(
        'cdn', 'mytestsite.com', 'san'
    )
    flow_store = {
        'providers_list_json': json.dumps(['cdn_provider']),
        'project_id': json.dumps(str(uuid.uuid4())),
        'cert_obj_json': json.dumps(certificate.to_dict()),
        'context_dict': context_utils.RequestContext().to_dict(),
    }

    service_ctrl, storage_ctrl, dns_ctrl, ssl_cert_ctrl = (
        self.all_controllers()
    )
    patched = MonkeyPatchControllers(
        service_ctrl,
        dns_ctrl,
        storage_ctrl,
        ssl_cert_ctrl,
        memoized_controllers.task_controllers,
    )
    with patched:
        self.patch_create_ssl_certificate_flow(
            service_ctrl, storage_ctrl, dns_ctrl
        )
        flow = create_ssl_certificate.create_ssl_certificate()
        engines.run(flow, store=flow_store)
开发者ID:openstack,项目名称:poppy,代码行数:31,代码来源:test_flows.py
示例15: test_delete_stack
def test_delete_stack(self):
    """Check that running the delete-stack flow deletes the stored stack id.

    Asserts that ``heat_client.stacks.delete`` is called exactly once with
    the ``stack_id`` placed in the flow store.
    """
    stack_id = 'stack_id'
    heat_client = mock.MagicMock(name='heat_client')
    delete_flow = self._get_delete_stack_flow(heat_client)

    engines.run(delete_flow, store={'stack_id': stack_id})

    heat_client.stacks.delete.assert_called_once_with(stack_id)
开发者ID:Fangfenghua,项目名称:magnum,代码行数:8,代码来源:test_heat_tasks.py
示例16: test_service_state_flow_dns_exception
def test_service_state_flow_dns_exception(self):
    """Run enable/disable flows with DNS calls that report an error.

    ``dns_controller.enable`` and ``dns_controller.disable`` are each
    replaced by a ``Mock`` returning an error payload for
    ``cdn_provider``. The test makes no explicit assertions; it passes
    when both ``engines.run`` calls return.
    """
    def dns_failure():
        # A fresh dict per mock, matching the two separate literals the
        # test previously spelled out.
        return {
            'cdn_provider': {
                'error': 'Whoops!',
                'error_class': 'tests.unit.distributed_task'
                               '.taskflow.test_flows.DNSException',
            }
        }

    service_id = str(uuid.uuid4())
    service_obj = service.Service(
        service_id=service_id,
        name='poppy cdn service',
        domains=[domain.Domain(domain='cdn.poppy.org')],
        origins=[origin.Origin(origin='poppy.org')],
        flavor_id='cdn',
    )
    enable_kwargs = {
        'project_id': json.dumps(str(uuid.uuid4())),
        'state': 'enable',
        'service_obj': json.dumps(service_obj.to_dict()),
        'time_seconds': [i * self.time_factor
                         for i in range(self.total_retries)],
        'context_dict': context_utils.RequestContext().to_dict(),
    }
    # Same store for the disable flow, differing only in 'state'.
    disable_kwargs = enable_kwargs.copy()
    disable_kwargs['state'] = 'disable'

    (
        service_controller,
        storage_controller,
        dns_controller,
        ssl_cert_controller,
    ) = self.all_controllers()
    with MonkeyPatchControllers(service_controller,
                                dns_controller,
                                storage_controller,
                                ssl_cert_controller,
                                memoized_controllers.task_controllers):
        self.patch_service_state_flow(service_controller,
                                      storage_controller,
                                      dns_controller)
        # Configure the mocks through the public ``return_value``
        # attribute instead of the private ``_mock_return_value`` slot.
        dns_controller.enable = mock.Mock()
        dns_controller.enable.return_value = dns_failure()
        dns_controller.disable = mock.Mock()
        dns_controller.disable.return_value = dns_failure()

        engines.run(update_service_state.enable_service(),
                    store=enable_kwargs)
        engines.run(update_service_state.disable_service(),
                    store=disable_kwargs)
开发者ID:openstack,项目名称:poppy,代码行数:58,代码来源:test_flows.py
示例17: test_check_cert_status_and_update_flow
def test_check_cert_status_and_update_flow(self):
    """Run the check-cert-status-and-update flow with a fixed store.

    The test makes no explicit assertions; it passes when
    ``engines.run`` returns without raising.
    """
    flow_store = dict(
        domain_name="blog.testabc.com",
        cert_type="san",
        flavor_id="premium",
        project_id="000",
    )
    flow = (
        check_cert_status_and_update_flow
        .check_cert_status_and_update_flow()
    )
    engines.run(flow, store=flow_store)
开发者ID:BenjamenMeyer,项目名称:poppy,代码行数:10,代码来源:test_flows.py
示例18: test_service_state_flow_dns_exception_retry_and_succeed
def test_service_state_flow_dns_exception_retry_and_succeed(self):
    """Run enable/disable flows with DNS calls that yield canned results.

    ``dns_controller.enable`` and ``dns_controller.disable`` each get an
    iterator side effect over ``self.dns_exceptions_and_succeed()``. The
    test makes no explicit assertions; it passes when both
    ``engines.run`` calls return.
    """
    service_id = str(uuid.uuid4())
    service_obj = service.Service(
        service_id=service_id,
        name='poppy cdn service',
        domains=[domain.Domain(domain='cdn.poppy.org')],
        origins=[origin.Origin(origin='poppy.org')],
        flavor_id='cdn',
    )
    enable_kwargs = {
        'project_id': json.dumps(str(uuid.uuid4())),
        'state': 'enable',
        'service_obj': json.dumps(service_obj.to_dict()),
        'time_seconds': [i * self.time_factor
                         for i in range(self.total_retries)],
        'context_dict': context_utils.RequestContext().to_dict(),
    }
    # Same store for the disable flow, differing only in 'state'.
    disable_kwargs = enable_kwargs.copy()
    disable_kwargs['state'] = 'disable'

    (
        service_controller,
        storage_controller,
        dns_controller,
        ssl_cert_controller,
    ) = self.all_controllers()
    with MonkeyPatchControllers(service_controller,
                                dns_controller,
                                storage_controller,
                                ssl_cert_controller,
                                memoized_controllers.task_controllers):
        self.patch_service_state_flow(service_controller,
                                      storage_controller,
                                      dns_controller)
        dns_responder_returns = self.dns_exceptions_and_succeed()
        # Use Mock's public ``side_effect`` attribute rather than the
        # private ``_mock_side_effect`` slot. Each mock gets its own
        # iterator over the same responses object.
        # NOTE(review): enable/disable are not replaced with fresh Mocks
        # here; this assumes all_controllers() already returns mocks --
        # confirm.
        dns_controller.enable.side_effect = iter(dns_responder_returns)
        dns_controller.disable.side_effect = iter(dns_responder_returns)

        engines.run(update_service_state.enable_service(),
                    store=enable_kwargs)
        engines.run(update_service_state.disable_service(),
                    store=disable_kwargs)
开发者ID:openstack,项目名称:poppy,代码行数:51,代码来源:test_flows.py
示例19: test_update_flow_dns_exception_with_retry_and_fail
def test_update_flow_dns_exception_with_retry_and_fail(self):
    """Run the update-service flow with a DNS update that yields canned results.

    ``dns_controller.update`` is replaced by a ``Mock`` whose successive
    calls return the items produced by ``self.dns_exceptions_only()``.
    The test makes no explicit assertions; it passes when ``engines.run``
    returns.
    """
    service_id = str(uuid.uuid4())
    domains_old = domain.Domain(domain='cdn.poppy.org')
    domains_new = domain.Domain(domain='mycdn.poppy.org')
    current_origin = origin.Origin(origin='poppy.org')
    # The old and new service differ only in their domain list.
    service_old = service.Service(
        service_id=service_id,
        name='poppy cdn service',
        domains=[domains_old],
        origins=[current_origin],
        flavor_id='cdn',
    )
    service_new = service.Service(
        service_id=service_id,
        name='poppy cdn service',
        domains=[domains_new],
        origins=[current_origin],
        flavor_id='cdn',
    )
    kwargs = {
        'project_id': json.dumps(str(uuid.uuid4())),
        'auth_token': json.dumps(str(uuid.uuid4())),
        'service_id': json.dumps(service_id),
        'time_seconds': [i * self.time_factor
                         for i in range(self.total_retries)],
        'service_old': json.dumps(service_old.to_dict()),
        'service_obj': json.dumps(service_new.to_dict()),
        'context_dict': context_utils.RequestContext().to_dict(),
    }

    (
        service_controller,
        storage_controller,
        dns_controller,
        ssl_cert_controller,
    ) = self.all_controllers()
    with MonkeyPatchControllers(service_controller,
                                dns_controller,
                                storage_controller,
                                ssl_cert_controller,
                                memoized_controllers.task_controllers):
        self.patch_update_flow(service_controller, storage_controller,
                               dns_controller)
        dns_controller.update = mock.Mock()
        # Use Mock's public ``side_effect`` attribute rather than the
        # private ``_mock_side_effect`` slot; an iterator side effect
        # supplies one item per call.
        dns_controller.update.side_effect = iter(self.dns_exceptions_only())
        engines.run(update_service.update_service(), store=kwargs)
开发者ID:openstack,项目名称:poppy,代码行数:50,代码来源:test_flows.py
示例20: test_delete_flow_dns_exception
def test_delete_flow_dns_exception(self):
    """Run the delete-service flow with a DNS delete that reports an error.

    ``storage_controller.get_service`` is set to return a mock service
    whose ``domains`` is an empty list, and ``dns_controller.delete`` is
    replaced by a ``Mock`` returning an error payload for
    ``cdn_provider``. The test makes no explicit assertions; it passes
    when ``engines.run`` returns.
    """
    service_id = str(uuid.uuid4())
    service_obj = service.Service(
        service_id=service_id,
        name='poppy cdn service',
        domains=[domain.Domain(domain='cdn.poppy.org')],
        origins=[origin.Origin(origin='poppy.org')],
        flavor_id='cdn',
    )
    # Dict comprehension instead of dict() over a temporary list of pairs.
    provider_details = {
        name: detail.to_dict()
        for name, detail in service_obj.provider_details.items()
    }
    kwargs = {
        'project_id': json.dumps(str(uuid.uuid4())),
        'service_id': json.dumps(service_id),
        'time_seconds': [i * self.time_factor
                         for i in range(self.total_retries)],
        'provider_details': json.dumps(provider_details),
        'context_dict': context_utils.RequestContext().to_dict(),
    }

    (
        service_controller,
        storage_controller,
        dns_controller,
        ssl_cert_controller,
    ) = self.all_controllers()
    with MonkeyPatchControllers(service_controller,
                                dns_controller,
                                storage_controller,
                                ssl_cert_controller,
                                memoized_controllers.task_controllers):
        self.patch_delete_flow(service_controller, storage_controller,
                               dns_controller)
        service_mock = mock.Mock()
        # Set on the mock's type so ``domains`` resolves to a real empty
        # list rather than an auto-created child Mock.
        type(service_mock).domains = []
        storage_controller.get_service.return_value = service_mock

        dns_controller.delete = mock.Mock()
        # Configure the mock through its public ``return_value``
        # attribute instead of the private ``_mock_return_value`` slot.
        dns_controller.delete.return_value = {
            'cdn_provider': {
                'error': 'Whoops!',
                'error_class': 'tests.unit.distributed_task'
                               '.taskflow.test_flows.DNSException',
            }
        }
        engines.run(delete_service.delete_service(), store=kwargs)
开发者ID:openstack,项目名称:poppy,代码行数:50,代码来源:test_flows.py
注：本文中的 taskflow.engines.run 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论