本文整理汇总了Java中org.apache.flink.runtime.state.StateInitializationContext类的典型用法代码示例。如果您正苦于以下问题:Java StateInitializationContext类的具体用法?Java StateInitializationContext怎么用?Java StateInitializationContext使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
StateInitializationContext类属于org.apache.flink.runtime.state包,在下文中一共展示了StateInitializationContext类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: initializeState
import org.apache.flink.runtime.state.StateInitializationContext; //导入依赖的package包/类
/**
 * Restores raw keyed state for every key group stream offered by the initialization context.
 *
 * <p>Nothing is done when the operator has no keyed state backend. Otherwise each provided
 * key group must be contained in the backend's local key group range before its stream is
 * handed to {@code restoreKeyGroupState}.
 *
 * @param context initialization context that supplies the raw keyed state input streams
 * @throws Exception propagated from the range check, from opening a stream, or from
 *     {@code restoreKeyGroupState}
 */
@Override
public void initializeState(StateInitializationContext context) throws Exception {
    if (getKeyedStateBackend() == null) {
        // No keyed backend, hence no keyed state to restore.
        return;
    }

    KeyGroupsList ownedKeyGroups = getKeyedStateBackend().getKeyGroupRange();

    for (KeyGroupStatePartitionStreamProvider provider : context.getRawKeyedStateInputs()) {
        DataInputViewStreamWrapper input = new DataInputViewStreamWrapper(provider.getStream());
        int keyGroupId = provider.getKeyGroupId();

        boolean ownedLocally = ownedKeyGroups.contains(keyGroupId);
        checkArgument(ownedLocally,
            "Key Group " + keyGroupId + " does not belong to the local range.");

        restoreKeyGroupState(keyGroupId, input);
    }
}
开发者ID:apache,项目名称:beam,代码行数:19,代码来源:DedupingOperator.java
示例2: initializeState
import org.apache.flink.runtime.state.StateInitializationContext; //导入依赖的package包/类
/**
 * Delegates to the parent implementation and then lazily obtains the two keyed states used by
 * this operator: the NFA value state and the element queue map state.
 *
 * <p>Each state is only requested from the runtime context while its field is still
 * {@code null}.
 *
 * @param context initialization context, passed through to the parent implementation
 * @throws Exception propagated from the parent implementation or from state registration
 */
@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);

    // Value state holding the NFA, serialized with an NFA serializer built on the input serializer.
    if (nfaOperatorState == null) {
        nfaOperatorState = getRuntimeContext().getState(
            new ValueStateDescriptor<>(NFA_OPERATOR_STATE_NAME, new NFA.NFASerializer<>(inputSerializer)));
    }

    // Map state from a long key to a list of input elements.
    if (elementQueueState == null) {
        elementQueueState = getRuntimeContext().getMapState(
            new MapStateDescriptor<>(
                EVENT_QUEUE_STATE_NAME, LongSerializer.INSTANCE, new ListSerializer<>(inputSerializer)));
    }
}
开发者ID:axbaretto,项目名称:flink,代码行数:22,代码来源:AbstractKeyedCEPPatternOperator.java
示例3: initializeState
import org.apache.flink.runtime.state.StateInitializationContext; //导入依赖的package包/类
/**
 * Hook that stateful, restorable stream operators override to register and restore state.
 *
 * <p>This base implementation only acts when a keyed state backend is present: for each raw
 * keyed state stream it checks that the key group lies in the local key group range and then
 * lets the time service manager restore its state for that key group.
 *
 * @param context context that allows to register different states.
 * @throws Exception propagated from the range check, from opening a stream, or from the time
 *     service manager
 */
public void initializeState(StateInitializationContext context) throws Exception {
    if (getKeyedStateBackend() == null) {
        return;
    }

    KeyGroupsList ownedKeyGroups = getKeyedStateBackend().getKeyGroupRange();

    // Restore the timer services, one key group at a time.
    for (KeyGroupStatePartitionStreamProvider provider : context.getRawKeyedStateInputs()) {
        int keyGroupId = provider.getKeyGroupId();

        checkArgument(ownedKeyGroups.contains(keyGroupId),
            "Key Group " + keyGroupId + " does not belong to the local range.");

        DataInputViewStreamWrapper input = new DataInputViewStreamWrapper(provider.getStream());
        timeServiceManager.restoreStateForKeyGroup(input, keyGroupId, getUserCodeClassloader());
    }
}
开发者ID:axbaretto,项目名称:flink,代码行数:23,代码来源:AbstractStreamOperator.java
示例4: restoreFunctionState
import org.apache.flink.runtime.state.StateInitializationContext; //导入依赖的package包/类
/**
 * Restores state into a user function, unwrapping {@link WrappingFunction}s as needed.
 *
 * <p>The function is offered to {@code tryRestoreFunction}; if that reports no success and
 * the function is a {@link WrappingFunction}, the wrapped function is tried next. The search
 * stops at the first success or at the first function that is not a wrapper.
 *
 * @param context initialization context; must not be {@code null}
 * @param userFunction outermost user function to start from
 * @throws Exception propagated from {@code tryRestoreFunction}
 */
public static void restoreFunctionState(
        StateInitializationContext context,
        Function userFunction) throws Exception {

    Preconditions.checkNotNull(context);

    Function candidate = userFunction;
    // Keep peeling wrappers while the current candidate could not be restored.
    while (!tryRestoreFunction(context, candidate) && candidate instanceof WrappingFunction) {
        candidate = ((WrappingFunction<?>) candidate).getWrappedFunction();
    }
}
开发者ID:axbaretto,项目名称:flink,代码行数:21,代码来源:StreamingFunctionUtils.java
示例5: initializeState
import org.apache.flink.runtime.state.StateInitializationContext; //导入依赖的package包/类
/**
 * Obtains the serializable list state named {@code "pending-checkpoints"} and, when the
 * context reports restored state, adds its contents to the pending checkpoints collection.
 *
 * <p>The checkpointed state field must still be {@code null} when this is called; the
 * precondition check fails otherwise.
 *
 * @param context initialization context providing the operator state store
 * @throws Exception propagated from the parent implementation, the precondition check, or
 *     state access
 */
@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);

    Preconditions.checkState(this.checkpointedState == null,
        "The reader state has already been initialized.");

    checkpointedState = context.getOperatorStateStore().getSerializableListState("pending-checkpoints");
    int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();

    if (!context.isRestored()) {
        LOG.info("No state to restore for the GenericWriteAheadSink (taskIdx={}).", subtaskIdx);
        return;
    }

    LOG.info("Restoring state for the GenericWriteAheadSink (taskIdx={}).", subtaskIdx);

    for (PendingCheckpoint restored : checkpointedState.get()) {
        this.pendingCheckpoints.add(restored);
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("GenericWriteAheadSink idx {} restored {}.", subtaskIdx, this.pendingCheckpoints);
    }
}
开发者ID:axbaretto,项目名称:flink,代码行数:26,代码来源:GenericWriteAheadSink.java
示例6: checkRestoredNullCheckpointWhenFetcherNotReady
import org.apache.flink.runtime.state.StateInitializationContext; //导入依赖的package包/类
/**
 * Initializes a consumer with a context that reports no restored state, takes a snapshot
 * without running the consumer, and asserts that the list state stays empty.
 */
@Test
public void checkRestoredNullCheckpointWhenFetcherNotReady() throws Exception {
    FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);

    // The state store hands out one shared testing list state for any state name.
    TestingListState<Serializable> offsetsState = new TestingListState<>();
    OperatorStateStore stateStore = mock(OperatorStateStore.class);
    when(stateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(offsetsState);

    // The context exposes that store and signals a fresh (non-restored) start.
    StateInitializationContext initContext = mock(StateInitializationContext.class);
    when(initContext.getOperatorStateStore()).thenReturn(stateStore);
    when(initContext.isRestored()).thenReturn(false);

    consumer.initializeState(initContext);
    consumer.snapshotState(new StateSnapshotContextSynchronousImpl(17, 17));

    assertFalse(offsetsState.get().iterator().hasNext());
}
开发者ID:axbaretto,项目名称:flink,代码行数:23,代码来源:FlinkKafkaConsumerBaseTest.java
示例7: checkUseFetcherWhenNoCheckpoint
import org.apache.flink.runtime.state.StateInitializationContext; //导入依赖的package包/类
/**
 * Initializes a consumer subscribed to a single partition with a context that reports no
 * restored state, and then runs it against a mocked source context.
 *
 * <p>NOTE(review): no explicit assertion is visible in this test body; any verification
 * presumably happens inside the consumer returned by {@code getConsumer} - confirm.
 */
@Test
public void checkUseFetcherWhenNoCheckpoint() throws Exception {
    FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);

    List<KafkaTopicPartition> subscribed = new ArrayList<>(1);
    subscribed.add(new KafkaTopicPartition("test", 0));
    consumer.setSubscribedPartitions(subscribed);

    // The state store hands out one shared testing list state for any state name.
    TestingListState<Serializable> offsetsState = new TestingListState<>();
    OperatorStateStore stateStore = mock(OperatorStateStore.class);
    when(stateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(offsetsState);

    StateInitializationContext initContext = mock(StateInitializationContext.class);
    when(initContext.getOperatorStateStore()).thenReturn(stateStore);
    // Signal that there is no restored state.
    when(initContext.isRestored()).thenReturn(false);

    consumer.initializeState(initContext);
    consumer.run(mock(SourceFunction.SourceContext.class));
}
开发者ID:axbaretto,项目名称:flink,代码行数:25,代码来源:FlinkKafkaConsumerBaseTest.java
示例8: initializeState
import org.apache.flink.runtime.state.StateInitializationContext; //导入依赖的package包/类
/**
 * Delegates to the parent implementation and then obtains the engine value state from the
 * runtime context, unless the field has already been set.
 *
 * @param context initialization context, passed through to the parent implementation
 * @throws Exception propagated from the parent implementation or from state registration
 */
@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);

    if (this.engineState != null) {
        // Already obtained earlier; keep the existing handle.
        return;
    }

    this.engineState = getRuntimeContext().getState(
        new ValueStateDescriptor<>(ESPER_SERVICE_PROVIDER_STATE, new EsperEngineSerializer()));
}
开发者ID:phil3k3,项目名称:flink-esper,代码行数:9,代码来源:SelectEsperStreamOperator.java
示例9: initializeState
import org.apache.flink.runtime.state.StateInitializationContext; //导入依赖的package包/类
/**
 * Delegates to the parent implementation and then obtains the map state used to buffer
 * input elements, unless the field has already been set.
 *
 * @param context initialization context, passed through to the parent implementation
 * @throws Exception propagated from the parent implementation or from state registration
 */
@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);

    if (elementQueueState != null) {
        return;
    }

    // Map-based queue for buffered input: long key -> list of input elements.
    elementQueueState = getRuntimeContext().getMapState(
        new MapStateDescriptor<>(
            EVENT_QUEUE_STATE_NAME, LongSerializer.INSTANCE, new ListSerializer<>(inputSerializer)));
}
开发者ID:pravega,项目名称:flink-connectors,代码行数:16,代码来源:EventTimeOrderingOperator.java
示例10: initializeState
import org.apache.flink.runtime.state.StateInitializationContext; //导入依赖的package包/类
/**
 * Delegates to the parent implementation, lazily obtains the two operator list states
 * (runtime state as union list state, queued records as plain list state) and triggers
 * {@code restoreState()} when the context reports restored state.
 *
 * @param context initialization context providing the operator state store
 * @throws Exception propagated from the parent implementation, state registration, or
 *     {@code restoreState()}
 */
@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);

    // Runtime snapshot bytes, registered as union list state.
    if (siddhiRuntimeState == null) {
        siddhiRuntimeState = context.getOperatorStateStore().getUnionListState(
            new ListStateDescriptor<>(SIDDHI_RUNTIME_STATE_NAME, new BytePrimitiveArraySerializer()));
    }

    // Queued record bytes, registered as regular list state.
    if (queuedRecordsState == null) {
        queuedRecordsState = context.getOperatorStateStore().getListState(
            new ListStateDescriptor<>(QUEUED_RECORDS_STATE_NAME, new BytePrimitiveArraySerializer()));
    }

    if (context.isRestored()) {
        restoreState();
    }
}
开发者ID:haoch,项目名称:flink-siddhi,代码行数:16,代码来源:AbstractSiddhiOperator.java
示例11: initializeState
import org.apache.flink.runtime.state.StateInitializationContext; //导入依赖的package包/类
/**
 * Runs the parent initialization, makes sure both operator list states are available, and
 * restores from them if the context signals a restore.
 *
 * <p>The runtime state is requested as union list state and the queued records as plain list
 * state; each is only requested while its field is still {@code null}.
 *
 * @param context initialization context providing the operator state store
 * @throws Exception propagated from the parent implementation, state registration, or
 *     {@code restoreState()}
 */
@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);

    if (siddhiRuntimeState == null) {
        siddhiRuntimeState = context.getOperatorStateStore().getUnionListState(
            new ListStateDescriptor<>(SIDDHI_RUNTIME_STATE_NAME, new BytePrimitiveArraySerializer()));
    }

    if (queuedRecordsState == null) {
        queuedRecordsState = context.getOperatorStateStore().getListState(
            new ListStateDescriptor<>(QUEUED_RECORDS_STATE_NAME, new BytePrimitiveArraySerializer()));
    }

    // Only replay persisted state when this is a restore, not a fresh start.
    if (context.isRestored()) {
        restoreState();
    }
}
开发者ID:apache,项目名称:bahir-flink,代码行数:16,代码来源:AbstractSiddhiOperator.java
示例12: initializeState
import org.apache.flink.runtime.state.StateInitializationContext; //导入依赖的package包/类
/**
 * Restores raw keyed state and, when a key coder is set, timers for every key group stream
 * offered by the initialization context.
 *
 * <p>Nothing is done when the operator has no keyed state backend. For each provided key
 * group the same input view is first passed to {@code restoreKeyGroupState} and afterwards
 * to the timer service, so the two calls read from one stream in that order.
 *
 * <p>NOTE(review): no call to {@code super.initializeState(context)} is made here - confirm
 * that this is intentional.
 *
 * @param context initialization context that supplies the raw keyed state input streams
 * @throws Exception propagated from the range check, from opening a stream, or from either
 *     restore call
 */
@Override
public void initializeState(StateInitializationContext context) throws Exception {
if (getKeyedStateBackend() != null) {
int totalKeyGroups = getKeyedStateBackend().getNumberOfKeyGroups();
KeyGroupsList localKeyGroupRange = getKeyedStateBackend().getKeyGroupRange();
for (KeyGroupStatePartitionStreamProvider streamProvider : context.getRawKeyedStateInputs()) {
DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(streamProvider.getStream());
int keyGroupIdx = streamProvider.getKeyGroupId();
// Every restored key group has to be one that the local backend owns.
checkArgument(localKeyGroupRange.contains(keyGroupIdx),
"Key Group " + keyGroupIdx + " does not belong to the local range.");
// if (this instanceof KeyGroupRestoringOperator)
restoreKeyGroupState(keyGroupIdx, div);
// Timers are only restored when a key coder is present.
if (keyCoder != null) {
// Create the timer service on first use; later key groups reuse the same instance.
if (timerService == null) {
timerService = new HeapInternalTimerService<>(
totalKeyGroups,
localKeyGroupRange,
this,
getRuntimeContext().getProcessingTimeService());
}
// Continues reading from the same input view that restoreKeyGroupState consumed above.
timerService.restoreTimersForKeyGroup(div, keyGroupIdx, getUserCodeClassloader());
}
}
}
}
开发者ID:apache,项目名称:beam,代码行数:31,代码来源:DoFnOperator.java
示例13: initializeState
import org.apache.flink.runtime.state.StateInitializationContext; //导入依赖的package包/类
/**
 * Delegates to the parent implementation and then obtains the operator list state that holds
 * the recovered stream elements.
 *
 * @param context initialization context providing the operator state store
 * @throws Exception propagated from the parent implementation or from state registration
 */
@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);

    // Unlike lazily-initialized states, this handle is (re)assigned on every call.
    recoveredStreamElements = context.getOperatorStateStore().getListState(
        new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
}
开发者ID:axbaretto,项目名称:flink,代码行数:9,代码来源:AsyncWaitOperator.java
示例14: initializeState
import org.apache.flink.runtime.state.StateInitializationContext; //导入依赖的package包/类
/**
 * Obtains the serializable list state named {@code "splits"} and, when the context reports
 * restored state, copies its splits into the restored reader state list.
 *
 * <p>The checkpointed state field must still be {@code null} when this is called. The copy is
 * skipped if the restored reader state list has already been populated elsewhere.
 *
 * @param context initialization context providing the operator state store
 * @throws Exception propagated from the parent implementation, the state check, or state access
 */
@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);

    checkState(checkpointedState == null, "The reader state has already been initialized.");

    checkpointedState = context.getOperatorStateStore().getSerializableListState("splits");
    int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();

    if (!context.isRestored()) {
        LOG.info("No state to restore for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIdx);
        return;
    }

    LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIdx);

    // The list may already be set when migrating from a previous Flink version.
    if (restoredReaderState != null) {
        return;
    }

    restoredReaderState = new ArrayList<>();
    for (TimestampedFileInputSplit restoredSplit : checkpointedState.get()) {
        restoredReaderState.add(restoredSplit);
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("{} (taskIdx={}) restored {}.", getClass().getSimpleName(), subtaskIdx, restoredReaderState);
    }
}
开发者ID:axbaretto,项目名称:flink,代码行数:28,代码来源:ContinuousFileReaderOperator.java
示例15: tryRestoreFunction
import org.apache.flink.runtime.state.StateInitializationContext; //导入依赖的package包/类
/**
 * Attempts to initialize or restore the state of a single user function.
 *
 * <p>A {@link CheckpointedFunction} is always initialized through its own
 * {@code initializeState} hook. Otherwise, if the context reports restored state and the
 * function is {@link ListCheckpointed}, the elements of the default operator list state are
 * collected into a list and passed to {@code restoreState}.
 *
 * @param context initialization context providing the restore flag and operator state store
 * @param userFunction function to initialize or restore
 * @return {@code true} if one of the two paths above handled the function, {@code false}
 *     otherwise
 * @throws Exception propagated from initialization or state access; a failure of
 *     {@code restoreState} is rethrown wrapped with its cause preserved
 */
private static boolean tryRestoreFunction(
        StateInitializationContext context,
        Function userFunction) throws Exception {

    if (userFunction instanceof CheckpointedFunction) {
        ((CheckpointedFunction) userFunction).initializeState(context);
        return true;
    }

    boolean restorableAsList = context.isRestored() && userFunction instanceof ListCheckpointed;
    if (!restorableAsList) {
        return false;
    }

    @SuppressWarnings("unchecked")
    ListCheckpointed<Serializable> listFunction = (ListCheckpointed<Serializable>) userFunction;

    ListState<Serializable> storedState = context.getOperatorStateStore()
        .getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);

    // Materialize the state into a list, since restoreState takes a List.
    List<Serializable> restoredElements = new ArrayList<>();
    storedState.get().forEach(restoredElements::add);

    try {
        listFunction.restoreState(restoredElements);
    } catch (Exception e) {
        throw new Exception("Failed to restore state to function: " + e.getMessage(), e);
    }

    return true;
}
开发者ID:axbaretto,项目名称:flink,代码行数:36,代码来源:StreamingFunctionUtils.java
示例16: initializeState
import org.apache.flink.runtime.state.StateInitializationContext; //导入依赖的package包/类
/**
 * Obtains the {@code "counter-state"} operator list state and, on restore, adds every stored
 * value to the counter before clearing the state.
 *
 * @param context initialization context providing the restore flag and operator state store
 * @throws Exception propagated from the parent implementation or from state access
 */
@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);

    ListStateDescriptor<Long> counterDescriptor =
        new ListStateDescriptor<>("counter-state", LongSerializer.INSTANCE);
    counterState = context.getOperatorStateStore().getListState(counterDescriptor);

    if (!context.isRestored()) {
        return;
    }

    // Fold all restored partial counts into the in-memory counter.
    for (Long restoredCount : counterState.get()) {
        counter += restoredCount;
    }
    counterState.clear();
}
开发者ID:axbaretto,项目名称:flink,代码行数:16,代码来源:RestoreStreamTaskTest.java
示例17: testUseRestoredStateForSnapshotIfFetcherNotInitialized
import org.apache.flink.runtime.state.StateInitializationContext; //导入依赖的package包/类
/**
 * Seeds a union list state with four shards of {@code "fakeStream"}, initializes and opens
 * (but does not run) a consumer configured as subtask 0 of 2, snapshots it, and checks that
 * the list state was cleared and now holds exactly half of the seeded entries, including
 * those for shard orders 0 and 2.
 */
@Test
public void testUseRestoredStateForSnapshotIfFetcherNotInitialized() throws Exception {
    Properties config = TestUtils.getStandardProperties();

    // Four shards (orders 0..3) of the same stream, all at sequence number "1".
    List<Tuple2<StreamShardMetadata, SequenceNumber>> globalUnionState = new ArrayList<>(4);
    for (int shardOrder = 0; shardOrder < 4; shardOrder++) {
        globalUnionState.add(Tuple2.of(
            KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
                new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(shardOrder)))),
            new SequenceNumber("1")));
    }

    TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> unionListState = new TestingListState<>();
    for (Tuple2<StreamShardMetadata, SequenceNumber> entry : globalUnionState) {
        unionListState.add(entry);
    }

    FlinkKinesisConsumer<String> consumer =
        new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);

    // Subtask 0 out of a parallelism of 2.
    RuntimeContext runtimeContext = mock(RuntimeContext.class);
    when(runtimeContext.getIndexOfThisSubtask()).thenReturn(0);
    when(runtimeContext.getNumberOfParallelSubtasks()).thenReturn(2);
    consumer.setRuntimeContext(runtimeContext);

    OperatorStateStore stateStore = mock(OperatorStateStore.class);
    when(stateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(unionListState);

    StateInitializationContext initContext = mock(StateInitializationContext.class);
    when(initContext.getOperatorStateStore()).thenReturn(stateStore);
    when(initContext.isRestored()).thenReturn(true);

    consumer.initializeState(initContext);

    // only opened, not run
    consumer.open(new Configuration());

    // arbitrary checkpoint id and timestamp
    consumer.snapshotState(new StateSnapshotContextSynchronousImpl(123, 123));

    assertTrue(unionListState.isClearCalled());

    // the checkpointed list state should contain only the shards that it should subscribe to
    assertEquals(globalUnionState.size() / 2, unionListState.getList().size());
    assertTrue(unionListState.getList().contains(globalUnionState.get(0)));
    assertTrue(unionListState.getList().contains(globalUnionState.get(2)));
}
开发者ID:axbaretto,项目名称:flink,代码行数:56,代码来源:FlinkKinesisConsumerTest.java
示例18: testFetcherShouldBeCorrectlySeededIfRestoringFromCheckpoint
import org.apache.flink.runtime.state.StateInitializationContext; //导入依赖的package包/类
/**
 * Verifies that, after {@code initializeState}, {@code open} and {@code run}, the mocked
 * fetcher receives a {@code registerNewSubscribedShardState} call for every shard in the
 * fake restored state, carrying that shard's restored sequence number.
 *
 * <p>The mocked fetcher's shard discovery returns exactly the restored shards.
 */
@Test
@SuppressWarnings("unchecked")
public void testFetcherShouldBeCorrectlySeededIfRestoringFromCheckpoint() throws Exception {
// ----------------------------------------------------------------------
// setup initial state
// ----------------------------------------------------------------------
HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
// ----------------------------------------------------------------------
// mock operator state backend and initial state for initializeState()
// ----------------------------------------------------------------------
TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
// Pre-populate the list state with one (metadata, sequence number) tuple per fake shard.
for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
listState.add(Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), state.getValue()));
}
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
StateInitializationContext initializationContext = mock(StateInitializationContext.class);
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
// The context reports that this is a restore.
when(initializationContext.isRestored()).thenReturn(true);
// ----------------------------------------------------------------------
// mock fetcher
// ----------------------------------------------------------------------
KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
List<StreamShardHandle> shards = new ArrayList<>();
shards.addAll(fakeRestoredState.keySet());
// Shard discovery reports exactly the shards that are in the restored state.
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
// Any construction of a KinesisDataFetcher yields the mock instead.
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
// assume the given config is correct
PowerMockito.mockStatic(KinesisConfigUtil.class);
PowerMockito.doNothing().when(KinesisConfigUtil.class);
// ----------------------------------------------------------------------
// start to test fetcher's initial state seeding
// ----------------------------------------------------------------------
TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
"fakeStream", new Properties(), 10, 2);
consumer.initializeState(initializationContext);
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
// Each restored shard must have been registered with its restored sequence number.
for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredShard.getKey()),
restoredShard.getKey(), restoredShard.getValue()));
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:57,代码来源:FlinkKinesisConsumerTest.java
示例19: testFetcherShouldBeCorrectlySeededWithNewDiscoveredKinesisStreamShard
import org.apache.flink.runtime.state.StateInitializationContext; //导入依赖的package包/类
/**
 * Verifies fetcher seeding when shard discovery returns one shard that is not part of the
 * restored state.
 *
 * <p>The mocked fetcher discovers all restored shards plus one extra shard of
 * {@code "fakeStream2"}. After {@code initializeState}, {@code open} and {@code run}, the
 * fetcher must have received a {@code registerNewSubscribedShardState} call for every
 * restored shard with its restored sequence number, and for the extra shard with
 * {@code SENTINEL_EARLIEST_SEQUENCE_NUM}.
 */
@Test
@SuppressWarnings("unchecked")
public void testFetcherShouldBeCorrectlySeededWithNewDiscoveredKinesisStreamShard() throws Exception {
// ----------------------------------------------------------------------
// setup initial state
// ----------------------------------------------------------------------
HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
// ----------------------------------------------------------------------
// mock operator state backend and initial state for initializeState()
// ----------------------------------------------------------------------
TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
// Pre-populate the list state with one (metadata, sequence number) tuple per fake shard.
for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
listState.add(Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), state.getValue()));
}
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
StateInitializationContext initializationContext = mock(StateInitializationContext.class);
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
// The context reports that this is a restore.
when(initializationContext.isRestored()).thenReturn(true);
// ----------------------------------------------------------------------
// mock fetcher
// ----------------------------------------------------------------------
KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
List<StreamShardHandle> shards = new ArrayList<>();
shards.addAll(fakeRestoredState.keySet());
// One additional shard that is absent from the restored state.
shards.add(new StreamShardHandle("fakeStream2",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))));
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
// Any construction of a KinesisDataFetcher yields the mock instead.
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
// assume the given config is correct
PowerMockito.mockStatic(KinesisConfigUtil.class);
PowerMockito.doNothing().when(KinesisConfigUtil.class);
// ----------------------------------------------------------------------
// start to test fetcher's initial state seeding
// ----------------------------------------------------------------------
TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
"fakeStream", new Properties(), 10, 2);
consumer.initializeState(initializationContext);
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
// Extend the expectation map with the extra shard, expected at the earliest-sequence sentinel.
fakeRestoredState.put(new StreamShardHandle("fakeStream2",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
// Every expected shard (restored plus extra) must have been registered with the fetcher.
for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredShard.getKey()),
restoredShard.getKey(), restoredShard.getValue()));
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:62,代码来源:FlinkKinesisConsumerTest.java
示例20: initializeState
import org.apache.flink.runtime.state.StateInitializationContext; //导入依赖的package包/类
/**
 * Delegates to the parent implementation and then hands the wrapped user function to
 * {@code StreamingFunctionUtils.restoreFunctionState}.
 *
 * @param context initialization context, passed to both the parent and the user function
 * @throws Exception propagated from the parent implementation or from restoring the function
 */
@Override
public void initializeState(StateInitializationContext context) throws Exception {
    // Operator-level state first, user-function state second.
    super.initializeState(context);

    StreamingFunctionUtils.restoreFunctionState(context, userFunction);
}
开发者ID:axbaretto,项目名称:flink,代码行数:6,代码来源:AbstractUdfStreamOperator.java
注:本文中的org.apache.flink.runtime.state.StateInitializationContext类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论