本文整理汇总了Python中testify.assert_raises函数的典型用法代码示例。如果您正苦于以下问题:Python assert_raises函数的具体用法?Python assert_raises怎么用?Python assert_raises使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了assert_raises函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_failed_job
def test_failed_job(self):
    """Check that a failing run raises, leaves the job flow FAILED, and that
    the job flow is TERMINATED after the runner context has exited."""
    job = MRTwoStepJob(['-r', 'emr', '-v', '-c', self.mrjob_conf_path])
    job.sandbox()

    self.add_mock_s3_data({'walrus': {}})
    # keyed by (job flow id, step number) -- presumably tells the mock EMR
    # layer which step should fail; confirm against the test base class
    self.mock_emr_failures = {('j-MOCKJOBFLOW0', 0): None}

    def state_after_progress(conn):
        # Advance the mock job flow ten times, then report its state.
        flow_id = runner.get_emr_job_flow_id()
        for _ in range(10):
            conn.simulate_progress(flow_id)
        return conn.describe_jobflow(flow_id).state

    with job.make_runner() as runner:
        assert isinstance(runner, EMRJobRunner)

        # silence the 'mrjob.emr' logger while the expected failure happens
        with logger_disabled('mrjob.emr'):
            assert_raises(Exception, runner.run)

        assert_equal(state_after_progress(botoemr.EmrConnection()), 'FAILED')

    # job should get terminated on cleanup
    assert_equal(state_after_progress(runner.make_emr_conn()), 'TERMINATED')
开发者ID:boursier,项目名称:mrjob,代码行数:30,代码来源:emr_test.py
示例2: test_unescape
def test_unescape(self):
    """Check counter_unescape() on valid escapes and on a lone backslash."""
    expected_by_input = [
        # cases covered by string_escape:
        (r'\n', '\n'),
        (r'\\', '\\'),
        # cases covered by manual unescape:
        (r'\.', '.'),
    ]
    for escaped, plain in expected_by_input:
        assert_equal(counter_unescape(escaped), plain)

    # a string consisting of a single backslash is rejected
    assert_raises(ValueError, counter_unescape, '\\')
开发者ID:gimlids,项目名称:LTPM,代码行数:7,代码来源:parse_test.py
示例3: test_deprecated_mapper_final_positional_arg
def test_deprecated_mapper_final_positional_arg(self):
    """Check that MRJob.mr() accepts mapper_final positionally (with a logged
    warning) but rejects it when given both positionally and by keyword."""
    def mapper(k, v):
        pass

    def reducer(k, v):
        pass

    def mapper_final():
        pass

    # capture whatever 'mrjob.job' logs while the positional form is used
    captured = StringIO()
    with no_handlers_for_logger():
        log_to_stream('mrjob.job', captured)
        positional_step = MRJob.mr(mapper, reducer, mapper_final)

    # should be allowed to specify mapper_final as a positional arg,
    # but we log a warning
    keyword_step = MRJob.mr(
        mapper=mapper, reducer=reducer, mapper_final=mapper_final)
    assert_equal(positional_step, keyword_step)
    assert_in('mapper_final should be specified', captured.getvalue())

    # can't specify mapper_final as a positional and keyword arg
    assert_raises(TypeError,
                  MRJob.mr,
                  mapper, reducer, mapper_final,
                  mapper_final=mapper_final)
开发者ID:gimlids,项目名称:LTPM,代码行数:26,代码来源:job_test.py
示例4: test_read_only
def test_read_only(self):
    """Check that flag='r' needs an existing db and that every mutator on a
    read-only map raises sqlite3dbm.dbm.error."""
    db_error = sqlite3dbm.dbm.error

    def open_read_only():
        return sqlite3dbm.dbm.SqliteMap(self.path, flag='r')

    # Read mode expects db to already exist
    testify.assert_raises(db_error, open_read_only)

    # Create the db then re-open read-only
    smap = sqlite3dbm.dbm.SqliteMap(self.path, flag='c')
    smap = open_read_only()

    # Item assignment and deletion are statements, so they need real
    # functions rather than lambdas.
    def do_setitem():
        smap['foo'] = 'bar'

    def do_delitem():
        del smap['foo']

    # Check that all mutators raise exceptions
    mutators = (
        do_setitem,
        do_delitem,
        lambda: smap.clear(),
        lambda: smap.pop('foo'),
        lambda: smap.popitem(),
        lambda: smap.update({'baz': 'qux'}),
    )
    for mutate in mutators:
        testify.assert_raises(db_error, mutate)
开发者ID:Yelp,项目名称:sqlite3dbm,代码行数:25,代码来源:dbm_test.py
示例5: test_mapper_and_reducer_as_positional_args
def test_mapper_and_reducer_as_positional_args(self):
    """Check that positional mapper/reducer arguments to MRJob.mr() build the
    same step as keywords, and that passing one both ways is a TypeError."""
    def mapper(k, v):
        pass

    def reducer(k, v):
        pass

    def combiner(k, v):
        pass

    # mapper only
    positional = MRJob.mr(mapper)
    assert_equal(positional, MRJob.mr(mapper=mapper))

    # mapper and reducer, both positional
    positional = MRJob.mr(mapper, reducer)
    assert_equal(positional, MRJob.mr(mapper=mapper, reducer=reducer))

    # positional mapper, keyword reducer
    mixed = MRJob.mr(mapper, reducer=reducer)
    assert_equal(mixed, MRJob.mr(mapper=mapper, reducer=reducer))

    # positional mapper and reducer plus a keyword combiner
    mixed = MRJob.mr(mapper, reducer, combiner=combiner)
    assert_equal(
        mixed,
        MRJob.mr(mapper=mapper, reducer=reducer, combiner=combiner))

    # can't specify something as a positional and keyword arg
    assert_raises(TypeError, MRJob.mr, mapper, mapper=mapper)
    assert_raises(TypeError, MRJob.mr, mapper, reducer, reducer=reducer)
开发者ID:gimlids,项目名称:LTPM,代码行数:27,代码来源:job_test.py
示例6: test_output
def test_output(self):
    """Check that plugin output one character under the limit is sent and
    one character over the limit raises ValueError, for hosts and services."""
    limit = send_nsca.nsca.MAX_PLUGINOUTPUT_LENGTH
    sender = self.sr

    # check plugin output length both for hosts
    sender.send_host("test_host", 0, util.get_chrs(limit - 1))
    assert_raises(
        ValueError,
        sender.send_host, "test_host", 0, util.get_chrs(limit + 1))

    # and for services
    sender.send_service(
        "test_host", "test_service", 0, util.get_chrs(limit - 1))
    assert_raises(
        ValueError,
        sender.send_service,
        "test_host", "test_service", 0, util.get_chrs(limit + 1))
开发者ID:jm-welch,项目名称:send_nsca,代码行数:7,代码来源:limits.py
示例7: test_getitem
def test_getitem(self):
    """Check item lookup for a stored key and for a missing key."""
    self.smap['jugglers'] = 'awesomesauce'
    testify.assert_equal(self.smap['jugglers'], 'awesomesauce')

    def lookup_missing():
        # key that was never stored
        return self.smap['unicyclers']

    testify.assert_raises(KeyError, lookup_missing)
开发者ID:Yelp,项目名称:sqlite3dbm,代码行数:7,代码来源:dbm_test.py
示例8: test_get_encryption_method
def test_get_encryption_method(self):
    """Feed parse_config() one 'encryption_method = N' line per crypter id
    and check that it is either accepted or rejected as listed below."""
    # map from crypter id to whether or not it should succeed
    crypters = {
        0: True, 1: True, 2: True, 3: True, 4: True,
        5: False, 6: False, 7: False,
        8: True,
        9: False, 10: False,
        14: True, 15: True, 16: True,
        255: False
    }
    for method_id, should_parse in crypters.iteritems():
        # NOTE(review): the stream is not rewound after the write; this
        # presumably relies on how parse_config() reads it -- confirm.
        config = cStringIO.StringIO()
        config.write("encryption_method = %d\n" % method_id)
        if not should_parse:
            assert_raises(send_nsca.nsca.ConfigParseError,
                          self.sender.parse_config, config)
            continue
        self.sender.parse_config(config)
        assert_equal(self.sender.encryption_method_i, method_id)
开发者ID:jm-welch,项目名称:send_nsca,代码行数:27,代码来源:config.py
示例9: test_percent_escaping
def test_percent_escaping(self):
    """Check handling of '%%' escapes and of a dangling '%' in strftime()."""
    escaped_only = strftime(y(2011), '110%%')
    assert_equal(escaped_only, ['110%'])

    # don't incorrectly grab % out of %% to do globbing
    mixed = strftime(y(2011), '%m %%m %%%m')
    assert_equal(mixed, ['* %m %*'])

    # catch invalid strftime string
    assert_raises(ValueError, strftime, y(2011), '110%')
开发者ID:davidmarin,项目名称:dateglob,代码行数:8,代码来源:dateglob_test.py
示例10: test_hostname
def test_hostname(self):
    """Check hostname length limits and the ASCII-only rule in send_host()."""
    max_len = send_nsca.nsca.MAX_HOSTNAME_LENGTH
    send_host = self.sr.send_host

    # check that we can send valid packets
    for length in (max_len - 1, max_len):
        send_host(util.get_chrs(length), 0, 'ok')

    # check that we cannot send invalid packets
    assert_raises(ValueError, send_host, util.get_chrs(max_len + 1), 0, 'ok')

    # ascii only
    assert_raises(ValueError, send_host, u"\xff\xf302", 0, 'ok')
开发者ID:jm-welch,项目名称:send_nsca,代码行数:8,代码来源:limits.py
示例11: test_mock_configuration_context_manager
def test_mock_configuration_context_manager(self):
    """Check values seen while MockConfiguration is active as a context
    manager, and that 'one' is a ConfigurationError once it has exited."""
    one = self.getters.get('one')
    three = self.getters.get_int('three', default=3)

    mocked_values = {'one': 7}
    with testing.MockConfiguration(mocked_values, namespace=self.namespace):
        assert_equal(one, 7)
        assert_equal(three, 3)

    # NOTE(review): assert_raises is handed the object returned by get(),
    # not a function; this presumably relies on that object being callable
    # and raising when called -- confirm.
    unset_one = self.getters.get('one')
    assert_raises(errors.ConfigurationError, unset_one)
开发者ID:eevee,项目名称:PyStaticConfiguration,代码行数:8,代码来源:integration_test.py
示例12: test_mock_configuration_context_manager
def test_mock_configuration_context_manager(self):
    """Check values seen while MockConfiguration is active as a context
    manager, and that "one" is a ConfigurationError once it has exited."""
    one = staticconf.get("one")
    three = staticconf.get_int("three", default=3)

    mocked_values = {"one": 7}
    with testing.MockConfiguration(mocked_values):
        assert_equal(one, 7)
        assert_equal(three, 3)

    # NOTE(review): assert_raises is handed the object returned by get(),
    # not a function; this presumably relies on that object being callable
    # and raising when called -- confirm.
    unset_one = staticconf.get("one")
    assert_raises(errors.ConfigurationError, unset_one)
开发者ID:Roguelazer,项目名称:PyStaticConfiguration,代码行数:8,代码来源:integration_test.py
示例13: test_historical_data_append_arms_with_variance_invalid
def test_historical_data_append_arms_with_variance_invalid(self):
    """Appending arms with variance to historical info raises ValueError."""
    test_case = self.three_arms_with_variance_no_unsampled_arm_test_case
    # work on a deep copy so the shared fixture object is not the one
    # being appended to
    historical_info = copy.deepcopy(test_case)
    arms_to_append = test_case.arms_sampled
    T.assert_raises(
        ValueError, historical_info.append_sample_arms, arms_to_append)
开发者ID:Recmo,项目名称:MOE,代码行数:8,代码来源:data_containers_test.py
示例14: test_stale_module_check
def test_stale_module_check(self):
    """Build a main repo and a submodule repo with diverging branches, then
    check that _stale_submodule_check() raises GitException for the main repo.
    """
    hello_script = '#!/usr/bin/env python\n\nprint("Hello World!")\nPrint("Goodbye!")\n'
    german_script = '#!/usr/bin/env python\n\nprint("Hallo Welt!")\nPrint("Goodbye!")\n'
    welsh_script = '#!/usr/bin/env python\n\nprint("Helo Byd!")\nPrint("Goodbye!")\n'

    def git(cwd, *args):
        # Run one git command with the given working directory.
        GitCommand(*args, cwd=cwd).run()

    def write_file(directory, filename, content):
        # (Over)write directory/filename with content.
        with open(os.path.join(directory, filename), 'w') as f:
            f.write(content)

    def init_repo(path, filename):
        # git-init `path`, set an identity, and commit one script file.
        git(path, 'init', path)
        # Prevent Git complaints about names
        # NOTE(review): the e-mail literal looks like a scraping
        # placeholder; it is kept exactly as found.
        git(path, 'config', 'user.email', '[email protected]')
        git(path, 'config', 'user.name', 'pushmanager tester')
        write_file(path, filename, hello_script)
        git(path, 'add', path)
        git(path, 'commit', '-a', '-m', 'Master Commit')

    def branch_submodule(branch, content, message):
        # Commit `content` on a new submodule branch, then return to master.
        git(submodule_path, 'checkout', '-b', branch)
        write_file(submodule_path, "codemodule.py", content)
        git(submodule_path, 'commit', '-a', '-m', message)
        git(submodule_path, 'checkout', 'master')

    # NOTE(review): test_settings is populated but not read again in this
    # method.
    test_settings = copy.deepcopy(Settings)
    repo_path = tempfile.mkdtemp(prefix="pushmanager")
    submodule_path = tempfile.mkdtemp(prefix="pushmanager")
    self.temp_git_dirs.append(repo_path)
    self.temp_git_dirs.append(submodule_path)
    test_settings['git']['local_repo_path'] = repo_path

    # Create main repo
    init_repo(repo_path, "code.py")

    # Create repo to use as submodule
    init_repo(submodule_path, "codemodule.py")

    # Make two incompatible branches in the submodule
    branch_submodule('change_german', german_script, 'verpflichten')
    branch_submodule('change_welsh', welsh_script, 'ymrwymo')

    # Add submodule at master to main repo
    git(repo_path, 'submodule', 'add', submodule_path)
    git(repo_path, 'commit', '-a', '-m', 'Add submodule')

    # Create branches in main repo, have each switch submodule to different branch
    internal_submodule_path = os.path.join(
        repo_path, submodule_path.split("/")[-1])

    git(repo_path, 'checkout', '-b', 'change_german')
    git(internal_submodule_path, 'checkout', 'change_german')
    git(repo_path, 'commit', '-a', '-m', 'verpflichten')
    git(repo_path, 'checkout', 'master')

    # Unlike the german branch, the commit here happens BEFORE the submodule
    # checkout; the order is kept exactly as in the original.
    git(repo_path, 'checkout', '-b', 'change_welsh')
    git(repo_path, 'commit', '-a', '-m', 'ymrwymo')
    git(internal_submodule_path, 'checkout', 'change_welsh')
    git(repo_path, 'checkout', 'master')

    T.assert_raises(
        GitException, pushmanager.core.git._stale_submodule_check, repo_path)
开发者ID:Mango-J,项目名称:pushmanager,代码行数:58,代码来源:test_core_git.py
示例15: test_bad_option_types
def test_bad_option_types(self):
    """Check that add_passthrough_option() raises OptionError for
    type='set' and for action='callback'."""
    add_option = MRJob().add_passthrough_option

    assert_raises(OptionError,
                  add_option,
                  '--stop-words',
                  dest='stop_words', type='set', default=None)

    assert_raises(OptionError,
                  add_option,
                  '--leave-a-msg',
                  dest='leave_a_msg', action='callback', default=None)
开发者ID:Jyrsa,项目名称:mrjob,代码行数:9,代码来源:job_test.py
示例16: test_undecodable_output_strict
def test_undecodable_output_strict(self):
    """Check that run_mapper() raises under --strict-protocols when the
    input contains a '\\xaa' line."""
    # three input lines; the middle one is the single byte/char \xaa
    raw_input_stream = StringIO('foo\n\xaa\nbar\n')

    job = MRBoringJob(args=['--mapper', '--strict-protocols'])
    job.sandbox(stdin=raw_input_stream)

    # make sure it raises an exception
    assert_raises(Exception, job.run_mapper)
开发者ID:Jyrsa,项目名称:mrjob,代码行数:10,代码来源:job_test.py
示例17: test_no_result_fails
def test_no_result_fails(self):
    """Check that connecting raises socket.error when getaddrinfo returns no
    results, and that getaddrinfo was called with the expected ports."""
    fake_getaddrinfo = mock.Mock(return_value=[])
    # use a non-standard port so we can ensure that the calling worked
    # right
    custom_port = 3770

    with mock.patch('socket.getaddrinfo', fake_getaddrinfo):
        assert_raises(
            socket.error, self.sender._sock_connect, 'test_host', custom_port)
        assert_raises(socket.error, self.sender.connect)

    # one lookup for the explicit port, one for the default port
    for port in (custom_port, DEFAULT_PORT):
        fake_getaddrinfo.assert_any_call(
            'test_host', port, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, 0)
开发者ID:jm-welch,项目名称:send_nsca,代码行数:10,代码来源:conn.py
示例18: test_mock_configuration
def test_mock_configuration(self):
    """Check values seen between MockConfiguration setup() and teardown(),
    and that 'two' is a ConfigurationError after teardown()."""
    two = staticconf.get_string('two')
    stars = staticconf.get_bool('stars')

    mocked_values = {'two': 2, 'stars': False}
    mock_config = testing.MockConfiguration(mocked_values)
    mock_config.setup()
    # the mocked integer 2 compares equal to the string '2' here
    assert_equal(two, '2')
    assert not stars
    mock_config.teardown()

    # NOTE(review): assert_raises is handed the object returned by get(),
    # not a function; this presumably relies on that object being callable
    # and raising when called -- confirm.
    unset_two = staticconf.get('two')
    assert_raises(errors.ConfigurationError, unset_two)
开发者ID:analogue,项目名称:PyStaticConfiguration,代码行数:10,代码来源:integration_test.py
示例19: test_delitem
def test_delitem(self):
    """Check that a deleted key is gone, and that reading or deleting it
    again raises KeyError."""
    store = self.smap
    store['boo'] = 'ahhh!'
    testify.assert_equal(store['boo'], 'ahhh!')

    del store['boo']
    testify.assert_not_in('boo', store)

    def read_deleted():
        return store['boo']

    def delete_again():
        del store['boo']

    testify.assert_raises(KeyError, read_deleted)
    testify.assert_raises(KeyError, delete_again)
开发者ID:Yelp,项目名称:sqlite3dbm,代码行数:11,代码来源:dbm_test.py
示例20: test_gp_construction_singular_covariance_matrix
def test_gp_construction_singular_covariance_matrix(self):
    """Test that the GaussianProcess ctor indicates a singular covariance matrix when points_sampled contains duplicates (0 noise)."""
    # index of the first entry of num_sampled_list that is >= 1
    has_samples = numpy.greater_equal(self.num_sampled_list, 1)
    index = numpy.argmax(has_samples)
    domain, gaussian_process = self.gp_test_environments[index]

    first_point = SamplePoint([0.0] * domain.dim, 1.0, 0.0)
    # the same point object is listed twice below, so two entries have
    # duplicate coordinates, and we have noise_variance = 0.0
    repeated_point = SamplePoint([1.0] * domain.dim, 1.0, 0.0)
    sampled = [first_point, repeated_point, repeated_point]
    historical_data = HistoricalData(len(first_point.point), sampled)

    T.assert_raises(
        C_GP.SingularMatrixException,
        GaussianProcess,
        gaussian_process.get_covariance_copy(),
        historical_data)
开发者ID:Recmo,项目名称:MOE,代码行数:11,代码来源:gaussian_process_test.py
注:本文中的testify.assert_raises函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论