本文整理汇总了Java中org.apache.flink.runtime.state.FunctionInitializationContext类的典型用法代码示例。如果您正苦于以下问题:Java FunctionInitializationContext类的具体用法?Java FunctionInitializationContext怎么用?Java FunctionInitializationContext使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
FunctionInitializationContext类属于org.apache.flink.runtime.state包,在下文中一共展示了FunctionInitializationContext类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: initializeState
import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
if (checkpointCoder == null) {
// no checkpoint coder available in this source
return;
}
OperatorStateStore stateStore = context.getOperatorStateStore();
CoderTypeInformation<
KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>>
typeInformation = (CoderTypeInformation) new CoderTypeInformation<>(checkpointCoder);
stateForCheckpoint = stateStore.getOperatorState(
new ListStateDescriptor<>(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME,
typeInformation.createSerializer(new ExecutionConfig())));
if (context.isRestored()) {
isRestored = true;
LOG.info("Having restore state in the UnbounedSourceWrapper.");
} else {
LOG.info("No restore state for UnbounedSourceWrapper.");
}
}
开发者ID:apache,项目名称:beam,代码行数:23,代码来源:UnboundedSourceWrapper.java
示例2: initializeState
import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
TypeInformation<Tuple2<StreamShardMetadata, SequenceNumber>> shardsStateTypeInfo = new TupleTypeInfo<>(
TypeInformation.of(StreamShardMetadata.class),
TypeInformation.of(SequenceNumber.class));
sequenceNumsStateForCheckpoint = context.getOperatorStateStore().getUnionListState(
new ListStateDescriptor<>(sequenceNumsStateStoreName, shardsStateTypeInfo));
if (context.isRestored()) {
if (sequenceNumsToRestore == null) {
sequenceNumsToRestore = new HashMap<>();
for (Tuple2<StreamShardMetadata, SequenceNumber> kinesisSequenceNumber : sequenceNumsStateForCheckpoint.get()) {
sequenceNumsToRestore.put(kinesisSequenceNumber.f0, kinesisSequenceNumber.f1);
}
LOG.info("Setting restore state in the FlinkKinesisConsumer. Using the following offsets: {}",
sequenceNumsToRestore);
}
} else {
LOG.info("No restore state for FlinkKinesisConsumer.");
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:24,代码来源:FlinkKinesisConsumer.java
示例3: beforeTest
import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Before
public void beforeTest() throws Exception {
OperatorStateStore mockStore = Mockito.mock(OperatorStateStore.class);
FunctionInitializationContext mockContext = Mockito.mock(FunctionInitializationContext.class);
Mockito.when(mockContext.getOperatorStateStore()).thenReturn(mockStore);
Mockito.when(mockStore.getSerializableListState(any(String.class))).thenReturn(null);
source = new RMQTestSource();
source.initializeState(mockContext);
source.open(config);
messageId = 0;
generateCorrelationIds = true;
sourceThread = new Thread(new Runnable() {
@Override
public void run() {
try {
source.run(new DummySourceContext());
} catch (Exception e) {
exception = e;
}
}
});
}
开发者ID:axbaretto,项目名称:flink,代码行数:27,代码来源:RMQSourceTest.java
示例4: initializeState
import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
if (broadcast) {
this.counterPartitions = context
.getOperatorStateStore()
.getUnionListState(new ListStateDescriptor<>("counter_partitions", IntSerializer.INSTANCE));
} else {
this.counterPartitions = context
.getOperatorStateStore()
.getListState(new ListStateDescriptor<>("counter_partitions", IntSerializer.INSTANCE));
}
if (context.isRestored()) {
for (int v : counterPartitions.get()) {
counter += v;
}
checkCorrectRestore[getRuntimeContext().getIndexOfThisSubtask()] = counter;
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:21,代码来源:RescalingITCase.java
示例5: initializeState
import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
Preconditions.checkState(this.checkpointedState == null,
"The " + getClass().getSimpleName() + " has already been initialized.");
this.checkpointedState = context.getOperatorStateStore().getListState(
new ListStateDescriptor<>(
"from-elements-state",
IntSerializer.INSTANCE
)
);
if (context.isRestored()) {
List<Integer> retrievedStates = new ArrayList<>();
for (Integer entry : this.checkpointedState.get()) {
retrievedStates.add(entry);
}
// given that the parallelism of the function is 1, we can only have 1 state
Preconditions.checkArgument(retrievedStates.size() == 1,
getClass().getSimpleName() + " retrieved invalid state.");
this.numElementsToSkip = retrievedStates.get(0);
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:26,代码来源:FromElementsFunction.java
示例6: initializeState
import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
OperatorStateStore stateStore = context.getOperatorStateStore();
offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
if (context.isRestored()) {
restoreToOffset = new HashMap<>();
for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsStateForCheckpoint.get()) {
restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1);
}
LOG.info("Setting restore state in the FlinkKafkaConsumer.");
if (LOG.isDebugEnabled()) {
LOG.debug("Using the following offsets: {}", restoreToOffset);
}
} else {
LOG.info("No restore state for FlinkKafkaConsumer.");
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:21,代码来源:FlinkKafkaConsumerBase.java
示例7: initializeState
import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
public void initializeState(FunctionInitializationContext context) throws Exception {
LOG.info("initializeState...");
TypeInformation<Tuple2<LogstoreShardMeta, String>> shardsStateTypeInfo = new TupleTypeInfo<Tuple2<LogstoreShardMeta, String>>(
TypeInformation.of(LogstoreShardMeta.class),
TypeInformation.of(String.class));
cursorStateForCheckpoint = context.getOperatorStateStore().getUnionListState(
new ListStateDescriptor(curcorStateStoreName, shardsStateTypeInfo));
if (context.isRestored()) {
if (cursorsToRestore == null) {
cursorsToRestore = new HashMap<LogstoreShardMeta, String>();
for (Tuple2<LogstoreShardMeta, String> cursor : cursorStateForCheckpoint.get()) {
LOG.info("initializeState, project: {}, logstore: {}, shard: {}, checkpoint: {}", logProject, logStore, cursor.f0.toString(), cursor.f1);
cursorsToRestore.put(cursor.f0, cursor.f1);
if (consumerGroupName != null && logClient != null) {
logClient.updateCheckpoint(logProject, logStore, consumerGroupName, "flinkTask-" + getRuntimeContext().getIndexOfThisSubtask() + "Of" + getRuntimeContext().getNumberOfParallelSubtasks(), cursor.f0.getShardId(), cursor.f1);
}
}
LOG.info("Setting restore state in the FlinkLogConsumer. Using the following offsets: {}",
cursorsToRestore);
}
}
else {
LOG.info("No restore state for FlinkLogConsumer.");
}
}
开发者ID:aliyun,项目名称:aliyun-log-flink-connector,代码行数:29,代码来源:FlinkLogConsumer.java
示例8: initializeState
import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
Preconditions.checkArgument(this.restoredBucketStates == null, "The operator has already been initialized.");
try {
initFileSystem();
} catch (IOException e) {
LOG.error("Error while creating FileSystem when initializing the state of the TODBucketingSink.", e);
throw new RuntimeException("Error while creating FileSystem when initializing the state of the TODBucketingSink.", e);
}
if (this.refTruncate == null) {
this.refTruncate = reflectTruncate(fs);
}
OperatorStateStore stateStore = context.getOperatorStateStore();
restoredBucketStates = stateStore.getSerializableListState("bucket-states");
int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
if (context.isRestored()) {
LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
for (State<T> recoveredState : restoredBucketStates.get()) {
handleRestoredBucketState(recoveredState);
if (LOG.isDebugEnabled()) {
LOG.debug("{} idx {} restored {}", getClass().getSimpleName(), subtaskIndex, recoveredState);
}
}
} else {
LOG.info("No state to restore for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
}
}
开发者ID:breakEval13,项目名称:rocketmq-flink-plugin,代码行数:33,代码来源:TODBucketingSink.java
示例9: initializeState
import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
LOG.warn("Using {} semantic, but checkpointing is not enabled. Switching to {} semantic.", semantic, Semantic.NONE);
semantic = Semantic.NONE;
}
nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
transactionalIdsGenerator = new TransactionalIdsGenerator(
getRuntimeContext().getTaskName(),
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks(),
kafkaProducersPoolSize,
SAFE_SCALE_DOWN_FACTOR);
if (semantic != Semantic.EXACTLY_ONCE) {
nextTransactionalIdHint = null;
} else {
ArrayList<NextTransactionalIdHint> transactionalIdHints = Lists.newArrayList(nextTransactionalIdHintState.get());
if (transactionalIdHints.size() > 1) {
throw new IllegalStateException(
"There should be at most one next transactional id hint written by the first subtask");
} else if (transactionalIdHints.size() == 0) {
nextTransactionalIdHint = new NextTransactionalIdHint(0, 0);
// this means that this is either:
// (1) the first execution of this application
// (2) previous execution has failed before first checkpoint completed
//
// in case of (2) we have to abort all previous transactions
abortTransactions(transactionalIdsGenerator.generateIdsToAbort());
} else {
nextTransactionalIdHint = transactionalIdHints.get(0);
}
}
super.initializeState(context);
}
开发者ID:axbaretto,项目名称:flink,代码行数:40,代码来源:FlinkKafkaProducer011.java
示例10: initializeState
import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
Preconditions.checkArgument(this.restoredBucketStates == null,
"The " + getClass().getSimpleName() + " has already been initialized.");
try {
initFileSystem();
} catch (IOException e) {
LOG.error("Error while creating FileSystem when initializing the state of the RollingSink.", e);
throw new RuntimeException("Error while creating FileSystem when initializing the state of the RollingSink.", e);
}
if (this.refTruncate == null) {
this.refTruncate = reflectTruncate(fs);
}
OperatorStateStore stateStore = context.getOperatorStateStore();
restoredBucketStates = stateStore.getSerializableListState("rolling-states");
int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
if (context.isRestored()) {
LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
for (BucketState bucketState : restoredBucketStates.get()) {
handleRestoredBucketState(bucketState);
}
if (LOG.isDebugEnabled()) {
LOG.debug("{} (taskIdx= {}) restored {}", getClass().getSimpleName(), subtaskIndex, bucketState);
}
} else {
LOG.info("No state to restore for the {} (taskIdx= {}).", getClass().getSimpleName(), subtaskIndex);
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:35,代码来源:RollingSink.java
示例11: initializeState
import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
Preconditions.checkArgument(this.restoredBucketStates == null, "The operator has already been initialized.");
try {
initFileSystem();
} catch (IOException e) {
LOG.error("Error while creating FileSystem when initializing the state of the BucketingSink.", e);
throw new RuntimeException("Error while creating FileSystem when initializing the state of the BucketingSink.", e);
}
if (this.refTruncate == null) {
this.refTruncate = reflectTruncate(fs);
}
OperatorStateStore stateStore = context.getOperatorStateStore();
restoredBucketStates = stateStore.getSerializableListState("bucket-states");
int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
if (context.isRestored()) {
LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
for (State<T> recoveredState : restoredBucketStates.get()) {
handleRestoredBucketState(recoveredState);
if (LOG.isDebugEnabled()) {
LOG.debug("{} idx {} restored {}", getClass().getSimpleName(), subtaskIndex, recoveredState);
}
}
} else {
LOG.info("No state to restore for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:33,代码来源:BucketingSink.java
示例12: initializeState
import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
pojoClass = getRuntimeContext().getUserCodeClassLoader().loadClass(POJO_NAME);
fieldA = pojoClass.getDeclaredField("a");
fieldA.setAccessible(true);
if (hasBField) {
fieldB = pojoClass.getDeclaredField("b");
fieldB.setAccessible(true);
}
if (keyed) {
keyedValueState = context.getKeyedStateStore().getState(
new ValueStateDescriptor<>("keyedValueState", (Class<Object>) pojoClass));
keyedMapState = context.getKeyedStateStore().getMapState(
new MapStateDescriptor<>("keyedMapState", (Class<Object>) pojoClass, (Class<Object>) pojoClass));
keyedListState = context.getKeyedStateStore().getListState(
new ListStateDescriptor<>("keyedListState", (Class<Object>) pojoClass));
ReduceFunction<Object> reduceFunction = new FirstValueReducer<>();
keyedReducingState = context.getKeyedStateStore().getReducingState(
new ReducingStateDescriptor<>("keyedReducingState", reduceFunction, (Class<Object>) pojoClass));
} else {
partitionableListState = context.getOperatorStateStore().getListState(
new ListStateDescriptor<>("partitionableListState", (Class<Object>) pojoClass));
unionListState = context.getOperatorStateStore().getUnionListState(
new ListStateDescriptor<>("unionListState", (Class<Object>) pojoClass));
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:31,代码来源:PojoSerializerUpgradeTest.java
示例13: initializeState
import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ValueStateDescriptor<Integer> descriptor =
new ValueStateDescriptor<>(
"seenCountState",
TypeInformation.of(new TypeHint<Integer>() {}),
0);
alreadySeen = context.getKeyedStateStore().getState(descriptor);
}
开发者ID:axbaretto,项目名称:flink,代码行数:10,代码来源:JobManagerHACheckpointRecoveryITCase.java
示例14: initializeState
import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
Preconditions.checkState(this.checkpointedState == null,
"The " + getClass().getSimpleName() + " has already been initialized.");
this.checkpointedState = context.getOperatorStateStore().getListState(
new ListStateDescriptor<>(
"stateful-sequence-source-state",
LongSerializer.INSTANCE
)
);
this.valuesToEmit = new ArrayDeque<>();
if (context.isRestored()) {
// upon restoring
for (Long v : this.checkpointedState.get()) {
this.valuesToEmit.add(v);
}
} else {
// the first time the job is executed
final int stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
final int taskIdx = getRuntimeContext().getIndexOfThisSubtask();
final long congruence = start + taskIdx;
long totalNoOfElements = Math.abs(end - start + 1);
final int baseSize = safeDivide(totalNoOfElements, stepSize);
final int toCollect = (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
for (long collected = 0; collected < toCollect; collected++) {
this.valuesToEmit.add(collected * stepSize + congruence);
}
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:37,代码来源:StatefulSequenceSource.java
示例15: initializeState
import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
Preconditions.checkState(this.checkpointedState == null,
"The " + getClass().getSimpleName() + " has already been initialized.");
this.checkpointedState = context
.getOperatorStateStore()
.getSerializableListState("message-acknowledging-source-state");
this.idsForCurrentCheckpoint = new HashSet<>(64);
this.pendingCheckpoints = new ArrayDeque<>();
this.idsProcessedButNotAcknowledged = new HashSet<>();
if (context.isRestored()) {
LOG.info("Restoring state for the {}.", getClass().getSimpleName());
List<SerializedCheckpointData[]> retrievedStates = new ArrayList<>();
for (SerializedCheckpointData[] entry : this.checkpointedState.get()) {
retrievedStates.add(entry);
}
// given that the parallelism of the function is 1, we can only have at most 1 state
Preconditions.checkArgument(retrievedStates.size() == 1,
getClass().getSimpleName() + " retrieved invalid state.");
pendingCheckpoints = SerializedCheckpointData.toDeque(retrievedStates.get(0), idSerializer);
// build a set which contains all processed ids. It may be used to check if we have
// already processed an incoming message.
for (Tuple2<Long, Set<UId>> checkpoint : pendingCheckpoints) {
idsProcessedButNotAcknowledged.addAll(checkpoint.f1);
}
} else {
LOG.info("No state to restore for the {}.", getClass().getSimpleName());
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:36,代码来源:MessageAcknowledgingSourceBase.java
示例16: initializeState
import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
serializableListState = context
.getOperatorStateStore()
.getListState(new ListStateDescriptor<>("test-state", IntSerializer.INSTANCE));
if (context.isRestored()) {
Iterator<Integer> integers = serializableListState.get().iterator();
int act = integers.next();
Assert.assertEquals(42, act);
Assert.assertFalse(integers.hasNext());
wasRestored = true;
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:14,代码来源:WrappingFunctionSnapshotRestoreTest.java
示例17: initializeState
import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
Preconditions.checkArgument(this.restoredBucketStates == null,
"The " + getClass().getSimpleName() + " has already been initialized.");
initFileSystem();
if (this.refTruncate == null) {
this.refTruncate = reflectTruncate(fs);
}
OperatorStateStore stateStore = context.getOperatorStateStore();
restoredBucketStates = stateStore.getSerializableListState("rolling-states");
int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
if (context.isRestored()) {
LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
for (BucketState bucketState : restoredBucketStates.get()) {
handleRestoredBucketState(bucketState);
}
if (LOG.isDebugEnabled()) {
LOG.debug("{} (taskIdx= {}) restored {}", getClass().getSimpleName(), subtaskIndex, bucketState);
}
} else {
LOG.info("No state to restore for the {} (taskIdx= {}).", getClass().getSimpleName(), subtaskIndex);
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:30,代码来源:RollingSink.java
示例18: initializeState
import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
Preconditions.checkArgument(this.restoredBucketStates == null, "The operator has already been initialized.");
initFileSystem();
if (this.refTruncate == null) {
this.refTruncate = reflectTruncate(fs);
}
OperatorStateStore stateStore = context.getOperatorStateStore();
restoredBucketStates = stateStore.getSerializableListState("bucket-states");
int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
if (context.isRestored()) {
LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
for (State<T> recoveredState : restoredBucketStates.get()) {
handleRestoredBucketState(recoveredState);
if (LOG.isDebugEnabled()) {
LOG.debug("{} idx {} restored {}", getClass().getSimpleName(), subtaskIndex, recoveredState);
}
}
} else {
LOG.info("No state to restore for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:28,代码来源:BucketingSink.java
示例19: initializeState
import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
//nothing to initialize, as in flight documents are completely flushed during checkpointing
}
开发者ID:awslabs,项目名称:flink-stream-processing-refarch,代码行数:5,代码来源:ElasticsearchJestSink.java
示例20: initializeState
import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
public void initializeState(FunctionInitializationContext context) throws Exception {
// do nothing
}
开发者ID:aliyun,项目名称:aliyun-log-flink-connector,代码行数:4,代码来源:FlinkLogProducer.java
注:本文中的org.apache.flink.runtime.state.FunctionInitializationContext类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论