This article collects typical usage examples of the Python function tests.mockhadoop.add_mock_hadoop_output. If you are wondering what add_mock_hadoop_output does, how to call it, or simply want working examples, the curated code samples below may help.
Ten code examples of add_mock_hadoop_output are shown below, sorted by popularity by default. You can upvote the examples you find useful; your feedback helps the system recommend better Python code examples.
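Before the full examples, here is a minimal sketch of the pattern they all share: based on the code below, each call to add_mock_hadoop_output() appears to queue the fake output for the next Hadoop streaming step that the mock hadoop binary runs, so a two-step job needs two calls. The sketch assumes a test case whose setUp has already created the mock Hadoop environment (fake hadoop binary, MOCK_HDFS_ROOT, MOCK_HADOOP_LOG, etc.) the way the test classes below do; the import path for MRTwoStepJob is an assumption.

from io import BytesIO

from tests.mockhadoop import add_mock_hadoop_output
from tests.mr_two_step_job import MRTwoStepJob  # import path is an assumption

def test_minimal_two_step_job(self):
    # queue one fake output per Hadoop streaming step, in order
    add_mock_hadoop_output([b''])                      # step 1: contents don't matter
    add_mock_hadoop_output([b'1\t"bar"\n2\t"foo"\n'])  # step 2: final job output

    job = MRTwoStepJob(['-r', 'hadoop', '--no-conf'])
    job.sandbox(stdin=BytesIO(b'foo\nbar\n'))

    with job.make_runner() as runner:
        runner.run()
        # stream_output() yields the lines queued for the last step above
        results = [job.parse_output_line(line)
                   for line in runner.stream_output()]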
Example 1: test_end_to_end
def test_end_to_end(self):
    # read from STDIN, a local file, and a remote file
    stdin = StringIO('foo\nbar\n')

    local_input_path = os.path.join(self.tmp_dir, 'input')
    with open(local_input_path, 'w') as local_input_file:
        local_input_file.write('bar\nqux\n')

    input_to_upload = os.path.join(self.tmp_dir, 'remote_input')
    with open(input_to_upload, 'w') as input_to_upload_file:
        input_to_upload_file.write('foo\n')
    remote_input_path = 'hdfs:///data/foo'
    check_call([self.hadoop_bin,
                'fs', '-put', input_to_upload, remote_input_path])

    # doesn't matter what the intermediate output is; just has to exist.
    add_mock_hadoop_output([''])
    add_mock_hadoop_output(['1\t"qux"\n2\t"bar"\n', '2\t"foo"\n5\tnull\n'])

    mr_job = MRTwoStepJob(['-r', 'hadoop', '-v',
                           '--no-conf', '--hadoop-arg', '-libjar',
                           '--hadoop-arg', 'containsJars.jar',
                           '-', local_input_path, remote_input_path])
    mr_job.sandbox(stdin=stdin)

    local_tmp_dir = None
    results = []

    with mr_job.make_runner() as runner:
        assert isinstance(runner, HadoopJobRunner)

        runner.run()

        for line in runner.stream_output():
            key, value = mr_job.parse_output_line(line)
            results.append((key, value))

        local_tmp_dir = runner._get_local_tmp_dir()
        # make sure cleanup hasn't happened yet
        assert os.path.exists(local_tmp_dir)
        assert any(runner.ls(runner.get_output_dir()))

        # make sure we're writing to the correct path in HDFS
        hdfs_root = os.environ['MOCK_HDFS_ROOT']
        assert_equal(sorted(os.listdir(hdfs_root)), ['data', 'user'])
        home_dir = os.path.join(hdfs_root, 'user', getpass.getuser())
        assert_equal(os.listdir(home_dir), ['tmp'])
        assert_equal(os.listdir(os.path.join(home_dir, 'tmp')), ['mrjob'])

        assert_equal(runner._opts['hadoop_extra_args'],
                     ['-libjar', 'containsJars.jar'])

    assert_equal(sorted(results),
                 [(1, 'qux'), (2, 'bar'), (2, 'foo'), (5, None)])

    # make sure cleanup happens
    assert not os.path.exists(local_tmp_dir)
    assert not any(runner.ls(runner.get_output_dir()))
Author: nedrocks | Project: mrjob | Lines of code: 56 | Source file: hadoop_test.py
Example 2: test_end_to_end
def test_end_to_end(self):
    # read from STDIN, a local file, and a remote file
    stdin = StringIO("foo\nbar\n")

    local_input_path = os.path.join(self.tmp_dir, "input")
    with open(local_input_path, "w") as local_input_file:
        local_input_file.write("bar\nqux\n")

    input_to_upload = os.path.join(self.tmp_dir, "remote_input")
    with open(input_to_upload, "w") as input_to_upload_file:
        input_to_upload_file.write("foo\n")
    remote_input_path = "hdfs:///data/foo"
    check_call([self.hadoop_bin, "fs", "-put", input_to_upload, remote_input_path])

    # doesn't matter what the intermediate output is; just has to exist.
    add_mock_hadoop_output([""])
    add_mock_hadoop_output(['1\t"qux"\n2\t"bar"\n', '2\t"foo"\n5\tnull\n'])

    mr_job = MRTwoStepJob(["-r", "hadoop", "-v", "--no-conf", "-", local_input_path, remote_input_path])
    mr_job.sandbox(stdin=stdin)

    local_tmp_dir = None
    results = []

    with mr_job.make_runner() as runner:
        assert isinstance(runner, HadoopJobRunner)

        runner.run()

        for line in runner.stream_output():
            key, value = mr_job.parse_output_line(line)
            results.append((key, value))

        local_tmp_dir = runner._get_local_tmp_dir()
        # make sure cleanup hasn't happened yet
        assert os.path.exists(local_tmp_dir)
        assert any(runner.ls(runner.get_output_dir()))

        # make sure we're writing to the correct path in HDFS
        hdfs_root = os.environ["MOCK_HDFS_ROOT"]
        assert_equal(sorted(os.listdir(hdfs_root)), ["data", "user"])
        home_dir = os.path.join(hdfs_root, "user", getpass.getuser())
        assert_equal(os.listdir(home_dir), ["tmp"])
        assert_equal(os.listdir(os.path.join(home_dir, "tmp")), ["mrjob"])

    assert_equal(sorted(results), [(1, "qux"), (2, "bar"), (2, "foo"), (5, None)])

    # make sure cleanup happens
    assert not os.path.exists(local_tmp_dir)
    assert not any(runner.ls(runner.get_output_dir()))
Author: samuel | Project: mrjob | Lines of code: 49 | Source file: hadoop_test.py
Example 3: test_setup_wrapper_script_uses_local_line_endings
def test_setup_wrapper_script_uses_local_line_endings(self):
    job = MRTwoStepJob(['-r', 'hadoop', '--setup', 'true'])
    job.sandbox(stdin=BytesIO(b''))

    add_mock_hadoop_output([b''])
    add_mock_hadoop_output([b''])

    # tests #1071. Unfortunately, we mostly run these tests on machines
    # that use unix line endings anyway. So monitor open() instead
    with patch(
            'mrjob.runner.open', create=True, side_effect=open) as m_open:
        with logger_disabled('mrjob.hadoop'):
            with job.make_runner() as runner:
                runner.run()

                self.assertIn(
                    call(runner._setup_wrapper_script_path, 'wb'),
                    m_open.mock_calls)
Author: kartheek6 | Project: mrjob | Lines of code: 18 | Source file: test_hadoop.py
Example 4: test_input_output_interpolation
def test_input_output_interpolation(self):
    fake_jar = os.path.join(self.tmp_dir, 'fake.jar')
    open(fake_jar, 'w').close()
    input1 = os.path.join(self.tmp_dir, 'input1')
    open(input1, 'w').close()
    input2 = os.path.join(self.tmp_dir, 'input2')
    open(input2, 'w').close()

    job = MRJarAndStreaming(
        ['-r', 'hadoop', '--jar', fake_jar, input1, input2])
    job.sandbox()

    add_mock_hadoop_output([b''])  # need this for streaming step

    with job.make_runner() as runner:
        runner.run()

        hadoop_cmd_args = get_mock_hadoop_cmd_args()

        hadoop_jar_cmd_args = [args for args in hadoop_cmd_args if
                               args and args[0] == 'jar']

        self.assertEqual(len(hadoop_jar_cmd_args), 2)
        jar_args, streaming_args = hadoop_jar_cmd_args

        self.assertEqual(len(jar_args), 5)
        self.assertEqual(jar_args[0], 'jar')
        self.assertEqual(jar_args[1], fake_jar)
        self.assertEqual(jar_args[2], 'stuff')

        # check input is interpolated
        input_arg = ','.join(
            runner._upload_mgr.uri(path) for path in (input1, input2))
        self.assertEqual(jar_args[3], input_arg)

        # check output of jar is input of next step
        jar_output_arg = jar_args[4]
        streaming_input_arg = streaming_args[
            streaming_args.index('-input') + 1]
        self.assertEqual(jar_output_arg, streaming_input_arg)
Author: kartheek6 | Project: mrjob | Lines of code: 40 | Source file: test_hadoop.py
Example 5: test_input_output_interpolation
def test_input_output_interpolation(self):
    fake_jar = os.path.join(self.tmp_dir, 'fake.jar')
    open(fake_jar, 'w').close()
    input1 = os.path.join(self.tmp_dir, 'input1')
    open(input1, 'w').close()
    input2 = os.path.join(self.tmp_dir, 'input2')
    open(input2, 'w').close()

    job = MRJarAndStreaming(
        ['-r', 'hadoop', '--jar', fake_jar, input1, input2])
    job.sandbox()

    add_mock_hadoop_output([''])  # need this for streaming step

    with job.make_runner() as runner:
        runner.run()

        with open(os.environ['MOCK_HADOOP_LOG']) as hadoop_log:
            hadoop_jar_lines = [
                line for line in hadoop_log if line.startswith('jar ')]

        self.assertEqual(len(hadoop_jar_lines), 2)
        jar_args = hadoop_jar_lines[0].rstrip().split()
        streaming_args = hadoop_jar_lines[1].rstrip().split()

        self.assertEqual(len(jar_args), 5)
        self.assertEqual(jar_args[0], 'jar')
        self.assertEqual(jar_args[1], fake_jar)
        self.assertEqual(jar_args[2], 'stuff')

        # check input is interpolated
        input_arg = ','.join(
            runner._upload_mgr.uri(path) for path in (input1, input2))
        self.assertEqual(jar_args[3], input_arg)

        # check output of jar is input of next step
        jar_output_arg = jar_args[4]
        streaming_input_arg = streaming_args[
            streaming_args.index('-input') + 1]
        self.assertEqual(jar_output_arg, streaming_input_arg)
Author: DepengLuan | Project: mrjob | Lines of code: 40 | Source file: test_hadoop.py
Example 6: test_input_output_interpolation
def test_input_output_interpolation(self):
    fake_jar = os.path.join(self.tmp_dir, "fake.jar")
    open(fake_jar, "w").close()
    input1 = os.path.join(self.tmp_dir, "input1")
    open(input1, "w").close()
    input2 = os.path.join(self.tmp_dir, "input2")
    open(input2, "w").close()

    job = MRJarAndStreaming(["-r", "hadoop", "--jar", fake_jar, input1, input2])
    job.sandbox()

    add_mock_hadoop_output([""])  # need this for streaming step

    with job.make_runner() as runner:
        runner.run()

        with open(os.environ["MOCK_HADOOP_LOG"]) as hadoop_log:
            hadoop_jar_lines = [line for line in hadoop_log if line.startswith("jar ")]

        self.assertEqual(len(hadoop_jar_lines), 2)
        jar_args = hadoop_jar_lines[0].rstrip().split()
        streaming_args = hadoop_jar_lines[1].rstrip().split()

        self.assertEqual(len(jar_args), 5)
        self.assertEqual(jar_args[0], "jar")
        self.assertEqual(jar_args[1], fake_jar)
        self.assertEqual(jar_args[2], "stuff")

        # check input is interpolated
        input_arg = ",".join(runner._upload_mgr.uri(path) for path in (input1, input2))
        self.assertEqual(jar_args[3], input_arg)

        # check output of jar is input of next step
        jar_output_arg = jar_args[4]
        streaming_input_arg = streaming_args[streaming_args.index("-input") + 1]
        self.assertEqual(jar_output_arg, streaming_input_arg)
Author: swiftserve | Project: mrjob | Lines of code: 36 | Source file: test_hadoop.py
Example 7: _test_end_to_end
def _test_end_to_end(self, args=()):
    # read from STDIN, a local file, and a remote file
    stdin = BytesIO(b'foo\nbar\n')

    local_input_path = os.path.join(self.tmp_dir, 'input')
    with open(local_input_path, 'w') as local_input_file:
        local_input_file.write('bar\nqux\n')

    input_to_upload = os.path.join(self.tmp_dir, 'remote_input')
    with open(input_to_upload, 'w') as input_to_upload_file:
        input_to_upload_file.write('foo\n')
    remote_input_path = 'hdfs:///data/foo'
    check_call([self.hadoop_bin,
                'fs', '-put', input_to_upload, remote_input_path])

    # add counters
    add_mock_hadoop_counters({'foo': {'bar': 23}})
    add_mock_hadoop_counters({'baz': {'qux': 42}})

    # doesn't matter what the intermediate output is; just has to exist.
    add_mock_hadoop_output([b''])
    add_mock_hadoop_output([b'1\t"qux"\n2\t"bar"\n',
                            b'2\t"foo"\n5\tnull\n'])

    mr_job = MRTwoStepJob(['-r', 'hadoop', '-v',
                           '--no-conf', '--hadoop-arg', '-libjar',
                           '--hadoop-arg', 'containsJars.jar'] + list(args)
                          + ['-', local_input_path, remote_input_path]
                          + ['--jobconf', 'x=y'])
    mr_job.sandbox(stdin=stdin)

    local_tmp_dir = None
    results = []

    with mr_job.make_runner() as runner:
        assert isinstance(runner, HadoopJobRunner)

        runner.run()

        for line in runner.stream_output():
            key, value = mr_job.parse_output_line(line)
            results.append((key, value))

        local_tmp_dir = runner._get_local_tmp_dir()
        # make sure cleanup hasn't happened yet
        assert os.path.exists(local_tmp_dir)
        assert any(runner.fs.ls(runner.get_output_dir()))

        # make sure we're writing to the correct path in HDFS
        hdfs_root = get_mock_hdfs_root()
        self.assertEqual(sorted(os.listdir(hdfs_root)), ['data', 'user'])
        home_dir = os.path.join(hdfs_root, 'user', getpass.getuser())
        self.assertEqual(os.listdir(home_dir), ['tmp'])
        self.assertEqual(os.listdir(os.path.join(home_dir, 'tmp')),
                         ['mrjob'])

        self.assertEqual(runner._opts['hadoop_extra_args'],
                         ['-libjar', 'containsJars.jar'])

        # make sure mrjob.tar.gz was uploaded
        self.assertTrue(os.path.exists(runner._mrjob_tar_gz_path))
        self.assertIn(runner._mrjob_tar_gz_path,
                      runner._upload_mgr.path_to_uri())

        # make sure setup script exists, and mrjob.tar.gz is added
        # to PYTHONPATH in it
        self.assertTrue(os.path.exists(runner._setup_wrapper_script_path))
        self.assertIn(runner._setup_wrapper_script_path,
                      runner._upload_mgr.path_to_uri())
        mrjob_tar_gz_name = runner._working_dir_mgr.name(
            'archive', runner._mrjob_tar_gz_path)
        with open(runner._setup_wrapper_script_path) as wrapper:
            self.assertTrue(any(
                ('export PYTHONPATH' in line and mrjob_tar_gz_name in line)
                for line in wrapper))

        self.assertEqual(runner.counters(),
                         [{'foo': {'bar': 23}},
                          {'baz': {'qux': 42}}])

    self.assertEqual(sorted(results),
                     [(1, 'qux'), (2, 'bar'), (2, 'foo'), (5, None)])

    # make sure we called hadoop the way we expected
    hadoop_cmd_args = get_mock_hadoop_cmd_args()

    jar_cmd_args = [cmd_args for cmd_args in hadoop_cmd_args
                    if cmd_args[:1] == ['jar']]
    self.assertEqual(len(jar_cmd_args), 2)
    step_0_args, step_1_args = jar_cmd_args

    # check input/output format
    self.assertIn('-inputformat', step_0_args)
    self.assertNotIn('-outputformat', step_0_args)
    self.assertNotIn('-inputformat', step_1_args)
    self.assertIn('-outputformat', step_1_args)

    # make sure -libjar extra arg comes before -mapper
    for args in (step_0_args, step_1_args):
        self.assertIn('-libjar', args)
        self.assertIn('-mapper', args)
#......... (remaining code omitted) .........
Author: kartheek6 | Project: mrjob | Lines of code: 101 | Source file: test_hadoop.py
Example 8: _test_end_to_end
def _test_end_to_end(self, args=()):
    # read from STDIN, a local file, and a remote file
    stdin = StringIO('foo\nbar\n')

    local_input_path = os.path.join(self.tmp_dir, 'input')
    with open(local_input_path, 'w') as local_input_file:
        local_input_file.write('bar\nqux\n')

    input_to_upload = os.path.join(self.tmp_dir, 'remote_input')
    with open(input_to_upload, 'w') as input_to_upload_file:
        input_to_upload_file.write('foo\n')
    remote_input_path = 'hdfs:///data/foo'
    check_call([self.hadoop_bin,
                'fs', '-put', input_to_upload, remote_input_path])

    # doesn't matter what the intermediate output is; just has to exist.
    add_mock_hadoop_output([''])
    add_mock_hadoop_output(['1\t"qux"\n2\t"bar"\n', '2\t"foo"\n5\tnull\n'])

    mr_job = MRTwoStepJob(['-r', 'hadoop', '-v',
                           '--no-conf', '--hadoop-arg', '-libjar',
                           '--hadoop-arg', 'containsJars.jar'] + list(args)
                          + ['-', local_input_path, remote_input_path]
                          + ['--jobconf', 'x=y'])
    mr_job.sandbox(stdin=stdin)

    local_tmp_dir = None
    results = []

    with mr_job.make_runner() as runner:
        assert isinstance(runner, HadoopJobRunner)

        runner.run()

        for line in runner.stream_output():
            key, value = mr_job.parse_output_line(line)
            results.append((key, value))

        local_tmp_dir = runner._get_local_tmp_dir()
        # make sure cleanup hasn't happened yet
        assert os.path.exists(local_tmp_dir)
        assert any(runner.ls(runner.get_output_dir()))

        # make sure we're writing to the correct path in HDFS
        hdfs_root = os.environ['MOCK_HDFS_ROOT']
        self.assertEqual(sorted(os.listdir(hdfs_root)), ['data', 'user'])
        home_dir = os.path.join(hdfs_root, 'user', getpass.getuser())
        self.assertEqual(os.listdir(home_dir), ['tmp'])
        self.assertEqual(os.listdir(os.path.join(home_dir, 'tmp')),
                         ['mrjob'])

        self.assertEqual(runner._opts['hadoop_extra_args'],
                         ['-libjar', 'containsJars.jar'])

        # make sure mrjob.tar.gz is uploaded and in PYTHONPATH
        assert runner._mrjob_tar_gz_path
        mrjob_tar_gz_file_dicts = [
            file_dict for file_dict in runner._files
            if file_dict['path'] == runner._mrjob_tar_gz_path]

        self.assertEqual(len(mrjob_tar_gz_file_dicts), 1)
        mrjob_tar_gz_file_dict = mrjob_tar_gz_file_dicts[0]
        assert mrjob_tar_gz_file_dict['name']

        pythonpath = runner._get_cmdenv()['PYTHONPATH']
        self.assertIn(mrjob_tar_gz_file_dict['name'],
                      pythonpath.split(':'))

    self.assertEqual(sorted(results),
                     [(1, 'qux'), (2, 'bar'), (2, 'foo'), (5, None)])

    # make sure we called hadoop the way we expected
    with open(os.environ['MOCK_HADOOP_LOG']) as mock_log:
        hadoop_cmd_args = [shlex.split(line) for line in mock_log]

    jar_cmd_args = [args for args in hadoop_cmd_args
                    if args[:1] == ['jar']]
    self.assertEqual(len(jar_cmd_args), 2)
    step_0_args, step_1_args = jar_cmd_args

    # check input/output format
    self.assertIn('-inputformat', step_0_args)
    self.assertNotIn('-outputformat', step_0_args)
    self.assertNotIn('-inputformat', step_1_args)
    self.assertIn('-outputformat', step_1_args)

    # make sure -libjar extra arg comes before -mapper
    for args in (step_0_args, step_1_args):
        self.assertIn('-libjar', args)
        self.assertIn('-mapper', args)
        self.assertLess(args.index('-libjar'), args.index('-mapper'))

    # make sure -jobconf made it through
    self.assertIn('-D', step_0_args)

    # make sure cleanup happens
    assert not os.path.exists(local_tmp_dir)
    assert not any(runner.ls(runner.get_output_dir()))
Author: derwiki | Project: mrjob | Lines of code: 96 | Source file: test_hadoop.py
Example 9: _test_end_to_end
def _test_end_to_end(self, args=()):
    # read from STDIN, a local file, and a remote file
    stdin = StringIO("foo\nbar\n")

    local_input_path = os.path.join(self.tmp_dir, "input")
    with open(local_input_path, "w") as local_input_file:
        local_input_file.write("bar\nqux\n")

    input_to_upload = os.path.join(self.tmp_dir, "remote_input")
    with open(input_to_upload, "w") as input_to_upload_file:
        input_to_upload_file.write("foo\n")
    remote_input_path = "hdfs:///data/foo"
    check_call([self.hadoop_bin, "fs", "-put", input_to_upload, remote_input_path])

    # doesn't matter what the intermediate output is; just has to exist.
    add_mock_hadoop_output([""])
    add_mock_hadoop_output(['1\t"qux"\n2\t"bar"\n', '2\t"foo"\n5\tnull\n'])

    mr_job = MRTwoStepJob(
        ["-r", "hadoop", "-v", "--no-conf", "--hadoop-arg", "-libjar", "--hadoop-arg", "containsJars.jar"]
        + list(args)
        + ["-", local_input_path, remote_input_path]
        + ["--jobconf", "x=y"]
    )
    mr_job.sandbox(stdin=stdin)

    local_tmp_dir = None
    results = []

    with mr_job.make_runner() as runner:
        assert isinstance(runner, HadoopJobRunner)

        runner.run()

        for line in runner.stream_output():
            key, value = mr_job.parse_output_line(line)
            results.append((key, value))

        local_tmp_dir = runner._get_local_tmp_dir()
        # make sure cleanup hasn't happened yet
        assert os.path.exists(local_tmp_dir)
        assert any(runner.ls(runner.get_output_dir()))

        # make sure we're writing to the correct path in HDFS
        hdfs_root = os.environ["MOCK_HDFS_ROOT"]
        self.assertEqual(sorted(os.listdir(hdfs_root)), ["data", "user"])
        home_dir = os.path.join(hdfs_root, "user", getpass.getuser())
        self.assertEqual(os.listdir(home_dir), ["tmp"])
        self.assertEqual(os.listdir(os.path.join(home_dir, "tmp")), ["mrjob"])

        self.assertEqual(runner._opts["hadoop_extra_args"], ["-libjar", "containsJars.jar"])

        # make sure mrjob.tar.gz was uploaded
        self.assertTrue(os.path.exists(runner._mrjob_tar_gz_path))
        self.assertIn(runner._mrjob_tar_gz_path, runner._upload_mgr.path_to_uri())

        # make sure setup script exists, and mrjob.tar.gz is added
        # to PYTHONPATH in it
        self.assertTrue(os.path.exists(runner._setup_wrapper_script_path))
        self.assertIn(runner._setup_wrapper_script_path, runner._upload_mgr.path_to_uri())
        mrjob_tar_gz_name = runner._working_dir_mgr.name("archive", runner._mrjob_tar_gz_path)
        with open(runner._setup_wrapper_script_path) as wrapper:
            self.assertTrue(any(("export PYTHONPATH" in line and mrjob_tar_gz_name in line) for line in wrapper))

    self.assertEqual(sorted(results), [(1, "qux"), (2, "bar"), (2, "foo"), (5, None)])

    # make sure we called hadoop the way we expected
    with open(os.environ["MOCK_HADOOP_LOG"]) as mock_log:
        hadoop_cmd_args = [shlex_split(cmd) for cmd in mock_log]

    jar_cmd_args = [cmd_args for cmd_args in hadoop_cmd_args if cmd_args[:1] == ["jar"]]
    self.assertEqual(len(jar_cmd_args), 2)
    step_0_args, step_1_args = jar_cmd_args

    # check input/output format
    self.assertIn("-inputformat", step_0_args)
    self.assertNotIn("-outputformat", step_0_args)
    self.assertNotIn("-inputformat", step_1_args)
    self.assertIn("-outputformat", step_1_args)

    # make sure -libjar extra arg comes before -mapper
    for args in (step_0_args, step_1_args):
        self.assertIn("-libjar", args)
        self.assertIn("-mapper", args)
        self.assertLess(args.index("-libjar"), args.index("-mapper"))

    # make sure -jobconf made it through
    self.assertIn("-D", step_0_args)
    self.assertIn("x=y", step_0_args)

    self.assertIn("-D", step_1_args)
    # job overrides jobconf in step 1
    self.assertIn("x=z", step_1_args)

    # make sure cleanup happens
    assert not os.path.exists(local_tmp_dir)
    assert not any(runner.ls(runner.get_output_dir()))
Author: swiftserve | Project: mrjob | Lines of code: 94 | Source file: test_hadoop.py
Example 10: test_end_to_end
def test_end_to_end(self):
    # read from STDIN, a local file, and a remote file
    stdin = StringIO("foo\nbar\n")

    local_input_path = os.path.join(self.tmp_dir, "input")
    with open(local_input_path, "w") as local_input_file:
        local_input_file.write("bar\nqux\n")

    input_to_upload = os.path.join(self.tmp_dir, "remote_input")
    with open(input_to_upload, "w") as input_to_upload_file:
        input_to_upload_file.write("foo\n")
    remote_input_path = "hdfs:///data/foo"
    check_call([self.hadoop_bin, "fs", "-put", input_to_upload, remote_input_path])

    # doesn't matter what the intermediate output is; just has to exist.
    add_mock_hadoop_output([""])
    add_mock_hadoop_output(['1\t"qux"\n2\t"bar"\n', '2\t"foo"\n5\tnull\n'])

    mr_job = MRTwoStepJob(
        [
            "-r",
            "hadoop",
            "-v",
            "--no-conf",
            "--hadoop-arg",
            "-libjar",
            "--hadoop-arg",
            "containsJars.jar",
            "-",
            local_input_path,
            remote_input_path,
            "--hadoop-input-format",
            "FooFormat",
            "--hadoop-output-format",
            "BarFormat",
        ]
    )
    mr_job.sandbox(stdin=stdin)

    local_tmp_dir = None
    results = []

    with mr_job.make_runner() as runner:
        assert isinstance(runner, HadoopJobRunner)

        runner.run()

        for line in runner.stream_output():
            key, value = mr_job.parse_output_line(line)
            results.append((key, value))

        local_tmp_dir = runner._get_local_tmp_dir()
        # make sure cleanup hasn't happened yet
        assert os.path.exists(local_tmp_dir)
        assert any(runner.ls(runner.get_output_dir()))

        # make sure we're writing to the correct path in HDFS
        hdfs_root = os.environ["MOCK_HDFS_ROOT"]
        assert_equal(sorted(os.listdir(hdfs_root)), ["data", "user"])
        home_dir = os.path.join(hdfs_root, "user", getpass.getuser())
        assert_equal(os.listdir(home_dir), ["tmp"])
        assert_equal(os.listdir(os.path.join(home_dir, "tmp")), ["mrjob"])

        assert_equal(runner._opts["hadoop_extra_args"], ["-libjar", "containsJars.jar"])

        # make sure mrjob.tar.gz is uploaded and in PYTHONPATH
        assert runner._mrjob_tar_gz_path
        mrjob_tar_gz_file_dicts = [
            file_dict for file_dict in runner._files if file_dict["path"] == runner._mrjob_tar_gz_path
        ]
        assert_equal(len(mrjob_tar_gz_file_dicts), 1)
        mrjob_tar_gz_file_dict = mrjob_tar_gz_file_dicts[0]
        assert mrjob_tar_gz_file_dict["name"]

        pythonpath = runner._get_cmdenv()["PYTHONPATH"]
        assert_in(mrjob_tar_gz_file_dict["name"], pythonpath.split(":"))

    assert_equal(sorted(results), [(1, "qux"), (2, "bar"), (2, "foo"), (5, None)])

    # make sure we called hadoop the way we expected
    with open(os.environ["MOCK_HADOOP_LOG"]) as mock_log:
        hadoop_cmd_args = [shlex.split(line) for line in mock_log]

    jar_cmd_args = [args for args in hadoop_cmd_args if args[:1] == ["jar"]]
    assert_equal(len(jar_cmd_args), 2)
    step_0_args, step_1_args = jar_cmd_args

    # check input/output format
    assert_in("-inputformat", step_0_args)
    assert_not_in("-outputformat", step_0_args)
    assert_not_in("-inputformat", step_1_args)
    assert_in("-outputformat", step_1_args)

    # make sure -libjar extra arg comes before -mapper
    for args in (step_0_args, step_1_args):
        assert_in("-libjar", args)
        assert_in("-mapper", args)
        assert_lt(args.index("-libjar"), args.index("-mapper"))

    # make sure cleanup happens
    assert not os.path.exists(local_tmp_dir)
#......... (remaining code omitted) .........
Author: hblanks | Project: mrjob | Lines of code: 101 | Source file: hadoop_test.py
Note: The tests.mockhadoop.add_mock_hadoop_output examples in this article were compiled by 纯净天空 from source-code and documentation hosting platforms such as GitHub and MSDocs. The snippets were selected from open-source projects contributed by various developers; copyright remains with the original authors, and redistribution and use are subject to each project's License. Do not republish without permission.