本文整理汇总了Python中testenv.resource函数的典型用法代码示例。如果您正苦于以下问题：Python resource函数的具体用法？Python resource怎么用？Python resource使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了resource函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _local_task_fail_impl
def _local_task_fail_impl(self, wf_name):
    """Deploy the workflow API blueprint using the workflow ``wf_name``.

    When ``self.do_get`` is truthy the deployment is simply executed;
    otherwise the same ``deploy`` call is expected to raise
    ``RuntimeError``. In both cases ``do_get`` is forwarded as a workflow
    parameter.
    """
    blueprint_path = resource('dsl/workflow_api.yaml')
    workflow_params = {'do_get': self.do_get}
    if not self.do_get:
        self.assertRaises(RuntimeError, deploy, blueprint_path, wf_name,
                          parameters=workflow_params)
        return
    deploy(blueprint_path, wf_name, parameters=workflow_params)
开发者ID:boreys,项目名称:cloudify-manager,代码行数:10,代码来源:test_wf_api.py
示例2: _execute_and_cancel_execution
def _execute_and_cancel_execution(self, workflow_id, force=False,
                                  wait_for_termination=True,
                                  is_wait_for_asleep_node=True):
    """Start ``workflow_id`` on a fresh deployment, then cancel it.

    A blueprint and a deployment with ids derived from one ``uuid1`` value
    are created from ``dsl/sleep_workflows.yaml`` and the workflow is
    executed on that deployment.

    :param workflow_id: id of the workflow to execute and cancel.
    :param force: passed to ``executions.cancel``; also selects whether
        ``FORCE_CANCELLING`` or ``CANCELLING`` is the expected status.
    :param wait_for_termination: when true, wait for the execution to end
        and return a freshly fetched execution object.
    :param is_wait_for_asleep_node: when true, poll the first node
        instance (up to 30 times, one second apart) until its state is
        ``'asleep'`` before cancelling.
    :return: the execution object as returned by the REST client.
    :raises RuntimeError: if the node instance never reaches ``'asleep'``.
    """
    dsl_path = resource('dsl/sleep_workflows.yaml')
    _id = uuid.uuid1()
    blueprint_id = 'blueprint_{0}'.format(_id)
    deployment_id = 'deployment_{0}'.format(_id)
    self.client.blueprints.upload(dsl_path, blueprint_id)
    self.client.deployments.create(blueprint_id, deployment_id)
    do_retries(verify_workers_installation_complete, 30,
               deployment_id=deployment_id)
    execution = self.client.deployments.execute(
        deployment_id, workflow_id)

    node_inst_id = self.client.node_instances.list(deployment_id)[0].id

    if is_wait_for_asleep_node:
        # for/else: the else branch runs only when no iteration hit `break`,
        # i.e. the node never reported the 'asleep' state.
        for _ in range(30):
            if self.client.node_instances.get(
                    node_inst_id).state == 'asleep':
                break
            time.sleep(1)
        else:
            raise RuntimeError("Execution was expected to go"
                               " into 'sleeping' status")

    execution = self.client.executions.cancel(execution.id, force)
    expected_status = (Execution.FORCE_CANCELLING if force
                       else Execution.CANCELLING)
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(expected_status, execution.status)
    if wait_for_termination:
        wait_for_execution_to_end(execution)
        # NOTE(review): indentation was lost in the source listing; the
        # re-fetch is assumed to belong to the wait branch - confirm.
        execution = self.client.executions.get(execution.id)
    return execution
开发者ID:mahak,项目名称:cloudify-manager,代码行数:34,代码来源:test_executions.py
示例3: test_illegal_non_graph_to_graph_mode
def test_illegal_non_graph_to_graph_mode(self):
    """Expect ``RuntimeError`` when deploying with this test's workflow.

    The workflow name passed to ``deploy`` is this test method's own name.
    The check is performed only when ``self.do_get`` is truthy.
    """
    # no need to run twice
    if self.do_get:
        blueprint_path = resource('dsl/workflow_api.yaml')
        self.assertRaises(RuntimeError, deploy, blueprint_path,
                          self._testMethodName)
开发者ID:boreys,项目名称:cloudify-manager,代码行数:7,代码来源:test_wf_api.py
示例4: test_cancel_on_wait_for_task_termination
def test_cancel_on_wait_for_task_termination(self):
    """Cancel a started execution and wait for it to become CANCELLED.

    The blueprint is deployed without waiting for the execution; the test
    then waits for ``STARTED``, requests cancellation through the REST
    client and waits for ``CANCELLED``.
    """
    blueprint_path = resource('dsl/workflow_api.yaml')
    _, execution_id = deploy(
        blueprint_path,
        self._testMethodName,
        parameters={'do_get': self.do_get},
        wait_for_execution=False)
    self.wait_for_execution_status(execution_id, status=Execution.STARTED)
    self.client.executions.cancel(execution_id)
    self.wait_for_execution_status(execution_id,
                                   status=Execution.CANCELLED)
开发者ID:boreys,项目名称:cloudify-manager,代码行数:7,代码来源:test_wf_api.py
示例5: test_policies
def test_policies(self):
    """Publish a Riemann event for a deployed node and verify the outcome.

    After deploying ``dsl/with_policies.yaml`` the test checks (through
    ``self.do_assertions``) that exactly one node instance exists,
    publishes an event with ``metric=123`` for that instance, and then
    checks the resulting execution and invocation counts and contents.
    """
    deployment, _ = deploy(resource("dsl/with_policies.yaml"))

    def assertion():
        # Exactly one node instance must exist for the deployment.
        found = self.client.node_instances.list(deployment.id)
        self.assertEqual(1, len(found))

    self.do_assertions(assertion)

    instance = self.client.node_instances.list(deployment.id)[0]
    self.publish_riemann_event(deployment.id,
                               node_name='node',
                               node_id=instance.id,
                               metric=123)

    def assertion():
        # Three executions and two recorded invocations are expected once
        # the published event has been processed.
        execution_count = len(self.client.executions.list(deployment.id))
        self.assertEqual(3, execution_count)
        recorded = send_task(testmock_get_invocations).get(timeout=10)
        self.assertEqual(2, len(recorded))
        node_instances = self.client.node_instances.list(deployment.id)
        self.assertEqual(1, len(node_instances))
        only_instance = node_instances[0]
        self.assertEqual(only_instance.id, recorded[0]['node_id'])
        self.assertEqual(123, recorded[1]['metric'])

    self.do_assertions(assertion)
开发者ID:mahak,项目名称:cloudify-manager,代码行数:26,代码来源:test_policies.py
示例6: test_invalid_dsl
def test_invalid_dsl(self):
    """Deploying an invalid blueprint must fail with a descriptive error.

    The raised exception's text has to contain ``'invalid blueprint'``
    (compared case-insensitively).
    """
    # note: this actually tests the validation part of the "deploy" command
    dsl_path = resource("dsl/invalid-dsl.yaml")
    with self.assertRaises(Exception) as cm:
        deploy(dsl_path)
    # Built-in exceptions have no ``.message`` attribute on Python 3, so
    # use it only when present and fall back to the string form otherwise.
    error_text = getattr(cm.exception, 'message', None) or str(cm.exception)
    self.assertIn('invalid blueprint', error_text.lower(), error_text)
开发者ID:Fewbytes,项目名称:cloudify-manager,代码行数:8,代码来源:test_validate_dsl.py
示例7: test_cancel_on_task_retry_interval
def test_cancel_on_task_retry_interval(self):
    """Cancel an execution after configuring 2 retries and a huge interval.

    The blueprint is deployed without waiting for the execution; the test
    waits for ``STARTED``, cancels the execution and waits for
    ``CANCELLED``.
    """
    self.configure(retries=2, interval=1000000)
    blueprint_path = resource('dsl/workflow_api.yaml')
    _, execution_id = deploy(
        blueprint_path,
        self._testMethodName,
        parameters={'do_get': self.do_get},
        wait_for_execution=False)
    self.wait_for_execution_status(execution_id, status=Execution.STARTED)
    self.client.executions.cancel(execution_id)
    self.wait_for_execution_status(execution_id,
                                   status=Execution.CANCELLED)
开发者ID:boreys,项目名称:cloudify-manager,代码行数:8,代码来源:test_wf_api.py
示例8: setUp
def setUp(self):
    """Upload the basic blueprint and create a deployment from it.

    Random UUID-based blueprint and deployment ids are stored on the test
    instance together with the fixed node id ``'webserver_host'``.
    """
    super(RestAPITest, self).setUp()
    basic_blueprint = resource('dsl/basic.yaml')
    self.node_id = 'webserver_host'
    self.blueprint_id = 'blueprint-{0}'.format(uuid.uuid4())
    self.deployment_id = 'deployment-{0}'.format(uuid.uuid4())
    self.client.blueprints.upload(basic_blueprint, self.blueprint_id)
    self.client.deployments.create(self.blueprint_id, self.deployment_id)
开发者ID:boreys,项目名称:cloudify-manager,代码行数:8,代码来源:test_rest_api.py
示例9: test_operation_mapping_override
def test_operation_mapping_override(self):
    """Check the invocations recorded after deploying with ``workflow2``.

    Three invocations are expected; each must hold exactly one entry whose
    ``'test_key'`` value is ``'overridden_test_value'``.
    """
    blueprint_path = resource("dsl/operation_mapping.yaml")
    _deployment, _ = deploy(blueprint_path, 'workflow2')
    recorded = send_task(get_mock_operation_invocations).get()
    self.assertEqual(3, len(recorded))
    for entry in recorded:
        self.assertEqual(1, len(entry))
        self.assertEqual(entry['test_key'], 'overridden_test_value')
开发者ID:boreys,项目名称:cloudify-manager,代码行数:8,代码来源:test_operation_mapping.py
示例10: test_fail_remote_task_eventual_success
def test_fail_remote_task_eventual_success(self):
    """Deploy with this test's workflow and inspect the fail invocations.

    Three recorded values are expected, and every value must exceed its
    predecessor by at least 1.
    """
    deploy(resource('dsl/workflow_api.yaml'), self._testMethodName,
           parameters={'do_get': self.do_get})
    # testing workflow remote task
    recorded = send_task(get_fail_invocations).get()
    self.assertEqual(3, len(recorded))
    # Walk over consecutive pairs instead of indexing by position.
    for previous, following in zip(recorded, recorded[1:]):
        self.assertLessEqual(1, following - previous)
开发者ID:boreys,项目名称:cloudify-manager,代码行数:9,代码来源:test_wf_api.py
示例11: test_get_blueprint
def test_get_blueprint(self):
    """Deploy under a random blueprint id and fetch that blueprint back.

    The deployment must reference the chosen id, and the fetched blueprint
    must carry the same id and a non-empty ``'plan'`` entry.
    """
    blueprint_path = resource("dsl/basic.yaml")
    blueprint_id = str(uuid.uuid4())
    deployment, _ = deploy(blueprint_path, blueprint_id=blueprint_id)
    self.assertEqual(blueprint_id, deployment.blueprint_id)
    fetched = self.client.blueprints.get(blueprint_id)
    self.assertEqual(blueprint_id, fetched.id)
    plan = fetched['plan']
    self.assertTrue(len(plan) > 0)
开发者ID:boreys,项目名称:cloudify-manager,代码行数:9,代码来源:test_workflow.py
示例12: _test_retries_and_retry_interval_impl
def _test_retries_and_retry_interval_impl(self,
                                          blueprint,
                                          retries,
                                          retry_interval,
                                          expected_interval,
                                          expected_retries,
                                          invocations_task,
                                          expect_failure=False):
    """Deploy ``blueprint`` under a retry configuration and verify retries.

    :param blueprint: resource-relative path of the blueprint to deploy.
    :param retries: value passed to ``self.configure(retries=...)``.
    :param retry_interval: value passed to
        ``self.configure(retry_interval=...)``.
    :param expected_interval: minimum difference required between each
        pair of consecutive recorded invocation values.
    :param expected_retries: expected number of retries; the number of
        recorded invocations must be this value plus one.
    :param invocations_task: task sent via ``send_task`` to obtain the
        recorded invocations.
    :param expect_failure: when true, ``deploy`` must raise
        ``RuntimeError``.
    """
    self.configure(retries=retries, retry_interval=retry_interval)
    blueprint_path = resource(blueprint)
    if not expect_failure:
        deploy(blueprint_path)
    else:
        self.assertRaises(RuntimeError, deploy, blueprint_path)
    recorded = send_task(invocations_task).get()
    self.assertEqual(expected_retries + 1, len(recorded))
    # Compare each recorded value with the one that follows it.
    for previous, following in zip(recorded, recorded[1:]):
        self.assertLessEqual(expected_interval, following - previous)
开发者ID:boreys,项目名称:cloudify-manager,代码行数:18,代码来源:test_task_retries.py
示例13: test_execute_operation_failure
def test_execute_operation_failure(self):
    """Deploying must raise after the cloudmock plugin is told to fail.

    ``set_raise_exception_on_start`` is sent to the plugin first, then the
    basic blueprint is deployed and an exception is required.
    """
    from plugins.cloudmock.tasks import set_raise_exception_on_start
    send_task(set_raise_exception_on_start).get(timeout=10)
    dsl_path = resource("dsl/basic.yaml")
    # The previous try/except form called self.fail() inside the ``try``;
    # the resulting AssertionError is an Exception subclass and was
    # swallowed by ``except Exception: pass``, so the test could never
    # fail. assertRaises reports a missing exception correctly.
    with self.assertRaises(Exception):
        deploy(dsl_path)
开发者ID:boreys,项目名称:cloudify-manager,代码行数:9,代码来源:test_workflow.py
示例14: test_start_monitor_node_operation
def test_start_monitor_node_operation(self):
    """Verify a single ``start_monitor`` operation invocation is recorded.

    The invocations are fetched from the testmockoperations plugin after
    deploying ``dsl/hardcoded_operation_properties.yaml``.
    """
    deploy(resource("dsl/hardcoded_operation_properties.yaml"))
    from plugins.testmockoperations.tasks import (
        get_monitoring_operations_invocation)
    pending_result = send_task(get_monitoring_operations_invocation)
    recorded = pending_result.get(timeout=10)
    self.assertEqual(1, len(recorded))
    self.assertEqual('start_monitor', recorded[0]['operation'])
开发者ID:boreys,项目名称:cloudify-manager,代码行数:10,代码来源:test_workflow.py
示例15: test_deployment_workflows
def test_deployment_workflows(self):
    """Check the workflows exposed by a deployment.

    The deployment created from ``dsl/custom_workflow_mapping.yaml`` must
    list exactly three workflows named ``uninstall``, ``install`` and
    ``custom``.
    """
    deployment, _ = deploy(resource("dsl/custom_workflow_mapping.yaml"))
    workflows = self.client.deployments.get(deployment.id).workflows
    self.assertEqual(3, len(workflows))
    workflow_names = [workflow.name for workflow in workflows]
    for expected_name in ('uninstall', 'install', 'custom'):
        self.assertTrue(expected_name in workflow_names)
开发者ID:mahak,项目名称:cloudify-manager,代码行数:10,代码来源:test_deployment_workflows.py
示例16: test_plugin_get_resource
def test_plugin_get_resource(self):
    """Verify the resource data and path recorded by the plugin.

    After deploying ``dsl/get_resource_in_plugin.yaml`` a single invocation
    is expected. Its ``res1_data`` and ``res2_data`` must equal the content
    of ``dsl/basic.yaml`` and its ``res2_path`` must equal the
    ``custom_filepath`` it reports.
    """
    dsl_path = resource("dsl/get_resource_in_plugin.yaml")
    deploy(dsl_path)
    from plugins.testmockoperations.tasks import (
        get_resource_operation_invocations as testmock_get_invocations)
    invocations = send_task(testmock_get_invocations).get(
        timeout=10)
    # assertEqual replaces the deprecated assertEquals alias throughout.
    self.assertEqual(1, len(invocations))
    invocation = invocations[0]
    with open(resource("dsl/basic.yaml")) as f:
        basic_data = f.read()

    # checking the resources are the correct data
    self.assertEqual(basic_data, invocation['res1_data'])
    self.assertEqual(basic_data, invocation['res2_data'])

    # checking the custom filepath provided is indeed where the second
    # resource was saved
    self.assertEqual(invocation['custom_filepath'],
                     invocation['res2_path'])
开发者ID:boreys,项目名称:cloudify-manager,代码行数:20,代码来源:test_workflow.py
示例17: test_execute_operation
def test_execute_operation(self):
    """Deploy the basic blueprint and expect one machine from cloudmock.

    The blueprint id is this test's own id (``self.id()``); the resulting
    deployment must reference it.
    """
    dsl_path = resource("dsl/basic.yaml")
    blueprint_id = self.id()
    deployment, _ = deploy(dsl_path, blueprint_id=blueprint_id)
    self.assertEqual(blueprint_id, deployment.blueprint_id)

    from plugins.cloudmock.tasks import get_machines
    result = send_task(get_machines)
    machines = result.get(timeout=10)
    # assertEqual replaces the deprecated assertEquals alias and matches
    # the assertion used above.
    self.assertEqual(1, len(machines))
开发者ID:mahak,项目名称:cloudify-manager,代码行数:12,代码来源:test_workflow.py
示例18: test_inject_properties_to_operation
def test_inject_properties_to_operation(self):
    """Check the properties seen by the recorded mock operation.

    After deploying ``dsl/hardcoded_operation_properties.yaml`` a single
    invocation is expected; its ``mockprop`` must be ``'mockpropvalue'``
    and its ``id`` must equal the ``id`` of the first state entry reported
    by the plugin.
    """
    deploy(resource("dsl/hardcoded_operation_properties.yaml"))
    from plugins.testmockoperations.tasks import (
        get_state as testmock_get_state)
    states = send_task(testmock_get_state).get(timeout=10)
    from plugins.testmockoperations.tasks import (
        get_mock_operation_invocations as testmock_get__invocations)
    recorded = send_task(testmock_get__invocations).get(timeout=10)
    self.assertEqual(1, len(recorded))
    only_invocation = recorded[0]
    self.assertEqual('mockpropvalue', only_invocation['mockprop'])
    self.assertEqual(states[0]['id'], only_invocation['id'])
开发者ID:boreys,项目名称:cloudify-manager,代码行数:13,代码来源:test_workflow.py
示例19: test_deployment_inputs
def test_deployment_inputs(self):
    """Check the inputs of an uploaded blueprint and of its deployment.

    The blueprint plan must declare exactly one input, ``install_agent``,
    with a falsy default and a non-empty description. A deployment created
    from it must expose exactly that one input with a falsy value.
    """
    blueprint_id = str(uuid.uuid4())
    blueprint = self.client.blueprints.upload(
        resource("dsl/basic.yaml"), blueprint_id)
    plan_inputs = blueprint.plan["inputs"]
    self.assertEqual(1, len(plan_inputs))
    self.assertTrue("install_agent" in plan_inputs)
    agent_input = plan_inputs["install_agent"]
    self.assertFalse(agent_input["default"])
    self.assertTrue(len(agent_input["description"]) > 0)

    deployment_id = str(uuid.uuid4())
    deployment = self.client.deployments.create(
        blueprint.id, deployment_id)
    deployment_inputs = deployment.inputs
    self.assertEqual(1, len(deployment_inputs))
    self.assertTrue("install_agent" in deployment_inputs)
    self.assertFalse(deployment_inputs["install_agent"])
开发者ID:boreys,项目名称:cloudify-manager,代码行数:13,代码来源:test_storage.py
示例20: test_threshold_policy
def test_threshold_policy(self):
    """Publish values around a threshold and verify the recorded results.

    Deploys ``dsl/with_policies2.yaml``, stores the deployment id and the
    id of the awaited node instance on the test case, then drives a local
    ``Tester`` helper that publishes values just above and just below a
    threshold of 100.
    """
    dsl_path = resource("dsl/with_policies2.yaml")
    deployment, _ = deploy(dsl_path)
    self.deployment_id = deployment.id
    self.instance_id = self.wait_for_node_instance().id

    class Tester(object):
        # Keeps the expected execution/invocation counters and publishes
        # values through the owning test case.
        def __init__(self, test_case, threshold, current_executions, current_invocations):
            self.test_case = test_case
            self.current_invocations = current_invocations
            self.current_executions = current_executions
            self.threshold = threshold

        def publish_above_threshold(self, do_assert):
            # Publishes threshold + 1; with do_assert the expected counters
            # are incremented and checked.
            # NOTE(review): indentation was lost in the source listing; both
            # statements are assumed to be guarded by `if do_assert`.
            self.test_case.logger.info("Publish above threshold")
            self.test_case.publish(self.threshold + 1)
            if do_assert:
                self.inc()
                self.assertion(upper=True)

        def publish_below_threshold(self, do_assert):
            # Publishes threshold - 1; same structure as
            # publish_above_threshold.
            self.test_case.logger.info("Publish below threshold")
            self.test_case.publish(self.threshold - 1)
            if do_assert:
                self.inc()
                self.assertion(upper=False)

        def inc(self):
            # One more execution and one more invocation are expected.
            self.current_executions += 1
            self.current_invocations += 1

        def assertion(self, upper):
            # Waits for the expected counts, then checks that the last
            # invocation stores the published value under "upper"/"lower".
            self.test_case.logger.info("waiting for {} executions".format(self.current_executions))
            self.test_case.wait_for_executions(self.current_executions)
            self.test_case.logger.info("waiting for {} invocations".format(self.current_invocations))
            invocations = self.test_case.wait_for_invocations(self.current_invocations)
            if upper:
                key = "upper"
                value = self.threshold + 1
            else:
                key = "lower"
                value = self.threshold - 1
            self.test_case.assertEqual(invocations[-1][key], value, "key: {}, expected: {}".format(key, value))

    tester = Tester(test_case=self, threshold=100, current_executions=2, current_invocations=0)
    # NOTE(review): indentation was lost in the source listing; all four
    # publish calls are assumed to belong to the loop body - confirm.
    for _ in range(2):
        tester.publish_above_threshold(do_assert=True)
        tester.publish_above_threshold(do_assert=False)
        tester.publish_below_threshold(do_assert=True)
        tester.publish_below_threshold(do_assert=False)
开发者ID:boreys,项目名称:cloudify-manager,代码行数:51,代码来源:test_policies.py
注：本文中的testenv.resource函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论