本文整理汇总了 Python 中 volttron.platform.get_services_core 函数的典型用法代码示例。如果您正苦于以下问题：Python get_services_core 函数的具体用法？Python get_services_core 怎么用？Python get_services_core 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了 get_services_core 函数的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: agent
def agent(request, volttron_instance):
    """Return a test agent once a DNP3Agent and a MasterDriverAgent are running.

    The DNP3Agent is installed and started first. The master driver's config
    store is then cleared and loaded with the ``devices/dnp3`` config and the
    ``dnp3.csv`` registry before the MasterDriverAgent is installed and
    started.

    :param request: fixture request object; a finalizer is registered on it.
    :param volttron_instance: platform wrapper the agents are installed on.
    :return: the agent created by ``volttron_instance.build_agent()``.
    """
    test_agent = volttron_instance.build_agent()

    # Build and start DNP3Agent
    dnp3_agent_uuid = volttron_instance.install_agent(
        agent_dir=get_services_core("DNP3Agent"),
        config_file=DNP3_AGENT_CONFIG,
        vip_identity=DNP3_AGENT_ID,
        start=True)

    # Reset the master driver's config store, then load the device config and
    # its registry. The results of these RPC calls are not waited on.
    test_agent.vip.rpc.call('config.store', 'manage_delete_store', MASTER_DRIVER_AGENT_ID)
    store_entries = (
        ('devices/dnp3', DRIVER_CONFIG_STRING, 'json'),
        ('dnp3.csv', REGISTRY_CONFIG_STRING, 'csv'),
    )
    for entry_name, entry_value, entry_type in store_entries:
        test_agent.vip.rpc.call('config.store', 'manage_store', MASTER_DRIVER_AGENT_ID,
                                entry_name, entry_value, config_type=entry_type)

    # Build and start MasterDriverAgent
    master_uuid = volttron_instance.install_agent(
        agent_dir=get_services_core("MasterDriverAgent"),
        config_file={},
        start=True)

    # Wait for the agent to start and start the devices
    gevent.sleep(3)

    def teardown():
        """Stop both installed agents, then the test agent itself."""
        for installed_uuid in (master_uuid, dnp3_agent_uuid):
            volttron_instance.stop_agent(installed_uuid)
        test_agent.core.stop()

    request.addfinalizer(teardown)
    return test_agent
开发者ID:Kisensum,项目名称:volttron,代码行数:31,代码来源:test_dnp3_driver.py
示例2: test_agent
def test_agent(request, get_volttron_instances):
    """
    Fixture that brings up the agents used by the ReferenceAppAgent test cases.

    A single VOLTTRON instance is started with HTTP enabled and a web agent is
    installed on it. The config store for SIMULATION_DRIVER_ID is cleared and
    repopulated from DRIVER_PARAMS, after which a SimulationClockAgent,
    SimulationDriverAgent, ActuatorAgent, OpenADRVenAgent and
    ReferenceAppAgent are installed and started.

    The module globals ``web_server_address`` and ``volttron_home`` are set
    from the instance. Yields the test agent built with TEST_AGENT_ID.
    """
    global web_server_address
    global volttron_home

    instance = get_volttron_instances(1, should_start=False)
    start_wrapper_platform(instance, with_http=True)

    # Install and start a WebAgent.
    web_agent_dir = _build_web_agent(instance.volttron_home)
    gevent.sleep(1)
    web_agent_uuid = instance.install_agent(agent_dir=web_agent_dir)

    web_server_address = instance.bind_web_address
    volttron_home = instance.volttron_home

    test_agt = instance.build_agent(identity=TEST_AGENT_ID)

    def driver_store_rpc(method, *args):
        """Call a config.store method for SIMULATION_DRIVER_ID and wait for the result."""
        pending = test_agt.vip.rpc.call('config.store', method, SIMULATION_DRIVER_ID, *args)
        return pending.get(timeout=10)

    def install_and_start(identity, agent_dir, config):
        """Install an agent under the given VIP identity, start it, return its uuid."""
        return instance.install_agent(vip_identity=identity,
                                      agent_dir=agent_dir,
                                      config_file=config,
                                      start=True)

    # Rebuild the simulation driver's config store: one device config and one
    # points CSV per DRIVER_PARAMS entry.
    driver_store_rpc('manage_delete_store')
    for param_dict in DRIVER_PARAMS:
        device_id = param_dict['id']
        driver_store_rpc('manage_store',
                         'devices/campus1/building1/{}'.format(device_id),
                         param_dict['config'],
                         'json')
        driver_store_rpc('manage_store',
                         '{}.csv'.format(device_id),
                         param_dict['points'],
                         'csv')

    clock_uuid = install_and_start(SIMULATION_CLOCK_ID,
                                   'applications/kisensum/Simulation/SimulationClockAgent',
                                   SIMULATION_CLOCK_CONFIG)
    sim_driver_uuid = install_and_start(SIMULATION_DRIVER_ID,
                                        'applications/kisensum/Simulation/SimulationDriverAgent',
                                        {})
    actuator_uuid = install_and_start(ACTUATOR_ID,
                                      get_services_core('ActuatorAgent'),
                                      ACTUATOR_CONFIG)
    ven_uuid = install_and_start(VEN_AGENT_ID,
                                 get_services_core('OpenADRVenAgent'),
                                 VEN_AGENT_CONFIG)
    ref_app_uuid = install_and_start(REFERENCE_APP_ID,
                                     'applications/kisensum/ReferenceAppAgent',
                                     REFERENCE_APP_CONFIG)

    def teardown():
        """Stop every installed agent, the test agent, and then the platform."""
        shutdown_order = (actuator_uuid, ref_app_uuid, sim_driver_uuid,
                          ven_uuid, clock_uuid, web_agent_uuid)
        for installed_uuid in shutdown_order:
            instance.stop_agent(installed_uuid)
        test_agt.core.stop()
        instance.shutdown_platform()

    # NOTE(review): presumably gives the freshly started agents time to settle
    # before the tests run -- confirm.
    gevent.sleep(10)

    request.addfinalizer(teardown)
    yield test_agt
开发者ID:VOLTTRON,项目名称:volttron-applications,代码行数:57,代码来源:test_ref_app.py
示例3: failover
def failover(request, get_volttron_instances):
    """Fixture: set up a primary/secondary failover pair plus a VC instance.

    Three instances are requested. A VolttronCentralPlatform agent is
    installed on each; the primary and secondary additionally get a
    ListenerAgent (installed with ``start=False``) and a FailoverAgent whose
    config points ``remote_vip`` at the other instance.

    The FailoverAgent uuids and the VC-side agent uuid are published through
    the module globals ``primary_failover``, ``secondary_failover`` and
    ``vc_uuid``.

    :return: tuple ``(primary, secondary, vc)`` of the three instances.
    """
    global primary_config, secondary_config
    global primary_failover, secondary_failover
    global vc_uuid

    primary, secondary, vc = get_volttron_instances(3)
    for platform in (primary, secondary, vc):
        platform.allow_all_connections()

    def install_vcp(platform):
        """Install a VolttronCentralPlatform agent and return its uuid."""
        return platform.install_agent(
            agent_dir=get_services_core("VolttronCentralPlatform"))

    def install_listener(platform):
        """Install (without starting) a ListenerAgent with identity 'listener'."""
        return platform.install_agent(agent_dir=get_examples("ListenerAgent"),
                                      vip_identity="listener",
                                      start=False)

    def install_failover(platform, config):
        """Install a FailoverAgent with the given config and return its uuid."""
        return platform.install_agent(agent_dir=get_services_core("FailoverAgent"),
                                      config_file=config)

    # configure vc
    vc_uuid = install_vcp(vc)

    # configure primary
    primary_platform = install_vcp(primary)
    primary_listener = install_listener(primary)
    primary_config["remote_vip"] = tcp_to(secondary)
    primary_failover = install_failover(primary, primary_config)

    # configure secondary
    secondary_platform = install_vcp(secondary)
    secondary_listener = install_listener(secondary)
    secondary_config["remote_vip"] = tcp_to(primary)
    secondary_failover = install_failover(secondary, secondary_config)

    def teardown():
        """Stop the agents on each instance and shut the instance down."""
        vc.stop_agent(vc_uuid)
        vc.shutdown_platform()

        for installed_uuid in (primary_failover, primary_platform, primary_listener):
            primary.stop_agent(installed_uuid)
        primary.shutdown_platform()

        for installed_uuid in (secondary_failover, secondary_platform, secondary_listener):
            secondary.stop_agent(installed_uuid)
        secondary.shutdown_platform()

    request.addfinalizer(teardown)
    return primary, secondary, vc
开发者ID:Kisensum,项目名称:volttron,代码行数:56,代码来源:test_failover.py
示例4: agent
def agent(request, volttron_instance_module_web):
    """Fixture: return a test agent with a SEP2Agent and MasterDriverAgent running.

    The ``platform.driver`` config store is cleared and loaded with a SEP2
    device config and the ``sep2.csv`` registry. A SEP2Agent (identity
    ``test_sep2agent``) and a MasterDriverAgent are then installed and
    started, and the module global ``web_address`` is set to the instance's
    bound web address.

    :param request: fixture request object; a finalizer is registered on it.
    :param volttron_instance_module_web: platform wrapper to install onto.
    :return: the agent created by ``build_agent()``.
    """
    global web_address

    platform = volttron_instance_module_web
    test_agent = platform.build_agent()

    def driver_store_rpc(method, *args):
        """Call a config.store method for 'platform.driver' and wait for the result."""
        pending = test_agent.vip.rpc.call('config.store', method, 'platform.driver', *args)
        return pending.get(timeout=10)

    sep2_device_config = """{
"driver_config": {
"sfdi": "097935300833",
"sep2_agent_id": "test_sep2agent"
},
"campus": "campus",
"building": "building",
"unit": "sep2",
"driver_type": "sep2",
"registry_config": "config://sep2.csv",
"interval": 15,
"timezone": "US/Pacific",
"heart_beat_point": "Heartbeat"
}"""

    # Configure a SEP2 device in the Master Driver
    driver_store_rpc('manage_delete_store')
    driver_store_rpc('manage_store',
                     'devices/{}'.format(DRIVER_NAME),
                     sep2_device_config,
                     'json')
    driver_store_rpc('manage_store',
                     'sep2.csv',
                     REGISTRY_CONFIG_STRING,
                     'csv')

    # Install and start a SEP2Agent
    sep2_id = platform.install_agent(agent_dir=get_services_core("SEP2Agent"),
                                     config_file=TEST_SEP2_CONFIG,
                                     vip_identity='test_sep2agent',
                                     start=True)
    print('sep2 agent id: ', sep2_id)

    # Install and start a MasterDriverAgent
    md_id = platform.install_agent(agent_dir=get_services_core("MasterDriverAgent"),
                                   config_file={},
                                   start=True)
    print('master driver agent id: ', md_id)

    web_address = platform.bind_web_address

    def teardown():
        """Stop the master driver, the SEP2 agent and the test agent."""
        for installed_uuid in (md_id, sep2_id):
            platform.stop_agent(installed_uuid)
        test_agent.core.stop()

    # wait for agents and devices to start
    gevent.sleep(3)

    request.addfinalizer(teardown)
    return test_agent
开发者ID:Kisensum,项目名称:volttron,代码行数:53,代码来源:test_sep2_agent.py
示例5: test_resinstall_agent
def test_resinstall_agent(volttron_instance):
    """Install, check and remove a SQLHistorian agent 50 times in a row.

    Every round installs the agent under the same VIP identity
    (``test_historian``) with a MySQL connection config, asserts that the
    platform reports it as running, and removes it again.
    """
    connection_params = {
        "host": "localhost",
        "port": 3306,
        "database": "test_historian",
        "user": "historian",
        "passwd": "historian"
    }
    mysql_config = {
        "connection": {
            "type": "mysql",
            "params": connection_params
        }
    }

    for attempt in range(50):
        print("Counter: {}".format(attempt))
        historian_uuid = volttron_instance.install_agent(
            agent_dir=get_services_core("SQLHistorian"),
            config_file=mysql_config,
            start=True,
            vip_identity='test_historian')
        assert volttron_instance.is_agent_running(historian_uuid)
        volttron_instance.remove_agent(historian_uuid)
开发者ID:Kisensum,项目名称:volttron,代码行数:26,代码来源:test_platformwrapper.py
示例6: test_poll_valid
def test_poll_valid(volttron_instance, publish_agent, locations, result_topics):
    """Start a WeatherAgent and check one poll result arrives per location.

    The agent is installed stopped, the ``process_poll_result`` and
    ``process_error`` mocks on ``publish_agent`` are reset, and the agent is
    started. After ten seconds there must be exactly ``len(locations)`` poll
    results and no errors, and the fourth positional argument of the n-th
    result callback must equal ``result_topics[n]``.

    The agent is removed again even when an assertion fails.
    """
    global publish_agent_v2, API_KEY
    uuid = None
    try:
        weather_config = {
            "operation_mode": 2,
            "wu_api_key": API_KEY,
            "locations": locations,
            "poll_time": 20
        }
        uuid = volttron_instance.install_agent(
            vip_identity='weather_agent',
            agent_dir=get_services_core("WeatherAgent"),
            config_file=weather_config,
            start=False)

        # reset mock to ignore any previous callback
        for callback_mock in (publish_agent.process_poll_result,
                              publish_agent.process_error):
            callback_mock.reset_mock()

        volttron_instance.start_agent(uuid)
        gevent.sleep(10)

        assert publish_agent.process_poll_result.call_count == len(locations)
        assert publish_agent.process_error.call_count == 0

        recorded_calls = publish_agent.process_poll_result.call_args_list
        for position, recorded in enumerate(recorded_calls):
            positional_args = recorded[0]
            assert positional_args[3] == result_topics[position]
            assert_weather_response(positional_args)
    finally:
        if uuid:
            volttron_instance.remove_agent(uuid)
开发者ID:Kisensum,项目名称:volttron,代码行数:33,代码来源:test_weather_agent.py
示例7: add_forward_historian
def add_forward_historian(wrapper, config={}, **kwargs):
    """Install a ForwardHistorian agent on ``wrapper`` and return its uuid.

    :param wrapper: object exposing ``install_agent``.
    :param config: passed to ``install_agent`` as ``config_file``.
    :param kwargs: forwarded unchanged to ``install_agent``.
    :return: whatever ``wrapper.install_agent`` returns.
    """
    # NOTE: the shared ``{}`` default is only handed through; this function
    # itself never mutates it.
    historian_dir = get_services_core("ForwardHistorian")
    return wrapper.install_agent(agent_dir=historian_dir,
                                 config_file=config,
                                 **kwargs)
开发者ID:Kisensum,项目名称:volttron,代码行数:7,代码来源:agent_additions.py
示例8: test_reconnect_forwarder
def test_reconnect_forwarder(get_volttron_instances):
    """Publish on one instance and compare against what was forwarded.

    A ForwardHistorian is installed on the first instance with the second
    instance's VIP address and public key as its destination. A receiver on
    the second instance subscribes to every topic, five messages are
    published via ``do_publish``, and each entry of ``publishedmessages`` is
    compared with the entry at the same index in ``allforwardedmessage``.
    """
    from_instance, to_instance = get_volttron_instances(2, True)
    to_instance.allow_all_connections()

    publisher = from_instance.build_agent()
    receiver = to_instance.build_agent()

    # Work on a copy so the shared base config stays untouched.
    forwarder_config = deepcopy(BASE_FORWARD_CONFIG)
    forwarder_config['destination-vip'] = to_instance.vip_address
    forwarder_config['destination-serverkey'] = to_instance.keystore.public

    fuuid = from_instance.install_agent(
        agent_dir=get_services_core("ForwardHistorian"),
        start=True,
        config_file=forwarder_config)
    assert from_instance.is_agent_running(fuuid)

    print('Before Subscribing')
    receiver.vip.pubsub.subscribe('pubsub', '', callback=onmessage)
    publisher.vip.pubsub.publish('pubsub', 'stuff', message='Fuzzy')
    gevent.sleep(.2)

    num_messages = 5
    for _ in range(num_messages):
        do_publish(publisher)

    for position, published in enumerate(publishedmessages):
        assert allforwardedmessage[position] == published
开发者ID:VOLTTRON,项目名称:volttron,代码行数:28,代码来源:test_forward_historian.py
示例9: agent
def agent(request, volttron_instance):
    """Fixture: return a test agent with a MESA-configured DNP3Agent running.

    Definitions are added to the config store through the test agent, the
    ``AGENT_MODULE`` environment variable is set to ``dnp3.mesa.agent``, and
    the DNP3Agent is installed and started with MESA_AGENT_CONFIG. The test
    agent is subscribed to the ``mesa/function`` topic prefix.

    :param request: fixture request object; a finalizer is registered on it.
    :param volttron_instance: platform wrapper the agent is installed on.
    :return: the agent created by ``volttron_instance.build_agent()``.
    """
    test_agent = volttron_instance.build_agent()
    add_definitions_to_config_store(test_agent)

    print('Installing Mesa Agent')
    os.environ['AGENT_MODULE'] = 'dnp3.mesa.agent'
    agent_id = volttron_instance.install_agent(
        agent_dir=get_services_core("DNP3Agent"),
        config_file=MESA_AGENT_CONFIG,
        vip_identity=MESA_AGENT_ID,
        start=True)

    # Subscribe to MESA functions
    test_agent.vip.pubsub.subscribe(peer='pubsub',
                                    prefix='mesa/function',
                                    callback=onmessage)

    def teardown():
        """Stop and remove the installed agent if the platform is up; stop the test agent."""
        platform_is_up = volttron_instance.is_running()
        if platform_is_up:
            volttron_instance.stop_agent(agent_id)
            volttron_instance.remove_agent(agent_id)
        test_agent.core.stop()

    # wait for agents and devices to start
    gevent.sleep(3)

    request.addfinalizer(teardown)
    return test_agent
开发者ID:VOLTTRON,项目名称:volttron,代码行数:31,代码来源:test_mesa_agent.py
示例10: vc_instance
def vc_instance(request, volttron_instance1_web):
    """
    Provide a volttron instance that has a `VolttronCentral` agent installed
    and started with VC_CONFIG.

    :returns tuple:
        - the PlatformWrapper
        - the uuid of the `VolttronCentral` agent.
        - the instance's ``jsonrpc_endpoint``, the address to use when
          talking to the `VolttronCentral` agent.
    """
    wrapper = volttron_instance1_web

    agent_uuid = wrapper.install_agent(
        agent_dir=get_services_core("VolttronCentral"),
        config_file=VC_CONFIG,
        start=True
    )
    rpc_addr = wrapper.jsonrpc_endpoint

    # Allow all incoming connections that are encrypted.
    wrapper.allow_all_connections()

    def teardown():
        """Remove the VolttronCentral agent and print the instance's log."""
        print('Cleanup vc_instance')
        wrapper.remove_agent(agent_uuid)
        print_log(wrapper.volttron_home)

    request.addfinalizer(teardown)
    return wrapper, agent_uuid, rpc_addr
开发者ID:Kisensum,项目名称:volttron,代码行数:29,代码来源:vc_fixtures.py
示例11: forwarder
def forwarder(request, volttron_instances):
    """Fixture: install and start a ForwardHistorian on ``volttron_instance1``.

    Both module-level instances are opened to all connections, instance 2's
    VIP address and server key are recorded in instance 1's ``known_hosts``
    file, and the same address/key pair is written into ``forwarder_config``
    as the forwarding destination. The installed agent's uuid is stored in the
    module global ``forwarder_uuid``.
    """
    global volttron_instance1, volttron_instance2
    global forwarder_uuid, forwarder_config

    source = volttron_instance1
    destination = volttron_instance2

    # 1. Open up both instances.
    source.allow_all_connections()
    destination.allow_all_connections()

    # 2. Register the destination's server key with the source instance.
    hosts_path = os.path.join(source.volttron_home, 'known_hosts')
    hosts_store = KnownHostsStore(hosts_path)
    hosts_store.add(destination.vip_address, destination.serverkey)

    # 3. Point the forwarder configuration at the destination.
    forwarder_config["destination-vip"] = destination.vip_address
    forwarder_config["destination-serverkey"] = destination.serverkey

    # 4. Install and start the forwarder on the source instance.
    forwarder_uuid = source.install_agent(
        agent_dir=get_services_core("ForwardHistorian"),
        config_file=forwarder_config,
        start=True)
    print("forwarder agent id: ", forwarder_uuid)
开发者ID:Kisensum,项目名称:volttron,代码行数:25,代码来源:test_forwardhistorian.py
示例12: simple_failover
def simple_failover(request, get_volttron_instances):
    """Fixture: set up a primary/secondary pair of FailoverAgents.

    Each instance gets a ListenerAgent (installed with ``start=False``) and a
    FailoverAgent configured with the other instance's VIP address and server
    key. After SLEEP_TIME seconds the fixture asserts that all agents run on
    the primary and that not all agents run on the secondary.

    The primary listener uuid and both FailoverAgent uuids are stored in the
    module globals ``listener_primary``, ``uuid_primary`` and
    ``uuid_secondary``; the secondary listener uuid stays local.

    :return: tuple ``(primary, secondary)``.
    """
    global simple_primary_config, simple_secondary_config
    global uuid_primary, uuid_secondary
    global listener_primary

    primary, secondary = get_volttron_instances(2)
    primary.allow_all_connections()
    secondary.allow_all_connections()

    def install_listener(platform):
        """Install (without starting) a ListenerAgent with identity 'listener'."""
        return platform.install_agent(agent_dir=get_examples("ListenerAgent"),
                                      vip_identity="listener",
                                      start=False)

    def install_failover(platform, config):
        """Install a FailoverAgent with the given config and return its uuid."""
        return platform.install_agent(agent_dir=get_services_core("FailoverAgent"),
                                      config_file=config)

    # configure primary
    listener_primary = install_listener(primary)
    simple_primary_config["remote_vip"] = secondary.vip_address
    simple_primary_config["remote_serverkey"] = secondary.serverkey
    uuid_primary = install_failover(primary, simple_primary_config)

    # configure secondary
    listener_secondary = install_listener(secondary)
    simple_secondary_config["remote_vip"] = primary.vip_address
    simple_secondary_config["remote_serverkey"] = primary.serverkey
    uuid_secondary = install_failover(secondary, simple_secondary_config)

    gevent.sleep(SLEEP_TIME)
    assert all_agents_running(primary)
    assert not all_agents_running(secondary)

    def teardown():
        """Stop the agents on each instance and shut the instance down."""
        for installed_uuid in (uuid_primary, listener_primary):
            primary.stop_agent(installed_uuid)
        primary.shutdown_platform()

        for installed_uuid in (uuid_secondary, listener_secondary):
            secondary.stop_agent(installed_uuid)
        secondary.shutdown_platform()

    request.addfinalizer(teardown)
    return primary, secondary
开发者ID:Kisensum,项目名称:volttron,代码行数:47,代码来源:test_simple_failover.py
示例13: add_volttron_central_platform
def add_volttron_central_platform(wrapper, config={}, **kwargs):
    """Install a VolttronCentralPlatform agent on ``wrapper`` and return its uuid.

    The agent is installed under the VOLTTRON_CENTRAL_PLATFORM identity.

    :param wrapper: object exposing ``vip_address`` and ``install_agent``.
    :param config: passed to ``install_agent`` as ``config_file``.
    :param kwargs: accepted but not used by this function.
    :return: whatever ``wrapper.install_agent`` returns.
    """
    # NOTE(review): unlike the sibling add_* helpers, **kwargs is not forwarded
    # to install_agent -- confirm whether that is intentional.
    print('Adding vcp to {}'.format(wrapper.vip_address))
    vcp_dir = get_services_core("VolttronCentralPlatform")
    return wrapper.install_agent(agent_dir=vcp_dir,
                                 config_file=config,
                                 vip_identity=VOLTTRON_CENTRAL_PLATFORM)
开发者ID:Kisensum,项目名称:volttron,代码行数:8,代码来源:agent_additions.py
示例14: add_mongohistorian
def add_mongohistorian(wrapper, config, vip_identity='platform.historian',
                       **kwargs):
    """Install a MongodbHistorian agent on ``wrapper`` and return its uuid.

    :param wrapper: object exposing ``install_agent``.
    :param config: passed to ``install_agent`` as ``config_file``.
    :param vip_identity: identity to install under; defaults to
        ``platform.historian``.
    :param kwargs: forwarded unchanged to ``install_agent``.
    :return: whatever ``wrapper.install_agent`` returns.
    """
    historian_dir = get_services_core("MongodbHistorian")
    return wrapper.install_agent(agent_dir=historian_dir,
                                 config_file=config,
                                 vip_identity=vip_identity,
                                 **kwargs)
开发者ID:Kisensum,项目名称:volttron,代码行数:9,代码来源:agent_additions.py
示例15: add_listener
def add_listener(wrapper, config={}, vip_identity=None, **kwargs):
    """Install a ListenerAgent on ``wrapper`` and return its uuid.

    :param wrapper: object exposing ``install_agent``.
    :param config: passed to ``install_agent`` as ``config_file``.
    :param vip_identity: identity to install under (``None`` by default).
    :param kwargs: forwarded unchanged to ``install_agent``.
    :return: whatever ``wrapper.install_agent`` returns.
    """
    print("Adding to {wrapper} a listener agent".format(wrapper=wrapper))
    # NOTE(review): other fixtures locate ListenerAgent with get_examples();
    # confirm that get_services_core() resolves to an existing directory here.
    listener_dir = get_services_core("ListenerAgent")
    return wrapper.install_agent(agent_dir=listener_dir,
                                 config_file=config,
                                 vip_identity=vip_identity,
                                 **kwargs)
开发者ID:Kisensum,项目名称:volttron,代码行数:9,代码来源:agent_additions.py
示例16: test_openadr
def test_openadr(self, agent):
    """Send the sample ``oadr_request_event`` XML to the EIEvent endpoint.

    The XML file is sent as the body of a GET request to
    ``<web_address>/OpenADR2/Simple/2.0b/EIEvent``; the response must have
    status 200. Afterwards the ``get_events`` RPC on ``test_ven_agent`` must
    return ``None``.

    :param agent: agent used to issue the ``get_events`` RPC.
    """
    url = '{}/OpenADR2/Simple/2.0b/EIEvent'.format(web_address)
    xml_filename = get_services_core('OpenADRVenAgent/tests/oadr_request_event.xml')
    headers = {'content-type': 'application/sep+xml'}

    # Use a context manager so the sample file is closed once the request has
    # completed, even if the request raises.
    with open(xml_filename, 'rb') as xml_file:
        response = requests.get(url, data=xml_file, headers=headers)

    # The request itself must succeed.
    assert response.status_code == 200

    # The agent's get_events RPC is expected to report None after the request.
    assert agent.vip.rpc.call('test_ven_agent', 'get_events').get(timeout=10) is None
开发者ID:VOLTTRON,项目名称:volttron-applications,代码行数:11,代码来源:test_openadrven.py
示例17: sep2_http_put
def sep2_http_put(sep2_resource_name, sep2_filename):
    """
    Issue a web request carrying SEP2 sample data, using the contents of an XML file.

    The body is read from ``SEP2Agent/tests/<sep2_filename>.PUT.xml`` and sent
    to ``<web_address>/dcap/<sep2_resource_name>``.

    NOTE: despite the function name, the request is sent with HTTP POST.

    @param sep2_resource_name: The distinguishing part of the name of the SEP2 resource as it appears in the URL.
    @param sep2_filename: The distinguishing part of the SEP2 sample data file name.
    @return: The response object returned by ``requests.post``.
    """
    url = '{}/dcap/{}'.format(web_address, sep2_resource_name)
    headers = {'content-type': 'application/sep+xml'}

    # Format the relative name before resolving it, so braces elsewhere in the
    # resolved path cannot interfere with the substitution.
    xml_path = get_services_core("SEP2Agent/tests/{}.PUT.xml".format(sep2_filename))

    # Close the sample file as soon as the request has completed.
    with open(xml_path, 'rb') as xml_file:
        return requests.post(url, data=xml_file, headers=headers)
开发者ID:Kisensum,项目名称:volttron,代码行数:12,代码来源:test_sep2_agent.py
示例18: sqlhistorian
def sqlhistorian(request, volttron_instances):
    """Fixture: install and start a SQLHistorian on ``volttron_instance2``.

    The agent is installed with the module-level ``sqlite_config`` under the
    ``platform.historian`` identity, and its uuid is printed.
    """
    global volttron_instance1, volttron_instance2
    global sqlite_config

    # Install and start sqlhistorian agent in instance2
    install_options = dict(
        agent_dir=get_services_core("SQLHistorian"),
        config_file=sqlite_config,
        start=True,
        vip_identity='platform.historian',
    )
    historian_uuid = volttron_instance2.install_agent(**install_options)
    print("sqlite historian agent id: ", historian_uuid)
开发者ID:Kisensum,项目名称:volttron,代码行数:12,代码来源:test_forwardhistorian.py
示例19: test_cov_update_published
def test_cov_update_published(volttron_instance, test_agent):
    """Tests the functionality of BACnet change of value forwarding in the
    Master Driver and driver.py

    The master driver's config store is reset and loaded with the fake device
    registry and config through ``volttron-ctl``, the MasterDriverAgent is
    installed and started, and ``forward_bacnet_cov_value`` is called with
    three values for the ``PowerState`` point. The test then checks that
    ``test_agent.cov_callback`` saw exactly three publishes containing
    ``PowerState``.
    """
    def run_config_command(*args):
        """Run ``volttron-ctl config <args>`` in the instance's environment.

        Asserts that the command exits with status 0; on failure the
        captured stderr is shown in the assertion message.
        """
        cmd = ['volttron-ctl', 'config'] + list(args)
        process = Popen(cmd, env=volttron_instance.env,
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # communicate() drains both pipes while waiting. A bare wait() with
        # stdout/stderr piped can deadlock once the child fills a pipe buffer.
        _, stderr_output = process.communicate()
        assert process.returncode == 0, stderr_output

    # Reset master driver config store
    run_config_command('delete', PLATFORM_DRIVER, '--all')

    # Add fake device configuration
    run_config_command('store', PLATFORM_DRIVER,
                       'fake.csv', 'examples/configurations/drivers/fake.csv', '--csv')
    run_config_command('store', PLATFORM_DRIVER,
                       "devices/fakedriver", 'examples/configurations/drivers/fake.config',
                       '--json')

    # install master driver, start the master driver, which starts the device
    master_uuid = volttron_instance.install_agent(
        agent_dir=get_services_core("MasterDriverAgent"),
        config_file={},
        start=True)
    print("agent id: ", master_uuid)

    # tell the master driver to forward the value
    point_name = "PowerState"
    device_path = "fakedriver"
    result_dict = {"fake1": "test", "fake2": "test", "fake3": "test"}
    test_agent.vip.rpc.call(PLATFORM_DRIVER, 'forward_bacnet_cov_value',
                            device_path, point_name, result_dict)

    # wait for the publishes to make it to the bus
    gevent.sleep(2)

    # Mock checks
    # Should have one "PowerState" publish for each item in the result dict
    # Total all publishes likely will include regular scrapes
    assert test_agent.cov_callback.call_count >= 3
    test_count = 0
    for call_arg in test_agent.cov_callback.call_args_list:
        if call_arg[0][5][0].get("PowerState", False):
            test_count += 1
    assert test_count == 3
开发者ID:Kisensum,项目名称:volttron,代码行数:51,代码来源:test_driver_bacnet_cov.py
示例20: test_indefinite_override_after_restart
def test_indefinite_override_after_restart(config_store, test_agent, volttron_instance1):
    """Check that an indefinite override is still enforced after a reinstall.

    Four fake driver configs are stored, an indefinite override is set on
    ``fakedriver2``, and the master driver agent is stopped, removed and
    installed again. A subsequent ``set_point`` on the overridden device must
    fail with an ``OverrideError``. All overrides are cleared at the end.

    The module global ``master_uuid`` is read for the running agent and
    rebound to the newly installed agent's uuid.
    """
    global master_uuid

    # range() exists on both Python 2 and 3; xrange() is Python 2 only.
    for i in range(4):
        config_name = "devices/fakedriver{}".format(i)
        setup_config(config_store, config_name, fake_device_config)

    device_path = 'fakedriver2'

    # Set override feature on device
    test_agent.vip.rpc.call(
        PLATFORM_DRIVER,  # Target agent
        'set_override_on',  # Method
        device_path,  # Override Pattern
        0.0,  # Indefinite override
        False,  # passed as False; presumably the revert flag -- TODO confirm
        False
    ).get(timeout=10)

    # Give it enough time to set indefinite override.
    gevent.sleep(0.5)

    volttron_instance1.stop_agent(master_uuid)
    volttron_instance1.remove_agent(master_uuid)
    gevent.sleep(1)

    # Start the master driver agent which would in turn start the fake driver
    # using the configs created above
    master_uuid = volttron_instance1.install_agent(
        agent_dir=get_services_core("MasterDriverAgent"),
        config_file={},
        start=True)
    gevent.sleep(1)  # wait for the agent to start and start the devices

    point = 'SampleWritableFloat1'
    new_value = 65.5
    try:
        # Try to set a point on the overridden device (device_path)
        result = test_agent.vip.rpc.call(
            PLATFORM_DRIVER,  # Target agent
            'set_point',  # Method
            device_path,  # device path
            point,
            new_value
        ).get(timeout=10)
        pytest.fail("Expecting Override Error. Code returned : {}".format(result))
    except RemoteError as e:
        assert e.exc_info['exc_type'] == 'master_driver.agent.OverrideError'
        assert e.message == 'Cannot set point on device {} since global override is set'.format(
            device_path)

    # Clear the override so it does not remain set after this test.
    result = test_agent.vip.rpc.call(
        PLATFORM_DRIVER,  # Target agent
        'clear_overrides'  # Method
    ).get(timeout=10)
开发者ID:Kisensum,项目名称:volttron,代码行数:50,代码来源:test_global_override.py
注：本文中的 volttron.platform.get_services_core 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论