This article collects typical usage examples of the Java class org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan. If you are wondering what the MROperPlan class does, how to use it, or what real-world usage looks like, the curated code samples below should help.
The MROperPlan class belongs to the org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans package. Twenty code examples of the class are shown below, sorted by popularity by default.
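Nearly every example that follows uses the same skeleton: build a PhysicalPlan from a Pig Latin query, compile it into an MROperPlan, then inspect the resulting MapReduceOper nodes. A minimal sketch of that skeleton, assuming the Util helper and the pigServer/pc fixtures that the quoted test classes provide:

// A minimal sketch of the shared test skeleton; Util, pigServer and pc are
// assumed to be the test fixtures used by the examples below.
String query = "a = load 'input';" + "store a into 'output';";
PhysicalPlan pp = Util.buildPp(pigServer, query);       // Pig Latin -> physical plan
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);           // physical plan -> MR operator plan
MapReduceOper root = mrPlan.getRoots().get(0);          // walk the job DAG from its roots
List<MapReduceOper> next = mrPlan.getSuccessors(root);  // null once the last job is reached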
Example 1: testUDFInMergedCoGroup
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; // import the required package/class
@Test
public void testUDFInMergedCoGroup() throws Exception {
    String query = "a = load 'input1' using " + TestCollectableLoadFunc.class.getName() + "();" +
            "b = load 'input2' using " + TestIndexableLoadFunc.class.getName() + "();" +
            "c = cogroup a by $0, b by $0 using 'merge';" +
            "store c into 'output';";

    PhysicalPlan pp = Util.buildPp(pigServer, query);
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
    MapReduceOper mrOper = mrPlan.getRoots().get(0);

    assertTrue(mrOper.UDFs.contains(TestCollectableLoadFunc.class.getName()));
    mrOper = mrPlan.getSuccessors(mrOper).get(0);
    assertTrue(mrOper.UDFs.contains(TestCollectableLoadFunc.class.getName()));
    assertTrue(mrOper.UDFs.contains(TestIndexableLoadFunc.class.getName()));
}
Developer: sigmoidanalytics, Project: spork, Lines: 17, Source: TestMRCompiler.java
Example 2: testDefaultParallelInSort
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; // import the required package/class
@Test
public void testDefaultParallelInSort() throws Throwable {
    // default_parallel is considered only at runtime, so here we only test requested parallel
    // more thorough tests can be found in TestNumberOfReducers.java
    String query = "a = load 'input';" + "b = order a by $0 parallel 100;" + "store b into 'output';";
    PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
    PhysicalPlan pp = Util.buildPp(ps, query);
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);

    // Get the sort job
    Iterator<MapReduceOper> iter = mrPlan.getKeys().values().iterator();
    int counter = 0;
    while (iter.hasNext()) {
        MapReduceOper op = iter.next();
        counter++;
        if (op.isGlobalSort()) {
            assertTrue(op.getRequestedParallelism() == 100);
        }
    }
    assertEquals(3, counter);

    pc.defaultParallel = -1;
}
Developer: sigmoidanalytics, Project: spork, Lines: 25, Source: TestJobSubmission.java
Example 3: testNumReducersInLimitWithParallel
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; // import the required package/class
/**
 * Test to ensure that the order by with parallel followed by a limit, i.e., top k
 * always produces the correct number of map reduce jobs
 */
@Test
public void testNumReducersInLimitWithParallel() throws Exception {
    String query = "a = load 'input';" +
            "b = order a by $0 parallel 2;" +
            "c = limit b 10;" +
            "store c into 'output';";

    PhysicalPlan pp = Util.buildPp(pigServerMR, query);
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
    LimitAdjuster la = new LimitAdjuster(mrPlan, pc);
    la.visit();
    la.adjust();

    MapReduceOper mrOper = mrPlan.getRoots().get(0);
    int count = 1;
    while (mrPlan.getSuccessors(mrOper) != null) {
        mrOper = mrPlan.getSuccessors(mrOper).get(0);
        ++count;
    }
    assertEquals(4, count);
}
Developer: sigmoidanalytics, Project: spork, Lines: 27, Source: TestMRCompiler.java
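Examples 3, 7, and 19 all count the number of MapReduce jobs by walking getSuccessors() from the root. A sketch of that idiom factored into a hypothetical helper (not part of the quoted tests), assuming the plan is a single linear chain of jobs:

// Hypothetical helper: counts the jobs in a linear MROperPlan by always
// following the first successor; a branching plan would need a full traversal.
private static int countMRJobs(MROperPlan mrPlan) {
    MapReduceOper mrOper = mrPlan.getRoots().get(0);
    int count = 1;
    while (mrPlan.getSuccessors(mrOper) != null) {
        mrOper = mrPlan.getSuccessors(mrOper).get(0);
        ++count;
    }
    return count;
}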
Example 4: testDefaultParallel
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; // import the required package/class
@Test
public void testDefaultParallel() throws Throwable {
    pc.defaultParallel = 100;

    String query = "a = load 'input';" + "b = group a by $0;" + "store b into 'output';";
    PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
    PhysicalPlan pp = Util.buildPp(ps, query);
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);

    ConfigurationValidator.validatePigProperties(pc.getProperties());
    Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
    JobControlCompiler jcc = new JobControlCompiler(pc, conf);
    JobControl jobControl = jcc.compile(mrPlan, "Test");
    Job job = jobControl.getWaitingJobs().get(0);
    int parallel = job.getJobConf().getNumReduceTasks();

    assertEquals(100, parallel);
    Util.assertParallelValues(100, -1, -1, 100, job.getJobConf());

    pc.defaultParallel = -1;
}
Developer: sigmoidanalytics, Project: spork-streaming, Lines: 23, Source: TestJobSubmission.java
Example 5: testDefaultParallelInSort
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; // import the required package/class
@Test
public void testDefaultParallelInSort() throws Throwable {
    // default_parallel is considered only at runtime, so here we only test requested parallel
    // more thorough tests can be found in TestNumberOfReducers.java
    String query = "a = load 'input';" + "b = order a by $0 parallel 100;" + "store b into 'output';";
    PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
    PhysicalPlan pp = Util.buildPp(ps, query);
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);

    // Get the sort job
    Iterator<MapReduceOper> iter = mrPlan.getKeys().values().iterator();
    int counter = 0;
    while (iter.hasNext()) {
        MapReduceOper op = iter.next();
        counter++;
        if (op.isGlobalSort()) {
            assertTrue(op.getRequestedParallelism() == 100);
        }
    }
    assertEquals(3, counter);

    pc.defaultParallel = -1;
}
Developer: sigmoidanalytics, Project: spork-streaming, Lines: 25, Source: TestJobSubmission.java
Example 6: testDefaultParallelInSkewJoin
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; // import the required package/class
@Test
public void testDefaultParallelInSkewJoin() throws Throwable {
    // default_parallel is considered only at runtime, so here we only test requested parallel
    // more thorough tests can be found in TestNumberOfReducers.java
    String query = "a = load 'input';" +
            "b = load 'input';" +
            "c = join a by $0, b by $0 using 'skewed' parallel 100;" +
            "store c into 'output';";

    PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
    PhysicalPlan pp = Util.buildPp(ps, query);
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);

    // Get the skew join job
    Iterator<MapReduceOper> iter = mrPlan.getKeys().values().iterator();
    int counter = 0;
    while (iter.hasNext()) {
        MapReduceOper op = iter.next();
        counter++;
        if (op.isSkewedJoin()) {
            assertTrue(op.getRequestedParallelism() == 100);
        }
    }
    assertEquals(3, counter);

    pc.defaultParallel = -1;
}
Developer: sigmoidanalytics, Project: spork-streaming, Lines: 27, Source: TestJobSubmission.java
Example 7: testNumReducersInLimit
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; // import the required package/class
/**
 * Test to ensure that the order by without parallel followed by a limit, i.e., top k
 * always produces the correct number of map reduce jobs. In the testcase below since
 * we are running the unit test locally, we will get reduce parallelism as 1. So we will
 * NOT introduce the extra MR job to do a final limit
 */
@Test
public void testNumReducersInLimit() throws Exception {
    String query = "a = load 'input';" +
            "b = order a by $0;" +
            "c = limit b 10;" +
            "store c into 'output';";

    PhysicalPlan pp = Util.buildPp(pigServer, query);
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
    MapReduceOper mrOper = mrPlan.getRoots().get(0);
    int count = 1;
    while (mrPlan.getSuccessors(mrOper) != null) {
        mrOper = mrPlan.getSuccessors(mrOper).get(0);
        ++count;
    }
    assertEquals(3, count);
}
Developer: sigmoidanalytics, Project: spork-streaming, Lines: 25, Source: TestMRCompiler.java
Example 8: testDefaultParallelInSkewJoin
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; // import the required package/class
@Test
public void testDefaultParallelInSkewJoin() throws Throwable {
    // default_parallel is considered only at runtime, so here we only test requested parallel
    // more thorough tests can be found in TestNumberOfReducers.java
    String query = "a = load 'input';" +
            "b = load 'input';" +
            "c = join a by $0, b by $0 using 'skewed' parallel 100;" +
            "store c into 'output';";

    PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
    PhysicalPlan pp = Util.buildPp(ps, query);
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);

    // Get the skew join job
    Iterator<MapReduceOper> iter = mrPlan.getKeys().values().iterator();
    int counter = 0;
    while (iter.hasNext()) {
        MapReduceOper op = iter.next();
        counter++;
        if (op.isSkewedJoin()) {
            assertTrue(op.getRequestedParallelism() == 100);
        }
    }
    assertEquals(3, counter);

    pc.defaultParallel = -1;
}
Developer: sigmoidanalytics, Project: spork, Lines: 27, Source: TestJobSubmission.java
Example 9: testSchemaInStoreForDistinctLimit
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; // import the required package/class
@Test
public void testSchemaInStoreForDistinctLimit() throws Exception {
    // test if the POStore in the 2nd mr plan (that stores the actual output)
    // has a schema
    String query = "a = load 'input1' as (a : int,b :float ,c : int);" +
            "b = distinct a;" +
            "c = limit b 10;" +
            "store c into 'output';";

    PhysicalPlan pp = Util.buildPp(pigServer, query);
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
    MapReduceOper secondMrOper = mrPlan.getLeaves().get(0);
    POStore store = (POStore) secondMrOper.reducePlan.getLeaves().get(0);
    assertEquals(
            "compare load and store schema",
            store.getSchema(),
            Utils.getSchemaFromString("a : int,b :float ,c : int")
    );
}
Developer: sigmoidanalytics, Project: spork, Lines: 20, Source: TestMRCompiler.java
Example 10: testGroupConstWithParallel
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; // import the required package/class
/**
 * Test parallelism for group by constant
 * @throws Throwable
 */
@Test
public void testGroupConstWithParallel() throws Throwable {
    PigContext pc = new PigContext(ExecType.MAPREDUCE, cluster.getProperties());
    pc.defaultParallel = 100;
    pc.connect();

    String query = "a = load 'input';\n" + "b = group a by 1;" + "store b into 'output';";
    PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
    PhysicalPlan pp = Util.buildPp(pigServer, query);
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);

    ConfigurationValidator.validatePigProperties(pc.getProperties());
    Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
    JobControlCompiler jcc = new JobControlCompiler(pc, conf);
    JobControl jobControl = jcc.compile(mrPlan, "Test");
    Job job = jobControl.getWaitingJobs().get(0);
    int parallel = job.getJobConf().getNumReduceTasks();

    assertEquals("parallelism", 1, parallel);
}
Developer: sigmoidanalytics, Project: spork-streaming, Lines: 27, Source: TestGroupConstParallel.java
Example 11: testGroupNonConstWithParallel
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; // import the required package/class
/**
 * Test parallelism for group by column
 * @throws Throwable
 */
@Test
public void testGroupNonConstWithParallel() throws Throwable {
    PigContext pc = new PigContext(ExecType.MAPREDUCE, cluster.getProperties());
    pc.defaultParallel = 100;
    pc.connect();

    PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
    String query = "a = load 'input';\n" + "b = group a by $0;" + "store b into 'output';";
    PhysicalPlan pp = Util.buildPp(pigServer, query);
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);

    ConfigurationValidator.validatePigProperties(pc.getProperties());
    Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
    JobControlCompiler jcc = new JobControlCompiler(pc, conf);
    JobControl jobControl = jcc.compile(mrPlan, "Test");
    Job job = jobControl.getWaitingJobs().get(0);
    int parallel = job.getJobConf().getNumReduceTasks();

    assertEquals("parallelism", 100, parallel);
}
Developer: sigmoidanalytics, Project: spork-streaming, Lines: 27, Source: TestGroupConstParallel.java
Example 12: testSortOptimization3
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; // import the required package/class
@Test
public void testSortOptimization3() throws Exception {
    // Sort on the main key prefix / non main key prefix mixed
    String query = ("A=LOAD 'input1' AS (a0, a1, a2);" +
            "B = group A by $0;" +
            "C = foreach B { D = limit A 10; E = order D by $1; F = order E by $0; generate group, F;};" +
            "store C into 'output';");

    PhysicalPlan pp = Util.buildPp(pigServer, query);
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
    SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
    so.visit();

    assertEquals(1, so.getNumMRUseSecondaryKey());
    assertEquals(2, so.getNumSortRemoved());
    assertEquals(0, so.getDistinctChanged());
}
Developer: sigmoidanalytics, Project: spork-streaming, Lines: 19, Source: TestSecondarySort.java
Example 13: testSortOptimization4
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; // import the required package/class
@Test
public void testSortOptimization4() throws Exception {
    // Sort on the main key again
    String query = ("A=LOAD 'input1' AS (a0, a1, a2);" +
            "B = group A by $0;" +
            "C = foreach B { D = limit A 10; E = order D by $0, $1, $2; generate group, E;};" +
            "store C into 'output';");

    PhysicalPlan pp = Util.buildPp(pigServer, query);
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
    SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
    so.visit();

    assertEquals(1, so.getNumMRUseSecondaryKey());
    assertEquals(1, so.getNumSortRemoved());
    assertEquals(0, so.getDistinctChanged());
}
Developer: sigmoidanalytics, Project: spork-streaming, Lines: 19, Source: TestSecondarySort.java
Example 14: testSortOptimization5
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; // import the required package/class
@Test
public void testSortOptimization5() throws Exception {
    // Sort on the two keys, we can only take off 1
    String query = ("A=LOAD 'input1' AS (a0, a1, a2);" +
            "B = group A by $0;" +
            "C = foreach B { D = limit A 10; E = order D by $1; F = order E by $2; generate group, F;};" +
            "store C into 'output';");

    PhysicalPlan pp = Util.buildPp(pigServer, query);
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
    SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
    so.visit();

    assertEquals(1, so.getNumMRUseSecondaryKey());
    assertEquals(1, so.getNumSortRemoved());
    assertEquals(0, so.getDistinctChanged());
}
Developer: sigmoidanalytics, Project: spork-streaming, Lines: 18, Source: TestSecondarySort.java
Example 15: testSortOptimization6
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; // import the required package/class
@Test
public void testSortOptimization6() throws Exception {
    // Sort desc
    String query = ("A=LOAD 'input1' AS (a0, a1, a2);" +
            "B = group A by $0;" +
            "C = foreach B { D = order A by $0 desc; generate group, D;};" +
            "store C into 'output';");

    PhysicalPlan pp = Util.buildPp(pigServer, query);
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
    SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
    so.visit();

    assertEquals(1, so.getNumMRUseSecondaryKey());
    assertEquals(1, so.getNumSortRemoved());
    assertEquals(0, so.getDistinctChanged());
}
Developer: sigmoidanalytics, Project: spork-streaming, Lines: 19, Source: TestSecondarySort.java
Example 16: testSortOptimization7
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; // import the required package/class
@Test
public void testSortOptimization7() throws Exception {
    // Sort asc on 1st key, desc on 2nd key
    String query = ("A=LOAD 'input1' AS (a0, a1, a2);" +
            "B = group A by ($0, $1);" +
            "C = foreach B { D = order A by $0, $1 desc; generate group, D;};" +
            "store C into 'output';");

    PhysicalPlan pp = Util.buildPp(pigServer, query);
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
    SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
    so.visit();

    assertEquals(1, so.getNumMRUseSecondaryKey());
    assertEquals(1, so.getNumSortRemoved());
    assertEquals(0, so.getDistinctChanged());
}
Developer: sigmoidanalytics, Project: spork-streaming, Lines: 19, Source: TestSecondarySort.java
Example 17: testSortOptimization8
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; // import the required package/class
@Test
public void testSortOptimization8() throws Exception {
    // Sort desc, used in UDF twice
    String query = ("A=LOAD 'input1' AS (a0);" +
            "B = group A all;" +
            "C = foreach B { D = order A by $0 desc; generate DIFF(D, D);};" +
            "store C into 'output';");

    PhysicalPlan pp = Util.buildPp(pigServer, query);
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
    SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
    so.visit();

    assertEquals(1, so.getNumMRUseSecondaryKey());
    assertEquals(2, so.getNumSortRemoved());
    assertEquals(0, so.getDistinctChanged());
}
Developer: sigmoidanalytics, Project: spork-streaming, Lines: 19, Source: TestSecondarySort.java
Example 18: MRCompiler
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; // import the required package/class
public MRCompiler(PhysicalPlan plan,
        PigContext pigContext) throws MRCompilerException {
    super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
    this.plan = plan;
    this.pigContext = pigContext;
    splitsSeen = new HashMap<OperatorKey, MapReduceOper>();
    MRPlan = new MROperPlan();
    nig = NodeIdGenerator.getGenerator();
    udfFinder = new UDFFinder();

    List<PhysicalOperator> roots = plan.getRoots();
    if ((roots == null) || (roots.size() <= 0)) {
        int errCode = 2053;
        String msg = "Internal error. Did not find roots in the physical plan.";
        throw new MRCompilerException(msg, errCode, PigException.BUG);
    }
    scope = roots.get(0).getOperatorKey().getScope();
    messageCollector = new CompilationMessageCollector();
    phyToMROpMap = new HashMap<PhysicalOperator, MapReduceOper>();

    fileConcatenationThreshold = Integer.parseInt(pigContext.getProperties()
            .getProperty(FILE_CONCATENATION_THRESHOLD, "100"));
    optimisticFileConcatenation = pigContext.getProperties().getProperty(
            OPTIMISTIC_FILE_CONCATENATION, "false").equals("true");
    LOG.info("File concatenation threshold: " + fileConcatenationThreshold
            + " optimistic? " + optimisticFileConcatenation);
}
Developer: sigmoidanalytics, Project: spork, Lines: 27, Source: MRCompiler.java
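The constructor above only initializes compiler state; the physical-to-MR translation happens in a later step. A sketch of how the compiler is typically driven, assuming the compile() and getMRPlan() methods from the same Pig codebase:

// A sketch, not the canonical entry point: pp is a PhysicalPlan and pc a
// connected PigContext, as in the examples above.
MRCompiler comp = new MRCompiler(pp, pc);
comp.compile();                        // translate the physical plan into MR operators
MROperPlan mrPlan = comp.getMRPlan();  // the populated MR operator plan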
Example 19: testPoissonSampleOptimizer
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; // import the required package/class
@Test
public void testPoissonSampleOptimizer() throws Exception {
    String query = " A = load 'input' using PigStorage('\t');" +
            "B = load 'input' using PigStorage('\t');" +
            " C = join A by $0, B by $0 using 'skewed';" +
            "store C into 'output';";

    PhysicalPlan pp = Util.buildPp(pigServer, query);
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);

    int count = 1;
    MapReduceOper mrOper = mrPlan.getRoots().get(0);
    while (mrPlan.getSuccessors(mrOper) != null) {
        mrOper = mrPlan.getSuccessors(mrOper).get(0);
        ++count;
    }
    // Before optimizer visits, number of MR jobs = 3.
    assertEquals(3, count);

    SampleOptimizer so = new SampleOptimizer(mrPlan, pc);
    so.visit();

    count = 1;
    mrOper = mrPlan.getRoots().get(0);
    while (mrPlan.getSuccessors(mrOper) != null) {
        mrOper = mrPlan.getSuccessors(mrOper).get(0);
        ++count;
    }
    // After optimizer visits, number of MR jobs = 2
    assertEquals(2, count);
}
Developer: sigmoidanalytics, Project: spork, Lines: 31, Source: TestSampleOptimizer.java
Example 20: testMapAggNotCombinable
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; // import the required package/class
@Test
public void testMapAggNotCombinable() throws Exception {
    // not combinable, so there should not be a POPartial
    String query = "l = load 'x' as (a,b,c);" +
            "g = group l by a;" +
            "f = foreach g generate group, COUNT(l.b), l.b;";

    pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, "true");
    MROperPlan mrp = Util.buildMRPlan(query, pc);
    assertEquals(1, mrp.size());
    assertNull("POPartialAgg should be absent", findPOPartialAgg(mrp));
}
Developer: sigmoidanalytics, Project: spork, Lines: 13, Source: TestPOPartialAggPlan.java
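The test relies on a findPOPartialAgg helper that the snippet does not show. A hypothetical reconstruction, assuming map-side partial aggregation would surface as a POPartialAgg operator inside a job's mapPlan:

// Hypothetical reconstruction of findPOPartialAgg; the real test class
// defines its own version.
private static PhysicalOperator findPOPartialAgg(MROperPlan mrp) {
    for (MapReduceOper mro : mrp.getKeys().values()) {
        for (PhysicalOperator op : mro.mapPlan.getKeys().values()) {
            if (op instanceof POPartialAgg) {
                return op;  // map-side partial aggregation found
            }
        }
    }
    return null;  // no POPartialAgg anywhere in the plan
}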
Note: the MROperPlan examples in this article were compiled from open-source projects hosted on GitHub, MSDocs, and similar source/documentation platforms, with snippets selected from community-contributed projects. Copyright of the source code remains with the original authors; redistribution and use should follow the corresponding project's license. Do not reproduce without permission.