• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Java Tuple4类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.apache.flink.api.java.tuple.Tuple4的典型用法代码示例。如果您正苦于以下问题:Java Tuple4类的具体用法?Java Tuple4怎么用?Java Tuple4使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



Tuple4类属于org.apache.flink.api.java.tuple包,在下文中一共展示了Tuple4类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: transformation

import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
/**
 * Data transformation.
 * The method group by trackId, sum the number of occurrences, sort the output
 * and get the top elements defined by the user.
 * @param input
 * @return
 */
@Override
public DataSet<ChartsResult> transformation(DataSet<?> input) {
    final int limit= pipelineConf.getArgs().getLimit();

    log.info("Transformation Phase. Computing the tags");
    SortPartitionOperator<Tuple4<Long, Integer, String, TagEvent>> grouped = (SortPartitionOperator<Tuple4<Long, Integer, String, TagEvent>>) input
            .groupBy(2, 0) // Grouping by state & trackId
            .sum(1) // Sum the occurrences of each grouped item
            .sortPartition(2, Order.ASCENDING).setParallelism(1) // Sort by state
            .sortPartition(1, Order.DESCENDING).setParallelism(1);// Sort by count
            return grouped.reduceGroup(new ReduceLimit(limit, 2)); // Reducing groups applying the limit specified by user
}
 
开发者ID:aaitor,项目名称:flink-charts,代码行数:20,代码来源:StateChartsPipeline.java


示例2: reduce

import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
@Override
public void reduce(Iterable<Tuple4<Long, Integer, String, TagEvent>> values, Collector<ChartsResult> out) throws Exception {
    int counter = 0;
    String group= "";
    log.info("Reducing Groups and applyting limit(" + limit + ") by field " + groupPosition);

    for (Tuple4<Long, Integer, String, TagEvent> t : values) {
        if (!t.getField(groupPosition).equals(group))    {
            counter= 0;
            group= t.f2;
        }
        if (counter < limit) {
            out.collect(new ChartsResult(t.f0, t.f1, t.f3));
        }
        counter++;
    }
}
 
开发者ID:aaitor,项目名称:flink-charts,代码行数:18,代码来源:StateChartsPipeline.java


示例3: cleansingTest

import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
/**
 * Test to validate the cleansing method.
 * We generate a DataSet with 10 TagEvents and modify 3 items to force bad data
 * The assertion checks that only are obtained the proper number of items after the
 * cleansing process.
 * @throws Exception
 */
@Test
public void cleansingTest() throws Exception {
    String args[]= {"-c", "state_chart", "-l", "3"};
    argsParser= ArgsParser.builder(args);

    PipelineChartsConf pipelineConf= new PipelineChartsConf(config, argsParser);
    StateChartsPipeline pipeline= new StateChartsPipeline(pipelineConf);

    List<TagEvent> mockCollection= TagEventUtils.getMockData(10);
    mockCollection.set(0, new TagEvent(0l, "xxx", "yy","ZZ", "Locality", "United States"));
    mockCollection.set(2, new TagEvent(0l, "xxx", "yy","ZZ", "Locality", "UK"));
    mockCollection.set(4, new TagEvent(99l, "xxx", "yy","", "", ""));


    DataSet<TagEvent> mockDataset= pipeline.getEnv().fromCollection(mockCollection);

    DataSet<Tuple4<Long, Integer, String, TagEvent>> clean = pipeline.cleansing(mockDataset);
    assertEquals(7, clean.count());
}
 
开发者ID:aaitor,项目名称:flink-charts,代码行数:27,代码来源:StateChartsPipelineTest.java


示例4: invoke

import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
@Override
public void invoke(Tuple4<String, Integer, String, Integer> inTuple4) throws InterruptedException {
    try {
        if (inTuple4 != null) {
            if (inTuple4.f2 != null && !"".equals(inTuple4.f2.trim())) {
                JSONObject data = JSONObject.parseObject(inTuple4.f2);
                JSONObject filter = data.getJSONObject("filter");
                JSONObject doc = data.getJSONObject("data");
                if (data.getString("dbName") != null) {
                    mongoManager.executeUpsert(data.getString("dbName"), data.getString("collectionName"), filter, doc);
                }
            }
        }
    } catch (Exception e) {
        logger.error("Mongo入库发生捕获的异常!{}:{}", e.getClass(), e.getMessage());
        for (StackTraceElement element : e.getStackTrace()) {
            logger.error("\tat {}", element.toString());
        }
    }
}
 
开发者ID:breakEval13,项目名称:rocketmq-flink-plugin,代码行数:21,代码来源:MongoSink.java


示例5: testAsFromTupleToPojo

import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
@Test
public void testAsFromTupleToPojo() throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

	List<Tuple4<String, Integer, Double, String>> data = new ArrayList<>();
	data.add(new Tuple4<>("Rofl", 1, 1.0, "Hi"));
	data.add(new Tuple4<>("lol", 2, 1.0, "Hi"));
	data.add(new Tuple4<>("Test me", 4, 3.33, "Hello world"));

	Table table = tableEnv
		.fromDataSet(env.fromCollection(data), "a, b, c, d")
		.select("a, b, c, d");

	DataSet<SmallPojo2> ds = tableEnv.toDataSet(table, SmallPojo2.class);
	List<SmallPojo2> results = ds.collect();
	String expected = "Rofl,1,1.0,Hi\n" + "lol,2,1.0,Hi\n" + "Test me,4,3.33,Hello world\n";
	compareResultAsText(results, expected);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:20,代码来源:TableEnvironmentITCase.java


示例6: apply

import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
@SuppressWarnings("unchecked")
@Override
public void apply(
		Tuple key,
		TimeWindow window,
		Iterable<Tuple2<Integer, Boolean>> gridCells,
		Collector<Tuple4<Integer, Long, Boolean, Integer>> out) throws Exception {

	int cellId = ((Tuple2<Integer, Boolean>)key).f0;
	boolean isStart = ((Tuple2<Integer, Boolean>)key).f1;
	long windowTime = window.getEnd();

	int cnt = 0;
	for(Tuple2<Integer, Boolean> c : gridCells) {
		cnt += 1;
	}

	out.collect(new Tuple4<>(cellId, windowTime, isStart, cnt));
}
 
开发者ID:flink-taiwan,项目名称:jcconf2016-workshop,代码行数:20,代码来源:TaxiRideWithKafkaAnswer.java


示例7: testUnboundedPojoSourceAndReturnTuple

import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
@Test
public void testUnboundedPojoSourceAndReturnTuple() throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	DataStream<Event> input = env.addSource(new RandomEventSource(5));

	DataStream<Tuple4<Long, Integer, String, Double>> output = SiddhiCEP
		.define("inputStream", input, "id", "name", "price", "timestamp")
		.cql("from inputStream select timestamp, id, name, price insert into  outputStream")
		.returns("outputStream");

	DataStream<Integer> following = output.map(new MapFunction<Tuple4<Long, Integer, String, Double>, Integer>() {
		@Override
		public Integer map(Tuple4<Long, Integer, String, Double> value) throws Exception {
			return value.f1;
		}
	});
	String resultPath = tempFolder.newFile().toURI().toString();
	following.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
	env.execute();
	assertEquals(5, getLineCount(resultPath));
}
 
开发者ID:haoch,项目名称:flink-siddhi,代码行数:22,代码来源:SiddhiCEPITCase.java


示例8: testTypeInfoParser

import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
@Test
public void testTypeInfoParser() {
	TypeInformation<Tuple3<String, Long, Object>> type1 = TypeInfoParser.parse("Tuple3<String,Long,java.lang.Object>");
	Assert.assertNotNull(type1);
	TypeInformation<Tuple4<String, Long, Object, InnerPojo>> type2 = TypeInfoParser.parse("Tuple4<" + String.class.getName() + ", " + Long.class.getName() + ", " + java.lang.Object.class.getName() + "," + InnerPojo.class.getName() + ">");
	Assert.assertNotNull(type2);
}
 
开发者ID:haoch,项目名称:flink-siddhi,代码行数:8,代码来源:SiddhiTypeFactoryTest.java


示例9: apply

import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
@SuppressWarnings("unchecked")
@Override
public void apply(
        Tuple key,
        TimeWindow window,
        Iterable<Tuple2<Integer, Boolean>> values,
        Collector<Tuple4<Integer, Long, Boolean, Integer>> out) throws Exception {

    int cellId = ((Tuple2<Integer, Boolean>) key).f0;
    boolean isStart = ((Tuple2<Integer, Boolean>) key).f1;
    long windowTime = window.getEnd();

    int cnt = 0;
    for (Tuple2<Integer, Boolean> v : values) {
        cnt += 1;
    }

    out.collect(new Tuple4<>(cellId, windowTime, isStart, cnt));
}
 
开发者ID:thr0n,项目名称:clojured-taxi-rides,代码行数:20,代码来源:PopularPlaces.java


示例10: calculateAnomaly

import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
@Override
public AnomalyResult calculateAnomaly(Tuple4<Double,Double,Long,Long> v, double threshold) {
    NormalHValue w =new NormalHValue();
    w.f0 = 1d;
    w.f1 = v.f1;
    w.f2 = v.f0 * v.f1;
    w.f3 = v.f0 * v.f0 * v.f1 ;

    NormalHValue h = (NormalHValue) hist.getHistory();
    if (h == null) {
        return new AnomalyResult(-1,v.f2,v.f3,threshold,w,h);
    }
    double mean = h.f2/h.f1;
    double scale = h.f3-h.f2*h.f2/h.f1;
    double cc = h.f0 * 0.5;
    return new AnomalyResult(calculateAnomaly(v.f0, cc, mean  , (h.f1+v.f1)/(h.f1*v.f1)*scale),v.f2,v.f3,threshold,w,h);
}
 
开发者ID:sics-dna,项目名称:isc4flink,代码行数:18,代码来源:NormalModel.java


示例11: getAnomalySteam

import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
public DataStream<Tuple3<K, AnomalyResult, RV>> getAnomalySteam(DataStream<V> ds, KeySelector<V, K> keySelector , KeySelector<V,Double> valueSelector, PayloadFold<V, RV> valueFold, Time window) {

        KeyedStream<V, K> keyedInput = ds
                .keyBy(keySelector);

        TypeInformation<Object> foldResultType = TypeExtractor.getUnaryOperatorReturnType(valueFold,
                PayloadFold.class,
                false,
                false,
                ds.getType(),
                "PayloadFold",
                false);

         TypeInformation<Tuple3<K,Tuple4<Double,Double,Long,Long>,RV>> resultType = (TypeInformation) new TupleTypeInfo<>(Tuple3.class,
                new TypeInformation[] {keyedInput.getKeyType(), new TupleTypeInfo(Tuple4.class,
                        BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,BasicTypeInfo.LONG_TYPE_INFO), foldResultType});

        Tuple3<K,Tuple4<Double,Double,Long,Long>, RV> init= new Tuple3<>(null,new Tuple4<>(0d,0d,0l,0l), valueFold.getInit());
        KeyedStream<Tuple3<K,Tuple4<Double,Double,Long,Long>,RV>, Tuple> kPreStream = keyedInput
                .timeWindow(window)
                .apply(init, new ExtCountWindFold<>(keySelector,valueFold, window, resultType),new ExtWindowTimeExtractor(resultType))
                .keyBy(0);

        return kPreStream.flatMap(afm);
    }
 
开发者ID:sics-dna,项目名称:isc4flink,代码行数:26,代码来源:ExtNormalFreqAnomaly.java


示例12: testRightSideEmpty

import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
@SuppressWarnings("unchecked")
protected void testRightSideEmpty() throws Exception {
	CollectionIterator<Tuple2<String, String>> input1 = CollectionIterator.of(
			new Tuple2<>("Jack", "Engineering"),
			new Tuple2<>("Tim", "Sales"),
			new Tuple2<>("Zed", "HR")
	);
	CollectionIterator<Tuple2<String, Integer>> input2 = CollectionIterator.of();

	List<Tuple4<String, String, String, Object>> actualLeft = computeOuterJoin(input1, input2, OuterJoinType.LEFT);
	List<Tuple4<String, String, String, Object>> actualRight = computeOuterJoin(input1, input2, OuterJoinType.RIGHT);
	List<Tuple4<String, String, String, Object>> actualFull = computeOuterJoin(input1, input2, OuterJoinType.FULL);

	List<Tuple4<String, String, String, Object>> expected = Arrays.asList(
			new Tuple4<String, String, String, Object>("Jack", "Engineering", null, null),
			new Tuple4<String, String, String, Object>("Tim", "Sales", null, null),
			new Tuple4<String, String, String, Object>("Zed", "HR", null, null)
	);

	Assert.assertEquals(expected, actualLeft);
	Assert.assertEquals(expected, actualFull);
	Assert.assertEquals(Collections.<Tuple4<String,String,String,Object>>emptyList(), actualRight);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:24,代码来源:AbstractSortMergeOuterJoinIteratorITCase.java


示例13: getAnomalySteam

import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
public DataStream<Tuple2<K, AnomalyResult>> getAnomalySteam(DataStream<V> ds, KeySelector<V, K> keySelector , KeySelector<V,Double> valueSelector, Time window) {

        KeyedStream<V, K> keyedInput = ds
                .keyBy(keySelector);

         TypeInformation<Tuple2<K,Tuple4<Double,Double,Long,Long>>> resultType = (TypeInformation) new TupleTypeInfo<>(Tuple2.class,
                new TypeInformation[] {keyedInput.getKeyType(), new TupleTypeInfo(Tuple4.class,
                        BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,BasicTypeInfo.LONG_TYPE_INFO)});

        Tuple2<K,Tuple4<Double,Double,Long,Long>> init= new Tuple2<>(null,new Tuple4<>(0d,0d,0l,0l));
        KeyedStream<Tuple2<K,Tuple4<Double,Double,Long,Long>>, Tuple> kPreStream = keyedInput
                .timeWindow(window)
                .apply(init, new CountSumFold<>(keySelector, valueSelector, resultType), new WindowTimeExtractor(resultType))
                .keyBy(0);

        return kPreStream.flatMap(afm);
    }
 
开发者ID:sics-dna,项目名称:isc4flink,代码行数:18,代码来源:NormalValueAnomaly.java


示例14: testLeftSideEmpty

import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
@SuppressWarnings("unchecked")
protected void testLeftSideEmpty() throws Exception {
	CollectionIterator<Tuple2<String, String>> input1 = CollectionIterator.of();
	CollectionIterator<Tuple2<String, Integer>> input2 = CollectionIterator.of(
			new Tuple2<>("Allison", 100),
			new Tuple2<>("Jack", 200),
			new Tuple2<>("Zed", 150),
			new Tuple2<>("Zed", 250)
	);

	List<Tuple4<String, String, String, Object>> actualLeft = computeOuterJoin(input1, input2, OuterJoinType.LEFT);
	List<Tuple4<String, String, String, Object>> actualRight = computeOuterJoin(input1, input2, OuterJoinType.RIGHT);
	List<Tuple4<String, String, String, Object>> actualFull = computeOuterJoin(input1, input2, OuterJoinType.FULL);

	List<Tuple4<String, String, String, Object>> expected = Arrays.asList(
			new Tuple4<String, String, String, Object>(null, null, "Allison", 100),
			new Tuple4<String, String, String, Object>(null, null, "Jack", 200),
			new Tuple4<String, String, String, Object>(null, null, "Zed", 150),
			new Tuple4<String, String, String, Object>(null, null, "Zed", 250)
	);

	Assert.assertEquals(Collections.<Tuple4<String,String,String,Object>>emptyList(), actualLeft);
	Assert.assertEquals(expected, actualRight);
	Assert.assertEquals(expected, actualFull);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:26,代码来源:AbstractSortMergeOuterJoinIteratorITCase.java


示例15: getAnomalySteam

import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
public DataStream<Tuple2<K, AnomalyResult>> getAnomalySteam(DataStream<V> ds, KeySelector<V, K> keySelector, KeySelector<V,Double> valueSelector, Time window) {

        KeyedStream<V, K> keyedInput = ds
                .keyBy(keySelector);

        TypeInformation<Tuple2<K,Tuple4<Double,Double,Long,Long>>> resultType = (TypeInformation) new TupleTypeInfo<>(Tuple2.class,
                new TypeInformation[] {keyedInput.getKeyType(), new TupleTypeInfo(Tuple4.class,
                        BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,BasicTypeInfo.LONG_TYPE_INFO)});

        Tuple2<K,Tuple4<Double,Double,Long,Long>> init= new Tuple2<>(null,new Tuple4<>(0d,0d,0l,0l));
        KeyedStream<Tuple2<K,Tuple4<Double,Double,Long,Long>>, Tuple> kPreStream = keyedInput
                .timeWindow(window)
                .apply(init, new CountSumFold<>(keySelector,valueSelector, resultType),new WindowTimeExtractor(resultType))
                .keyBy(0);

        return kPreStream.flatMap(afm);
    }
 
开发者ID:sics-dna,项目名称:isc4flink,代码行数:18,代码来源:PoissonValueAnomaly.java


示例16: getAnomalySteam

import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
public DataStream<Tuple3<K, AnomalyResult, RV>> getAnomalySteam(DataStream<V> ds, KeySelector<V, K> keySelector, KeySelector<V,Double> valueSelector, PayloadFold<V, RV> valueFold, Time window) {

        KeyedStream<V, K> keyedInput = ds
                .keyBy(keySelector);

        TypeInformation<Object> foldResultType = TypeExtractor.getUnaryOperatorReturnType(valueFold,
                PayloadFold.class,
                false,
                false,
                ds.getType(),
                "PayloadFold",
                false);


        TypeInformation<Tuple3<K,Tuple4<Double,Double,Long,Long>,RV>> resultType = (TypeInformation) new TupleTypeInfo<>(Tuple3.class,
                new TypeInformation[] {keyedInput.getKeyType(), new TupleTypeInfo(Tuple4.class,
                        BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,BasicTypeInfo.LONG_TYPE_INFO), foldResultType});

        Tuple3<K,Tuple4<Double,Double,Long,Long>, RV> init= new Tuple3<>(null,new Tuple4<>(0d,0d,0l,0l), valueFold.getInit());
        KeyedStream<Tuple3<K,Tuple4<Double,Double,Long,Long>,RV>, Tuple> kPreStream = keyedInput
                .timeWindow(window)
                .apply(init, new ExtCountSumFold<>(keySelector,valueSelector,valueFold, resultType),new ExtWindowTimeExtractor(resultType))
                .keyBy(0);

        return kPreStream.flatMap(afm);
    }
 
开发者ID:sics-dna,项目名称:isc4flink,代码行数:27,代码来源:ExtPoissonValueAnomaly.java


示例17: getAnomalySteam

import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
public DataStream<Tuple2<K, AnomalyResult>> getAnomalySteam(DataStream<V> ds, KeySelector<V, K> keySelector , Time window) {

        KeyedStream<V, K> keyedInput = ds
                .keyBy(keySelector);


        TypeInformation<Tuple2<K,Tuple4<Double,Double,Long,Long>>> resultType = (TypeInformation) new TupleTypeInfo<>(Tuple2.class,
                new TypeInformation[] {keyedInput.getKeyType(), new TupleTypeInfo(Tuple4.class,
                        BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,BasicTypeInfo.LONG_TYPE_INFO)});

        Tuple2<K,Tuple4<Double,Double,Long,Long>> init= new Tuple2<>(null,new Tuple4<>(0d,0d,0l,0l));
        KeyedStream<Tuple2<K,Tuple4<Double,Double,Long,Long>>, Tuple> kPreStream = keyedInput
                .timeWindow(window)
                .apply(init, new CountWindFold<>(keySelector, window, resultType),new WindowTimeExtractor(resultType))
                .keyBy(0);

        return kPreStream.flatMap(afm);
    }
 
开发者ID:sics-dna,项目名称:isc4flink,代码行数:19,代码来源:PoissonFreqAnomaly.java


示例18: getAnomalySteam

import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
public DataStream<Tuple3<K, AnomalyResult, RV>> getAnomalySteam(DataStream<V> ds, KeySelector<V, K> keySelector , PayloadFold<V, RV> valueFold, Time window) {

        KeyedStream<V, K> keyedInput = ds
                .keyBy(keySelector);

        TypeInformation<Object> foldResultType = TypeExtractor.getUnaryOperatorReturnType(valueFold,
                PayloadFold.class,
                false,
                false,
                ds.getType(),
                "PayloadFold",
                false);
 TypeInformation<Tuple3<K,Tuple4<Double,Double,Long,Long>,RV>> resultType = (TypeInformation) new TupleTypeInfo<>(Tuple3.class,
                new TypeInformation[] {keyedInput.getKeyType(), new TupleTypeInfo(Tuple4.class,
                        BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,BasicTypeInfo.LONG_TYPE_INFO), foldResultType});

        Tuple3<K,Tuple4<Double,Double,Long,Long>, RV> init= new Tuple3<>(null,new Tuple4<>(0d,0d,0l,0l), valueFold.getInit());
        KeyedStream<Tuple3<K,Tuple4<Double,Double,Long,Long>,RV>, Tuple> kPreStream = keyedInput
                .timeWindow(window)
                .apply(init, new ExtCountWindFold<>(keySelector,valueFold, window, resultType),new ExtWindowTimeExtractor(resultType))
                .keyBy(0);

        return kPreStream.flatMap(afm);
    }
 
开发者ID:sics-dna,项目名称:isc4flink,代码行数:25,代码来源:ExtPoissonFreqAnomaly.java


示例19: getAnomalySteam

import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
public DataStream<Tuple3<K, AnomalyResult, RV>> getAnomalySteam(DataStream<V> ds, KeySelector<V, K> keySelector, KeySelector<V,Double> valueSelector, PayloadFold<V, RV> valueFold, Time window) {

        KeyedStream<V, K> keyedInput = ds
                .keyBy(keySelector);

        TypeInformation<Object> foldResultType = TypeExtractor.getUnaryOperatorReturnType(valueFold,
                PayloadFold.class,
                false,
                false,
                ds.getType(),
                "PayloadFold",
                false);

        TypeInformation<Tuple3<K,Tuple4<Double,Double,Long,Long>,RV>> resultType = (TypeInformation) new TupleTypeInfo<>(Tuple3.class,
                new TypeInformation[] {keyedInput.getKeyType(), new TupleTypeInfo(Tuple4.class,
                        BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,BasicTypeInfo.LONG_TYPE_INFO), foldResultType});

        Tuple3<K,Tuple4<Double,Double,Long,Long>, RV> init= new Tuple3<>(null,new Tuple4<>(0d,0d,0l,0l), valueFold.getInit());
        KeyedStream<Tuple3<K,Tuple4<Double,Double,Long,Long>,RV>, Tuple> kPreStream = keyedInput
                .timeWindow(window)
                .apply(init, new ExtLogCountSumFold<>(keySelector,valueSelector,valueFold, resultType),new ExtWindowTimeExtractor(resultType))
                .keyBy(0);

        return kPreStream.flatMap(afm);
    }
 
开发者ID:sics-dna,项目名称:isc4flink,代码行数:26,代码来源:ExtLogNormalValueAnomaly.java


示例20: getAnomalySteam

import org.apache.flink.api.java.tuple.Tuple4; //导入依赖的package包/类
public DataStream<Tuple3<K, AnomalyResult, RV>> getAnomalySteam(DataStream<V> ds, KeySelector<V, K> keySelector, KeySelector<V,Double> valueSelector, PayloadFold<V, RV> valueFold, Time window) {

        KeyedStream<V, K> keyedInput = ds
                .keyBy(keySelector);

        TypeInformation<Object> foldResultType = TypeExtractor.getUnaryOperatorReturnType(valueFold,
                PayloadFold.class,
                false,
                false,
                ds.getType(),
                "PayloadFold",
                false);

        TypeInformation<Tuple3<K,Tuple4<Double,Double,Long,Long>,RV>> resultType = (TypeInformation) new TupleTypeInfo<>(Tuple2.class,
                new TypeInformation[] {keyedInput.getKeyType(), new TupleTypeInfo(Tuple4.class,
                        BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,BasicTypeInfo.LONG_TYPE_INFO), foldResultType});

        Tuple3<K,Tuple4<Double,Double,Long,Long>, RV> init= new Tuple3<>(null,new Tuple4<>(0d,0d,0l,0l), valueFold.getInit());
        KeyedStream<Tuple3<K,Tuple4<Double,Double,Long,Long>,RV>, Tuple> kPreStream = keyedInput
                .timeWindow(window)
                .apply(init, new ExtLogCountSumFold<>(keySelector,valueSelector,valueFold, resultType),new ExtWindowTimeExtractor(resultType))
                .keyBy(0);

        return kPreStream.flatMap(afm);
    }
 
开发者ID:sics-dna,项目名称:isc4flink,代码行数:26,代码来源:LogNormalValueAnomaly.java



注:本文中的org.apache.flink.api.java.tuple.Tuple4类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java ProxoolFacade类代码示例发布时间:2022-05-21
下一篇:
Java SessionDefaultAudience类代码示例发布时间:2022-05-21
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap