This article collects typical usage examples of the Python function mrjob.util.cmd_line. If you are wondering what exactly cmd_line does, how to call it, and what real-world uses look like, the curated examples below should help.
The sections below present 20 code examples of the cmd_line function, sorted by popularity by default. You can upvote the examples you find useful; your feedback helps the site surface better Python code samples.
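For orientation before the examples: cmd_line(args) takes a list of command-line arguments and joins them into a single shell-safe string, with each argument individually shell-quoted (in older mrjob releases it is roughly equivalent to ' '.join(pipes.quote(str(a)) for a in args)). A minimal sketch of calling it directly; the arguments and expected output are illustrative:

from mrjob.util import cmd_line

# join a list of arguments into one shell-safe command string;
# arguments containing spaces or shell metacharacters come back quoted
args = ['grep', '-e', 'foo bar', 'input.txt']
print(cmd_line(args))   # roughly: grep -e 'foo bar' input.txt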
Example 1: _create_master_bootstrap_script_if_needed
def _create_master_bootstrap_script_if_needed(self):
    """Helper for :py:meth:`_add_bootstrap_files_for_upload`.

    Create the master bootstrap script and write it into our local
    temp directory. Set self._master_bootstrap_script_path.

    This will do nothing if there are no bootstrap scripts or commands,
    or if it has already been called."""
    if self._master_bootstrap_script_path:
        return

    # don't bother if we're not starting a cluster
    if self._cluster_id:
        return

    # Also don't bother if we're not bootstrapping
    if not (self._bootstrap or self._bootstrap_mrjob()):
        return

    # create mrjob.zip if we need it, and add commands to install it
    mrjob_bootstrap = []
    if self._bootstrap_mrjob():
        assert self._mrjob_zip_path
        path_dict = {
            'type': 'file', 'name': None, 'path': self._mrjob_zip_path}
        self._bootstrap_dir_mgr.add(**path_dict)

        # find out where python keeps its libraries
        mrjob_bootstrap.append([
            "__mrjob_PYTHON_LIB=$(%s -c "
            "'from distutils.sysconfig import get_python_lib;"
            " print(get_python_lib())')" %
            cmd_line(self._python_bin())])

        # unzip mrjob.zip
        mrjob_bootstrap.append(
            ['sudo unzip ', path_dict, ' -d $__mrjob_PYTHON_LIB'])

        # re-compile pyc files now, since mappers/reducers can't
        # write to this directory. Don't fail if there is extra
        # un-compileable crud in the tarball (this would matter if
        # sh_bin were 'sh -e')
        mrjob_bootstrap.append(
            ['sudo %s -m compileall -q'
             ' -f $__mrjob_PYTHON_LIB/mrjob && true' %
             cmd_line(self._python_bin())])

    # we call the script b.py because there's a character limit on
    # bootstrap script names (or there was at one time, anyway)
    path = os.path.join(self._get_local_tmp_dir(), 'b.py')
    log.info('writing master bootstrap script to %s' % path)

    contents = self._master_bootstrap_script_content(
        self._bootstrap + mrjob_bootstrap)
    for line in contents:
        log.debug('BOOTSTRAP: ' + line.rstrip('\r\n'))

    with open(path, 'w') as f:
        for line in contents:
            f.write(line)

    self._master_bootstrap_script_path = path
Developer: davidmarin, Project: mrjob, Lines: 60, Source: dataproc.py
Example 2: _spark_cmdenv
def _spark_cmdenv(self, step_num):
    """Returns a dictionary mapping environment variable to value,
    including mapping PYSPARK_PYTHON to self._python_bin()
    """
    step = self._get_step(step_num)

    cmdenv = {}

    if step['type'] in ('spark', 'spark_script'):  # not spark_jar
        driver_python = cmd_line(self._python_bin())

        if self._spark_python_wrapper_path:
            executor_python = './%s' % self._working_dir_mgr.name(
                'file', self._spark_python_wrapper_path)
        else:
            executor_python = cmd_line(self._task_python_bin())

        if self._spark_deploy_mode() == 'cluster':
            # treat driver like executors (they run in same environment)
            cmdenv['PYSPARK_PYTHON'] = executor_python
        elif driver_python == executor_python:
            # no difference, just set $PYSPARK_PYTHON
            cmdenv['PYSPARK_PYTHON'] = driver_python
        else:
            # set different pythons for driver and executor
            cmdenv['PYSPARK_PYTHON'] = executor_python
            cmdenv['PYSPARK_DRIVER_PYTHON'] = driver_python

    cmdenv.update(self._opts['cmdenv'])
    return cmdenv
Developer: Affirm, Project: mrjob, Lines: 30, Source: bin.py
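For context, a dictionary like the one returned by _spark_cmdenv() above is ultimately merged into the environment handed to the spark-submit subprocess. A minimal, self-contained sketch of that hand-off; the variable values and script name here are illustrative, not taken from mrjob:

import os
from subprocess import Popen

# hypothetical stand-in for the dict _spark_cmdenv() would return
cmdenv = {'PYSPARK_PYTHON': 'python3'}

# merge it over the current environment and pass it to the subprocess
env = dict(os.environ)
env.update(cmdenv)
proc = Popen(['spark-submit', 'my_script.py'], env=env)
proc.wait()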
Example 3: _run_job_in_hadoop
def _run_job_in_hadoop(self):
    # figure out local names for our files
    self._name_files()

    # send script and wrapper script (if any) to working dir
    assert self._script  # shouldn't be able to run if no script
    self._script['upload'] = 'file'
    if self._wrapper_script:
        self._wrapper_script['upload'] = 'file'

    steps = self._get_steps()

    for step_num, step in enumerate(steps):
        log.debug('running step %d of %d' % (step_num + 1, len(steps)))

        streaming_args = [self._opts['hadoop_bin'], 'jar',
                          self._opts['hadoop_streaming_jar']]

        # Add extra hadoop args first as hadoop args could be a hadoop
        # specific argument (e.g. -libjar) which must come before job
        # specific args.
        streaming_args.extend(
            self._hadoop_conf_args(step_num, len(steps)))

        # setup input
        for input_uri in self._hdfs_step_input_files(step_num):
            streaming_args.extend(['-input', input_uri])

        # setup output
        streaming_args.append('-output')
        streaming_args.append(self._hdfs_step_output_dir(step_num))

        # set up uploading from HDFS to the working dir
        streaming_args.extend(self._upload_args())

        # set up mapper and reducer
        streaming_args.append('-mapper')
        streaming_args.append(cmd_line(self._mapper_args(step_num)))
        if 'R' in step:
            streaming_args.append('-reducer')
            streaming_args.append(cmd_line(self._reducer_args(step_num)))
        else:
            streaming_args.extend(['-jobconf', 'mapred.reduce.tasks=0'])

        log.debug('> %s' % cmd_line(streaming_args))
        step_proc = Popen(streaming_args, stdout=PIPE, stderr=PIPE)

        # TODO: use a pty or something so that the hadoop binary
        # won't buffer the status messages
        self._process_stderr_from_streaming(step_proc.stderr)

        # there shouldn't be much output to STDOUT
        for line in step_proc.stdout:
            log.error('STDOUT: ' + line.strip('\n'))

        returncode = step_proc.wait()

        if returncode != 0:
            raise CalledProcessError(step_proc.returncode, streaming_args)
Developer: Jyrsa, Project: mrjob, Lines: 57, Source: hadoop.py
Example 4: _setup_wrapper_script_content
def _setup_wrapper_script_content(
        self, setup, manifest=False, wrap_python=False):
    """Return a (Bourne) shell script that runs the setup commands and then
    executes whatever is passed to it (this will be our mapper/reducer),
    as a list of strings (one for each line, including newlines).

    We obtain a file lock so that two copies of the setup commands
    cannot run simultaneously on the same machine (this helps for running
    :command:`make` on a shared source code archive, for example).
    """
    lines = []

    # TODO: this is very similar to _start_of_sh_script() in cloud.py

    if wrap_python:
        # start with shebang
        sh_bin = self._sh_bin()

        if os.path.isabs(sh_bin[0]):
            shebang_bin = sh_bin
        else:
            shebang_bin = ['/usr/bin/env'] + list(sh_bin)

        if len(shebang_bin) > 2:
            # Linux limits shebang to one binary and one arg
            shebang_bin = shebang_bin[:2]
            log.warning('Limiting shebang to two arguments:'
                        '#!%s' % cmd_line(shebang_bin))

        lines.append('#!%s' % cmd_line(shebang_bin))

    # hook for 'set -e', etc.
    pre_commands = self._sh_pre_commands()
    if pre_commands:
        for cmd in pre_commands:
            lines.append(cmd)
        lines.append('')

    if setup:
        lines.extend(self._setup_cmd_content(setup))

    # handle arguments to the script
    if wrap_python:
        # pretend to be python ($@ is arguments to the python binary)
        python_bin = self._task_python_bin()
        lines.append('%s "$@"' % cmd_line(python_bin))
    elif manifest:
        # arguments ($@) are a command
        # eventually runs: "$@" $INPUT_PATH $INPUT_URI
        lines.extend(self._manifest_download_content())
    else:
        # arguments ($@) are a command, just run it
        lines.append('"$@"')

    return lines
Developer: Affirm, Project: mrjob, Lines: 55, Source: bin.py
Example 5: _invoke_process
def _invoke_process(self, args, outfile_name, env, combiner_args=None):
    """invoke the process described by *args* and write to *outfile_name*

    :param combiner_args: If this mapper has a combiner, we need to do
                          some extra shell wrangling, so pass the combiner
                          arguments in separately.

    :return: dict(proc=Popen, args=[process args], write_to=file)
    """
    if combiner_args:
        log.info('> %s | sort | %s' %
                 (cmd_line(args), cmd_line(combiner_args)))
    else:
        log.info('> %s' % cmd_line(args))

    # set up outfile
    outfile = os.path.join(self._get_local_tmp_dir(), outfile_name)
    log.info('writing to %s' % outfile)

    self._prev_outfiles.append(outfile)

    with open(outfile, 'w') as write_to:
        if combiner_args:
            # set up a pipeline: mapper | sort | combiner
            mapper_proc = Popen(args, stdout=PIPE, stderr=PIPE,
                                cwd=self._working_dir, env=env)

            sort_proc = Popen(['sort'], stdin=mapper_proc.stdout,
                              stdout=PIPE, stderr=PIPE,
                              cwd=self._working_dir, env=env)

            combiner_proc = Popen(combiner_args, stdin=sort_proc.stdout,
                                  stdout=write_to, stderr=PIPE,
                                  cwd=self._working_dir, env=env)

            # this process shouldn't read from the pipes
            mapper_proc.stdout.close()
            sort_proc.stdout.close()

            return [
                {'proc': mapper_proc, 'args': args},
                {'proc': sort_proc, 'args': ['sort']},
                {'proc': combiner_proc, 'args': combiner_args},
            ]
        else:
            # just run the mapper process
            proc = Popen(args, stdout=write_to, stderr=PIPE,
                         cwd=self._working_dir, env=env)

            return [{'proc': proc, 'args': args}]
Developer: ddehghan, Project: mrjob, Lines: 51, Source: local.py
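The pipeline wiring in the combiner branch above is the standard subprocess pattern for a shell pipeline such as `mapper | sort | combiner`. Here is a self-contained sketch of that same pattern with placeholder commands and file names (cat/sort/uniq, in.txt, out.txt are illustrative only):

from subprocess import Popen, PIPE

with open('out.txt', 'w') as write_to:
    # wire up: cat in.txt | sort | uniq, the same way the mapper,
    # sort, and combiner processes are wired above
    cat_proc = Popen(['cat', 'in.txt'], stdout=PIPE)
    sort_proc = Popen(['sort'], stdin=cat_proc.stdout, stdout=PIPE)
    uniq_proc = Popen(['uniq'], stdin=sort_proc.stdout, stdout=write_to)

    # the parent shouldn't keep these pipe ends open; closing them lets
    # SIGPIPE reach upstream processes if a downstream one exits early
    cat_proc.stdout.close()
    sort_proc.stdout.close()

    uniq_proc.wait()
    sort_proc.wait()
    cat_proc.wait()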
Example 6: _load_steps
def _load_steps(self):
    args = (self._executable(True) + ['--steps'] +
            self._mr_job_extra_args(local=True))
    log.debug('> %s' % cmd_line(args))

    # add . to PYTHONPATH (in case mrjob isn't actually installed)
    env = combine_local_envs(os.environ,
                             {'PYTHONPATH': os.path.abspath('.')})

    steps_proc = Popen(args, stdout=PIPE, stderr=PIPE, env=env)
    stdout, stderr = steps_proc.communicate()

    if steps_proc.returncode != 0:
        raise Exception(
            'error getting step information: \n%s' % stderr)

    # on Python 3, convert stdout to str so we can json.loads() it
    if not isinstance(stdout, str):
        stdout = stdout.decode('utf_8')

    try:
        steps = json.loads(stdout)
    except ValueError:
        raise ValueError("Bad --steps response: \n%s" % stdout)

    # verify that this is a proper step description
    if not steps or not stdout:
        raise ValueError('step description is empty!')

    return steps
Developer: Affirm, Project: mrjob, Lines: 29, Source: bin.py
Example 7: _parse_setup
def _parse_setup(self):
    """Parse the *setup* option with
    :py:func:`mrjob.setup.parse_setup_cmd()`.

    If *bootstrap_mrjob* and ``self.BOOTSTRAP_MRJOB_IN_SETUP`` are both
    true, create mrjob.tar.gz (if it doesn't exist already) and
    prepend a setup command that adds it to PYTHONPATH.

    Also patch in the deprecated
    options *python_archives*, *setup_cmd*, and *setup_script*
    as setup commands.
    """
    setup = []

    # python_archives
    for path in self._opts['python_archives']:
        path_dict = parse_legacy_hash_path('archive', path)
        setup.append(['export PYTHONPATH=', path_dict, ':$PYTHONPATH'])

    # setup
    for cmd in self._opts['setup']:
        setup.append(parse_setup_cmd(cmd))

    # setup_cmds
    for cmd in self._opts['setup_cmds']:
        if not isinstance(cmd, basestring):
            cmd = cmd_line(cmd)
        setup.append([cmd])

    # setup_scripts
    for path in self._opts['setup_scripts']:
        path_dict = parse_legacy_hash_path('file', path)
        setup.append([path_dict])

    return setup
Developer: DepengLuan, Project: mrjob, Lines: 35, Source: runner.py
Example 8: _run_job_in_hadoop
def _run_job_in_hadoop(self):
    self._counters = []

    for step_num in range(self._num_steps()):
        log.debug("running step %d of %d" % (step_num + 1, self._num_steps()))

        step_args = self._args_for_step(step_num)

        log.debug("> %s" % cmd_line(step_args))

        # try to use a PTY if it's available
        try:
            pid, master_fd = pty.fork()
        except (AttributeError, OSError):
            # no PTYs, just use Popen
            step_proc = Popen(step_args, stdout=PIPE, stderr=PIPE)

            self._process_stderr_from_streaming(step_proc.stderr)

            # there shouldn't be much output to STDOUT
            for line in step_proc.stdout:
                log.error("STDOUT: " + to_string(line.strip(b"\n")))

            returncode = step_proc.wait()
        else:
            # we have PTYs
            if pid == 0:  # we are the child process
                os.execvp(step_args[0], step_args)
            else:
                with os.fdopen(master_fd, "rb") as master:
                    # reading from master gives us the subprocess's
                    # stderr and stdout (it's a fake terminal)
                    self._process_stderr_from_streaming(master)
                    _, returncode = os.waitpid(pid, 0)

        if returncode == 0:
            # parsing needs step number for whole job
            self._fetch_counters([step_num + self._start_step_num])

            # printing needs step number relevant to this run of mrjob
            self.print_counters([step_num + 1])
        else:
            msg = "Job failed with return code %d: %s" % (returncode, step_args)
            log.error(msg)

            # look for a Python traceback
            cause = self._find_probable_cause_of_failure(
                [step_num + self._start_step_num])
            if cause:
                # log cause, and put it in exception
                cause_msg = []  # lines to log and put in exception
                cause_msg.append("Probable cause of failure (from %s):" %
                                 cause["log_file_uri"])
                cause_msg.extend(line.strip("\n") for line in cause["lines"])
                if cause["input_uri"]:
                    cause_msg.append("(while reading from %s)" %
                                     cause["input_uri"])

                for line in cause_msg:
                    log.error(line)

                # add cause_msg to exception message
                msg += "\n" + "\n".join(cause_msg) + "\n"

            raise CalledProcessError(returncode, step_args)
Developer: senseb, Project: mrjob, Lines: 60, Source: hadoop.py
Example 9: _substep_args
def _substep_args(self, step_num, mrc):
    step = self._get_step(step_num)

    if step[mrc]['type'] == 'command':
        cmd = step[mrc]['command']

        # never wrap custom hadoop streaming commands in bash
        if isinstance(cmd, string_types):
            return shlex_split(cmd)
        else:
            return cmd

    elif step[mrc]['type'] == 'script':
        script_args = self._script_args_for_step(
            step_num, mrc, input_manifest=step.get('input_manifest'))

        if 'pre_filter' in step[mrc]:
            return self._sh_wrap(
                '%s | %s' % (step[mrc]['pre_filter'],
                             cmd_line(script_args)))
        else:
            return script_args
    else:
        raise ValueError("Invalid %s step %d: %r" % (
            mrc, step_num, step[mrc]))
Developer: Affirm, Project: mrjob, Lines: 25, Source: bin.py
Example 10: test_python_dash_v_as_python_bin
def test_python_dash_v_as_python_bin(self):
    python_cmd = cmd_line([sys.executable or 'python', '-v'])
    mr_job = MRTwoStepJob(['--python-bin', python_cmd, '--no-conf',
                           '-r', 'local'])
    mr_job.sandbox(stdin=[b'bar\n'])

    with mr_job.make_runner() as runner:
        runner.run()

        # expect python -v crud in stderr
        with open(runner._task_stderr_path('mapper', 0, 0)) as lines:
            self.assertTrue(any(
                'import mrjob' in line or  # Python 2
                "import 'mrjob'" in line
                for line in lines))

        with open(runner._task_stderr_path('mapper', 0, 0)) as lines:
            self.assertTrue(any(
                '#' in line for line in lines))

        # should still get expected results
        self.assertEqual(
            sorted(to_lines(runner.cat_output())),
            sorted([b'1\tnull\n', b'1\t"bar"\n']))
Developer: Affirm, Project: mrjob, Lines: 25, Source: test_local.py
Example 11: _cat_file
def _cat_file(self, filename):
    if is_uri(filename):
        # stream from HDFS
        cat_args = self._opts['hadoop_bin'] + ['fs', '-cat', filename]
        log.debug('> %s' % cmd_line(cat_args))

        cat_proc = Popen(cat_args, stdout=PIPE, stderr=PIPE)

        def stream():
            for line in cat_proc.stdout:
                yield line

            # there shouldn't be any stderr
            for line in cat_proc.stderr:
                log.error('STDERR: ' + line)

            returncode = cat_proc.wait()

            if returncode != 0:
                raise CalledProcessError(returncode, cat_args)

        return read_file(filename, stream())
    else:
        # read from local filesystem
        return super(HadoopJobRunner, self)._cat_file(filename)
Developer: BrandonHaynes, Project: mrjob, Lines: 25, Source: hadoop.py
Example 12: archive_and_unarchive
def archive_and_unarchive(self, extension, archive_template,
                          added_files=[]):
    join = os.path.join

    # archive it up
    archive_name = 'a.' + extension
    variables = dict(archive_name=join('..', archive_name),
                     files_to_archive='.')
    archive_command = [arg % variables for arg in archive_template]

    # sometimes the relevant command isn't available or doesn't work;
    # if so, skip the test
    try:
        proc = Popen(archive_command, cwd=join(self.tmp_dir, 'a'),
                     stdout=PIPE, stderr=PIPE)
    except OSError as e:
        if e.errno == 2:
            self.skipTest("No %s command" % archive_command[0])
        else:
            raise
    proc.communicate()  # discard output
    if proc.returncode != 0:
        self.skipTest("Can't run `%s` to create archive." %
                      cmd_line(archive_command))

    # unarchive it into b/
    unarchive(join(self.tmp_dir, archive_name), join(self.tmp_dir, 'b'))

    self.ensure_expected_results(added_files=added_files)
Developer: anirudhreddy92, Project: mrjob, Lines: 29, Source: test_util.py
Example 13: _run_spark_submit
def _run_spark_submit(self, spark_submit_args, env, record_callback):
    """Run the spark submit binary in a subprocess, using a PTY if possible

    :param spark_submit_args: spark-submit binary and arguments, as a list
    :param env: environment variables, as a dict
    :param record_callback: a function that takes a single log4j record
                            as its argument (see
                            :py:func:`~mrjob.logs.log4j._parse_hadoop_log4j_records`)

    :return: the subprocess's return code
    """
    log.debug('> %s' % cmd_line(spark_submit_args))
    log.debug('  with environment: %r' % sorted(env.items()))

    returncode = 0  # should always be set, but just in case

    # try to use a PTY if it's available
    try:
        pid, master_fd = pty.fork()
    except (AttributeError, OSError):
        # no PTYs, just use Popen

        # user won't get much feedback for a while, so tell them
        # spark-submit is running
        log.debug('No PTY available, using Popen() to invoke spark-submit')

        step_proc = Popen(
            spark_submit_args, stdout=PIPE, stderr=PIPE, env=env)

        for record in _parse_hadoop_log4j_records(
                _yield_lines_from_pty_or_pipe(step_proc.stderr)):
            record_callback(record)

        # there shouldn't be much output on STDOUT
        for record in _parse_hadoop_log4j_records(step_proc.stdout):
            record_callback(record)

        step_proc.stdout.close()
        step_proc.stderr.close()

        returncode = step_proc.wait()
    else:
        # we have PTYs
        if pid == 0:  # we are the child process
            os.execvpe(spark_submit_args[0], spark_submit_args, env)
            # now this process is no longer Python
        else:
            log.debug('Invoking spark-submit via PTY')

            with os.fdopen(master_fd, 'rb') as master:
                for record in _parse_hadoop_log4j_records(
                        _yield_lines_from_pty_or_pipe(master)):
                    record_callback(record)
                _, returncode = os.waitpid(pid, 0)

    return returncode
Developer: Affirm, Project: mrjob, Lines: 58, Source: bin.py
Example 14: _setup_wrapper_script_content
def _setup_wrapper_script_content(self, setup, mrjob_tar_gz_name=None):
    """Return a (Bourne) shell script that runs the setup commands and then
    executes whatever is passed to it (this will be our mapper/reducer).

    We obtain a file lock so that two copies of the setup commands
    cannot run simultaneously on the same machine (this helps for running
    :command:`make` on a shared source code archive, for example).
    """
    out = StringIO()

    def writeln(line=''):
        out.write(line + '\n')

    # we're always going to execute this script as an argument to
    # sh, so there's no need to add a shebang (e.g. #!/bin/sh)

    writeln('# store $PWD')
    writeln('__mrjob_PWD=$PWD')
    writeln('')

    writeln('# obtain exclusive file lock')
    # Basically, we're going to tie file descriptor 9 to our lockfile,
    # use a subprocess to obtain a lock (which we somehow inherit too),
    # and then release the lock by closing the file descriptor.
    # File descriptors 10 and higher are used internally by the shell,
    # so 9 is as out-of-the-way as we can get.
    writeln('exec 9>/tmp/wrapper.lock.%s' % self._job_name)
    # would use flock(1), but it's not always available
    writeln("%s -c 'import fcntl; fcntl.flock(9, fcntl.LOCK_EX)'" %
            cmd_line(self._opts['python_bin']))
    writeln()

    writeln('# setup commands')
    for cmd in setup:
        # reconstruct the command line, substituting $__mrjob_PWD/<name>
        # for path dicts
        line = ''
        for token in cmd:
            if isinstance(token, dict):
                # it's a path dictionary
                line += '$__mrjob_PWD/'
                line += pipes.quote(self._working_dir_mgr.name(**token))
            else:
                # it's raw script
                line += token
        writeln(line)
    writeln()

    writeln('# release exclusive file lock')
    writeln('exec 9>&-')
    writeln()

    writeln('# run job from the original working directory')
    writeln('cd $__mrjob_PWD')
    writeln('"$@"')

    return out.getvalue()
Developer: duedil-ltd, Project: mrjob, Lines: 57, Source: runner.py
Example 15: _run_job_in_hadoop
def _run_job_in_hadoop(self):
    self._counters = []
    steps = self._get_steps()

    for step_num, step in enumerate(steps):
        log.debug('running step %d of %d' % (step_num + 1, len(steps)))

        streaming_args = self._streaming_args(step, step_num, len(steps))

        log.debug('> %s' % cmd_line(streaming_args))

        master, slave = pty.openpty()
        step_proc = Popen(streaming_args, stdout=PIPE, stderr=slave)
        stderr = os.fdopen(master)
        self._process_stderr_from_streaming(step_proc, stderr)
        stderr.close()

        # there shouldn't be much output to STDOUT
        for line in step_proc.stdout:
            log.error('STDOUT: ' + line.strip('\n'))

        returncode = step_proc.wait()

        if returncode == 0:
            # parsing needs step number for whole job
            self._fetch_counters([step_num + self._start_step_num])

            # printing needs step number relevant to this run of mrjob
            self.print_counters([step_num + 1])
        else:
            msg = ('Job failed with return code %d: %s' %
                   (step_proc.returncode, streaming_args))
            log.error(msg)

            # look for a Python traceback
            cause = self._find_probable_cause_of_failure(
                [step_num + self._start_step_num])
            if cause:
                # log cause, and put it in exception
                cause_msg = []  # lines to log and put in exception
                cause_msg.append('Probable cause of failure (from %s):' %
                                 cause['log_file_uri'])
                cause_msg.extend(line.strip('\n')
                                 for line in cause['lines'])
                if cause['input_uri']:
                    cause_msg.append('(while reading from %s)' %
                                     cause['input_uri'])

                for line in cause_msg:
                    log.error(line)

                # add cause_msg to exception message
                msg += '\n' + '\n'.join(cause_msg) + '\n'

            raise Exception(msg)
            raise CalledProcessError(step_proc.returncode, streaming_args)
Developer: jsumali, Project: mrjob, Lines: 57, Source: hadoop.py
Example 16: _ssh_launch
def _ssh_launch(self, address, cmd_args, stdin=None):
    """Copy SSH keys if necessary, then launch the given command
    over SSH and return a Popen."""
    self._ssh_copy_key(address)

    args = self._ssh_cmd_args(address, cmd_args)

    log.debug(' > ' + cmd_line(args))
    try:
        return Popen(args, stdout=PIPE, stderr=PIPE, stdin=stdin)
    except OSError as ex:
        raise IOError(ex.strerror)
Developer: Affirm, Project: mrjob, Lines: 12, Source: ssh.py
Example 17: _setup_cmd_content
def _setup_cmd_content(self, setup):
    """Write setup script content to obtain a file lock, run setup
    commands in a way that doesn't perturb the script, and then
    release the lock and return to the original working directory."""
    lines = []

    lines.append('# store $PWD')
    lines.append('__mrjob_PWD=$PWD')
    lines.append('')

    lines.append('# obtain exclusive file lock')
    # Basically, we're going to tie file descriptor 9 to our lockfile,
    # use a subprocess to obtain a lock (which we somehow inherit too),
    # and then release the lock by closing the file descriptor.
    # File descriptors 10 and higher are used internally by the shell,
    # so 9 is as out-of-the-way as we can get.
    lines.append('exec 9>/tmp/wrapper.lock.%s' % self._job_key)
    # would use flock(1), but it's not always available
    lines.append("%s -c 'import fcntl; fcntl.flock(9, fcntl.LOCK_EX)'" %
                 cmd_line(self._python_bin()))
    lines.append('')

    lines.append('# setup commands')
    # group setup commands so we can redirect their input/output (see
    # below). Don't use parens; this would invoke a subshell, which would
    # keep us from exporting environment variables to the task.
    lines.append('{')
    for cmd in setup:
        # reconstruct the command line, substituting $__mrjob_PWD/<name>
        # for path dicts
        line = '  '  # indent, since these commands are in a group
        for token in cmd:
            if isinstance(token, dict):
                # it's a path dictionary
                line += '$__mrjob_PWD/'
                line += pipes.quote(self._working_dir_mgr.name(**token))
            else:
                # it's raw script
                line += token
        lines.append(line)
    # redirect setup commands' input/output so they don't interfere
    # with the task (see Issue #803).
    lines.append('} 0</dev/null 1>&2')
    lines.append('')

    lines.append('# release exclusive file lock')
    lines.append('exec 9>&-')
    lines.append('')

    lines.append('# run task from the original working directory')
    lines.append('cd $__mrjob_PWD')

    return lines
Developer: Affirm, Project: mrjob, Lines: 53, Source: bin.py
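The generated script above takes its lock by tying shell file descriptor 9 to a lock file and then running a one-line Python program that calls fcntl.flock on that descriptor. For readers who want to see the same lock taken entirely from Python, here is a small standalone sketch; the lock-file name is made up for illustration:

import fcntl

# hypothetical standalone version of the lock the generated script takes:
# hold an exclusive lock on a shared lock file while setup commands run
with open('/tmp/wrapper.lock.example-job', 'w') as lock_file:
    fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
    # ... run setup commands here ...
    # the lock is released when the file is closed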
Example 18: _invoke_hadoop
def _invoke_hadoop(self, args, ok_returncodes=None, ok_stderr=None,
                   return_stdout=False):
    """Run the given hadoop command, raising an exception on non-zero
    return code. This only works for commands whose output we don't
    care about.

    Args:
    ok_returncodes -- a list/tuple/set of return codes we expect to
        get back from hadoop (e.g. [0,1]). By default, we only expect 0.
        If we get an unexpected return code, we raise a CalledProcessError.
    ok_stderr -- don't log STDERR or raise CalledProcessError if stderr
        matches a regex in this list (even if the returncode is bad)
    return_stdout -- return the stdout from the hadoop command rather
        than logging it. If this is False, we return the returncode
        instead.
    """
    if args[0] == 'fs':
        if self._opts['hdfs_namenode']:
            args = [args[0]] + ['-fs', self._opts['hdfs_namenode']] + args[1:]

    args = self._opts['hadoop_bin'] + args

    log.debug('> %s' % cmd_line(args))

    proc = Popen(args, stdout=PIPE, stderr=PIPE)
    stdout, stderr = proc.communicate()

    log_func = log.debug if proc.returncode == 0 else log.error
    if not return_stdout:
        for line in StringIO(stdout):
            log_func('STDOUT: ' + line.rstrip('\r\n'))

    # check if STDERR is okay
    stderr_is_ok = False
    if ok_stderr:
        for stderr_re in ok_stderr:
            if stderr_re.match(stderr):
                stderr_is_ok = True
                break

    if not stderr_is_ok:
        for line in StringIO(stderr):
            log_func('STDERR: ' + line.rstrip('\r\n'))

    ok_returncodes = ok_returncodes or [0]

    if not stderr_is_ok and proc.returncode not in ok_returncodes:
        raise CalledProcessError(proc.returncode, args)

    if return_stdout:
        return stdout
    else:
        return proc.returncode
Developer: saraks, Project: mrjob, Lines: 53, Source: hadoop.py
Example 19: _spark_cmdenv
def _spark_cmdenv(self, step_num):
    """Returns a dictionary mapping environment variable to value,
    including mapping PYSPARK_PYTHON to self._python_bin()
    """
    step = self._get_step(step_num)

    cmdenv = {}

    if step['type'] in ('spark', 'spark_script'):  # not spark_jar
        cmdenv = dict(PYSPARK_PYTHON=cmd_line(self._python_bin()))

    cmdenv.update(self._opts['cmdenv'])
    return cmdenv
Developer: okomestudio, Project: mrjob, Lines: 12, Source: bin.py
Example 20: _render_substep
def _render_substep(self, cmd_key, pre_filter_key=None):
    if self._steps[cmd_key]:
        cmd = self._steps[cmd_key]
        if not isinstance(cmd, string_types):
            cmd = cmd_line(cmd)
        if pre_filter_key and self._steps[pre_filter_key]:
            raise ValueError("Cannot specify both %s and %s" % (
                cmd_key, pre_filter_key))
        return {"type": "command", "command": cmd}
    else:
        substep = {"type": "script"}
        if pre_filter_key and self._steps[pre_filter_key]:
            substep["pre_filter"] = self._steps[pre_filter_key]
        return substep
Developer: irskep, Project: mrjob, Lines: 13, Source: step.py
Note: The mrjob.util.cmd_line examples in this article were compiled by 纯净天空 from source code and documentation hosted on GitHub, MSDocs, and similar platforms. The snippets were selected from open-source projects contributed by their respective authors, and copyright remains with the original authors; consult the corresponding project's license before redistributing or reusing the code. Do not reproduce this compilation without permission.