• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Java CrossFunction类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.apache.flink.api.common.functions.CrossFunction的典型用法代码示例。如果您正苦于以下问题:Java CrossFunction类的具体用法?Java CrossFunction怎么用?Java CrossFunction使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



CrossFunction类属于org.apache.flink.api.common.functions包,在下文中一共展示了CrossFunction类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: translateToDataFlow

import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
@Override
protected CrossOperatorBase<I1, I2, OUT, CrossFunction<I1, I2, OUT>> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {

	String name = getName() != null ? getName() : "Cross at " + defaultName;
	// create operator
	CrossOperatorBase<I1, I2, OUT, CrossFunction<I1, I2, OUT>> po =
			new CrossOperatorBase<I1, I2, OUT, CrossFunction<I1, I2, OUT>>(function,
					new BinaryOperatorInformation<I1, I2, OUT>(getInput1Type(), getInput2Type(), getResultType()),
					name);

	po.setFirstInput(input1);
	po.setSecondInput(input2);
	po.setParallelism(getParallelism());
	po.setCrossHint(hint);

	return po;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:18,代码来源:CrossOperator.java


示例2: getCrossReturnTypes

import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
@PublicEvolving
public static <IN1, IN2, OUT> TypeInformation<OUT> getCrossReturnTypes(CrossFunction<IN1, IN2, OUT> crossInterface,
		TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
{
	return getBinaryOperatorReturnType(
		(Function) crossInterface,
		CrossFunction.class,
		0,
		1,
		2,
		new int[]{0},
		new int[]{1},
		NO_INDEX,
		in1Type,
		in2Type,
		functionName,
		allowMissing);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:19,代码来源:TypeExtractor.java


示例3: executeOnCollections

import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
@Override
protected List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
	CrossFunction<IN1, IN2, OUT> function = this.userFunction.getUserCodeObject();
	
	FunctionUtils.setFunctionRuntimeContext(function, ctx);
	FunctionUtils.openFunction(function, this.parameters);

	ArrayList<OUT> result = new ArrayList<OUT>(inputData1.size() * inputData2.size());
	
	TypeSerializer<IN1> inSerializer1 = getOperatorInfo().getFirstInputType().createSerializer(executionConfig);
	TypeSerializer<IN2> inSerializer2 = getOperatorInfo().getSecondInputType().createSerializer(executionConfig);
	TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);

	for (IN1 element1 : inputData1) {
		for (IN2 element2 : inputData2) {
			IN1 copy1 = inSerializer1.copy(element1);
			IN2 copy2 = inSerializer2.copy(element2);
			OUT o = function.cross(copy1, copy2);
			result.add(outSerializer.copy(o));
		}
	}

	FunctionUtils.closeFunction(function);
	return result;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:26,代码来源:CrossOperatorBase.java


示例4: translateToDataFlow

import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
@Override
protected org.apache.flink.api.common.operators.base.CrossOperatorBase<I1, I2, OUT, CrossFunction<I1,I2,OUT>> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
	
	String name = getName() != null ? getName() : function.getClass().getName();
	// create operator
	CrossOperatorBase<I1, I2, OUT, CrossFunction<I1, I2, OUT>> po =
			new CrossOperatorBase<I1, I2, OUT, CrossFunction<I1, I2, OUT>>(function, new BinaryOperatorInformation<I1, I2, OUT>(getInput1Type(), getInput2Type(), getResultType()), name);
	// set inputs
	po.setFirstInput(input1);
	po.setSecondInput(input2);

	// set dop
	po.setDegreeOfParallelism(this.getParallelism());

	return po;
}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:17,代码来源:CrossOperator.java


示例5: testCrossLambda

import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
@Test
public void testCrossLambda() {
	CrossFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null;

	TypeInformation<?> ti = TypeExtractor.getCrossReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
	if (!(ti instanceof MissingTypeInfo)) {
		Assert.assertTrue(ti.isTupleType());
		Assert.assertEquals(2, ti.getArity());
		Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
		Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:13,代码来源:LambdaExtractionTest.java


示例6: CrossOperator

import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
public CrossOperator(DataSet<I1> input1, DataSet<I2> input2,
						CrossFunction<I1, I2, OUT> function,
						TypeInformation<OUT> returnType,
						CrossHint hint,
						String defaultName) {
	super(input1, input2, returnType);

	this.function = function;
	this.defaultName = defaultName;
	this.hint = hint;

	UdfOperatorUtils.analyzeDualInputUdf(this, CrossFunction.class, defaultName, function, null, null);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:14,代码来源:CrossOperator.java


示例7: testCrossLambda

import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
@Test
public void testCrossLambda() {
	CrossFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null;
	
	TypeInformation<?> ti = TypeExtractor.getCrossReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
	Assert.assertTrue(ti.isTupleType());
	Assert.assertEquals(2, ti.getArity());
	Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
	Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:11,代码来源:LambdaExtractionTest.java


示例8: CrossOperator

import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
public CrossOperator(DataSet<I1> input1, DataSet<I2> input2,
						CrossFunction<I1, I2, OUT> function,
						TypeInformation<OUT> returnType)
{
	super(input1, input2, returnType);

	this.function = function;

	if (!(function instanceof ProjectCrossFunction)) {
		extractSemanticAnnotationsFromUdf(function.getClass());
	} else {
		generateProjectionProperties(((ProjectCrossFunction<?, ?, ?>) function));
	}
}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:15,代码来源:CrossOperator.java


示例9: DefaultCross

import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
public DefaultCross(DataSet<I1> input1, DataSet<I2> input2) {
	super(input1, input2, (CrossFunction<I1, I2, Tuple2<I1, I2>>) new DefaultCrossFunction<I1, I2>(),
			new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()));

	if (input1 == null || input2 == null) {
		throw new NullPointerException();
	}

	this.input1 = input1;
	this.input2 = input2;
}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:12,代码来源:CrossOperator.java


示例10: runStreamedOuterFirst

import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
private void runStreamedOuterFirst() throws Exception {
	if (LOG.isDebugEnabled())  {
		LOG.debug(this.taskContext.formatLogString("Running Cross with Nested-Loops: " +
				"First input is outer side, second input is inner (spilling) side."));
	}
	
	final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0);
	final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1);
	
	final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer();
	final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer();
	
	final SpillingResettableMutableObjectIterator<T2> spillVals = new SpillingResettableMutableObjectIterator<T2>(
			in2, serializer2, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide,
			this.taskContext.getOwningNepheleTask());
	this.spillIter = spillVals;
	
	T1 val1;
	final T1 val1Reuse = serializer1.createInstance();
	T1 val1Copy = serializer1.createInstance();
	T2 val2;
	final T2 val2Reuse = serializer2.createInstance();

	final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
	final Collector<OT> collector = this.taskContext.getOutputCollector();
	
	// for all blocks
	while (this.running && ((val1 = in1.next(val1Reuse)) != null)) {
		// for all values from the spilling side
		while (this.running && ((val2 = spillVals.next(val2Reuse)) != null)) {
			val1Copy = serializer1.copy(val1, val1Copy);
			collector.collect(crosser.cross(val1Copy, val2));
			//crosser.cross(val1Copy, val2, collector);
		}
		spillVals.reset();
	}
}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:38,代码来源:CrossDriver.java


示例11: runStreamedOuterSecond

import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
private void runStreamedOuterSecond() throws Exception {
	if (LOG.isDebugEnabled())  {
		LOG.debug(this.taskContext.formatLogString("Running Cross with Nested-Loops: " +
				"First input is inner (spilling) side, second input is outer side."));
	}
	final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0);
	final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1);
	
	final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer();
	final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer();
	
	final SpillingResettableMutableObjectIterator<T1> spillVals = new SpillingResettableMutableObjectIterator<T1>(
			in1, serializer1, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide,
			this.taskContext.getOwningNepheleTask());
	this.spillIter = spillVals;
	
	T1 val1;
	final T1 val1Reuse = serializer1.createInstance();
	T2 val2;
	final T2 val2Reuse = serializer2.createInstance();
	T2 val2Copy = serializer2.createInstance();
	
	final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
	final Collector<OT> collector = this.taskContext.getOutputCollector();
	
	// for all blocks
	while (this.running && (val2 = in2.next(val2Reuse)) != null) {
		// for all values from the spilling side
		while (this.running && (val1 = spillVals.next(val1Reuse)) != null) {
			val2Copy = serializer2.copy(val2, val2Copy);
			collector.collect(crosser.cross(val1, val2Copy));
			//crosser.cross(val1, val2Copy, collector);
		}
		spillVals.reset();
	}
}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:37,代码来源:CrossDriver.java


示例12: getFunction

import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
@Override
protected CrossFunction<I1, I2, OUT> getFunction() {
	return function;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:5,代码来源:CrossOperator.java


示例13: setup

import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
@Override
public void setup(TaskContext<CrossFunction<T1, T2, OT>, OT> context) {
	this.taskContext = context;
	this.running = true;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:6,代码来源:CrossDriver.java


示例14: getStubType

import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
@Override
public Class<CrossFunction<T1, T2, OT>> getStubType() {
	@SuppressWarnings("unchecked")
	final Class<CrossFunction<T1, T2, OT>> clazz = (Class<CrossFunction<T1, T2, OT>>) (Class<?>) CrossFunction.class;
	return clazz;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:7,代码来源:CrossDriver.java


示例15: getCrossReturnTypes

import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
public static <IN1, IN2, OUT> TypeInformation<OUT> getCrossReturnTypes(CrossFunction<IN1, IN2, OUT> crossInterface,
		TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
	return getBinaryOperatorReturnType((Function) crossInterface, CrossFunction.class, false, false, in1Type, in2Type);
}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:5,代码来源:TypeExtractor.java


示例16: setup

import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
@Override
public void setup(PactTaskContext<CrossFunction<T1, T2, OT>, OT> context) {
	this.taskContext = context;
	this.running = true;
}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:6,代码来源:CrossDriver.java


示例17: runBlockedOuterFirst

import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
private void runBlockedOuterFirst() throws Exception {
	if (LOG.isDebugEnabled())  {
		LOG.debug(this.taskContext.formatLogString("Running Cross with Block-Nested-Loops: " +
				"First input is outer (blocking) side, second input is inner (spilling) side."));
	}
		
	final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0);
	final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1);
	
	final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer();
	final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer();
	
	final BlockResettableMutableObjectIterator<T1> blockVals = 
			new BlockResettableMutableObjectIterator<T1>(this.memManager, in1, serializer1, this.memPagesForBlockSide,
						this.taskContext.getOwningNepheleTask());
	this.blockIter = blockVals;
	
	final SpillingResettableMutableObjectIterator<T2> spillVals = new SpillingResettableMutableObjectIterator<T2>(
			in2, serializer2, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide,
			this.taskContext.getOwningNepheleTask());
	this.spillIter = spillVals;
	
	T1 val1;
	final T1 val1Reuse = serializer1.createInstance();
	T2 val2;
	final T2 val2Reuse = serializer2.createInstance();
	T2 val2Copy = serializer2.createInstance();
	
	final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
	final Collector<OT> collector = this.taskContext.getOutputCollector();
	
	// for all blocks
	do {
		// for all values from the spilling side
		while (this.running && ((val2 = spillVals.next(val2Reuse)) != null)) {
			// for all values in the block
			while ((val1 = blockVals.next(val1Reuse)) != null) {
				val2Copy = serializer2.copy(val2, val2Copy);
				collector.collect(crosser.cross(val1,val2Copy));
				//crosser.cross(val1, val2Copy, collector);
			}
			blockVals.reset();
		}
		spillVals.reset();
	}
	while (this.running && blockVals.nextBlock());
}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:48,代码来源:CrossDriver.java


示例18: runBlockedOuterSecond

import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
private void runBlockedOuterSecond() throws Exception {
	if (LOG.isDebugEnabled())  {
		LOG.debug(this.taskContext.formatLogString("Running Cross with Block-Nested-Loops: " +
				"First input is inner (spilling) side, second input is outer (blocking) side."));
	}
	
	final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0);
	final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1);
	
	final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer();
	final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer();
	
	final SpillingResettableMutableObjectIterator<T1> spillVals = new SpillingResettableMutableObjectIterator<T1>(
			in1, serializer1, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide,
			this.taskContext.getOwningNepheleTask());
	this.spillIter = spillVals;
	
	final BlockResettableMutableObjectIterator<T2> blockVals = 
			new BlockResettableMutableObjectIterator<T2>(this.memManager, in2, serializer2, this.memPagesForBlockSide,
					this.taskContext.getOwningNepheleTask());
	this.blockIter = blockVals;
	
	T1 val1;
	final T1 val1Reuse = serializer1.createInstance();
	T1 val1Copy = serializer1.createInstance();
	T2 val2;
	final T2 val2Reuse = serializer2.createInstance();

	final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
	final Collector<OT> collector = this.taskContext.getOutputCollector();
	
	// for all blocks
	do {
		// for all values from the spilling side
		while (this.running && ((val1 = spillVals.next(val1Reuse)) != null)) {
			// for all values in the block
			while (this.running && ((val2 = blockVals.next(val2Reuse)) != null)) {
				val1Copy = serializer1.copy(val1, val1Copy);
				collector.collect(crosser.cross(val1Copy, val2));
				//crosser.cross(val1Copy, val2, collector);
			}
			blockVals.reset();
		}
		spillVals.reset();
	}
	while (this.running && blockVals.nextBlock());
}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:48,代码来源:CrossDriver.java


示例19: with

import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
/**
 * Finalizes a Cross transformation by applying a {@link CrossFunction} to each pair of crossed elements.
 *
 * <p>Each CrossFunction call returns exactly one element.
 *
 * @param function The CrossFunction that is called for each pair of crossed elements.
 * @return An CrossOperator that represents the crossed result DataSet
 *
 * @see CrossFunction
 * @see DataSet
 */
public <R> CrossOperator<I1, I2, R> with(CrossFunction<I1, I2, R> function) {
	if (function == null) {
		throw new NullPointerException("Cross function must not be null.");
	}
	TypeInformation<R> returnType = TypeExtractor.getCrossReturnTypes(function, getInput1().getType(), getInput2().getType(),
			super.getDefaultName(), true);
	return new CrossOperator<I1, I2, R>(getInput1(), getInput2(), clean(function), returnType,
			getCrossHint(), Utils.getCallLocationName());
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:21,代码来源:CrossOperator.java


示例20: with

import org.apache.flink.api.common.functions.CrossFunction; //导入依赖的package包/类
/**
 * Finalizes a Cross transformation by applying a {@link CrossFunction} to each pair of crossed elements.<br/>
 * Each CrossFunction call returns exactly one element. 
 * 
 * @param function The CrossFunction that is called for each pair of crossed elements.
 * @return An CrossOperator that represents the crossed result DataSet
 * 
 * @see CrossFunction
 * @see DataSet
 */
public <R> CrossOperator<I1, I2, R> with(CrossFunction<I1, I2, R> function) {
	if (function == null) {
		throw new NullPointerException("Cross function must not be null.");
	}
	TypeInformation<R> returnType = TypeExtractor.getCrossReturnTypes(function, input1.getType(), input2.getType());
	return new CrossOperator<I1, I2, R>(input1, input2, function, returnType);
}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:18,代码来源:CrossOperator.java



注:本文中的org.apache.flink.api.common.functions.CrossFunction类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java FlipTable类代码示例发布时间:2022-05-23
下一篇:
Java AnyNameExceptNameClass类代码示例发布时间:2022-05-23
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap