本文整理汇总了Python中mrjob.conf.combine_local_envs函数的典型用法代码示例。如果您正苦于以下问题:Python combine_local_envs函数的具体用法?Python combine_local_envs怎么用?Python combine_local_envs使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了combine_local_envs函数的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _env_for_task
def _env_for_task(self, task_type, step_num, task_num, map_split=None):
"""Set up environment variables for a subprocess (mapper, etc.)
This combines, in decreasing order of priority:
* environment variables set by the **cmdenv** option
* **jobconf** environment variables set by our job (e.g.
``mapreduce.task.ismap`)
* environment variables from **jobconf** options, translated to
whatever version of Hadoop we're emulating
* the current environment
* PYTHONPATH set to current working directory
We use :py:func:`~mrjob.conf.combine_local_envs`, so ``PATH``
environment variables are handled specially.
"""
user_jobconf = self._jobconf_for_step(step_num)
simulated_jobconf = self._simulate_jobconf_for_step(
task_type, step_num, task_num, map_split)
def to_env(jobconf):
return dict((k.replace('.', '_'), str(v))
for k, v in jobconf.items())
# keep the current environment because we need PATH to find binaries
# and make PYTHONPATH work
return combine_local_envs(os.environ,
to_env(user_jobconf),
to_env(simulated_jobconf),
self._opts['cmdenv'])
开发者ID:Affirm,项目名称:mrjob,代码行数:31,代码来源:sim.py
示例2: _load_steps
def _load_steps(self):
args = (self._executable(True) + ['--steps'] +
self._mr_job_extra_args(local=True))
log.debug('> %s' % cmd_line(args))
# add . to PYTHONPATH (in case mrjob isn't actually installed)
env = combine_local_envs(os.environ,
{'PYTHONPATH': os.path.abspath('.')})
steps_proc = Popen(args, stdout=PIPE, stderr=PIPE, env=env)
stdout, stderr = steps_proc.communicate()
if steps_proc.returncode != 0:
raise Exception(
'error getting step information: \n%s' % stderr)
# on Python 3, convert stdout to str so we can json.loads() it
if not isinstance(stdout, str):
stdout = stdout.decode('utf_8')
try:
steps = json.loads(stdout)
except ValueError:
raise ValueError("Bad --steps response: \n%s" % stdout)
# verify that this is a proper step description
if not steps or not stdout:
raise ValueError('step description is empty!')
return steps
开发者ID:Affirm,项目名称:mrjob,代码行数:29,代码来源:bin.py
示例3: test_paths
def test_paths(self):
assert_equal(combine_local_envs(
{'PATH': '/bin:/usr/bin',
'PYTHONPATH': '/usr/lib/python/site-packages',
'PS1': '> '},
{'PATH': '/home/dave/bin',
'PYTHONPATH': '/home/dave/python',
'CLASSPATH': '/home/dave/java',
'PS1': '\w> '}),
{'PATH': '/home/dave/bin;/bin:/usr/bin',
'PYTHONPATH': '/home/dave/python;/usr/lib/python/site-packages',
'CLASSPATH': '/home/dave/java',
'PS1': '\w> '})
开发者ID:gimlids,项目名称:LTPM,代码行数:13,代码来源:conf_test.py
示例4: _get_steps
def _get_steps(self):
"""Call the job script to find out how many steps it has, and whether
there are mappers and reducers for each step. Validate its
output.
Returns output as described in :ref:`steps-format`.
Results are cached, so call this as many times as you want.
"""
if self._steps is None:
if not self._script_path:
self._steps = []
else:
args = (self._executable(True) + ['--steps'] +
self._mr_job_extra_args(local=True))
log.debug('> %s' % cmd_line(args))
# add . to PYTHONPATH (in case mrjob isn't actually installed)
env = combine_local_envs(os.environ,
{'PYTHONPATH': os.path.abspath('.')})
steps_proc = Popen(args, stdout=PIPE, stderr=PIPE, env=env)
stdout, stderr = steps_proc.communicate()
if steps_proc.returncode != 0:
raise Exception(
'error getting step information: \n%s' % stderr)
# on Python 3, convert stdout to str so we can json.loads() it
if not isinstance(stdout, str):
stdout = stdout.decode('utf_8')
try:
steps = json.loads(stdout)
except ValueError:
raise ValueError("Bad --steps response: \n%s" % stdout)
# verify that this is a proper step description
if not steps or not stdout:
raise ValueError('step description is empty!')
for step in steps:
if step['type'] not in STEP_TYPES:
raise ValueError(
'unexpected step type %r in steps %r' % (
step['type'], stdout))
self._steps = steps
return self._steps
开发者ID:parastoo-62,项目名称:mrjob,代码行数:47,代码来源:runner.py
示例5: _get_steps
def _get_steps(self):
"""Call the mr_job to find out how many steps it has, and whether
there are mappers and reducers for each step. Validate its
output.
Returns output like ['MR', 'M']
(two steps, second only has a mapper)
We'll cache the result (so you can call _get_steps() as many times
as you want)
"""
if self._steps is None:
if not self._script:
self._steps = []
else:
# don't use self._opts['python_bin'] because that
# refers to the python binary to use inside Hadoop
python_bin = sys.executable or 'python'
args = ([python_bin, self._script['path'], '--steps'] +
self._mr_job_extra_args(local=True))
log.debug('> %s' % cmd_line(args))
# add . to PYTHONPATH (in case mrjob isn't actually installed)
env = combine_local_envs(os.environ,
{'PYTHONPATH': os.path.abspath('.')})
steps_proc = Popen(args, stdout=PIPE, stderr=PIPE, env=env)
stdout, stderr = steps_proc.communicate()
if steps_proc.returncode != 0:
raise Exception(
'error getting step information: %s', stderr)
steps = stdout.strip().split(' ')
# verify that this is a proper step description
if not steps:
raise ValueError('step description is empty!')
for step in steps:
if step not in ('MR', 'M'):
raise ValueError(
'unexpected step type %r in steps %r' %
(step, stdout))
self._steps = steps
return self._steps
开发者ID:AntonKast,项目名称:mrjob,代码行数:45,代码来源:runner.py
示例6: test_paths
def test_paths(self):
self.assertEqual(
combine_local_envs(
{"PATH": "/bin:/usr/bin", "PYTHONPATH": "/usr/lib/python/site-packages", "PS1": "> "},
{
"PATH": "/home/dave/bin",
"PYTHONPATH": "/home/dave/python",
"CLASSPATH": "/home/dave/java",
"PS1": "\w> ",
},
),
{
"PATH": "/home/dave/bin;/bin:/usr/bin",
"PYTHONPATH": "/home/dave/python;/usr/lib/python/site-packages",
"CLASSPATH": "/home/dave/java",
"PS1": "\w> ",
},
)
开发者ID:nyccto,项目名称:mrjob,代码行数:18,代码来源:test_conf.py
示例7: _get_steps
def _get_steps(self):
"""Call the job script to find out how many steps it has, and whether
there are mappers and reducers for each step. Validate its
output.
Returns output as described in :ref:`steps-format`. Results are
cached to avoid round trips to a subprocess.
"""
if self._steps is None:
if not self._script:
self._steps = []
else:
args = (self._opts['steps_python_bin'] +
[self._script['path'], '--steps'] +
self._mr_job_extra_args(local=True))
log.debug('> %s' % cmd_line(args))
# add . to PYTHONPATH (in case mrjob isn't actually installed)
env = combine_local_envs(os.environ,
{'PYTHONPATH': os.path.abspath('.')})
steps_proc = Popen(args, stdout=PIPE, stderr=PIPE, env=env)
stdout, stderr = steps_proc.communicate()
if steps_proc.returncode != 0:
raise Exception(
'error getting step information: %s', stderr)
try:
steps = json.loads(stdout)
except json.JSONDecodeError:
raise ValueError("Bad --steps response: \n%s" % stdout)
# verify that this is a proper step description
if not steps or not stdout:
raise ValueError('step description is empty!')
for step in steps:
if step['type'] not in STEP_TYPES:
raise ValueError(
'unexpected step type %r in steps %r' %
(step['type'], stdout))
self._steps = steps
return self._steps
开发者ID:derwiki,项目名称:mrjob,代码行数:43,代码来源:runner.py
示例8: _subprocess_env
def _subprocess_env(self, step_type, step_num, task_num, input_file=None,
input_start=None, input_length=None):
"""Set up environment variables for a subprocess (mapper, etc.)
This combines, in decreasing order of priority:
* environment variables set by the **cmdenv** option
* **jobconf** environment variables set by our job (e.g.
``mapreduce.task.ismap`)
* environment variables from **jobconf** options, translated to
whatever version of Hadoop we're emulating
* the current environment
* PYTHONPATH set to current working directory
We use :py:func:`~mrjob.conf.combine_local_envs`, so ``PATH``
environment variables are handled specially.
"""
version = self.get_hadoop_version()
jobconf_env = dict(
(translate_jobconf(k, version).replace('.', '_'), str(v))
for (k, v) in self._opts['jobconf'].iteritems())
internal_jobconf = self._simulate_jobconf_for_step(
step_type, step_num, task_num, input_file=input_file,
input_start=input_start, input_length=input_length)
internal_jobconf_env = dict(
(translate_jobconf(k, version).replace('.', '_'), str(v))
for (k, v) in internal_jobconf.iteritems())
ironpython_env = {'IRONPYTHONPATH': os.getcwd()} if is_ironpython \
else {}
# keep the current environment because we need PATH to find binaries
# and make PYTHONPATH work
return combine_local_envs({'PYTHONPATH': os.getcwd()},
ironpython_env,
os.environ,
jobconf_env,
internal_jobconf_env,
self._get_cmdenv())
开发者ID:nyccto,项目名称:mrjob,代码行数:42,代码来源:local.py
示例9: _get_steps
def _get_steps(self):
"""Call the job script to find out how many steps it has, and whether
there are mappers and reducers for each step. Validate its
output.
Returns output as described in :ref:`steps-format`.
Results are cached, so call this as many times as you want.
"""
if self._steps is None:
if not self._script_path:
self._steps = []
else:
args = self._executable(True) + ["--steps"] + self._mr_job_extra_args(local=True)
log.debug("> %s" % cmd_line(args))
# add . to PYTHONPATH (in case mrjob isn't actually installed)
env = combine_local_envs(os.environ, {"PYTHONPATH": os.path.abspath(".")})
steps_proc = Popen(args, stdout=PIPE, stderr=PIPE, env=env)
stdout, stderr = steps_proc.communicate()
if steps_proc.returncode != 0:
raise Exception("error getting step information: \n%s" % stderr)
try:
steps = json.loads(stdout)
except JSONDecodeError:
raise ValueError("Bad --steps response: \n%s" % stdout)
# verify that this is a proper step description
if not steps or not stdout:
raise ValueError("step description is empty!")
for step in steps:
if step["type"] not in STEP_TYPES:
raise ValueError("unexpected step type %r in steps %r" % (step["type"], stdout))
self._steps = steps
return self._steps
开发者ID:swiftserve,项目名称:mrjob,代码行数:38,代码来源:runner.py
示例10: _invoke_step
def _invoke_step(self, args, outfile_name, env=None):
"""Run the given command, outputting into outfile, and reading
from the previous outfile (or, for the first step, from our
original output files).
outfile is a path relative to our local tmp dir. commands are run
inside self._working_dir
We'll intelligently handle stderr from the process.
"""
# keep the current environment because we need PATH to find binaries
# and make PYTHONPATH work
env = combine_local_envs(
{'PYTHONPATH': os.getcwd()},
os.environ,
self._get_cmdenv(),
env or {})
# decide where to get input
if self._prev_outfile is not None:
input_paths = [self._prev_outfile]
else:
input_paths = []
for path in self._input_paths:
if path == '-':
input_paths.append(self._dump_stdin_to_local_file())
else:
input_paths.append(path)
# add input to the command line
for path in input_paths:
args.append(os.path.abspath(path))
log.info('> %s' % cmd_line(args))
# set up outfile
outfile = os.path.join(self._get_local_tmp_dir(), outfile_name)
log.info('writing to %s' % outfile)
log.debug('')
self._prev_outfile = outfile
write_to = open(outfile, 'w')
# run the process
proc = Popen(args, stdout=write_to, stderr=PIPE,
cwd=self._working_dir, env=env)
# handle counters, status msgs, and other stuff on stderr
stderr_lines = self._process_stderr_from_script(proc.stderr)
tb_lines = find_python_traceback(stderr_lines)
self._print_counters()
returncode = proc.wait()
if returncode != 0:
# try to throw a useful exception
if tb_lines:
raise Exception(
'Command %r returned non-zero exit status %d:\n%s' %
(args, returncode, ''.join(tb_lines)))
else:
raise Exception(
'Command %r returned non-zero exit status %d: %s' %
(args, returncode))
# flush file descriptors
write_to.flush()
开发者ID:boursier,项目名称:mrjob,代码行数:67,代码来源:local.py
注:本文中的mrjob.conf.combine_local_envs函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论