• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Java StateStoreSupplier类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.apache.kafka.streams.processor.StateStoreSupplier的典型用法代码示例。如果您正苦于以下问题:Java StateStoreSupplier类的具体用法?Java StateStoreSupplier怎么用?Java StateStoreSupplier使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



StateStoreSupplier类属于org.apache.kafka.streams.processor包,在下文中一共展示了StateStoreSupplier类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: shouldDriveGlobalStore

import org.apache.kafka.streams.processor.StateStoreSupplier; //导入依赖的package包/类
@SuppressWarnings("unchecked")
@Test
public void shouldDriveGlobalStore() throws Exception {
    final StateStoreSupplier storeSupplier = Stores.create("my-store")
            .withStringKeys().withStringValues().inMemory().disableLogging().build();
    final String global = "global";
    final String topic = "topic";
    final TopologyBuilder topologyBuilder = this.builder
            .addGlobalStore(storeSupplier, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor("my-store")));

    driver = new ProcessorTopologyTestDriver(config, topologyBuilder);
    final KeyValueStore<String, String> globalStore = (KeyValueStore<String, String>) topologyBuilder.globalStateStores().get("my-store");
    driver.process(topic, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
    driver.process(topic, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
    assertEquals("value1", globalStore.get("key1"));
    assertEquals("value2", globalStore.get("key2"));
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:18,代码来源:ProcessorTopologyTest.java


示例2: doTable

import org.apache.kafka.streams.processor.StateStoreSupplier; //导入依赖的package包/类
private <K, V> KTable<K, V> doTable(final AutoOffsetReset offsetReset,
                                    final Serde<K> keySerde,
                                    final Serde<V> valSerde,
                                    final TimestampExtractor timestampExtractor,
                                    final String topic,
                                    final StateStoreSupplier<KeyValueStore> storeSupplier,
                                    final boolean isQueryable) {
    final String source = newName(KStreamImpl.SOURCE_NAME);
    final String name = newName(KTableImpl.SOURCE_NAME);
    final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeSupplier.name());

    addSource(offsetReset, source, timestampExtractor, keySerde == null ? null : keySerde.deserializer(),
            valSerde == null ? null : valSerde.deserializer(),
            topic);
    addProcessor(name, processorSupplier, source);

    final KTableImpl<K, ?, V> kTable = new KTableImpl<>(this, name, processorSupplier,
            keySerde, valSerde, Collections.singleton(source), storeSupplier.name(), isQueryable);

    addStateStore(storeSupplier, name);
    connectSourceStoreAndTopic(storeSupplier.name(), topic);

    return kTable;
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:25,代码来源:KStreamBuilder.java


示例3: doFilter

import org.apache.kafka.streams.processor.StateStoreSupplier; //导入依赖的package包/类
private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate,
                              final StateStoreSupplier<KeyValueStore> storeSupplier,
                              boolean isFilterNot) {
    Objects.requireNonNull(predicate, "predicate can't be null");
    String name = topology.newName(FILTER_NAME);
    String internalStoreName = null;
    if (storeSupplier != null) {
        internalStoreName = storeSupplier.name();
    }
    KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, isFilterNot, internalStoreName);
    topology.addProcessor(name, processorSupplier, this.name);
    if (storeSupplier != null) {
        topology.addStateStore(storeSupplier, name);
    }
    return new KTableImpl<>(topology, name, processorSupplier, this.keySerde, this.valSerde, sourceNodes, internalStoreName, internalStoreName != null);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:17,代码来源:KTableImpl.java


示例4: doMapValues

import org.apache.kafka.streams.processor.StateStoreSupplier; //导入依赖的package包/类
private <V1> KTable<K, V1> doMapValues(final ValueMapper<? super V, ? extends V1> mapper,
                                       final Serde<V1> valueSerde,
                                       final StateStoreSupplier<KeyValueStore> storeSupplier) {
    Objects.requireNonNull(mapper);
    String name = topology.newName(MAPVALUES_NAME);
    String internalStoreName = null;
    if (storeSupplier != null) {
        internalStoreName = storeSupplier.name();
    }
    KTableProcessorSupplier<K, V, V1> processorSupplier = new KTableMapValues<>(this, mapper, internalStoreName);
    topology.addProcessor(name, processorSupplier, this.name);
    if (storeSupplier != null) {
        topology.addStateStore(storeSupplier, name);
        return new KTableImpl<>(topology, name, processorSupplier, this.keySerde, valueSerde, sourceNodes, internalStoreName, true);
    } else {
        return new KTableImpl<>(topology, name, processorSupplier, sourceNodes, this.queryableStoreName, false);
    }
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:19,代码来源:KTableImpl.java


示例5: aggregate

import org.apache.kafka.streams.processor.StateStoreSupplier; //导入依赖的package包/类
@SuppressWarnings("unchecked")
@Override
public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                                              final Aggregator<? super K, ? super V, T> aggregator,
                                                              final Windows<W> windows,
                                                              final StateStoreSupplier<WindowStore> storeSupplier) {
    Objects.requireNonNull(initializer, "initializer can't be null");
    Objects.requireNonNull(aggregator, "aggregator can't be null");
    Objects.requireNonNull(windows, "windows can't be null");
    Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
    return (KTable<Windowed<K>, T>) doAggregate(
            new KStreamWindowAggregate<>(windows, storeSupplier.name(), initializer, aggregator),
            AGGREGATE_NAME,
            storeSupplier
    );
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:17,代码来源:KGroupedStreamImpl.java


示例6: doAggregate

import org.apache.kafka.streams.processor.StateStoreSupplier; //导入依赖的package包/类
private <T> KTable<K, T> doAggregate(
        final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
        final String functionName,
        final StateStoreSupplier storeSupplier) {

    final String aggFunctionName = topology.newName(functionName);

    final String sourceName = repartitionIfRequired(storeSupplier.name());

    topology.addProcessor(aggFunctionName, aggregateSupplier, sourceName);
    topology.addStateStore(storeSupplier, aggFunctionName);

    return new KTableImpl<>(topology,
            aggFunctionName,
            aggregateSupplier,
            sourceName.equals(this.name) ? sourceNodes
                    : Collections.singleton(sourceName),
            storeSupplier.name(),
            isQueryable);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:21,代码来源:KGroupedStreamImpl.java


示例7: createKeyValueStore

import org.apache.kafka.streams.processor.StateStoreSupplier; //导入依赖的package包/类
@SuppressWarnings("unchecked")
@Override
protected <K, V> KeyValueStore<K, V> createKeyValueStore(
        ProcessorContext context,
        Class<K> keyClass,
        Class<V> valueClass,
        boolean useContextSerdes) {

    StateStoreSupplier supplier;
    if (useContextSerdes) {
        supplier = Stores.create("my-store").withKeys(context.keySerde()).withValues(context.valueSerde()).inMemory().maxEntries(10).build();
    } else {
        supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).inMemory().maxEntries(10).build();
    }

    KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
    store.init(context, store);
    return store;
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:20,代码来源:InMemoryLRUCacheStoreTest.java


示例8: createKeyValueStore

import org.apache.kafka.streams.processor.StateStoreSupplier; //导入依赖的package包/类
@SuppressWarnings("unchecked")
@Override
protected <K, V> KeyValueStore<K, V> createKeyValueStore(
        ProcessorContext context,
        Class<K> keyClass,
        Class<V> valueClass,
        boolean useContextSerdes) {

    StateStoreSupplier supplier;
    if (useContextSerdes) {
        supplier = Stores.create("my-store").withKeys(context.keySerde()).withValues(context.valueSerde()).inMemory().build();
    } else {
        supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).inMemory().build();
    }

    KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
    store.init(context, store);
    return store;
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:20,代码来源:InMemoryKeyValueStoreTest.java


示例9: init

import org.apache.kafka.streams.processor.StateStoreSupplier; //导入依赖的package包/类
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
    this.context = context;

    KryoSerDe kryoSerDe = new KryoSerDe();
    Set<StateStoreSupplier> stateStoreSuppliers = getDefinedStateNames(stages)
            .map(s -> getStateStoreSupplier(StateStoreProvider.getStateStoreName(queryName, s), kryoSerDe, kryoSerDe, inMemory))
            .collect(Collectors.toSet());

    Serde<?> keySerde = this.context.keySerde();
    Serde<?> valSerde = this.context.valueSerde();

    TimedKeyValueSerDes<K, V> timedKeyValueSerDes = new TimedKeyValueSerDes(keySerde, valSerde);
    stateStoreSuppliers.add(getStateStoreSupplier(bufferStateStoreName, kryoSerDe,
            Serdes.serdeFrom(timedKeyValueSerDes, timedKeyValueSerDes), inMemory));

    NFASTateValueSerDe valueSerDe = new NFASTateValueSerDe(new ComputationStageSerDe(stages, keySerde, valSerde));
    stateStoreSuppliers.add(getStateStoreSupplier(nfaStateStoreName, kryoSerDe,
            Serdes.serdeFrom(valueSerDe, valueSerDe), inMemory));

    initializeStateStores(stateStoreSuppliers);
}
 
开发者ID:fhussonnois,项目名称:kafkastreams-cep,代码行数:24,代码来源:CEPProcessor.java


示例10: processingTopologyBuilder

import org.apache.kafka.streams.processor.StateStoreSupplier; //导入依赖的package包/类
private TopologyBuilder processingTopologyBuilder() {
    //create state store
    StateStoreSupplier machineToAvgCPUUsageStore
            = Stores.create(AVG_STORE_NAME)
                    .withStringKeys()
                    .withDoubleValues()
                    .inMemory()
                    .build();

    StateStoreSupplier machineToNumOfRecordsReadStore
            = Stores.create(NUM_RECORDS_STORE_NAME)
                    .withStringKeys()
                    .withIntegerValues()
                    .inMemory()
                    .build();

    TopologyBuilder builder = new TopologyBuilder();

    builder.addSource(SOURCE_NAME, TOPIC_NAME)
            .addProcessor(PROCESSOR_NAME, new ProcessorSupplier() {
                @Override
                public Processor get() {
                    return new CPUCumulativeAverageProcessor();
                }
            }, SOURCE_NAME)
            .addStateStore(machineToAvgCPUUsageStore, PROCESSOR_NAME)
            .addStateStore(machineToNumOfRecordsReadStore, PROCESSOR_NAME);

    LOGGER.info("Kafka streams processing topology ready");

    return builder;
}
 
开发者ID:abhirockzz,项目名称:docker-kafka-streams,代码行数:33,代码来源:CPUMetricStreamHandler.java


示例11: processingTopologyBuilder

import org.apache.kafka.streams.processor.StateStoreSupplier; //导入依赖的package包/类
private TopologyBuilder processingTopologyBuilder() {

        StateStoreSupplier machineToAvgCPUUsageStore
                = Stores.create(AVG_STORE_NAME)
                        .withStringKeys()
                        .withDoubleValues()
                        .inMemory()
                        .build();

        StateStoreSupplier machineToNumOfRecordsReadStore
                = Stores.create(NUM_RECORDS_STORE_NAME)
                        .withStringKeys()
                        .withIntegerValues()
                        .inMemory()
                        .build();

        TopologyBuilder builder = new TopologyBuilder();

        builder.addSource(SOURCE_NAME, TOPIC_NAME)
                .addProcessor(PROCESSOR_NAME, new ProcessorSupplier() {
                    @Override
                    public Processor get() {
                        return new CPUCumulativeAverageProcessor();
                    }
                }, SOURCE_NAME)
                .addStateStore(machineToAvgCPUUsageStore, PROCESSOR_NAME)
                .addStateStore(machineToNumOfRecordsReadStore, PROCESSOR_NAME);

        LOGGER.info("Kafka streams processing topology ready");

        return builder;
    }
 
开发者ID:abhirockzz,项目名称:kafka-streams-example,代码行数:33,代码来源:CPUMetricStreamHandler.java


示例12: createWindowedStateStore

import org.apache.kafka.streams.processor.StateStoreSupplier; //导入依赖的package包/类
private static <K, V> StateStoreSupplier createWindowedStateStore(final JoinWindows windows,
                                                                 final Serde<K> keySerde,
                                                                 final Serde<V> valueSerde,
                                                                 final String storeName) {
    return Stores.create(storeName)
        .withKeys(keySerde)
        .withValues(valueSerde)
        .persistent()
        .windowed(windows.size(), windows.maintainMs(), windows.segments, true)
        .build();
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:12,代码来源:KStreamImpl.java


示例13: aggregate

import org.apache.kafka.streams.processor.StateStoreSupplier; //导入依赖的package包/类
@Override
public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
                                  final Aggregator<? super K, ? super V, T> adder,
                                  final Aggregator<? super K, ? super V, T> subtractor,
                                  final StateStoreSupplier<KeyValueStore> storeSupplier) {
    Objects.requireNonNull(initializer, "initializer can't be null");
    Objects.requireNonNull(adder, "adder can't be null");
    Objects.requireNonNull(subtractor, "subtractor can't be null");
    Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
    ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(storeSupplier.name(), initializer, adder, subtractor);
    return doAggregate(aggregateSupplier, AGGREGATE_NAME, storeSupplier);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:13,代码来源:KGroupedTableImpl.java


示例14: doAggregate

import org.apache.kafka.streams.processor.StateStoreSupplier; //导入依赖的package包/类
private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
                                     final String functionName,
                                     final StateStoreSupplier<KeyValueStore> storeSupplier) {
    String sinkName = topology.newName(KStreamImpl.SINK_NAME);
    String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
    String funcName = topology.newName(functionName);

    String topic = storeSupplier.name() + KStreamImpl.REPARTITION_TOPIC_SUFFIX;

    Serializer<? extends K> keySerializer = keySerde == null ? null : keySerde.serializer();
    Deserializer<? extends K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
    Serializer<? extends V> valueSerializer = valSerde == null ? null : valSerde.serializer();
    Deserializer<? extends V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();

    ChangedSerializer<? extends V> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
    ChangedDeserializer<? extends V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);

    // send the aggregate key-value pairs to the intermediate topic for partitioning
    topology.addInternalTopic(topic);
    topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, this.name);

    // read the intermediate topic
    topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic);

    // aggregate the values with the aggregator and local store
    topology.addProcessor(funcName, aggregateSupplier, sourceName);
    topology.addStateStore(storeSupplier, funcName);

    // return the KTable representation with the intermediate topic as the sources
    return new KTableImpl<>(topology, funcName, aggregateSupplier, Collections.singleton(sourceName), storeSupplier.name(), isQueryable);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:32,代码来源:KGroupedTableImpl.java


示例15: reduce

import org.apache.kafka.streams.processor.StateStoreSupplier; //导入依赖的package包/类
@Override
public KTable<K, V> reduce(final Reducer<V> adder,
                           final Reducer<V> subtractor,
                           final StateStoreSupplier<KeyValueStore> storeSupplier) {
    Objects.requireNonNull(adder, "adder can't be null");
    Objects.requireNonNull(subtractor, "subtractor can't be null");
    Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
    ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(storeSupplier.name(), adder, subtractor);
    return doAggregate(aggregateSupplier, REDUCE_NAME, storeSupplier);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:11,代码来源:KGroupedTableImpl.java


示例16: filter

import org.apache.kafka.streams.processor.StateStoreSupplier; //导入依赖的package包/类
@Override
public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final String queryableStoreName) {
    StateStoreSupplier<KeyValueStore> storeSupplier = null;
    if (queryableStoreName != null) {
        storeSupplier = keyValueStore(this.keySerde, this.valSerde, queryableStoreName);
    }
    return doFilter(predicate, storeSupplier, false);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:9,代码来源:KTableImpl.java


示例17: filterNot

import org.apache.kafka.streams.processor.StateStoreSupplier; //导入依赖的package包/类
@Override
public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final String queryableStoreName) {
    StateStoreSupplier<KeyValueStore> storeSupplier = null;
    if (queryableStoreName != null) {
        storeSupplier = keyValueStore(this.keySerde, this.valSerde, queryableStoreName);
    }
    return doFilter(predicate, storeSupplier, true);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:9,代码来源:KTableImpl.java


示例18: mapValues

import org.apache.kafka.streams.processor.StateStoreSupplier; //导入依赖的package包/类
@Override
public <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper,
                                    final Serde<V1> valueSerde,
                                    final String queryableStoreName) {
    StateStoreSupplier<KeyValueStore> storeSupplier = null;
    if (queryableStoreName != null) {
        storeSupplier = keyValueStore(this.keySerde, valueSerde, queryableStoreName);
    }
    return doMapValues(mapper, valueSerde, storeSupplier);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:11,代码来源:KTableImpl.java


示例19: through

import org.apache.kafka.streams.processor.StateStoreSupplier; //导入依赖的package包/类
@Override
public KTable<K, V> through(final Serde<K> keySerde,
                            final Serde<V> valSerde,
                            final StreamPartitioner<? super K, ? super V> partitioner,
                            final String topic,
                            final StateStoreSupplier<KeyValueStore> storeSupplier) {
    Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
    to(keySerde, valSerde, partitioner, topic);

    return topology.table(keySerde, valSerde, topic, storeSupplier);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:12,代码来源:KTableImpl.java


示例20: join

import org.apache.kafka.streams.processor.StateStoreSupplier; //导入依赖的package包/类
@Override
public <V1, R> KTable<K, R> join(final KTable<K, V1> other,
                                 final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                 final StateStoreSupplier<KeyValueStore> storeSupplier) {
    Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
    return doJoin(other, joiner, false, false, storeSupplier);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:8,代码来源:KTableImpl.java



注:本文中的org.apache.kafka.streams.processor.StateStoreSupplier类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java IBuildConfiguration类代码示例发布时间:2022-05-23
下一篇:
Java InternalPlatform类代码示例发布时间:2022-05-23
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap