
Java FlatMapOperator Class Code Examples


This article collects typical usage examples of org.apache.flink.api.java.operators.FlatMapOperator in Java. If you are wondering how the FlatMapOperator class is used in practice, or are looking for working examples of it, the selected code examples below may help.



The FlatMapOperator class belongs to the org.apache.flink.api.java.operators package. Eleven code examples of the class are shown below, sorted by popularity by default.

Example 1: pruneOutput

import org.apache.flink.api.java.operators.FlatMapOperator; // import the required package/class
private <T> void pruneOutput(
    DataSet<WindowedValue<RawUnionValue>> taggedDataSet,
    FlinkBatchTranslationContext context,
    int integerTag,
    PCollection<T> collection) {
  TypeInformation<WindowedValue<T>> outputType = context.getTypeInfo(collection);

  FlinkMultiOutputPruningFunction<T> pruningFunction =
      new FlinkMultiOutputPruningFunction<>(integerTag);

  FlatMapOperator<WindowedValue<RawUnionValue>, WindowedValue<T>> pruningOperator =
      new FlatMapOperator<>(
          taggedDataSet,
          outputType,
          pruningFunction,
          collection.getName());

  context.setOutputDataSet(collection, pruningOperator);
}
 
Developer ID: apache, Project: beam, Lines of code: 20, Source: FlinkBatchTransformTranslators.java


Example 2: doOperation

import org.apache.flink.api.java.operators.FlatMapOperator; // import the required package/class
@Override
public Object doOperation(final AddElementsFromFile op, final Context context, final Store store) throws OperationException {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    if (null != op.getParallelism()) {
        env.setParallelism(op.getParallelism());
    }

    final FlatMapOperator<String, Element> builder =
            env.readTextFile(op.getFilename())
                    .flatMap(new GafferMapFunction(op.getElementGenerator()));

    if (Boolean.parseBoolean(op.getOption(FlinkConstants.SKIP_REBALANCING))) {
        builder.output(new GafferOutput(op, store));
    } else {
        builder.rebalance().output(new GafferOutput(op, store));
    }

    try {
        env.execute(op.getClass().getSimpleName() + "-" + op.getFilename());
    } catch (final Exception e) {
        throw new OperationException("Failed to add elements from file: " + op.getFilename(), e);
    }

    return null;
}
 
Developer ID: gchq, Project: Gaffer, Lines of code: 26, Source: AddElementsFromFileHandler.java


Example 3: verifySamplerWithFraction

import org.apache.flink.api.java.operators.FlatMapOperator; // import the required package/class
private void verifySamplerWithFraction(boolean withReplacement, double fraction, long seed) throws Exception {
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	FlatMapOperator<Tuple3<Integer, Long, String>, String> ds = getSourceDataSet(env);
	MapPartitionOperator<String, String> sampled = DataSetUtils.sample(ds, withReplacement, fraction, seed);
	List<String> result = sampled.collect();
	containsResultAsText(result, getSourceStrings());
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 8, Source: SampleITCase.java


Example 4: verifySamplerWithFixedSize

import org.apache.flink.api.java.operators.FlatMapOperator; // import the required package/class
private void verifySamplerWithFixedSize(boolean withReplacement, int numSamples, long seed) throws Exception {
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	FlatMapOperator<Tuple3<Integer, Long, String>, String> ds = getSourceDataSet(env);
	DataSet<String> sampled = DataSetUtils.sampleWithSize(ds, withReplacement, numSamples, seed);
	List<String> result = sampled.collect();
	assertEquals(numSamples, result.size());
	containsResultAsText(result, getSourceStrings());
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 9, Source: SampleITCase.java


Example 5: getSourceDataSet

import org.apache.flink.api.java.operators.FlatMapOperator; // import the required package/class
private FlatMapOperator<Tuple3<Integer, Long, String>, String> getSourceDataSet(ExecutionEnvironment env) {
	return CollectionDataSets.get3TupleDataSet(env).flatMap(
		new FlatMapFunction<Tuple3<Integer, Long, String>, String>() {
			@Override
			public void flatMap(Tuple3<Integer, Long, String> value, Collector<String> out) throws Exception {
				out.collect(value.f2);
			}
		});
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 10, Source: SampleITCase.java


Example 6: testProgram

import org.apache.flink.api.java.operators.FlatMapOperator; // import the required package/class
@Override
protected void testProgram() throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	env.setParallelism(1);

	// Sc1 generates M parameters a,b,c for second degree polynomials P(x) = ax^2 + bx + c identified by id
	DataSet<Tuple4<String, Integer, Integer, Integer>> sc1 = env
			.fromElements(new Tuple4<>("1", 61, 6, 29), new Tuple4<>("2", 7, 13, 10), new Tuple4<>("3", 8, 13, 27));

	// Sc2 generates N x values to be evaluated with the polynomial identified by id
	DataSet<Tuple2<String, Integer>> sc2 = env
			.fromElements(new Tuple2<>("1", 5), new Tuple2<>("2", 3), new Tuple2<>("3", 6));

	// Sc3 generates N y values to be evaluated with the polynomial identified by id
	DataSet<Tuple2<String, Integer>> sc3 = env
			.fromElements(new Tuple2<>("1", 2), new Tuple2<>("2", 3), new Tuple2<>("3", 7));

	// Jn1 matches x and y values on id and emits (id, x, y) triples
	JoinOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> jn1 =
			sc2.join(sc3).where(0).equalTo(0).with(new Jn1());

	// Jn2 matches polynomial and arguments by id, computes p = min(P(x),P(y)) and emits (id, p) tuples
	JoinOperator<Tuple3<String, Integer, Integer>, Tuple4<String, Integer, Integer, Integer>, Tuple2<String, Integer>> jn2 =
			jn1.join(sc1).where(0).equalTo(0).with(new Jn2());

	// Mp1 selects (id, x, y) triples where x = y and broadcasts z (=x=y) to Mp2
	FlatMapOperator<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>> mp1 =
			jn1.flatMap(new Mp1());

	// Mp2 filters out all p values which can be divided by z
	List<Tuple2<String, Integer>> result = jn2.flatMap(new Mp2()).withBroadcastSet(mp1, "z").collect();

	JavaProgramTestBase.compareResultAsText(result, RESULT);
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 35, Source: BroadcastBranchingITCase.java
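
The arithmetic described in the comments of Example 6 can be checked without Flink. The following is a minimal plain-Java sketch, not the test's actual Jn2 or Mp2 classes (which are not shown above). It assumes that each Tuple4 in sc1 is ordered (id, a, b, c) and that "filters out" means a p value is dropped when a broadcast z divides it; the class and method names are hypothetical.

```java
import java.util.List;

/** Plain-Java sketch of the arithmetic in Example 6; hypothetical names, no Flink dependency. */
final class BroadcastBranchingSketch {

    // Evaluates P(x) = a*x^2 + b*x + c.
    static int poly(int a, int b, int c, int x) {
        return a * x * x + b * x + c;
    }

    // Computes p = min(P(x), P(y)), as described for Jn2.
    static int minOfPoly(int a, int b, int c, int x, int y) {
        return Math.min(poly(a, b, c, x), poly(a, b, c, y));
    }

    // Assumed reading of Mp2: p is kept only if no broadcast value z divides it.
    static boolean survives(int p, List<Integer> zs) {
        for (int z : zs) {
            if (p % z == 0) {
                return false;
            }
        }
        return true;
    }
}
```

With the inputs from the example, the only pair with x = y is id "2" (x = y = 3), so the broadcast set would contain z = 3. The p values come out as 285, 112, and 393 for ids "1", "2", and "3"; under the assumed reading of the filter, only 112 is not divisible by 3.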


Example 7: start

import org.apache.flink.api.java.operators.FlatMapOperator; // import the required package/class
public static void start( MachineLearningDefinienListConfig config ){
    LOG.info("Start machine learning approach for listing identifier-definien pairs");
    // first, create a flink environment
    ExecutionEnvironment flinkEnv = ExecutionEnvironment.getExecutionEnvironment();
    flinkEnv.setParallelism( config.getParallelism() );

    LOG.debug("Read wikidump via flink");
    DataSource<String> dataSource = FlinkMlpRelationFinder.readWikiDump( config, flinkEnv );

    LOG.debug("Parse documents via flink");
    FlatMapOperator<String, RawWikiDocument> mapOperator = dataSource.flatMap(new TextExtractorMapper());

    LOG.debug("Open text annotator mapper");
    TextAnnotatorMapper annotatorMapper = new TextAnnotatorMapper(config);
    // ML approach doesn't create PosTagger here ... strange, so I will use it now.
    annotatorMapper.open(null);
    DataSet<ParsedWikiDocument> parsedDocuments = mapOperator.map( annotatorMapper );

    LOG.debug("Create feature Extractor without Gouldi");
    CreateCandidatesMapper candidatesMapper = new CreateCandidatesMapper(config);
    DataSet<WikiDocumentOutput> outputDataSet = parsedDocuments.map( candidatesMapper );

    LOG.debug("Map to output format.");
    RelationMapper outputMapper = new RelationMapper();
    DataSet<LinkedList<String[]>> outputs = outputDataSet.map(outputMapper);

    Path outputPath = Paths.get(config.getOutputDir(), OUTPUT_FILE_NAME);
    LOG.info("Write output file " + outputPath.toString() );
    outputs.writeAsFormattedText(
        outputPath.toString(),
        FileSystem.WriteMode.OVERWRITE,
        new OutputFormatter()
    ).setParallelism(1);

    try {
        flinkEnv.execute();
    } catch (Exception e) {
        LOG.error("Error due execution of flink process.", e);
    }
}
 
Developer ID: ag-gipp, Project: mathosphere, Lines of code: 41, Source: MachineLearningRelationExtractor.java


Example 8: aggregateSnippets

import org.apache.flink.api.java.operators.FlatMapOperator; // import the required package/class
private static DataSet<ExtractedMathPDDocument> aggregateSnippets(FlatMapOperator<String, ExtractedMathPDDocument> extractedMathPdSnippets) {
    if (NUMBER_OF_PARTITIONS >= 0) {
        return aggregateSnippetsToPartitions(extractedMathPdSnippets);
    } else if (NUMBER_OF_PARTITIONS == -1) {
        return aggregateSnippetsToSingleDocs(extractedMathPdSnippets);
    } else {
        throw new RuntimeException("illegal state: NUMBER_OF_PARTITIONS");
    }
}
 
Developer ID: ag-gipp, Project: mathosphere, Lines of code: 10, Source: FlinkPd.java


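Example 9: aggregateSnippetsToSingleDocs

import org.apache.flink.api.java.operators.FlatMapOperator; // import the required package/class
/**
 * Takes math PD snippets and converts them to single documents by merging all snippets that belong to the same document.
 *
 * @param extractedMathPdSnippets the extracted snippets; one document may be represented by several snippets
 * @return a DataSet in which the snippets of each group (as keyed by SelectTitle) are merged into one document
 */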
private static DataSet<ExtractedMathPDDocument> aggregateSnippetsToSingleDocs(FlatMapOperator<String, ExtractedMathPDDocument> extractedMathPdSnippets) {
    DataSet<ExtractedMathPDDocument> ds = extractedMathPdSnippets;
    ReduceOperator<ExtractedMathPDDocument> extractedMathPdDocuments = ds
            .groupBy(new SelectTitle())
            .reduce((ReduceFunction<ExtractedMathPDDocument>) (t0, t1) -> {
                t1.mergeOtherIntoThis(t0);
                t1.setText("removed");
                //LOGGER.info("merged {} into {}", new Object[]{t1.f0, t0.f0});
                return t1;
            });

    return extractedMathPdDocuments;
}
 
Developer ID: ag-gipp, Project: mathosphere, Lines of code: 20, Source: FlinkPd.java
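
The group-then-reduce pattern in Example 9 can be illustrated in plain Java. This is a rough sketch of the idea only, not of Flink's execution model: it uses `Map.merge` from the standard library, and `groupAndReduce` and its parameters are hypothetical names introduced for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

/** Plain-Java sketch of "group by key, then reduce each group"; hypothetical names. */
final class GroupAndReduceSketch {

    // Groups items by the extracted key and folds each group into one value.
    // For an existing key, Map.merge calls reducer(previousValue, newItem).
    static <K, V> Map<K, V> groupAndReduce(
            List<V> items, Function<V, K> keyOf, BinaryOperator<V> reducer) {
        Map<K, V> merged = new LinkedHashMap<>();
        for (V item : items) {
            merged.merge(keyOf.apply(item), item, reducer);
        }
        return merged;
    }
}
```

For instance, grouping the strings "a1", "b1", "a2" by their first character and joining each group with "+" would give "a1+a2" for key "a" and "b1" for key "b". The sketch processes items in list order, whereas a distributed reduce may combine elements in any order, so a reduce function is typically expected to be associative.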


Example 10: flatMap

import org.apache.flink.api.java.operators.FlatMapOperator; // import the required package/class
/**
 * Applies a FlatMap transformation on a {@link DataSet}.
 *
 * <p>The transformation calls a {@link org.apache.flink.api.common.functions.RichFlatMapFunction} for each element of the DataSet.
 * Each FlatMapFunction call can return any number of elements including none.
 *
 * @param flatMapper The FlatMapFunction that is called for each element of the DataSet.
 * @return A FlatMapOperator that represents the transformed DataSet.
 *
 * @see org.apache.flink.api.common.functions.RichFlatMapFunction
 * @see FlatMapOperator
 * @see DataSet
 */
public <R> FlatMapOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) {
	if (flatMapper == null) {
		throw new NullPointerException("FlatMap function must not be null.");
	}

	String callLocation = Utils.getCallLocationName();
	TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, getType(), callLocation, true);
	return new FlatMapOperator<>(this, resultType, clean(flatMapper), callLocation);
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 23, Source: DataSet.java
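
The Javadoc above says that each FlatMapFunction call can return any number of elements, including none. The following plain-Java sketch models that contract without a Flink dependency. It is not Flink code: `java.util.function.Consumer` stands in for Flink's Collector, and `FlatMapSketch`, `flatMap`, and `splitWords` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Plain-Java model of flat-map semantics; hypothetical names, no Flink dependency. */
final class FlatMapSketch {

    // Calls fn once per input element. fn may emit zero, one, or many results
    // through the Consumer it receives (the role a Collector plays in Flink).
    static <T, R> List<R> flatMap(List<T> input, BiConsumer<T, Consumer<R>> fn) {
        List<R> out = new ArrayList<>();
        for (T value : input) {
            fn.accept(value, out::add);
        }
        return out;
    }

    // Example function: emits one element per word and nothing for a blank line.
    static void splitWords(String line, Consumer<String> out) {
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.accept(word);
            }
        }
    }
}
```

Calling `FlatMapSketch.flatMap(List.of("a b", "  ", "c"), FlatMapSketch::splitWords)` yields the list `[a, b, c]`: the first line contributes two elements, the blank line none, and the last line one.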


Example 11: flatMap

import org.apache.flink.api.java.operators.FlatMapOperator; // import the required package/class
/**
 * Applies a FlatMap transformation on a {@link DataSet}.<br/>
 * The transformation calls a {@link org.apache.flink.api.common.functions.RichFlatMapFunction} for each element of the DataSet.
 * Each FlatMapFunction call can return any number of elements including none.
 * 
 * @param flatMapper The FlatMapFunction that is called for each element of the DataSet. 
 * @return A FlatMapOperator that represents the transformed DataSet.
 * 
 * @see org.apache.flink.api.common.functions.RichFlatMapFunction
 * @see FlatMapOperator
 * @see DataSet
 */
public <R> FlatMapOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) {
	if (flatMapper == null) {
		throw new NullPointerException("FlatMap function must not be null.");
	}

	TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, this.getType());
	return new FlatMapOperator<T, R>(this, resultType, flatMapper);
}
 
Developer ID: citlab, Project: vs.msc.ws14, Lines of code: 21, Source: DataSet.java



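Note: The org.apache.flink.api.java.operators.FlatMapOperator examples in this article were collected from GitHub, MSDocs, and other source-code and documentation platforms. The snippets were selected from open-source projects, and copyright remains with the original authors. Refer to each project's License before distributing or using the code. Do not reproduce without permission.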

