本文整理汇总了Java中org.apache.flink.api.java.operators.GroupReduceOperator类的典型用法代码示例。如果您正苦于以下问题:Java GroupReduceOperator类的具体用法?Java GroupReduceOperator怎么用?Java GroupReduceOperator使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
GroupReduceOperator类属于org.apache.flink.api.java.operators包,在下文中一共展示了GroupReduceOperator类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: sampleWithSize
import org.apache.flink.api.java.operators.GroupReduceOperator; //导入依赖的package包/类
/**
 * Generate a sample of DataSet which contains fixed size elements.
 *
 * <p><strong>NOTE:</strong> Sample with fixed size is not as efficient as sample with fraction, use sample with
 * fraction unless you need exact precision.
 *
 * @param input The DataSet to draw the sample from.
 * @param withReplacement Whether element can be selected more than once.
 * @param numSamples The expected sample size.
 * @param seed Random number generator seed.
 * @return The sampled DataSet
 */
public static <T> DataSet<T> sampleWithSize(
DataSet <T> input,
final boolean withReplacement,
final int numSamples,
final long seed) {
// Phase 1: draw a candidate sample inside each partition, in parallel.
SampleInPartition<T> sampleInPartition = new SampleInPartition<>(withReplacement, numSamples, seed);
// NOTE(review): raw MapPartitionOperator — presumably MapPartitionOperator<T, IntermediateSampleData<T>>;
// confirm the element type of SampleInPartition's output and parameterize to remove the unchecked warning.
MapPartitionOperator mapPartitionOperator = input.mapPartition(sampleInPartition);
// There is no previous group, so the parallelism of GroupReduceOperator is always 1.
String callLocation = Utils.getCallLocationName();
// Phase 2: a single coordinator merges the per-partition samples into the final fixed-size sample.
SampleInCoordinator<T> sampleInCoordinator = new SampleInCoordinator<>(withReplacement, numSamples, seed);
return new GroupReduceOperator<>(mapPartitionOperator, input.getType(), sampleInCoordinator, callLocation);
}
开发者ID:axbaretto,项目名称:flink,代码行数:26,代码来源:DataSetUtils.java
示例2: first
import org.apache.flink.api.java.operators.GroupReduceOperator; //导入依赖的package包/类
/**
 * Returns a new set containing the first n elements in this {@link DataSet}.
 *
 * @param n The desired number of elements; must be at least 1.
 * @return A ReduceGroupOperator that represents the DataSet containing the elements.
 * @throws InvalidProgramException if {@code n} is smaller than 1.
 */
public GroupReduceOperator<T, T> first(int n) {
	if (n >= 1) {
		return reduceGroup(new FirstReducer<T>(n));
	}
	throw new InvalidProgramException("Parameter n of first(n) must be at least 1.");
}
开发者ID:axbaretto,项目名称:flink,代码行数:14,代码来源:DataSet.java
示例3: testSemanticPropsWithKeySelector1
import org.apache.flink.api.java.operators.GroupReduceOperator; //导入依赖的package包/类
@Test
public void testSemanticPropsWithKeySelector1() {
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSet<Tuple5<Integer, Long, String, Long, Integer>> input = env.fromCollection(emptyTupleData, tupleTypeInfo);

	// Group by a key selector; the semantic properties below use input positions
	// that run past the tuple arity — presumably offset by the extracted key fields (confirm).
	GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> op =
			input.groupBy(new DummyTestKeySelector())
					.reduceGroup(new DummyGroupReduceFunction1());
	SemanticProperties props = op.getSemanticProperties();

	// Forwarding targets per input field.
	assertTrue(props.getForwardingTargetFields(0, 0).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 1).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 2).size() == 1);
	assertTrue(props.getForwardingTargetFields(0, 2).contains(4));
	assertTrue(props.getForwardingTargetFields(0, 3).size() == 2);
	assertTrue(props.getForwardingTargetFields(0, 3).contains(1));
	assertTrue(props.getForwardingTargetFields(0, 3).contains(3));
	assertTrue(props.getForwardingTargetFields(0, 4).size() == 1);
	assertTrue(props.getForwardingTargetFields(0, 4).contains(2));
	assertTrue(props.getForwardingTargetFields(0, 5).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 6).size() == 0);

	// Forwarding sources per output field.
	assertTrue(props.getForwardingSourceField(0, 0) < 0);
	assertTrue(props.getForwardingSourceField(0, 1) == 3);
	assertTrue(props.getForwardingSourceField(0, 2) == 4);
	assertTrue(props.getForwardingSourceField(0, 3) == 3);
	assertTrue(props.getForwardingSourceField(0, 4) == 2);

	// Read fields.
	assertTrue(props.getReadFields(0).size() == 3);
	assertTrue(props.getReadFields(0).contains(2));
	assertTrue(props.getReadFields(0).contains(5));
	assertTrue(props.getReadFields(0).contains(6));
}
开发者ID:axbaretto,项目名称:flink,代码行数:36,代码来源:GroupReduceOperatorTest.java
示例4: testSemanticPropsWithKeySelector2
import org.apache.flink.api.java.operators.GroupReduceOperator; //导入依赖的package包/类
@Test
public void testSemanticPropsWithKeySelector2() {
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSet<Tuple5<Integer, Long, String, Long, Integer>> input = env.fromCollection(emptyTupleData, tupleTypeInfo);

	// Same as testSemanticPropsWithKeySelector1 but with an extra sort-key selector;
	// input positions below go up to 8 — presumably offset by both selectors (confirm).
	GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> op =
			input.groupBy(new DummyTestKeySelector())
					.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
					.reduceGroup(new DummyGroupReduceFunction1());
	SemanticProperties props = op.getSemanticProperties();

	// Forwarding targets per input field.
	assertTrue(props.getForwardingTargetFields(0, 0).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 1).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 2).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 3).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 4).size() == 1);
	assertTrue(props.getForwardingTargetFields(0, 4).contains(4));
	assertTrue(props.getForwardingTargetFields(0, 5).size() == 2);
	assertTrue(props.getForwardingTargetFields(0, 5).contains(1));
	assertTrue(props.getForwardingTargetFields(0, 5).contains(3));
	assertTrue(props.getForwardingTargetFields(0, 6).size() == 1);
	assertTrue(props.getForwardingTargetFields(0, 6).contains(2));
	assertTrue(props.getForwardingTargetFields(0, 7).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 8).size() == 0);

	// Forwarding sources per output field.
	assertTrue(props.getForwardingSourceField(0, 0) < 0);
	assertTrue(props.getForwardingSourceField(0, 1) == 5);
	assertTrue(props.getForwardingSourceField(0, 2) == 6);
	assertTrue(props.getForwardingSourceField(0, 3) == 5);
	assertTrue(props.getForwardingSourceField(0, 4) == 4);

	// Read fields.
	assertTrue(props.getReadFields(0).size() == 3);
	assertTrue(props.getReadFields(0).contains(4));
	assertTrue(props.getReadFields(0).contains(7));
	assertTrue(props.getReadFields(0).contains(8));
}
开发者ID:axbaretto,项目名称:flink,代码行数:39,代码来源:GroupReduceOperatorTest.java
示例5: testSemanticPropsWithKeySelector3
import org.apache.flink.api.java.operators.GroupReduceOperator; //导入依赖的package包/类
@Test
public void testSemanticPropsWithKeySelector3() {
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSet<Tuple5<Integer, Long, String, Long, Integer>> input = env.fromCollection(emptyTupleData, tupleTypeInfo);

	// Forwarded fields declared via API string instead of a function annotation.
	GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> op =
			input.groupBy(new DummyTestKeySelector())
					.reduceGroup(new DummyGroupReduceFunction2())
					.withForwardedFields("0->4;1;1->3;2");
	SemanticProperties props = op.getSemanticProperties();

	// Forwarding targets per input field.
	assertTrue(props.getForwardingTargetFields(0, 0).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 1).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 2).size() == 1);
	assertTrue(props.getForwardingTargetFields(0, 2).contains(4));
	assertTrue(props.getForwardingTargetFields(0, 3).size() == 2);
	assertTrue(props.getForwardingTargetFields(0, 3).contains(1));
	assertTrue(props.getForwardingTargetFields(0, 3).contains(3));
	assertTrue(props.getForwardingTargetFields(0, 4).size() == 1);
	assertTrue(props.getForwardingTargetFields(0, 4).contains(2));
	assertTrue(props.getForwardingTargetFields(0, 5).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 6).size() == 0);

	// Forwarding sources per output field.
	assertTrue(props.getForwardingSourceField(0, 0) < 0);
	assertTrue(props.getForwardingSourceField(0, 1) == 3);
	assertTrue(props.getForwardingSourceField(0, 2) == 4);
	assertTrue(props.getForwardingSourceField(0, 3) == 3);
	assertTrue(props.getForwardingSourceField(0, 4) == 2);

	// Read fields.
	assertTrue(props.getReadFields(0).size() == 3);
	assertTrue(props.getReadFields(0).contains(2));
	assertTrue(props.getReadFields(0).contains(5));
	assertTrue(props.getReadFields(0).contains(6));
}
开发者ID:axbaretto,项目名称:flink,代码行数:37,代码来源:GroupReduceOperatorTest.java
示例6: testSemanticPropsWithKeySelector4
import org.apache.flink.api.java.operators.GroupReduceOperator; //导入依赖的package包/类
@Test
public void testSemanticPropsWithKeySelector4() {
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSet<Tuple5<Integer, Long, String, Long, Integer>> input = env.fromCollection(emptyTupleData, tupleTypeInfo);

	// API-declared forwarded fields combined with an extra sort-key selector.
	GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> op =
			input.groupBy(new DummyTestKeySelector())
					.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
					.reduceGroup(new DummyGroupReduceFunction2())
					.withForwardedFields("0->4;1;1->3;2");
	SemanticProperties props = op.getSemanticProperties();

	// Forwarding targets per input field.
	assertTrue(props.getForwardingTargetFields(0, 0).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 1).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 2).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 3).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 4).size() == 1);
	assertTrue(props.getForwardingTargetFields(0, 4).contains(4));
	assertTrue(props.getForwardingTargetFields(0, 5).size() == 2);
	assertTrue(props.getForwardingTargetFields(0, 5).contains(1));
	assertTrue(props.getForwardingTargetFields(0, 5).contains(3));
	assertTrue(props.getForwardingTargetFields(0, 6).size() == 1);
	assertTrue(props.getForwardingTargetFields(0, 6).contains(2));
	assertTrue(props.getForwardingTargetFields(0, 7).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 8).size() == 0);

	// Forwarding sources per output field.
	assertTrue(props.getForwardingSourceField(0, 0) < 0);
	assertTrue(props.getForwardingSourceField(0, 1) == 5);
	assertTrue(props.getForwardingSourceField(0, 2) == 6);
	assertTrue(props.getForwardingSourceField(0, 3) == 5);
	assertTrue(props.getForwardingSourceField(0, 4) == 4);

	// Read fields.
	assertTrue(props.getReadFields(0).size() == 3);
	assertTrue(props.getReadFields(0).contains(4));
	assertTrue(props.getReadFields(0).contains(7));
	assertTrue(props.getReadFields(0).contains(8));
}
开发者ID:axbaretto,项目名称:flink,代码行数:40,代码来源:GroupReduceOperatorTest.java
示例7: testSemanticPropsWithKeySelector5
import org.apache.flink.api.java.operators.GroupReduceOperator; //导入依赖的package包/类
@Test
public void testSemanticPropsWithKeySelector5() {
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSet<Tuple5<Integer, Long, String, Long, Integer>> input = env.fromCollection(emptyTupleData, tupleTypeInfo);

	// Forwarded fields declared in "source->target" direction on a different dummy function.
	GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> op =
			input.groupBy(new DummyTestKeySelector())
					.reduceGroup(new DummyGroupReduceFunction3())
					.withForwardedFields("4->0;3;3->1;2");
	SemanticProperties props = op.getSemanticProperties();

	// Forwarding targets per input field.
	assertTrue(props.getForwardingTargetFields(0, 0).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 1).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 2).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 3).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 4).size() == 1);
	assertTrue(props.getForwardingTargetFields(0, 4).contains(2));
	assertTrue(props.getForwardingTargetFields(0, 5).size() == 2);
	assertTrue(props.getForwardingTargetFields(0, 5).contains(1));
	assertTrue(props.getForwardingTargetFields(0, 5).contains(3));
	assertTrue(props.getForwardingTargetFields(0, 6).size() == 1);
	assertTrue(props.getForwardingTargetFields(0, 6).contains(0));

	// Forwarding sources per output field.
	assertTrue(props.getForwardingSourceField(0, 0) == 6);
	assertTrue(props.getForwardingSourceField(0, 1) == 5);
	assertTrue(props.getForwardingSourceField(0, 2) == 4);
	assertTrue(props.getForwardingSourceField(0, 3) == 5);
	assertTrue(props.getForwardingSourceField(0, 4) < 0);

	// No read-field information declared for this function.
	assertTrue(props.getReadFields(0) == null);
}
开发者ID:axbaretto,项目名称:flink,代码行数:34,代码来源:GroupReduceOperatorTest.java
示例8: testSemanticPropsWithKeySelector6
import org.apache.flink.api.java.operators.GroupReduceOperator; //导入依赖的package包/类
@Test
public void testSemanticPropsWithKeySelector6() {
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSet<Tuple5<Integer, Long, String, Long, Integer>> input = env.fromCollection(emptyTupleData, tupleTypeInfo);

	// "source->target" forwarded fields combined with an extra sort-key selector.
	GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> op =
			input.groupBy(new DummyTestKeySelector())
					.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
					.reduceGroup(new DummyGroupReduceFunction3())
					.withForwardedFields("4->0;3;3->1;2");
	SemanticProperties props = op.getSemanticProperties();

	// Forwarding targets per input field.
	assertTrue(props.getForwardingTargetFields(0, 0).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 1).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 2).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 3).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 4).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 5).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 6).size() == 1);
	assertTrue(props.getForwardingTargetFields(0, 6).contains(2));
	assertTrue(props.getForwardingTargetFields(0, 7).size() == 2);
	assertTrue(props.getForwardingTargetFields(0, 7).contains(1));
	assertTrue(props.getForwardingTargetFields(0, 7).contains(3));
	assertTrue(props.getForwardingTargetFields(0, 8).size() == 1);
	assertTrue(props.getForwardingTargetFields(0, 8).contains(0));

	// Forwarding sources per output field.
	assertTrue(props.getForwardingSourceField(0, 0) == 8);
	assertTrue(props.getForwardingSourceField(0, 1) == 7);
	assertTrue(props.getForwardingSourceField(0, 2) == 6);
	assertTrue(props.getForwardingSourceField(0, 3) == 7);
	assertTrue(props.getForwardingSourceField(0, 4) < 0);

	// No read-field information declared for this function.
	assertTrue(props.getReadFields(0) == null);
}
开发者ID:axbaretto,项目名称:flink,代码行数:37,代码来源:GroupReduceOperatorTest.java
示例9: testSemanticPropsWithKeySelector7
import org.apache.flink.api.java.operators.GroupReduceOperator; //导入依赖的package包/类
@Test
public void testSemanticPropsWithKeySelector7() {
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSet<Tuple5<Integer, Long, String, Long, Integer>> input = env.fromCollection(emptyTupleData, tupleTypeInfo);

	// Semantic properties come solely from DummyGroupReduceFunction4 (no API declaration here).
	GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> op =
			input.groupBy(new DummyTestKeySelector())
					.reduceGroup(new DummyGroupReduceFunction4());
	SemanticProperties props = op.getSemanticProperties();

	// Forwarding targets per input field.
	assertTrue(props.getForwardingTargetFields(0, 0).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 1).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 2).size() == 1);
	assertTrue(props.getForwardingTargetFields(0, 2).contains(0));
	assertTrue(props.getForwardingTargetFields(0, 3).size() == 1);
	assertTrue(props.getForwardingTargetFields(0, 3).contains(1));
	assertTrue(props.getForwardingTargetFields(0, 4).size() == 0);
	assertTrue(props.getForwardingTargetFields(0, 5).size() == 1);
	assertTrue(props.getForwardingTargetFields(0, 5).contains(3));
	assertTrue(props.getForwardingTargetFields(0, 6).size() == 0);

	// Forwarding sources per output field.
	assertTrue(props.getForwardingSourceField(0, 0) == 2);
	assertTrue(props.getForwardingSourceField(0, 1) == 3);
	assertTrue(props.getForwardingSourceField(0, 2) < 0);
	assertTrue(props.getForwardingSourceField(0, 3) == 5);
	assertTrue(props.getForwardingSourceField(0, 4) < 0);

	// No read-field information declared for this function.
	assertTrue(props.getReadFields(0) == null);
}
开发者ID:axbaretto,项目名称:flink,代码行数:32,代码来源:GroupReduceOperatorTest.java
示例10: translateSink
import org.apache.flink.api.java.operators.GroupReduceOperator; //导入依赖的package包/类
/**
 * Wires a Cascading sink tap into the Flink plan: configures the tap, picks an
 * output parallelism (DOP), and attaches a TapOutputFormat sink to {@code input}.
 */
private void translateSink(FlowProcess flowProcess, DataSet<Tuple> input, FlowNode node) {
Tap tap = this.getSingle(node.getSinkTaps());
Configuration sinkConfig = this.getNodeConfig(node);
// Let the tap initialize its sink-side configuration before the output format is created.
tap.sinkConfInit(flowProcess, sinkConfig);
// Desired DOP comes from the scheme; <= 0 means "not explicitly set".
int desiredDop = tap.getScheme().getNumSinkParts();
// NOTE(review): unchecked raw cast — presumably every DataSet here is backed by an Operator; confirm.
int inputDop = ((Operator)input).getParallelism();
int dop;
if (inputDop == 1) {
// input operators have dop 1. Probably because they perform a non-keyed reduce or coGroup
dop = 1;
}
else {
if (desiredDop > 0) {
// output dop explicitly set.
if (input instanceof GroupReduceOperator) {
// input is a reduce and we must preserve its sorting.
// we must set the desired dop also for reduce and related operators
adjustDopOfReduceOrCoGroup((GroupReduceOperator) input, desiredDop);
}
dop = desiredDop;
}
else {
// No explicit output DOP: inherit the input's parallelism.
dop = inputDop;
}
}
input
.output(new TapOutputFormat(node))
.name(tap.getIdentifier())
.setParallelism(dop)
.withParameters(FlinkConfigConverter.toFlinkConfig(sinkConfig));
}
开发者ID:dataArtisans,项目名称:cascading-flink,代码行数:37,代码来源:FlinkFlowStep.java
示例11: first
import org.apache.flink.api.java.operators.GroupReduceOperator; //导入依赖的package包/类
/**
 * Returns a new set containing the first n elements in this {@link DataSet}.
 *
 * @param n The desired number of elements.
 * @return A ReduceGroupOperator that represents the DataSet containing the elements.
 * @throws InvalidProgramException if {@code n} is smaller than 1.
 */
public GroupReduceOperator<T, T> first(int n) {
	// n <= 0 is equivalent to n < 1 for ints.
	if (n <= 0) {
		throw new InvalidProgramException("Parameter n of first(n) must be at least 1.");
	}
	return reduceGroup(new FirstReducer<T>(n));
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:13,代码来源:DataSet.java
示例12: translateNode
import org.apache.flink.api.java.operators.GroupReduceOperator; //导入依赖的package包/类
/**
 * Translates a Beam GroupByKey-style transform into Flink batch operators:
 * a partial GroupCombine (pre-aggregation per group) followed by a final GroupReduce,
 * both keyed on the KV key via {@code KvKeySelector}.
 */
@Override
public void translateNode(
PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> transform,
FlinkBatchTranslationContext context) {
// for now, this is copied from the Combine.PerKey translater. Once we have the new runner API
// we can replace GroupByKey by a Combine.PerKey with the Concatenate CombineFn
DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
context.getInputDataSet(context.getInput(transform));
// Concatenate accumulates all values of a group into a List<InputT>.
Combine.CombineFn<InputT, List<InputT>, List<InputT>> combineFn = new Concatenate<>();
KvCoder<K, InputT> inputCoder =
(KvCoder<K, InputT>) context.getInput(transform).getCoder();
Coder<List<InputT>> accumulatorCoder;
try {
accumulatorCoder =
combineFn.getAccumulatorCoder(
context.getInput(transform).getPipeline().getCoderRegistry(),
inputCoder.getValueCoder());
} catch (CannotProvideCoderException e) {
// An accumulator coder is required to serialize the intermediate lists; fail hard if missing.
throw new RuntimeException(e);
}
WindowingStrategy<?, ?> windowingStrategy =
context.getInput(transform).getWindowingStrategy();
// Type info for the intermediate (and, here, also final) KV<K, List<InputT>> values.
TypeInformation<WindowedValue<KV<K, List<InputT>>>> partialReduceTypeInfo =
new CoderTypeInformation<>(
WindowedValue.getFullCoder(
KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
windowingStrategy.getWindowFn().windowCoder()));
Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder()));
@SuppressWarnings("unchecked")
WindowingStrategy<Object, BoundedWindow> boundedStrategy =
(WindowingStrategy<Object, BoundedWindow>) windowingStrategy;
FlinkPartialReduceFunction<K, InputT, List<InputT>, ?> partialReduceFunction =
new FlinkPartialReduceFunction<>(
combineFn, boundedStrategy,
Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
context.getPipelineOptions());
FlinkReduceFunction<K, List<InputT>, List<InputT>, ?> reduceFunction =
new FlinkReduceFunction<>(
combineFn, boundedStrategy,
Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
context.getPipelineOptions());
// Partially GroupReduce the values into the intermediate format AccumT (combine)
GroupCombineOperator<
WindowedValue<KV<K, InputT>>,
WindowedValue<KV<K, List<InputT>>>> groupCombine =
new GroupCombineOperator<>(
inputGrouping,
partialReduceTypeInfo,
partialReduceFunction,
"GroupCombine: " + transform.getName());
// Re-group the partially combined results by the same key for the final reduce.
Grouping<WindowedValue<KV<K, List<InputT>>>> intermediateGrouping =
groupCombine.groupBy(new KvKeySelector<List<InputT>, K>(inputCoder.getKeyCoder()));
// Fully reduce the values and create output format VO
GroupReduceOperator<
WindowedValue<KV<K, List<InputT>>>, WindowedValue<KV<K, List<InputT>>>> outputDataSet =
new GroupReduceOperator<>(
intermediateGrouping, partialReduceTypeInfo, reduceFunction, transform.getName());
context.setOutputDataSet(context.getOutput(transform), outputDataSet);
}
开发者ID:apache,项目名称:beam,代码行数:79,代码来源:FlinkBatchTransformTranslators.java
示例13: runInternal
import org.apache.flink.api.java.operators.GroupReduceOperator; //导入依赖的package包/类
/**
 * Builds the dataflow that computes per-edge similarity scores (Adamic-Adar style,
 * per the surrounding file): each vertex contributes 1/log(degree), two-paths through
 * a common neighbor are generated, and the contributions are summed per vertex pair.
 */
@Override
public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
throws Exception {
// s, d(s), 1/log(d(s))
DataSet<Tuple3<K, LongValue, FloatValue>> inverseLogDegree = input
.run(new VertexDegree<K, VV, EV>()
.setParallelism(parallelism))
.map(new VertexInverseLogDegree<>())
.setParallelism(parallelism)
.name("Vertex score");
// s, t, 1/log(d(s))
DataSet<Tuple3<K, K, FloatValue>> sourceInverseLogDegree = input
.getEdges()
.join(inverseLogDegree, JoinHint.REPARTITION_HASH_SECOND)
.where(0)
.equalTo(0)
.projectFirst(0, 1)
.<Tuple3<K, K, FloatValue>>projectSecond(2)
.setParallelism(parallelism)
.name("Edge score");
// group span, s, t, 1/log(d(s))
DataSet<Tuple4<IntValue, K, K, FloatValue>> groupSpans = sourceInverseLogDegree
.groupBy(0)
.sortGroup(1, Order.ASCENDING)
.reduceGroup(new GenerateGroupSpans<>())
.setParallelism(parallelism)
.name("Generate group spans");
// group, s, t, 1/log(d(s))
DataSet<Tuple4<IntValue, K, K, FloatValue>> groups = groupSpans
.rebalance()
.setParallelism(parallelism)
.name("Rebalance")
.flatMap(new GenerateGroups<>())
.setParallelism(parallelism)
.name("Generate groups");
// t, u, 1/log(d(s)) where (s, t) and (s, u) are edges in graph
DataSet<Tuple3<K, K, FloatValue>> twoPaths = groups
.groupBy(0, 1)
.sortGroup(2, Order.ASCENDING)
.reduceGroup(new GenerateGroupPairs<>())
.name("Generate group pairs");
// t, u, adamic-adar score
GroupReduceOperator<Tuple3<K, K, FloatValue>, Result<K>> scores = twoPaths
.groupBy(0, 1)
.reduceGroup(new ComputeScores<>(minimumScore, minimumRatio))
.name("Compute scores");
if (minimumRatio > 0.0f) {
// total score, number of pairs of neighbors
DataSet<Tuple2<FloatValue, LongValue>> sumOfScoresAndNumberOfNeighborPairs = inverseLogDegree
.map(new ComputeScoreFromVertex<>())
.setParallelism(parallelism)
.name("Average score")
.sum(0)
.andSum(1);
// NOTE(review): return value of withBroadcastSet is discarded — presumably it registers the
// broadcast set on 'scores' in place and returns the same operator; confirm against the Flink API.
scores
.withBroadcastSet(sumOfScoresAndNumberOfNeighborPairs, SUM_OF_SCORES_AND_NUMBER_OF_NEIGHBOR_PAIRS);
}
if (mirrorResults) {
// Emit each (t, u) score also as (u, t).
return scores
.flatMap(new MirrorResult<>())
.name("Mirror results");
} else {
return scores;
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:74,代码来源:AdamicAdar.java
示例14: testAllReduceWithCombiner
import org.apache.flink.api.java.operators.GroupReduceOperator; //导入依赖的package包/类
// Verifies that a combinable non-grouped (all-)reduce compiles into an ALL_GROUP_REDUCE
// with an ALL_GROUP_REDUCE_COMBINE pre-combiner wired between source and reduce,
// and that the combiner keeps the source parallelism while the reduce runs with parallelism 1.
@Test
public void testAllReduceWithCombiner() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8);
DataSet<Long> data = env.generateSequence(1, 8000000).name("source");
GroupReduceOperator<Long, Long> reduced = data.reduceGroup(new CombineReducer2()).name("reducer");
reduced.setCombinable(true);
reduced.output(new DiscardingOutputFormat<Long>()).name("sink");
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
// get the original nodes
SourcePlanNode sourceNode = resolver.getNode("source");
SingleInputPlanNode reduceNode = resolver.getNode("reducer");
SinkPlanNode sinkNode = resolver.getNode("sink");
// get the combiner
SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
// check wiring
assertEquals(sourceNode, combineNode.getInput().getSource());
assertEquals(reduceNode, sinkNode.getInput().getSource());
// check that both reduce and combiner have the same strategy
assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy());
assertEquals(DriverStrategy.ALL_GROUP_REDUCE_COMBINE, combineNode.getDriverStrategy());
// check parallelism
assertEquals(8, sourceNode.getParallelism());
assertEquals(8, combineNode.getParallelism());
assertEquals(1, reduceNode.getParallelism());
assertEquals(1, sinkNode.getParallelism());
}
catch (Exception e) {
// Legacy pattern: surface any compilation exception as a test failure with its stack trace.
System.err.println(e.getMessage());
e.printStackTrace();
fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:47,代码来源:GroupReduceCompilationTest.java
示例15: testGroupedReduceWithFieldPositionKeyCombinable
import org.apache.flink.api.java.operators.GroupReduceOperator; //导入依赖的package包/类
// Verifies that a combinable grouped reduce keyed by field position compiles into
// SORTED_GROUP_REDUCE with a SORTED_GROUP_COMBINE pre-combiner, that both use key
// field 1, and that the combiner keeps the source parallelism (6) while the reduce
// uses the environment default (8).
@Test
public void testGroupedReduceWithFieldPositionKeyCombinable() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8);
DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
.name("source").setParallelism(6);
GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
.groupBy(1)
.reduceGroup(new CombineReducer()).name("reducer");
reduced.setCombinable(true);
reduced.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
// get the original nodes
SourcePlanNode sourceNode = resolver.getNode("source");
SingleInputPlanNode reduceNode = resolver.getNode("reducer");
SinkPlanNode sinkNode = resolver.getNode("sink");
// get the combiner
SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
// check wiring
assertEquals(sourceNode, combineNode.getInput().getSource());
assertEquals(reduceNode, sinkNode.getInput().getSource());
// check that both reduce and combiner have the same strategy
assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
// check the keys
assertEquals(new FieldList(1), reduceNode.getKeys(0));
assertEquals(new FieldList(1), combineNode.getKeys(0));
assertEquals(new FieldList(1), combineNode.getKeys(1));
assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
// check parallelism
assertEquals(6, sourceNode.getParallelism());
assertEquals(6, combineNode.getParallelism());
assertEquals(8, reduceNode.getParallelism());
assertEquals(8, sinkNode.getParallelism());
}
catch (Exception e) {
// Legacy pattern: surface any compilation exception as a test failure with its stack trace.
System.err.println(e.getMessage());
e.printStackTrace();
fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:56,代码来源:GroupReduceCompilationTest.java
示例16: testGroupedReduceWithSelectorFunctionKeyCombinable
import org.apache.flink.api.java.operators.GroupReduceOperator; //导入依赖的package包/类
// Verifies compilation of a combinable grouped reduce keyed by a KeySelector:
// the optimizer must insert a key-extractor before the combine and a key-projector
// before the sink, use SORTED_GROUP_REDUCE / SORTED_GROUP_COMBINE on extracted key
// field 0, and keep source-side parallelism (6) up to the combine.
@Test
public void testGroupedReduceWithSelectorFunctionKeyCombinable() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8);
DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
.name("source").setParallelism(6);
GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
.groupBy(new KeySelector<Tuple2<String,Double>, String>() {
public String getKey(Tuple2<String, Double> value) { return value.f0; }
})
.reduceGroup(new CombineReducer()).name("reducer");
reduced.setCombinable(true);
reduced.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
// get the original nodes
SourcePlanNode sourceNode = resolver.getNode("source");
SingleInputPlanNode reduceNode = resolver.getNode("reducer");
SinkPlanNode sinkNode = resolver.getNode("sink");
// get the combiner
SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
// get the key extractors and projectors
SingleInputPlanNode keyExtractor = (SingleInputPlanNode) combineNode.getInput().getSource();
SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
// check wiring
assertEquals(sourceNode, keyExtractor.getInput().getSource());
assertEquals(keyProjector, sinkNode.getInput().getSource());
// check that both reduce and combiner have the same strategy
assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
// check the keys
assertEquals(new FieldList(0), reduceNode.getKeys(0));
assertEquals(new FieldList(0), combineNode.getKeys(0));
assertEquals(new FieldList(0), combineNode.getKeys(1));
assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
// check parallelism
assertEquals(6, sourceNode.getParallelism());
assertEquals(6, keyExtractor.getParallelism());
assertEquals(6, combineNode.getParallelism());
assertEquals(8, reduceNode.getParallelism());
assertEquals(8, keyProjector.getParallelism());
assertEquals(8, sinkNode.getParallelism());
}
catch (Exception e) {
// Legacy pattern: surface any compilation exception as a test failure with its stack trace.
System.err.println(e.getMessage());
e.printStackTrace();
fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:65,代码来源:GroupReduceCompilationTest.java
示例17: testAllReduceWithCombiner
import org.apache.flink.api.java.operators.GroupReduceOperator; //导入依赖的package包/类
// Older (pre-parallelism-rename) variant of the all-reduce compilation test: uses the
// deprecated degree-of-parallelism API and ALL_GROUP_COMBINE as the combiner strategy.
@Test
public void testAllReduceWithCombiner() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(8);
DataSet<Long> data = env.generateSequence(1, 8000000).name("source");
GroupReduceOperator<Long, Long> reduced = data.reduceGroup(new RichGroupReduceFunction<Long, Long>() {
public void reduce(Iterable<Long> values, Collector<Long> out) {}
}).name("reducer");
reduced.setCombinable(true);
reduced.print().name("sink");
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
// get the original nodes
SourcePlanNode sourceNode = resolver.getNode("source");
SingleInputPlanNode reduceNode = resolver.getNode("reducer");
SinkPlanNode sinkNode = resolver.getNode("sink");
// get the combiner
SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
// check wiring
assertEquals(sourceNode, combineNode.getInput().getSource());
assertEquals(reduceNode, sinkNode.getInput().getSource());
// check that both reduce and combiner have the same strategy
assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy());
assertEquals(DriverStrategy.ALL_GROUP_COMBINE, combineNode.getDriverStrategy());
// check DOP
assertEquals(8, sourceNode.getDegreeOfParallelism());
assertEquals(8, combineNode.getDegreeOfParallelism());
assertEquals(1, reduceNode.getDegreeOfParallelism());
assertEquals(1, sinkNode.getDegreeOfParallelism());
}
catch (Exception e) {
// Legacy pattern: surface any compilation exception as a test failure with its stack trace.
System.err.println(e.getMessage());
e.printStackTrace();
fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
}
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:49,代码来源:GroupReduceCompilationTest.java
示例18: testGroupedReduceWithFieldPositionKeyCombinable
import org.apache.flink.api.java.operators.GroupReduceOperator; //导入依赖的package包/类
// Older (pre-parallelism-rename) variant of the grouped-reduce compilation test:
// field-position key 1, combinable reduce, deprecated degree-of-parallelism API.
@Test
public void testGroupedReduceWithFieldPositionKeyCombinable() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(8);
DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
.name("source").setParallelism(6);
GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
.groupBy(1)
.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
}).name("reducer");
reduced.setCombinable(true);
reduced.print().name("sink");
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
// get the original nodes
SourcePlanNode sourceNode = resolver.getNode("source");
SingleInputPlanNode reduceNode = resolver.getNode("reducer");
SinkPlanNode sinkNode = resolver.getNode("sink");
// get the combiner
SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
// check wiring
assertEquals(sourceNode, combineNode.getInput().getSource());
assertEquals(reduceNode, sinkNode.getInput().getSource());
// check that both reduce and combiner have the same strategy
assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
// check the keys
assertEquals(new FieldList(1), reduceNode.getKeys(0));
assertEquals(new FieldList(1), combineNode.getKeys(0));
assertEquals(new FieldList(1), combineNode.getKeys(1));
assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
// check DOP
assertEquals(6, sourceNode.getDegreeOfParallelism());
assertEquals(6, combineNode.getDegreeOfParallelism());
assertEquals(8, reduceNode.getDegreeOfParallelism());
assertEquals(8, sinkNode.getDegreeOfParallelism());
}
catch (Exception e) {
// Legacy pattern: surface any compilation exception as a test failure with its stack trace.
System.err.println(e.getMessage());
e.printStackTrace();
fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
}
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:58,代码来源:GroupReduceCompilationTest.java
示例19: testGroupedReduceWithSelectorFunctionKeyCombinable
import org.apache.flink.api.java.operators.GroupReduceOperator; //导入依赖的package包/类
@Test
public void testGroupedReduceWithSelectorFunctionKeyCombinable() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(8);
DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
.name("source").setParallelism(6);
GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
.groupBy(new KeySelector<Tuple2<String,Double>, String>() {
public String getKey(Tuple2<String, Double> value) { return value.f0; }
})
.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
}).name("reducer");
reduced.setCombinable(true);
reduced.print().name("sink");
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
// get the original nodes
SourcePlanNode sourceNode = resolver.getNode("source");
SingleInputPlanNode reduceNode = resolver.getNode("reducer");
SinkPlanNode sinkNode = resolver.getNode("sink");
// get the combiner
SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
// get the key extractors and projectors
SingleInputPlanNode keyExtractor = (SingleInputPlanNode) combineNode.getInput().getSource();
SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
// check wiring
assertEquals(sourceNode, keyExtractor.getInput().getSource());
assertEquals(keyProjector, sinkNode.getInput().getSource());
// check that both reduce and combiner have the same strategy
assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
// check the keys
assertEquals(new FieldList(0), reduceNode.getKeys(0));
assertEquals(new FieldList(0), combineNode.getKeys(0));
assertEquals(new FieldList(0), combineNode.getKeys(1));
assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
// check DOP
assertEquals(6, sourceNode.getDegreeOfParallelism());
assertEqual
请发表评论