本文整理汇总了Java中org.apache.flink.api.java.tuple.Tuple4类的典型用法代码示例。如果您正苦于以下问题:Java Tuple4类的具体用法?Java Tuple4怎么用?Java Tuple4使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
Tuple4类属于org.apache.flink.api.java.tuple包,在下文中一共展示了Tuple4类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: transformation
import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
/**
* Data transformation.
* The method group by trackId, sum the number of occurrences, sort the output
* and get the top elements defined by the user.
* @param input
* @return
*/
@Override
public DataSet<ChartsResult> transformation(DataSet<?> input) {
final int limit= pipelineConf.getArgs().getLimit();
log.info("Transformation Phase. Computing the tags");
SortPartitionOperator<Tuple4<Long, Integer, String, TagEvent>> grouped = (SortPartitionOperator<Tuple4<Long, Integer, String, TagEvent>>) input
.groupBy(2, 0) // Grouping by state & trackId
.sum(1) // Sum the occurrences of each grouped item
.sortPartition(2, Order.ASCENDING).setParallelism(1) // Sort by state
.sortPartition(1, Order.DESCENDING).setParallelism(1);// Sort by count
return grouped.reduceGroup(new ReduceLimit(limit, 2)); // Reducing groups applying the limit specified by user
}
开发者ID:aaitor,项目名称:flink-charts,代码行数:20,代码来源:StateChartsPipeline.java
示例2: reduce
import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
@Override
public void reduce(Iterable<Tuple4<Long, Integer, String, TagEvent>> values, Collector<ChartsResult> out) throws Exception {
int counter = 0;
String group= "";
log.info("Reducing Groups and applyting limit(" + limit + ") by field " + groupPosition);
for (Tuple4<Long, Integer, String, TagEvent> t : values) {
if (!t.getField(groupPosition).equals(group)) {
counter= 0;
group= t.f2;
}
if (counter < limit) {
out.collect(new ChartsResult(t.f0, t.f1, t.f3));
}
counter++;
}
}
开发者ID:aaitor,项目名称:flink-charts,代码行数:18,代码来源:StateChartsPipeline.java
示例3: cleansingTest
import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
/**
* Test to validate the cleansing method.
* We generate a DataSet with 10 TagEvents and modify 3 items to force bad data
* The assertion checks that only are obtained the proper number of items after the
* cleansing process.
* @throws Exception
*/
@Test
public void cleansingTest() throws Exception {
String args[]= {"-c", "state_chart", "-l", "3"};
argsParser= ArgsParser.builder(args);
PipelineChartsConf pipelineConf= new PipelineChartsConf(config, argsParser);
StateChartsPipeline pipeline= new StateChartsPipeline(pipelineConf);
List<TagEvent> mockCollection= TagEventUtils.getMockData(10);
mockCollection.set(0, new TagEvent(0l, "xxx", "yy","ZZ", "Locality", "United States"));
mockCollection.set(2, new TagEvent(0l, "xxx", "yy","ZZ", "Locality", "UK"));
mockCollection.set(4, new TagEvent(99l, "xxx", "yy","", "", ""));
DataSet<TagEvent> mockDataset= pipeline.getEnv().fromCollection(mockCollection);
DataSet<Tuple4<Long, Integer, String, TagEvent>> clean = pipeline.cleansing(mockDataset);
assertEquals(7, clean.count());
}
开发者ID:aaitor,项目名称:flink-charts,代码行数:27,代码来源:StateChartsPipelineTest.java
示例4: invoke
import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
@Override
public void invoke(Tuple4<String, Integer, String, Integer> inTuple4) throws InterruptedException {
try {
if (inTuple4 != null) {
if (inTuple4.f2 != null && !"".equals(inTuple4.f2.trim())) {
JSONObject data = JSONObject.parseObject(inTuple4.f2);
JSONObject filter = data.getJSONObject("filter");
JSONObject doc = data.getJSONObject("data");
if (data.getString("dbName") != null) {
mongoManager.executeUpsert(data.getString("dbName"), data.getString("collectionName"), filter, doc);
}
}
}
} catch (Exception e) {
logger.error("Mongo入库发生捕获的异常!{}:{}", e.getClass(), e.getMessage());
for (StackTraceElement element : e.getStackTrace()) {
logger.error("\tat {}", element.toString());
}
}
}
开发者ID:breakEval13,项目名称:rocketmq-flink-plugin,代码行数:21,代码来源:MongoSink.java
示例5: testAsFromTupleToPojo
import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
@Test
public void testAsFromTupleToPojo() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
List<Tuple4<String, Integer, Double, String>> data = new ArrayList<>();
data.add(new Tuple4<>("Rofl", 1, 1.0, "Hi"));
data.add(new Tuple4<>("lol", 2, 1.0, "Hi"));
data.add(new Tuple4<>("Test me", 4, 3.33, "Hello world"));
Table table = tableEnv
.fromDataSet(env.fromCollection(data), "a, b, c, d")
.select("a, b, c, d");
DataSet<SmallPojo2> ds = tableEnv.toDataSet(table, SmallPojo2.class);
List<SmallPojo2> results = ds.collect();
String expected = "Rofl,1,1.0,Hi\n" + "lol,2,1.0,Hi\n" + "Test me,4,3.33,Hello world\n";
compareResultAsText(results, expected);
}
开发者ID:axbaretto,项目名称:flink,代码行数:20,代码来源:TableEnvironmentITCase.java
示例6: apply
import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
@SuppressWarnings("unchecked")
@Override
public void apply(
Tuple key,
TimeWindow window,
Iterable<Tuple2<Integer, Boolean>> gridCells,
Collector<Tuple4<Integer, Long, Boolean, Integer>> out) throws Exception {
int cellId = ((Tuple2<Integer, Boolean>)key).f0;
boolean isStart = ((Tuple2<Integer, Boolean>)key).f1;
long windowTime = window.getEnd();
int cnt = 0;
for(Tuple2<Integer, Boolean> c : gridCells) {
cnt += 1;
}
out.collect(new Tuple4<>(cellId, windowTime, isStart, cnt));
}
开发者ID:flink-taiwan,项目名称:jcconf2016-workshop,代码行数:20,代码来源:TaxiRideWithKafkaAnswer.java
示例7: testUnboundedPojoSourceAndReturnTuple
import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
@Test
public void testUnboundedPojoSourceAndReturnTuple() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> input = env.addSource(new RandomEventSource(5));
DataStream<Tuple4<Long, Integer, String, Double>> output = SiddhiCEP
.define("inputStream", input, "id", "name", "price", "timestamp")
.cql("from inputStream select timestamp, id, name, price insert into outputStream")
.returns("outputStream");
DataStream<Integer> following = output.map(new MapFunction<Tuple4<Long, Integer, String, Double>, Integer>() {
@Override
public Integer map(Tuple4<Long, Integer, String, Double> value) throws Exception {
return value.f1;
}
});
String resultPath = tempFolder.newFile().toURI().toString();
following.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
env.execute();
assertEquals(5, getLineCount(resultPath));
}
开发者ID:haoch,项目名称:flink-siddhi,代码行数:22,代码来源:SiddhiCEPITCase.java
示例8: testTypeInfoParser
import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
@Test
public void testTypeInfoParser() {
TypeInformation<Tuple3<String, Long, Object>> type1 = TypeInfoParser.parse("Tuple3<String,Long,java.lang.Object>");
Assert.assertNotNull(type1);
TypeInformation<Tuple4<String, Long, Object, InnerPojo>> type2 = TypeInfoParser.parse("Tuple4<" + String.class.getName() + ", " + Long.class.getName() + ", " + java.lang.Object.class.getName() + "," + InnerPojo.class.getName() + ">");
Assert.assertNotNull(type2);
}
开发者ID:haoch,项目名称:flink-siddhi,代码行数:8,代码来源:SiddhiTypeFactoryTest.java
示例9: apply
import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
@SuppressWarnings("unchecked")
@Override
public void apply(
Tuple key,
TimeWindow window,
Iterable<Tuple2<Integer, Boolean>> values,
Collector<Tuple4<Integer, Long, Boolean, Integer>> out) throws Exception {
int cellId = ((Tuple2<Integer, Boolean>) key).f0;
boolean isStart = ((Tuple2<Integer, Boolean>) key).f1;
long windowTime = window.getEnd();
int cnt = 0;
for (Tuple2<Integer, Boolean> v : values) {
cnt += 1;
}
out.collect(new Tuple4<>(cellId, windowTime, isStart, cnt));
}
开发者ID:thr0n,项目名称:clojured-taxi-rides,代码行数:20,代码来源:PopularPlaces.java
示例10: calculateAnomaly
import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
@Override
public AnomalyResult calculateAnomaly(Tuple4<Double,Double,Long,Long> v, double threshold) {
NormalHValue w =new NormalHValue();
w.f0 = 1d;
w.f1 = v.f1;
w.f2 = v.f0 * v.f1;
w.f3 = v.f0 * v.f0 * v.f1 ;
NormalHValue h = (NormalHValue) hist.getHistory();
if (h == null) {
return new AnomalyResult(-1,v.f2,v.f3,threshold,w,h);
}
double mean = h.f2/h.f1;
double scale = h.f3-h.f2*h.f2/h.f1;
double cc = h.f0 * 0.5;
return new AnomalyResult(calculateAnomaly(v.f0, cc, mean , (h.f1+v.f1)/(h.f1*v.f1)*scale),v.f2,v.f3,threshold,w,h);
}
开发者ID:sics-dna,项目名称:isc4flink,代码行数:18,代码来源:NormalModel.java
示例11: getAnomalySteam
import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
public DataStream<Tuple3<K, AnomalyResult, RV>> getAnomalySteam(DataStream<V> ds, KeySelector<V, K> keySelector , KeySelector<V,Double> valueSelector, PayloadFold<V, RV> valueFold, Time window) {
KeyedStream<V, K> keyedInput = ds
.keyBy(keySelector);
TypeInformation<Object> foldResultType = TypeExtractor.getUnaryOperatorReturnType(valueFold,
PayloadFold.class,
false,
false,
ds.getType(),
"PayloadFold",
false);
TypeInformation<Tuple3<K,Tuple4<Double,Double,Long,Long>,RV>> resultType = (TypeInformation) new TupleTypeInfo<>(Tuple3.class,
new TypeInformation[] {keyedInput.getKeyType(), new TupleTypeInfo(Tuple4.class,
BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,BasicTypeInfo.LONG_TYPE_INFO), foldResultType});
Tuple3<K,Tuple4<Double,Double,Long,Long>, RV> init= new Tuple3<>(null,new Tuple4<>(0d,0d,0l,0l), valueFold.getInit());
KeyedStream<Tuple3<K,Tuple4<Double,Double,Long,Long>,RV>, Tuple> kPreStream = keyedInput
.timeWindow(window)
.apply(init, new ExtCountWindFold<>(keySelector,valueFold, window, resultType),new ExtWindowTimeExtractor(resultType))
.keyBy(0);
return kPreStream.flatMap(afm);
}
开发者ID:sics-dna,项目名称:isc4flink,代码行数:26,代码来源:ExtNormalFreqAnomaly.java
示例12: testRightSideEmpty
import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
@SuppressWarnings("unchecked")
protected void testRightSideEmpty() throws Exception {
CollectionIterator<Tuple2<String, String>> input1 = CollectionIterator.of(
new Tuple2<>("Jack", "Engineering"),
new Tuple2<>("Tim", "Sales"),
new Tuple2<>("Zed", "HR")
);
CollectionIterator<Tuple2<String, Integer>> input2 = CollectionIterator.of();
List<Tuple4<String, String, String, Object>> actualLeft = computeOuterJoin(input1, input2, OuterJoinType.LEFT);
List<Tuple4<String, String, String, Object>> actualRight = computeOuterJoin(input1, input2, OuterJoinType.RIGHT);
List<Tuple4<String, String, String, Object>> actualFull = computeOuterJoin(input1, input2, OuterJoinType.FULL);
List<Tuple4<String, String, String, Object>> expected = Arrays.asList(
new Tuple4<String, String, String, Object>("Jack", "Engineering", null, null),
new Tuple4<String, String, String, Object>("Tim", "Sales", null, null),
new Tuple4<String, String, String, Object>("Zed", "HR", null, null)
);
Assert.assertEquals(expected, actualLeft);
Assert.assertEquals(expected, actualFull);
Assert.assertEquals(Collections.<Tuple4<String,String,String,Object>>emptyList(), actualRight);
}
开发者ID:axbaretto,项目名称:flink,代码行数:24,代码来源:AbstractSortMergeOuterJoinIteratorITCase.java
示例13: getAnomalySteam
import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
public DataStream<Tuple2<K, AnomalyResult>> getAnomalySteam(DataStream<V> ds, KeySelector<V, K> keySelector , KeySelector<V,Double> valueSelector, Time window) {
KeyedStream<V, K> keyedInput = ds
.keyBy(keySelector);
TypeInformation<Tuple2<K,Tuple4<Double,Double,Long,Long>>> resultType = (TypeInformation) new TupleTypeInfo<>(Tuple2.class,
new TypeInformation[] {keyedInput.getKeyType(), new TupleTypeInfo(Tuple4.class,
BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,BasicTypeInfo.LONG_TYPE_INFO)});
Tuple2<K,Tuple4<Double,Double,Long,Long>> init= new Tuple2<>(null,new Tuple4<>(0d,0d,0l,0l));
KeyedStream<Tuple2<K,Tuple4<Double,Double,Long,Long>>, Tuple> kPreStream = keyedInput
.timeWindow(window)
.apply(init, new CountSumFold<>(keySelector, valueSelector, resultType), new WindowTimeExtractor(resultType))
.keyBy(0);
return kPreStream.flatMap(afm);
}
开发者ID:sics-dna,项目名称:isc4flink,代码行数:18,代码来源:NormalValueAnomaly.java
示例14: testLeftSideEmpty
import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
@SuppressWarnings("unchecked")
protected void testLeftSideEmpty() throws Exception {
CollectionIterator<Tuple2<String, String>> input1 = CollectionIterator.of();
CollectionIterator<Tuple2<String, Integer>> input2 = CollectionIterator.of(
new Tuple2<>("Allison", 100),
new Tuple2<>("Jack", 200),
new Tuple2<>("Zed", 150),
new Tuple2<>("Zed", 250)
);
List<Tuple4<String, String, String, Object>> actualLeft = computeOuterJoin(input1, input2, OuterJoinType.LEFT);
List<Tuple4<String, String, String, Object>> actualRight = computeOuterJoin(input1, input2, OuterJoinType.RIGHT);
List<Tuple4<String, String, String, Object>> actualFull = computeOuterJoin(input1, input2, OuterJoinType.FULL);
List<Tuple4<String, String, String, Object>> expected = Arrays.asList(
new Tuple4<String, String, String, Object>(null, null, "Allison", 100),
new Tuple4<String, String, String, Object>(null, null, "Jack", 200),
new Tuple4<String, String, String, Object>(null, null, "Zed", 150),
new Tuple4<String, String, String, Object>(null, null, "Zed", 250)
);
Assert.assertEquals(Collections.<Tuple4<String,String,String,Object>>emptyList(), actualLeft);
Assert.assertEquals(expected, actualRight);
Assert.assertEquals(expected, actualFull);
}
开发者ID:axbaretto,项目名称:flink,代码行数:26,代码来源:AbstractSortMergeOuterJoinIteratorITCase.java
示例15: getAnomalySteam
import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
public DataStream<Tuple2<K, AnomalyResult>> getAnomalySteam(DataStream<V> ds, KeySelector<V, K> keySelector, KeySelector<V,Double> valueSelector, Time window) {
KeyedStream<V, K> keyedInput = ds
.keyBy(keySelector);
TypeInformation<Tuple2<K,Tuple4<Double,Double,Long,Long>>> resultType = (TypeInformation) new TupleTypeInfo<>(Tuple2.class,
new TypeInformation[] {keyedInput.getKeyType(), new TupleTypeInfo(Tuple4.class,
BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,BasicTypeInfo.LONG_TYPE_INFO)});
Tuple2<K,Tuple4<Double,Double,Long,Long>> init= new Tuple2<>(null,new Tuple4<>(0d,0d,0l,0l));
KeyedStream<Tuple2<K,Tuple4<Double,Double,Long,Long>>, Tuple> kPreStream = keyedInput
.timeWindow(window)
.apply(init, new CountSumFold<>(keySelector,valueSelector, resultType),new WindowTimeExtractor(resultType))
.keyBy(0);
return kPreStream.flatMap(afm);
}
开发者ID:sics-dna,项目名称:isc4flink,代码行数:18,代码来源:PoissonValueAnomaly.java
示例16: getAnomalySteam
import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
public DataStream<Tuple3<K, AnomalyResult, RV>> getAnomalySteam(DataStream<V> ds, KeySelector<V, K> keySelector, KeySelector<V,Double> valueSelector, PayloadFold<V, RV> valueFold, Time window) {
KeyedStream<V, K> keyedInput = ds
.keyBy(keySelector);
TypeInformation<Object> foldResultType = TypeExtractor.getUnaryOperatorReturnType(valueFold,
PayloadFold.class,
false,
false,
ds.getType(),
"PayloadFold",
false);
TypeInformation<Tuple3<K,Tuple4<Double,Double,Long,Long>,RV>> resultType = (TypeInformation) new TupleTypeInfo<>(Tuple3.class,
new TypeInformation[] {keyedInput.getKeyType(), new TupleTypeInfo(Tuple4.class,
BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,BasicTypeInfo.LONG_TYPE_INFO), foldResultType});
Tuple3<K,Tuple4<Double,Double,Long,Long>, RV> init= new Tuple3<>(null,new Tuple4<>(0d,0d,0l,0l), valueFold.getInit());
KeyedStream<Tuple3<K,Tuple4<Double,Double,Long,Long>,RV>, Tuple> kPreStream = keyedInput
.timeWindow(window)
.apply(init, new ExtCountSumFold<>(keySelector,valueSelector,valueFold, resultType),new ExtWindowTimeExtractor(resultType))
.keyBy(0);
return kPreStream.flatMap(afm);
}
开发者ID:sics-dna,项目名称:isc4flink,代码行数:27,代码来源:ExtPoissonValueAnomaly.java
示例17: getAnomalySteam
import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
public DataStream<Tuple2<K, AnomalyResult>> getAnomalySteam(DataStream<V> ds, KeySelector<V, K> keySelector , Time window) {
KeyedStream<V, K> keyedInput = ds
.keyBy(keySelector);
TypeInformation<Tuple2<K,Tuple4<Double,Double,Long,Long>>> resultType = (TypeInformation) new TupleTypeInfo<>(Tuple2.class,
new TypeInformation[] {keyedInput.getKeyType(), new TupleTypeInfo(Tuple4.class,
BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,BasicTypeInfo.LONG_TYPE_INFO)});
Tuple2<K,Tuple4<Double,Double,Long,Long>> init= new Tuple2<>(null,new Tuple4<>(0d,0d,0l,0l));
KeyedStream<Tuple2<K,Tuple4<Double,Double,Long,Long>>, Tuple> kPreStream = keyedInput
.timeWindow(window)
.apply(init, new CountWindFold<>(keySelector, window, resultType),new WindowTimeExtractor(resultType))
.keyBy(0);
return kPreStream.flatMap(afm);
}
开发者ID:sics-dna,项目名称:isc4flink,代码行数:19,代码来源:PoissonFreqAnomaly.java
示例18: getAnomalySteam
import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
public DataStream<Tuple3<K, AnomalyResult, RV>> getAnomalySteam(DataStream<V> ds, KeySelector<V, K> keySelector , PayloadFold<V, RV> valueFold, Time window) {
KeyedStream<V, K> keyedInput = ds
.keyBy(keySelector);
TypeInformation<Object> foldResultType = TypeExtractor.getUnaryOperatorReturnType(valueFold,
PayloadFold.class,
false,
false,
ds.getType(),
"PayloadFold",
false);
TypeInformation<Tuple3<K,Tuple4<Double,Double,Long,Long>,RV>> resultType = (TypeInformation) new TupleTypeInfo<>(Tuple3.class,
new TypeInformation[] {keyedInput.getKeyType(), new TupleTypeInfo(Tuple4.class,
BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,BasicTypeInfo.LONG_TYPE_INFO), foldResultType});
Tuple3<K,Tuple4<Double,Double,Long,Long>, RV> init= new Tuple3<>(null,new Tuple4<>(0d,0d,0l,0l), valueFold.getInit());
KeyedStream<Tuple3<K,Tuple4<Double,Double,Long,Long>,RV>, Tuple> kPreStream = keyedInput
.timeWindow(window)
.apply(init, new ExtCountWindFold<>(keySelector,valueFold, window, resultType),new ExtWindowTimeExtractor(resultType))
.keyBy(0);
return kPreStream.flatMap(afm);
}
开发者ID:sics-dna,项目名称:isc4flink,代码行数:25,代码来源:ExtPoissonFreqAnomaly.java
示例19: getAnomalySteam
import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
public DataStream<Tuple3<K, AnomalyResult, RV>> getAnomalySteam(DataStream<V> ds, KeySelector<V, K> keySelector, KeySelector<V,Double> valueSelector, PayloadFold<V, RV> valueFold, Time window) {
KeyedStream<V, K> keyedInput = ds
.keyBy(keySelector);
TypeInformation<Object> foldResultType = TypeExtractor.getUnaryOperatorReturnType(valueFold,
PayloadFold.class,
false,
false,
ds.getType(),
"PayloadFold",
false);
TypeInformation<Tuple3<K,Tuple4<Double,Double,Long,Long>,RV>> resultType = (TypeInformation) new TupleTypeInfo<>(Tuple3.class,
new TypeInformation[] {keyedInput.getKeyType(), new TupleTypeInfo(Tuple4.class,
BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,BasicTypeInfo.LONG_TYPE_INFO), foldResultType});
Tuple3<K,Tuple4<Double,Double,Long,Long>, RV> init= new Tuple3<>(null,new Tuple4<>(0d,0d,0l,0l), valueFold.getInit());
KeyedStream<Tuple3<K,Tuple4<Double,Double,Long,Long>,RV>, Tuple> kPreStream = keyedInput
.timeWindow(window)
.apply(init, new ExtLogCountSumFold<>(keySelector,valueSelector,valueFold, resultType),new ExtWindowTimeExtractor(resultType))
.keyBy(0);
return kPreStream.flatMap(afm);
}
开发者ID:sics-dna,项目名称:isc4flink,代码行数:26,代码来源:ExtLogNormalValueAnomaly.java
示例20: getAnomalySteam
import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
public DataStream<Tuple3<K, AnomalyResult, RV>> getAnomalySteam(DataStream<V> ds, KeySelector<V, K> keySelector, KeySelector<V,Double> valueSelector, PayloadFold<V, RV> valueFold, Time window) {
KeyedStream<V, K> keyedInput = ds
.keyBy(keySelector);
TypeInformation<Object> foldResultType = TypeExtractor.getUnaryOperatorReturnType(valueFold,
PayloadFold.class,
false,
false,
ds.getType(),
"PayloadFold",
false);
TypeInformation<Tuple3<K,Tuple4<Double,Double,Long,Long>,RV>> resultType = (TypeInformation) new TupleTypeInfo<>(Tuple2.class,
new TypeInformation[] {keyedInput.getKeyType(), new TupleTypeInfo(Tuple4.class,
BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,BasicTypeInfo.LONG_TYPE_INFO), foldResultType});
Tuple3<K,Tuple4<Double,Double,Long,Long>, RV> init= new Tuple3<>(null,new Tuple4<>(0d,0d,0l,0l), valueFold.getInit());
KeyedStream<Tuple3<K,Tuple4<Double,Double,Long,Long>,RV>, Tuple> kPreStream = keyedInput
.timeWindow(window)
.apply(init, new ExtLogCountSumFold<>(keySelector,valueSelector,valueFold, resultType),new ExtWindowTimeExtractor(resultType))
.keyBy(0);
return kPreStream.flatMap(afm);
}
开发者ID:sics-dna,项目名称:isc4flink,代码行数:26,代码来源:LogNormalValueAnomaly.java
注:本文中的org.apache.flink.api.java.tuple.Tuple4类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论