本文整理汇总了Java中backtype.storm.utils.RegisteredGlobalState类的典型用法代码示例。如果您正苦于以下问题：Java RegisteredGlobalState类的具体用法？Java RegisteredGlobalState怎么用？Java RegisteredGlobalState使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
RegisteredGlobalState类属于backtype.storm.utils包，在下文中一共展示了RegisteredGlobalState类的17个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: initializeTransaction
import backtype.storm.utils.RegisteredGlobalState; //导入依赖的package包/类
/**
 * Produces the partition-to-tuples metadata for a transaction.
 *
 * <p>If {@code currMetadata} is already present it is returned untouched. Otherwise the
 * entry at position {@code _emittedIndex} of the list registered under {@code _id} is
 * consumed: the txid is recorded in {@code txIndices} against that index, the index is
 * advanced by one, and the entry is returned as-is when it is a {@code Map}, or spread
 * round-robin over {@code _numPartitions} partitions when it is a list. When the registered
 * list has no entry at {@code _emittedIndex}, an empty map is returned.
 *
 * @param txid transaction id to associate with the consumed batch index
 * @param prevMetadata metadata of the previous transaction; not read by this method
 * @param currMetadata metadata already computed for this transaction, or {@code null}
 * @return the metadata map for this transaction
 */
@Override
public Map<Integer, List<List<Object>>> initializeTransaction(long txid, Map<Integer, List<List<Object>>> prevMetadata, Map<Integer, List<List<Object>>> currMetadata) {
    if (currMetadata != null) {
        return currMetadata;
    }

    List fedBatches = (List) RegisteredGlobalState.getState(_id);
    if (fedBatches.size() <= _emittedIndex) {
        // No batch is available at the current index.
        return new HashMap<Integer, List<List<Object>>>();
    }

    Object batch = fedBatches.get(_emittedIndex);
    txIndices.put(txid, _emittedIndex);
    _emittedIndex++;

    if (batch instanceof Map) {
        // The batch is already keyed by partition.
        return (Map) batch;
    }

    List tuples = (List) batch;
    Map<Integer, List<List<Object>>> byPartition = new HashMap<Integer, List<List<Object>>>();
    for (int p = 0; p < _numPartitions; p++) {
        byPartition.put(p, new ArrayList<List<Object>>());
    }
    // Deal the tuples out to the partitions in round-robin order.
    int pos = 0;
    while (pos < tuples.size()) {
        byPartition.get(pos % _numPartitions).add((List) tuples.get(pos));
        pos++;
    }
    return byPartition;
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:27,代码来源:FeederBatchSpout.java
示例2: OpaqueMemoryTransactionalSpout
import backtype.storm.utils.RegisteredGlobalState; //导入依赖的package包/类
/**
 * Creates the spout and registers its shared state with {@code RegisteredGlobalState}.
 *
 * <p>Three objects are registered, in this order: the supplied {@code partitions} map
 * (id kept in {@code _id}), a new empty synchronized map (id kept in
 * {@code _finishedPartitionsId}), and a second new empty synchronized map (id kept in
 * {@code _disabledId}).
 *
 * @param partitions tuples keyed by partition index
 * @param outFields value stored in {@code _outFields}
 * @param takeAmt value stored in {@code _takeAmt}
 */
public OpaqueMemoryTransactionalSpout(
        Map<Integer, List<List<Object>>> partitions, Fields outFields,
        int takeAmt) {
    _id = RegisteredGlobalState.registerState(partitions);
    _finishedPartitionsId = RegisteredGlobalState.registerState(
            Collections.synchronizedMap(new HashMap<Integer, Boolean>()));
    _disabledId = RegisteredGlobalState.registerState(
            Collections.synchronizedMap(new HashMap<Integer, Boolean>()));
    _outFields = outFields;
    _takeAmt = takeAmt;
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:17,代码来源:OpaqueMemoryTransactionalSpout.java
示例3: initializeTransaction
import backtype.storm.utils.RegisteredGlobalState; //导入依赖的package包/类
/**
 * Builds the metadata (partition index to list of tuples) for the given transaction.
 *
 * <p>An existing {@code currMetadata} is returned unchanged. Otherwise the method looks up
 * the list registered under {@code _id} and, if it holds an entry at {@code _emittedIndex},
 * records {@code txid -> _emittedIndex} in {@code txIndices}, increments
 * {@code _emittedIndex}, and converts that entry: a {@code Map} entry is returned directly,
 * while a list entry is distributed over {@code _numPartitions} buckets by element
 * position modulo {@code _numPartitions}. If there is no such entry, an empty map is
 * returned.
 *
 * @param txid id of the transaction being initialized
 * @param prevMetadata metadata of the previous transaction; unused here
 * @param currMetadata metadata already computed for this transaction, or {@code null}
 * @return the metadata for this transaction
 */
@Override
public Map<Integer, List<List<Object>>> initializeTransaction(long txid, Map<Integer, List<List<Object>>> prevMetadata, Map<Integer, List<List<Object>>> currMetadata) {
    if (currMetadata != null) {
        return currMetadata;
    }

    List registered = (List) RegisteredGlobalState.getState(_id);
    boolean hasPending = registered.size() > _emittedIndex;
    if (!hasPending) {
        return new HashMap<Integer, List<List<Object>>>();
    }

    Object pending = registered.get(_emittedIndex);
    txIndices.put(txid, _emittedIndex);
    _emittedIndex++;

    if (pending instanceof Map) {
        return (Map) pending;
    }

    List flat = (List) pending;
    Map<Integer, List<List<Object>>> buckets = new HashMap<Integer, List<List<Object>>>();
    for (int bucket = 0; bucket < _numPartitions; bucket++) {
        buckets.put(bucket, new ArrayList<List<Object>>());
    }
    for (int idx = 0; idx < flat.size(); idx++) {
        // Element idx goes to bucket idx mod _numPartitions.
        List<List<Object>> target = buckets.get(idx % _numPartitions);
        target.add((List) flat.get(idx));
    }
    return buckets;
}
开发者ID:alibaba,项目名称:jstorm,代码行数:27,代码来源:FeederBatchSpout.java
示例4: feed
import backtype.storm.utils.RegisteredGlobalState; //导入依赖的package包/类
/**
 * Hands a batch of tuples to the spout and blocks until a permit is released for it.
 *
 * <p>A new zero-permit {@link Semaphore} is appended to the list registered under
 * {@code _semaphoreId}, the tuples are appended to the list registered under {@code _id},
 * and the calling thread then waits on the semaphore.
 *
 * @param tuples the batch to enqueue
 * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread
 *         is interrupted while waiting; the thread's interrupt status is restored first
 */
@Override
public void feed(Object tuples) {
    Semaphore sem = new Semaphore(0);
    ((List) RegisteredGlobalState.getState(_semaphoreId)).add(sem);
    ((List) RegisteredGlobalState.getState(_id)).add(tuples);
    try {
        sem.acquire();
    } catch (InterruptedException e) {
        // acquire() clears the interrupt status when it throws; set it again so code
        // further up the stack can still observe that the thread was interrupted.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:12,代码来源:FeederBatchSpout.java
示例5: success
import backtype.storm.utils.RegisteredGlobalState; //导入依赖的package包/类
/**
 * Releases one permit on the semaphore associated with a transaction.
 *
 * <p>The index stored in {@code txIndices} for {@code txid} selects the semaphore from the
 * list registered under {@code _semaphoreId}. Nothing happens when no index is recorded
 * for the txid.
 *
 * @param txid id of the transaction
 */
@Override
public void success(long txid) {
    Integer batchIndex = txIndices.get(txid);
    if (batchIndex == null) {
        return;
    }
    List semaphores = (List) RegisteredGlobalState.getState(_semaphoreId);
    ((Semaphore) semaphores.get(batchIndex)).release();
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:9,代码来源:FeederBatchSpout.java
示例6: isReady
import backtype.storm.utils.RegisteredGlobalState; //导入依赖的package包/类
/**
 * Reports whether a new transaction may be started.
 *
 * <p>Always {@code true} when {@code _waitToEmit} is false. Otherwise it is {@code true}
 * only if the list registered under {@code _id} holds more entries than
 * {@code _masterEmitted}, in which case that counter is incremented. When there are not
 * enough entries the method calls {@code Utils.sleep(2)} and returns {@code false}.
 *
 * @param txid transaction id; not read by this method
 * @return whether the caller may proceed
 */
@Override
public boolean isReady(long txid) {
    if (_waitToEmit) {
        List fedBatches = (List) RegisteredGlobalState.getState(_id);
        if (fedBatches.size() <= _masterEmitted) {
            // Nothing new yet: pause briefly (presumably 2 ms - confirm against Utils.sleep).
            Utils.sleep(2);
            return false;
        }
        _masterEmitted++;
    }
    return true;
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:13,代码来源:FeederBatchSpout.java
示例7: MemoryTransactionalSpout
import backtype.storm.utils.RegisteredGlobalState; //导入依赖的package包/类
/**
 * Creates the spout and registers its shared state with {@code RegisteredGlobalState}.
 *
 * <p>The supplied {@code partitions} map is registered first (id kept in {@code _id}),
 * followed by a new empty synchronized map (id kept in {@code _finishedPartitionsId}).
 * The {@code partitions} reference is also kept directly in {@code _initialPartitions}.
 *
 * @param partitions tuples keyed by partition index
 * @param outFields value stored in {@code _outFields}
 * @param takeAmt value stored in {@code _takeAmt}
 */
public MemoryTransactionalSpout(
        Map<Integer, List<List<Object>>> partitions, Fields outFields,
        int takeAmt) {
    _id = RegisteredGlobalState.registerState(partitions);
    _finishedPartitionsId = RegisteredGlobalState.registerState(
            Collections.synchronizedMap(new HashMap<Integer, Boolean>()));
    _initialPartitions = partitions;
    _outFields = outFields;
    _takeAmt = takeAmt;
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:12,代码来源:MemoryTransactionalSpout.java
示例8: getQueues
import backtype.storm.utils.RegisteredGlobalState; //导入依赖的package包/类
/**
 * Returns the partition queues: the map registered under {@code _id} when
 * {@code RegisteredGlobalState} has one, otherwise {@code _initialPartitions}.
 *
 * @return the map of partition index to queued tuples
 */
private Map<Integer, List<List<Object>>> getQueues() {
    Map<Integer, List<List<Object>>> registered =
            (Map<Integer, List<List<Object>>>) RegisteredGlobalState.getState(_id);
    return registered != null ? registered : _initialPartitions;
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:9,代码来源:MemoryTransactionalSpout.java
示例9: feed
import backtype.storm.utils.RegisteredGlobalState; //导入依赖的package包/类
/**
 * Enqueues a batch of tuples and blocks until a permit is released for it.
 *
 * <p>A new zero-permit {@link Semaphore} is appended to the list registered under
 * {@code _semaphoreId}, the tuples are appended to the list registered under {@code _id},
 * and the calling thread then waits on the semaphore.
 *
 * @param tuples the batch to enqueue
 * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread
 *         is interrupted while waiting; the thread's interrupt status is restored first
 */
public void feed(Object tuples) {
    Semaphore sem = new Semaphore(0);
    ((List) RegisteredGlobalState.getState(_semaphoreId)).add(sem);
    ((List) RegisteredGlobalState.getState(_id)).add(tuples);
    try {
        sem.acquire();
    } catch (InterruptedException e) {
        // acquire() clears the interrupt status when it throws; set it again so the
        // interruption is not lost once the exception has been wrapped.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}
开发者ID:songtk,项目名称:learn_jstorm,代码行数:11,代码来源:FeederBatchSpout.java
示例10: feed
import backtype.storm.utils.RegisteredGlobalState; //导入依赖的package包/类
/**
 * Adds a batch of tuples to the shared feed and waits for a permit on its semaphore.
 *
 * <p>A fresh {@link Semaphore} with zero permits is appended to the list registered under
 * {@code _semaphoreId}, then the tuples are appended to the list registered under
 * {@code _id}. The call returns only after a permit is released on that semaphore.
 *
 * @param tuples the batch to enqueue
 * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread
 *         is interrupted while waiting; the thread's interrupt status is restored first
 */
public void feed(Object tuples) {
    Semaphore sem = new Semaphore(0);
    ((List) RegisteredGlobalState.getState(_semaphoreId)).add(sem);
    ((List) RegisteredGlobalState.getState(_id)).add(tuples);
    try {
        sem.acquire();
    } catch (InterruptedException e) {
        // Re-assert the interrupt status that acquire() cleared, so callers that check
        // Thread.isInterrupted() still see the interruption after the rethrow.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}
开发者ID:kkllwww007,项目名称:jstrom,代码行数:11,代码来源:FeederBatchSpout.java
示例11: initializeTransaction
import backtype.storm.utils.RegisteredGlobalState; //导入依赖的package包/类
/**
 * Computes the metadata map (partition index to tuples) for a transaction.
 *
 * <p>A non-null {@code currMetadata} short-circuits and is returned as-is. Otherwise the
 * entry at {@code _emittedIndex} in the list registered under {@code _id} is taken, the
 * pair {@code txid -> _emittedIndex} is stored in {@code txIndices}, and
 * {@code _emittedIndex} is incremented. A {@code Map} entry is returned directly; a list
 * entry is split across {@code _numPartitions} partitions, assigning element {@code i} to
 * partition {@code i % _numPartitions}. If the registered list has no entry at
 * {@code _emittedIndex}, the result is an empty map.
 *
 * @param txid id of the transaction being initialized
 * @param prevMetadata metadata of the previous transaction; not read here
 * @param currMetadata metadata already computed for this transaction, or {@code null}
 * @return the metadata for this transaction
 */
@Override
public Map<Integer, List<List<Object>>> initializeTransaction(long txid, Map<Integer, List<List<Object>>> prevMetadata,
        Map<Integer, List<List<Object>>> currMetadata) {
    if (currMetadata != null) {
        return currMetadata;
    }

    List batches = (List) RegisteredGlobalState.getState(_id);
    if (batches.size() <= _emittedIndex) {
        // Nothing is available at the current index.
        return new HashMap<Integer, List<List<Object>>>();
    }

    Object current = batches.get(_emittedIndex);
    txIndices.put(txid, _emittedIndex);
    _emittedIndex++;

    if (current instanceof Map) {
        return (Map) current;
    }

    List items = (List) current;
    Map<Integer, List<List<Object>>> result = new HashMap<Integer, List<List<Object>>>();
    for (int slot = 0; slot < _numPartitions; slot++) {
        result.put(slot, new ArrayList<List<Object>>());
    }
    for (int n = 0; n < items.size(); n++) {
        int slot = n % _numPartitions;
        List<List<Object>> queue = result.get(slot);
        queue.add((List) items.get(n));
    }
    return result;
}
开发者ID:kkllwww007,项目名称:jstrom,代码行数:29,代码来源:FeederBatchSpout.java
示例12: success
import backtype.storm.utils.RegisteredGlobalState; //导入依赖的package包/类
/**
 * Releases a permit on the semaphore recorded for the given transaction.
 *
 * <p>Looks up the index stored for {@code txid} in {@code txIndices}; when one exists, the
 * semaphore at that index in the list registered under {@code _semaphoreId} is released.
 * A txid with no recorded index is ignored.
 *
 * @param txid id of the transaction
 */
@Override
public void success(long txid) {
    Integer position = txIndices.get(txid);
    if (position == null) {
        return;
    }
    List pendingSemaphores = (List) RegisteredGlobalState.getState(_semaphoreId);
    Semaphore waiting = (Semaphore) pendingSemaphores.get(position);
    waiting.release();
}
开发者ID:kkllwww007,项目名称:jstrom,代码行数:9,代码来源:FeederBatchSpout.java
示例13: isReady
import backtype.storm.utils.RegisteredGlobalState; //导入依赖的package包/类
/**
 * Tells the caller whether it may start a new transaction.
 *
 * <p>When {@code _waitToEmit} is false the answer is always {@code true}. Otherwise the
 * size of the list registered under {@code _id} is compared with {@code _masterEmitted}:
 * if the list is larger, the counter is incremented and {@code true} is returned; if not,
 * {@code Utils.sleep(2)} is called and {@code false} is returned.
 *
 * @param txid transaction id; not read by this method
 * @return whether the caller may proceed
 */
@Override
public boolean isReady(long txid) {
    if (_waitToEmit) {
        List available = (List) RegisteredGlobalState.getState(_id);
        boolean hasUnannounced = available.size() > _masterEmitted;
        if (!hasUnannounced) {
            // Short pause before reporting "not ready" (presumably 2 ms - confirm).
            Utils.sleep(2);
            return false;
        }
        _masterEmitted++;
    }
    return true;
}
开发者ID:kkllwww007,项目名称:jstrom,代码行数:14,代码来源:FeederBatchSpout.java
示例14: OpaqueMemoryTransactionalSpout
import backtype.storm.utils.RegisteredGlobalState; //导入依赖的package包/类
/**
 * Creates the spout and registers its shared state with {@code RegisteredGlobalState}.
 *
 * <p>Registration order: the supplied {@code partitions} (id kept in {@code _id}), a new
 * empty synchronized map (id kept in {@code _finishedPartitionsId}), then another new
 * empty synchronized map (id kept in {@code _disabledId}).
 *
 * @param partitions tuples keyed by partition index
 * @param outFields value stored in {@code _outFields}
 * @param takeAmt value stored in {@code _takeAmt}
 */
public OpaqueMemoryTransactionalSpout(Map<Integer, List<List<Object>>> partitions, Fields outFields, int takeAmt) {
    _id = RegisteredGlobalState.registerState(partitions);
    _finishedPartitionsId = RegisteredGlobalState.registerState(
            Collections.synchronizedMap(new HashMap<Integer, Boolean>()));
    _disabledId = RegisteredGlobalState.registerState(
            Collections.synchronizedMap(new HashMap<Integer, Boolean>()));
    _outFields = outFields;
    _takeAmt = takeAmt;
}
开发者ID:kkllwww007,项目名称:jstrom,代码行数:13,代码来源:OpaqueMemoryTransactionalSpout.java
示例15: MemoryTransactionalSpout
import backtype.storm.utils.RegisteredGlobalState; //导入依赖的package包/类
/**
 * Creates the spout and registers its shared state with {@code RegisteredGlobalState}.
 *
 * <p>The supplied {@code partitions} map is registered (id kept in {@code _id}) and then a
 * new empty synchronized map is registered (id kept in {@code _finishedPartitionsId}).
 * The same {@code partitions} reference is also stored in {@code _initialPartitions}.
 *
 * @param partitions tuples keyed by partition index
 * @param outFields value stored in {@code _outFields}
 * @param takeAmt value stored in {@code _takeAmt}
 */
public MemoryTransactionalSpout(Map<Integer, List<List<Object>>> partitions, Fields outFields, int takeAmt) {
    _id = RegisteredGlobalState.registerState(partitions);
    _finishedPartitionsId = RegisteredGlobalState.registerState(
            Collections.synchronizedMap(new HashMap<Integer, Boolean>()));
    _initialPartitions = partitions;
    _outFields = outFields;
    _takeAmt = takeAmt;
}
开发者ID:kkllwww007,项目名称:jstrom,代码行数:9,代码来源:MemoryTransactionalSpout.java
示例16: getQueues
import backtype.storm.utils.RegisteredGlobalState; //导入依赖的package包/类
/**
 * Looks up the partition queues registered under {@code _id}, falling back to
 * {@code _initialPartitions} when {@code RegisteredGlobalState} returns {@code null}.
 *
 * @return the map of partition index to queued tuples
 */
private Map<Integer, List<List<Object>>> getQueues() {
    Map<Integer, List<List<Object>>> shared =
            (Map<Integer, List<List<Object>>>) RegisteredGlobalState.getState(_id);
    return shared == null ? _initialPartitions : shared;
}
开发者ID:kkllwww007,项目名称:jstrom,代码行数:8,代码来源:MemoryTransactionalSpout.java
示例17: OpaqueMemoryTransactionalSpout
import backtype.storm.utils.RegisteredGlobalState; //导入依赖的package包/类
/**
 * Creates the spout and registers its shared state with {@code RegisteredGlobalState}.
 *
 * <p>Three registrations happen in sequence: the supplied {@code partitions} (id kept in
 * {@code _id}), a new empty synchronized map (id kept in {@code _finishedPartitionsId}),
 * and a further new empty synchronized map (id kept in {@code _disabledId}).
 *
 * @param partitions tuples keyed by partition index
 * @param outFields value stored in {@code _outFields}
 * @param takeAmt value stored in {@code _takeAmt}
 */
public OpaqueMemoryTransactionalSpout(Map<Integer, List<List<Object>>> partitions, Fields outFields, int takeAmt) {
    _id = RegisteredGlobalState.registerState(partitions);
    _finishedPartitionsId = RegisteredGlobalState.registerState(
            Collections.synchronizedMap(new HashMap<Integer, Boolean>()));
    _disabledId = RegisteredGlobalState.registerState(
            Collections.synchronizedMap(new HashMap<Integer, Boolean>()));
    _outFields = outFields;
    _takeAmt = takeAmt;
}
开发者ID:metamx,项目名称:incubator-storm,代码行数:13,代码来源:OpaqueMemoryTransactionalSpout.java
注：本文中的backtype.storm.utils.RegisteredGlobalState类示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论