本文整理汇总了Java中org.apache.flink.api.common.functions.CrossFunction类的典型用法代码示例。如果您正苦于以下问题:Java CrossFunction类的具体用法?Java CrossFunction怎么用?Java CrossFunction使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
CrossFunction类属于org.apache.flink.api.common.functions包,在下文中一共展示了CrossFunction类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: translateToDataFlow
import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
@Override
protected CrossOperatorBase<I1, I2, OUT, CrossFunction<I1, I2, OUT>> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
String name = getName() != null ? getName() : "Cross at " + defaultName;
// create operator
CrossOperatorBase<I1, I2, OUT, CrossFunction<I1, I2, OUT>> po =
new CrossOperatorBase<I1, I2, OUT, CrossFunction<I1, I2, OUT>>(function,
new BinaryOperatorInformation<I1, I2, OUT>(getInput1Type(), getInput2Type(), getResultType()),
name);
po.setFirstInput(input1);
po.setSecondInput(input2);
po.setParallelism(getParallelism());
po.setCrossHint(hint);
return po;
}
开发者ID:axbaretto,项目名称:flink,代码行数:18,代码来源:CrossOperator.java
示例2: getCrossReturnTypes
import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
@PublicEvolving
public static <IN1, IN2, OUT> TypeInformation<OUT> getCrossReturnTypes(CrossFunction<IN1, IN2, OUT> crossInterface,
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
{
return getBinaryOperatorReturnType(
(Function) crossInterface,
CrossFunction.class,
0,
1,
2,
new int[]{0},
new int[]{1},
NO_INDEX,
in1Type,
in2Type,
functionName,
allowMissing);
}
开发者ID:axbaretto,项目名称:flink,代码行数:19,代码来源:TypeExtractor.java
示例3: executeOnCollections
import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
@Override
protected List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
CrossFunction<IN1, IN2, OUT> function = this.userFunction.getUserCodeObject();
FunctionUtils.setFunctionRuntimeContext(function, ctx);
FunctionUtils.openFunction(function, this.parameters);
ArrayList<OUT> result = new ArrayList<OUT>(inputData1.size() * inputData2.size());
TypeSerializer<IN1> inSerializer1 = getOperatorInfo().getFirstInputType().createSerializer(executionConfig);
TypeSerializer<IN2> inSerializer2 = getOperatorInfo().getSecondInputType().createSerializer(executionConfig);
TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
for (IN1 element1 : inputData1) {
for (IN2 element2 : inputData2) {
IN1 copy1 = inSerializer1.copy(element1);
IN2 copy2 = inSerializer2.copy(element2);
OUT o = function.cross(copy1, copy2);
result.add(outSerializer.copy(o));
}
}
FunctionUtils.closeFunction(function);
return result;
}
开发者ID:axbaretto,项目名称:flink,代码行数:26,代码来源:CrossOperatorBase.java
示例4: translateToDataFlow
import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
@Override
protected org.apache.flink.api.common.operators.base.CrossOperatorBase<I1, I2, OUT, CrossFunction<I1,I2,OUT>> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
String name = getName() != null ? getName() : function.getClass().getName();
// create operator
CrossOperatorBase<I1, I2, OUT, CrossFunction<I1, I2, OUT>> po =
new CrossOperatorBase<I1, I2, OUT, CrossFunction<I1, I2, OUT>>(function, new BinaryOperatorInformation<I1, I2, OUT>(getInput1Type(), getInput2Type(), getResultType()), name);
// set inputs
po.setFirstInput(input1);
po.setSecondInput(input2);
// set dop
po.setDegreeOfParallelism(this.getParallelism());
return po;
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:17,代码来源:CrossOperator.java
示例5: testCrossLambda
import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
@Test
public void testCrossLambda() {
CrossFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null;
TypeInformation<?> ti = TypeExtractor.getCrossReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
if (!(ti instanceof MissingTypeInfo)) {
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(2, ti.getArity());
Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:13,代码来源:LambdaExtractionTest.java
示例6: CrossOperator
import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
public CrossOperator(DataSet<I1> input1, DataSet<I2> input2,
CrossFunction<I1, I2, OUT> function,
TypeInformation<OUT> returnType,
CrossHint hint,
String defaultName) {
super(input1, input2, returnType);
this.function = function;
this.defaultName = defaultName;
this.hint = hint;
UdfOperatorUtils.analyzeDualInputUdf(this, CrossFunction.class, defaultName, function, null, null);
}
开发者ID:axbaretto,项目名称:flink,代码行数:14,代码来源:CrossOperator.java
示例7: testCrossLambda
import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
@Test
public void testCrossLambda() {
CrossFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null;
TypeInformation<?> ti = TypeExtractor.getCrossReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(2, ti.getArity());
Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:11,代码来源:LambdaExtractionTest.java
示例8: CrossOperator
import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
public CrossOperator(DataSet<I1> input1, DataSet<I2> input2,
CrossFunction<I1, I2, OUT> function,
TypeInformation<OUT> returnType)
{
super(input1, input2, returnType);
this.function = function;
if (!(function instanceof ProjectCrossFunction)) {
extractSemanticAnnotationsFromUdf(function.getClass());
} else {
generateProjectionProperties(((ProjectCrossFunction<?, ?, ?>) function));
}
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:15,代码来源:CrossOperator.java
示例9: DefaultCross
import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
public DefaultCross(DataSet<I1> input1, DataSet<I2> input2) {
super(input1, input2, (CrossFunction<I1, I2, Tuple2<I1, I2>>) new DefaultCrossFunction<I1, I2>(),
new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()));
if (input1 == null || input2 == null) {
throw new NullPointerException();
}
this.input1 = input1;
this.input2 = input2;
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:12,代码来源:CrossOperator.java
示例10: runStreamedOuterFirst
import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
private void runStreamedOuterFirst() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(this.taskContext.formatLogString("Running Cross with Nested-Loops: " +
"First input is outer side, second input is inner (spilling) side."));
}
final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0);
final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1);
final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer();
final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer();
final SpillingResettableMutableObjectIterator<T2> spillVals = new SpillingResettableMutableObjectIterator<T2>(
in2, serializer2, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide,
this.taskContext.getOwningNepheleTask());
this.spillIter = spillVals;
T1 val1;
final T1 val1Reuse = serializer1.createInstance();
T1 val1Copy = serializer1.createInstance();
T2 val2;
final T2 val2Reuse = serializer2.createInstance();
final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
final Collector<OT> collector = this.taskContext.getOutputCollector();
// for all blocks
while (this.running && ((val1 = in1.next(val1Reuse)) != null)) {
// for all values from the spilling side
while (this.running && ((val2 = spillVals.next(val2Reuse)) != null)) {
val1Copy = serializer1.copy(val1, val1Copy);
collector.collect(crosser.cross(val1Copy, val2));
//crosser.cross(val1Copy, val2, collector);
}
spillVals.reset();
}
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:38,代码来源:CrossDriver.java
示例11: runStreamedOuterSecond
import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
private void runStreamedOuterSecond() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(this.taskContext.formatLogString("Running Cross with Nested-Loops: " +
"First input is inner (spilling) side, second input is outer side."));
}
final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0);
final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1);
final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer();
final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer();
final SpillingResettableMutableObjectIterator<T1> spillVals = new SpillingResettableMutableObjectIterator<T1>(
in1, serializer1, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide,
this.taskContext.getOwningNepheleTask());
this.spillIter = spillVals;
T1 val1;
final T1 val1Reuse = serializer1.createInstance();
T2 val2;
final T2 val2Reuse = serializer2.createInstance();
T2 val2Copy = serializer2.createInstance();
final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
final Collector<OT> collector = this.taskContext.getOutputCollector();
// for all blocks
while (this.running && (val2 = in2.next(val2Reuse)) != null) {
// for all values from the spilling side
while (this.running && (val1 = spillVals.next(val1Reuse)) != null) {
val2Copy = serializer2.copy(val2, val2Copy);
collector.collect(crosser.cross(val1, val2Copy));
//crosser.cross(val1, val2Copy, collector);
}
spillVals.reset();
}
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:37,代码来源:CrossDriver.java
示例12: getFunction
import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
@Override
protected CrossFunction<I1, I2, OUT> getFunction() {
return function;
}
开发者ID:axbaretto,项目名称:flink,代码行数:5,代码来源:CrossOperator.java
示例13: setup
import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
@Override
public void setup(TaskContext<CrossFunction<T1, T2, OT>, OT> context) {
this.taskContext = context;
this.running = true;
}
开发者ID:axbaretto,项目名称:flink,代码行数:6,代码来源:CrossDriver.java
示例14: getStubType
import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
@Override
public Class<CrossFunction<T1, T2, OT>> getStubType() {
@SuppressWarnings("unchecked")
final Class<CrossFunction<T1, T2, OT>> clazz = (Class<CrossFunction<T1, T2, OT>>) (Class<?>) CrossFunction.class;
return clazz;
}
开发者ID:axbaretto,项目名称:flink,代码行数:7,代码来源:CrossDriver.java
示例15: getCrossReturnTypes
import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
public static <IN1, IN2, OUT> TypeInformation<OUT> getCrossReturnTypes(CrossFunction<IN1, IN2, OUT> crossInterface,
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
return getBinaryOperatorReturnType((Function) crossInterface, CrossFunction.class, false, false, in1Type, in2Type);
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:5,代码来源:TypeExtractor.java
示例16: setup
import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
@Override
public void setup(PactTaskContext<CrossFunction<T1, T2, OT>, OT> context) {
this.taskContext = context;
this.running = true;
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:6,代码来源:CrossDriver.java
示例17: runBlockedOuterFirst
import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
private void runBlockedOuterFirst() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(this.taskContext.formatLogString("Running Cross with Block-Nested-Loops: " +
"First input is outer (blocking) side, second input is inner (spilling) side."));
}
final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0);
final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1);
final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer();
final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer();
final BlockResettableMutableObjectIterator<T1> blockVals =
new BlockResettableMutableObjectIterator<T1>(this.memManager, in1, serializer1, this.memPagesForBlockSide,
this.taskContext.getOwningNepheleTask());
this.blockIter = blockVals;
final SpillingResettableMutableObjectIterator<T2> spillVals = new SpillingResettableMutableObjectIterator<T2>(
in2, serializer2, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide,
this.taskContext.getOwningNepheleTask());
this.spillIter = spillVals;
T1 val1;
final T1 val1Reuse = serializer1.createInstance();
T2 val2;
final T2 val2Reuse = serializer2.createInstance();
T2 val2Copy = serializer2.createInstance();
final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
final Collector<OT> collector = this.taskContext.getOutputCollector();
// for all blocks
do {
// for all values from the spilling side
while (this.running && ((val2 = spillVals.next(val2Reuse)) != null)) {
// for all values in the block
while ((val1 = blockVals.next(val1Reuse)) != null) {
val2Copy = serializer2.copy(val2, val2Copy);
collector.collect(crosser.cross(val1,val2Copy));
//crosser.cross(val1, val2Copy, collector);
}
blockVals.reset();
}
spillVals.reset();
}
while (this.running && blockVals.nextBlock());
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:48,代码来源:CrossDriver.java
示例18: runBlockedOuterSecond
import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
private void runBlockedOuterSecond() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(this.taskContext.formatLogString("Running Cross with Block-Nested-Loops: " +
"First input is inner (spilling) side, second input is outer (blocking) side."));
}
final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0);
final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1);
final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer();
final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer();
final SpillingResettableMutableObjectIterator<T1> spillVals = new SpillingResettableMutableObjectIterator<T1>(
in1, serializer1, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide,
this.taskContext.getOwningNepheleTask());
this.spillIter = spillVals;
final BlockResettableMutableObjectIterator<T2> blockVals =
new BlockResettableMutableObjectIterator<T2>(this.memManager, in2, serializer2, this.memPagesForBlockSide,
this.taskContext.getOwningNepheleTask());
this.blockIter = blockVals;
T1 val1;
final T1 val1Reuse = serializer1.createInstance();
T1 val1Copy = serializer1.createInstance();
T2 val2;
final T2 val2Reuse = serializer2.createInstance();
final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
final Collector<OT> collector = this.taskContext.getOutputCollector();
// for all blocks
do {
// for all values from the spilling side
while (this.running && ((val1 = spillVals.next(val1Reuse)) != null)) {
// for all values in the block
while (this.running && ((val2 = blockVals.next(val2Reuse)) != null)) {
val1Copy = serializer1.copy(val1, val1Copy);
collector.collect(crosser.cross(val1Copy, val2));
//crosser.cross(val1Copy, val2, collector);
}
blockVals.reset();
}
spillVals.reset();
}
while (this.running && blockVals.nextBlock());
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:48,代码来源:CrossDriver.java
示例19: with
import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
/**
* Finalizes a Cross transformation by applying a {@link CrossFunction} to each pair of crossed elements.
*
* <p>Each CrossFunction call returns exactly one element.
*
* @param function The CrossFunction that is called for each pair of crossed elements.
* @return An CrossOperator that represents the crossed result DataSet
*
* @see CrossFunction
* @see DataSet
*/
public <R> CrossOperator<I1, I2, R> with(CrossFunction<I1, I2, R> function) {
if (function == null) {
throw new NullPointerException("Cross function must not be null.");
}
TypeInformation<R> returnType = TypeExtractor.getCrossReturnTypes(function, getInput1().getType(), getInput2().getType(),
super.getDefaultName(), true);
return new CrossOperator<I1, I2, R>(getInput1(), getInput2(), clean(function), returnType,
getCrossHint(), Utils.getCallLocationName());
}
开发者ID:axbaretto,项目名称:flink,代码行数:21,代码来源:CrossOperator.java
示例20: with
import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
/**
* Finalizes a Cross transformation by applying a {@link CrossFunction} to each pair of crossed elements.<br/>
* Each CrossFunction call returns exactly one element.
*
* @param function The CrossFunction that is called for each pair of crossed elements.
* @return An CrossOperator that represents the crossed result DataSet
*
* @see CrossFunction
* @see DataSet
*/
public <R> CrossOperator<I1, I2, R> with(CrossFunction<I1, I2, R> function) {
if (function == null) {
throw new NullPointerException("Cross function must not be null.");
}
TypeInformation<R> returnType = TypeExtractor.getCrossReturnTypes(function, input1.getType(), input2.getType());
return new CrossOperator<I1, I2, R>(input1, input2, function, returnType);
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:18,代码来源:CrossOperator.java
注:本文中的org.apache.flink.api.common.functions.CrossFunction类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论