本文整理汇总了Python中tests.common.mock_state_change_event函数的典型用法代码示例。如果您正苦于以下问题:Python mock_state_change_event函数的具体用法?Python mock_state_change_event怎么用?Python mock_state_change_event使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了mock_state_change_event函数的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_state_changed_event_sends_message_and_timestamp
def test_state_changed_event_sends_message_and_timestamp(
self,
mock_utcnow,
mock_pub):
""""Test the sending of a message and timestamps if event changed."""
e_id = 'another.entity'
base_topic = 'pub'
# Add the statestream component for publishing state updates
assert self.add_statestream(base_topic=base_topic,
publish_attributes=None,
publish_timestamps=True)
self.hass.block_till_done()
# Reset the mock because it will have already gotten calls for the
# mqtt_statestream state change on initialization, etc.
mock_pub.reset_mock()
# Set a state of an entity
mock_state_change_event(self.hass, State(e_id, 'on'))
self.hass.block_till_done()
# Make sure 'on' was published to pub/fake/entity/state
calls = [
call.async_publish(self.hass, 'pub/another/entity/state', 'on', 1,
True),
call.async_publish(self.hass, 'pub/another/entity/last_changed',
ANY, 1, True),
call.async_publish(self.hass, 'pub/another/entity/last_updated',
ANY, 1, True),
]
mock_pub.assert_has_calls(calls, any_order=True)
assert mock_pub.called
开发者ID:BaptisteSim,项目名称:home-assistant,代码行数:34,代码来源:test_mqtt_statestream.py
示例2: test_state_changed_event_exclude_entity
def test_state_changed_event_exclude_entity(self, mock_utcnow, mock_pub):
""""Test that filtering on excluded entity works as expected."""
base_topic = 'pub'
incl = {}
excl = {
'entities': ['fake.entity2']
}
# Add the statestream component for publishing state updates
# Set the filter to allow fake.* items
assert self.add_statestream(base_topic=base_topic,
publish_include=incl,
publish_exclude=excl)
self.hass.block_till_done()
# Reset the mock because it will have already gotten calls for the
# mqtt_statestream state change on initialization, etc.
mock_pub.reset_mock()
# Set a state of an entity
mock_state_change_event(self.hass, State('fake.entity', 'on'))
self.hass.block_till_done()
# Make sure 'on' was published to pub/fake/entity/state
mock_pub.assert_called_with(self.hass, 'pub/fake/entity/state', 'on',
1, True)
assert mock_pub.called
mock_pub.reset_mock()
# Set a state of an entity that shouldn't be included
mock_state_change_event(self.hass, State('fake.entity2', 'on'))
self.hass.block_till_done()
assert not mock_pub.called
开发者ID:BaptisteSim,项目名称:home-assistant,代码行数:35,代码来源:test_mqtt_statestream.py
示例3: test_wrong_ignored_event_sends_over_stream
def test_wrong_ignored_event_sends_over_stream(self, mock_pub):
"""Test the ignoring of sending events if defined."""
assert self.add_eventstream(pub_topic='bar',
ignore_event=['statee_changed'])
self.hass.block_till_done()
e_id = 'entity.test_id'
event = {}
event['event_type'] = EVENT_STATE_CHANGED
new_state = {
"state": "on",
"entity_id": e_id,
"attributes": {},
}
event['event_data'] = {"new_state": new_state, "entity_id": e_id}
# Reset the mock because it will have already gotten calls for the
# mqtt_eventstream state change on initialization, etc.
mock_pub.reset_mock()
# Set a state of an entity
mock_state_change_event(self.hass, State(e_id, 'on'))
self.hass.block_till_done()
assert mock_pub.called
开发者ID:EarthlingRich,项目名称:home-assistant,代码行数:25,代码来源:test_init.py
示例4: test_get_states
def test_get_states(self):
""" Test getting states at a specific point in time. """
self.init_recorder()
states = []
# Create 10 states for 5 different entities
# After the first 5, sleep a second and save the time
# history.get_states takes the latest states BEFORE point X
for i in range(10):
state = ha.State("test.point_in_time_{}".format(i % 5), "State {}".format(i), {"attribute_test": i})
mock_state_change_event(self.hass, state)
self.hass.pool.block_till_done()
recorder._INSTANCE.block_till_done()
if i < 5:
states.append(state)
if i == 4:
time.sleep(1)
point = dt_util.utcnow()
self.assertEqual(states, sorted(history.get_states(point), key=lambda state: state.entity_id))
# Test get_state here because we have a DB setup
self.assertEqual(states[0], history.get_state(point, states[0].entity_id))
开发者ID:Luigiyo,项目名称:home-assistant,代码行数:27,代码来源:test_history.py
示例5: test_state_changed_attr_sends_message
def test_state_changed_attr_sends_message(self, mock_utcnow, mock_pub):
""""Test the sending of a new message if attribute changed."""
e_id = 'fake.entity'
base_topic = 'pub'
# Add the statestream component for publishing state updates
assert self.add_statestream(base_topic=base_topic,
publish_attributes=True)
self.hass.block_till_done()
# Reset the mock because it will have already gotten calls for the
# mqtt_statestream state change on initialization, etc.
mock_pub.reset_mock()
test_attributes = {"testing": "YES"}
# Set a state of an entity
mock_state_change_event(self.hass, State(e_id, 'off',
attributes=test_attributes))
self.hass.block_till_done()
# Make sure 'on' was published to pub/fake/entity/state
calls = [
call.async_publish(self.hass, 'pub/fake/entity/state', 'off', 1,
True),
call.async_publish(self.hass, 'pub/fake/entity/testing', 'YES',
1, True)
]
mock_pub.assert_has_calls(calls, any_order=True)
assert mock_pub.called
开发者ID:fripsy,项目名称:home-assistant,代码行数:31,代码来源:test_mqtt_statestream.py
示例6: test_get_states
def test_get_states(self):
"""Test getting states at a specific point in time."""
self.init_recorder()
states = []
now = dt_util.utcnow()
with patch('homeassistant.components.recorder.dt_util.utcnow',
return_value=now):
for i in range(5):
state = ha.State(
'test.point_in_time_{}'.format(i % 5),
"State {}".format(i),
{'attribute_test': i})
mock_state_change_event(self.hass, state)
states.append(state)
self.wait_recording_done()
future = now + timedelta(seconds=1)
with patch('homeassistant.components.recorder.dt_util.utcnow',
return_value=future):
for i in range(5):
state = ha.State(
'test.point_in_time_{}'.format(i % 5),
"State {}".format(i),
{'attribute_test': i})
mock_state_change_event(self.hass, state)
self.wait_recording_done()
# Get states returns everything before POINT
for state1, state2 in zip(
states, sorted(history.get_states(self.hass, future),
key=lambda state: state.entity_id)):
assert state1 == state2
# Test get_state here because we have a DB setup
self.assertEqual(
states[0], history.get_state(self.hass, future,
states[0].entity_id))
开发者ID:EarthlingRich,项目名称:home-assistant,代码行数:43,代码来源:test_history.py
示例7: test_state_changed_event_sends_message
def test_state_changed_event_sends_message(self, mock_utcnow, mock_pub):
"""Test the sending of a new message if event changed."""
now = dt_util.as_utc(dt_util.now())
e_id = 'fake.entity'
pub_topic = 'bar'
mock_utcnow.return_value = now
# Add the eventstream component for publishing events
assert self.add_eventstream(pub_topic=pub_topic)
self.hass.block_till_done()
# Reset the mock because it will have already gotten calls for the
# mqtt_eventstream state change on initialization, etc.
mock_pub.reset_mock()
# Set a state of an entity
mock_state_change_event(self.hass, State(e_id, 'on'))
self.hass.block_till_done()
# The order of the JSON is indeterminate,
# so first just check that publish was called
mock_pub.assert_called_with(self.hass, pub_topic, ANY)
assert mock_pub.called
# Get the actual call to publish and make sure it was the one
# we were looking for
msg = mock_pub.call_args[0][2]
event = {}
event['event_type'] = EVENT_STATE_CHANGED
new_state = {
"last_updated": now.isoformat(),
"state": "on",
"entity_id": e_id,
"attributes": {},
"last_changed": now.isoformat(),
}
event['event_data'] = {"new_state": new_state, "entity_id": e_id}
# Verify that the message received was that expected
result = json.loads(msg)
result['event_data']['new_state'].pop('context')
assert result == event
开发者ID:EarthlingRich,项目名称:home-assistant,代码行数:42,代码来源:test_init.py
示例8: test_state_changed_event_sends_message
def test_state_changed_event_sends_message(self, mock_utcnow, mock_pub):
""""Test the sending of a new message if event changed."""
e_id = 'fake.entity'
base_topic = 'pub'
# Add the statestream component for publishing state updates
assert self.add_statestream(base_topic=base_topic)
self.hass.block_till_done()
# Reset the mock because it will have already gotten calls for the
# mqtt_statestream state change on initialization, etc.
mock_pub.reset_mock()
# Set a state of an entity
mock_state_change_event(self.hass, State(e_id, 'on'))
self.hass.block_till_done()
# Make sure 'on' was published to pub/fake/entity/state
mock_pub.assert_called_with(self.hass, 'pub/fake/entity/state', 'on',
1, True)
assert mock_pub.called
开发者ID:BaptisteSim,项目名称:home-assistant,代码行数:21,代码来源:test_mqtt_statestream.py
示例9: test_state_changed_event_sends_message
def test_state_changed_event_sends_message(self, mock_datetime, mock_pub):
now = '00:19:19 11-01-2016'
e_id = 'fake.entity'
pub_topic = 'bar'
mock_datetime.return_value = now
# Add the eventstream component for publishing events
self.assertTrue(self.add_eventstream(pub_topic=pub_topic))
self.hass.pool.block_till_done()
# Reset the mock because it will have already gotten calls for the
# mqtt_eventstream state change on initialization, etc.
mock_pub.reset_mock()
# Set a state of an entity
mock_state_change_event(self.hass, State(e_id, 'on'))
self.hass.pool.block_till_done()
# The order of the JSON is indeterminate,
# so first just check that publish was called
mock_pub.assert_called_with(self.hass, pub_topic, ANY)
self.assertTrue(mock_pub.called)
# Get the actual call to publish and make sure it was the one
# we were looking for
msg = mock_pub.call_args[0][2]
event = {}
event['event_type'] = EVENT_STATE_CHANGED
new_state = {
"last_updated": now,
"state": "on",
"entity_id": e_id,
"attributes": {},
"last_changed": now
}
event['event_data'] = {"new_state": new_state, "entity_id": e_id}
# Verify that the message received was that expected
self.assertEqual(json.loads(msg), event)
开发者ID:100dayproject,项目名称:home-assistant,代码行数:39,代码来源:test_mqtt_eventstream.py
示例10: test_get_states
def test_get_states(self):
""" Test getting states at a specific point in time. """
self.init_recorder()
states = []
for i in range(5):
state = ha.State(
'test.point_in_time_{}'.format(i % 5),
"State {}".format(i),
{'attribute_test': i})
mock_state_change_event(self.hass, state)
self.hass.pool.block_till_done()
states.append(state)
recorder._INSTANCE.block_till_done()
point = dt_util.utcnow() + timedelta(seconds=1)
with patch('homeassistant.util.dt.utcnow', return_value=point):
for i in range(5):
state = ha.State(
'test.point_in_time_{}'.format(i % 5),
"State {}".format(i),
{'attribute_test': i})
mock_state_change_event(self.hass, state)
self.hass.pool.block_till_done()
# Get states returns everything before POINT
self.assertEqual(states,
sorted(history.get_states(point),
key=lambda state: state.entity_id))
# Test get_state here because we have a DB setup
self.assertEqual(
states[0], history.get_state(point, states[0].entity_id))
开发者ID:LucasZielke,项目名称:home-assistant,代码行数:38,代码来源:test_history.py
注:本文中的tests.common.mock_state_change_event函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论