本文整理汇总了Python中tests.util.filesystem_utils.get_fs_path函数的典型用法代码示例。如果您正苦于以下问题：Python get_fs_path函数的具体用法？Python get_fs_path怎么用？Python get_fs_path使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_fs_path函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: setup_method
def setup_method(self, method):
    """Drop leftover test databases, then recreate the Impala-side fixtures.

    Four ``impala_test_desc_db*`` databases are created (plain, with a comment,
    with a location, and with both a comment and a location), followed by a
    table of nested complex types inside the first database.
    """
    stale_databases = (
        'impala_test_desc_db1',
        'impala_test_desc_db2',
        'impala_test_desc_db3',
        'impala_test_desc_db4',
        'hive_test_desc_db',
        'hive_test_db',
    )
    for db_name in stale_databases:
        self.cleanup_db(db_name)

    self.client.execute("create database if not exists impala_test_desc_db1")
    self.client.execute(
        'create database if not exists impala_test_desc_db2 comment "test comment"')
    self.client.execute(
        'create database if not exists impala_test_desc_db3 location "%s"'
        % get_fs_path("/testdb"))
    self.client.execute(
        'create database if not exists impala_test_desc_db4 '
        'comment "test comment" location "%s"' % get_fs_path("/test2.db"))

    # One entry per column keeps the deeply nested type definitions readable.
    complex_columns = [
        "map_array_struct_col map<string, array<struct<f1:int, f2:string>>>",
        "struct_array_struct_col "
        "struct<f1:int, f2:array<struct<f11:bigint, f12:string>>>",
        "map_array_map_struct_col "
        "map<string, array<map<string, struct<f1:string, f2:int>>>>",
    ]
    self.client.execute(
        "create table if not exists impala_test_desc_db1.complex_types_tbl (%s)"
        % ", ".join(complex_columns))
开发者ID:ibmsoe,项目名称:ImpalaPPC,代码行数:26,代码来源:test_metadata_query_statements.py
示例2: test_hive_udfs_missing_jar
def test_hive_udfs_missing_jar(self, vector):
    """IMPALA-2365: Impalad shouldn't crash if the udf jar isn't present on HDFS.

    Creates a UDF backed by a temporary copy of hive-exec.jar, deletes that jar,
    then runs the UDF through a different impalad and expects the query to fail
    with a "Failed to get file info" error.
    """
    # Copy hive-exec.jar to a temporary file
    jar_path = get_fs_path("/test-warehouse/" + get_random_id(5) + ".jar")
    hive_jar = get_fs_path("/test-warehouse/hive-exec.jar")
    check_call(["hadoop", "fs", "-cp", hive_jar, jar_path])

    drop_fn_stmt = "drop function if exists default.pi_missing_jar()"
    # Adjacent literals instead of a backslash continuation inside the string, so
    # the statement text does not depend on how the source lines are indented.
    create_fn_stmt = (
        "create function default.pi_missing_jar() returns double "
        "location '%s' symbol='org.apache.hadoop.hive.ql.udf.UDFPI'" % jar_path)

    cluster = ImpalaCluster()
    impalad = cluster.get_any_impalad()
    client = impalad.service.create_beeswax_client()

    # Create and drop functions with sync_ddl to make sure they are reflected
    # in every impalad.
    exec_option = vector.get_value('exec_option')
    exec_option['sync_ddl'] = 1
    self.execute_query_expect_success(client, drop_fn_stmt, exec_option)
    self.execute_query_expect_success(client, create_fn_stmt, exec_option)

    # Delete the udf jar
    check_call(["hadoop", "fs", "-rm", jar_path])

    # Run a query using the udf from an impalad other than the one
    # we used to create the function. This is to bypass loading from
    # the cache
    different_impalad = cluster.get_different_impalad(impalad)
    client = different_impalad.service.create_beeswax_client()
    try:
        self.execute_query_using_client(
            client, "select default.pi_missing_jar()", vector)
    except ImpalaBeeswaxException as e:
        # 'as' form is valid on Python 2.6+ and Python 3, unlike 'except X, e'.
        assert "Failed to get file info" in str(e)
    else:
        assert False, "Query expected to fail"
开发者ID:ibmsoe,项目名称:ImpalaPPC,代码行数:35,代码来源:test_udfs.py
示例3: test_add_delete_data_to_hdfs_and_refresh
def test_add_delete_data_to_hdfs_and_refresh(self, vector, unique_database):
    """Check that files added to or removed from a partition directly in HDFS
    only become visible in Impala once that partition has been refreshed.
    """
    table_name = "%s.partition_test_table" % unique_database
    table_location = get_fs_path("/test-warehouse/%s" % unique_database)
    file_name = "alltypes.parq"
    src_file = get_fs_path(
        "/test-warehouse/alltypesagg_parquet/year=2010/month=1/day=9/*.parq")
    file_num_rows = 1000

    count_query = "select count(*) from %s" % table_name
    refresh_partition_query = (
        "refresh %s partition (year=2010, month=1)" % table_name)

    def assert_row_count(expected_rows):
        # count(*) comes back as a single string-valued row.
        result = self.client.execute(count_query)
        assert result.data == [str(expected_rows)]

    self.client.execute(
        "\ncreate table %s like functional.alltypes stored as parquet\n"
        "location '%s'\n" % (table_name, table_location))
    self.client.execute(
        "alter table %s add partition (year=2010, month=1)" % table_name)
    self.client.execute("refresh %s" % table_name)

    # A freshly created table must be empty.
    assert_row_count(0)

    dst_path = "%s/year=2010/month=1/%s" % (table_location, file_name)
    check_call(["hadoop", "fs", "-cp", "-f", src_file, dst_path], shell=False)

    # The copied file must stay invisible until the partition is refreshed...
    assert_row_count(0)

    # ...and show up afterwards.
    self.client.execute(refresh_partition_query)
    assert_row_count(file_num_rows)

    # Deleting the file and refreshing again must bring the count back to zero.
    check_call(["hadoop", "fs", "-rm", dst_path], shell=False)
    self.client.execute(refresh_partition_query)
    assert_row_count(0)
开发者ID:michaelhkw,项目名称:Impala,代码行数:35,代码来源:test_refresh_partition.py
示例4: test_java_udfs
def test_java_udfs(self, vector):
    """Run the Java UDF test cases inside two databases with explicit locations,
    dropping both databases afterwards whether or not the test cases pass.
    """
    create_template = "create database if not exists {0} location '{1}'"
    self.client.execute(create_template.format(
        "java_udfs_test", get_fs_path('/test-warehouse/java_udf_test.db')))
    self.client.execute(create_template.format(
        "udf_test", get_fs_path('/test-warehouse/udf_test.db')))
    try:
        for test_case in ('QueryTest/load-java-udfs', 'QueryTest/java-udf'):
            self.run_test_case(test_case, vector)
    finally:
        for db_name in ("java_udfs_test", "udf_test"):
            self.client.execute(
                "drop database if exists {0} cascade".format(db_name))
开发者ID:cchanning,项目名称:incubator-impala,代码行数:11,代码来源:test_udfs.py
示例5: test_insert_alter_partition_location
def test_insert_alter_partition_location(self):
    """Check that inserts still work after a partition's location has been
    changed, including creation of a partition directory that does not exist yet.
    """
    part_dir = "tmp/test_insert_alter_partition_location"
    qualified_part_dir = get_fs_path('/' + part_dir)
    tbl_name = "functional.insert_alter_partition_location"

    # Start from a clean slate: no table and no partition directory.
    self.execute_query_expect_success(
        self.client, "DROP TABLE IF EXISTS {0}".format(tbl_name))
    self.hdfs_client.delete_file_dir(part_dir, recursive=True)

    statements = (
        "CREATE TABLE {0} (c int) PARTITIONED BY (p int)".format(tbl_name),
        "ALTER TABLE {0} ADD PARTITION(p=1)".format(tbl_name),
        "ALTER TABLE {0} PARTITION(p=1) SET LOCATION '{1}'".format(
            tbl_name, qualified_part_dir),
        "INSERT OVERWRITE {0} PARTITION(p=1) VALUES(1)".format(tbl_name),
    )
    for statement in statements:
        self.execute_query_expect_success(self.client, statement)

    count_result = self.execute_query_expect_success(
        self.client, "SELECT COUNT(*) FROM {0}".format(tbl_name))
    assert int(count_result.get_data()) == 1

    # The insert should have created the partition dir, and it should hold
    # exactly one file (not in a subdirectory).
    listing = self.hdfs_client.list_dir(part_dir)
    assert len(listing['FileStatuses']['FileStatus']) == 1
开发者ID:cloudera,项目名称:recordservice,代码行数:28,代码来源:test_insert_behaviour.py
示例6: test_def_level_encoding
def test_def_level_encoding(self, vector, unique_database):
    """IMPALA-3376: Tests that parquet files are written to HDFS correctly by
    generating a parquet table and running the parquet-reader tool on it, which
    performs sanity checking, such as that the correct number of definition
    levels were encoded.
    """
    table_name = "test_hdfs_parquet_table_writer"
    qualified_table_name = "%s.%s" % (unique_database, table_name)
    self.execute_query("drop table if exists %s" % qualified_table_name)
    self.execute_query(
        "create table %s stored as parquet as select l_linenumber from "
        "tpch_parquet.lineitem limit 180000" % qualified_table_name)

    tmp_dir = make_tmp_dir()
    try:
        hdfs_file = get_fs_path('/test-warehouse/%s.db/%s/*.parq'
                                % (unique_database, table_name))
        check_call(['hdfs', 'dfs', '-copyToLocal', hdfs_file, tmp_dir])

        parquet_reader = os.path.join(impalad_basedir, 'util/parquet-reader')
        for root, _, files in os.walk(tmp_dir):
            for f in files:
                if not f.endswith('parq'):
                    continue
                # Join with 'root', the directory os.walk is currently visiting;
                # joining with tmp_dir would build a wrong path for any file
                # found in a subdirectory.
                check_call([parquet_reader, '--file', os.path.join(root, f)])
    finally:
        # Nested so the local temp dir is removed even if the drop fails.
        try:
            self.execute_query("drop table %s" % qualified_table_name)
        finally:
            rmtree(tmp_dir)
开发者ID:cchanning,项目名称:incubator-impala,代码行数:26,代码来源:test_insert_parquet.py
示例7: test_sorting_columns
def test_sorting_columns(self, vector, unique_database, tmpdir):
    """Check that RowGroup::sorting_columns is populated when an insert is run
    with a sortby() hint.
    """
    source_table = "functional_parquet.alltypessmall"
    target_table = "test_write_sorting_columns"
    qualified_target_table = "%s.%s" % (unique_database, target_table)
    hdfs_path = get_fs_path(
        "/test-warehouse/%s.db/%s/" % (unique_database, target_table))

    # Create table
    # TODO: Simplify once IMPALA-4167 (insert hints in CTAS) has been fixed.
    self.execute_query(
        "create table %s like %s stored as parquet"
        % (qualified_target_table, source_table))

    # Insert data
    self.execute_query(
        "insert into %s partition(year, month) /* +sortby(int_col, id) */ "
        "select * from %s" % (qualified_target_table, source_table))

    # Download the table's files and collect the row group metadata of each one.
    check_call(['hdfs', 'dfs', '-get', hdfs_path, tmpdir.strpath])
    row_groups = []
    for root, _, filenames in os.walk(tmpdir.strpath):
        for filename in filenames:
            metadata = get_parquet_metadata(os.path.join(root, str(filename)))
            row_groups.extend(metadata.row_groups)

    # Every row group must carry the expected sorting columns.
    expected = [SortingColumn(4, False, False), SortingColumn(0, False, False)]
    for row_group in row_groups:
        assert row_group.sorting_columns == expected
开发者ID:michaelhkw,项目名称:Impala,代码行数:34,代码来源:test_insert_parquet.py
示例8: test_insert_alter_partition_location
def test_insert_alter_partition_location(self, unique_database):
    """Check that inserts still work after a partition's location has been
    changed, including creation of a partition directory that does not exist yet.
    """
    part_dir = "tmp/%s" % unique_database
    qualified_part_dir = get_fs_path('/' + part_dir)
    table_name = "`%s`.`insert_alter_partition_location`" % unique_database

    # Start from a clean slate: no table and no partition directory.
    self.execute_query_expect_success(
        self.client, "DROP TABLE IF EXISTS {0}".format(table_name))
    self.filesystem_client.delete_file_dir(part_dir, recursive=True)

    statements = (
        "CREATE TABLE {0} (c int) PARTITIONED BY (p int)".format(table_name),
        "ALTER TABLE {0} ADD PARTITION(p=1)".format(table_name),
        "ALTER TABLE {0} PARTITION(p=1) SET LOCATION '{1}'".format(
            table_name, qualified_part_dir),
        "INSERT OVERWRITE {0} PARTITION(p=1) VALUES(1)".format(table_name),
    )
    for statement in statements:
        self.execute_query_expect_success(self.client, statement)

    count_result = self.execute_query_expect_success(
        self.client, "SELECT COUNT(*) FROM {0}".format(table_name))
    assert int(count_result.get_data()) == 1

    # The insert should have created the partition dir, and it should hold
    # exactly one file (not in a subdirectory).
    assert len(self.filesystem_client.ls(part_dir)) == 1
开发者ID:cchanning,项目名称:incubator-impala,代码行数:32,代码来源:test_insert_behaviour.py
示例9: test_insert_parquet_verify_size
def test_insert_parquet_verify_size(self, vector):
    """Verify that the sizes of the written parquet files are close to the
    requested PARQUET_FILE_SIZE.

    Every file must be smaller than the block size, and at most one file may be
    smaller than 80% of it.
    """
    TBL = "parquet_insert_size"
    DROP = "drop table if exists {0}".format(TBL)
    CREATE = ("create table parquet_insert_size like tpch_parquet.orders"
              " stored as parquet location '{0}/{1}'".format(WAREHOUSE, TBL))
    QUERY = "insert overwrite {0} select * from tpch.orders".format(TBL)
    DIR = get_fs_path("test-warehouse/{0}/".format(TBL))
    BLOCK_SIZE = 40 * 1024 * 1024

    self.execute_query(DROP)
    self.execute_query(CREATE)

    vector.get_value('exec_option')['PARQUET_FILE_SIZE'] = BLOCK_SIZE
    vector.get_value('exec_option')['COMPRESSION_CODEC'] = (
        vector.get_value('compression_codec'))
    vector.get_value('exec_option')['num_nodes'] = 1
    self.execute_query(QUERY, vector.get_value('exec_option'))

    # Get the files in hdfs and verify. There can be at most 1 file that is
    # smaller than the BLOCK_SIZE. The rest should be within 80% of it and not
    # over.
    found_small_file = False
    sizes = self.filesystem_client.get_all_file_sizes(DIR)
    for size in sizes:
        # Adjacent literals rather than a backslash continuation inside the
        # string, which fused the two sentences of the message together.
        assert size < BLOCK_SIZE, (
            "File size greater than expected. "
            "Expected: {0}, Got: {1}".format(BLOCK_SIZE, size))
        if size < BLOCK_SIZE * 0.80:
            assert not found_small_file
            found_small_file = True
开发者ID:mbrukman,项目名称:apache-impala,代码行数:29,代码来源:test_insert_parquet.py
示例10: test_insert_parquet_verify_size
def test_insert_parquet_verify_size(self, vector, unique_database):
    """Verify that the sizes of the written parquet files are close to the
    requested PARQUET_FILE_SIZE.

    Every file must be smaller than the block size, and at most one file may be
    smaller than 80% of it.
    """
    tbl_name = "parquet_insert_size"
    fq_tbl_name = unique_database + "." + tbl_name
    # NOTE(review): 'location' is only used to list the written files; the DDL
    # below does not set a location, so this presumably relies on the table's
    # default location matching this path - confirm.
    location = get_fs_path("test-warehouse/{0}.db/{1}/"
                           .format(unique_database, tbl_name))
    # The original also passed 'location' to format(), but the template has no
    # second placeholder, so the argument was silently ignored.
    create = ("create table {0} like tpch_parquet.orders stored as parquet"
              .format(fq_tbl_name))
    query = "insert overwrite {0} select * from tpch.orders".format(fq_tbl_name)
    block_size = 40 * 1024 * 1024

    self.execute_query(create)

    vector.get_value('exec_option')['PARQUET_FILE_SIZE'] = block_size
    vector.get_value('exec_option')['COMPRESSION_CODEC'] = (
        vector.get_value('compression_codec'))
    vector.get_value('exec_option')['num_nodes'] = 1
    self.execute_query(query, vector.get_value('exec_option'))

    # Get the files in hdfs and verify. There can be at most 1 file that is
    # smaller than the block_size. The rest should be within 80% of it and not
    # over.
    found_small_file = False
    sizes = self.filesystem_client.get_all_file_sizes(location)
    for size in sizes:
        # Adjacent literals rather than a backslash continuation inside the
        # string, which fused the two sentences of the message together.
        assert size < block_size, (
            "File size greater than expected. "
            "Expected: {0}, Got: {1}".format(block_size, size))
        if size < block_size * 0.80:
            assert not found_small_file
            found_small_file = True
开发者ID:apache,项目名称:incubator-impala,代码行数:28,代码来源:test_insert_parquet.py
示例11: test_write_statistics_multiple_row_groups
def test_write_statistics_multiple_row_groups(self, vector, unique_database, tmpdir):
    """Check that writing multiple row groups works as expected.

    Data is inserted into a table declared with SORT BY, and the min/max values
    of the resulting row groups are then checked for overlap.
    """
    source_table = "tpch_parquet.orders"
    target_table = "test_hdfs_parquet_table_writer"
    qualified_target_table = "%s.%s" % (unique_database, target_table)
    hdfs_path = get_fs_path(
        "/test-warehouse/%s.db/%s/" % (unique_database, target_table))

    # Insert a large amount of data on a single backend with a limited parquet
    # file size. This will result in several files being written, exercising
    # code that tracks statistics for row groups.
    create_stmt = (
        "create table %s sort by (o_orderkey) like %s stored as parquet"
        % (qualified_target_table, source_table))
    self.execute_query(create_stmt, vector.get_value('exec_option'))

    insert_stmt = (
        "insert into %s select * from %s" % (qualified_target_table, source_table))
    vector.get_value('exec_option')['num_nodes'] = 1
    vector.get_value('exec_option')['parquet_file_size'] = 8 * 1024 * 1024
    self.execute_query(insert_stmt, vector.get_value('exec_option'))

    # Get all stats for the o_orderkey column
    row_group_stats = self._get_row_group_stats_from_hdfs_folder(
        hdfs_path, tmpdir.strpath)
    assert len(row_group_stats) > 1

    # Order the first-column stats by min value; ranges are disjoint if each
    # one's max does not exceed the min of the one that follows it.
    orderkey_stats = sorted(
        [stats[0] for stats in row_group_stats], key=lambda stats: stats.min)
    for lower, upper in zip(orderkey_stats, orderkey_stats[1:]):
        assert lower.max <= upper.min
开发者ID:apache,项目名称:incubator-impala,代码行数:33,代码来源:test_insert_parquet.py
示例12: test_sorting_columns
def test_sorting_columns(self, vector, unique_database, tmpdir):
    """Check that RowGroup::sorting_columns is populated when the table has
    SORT BY columns.
    """
    source_table = "functional_parquet.alltypessmall"
    target_table = "test_write_sorting_columns"
    qualified_target_table = "%s.%s" % (unique_database, target_table)
    hdfs_path = get_fs_path(
        "/test-warehouse/%s.db/%s/" % (unique_database, target_table))

    # Create table
    self.execute_query(
        "create table %s sort by (int_col, id) like %s stored as parquet"
        % (qualified_target_table, source_table))

    # Insert data
    self.execute_query(
        "insert into %s partition(year, month) select * from %s"
        % (qualified_target_table, source_table))

    # Download hdfs files and gather the row groups of every file.
    row_groups = []
    for metadata in get_parquet_metadata_from_hdfs_folder(hdfs_path, tmpdir.strpath):
        row_groups += list(metadata.row_groups)

    # Every row group must carry the expected sorting columns.
    expected = [SortingColumn(4, False, False), SortingColumn(0, False, False)]
    for row_group in row_groups:
        assert row_group.sorting_columns == expected
开发者ID:apache,项目名称:incubator-impala,代码行数:30,代码来源:test_insert_parquet.py
示例13: _ctas_table_and_verify_stats
def _ctas_table_and_verify_stats(self, vector, unique_database, source_table,
                                 expected_values, hive_skip_col_idx=None):
    """Copy 'source_table' into a parquet table and check that the row group
    statistics of the resulting parquet file match 'expected_values'.

    The table is written once by Hive and once by Impala and validated after
    each write. For the Hive-written table, 'hive_skip_col_idx' is forwarded to
    the validation helper so those columns can be excluded from the comparison.
    """
    table_name = "test_hdfs_parquet_table_writer"
    qualified_table_name = "%s.%s" % (unique_database, table_name)
    hdfs_path = get_fs_path(
        '/test-warehouse/%s.db/%s/' % (unique_database, table_name))

    # Validate against Hive.
    self.execute_query("drop table if exists %s" % qualified_table_name)
    self.run_stmt_in_hive(
        "create table %s stored as parquet as select * from %s"
        % (qualified_table_name, source_table))
    self.execute_query("invalidate metadata %s" % qualified_table_name)
    self._validate_min_max_stats(hdfs_path, expected_values, hive_skip_col_idx)

    # Validate against Impala. Setting exec_single_node_rows_threshold and
    # adding a limit clause ensures that the query is executed on the
    # coordinator, resulting in a single parquet file being written.
    num_rows = self.execute_scalar("select count(*) from %s" % source_table)
    self.execute_query("drop table %s" % qualified_table_name)
    ctas_stmt = (
        "create table %s stored as parquet as select * from %s limit %s"
        % (qualified_table_name, source_table, num_rows))
    vector.get_value('exec_option')['EXEC_SINGLE_NODE_ROWS_THRESHOLD'] = num_rows
    self.execute_query(ctas_stmt, vector.get_value('exec_option'))
    self._validate_min_max_stats(hdfs_path, expected_values)
开发者ID:michaelhkw,项目名称:Impala,代码行数:29,代码来源:test_insert_parquet.py
示例14: test_libs_with_same_filenames
def test_libs_with_same_filenames(self, vector):
    """Run the 'libs_with_same_filenames' test case inside a dedicated database,
    dropping that database afterwards whether or not the test case passes.
    """
    db_location = get_fs_path('/test-warehouse/same_lib_filename_udf_test.db')
    create_stmt = (
        "create database if not exists same_lib_filename_udf_test "
        "location '{0}'".format(db_location))
    self.client.execute(create_stmt)
    try:
        self.run_test_case('QueryTest/libs_with_same_filenames', vector)
    finally:
        self.client.execute(
            "drop database if exists same_lib_filename_udf_test cascade")
开发者ID:cchanning,项目名称:incubator-impala,代码行数:7,代码来源:test_udfs.py
示例15: test_drop_function_while_running
def test_drop_function_while_running(self, vector, unique_database):
    """Check that a query that is already running can keep using a UDF after the
    function has been dropped.
    """
    fn_signature = "`%s`.drop_while_running(BIGINT)" % unique_database

    self.client.execute("drop function if exists %s" % fn_signature)
    self.client.execute(
        "create function %s returns BIGINT LOCATION '%s' SYMBOL='Identity'"
        % (fn_signature, get_fs_path('/test-warehouse/libTestUdfs.so')))

    query = (
        "select `%s`.drop_while_running(l_orderkey) from tpch.lineitem limit 10000"
        % unique_database)

    # Run this query asynchronously.
    handle = self.execute_query_async(
        query,
        vector.get_value('exec_option'),
        table_format=vector.get_value('table_format'))

    # Fetch some rows from the async query to make sure the UDF is being used
    first_batch = self.client.fetch(query, handle, 1)
    assert first_batch.success
    assert len(first_batch.data) == 1

    # Drop the function while the original query is running.
    self.client.execute("drop function %s" % fn_signature)

    # Fetch the rest of the rows, this should still be able to run the UDF
    remaining = self.client.fetch(query, handle, -1)
    assert remaining.success
    assert len(remaining.data) == 9999
开发者ID:timarmstrong,项目名称:incubator-impala,代码行数:28,代码来源:test_udfs.py
示例16: test_clustered_partition_single_file
def test_clustered_partition_single_file(self, unique_database):
    """IMPALA-2523: Check that a clustered insert creates one file per partition,
    even when inserting over multiple row batches.
    """
    # On s3 this test takes about 220 seconds and we are unlikely to break it,
    # so only run it in exhaustive strategy.
    if self.exploration_strategy() != 'exhaustive' and IS_S3:
        pytest.skip("only runs in exhaustive")

    table = "%s.insert_clustered" % unique_database
    table_path = "test-warehouse/%s.db/insert_clustered" % unique_database
    table_location = get_fs_path("/" + table_path)

    self.execute_query_expect_success(
        self.client, "create table %s like functional.alltypes" % table)
    self.execute_query_expect_success(
        self.client,
        "alter table %s set location '%s'" % (table, table_location))

    # Setting a lower batch size will result in multiple row batches being
    # written.
    self.execute_query_expect_success(self.client, "set batch_size=10")

    self.execute_query_expect_success(
        self.client,
        "insert into %s partition(year, month) /*+ clustered,shuffle */\n"
        "select * from functional.alltypes" % table)

    # We expect exactly one file per (year, month) partition, since subsequent
    # row batches of a partition will be written into the same file.
    for year in (2009, 2010):
        for month in range(1, 13):
            partition = "year=%s/month=%s" % (year, month)
            files = self.filesystem_client.ls("%s/%s" % (table_path, partition))
            assert len(files) == 1, "%s: %s" % (partition, files)
开发者ID:timarmstrong,项目名称:incubator-impala,代码行数:34,代码来源:test_insert_behaviour.py
示例17: test_udf_update_via_drop
def test_udf_update_via_drop(self, vector, unique_database):
    """Check that a UDF binary can be updated without restarting Impala: dropping
    the function should remove the binary from the local cache.
    """
    # Run with sync_ddl to guarantee the drop is processed by all impalads.
    exec_options = vector.get_value('exec_option')
    exec_options['sync_ddl'] = 1

    old_udf = os.path.join(
        os.environ['IMPALA_HOME'], 'testdata/udfs/impala-hive-udfs.jar')
    new_udf = os.path.join(
        os.environ['IMPALA_HOME'],
        'tests/test-hive-udfs/target/test-hive-udfs-1.0.jar')
    udf_dst = get_fs_path('/test-warehouse/impala-hive-udfs2.jar')

    fn_name = "`%s`.`udf_update_test_drop`()" % unique_database
    drop_fn_stmt = "drop function if exists %s" % fn_name
    create_fn_stmt = (
        "create function %s returns string LOCATION '%s' "
        "SYMBOL='com.cloudera.impala.TestUpdateUdf'" % (fn_name, udf_dst))
    query_stmt = "select %s" % fn_name

    # First upload the old binary and check it is the one being run; then
    # overwrite it with the new binary and, after dropping and re-creating the
    # function, expect the new binary's result.
    for udf_binary, expected_result in ((old_udf, "Old UDF"), (new_udf, "New UDF")):
        check_call(["hadoop", "fs", "-put", "-f", udf_binary, udf_dst])
        self.execute_query_expect_success(self.client, drop_fn_stmt, exec_options)
        self.execute_query_expect_success(self.client, create_fn_stmt, exec_options)
        self.__run_query_all_impalads(exec_options, query_stmt, [expected_result])
开发者ID:cchanning,项目名称:incubator-impala,代码行数:31,代码来源:test_udfs.py
示例18: test_udf_constant_folding
def test_udf_constant_folding(self, vector, unique_database):
    """Check that constant folding of UDFs is handled correctly.

    Uses count_rows(), which returns a unique value every time it is evaluated
    in the same thread.
    """
    exec_options = copy(vector.get_value('exec_option'))
    # Execute on a single node so that all counter values will be unique.
    exec_options["num_nodes"] = 1

    create_fn_query = (
        "create function {database}.count_rows() returns bigint\n"
        "location '{location}' symbol='Count' prepare_fn='CountPrepare'\n"
        "close_fn='CountClose'")
    self._load_functions(create_fn_query, vector, unique_database,
                         get_fs_path('/test-warehouse/libTestUdfs.so'))

    # Only one distinct value if the expression is constant folded, otherwise
    # one value per row in alltypes
    if exec_options['enable_expr_rewrites']:
        expected_ndv = 1
    else:
        expected_ndv = 7300

    # Test fully constant expression, evaluated in FE.
    constant_query = (
        "select `%s`.count_rows() from functional.alltypes" % unique_database)
    result = self.execute_query_expect_success(
        self.client, constant_query, exec_options)
    assert len(set(result.data)) == expected_ndv

    # Test constant argument to a non-constant expr. The argument value can be
    # cached in the backend.
    concat_query = (
        "select concat(cast(`%s`.count_rows() as string), '-', string_col)\n"
        "from functional.alltypes" % unique_database)
    result = self.execute_query_expect_success(
        self.client, concat_query, exec_options)
    # Only the counter part in front of the first '-' is relevant here.
    counter_values = set(row.partition("-")[0] for row in result.data)
    assert len(counter_values) == expected_ndv
开发者ID:timarmstrong,项目名称:incubator-impala,代码行数:29,代码来源:test_udfs.py
示例19: test_insert_parquet_verify_size
def test_insert_parquet_verify_size(self, vector):
    """Verify that the sizes of the written parquet files are close to the
    requested PARQUET_FILE_SIZE.

    Every file must be smaller than the block size, and at most one file may be
    smaller than 80% of it.
    """
    TBL = "parquet_insert_size"
    DROP = "drop table if exists {0}".format(TBL)
    CREATE = ("create table parquet_insert_size like tpch_parquet.orders"
              " stored as parquet location '{0}/{1}'".format(WAREHOUSE, TBL))
    QUERY = "insert overwrite {0} select * from tpch.orders".format(TBL)
    DIR = get_fs_path("test-warehouse/{0}/".format(TBL))
    BLOCK_SIZE = 40 * 1024 * 1024

    self.execute_query(DROP)
    self.execute_query(CREATE)

    vector.get_value('exec_option')['PARQUET_FILE_SIZE'] = BLOCK_SIZE
    vector.get_value('exec_option')['COMPRESSION_CODEC'] = (
        vector.get_value('compression_codec'))
    vector.get_value('exec_option')['num_nodes'] = 1
    self.execute_query(QUERY, vector.get_value('exec_option'))

    # Get the files in hdfs and verify. There can be at most 1 file that is
    # smaller than the BLOCK_SIZE. The rest should be within 80% of it and not
    # over.
    found_small_file = False
    ls = self.hdfs_client.list_dir(DIR)
    for f in ls['FileStatuses']['FileStatus']:
        if f['type'] != 'FILE':
            continue
        length = f['length']
        # Parenthesised form prints the same value on Python 2 and is also
        # valid syntax on Python 3, unlike the bare 'print length' statement.
        print(length)
        assert length < BLOCK_SIZE
        if length < BLOCK_SIZE * 0.80:
            assert not found_small_file
            found_small_file = True
开发者ID:ibmsoe,项目名称:ImpalaPPC,代码行数:32,代码来源:test_insert_parquet.py
示例20: test_permanent_udfs
def test_permanent_udfs(self):
    """Check that the function counts shown for the test database survive a
    metadata invalidation and cluster restarts, before and after dropping the
    sample UDAs.
    """
    def verify_udf_and_uda_counts():
        # The pre-calculated counts must tally with the number of functions
        # shown by the "show [aggregate] functions" statements.
        self.verify_function_count(
            "SHOW FUNCTIONS in {0}".format(self.DATABASE), self.udf_count)
        self.verify_function_count(
            "SHOW AGGREGATE FUNCTIONS in {0}".format(self.DATABASE), self.uda_count)

    def verify_single_uda_left():
        self.verify_function_count(
            "SHOW AGGREGATE FUNCTIONS in {0}".format(self.DATABASE), 1)

    verify_udf_and_uda_counts()

    # invalidate metadata and make sure the count tallies
    self.client.execute("INVALIDATE METADATA")
    verify_udf_and_uda_counts()

    # Restart the cluster, this triggers a full metadata reload
    self.__restart_cluster()
    # Make sure the counts of udfs and udas match post restart
    verify_udf_and_uda_counts()

    # Drop sample udas and verify the count matches pre and post restart
    self.__load_drop_functions(
        self.DROP_SAMPLE_UDAS_TEMPLATE, self.DATABASE,
        get_fs_path('/test-warehouse/libudasample.so'))
    verify_single_uda_left()
    self.__restart_cluster()
    verify_single_uda_left()
开发者ID:ibmsoe,项目名称:ImpalaPPC,代码行数:29,代码来源:test_permanent_udfs.py
注：本文中的tests.util.filesystem_utils.get_fs_path函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论