This article collects typical usage examples of the Java class org.apache.kafka.streams.state.KeyValueIterator. If you are wondering how KeyValueIterator is used in practice, the selected examples below may help.
The KeyValueIterator class belongs to the org.apache.kafka.streams.state package. A total of 20 code examples are shown below, sorted by popularity by default.
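Most of the examples share one pattern: obtain an iterator from a state store, walk it with hasNext()/next(), and close it when finished. KeyValueIterator is both an Iterator and a Closeable, so try-with-resources applies. The following is a minimal sketch of that pattern, not code from any of the projects listed here. To stay runnable without the Kafka Streams dependency it uses a hypothetical stand-in interface (`ClosableEntryIterator`) and a TreeMap in place of a real store; both names are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class IterateAndCloseSketch {

    // Hypothetical stand-in for KeyValueIterator: an Iterator that must be closed.
    interface ClosableEntryIterator<K, V> extends Iterator<Map.Entry<K, V>>, AutoCloseable {
        @Override
        void close(); // declared without a checked exception
    }

    // Hypothetical in-memory "store": returns an iterator over a sorted map
    // and records in closedFlag[0] whether close() was called.
    static <K, V> ClosableEntryIterator<K, V> all(TreeMap<K, V> store, boolean[] closedFlag) {
        final Iterator<Map.Entry<K, V>> delegate = store.entrySet().iterator();
        return new ClosableEntryIterator<K, V>() {
            @Override public boolean hasNext() { return delegate.hasNext(); }
            @Override public Map.Entry<K, V> next() { return delegate.next(); }
            @Override public void close() { closedFlag[0] = true; }
        };
    }

    // Walks the whole store and closes the iterator even if the loop body throws.
    static List<String> dump(TreeMap<String, Double> store, boolean[] closedFlag) {
        List<String> lines = new ArrayList<>();
        try (ClosableEntryIterator<String, Double> it = all(store, closedFlag)) {
            while (it.hasNext()) {
                Map.Entry<String, Double> entry = it.next();
                lines.add(entry.getKey() + "=" + entry.getValue());
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        TreeMap<String, Double> store = new TreeMap<>();
        store.put("mem", 1.5);
        store.put("cpu", 0.5);
        boolean[] closed = {false};
        System.out.println(dump(store, closed));
        System.out.println("closed: " + closed[0]);
    }
}
```

With a real store, the same try-with-resources shape applies directly to the iterator returned by all() or range().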
Example 1: notFoundWithNoResult
import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
@Test
public void notFoundWithNoResult(TestContext context){
    KafkaStreams streamMock = mock(KafkaStreams.class);
    ReadOnlyKeyValueStore<Object, Object> storeMock = mock(ReadOnlyKeyValueStore.class);
    KeyValueIterator<Object, Object> iteratorMock = mock(KeyValueIterator.class);
    when(streamMock.store(eq("store"), any(QueryableStoreType.class))).thenReturn(storeMock);
    SimpleKeyValueIterator iterator = new SimpleKeyValueIterator();
    when(storeMock.range(any(), any())).thenReturn(iterator);
    rule.vertx().deployVerticle(new RangeKeyValueQueryVerticle("host", streamMock), context.asyncAssertSuccess(deployment->{
        RangeKeyValueQuery query = new RangeKeyValueQuery("store", Serdes.String().getClass().getName(), Serdes.String().getClass().getName(), "key".getBytes(), "key".getBytes());
        rule.vertx().eventBus().send(Config.RANGE_KEY_VALUE_QUERY_ADDRESS_PREFIX + "host", query, context.asyncAssertSuccess(reply ->{
            context.assertTrue(reply.body() instanceof MultiValuedKeyValueQueryResponse);
            MultiValuedKeyValueQueryResponse response = (MultiValuedKeyValueQueryResponse) reply.body();
            context.assertEquals(0, response.getResults().size());
            // the verticle is expected to close the iterator after reading it
            context.assertTrue(iterator.closed);
        }));
    }));
}
Author: ftrossbach, Project: kiqr, Lines of code: 27, Source: RangeKeyValuesQueryVerticleTest.java
Example 2: getLocalMetrics
import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
/**
 * Query local state store to extract metrics
 *
 * @return local Metrics
 */
private Metrics getLocalMetrics() {
    HostInfo thisInstance = GlobalAppState.getInstance().getHostPortInfo();
    KafkaStreams ks = GlobalAppState.getInstance().getKafkaStreams();
    String source = thisInstance.host() + ":" + thisInstance.port();
    Metrics localMetrics = new Metrics();
    ReadOnlyKeyValueStore<String, Double> averageStore = ks
            .store(storeName,
                    QueryableStoreTypes.<String, Double>keyValueStore());
    LOGGER.log(Level.INFO, "Entries in store {0}", averageStore.approximateNumEntries());
    // try-with-resources closes the iterator and releases the underlying store resources
    try (KeyValueIterator<String, Double> storeIterator = averageStore.all()) {
        while (storeIterator.hasNext()) {
            KeyValue<String, Double> kv = storeIterator.next();
            localMetrics.add(source, kv.key, String.valueOf(kv.value));
        }
    }
    LOGGER.log(Level.INFO, "Local store state {0}", localMetrics);
    return localMetrics;
}
Author: abhirockzz, Project: docker-kafka-streams, Lines of code: 28, Source: MetricsResource.java
Example 3: hasNextCondition
import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
@Override
public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) {
    return new HasNextCondition() {
        @Override
        public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
            while (iterator.hasNext()) {
                // peek at the next key without consuming it
                final Bytes bytes = iterator.peekNextKey();
                final Bytes keyBytes = WindowStoreUtils.bytesKeyFromBinaryKey(bytes.get());
                final long time = WindowStoreUtils.timestampFromBinaryKey(bytes.get());
                if (keyBytes.compareTo(binaryKeyFrom) >= 0
                    && keyBytes.compareTo(binaryKeyTo) <= 0
                    && time >= from
                    && time <= to) {
                    return true;
                }
                // the key is outside the requested range: skip it
                iterator.next();
            }
            return false;
        }
    };
}
Author: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 22, Source: WindowKeySchema.java
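The peek-and-skip loop in Example 3 can be tried in isolation. The following is a minimal sketch, assuming a hypothetical `PeekingIterator` stand-in (not a Kafka class) that offers the same peekNextKey() operation, and using plain Long timestamps instead of binary window keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

public class PeekAndSkipSketch {

    // Hypothetical stand-in for an iterator that can peek at its next key.
    static final class PeekingIterator<K> implements Iterator<K> {
        private final Iterator<K> delegate;
        private K buffered;
        private boolean hasBuffered;

        PeekingIterator(Iterator<K> delegate) { this.delegate = delegate; }

        @Override public boolean hasNext() { return hasBuffered || delegate.hasNext(); }

        // Returns the next key without consuming it.
        K peekNextKey() {
            if (!hasBuffered) {
                if (!delegate.hasNext()) throw new NoSuchElementException();
                buffered = delegate.next();
                hasBuffered = true;
            }
            return buffered;
        }

        @Override public K next() {
            K result = peekNextKey();
            hasBuffered = false;
            buffered = null;
            return result;
        }
    }

    // Same shape as the loop in Example 3: discard keys until an accepted one is at the head.
    static <K> boolean hasNextMatching(PeekingIterator<K> it, Predicate<K> accept) {
        while (it.hasNext()) {
            if (accept.test(it.peekNextKey())) return true;
            it.next(); // consume and drop the rejected key
        }
        return false;
    }

    // Collects every timestamp within [from, to], in iteration order.
    static List<Long> timestampsInRange(List<Long> timestamps, long from, long to) {
        PeekingIterator<Long> it = new PeekingIterator<>(timestamps.iterator());
        List<Long> out = new ArrayList<>();
        while (hasNextMatching(it, t -> t >= from && t <= to)) {
            out.add(it.next());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(timestampsInRange(Arrays.asList(1L, 5L, 12L, 7L, 30L), 5L, 12L));
    }
}
```

Because the check only peeks, a caller that gets `true` can still call next() and receive exactly the element that was tested.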
Example 4: fetch
import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
private KeyValueIterator<Windowed<K>, V> fetch(Fetcher<K, V> fetcher) {
    final List<ReadOnlySessionStore<K, V>> stores = storeProvider.stores(storeName, queryableStoreType);
    for (final ReadOnlySessionStore<K, V> store : stores) {
        try {
            final KeyValueIterator<Windowed<K>, V> result = fetcher.fetch(store);
            if (!result.hasNext()) {
                // empty result: close it and try the next underlying store
                result.close();
            } else {
                return result;
            }
        } catch (final InvalidStateStoreException ise) {
            throw new InvalidStateStoreException("State store [" + storeName + "] is not available anymore" +
                                                         " and may have been migrated to another instance; " +
                                                         "please re-discover its location from the state metadata.");
        }
    }
    return KeyValueIterators.emptyIterator();
}
Author: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 19, Source: CompositeReadOnlySessionStore.java
Example 5: fetch
import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
public <IteratorType extends KeyValueIterator<?, V>> IteratorType fetch(Fetcher<K, V, IteratorType> fetcher) {
    final List<ReadOnlyWindowStore<K, V>> stores = provider.stores(storeName, windowStoreType);
    for (ReadOnlyWindowStore<K, V> windowStore : stores) {
        try {
            final IteratorType result = fetcher.fetch(windowStore);
            if (!result.hasNext()) {
                // empty result: close it and try the next underlying store
                result.close();
            } else {
                return result;
            }
        } catch (InvalidStateStoreException e) {
            throw new InvalidStateStoreException(
                    "State store is not available anymore and may have been migrated to another instance; " +
                            "please re-discover its location from the state metadata.");
        }
    }
    return fetcher.empty();
}
Author: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 20, Source: CompositeReadOnlyWindowStore.java
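Examples 4 and 5 both query several underlying stores in turn, return the first iterator that has data, and close every empty one on the way. The following is a minimal, dependency-free sketch of that control flow. `TrackingIterator` and `firstNonEmpty` are hypothetical names introduced for illustration, and each store lookup is modelled as a `Supplier`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class FirstNonEmptySketch {

    // Hypothetical closeable iterator that remembers whether close() was called.
    static final class TrackingIterator implements Iterator<String>, AutoCloseable {
        private final Iterator<String> delegate;
        boolean closed;

        TrackingIterator(List<String> items) { this.delegate = items.iterator(); }

        @Override public boolean hasNext() { return delegate.hasNext(); }
        @Override public String next() { return delegate.next(); }
        @Override public void close() { closed = true; }
    }

    // Asks each fetcher in order; closes empty results and returns the first non-empty one.
    // Later fetchers are never invoked once a non-empty result is found.
    static TrackingIterator firstNonEmpty(List<Supplier<TrackingIterator>> fetchers) {
        for (Supplier<TrackingIterator> fetcher : fetchers) {
            TrackingIterator result = fetcher.get();
            if (result.hasNext()) {
                return result; // the caller becomes responsible for closing it
            }
            result.close();
        }
        return new TrackingIterator(Collections.<String>emptyList());
    }

    public static void main(String[] args) {
        List<Supplier<TrackingIterator>> fetchers = new ArrayList<>();
        fetchers.add(() -> new TrackingIterator(Collections.<String>emptyList()));
        fetchers.add(() -> new TrackingIterator(Arrays.asList("x", "y")));
        try (TrackingIterator it = firstNonEmpty(fetchers)) {
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
    }
}
```

Ownership is the point of the design: an iterator that is discarded gets closed immediately, while the one that is returned is left open for the caller to close.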
Example 6: findSessions
import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
                                                       final long earliestSessionEndTime,
                                                       final long latestSessionStartTime) {
    validateStoreOpen();
    final Bytes binarySessionId = Bytes.wrap(serdes.rawKey(key));
    final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(binarySessionId, earliestSessionEndTime));
    final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRangeFixedSize(binarySessionId, latestSessionStartTime));
    final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName, cacheKeyFrom, cacheKeyTo);
    final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = bytesStore.findSessions(
        binarySessionId, earliestSessionEndTime, latestSessionStartTime
    );
    final HasNextCondition hasNextCondition = keySchema.hasNextCondition(binarySessionId,
                                                                         binarySessionId,
                                                                         earliestSessionEndTime,
                                                                         latestSessionStartTime);
    final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction);
    // merge the cached entries with the entries from the underlying store
    return new MergedSortedCacheSessionStoreIterator<>(filteredCacheIterator, storeIterator, serdes, cacheFunction);
}
Author: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 21, Source: CachingSessionStore.java
Example 7: shouldFetchRangeCorrectlyAcrossSegments
import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
@Test
public void shouldFetchRangeCorrectlyAcrossSegments() throws Exception {
    final Windowed<String> a1 = new Windowed<>("a", new SessionWindow(0, 0));
    final Windowed<String> aa1 = new Windowed<>("aa", new SessionWindow(0, 0));
    final Windowed<String> a2 = new Windowed<>("a", new SessionWindow(Segments.MIN_SEGMENT_INTERVAL, Segments.MIN_SEGMENT_INTERVAL));
    final Windowed<String> a3 = new Windowed<>("a", new SessionWindow(Segments.MIN_SEGMENT_INTERVAL * 2, Segments.MIN_SEGMENT_INTERVAL * 2));
    final Windowed<String> aa3 = new Windowed<>("aa", new SessionWindow(Segments.MIN_SEGMENT_INTERVAL * 2, Segments.MIN_SEGMENT_INTERVAL * 2));
    cachingStore.put(a1, 1L);
    cachingStore.put(aa1, 1L);
    cachingStore.put(a2, 2L);
    cachingStore.put(a3, 3L);
    cachingStore.put(aa3, 3L);
    cachingStore.flush();
    final KeyValueIterator<Windowed<String>, Long> rangeResults = cachingStore.findSessions("a", "aa", 0, Segments.MIN_SEGMENT_INTERVAL * 2);
    assertEquals(a1, rangeResults.next().key);
    assertEquals(aa1, rangeResults.next().key);
    assertEquals(a2, rangeResults.next().key);
    assertEquals(a3, rangeResults.next().key);
    assertEquals(aa3, rangeResults.next().key);
    assertFalse(rangeResults.hasNext());
}
Author: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 23, Source: CachingSessionStoreTest.java
Example 8: verifyStateStore
import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
private void verifyStateStore(final KafkaStreams streams, final Set<KeyValue<Long, Long>> expectedStoreContent) {
    ReadOnlyKeyValueStore<Long, Long> store = null;
    final long maxWaitingTime = System.currentTimeMillis() + 300000L;
    // retry until the store becomes queryable or the deadline passes
    while (System.currentTimeMillis() < maxWaitingTime) {
        try {
            store = streams.store(storeName, QueryableStoreTypes.<Long, Long>keyValueStore());
            break;
        } catch (final InvalidStateStoreException okJustRetry) {
            try {
                Thread.sleep(5000L);
            } catch (final Exception ignore) { }
        }
    }
    // fail with a clear message (instead of a NullPointerException) if the store never became available
    assertTrue("state store was not available within the timeout", store != null);
    // close the iterator when done to release the underlying store resources
    try (final KeyValueIterator<Long, Long> it = store.all()) {
        while (it.hasNext()) {
            assertTrue(expectedStoreContent.remove(it.next()));
        }
    }
    assertTrue(expectedStoreContent.isEmpty());
}
Author: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 23, Source: EosIntegrationTest.java
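The retry loop in Example 8 (keep asking for the store until it stops throwing) can be factored into a small helper. The following is a minimal sketch under stated assumptions: `StoreNotReadyException` is a hypothetical stand-in for InvalidStateStoreException, the lookup is modelled as a `Supplier`, and the helper is bounded by an attempt count rather than a wall-clock deadline.

```java
import java.util.function.Supplier;

public class RetryUntilReadySketch {

    // Hypothetical stand-in for the exception thrown while a store is not yet queryable.
    static class StoreNotReadyException extends RuntimeException { }

    // Calls lookup until it succeeds, sleeping backoffMs between failed attempts.
    // Throws IllegalStateException if every attempt fails or the thread is interrupted.
    static <T> T retry(Supplier<T> lookup, int maxAttempts, long backoffMs) {
        StoreNotReadyException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.get();
            } catch (StoreNotReadyException notReady) {
                last = notReady;
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                    throw new IllegalStateException("interrupted while waiting for the store", interrupted);
                }
            }
        }
        throw new IllegalStateException("store not available after " + maxAttempts + " attempts", last);
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        String store = retry(() -> {
            if (++calls[0] < 3) {
                throw new StoreNotReadyException();
            }
            return "store-handle";
        }, 5, 10L);
        System.out.println(store + " after " + calls[0] + " calls");
    }
}
```

Unlike the loop in Example 8, this helper never returns null: the caller either gets a usable value or an exception that names the failure.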
Example 9: shouldMergeSessions
import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
@Test
public void shouldMergeSessions() throws Exception {
    context.setTime(0);
    final String sessionId = "mel";
    processor.process(sessionId, "first");
    assertTrue(sessionStore.findSessions(sessionId, 0, 0).hasNext());
    // move time beyond gap
    context.setTime(GAP_MS + 1);
    processor.process(sessionId, "second");
    assertTrue(sessionStore.findSessions(sessionId, GAP_MS + 1, GAP_MS + 1).hasNext());
    // should still exist as not within gap
    assertTrue(sessionStore.findSessions(sessionId, 0, 0).hasNext());
    // move time back
    context.setTime(GAP_MS / 2);
    processor.process(sessionId, "third");
    final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessions(sessionId, 0, GAP_MS + 1);
    final KeyValue<Windowed<String>, Long> kv = iterator.next();
    assertEquals(Long.valueOf(3), kv.value);
    assertFalse(iterator.hasNext());
}
Author: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 24, Source: KStreamSessionWindowAggregateProcessorTest.java
Example 10: shouldRemoveMergedSessionsFromStateStore
import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
@Test
public void shouldRemoveMergedSessionsFromStateStore() throws Exception {
    context.setTime(0);
    processor.process("a", "1");
    // first ensure it is in the store
    final KeyValueIterator<Windowed<String>, Long> a1 = sessionStore.findSessions("a", 0, 0);
    assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), a1.next());
    context.setTime(100);
    processor.process("a", "2");
    // a1 from above should have been removed
    // should have merged session in store
    final KeyValueIterator<Windowed<String>, Long> a2 = sessionStore.findSessions("a", 0, 100);
    assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 100)), 2L), a2.next());
    assertFalse(a2.hasNext());
}
Author: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 18, Source: KStreamSessionWindowAggregateProcessorTest.java
Example 11: shouldIterateOverRange
import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
@Test
public void shouldIterateOverRange() throws Exception {
    final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}};
    // even keys go to the store, odd keys go to the cache
    for (int i = 0; i < bytes.length; i += 2) {
        store.put(Bytes.wrap(bytes[i]), bytes[i]);
        cache.put(namespace, Bytes.wrap(bytes[i + 1]), new LRUCacheEntry(bytes[i + 1]));
    }
    final Bytes from = Bytes.wrap(new byte[]{2});
    final Bytes to = Bytes.wrap(new byte[]{9});
    final KeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", store.range(from, to));
    final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, from, to);
    final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, storeIterator, serdes);
    byte[][] values = new byte[8][];
    int index = 0;
    int bytesIndex = 2;
    // the merged iterator should yield keys 2..9 in sorted order
    while (iterator.hasNext()) {
        final byte[] value = iterator.next().value;
        values[index++] = value;
        assertArrayEquals(bytes[bytesIndex++], value);
    }
}
Author: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 24, Source: MergedSortedCacheKeyValueStoreIteratorTest.java
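Example 11 checks that entries from a cache and from an underlying store come back as one sorted sequence. The following is a minimal sketch of such a two-way sorted merge using plain TreeMap instances instead of Kafka classes. It assumes that the cache entry wins when both sides hold the same key; that tie-break is an assumption of this sketch, since the test above uses disjoint keys and does not exercise it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MergeSortedSketch {

    private static String format(Map.Entry<Integer, String> entry) {
        return entry.getKey() + "=" + entry.getValue();
    }

    // Merges two key-sorted sources into one key-sorted list.
    // Assumption for illustration: on equal keys the cache entry is emitted
    // and the store entry with the same key is skipped.
    static List<String> merge(TreeMap<Integer, String> cache, TreeMap<Integer, String> store) {
        List<Map.Entry<Integer, String>> c = new ArrayList<>(cache.entrySet());
        List<Map.Entry<Integer, String>> s = new ArrayList<>(store.entrySet());
        List<String> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < c.size() || j < s.size()) {
            if (j == s.size()) {
                out.add(format(c.get(i++)));      // store exhausted
            } else if (i == c.size()) {
                out.add(format(s.get(j++)));      // cache exhausted
            } else {
                int cmp = c.get(i).getKey().compareTo(s.get(j).getKey());
                if (cmp < 0) {
                    out.add(format(c.get(i++)));  // cache key is smaller
                } else if (cmp > 0) {
                    out.add(format(s.get(j++)));  // store key is smaller
                } else {
                    out.add(format(c.get(i++)));  // same key: cache wins
                    j++;                          // drop the shadowed store entry
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        TreeMap<Integer, String> cache = new TreeMap<>();
        cache.put(1, "c1");
        cache.put(3, "c3");
        TreeMap<Integer, String> store = new TreeMap<>();
        store.put(2, "s2");
        store.put(3, "s3");
        store.put(4, "s4");
        System.out.println(merge(cache, store));
    }
}
```

An iterator-based version would compare the two peeked keys on each call to next() instead of materializing lists, which is why the Kafka classes in these examples work with peeking iterators.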
Example 12: shouldPutFetchFromCache
import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
@Test
public void shouldPutFetchFromCache() throws Exception {
    cachingStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
    cachingStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 1L);
    cachingStore.put(new Windowed<>("b", new SessionWindow(0, 0)), 1L);
    assertEquals(3, cache.size());
    final KeyValueIterator<Windowed<String>, Long> a = cachingStore.findSessions("a", 0, 0);
    final KeyValueIterator<Windowed<String>, Long> b = cachingStore.findSessions("b", 0, 0);
    assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), a.next());
    assertEquals(KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), 1L), b.next());
    assertFalse(a.hasNext());
    assertFalse(b.hasNext());
}
Author: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 17, Source: CachingSessionStoreTest.java
Example 13: punctuate
import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
@Override
public void punctuate(long timestamp) {
    // try-with-resources closes the iterator even if forward() throws
    try (KeyValueIterator<Long, List<OrderGoods>> iter = this.kvStore.all()) {
        while (iter.hasNext()) {
            KeyValue<Long, List<OrderGoods>> entry = iter.next();
            context.forward(entry.key, entry.value.toString());
        }
    }
    context.commit();
}
Author: jiumao-org, Project: wechat-mall, Lines of code: 13, Source: GoodOrderProcessor.java
Example 14: process
import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
@Override
public void process(final K key, final V value) {
    // if the key is null, we do not need proceed aggregating
    // the record with the table
    if (key == null) {
        return;
    }
    final long timestamp = context().timestamp();
    final List<KeyValue<Windowed<K>, T>> merged = new ArrayList<>();
    final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp);
    SessionWindow mergedWindow = newSessionWindow;
    T agg = initializer.apply();
    try (final KeyValueIterator<Windowed<K>, T> iterator = store.findSessions(key, timestamp - windows.inactivityGap(),
                                                                              timestamp + windows.inactivityGap())) {
        while (iterator.hasNext()) {
            final KeyValue<Windowed<K>, T> next = iterator.next();
            merged.add(next);
            agg = sessionMerger.apply(key, agg, next.value);
            mergedWindow = mergeSessionWindow(mergedWindow, (SessionWindow) next.key.window());
        }
    }
    agg = aggregator.apply(key, value, agg);
    final Windowed<K> sessionKey = new Windowed<>(key, mergedWindow);
    if (!mergedWindow.equals(newSessionWindow)) {
        for (final KeyValue<Windowed<K>, T> session : merged) {
            store.remove(session.key);
            tupleForwarder.maybeForward(session.key, null, session.value);
        }
    }
    store.put(sessionKey, agg);
    tupleForwarder.maybeForward(sessionKey, agg, null);
}
Author: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 36, Source: KStreamSessionWindowAggregate.java
Example 15: shouldIterateOverRange
import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
@Test
public void shouldIterateOverRange() throws Exception {
    int items = addItemsToCache();
    final KeyValueIterator<String, String> range = store.range(String.valueOf(0), String.valueOf(items));
    final List<String> results = new ArrayList<>();
    while (range.hasNext()) {
        results.add(range.next().key);
    }
    assertEquals(items, results.size());
}
Author: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 11, Source: CachingKeyValueStoreTest.java
Example 16: stateTable
import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
@SuppressWarnings("unchecked")
public <K, V> Map<K, V> stateTable(String name) throws EmptyInputException, NoTopologyException {
    return withProcessedDriver(driver -> {
        Map<K, V> result = new LinkedHashMap<>();
        // try-with-resources closes the iterator even if a cast or put fails
        try (KeyValueIterator<Object, Object> records = driver.getKeyValueStore(name).all()) {
            records.forEachRemaining(record -> result.put((K) record.key, (V) record.value));
        }
        return result;
    });
}
Author: carlosmenezes, Project: mockafka, Lines of code: 12, Source: MockafkaBuilder.java
Example 17: MergedSortedCacheSessionStoreIterator
import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
MergedSortedCacheSessionStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
                                      final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator,
                                      final StateSerdes<K, AGG> serdes,
                                      final SegmentedCacheFunction cacheFunction) {
    super(cacheIterator, storeIterator);
    this.serdes = serdes;
    this.cacheFunction = cacheFunction;
}
Author: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 9, Source: MergedSortedCacheSessionStoreIterator.java
Example 18: findSessions
import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
@Override
public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) {
    final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(
        Bytes.wrap(serdes.rawKey(keyFrom)), Bytes.wrap(serdes.rawKey(keyTo)),
        earliestSessionEndTime, latestSessionStartTime
    );
    // wrap the raw byte iterator so that keys and values are deserialized on access
    return new WrappedSessionStoreIterator<>(bytesIterator, serdes);
}
Author: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 9, Source: RocksDBSessionStore.java
Example 19: toList
import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
private List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Bytes, byte[]> iterator) {
    final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
    while (iterator.hasNext()) {
        final KeyValue<Bytes, byte[]> next = iterator.next();
        final KeyValue<Windowed<String>, Long> deserialized
            = KeyValue.pair(SessionKeySerde.from(next.key.get(), Serdes.String().deserializer(), "dummy"), Serdes.Long().deserializer().deserialize("", next.value));
        results.add(deserialized);
    }
    return results;
}
Author: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 11, Source: RocksDBSegmentedBytesStoreTest.java
Example 20: range
import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
@Override
public synchronized KeyValueIterator<K, V> range(K from, K to) {
    validateStoreOpen();
    // query rocksdb
    final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(name, db.newIterator(), serdes, from, to);
    // track the open iterator so the store knows about it
    openIterators.add(rocksDBRangeIterator);
    return rocksDBRangeIterator;
}
Author: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 9, Source: RocksDBStore.java
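Note: The org.apache.kafka.streams.state.KeyValueIterator examples in this article were compiled from source code and documentation hosting platforms such as GitHub and MSDocs. The code snippets were selected from open-source projects, and copyright of the source code belongs to the original authors. For distribution and use, refer to the license of the corresponding project. Do not reproduce without permission.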