• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python testify.assert_raises函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中testify.assert_raises函数的典型用法代码示例。如果您正苦于以下问题:Python assert_raises函数的具体用法?Python assert_raises怎么用?Python assert_raises使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了assert_raises函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_failed_job

    def test_failed_job(self):
        mr_job = MRTwoStepJob(['-r', 'emr', '-v',
                               '-c', self.mrjob_conf_path])
        mr_job.sandbox()

        self.add_mock_s3_data({'walrus': {}})
        self.mock_emr_failures = {('j-MOCKJOBFLOW0', 0): None}

        with mr_job.make_runner() as runner:
            assert isinstance(runner, EMRJobRunner)

            with logger_disabled('mrjob.emr'):
                assert_raises(Exception, runner.run)

            emr_conn = botoemr.EmrConnection()
            job_flow_id = runner.get_emr_job_flow_id()
            for i in range(10):
                emr_conn.simulate_progress(job_flow_id)

            job_flow = emr_conn.describe_jobflow(job_flow_id)
            assert_equal(job_flow.state, 'FAILED')

        # job should get terminated on cleanup
        emr_conn = runner.make_emr_conn()
        job_flow_id = runner.get_emr_job_flow_id()
        for i in range(10):
            emr_conn.simulate_progress(job_flow_id)

        job_flow = emr_conn.describe_jobflow(job_flow_id)
        assert_equal(job_flow.state, 'TERMINATED')
开发者ID:boursier,项目名称:mrjob,代码行数:30,代码来源:emr_test.py


示例2: test_unescape

 def test_unescape(self):
     # cases covered by string_escape:
     assert_equal(counter_unescape(r'\n'), '\n')
     assert_equal(counter_unescape(r'\\'), '\\')
     # cases covered by manual unescape:
     assert_equal(counter_unescape(r'\.'), '.')
     assert_raises(ValueError, counter_unescape, '\\')
开发者ID:gimlids,项目名称:LTPM,代码行数:7,代码来源:parse_test.py


示例3: test_deprecated_mapper_final_positional_arg

    def test_deprecated_mapper_final_positional_arg(self):
        def mapper(k, v):
            pass

        def reducer(k, v):
            pass

        def mapper_final():
            pass

        stderr = StringIO()
        with no_handlers_for_logger():
            log_to_stream('mrjob.job', stderr)
            step = MRJob.mr(mapper, reducer, mapper_final)

        # should be allowed to specify mapper_final as a positional arg,
        # but we log a warning
        assert_equal(step, MRJob.mr(mapper=mapper,
                                    reducer=reducer,
                                    mapper_final=mapper_final))
        assert_in('mapper_final should be specified', stderr.getvalue())

        # can't specify mapper_final as a positional and keyword arg
        assert_raises(
            TypeError,
            MRJob.mr, mapper, reducer, mapper_final, mapper_final=mapper_final)
开发者ID:gimlids,项目名称:LTPM,代码行数:26,代码来源:job_test.py


示例4: test_read_only

    def test_read_only(self):
        # Read mode exects db to already exist
        testify.assert_raises(
            sqlite3dbm.dbm.error,
            lambda: sqlite3dbm.dbm.SqliteMap(self.path, flag='r'),
        )
        # Create the db then re-open read-only
        smap = sqlite3dbm.dbm.SqliteMap(self.path, flag='c')
        smap = sqlite3dbm.dbm.SqliteMap(self.path, flag='r')

        # Check that all mutators raise exceptions
        mutator_raises = lambda callable_method: testify.assert_raises(
            sqlite3dbm.dbm.error,
            callable_method
        )
        def do_setitem():
            smap['foo'] = 'bar'
        mutator_raises(do_setitem)
        def do_delitem():
            del smap['foo']
        mutator_raises(do_delitem)
        mutator_raises(lambda: smap.clear())
        mutator_raises(lambda: smap.pop('foo'))
        mutator_raises(lambda: smap.popitem())
        mutator_raises(lambda: smap.update({'baz': 'qux'}))
开发者ID:Yelp,项目名称:sqlite3dbm,代码行数:25,代码来源:dbm_test.py


示例5: test_mapper_and_reducer_as_positional_args

    def test_mapper_and_reducer_as_positional_args(self):
        def mapper(k, v):
            pass

        def reducer(k, v):
            pass

        def combiner(k, v):
            pass

        assert_equal(MRJob.mr(mapper), MRJob.mr(mapper=mapper))

        assert_equal(MRJob.mr(mapper, reducer),
                     MRJob.mr(mapper=mapper, reducer=reducer))

        assert_equal(MRJob.mr(mapper, reducer=reducer),
                     MRJob.mr(mapper=mapper, reducer=reducer))

        assert_equal(MRJob.mr(mapper, reducer, combiner=combiner),
                     MRJob.mr(mapper=mapper, reducer=reducer,
                              combiner=combiner))

        # can't specify something as a positional and keyword arg
        assert_raises(TypeError,
                      MRJob.mr, mapper, mapper=mapper)
        assert_raises(TypeError,
                      MRJob.mr, mapper, reducer, reducer=reducer)
开发者ID:gimlids,项目名称:LTPM,代码行数:27,代码来源:job_test.py


示例6: test_output

 def test_output(self):
     # check plugin output length both for hosts
     self.sr.send_host("test_host", 0, util.get_chrs(send_nsca.nsca.MAX_PLUGINOUTPUT_LENGTH - 1))
     assert_raises(ValueError, self.sr.send_host, "test_host", 0, util.get_chrs(send_nsca.nsca.MAX_PLUGINOUTPUT_LENGTH + 1))
     # and for services
     self.sr.send_service("test_host", "test_service", 0, util.get_chrs(send_nsca.nsca.MAX_PLUGINOUTPUT_LENGTH - 1))
     assert_raises(ValueError, self.sr.send_service, "test_host", "test_service", 0, util.get_chrs(send_nsca.nsca.MAX_PLUGINOUTPUT_LENGTH + 1))
开发者ID:jm-welch,项目名称:send_nsca,代码行数:7,代码来源:limits.py


示例7: test_getitem

 def test_getitem(self):
     self.smap['jugglers'] = 'awesomesauce'
     testify.assert_equal(self.smap['jugglers'], 'awesomesauce')
     testify.assert_raises(
         KeyError,
         lambda: self.smap['unicyclers']
     )
开发者ID:Yelp,项目名称:sqlite3dbm,代码行数:7,代码来源:dbm_test.py


示例8: test_get_encryption_method

 def test_get_encryption_method(self):
     # map from crypter id to whether or not it should succeed
     crypters = {
         0: True,
         1: True,
         2: True,
         3: True,
         4: True,
         5: False,
         6: False,
         7: False,
         8: True,
         9: False,
         10: False,
         14: True,
         15: True,
         16: True,
         255: False
     }
     for crypter, success in crypters.iteritems():
         stream = cStringIO.StringIO()
         stream.write("encryption_method = %d\n" % crypter)
         if success:
             self.sender.parse_config(stream)
             assert_equal(self.sender.encryption_method_i, crypter)
         else:
             assert_raises(send_nsca.nsca.ConfigParseError, self.sender.parse_config, stream)
开发者ID:jm-welch,项目名称:send_nsca,代码行数:27,代码来源:config.py


示例9: test_percent_escaping

    def test_percent_escaping(self):
        assert_equal(strftime(y(2011), '110%%'), ['110%'])

        # don't incorrectly grab % out of %% to do globbing
        assert_equal(strftime(y(2011), '%m %%m %%%m'), ['* %m %*'])

        # catch invalid strftime string
        assert_raises(ValueError, strftime, y(2011), '110%')
开发者ID:davidmarin,项目名称:dateglob,代码行数:8,代码来源:dateglob_test.py


示例10: test_hostname

 def test_hostname(self):
     # check that we can send valid packets
     self.sr.send_host(util.get_chrs(send_nsca.nsca.MAX_HOSTNAME_LENGTH - 1), 0, 'ok')
     self.sr.send_host(util.get_chrs(send_nsca.nsca.MAX_HOSTNAME_LENGTH), 0, 'ok')
     # check that we cannot send invalid packets
     assert_raises(ValueError, self.sr.send_host, util.get_chrs(send_nsca.nsca.MAX_HOSTNAME_LENGTH + 1), 0, 'ok')
     # ascii only
     assert_raises(ValueError, self.sr.send_host, u"\xff\xf302", 0, 'ok')
开发者ID:jm-welch,项目名称:send_nsca,代码行数:8,代码来源:limits.py


示例11: test_mock_configuration_context_manager

    def test_mock_configuration_context_manager(self):
        one = self.getters.get('one')
        three = self.getters.get_int('three', default=3)

        with testing.MockConfiguration(dict(one=7), namespace=self.namespace):
            assert_equal(one, 7)
            assert_equal(three, 3)
        assert_raises(errors.ConfigurationError, self.getters.get('one'))
开发者ID:eevee,项目名称:PyStaticConfiguration,代码行数:8,代码来源:integration_test.py


示例12: test_mock_configuration_context_manager

    def test_mock_configuration_context_manager(self):
        one = staticconf.get("one")
        three = staticconf.get_int("three", default=3)

        with testing.MockConfiguration(dict(one=7)):
            assert_equal(one, 7)
            assert_equal(three, 3)
        assert_raises(errors.ConfigurationError, staticconf.get("one"))
开发者ID:Roguelazer,项目名称:PyStaticConfiguration,代码行数:8,代码来源:integration_test.py


示例13: test_historical_data_append_arms_with_variance_invalid

 def test_historical_data_append_arms_with_variance_invalid(self):
     """Test that adding arms with variance causes a ValueError."""
     historical_info = copy.deepcopy(self.three_arms_with_variance_no_unsampled_arm_test_case)
     T.assert_raises(
             ValueError,
             historical_info.append_sample_arms,
             self.three_arms_with_variance_no_unsampled_arm_test_case.arms_sampled
             )
开发者ID:Recmo,项目名称:MOE,代码行数:8,代码来源:data_containers_test.py


示例14: test_stale_module_check

    def test_stale_module_check(self):
        test_settings = copy.deepcopy(Settings)
        repo_path = tempfile.mkdtemp(prefix="pushmanager")
        submodule_path = tempfile.mkdtemp(prefix="pushmanager")
        self.temp_git_dirs.append(repo_path)
        self.temp_git_dirs.append(submodule_path)
        test_settings['git']['local_repo_path'] = repo_path

        # Create main repo
        GitCommand('init', repo_path, cwd=repo_path).run()
        # Prevent Git complaints about names
        GitCommand('config', 'user.email', '[email protected]', cwd=repo_path).run()
        GitCommand('config', 'user.name', 'pushmanager tester', cwd=repo_path).run()
        with open(os.path.join(repo_path, "code.py"), 'w') as f:
            f.write('#!/usr/bin/env python\n\nprint("Hello World!")\nPrint("Goodbye!")\n')
        GitCommand('add', repo_path, cwd=repo_path).run()
        GitCommand('commit', '-a', '-m', 'Master Commit', cwd=repo_path).run()

        # Create repo to use as submodule
        GitCommand('init', submodule_path, cwd=submodule_path).run()
        # Prevent Git complaints about names
        GitCommand('config', 'user.email', '[email protected]', cwd=submodule_path).run()
        GitCommand('config', 'user.name', 'pushmanager tester', cwd=submodule_path).run()
        with open(os.path.join(submodule_path, "codemodule.py"), 'w') as f:
            f.write('#!/usr/bin/env python\n\nprint("Hello World!")\nPrint("Goodbye!")\n')
        GitCommand('add', submodule_path, cwd=submodule_path).run()
        GitCommand('commit', '-a', '-m', 'Master Commit', cwd=submodule_path).run()

        # Make two incompatible branches in the submodule
        GitCommand('checkout', '-b', 'change_german', cwd=submodule_path).run()
        with open(os.path.join(submodule_path, "codemodule.py"), 'w') as f:
            f.write('#!/usr/bin/env python\n\nprint("Hallo Welt!")\nPrint("Goodbye!")\n')
        GitCommand('commit', '-a', '-m', 'verpflichten', cwd=submodule_path).run()
        GitCommand('checkout', 'master', cwd=submodule_path).run()

        GitCommand('checkout', '-b', 'change_welsh', cwd=submodule_path).run()
        with open(os.path.join(submodule_path, "codemodule.py"), 'w') as f:
            f.write('#!/usr/bin/env python\n\nprint("Helo Byd!")\nPrint("Goodbye!")\n')
        GitCommand('commit', '-a', '-m', 'ymrwymo', cwd=submodule_path).run()
        GitCommand('checkout', 'master', cwd=submodule_path).run()

        # Add submodule at master to main repo
        GitCommand('submodule', 'add', submodule_path, cwd=repo_path).run()
        GitCommand('commit', '-a', '-m', 'Add submodule', cwd=repo_path).run()

        # Create branches in main repo, have each switch submodule to different branch
        internal_submodule_path = os.path.join(repo_path, submodule_path.split("/")[-1:][0])
        GitCommand('checkout', '-b', 'change_german', cwd=repo_path).run()
        GitCommand('checkout', 'change_german', cwd=internal_submodule_path).run()
        GitCommand('commit', '-a', '-m', 'verpflichten', cwd=repo_path).run()
        GitCommand('checkout', 'master', cwd=repo_path).run()

        GitCommand('checkout', '-b', 'change_welsh', cwd=repo_path).run()
        GitCommand('commit', '-a', '-m', 'ymrwymo', cwd=repo_path).run()
        GitCommand('checkout', 'change_welsh', cwd=internal_submodule_path).run()
        GitCommand('checkout', 'master', cwd=repo_path).run()

        T.assert_raises(GitException, pushmanager.core.git._stale_submodule_check, repo_path)
开发者ID:Mango-J,项目名称:pushmanager,代码行数:58,代码来源:test_core_git.py


示例15: test_bad_option_types

 def test_bad_option_types(self):
     mr_job = MRJob()
     assert_raises(
         OptionError, mr_job.add_passthrough_option,
         '--stop-words', dest='stop_words', type='set', default=None)
     assert_raises(
         OptionError, mr_job.add_passthrough_option,
         '--leave-a-msg', dest='leave_a_msg', action='callback',
         default=None)
开发者ID:Jyrsa,项目名称:mrjob,代码行数:9,代码来源:job_test.py


示例16: test_undecodable_output_strict

    def test_undecodable_output_strict(self):
        UNENCODABLE_RAW_INPUT = StringIO('foo\n' +
                                         '\xaa\n' +
                                         'bar\n')

        mr_job = MRBoringJob(args=['--mapper', '--strict-protocols'])
        mr_job.sandbox(stdin=UNENCODABLE_RAW_INPUT)
        
        # make sure it raises an exception
        assert_raises(Exception, mr_job.run_mapper)
开发者ID:Jyrsa,项目名称:mrjob,代码行数:10,代码来源:job_test.py


示例17: test_no_result_fails

 def test_no_result_fails(self):
     mock_getaddrinfo = mock.Mock(return_value=[])
     # use a non-standard port so we can ensure that the calling worked
     # right
     test_port = 3770
     with mock.patch('socket.getaddrinfo', mock_getaddrinfo):
         assert_raises(socket.error, self.sender._sock_connect, 'test_host', test_port)
         assert_raises(socket.error, self.sender.connect)
     mock_getaddrinfo.assert_any_call('test_host', test_port, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, 0)
     mock_getaddrinfo.assert_any_call('test_host', DEFAULT_PORT, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, 0)
开发者ID:jm-welch,项目名称:send_nsca,代码行数:10,代码来源:conn.py


示例18: test_mock_configuration

    def test_mock_configuration(self):
        two = staticconf.get_string('two')
        stars = staticconf.get_bool('stars')

        mock_config = testing.MockConfiguration(dict(two=2, stars=False))
        mock_config.setup()
        assert_equal(two, '2')
        assert not stars
        mock_config.teardown()
        assert_raises(errors.ConfigurationError, staticconf.get('two'))
开发者ID:analogue,项目名称:PyStaticConfiguration,代码行数:10,代码来源:integration_test.py


示例19: test_delitem

    def test_delitem(self):
        self.smap['boo'] = 'ahhh!'
        testify.assert_equal(self.smap['boo'], 'ahhh!')
        del self.smap['boo']
        testify.assert_not_in('boo', self.smap)

        testify.assert_raises(KeyError, lambda : self.smap['boo'])

        def try_delete():
            del self.smap['boo']
        testify.assert_raises(KeyError, try_delete)
开发者ID:Yelp,项目名称:sqlite3dbm,代码行数:11,代码来源:dbm_test.py


示例20: test_gp_construction_singular_covariance_matrix

    def test_gp_construction_singular_covariance_matrix(self):
        """Test that the GaussianProcess ctor indicates a singular covariance matrix when points_sampled contains duplicates (0 noise)."""
        index = numpy.argmax(numpy.greater_equal(self.num_sampled_list, 1))
        domain, gaussian_process = self.gp_test_environments[index]
        point_one = SamplePoint([0.0] * domain.dim, 1.0, 0.0)
        # points two and three have duplicate coordinates and we have noise_variance = 0.0
        point_two = SamplePoint([1.0] * domain.dim, 1.0, 0.0)
        point_three = point_two

        historical_data = HistoricalData(len(point_one.point), [point_one, point_two, point_three])
        T.assert_raises(C_GP.SingularMatrixException, GaussianProcess, gaussian_process.get_covariance_copy(), historical_data)
开发者ID:Recmo,项目名称:MOE,代码行数:11,代码来源:gaussian_process_test.py



注:本文中的testify.assert_raises函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python testify.run函数代码示例发布时间:2022-05-27
下一篇:
Python testify.assert_not_reached函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap