This article collects typical usage examples of the Java class org.apache.kafka.streams.processor.StateStoreSupplier. If you are wondering what the StateStoreSupplier class is for or how to use it, the selected code examples below may help.
The StateStoreSupplier class belongs to the org.apache.kafka.streams.processor package. The 20 code examples below are sorted by popularity by default.
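Across the examples below, two calls on a supplier recur: name(), which is used to wire a store to a processor, and get(), which creates a store instance. The following is a minimal sketch of that supplier pattern using stand-in types only. SimpleStore, SimpleStoreSupplier and InMemoryStoreSupplier are hypothetical names invented for illustration; they are not part of Kafka Streams, and the real interface has more to it than this.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a key-value state store; not the Kafka interface.
interface SimpleStore<K, V> {
    void put(K key, V value);

    V get(K key);
}

// Hypothetical stand-in mirroring the two calls the examples rely on:
// name() to refer to the store, get() to create a store instance.
interface SimpleStoreSupplier<T> {
    String name();

    T get();
}

// Supplies in-memory stores; in this sketch every get() call returns a new store.
final class InMemoryStoreSupplier<K, V> implements SimpleStoreSupplier<SimpleStore<K, V>> {
    private final String name;

    InMemoryStoreSupplier(String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public SimpleStore<K, V> get() {
        // Each store owns its own map, so instances do not share state.
        final Map<K, V> data = new HashMap<>();
        return new SimpleStore<K, V>() {
            @Override
            public void put(K key, V value) {
                data.put(key, value);
            }

            @Override
            public V get(K key) {
                return data.get(key);
            }
        };
    }
}
```

In this sketch the supplier is only a named factory: code that builds a topology can pass the supplier around and read its name without ever touching a store instance.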
Example 1: shouldDriveGlobalStore
import org.apache.kafka.streams.processor.StateStoreSupplier; // import the required package/class
@SuppressWarnings("unchecked")
@Test
public void shouldDriveGlobalStore() throws Exception {
    final StateStoreSupplier storeSupplier = Stores.create("my-store")
            .withStringKeys().withStringValues().inMemory().disableLogging().build();
    final String global = "global";
    final String topic = "topic";
    final TopologyBuilder topologyBuilder = this.builder
            .addGlobalStore(storeSupplier, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor("my-store")));
    driver = new ProcessorTopologyTestDriver(config, topologyBuilder);
    final KeyValueStore<String, String> globalStore = (KeyValueStore<String, String>) topologyBuilder.globalStateStores().get("my-store");
    driver.process(topic, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
    driver.process(topic, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
    assertEquals("value1", globalStore.get("key1"));
    assertEquals("value2", globalStore.get("key2"));
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 18, Source: ProcessorTopologyTest.java
Example 2: doTable
import org.apache.kafka.streams.processor.StateStoreSupplier; // import the required package/class
private <K, V> KTable<K, V> doTable(final AutoOffsetReset offsetReset,
        final Serde<K> keySerde,
        final Serde<V> valSerde,
        final TimestampExtractor timestampExtractor,
        final String topic,
        final StateStoreSupplier<KeyValueStore> storeSupplier,
        final boolean isQueryable) {
    final String source = newName(KStreamImpl.SOURCE_NAME);
    final String name = newName(KTableImpl.SOURCE_NAME);
    final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeSupplier.name());
    addSource(offsetReset, source, timestampExtractor, keySerde == null ? null : keySerde.deserializer(),
            valSerde == null ? null : valSerde.deserializer(),
            topic);
    addProcessor(name, processorSupplier, source);
    final KTableImpl<K, ?, V> kTable = new KTableImpl<>(this, name, processorSupplier,
            keySerde, valSerde, Collections.singleton(source), storeSupplier.name(), isQueryable);
    addStateStore(storeSupplier, name);
    connectSourceStoreAndTopic(storeSupplier.name(), topic);
    return kTable;
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 25, Source: KStreamBuilder.java
Example 3: doFilter
import org.apache.kafka.streams.processor.StateStoreSupplier; // import the required package/class
private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate,
        final StateStoreSupplier<KeyValueStore> storeSupplier,
        boolean isFilterNot) {
    Objects.requireNonNull(predicate, "predicate can't be null");
    String name = topology.newName(FILTER_NAME);
    // the store name stays null when no supplier was passed in
    String internalStoreName = null;
    if (storeSupplier != null) {
        internalStoreName = storeSupplier.name();
    }
    KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, isFilterNot, internalStoreName);
    topology.addProcessor(name, processorSupplier, this.name);
    if (storeSupplier != null) {
        topology.addStateStore(storeSupplier, name);
    }
    return new KTableImpl<>(topology, name, processorSupplier, this.keySerde, this.valSerde, sourceNodes, internalStoreName, internalStoreName != null);
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 17, Source: KTableImpl.java
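Example 3 treats the supplier as optional: the store name is taken from the supplier only when one is present, and the last constructor argument is derived from whether that name is non-null. The sketch below isolates just that decision. NamedSupplier, TableWiring and OptionalStoreWiring are hypothetical stand-ins introduced for illustration, and the field name queryable is an assumption about what the final boolean argument means.

```java
// Hypothetical stand-in exposing only the name() call used in the example.
interface NamedSupplier {
    String name();
}

// Hypothetical holder for the two values the example derives from the supplier.
final class TableWiring {
    final String internalStoreName;
    final boolean queryable;

    TableWiring(String internalStoreName, boolean queryable) {
        this.internalStoreName = internalStoreName;
        this.queryable = queryable;
    }
}

final class OptionalStoreWiring {
    // Mirrors the null handling in doFilter: no supplier means no store name,
    // and the boolean flag is simply "a store name exists".
    static TableWiring wire(NamedSupplier storeSupplier) {
        String internalStoreName = null;
        if (storeSupplier != null) {
            internalStoreName = storeSupplier.name();
        }
        return new TableWiring(internalStoreName, internalStoreName != null);
    }
}
```

Deriving the flag from the store name keeps the two values from drifting apart, since there is no separate boolean for a caller to set inconsistently.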
Example 4: doMapValues
import org.apache.kafka.streams.processor.StateStoreSupplier; // import the required package/class
private <V1> KTable<K, V1> doMapValues(final ValueMapper<? super V, ? extends V1> mapper,
        final Serde<V1> valueSerde,
        final StateStoreSupplier<KeyValueStore> storeSupplier) {
    Objects.requireNonNull(mapper);
    String name = topology.newName(MAPVALUES_NAME);
    String internalStoreName = null;
    if (storeSupplier != null) {
        internalStoreName = storeSupplier.name();
    }
    KTableProcessorSupplier<K, V, V1> processorSupplier = new KTableMapValues<>(this, mapper, internalStoreName);
    topology.addProcessor(name, processorSupplier, this.name);
    if (storeSupplier != null) {
        topology.addStateStore(storeSupplier, name);
        return new KTableImpl<>(topology, name, processorSupplier, this.keySerde, valueSerde, sourceNodes, internalStoreName, true);
    } else {
        return new KTableImpl<>(topology, name, processorSupplier, sourceNodes, this.queryableStoreName, false);
    }
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 19, Source: KTableImpl.java
Example 5: aggregate
import org.apache.kafka.streams.processor.StateStoreSupplier; // import the required package/class
@SuppressWarnings("unchecked")
@Override
public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
        final Aggregator<? super K, ? super V, T> aggregator,
        final Windows<W> windows,
        final StateStoreSupplier<WindowStore> storeSupplier) {
    Objects.requireNonNull(initializer, "initializer can't be null");
    Objects.requireNonNull(aggregator, "aggregator can't be null");
    Objects.requireNonNull(windows, "windows can't be null");
    Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
    return (KTable<Windowed<K>, T>) doAggregate(
            new KStreamWindowAggregate<>(windows, storeSupplier.name(), initializer, aggregator),
            AGGREGATE_NAME,
            storeSupplier
    );
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 17, Source: KGroupedStreamImpl.java
Example 6: doAggregate
import org.apache.kafka.streams.processor.StateStoreSupplier; // import the required package/class
private <T> KTable<K, T> doAggregate(
        final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
        final String functionName,
        final StateStoreSupplier storeSupplier) {
    final String aggFunctionName = topology.newName(functionName);
    final String sourceName = repartitionIfRequired(storeSupplier.name());
    topology.addProcessor(aggFunctionName, aggregateSupplier, sourceName);
    topology.addStateStore(storeSupplier, aggFunctionName);
    return new KTableImpl<>(topology,
            aggFunctionName,
            aggregateSupplier,
            sourceName.equals(this.name) ? sourceNodes
                    : Collections.singleton(sourceName),
            storeSupplier.name(),
            isQueryable);
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 21, Source: KGroupedStreamImpl.java
Example 7: createKeyValueStore
import org.apache.kafka.streams.processor.StateStoreSupplier; // import the required package/class
@SuppressWarnings("unchecked")
@Override
protected <K, V> KeyValueStore<K, V> createKeyValueStore(
        ProcessorContext context,
        Class<K> keyClass,
        Class<V> valueClass,
        boolean useContextSerdes) {
    StateStoreSupplier supplier;
    if (useContextSerdes) {
        supplier = Stores.create("my-store").withKeys(context.keySerde()).withValues(context.valueSerde()).inMemory().maxEntries(10).build();
    } else {
        supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).inMemory().maxEntries(10).build();
    }
    KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
    // the store is passed as its own root store
    store.init(context, store);
    return store;
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 20, Source: InMemoryLRUCacheStoreTest.java
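Example 7 builds an in-memory store capped with maxEntries(10), and the test file name suggests least-recently-used eviction. The sketch below shows one common way to get that behavior in plain Java, using an access-ordered LinkedHashMap. LruSketch is a hypothetical class written for illustration; it is not the Kafka Streams implementation and makes no claim about how that implementation works internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical bounded map that evicts the least recently used entry.
final class LruSketch<K, V> {
    private final Map<K, V> map;

    LruSketch(final int maxEntries) {
        // accessOrder = true moves an entry to the end on every get/put,
        // so the first entry is always the least recently used one.
        this.map = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                // Called after each insertion; evict once the cap is exceeded.
                return size() > maxEntries;
            }
        };
    }

    void put(K key, V value) {
        map.put(key, value);
    }

    V get(K key) {
        return map.get(key);
    }

    int size() {
        return map.size();
    }
}
```

With a cap of 2, putting "a" and "b", reading "a", and then putting "c" evicts "b", because the read made "a" more recently used than "b".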
Example 8: createKeyValueStore
import org.apache.kafka.streams.processor.StateStoreSupplier; // import the required package/class
@SuppressWarnings("unchecked")
@Override
protected <K, V> KeyValueStore<K, V> createKeyValueStore(
        ProcessorContext context,
        Class<K> keyClass,
        Class<V> valueClass,
        boolean useContextSerdes) {
    StateStoreSupplier supplier;
    if (useContextSerdes) {
        supplier = Stores.create("my-store").withKeys(context.keySerde()).withValues(context.valueSerde()).inMemory().build();
    } else {
        supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).inMemory().build();
    }
    KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
    store.init(context, store);
    return store;
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 20, Source: InMemoryKeyValueStoreTest.java
Example 9: init
import org.apache.kafka.streams.processor.StateStoreSupplier; // import the required package/class
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
    this.context = context;
    KryoSerDe kryoSerDe = new KryoSerDe();
    Set<StateStoreSupplier> stateStoreSuppliers = getDefinedStateNames(stages)
            .map(s -> getStateStoreSupplier(StateStoreProvider.getStateStoreName(queryName, s), kryoSerDe, kryoSerDe, inMemory))
            .collect(Collectors.toSet());
    Serde<?> keySerde = this.context.keySerde();
    Serde<?> valSerde = this.context.valueSerde();
    TimedKeyValueSerDes<K, V> timedKeyValueSerDes = new TimedKeyValueSerDes(keySerde, valSerde);
    stateStoreSuppliers.add(getStateStoreSupplier(bufferStateStoreName, kryoSerDe,
            Serdes.serdeFrom(timedKeyValueSerDes, timedKeyValueSerDes), inMemory));
    NFASTateValueSerDe valueSerDe = new NFASTateValueSerDe(new ComputationStageSerDe(stages, keySerde, valSerde));
    stateStoreSuppliers.add(getStateStoreSupplier(nfaStateStoreName, kryoSerDe,
            Serdes.serdeFrom(valueSerDe, valueSerDe), inMemory));
    initializeStateStores(stateStoreSuppliers);
}
Developer: fhussonnois, Project: kafkastreams-cep, Lines of code: 24, Source: CEPProcessor.java
Example 10: processingTopologyBuilder
import org.apache.kafka.streams.processor.StateStoreSupplier; // import the required package/class
private TopologyBuilder processingTopologyBuilder() {
    // create state stores
    StateStoreSupplier machineToAvgCPUUsageStore
            = Stores.create(AVG_STORE_NAME)
                    .withStringKeys()
                    .withDoubleValues()
                    .inMemory()
                    .build();
    StateStoreSupplier machineToNumOfRecordsReadStore
            = Stores.create(NUM_RECORDS_STORE_NAME)
                    .withStringKeys()
                    .withIntegerValues()
                    .inMemory()
                    .build();
    TopologyBuilder builder = new TopologyBuilder();
    builder.addSource(SOURCE_NAME, TOPIC_NAME)
            .addProcessor(PROCESSOR_NAME, new ProcessorSupplier() {
                @Override
                public Processor get() {
                    return new CPUCumulativeAverageProcessor();
                }
            }, SOURCE_NAME)
            .addStateStore(machineToAvgCPUUsageStore, PROCESSOR_NAME)
            .addStateStore(machineToNumOfRecordsReadStore, PROCESSOR_NAME);
    LOGGER.info("Kafka streams processing topology ready");
    return builder;
}
Developer: abhirockzz, Project: docker-kafka-streams, Lines of code: 33, Source: CPUMetricStreamHandler.java
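Example 10 attaches two stores to one processor: one keyed by machine with Double values and one keyed by machine with Integer values. The processor itself (CPUCumulativeAverageProcessor) is not shown, so the following is only a guess at the arithmetic such a pair of stores makes possible: keeping a running average without storing every reading. CumulativeAverageSketch is a hypothetical class, and plain HashMaps stand in for the two state stores.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a running average per machine from two key-value maps.
final class CumulativeAverageSketch {
    // Stand-in for the store with String keys and Double values.
    private final Map<String, Double> avgStore = new HashMap<>();
    // Stand-in for the store with String keys and Integer values.
    private final Map<String, Integer> countStore = new HashMap<>();

    // Folds one reading into the running average and returns the new average.
    double update(String machineId, double cpuUsage) {
        int n = countStore.getOrDefault(machineId, 0);
        double oldAvg = avgStore.getOrDefault(machineId, 0.0);
        // newAvg = (oldAvg * n + x) / (n + 1)
        double newAvg = (oldAvg * n + cpuUsage) / (n + 1);
        countStore.put(machineId, n + 1);
        avgStore.put(machineId, newAvg);
        return newAvg;
    }
}
```

Feeding the readings 10.0, 20.0 and 30.0 for one machine yields averages of 10.0, 15.0 and 20.0 in turn, and each machine key is tracked independently.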
Example 11: processingTopologyBuilder
import org.apache.kafka.streams.processor.StateStoreSupplier; // import the required package/class
private TopologyBuilder processingTopologyBuilder() {
    StateStoreSupplier machineToAvgCPUUsageStore
            = Stores.create(AVG_STORE_NAME)
                    .withStringKeys()
                    .withDoubleValues()
                    .inMemory()
                    .build();
    StateStoreSupplier machineToNumOfRecordsReadStore
            = Stores.create(NUM_RECORDS_STORE_NAME)
                    .withStringKeys()
                    .withIntegerValues()
                    .inMemory()
                    .build();
    TopologyBuilder builder = new TopologyBuilder();
    builder.addSource(SOURCE_NAME, TOPIC_NAME)
            .addProcessor(PROCESSOR_NAME, new ProcessorSupplier() {
                @Override
                public Processor get() {
                    return new CPUCumulativeAverageProcessor();
                }
            }, SOURCE_NAME)
            .addStateStore(machineToAvgCPUUsageStore, PROCESSOR_NAME)
            .addStateStore(machineToNumOfRecordsReadStore, PROCESSOR_NAME);
    LOGGER.info("Kafka streams processing topology ready");
    return builder;
}
Developer: abhirockzz, Project: kafka-streams-example, Lines of code: 33, Source: CPUMetricStreamHandler.java
Example 12: createWindowedStateStore
import org.apache.kafka.streams.processor.StateStoreSupplier; // import the required package/class
private static <K, V> StateStoreSupplier createWindowedStateStore(final JoinWindows windows,
        final Serde<K> keySerde,
        final Serde<V> valueSerde,
        final String storeName) {
    return Stores.create(storeName)
            .withKeys(keySerde)
            .withValues(valueSerde)
            .persistent()
            .windowed(windows.size(), windows.maintainMs(), windows.segments, true)
            .build();
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 12, Source: KStreamImpl.java
Example 13: aggregate
import org.apache.kafka.streams.processor.StateStoreSupplier; // import the required package/class
@Override
public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
        final Aggregator<? super K, ? super V, T> adder,
        final Aggregator<? super K, ? super V, T> subtractor,
        final StateStoreSupplier<KeyValueStore> storeSupplier) {
    Objects.requireNonNull(initializer, "initializer can't be null");
    Objects.requireNonNull(adder, "adder can't be null");
    Objects.requireNonNull(subtractor, "subtractor can't be null");
    Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
    ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(storeSupplier.name(), initializer, adder, subtractor);
    return doAggregate(aggregateSupplier, AGGREGATE_NAME, storeSupplier);
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 13, Source: KGroupedTableImpl.java
Example 14: doAggregate
import org.apache.kafka.streams.processor.StateStoreSupplier; // import the required package/class
private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
        final String functionName,
        final StateStoreSupplier<KeyValueStore> storeSupplier) {
    String sinkName = topology.newName(KStreamImpl.SINK_NAME);
    String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
    String funcName = topology.newName(functionName);
    String topic = storeSupplier.name() + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
    Serializer<? extends K> keySerializer = keySerde == null ? null : keySerde.serializer();
    Deserializer<? extends K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
    Serializer<? extends V> valueSerializer = valSerde == null ? null : valSerde.serializer();
    Deserializer<? extends V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
    ChangedSerializer<? extends V> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
    ChangedDeserializer<? extends V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
    // send the aggregate key-value pairs to the intermediate topic for partitioning
    topology.addInternalTopic(topic);
    topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, this.name);
    // read the intermediate topic
    topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic);
    // aggregate the values with the aggregator and local store
    topology.addProcessor(funcName, aggregateSupplier, sourceName);
    topology.addStateStore(storeSupplier, funcName);
    // return the KTable representation with the intermediate topic as the sources
    return new KTableImpl<>(topology, funcName, aggregateSupplier, Collections.singleton(sourceName), storeSupplier.name(), isQueryable);
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 32, Source: KGroupedTableImpl.java
Example 15: reduce
import org.apache.kafka.streams.processor.StateStoreSupplier; // import the required package/class
@Override
public KTable<K, V> reduce(final Reducer<V> adder,
        final Reducer<V> subtractor,
        final StateStoreSupplier<KeyValueStore> storeSupplier) {
    Objects.requireNonNull(adder, "adder can't be null");
    Objects.requireNonNull(subtractor, "subtractor can't be null");
    Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
    ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(storeSupplier.name(), adder, subtractor);
    return doAggregate(aggregateSupplier, REDUCE_NAME, storeSupplier);
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 11, Source: KGroupedTableImpl.java
Example 16: filter
import org.apache.kafka.streams.processor.StateStoreSupplier; // import the required package/class
@Override
public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final String queryableStoreName) {
    // a store supplier is only created when a queryable store name is given
    StateStoreSupplier<KeyValueStore> storeSupplier = null;
    if (queryableStoreName != null) {
        storeSupplier = keyValueStore(this.keySerde, this.valSerde, queryableStoreName);
    }
    return doFilter(predicate, storeSupplier, false);
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 9, Source: KTableImpl.java
Example 17: filterNot
import org.apache.kafka.streams.processor.StateStoreSupplier; // import the required package/class
@Override
public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final String queryableStoreName) {
    StateStoreSupplier<KeyValueStore> storeSupplier = null;
    if (queryableStoreName != null) {
        storeSupplier = keyValueStore(this.keySerde, this.valSerde, queryableStoreName);
    }
    return doFilter(predicate, storeSupplier, true);
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 9, Source: KTableImpl.java
Example 18: mapValues
import org.apache.kafka.streams.processor.StateStoreSupplier; // import the required package/class
@Override
public <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper,
        final Serde<V1> valueSerde,
        final String queryableStoreName) {
    StateStoreSupplier<KeyValueStore> storeSupplier = null;
    if (queryableStoreName != null) {
        storeSupplier = keyValueStore(this.keySerde, valueSerde, queryableStoreName);
    }
    return doMapValues(mapper, valueSerde, storeSupplier);
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 11, Source: KTableImpl.java
Example 19: through
import org.apache.kafka.streams.processor.StateStoreSupplier; // import the required package/class
@Override
public KTable<K, V> through(final Serde<K> keySerde,
        final Serde<V> valSerde,
        final StreamPartitioner<? super K, ? super V> partitioner,
        final String topic,
        final StateStoreSupplier<KeyValueStore> storeSupplier) {
    Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
    to(keySerde, valSerde, partitioner, topic);
    return topology.table(keySerde, valSerde, topic, storeSupplier);
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 12, Source: KTableImpl.java
Example 20: join
import org.apache.kafka.streams.processor.StateStoreSupplier; // import the required package/class
@Override
public <V1, R> KTable<K, R> join(final KTable<K, V1> other,
        final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
        final StateStoreSupplier<KeyValueStore> storeSupplier) {
    Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
    return doJoin(other, joiner, false, false, storeSupplier);
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 8, Source: KTableImpl.java
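Note: The org.apache.kafka.streams.processor.StateStoreSupplier examples in this article were compiled from GitHub, MSDocs, and other source-code and documentation hosting platforms. The code snippets were selected from open-source projects; copyright of the source code belongs to the original authors. For distribution and use, refer to the license of the corresponding project. Do not reproduce without permission.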