• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python common.mock_state_change_event函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 tests.common.mock_state_change_event 函数的典型用法代码示例。如果您正苦于以下问题:Python mock_state_change_event 函数的具体用法?Python mock_state_change_event 怎么用?Python mock_state_change_event 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了 mock_state_change_event 函数的 10 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: test_state_changed_event_sends_message_and_timestamp

    def test_state_changed_event_sends_message_and_timestamp(
            self, mock_utcnow, mock_pub):
        """Check that a state change publishes the state and both timestamps.

        Statestream is set up with ``publish_timestamps=True``; the test then
        expects one publish for the state and one each for ``last_changed``
        and ``last_updated`` of the entity.
        """
        entity_id = 'another.entity'
        topic_root = 'pub'

        # Set up statestream with timestamp publishing switched on.
        started = self.add_statestream(base_topic=topic_root,
                                       publish_attributes=None,
                                       publish_timestamps=True)
        assert started
        self.hass.block_till_done()

        # Forget the calls recorded so far (setup already triggers
        # mqtt_statestream publishes), so only the change below is checked.
        mock_pub.reset_mock()

        # Fire a state change for the entity and let hass process it.
        new_state = State(entity_id, 'on')
        mock_state_change_event(self.hass, new_state)
        self.hass.block_till_done()

        # Expected publishes under pub/another/entity/...; the timestamp
        # payloads are matched with ANY.
        state_call = call.async_publish(
            self.hass, 'pub/another/entity/state', 'on', 1, True)
        changed_call = call.async_publish(
            self.hass, 'pub/another/entity/last_changed', ANY, 1, True)
        updated_call = call.async_publish(
            self.hass, 'pub/another/entity/last_updated', ANY, 1, True)

        mock_pub.assert_has_calls(
            [state_call, changed_call, updated_call], any_order=True)
        assert mock_pub.called
开发者ID:BaptisteSim,项目名称:home-assistant,代码行数:34,代码来源:test_mqtt_statestream.py


示例2: test_state_changed_event_exclude_entity

    def test_state_changed_event_exclude_entity(self, mock_utcnow, mock_pub):
        """Check that an entity named in the exclude filter is not published.

        ``fake.entity`` is not in the exclude list and must be published;
        ``fake.entity2`` is in the exclude list and must not be.
        """
        include_filter = {}
        exclude_filter = {'entities': ['fake.entity2']}

        # Set up statestream with an empty include filter and an exclude
        # filter that names fake.entity2.
        started = self.add_statestream(base_topic='pub',
                                       publish_include=include_filter,
                                       publish_exclude=exclude_filter)
        assert started
        self.hass.block_till_done()

        # Forget the calls recorded so far (setup already triggers
        # mqtt_statestream publishes).
        mock_pub.reset_mock()

        # A change on the non-excluded entity must reach the publisher.
        allowed_state = State('fake.entity', 'on')
        mock_state_change_event(self.hass, allowed_state)
        self.hass.block_till_done()

        mock_pub.assert_called_with(
            self.hass, 'pub/fake/entity/state', 'on', 1, True)
        assert mock_pub.called

        # A change on the excluded entity must not be published at all.
        mock_pub.reset_mock()
        blocked_state = State('fake.entity2', 'on')
        mock_state_change_event(self.hass, blocked_state)
        self.hass.block_till_done()

        assert not mock_pub.called
开发者ID:BaptisteSim,项目名称:home-assistant,代码行数:35,代码来源:test_mqtt_statestream.py


示例3: test_wrong_ignored_event_sends_over_stream

    def test_wrong_ignored_event_sends_over_stream(self, mock_pub):
        """Test that a non-matching ignore_event entry does not stop publishing.

        The ignore list contains 'statee_changed' (note the extra 'e'; going
        by the test name this mismatch looks intentional), so a state change
        is still expected to be published.
        """
        assert self.add_eventstream(pub_topic='bar',
                                    ignore_event=['statee_changed'])
        self.hass.block_till_done()

        e_id = 'entity.test_id'

        # Reset the mock because it will have already gotten calls for the
        # mqtt_eventstream state change on initialization, etc.
        mock_pub.reset_mock()

        # Set a state of an entity
        mock_state_change_event(self.hass, State(e_id, 'on'))
        self.hass.block_till_done()

        # Only the fact that something was published is checked here; the
        # payload itself is not inspected.
        assert mock_pub.called
开发者ID:EarthlingRich,项目名称:home-assistant,代码行数:25,代码来源:test_init.py


示例4: test_get_states

    def test_get_states(self):
        """Check history lookups at a point in time between two batches."""
        self.init_recorder()
        states = []

        # Ten state changes spread over five entities. The first five are
        # kept as the expected result; after the fifth the test sleeps one
        # second and records the time. history.get_states is expected to
        # return the latest states BEFORE that point.
        for index in range(10):
            entity_id = "test.point_in_time_{}".format(index % 5)
            value = "State {}".format(index)
            state = ha.State(entity_id, value, {"attribute_test": index})

            mock_state_change_event(self.hass, state)
            self.hass.pool.block_till_done()
            recorder._INSTANCE.block_till_done()

            if index >= 5:
                continue

            states.append(state)
            if index == 4:
                time.sleep(1)
                point = dt_util.utcnow()

        found = sorted(history.get_states(point),
                       key=lambda item: item.entity_id)
        self.assertEqual(states, found)

        # get_state is checked here as well because the DB is already set up.
        first = states[0]
        self.assertEqual(first, history.get_state(point, first.entity_id))
开发者ID:Luigiyo,项目名称:home-assistant,代码行数:27,代码来源:test_history.py


示例5: test_state_changed_attr_sends_message

    def test_state_changed_attr_sends_message(self, mock_utcnow, mock_pub):
        """Check that state attributes are published alongside the state.

        With ``publish_attributes=True`` the test expects one publish for the
        state and one for the ``testing`` attribute of the entity.
        """
        entity_id = 'fake.entity'
        topic_root = 'pub'
        attrs = {"testing": "YES"}

        # Set up statestream with attribute publishing switched on.
        started = self.add_statestream(base_topic=topic_root,
                                       publish_attributes=True)
        assert started
        self.hass.block_till_done()

        # Forget the calls recorded so far (setup already triggers
        # mqtt_statestream publishes).
        mock_pub.reset_mock()

        # Fire a state change carrying the attribute.
        changed = State(entity_id, 'off', attributes=attrs)
        mock_state_change_event(self.hass, changed)
        self.hass.block_till_done()

        # 'off' goes to pub/fake/entity/state and the attribute value to
        # pub/fake/entity/testing.
        state_call = call.async_publish(
            self.hass, 'pub/fake/entity/state', 'off', 1, True)
        attr_call = call.async_publish(
            self.hass, 'pub/fake/entity/testing', 'YES', 1, True)

        mock_pub.assert_has_calls([state_call, attr_call], any_order=True)
        assert mock_pub.called
开发者ID:fripsy,项目名称:home-assistant,代码行数:31,代码来源:test_mqtt_statestream.py


示例6: test_get_states

    def test_get_states(self):
        """Check history lookups at a point in time with a patched clock.

        Two batches of five state changes are fired: the first with the
        recorder's ``utcnow`` patched to ``now``, the second with it patched
        to one second later. The first batch is the expected result.
        """
        self.init_recorder()
        utcnow_target = 'homeassistant.components.recorder.dt_util.utcnow'

        def fire_batch():
            """Fire one state change per test entity; return the states."""
            fired = []
            for idx in range(5):
                state = ha.State(
                    'test.point_in_time_{}'.format(idx % 5),
                    "State {}".format(idx),
                    {'attribute_test': idx})
                mock_state_change_event(self.hass, state)
                fired.append(state)
            return fired

        now = dt_util.utcnow()
        with patch(utcnow_target, return_value=now):
            states = fire_batch()
            self.wait_recording_done()

        future = now + timedelta(seconds=1)
        with patch(utcnow_target, return_value=future):
            fire_batch()
            self.wait_recording_done()

        # Get states returns everything before POINT.
        # NOTE(review): zip stops at the shorter sequence, so this loop does
        # not fail if fewer states come back than were fired.
        recorded = sorted(history.get_states(self.hass, future),
                          key=lambda item: item.entity_id)
        for expected, actual in zip(states, recorded):
            assert expected == actual

        # get_state is checked here as well because the DB is already set up.
        first = states[0]
        self.assertEqual(
            first, history.get_state(self.hass, future, first.entity_id))
开发者ID:EarthlingRich,项目名称:home-assistant,代码行数:43,代码来源:test_history.py


示例7: test_state_changed_event_sends_message

    def test_state_changed_event_sends_message(self, mock_utcnow, mock_pub):
        """Check that a state change is published as a JSON event.

        The published payload is decoded and compared with the expected
        event after the 'context' entry of its new_state has been removed.
        """
        now = dt_util.as_utc(dt_util.now())
        entity_id = 'fake.entity'
        topic = 'bar'
        mock_utcnow.return_value = now

        # Set up eventstream publishing to the topic.
        started = self.add_eventstream(pub_topic=topic)
        assert started
        self.hass.block_till_done()

        # Forget the calls recorded so far (setup already triggers
        # mqtt_eventstream publishes).
        mock_pub.reset_mock()

        # Fire a state change for the entity.
        mock_state_change_event(self.hass, State(entity_id, 'on'))
        self.hass.block_till_done()

        # JSON key order is not fixed, so first only check that a publish
        # to the topic happened, with any payload.
        mock_pub.assert_called_with(self.hass, topic, ANY)
        assert mock_pub.called

        # Third positional argument of the last publish call is the payload.
        positional = mock_pub.call_args[0]
        msg = positional[2]

        timestamp = now.isoformat()
        expected = {
            'event_type': EVENT_STATE_CHANGED,
            'event_data': {
                'entity_id': entity_id,
                'new_state': {
                    'entity_id': entity_id,
                    'state': 'on',
                    'attributes': {},
                    'last_changed': timestamp,
                    'last_updated': timestamp,
                },
            },
        }

        # Decode the payload, drop 'context', then compare.
        payload = json.loads(msg)
        payload['event_data']['new_state'].pop('context')
        assert payload == expected
开发者ID:EarthlingRich,项目名称:home-assistant,代码行数:42,代码来源:test_init.py


示例8: test_state_changed_event_sends_message

    def test_state_changed_event_sends_message(self, mock_utcnow, mock_pub):
        """Check that a state change publishes the new state value."""
        entity_id = 'fake.entity'
        topic_root = 'pub'

        # Set up statestream with only the base topic configured.
        started = self.add_statestream(base_topic=topic_root)
        assert started
        self.hass.block_till_done()

        # Forget the calls recorded so far (setup already triggers
        # mqtt_statestream publishes).
        mock_pub.reset_mock()

        # Fire a state change for the entity.
        new_state = State(entity_id, 'on')
        mock_state_change_event(self.hass, new_state)
        self.hass.block_till_done()

        # 'on' must have been published to pub/fake/entity/state.
        mock_pub.assert_called_with(
            self.hass, 'pub/fake/entity/state', 'on', 1, True)
        assert mock_pub.called
开发者ID:BaptisteSim,项目名称:home-assistant,代码行数:21,代码来源:test_mqtt_statestream.py


示例9: test_state_changed_event_sends_message

    def test_state_changed_event_sends_message(self, mock_datetime, mock_pub):
        """Check that a state change is published as a JSON event.

        The published payload is decoded and compared with the expected
        event, whose timestamps are the value returned by ``mock_datetime``.
        """
        now = '00:19:19 11-01-2016'
        entity_id = 'fake.entity'
        topic = 'bar'
        mock_datetime.return_value = now

        # Set up eventstream publishing to the topic.
        started = self.add_eventstream(pub_topic=topic)
        self.assertTrue(started)
        self.hass.pool.block_till_done()

        # Forget the calls recorded so far (setup already triggers
        # mqtt_eventstream publishes).
        mock_pub.reset_mock()

        # Fire a state change for the entity.
        mock_state_change_event(self.hass, State(entity_id, 'on'))
        self.hass.pool.block_till_done()

        # JSON key order is not fixed, so first only check that a publish
        # to the topic happened, with any payload.
        mock_pub.assert_called_with(self.hass, topic, ANY)
        self.assertTrue(mock_pub.called)

        # Third positional argument of the last publish call is the payload.
        positional = mock_pub.call_args[0]
        msg = positional[2]

        expected = {
            'event_type': EVENT_STATE_CHANGED,
            'event_data': {
                'entity_id': entity_id,
                'new_state': {
                    'entity_id': entity_id,
                    'state': 'on',
                    'attributes': {},
                    'last_changed': now,
                    'last_updated': now,
                },
            },
        }

        # The decoded payload must match the expected event exactly.
        self.assertEqual(json.loads(msg), expected)
开发者ID:100dayproject,项目名称:home-assistant,代码行数:39,代码来源:test_mqtt_eventstream.py


示例10: test_get_states

    def test_get_states(self):
        """Check history lookups at a point in time between two batches.

        The first batch of five state changes is the expected result; a
        second batch is fired with ``utcnow`` patched to one second later.
        """
        self.init_recorder()
        states = []

        # First batch: fired and kept as the expected states.
        for n in range(5):
            entity_id = 'test.point_in_time_{}'.format(n % 5)
            state = ha.State(
                entity_id, "State {}".format(n), {'attribute_test': n})

            mock_state_change_event(self.hass, state)
            self.hass.pool.block_till_done()

            states.append(state)

        recorder._INSTANCE.block_till_done()

        point = dt_util.utcnow() + timedelta(seconds=1)

        # Second batch: same entities, fired while utcnow returns `point`.
        with patch('homeassistant.util.dt.utcnow', return_value=point):
            for n in range(5):
                entity_id = 'test.point_in_time_{}'.format(n % 5)
                later_state = ha.State(
                    entity_id, "State {}".format(n), {'attribute_test': n})

                mock_state_change_event(self.hass, later_state)
                self.hass.pool.block_till_done()

        # Get states returns everything before POINT.
        found = sorted(history.get_states(point),
                       key=lambda item: item.entity_id)
        self.assertEqual(states, found)

        # get_state is checked here as well because the DB is already set up.
        first = states[0]
        self.assertEqual(first, history.get_state(point, first.entity_id))
开发者ID:LucasZielke,项目名称:home-assistant,代码行数:38,代码来源:test_history.py



注:本文中的tests.common.mock_state_change_event函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python common.patch_yaml_files函数代码示例发布时间:2022-05-27
下一篇:
Python common.mock_service函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap