本文整理汇总了Java中org.apache.samza.storage.kv.KeyValueStore类的典型用法代码示例。如果您正苦于以下问题：Java KeyValueStore类的具体用法？Java KeyValueStore怎么用？Java KeyValueStore使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
KeyValueStore类属于org.apache.samza.storage.kv包，在下文中一共展示了KeyValueStore类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: handleMessage
import org.apache.samza.storage.kv.KeyValueStore; //导入依赖的package包/类
/**
 * Records the incoming message in this side's join state and, if the other side already holds
 * a sufficiently recent entry for the same key, produces the joined result.
 *
 * @param message the message received on this side of the join
 * @param collector the message collector (not used by this method)
 * @param coordinator the task coordinator (not used by this method)
 * @return a single-element collection with the join result, or an empty collection when the
 *     other side has no entry for the key or its timestamp is not greater than {@code now - ttlMs}
 * @throws SamzaException wrapping any exception raised while handling the message
 */
@Override
public Collection<JM> handleMessage(M message, MessageCollector collector, TaskCoordinator coordinator) {
    try {
        KeyValueStore<K, TimestampedValue<M>> ownState = thisPartialJoinFn.getState();
        KeyValueStore<K, TimestampedValue<OM>> peerState = otherPartialJoinFn.getState();

        // Always remember this message first so that a later message from the other side can match it.
        K joinKey = thisPartialJoinFn.getKey(message);
        ownState.put(joinKey, new TimestampedValue<>(message, clock.currentTimeMillis()));

        TimestampedValue<OM> peerEntry = peerState.get(joinKey);
        long now = clock.currentTimeMillis();
        if (peerEntry == null || peerEntry.getTimestamp() <= now - ttlMs) {
            // Nothing to join with: either no entry on the other side or it is too old.
            return Collections.emptyList();
        }

        JM joined = thisPartialJoinFn.apply(message, peerEntry.getValue());
        return Collections.singletonList(joined);
    } catch (Exception e) {
        throw new SamzaException("Error handling message in PartialJoinOperatorImpl " + getOpImplId(), e);
    }
}
开发者ID:apache,项目名称:samza,代码行数:21,代码来源:PartialJoinOperatorImpl.java
示例2: handleInit
import org.apache.samza.storage.kv.KeyValueStore; //导入依赖的package包/类
/**
 * Sets up {@code timeSeriesStore} on top of the key-value store that the task context
 * registers under this window operator's id.
 *
 * @param config the configuration, forwarded to {@code foldLeftFn.init} when a fold function exists
 * @param context the task context that supplies the backing store
 */
@Override
protected void handleInit(Config config, TaskContext context) {
    WindowInternal<M, K, Object> window = windowOpSpec.getWindow();
    KeyValueStore<TimeSeriesKey<K>, Object> backingStore =
        (KeyValueStore<TimeSeriesKey<K>, Object>) context.getStore(windowOpSpec.getOpId());

    // Aggregating windows (those with a fold function) only retain the aggregated value, so the
    // store is used in over-write mode; every other window uses append mode.
    boolean appendMode = (foldLeftFn == null);
    if (!appendMode) {
        foldLeftFn.init(config, context);
    }
    timeSeriesStore = new TimeSeriesStoreImpl(backingStore, appendMode);
}
开发者ID:apache,项目名称:samza,代码行数:17,代码来源:WindowOperatorImpl.java
示例3: getStore
import org.apache.samza.storage.kv.KeyValueStore; //导入依赖的package包/类
/**
 * Looks up a registered store by name and returns its underlying KeyValueStore.
 *
 * @param store The name under which the store is registered in {@code stores}
 * @return The KeyValueStore held by the matching entry, or {@code null} when no entry
 *         is registered under that name
 */
public KeyValueStore<String, Map<String, Object>> getStore(String store) {
    Store storeData = stores.get(store);
    return (storeData == null) ? null : storeData.getStore();
}
开发者ID:redBorder,项目名称:rb-samza-streaming,代码行数:18,代码来源:StoreManager.java
示例4: createLeftJoinFn
import org.apache.samza.storage.kv.KeyValueStore; //导入依赖的package包/类
/**
 * Builds the partial join function for the left input of a join.
 *
 * <p>The returned function delegates key extraction to {@code JoinFunction#getFirstKey}, applies
 * the join with its own message as the first argument, and owns the lifecycle ({@code init} and
 * {@code close}) of the user-supplied join function.
 *
 * @param joinOpSpec the join operator spec providing the join function and the left store id
 * @return the partial join function for the left stream
 */
private PartialJoinFunction<Object, Object, Object, Object> createLeftJoinFn(JoinOperatorSpec joinOpSpec) {
    return new PartialJoinFunction<Object, Object, Object, Object>() {
        private final JoinFunction joinFn = joinOpSpec.getJoinFn();
        private KeyValueStore<Object, TimestampedValue<Object>> leftStreamState;

        @Override
        public void init(Config config, TaskContext context) {
            leftStreamState =
                (KeyValueStore<Object, TimestampedValue<Object>>) context.getStore(joinOpSpec.getLeftOpId());
            // The user-defined join function must be initialized exactly once; the left partial
            // join function is the one responsible for doing so.
            joinFn.init(config, context);
        }

        @Override
        public Object getKey(Object message) {
            return joinFn.getFirstKey(message);
        }

        @Override
        public Object apply(Object m, Object om) {
            return joinFn.apply(m, om);
        }

        @Override
        public KeyValueStore<Object, TimestampedValue<Object>> getState() {
            return leftStreamState;
        }

        @Override
        public void close() {
            // Likewise, the join function must be closed exactly once, so only the left side does it.
            joinFn.close();
        }
    };
}
开发者ID:apache,项目名称:samza,代码行数:37,代码来源:OperatorImplGraph.java
示例5: createRightJoinFn
import org.apache.samza.storage.kv.KeyValueStore; //导入依赖的package包/类
/**
 * Builds the partial join function for the right input of a join.
 *
 * <p>The returned function delegates key extraction to {@code JoinFunction#getSecondKey} and
 * applies the join with the arguments swapped, so the user's join function always receives the
 * other (left) message first and this side's (right) message second.
 *
 * @param joinOpSpec the join operator spec providing the join function and the right store id
 * @return the partial join function for the right stream
 */
private PartialJoinFunction<Object, Object, Object, Object> createRightJoinFn(JoinOperatorSpec joinOpSpec) {
    return new PartialJoinFunction<Object, Object, Object, Object>() {
        private final JoinFunction joinFn = joinOpSpec.getJoinFn();
        private KeyValueStore<Object, TimestampedValue<Object>> rightStreamState;

        @Override
        public void init(Config config, TaskContext context) {
            // Only the state store is set up here. The user-defined join function is deliberately
            // not initialized again; that is done once by the left partial join function.
            rightStreamState =
                (KeyValueStore<Object, TimestampedValue<Object>>) context.getStore(joinOpSpec.getRightOpId());
        }

        @Override
        public Object getKey(Object message) {
            return joinFn.getSecondKey(message);
        }

        @Override
        public Object apply(Object m, Object om) {
            return joinFn.apply(om, m);
        }

        @Override
        public KeyValueStore<Object, TimestampedValue<Object>> getState() {
            return rightStreamState;
        }
    };
}
开发者ID:apache,项目名称:samza,代码行数:31,代码来源:OperatorImplGraph.java
示例6: getStore
import org.apache.samza.storage.kv.KeyValueStore; //导入依赖的package包/类
/**
 * Resolves a store by name through {@code storageManager}.
 *
 * @param storeName the name of the store to look up
 * @return the result of {@code storageManager.apply(storeName)} cast to {@link KeyValueStore},
 *     or {@code null} (after logging a warning) when no storage manager is set
 */
@Override
public KeyValueStore getStore(String storeName) {
    if (storageManager == null) {
        LOG.warn("No store found for name: {}", storeName);
        return null;
    }
    return (KeyValueStore) storageManager.apply(storeName);
}
开发者ID:apache,项目名称:samza,代码行数:10,代码来源:TaskContextImpl.java
示例7: init
import org.apache.samza.storage.kv.KeyValueStore; //导入依赖的package包/类
/**
 * Captures the "checker-state" store and the two integer settings this task reads from config.
 *
 * @param config the configuration supplying {@code expected.keys} and {@code num.partitions}
 * @param context the task context supplying the "checker-state" store
 */
@Override
@SuppressWarnings("unchecked")
public void init(Config config, TaskContext context) {
    Object checkerState = context.getStore("checker-state");
    this.store = (KeyValueStore<String, String>) checkerState;

    int expectedKeyCount = config.getInt("expected.keys");
    this.expectedKeys = expectedKeyCount;

    int partitionCount = config.getInt("num.partitions");
    this.numPartitions = partitionCount;
}
开发者ID:apache,项目名称:samza,代码行数:8,代码来源:Checker.java
示例8: init
import org.apache.samza.storage.kv.KeyValueStore; //导入依赖的package包/类
/**
 * Obtains the "mystore" store from the task context and prints every entry it currently
 * holds to standard output.
 *
 * @param config the task configuration (not used by this method)
 * @param context the task context supplying the "mystore" store
 */
@SuppressWarnings("unchecked")
public void init(Config config, TaskContext context) {
    this.store = (KeyValueStore<String, String>) context.getStore("mystore");
    System.out.println("Contents of store: ");
    KeyValueIterator<String, String> iter = store.all();
    try {
        while (iter.hasNext()) {
            Entry<String, String> entry = iter.next();
            System.out.println(entry.getKey() + " => " + entry.getValue());
        }
    } finally {
        // Close the iterator even if iteration or printing throws, so it is never leaked.
        iter.close();
    }
}
开发者ID:apache,项目名称:samza,代码行数:12,代码来源:SimpleStatefulTask.java
示例9: WindowStore
import org.apache.samza.storage.kv.KeyValueStore; //导入依赖的package包/类
public WindowStore(String name, Config config, KeyValueStore<String, Map<String, Object>> store){
this.store = store;
this.lastUpdate = config.getBoolean("redborder.stores.window." + name + ".lastUpdate", true);
}
开发者ID:redBorder,项目名称:rb-samza-streaming,代码行数:5,代码来源:WindowStore.java
示例10: enrichWithLocal
import org.apache.samza.storage.kv.KeyValueStore; //导入依赖的package包/类
/**
 * Enriches a message with data looked up in the local key-value stores and returns the result.
 * The input map is copied; it is never modified.
 *
 * <p>For each selected store, every configured key definition (field names separated by
 * {@code ':'}) is turned into a lookup key by concatenating the current values of those fields.
 * The first key definition whose transformed lookup result is non-null is merged into the
 * message, and the remaining key definitions of that store are skipped.
 *
 * @param message The message to enrich
 * @param useStores The names of the stores to use, or {@code null} to use {@code storesList}
 * @return A new map holding the enriched message
 */
public Map<String, Object> enrichWithLocal(Map<String, Object> message, List<String> useStores) {
    Map<String, Object> enrichment = new HashMap<>(message);
    List<String> enrichWithStores = (useStores == null) ? storesList : useStores;

    for (String store : enrichWithStores) {
        Store storeData = stores.get(store);
        if (storeData == null) {
            log.warn("The store [{}] isn't a available store!!!", store);
            continue;
        }

        for (String allKey : storeData.getKeys()) {
            // Build the lookup key from the values of the fields named in the key definition;
            // fields missing from the message contribute nothing.
            // NOTE(review): the cast below throws ClassCastException if a key field holds a
            // non-String value — confirm that key fields are always strings.
            StringBuilder builder = new StringBuilder();
            for (String key : allKey.split(":")) {
                String kv = (String) enrichment.get(key);
                if (kv != null) {
                    builder.append(kv);
                }
            }
            String mergeKey = builder.toString();

            KeyValueStore<String, Map<String, Object>> keyValueStore = storeData.getStore();
            Map<String, Object> contents = keyValueStore.get(mergeKey);
            Map<String, Object> transform = storeData.transform(contents);
            // Pass the value as a log argument so it is only rendered when debug logging is enabled.
            log.debug("Query KV store[{}] key[{}], value[{}]", store, mergeKey, contents);

            if (transform != null) {
                if (storeData.mustOverwrite()) {
                    // Store data wins over fields already present in the message.
                    enrichment.putAll(transform);
                } else {
                    // Fields already present in the message win over store data.
                    Map<String, Object> newData = new HashMap<>(transform);
                    newData.putAll(enrichment);
                    enrichment = newData;
                }
                break;
            }
        }
    }
    return enrichment;
}
开发者ID:redBorder,项目名称:rb-samza-streaming,代码行数:62,代码来源:StoreManager.java
示例11: setStore
import org.apache.samza.storage.kv.KeyValueStore; //导入依赖的package包/类
/**
 * Replaces the key-value store held by this object.
 *
 * @param store the key-value store to assign to the {@code store} field
 */
public void setStore(KeyValueStore<String, Map<String, Object>> store) {
this.store = store;
}
开发者ID:redBorder,项目名称:rb-samza-streaming,代码行数:4,代码来源:Store.java
示例12: getStore
import org.apache.samza.storage.kv.KeyValueStore; //导入依赖的package包/类
/**
 * Returns the key-value store held by this object.
 *
 * @return the current value of the {@code store} field
 */
public KeyValueStore<String, Map<String, Object>> getStore() {
return store;
}
开发者ID:redBorder,项目名称:rb-samza-streaming,代码行数:4,代码来源:Store.java
示例13: init
import org.apache.samza.storage.kv.KeyValueStore; //导入依赖的package包/类
/**
 * Captures the "wikipedia-stats" store from the task context.
 *
 * @param config the task configuration (not used by this method)
 * @param context the task context supplying the store
 */
public void init(Config config, TaskContext context) {
    Object statsStore = context.getStore("wikipedia-stats");
    this.store = (KeyValueStore<String, Integer>) statsStore;
}
开发者ID:yoloanalytics,项目名称:bigdata-swamp,代码行数:4,代码来源:WikipediaStatsStreamTask.java
示例14: init
import org.apache.samza.storage.kv.KeyValueStore; //导入依赖的package包/类
/**
 * Captures the "my-stats-wnd-store" store from the task context.
 *
 * @param config the task configuration (not used by this method)
 * @param context the task context supplying the store
 */
@Override
public void init(Config config, TaskContext context) {
    Object windowStatsStore = context.getStore("my-stats-wnd-store");
    this.statsStore = (KeyValueStore<String, StatsWindowState>) windowStatsStore;
}
开发者ID:apache,项目名称:samza,代码行数:5,代码来源:KeyValueStoreExample.java
示例15: testJoinChain
import org.apache.samza.storage.kv.KeyValueStore; //导入依赖的package包/类
/**
 * Builds a stream graph that joins two input streams and checks how the resulting operator
 * graph drives the join function: it is initialized once, the left input uses
 * {@code getFirstKey}, the right input uses {@code getSecondKey}, and {@code apply} is invoked
 * once with the (left, right) message pair.
 */
@Test
public void testJoinChain() {
// Runner and config mocks: two inputs on the same system, plus the job name/id that appear
// in the store names stubbed further down.
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
when(mockRunner.getStreamSpec(eq("input1"))).thenReturn(new StreamSpec("input1", "input-stream1", "input-system"));
when(mockRunner.getStreamSpec(eq("input2"))).thenReturn(new StreamSpec("input2", "input-stream2", "input-system"));
Config mockConfig = mock(Config.class);
when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName");
when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mockConfig);
JoinFunction mockJoinFunction = mock(JoinFunction.class);
MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", new NoOpSerde<>());
MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", new NoOpSerde<>());
// Join the two inputs with a one-hour duration under the operator id "j1".
inputStream1.join(inputStream2, mockJoinFunction,
mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j1");
// The task context hands out separate mock stores for the left ("-L") and right ("-R") sides.
TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
KeyValueStore mockLeftStore = mock(KeyValueStore.class);
when(mockTaskContext.getStore(eq("jobName-jobId-join-j1-L"))).thenReturn(mockLeftStore);
KeyValueStore mockRightStore = mock(KeyValueStore.class);
when(mockTaskContext.getStore(eq("jobName-jobId-join-j1-R"))).thenReturn(mockRightStore);
OperatorImplGraph opImplGraph =
new OperatorImplGraph(streamGraph, mockConfig, mockTaskContext, mock(Clock.class));
// verify that join function is initialized once.
verify(mockJoinFunction, times(1)).init(any(Config.class), any(TaskContextImpl.class));
InputOperatorImpl inputOpImpl1 = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream1"));
InputOperatorImpl inputOpImpl2 = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream2"));
PartialJoinOperatorImpl leftPartialJoinOpImpl =
(PartialJoinOperatorImpl) inputOpImpl1.registeredOperators.iterator().next();
PartialJoinOperatorImpl rightPartialJoinOpImpl =
(PartialJoinOperatorImpl) inputOpImpl2.registeredOperators.iterator().next();
// Both sides must report an equal operator spec while being distinct operator instances.
assertEquals(leftPartialJoinOpImpl.getOperatorSpec(), rightPartialJoinOpImpl.getOperatorSpec());
assertNotSame(leftPartialJoinOpImpl, rightPartialJoinOpImpl);
Object joinKey = new Object();
// verify that left partial join operator calls getFirstKey
Object mockLeftMessage = mock(Object.class);
long currentTimeMillis = System.currentTimeMillis();
// Stub the left store so that a lookup by joinKey finds the left message.
when(mockLeftStore.get(eq(joinKey))).thenReturn(new TimestampedValue<>(mockLeftMessage, currentTimeMillis));
when(mockJoinFunction.getFirstKey(eq(mockLeftMessage))).thenReturn(joinKey);
inputOpImpl1.onMessage(KV.of("", mockLeftMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
verify(mockJoinFunction, times(1)).getFirstKey(mockLeftMessage);
// verify that right partial join operator calls getSecondKey
Object mockRightMessage = mock(Object.class);
when(mockRightStore.get(eq(joinKey))).thenReturn(new TimestampedValue<>(mockRightMessage, currentTimeMillis));
when(mockJoinFunction.getSecondKey(eq(mockRightMessage))).thenReturn(joinKey);
inputOpImpl2.onMessage(KV.of("", mockRightMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
verify(mockJoinFunction, times(1)).getSecondKey(mockRightMessage);
// verify that the join function apply is called with the correct messages on match
verify(mockJoinFunction, times(1)).apply(mockLeftMessage, mockRightMessage);
}
开发者ID:apache,项目名称:samza,代码行数:58,代码来源:TestOperatorImplGraph.java
示例16: newTimeSeriesStore
import org.apache.samza.storage.kv.KeyValueStore; //导入依赖的package包/类
/**
 * Creates a time-series store for tests, backed by a {@code TestInMemoryStore} that stores
 * values as raw bytes.
 *
 * @param keySerde the serde for the user key, wrapped in a {@code TimeSeriesKeySerde}
 * @param appendMode the append-mode flag passed through to {@code TimeSeriesStoreImpl}
 * @param <K> the user key type
 * @return a new {@code TimeSeriesStoreImpl} over the in-memory store
 */
private static <K> TimeSeriesStore<K, byte[]> newTimeSeriesStore(Serde<K> keySerde, boolean appendMode) {
    TimeSeriesKeySerde<K> timeSeriesKeySerde = new TimeSeriesKeySerde<>(keySerde);
    ByteSerde valueSerde = new ByteSerde();
    KeyValueStore<TimeSeriesKey<K>, byte[]> backingStore =
        new TestInMemoryStore(timeSeriesKeySerde, valueSerde);
    return new TimeSeriesStoreImpl<>(backingStore, appendMode);
}
开发者ID:apache,项目名称:samza,代码行数:6,代码来源:TestTimeSeriesStoreImpl.java
示例17: init
import org.apache.samza.storage.kv.KeyValueStore; //导入依赖的package包/类
/**
 * Captures the "mystore" store from the task context.
 *
 * @param config the task configuration (not used by this method)
 * @param context the task context supplying the store
 */
@SuppressWarnings("unchecked")
public void init(Config config, TaskContext context) {
    Object rawStore = context.getStore("mystore");
    this.store = (KeyValueStore<String, String>) rawStore;
}
开发者ID:apache,项目名称:samza,代码行数:5,代码来源:StatePerfTestTask.java
示例18: init
import org.apache.samza.storage.kv.KeyValueStore; //导入依赖的package包/类
/**
 * Captures the "emitter-state" store, the task name, and the {@code count} setting.
 *
 * @param config the configuration supplying the integer {@code count}
 * @param context the task context supplying the store and the task name
 */
@Override
public void init(Config config, TaskContext context) {
    Object emitterState = context.getStore("emitter-state");
    this.state = (KeyValueStore<String, String>) emitterState;
    this.taskName = context.getTaskName();

    int configuredCount = config.getInt("count");
    this.max = configuredCount;
}
开发者ID:apache,项目名称:samza,代码行数:7,代码来源:Emitter.java
示例19: init
import org.apache.samza.storage.kv.KeyValueStore; //导入依赖的package包/类
/**
 * Captures the "joiner-state" store, the {@code num.partitions} setting, and the task name.
 *
 * @param config the configuration supplying the integer {@code num.partitions}
 * @param context the task context supplying the store and the task name
 */
@Override
public void init(Config config, TaskContext context) {
    Object joinerState = context.getStore("joiner-state");
    this.store = (KeyValueStore<String, String>) joinerState;

    int partitionCount = config.getInt("num.partitions");
    this.expected = partitionCount;

    this.taskName = context.getTaskName();
}
开发者ID:apache,项目名称:samza,代码行数:7,代码来源:Joiner.java
示例20: getKVStore
import org.apache.samza.storage.kv.KeyValueStore; //导入依赖的package包/类
/**
 * Returns the store that the task context exposes under the empty store name.
 *
 * @return whatever {@code taskContext.getStore("")} returns
 */
public KeyValueStore getKVStore() {
    // NOTE(review): the lookup uses an empty store name — confirm this is what the harness expects.
    final String storeName = "";
    return taskContext.getStore(storeName);
}
开发者ID:theduderog,项目名称:hello-samza-confluent,代码行数:4,代码来源:TaskUnitTestHarness.java
注：本文中的org.apache.samza.storage.kv.KeyValueStore类示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论