本文整理汇总了Python中tests.utils.helpers.get_data_path函数的典型用法代码示例。如果您正苦于以下问题:Python get_data_path函数的具体用法?Python get_data_path怎么用?Python get_data_path使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_data_path函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_pre_execute_check_imts_raises
def test_pre_execute_check_imts_raises(self):
haz_job = engine.prepare_job()
cfg = helpers.get_data_path('classical_job.ini')
params, files = engine.parse_config(open(cfg, 'r'))
haz_job.hazard_calculation = engine.create_hazard_calculation(
haz_job.owner, params, files.values())
haz_job.save()
hazard_curve_output = models.Output.objects.create_output(
haz_job, 'test_hazard_curve', 'hazard_curve'
)
models.HazardCurve.objects.create(
output=hazard_curve_output,
investigation_time=50.0,
imt='PGV', # the vulnerability model only defines SA(0.1)
statistics='mean'
)
cfg = helpers.get_data_path(
'end-to-end-hazard-risk/job_risk_classical.ini')
risk_job = helpers.get_risk_job(
cfg, hazard_output_id=hazard_curve_output.id
)
models.JobStats.objects.create(oq_job=risk_job)
calc = classical.ClassicalRiskCalculator(risk_job)
# Check for compatibility between the IMTs defined in the vulnerability
# model and the chosen hazard output (--hazard-output-id)
with self.assertRaises(ValueError) as ar:
calc.pre_execute()
self.assertEqual(
"There is no hazard output for: SA(0.1). "
"The available IMTs are: PGA.",
ar.exception.message)
开发者ID:kenxshao,项目名称:oq-engine,代码行数:34,代码来源:core_test.py
示例2: setUp
def setUp(self):
self.job, _ = helpers.get_fake_risk_job(
get_data_path('event_based_bcr/job.ini'),
get_data_path('event_based_hazard/job.ini'), output_type="gmf")
self.calculator = core.EventBasedBCRRiskCalculator(self.job)
models.JobStats.objects.create(oq_job=self.job)
开发者ID:Chunghan,项目名称:oq-engine,代码行数:7,代码来源:core_test.py
示例3: test_pre_execute_check_imts_no_errors
def test_pre_execute_check_imts_no_errors(self):
haz_job = engine.prepare_job()
cfg = helpers.get_data_path(
'end-to-end-hazard-risk/job_haz_classical.ini')
params, files = engine.parse_config(open(cfg, 'r'))
haz_job.hazard_calculation = engine.create_hazard_calculation(
haz_job.owner, params, files.values())
haz_job.save()
hazard_curve_output = models.Output.objects.create_output(
haz_job, 'test_hazard_curve', 'hazard_curve'
)
models.HazardCurve.objects.create(
output=hazard_curve_output,
investigation_time=50.0,
# this imt is compatible with the vuln model
imt='SA',
sa_period=0.025,
sa_damping=5.0,
statistics='mean'
)
cfg = helpers.get_data_path(
'end-to-end-hazard-risk/job_risk_classical.ini')
risk_job = helpers.get_risk_job(
cfg, hazard_output_id=hazard_curve_output.id
)
models.JobStats.objects.create(oq_job=risk_job)
calc = classical.ClassicalRiskCalculator(risk_job)
# In contrast to the test above (`test_pre_execute_check_imts_raises`),
# we expect no errors to be raised.
calc.pre_execute()
开发者ID:kenxshao,项目名称:oq-engine,代码行数:34,代码来源:core_test.py
示例4: setUp
def setUp(self):
self.generated_files = []
self.job = Job.from_file(helpers.get_data_path(CONFIG_FILE))
self.job_with_includes = Job.from_file(
helpers.get_data_path(CONFIG_WITH_INCLUDES))
self.generated_files.append(self.job.super_config_path)
self.generated_files.append(self.job_with_includes.super_config_path)
开发者ID:danciul,项目名称:openquake,代码行数:8,代码来源:job_unittest.py
示例5: test_with_input_type
def test_with_input_type(self):
job, files = helpers.get_fake_risk_job(
get_data_path('classical_psha_based_risk/job.ini'),
get_data_path('simple_fault_demo_hazard/job.ini'))
rc = job.risk_calculation
inputs = models.inputs4rcalc(rc.id, input_type='exposure')
self.assertEqual(1, inputs.count())
开发者ID:larsbutler,项目名称:oq-engine,代码行数:8,代码来源:models_test.py
示例6: setUp
def setUp(self):
client = kvs.get_client()
# Delete managed job id info so we can predict the job key
# which will be allocated for us
client.delete(kvs.tokens.CURRENT_JOBS)
self.generated_files = []
self.job = helpers.job_from_file(helpers.get_data_path(CONFIG_FILE))
self.job_with_includes = helpers.job_from_file(helpers.get_data_path(CONFIG_WITH_INCLUDES))
开发者ID:kpanic,项目名称:openquake,代码行数:10,代码来源:job_unittest.py
示例7: setUp
def setUp(self):
self.job, _ = helpers.get_fake_risk_job(
get_data_path("event_based_risk/job.ini"), get_data_path("event_based_hazard/job.ini"), output_type="gmf"
)
self.calculator = event_based.EventBasedRiskCalculator(self.job)
models.JobStats.objects.create(oq_job=self.job)
self.calculator.pre_execute()
self.job.is_running = True
self.job.status = "executing"
self.job.save()
开发者ID:kenxshao,项目名称:oq-engine,代码行数:11,代码来源:core_test.py
示例8: test_a_few_inputs
def test_a_few_inputs(self):
job, files = helpers.get_fake_risk_job(
get_data_path('classical_psha_based_risk/job.ini'),
get_data_path('simple_fault_demo_hazard/job.ini'))
rc = job.risk_calculation
expected_ids = sorted([x.id for x in files.values()])
inputs = models.inputs4rcalc(rc.id)
actual_ids = sorted([x.id for x in inputs])
self.assertEqual(expected_ids, actual_ids)
开发者ID:kenxshao,项目名称:oq-engine,代码行数:13,代码来源:models_test.py
示例9: setUp
def setUp(self):
job, _ = helpers.get_fake_risk_job(
get_data_path('classical_psha_based_risk/job.ini'),
get_data_path('simple_fault_demo_hazard/job.ini')
)
self.compulsory_arguments = dict(
lrem_steps_per_interval=5)
self.other_args = dict(
calculation_mode="classical",
region_constraint=(
'POLYGON((-122.0 38.113, -122.114 38.113, -122.57 38.111, '
'-122.0 38.113))'),
hazard_output=job.risk_calculation.hazard_output)
开发者ID:Chunghan,项目名称:oq-engine,代码行数:13,代码来源:validation_test.py
示例10: test_with_input_type
def test_with_input_type(self):
job, files = helpers.get_fake_risk_job(
get_data_path('classical_psha_based_risk/job.ini'),
get_data_path('simple_fault_demo_hazard/job.ini'))
rc = job.risk_calculation
# It should only be 1 id, actually.
expected_ids = [x.id for x in files.values()
if x.input_type == 'exposure']
inputs = models.inputs4rcalc(rc.id, input_type='exposure')
actual_ids = sorted([x.id for x in inputs])
self.assertEqual(expected_ids, actual_ids)
开发者ID:kenxshao,项目名称:oq-engine,代码行数:15,代码来源:models_test.py
示例11: setUp
def setUp(self):
self.cfg = helpers.get_data_path('event_based_hazard/job_2.ini')
self.job = helpers.get_hazard_job(self.cfg, username=getpass.getuser())
self.calc = core.EventBasedHazardCalculator(self.job)
hc_id = self.job.hazard_calculation.id
models.SiteCollection.cache[hc_id] = make_site_coll(0, 0, n=5)
models.JobStats.objects.create(oq_job=self.job)
开发者ID:chenliu0831,项目名称:oq-engine,代码行数:7,代码来源:core_test.py
示例12: test_initialize_site_model
def test_initialize_site_model(self):
# we need a slightly different config file for this test
cfg = helpers.get_data_path(
'simple_fault_demo_hazard/job_with_site_model.ini')
self.job = helpers.get_hazard_job(cfg)
self.calc = core.ClassicalHazardCalculator(self.job)
self.calc.initialize_site_model()
# If the site model isn't valid for the calculation geometry, a
# `RuntimeError` should be raised here
# Okay, it's all good. Now check the count of the site model records.
sm_nodes = models.SiteModel.objects.filter(job=self.job)
self.assertEqual(2601, len(sm_nodes))
num_pts_to_compute = len(
self.job.hazard_calculation.points_to_compute())
hazard_site = models.HazardSite.objects.filter(
hazard_calculation=self.job.hazard_calculation)
# The site model is good. Now test that `hazard_site` was computed.
# For now, just test the length.
self.assertEqual(num_pts_to_compute, len(hazard_site))
开发者ID:ryanberrio,项目名称:oq-engine,代码行数:25,代码来源:core_test.py
示例13: test_export_for_scenario
def test_export_for_scenario(self):
target_dir = tempfile.mkdtemp()
try:
cfg = helpers.get_data_path('scenario_hazard/job.ini')
# run the calculation in process to create something to export
os.environ['OQ_NO_DISTRIBUTE'] = '1'
try:
helpers.run_hazard_job(cfg)
finally:
del os.environ['OQ_NO_DISTRIBUTE']
job = models.OqJob.objects.latest('id')
self.assertEqual(job.status, 'complete')
outputs = export_core.get_outputs(job.id)
self.assertEqual(1, len(outputs)) # 1 GMF
gmf_outputs = outputs.filter(output_type='gmf_scenario')
self.assertEqual(1, len(gmf_outputs))
exported_files = check_export(gmf_outputs[0].id, target_dir)
self.assertEqual(1, len(exported_files))
# Check the file paths exist, is absolute, and the file isn't
# empty.
f = exported_files[0]
self._test_exported_file(f)
# Check for the correct number of GMFs in the file:
tree = etree.parse(f)
self.assertEqual(20, number_of('nrml:gmf', tree))
finally:
shutil.rmtree(target_dir)
开发者ID:kenxshao,项目名称:oq-engine,代码行数:35,代码来源:hazard_test.py
示例14: setUp
def setUp(self):
self.gmf_string = open(helpers.get_data_path("gmfs.json")).readline()
region = shapes.Region.from_coordinates(
[(-118.30, 34.12), (-118.18, 34.12),
(-118.18, 34.00), (-118.30, 34.00)])
region.cell_size = 0.02
self.grid = region.grid
开发者ID:kpanic,项目名称:openquake,代码行数:7,代码来源:shapes_unittest.py
示例15: test_successful_job_lifecycle
def test_successful_job_lifecycle(self):
with patch('openquake.job.Job.from_file') as from_file:
# called in place of Job.launch
def test_status_running_and_succeed():
self.assertEquals('running', self._job_status())
return []
# replaces Job.launch with a mock
def patch_job_launch(*args, **kwargs):
self.job = self.job_from_file(*args, **kwargs)
self.job.launch = mock.Mock(
side_effect=test_status_running_and_succeed)
self.assertEquals('pending', self._job_status())
return self.job
from_file.side_effect = patch_job_launch
with patch('openquake.job.spawn_job_supervisor'):
run_job(helpers.get_data_path(CONFIG_FILE), 'db')
self.assertEquals(1, self.job.launch.call_count)
self.assertEquals('succeeded', self._job_status())
开发者ID:favalex,项目名称:openquake,代码行数:26,代码来源:job_unittest.py
示例16: test_http_handler_writes_a_file
def test_http_handler_writes_a_file(self):
class StubbedHTTPConnection(StubbedGetter):
def __enter__(self):
return self
def __exit__(self, *args):
pass
def request(self, req_type, path):
self.remote_path = path
return self
def getresponse(self):
return self
def read(self):
with open(self.remote_path, "r") as reader:
return reader.read()
expected_path = "/tmp/fake_file"
remote_path = "http://localhost/%s" % helpers.get_data_path("config.gem")
url = urlparse.urlparse(remote_path)
http_handler = handlers.HTTPHandler(url, expected_path)
guaranteed_file = http_handler.handle(getter=StubbedHTTPConnection)
self.assertTrue(os.path.isfile(guaranteed_file))
os.unlink(guaranteed_file)
开发者ID:danciul,项目名称:openquake,代码行数:27,代码来源:handlers_unittest.py
示例17: setUp
def setUp(self):
client = kvs.get_client()
# Delete managed job id info so we can predict the job key
# which will be allocated for us
client.delete(kvs.tokens.CURRENT_JOBS)
self.generated_files = []
job = engine.prepare_job()
jp, params, sections = import_job_profile(helpers.get_data_path(CONFIG_FILE), job)
self.job_ctxt = JobContext(params, job.id, sections=sections, oq_job_profile=jp, oq_job=job)
job = engine.prepare_job()
jp, params, sections = import_job_profile(helpers.get_data_path(CONFIG_WITH_INCLUDES), job)
self.job_ctxt_with_includes = JobContext(params, job.id, sections=sections, oq_job_profile=jp, oq_job=job)
开发者ID:wmorales,项目名称:oq-engine,代码行数:16,代码来源:job_unittest.py
示例18: setUp
def setUp(self):
# Patch a few methods here and restore them in the tearDown to avoid
# too many nested with
# See http://www.voidspace.org.uk/python/mock/patch.html \
# #patch-methods-start-and-stop
self.patchers = []
def start_patch(attr_path):
_, attr = attr_path.rsplit('.', 1)
patcher = patch(attr_path)
self.patchers.append(patcher)
setattr(self, attr, patcher.start())
start_patch('openquake.engine.supervising.is_pid_running')
# Patch the actions taken by the supervisor
start_patch('openquake.engine.supervising.supervisor.\
record_job_stop_time')
start_patch(
'openquake.engine.supervising.supervisor.cleanup_after_job')
start_patch('openquake.engine.supervising.supervisor.terminate_job')
start_patch('openquake.engine.supervising.supervisor.get_job_status')
start_patch('openquake.engine.supervising.supervisor'
'.update_job_status')
logging.root.setLevel(logging.CRITICAL)
cfg = get_data_path('end-to-end-hazard-risk/job_haz_classical.ini')
self.job = get_hazard_job(cfg)
开发者ID:Chunghan,项目名称:oq-engine,代码行数:29,代码来源:supervisor_test.py
示例19: test_serialize
def test_serialize(self):
expected_file = helpers.get_data_path(
"expected-dmg-dist-total.xml")
expected_text = open(expected_file, "r").readlines()
self.make_dist()
self.make_data("no_damage", 1.0, 1.6)
self.make_data("slight", 34.8, 18.3)
self.make_data("moderate", 64.2, 19.8)
self.make_data("extensive", 64.3, 19.7)
self.make_data("complete", 64.3, 19.7)
try:
_, result_xml = tempfile.mkstemp()
writer = DmgDistTotalXMLWriter(result_xml,
"ebl1", self.damage_states)
writer.serialize(self.data)
actual_text = open(result_xml, "r").readlines()
self.assertEqual(expected_text, actual_text)
self.assertTrue(xml.validates_against_xml_schema(
result_xml))
finally:
os.unlink(result_xml)
开发者ID:angri,项目名称:openquake,代码行数:29,代码来源:scenario_damage_test.py
示例20: test_validate_warns
def test_validate_warns(self):
# Test that `validate` raises warnings if unnecessary parameters are
# specified for a given calculation.
# For example, `ses_per_logic_tree_path` is an event-based hazard
# param; if this param is specified for a classical hazard job, a
# warning should be raised.
cfg_file = helpers.get_data_path('simple_fault_demo_hazard/job.ini')
job = engine.prepare_job()
params = engine.parse_config(open(cfg_file, 'r'))
# Add a few superfluous parameters:
params['ses_per_logic_tree_path'] = 5
params['ground_motion_correlation_model'] = 'JB2009'
calculation = engine.create_calculation(
models.HazardCalculation, params)
job.hazard_calculation = calculation
job.save()
with warnings.catch_warnings(record=True) as w:
validation.validate(job, 'hazard', params, ['xml'])
expected_warnings = [
"Unknown parameter '%s' for calculation mode 'classical'."
" Ignoring." % x for x in ('ses_per_logic_tree_path',
'ground_motion_correlation_model')
]
actual_warnings = [m.message.message for m in w]
self.assertEqual(sorted(expected_warnings), sorted(actual_warnings))
开发者ID:Chunghan,项目名称:oq-engine,代码行数:28,代码来源:validation_test.py
注:本文中的tests.utils.helpers.get_data_path函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论