本文整理汇总了Python中tests.autoclassify.utils.create_failure_lines函数的典型用法代码示例。如果您正苦于以下问题:Python create_failure_lines函数的具体用法?Python create_failure_lines怎么用?Python create_failure_lines使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了create_failure_lines函数的18个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_chunked_qs_reverse
def test_chunked_qs_reverse(test_job):
"""
Test `chunked_qs_reverse` function
Specifically checks the length of chunks and their items don't overlap.
"""
# create 25 failure lines
create_failure_lines(test_job, [(test_line, {}) for i in range(25)])
qs = FailureLine.objects.all()
chunks = list(chunked_qs_reverse(qs, chunk_size=5))
one = chunks[0]
two = chunks[1]
five = chunks[4]
assert len(one) == 5
assert one[0].id == 25
assert one[4].id == 21
assert len(two) == 5
assert two[0].id == 20
assert two[4].id == 16
assert len(five) == 5
assert five[0].id == 5
assert five[4].id == 1
开发者ID:ahal,项目名称:treeherder-service,代码行数:27,代码来源:test_queryset.py
示例2: test_cycle_all_data_in_chunks
def test_cycle_all_data_in_chunks(test_repository, failure_classifications, sample_data,
sample_resultset, mock_log_parser):
"""
Test cycling the sample data in chunks.
"""
job_data = sample_data.job_data[:20]
test_utils.do_job_ingestion(test_repository, job_data, sample_resultset, False)
# build a date that will cause the data to be cycled
cycle_date_ts = datetime.datetime.now() - datetime.timedelta(weeks=1)
for job in Job.objects.all():
job.submit_time = cycle_date_ts
job.save()
create_failure_lines(Job.objects.get(id=1),
[(test_line, {})] * 7)
assert TestFailureLine.search().count() > 0
call_command('cycle_data', sleep_time=0, days=1, chunk_size=3)
refresh_all()
# There should be no jobs after cycling
assert Job.objects.count() == 0
assert FailureLine.objects.count() == 0
assert JobDetail.objects.count() == 0
assert TestFailureLine.search().count() == 0
开发者ID:kmoir,项目名称:treeherder,代码行数:27,代码来源:test_cycle_data.py
示例3: test_update_failure_lines
def test_update_failure_lines(eleven_jobs_stored,
mock_autoclassify_jobs_true, jm,
test_repository, failure_lines,
classified_failures, test_user):
jobs = (jm.get_job(1)[0], jm.get_job(2)[0])
MatcherManager.register_detector(ManualDetector)
client = APIClient()
client.force_authenticate(user=test_user)
create_failure_lines(test_repository,
jobs[1]["job_guid"],
[(test_line, {}),
(test_line, {"subtest": "subtest2"})])
failure_lines = FailureLine.objects.filter(
job_guid__in=[job["job_guid"] for job in jobs]).all()
for job in jobs:
job_failure_lines = FailureLine.objects.filter(job_guid=job["job_guid"]).all()
bs_artifact = {'type': 'json',
'name': 'Bug suggestions',
'blob': json.dumps([{"search": "TEST-UNEXPECTED-%s %s" %
(line.status.upper(), line.message)} for line in
job_failure_lines]),
'job_guid': job['job_guid']}
with ArtifactsModel(jm.project) as artifacts_model:
artifacts_model.load_job_artifacts([bs_artifact])
body = [{"id": failure_line.id,
"best_classification": classified_failures[1].id}
for failure_line in failure_lines]
for failure_line in failure_lines:
assert failure_line.best_is_verified is False
resp = client.put(reverse("failure-line-list"), body, format="json")
assert resp.status_code == 200
for failure_line in failure_lines:
failure_line.refresh_from_db()
assert failure_line.best_classification == classified_failures[1]
assert failure_line.best_is_verified
for job in jobs:
assert jm.is_fully_verified(job['id'])
notes = jm.get_job_note_list(job['id'])
assert len(notes) == 1
assert notes[0]["failure_classification_id"] == 4
assert notes[0]["who"] == test_user.email
开发者ID:jamesthechamp,项目名称:treeherder,代码行数:56,代码来源:test_failureline.py
示例4: test_update_failure_lines
def test_update_failure_lines(eleven_jobs_stored,
mock_autoclassify_jobs_true, jm,
test_repository, failure_lines,
classified_failures, test_user):
jobs = (Job.objects.get(project_specific_id=1),
Job.objects.get(project_specific_id=2))
MatcherManager.register_detector(ManualDetector)
client = APIClient()
client.force_authenticate(user=test_user)
create_failure_lines(test_repository,
jobs[1].guid,
[(test_line, {}),
(test_line, {"subtest": "subtest2"})])
failure_lines = FailureLine.objects.filter(
job_guid__in=[job.guid for job in jobs]).all()
for job in jobs:
job_failure_lines = FailureLine.objects.filter(job_guid=job.guid).all()
bs_artifact = {'type': 'json',
'name': 'Bug suggestions',
'blob': json.dumps([{"search": "TEST-UNEXPECTED-%s %s" %
(line.status.upper(), line.message)} for line in
job_failure_lines]),
'job_guid': job.guid}
with ArtifactsModel(jm.project) as artifacts_model:
artifacts_model.load_job_artifacts([bs_artifact])
body = [{"id": failure_line.id,
"best_classification": classified_failures[1].id}
for failure_line in failure_lines]
for failure_line in failure_lines:
assert failure_line.best_is_verified is False
resp = client.put(reverse("failure-line-list"), body, format="json")
assert resp.status_code == 200
for failure_line in failure_lines:
failure_line.refresh_from_db()
assert failure_line.best_classification == classified_failures[1]
assert failure_line.best_is_verified
for job in jobs:
assert jm.is_fully_verified(job.project_specific_id)
# will assert if we don't have exactly one job, which is what we want
note = JobNote.objects.get(job=job)
assert note.failure_classification.id == 4
assert note.user == test_user
开发者ID:andymckay,项目名称:treeherder,代码行数:55,代码来源:test_failureline.py
示例5: test_update_autoclassification_bug
def test_update_autoclassification_bug(test_job, test_job_2,
classified_failures):
# Job 1 has two failure lines so nothing should be updated
assert test_job.update_autoclassification_bug(1234) is None
lines = [(test_line, {})]
create_failure_lines(test_job_2, lines)
error_lines = create_text_log_errors(test_job_2, lines)
error_lines[0].mark_best_classification(classified_failures[0])
assert classified_failures[0].bug_number is None
assert test_job_2.update_autoclassification_bug(1234) == classified_failures[0]
classified_failures[0].refresh_from_db()
assert classified_failures[0].bug_number == 1234
开发者ID:MikeLing,项目名称:treeherder,代码行数:15,代码来源:test_classified_failure.py
示例6: test_get_matching_lines
def test_get_matching_lines(webapp, test_repository, failure_lines, classified_failures):
"""
test getting a single failure line
"""
for failure_line in failure_lines:
failure_line.best_classification = classified_failures[0]
failure_line.save()
extra_lines = create_failure_lines(test_repository,
failure_lines[0].job_guid,
[(test_line, {"test": "test2", "line": 2}),
(test_line, {"test": "test2", "subtest": "subtest2",
"line": 3})])
extra_lines[1].best_classification = classified_failures[1]
extra_lines[1].save()
resp = webapp.get(
reverse("classified-failure-matches", kwargs={"pk": classified_failures[0].id}))
assert resp.status_int == 200
actual = resp.json["results"]
assert [item["id"] for item in actual] == [item.id for item in reversed(failure_lines)]
开发者ID:KWierso,项目名称:treeherder,代码行数:25,代码来源:test_classified_failure.py
示例7: failure_lines
def failure_lines(jm, eleven_jobs_stored, test_repository):
from tests.autoclassify.utils import test_line, create_failure_lines
job = jm.get_job(1)[0]
return create_failure_lines(test_repository,
job["job_guid"],
[(test_line, {}),
(test_line, {"subtest": "subtest2"})])
开发者ID:anurag619,项目名称:treeherder,代码行数:9,代码来源:conftest.py
示例8: test_cycle_all_data_in_chunks
def test_cycle_all_data_in_chunks(jm, sample_data,
sample_resultset, test_repository, mock_log_parser):
"""
Test cycling the sample data in chunks.
"""
job_data = sample_data.job_data[:20]
test_utils.do_job_ingestion(jm, job_data, sample_resultset, False)
# build a date that will cause the data to be cycled
time_now = time.time()
cycle_date_ts = int(time_now - 7 * 24 * 3600)
jm.execute(
proc="jobs_test.updates.set_jobs_last_modified",
placeholders=[cycle_date_ts]
)
jobs_to_be_deleted = jm.execute(
proc="jobs_test.selects.get_jobs_for_cycling",
placeholders=[time_now - 24 * 3600]
)
job = jm.get_job(jobs_to_be_deleted[0]['id'])[0]
create_failure_lines(test_repository,
job["job_guid"],
[(test_line, {})] * 7)
jobs_before = jm.execute(proc="jobs_test.selects.jobs")
assert TestFailureLine.search().params(search_type="count").execute().hits.total > 0
call_command('cycle_data', sleep_time=0, days=1, chunk_size=3)
refresh_all()
jobs_after = jm.execute(proc="jobs_test.selects.jobs")
assert len(jobs_after) == len(jobs_before) - len(jobs_to_be_deleted)
# There should be no jobs after cycling
assert len(jobs_after) == 0
assert Job.objects.count() == 0
assert FailureLine.objects.count() == 0
assert JobDetail.objects.count() == 0
assert TestFailureLine.search().params(search_type="count").execute().hits.total == 0
开发者ID:vakila,项目名称:treeherder,代码行数:44,代码来源:test_jobs_model.py
示例9: test_cycle_one_job
def test_cycle_one_job(jm, sample_data,
sample_resultset, test_repository, mock_log_parser,
failure_lines):
"""
Test cycling one job in a group of jobs to confirm there are no
unexpected deletions
"""
job_data = sample_data.job_data[:20]
test_utils.do_job_ingestion(jm, job_data, sample_resultset, False)
job_not_deleted = jm.get_job(2)[0]
failure_lines_remaining = create_failure_lines(test_repository,
job_not_deleted["job_guid"],
[(test_line, {}),
(test_line, {"subtest": "subtest2"})])
time_now = time.time()
cycle_date_ts = int(time_now - 7 * 24 * 3600)
jm.execute(
proc="jobs_test.updates.set_jobs_last_modified",
placeholders=[time_now]
)
jm.execute(
proc="jobs_test.updates.set_one_job_last_modified_timestamp",
placeholders=[cycle_date_ts]
)
jobs_to_be_deleted = jm.execute(
proc="jobs_test.selects.get_one_job_for_cycling",
placeholders=[1]
)
jobs_before = jm.execute(proc="jobs_test.selects.jobs")
call_command('cycle_data', sleep_time=0, days=1, debug=True)
jobs_after = jm.execute(proc="jobs_test.selects.jobs")
# Confirm that the target result set has no jobs in the
# jobs table
jobs_to_be_deleted_after = jm.execute(
proc="jobs_test.selects.get_one_job_for_cycling",
placeholders=[1]
)
assert len(jobs_to_be_deleted_after) == 0
assert len(jobs_after) == len(jobs_before) - len(jobs_to_be_deleted)
assert len(jobs_after) == Job.objects.count()
assert (set(item.id for item in FailureLine.objects.all()) ==
set(item.id for item in failure_lines_remaining))
开发者ID:jamesthechamp,项目名称:treeherder,代码行数:56,代码来源:test_jobs_model.py
示例10: test_update_failure_lines
def test_update_failure_lines(mock_autoclassify_jobs_true,
test_repository,
text_log_errors_failure_lines,
classified_failures,
eleven_jobs_stored,
test_user):
jobs = (Job.objects.get(id=1), Job.objects.get(id=2))
MatcherManager.register_detector(ManualDetector)
client = APIClient()
client.force_authenticate(user=test_user)
lines = [(test_line, {}),
(test_line, {"subtest": "subtest2"})]
new_failure_lines = create_failure_lines(jobs[1], lines)
new_text_log_errors = create_text_log_errors(jobs[1], lines)
for text_log_error, failure_line in zip(new_text_log_errors,
new_failure_lines):
TextLogErrorMetadata.objects.create(text_log_error=text_log_error,
failure_line=failure_line)
failure_lines = FailureLine.objects.filter(
job_guid__in=[job.guid for job in jobs]).all()
text_log_errors = TextLogError.objects.filter(
step__job__in=jobs).all()
for text_log_error, failure_line in zip(text_log_errors, failure_lines):
assert text_log_error.metadata.best_is_verified is False
assert failure_line.best_is_verified is False
body = [{"id": failure_line.id,
"best_classification": classified_failures[1].id}
for failure_line in failure_lines]
resp = client.put(reverse("failure-line-list"), body, format="json")
assert resp.status_code == 200
for text_log_error, failure_line in zip(text_log_errors, failure_lines):
text_log_error.refresh_from_db()
text_log_error.metadata.refresh_from_db()
failure_line.refresh_from_db()
assert failure_line.best_classification == classified_failures[1]
assert failure_line.best_is_verified
assert text_log_error.metadata.best_classification == classified_failures[1]
assert text_log_error.metadata.best_is_verified
for job in jobs:
assert job.is_fully_verified()
# will assert if we don't have exactly one job, which is what we want
note = JobNote.objects.get(job=job)
assert note.failure_classification.id == 4
assert note.user == test_user
开发者ID:MikeLing,项目名称:treeherder,代码行数:56,代码来源:test_failureline.py
示例11: test_update_autoclassification_bug
def test_update_autoclassification_bug(test_job, test_job_2, classified_failures):
classified_failure = classified_failures[0]
# Job 1 has two failure lines so nothing should be updated
assert test_job.update_autoclassification_bug(1234) is None
lines = [(test_line, {})]
create_failure_lines(test_job_2, lines)
error_lines = create_text_log_errors(test_job_2, lines)
error_lines[0].mark_best_classification(classified_failures[0].id)
assert classified_failure.bug_number is None
metadata = TextLogErrorMetadata.objects.get(text_log_error__step__job=test_job_2)
metadata.failure_line = FailureLine.objects.get(pk=3)
metadata.save()
assert test_job_2.update_autoclassification_bug(1234) == classified_failures[0]
classified_failures[0].refresh_from_db()
assert classified_failures[0].bug_number == 1234
开发者ID:ahal,项目名称:treeherder-service,代码行数:20,代码来源:test_classified_failure.py
示例12: test_cycle_all_but_one_job
def test_cycle_all_but_one_job(test_repository, failure_classifications, sample_data,
sample_push, mock_log_parser, elasticsearch,
failure_lines):
"""
Test cycling all but one job in a group of jobs to confirm there are no
unexpected deletions
"""
job_data = sample_data.job_data[:20]
test_utils.do_job_ingestion(test_repository, job_data, sample_push, False)
# one job should not be deleted, set its submit time to now
job_not_deleted = Job.objects.get(id=2)
job_not_deleted.submit_time = datetime.datetime.now()
job_not_deleted.save()
extra_objects = {
'failure_lines': (FailureLine,
create_failure_lines(
job_not_deleted,
[(test_line, {}),
(test_line, {"subtest": "subtest2"})])),
'job_details': (JobDetail, [JobDetail.objects.create(
job=job_not_deleted,
title='test',
value='testvalue')])
}
# set other job's submit time to be a week ago from now
cycle_date_ts = datetime.datetime.now() - datetime.timedelta(weeks=1)
for job in Job.objects.all().exclude(id=job_not_deleted.id):
job.submit_time = cycle_date_ts
job.save()
num_job_logs_to_be_deleted = JobLog.objects.all().exclude(
id=job_not_deleted.id).count()
num_job_logs_before = JobLog.objects.count()
call_command('cycle_data', sleep_time=0, days=1, debug=True)
refresh_index()
assert Job.objects.count() == 1
assert JobLog.objects.count() == (num_job_logs_before -
num_job_logs_to_be_deleted)
for (object_type, objects) in extra_objects.values():
assert (set(item.id for item in object_type.objects.all()) ==
set(item.id for item in objects))
# get all documents
indexed_ids = set(int(item['_id']) for item in all_documents())
expected = set(item.id for item in extra_objects["failure_lines"][1])
assert indexed_ids == expected
开发者ID:ahal,项目名称:treeherder-service,代码行数:52,代码来源:test_cycle_data.py
示例13: test_chunked_qs
def test_chunked_qs(test_job):
# create 25 failure lines
create_failure_lines(test_job, [(test_line, {}) for i in range(25)])
qs = FailureLine.objects.all()
chunks = list(chunked_qs(qs, chunk_size=5))
one = chunks[0]
two = chunks[1]
five = chunks[4]
assert len(one) == 5
assert one[0].id == 1
assert one[4].id == 5
assert len(two) == 5
assert two[0].id == 6
assert two[4].id == 10
assert len(five) == 5
assert five[0].id == 21
assert five[4].id == 25
开发者ID:ahal,项目名称:treeherder-service,代码行数:22,代码来源:test_queryset.py
示例14: failure_lines
def failure_lines(jm, eleven_jobs_stored, initial_data):
from treeherder.model.models import RepositoryGroup, Repository
from tests.autoclassify.utils import test_line, create_failure_lines
job = jm.get_job(1)[0]
repository_group = RepositoryGroup.objects.create(name="repo_group")
repository = Repository.objects.create(name=jm.project,
repository_group=repository_group)
return create_failure_lines(repository,
job["job_guid"],
[(test_line, {}),
(test_line, {"subtest": "subtest2"})])
开发者ID:adusca,项目名称:treeherder,代码行数:14,代码来源:conftest.py
示例15: test_update_autoclassification_bug
def test_update_autoclassification_bug(test_job, test_job_2,
classified_failures):
# Job 1 has two failure lines so nothing should be updated
assert test_job.update_autoclassification_bug(1234) is None
failure_lines = create_failure_lines(test_job_2,
[(test_line, {})])
failure_lines[0].best_classification = classified_failures[0]
failure_lines[0].save()
classified_failures[0].bug_number = None
lines = [(item, {}) for item in FailureLine.objects.filter(job_guid=test_job_2.guid).values()]
create_text_log_errors(test_job_2, lines)
assert test_job_2.update_autoclassification_bug(1234) == classified_failures[0]
classified_failures[0].refresh_from_db()
assert classified_failures[0].bug_number == 1234
开发者ID:SebastinSanty,项目名称:treeherder,代码行数:16,代码来源:test_classified_failure.py
示例16: test_update_autoclassification_bug
def test_update_autoclassification_bug(jm, test_repository, classified_failures):
# Job 1 has two failue lines so nothing should be updated
assert jm.update_autoclassification_bug(1, 1234) is None
job = jm.get_job(2)[0]
failure_lines = create_failure_lines(test_repository,
job["job_guid"],
[(test_line, {})])
failure_lines[0].best_classification = classified_failures[0]
failure_lines[0].save()
classified_failures[0].bug_number = None
lines = [(item, {}) for item in FailureLine.objects.filter(job_guid=job["job_guid"]).values()]
create_summary_lines_failures(test_repository.name, job, lines)
create_bug_suggestions_failures(test_repository.name, job, lines)
assert jm.update_autoclassification_bug(2, 1234) == classified_failures[0]
classified_failures[0].refresh_from_db()
assert classified_failures[0].bug_number == 1234
开发者ID:vakila,项目名称:treeherder,代码行数:17,代码来源:test_jobs_model.py
示例17: test_cycle_one_job
def test_cycle_one_job(jm, sample_data,
sample_resultset, test_repository, mock_log_parser,
failure_lines):
"""
Test cycling one job in a group of jobs to confirm there are no
unexpected deletions
"""
job_data = sample_data.job_data[:20]
test_utils.do_job_ingestion(jm, job_data, sample_resultset, False)
job_not_deleted = jm.get_job(2)[0]
extra_objects = {
'failure_lines': (FailureLine,
create_failure_lines(test_repository,
job_not_deleted["job_guid"],
[(test_line, {}),
(test_line, {"subtest": "subtest2"})])),
'job_details': (JobDetail, [JobDetail.objects.create(
job=Job.objects.get(guid=job_not_deleted["job_guid"]),
title='test',
value='testvalue')])
}
time_now = time.time()
cycle_date_ts = int(time_now - 7 * 24 * 3600)
jm.execute(
proc="jobs_test.updates.set_jobs_last_modified",
placeholders=[time_now]
)
jm.execute(
proc="jobs_test.updates.set_one_job_last_modified_timestamp",
placeholders=[cycle_date_ts]
)
jobs_to_be_deleted = jm.execute(
proc="jobs_test.selects.get_one_job_for_cycling",
placeholders=[1]
)
num_job_logs_to_be_deleted = JobLog.objects.filter(
job__project_specific_id__in=[job['id'] for job in
jobs_to_be_deleted]).count()
jobs_before = jm.execute(proc="jobs_test.selects.jobs")
job_logs_before = JobLog.objects.count()
call_command('cycle_data', sleep_time=0, days=1, debug=True)
jobs_after = jm.execute(proc="jobs_test.selects.jobs")
# Confirm that the target result set has no jobs in the
# jobs table
jobs_to_be_deleted_after = jm.execute(
proc="jobs_test.selects.get_one_job_for_cycling",
placeholders=[1]
)
assert len(jobs_to_be_deleted_after) == 0
assert len(jobs_after) == len(jobs_before) - len(jobs_to_be_deleted)
assert len(jobs_after) == Job.objects.count()
assert JobLog.objects.count() == (job_logs_before -
num_job_logs_to_be_deleted)
for (object_type, objects) in extra_objects.values():
assert (set(item.id for item in object_type.objects.all()) ==
set(item.id for item in objects))
开发者ID:digideskio,项目名称:treeherder,代码行数:71,代码来源:test_jobs_model.py
示例18: failure_lines
def failure_lines(test_job, elasticsearch):
from tests.autoclassify.utils import test_line, create_failure_lines
return create_failure_lines(test_job,
[(test_line, {}),
(test_line, {"subtest": "subtest2"})])
开发者ID:bclary,项目名称:treeherder,代码行数:6,代码来源:conftest.py
注:本文中的tests.autoclassify.utils.create_failure_lines函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论