This article collects typical usage examples of the Python testify.assert_in function. If you have been wondering what assert_in does, how to call it, or what it looks like in real code, the hand-picked examples below should help.
The following presents 20 code examples of the assert_in function, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Python code samples.
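Before the collected snippets, here is a minimal sketch of the two-argument form of assert_in that all of the examples below use; the test function name and the values are made up purely for illustration.
import testify as T

def test_assert_in_minimal():
    # Passes because "b" is an element of the container.
    T.assert_in("b", ["a", "b", "c"])
    # Membership also covers substring checks, which many examples below rely on.
    T.assert_in("needle", "a haystack with a needle in it")
    # assert_not_in is the complementary check.
    T.assert_not_in("d", ["a", "b", "c"])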
Example 1: test_fail_after_a_while
def test_fail_after_a_while(self, print_exc_mock, print_warning_mock):
processes = vimap.pool.fork(
(worker_raise_exc_with_curleys.init_args(init=i) for i in xrange(100)), in_queue_size_factor=2
)
processes.imap([-1] * 3000 + list(range(50)))
# Check yielded output.
res_to_compare = []
for inp, out, typ in processes.zip_in_out_typ():
if typ == "exception":
res_to_compare.append((inp, serialize_error(out.value), typ))
else:
res_to_compare.append((inp, out, typ))
# All the -1s will produce None output.
expected_res_to_compare = [(-1, None, "output")] * 3000
# Once we get to the positive numbers, we start causing 50 of
# the 100 workers to throw exceptions.
expected_res_to_compare.extend(
[(i, serialize_error(ValueError("{0} curley braces!")), "exception") for i in range(50)]
)
T.assert_sorted_equal(res_to_compare, expected_res_to_compare)
# Check out exception logging.
calls = print_exc_mock.call_args_list
errors = [serialize_error(call_args[0].value) for call_args, _ in calls]
T.assert_equal(errors, [serialize_error(ValueError("{0} curley braces!"))] * 50)
# NOTE: Sometimes, the weakref in the pool is deleted, so 'has_exceptions' is
# not set, and the pool prints warnings we don't actually care about. Make
# sure that this is the only warning printed.
if print_warning_mock.call_args_list:
T.assert_equal(len(print_warning_mock.call_args_list), 1)
[warning] = print_warning_mock.call_args_list
T.assert_in("Pool disposed before input was consumed", warning[0][0])
Author: striglia, Project: vimap, Lines: 34, Source: exceptions_test.py
Example 2: test_deprecated_mapper_final_positional_arg
def test_deprecated_mapper_final_positional_arg(self):
def mapper(k, v):
pass
def reducer(k, v):
pass
def mapper_final():
pass
stderr = StringIO()
with no_handlers_for_logger():
log_to_stream('mrjob.job', stderr)
step = MRJob.mr(mapper, reducer, mapper_final)
# should be allowed to specify mapper_final as a positional arg,
# but we log a warning
assert_equal(step, MRJob.mr(mapper=mapper,
reducer=reducer,
mapper_final=mapper_final))
assert_in('mapper_final should be specified', stderr.getvalue())
# can't specify mapper_final as a positional and keyword arg
assert_raises(
TypeError,
MRJob.mr, mapper, reducer, mapper_final, mapper_final=mapper_final)
Author: gimlids, Project: LTPM, Lines: 26, Source: job_test.py
Example 3: test_invalid_job_collation
def test_invalid_job_collation(self):
jobs = FrozenDict({'test_collision0': ConfigJob(name='test_collision0',
node='node0',
schedule=ConfigIntervalScheduler(timedelta=datetime.timedelta(0,
20)),
actions=FrozenDict({'action0_0': ConfigAction(name='action0_0',
command='test_command0.0',
requires=(),
node=None)}),
queueing=True,
run_limit=50,
all_nodes=False,
cleanup_action=ConfigCleanupAction(command='test_command0.1',
requires=(),
name='cleanup',
node=None),
enabled=True,
allow_overlap=False)})
services = FrozenDict({'test_collision0': ConfigService(name='test_collision0',
node='node0',
pid_file='/var/run/%(name)s-%(instance_number)s.pid',
command='service_command0',
monitor_interval=20,
restart_interval=None,
count=2)})
fake_config = mock.Mock()
setattr(fake_config, 'jobs', jobs)
setattr(fake_config, 'services', services)
expected_message = "Collision found for identifier 'MASTER.test_collision0'"
exception = assert_raises(ConfigError, collate_jobs_and_services, {'MASTER': fake_config})
assert_in(expected_message, str(exception))
Author: strategist922, Project: Tron, Lines: 32, Source: config_parse_test.py
Example 4: test_failing_child_initialized_hook
def test_failing_child_initialized_hook(self):
def child_initialized_hook(child_pid):
raise Exception, "child_initialized hook raises exception"
# When child_initialized hook fails parent process will
# exit. To test a failing initilization hook we fork and watch
# the new child.
pid = os.fork()
if not pid:
event_hooks = {"child_initialized" : child_initialized_hook}
with testing.no_stderr():
# This will fail. redirecting stderr to /dev/null will
# silence the test output.
self.run_child_function_in_catbox(event_hooks=event_hooks)
else:
status = 0
wait_pid = 0
try:
for _ in range(5):
(wait_pid, status, _) = os.wait4(pid, os.WNOHANG)
if wait_pid == pid:
break
time.sleep(.1)
except OSError, e:
T.assert_in("No child processes", e)
else:
Author: Pardus-Linux, Project: catbox, Lines: 26, Source: test_event_hooks.py
Example 5: test_discover_test_with_unknown_import_error
def test_discover_test_with_unknown_import_error(self):
"""Insure that DiscoveryError is raised when a test which raises an unusual exception upon import is discovered."""
stdout, stderr = cmd_output(
'python', '-m', 'testify.test_program', self.broken_import_module,
)
T.assert_in('DISCOVERY FAILURE', stdout)
T.assert_in('AttributeError: aaaaa!', stderr)
Author: chriskuehl, Project: Testify, Lines: 7, Source: discovery_failure_test.py
Example 6: test_contains_ancestral
def test_contains_ancestral(self):
cd = ChainedDict(**{"the_key": True})
cd2 = ChainedDict(parent=cd, **{"the_other_key": True})
T.assert_in("the_key", cd2)
T.assert_in("the_other_key", cd2)
T.assert_not_in("the_other_key", cd)
Author: PLOS-Web, Project: py_meter_ingest, Lines: 7, Source: chained_dict_test.py
Example 7: test_exception_in_setup_phase
def test_exception_in_setup_phase(self):
"""If a class_setup method raises an exception, this exception is
reported as an error in all of the test methods in the test case. The
methods are then treated as flakes and re-run.
"""
# Pull and run the test case, thereby causing class_setup to run.
test_case = get_test(self.server, 'runner')
assert_equal(len(test_case['methods']), 3)
# The last method will be the special 'run' method which signals the
# entire test case is complete (including class_teardown).
assert_equal(test_case['methods'][-1], 'run')
self.run_test('runner')
# 'classTearDown' is a deprecated synonym for 'class_teardown'. We
# don't especially care about it, but it's in there.
#
# Exceptions during execution of class_setup cause test methods to fail
# and get requeued as flakes. They aren't reported now because they
# aren't complete.
expected_methods = set(['classTearDown', 'run'])
# self.run_test configures us up to collect results submitted at
# class_teardown completion time. class_setup_teardown methods report
# the result of their teardown phase at "class_teardown completion"
# time. So, when testing the setup phase of class_setup_teardown, we
# will see an "extra" method.
#
# Child classes which exercise class_setup_teardown will set
# self.class_setup_teardown_method_name so we can add it to
# expected_methods here.
if hasattr(self, 'class_setup_teardown_method_name'):
expected_methods.add(self.class_setup_teardown_method_name)
seen_methods = self.get_seen_methods(self.test_reporter.test_complete.calls)
# This produces a clearer diff than simply asserting the sets are
# equal.
assert_equal(expected_methods.symmetric_difference(seen_methods), set())
# Verify the failed test case is re-queued for running.
assert_equal(self.server.test_queue.empty(), False)
requeued_test_case = get_test(self.server, 'runner2')
assert_in(self.dummy_test_case.__name__, requeued_test_case['class_path'])
# Reset reporter.
self.test_reporter.test_complete = turtle.Turtle()
# Run tests again.
self.run_test('runner2')
# This time, test methods have been re-run as flakes. Now that these
# methods are complete, they should be reported.
expected_methods = set(['test1', 'test2', 'classTearDown', 'run'])
if hasattr(self, 'class_setup_teardown_method_name'):
expected_methods.add(self.class_setup_teardown_method_name)
seen_methods = self.get_seen_methods(self.test_reporter.test_complete.calls)
# This produces a clearer diff than simply asserting the sets are
# equal.
assert_equal(expected_methods.symmetric_difference(seen_methods), set())
# Verify no more test cases have been re-queued for running.
assert_equal(self.server.test_queue.empty(), True)
Author: MiloJiang, Project: Testify, Lines: 60, Source: test_runner_server_test.py
Example 8: test_bad_requires
def test_bad_requires(self):
test_config = (
BASE_CONFIG
+ """
jobs:
-
name: "test_job0"
node: node0
schedule: "interval 20s"
actions:
-
name: "action0_0"
command: "test_command0.0"
-
name: "action0_1"
command: "test_command0.1"
-
name: "test_job1"
node: node0
schedule: "interval 20s"
actions:
-
name: "action1_0"
command: "test_command1.0"
requires: action0_0
"""
)
expected_message = "jobs.test_job1.action1_0 has a dependency " '"action0_0" that is not in the same job!'
exception = assert_raises(ConfigError, load_config, test_config)
assert_in(expected_message, str(exception))
Author: anthonypt87, Project: Tron, Lines: 32, Source: config_parse_test.py
Example 9: test_overlap_node_and_node_pools
def test_overlap_node_and_node_pools(self):
tron_config = dict(
nodes=[dict(name="sameName", hostname="localhost")], node_pools=[dict(name="sameName", nodes=["sameNode"])]
)
expected_msg = "Node and NodePool names must be unique sameName"
exception = assert_raises(ConfigError, valid_config, tron_config)
assert_in(expected_msg, str(exception))
Author: anthonypt87, Project: Tron, Lines: 7, Source: config_parse_test.py
Example 10: test_list_path_no_path_duplicates
def test_list_path_no_path_duplicates(self):
"""Tests that when no path is specified the correct results are returned
and the repeat key is not cached with the wrong data source.
"""
test_path = None
expected_paths = [{
'name': 'src.MajorSource%d' % i,
'type': 'dir'
} for i in xrange(len(self.data_source.data_sources))]
with self._mock_ds_method('_request_paths_from_ds') as mock_request_paths:
mock_request_path_list = [[path] for path in expected_paths]
mock_request_path_list[-1].append({
'name': 'src.MajorSource1',
'type': 'dir'
})
mock_request_paths.side_effect = mock_request_path_list
actual_paths = self.data_source.list_path(test_path)
T.assert_equal(mock_request_paths.call_count, len(self.data_source.data_sources))
for ds in self.data_source.data_sources:
mock_request_paths.assert_any_call(ds, test_path)
T.assert_equal(expected_paths, actual_paths)
for expected_path, expected_data_source in zip(expected_paths, self.data_source.data_sources):
T.assert_in(expected_path['name'], self.data_source.key_mapping_cache)
T.assert_equal(self.data_source.key_mapping_cache[expected_path['name']], expected_data_source)
Author: DajunKou, Project: firefly, Lines: 31, Source: aggregating_data_source_test.py
Example 11: test_process_queue_duplicate
def test_process_queue_duplicate(self):
duplicate_req = copy.deepcopy(self.fake_request)
duplicate_req['id'] = 11
with nested(
mock.patch("%s.pushmanager.core.git.GitQueue.verify_branch_failure" % __name__),
mock.patch("%s.pushmanager.core.git.GitQueue.verify_branch_successful" % __name__),
# This will fail, stop logging errors
mock.patch("%s.pushmanager.core.git.logging.error" % __name__),
mock.patch(
"%s.pushmanager.core.git.GitQueue._get_request_with_sha" % __name__,
return_value={'id': 10, 'state': 'requested'}
),
self.mocked_update_request(self.fake_request, duplicate_req)
):
# GitQueue._get_request_with_sha returning a value means
# we have a duplicated request. This should trigger a
# failure
T.assert_equal(pushmanager.core.git.GitQueue.verify_branch_failure.call_count, 1)
T.assert_equal(pushmanager.core.git.GitQueue.verify_branch_successful.call_count, 0)
# Match the error message for duplicate revision. error_msg
# should be the last item of the first call object's *args list
# (from mock library).
T.assert_in(
"another request with the same revision sha",
pushmanager.core.git.GitQueue.verify_branch_failure.call_args_list[0][0][1]
)
Author: Mango-J, Project: pushmanager, Lines: 27, Source: test_core_git.py
Example 12: test_list_path_no_path
def test_list_path_no_path(self):
"""Tests the behavior of list_path when asking for the root keys (no
path specified).
"""
test_path = None
expected_paths = [{
'name': 'src.MajorSource%d' % i,
'type': 'dir'
} for i in xrange(len(self.data_source.data_sources))]
with self._mock_ds_method('_request_paths_from_ds') as mock_request_paths:
mock_request_paths.side_effect = [[path] for path in expected_paths]
actual_paths = self.data_source.list_path(test_path)
T.assert_equal(mock_request_paths.call_count, len(self.data_source.data_sources))
for ds in self.data_source.data_sources:
mock_request_paths.assert_any_call(ds, test_path)
T.assert_equal(expected_paths, actual_paths)
for expected_path, expected_data_source in zip(expected_paths, self.data_source.data_sources):
T.assert_in(expected_path['name'], self.data_source.key_mapping_cache)
T.assert_equal(self.data_source.key_mapping_cache[expected_path['name']], expected_data_source)
Author: DajunKou, Project: firefly, Lines: 26, Source: aggregating_data_source_test.py
Example 13: test_find_data_source_for_stat_key
def test_find_data_source_for_stat_key(self):
"""Tests _find_data_source_for_stat_key when it's provided by one of
the configured data sources.
"""
expected_data_source = {
'data_server_url': "http://b.com",
'data_source_hash': util.generate_ds_key("another.data.source"),
'secret_key': "TEST_SECRET_TWO"
}
test_key = 'src.our_key'
def fake_paths_from_ds(data_source, path):
if data_source == expected_data_source:
return [{"name": test_key},]
else:
return [{"name": "src.not_our_key"},]
with mock.patch.object(self.data_source, '_request_paths_from_ds', fake_paths_from_ds):
actual_ds = self.data_source._find_data_source_for_stat_key(test_key)
T.assert_equal(expected_data_source, actual_ds)
T.assert_in(test_key, self.data_source.key_mapping_cache)
T.assert_equal(expected_data_source, self.data_source.key_mapping_cache[test_key])
Author: DajunKou, Project: firefly, Lines: 25, Source: aggregating_data_source_test.py
Example 14: verify_message_from_child
def verify_message_from_child(self, expected_message=None):
expected_message = expected_message or self.default_expected_message_from_child
actual_message_from_child = self.poll()
if actual_message_from_child:
T.assert_in(expected_message, actual_message_from_child)
else:
raise ChildDidNotReportBackException
Author: bukzor, Project: catbox, Lines: 7, Source: testing.py
Example 15: assert_checklist_for_tags
def assert_checklist_for_tags(self, tags, requestid=None):
num_checks = 0
checks = []
# Gather reference checklists from the code
for tag in tags:
# While the tag name is 'search-backend', the checklist type
# is truncated to 'search'.
if tag == 'search-backend':
tag = 'search'
if tag not in checklist_reminders:
continue
plain_list = checklist_reminders[tag]
checks += [(tag, check) for check in plain_list]
cleanup_tag = '%s-cleanup' % tag
cleanup_list = checklist_reminders[cleanup_tag]
checks += [(cleanup_tag, check) for check in cleanup_list]
num_checks = len(checks)
reqid = self.make_request_with_tags(tags, requestid)
checklists = self.get_checklists(reqid)
T.assert_equal(num_checks, len(checklists))
for check in checks:
T.assert_in((reqid, check[0], check[1]), checklists)
return reqid
Author: eevee, Project: pushmanager, Lines: 31, Source: test_servlet_newrequest.py
Example 16: test_checklist_duplicate
def test_checklist_duplicate(self):
with fake_checklist_request():
# insert fake data from FakeDataMixin
fake_pushid = 2
self.insert_pushes()
self.insert_requests()
test1_request = self.get_requests_by_user('testuser1')[0]
test2_request = self.get_requests_by_user('testuser2')[0]
self.insert_pushcontent(test1_request['id'], fake_pushid)
self.insert_pushcontent(test2_request['id'], fake_pushid)
# insert fake checklist data
checklist_queries = []
for req in (test1_request, test2_request):
checklist_queries.append(db.push_checklist.insert({
'request': req['id'],
'type': 'search',
'target': 'prod'
}))
checklist_queries.append(db.push_checklist.insert({
'request': req['id'],
'type': 'search-cleanup',
'target': 'post-verify-prod'
}))
db.execute_transaction_cb(checklist_queries, on_db_return)
uri = "/checklist?id=%d" % fake_pushid
response = self.fetch(uri)
T.assert_equal(response.error, None)
T.assert_not_in("No checklist items for this push", response.body)
T.assert_not_equal(re.search("for testuser\d,testuser\d", response.body), None)
T.assert_in("Before Certifying - Do In Prod", response.body)
Author: Mango-J, Project: pushmanager, Lines: 32, Source: test_servlet_checklist.py
Example 17: test_messy_error
def test_messy_error(self):
counter_string = 'Job JOBID="_001" FAILED_REDUCES="0" COUNTERS="THIS IS NOT ACTUALLY A COUNTER"'
with no_handlers_for_logger(''):
stderr = StringIO()
log_to_stream('mrjob.parse', stderr, level=logging.WARN)
assert_equal((None, None), parse_hadoop_counters_from_line(counter_string))
assert_in('Cannot parse Hadoop counter line', stderr.getvalue())
Author: gimlids, Project: LTPM, Lines: 7, Source: parse_test.py
Example 18: test_hoods_checklists
def test_hoods_checklists(self):
with fake_checklist_request():
# insert fake data from FakeDataMixin
fake_pushid = 2
self.insert_pushes()
self.insert_requests()
req = self.get_requests_by_user('testuser1')[0]
self.insert_pushcontent(req['id'], fake_pushid)
# insert fake checklist data
checklist_queries = []
checklist_items = (
{'request': req['id'], 'type': 'hoods', 'target': 'stage'},
{'request': req['id'], 'type': 'hoods', 'target': 'prod'},
{'request': req['id'], 'type': 'hoods-cleanup', 'target': 'post-verify-stage'},
)
for checklist_item in checklist_items:
checklist_queries.append(db.push_checklist.insert(checklist_item))
db.execute_transaction_cb(checklist_queries, on_db_return)
uri = "/checklist?id=%d" % fake_pushid
response = self.fetch(uri)
T.assert_equal(response.error, None)
T.assert_not_in("No checklist items for this push", response.body)
T.assert_in("Notify testuser1 to deploy Geoservices to stage", response.body)
T.assert_in("Notify testuser1 to deploy Geoservices to prod", response.body)
Author: Mango-J, Project: pushmanager, Lines: 27, Source: test_servlet_checklist.py
Example 19: test_bad_requires
def test_bad_requires(self):
test_config = BASE_CONFIG + """
jobs:
-
name: "test_job0"
node: node0
schedule: "interval 20s"
actions:
-
name: "action0_0"
command: "test_command0.0"
-
name: "action0_1"
command: "test_command0.1"
-
name: "test_job1"
node: node0
schedule: "interval 20s"
actions:
-
name: "action1_0"
command: "test_command1.0"
requires: [action0_0]
"""
expected_message = ('jobs.MASTER.test_job1.action1_0 has a dependency '
'"action0_0" that is not in the same job!')
exception = assert_raises(ConfigError, valid_config_from_yaml, test_config)
assert_in(expected_message, str(exception))
Author: ContextLogic, Project: Tron, Lines: 30, Source: config_parse_test.py
Example 20: test_create_scratch_uri
def test_create_scratch_uri(self):
# "walrus" bucket will be ignored; it doesn't start with "mrjob-"
self.add_mock_s3_data({'walrus': {}, 'zebra': {}})
runner = EMRJobRunner(conf_path=False, s3_sync_wait_time=0.01)
# bucket name should be mrjob- plus 16 random hex digits
s3_scratch_uri = runner._opts['s3_scratch_uri']
assert_equal(s3_scratch_uri[:11], 's3://mrjob-')
assert_equal(s3_scratch_uri[27:], '/tmp/')
# bucket shouldn't actually exist yet
scratch_bucket, _ = parse_s3_uri(s3_scratch_uri)
assert_not_in(scratch_bucket, self.mock_s3_fs.keys())
# need to do something to ensure that the bucket actually gets
# created. let's launch a (mock) job flow
jfid = runner.make_persistent_job_flow()
assert_in(scratch_bucket, self.mock_s3_fs.keys())
runner.make_emr_conn().terminate_jobflow(jfid)
# once our scratch bucket is created, we should re-use it
runner2 = EMRJobRunner(conf_path=False)
assert_equal(runner2._opts['s3_scratch_uri'], s3_scratch_uri)
s3_scratch_uri = runner._opts['s3_scratch_uri']
Author: boursier, Project: mrjob, Lines: 25, Source: emr_test.py
Note: The testify.assert_in examples in this article were compiled by 纯净天空 from source code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects contributed by many developers; copyright of the source code remains with the original authors. Please refer to each project's License before distributing or using the code; do not reproduce this article without permission.