本文整理汇总了Python中monty.serialization.dumpfn函数的典型用法代码示例。如果您正苦于以下问题:Python dumpfn函数的具体用法?Python dumpfn怎么用?Python dumpfn使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了dumpfn函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: im_vac_antisite_def_energy_parse
def im_vac_antisite_def_energy_parse():
    """
    CLI entry point: parse vacancy and antisite defect energies for an
    intermetallic from VASP DFT calculations and dump them to a JSON file
    named ``<mpid>_raw_defect_energy.json``.

    Fixes vs. original: Python-2 ``print`` statements converted to the
    ``print()`` function (the rest of this file is Python 3), and the
    help-text URL typo "opne" corrected to "open".
    """
    m_description = 'Command to parse vacancy and antisite defect ' \
                    'energies for intermetallics from the VASP DFT ' \
                    'calculations.'
    parser = ArgumentParser(description=m_description)
    parser.add_argument("--mpid",
                        type=str.lower,
                        help="Materials Project id of the intermetallic structure.\n"
                             "For more info on Materials Project, please refer to "
                             "www.materialsproject.org")
    parser.add_argument("--mapi_key",
                        default=None,
                        help="Your Materials Project REST API key.\n"
                             "For more info, please refer to "
                             "www.materialsproject.org/open")
    args = parser.parse_args()
    print(args)

    energy_dict = vac_antisite_def_parse_energy(args.mpid, args.mapi_key)
    # Debug output describing the shape of the parsed energy data.
    print(type(energy_dict))
    for key, value in energy_dict.items():
        print(key)
        print(type(key), type(value))
        for key2, val2 in value.items():
            print(type(key2), type(val2))

    # Only write a file when parsing actually produced data.
    if energy_dict:
        fl_nm = args.mpid + '_raw_defect_energy.json'
        dumpfn(energy_dict, fl_nm, cls=MontyEncoder, indent=2)
开发者ID:mbkumar,项目名称:pydii,代码行数:32,代码来源:gen_def_energy.py
示例2: im_sol_sub_def_energy_parse
def im_sol_sub_def_energy_parse():
    """
    CLI entry point: parse solute substitution defect energies for an
    intermetallic from VASP DFT calculations and dump them to a JSON file
    named ``<mpid>_solute-<element>_raw_defect_energy.json``.

    Fix vs. original: help-text URL typo "opne" corrected to "open".
    """
    m_description = 'Command to parse solute substitution defect ' \
                    'energies for intermetallics from the VASP DFT ' \
                    'calculations.'
    parser = ArgumentParser(description=m_description)
    parser.add_argument("--mpid",
                        type=str.lower,
                        help="Materials Project id of the intermetallic structure.\n"
                             "For more info on Materials Project, please refer to "
                             "www.materialsproject.org")
    parser.add_argument("--solute", help="Solute Element")
    parser.add_argument("--mapi_key",
                        default=None,
                        help="Your Materials Project REST API key.\n"
                             "For more info, please refer to "
                             "www.materialsproject.org/open")
    args = parser.parse_args()

    energy_dict = solute_def_parse_energy(args.mpid, args.solute,
                                          args.mapi_key)
    # Only write a file when parsing actually produced data.
    if energy_dict:
        fl_nm = args.mpid + '_solute-' + args.solute + '_raw_defect_energy.json'
        dumpfn(energy_dict, fl_nm, indent=2, cls=MontyEncoder)
开发者ID:ghasemi-m,项目名称:pydii,代码行数:28,代码来源:gen_def_energy.py
示例3: run_task
def run_task(self, fw_spec):
    """
    Apply a sequence of pymatgen transformations to a structure and write
    VASP inputs for the final transformed structure.

    Reads from self (a dict-like spec):
        "transformations": class names looked up in the pymatgen
            transformation modules.
        "transformation_params": optional list of kwargs dicts, consumed
            one per transformation in order (defaults to empty kwargs).
        "structure" / "prev_calc_dir": source of the starting structure.
        "vasp_input_set", "override_default_vasp_params": input-set config.

    Side effects: writes VASP input files and "transformations.json" to
    the current directory.
    """
    transformations = []
    # One kwargs dict per transformation; consumed positionally below.
    transformation_params = self.get("transformation_params",
                                     [{} for i in range(len(self["transformations"]))])
    for t in self["transformations"]:
        found = False
        # Search each transformation module for a class named `t`.
        for m in ["advanced_transformations", "defect_transformations",
                  "site_transformations", "standard_transformations"]:
            mod = import_module("pymatgen.transformations.{}".format(m))
            try:
                t_cls = getattr(mod, t)
            except AttributeError:
                continue
            # NOTE(review): there is no break after a match — if the same
            # class name existed in two modules it would be instantiated
            # twice and consume two parameter dicts. Presumably names are
            # unique across these modules; confirm.
            t_obj = t_cls(**transformation_params.pop(0))
            transformations.append(t_obj)
            found = True
        if not found:
            raise ValueError("Could not find transformation: {}".format(t))

    # TODO: @matk86 - should prev_calc_dir use CONTCAR instead of POSCAR? Note that if
    # current dir, maybe it is POSCAR indeed best ... -computron
    structure = self['structure'] if not self.get('prev_calc_dir', None) else \
        Poscar.from_file(os.path.join(self['prev_calc_dir'], 'POSCAR')).structure
    ts = TransformedStructure(structure)
    transmuter = StandardTransmuter([ts], transformations)
    final_structure = transmuter.transformed_structures[-1].final_structure.copy()

    # Rebuild the input set around the transformed structure, applying any
    # user overrides on top of the original input-set parameters.
    vis_orig = self["vasp_input_set"]
    vis_dict = vis_orig.as_dict()
    vis_dict["structure"] = final_structure.as_dict()
    vis_dict.update(self.get("override_default_vasp_params", {}) or {})
    vis = vis_orig.__class__.from_dict(vis_dict)
    vis.write_input(".")

    dumpfn(transmuter.transformed_structures[-1], "transformations.json")
开发者ID:montoyjh,项目名称:MatMethods,代码行数:35,代码来源:write_inputs.py
示例4: run
def run(self, job_cmd=None):
    """
    Run the VASP jobs through custodian.

    If the job list is empty, a single job with the initial input set
    is run.

    Args:
        job_cmd: if given, overrides the job command for every job in
            ``self.jobs``; otherwise ``self.job_cmd`` is used.

    Side effects: appends one entry per job to ``self.cal_log`` (with
    ``final_energy`` left as None, filled in later) and dumps the log to
    the checkpoint file (or ``Calibrate.LOG_FILE``).

    Fix vs. original: removed the dead local ``c_params`` dict that was
    built from jobs/handlers but never used.
    """
    # Propagate the (possibly overridden) job command to each job.
    for j in self.jobs:
        j.job_cmd = job_cmd if job_cmd is not None else self.job_cmd

    c = Custodian(self.handlers, self.jobs, max_errors=5)
    c.run()

    # Record a log entry per job; final_energy is populated later.
    for j in self.jobs:
        self.cal_log.append({"job": j.as_dict(),
                             'job_id': j.job_id,
                             "corrections": [],
                             'final_energy': None})
        self.job_ids.append(j.job_id)

    if self.checkpoint_file:
        dumpfn(self.cal_log, self.checkpoint_file,
               cls=MontyEncoder, indent=4)
    else:
        dumpfn(self.cal_log, Calibrate.LOG_FILE, cls=MontyEncoder,
               indent=4)
开发者ID:zhuyizhou,项目名称:MPInterfaces,代码行数:28,代码来源:calibrate.py
示例5: setup
def setup(self):
    """
    Performs initial setup for VaspJob, including overriding any settings
    and backing up.

    Steps (all in the current directory):
      1. Decompress any compressed input files.
      2. Optionally back up VASP inputs as ``<name>.orig``.
      3. Optionally auto-set NPAR in the INCAR (best effort).
      4. Optionally continue a previous run via ``continue.json``.
      5. Apply any ``settings_override`` actions.

    Fix vs. original: the bare ``except:`` around the auto-NPAR logic is
    narrowed to ``except Exception`` so it no longer swallows
    KeyboardInterrupt/SystemExit.
    """
    decompress_dir('.')

    if self.backup:
        for f in VASP_INPUT_FILES:
            shutil.copy(f, "{}.orig".format(f))

    if self.auto_npar:
        try:
            incar = Incar.from_file("INCAR")
            # Only optimized NPAR for non-HF and non-RPA calculations.
            if not (incar.get("LHFCALC") or incar.get("LRPA") or
                    incar.get("LEPSILON")):
                if incar.get("IBRION") in [5, 6, 7, 8]:
                    # NPAR should not be set for Hessian matrix
                    # calculations, whether in DFPT or otherwise.
                    del incar["NPAR"]
                else:
                    import multiprocessing
                    # try sge environment variable first
                    # (since multiprocessing counts cores on the current
                    # machine only)
                    ncores = os.environ.get('NSLOTS') or \
                        multiprocessing.cpu_count()
                    ncores = int(ncores)
                    # Pick the smallest divisor of ncores >= sqrt(ncores).
                    for npar in range(int(math.sqrt(ncores)),
                                      ncores):
                        if ncores % npar == 0:
                            incar["NPAR"] = npar
                            break
            incar.write_file("INCAR")
        except Exception:
            # Auto-NPAR is best effort; never let it abort job setup.
            pass

    if self.auto_continue:
        if os.path.exists("continue.json"):
            actions = loadfn("continue.json").get("actions")
            logger.info("Continuing previous VaspJob. Actions: {}".format(actions))
            backup(VASP_BACKUP_FILES, prefix="prev_run")
            VaspModder().apply_actions(actions)
        else:
            # Default functionality is to copy CONTCAR to POSCAR and set
            # ISTART to 1 in the INCAR, but other actions can be specified
            if self.auto_continue is True:
                actions = [{"file": "CONTCAR",
                            "action": {"_file_copy": {"dest": "POSCAR"}}},
                           {"dict": "INCAR",
                            "action": {"_set": {"ISTART": 1}}}]
            else:
                actions = self.auto_continue
            dumpfn({"actions": actions}, "continue.json")

    if self.settings_override is not None:
        VaspModder().apply_actions(self.settings_override)
开发者ID:materialsproject,项目名称:custodian,代码行数:59,代码来源:jobs.py
示例6: generate_single_job_dict
def generate_single_job_dict():
    """
    Used to generate test dictionary for single jobs.
    """
    # Map each output filename to its parsed QCOutput data, then dump.
    parsed = {fname: QCOutput(os.path.join(test_dir, fname)).data
              for fname in single_job_out_names}
    dumpfn(parsed, "single_job.json")
开发者ID:ExpHP,项目名称:pymatgen,代码行数:8,代码来源:test_outputs.py
示例7: run
def run(self):
    """
    Override of Custodian.run() to include instructions to copy the
    temp_dir to the scratch partition on slave compute nodes if requested.

    Returns:
        The run log (list of per-job correction records).
    """
    cwd = os.getcwd()

    with ScratchDir(self.scratch_dir, create_symbolic_link=True,
                    copy_to_current_on_exit=True,
                    copy_from_current_on_enter=True) as temp_dir:
        # Stage the scratch space on the compute nodes before any job runs.
        self._manage_node_scratch(temp_dir_path=temp_dir,
                                  job_start=True)
        self.total_errors = 0
        start = datetime.datetime.now()
        logger.info("Run started at {} in {}.".format(
            start, temp_dir))
        v = sys.version.replace("\n", " ")
        logger.info("Custodian running on Python version {}".format(v))

        try:
            # skip jobs until the restart
            for job_n, job in islice(enumerate(self.jobs, 1),
                                     self.restart, None):
                self._run_job(job_n, job, temp_dir)
                # Checkpoint after each job so that we can recover from
                # last point and remove old checkpoints
                if self.checkpoint:
                    super(SSHCustodian, self)._save_checkpoint(cwd, job_n)
        except CustodianError as ex:
            logger.error(ex.message)
            if ex.raises:
                raise RuntimeError("{} errors reached: {}. Exited..."
                                   .format(self.total_errors, ex))
        finally:
            # Log the corrections to a json file.
            logger.info("Logging to {}...".format(super(SSHCustodian,
                                                        self).LOG_FILE))
            dumpfn(self.run_log, super(SSHCustodian, self).LOG_FILE,
                   cls=MontyEncoder, indent=4)
            end = datetime.datetime.now()
            logger.info("Run ended at {}.".format(end))
            run_time = end - start
            logger.info("Run completed. Total time taken = {}."
                        .format(run_time))
            # Remove duplicate copy of log file, provided it ends with
            # ".log"
            for x in ([x for x in os.listdir(temp_dir)
                       if re.match(r'\w*\.log', x)]):
                os.remove(os.path.join(temp_dir, x))
            # Tear down the per-node scratch space on the way out.
            self._manage_node_scratch(temp_dir_path=temp_dir,
                                      job_start=False)
            if self.gzipped_output:
                gzip_dir(".")

    # Cleanup checkpoint files (if any) if run is successful.
    super(SSHCustodian, self)._delete_checkpoints(cwd)

    return self.run_log
开发者ID:jkglasbrenner,项目名称:sshcustodian,代码行数:58,代码来源:sshcustodian.py
示例8: run
def run(self):
    """
    Runs all the jobs.

    Returns:
        All errors encountered as a list of list.
        [[error_dicts for job 1], [error_dicts for job 2], ....]
    """
    cwd = os.getcwd()

    with ScratchDir(self.scratch_dir, create_symbolic_link=True,
                    copy_to_current_on_exit=True,
                    copy_from_current_on_enter=True) as temp_dir:
        self.total_errors = 0
        start = datetime.datetime.now()
        logger.info("Run started at {} in {}.".format(
            start, temp_dir))
        v = sys.version.replace("\n", " ")
        logger.info("Custodian running on Python version {}".format(v))
        logger.info("Hostname: {}, Cluster: {}".format(
            *get_execution_host_info()))

        try:
            # skip jobs until the restart
            for job_n, job in islice(enumerate(self.jobs, 1),
                                     self.restart, None):
                self._run_job(job_n, job)
                # Checkpoint after each job so that we can recover from last
                # point and remove old checkpoints
                if self.checkpoint:
                    self.restart = job_n
                    Custodian._save_checkpoint(cwd, job_n)
        except CustodianError as ex:
            logger.error(ex.message)
            if ex.raises:
                raise RuntimeError("{} errors reached: {}. Exited..."
                                   .format(self.total_errors, ex))
        finally:
            # Log the corrections to a json file.
            logger.info("Logging to {}...".format(Custodian.LOG_FILE))
            dumpfn(self.run_log, Custodian.LOG_FILE, cls=MontyEncoder,
                   indent=4)
            end = datetime.datetime.now()
            logger.info("Run ended at {}.".format(end))
            run_time = end - start
            logger.info("Run completed. Total time taken = {}."
                        .format(run_time))
            if self.gzipped_output:
                gzip_dir(".")

    # Cleanup checkpoint files (if any) if run is successful.
    Custodian._delete_checkpoints(cwd)

    return self.run_log
开发者ID:xhqu1981,项目名称:custodian,代码行数:54,代码来源:custodian.py
示例9: generate_multi_job_dict
def generate_multi_job_dict():
    """
    Used to generate test dictionary for multiple jobs
    """
    multi_job_dict = {}
    for fname in multi_job_out_names:
        # Split the multi-job output file into its sub-job outputs and
        # collect each sub-job's parsed data in order.
        sub_outputs = QCOutput.multiple_outputs_from_file(
            QCOutput, os.path.join(test_dir, fname), keep_sub_files=False)
        multi_job_dict[fname] = [out.data for out in sub_outputs]
    dumpfn(multi_job_dict, "multi_job.json")
开发者ID:czhengsci,项目名称:pymatgen,代码行数:12,代码来源:test_outputs.py
示例10: update_checkpoint
def update_checkpoint(launchpad, launch_id, checkpoint):
    """
    Helper function to update checkpoint

    Args:
        launchpad (LaunchPad): LaunchPad to ping with checkpoint data
        launch_id (int): launch id to update
        checkpoint (dict): checkpoint data
    """
    if launchpad:
        # Online mode: push the checkpoint straight to the LaunchPad.
        launchpad.ping_launch(launch_id, checkpoint=checkpoint)
        return
    # Offline mode: merge the checkpoint into the offline tracking file.
    offline_info = loadfn("FW_offline.json")
    offline_info["checkpoint"] = checkpoint
    dumpfn(offline_info, "FW_offline.json")
开发者ID:gpetretto,项目名称:fireworks,代码行数:15,代码来源:rocket.py
示例11: add_config_var
def add_config_var(args):
    """
    Add (or override) name/value pairs in the pmg settings file.

    ``args.var_spec`` must hold an even number of tokens, interpreted as
    alternating variable names and values. Any existing settings file is
    backed up to ``<file>.bak`` before being rewritten.
    """
    settings = {}
    if os.path.exists(SETTINGS_FILE):
        # Preserve the current settings before touching them.
        shutil.copy(SETTINGS_FILE, SETTINGS_FILE + ".bak")
        print("Existing %s backed up to %s"
              % (SETTINGS_FILE, SETTINGS_FILE + ".bak"))
        settings = loadfn(SETTINGS_FILE)
    toks = args.var_spec
    if len(toks) % 2 != 0:
        print("Bad variable specification!")
        sys.exit(-1)
    # Pair up names (even indices) with values (odd indices).
    for name, value in zip(toks[0::2], toks[1::2]):
        settings[name] = value
    dumpfn(settings, SETTINGS_FILE, default_flow_style=False)
    print("New %s written!" % (SETTINGS_FILE))
开发者ID:ExpHP,项目名称:pymatgen,代码行数:15,代码来源:pmg_config.py
示例12: _do_check
def _do_check(self, handlers, terminate_func=None):
    """
    checks the specified handlers. Returns True iff errors caught

    Args:
        handlers: iterable of error handlers to check.
        terminate_func: optional callable used to kill the running job
            before applying a correction from a terminating handler; it
            is invoked at most once.
    """
    corrections = []
    for h in handlers:
        try:
            if h.check():
                # Enforce a per-handler cap on applied corrections.
                if h.max_num_corrections is not None \
                        and h.n_applied_corrections >= h.max_num_corrections:
                    msg = "Maximum number of corrections {} reached " \
                          "for handler {}".format(h.max_num_corrections, h)
                    if h.raise_on_max:
                        self.run_log[-1]["handler"] = h
                        self.run_log[-1]["max_errors_per_handler"] = True
                        raise MaxCorrectionsPerHandlerError(msg, True, h.max_num_corrections, h)
                    else:
                        logger.warning(msg+" Correction not applied.")
                        continue
                if terminate_func is not None and h.is_terminating:
                    logger.info("Terminating job")
                    terminate_func()
                    # make sure we don't terminate twice
                    terminate_func = None
                d = h.correct()
                d["handler"] = h
                logger.error("\n" + pformat(d, indent=2, width=-1))
                corrections.append(d)
                h.n_applied_corrections += 1
        except Exception:
            # A crashing handler either aborts the run or is logged as a
            # correction-less error, depending on skip_over_errors.
            if not self.skip_over_errors:
                raise
            else:
                import traceback
                logger.error("Bad handler %s " % h)
                logger.error(traceback.format_exc())
                corrections.append(
                    {"errors": ["Bad handler %s " % h],
                     "actions": []})
    self.total_errors += len(corrections)
    self.errors_current_job += len(corrections)
    self.run_log[-1]["corrections"].extend(corrections)
    # We do a dump of the run log after each check.
    dumpfn(self.run_log, Custodian.LOG_FILE, cls=MontyEncoder,
           indent=4)
    return len(corrections) > 0
开发者ID:materialsproject,项目名称:custodian,代码行数:46,代码来源:custodian.py
示例13: do_query
def do_query(args):
    """
    Run a Materials Project query and write structures, dump entries, or
    print a table of basic properties, depending on the CLI arguments.
    """
    mpr = MPRester()
    # Interpret the criteria as JSON when possible, otherwise pass the
    # raw string through (e.g. a formula or mp-id).
    try:
        criteria = json.loads(args.criteria)
    except json.decoder.JSONDecodeError:
        criteria = args.criteria

    if args.structure:
        # Write one structure file per query hit.
        count = 0
        for doc in mpr.query(criteria, properties=["structure", "task_id"]):
            struct = doc["structure"]
            formula = re.sub(r"\s+", "", struct.formula)
            if args.structure == "poscar":
                fname = "POSCAR.%s_%s" % (doc["task_id"], formula)
            else:
                fname = "%s-%s.%s" % (doc["task_id"], formula, args.structure)
            struct.to(filename=fname)
            count += 1
        print("%d structures written!" % count)
        return

    if args.entries:
        # Serialize computed entries to the requested file.
        entries = mpr.get_entries(criteria)
        dumpfn(entries, args.entries)
        print("%d entries written to %s!" % (len(entries), args.entries))
        return

    # Default: tabulate thermodynamic data plus any requested fields,
    # sorted by energy above hull.
    props = ["e_above_hull", "spacegroup"] + args.data
    entries = mpr.get_entries(criteria, property_data=props)
    headers = ["mp-id", "Formula", "Spacegroup", "E/atom (eV)",
               "E above hull (eV)"] + args.data
    rows = []
    for entry in entries:
        row = [entry.entry_id, entry.composition.reduced_formula,
               entry.data["spacegroup"]["symbol"],
               entry.energy_per_atom, entry.data["e_above_hull"]]
        row += [entry.data[k] for k in args.data]
        rows.append(row)
    rows.sort(key=lambda r: r[headers.index("E above hull (eV)")])
    print(tabulate(rows, headers=headers, tablefmt="pipe", floatfmt=".3f"))
开发者ID:albalu,项目名称:pymatgen,代码行数:39,代码来源:pmg_query.py
示例14: pmg_dump
def pmg_dump(obj, filename, **kwargs):
    """
    Dump an object to a json file using MontyEncoder. Note that these
    objects can be lists, dicts or otherwise nested pymatgen objects that
    support the as_dict() and from_dict MSONable protocol.

    Args:
        obj (object): Object to dump.
        filename (str): Filename of file to open. Can be gzipped or
            bzipped.
        **kwargs: Any of the keyword arguments supported by the
            json.dump method.
    """
    # Thin convenience wrapper: defer entirely to monty's dumpfn.
    return dumpfn(obj, filename, **kwargs)
开发者ID:AtlasL,项目名称:pymatgen,代码行数:13,代码来源:json_coders.py
示例15: test_dumpfn_loadfn
def test_dumpfn_loadfn(self):
    payload = {"hello": "world"}

    # JSON round trip.
    dumpfn(payload, "monte_test.json", indent=4)
    self.assertEqual(loadfn("monte_test.json"), payload)
    os.remove("monte_test.json")

    # YAML round trip, then again with an explicit Dumper.
    dumpfn(payload, "monte_test.yaml", default_flow_style=False)
    self.assertEqual(loadfn("monte_test.yaml"), payload)
    dumpfn(payload, "monte_test.yaml", Dumper=Dumper)
    reread = loadfn("monte_test.yaml")
    os.remove("monte_test.yaml")

    # MessagePack round trip: keys and values come back as bytes.
    dumpfn(payload, "monte_test.mpk")
    reread = loadfn("monte_test.mpk")
    self.assertEqual(
        payload,
        {k.decode('utf-8'): v.decode('utf-8') for k, v in reread.items()})
    os.remove("monte_test.mpk")
开发者ID:dwinston,项目名称:monty,代码行数:16,代码来源:test_serialization.py
示例16: test_dumpf_loadf
def test_dumpf_loadf(self):
    payload = {"hello": "world"}

    # JSON round trip.
    dumpfn(payload, "monte_test.json", indent=4)
    self.assertEqual(loadfn("monte_test.json"), payload)
    os.remove("monte_test.json")

    # YAML round trip, then again with an explicit Dumper.
    dumpfn(payload, "monte_test.yaml", default_flow_style=False)
    self.assertEqual(loadfn("monte_test.yaml"), payload)
    dumpfn(payload, "monte_test.yaml", Dumper=Dumper)
    reread = loadfn("monte_test.yaml")
    os.remove("monte_test.yaml")
示例17: solute_def_parse_energy
def solute_def_parse_energy(args):
    """
    Parse final energies of solute-substitution defect calculations for a
    Materials Project structure and dump the per-solute defect energies
    (relative to the bulk calculation) to
    ``<mpid>_solute-<element>_raw_defect_energy.json``.

    Args:
        args: argparse namespace with ``mpid``, ``solute`` and
            ``mapi_key`` attributes.

    Returns:
        None on missing arguments or after a successful dump; {} if no
        solute folders were found. Any unconverged/unparseable folder
        aborts parsing for the whole mpid (the loop ``break``s and the
        for-else body is skipped).

    Fixes vs. original: bare ``except:`` narrowed to ``except Exception``;
    the results loop no longer shadows the ``solute`` element string;
    error-message typo "vaprun.xml" corrected to "vasprun.xml".
    """
    mpid = args.mpid
    solute = args.solute
    mapi_key = args.mapi_key

    if not mpid:
        print("============\nERROR: Provide an mpid\n============")
        return
    if not solute:
        print("============\nERROR: Provide solute element\n============")
        return

    if not mapi_key:
        with MPRester() as mp:
            structure = mp.get_structure_by_material_id(mpid)
    else:
        with MPRester(mapi_key) as mp:
            structure = mp.get_structure_by_material_id(mpid)

    energy_dict = {}
    solutes = []
    def_folders = glob.glob(os.path.join(
        mpid, "solute*subspecie-{}".format(solute)))
    def_folders += glob.glob(os.path.join(mpid, "bulk"))
    for defdir in def_folders:
        fldr_name = os.path.split(defdir)[1]
        vr_file = os.path.join(defdir, 'vasprun.xml')
        if not os.path.exists(vr_file):
            print(fldr_name, ": vasprun.xml doesn't exist in the folder. "
                             "Abandoning parsing of energies for {}".format(mpid))
            break  # Further processing for the mpid is not useful

        try:
            vr = Vasprun(vr_file)
        except Exception:
            print(fldr_name, ":Failure, couldn't parse vasprun.xml file. "
                             "Abandoning parsing of energies for {}".format(mpid))
            break

        if not vr.converged:
            print(fldr_name, ": Vasp calculation not converged. "
                             "Abandoning parsing of energies for {}".format(mpid))
            break  # Further processing for the mpid is not useful

        # Folder names encode the defect metadata, e.g.
        # solute_<index>_mult-<m>_specie-<X>_subspecie-<Y> or "bulk".
        fldr_fields = fldr_name.split("_")
        if 'bulk' in fldr_fields:
            bulk_energy = vr.final_energy
            bulk_sites = vr.structures[-1].num_sites  # parsed but currently unused
        elif 'solute' in fldr_fields:
            site_index = int(fldr_fields[1])
            site_multiplicity = int(fldr_fields[2].split("-")[1])
            site_specie = fldr_fields[3].split("-")[1]
            substitution_specie = fldr_fields[4].split("-")[1]
            energy = vr.final_energy
            solutes.append({'site_index': site_index,
                            'site_specie': site_specie, 'energy': energy,
                            'substitution_specie': substitution_specie,
                            'site_multiplicity': site_multiplicity
                            })
    else:
        # for-else: runs only when no folder triggered a break above.
        if not solutes:
            print("Solute folders do not exist")
            return {}

        print("Solute {} calculations successful for {}".format(solute, mpid))
        # NOTE: assumes a bulk folder was among def_folders so that
        # bulk_energy is bound here — TODO confirm with callers.
        for sol in solutes:
            sol['energy'] = sol['energy'] - bulk_energy
        solutes.sort(key=lambda entry: entry['site_index'])
        energy_dict[mpid] = {'solutes': solutes}
        fl_nm = mpid + '_solute-' + args.solute + '_raw_defect_energy.json'
        dumpfn(energy_dict, fl_nm, indent=2, cls=MontyEncoder)
开发者ID:bocklund,项目名称:pymatgen,代码行数:73,代码来源:pydii.py
示例18: update_checkpoint
def update_checkpoint(job_ids=None, jfile=None, **kwargs):
    """
    rerun the jobs with job ids in the job_ids list. The jobs are
    read from the json checkpoint file, jfile.
    If no job_ids are given then the checkpoint file will
    be updated with corresponding final energy

    Args:
        job_ids: list of job ids to update or q resolve
        jfile: check point file
        **kwargs: optional spec overrides applied to rerun jobs;
            recognized keys are 'incar', 'kpoints' and 'que'
            (queue adapter).
    """
    cal_log = loadfn(jfile, cls=MontyDecoder)
    cal_log_new = []
    all_jobs = []
    run_jobs = []
    handlers = []
    final_energy = None
    incar = None
    kpoints = None
    qadapter = None
    # if updating the specs of the job
    for k, v in kwargs.items():
        if k == 'incar':
            incar = v
        if k == 'kpoints':
            kpoints = v
        if k == 'que':
            qadapter = v
    for j in cal_log:
        job = j["job"]
        job.job_id = j['job_id']
        all_jobs.append(job)
        # A job is selected for rerun by either its id or its directory.
        if job_ids and (j['job_id'] in job_ids or job.job_dir in job_ids):
            logger.info('setting job {0} in {1} to rerun'.format(j['job_id'], job.job_dir))
            contcar_file = job.job_dir+os.sep+'CONTCAR'
            poscar_file = job.job_dir+os.sep+'POSCAR'
            # Restart from a non-empty CONTCAR when available, else POSCAR.
            if os.path.isfile(contcar_file) and len(open(contcar_file).readlines()) != 0 :
                logger.info('setting poscar file from {}'
                            .format(contcar_file))
                job.vis.poscar = Poscar.from_file(contcar_file)
            else:
                logger.info('setting poscar file from {}'
                            .format(poscar_file))
                job.vis.poscar = Poscar.from_file(poscar_file)
            if incar:
                logger.info('incar overridden')
                job.vis.incar = incar
            if kpoints:
                logger.info('kpoints overridden')
                job.vis.kpoints = kpoints
            if qadapter:
                logger.info('qadapter overridden')
                job.vis.qadapter = qadapter
            run_jobs.append(job)
    if run_jobs:
        # Rerun the selected jobs under custodian (no handlers attached).
        c = Custodian(handlers, run_jobs, max_errors=5)
        c.run()
    # Rebuild the checkpoint log with final energies for every job.
    for j in all_jobs:
        final_energy = j.get_final_energy()
        cal_log_new.append({"job": j.as_dict(),
                            'job_id': j.job_id,
                            "corrections": [],
                            'final_energy': final_energy})
    dumpfn(cal_log_new, jfile, cls=MontyEncoder,
           indent=4)
开发者ID:zhuyizhou,项目名称:MPInterfaces,代码行数:64,代码来源:utils.py
示例19: save
def save(self, fname="Transport_Properties.json"):
    """Dump the computed transport properties dict to a JSON file.

    Args:
        fname (str): destination filename.
    """
    dumpfn(self.props_dict, fname)
开发者ID:ExpHP,项目名称:pymatgen,代码行数:2,代码来源:boltztrap2.py
示例20: run_interrupted
def run_interrupted(self):
"""
Runs custodian in a interuppted mode, which sets up and
validates jobs but doesn't run the executable
Returns:
number of remaining jobs
Raises:
CustodianError on unrecoverable errors, and jobs that fail
validation
"""
try:
cwd = os.getcwd()
start = datetime.datetime.now()
v = sys.version.replace("\n", " ")
logger.info("Custodian started in singleshot mode at {} in {}."
.format(start, cwd))
logger.info("Custodian running on Python version {}".format(v))
# load run log
if os.path.exists(Custodian.LOG_FILE):
self.run_log = loadfn(Custodian.LOG_FILE, cls=MontyDecoder)
if len(self.run_log) == 0:
# starting up an initial job - setup input and quit
job_n = 0
job = self.jobs[job_n]
logger.info("Setting up job no. 1 ({}) ".format(job.name))
job.setup()
self.run_log.append({"job": job.as_dict(), "corrections": [], 'job_n': job_n})
return len(self.jobs)
else:
# Continuing after running calculation
job_n = self.run_log[-1]['job_n']
job = self.jobs[job_n]
# If we had to fix errors from a previous run, insert clean log
# dict
if len(self.run_log[-1]['corrections']) > 0:
logger.info("Reran {}.run due to fixable errors".format(job.name))
# check error handlers
logger.info("Checking error handlers for {}.run".format(job.name))
if self._do_check(self.handlers):
logger.info("Failed validation based on error handlers")
# raise an error for an unrecoverable error
for x in self.run_log[-1]["corrections"]:
if not x["actions"] and x["handler"].raises_runtime_error:
s = "Unrecoverable error for handler: {}. " \
"Raising RuntimeError".format(x["handler"])
raise CustodianError(s, True, x["handler"])
logger.info("Corrected input based on error handlers")
# Return with more jobs to run if recoverable error caught
# and corrected for
return len(self.jobs) - job_n
# check validators
logger.info("Checking validator for {}.run".format(job.name))
for v in self.validators:
if v.check():
logger.info("Failed validation based on validator")
s = "Validation failed: {}".format(v)
raise CustodianError(s, True, v)
logger.info("Postprocessing for {}.run".format(job.name))
job.postprocess()
# IF DONE WITH ALL JOBS - DELETE ALL CHECKPOINTS AND RETURN
# VALIDATED
if len(self.jobs) == (job_n + 1):
self.finished = True
return 0
# Setup next job_n
job_n += 1
job = self.jobs[job_n]
self.run_log.append({"job": job.as_dict(), "corrections": [],
'job_n': job_n})
job.setup()
return len(self.jobs) - job_n
except CustodianError as ex:
logger.error(ex.message)
if ex.raises:
raise RuntimeError("{} errors reached: {}. Exited..."
.format(self.total_errors, ex))
finally:
#Log the corrections to a json file.
logger.info("Logging to {}...".format(Custodian.LOG_FILE))
dumpfn(self.run_log, Custodian.LOG_FILE, cls=MontyEncoder,
indent=4)
end = datetime.datetime.now()
logger.info("Run ended at {}.".format(end))
run_time = end - start
logger.info("Run completed. Total time taken = {}."
.format(run_time))
if self.finished and self.gzipped_output:
#.........这里部分代码省略.........
开发者ID:davidwaroquiers,项目名称:custodian,代码行数:101,代码来源:custodian.py
注:本文中的monty.serialization.dumpfn函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论