• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python testenv.resource函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中testenv.resource函数的典型用法代码示例。如果您正苦于以下问题:Python resource函数的具体用法?Python resource怎么用?Python resource使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了resource函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _local_task_fail_impl

 def _local_task_fail_impl(self, wf_name):
     if self.do_get:
         deploy(resource('dsl/workflow_api.yaml'), wf_name,
                parameters={'do_get': self.do_get})
     else:
         self.assertRaises(RuntimeError,
                           deploy,
                           resource('dsl/workflow_api.yaml'),
                           wf_name,
                           parameters={'do_get': self.do_get})
开发者ID:boreys,项目名称:cloudify-manager,代码行数:10,代码来源:test_wf_api.py


示例2: _execute_and_cancel_execution

    def _execute_and_cancel_execution(self, workflow_id, force=False,
                                      wait_for_termination=True,
                                      is_wait_for_asleep_node=True):
        dsl_path = resource('dsl/sleep_workflows.yaml')
        _id = uuid.uuid1()
        blueprint_id = 'blueprint_{0}'.format(_id)
        deployment_id = 'deployment_{0}'.format(_id)
        self.client.blueprints.upload(dsl_path, blueprint_id)
        self.client.deployments.create(blueprint_id, deployment_id)
        do_retries(verify_workers_installation_complete, 30,
                   deployment_id=deployment_id)
        execution = self.client.deployments.execute(
            deployment_id, workflow_id)

        node_inst_id = self.client.node_instances.list(deployment_id)[0].id

        if is_wait_for_asleep_node:
            for retry in range(30):
                if self.client.node_instances.get(
                        node_inst_id).state == 'asleep':
                    break
                time.sleep(1)
            else:
                raise RuntimeError("Execution was expected to go"
                                   " into 'sleeping' status")

        execution = self.client.executions.cancel(execution.id, force)
        expected_status = Execution.FORCE_CANCELLING if force else \
            Execution.CANCELLING
        self.assertEquals(expected_status, execution.status)
        if wait_for_termination:
            wait_for_execution_to_end(execution)
            execution = self.client.executions.get(execution.id)
        return execution
开发者ID:mahak,项目名称:cloudify-manager,代码行数:34,代码来源:test_executions.py


示例3: test_illegal_non_graph_to_graph_mode

 def test_illegal_non_graph_to_graph_mode(self):
     if not self.do_get:
         # no need to run twice
         return
     self.assertRaises(RuntimeError, deploy,
                       resource('dsl/workflow_api.yaml'),
                       self._testMethodName)
开发者ID:boreys,项目名称:cloudify-manager,代码行数:7,代码来源:test_wf_api.py


示例4: test_cancel_on_wait_for_task_termination

 def test_cancel_on_wait_for_task_termination(self):
     _, eid = deploy(
         resource('dsl/workflow_api.yaml'), self._testMethodName,
         parameters={'do_get': self.do_get}, wait_for_execution=False)
     self.wait_for_execution_status(eid, status=Execution.STARTED)
     self.client.executions.cancel(eid)
     self.wait_for_execution_status(eid, status=Execution.CANCELLED)
开发者ID:boreys,项目名称:cloudify-manager,代码行数:7,代码来源:test_wf_api.py


示例5: test_policies

    def test_policies(self):
        dsl_path = resource("dsl/with_policies.yaml")
        deployment, _ = deploy(dsl_path)

        def assertion():
            instances = self.client.node_instances.list(deployment.id)
            self.assertEqual(1, len(instances))
        self.do_assertions(assertion)

        instance = self.client.node_instances.list(deployment.id)[0]
        self.publish_riemann_event(deployment.id,
                                   node_name='node',
                                   node_id=instance.id,
                                   metric=123)

        def assertion():
            executions = self.client.executions.list(deployment.id)
            self.assertEqual(3, len(executions))
            invocations = send_task(testmock_get_invocations).get(timeout=10)
            self.assertEqual(2, len(invocations))
            instances = self.client.node_instances.list(deployment.id)
            self.assertEqual(1, len(instances))
            instance = instances[0]
            self.assertEqual(instance.id, invocations[0]['node_id'])
            self.assertEqual(123, invocations[1]['metric'])
        self.do_assertions(assertion)
开发者ID:mahak,项目名称:cloudify-manager,代码行数:26,代码来源:test_policies.py


示例6: test_invalid_dsl

    def test_invalid_dsl(self):
        # note: this actually tests the validation part of the "deploy" command
        dsl_path = resource("dsl/invalid-dsl.yaml")

        with self.assertRaises(Exception) as cm:
            deploy(dsl_path)
            self.assertTrue('invalid blueprint' in
                            cm.exception.message.lower(), cm.exception.message)
开发者ID:Fewbytes,项目名称:cloudify-manager,代码行数:8,代码来源:test_validate_dsl.py


示例7: test_cancel_on_task_retry_interval

 def test_cancel_on_task_retry_interval(self):
     self.configure(retries=2, interval=1000000)
     _, eid = deploy(
         resource('dsl/workflow_api.yaml'), self._testMethodName,
         parameters={'do_get': self.do_get}, wait_for_execution=False)
     self.wait_for_execution_status(eid, status=Execution.STARTED)
     self.client.executions.cancel(eid)
     self.wait_for_execution_status(eid, status=Execution.CANCELLED)
开发者ID:boreys,项目名称:cloudify-manager,代码行数:8,代码来源:test_wf_api.py


示例8: setUp

 def setUp(self):
     super(RestAPITest, self).setUp()
     dsl_path = resource('dsl/basic.yaml')
     self.node_id = 'webserver_host'
     self.blueprint_id = 'blueprint-' + str(uuid.uuid4())
     self.deployment_id = 'deployment-' + str(uuid.uuid4())
     self.client.blueprints.upload(dsl_path, self.blueprint_id)
     self.client.deployments.create(self.blueprint_id, self.deployment_id)
开发者ID:boreys,项目名称:cloudify-manager,代码行数:8,代码来源:test_rest_api.py


示例9: test_operation_mapping_override

 def test_operation_mapping_override(self):
     dsl_path = resource("dsl/operation_mapping.yaml")
     deployment, _ = deploy(dsl_path, 'workflow2')
     invocations = send_task(get_mock_operation_invocations).get()
     self.assertEqual(3, len(invocations))
     for invocation in invocations:
         self.assertEqual(1, len(invocation))
         self.assertEqual(invocation['test_key'], 'overridden_test_value')
开发者ID:boreys,项目名称:cloudify-manager,代码行数:8,代码来源:test_operation_mapping.py


示例10: test_fail_remote_task_eventual_success

    def test_fail_remote_task_eventual_success(self):
        deploy(resource('dsl/workflow_api.yaml'), self._testMethodName,
               parameters={'do_get': self.do_get})

        # testing workflow remote task
        invocations = send_task(get_fail_invocations).get()
        self.assertEqual(3, len(invocations))
        for i in range(len(invocations) - 1):
            self.assertLessEqual(1, invocations[i+1] - invocations[i])
开发者ID:boreys,项目名称:cloudify-manager,代码行数:9,代码来源:test_wf_api.py


示例11: test_get_blueprint

    def test_get_blueprint(self):
        dsl_path = resource("dsl/basic.yaml")
        blueprint_id = str(uuid.uuid4())
        deployment, _ = deploy(dsl_path, blueprint_id=blueprint_id)

        self.assertEqual(blueprint_id, deployment.blueprint_id)
        blueprint = self.client.blueprints.get(blueprint_id)
        self.assertEqual(blueprint_id, blueprint.id)
        self.assertTrue(len(blueprint['plan']) > 0)
开发者ID:boreys,项目名称:cloudify-manager,代码行数:9,代码来源:test_workflow.py


示例12: _test_retries_and_retry_interval_impl

 def _test_retries_and_retry_interval_impl(self,
                                           blueprint,
                                           retries,
                                           retry_interval,
                                           expected_interval,
                                           expected_retries,
                                           invocations_task,
                                           expect_failure=False):
     self.configure(retries=retries, retry_interval=retry_interval)
     if expect_failure:
         self.assertRaises(RuntimeError, deploy, resource(blueprint))
     else:
         deploy(resource(blueprint))
     invocations = send_task(invocations_task).get()
     self.assertEqual(expected_retries + 1, len(invocations))
     for i in range(len(invocations) - 1):
         self.assertLessEqual(expected_interval,
                              invocations[i+1] - invocations[i])
开发者ID:boreys,项目名称:cloudify-manager,代码行数:18,代码来源:test_task_retries.py


示例13: test_execute_operation_failure

 def test_execute_operation_failure(self):
     from plugins.cloudmock.tasks import set_raise_exception_on_start
     send_task(set_raise_exception_on_start).get(timeout=10)
     dsl_path = resource("dsl/basic.yaml")
     try:
         deploy(dsl_path)
         self.fail('expected exception')
     except Exception:
         pass
开发者ID:boreys,项目名称:cloudify-manager,代码行数:9,代码来源:test_workflow.py


示例14: test_start_monitor_node_operation

 def test_start_monitor_node_operation(self):
     dsl_path = resource("dsl/hardcoded_operation_properties.yaml")
     deploy(dsl_path)
     from plugins.testmockoperations.tasks import \
         get_monitoring_operations_invocation
     invocations = send_task(get_monitoring_operations_invocation)\
         .get(timeout=10)
     self.assertEqual(1, len(invocations))
     invocation = invocations[0]
     self.assertEqual('start_monitor', invocation['operation'])
开发者ID:boreys,项目名称:cloudify-manager,代码行数:10,代码来源:test_workflow.py


示例15: test_deployment_workflows

 def test_deployment_workflows(self):
     dsl_path = resource("dsl/custom_workflow_mapping.yaml")
     deployment, _ = deploy(dsl_path)
     deployment_id = deployment.id
     workflows = self.client.deployments.get(deployment_id).workflows
     self.assertEqual(3, len(workflows))
     wf_ids = [x.name for x in workflows]
     self.assertTrue('uninstall' in wf_ids)
     self.assertTrue('install' in wf_ids)
     self.assertTrue('custom' in wf_ids)
开发者ID:mahak,项目名称:cloudify-manager,代码行数:10,代码来源:test_deployment_workflows.py


示例16: test_plugin_get_resource

    def test_plugin_get_resource(self):
        dsl_path = resource("dsl/get_resource_in_plugin.yaml")
        deploy(dsl_path)
        from plugins.testmockoperations.tasks import \
            get_resource_operation_invocations as testmock_get_invocations
        invocations = send_task(testmock_get_invocations).get(
            timeout=10)
        self.assertEquals(1, len(invocations))
        invocation = invocations[0]
        with open(resource("dsl/basic.yaml")) as f:
            basic_data = f.read()

        # checking the resources are the correct data
        self.assertEquals(basic_data, invocation['res1_data'])
        self.assertEquals(basic_data, invocation['res2_data'])

        # checking the custom filepath provided is indeed where the second
        # resource was saved
        self.assertEquals(invocation['custom_filepath'],
                          invocation['res2_path'])
开发者ID:boreys,项目名称:cloudify-manager,代码行数:20,代码来源:test_workflow.py


示例17: test_execute_operation

    def test_execute_operation(self):
        dsl_path = resource("dsl/basic.yaml")
        blueprint_id = self.id()
        deployment, _ = deploy(dsl_path, blueprint_id=blueprint_id)

        self.assertEqual(blueprint_id, deployment.blueprint_id)

        from plugins.cloudmock.tasks import get_machines
        result = send_task(get_machines)
        machines = result.get(timeout=10)

        self.assertEquals(1, len(machines))
开发者ID:mahak,项目名称:cloudify-manager,代码行数:12,代码来源:test_workflow.py


示例18: test_inject_properties_to_operation

 def test_inject_properties_to_operation(self):
     dsl_path = resource("dsl/hardcoded_operation_properties.yaml")
     deploy(dsl_path)
     from plugins.testmockoperations.tasks import get_state as \
         testmock_get_state
     states = send_task(testmock_get_state).get(timeout=10)
     from plugins.testmockoperations.tasks import \
         get_mock_operation_invocations as testmock_get__invocations
     invocations = send_task(testmock_get__invocations).get(timeout=10)
     self.assertEqual(1, len(invocations))
     invocation = invocations[0]
     self.assertEqual('mockpropvalue', invocation['mockprop'])
     self.assertEqual(states[0]['id'], invocation['id'])
开发者ID:boreys,项目名称:cloudify-manager,代码行数:13,代码来源:test_workflow.py


示例19: test_deployment_inputs

 def test_deployment_inputs(self):
     blueprint_id = str(uuid.uuid4())
     blueprint = self.client.blueprints.upload(resource("dsl/basic.yaml"), blueprint_id)
     inputs = blueprint.plan["inputs"]
     self.assertEqual(1, len(inputs))
     self.assertTrue("install_agent" in inputs)
     self.assertFalse(inputs["install_agent"]["default"])
     self.assertTrue(len(inputs["install_agent"]["description"]) > 0)
     deployment_id = str(uuid.uuid4())
     deployment = self.client.deployments.create(blueprint.id, deployment_id)
     self.assertEqual(1, len(deployment.inputs))
     self.assertTrue("install_agent" in deployment.inputs)
     self.assertFalse(deployment.inputs["install_agent"])
开发者ID:boreys,项目名称:cloudify-manager,代码行数:13,代码来源:test_storage.py


示例20: test_threshold_policy

    def test_threshold_policy(self):
        dsl_path = resource("dsl/with_policies2.yaml")
        deployment, _ = deploy(dsl_path)
        self.deployment_id = deployment.id
        self.instance_id = self.wait_for_node_instance().id

        class Tester(object):
            def __init__(self, test_case, threshold, current_executions, current_invocations):
                self.test_case = test_case
                self.current_invocations = current_invocations
                self.current_executions = current_executions
                self.threshold = threshold

            def publish_above_threshold(self, do_assert):
                self.test_case.logger.info("Publish above threshold")
                self.test_case.publish(self.threshold + 1)
                if do_assert:
                    self.inc()
                    self.assertion(upper=True)

            def publish_below_threshold(self, do_assert):
                self.test_case.logger.info("Publish below threshold")
                self.test_case.publish(self.threshold - 1)
                if do_assert:
                    self.inc()
                    self.assertion(upper=False)

            def inc(self):
                self.current_executions += 1
                self.current_invocations += 1

            def assertion(self, upper):
                self.test_case.logger.info("waiting for {} executions".format(self.current_executions))
                self.test_case.wait_for_executions(self.current_executions)
                self.test_case.logger.info("waiting for {} invocations".format(self.current_invocations))
                invocations = self.test_case.wait_for_invocations(self.current_invocations)
                if upper:
                    key = "upper"
                    value = self.threshold + 1
                else:
                    key = "lower"
                    value = self.threshold - 1
                self.test_case.assertEqual(invocations[-1][key], value, "key: {}, expected: {}".format(key, value))

        tester = Tester(test_case=self, threshold=100, current_executions=2, current_invocations=0)

        for _ in range(2):
            tester.publish_above_threshold(do_assert=True)
            tester.publish_above_threshold(do_assert=False)
            tester.publish_below_threshold(do_assert=True)
            tester.publish_below_threshold(do_assert=False)
开发者ID:boreys,项目名称:cloudify-manager,代码行数:51,代码来源:test_policies.py



注:本文中的testenv.resource函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utils.deploy函数代码示例发布时间:2022-05-27
下一篇:
Python testenv.main函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap