• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python platform.get_services_core函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中volttron.platform.get_services_core函数的典型用法代码示例。如果您正苦于以下问题：Python get_services_core函数的具体用法？Python get_services_core怎么用？Python get_services_core使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_services_core函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: agent

def agent(request, volttron_instance):
    """Return a test agent after starting a DNP3Agent and a MasterDriverAgent.

    The MasterDriverAgent's config store is cleared and re-populated with the
    DNP3 device config and its CSV registry before the driver is installed.
    A finalizer registered on ``request`` stops both installed agents and the
    test agent's core.
    """
    test_agent = volttron_instance.build_agent()

    def store_driver_config(entry_name, contents, contents_type):
        # The value returned by rpc.call is not waited on here.
        test_agent.vip.rpc.call('config.store', 'manage_store', MASTER_DRIVER_AGENT_ID,
                                entry_name, contents, config_type=contents_type)

    # The DNP3Agent is installed first and started immediately.
    dnp3_agent_uuid = volttron_instance.install_agent(
        agent_dir=get_services_core("DNP3Agent"),
        config_file=DNP3_AGENT_CONFIG,
        vip_identity=DNP3_AGENT_ID,
        start=True,
    )

    # Reset the driver's config store, then register the device and its registry.
    test_agent.vip.rpc.call('config.store', 'manage_delete_store', MASTER_DRIVER_AGENT_ID)
    store_driver_config('devices/dnp3', DRIVER_CONFIG_STRING, 'json')
    store_driver_config('dnp3.csv', REGISTRY_CONFIG_STRING, 'csv')

    master_uuid = volttron_instance.install_agent(
        agent_dir=get_services_core("MasterDriverAgent"),
        config_file={},
        start=True,
    )

    # Give the driver time to come up and start its devices.
    gevent.sleep(3)

    def stop():
        for installed_uuid in (master_uuid, dnp3_agent_uuid):
            volttron_instance.stop_agent(installed_uuid)
        test_agent.core.stop()

    request.addfinalizer(stop)
    return test_agent
开发者ID:Kisensum,项目名称:volttron,代码行数:31,代码来源:test_dnp3_driver.py


示例2: test_agent

def test_agent(request, get_volttron_instances):
    """
        Set up the VOLTTRON agents used by the ReferenceAppAgent test cases.

        A single platform is started with HTTP enabled and a web agent is
        installed on it.  The simulation driver's config store is seeded from
        DRIVER_PARAMS, then a SimulationClockAgent, SimulationDriverAgent,
        ActuatorAgent, OpenADRVenAgent and ReferenceAppAgent are installed and
        started.  The test agent is yielded; a finalizer stops every agent and
        shuts the platform down.
    """
    global web_server_address, volttron_home

    instance = get_volttron_instances(1, should_start=False)
    start_wrapper_platform(instance, with_http=True)

    # Build the web agent, pause briefly, then install it on the platform.
    web_agent = _build_web_agent(instance.volttron_home)
    gevent.sleep(1)
    web_agent_uuid = instance.install_agent(agent_dir=web_agent)

    # Publish the platform's web address and home directory as module globals.
    web_server_address = instance.bind_web_address
    volttron_home = instance.volttron_home

    test_agt = instance.build_agent(identity=TEST_AGENT_ID)

    def config_store_rpc(method, *rpc_args):
        # Every request targets the simulation driver's store and waits up to 10 s.
        pending = test_agt.vip.rpc.call('config.store', method, SIMULATION_DRIVER_ID, *rpc_args)
        return pending.get(timeout=10)

    def install_and_start(identity, agent_dir, config):
        return instance.install_agent(vip_identity=identity, agent_dir=agent_dir, config_file=config, start=True)

    # Replace the driver's stored configuration with one entry pair per simulated device.
    config_store_rpc('manage_delete_store')
    for param_dict in DRIVER_PARAMS:
        device_id = param_dict['id']
        device_topic = 'devices/campus1/building1/{}'.format(device_id)
        registry_name = '{}.csv'.format(device_id)
        config_store_rpc('manage_store', device_topic, param_dict['config'], 'json')
        config_store_rpc('manage_store', registry_name, param_dict['points'], 'csv')

    clock_uuid = install_and_start(SIMULATION_CLOCK_ID,
                                   'applications/kisensum/Simulation/SimulationClockAgent',
                                   SIMULATION_CLOCK_CONFIG)
    sim_driver_uuid = install_and_start(SIMULATION_DRIVER_ID,
                                        'applications/kisensum/Simulation/SimulationDriverAgent',
                                        {})
    actuator_uuid = install_and_start(ACTUATOR_ID, get_services_core('ActuatorAgent'), ACTUATOR_CONFIG)
    ven_uuid = install_and_start(VEN_AGENT_ID, get_services_core('OpenADRVenAgent'), VEN_AGENT_CONFIG)
    ref_app_uuid = install_and_start(REFERENCE_APP_ID,
                                     'applications/kisensum/ReferenceAppAgent',
                                     REFERENCE_APP_CONFIG)

    def stop():
        # Agents are stopped in this fixed order before the platform goes down.
        stop_order = (actuator_uuid, ref_app_uuid, sim_driver_uuid,
                      ven_uuid, clock_uuid, web_agent_uuid)
        for running_uuid in stop_order:
            instance.stop_agent(running_uuid)
        test_agt.core.stop()
        instance.shutdown_platform()

    gevent.sleep(10)

    request.addfinalizer(stop)

    yield test_agt
开发者ID:VOLTTRON,项目名称:volttron-applications,代码行数:57,代码来源:test_ref_app.py


示例3: failover

def failover(request, get_volttron_instances):
    """Set up a primary, a secondary and a VC platform for failover tests.

    A VolttronCentralPlatform agent is installed on all three platforms.  The
    primary and secondary each also get a ListenerAgent (identity "listener",
    not started) and a FailoverAgent whose "remote_vip" points at the other
    platform.  The FailoverAgent UUIDs and the VC agent UUID are stored in
    module globals.  A finalizer stops the agents and shuts every platform down.

    Returns the tuple ``(primary, secondary, vc)``.
    """
    global primary_config, secondary_config
    global primary_failover, secondary_failover
    global vc_uuid

    primary, secondary, vc = get_volttron_instances(3)
    for platform in (primary, secondary, vc):
        platform.allow_all_connections()

    # VC platform: only the VolttronCentralPlatform agent.
    vc_uuid = vc.install_agent(agent_dir=get_services_core("VolttronCentralPlatform"))

    # Primary platform: platform agent, listener, then the failover agent.
    primary_platform = primary.install_agent(agent_dir=get_services_core("VolttronCentralPlatform"))
    primary_listener = primary.install_agent(
        agent_dir=get_examples("ListenerAgent"),
        vip_identity="listener",
        start=False,
    )
    primary_config["remote_vip"] = tcp_to(secondary)
    primary_failover = primary.install_agent(
        agent_dir=get_services_core("FailoverAgent"),
        config_file=primary_config,
    )

    # Secondary platform: same layout, with the remote pointing back at the primary.
    secondary_platform = secondary.install_agent(agent_dir=get_services_core("VolttronCentralPlatform"))
    secondary_listener = secondary.install_agent(
        agent_dir=get_examples("ListenerAgent"),
        vip_identity="listener",
        start=False,
    )
    secondary_config["remote_vip"] = tcp_to(primary)
    secondary_failover = secondary.install_agent(
        agent_dir=get_services_core("FailoverAgent"),
        config_file=secondary_config,
    )

    def stop():
        # For each platform: stop its agents in the listed order, then shut it down.
        teardown_plan = (
            (vc, (vc_uuid,)),
            (primary, (primary_failover, primary_platform, primary_listener)),
            (secondary, (secondary_failover, secondary_platform, secondary_listener)),
        )
        for platform, agent_uuids in teardown_plan:
            for agent_uuid in agent_uuids:
                platform.stop_agent(agent_uuid)
            platform.shutdown_platform()

    request.addfinalizer(stop)

    return primary, secondary, vc
开发者ID:Kisensum,项目名称:volttron,代码行数:56,代码来源:test_failover.py


示例4: agent

def agent(request, volttron_instance_module_web):
    """Return a test agent after starting a SEP2Agent and a MasterDriverAgent.

    The 'platform.driver' config store is cleared, then given a SEP2 device
    entry and its CSV registry.  The platform's web address is stored in the
    module global ``web_address``.  A finalizer stops both installed agents
    and the test agent's core.
    """
    global web_address

    platform = volttron_instance_module_web
    test_agent = platform.build_agent()

    def config_store(method, *params):
        # Wait up to 10 s for the RPC result before continuing.
        return test_agent.vip.rpc.call('config.store', method, 'platform.driver', *params).get(timeout=10)

    sep2_device_config = """{
                                "driver_config": {
                                    "sfdi": "097935300833",
                                    "sep2_agent_id": "test_sep2agent"
                                },
                                "campus": "campus",
                                "building": "building",
                                "unit": "sep2",
                                "driver_type": "sep2",
                                "registry_config": "config://sep2.csv",
                                "interval": 15,
                                "timezone": "US/Pacific",
                                "heart_beat_point": "Heartbeat"
                            }"""

    # Start from an empty driver store, then add the SEP2 device and its registry.
    config_store('manage_delete_store')
    config_store('manage_store', 'devices/{}'.format(DRIVER_NAME), sep2_device_config, 'json')
    config_store('manage_store', 'sep2.csv', REGISTRY_CONFIG_STRING, 'csv')

    # SEP2Agent first, under the identity named in the device's driver_config.
    sep2_id = platform.install_agent(
        agent_dir=get_services_core("SEP2Agent"),
        config_file=TEST_SEP2_CONFIG,
        vip_identity='test_sep2agent',
        start=True,
    )
    print('sep2 agent id: ', sep2_id)

    # Then the MasterDriverAgent with an empty agent config.
    md_id = platform.install_agent(
        agent_dir=get_services_core("MasterDriverAgent"),
        config_file={},
        start=True,
    )
    print('master driver agent id: ', md_id)

    web_address = platform.bind_web_address

    def stop():
        for installed_id in (md_id, sep2_id):
            platform.stop_agent(installed_id)
        test_agent.core.stop()

    # Let the agents and devices finish starting.
    gevent.sleep(3)

    request.addfinalizer(stop)

    return test_agent
开发者ID:Kisensum,项目名称:volttron,代码行数:53,代码来源:test_sep2_agent.py


示例5: test_resinstall_agent

def test_resinstall_agent(volttron_instance):
    """Install, check and remove a MySQL-configured SQLHistorian 50 times in a row.

    Each round installs the agent started under the identity 'test_historian',
    asserts that the platform reports it as running, and removes it again.
    """
    connection_params = {
        "host": "localhost",
        "port": 3306,
        "database": "test_historian",
        "user": "historian",
        "passwd": "historian",
    }
    mysql_config = {"connection": {"type": "mysql", "params": connection_params}}

    for attempt in range(50):
        print("Counter: {}".format(attempt))
        historian_uuid = volttron_instance.install_agent(
            agent_dir=get_services_core("SQLHistorian"),
            config_file=mysql_config,
            start=True,
            vip_identity='test_historian',
        )
        assert volttron_instance.is_agent_running(historian_uuid)
        volttron_instance.remove_agent(historian_uuid)
开发者ID:Kisensum,项目名称:volttron,代码行数:26,代码来源:test_platformwrapper.py


示例6: test_poll_valid

def test_poll_valid(volttron_instance, publish_agent, locations, result_topics):
    """Check that a polling WeatherAgent reports one result per location.

    The agent is installed unstarted, the publish agent's recorded calls are
    reset, and the agent is then started and given 10 seconds to poll.  The
    test expects one ``process_poll_result`` call per location, no
    ``process_error`` calls, and the fourth positional argument of each call
    to equal the matching entry of ``result_topics``.  The agent is removed
    afterwards if it was installed.
    """
    uuid = None
    try:
        weather_config = {
            "operation_mode": 2,
            "wu_api_key": API_KEY,
            "locations": locations,
            "poll_time": 20,
        }
        uuid = volttron_instance.install_agent(
            vip_identity='weather_agent',
            agent_dir=get_services_core("WeatherAgent"),
            config_file=weather_config,
            start=False,
        )

        # Forget callbacks recorded before this agent was started.
        publish_agent.process_poll_result.reset_mock()
        publish_agent.process_error.reset_mock()

        volttron_instance.start_agent(uuid)
        gevent.sleep(10)

        assert publish_agent.process_poll_result.call_count == len(locations)
        assert publish_agent.process_error.call_count == 0

        recorded_calls = publish_agent.process_poll_result.call_args_list
        for position, recorded in enumerate(recorded_calls):
            positional_args = recorded[0]
            assert positional_args[3] == result_topics[position]
            assert_weather_response(positional_args)
    finally:
        if uuid:
            volttron_instance.remove_agent(uuid)
开发者ID:Kisensum,项目名称:volttron,代码行数:33,代码来源:test_weather_agent.py


示例7: add_forward_historian

def add_forward_historian(wrapper, config=None, **kwargs):
    """Install a ForwardHistorian agent through ``wrapper`` and return its UUID.

    :param wrapper: object exposing ``install_agent(**kwargs)``.
    :param config: agent configuration passed as ``config_file``.  ``None``
        (the default) means a fresh empty dict, so separate calls never share
        one default dict object.
    :param kwargs: extra keyword arguments forwarded to ``install_agent``.
    :returns: whatever ``wrapper.install_agent`` returns.
    """
    if config is None:
        config = {}
    agent_uuid = wrapper.install_agent(
        config_file=config,
        agent_dir=get_services_core("ForwardHistorian"),
        **kwargs
    )
    return agent_uuid
开发者ID:Kisensum,项目名称:volttron,代码行数:7,代码来源:agent_additions.py


示例8: test_reconnect_forwarder

def test_reconnect_forwarder(get_volttron_instances):
    """Forward messages between two platforms and compare what arrived.

    A ForwardHistorian on the first platform is pointed at the second
    platform's VIP address and public key.  A receiver on the second platform
    subscribes to every topic, a publisher on the first platform sends
    messages, and each entry of ``publishedmessages`` is compared with the
    entry at the same index of ``allforwardedmessage``.
    """
    from_instance, to_instance = get_volttron_instances(2, True)
    to_instance.allow_all_connections()

    publisher = from_instance.build_agent()
    receiver = to_instance.build_agent()

    # Start from the shared base config and fill in the destination details.
    forwarder_config = deepcopy(BASE_FORWARD_CONFIG)
    forwarder_config['destination-vip'] = to_instance.vip_address
    forwarder_config['destination-serverkey'] = to_instance.keystore.public

    fuuid = from_instance.install_agent(
        agent_dir=get_services_core("ForwardHistorian"),
        start=True,
        config_file=forwarder_config,
    )
    assert from_instance.is_agent_running(fuuid)

    print('Before Subscribing')
    receiver.vip.pubsub.subscribe('pubsub', '', callback=onmessage)
    publisher.vip.pubsub.publish('pubsub', 'stuff', message='Fuzzy')
    gevent.sleep(0.2)

    num_messages = 5
    for _ in range(num_messages):
        do_publish(publisher)

    for position, published in enumerate(publishedmessages):
        assert allforwardedmessage[position] == published
开发者ID:VOLTTRON,项目名称:volttron,代码行数:28,代码来源:test_forward_historian.py


示例9: agent

def agent(request, volttron_instance):
    """Return a test agent for RPC calls against a running MESA-configured DNP3Agent.

    The DNP3Agent is installed with AGENT_MODULE set to 'dnp3.mesa.agent' in
    the environment, and the test agent subscribes to the 'mesa/function'
    topic prefix.  A finalizer stops and removes the installed agent and stops
    the test agent's core, but only while the platform is still running.
    """
    test_agent = volttron_instance.build_agent()

    add_definitions_to_config_store(test_agent)

    print('Installing Mesa Agent')
    os.environ['AGENT_MODULE'] = 'dnp3.mesa.agent'
    agent_id = volttron_instance.install_agent(
        agent_dir=get_services_core("DNP3Agent"),
        config_file=MESA_AGENT_CONFIG,
        vip_identity=MESA_AGENT_ID,
        start=True,
    )

    # Route MESA function publishes to the module's onmessage callback.
    test_agent.vip.pubsub.subscribe(peer='pubsub', prefix='mesa/function', callback=onmessage)

    def stop():
        """Tear down the installed agent and the test agent."""
        if not volttron_instance.is_running():
            return
        volttron_instance.stop_agent(agent_id)
        volttron_instance.remove_agent(agent_id)
        test_agent.core.stop()

    # Let the agents and devices finish starting.
    gevent.sleep(3)

    request.addfinalizer(stop)

    return test_agent
开发者ID:VOLTTRON,项目名称:volttron,代码行数:31,代码来源:test_mesa_agent.py


示例10: vc_instance

def vc_instance(request, volttron_instance1_web):
    """
    Provide a volttron instance that has a `VolttronCentral` agent
    installed and started.

    A finalizer removes the agent again and prints the platform log.

    :returns tuple:
        - the PlatformWrapper
        - the uuid of the `VolttronCentral` agent.
        - the jsonrpc address to be used for communicating with the
          `VolttronCentral` agent.
    """
    wrapper = volttron_instance1_web

    agent_uuid = wrapper.install_agent(
        agent_dir=get_services_core("VolttronCentral"),
        config_file=VC_CONFIG,
        start=True,
    )
    rpc_addr = wrapper.jsonrpc_endpoint

    # Open the platform up to all incoming connections.
    wrapper.allow_all_connections()

    def cleanup():
        print('Cleanup vc_instance')
        wrapper.remove_agent(agent_uuid)
        print_log(wrapper.volttron_home)

    request.addfinalizer(cleanup)
    return wrapper, agent_uuid, rpc_addr
开发者ID:Kisensum,项目名称:volttron,代码行数:29,代码来源:vc_fixtures.py


示例11: forwarder

def forwarder(request, volttron_instances):
    """Install and start a ForwardHistorian on instance 1 that targets instance 2.

    Both instances are opened to all connections, instance 2's VIP address and
    server key are recorded in instance 1's ``known_hosts`` file, and the same
    two values are written into the module-level ``forwarder_config``.  The
    installed agent's UUID is stored in the module global ``forwarder_uuid``.
    """
    global volttron_instance1, volttron_instance2
    global forwarder_uuid, forwarder_config

    source, destination = volttron_instance1, volttron_instance2

    for instance in (source, destination):
        instance.allow_all_connections()

    # Register the destination's server key with the source platform.
    known_hosts_path = os.path.join(source.volttron_home, 'known_hosts')
    KnownHostsStore(known_hosts_path).add(destination.vip_address, destination.serverkey)

    # Point the forwarder configuration at the destination platform.
    forwarder_config["destination-vip"] = destination.vip_address
    forwarder_config["destination-serverkey"] = destination.serverkey

    # Install and start the forwarder on the source platform.
    forwarder_uuid = source.install_agent(
        agent_dir=get_services_core("ForwardHistorian"),
        config_file=forwarder_config,
        start=True,
    )
    print("forwarder agent id: ", forwarder_uuid)
开发者ID:Kisensum,项目名称:volttron,代码行数:25,代码来源:test_forwardhistorian.py


示例12: simple_failover

def simple_failover(request, get_volttron_instances):
    """Set up a primary/secondary platform pair running FailoverAgents.

    Each platform gets a ListenerAgent (identity "listener", not started) and
    a FailoverAgent configured with the other platform's VIP address and
    server key.  After SLEEP_TIME seconds the fixture asserts that all agents
    run on the primary and that not all agents run on the secondary.  A
    finalizer stops the agents and shuts both platforms down.

    Returns the tuple ``(primary, secondary)``.
    """
    global simple_primary_config, simple_secondary_config
    global uuid_primary, uuid_secondary
    global listener_primary

    primary, secondary = get_volttron_instances(2)
    for platform in (primary, secondary):
        platform.allow_all_connections()

    # Primary: listener first, then a failover agent that watches the secondary.
    listener_primary = primary.install_agent(
        agent_dir=get_examples("ListenerAgent"),
        vip_identity="listener",
        start=False,
    )
    simple_primary_config["remote_vip"] = secondary.vip_address
    simple_primary_config["remote_serverkey"] = secondary.serverkey
    uuid_primary = primary.install_agent(
        agent_dir=get_services_core("FailoverAgent"),
        config_file=simple_primary_config,
    )

    # Secondary: same layout, watching the primary.
    listener_secondary = secondary.install_agent(
        agent_dir=get_examples("ListenerAgent"),
        vip_identity="listener",
        start=False,
    )
    simple_secondary_config["remote_vip"] = primary.vip_address
    simple_secondary_config["remote_serverkey"] = primary.serverkey
    uuid_secondary = secondary.install_agent(
        agent_dir=get_services_core("FailoverAgent"),
        config_file=simple_secondary_config,
    )

    gevent.sleep(SLEEP_TIME)
    assert all_agents_running(primary)
    assert not all_agents_running(secondary)

    def cleanup():
        # For each platform: stop the failover agent, then the listener, then shut down.
        teardown_plan = (
            (primary, (uuid_primary, listener_primary)),
            (secondary, (uuid_secondary, listener_secondary)),
        )
        for platform, agent_uuids in teardown_plan:
            for agent_uuid in agent_uuids:
                platform.stop_agent(agent_uuid)
            platform.shutdown_platform()

    request.addfinalizer(cleanup)
    return primary, secondary
开发者ID:Kisensum,项目名称:volttron,代码行数:47,代码来源:test_simple_failover.py


示例13: add_volttron_central_platform

def add_volttron_central_platform(wrapper, config=None, **kwargs):
    """Install a VolttronCentralPlatform agent through ``wrapper`` and return its UUID.

    The agent is installed under the VOLTTRON_CENTRAL_PLATFORM identity.

    :param wrapper: object exposing ``vip_address`` and ``install_agent(**kwargs)``.
    :param config: agent configuration passed as ``config_file``.  ``None``
        (the default) means a fresh empty dict, so separate calls never share
        one default dict object.
    :param kwargs: accepted for signature compatibility but NOT forwarded to
        ``install_agent``.
    :returns: whatever ``wrapper.install_agent`` returns.
    """
    if config is None:
        config = {}
    print('Adding vcp to {}'.format(wrapper.vip_address))
    # NOTE(review): kwargs are dropped here, unlike the sibling add_* helpers
    # that forward them -- confirm whether that is intentional.
    agent_uuid = wrapper.install_agent(
        config_file=config,
        agent_dir=get_services_core("VolttronCentralPlatform"),
        vip_identity=VOLTTRON_CENTRAL_PLATFORM
    )
    return agent_uuid
开发者ID:Kisensum,项目名称:volttron,代码行数:8,代码来源:agent_additions.py


示例14: add_mongohistorian

def add_mongohistorian(wrapper, config, vip_identity='platform.historian',
                       **kwargs):
    """Install a MongodbHistorian agent through ``wrapper`` and return its UUID.

    :param wrapper: object exposing ``install_agent(**kwargs)``.
    :param config: agent configuration passed as ``config_file``.
    :param vip_identity: identity to install the agent under.
    :param kwargs: extra keyword arguments forwarded to ``install_agent``.
    :returns: whatever ``wrapper.install_agent`` returns.
    """
    historian_dir = get_services_core("MongodbHistorian")
    return wrapper.install_agent(config_file=config,
                                 agent_dir=historian_dir,
                                 vip_identity=vip_identity,
                                 **kwargs)
开发者ID:Kisensum,项目名称:volttron,代码行数:9,代码来源:agent_additions.py


示例15: add_listener

def add_listener(wrapper, config=None, vip_identity=None, **kwargs):
    """Install a ListenerAgent through ``wrapper`` and return its UUID.

    :param wrapper: object exposing ``install_agent(**kwargs)``.
    :param config: agent configuration passed as ``config_file``.  ``None``
        (the default) means a fresh empty dict, so separate calls never share
        one default dict object.
    :param vip_identity: identity to install the agent under (may be None).
    :param kwargs: extra keyword arguments forwarded to ``install_agent``.
    :returns: whatever ``wrapper.install_agent`` returns.
    """
    if config is None:
        config = {}
    print("Adding to {wrapper} a listener agent".format(wrapper=wrapper))
    # NOTE(review): other callers resolve "ListenerAgent" via get_examples();
    # confirm that get_services_core() is the intended lookup here.
    agent_uuid = wrapper.install_agent(
        config_file=config,
        vip_identity=vip_identity,
        agent_dir=get_services_core("ListenerAgent"),
        **kwargs
    )
    return agent_uuid
开发者ID:Kisensum,项目名称:volttron,代码行数:9,代码来源:agent_additions.py


示例16: test_openadr

    def test_openadr(self, agent):
        """Send the sample "oadr_request_event" XML to the EIEvent endpoint.

        Expects an HTTP 200 response, and afterwards expects the
        'test_ven_agent' ``get_events`` RPC to return None.

        :param agent: object whose ``vip.rpc.call`` is used to query the VEN agent.
        """
        url = '{}/OpenADR2/Simple/2.0b/EIEvent'.format(web_address)
        xml_filename = get_services_core('OpenADRVenAgent/tests/oadr_request_event.xml')
        headers = {'content-type': 'application/sep+xml'}

        # The request is an HTTP GET that carries the XML file as its body.  The
        # context manager closes the file handle even if the request raises.
        with open(xml_filename, 'rb') as xml_file:
            response = requests.get(url, data=xml_file, headers=headers)
        assert response.status_code == 200

        # NOTE(review): the original comment said the request should update the
        # agent's event list, yet the assertion expects None -- confirm intent.
        assert agent.vip.rpc.call('test_ven_agent', 'get_events').get(timeout=10) is None
开发者ID:VOLTTRON,项目名称:volttron-applications,代码行数:11,代码来源:test_openadrven.py


示例17: sep2_http_put

    def sep2_http_put(sep2_resource_name, sep2_filename):
        """
            Send the contents of a SEP2 sample XML file to a SEP2 resource URL.

            NOTE(review): despite the function name and the ".PUT.xml" file
            suffix, the request is issued with HTTP POST -- confirm which verb
            the server expects.

        @param sep2_resource_name: The distinguishing part of the name of the SEP2 resource as it appears in the URL.
        @param sep2_filename: The distinguishing part of the SEP2 sample data file name.
        @return: The response object returned by requests.post.
        """
        url = '{}/dcap/{}'.format(web_address, sep2_resource_name)
        headers = {'content-type': 'application/sep+xml'}
        # Format the relative name before resolving it, so braces elsewhere in
        # the resolved path cannot be mistaken for format fields.
        xml_path = get_services_core("SEP2Agent/tests/{}.PUT.xml".format(sep2_filename))
        # The context manager closes the file once the request has completed.
        with open(xml_path, 'rb') as xml_file:
            return requests.post(url, data=xml_file, headers=headers)
开发者ID:Kisensum,项目名称:volttron,代码行数:12,代码来源:test_sep2_agent.py


示例18: sqlhistorian

def sqlhistorian(request, volttron_instances):
    """Install and start a SQLHistorian on instance 2 as 'platform.historian'.

    The agent is configured from the module-level ``sqlite_config`` and its
    UUID is printed.
    """
    global volttron_instance1, volttron_instance2, sqlite_config

    install_options = dict(
        agent_dir=get_services_core("SQLHistorian"),
        config_file=sqlite_config,
        start=True,
        vip_identity='platform.historian',
    )
    historian_uuid = volttron_instance2.install_agent(**install_options)
    print("sqlite historian agent id: ", historian_uuid)
开发者ID:Kisensum,项目名称:volttron,代码行数:12,代码来源:test_forwardhistorian.py


示例19: test_cov_update_published

def test_cov_update_published(volttron_instance, test_agent):
    """Tests the functionality of BACnet change of value forwarding in the
    Master Driver and driver.py

    The driver's config store is reset and loaded with the fake device files
    via ``volttron-ctl``.  The MasterDriverAgent is then started and asked to
    forward a three-entry COV result for "PowerState".  The test expects at
    least three ``cov_callback`` calls, exactly three of which carry a truthy
    "PowerState" value.
    """

    def run_volttron_ctl(*ctl_args):
        """Run ``volttron-ctl`` in the platform environment and assert exit code 0."""
        process = Popen(['volttron-ctl'] + list(ctl_args), env=volttron_instance.env,
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # communicate() drains both pipes while waiting.  A bare wait() can
        # deadlock once the child fills a pipe that nobody is reading.
        process.communicate()
        assert process.returncode == 0

    # Reset master driver config store
    run_volttron_ctl('config', 'delete', PLATFORM_DRIVER, '--all')

    # Add fake device configuration
    run_volttron_ctl('config', 'store', PLATFORM_DRIVER,
                     'fake.csv', 'examples/configurations/drivers/fake.csv', '--csv')
    run_volttron_ctl('config', 'store', PLATFORM_DRIVER,
                     "devices/fakedriver", 'examples/configurations/drivers/fake.config',
                     '--json')

    # install master driver, start the master driver, which starts the device
    master_uuid = volttron_instance.install_agent(
        agent_dir=get_services_core("MasterDriverAgent"),
        config_file={},
        start=True)
    print("agent id: ", master_uuid)

    # tell the master driver to forward the value
    point_name = "PowerState"
    device_path = "fakedriver"
    result_dict = {"fake1": "test", "fake2": "test", "fake3": "test"}
    test_agent.vip.rpc.call(PLATFORM_DRIVER, 'forward_bacnet_cov_value',
                            device_path, point_name, result_dict)
    # wait for the publishes to make it to the bus
    gevent.sleep(2)

    # Mock checks
    # Should have one "PowerState" publish for each item in the result dict
    # Total all publishes likely will include regular scrapes
    assert test_agent.cov_callback.call_count >= 3
    test_count = sum(
        1 for call_arg in test_agent.cov_callback.call_args_list
        if call_arg[0][5][0].get("PowerState", False)
    )
    assert test_count == 3
开发者ID:Kisensum,项目名称:volttron,代码行数:51,代码来源:test_driver_bacnet_cov.py


示例20: test_indefinite_override_after_restart

def test_indefinite_override_after_restart(config_store, test_agent, volttron_instance1):
    """Check that an indefinite override still blocks writes after the driver is reinstalled.

    Four fake driver configs are stored and an override with duration 0.0 is
    set on 'fakedriver2'.  The MasterDriverAgent held in the module global
    ``master_uuid`` is stopped, removed and installed again, and the global is
    updated with the new UUID.  A ``set_point`` on the overridden device must
    then raise a RemoteError carrying the OverrideError type and message.
    Overrides are cleared at the end.
    """
    global master_uuid

    # ``range`` works on both Python 2 and 3; ``xrange`` is Python 2 only.
    for i in range(4):
        config_name = "devices/fakedriver{}".format(i)
        setup_config(config_store, config_name, fake_device_config)
    device_path = 'fakedriver2'

    # Set override feature on device
    test_agent.vip.rpc.call(
        PLATFORM_DRIVER,  # Target agent
        'set_override_on',  # Method
        device_path,  # Override Pattern
        0.0,  # Indefinite override
        False,  # revert flag, passed as False
        False  # NOTE(review): meaning of this flag is not visible here
    ).get(timeout=10)

    # Give it enough time to set indefinite override.
    gevent.sleep(0.5)
    volttron_instance1.stop_agent(master_uuid)
    volttron_instance1.remove_agent(master_uuid)
    gevent.sleep(1)
    # Start the master driver agent which would in turn start the fake driver
    #  using the configs created above
    master_uuid = volttron_instance1.install_agent(
        agent_dir=get_services_core("MasterDriverAgent"),
        config_file={},
        start=True)
    gevent.sleep(1)  # wait for the agent to start and start the devices

    point = 'SampleWritableFloat1'
    new_value = 65.5
    try:
        # Try to set a point on the overridden device (fakedriver2)
        result = test_agent.vip.rpc.call(
            PLATFORM_DRIVER,  # Target agent
            'set_point',  # Method
            device_path,  # Device path
            point,
            new_value
        ).get(timeout=10)
        pytest.fail("Expecting Override Error. Code returned : {}".format(result))
    except RemoteError as e:
        assert e.exc_info['exc_type'] == 'master_driver.agent.OverrideError'
        assert e.message == 'Cannot set point on device {} since global override is set'.format(
            device_path)
    # Clear the override; the RPC's return value is not needed.
    test_agent.vip.rpc.call(
        PLATFORM_DRIVER,  # Target agent
        'clear_overrides'  # Method
    ).get(timeout=10)
开发者ID:Kisensum,项目名称:volttron,代码行数:50,代码来源:test_global_override.py



注：本文中的volttron.platform.get_services_core函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python poly_line.PolyLine类代码示例发布时间:2022-05-26
下一篇:
Python utility.verbose_info函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap