• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python utils.update_storage函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中testenv.utils.update_storage函数的典型用法代码示例。如果您正苦于以下问题:Python update_storage函数的具体用法?Python update_storage怎么用?Python update_storage使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了update_storage函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: terminate

def terminate(**kwargs):
    with update_storage(ctx) as data:
        ctx.logger.info('terminating machine: {0}'.format(ctx.node_id))
        if ctx.node_id not in data['machines']:
            raise RuntimeError('machine with id [{0}] does not exist'
                               .format(ctx.node_id))
        del data['machines'][ctx.node_id]
开发者ID:filipposc5,项目名称:cloudify-manager,代码行数:7,代码来源:tasks.py


示例2: saving_operation_info

def saving_operation_info(ctx, op, main_node, second_node=None, **_):
    with update_storage(ctx) as data:
        invocations = data['mock_operation_invocation'] = data.get(
            'mock_operation_invocation', []
        )
        num = data.get('num', 0) + 1
        data['num'] = num

        op_info = {'operation': op, 'num': num}
        if second_node is None:
            op_info.update({
                'node': main_node.node.name,
                'id': main_node.instance.id,
                'target_ids': [r.target.instance.id
                               for r in main_node.instance.relationships]
            })
        else:
            op_info.update({
                'id': main_node.instance.id,
                'source': main_node.node.name,
                'target': second_node.node.name
            })
        invocations.append(op_info)

    client = get_rest_client()
    fail_input = client.deployments.get(ctx.deployment.id).inputs.get(
        'fail', [])
    fail_input = [i for i in fail_input if
                  i.get('workflow') == ctx.workflow_id and
                  i.get('node') == main_node.node.id and
                  i.get('operation') == ctx.operation.name]
    if fail_input:
        raise RuntimeError('TEST_EXPECTED_FAIL')
开发者ID:isaac-s,项目名称:cloudify-manager,代码行数:33,代码来源:tasks.py


示例3: get_resource_operation

def get_resource_operation(ctx, **kwargs):
    resource_path = get_prop('resource_path', ctx, kwargs)
    # trying to retrieve a resource
    res1 = ctx.download_resource(resource_path)
    if not res1:
        raise RuntimeError('Failed to get resource {0}'.format(resource_path))
    with open(res1, 'r') as f:
        res1_data = f.read()
    os.remove(res1)

    # trying to retrieve a resource to a specific location
    tempdir = tempfile.mkdtemp()
    try:
        filepath = os.path.join(tempdir, 'temp-resource-file')
        res2 = ctx.download_resource(resource_path, filepath)
        if not res2:
            raise RuntimeError('Failed to get resource {0} into {1}'.format(
                resource_path, filepath))
        with open(res2, 'r') as f:
            res2_data = f.read()
    finally:
        shutil.rmtree(tempdir)

    with update_storage(ctx) as data:
        data['get_resource_operation_invocation'] = data.get(
            'get_resource_operation_invocation', []
        )
        data['get_resource_operation_invocation'].append({
            'res1_data': res1_data,
            'res2_data': res2_data,
            'custom_filepath': filepath,
            'res2_path': res2
        })
开发者ID:isaac-s,项目名称:cloudify-manager,代码行数:33,代码来源:tasks.py


示例4: make_unreachable

def make_unreachable(ctx, **kwargs):
    with update_storage(ctx) as data:
        data['unreachable_call_order'] = data.get('unreachable_call_order', [])
        data['unreachable_call_order'].append({
            'id': ctx.instance.id,
            'time': time.time()
        })
开发者ID:isaac-s,项目名称:cloudify-manager,代码行数:7,代码来源:tasks.py


示例5: stop

def stop(**kwargs):
    with update_storage(ctx) as data:
        ctx.logger.info('stopping machine: {0}'.format(ctx.node_id))
        if ctx.node_id not in data['machines']:
            raise RuntimeError('machine with id [{0}] does not exist'
                               .format(ctx.node_id))
        data['machines'][ctx.node_id] = NOT_RUNNING
开发者ID:filipposc5,项目名称:cloudify-manager,代码行数:7,代码来源:tasks.py


示例6: mock_operation_from_custom_workflow

def mock_operation_from_custom_workflow(ctx, key, value, **kwargs):
    with update_storage(ctx) as data:
        data['mock_operation_invocation'] = data.get(
            'mock_operation_invocation', []
        )
        data['mock_operation_invocation'].append({
            key: value
        })
开发者ID:zengxianhui,项目名称:cloudify-manager,代码行数:8,代码来源:tasks.py


示例7: restart

def restart(cloudify_agent=None, **_):
    installer = get_backend(cloudify_agent)
    worker_name = installer.agent_name
    installer.restart()
    with update_storage(ctx) as data:
        data[worker_name] = data.get(worker_name, {})
        data[worker_name]['states'] = data[worker_name].get('states', [])
        data[worker_name]['states'].append('restarted')
开发者ID:01000101,项目名称:cloudify-manager,代码行数:8,代码来源:operations.py


示例8: configure

def configure(cloudify_agent=None, **_):
    installer = get_backend(cloudify_agent)
    installer.create()
    worker_name = installer.agent_name
    with update_storage(ctx) as data:
        data[worker_name] = data.get(worker_name, {})
        data[worker_name]['states'] = data[worker_name].get('states', [])
        data[worker_name]['states'].append('configured')
开发者ID:01000101,项目名称:cloudify-manager,代码行数:8,代码来源:operations.py


示例9: uninstall

def uninstall(ctx, **kwargs):
    agent_config = _fix_worker(ctx, **kwargs)
    worker_name = agent_config['name']
    ctx.logger.info('Uninstalling worker {0}'.format(worker_name))
    with update_storage(ctx) as data:
        data[worker_name] = data.get(worker_name, {})
        data[worker_name]['states'] = data[worker_name].get('states', [])
        data[worker_name]['states'].append('uninstalled')
        data[worker_name]['pids'] = []
开发者ID:filipposc5,项目名称:cloudify-manager,代码行数:9,代码来源:tasks.py


示例10: stop_monitor

def stop_monitor(ctx, **kwargs):
    with update_storage(ctx) as data:
        data['monitoring_operations_invocation'] = data.get(
            'monitoring_operations_invocation', []
        )
        data['monitoring_operations_invocation'].append({
            'id': ctx.instance.id,
            'operation': 'stop_monitor'
        })
开发者ID:isaac-s,项目名称:cloudify-manager,代码行数:9,代码来源:tasks.py


示例11: uninstall

def uninstall(cloudify_agent=None, **kwargs):
    installer = get_backend(cloudify_agent)
    installer.uninstall()
    worker_name = installer.agent_name
    with update_storage(ctx) as data:
        data[worker_name] = data.get(worker_name, {})
        data[worker_name]['states'] = data[worker_name].get('states', [])
        data[worker_name]['states'].append('uninstalled')
        data[worker_name]['pids'] = []
开发者ID:nagyist,项目名称:cloudify-manager,代码行数:9,代码来源:tasks.py


示例12: mock_operation_get_instance_ip_from_context

def mock_operation_get_instance_ip_from_context(ctx, **_):
    with update_storage(ctx) as data:
        data['mock_operation_invocation'] = data.get(
            'mock_operation_invocation', []
        )
        data['mock_operation_invocation'].append((
            ctx.node.name, ctx.instance.host_ip
        ))

    return True
开发者ID:isaac-s,项目名称:cloudify-manager,代码行数:10,代码来源:tasks.py


示例13: provision

def provision(**kwargs):
    with update_storage(ctx) as data:
        machines = data.get('machines', {})
        if ctx.node_id in machines:
            raise NonRecoverableError('machine with id [{0}] already exists'
                                      .format(ctx.node_id))
        if ctx.properties.get('test_ip'):
            ctx.runtime_properties['ip'] = ctx.properties['test_ip']
        machines[ctx.node_id] = NOT_RUNNING
        data['machines'] = machines
开发者ID:filipposc5,项目名称:cloudify-manager,代码行数:10,代码来源:tasks.py


示例14: mock_operation_get_instance_ip_of_related_from_context

def mock_operation_get_instance_ip_of_related_from_context(ctx, **_):
    with update_storage(ctx) as data:
        data['mock_operation_invocation'] = data.get(
            'mock_operation_invocation', []
        )
        data['mock_operation_invocation'].append((
            '{}_rel'.format(ctx.node_name), ctx.related.host_ip
        ))

    return True
开发者ID:filipposc5,项目名称:cloudify-manager,代码行数:10,代码来源:tasks.py


示例15: mock_operation_get_instance_ip

def mock_operation_get_instance_ip(ctx, **kwargs):
    with update_storage(ctx) as data:
        data['mock_operation_invocation'] = data.get(
            'mock_operation_invocation', []
        )
        data['mock_operation_invocation'].append((
            ctx.node.name, get_node_instance_ip(ctx.instance.id)
        ))

    return True
开发者ID:isaac-s,项目名称:cloudify-manager,代码行数:10,代码来源:tasks.py


示例16: retrieve_template

def retrieve_template(ctx, rendering_tests_demo_conf, mode, **_):
    if mode == 'get':
        resource = \
            ctx.get_resource_and_render(rendering_tests_demo_conf)
    else:
        resource = \
            ctx.download_resource_and_render(rendering_tests_demo_conf)

    with update_storage(ctx) as data:
        data['rendered_resource'] = resource
开发者ID:isaac-s,项目名称:cloudify-manager,代码行数:10,代码来源:tasks.py


示例17: put_workflow_node_instance

def put_workflow_node_instance(ctx,
                               modification,
                               relationships):
    with update_storage(ctx) as data:
        state = data.get('state', {})
        data['state'] = state
        state[ctx.instance.id] = {
            'node_id': ctx.node.id,
            'modification': modification,
            'relationships': relationships
        }
开发者ID:isaac-s,项目名称:cloudify-manager,代码行数:11,代码来源:tasks.py


示例18: retry

def retry(ctx, retry_count=1, retry_after=1, **kwargs):
    with update_storage(ctx) as data:
        invocations = data.get('retry_invocations', 0)
        if invocations != ctx.operation.retry_number:
            raise NonRecoverableError(
                'invocations({0}) != ctx.operation.retry_number'
                '({1})'.format(invocations, ctx.operation.retry_number))
        data['retry_invocations'] = invocations + 1
    if ctx.operation.retry_number < retry_count:
        return ctx.operation.retry(message='Retrying operation',
                                   retry_after=retry_after)
开发者ID:isaac-s,项目名称:cloudify-manager,代码行数:11,代码来源:tasks.py


示例19: make_reachable

def make_reachable(ctx, **kwargs):
    state_info = {
        'id': ctx.instance.id,
        'time': time.time(),
        'capabilities': ctx.capabilities.get_all()
    }
    ctx.logger.info('Appending to state [node_id={0}, state={1}]'
                    .format(ctx.instance.id, state_info))
    with update_storage(ctx) as data:
        data['state'] = data.get('state', [])
        data['state'].append(state_info)
开发者ID:isaac-s,项目名称:cloudify-manager,代码行数:11,代码来源:tasks.py


示例20: delete

def delete(cloudify_agent=None, **_):
    installer = get_backend(cloudify_agent)
    installer.delete()
    worker_name = installer.agent_name
    with update_storage(ctx) as data:
        data[worker_name] = data.get(worker_name, {})
        data[worker_name]['states'] = data[worker_name].get('states', [])
        data[worker_name]['states'].append('deleted')

    if ctx.type == context.NODE_INSTANCE:
        del ctx.instance.runtime_properties['cloudify_agent']
开发者ID:KarnYong,项目名称:cloudify-manager,代码行数:11,代码来源:operations.py



注:本文中的testenv.utils.update_storage函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python tester.dump_classifier_and_data函数代码示例发布时间:2022-05-27
下一篇:
Python utils.resource函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap