本文整理汇总了Java中org.apache.kafka.streams.kstream.KTable类的典型用法代码示例。如果您正苦于以下问题：Java KTable类的具体用法？Java KTable怎么用？Java KTable使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
KTable类属于org.apache.kafka.streams.kstream包,在下文中一共展示了KTable类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: doFilter
import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
/**
 * Adds a {@code KTableFilter} processor node below this table and wraps it in a new
 * {@code KTableImpl}.
 *
 * @param predicate     predicate handed to {@code KTableFilter}; must not be null
 * @param storeSupplier optional supplier of a state store to attach to the new node; may be null
 * @param isFilterNot   flag forwarded unchanged to {@code KTableFilter}
 * @return the table backed by the newly added processor node
 * @throws NullPointerException if {@code predicate} is null
 */
private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate,
                              final StateStoreSupplier<KeyValueStore> storeSupplier,
                              boolean isFilterNot) {
    Objects.requireNonNull(predicate, "predicate can't be null");

    final String filterNodeName = topology.newName(FILTER_NAME);
    // A store name only exists when the caller supplied a store.
    final String internalStoreName = storeSupplier == null ? null : storeSupplier.name();

    final KTableProcessorSupplier<K, V, V> filterSupplier =
        new KTableFilter<>(this, predicate, isFilterNot, internalStoreName);
    topology.addProcessor(filterNodeName, filterSupplier, this.name);

    if (storeSupplier != null) {
        topology.addStateStore(storeSupplier, filterNodeName);
    }

    // The last constructor argument is true exactly when a store name is present.
    final boolean hasStoreName = internalStoreName != null;
    return new KTableImpl<>(topology, filterNodeName, filterSupplier, this.keySerde, this.valSerde,
                            sourceNodes, internalStoreName, hasStoreName);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:17,代码来源:KTableImpl.java
示例2: doMapValues
import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
/**
 * Adds a {@code KTableMapValues} processor node below this table and wraps it in a new
 * {@code KTableImpl}.
 *
 * <p>When a store supplier is given, the store is attached to the new node and the resulting
 * table is built with this table's key serde, the given value serde and the supplier's store
 * name. Otherwise the resulting table is built with this table's {@code queryableStoreName}
 * and no serdes.
 *
 * @param mapper        value mapper handed to {@code KTableMapValues}; must not be null
 * @param valueSerde    serde for the mapped values; only used when {@code storeSupplier} is non-null
 * @param storeSupplier optional supplier of a state store to attach to the new node; may be null
 * @param <V1>          type of the mapped values
 * @return the table backed by the newly added processor node
 * @throws NullPointerException if {@code mapper} is null
 */
private <V1> KTable<K, V1> doMapValues(final ValueMapper<? super V, ? extends V1> mapper,
                                       final Serde<V1> valueSerde,
                                       final StateStoreSupplier<KeyValueStore> storeSupplier) {
    // Fail fast with a descriptive message, in the same style as doFilter's null check.
    Objects.requireNonNull(mapper, "mapper can't be null");
    String name = topology.newName(MAPVALUES_NAME);
    String internalStoreName = null;
    if (storeSupplier != null) {
        internalStoreName = storeSupplier.name();
    }
    KTableProcessorSupplier<K, V, V1> processorSupplier = new KTableMapValues<>(this, mapper, internalStoreName);
    topology.addProcessor(name, processorSupplier, this.name);
    if (storeSupplier != null) {
        topology.addStateStore(storeSupplier, name);
        return new KTableImpl<>(topology, name, processorSupplier, this.keySerde, valueSerde, sourceNodes, internalStoreName, true);
    } else {
        // No dedicated store: the new table reuses this table's queryable store name.
        return new KTableImpl<>(topology, name, processorSupplier, sourceNodes, this.queryableStoreName, false);
    }
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:19,代码来源:KTableImpl.java
示例3: joinTopology
import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
/**
 * Wires a two-input join topology into the given builder.
 *
 * <p>Stream A is grouped by key and aggregated into a table that keeps the most recent value
 * per key. Stream B is left-joined against that table twice: the sum of both values goes to
 * {@code OUTPUT_TOPIC_A} and the difference goes to {@code OUTPUT_TOPIC_B}.
 *
 * @param builder the builder to add the topology to
 * @return the same {@code builder} instance
 */
public static KStreamBuilder joinTopology(KStreamBuilder builder) {
    final KStream<String, Integer> streamA = builder.stream(stringSerde, integerSerde, INPUT_TOPIC_A);
    final KStream<String, Integer> streamB = builder.stream(stringSerde, integerSerde, INPUT_TOPIC_B);

    // The aggregator ignores the running aggregate, so the table holds the newest value per key.
    final KTable<String, Integer> latestFromA = streamA
        .groupByKey(stringSerde, integerSerde)
        .aggregate(() -> 0, (key, value, aggregate) -> value, integerSerde, STORAGE_NAME);

    // NOTE(review): with a left join the table-side value may be null, which would fail on
    // unboxing in the joiners below -- confirm inputs always have a matching table entry.
    final KStream<String, Integer> sums =
        streamB.leftJoin(latestFromA, (left, right) -> left + right, stringSerde, integerSerde);
    sums.to(stringSerde, integerSerde, OUTPUT_TOPIC_A);

    final KStream<String, Integer> differences =
        streamB.leftJoin(latestFromA, (left, right) -> left - right, stringSerde, integerSerde);
    differences.to(stringSerde, integerSerde, OUTPUT_TOPIC_B);

    return builder;
}
开发者ID:carlosmenezes,项目名称:mockafka,代码行数:19,代码来源:TopologyUtil.java
示例4: aggregate
import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
/**
 * Builds a {@code KStreamWindowAggregate} from the arguments and registers it through
 * {@code doAggregate} under {@code AGGREGATE_NAME}.
 *
 * @param initializer   initializer handed to the window aggregate; must not be null
 * @param aggregator    aggregator handed to the window aggregate; must not be null
 * @param windows       window specification; must not be null
 * @param storeSupplier supplier of the window store; its name is also passed to the aggregate;
 *                      must not be null
 * @param <W>           window type
 * @param <T>           aggregate value type
 * @return the windowed table produced by {@code doAggregate}
 * @throws NullPointerException if any argument is null
 */
@SuppressWarnings("unchecked")
@Override
public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                                              final Aggregator<? super K, ? super V, T> aggregator,
                                                              final Windows<W> windows,
                                                              final StateStoreSupplier<WindowStore> storeSupplier) {
    Objects.requireNonNull(initializer, "initializer can't be null");
    Objects.requireNonNull(aggregator, "aggregator can't be null");
    Objects.requireNonNull(windows, "windows can't be null");
    Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");

    final String storeName = storeSupplier.name();
    // doAggregate is typed on the plain key; the result is re-viewed as a windowed-key table.
    final KTable<Windowed<K>, T> windowedTable = (KTable<Windowed<K>, T>) doAggregate(
        new KStreamWindowAggregate<>(windows, storeName, initializer, aggregator),
        AGGREGATE_NAME,
        storeSupplier);
    return windowedTable;
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:17,代码来源:KGroupedStreamImpl.java
示例5: doAggregate
import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
/**
 * Adds the given aggregate processor and its state store to the topology and wraps the new
 * node in a {@code KTableImpl}.
 *
 * @param aggregateSupplier processor supplier performing the aggregation
 * @param functionName      base name passed to {@code topology.newName} for the new node
 * @param storeSupplier     supplier of the state store attached to the new node
 * @param <T>               aggregate value type
 * @return the table backed by the newly added aggregation node
 */
private <T> KTable<K, T> doAggregate(
        final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
        final String functionName,
        final StateStoreSupplier storeSupplier) {
    final String aggFunctionName = topology.newName(functionName);
    // NOTE(review): presumably returns this node's name unless a repartition step was added -- confirm.
    final String sourceName = repartitionIfRequired(storeSupplier.name());

    topology.addProcessor(aggFunctionName, aggregateSupplier, sourceName);
    topology.addStateStore(storeSupplier, aggFunctionName);

    if (sourceName.equals(this.name)) {
        // The parent is this stream itself, so its source nodes carry over unchanged.
        return new KTableImpl<>(topology,
                                aggFunctionName,
                                aggregateSupplier,
                                sourceNodes,
                                storeSupplier.name(),
                                isQueryable);
    }
    // Otherwise the differently named parent becomes the sole source node.
    return new KTableImpl<>(topology,
                            aggFunctionName,
                            aggregateSupplier,
                            Collections.singleton(sourceName),
                            storeSupplier.name(),
                            isQueryable);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:21,代码来源:KGroupedStreamImpl.java
示例6: join
import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
/**
 * Joins two string tables with the requested join type, combining matching values as
 * {@code value1 + "-" + value2}.
 *
 * @param first         left-hand table
 * @param second        right-hand table
 * @param joinType      which join to perform ({@code INNER}, {@code LEFT} or {@code OUTER})
 * @param queryableName name passed to the join call as its queryable-name argument
 * @return the joined table
 * @throws RuntimeException if {@code joinType} is none of the handled constants
 */
private KTable<String, String> join(final KTable<String, String> first,
                                    final KTable<String, String> second,
                                    final JoinType joinType,
                                    final String queryableName) {
    final ValueJoiner<String, String, String> dashJoiner = new ValueJoiner<String, String, String>() {
        @Override
        public String apply(final String left, final String right) {
            return left + "-" + right;
        }
    };

    final KTable<String, String> joined;
    switch (joinType) {
        case INNER:
            joined = first.join(second, dashJoiner, Serdes.String(), queryableName);
            break;
        case LEFT:
            joined = first.leftJoin(second, dashJoiner, Serdes.String(), queryableName);
            break;
        case OUTER:
            joined = first.outerJoin(second, dashJoiner, Serdes.String(), queryableName);
            break;
        default:
            throw new RuntimeException("Unknown join type.");
    }
    return joined;
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:23,代码来源:KTableKTableJoinIntegrationTest.java
示例7: testJoin
import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
/**
 * Joins two source tables with {@code MockValueJoiner.TOSTRING_JOINER}, routes the join
 * output to a mock processor, and delegates the checks to {@code doTestJoin}.
 */
@Test
public void testJoin() throws Exception {
    final KStreamBuilder builder = new KStreamBuilder();
    final int[] expectedKeys = {0, 1, 2, 3};

    final MockProcessorSupplier<Integer, String> processor = new MockProcessorSupplier<>();

    final KTable<Integer, String> table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
    final KTable<Integer, String> table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
    final KTable<Integer, String> joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);

    // Observe the join output as a stream so the mock processor receives the results.
    joined.toStream().process(processor);

    doTestJoin(builder, expectedKeys, processor, joined);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:20,代码来源:KTableKTableJoinTest.java
示例8: testNotSendingOldValues
import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
/**
 * Builds a table-table join, attaches a mock processor directly to the join node, and runs
 * {@code doTestSendingOldValues} with the final flag set to {@code false}.
 */
@Test
public void testNotSendingOldValues() throws Exception {
    final KStreamBuilder builder = new KStreamBuilder();
    final int[] expectedKeys = {0, 1, 2, 3};

    final KTable<Integer, String> table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
    final KTable<Integer, String> table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
    final KTable<Integer, String> joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);

    final MockProcessorSupplier<Integer, String> proc = new MockProcessorSupplier<>();
    // The processor is attached by node name, which is only exposed on the implementation class.
    builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);

    doTestSendingOldValues(builder, expectedKeys, table1, table2, proc, joined, false);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:21,代码来源:KTableKTableJoinTest.java
示例9: testQueryableNotSendingOldValues
import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
/**
 * Same scenario as the non-queryable variant, but the join is created with an explicit value
 * serde and the queryable name {@code "anyQueryableName"}; runs
 * {@code doTestSendingOldValues} with the final flag set to {@code false}.
 */
@Test
public void testQueryableNotSendingOldValues() throws Exception {
    final KStreamBuilder builder = new KStreamBuilder();
    final int[] expectedKeys = {0, 1, 2, 3};

    final KTable<Integer, String> table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
    final KTable<Integer, String> table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
    final KTable<Integer, String> joined =
        table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Serdes.String(), "anyQueryableName");

    final MockProcessorSupplier<Integer, String> proc = new MockProcessorSupplier<>();
    // The processor is attached by node name, which is only exposed on the implementation class.
    builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);

    doTestSendingOldValues(builder, expectedKeys, table1, table2, proc, joined, false);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:21,代码来源:KTableKTableJoinTest.java
示例10: testSendingOldValues
import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
/**
 * Builds a table-table join, attaches a mock processor directly to the join node, and runs
 * {@code doTestSendingOldValues} with the final flag set to {@code true}.
 */
@Test
public void testSendingOldValues() throws Exception {
    final KStreamBuilder builder = new KStreamBuilder();
    final int[] expectedKeys = {0, 1, 2, 3};

    final KTable<Integer, String> table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
    final KTable<Integer, String> table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
    final KTable<Integer, String> joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);

    final MockProcessorSupplier<Integer, String> proc = new MockProcessorSupplier<>();
    // The processor is attached by node name, which is only exposed on the implementation class.
    builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);

    doTestSendingOldValues(builder, expectedKeys, table1, table2, proc, joined, true);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:22,代码来源:KTableKTableJoinTest.java
示例11: testKTable
import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
/**
 * Feeds four key/value records followed by two null-valued records for existing keys into a
 * source table and checks that the downstream processor saw all six in order.
 */
@Test
public void testKTable() {
    final KStreamBuilder builder = new KStreamBuilder();
    final String inputTopic = "topic1";

    final KTable<String, Integer> sourceTable =
        builder.table(stringSerde, intSerde, inputTopic, "anyStoreName");
    final MockProcessorSupplier<String, Integer> captured = new MockProcessorSupplier<>();
    sourceTable.toStream().process(captured);

    driver = new KStreamTestDriver(builder, stateDir);

    // Insert A..D with values 1..4.
    final String[] keys = {"A", "B", "C", "D"};
    for (int i = 0; i < keys.length; i++) {
        driver.process(inputTopic, keys[i], i + 1);
    }
    driver.flushState();

    // Send null values for two of the existing keys.
    driver.process(inputTopic, "A", null);
    driver.process(inputTopic, "B", null);
    driver.flushState();

    assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4", "A:null", "B:null"), captured.processed);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:24,代码来源:KTableSourceTest.java
示例12: testKTable
import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
/**
 * Maps each string value of a source table to the numeric value of its first character
 * and hands the mapped table's output to {@code doTestKTable} for verification.
 */
@Test
public void testKTable() {
    final KStreamBuilder builder = new KStreamBuilder();
    final String inputTopic = "topic1";

    final KTable<String, String> sourceTable =
        builder.table(stringSerde, stringSerde, inputTopic, "anyStoreName");

    final ValueMapper<CharSequence, Integer> leadingDigit = new ValueMapper<CharSequence, Integer>() {
        @Override
        public Integer apply(CharSequence text) {
            // '0' has character code 48, so this yields the digit's numeric value.
            return text.charAt(0) - '0';
        }
    };
    final KTable<String, Integer> mappedTable = sourceTable.mapValues(leadingDigit);

    final MockProcessorSupplier<String, Integer> captured = new MockProcessorSupplier<>();
    mappedTable.toStream().process(captured);

    doTestKTable(builder, inputTopic, captured);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:20,代码来源:KTableMapValuesTest.java
示例13: testQueryableKTable
import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
/**
 * Same scenario as the plain {@code mapValues} test, but the mapped table is created with an
 * explicit integer serde and the name {@code "anyName"}.
 */
@Test
public void testQueryableKTable() {
    final KStreamBuilder builder = new KStreamBuilder();
    final String inputTopic = "topic1";

    final KTable<String, String> sourceTable =
        builder.table(stringSerde, stringSerde, inputTopic, "anyStoreName");

    final ValueMapper<CharSequence, Integer> leadingDigit = new ValueMapper<CharSequence, Integer>() {
        @Override
        public Integer apply(CharSequence text) {
            // '0' has character code 48, so this yields the digit's numeric value.
            return text.charAt(0) - '0';
        }
    };
    final KTable<String, Integer> mappedTable =
        sourceTable.mapValues(leadingDigit, Serdes.Integer(), "anyName");

    final MockProcessorSupplier<String, Integer> captured = new MockProcessorSupplier<>();
    mappedTable.toStream().process(captured);

    doTestKTable(builder, inputTopic, captured);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:20,代码来源:KTableMapValuesTest.java
示例14: shouldCountSessionWindowsWithInternalStoreName
import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
/**
 * Counts records per session window without naming a store, collects the emitted counts,
 * verifies them via {@code doCountSessionWindows}, and checks that the resulting table
 * reports no queryable store name.
 */
@Test
public void shouldCountSessionWindowsWithInternalStoreName() throws Exception {
    final Map<Windowed<String>, Long> results = new HashMap<>();
    // Parameterized instead of a raw KTable so the foreach action below is type-checked.
    final KTable<Windowed<String>, Long> table = groupedStream.count(SessionWindows.with(30));
    table.foreach(new ForeachAction<Windowed<String>, Long>() {
        @Override
        public void apply(final Windowed<String> key, final Long value) {
            results.put(key, value);
        }
    });
    doCountSessionWindows(results);
    assertNull(table.queryableStoreName());
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:14,代码来源:KGroupedStreamImplTest.java
示例15: shouldReduceSessionWindows
import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
/**
 * Reduces values per session window into the store {@code "session-store"} by joining them
 * with {@code ":"}, verifies the collected results via {@code doReduceSessionWindows}, and
 * checks that the table reports that store as its queryable store name.
 */
@Test
public void shouldReduceSessionWindows() throws Exception {
    final Map<Windowed<String>, String> results = new HashMap<>();
    // Parameterized instead of a raw KTable so the foreach action below is type-checked.
    final KTable<Windowed<String>, String> table = groupedStream.reduce(
        new Reducer<String>() {
            @Override
            public String apply(final String value1, final String value2) {
                return value1 + ":" + value2;
            }
        }, SessionWindows.with(30),
        "session-store");
    table.foreach(new ForeachAction<Windowed<String>, String>() {
        @Override
        public void apply(final Windowed<String> key, final String value) {
            results.put(key, value);
        }
    });
    doReduceSessionWindows(results);
    // assertEquals takes (expected, actual); this order keeps failure messages accurate.
    assertEquals("session-store", table.queryableStoreName());
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:21,代码来源:KGroupedStreamImplTest.java
示例16: shouldReduceSessionWindowsWithInternalStoreName
import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
/**
 * Reduces values per session window without naming a store by joining them with
 * {@code ":"}, verifies the collected results via {@code doReduceSessionWindows}, and checks
 * that the resulting table reports no queryable store name.
 */
@Test
public void shouldReduceSessionWindowsWithInternalStoreName() throws Exception {
    final Map<Windowed<String>, String> results = new HashMap<>();
    // Parameterized instead of a raw KTable so the foreach action below is type-checked.
    final KTable<Windowed<String>, String> table = groupedStream.reduce(
        new Reducer<String>() {
            @Override
            public String apply(final String value1, final String value2) {
                return value1 + ":" + value2;
            }
        }, SessionWindows.with(30));
    table.foreach(new ForeachAction<Windowed<String>, String>() {
        @Override
        public void apply(final Windowed<String> key, final String value) {
            results.put(key, value);
        }
    });
    doReduceSessionWindows(results);
    assertNull(table.queryableStoreName());
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:20,代码来源:KGroupedStreamImplTest.java
示例17: shouldReduce
import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
/**
 * Reads a table of doubles, regroups it with each value converted to an int, reduces it into
 * the store {@code "reduced"}, verifies the result via {@code doShouldReduce}, and checks that
 * the table reports that store as its queryable store name.
 */
@Test
public void shouldReduce() throws Exception {
    final String topic = "input";
    final KeyValueMapper<String, Number, KeyValue<String, Integer>> intProjection =
        new KeyValueMapper<String, Number, KeyValue<String, Integer>>() {
            @Override
            public KeyValue<String, Integer> apply(String key, Number value) {
                // Keep the key and narrow the numeric value to an int.
                return KeyValue.pair(key, value.intValue());
            }
        };
    final KTable<String, Integer> reduced = builder.table(Serdes.String(), Serdes.Double(), topic, "store")
        .groupBy(intProjection)
        .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR, "reduced");
    doShouldReduce(reduced, topic);
    // assertEquals takes (expected, actual); this order keeps failure messages accurate.
    assertEquals("reduced", reduced.queryableStoreName());
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:19,代码来源:KGroupedTableImplTest.java
示例18: shouldReduceWithInternalStoreName
import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
/**
 * Reads a table of doubles, regroups it with each value converted to an int, reduces it
 * without naming a store, verifies the result via {@code doShouldReduce}, and checks that the
 * table reports no queryable store name.
 */
@Test
public void shouldReduceWithInternalStoreName() throws Exception {
    final String inputTopic = "input";

    final KeyValueMapper<String, Number, KeyValue<String, Integer>> toIntValue =
        new KeyValueMapper<String, Number, KeyValue<String, Integer>>() {
            @Override
            public KeyValue<String, Integer> apply(String recordKey, Number number) {
                // Keep the key and narrow the numeric value to an int.
                return KeyValue.pair(recordKey, number.intValue());
            }
        };

    final KTable<String, Integer> reducedTable =
        builder.table(Serdes.String(), Serdes.Double(), inputTopic, "store")
               .groupBy(toIntValue)
               .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR);

    doShouldReduce(reducedTable, inputTopic);
    assertNull(reducedTable.queryableStoreName());
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:19,代码来源:KGroupedTableImplTest.java
示例19: doTestKTable
import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
/**
 * Attaches a mock processor to each of the two given tables, feeds four records followed by
 * two null-valued records into {@code topic1}, and checks what each processor received.
 *
 * @param builder topology builder the tables belong to
 * @param table2  first table under test; expected to pass on only the values 2 and 4
 * @param table3  second table under test; expected to pass on only the values 1 and 3
 * @param topic1  input topic the records are fed into
 */
private void doTestKTable(final KStreamBuilder builder, final KTable<String, Integer> table2,
                          final KTable<String, Integer> table3, final String topic1) {
    final MockProcessorSupplier<String, Integer> seenByTable2 = new MockProcessorSupplier<>();
    table2.toStream().process(seenByTable2);

    final MockProcessorSupplier<String, Integer> seenByTable3 = new MockProcessorSupplier<>();
    table3.toStream().process(seenByTable3);

    driver = new KStreamTestDriver(builder, stateDir, Serdes.String(), Serdes.Integer());

    // Insert A..D with values 1..4.
    final String[] keys = {"A", "B", "C", "D"};
    for (int i = 0; i < keys.length; i++) {
        driver.process(topic1, keys[i], i + 1);
    }
    driver.flushState();

    // Send null values for two of the existing keys.
    driver.process(topic1, "A", null);
    driver.process(topic1, "B", null);
    driver.flushState();

    seenByTable2.checkAndClearProcessResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null");
    seenByTable3.checkAndClearProcessResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null");
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:22,代码来源:KTableFilterTest.java
示例20: join
import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
/**
 * Joins this stream with the given table, first repartitioning the stream when
 * {@code repartitionRequired} is set.
 *
 * @param other      table to join against
 * @param joiner     combines a stream value with the matching table value
 * @param keySerde   key serde passed to {@code repartitionForJoin} when repartitioning
 * @param valueSerde value serde passed to {@code repartitionForJoin} when repartitioning
 * @param <V1>       value type of the table
 * @param <R>        value type of the joined stream
 * @return the joined stream produced by {@code doStreamTableJoin}
 */
@Override
public <V1, R> KStream<K, R> join(final KTable<K, V1> other,
                                  final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                  final Serde<K> keySerde,
                                  final Serde<V> valueSerde) {
    // Join from the repartitioned copy when one is needed, otherwise from this stream.
    final KStreamImpl<K, V> joinSource = repartitionRequired
        ? repartitionForJoin(keySerde, valueSerde, null)
        : this;
    // NOTE(review): the trailing 'false' presumably selects a non-left join -- confirm in doStreamTableJoin.
    return joinSource.doStreamTableJoin(other, joiner, false);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:14,代码来源:KStreamImpl.java
注：本文中的org.apache.kafka.streams.kstream.KTable类示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论