本文整理汇总了Java中storm.trident.planner.ProcessorContext类的典型用法代码示例。如果您正苦于以下问题:Java ProcessorContext类的具体用法?Java ProcessorContext怎么用?Java ProcessorContext使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
ProcessorContext类属于storm.trident.planner包,在下文中一共展示了ProcessorContext类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: finishBatch
import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void finishBatch(ProcessorContext processorContext) {
BatchState state = (BatchState) processorContext.state[_context.getStateIndex()];
if(!state.tuples.isEmpty()) {
List<Object> results = _function.batchRetrieve(_state, state.args);
if(results.size()!=state.tuples.size()) {
throw new RuntimeException("Results size is different than argument size: " + results.size() + " vs " + state.tuples.size());
}
for(int i=0; i<state.tuples.size(); i++) {
TridentTuple tuple = state.tuples.get(i);
Object result = results.get(i);
_collector.setContext(processorContext, tuple);
_function.execute(_projection.create(tuple), result, _collector);
}
}
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:17,代码来源:StateQueryProcessor.java
示例2: finishBatch
import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void finishBatch(ProcessorContext processorContext) {
_collector.setContext(processorContext);
Object batchId = processorContext.batchId;
// since this processor type is a committer, this occurs in the commit phase
List<TridentTuple> buffer = (List) processorContext.state[_context.getStateIndex()];
// don't update unless there are tuples
// this helps out with things like global partition persist, where multiple tasks may still
// exist for this processor. Only want the global one to do anything
// this is also a helpful optimization that state implementations don't need to manually do
if(buffer.size() > 0) {
Long txid = null;
// this is to support things like persisting off of drpc stream, which is inherently unreliable
// and won't have a tx attempt
if(batchId instanceof TransactionAttempt) {
txid = ((TransactionAttempt) batchId).getTransactionId();
}
_state.beginCommit(txid);
_updater.updateState(_state, buffer, _collector);
_state.commit(txid);
}
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:24,代码来源:PartitionPersistProcessor.java
示例3: finishBatch
import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void finishBatch(ProcessorContext processorContext) {
BatchState state = (BatchState) processorContext.state[_context.getStateIndex()];
if (!state.tuples.isEmpty()) {
List<Object> results = _function.batchRetrieve(_state, state.args);
if (results.size() != state.tuples.size()) {
throw new RuntimeException("Results size is different than argument size: " + results.size() + " vs " + state.tuples.size());
}
for (int i = 0; i < state.tuples.size(); i++) {
TridentTuple tuple = state.tuples.get(i);
Object result = results.get(i);
_collector.setContext(processorContext, tuple);
_function.execute(_projection.create(tuple), result, _collector);
}
}
}
开发者ID:kkllwww007,项目名称:jstrom,代码行数:17,代码来源:StateQueryProcessor.java
示例4: finishBatch
import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void finishBatch(ProcessorContext processorContext) {
_collector.setContext(processorContext);
Object batchId = processorContext.batchId;
// since this processor type is a committer, this occurs in the commit phase
List<TridentTuple> buffer = (List) processorContext.state[_context.getStateIndex()];
// don't update unless there are tuples
// this helps out with things like global partition persist, where multiple tasks may still
// exist for this processor. Only want the global one to do anything
// this is also a helpful optimization that state implementations don't need to manually do
if (buffer.size() > 0) {
Long txid = null;
// this is to support things like persisting off of drpc stream, which is inherently unreliable
// and won't have a tx attempt
if (batchId instanceof TransactionAttempt) {
txid = ((TransactionAttempt) batchId).getTransactionId();
}
_state.beginCommit(txid);
_updater.updateState(_state, buffer, _collector);
_state.commit(txid);
}
}
开发者ID:kkllwww007,项目名称:jstrom,代码行数:24,代码来源:PartitionPersistProcessor.java
示例5: finishBatch
import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void finishBatch(ProcessorContext processorContext) {
BatchState state = (BatchState) processorContext.state[_context.getStateIndex()];
if(!state.tuples.isEmpty()) {
List<Object> results = _function.batchRetrieve(_state, Collections.unmodifiableList(state.args));
if(results.size()!=state.tuples.size()) {
throw new RuntimeException("Results size is different than argument size: " + results.size() + " vs " + state.tuples.size());
}
for (int i = 0; i < state.tuples.size(); i++) {
TridentTuple tuple = state.tuples.get(i);
Object result = results.get(i);
_collector.setContext(processorContext, tuple);
_function.execute(state.args.get(i), result, _collector);
}
}
}
开发者ID:alibaba,项目名称:jstorm,代码行数:17,代码来源:StateQueryProcessor.java
示例6: execute
import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
TridentTuple toEmit = _factory.create(tuple);
for(TupleReceiver r: _context.getReceivers()) {
r.execute(processorContext, _context.getOutStreamId(), toEmit);
}
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:8,代码来源:ProjectedProcessor.java
示例7: execute
import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
TridentTuple toEmit = _factory.create(tuple);
for (TupleReceiver r : _context.getReceivers()) {
r.execute(processorContext, _context.getOutStreamId(), toEmit);
}
}
开发者ID:kkllwww007,项目名称:jstrom,代码行数:8,代码来源:ProjectedProcessor.java
示例8: startBatch
import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void startBatch(ProcessorContext processorContext) {
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:4,代码来源:ProjectedProcessor.java
示例9: finishBatch
import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void finishBatch(ProcessorContext processorContext) {
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:4,代码来源:ProjectedProcessor.java
示例10: startBatch
import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void startBatch(ProcessorContext processorContext) {
_collector.setContext(processorContext);
processorContext.state[_context.getStateIndex()] = _reducer.init(_collector);
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:6,代码来源:MultiReducerProcessor.java
示例11: execute
import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
_collector.setContext(processorContext);
int i = _streamToIndex.get(streamId);
_reducer.execute(processorContext.state[_context.getStateIndex()], i, _projectionFactories[i].create(tuple), _collector);
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:7,代码来源:MultiReducerProcessor.java
示例12: finishBatch
import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void finishBatch(ProcessorContext processorContext) {
_collector.setContext(processorContext);
_reducer.complete(processorContext.state[_context.getStateIndex()], _collector);
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:6,代码来源:MultiReducerProcessor.java
示例13: setContext
import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
public void setContext(ProcessorContext pc) {
this.context = pc;
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:4,代码来源:FreshCollector.java
示例14: startBatch
import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void startBatch(ProcessorContext processorContext) {
_collector.setContext(processorContext);
processorContext.state[_context.getStateIndex()] = _agg.init(processorContext.batchId, _collector);
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:6,代码来源:AggregateProcessor.java
示例15: execute
import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
_collector.setContext(processorContext);
_agg.aggregate(processorContext.state[_context.getStateIndex()], _projection.create(tuple), _collector);
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:6,代码来源:AggregateProcessor.java
示例16: finishBatch
import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void finishBatch(ProcessorContext processorContext) {
_collector.setContext(processorContext);
_agg.complete(processorContext.state[_context.getStateIndex()], _collector);
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:6,代码来源:AggregateProcessor.java
示例17: startBatch
import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void startBatch(ProcessorContext processorContext) {
processorContext.state[_context.getStateIndex()] = new BatchState();
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:5,代码来源:StateQueryProcessor.java
示例18: execute
import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
BatchState state = (BatchState) processorContext.state[_context.getStateIndex()];
state.tuples.add(tuple);
state.args.add(_projection.create(tuple));
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:7,代码来源:StateQueryProcessor.java
示例19: startBatch
import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void startBatch(ProcessorContext processorContext) {
processorContext.state[_context.getStateIndex()] = new ArrayList<TridentTuple>();
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:5,代码来源:PartitionPersistProcessor.java
示例20: execute
import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
((List) processorContext.state[_context.getStateIndex()]).add(_projection.create(tuple));
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:5,代码来源:PartitionPersistProcessor.java
注:本文中的storm.trident.planner.ProcessorContext类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论