本文整理汇总了Java中org.apache.flink.api.common.functions.RichFilterFunction类的典型用法代码示例。如果您正苦于以下问题:Java RichFilterFunction类的具体用法?Java RichFilterFunction怎么用?Java RichFilterFunction使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
RichFilterFunction类属于org.apache.flink.api.common.functions包,在下文中一共展示了RichFilterFunction类的3个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: testDistributedCacheWithIterations
import org.apache.flink.api.common.functions.RichFilterFunction; //导入依赖的package包/类
@Test
public void testDistributedCacheWithIterations() throws Exception{
File tempFile = new File(testPath);
try (FileWriter writer = new FileWriter(tempFile)) {
writer.write(testString);
}
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.registerCachedFile(resultPath, testName);
IterativeDataSet<Long> solution = env.fromElements(1L).iterate(2);
solution.closeWith(env.generateSequence(1, 2).filter(new RichFilterFunction<Long>() {
@Override
public void open(Configuration parameters) throws Exception{
File file = getRuntimeContext().getDistributedCache().getFile(testName);
BufferedReader reader = new BufferedReader(new FileReader(file));
String output = reader.readLine();
reader.close();
assertEquals(output, testString);
}
@Override
public boolean filter(Long value) throws Exception {
return false;
}
}).withBroadcastSet(solution, "SOLUTION")).output(new DiscardingOutputFormat<Long>());
env.execute();
expected = testString; // this will be a useless verification now.
}
开发者ID:axbaretto,项目名称:flink,代码行数:30,代码来源:AggregatorsITCase.java
示例2: main
import org.apache.flink.api.common.functions.RichFilterFunction; //导入依赖的package包/类
public static void main(final String[] args) throws Exception {
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
String in = args[0];
String out = args[1];
System.err.println("Using input=" + in);
System.err.println("Using output=" + out);
String patterns[] = new String[args.length - 2];
System.arraycopy(args, 2, patterns, 0, args.length - 2);
System.err.println("Using patterns: " + Arrays.toString(patterns));
// get input data
DataSet<StringValue> text = env.createInput(new TextValueInputFormat(new Path(in)));
for (int p = 0; p < patterns.length; p++) {
final String pattern = patterns[p];
DataSet<StringValue> res = text.filter(new RichFilterFunction<StringValue>() {
Pattern p = Pattern.compile(pattern);
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
}
@Override
public boolean filter(StringValue valueIn) throws Exception {
final String value = valueIn.getValue();
if (value == null || value.length() == 0) {
return false;
}
final Matcher m = p.matcher(value);
if (m.find()) {
return true;
}
return false;
}
}).name("grep for " + pattern);
res.writeAsText(out + "_" + pattern, FileSystem.WriteMode.OVERWRITE);
}
// execute program
JobExecutionResult jobResult = env.execute("Flink Grep benchmark");
System.err.println(AccumulatorHelper.getResultsFormated(jobResult.getAllAccumulatorResults()));
}
开发者ID:project-flink,项目名称:flink-perf,代码行数:46,代码来源:GrepJobOptimized.java
示例3: main
import org.apache.flink.api.common.functions.RichFilterFunction; //导入依赖的package包/类
public static void main(final String[] args) throws Exception {
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
String in = args[0];
String out = args[1];
System.err.println("Using input=" + in);
System.err.println("Using output=" + out);
String patterns[] = new String[args.length - 2];
System.arraycopy(args, 2, patterns, 0, args.length - 2);
System.err.println("Using patterns: " + Arrays.toString(patterns));
// get input data
DataSet<String> text = env.readTextFile(args[0]);
for (int p = 0; p < patterns.length; p++) {
final String pattern = patterns[p];
DataSet<String> res = text.filter(new RichFilterFunction<String>() {
private static final long serialVersionUID = 1L;
Pattern p = Pattern.compile(pattern);
LongCounter filterMatches = new LongCounter();
LongCounter filterRecords = new LongCounter();
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
getRuntimeContext().addAccumulator("filterMatchCount-" + pattern, filterMatches);
getRuntimeContext().addAccumulator("filterRecordCount-" + pattern, filterRecords);
}
@Override
public boolean filter(String value) throws Exception {
filterRecords.add(1L);
if (value == null || value.length() == 0) {
return false;
}
final Matcher m = p.matcher(value);
if (m.find()) {
filterMatches.add(1L);
return true;
}
return false;
}
}).name("grep for " + pattern);
res.writeAsText(out + "_" + pattern, FileSystem.WriteMode.OVERWRITE);
}
// execute program
JobExecutionResult jobResult = env.execute("Flink Grep benchmark");
System.err.println(AccumulatorHelper.getResultsFormated(jobResult.getAllAccumulatorResults()));
}
开发者ID:project-flink,项目名称:flink-perf,代码行数:52,代码来源:GrepJob.java
注:本文中的org.apache.flink.api.common.functions.RichFilterFunction类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论