Java WindowedDeserializer Class Code Examples


This article collects and summarizes typical usage examples of the Java class org.apache.kafka.streams.kstream.internals.WindowedDeserializer. If you are wondering what WindowedDeserializer is for, or how to use it, the curated class examples below should help.



The WindowedDeserializer class belongs to the org.apache.kafka.streams.kstream.internals package. Ten code examples of the class are presented below, sorted by popularity by default.
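As background for the examples that follow: WindowedDeserializer is part of an internal package of early Kafka Streams releases; later releases expose public equivalents such as TimeWindowedDeserializer and the WindowedSerdes factory. Its recurring use, visible in every example below, is to be paired with WindowedSerializer to form a Serde for Windowed<K> keys. A minimal sketch using only standard Kafka classes (the class and method names here are illustrative only):

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.WindowedDeserializer;
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;

public class WindowedSerdeSketch {

    // Compose a Serde for Windowed<String> keys, e.g. to read the output of a
    // windowed aggregation back off a topic.
    static Serde<Windowed<String>> windowedStringSerde() {
        return Serdes.serdeFrom(
                new WindowedSerializer<>(new StringSerializer()),
                new WindowedDeserializer<>(new StringDeserializer()));
    }
}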

Example 1: main

import org.apache.kafka.streams.kstream.internals.WindowedDeserializer; // import the required package/class
public static void main(String[] args) {

    StreamsConfig streamingConfig = new StreamsConfig(getProperties());

    JsonSerializer<StockTransactionCollector> stockTransactionsSerializer = new JsonSerializer<>();
    JsonDeserializer<StockTransactionCollector> stockTransactionsDeserializer = new JsonDeserializer<>(StockTransactionCollector.class);
    JsonDeserializer<StockTransaction> stockTxnDeserializer = new JsonDeserializer<>(StockTransaction.class);
    JsonSerializer<StockTransaction> stockTxnJsonSerializer = new JsonSerializer<>();
    Serde<StockTransaction> transactionSerde = Serdes.serdeFrom(stockTxnJsonSerializer, stockTxnDeserializer);
    StringSerializer stringSerializer = new StringSerializer();
    StringDeserializer stringDeserializer = new StringDeserializer();
    Serde<String> stringSerde = Serdes.serdeFrom(stringSerializer, stringDeserializer);
    Serde<StockTransactionCollector> collectorSerde = Serdes.serdeFrom(stockTransactionsSerializer, stockTransactionsDeserializer);
    WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(stringSerializer);
    WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(stringDeserializer);
    Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);

    KStreamBuilder kStreamBuilder = new KStreamBuilder();

    KStream<String, StockTransaction> transactionKStream = kStreamBuilder.stream(stringSerde, transactionSerde, "stocks");

    transactionKStream.map((k, v) -> new KeyValue<>(v.getSymbol(), v))
                      .through(stringSerde, transactionSerde, "stocks-out")
                      .groupBy((k, v) -> k, stringSerde, transactionSerde)
                      .aggregate(StockTransactionCollector::new,
                                 (k, v, stockTransactionCollector) -> stockTransactionCollector.add(v),
                                 TimeWindows.of(10000),
                                 collectorSerde, "stock-summaries")
                      .to(windowedSerde, collectorSerde, "transaction-summary");

    System.out.println("Starting StockStreams Example");
    KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamingConfig);
    kafkaStreams.start();
    System.out.println("Now started StockStreams Example");
}
 
Developer: bbejeck | Project: kafka-streams | Source: StocksKafkaStreamsDriver.java
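A note on API vintage: Example 1 (and several examples below) targets the pre-1.0 Kafka Streams DSL, where KStreamBuilder and the stream/through/to/aggregate overloads take serdes and topic names directly; newer releases replace these with StreamsBuilder plus Consumed/Produced/Materialized. The windowed serde is required on the final to() because the windowed aggregation changes the key type to Windowed<String>.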


Example 2: readWindowedResults

import org.apache.kafka.streams.kstream.internals.WindowedDeserializer; // import the required package/class
private Map<Windowed<String>, GenericRow> readWindowedResults(
    String resultTopic,
    Schema resultSchema,
    int expectedNumMessages
) {
  Deserializer<Windowed<String>> keyDeserializer = new WindowedDeserializer<>(new StringDeserializer());
  return topicConsumer.readResults(resultTopic, resultSchema, expectedNumMessages, keyDeserializer);
}
 
Developer: confluentinc | Project: ksql | Source: JsonFormatTest.java


Example 3: consume

import org.apache.kafka.streams.kstream.internals.WindowedDeserializer; // import the required package/class
private void consume() {
    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", "stroom.kafka:9092");
    consumerProps.put("group.id", "consumerGroup");
    consumerProps.put("enable.auto.commit", "true");
    consumerProps.put("auto.commit.interval.ms", "1000");
    consumerProps.put("session.timeout.ms", "30000");
    consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    Serde<Long> longSerde = Serdes.Long();
    // Build a serde for the custom LongAggregator value type from a serializer/deserializer lambda pair.
    Serde<LongAggregator> longAggregatorSerde = SerdeUtils.buildBasicSerde(
            (topic, data) -> Bytes.toBytes(data.getAggregateVal()),
            (topic, bData) -> new LongAggregator(Bytes.toLong(bData)));

    SerdeUtils.verify(longAggregatorSerde, new LongAggregator(123));

    // Wrap the plain Long serde so the consumer can decode the Windowed<Long> keys
    // produced by the windowed aggregation upstream.
    WindowedSerializer<Long> longWindowedSerializer = new WindowedSerializer<>(longSerde.serializer());
    WindowedDeserializer<Long> longWindowedDeserializer = new WindowedDeserializer<>(longSerde.deserializer());
    Serde<Windowed<Long>> windowedSerde = Serdes.serdeFrom(longWindowedSerializer, longWindowedDeserializer);

    KafkaConsumer<Windowed<Long>, LongAggregator> consumer = new KafkaConsumer<>(
            consumerProps,
            windowedSerde.deserializer(),
            longAggregatorSerde.deserializer());

    consumer.subscribe(Collections.singletonList(DEST_TOPIC));

    ExecutorService executorService = Executors.newSingleThreadExecutor();

    @SuppressWarnings("FutureReturnValueIgnored")
    Future future = executorService.submit(() -> {
        LOGGER.info("Consumer about to poll");
        while (true) {
            try {
                ConsumerRecords<Windowed<Long>, LongAggregator> records = consumer.poll(100);
                for (ConsumerRecord<Windowed<Long>, LongAggregator> record : records) {
                    LOGGER.info("Received message key: {} winStart: {} winEnd {} winDuration: {} val: {}",
                            epochMsToString(record.key().key()),
                            epochMsToString(record.key().window().start()),
                            epochMsToString(record.key().window().end()),
                            record.key().window().end() - record.key().window().start(),
                            record.value().getAggregateVal());
                    outputData.computeIfAbsent(record.key().key(), aLong -> new AtomicLong())
                              .addAndGet(record.value().getAggregateVal());
                }
            } catch (Exception e) {
                LOGGER.error("Error polling topic {} ", DEST_TOPIC, e);
            }
        }
    });
    LOGGER.info("Consumer started");
}
 
Developer: gchq | Project: stroom-stats | Source: KafkaStreamsSandbox.java
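The point worth noting in Example 3 is the consumer side: windowedSerde.deserializer() is handed directly to a plain KafkaConsumer, so the Windowed<Long> keys written by a Streams topology can be decoded outside any Streams application.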


Example 4: main

import org.apache.kafka.streams.kstream.internals.WindowedDeserializer; // import the required package/class
public static void main(String[] args) {

        String bootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS");
        LOG.info("KAFKA_BOOTSTRAP_SERVERS = {}", bootstrapServers);

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_NAME);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KStreamBuilder builder = new KStreamBuilder();

        KStream<String, String> source = builder.stream(sourceAddress);

        KStream<Windowed<String>, String> max = source
                .selectKey(new KeyValueMapper<String, String, String>() {
                    @Override
                    public String apply(String key, String value) {
                        return "temp";
                    }
                })
                .groupByKey()
                .reduce((a, b) -> {
                    if (Integer.parseInt(a) > Integer.parseInt(b))
                        return a;
                    else
                        return b;
                }, TimeWindows.of(TimeUnit.SECONDS.toMillis(5000))) // NOTE: this is a 5000-second window; toMillis(5) would give the 5 seconds the demo appears to intend
                .toStream();

        WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(Serdes.String().serializer());
        WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(Serdes.String().deserializer());
        Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);

        // need to override key serde to Windowed<String> type
        max.to(windowedSerde, Serdes.String(), destinationAddress);

        final KafkaStreams streams = new KafkaStreams(builder, props);

        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-temperature-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
 
Developer: ppatierno | Project: enmasse-iot-demo | Source: KafkaTemperature.java


Example 5: main

import org.apache.kafka.streams.kstream.internals.WindowedDeserializer; // import the required package/class
public static void main(String[] args) throws InterruptedException {

        Properties props = new Properties();
        props.put(APPLICATION_ID_CONFIG, "my-stream-processing-application");
        props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.serializer", JsonPOJOSerializer.class.getName());
        props.put("value.deserializer", JsonPOJODeserializer.class.getName());

        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        Map<String, Object> serdeProps = new HashMap<>();
        serdeProps.put("JsonPOJOClass", Messung.class);

        final Serializer<Messung> serializer = new JsonPOJOSerializer<>();
        serializer.configure(serdeProps, false);

        final Deserializer<Messung> deserializer = new JsonPOJODeserializer<>();
        deserializer.configure(serdeProps, false);

        final Serde<Messung> pojoSerde = Serdes.serdeFrom(serializer, deserializer);

        Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(new WindowedSerializer<>(new StringSerializer()), new WindowedDeserializer<>(new StringDeserializer()));

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, Messung> stream = builder.stream(Serdes.String(), pojoSerde, "produktion");

        KTable<Windowed<String>, Double> table = stream
                .groupByKey(Serdes.String(), pojoSerde)
                .aggregate(() -> 0.0,
                           (k, v, agg) -> agg + v.kw,
                           TimeWindows.of(10000),
                           Serdes.Double(), "store");

        table.toStream().to(windowedSerde, Serdes.Double(), "produktion3");


        KStream<Windowed<String>, Double> aggregiert = builder.stream(windowedSerde, Serdes.Double(), "produktion3");
        aggregiert.map((k, v) -> {
            String time = new SimpleDateFormat("HH:mm:ss").format(k.window().start());
            System.out.printf("%s key: %s value: %s\n", time, k.key(), v);

            return new KeyValue<>(k, v);
        });

        KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
        streams.start();
    }
 
Developer: predic8 | Project: apache-kafka-demos | Source: GroupByKeyStream.java
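Example 5 exercises both halves of the windowed serde: the KTable's stream is written to the produktion3 topic with the WindowedSerializer, then immediately read back as a KStream<Windowed<String>, Double> with the WindowedDeserializer, which is what allows the map step to unpack k.window().start() and k.key().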


Example 6: WindowedSerde

import org.apache.kafka.streams.kstream.internals.WindowedDeserializer; // import the required package/class
public WindowedSerde(Serde<T> serde) {
  // 'inner' is the wrapper's Serde<Windowed<T>> field: compose the windowed
  // serializer/deserializer pair around the supplied inner-type serde.
  inner = Serdes.serdeFrom(
      new WindowedSerializer<>(serde.serializer()),
      new WindowedDeserializer<>(serde.deserializer()));
}
 
Developer: NewTranx | Project: newtranx-utils | Source: WindowedSerde.java
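The snippet above shows only the constructor; the 'inner' field and the delegating methods come from the rest of the class, which is not reproduced here. A plausible completion (an assumption sketched from the constructor, not the project's verbatim source) looks like this:

import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.WindowedDeserializer;
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;

// Hypothetical completion: a Serde<Windowed<T>> that delegates everything to
// the composed 'inner' serde built in the constructor.
public class WindowedSerde<T> implements Serde<Windowed<T>> {

    private final Serde<Windowed<T>> inner;

    public WindowedSerde(Serde<T> serde) {
        inner = Serdes.serdeFrom(
            new WindowedSerializer<>(serde.serializer()),
            new WindowedDeserializer<>(serde.deserializer()));
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        inner.configure(configs, isKey);
    }

    @Override
    public void close() {
        inner.close();
    }

    @Override
    public Serializer<Windowed<T>> serializer() {
        return inner.serializer();
    }

    @Override
    public Deserializer<Windowed<T>> deserializer() {
        return inner.deserializer();
    }
}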


Example 7: shouldAggregateTumblingWindow

import org.apache.kafka.streams.kstream.internals.WindowedDeserializer; // import the required package/class
@Test
public void shouldAggregateTumblingWindow() throws Exception {

  testHarness.publishTestData(topicName, dataProvider, now);


  final String streamName = "TUMBLING_AGGTEST";

  final String queryString = String.format(
          "CREATE TABLE %s AS SELECT %s FROM ORDERS WINDOW %s WHERE ITEMID = 'ITEM_1' GROUP BY ITEMID;",
          streamName,
          "ITEMID, COUNT(ITEMID), SUM(ORDERUNITS)",
          "TUMBLING ( SIZE 10 SECONDS)"
  );

  ksqlContext.sql(queryString);

  Schema resultSchema = ksqlContext.getMetaStore().getSource(streamName).getSchema();

  final GenericRow expected = new GenericRow(Arrays.asList(null, null, "ITEM_1", 2 /* 2 x items */, 20.0));

  final Map<String, GenericRow> results = new HashMap<>();
  TestUtils.waitForCondition(() -> {
    final Map<Windowed<String>, GenericRow> windowedResults = testHarness.consumeData(streamName, resultSchema, 1, new WindowedDeserializer<>(new StringDeserializer()), MAX_POLL_PER_ITERATION);
    updateResults(results, windowedResults);
    final GenericRow actual = results.get("ITEM_1");
    return expected.equals(actual);
  }, 60000, "didn't receive correct results within timeout");

  AdminClient adminClient = AdminClient.create(testHarness.ksqlConfig.getKsqlStreamConfigProps());
  KafkaTopicClient topicClient = new KafkaTopicClientImpl(adminClient);

  Set<String> topicBeforeCleanup = topicClient.listTopicNames();

  assertThat("Expected to have 5 topics instead have : " + topicBeforeCleanup.size(),
             topicBeforeCleanup.size(), equalTo(5));
  QueryMetadata queryMetadata = ksqlContext.getRunningQueries().iterator().next();

  queryMetadata.close();
  Set<String> topicsAfterCleanUp = topicClient.listTopicNames();

  assertThat("Expected to see 3 topics after clean up but seeing " + topicsAfterCleanUp.size(),
             topicsAfterCleanUp.size(), equalTo(3));
}
 
Developer: confluentinc | Project: ksql | Source: WindowingIntTest.java


Example 8: shouldAggregateHoppingWindow

import org.apache.kafka.streams.kstream.internals.WindowedDeserializer; // import the required package/class
@Test
public void shouldAggregateHoppingWindow() throws Exception {

  testHarness.publishTestData(topicName, dataProvider, now);


  final String streamName = "HOPPING_AGGTEST";

  final String queryString = String.format(
          "CREATE TABLE %s AS SELECT %s FROM ORDERS WINDOW %s WHERE ITEMID = 'ITEM_1' GROUP BY ITEMID;",
          streamName,
          "ITEMID, COUNT(ITEMID), SUM(ORDERUNITS)",
          "HOPPING ( SIZE 10 SECONDS, ADVANCE BY 5 SECONDS)"
  );

  ksqlContext.sql(queryString);

  Schema resultSchema = ksqlContext.getMetaStore().getSource(streamName).getSchema();


  final GenericRow expected = new GenericRow(Arrays.asList(null, null, "ITEM_1", 2 /* 2 x items */, 20.0));

  final Map<String, GenericRow> results = new HashMap<>();
  TestUtils.waitForCondition(() -> {
    final Map<Windowed<String>, GenericRow> windowedResults = testHarness.consumeData(streamName, resultSchema, 1, new WindowedDeserializer<>(new StringDeserializer()), 1000);
    updateResults(results, windowedResults);
    final GenericRow actual = results.get("ITEM_1");
    return expected.equals(actual);
  }, 60000, "didn't receive correct results within timeout");

  AdminClient adminClient = AdminClient.create(testHarness.ksqlConfig.getKsqlStreamConfigProps());
  KafkaTopicClient topicClient = new KafkaTopicClientImpl(adminClient);

  Set<String> topicBeforeCleanup = topicClient.listTopicNames();

  assertThat("Expected to have 5 topics instead have : " + topicBeforeCleanup.size(),
             topicBeforeCleanup.size(), equalTo(5));
  QueryMetadata queryMetadata = ksqlContext.getRunningQueries().iterator().next();

  queryMetadata.close();
  Set<String> topicsAfterCleanUp = topicClient.listTopicNames();

  assertThat("Expected to see 3 topics after clean up but seeing " + topicsAfterCleanUp.size(),
             topicsAfterCleanUp.size(), equalTo(3));
}
 
Developer: confluentinc | Project: ksql | Source: WindowingIntTest.java


Example 9: WindowedSerde

import org.apache.kafka.streams.kstream.internals.WindowedDeserializer; // import the required package/class
public WindowedSerde() {
  // 'serializer' and 'deserializer' are fields of the enclosing class,
  // fixed here to String keys wrapped in windows.
  serializer = new WindowedSerializer<>(new StringSerializer());
  deserializer = new WindowedDeserializer<>(new StringDeserializer());
}
 
Developer: confluentinc | Project: ksql | Source: WindowedSerde.java
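Unlike the generic wrapper in Example 6, this ksql variant fixes the inner key type to String and stores the serializer and deserializer as separate fields instead of composing a single Serde.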


Example 10: shouldAggregateSessionWindow

import org.apache.kafka.streams.kstream.internals.WindowedDeserializer; // import the required package/class
@Test
public void shouldAggregateSessionWindow() throws Exception {

  testHarness.publishTestData(topicName, dataProvider, now);


  final String streamName = "SESSION_AGGTEST";

  final String queryString = String.format(
          "CREATE TABLE %s AS SELECT %s FROM ORDERS WINDOW %s GROUP BY ORDERID;",
          streamName,
          "ORDERID, COUNT(*), SUM(ORDERUNITS)",
          "SESSION (10 SECONDS)"
  );

  ksqlContext.sql(queryString);

  Schema resultSchema = ksqlContext.getMetaStore().getSource(streamName).getSchema();


  GenericRow expectedResults = new GenericRow(Arrays.asList(null, null, "ORDER_6", 6, 420.0));

  final Map<String, GenericRow> results = new HashMap<>();

  TestUtils.waitForCondition(() -> {
    final Map<Windowed<String>, GenericRow> windowedResults = testHarness.consumeData(streamName, resultSchema, datasetOneMetaData.size(), new WindowedDeserializer<>(new StringDeserializer()), 1000);
    updateResults(results, windowedResults);
    final GenericRow actual = results.get("ORDER_6");
    return expectedResults.equals(actual) && results.size() == 6;
  }, 60000, "didn't receive correct results within timeout");

  AdminClient adminClient = AdminClient.create(testHarness.ksqlConfig.getKsqlStreamConfigProps());
  KafkaTopicClient topicClient = new KafkaTopicClientImpl(adminClient);

  Set<String> topicBeforeCleanup = topicClient.listTopicNames();

  assertThat("Expected to have 5 topics instead have : " + topicBeforeCleanup.size(),
             topicBeforeCleanup.size(), equalTo(5));
  QueryMetadata queryMetadata = ksqlContext.getRunningQueries().iterator().next();

  queryMetadata.close();
  Set<String> topicsAfterCleanUp = topicClient.listTopicNames();

  assertThat("Expected to see 3 topics after clean up but seeing " + topicsAfterCleanUp.size(),
             topicsAfterCleanUp.size(), equalTo(3));

}
 
Developer: confluentinc | Project: ksql | Source: WindowingIntTest.java



Note: the org.apache.kafka.streams.kstream.internals.WindowedDeserializer examples in this article were collected from open-source projects hosted on GitHub, MSDocs, and similar platforms. Copyright in each snippet remains with its original authors; consult the corresponding project's license before redistributing or reusing the code. Please do not republish this article without permission.

