本文整理汇总了Python中tests.common.async_fire_mqtt_message函数的典型用法代码示例。如果您正苦于以下问题：Python async_fire_mqtt_message函数的具体用法？Python async_fire_mqtt_message怎么用？Python async_fire_mqtt_message使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了async_fire_mqtt_message函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_no_color_brightness_color_temp_white_val_if_no_topics
async def test_no_color_brightness_color_temp_white_val_if_no_topics(
        hass, mqtt_mock):
    """Check that a topic-less mqtt_json light exposes no color attributes.

    Only the state and command topics are configured, so the RGB,
    brightness, color temp, effect, white value, XY and HS attributes are
    expected to be None both before and after an ON state message.
    """
    unset_attributes = (
        'rgb_color', 'brightness', 'color_temp', 'effect',
        'white_value', 'xy_color', 'hs_color',
    )
    platform_config = {
        'platform': 'mqtt_json',
        'name': 'test',
        'state_topic': 'test_light_rgb',
        'command_topic': 'test_light_rgb/set',
    }
    assert await async_setup_component(
        hass, light.DOMAIN, {light.DOMAIN: platform_config})

    # Right after setup the light is off and nothing optional is populated.
    state = hass.states.get('light.test')
    assert state.state == STATE_OFF
    assert state.attributes.get(ATTR_SUPPORTED_FEATURES) == 40
    for attribute in unset_attributes:
        assert state.attributes.get(attribute) is None

    async_fire_mqtt_message(hass, 'test_light_rgb', '{"state":"ON"}')
    await hass.async_block_till_done()

    # Switching on must not make any of the attributes appear.
    state = hass.states.get('light.test')
    assert state.state == STATE_ON
    for attribute in unset_attributes:
        assert state.attributes.get(attribute) is None
开发者ID:ManHammer,项目名称:home-assistant,代码行数:35,代码来源:test_mqtt_json.py
示例2: test_default_availability_payload
async def test_default_availability_payload(hass, mqtt_mock):
    """Test availability by default payload with defined topic.

    The lock is expected to start out unavailable, to leave that state once
    'online' is published on the availability topic, and to return to it
    after 'offline' is published.
    """
    assert await async_setup_component(hass, lock.DOMAIN, {
        lock.DOMAIN: {
            'platform': 'mqtt',
            'name': 'test',
            'state_topic': 'state-topic',
            'command_topic': 'command-topic',
            'payload_lock': 'LOCK',
            'payload_unlock': 'UNLOCK',
            'availability_topic': 'availability-topic'
        }
    })

    # States are compared by value: an identity check ('is') on strings only
    # holds when both sides happen to be the very same object, so it can
    # pass or fail for reasons unrelated to the entity's actual state.
    state = hass.states.get('lock.test')
    assert state.state == STATE_UNAVAILABLE

    async_fire_mqtt_message(hass, 'availability-topic', 'online')
    await hass.async_block_till_done()

    state = hass.states.get('lock.test')
    assert state.state != STATE_UNAVAILABLE

    async_fire_mqtt_message(hass, 'availability-topic', 'offline')
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    state = hass.states.get('lock.test')
    assert state.state == STATE_UNAVAILABLE
开发者ID:Martwall,项目名称:home-assistant,代码行数:29,代码来源:test_lock.py
示例3: test_unique_id
async def test_unique_id(hass):
    """Verify that two lights sharing a unique_id yield a single entity.

    Two template-schema MQTT lights are configured with the same unique_id;
    after setup and one message on the shared state topic exactly one light
    entity id is expected.
    """
    await async_mock_mqtt_component(hass)

    first_light = {
        'platform': 'mqtt',
        'name': 'Test 1',
        'schema': 'template',
        'state_topic': 'test-topic',
        'command_topic': 'test_topic',
        'command_on_template': 'on,{{ transition }}',
        'command_off_template': 'off,{{ transition|d }}',
        'unique_id': 'TOTALLY_UNIQUE'
    }
    second_light = {
        'platform': 'mqtt',
        'name': 'Test 2',
        'schema': 'template',
        'state_topic': 'test-topic',
        'command_topic': 'test_topic',
        'unique_id': 'TOTALLY_UNIQUE'
    }
    assert await async_setup_component(
        hass, light.DOMAIN, {light.DOMAIN: [first_light, second_light]})

    async_fire_mqtt_message(hass, 'test-topic', 'payload')
    await hass.async_block_till_done()

    light_entity_ids = hass.states.async_entity_ids(light.DOMAIN)
    assert len(light_entity_ids) == 1
开发者ID:Martwall,项目名称:home-assistant,代码行数:25,代码来源:test_light_template.py
示例4: test_discovery_removal_alarm
async def test_discovery_removal_alarm(hass, mqtt_mock, caplog):
    """Verify that an empty discovery payload removes a discovered alarm.

    A config payload creates alarm_control_panel.beer; publishing an empty
    string on the same discovery topic is expected to make it disappear.
    """
    config_entry = MockConfigEntry(domain=mqtt.DOMAIN)
    await async_start(hass, 'homeassistant', {}, config_entry)

    discovery_payload = (
        '{ "name": "Beer",'
        ' "state_topic": "test_topic",'
        ' "command_topic": "test_topic" }'
    )
    async_fire_mqtt_message(
        hass, 'homeassistant/alarm_control_panel/bla/config',
        discovery_payload)
    await hass.async_block_till_done()

    panel = hass.states.get('alarm_control_panel.beer')
    assert panel is not None
    assert panel.name == 'Beer'

    # The empty payload on the discovery topic is the removal request.
    async_fire_mqtt_message(
        hass, 'homeassistant/alarm_control_panel/bla/config', '')
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    panel = hass.states.get('alarm_control_panel.beer')
    assert panel is None
开发者ID:arsaboo,项目名称:home-assistant,代码行数:28,代码来源:test_alarm_control_panel.py
示例5: test_discovery_update_lock
async def test_discovery_update_lock(hass, mqtt_mock, caplog):
    """Verify that a second discovery payload updates the lock in place.

    After re-publishing on the same discovery topic with a new name, the
    existing lock.beer entity is expected to be named 'Milk' and no
    separate lock.milk entity should exist.
    """
    config_entry = MockConfigEntry(domain=mqtt.DOMAIN)
    await async_start(hass, 'homeassistant', {}, config_entry)

    initial_payload = (
        '{ "name": "Beer",'
        ' "state_topic": "test_topic",'
        ' "command_topic": "command_topic",'
        ' "availability_topic": "availability_topic1" }'
    )
    updated_payload = (
        '{ "name": "Milk",'
        ' "state_topic": "test_topic2",'
        ' "command_topic": "command_topic",'
        ' "availability_topic": "availability_topic2" }'
    )

    async_fire_mqtt_message(
        hass, 'homeassistant/lock/bla/config', initial_payload)
    await hass.async_block_till_done()

    discovered = hass.states.get('lock.beer')
    assert discovered is not None
    assert discovered.name == 'Beer'

    async_fire_mqtt_message(
        hass, 'homeassistant/lock/bla/config', updated_payload)
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    # Same entity id, new friendly name.
    discovered = hass.states.get('lock.beer')
    assert discovered is not None
    assert discovered.name == 'Milk'

    renamed = hass.states.get('lock.milk')
    assert renamed is None
开发者ID:Martwall,项目名称:home-assistant,代码行数:32,代码来源:test_lock.py
示例6: test_mqtt_ws_subscription
async def test_mqtt_ws_subscription(hass, hass_ws_client):
    """Exercise subscribing to an MQTT topic over the websocket API.

    Subscribes to 'test-topic', publishes two messages, checks that both
    arrive as events in publish order, then unsubscribes.
    """
    await async_mock_mqtt_component(hass)
    client = await hass_ws_client(hass)

    subscribe_request = {
        'id': 5,
        'type': 'mqtt/subscribe',
        'topic': 'test-topic',
    }
    await client.send_json(subscribe_request)
    reply = await client.receive_json()
    assert reply['success']

    published = ('test1', 'test2')
    for payload in published:
        async_fire_mqtt_message(hass, 'test-topic', payload)

    # Both messages are fired before reading; events must keep that order.
    for payload in published:
        reply = await client.receive_json()
        event = reply['event']
        assert event['topic'] == 'test-topic'
        assert event['payload'] == payload

    # Unsubscribe
    unsubscribe_request = {
        'id': 8,
        'type': 'unsubscribe_events',
        'subscription': 5,
    }
    await client.send_json(unsubscribe_request)
    reply = await client.receive_json()
    assert reply['success']
开发者ID:boced66,项目名称:home-assistant,代码行数:32,代码来源:test_init.py
示例7: test_default_availability_payload
async def test_default_availability_payload(hass, mqtt_mock):
    """Check JSON-schema light availability with the default payloads.

    No custom availability payloads are configured, so 'online' and
    'offline' on the availability topic are expected to toggle the light
    out of and back into the unavailable state.
    """
    light_config = {
        'platform': 'mqtt',
        'schema': 'json',
        'name': 'test',
        'state_topic': 'test_light_rgb',
        'command_topic': 'test_light_rgb/set',
        'availability_topic': 'availability-topic'
    }
    assert await async_setup_component(
        hass, light.DOMAIN, {light.DOMAIN: light_config})

    current = hass.states.get('light.test')
    assert current.state == STATE_UNAVAILABLE

    async_fire_mqtt_message(hass, 'availability-topic', 'online')
    await hass.async_block_till_done()

    current = hass.states.get('light.test')
    assert current.state != STATE_UNAVAILABLE

    async_fire_mqtt_message(hass, 'availability-topic', 'offline')
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    current = hass.states.get('light.test')
    assert current.state == STATE_UNAVAILABLE
开发者ID:Martwall,项目名称:home-assistant,代码行数:28,代码来源:test_light_json.py
示例8: test_discovery_update_vacuum
async def test_discovery_update_vacuum(hass, mock_publish):
    """Verify that a second discovery payload updates the vacuum in place.

    The entity discovered as vacuum.beer is expected to keep its entity id
    but take the name 'Milk' after the update; vacuum.milk must not exist.
    """
    config_entry = MockConfigEntry(domain=mqtt.DOMAIN)
    await async_start(hass, 'homeassistant', {}, config_entry)

    initial_payload = (
        '{ "name": "Beer",'
        ' "command_topic": "test_topic" }'
    )
    updated_payload = (
        '{ "name": "Milk",'
        ' "command_topic": "test_topic" }'
    )

    async_fire_mqtt_message(
        hass, 'homeassistant/vacuum/bla/config', initial_payload)
    await hass.async_block_till_done()

    discovered = hass.states.get('vacuum.beer')
    assert discovered is not None
    assert discovered.name == 'Beer'

    async_fire_mqtt_message(
        hass, 'homeassistant/vacuum/bla/config', updated_payload)
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    # Same entity id, new friendly name.
    discovered = hass.states.get('vacuum.beer')
    assert discovered is not None
    assert discovered.name == 'Milk'

    renamed = hass.states.get('vacuum.milk')
    assert renamed is None
开发者ID:Martwall,项目名称:home-assistant,代码行数:32,代码来源:test_vacuum.py
示例9: test_custom_availability_payload
async def test_custom_availability_payload(hass, mqtt_mock):
    """Check light availability driven by custom payloads.

    The light is configured with 'good' / 'nogood' as its available / not
    available payloads; publishing them on the availability topic is
    expected to toggle the unavailable state accordingly.
    """
    light_config = {
        'platform': 'mqtt',
        'name': 'test',
        'command_topic': 'test_light/set',
        'brightness_command_topic': 'test_light/bright',
        'rgb_command_topic': "test_light/rgb",
        'availability_topic': 'availability-topic',
        'payload_available': 'good',
        'payload_not_available': 'nogood'
    }
    assert await async_setup_component(
        hass, light.DOMAIN, {light.DOMAIN: light_config})

    current = hass.states.get('light.test')
    assert current.state == STATE_UNAVAILABLE

    async_fire_mqtt_message(hass, 'availability-topic', 'good')
    await hass.async_block_till_done()

    current = hass.states.get('light.test')
    assert current.state != STATE_UNAVAILABLE

    async_fire_mqtt_message(hass, 'availability-topic', 'nogood')
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    current = hass.states.get('light.test')
    assert current.state == STATE_UNAVAILABLE
开发者ID:ManHammer,项目名称:home-assistant,代码行数:30,代码来源:test_mqtt.py
示例10: test_controlling_state_via_topic_and_json_message
async def test_controlling_state_via_topic_and_json_message(
        hass, mock_publish):
    """Check that a switch follows JSON state messages via value_template.

    The template extracts the 'val' field of the JSON payload, which is
    then matched against the configured 'beer on' / 'beer off' payloads.
    """
    switch_config = {
        'platform': 'mqtt',
        'name': 'test',
        'state_topic': 'state-topic',
        'command_topic': 'command-topic',
        'payload_on': 'beer on',
        'payload_off': 'beer off',
        'value_template': '{{ value_json.val }}'
    }
    assert await async_setup_component(
        hass, switch.DOMAIN, {switch.DOMAIN: switch_config})

    current = hass.states.get('switch.test')
    assert current.state == STATE_OFF

    async_fire_mqtt_message(hass, 'state-topic', '{"val":"beer on"}')
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    current = hass.states.get('switch.test')
    assert current.state == STATE_ON

    async_fire_mqtt_message(hass, 'state-topic', '{"val":"beer off"}')
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    current = hass.states.get('switch.test')
    assert current.state == STATE_OFF
开发者ID:Martwall,项目名称:home-assistant,代码行数:31,代码来源:test_switch.py
示例11: test_custom_availability_payload
async def test_custom_availability_payload(hass, mock_publish):
    """Test availability by custom payload with defined topic.

    The vacuum is set up from default_config extended with an availability
    topic and the custom payloads 'good' / 'nogood'; publishing them is
    expected to toggle the unavailable state accordingly.
    """
    # Work on a per-test copy. default_config is defined outside this test,
    # so calling update() on it directly would leak the availability keys
    # into every test that uses it afterwards. A shallow copy is enough
    # because only top-level keys are added.
    config = dict(default_config)
    config.update({
        'availability_topic': 'availability-topic',
        'payload_available': 'good',
        'payload_not_available': 'nogood'
    })

    assert await async_setup_component(hass, vacuum.DOMAIN, {
        vacuum.DOMAIN: config,
    })

    state = hass.states.get('vacuum.mqtttest')
    assert STATE_UNAVAILABLE == state.state

    async_fire_mqtt_message(hass, 'availability-topic', 'good')
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    state = hass.states.get('vacuum.mqtttest')
    assert STATE_UNAVAILABLE != state.state

    async_fire_mqtt_message(hass, 'availability-topic', 'nogood')
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    state = hass.states.get('vacuum.mqtttest')
    assert STATE_UNAVAILABLE == state.state
开发者ID:Martwall,项目名称:home-assistant,代码行数:28,代码来源:test_vacuum.py
示例12: test_controlling_state_via_topic
async def test_controlling_state_via_topic(hass, mock_publish):
    """Check that a switch follows plain payloads on its state topic.

    The on/off payloads are configured as the integers 1 and 0, while the
    messages published on the state topic are the strings '1' and '0'.
    """
    switch_config = {
        'platform': 'mqtt',
        'name': 'test',
        'state_topic': 'state-topic',
        'command_topic': 'command-topic',
        'payload_on': 1,
        'payload_off': 0
    }
    assert await async_setup_component(
        hass, switch.DOMAIN, {switch.DOMAIN: switch_config})

    current = hass.states.get('switch.test')
    assert current.state == STATE_OFF
    assert not current.attributes.get(ATTR_ASSUMED_STATE)

    async_fire_mqtt_message(hass, 'state-topic', '1')
    await hass.async_block_till_done()

    current = hass.states.get('switch.test')
    assert current.state == STATE_ON

    async_fire_mqtt_message(hass, 'state-topic', '0')
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    current = hass.states.get('switch.test')
    assert current.state == STATE_OFF
开发者ID:Martwall,项目名称:home-assistant,代码行数:29,代码来源:test_switch.py
示例13: test_custom_state_payload
async def test_custom_state_payload(hass, mock_publish):
    """Check that state_on / state_off drive the switch state.

    The switch is configured with payloads 1 / 0 and with separate state
    values "HIGH" / "LOW"; the state topic messages use the latter.
    """
    switch_config = {
        'platform': 'mqtt',
        'name': 'test',
        'state_topic': 'state-topic',
        'command_topic': 'command-topic',
        'payload_on': 1,
        'payload_off': 0,
        'state_on': "HIGH",
        'state_off': "LOW",
    }
    assert await async_setup_component(
        hass, switch.DOMAIN, {switch.DOMAIN: switch_config})

    current = hass.states.get('switch.test')
    assert current.state == STATE_OFF
    assert not current.attributes.get(ATTR_ASSUMED_STATE)

    async_fire_mqtt_message(hass, 'state-topic', 'HIGH')
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    current = hass.states.get('switch.test')
    assert current.state == STATE_ON

    async_fire_mqtt_message(hass, 'state-topic', 'LOW')
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    current = hass.states.get('switch.test')
    assert current.state == STATE_OFF
开发者ID:Martwall,项目名称:home-assistant,代码行数:32,代码来源:test_switch.py
示例14: test_snips_call_action
async def test_snips_call_action(hass, mqtt_mock):
    """Test calling action via Snips.

    The 'Lights' intent is mapped to the mocked test.service with a
    templated 'color' field. After EXAMPLE_MSG is published on
    'hermes/nlu/intentParsed', exactly one service call carrying
    color 'blue' is expected.
    """
    # Written as a native coroutine like the other tests here: a plain
    # 'def' using 'yield from' is only a generator function unless a
    # coroutine decorator is applied, and generator-based coroutines were
    # removed from asyncio in Python 3.11.
    calls = async_mock_service(hass, 'test', 'service')

    result = await async_setup_component(hass, "snips", {
        "snips": {
            "intents": {
                "Lights": {
                    "action": {
                        "service": "test.service",
                        "data_template": {
                            "color": "{{ light_color }}"
                        }
                    }
                }
            }
        }
    })
    assert result

    async_fire_mqtt_message(hass, 'hermes/nlu/intentParsed',
                            EXAMPLE_MSG)
    await hass.async_block_till_done()

    assert len(calls) == 1
    call = calls[0]
    assert call.data.get('color') == 'blue'
开发者ID:Khabi,项目名称:home-assistant,代码行数:26,代码来源:test_snips.py
示例15: test_snips_intent_username
async def test_snips_intent_username(hass, mqtt_mock):
    """Check handling of intent names in the username:intentName format.

    An intent published as 'username:Lights' is expected to be delivered
    to the mocked 'Lights' intent handler with platform 'snips'.
    """
    setup_ok = await async_setup_component(hass, "snips", {"snips": {}})
    assert setup_ok

    # The payload text is kept verbatim, including its line layout.
    payload = """
{
"input": "what to do",
"intent": {
"intentName": "username:Lights",
"probability": 1
},
"slots": []
}
"""
    captured = async_mock_intent(hass, 'Lights')

    async_fire_mqtt_message(
        hass, 'hermes/intent/username:Lights', payload)
    await hass.async_block_till_done()

    assert len(captured) == 1
    received = captured[0]
    assert received.platform == 'snips'
    assert received.intent_type == 'Lights'
开发者ID:EarthlingRich,项目名称:home-assistant,代码行数:25,代码来源:test_snips.py
示例16: test_no_color_brightness_color_temp_hs_white_xy_if_no_topics
async def test_no_color_brightness_color_temp_hs_white_xy_if_no_topics(
        hass, mqtt_mock):
    """Check that a light without color topics exposes no color attributes.

    Only the state and command topics are configured, so the RGB,
    brightness, color temp, HS, white value and XY attributes are expected
    to be None both before and after the light is reported ON.
    """
    unset_attributes = (
        'rgb_color', 'brightness', 'color_temp',
        'hs_color', 'white_value', 'xy_color',
    )
    light_config = {
        'platform': 'mqtt',
        'name': 'test',
        'state_topic': 'test_light_rgb/status',
        'command_topic': 'test_light_rgb/set',
    }
    assert await async_setup_component(
        hass, light.DOMAIN, {light.DOMAIN: light_config})

    current = hass.states.get('light.test')
    assert current.state == STATE_OFF
    for attribute in unset_attributes:
        assert current.attributes.get(attribute) is None

    async_fire_mqtt_message(hass, 'test_light_rgb/status', 'ON')
    await hass.async_block_till_done()

    # Switching on must not make any of the attributes appear.
    current = hass.states.get('light.test')
    assert current.state == STATE_ON
    for attribute in unset_attributes:
        assert current.attributes.get(attribute) is None
开发者ID:ManHammer,项目名称:home-assistant,代码行数:32,代码来源:test_mqtt.py
示例17: test_snips_low_probability
async def test_snips_low_probability(hass, mqtt_mock, caplog):
    """Check that an intent below the probability threshold is logged.

    With a threshold of 0.5 configured, an intent carrying probability
    0.49 is expected to produce the below-threshold message in the
    captured log.
    """
    caplog.set_level(logging.WARNING)

    snips_config = {"snips": {"probability_threshold": 0.5}}
    setup_ok = await async_setup_component(hass, "snips", snips_config)
    assert setup_ok

    # The payload text is kept verbatim, including its line layout.
    payload = """
{
"input": "I am not sure what to say",
"intent": {
"intentName": "LightsMaybe",
"probability": 0.49
},
"slots": []
}
"""
    async_mock_intent(hass, 'LightsMaybe')

    async_fire_mqtt_message(hass, 'hermes/intent/LightsMaybe', payload)
    await hass.async_block_till_done()

    # The expected text reproduces the logged wording exactly as asserted
    # by the original test, spelling included.
    expected_log = 'Intent below probaility threshold 0.49 < 0.5'
    assert expected_log in caplog.text
开发者ID:EarthlingRich,项目名称:home-assistant,代码行数:25,代码来源:test_snips.py
示例18: test_snips_intent_user
async def test_snips_intent_user(hass):
    """Check handling of intent names in the user_XXX__intentName format.

    An intent published as 'user_ABCDEF123__Lights' is expected to be
    delivered to the mocked 'Lights' intent handler with platform 'snips'.
    """
    await async_mock_mqtt_component(hass)

    setup_ok = await async_setup_component(hass, "snips", {"snips": {}})
    assert setup_ok

    # The payload text is kept verbatim, including its line layout.
    payload = """
{
"input": "what to do",
"intent": {
"intentName": "user_ABCDEF123__Lights",
"confidenceScore": 1
},
"slots": []
}
"""
    captured = async_mock_intent(hass, 'Lights')

    async_fire_mqtt_message(
        hass, 'hermes/intent/user_ABCDEF123__Lights', payload)
    await hass.async_block_till_done()

    assert len(captured) == 1
    received = captured[0]
    assert received.platform == 'snips'
    assert received.intent_type == 'Lights'
开发者ID:boced66,项目名称:home-assistant,代码行数:27,代码来源:test_init.py
示例19: test_brightness_scale
async def test_brightness_scale(hass, mqtt_mock):
    """Check brightness reporting with a custom brightness_scale.

    With brightness_scale set to 99, the test expects brightness 255 both
    after a bare ON message and after an ON message carrying brightness 99.
    """
    light_config = {
        'platform': 'mqtt',
        'schema': 'json',
        'name': 'test',
        'state_topic': 'test_light_bright_scale',
        'command_topic': 'test_light_bright_scale/set',
        'brightness': True,
        'brightness_scale': 99
    }
    assert await async_setup_component(
        hass, light.DOMAIN, {light.DOMAIN: light_config})

    current = hass.states.get('light.test')
    assert current.state == STATE_OFF
    assert current.attributes.get('brightness') is None
    assert not current.attributes.get(ATTR_ASSUMED_STATE)

    # Switch the light on without specifying a brightness.
    async_fire_mqtt_message(
        hass, 'test_light_bright_scale', '{"state":"ON"}')
    await hass.async_block_till_done()

    current = hass.states.get('light.test')
    assert current.state == STATE_ON
    assert current.attributes.get('brightness') == 255

    # Switch the light on at the top of the configured scale.
    async_fire_mqtt_message(
        hass, 'test_light_bright_scale',
        '{"state":"ON", "brightness": 99}')
    await hass.async_block_till_done()

    current = hass.states.get('light.test')
    assert current.state == STATE_ON
    assert current.attributes.get('brightness') == 255
开发者ID:Martwall,项目名称:home-assistant,代码行数:35,代码来源:test_light_json.py
示例20: test_discovery_expansion
async def test_discovery_expansion(hass, mqtt_mock, caplog):
    """Test expansion of abbreviated discovery payload.

    The payload uses the abbreviated keys 'stat_t' / 'cmd_t' and a '~'
    base topic. The discovered switch is then expected to react to a
    message on 'test_topic/some/base/topic', i.e. the state topic with
    '~' replaced by the base topic.
    """
    # Written as a native coroutine like the other tests here: a plain
    # 'def' using 'yield from' is only a generator function unless a
    # coroutine decorator is applied, and generator-based coroutines were
    # removed from asyncio in Python 3.11.
    entry = MockConfigEntry(domain=mqtt.DOMAIN)
    await async_start(hass, 'homeassistant', {}, entry)

    data = (
        '{ "~": "some/base/topic",'
        ' "name": "DiscoveryExpansionTest1",'
        ' "stat_t": "test_topic/~",'
        ' "cmd_t": "~/test_topic" }'
    )

    async_fire_mqtt_message(
        hass, 'homeassistant/switch/bla/config', data)
    await hass.async_block_till_done()

    state = hass.states.get('switch.DiscoveryExpansionTest1')
    assert state is not None
    assert state.name == 'DiscoveryExpansionTest1'
    assert ('switch', 'bla') in hass.data[ALREADY_DISCOVERED]
    assert state.state == STATE_OFF

    async_fire_mqtt_message(hass, 'test_topic/some/base/topic',
                            'ON')
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    state = hass.states.get('switch.DiscoveryExpansionTest1')
    assert state.state == STATE_ON
开发者ID:ManHammer,项目名称:home-assistant,代码行数:30,代码来源:test_discovery.py
注：本文中的tests.common.async_fire_mqtt_message函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论