• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Java FunctionInitializationContext类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.apache.flink.runtime.state.FunctionInitializationContext的典型用法代码示例。如果您正苦于以下问题:Java FunctionInitializationContext类的具体用法?Java FunctionInitializationContext怎么用?Java FunctionInitializationContext使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



FunctionInitializationContext类属于org.apache.flink.runtime.state包,在下文中一共展示了FunctionInitializationContext类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: initializeState

import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
  if (checkpointCoder == null) {
    // no checkpoint coder available in this source
    return;
  }

  OperatorStateStore stateStore = context.getOperatorStateStore();
  CoderTypeInformation<
      KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>>
      typeInformation = (CoderTypeInformation) new CoderTypeInformation<>(checkpointCoder);
  stateForCheckpoint = stateStore.getOperatorState(
      new ListStateDescriptor<>(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME,
          typeInformation.createSerializer(new ExecutionConfig())));

  if (context.isRestored()) {
    isRestored = true;
    LOG.info("Having restore state in the UnbounedSourceWrapper.");
  } else {
    LOG.info("No restore state for UnbounedSourceWrapper.");
  }
}
 
开发者ID:apache,项目名称:beam,代码行数:23,代码来源:UnboundedSourceWrapper.java


示例2: initializeState

import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
	TypeInformation<Tuple2<StreamShardMetadata, SequenceNumber>> shardsStateTypeInfo = new TupleTypeInfo<>(
		TypeInformation.of(StreamShardMetadata.class),
		TypeInformation.of(SequenceNumber.class));

	sequenceNumsStateForCheckpoint = context.getOperatorStateStore().getUnionListState(
		new ListStateDescriptor<>(sequenceNumsStateStoreName, shardsStateTypeInfo));

	if (context.isRestored()) {
		if (sequenceNumsToRestore == null) {
			sequenceNumsToRestore = new HashMap<>();
			for (Tuple2<StreamShardMetadata, SequenceNumber> kinesisSequenceNumber : sequenceNumsStateForCheckpoint.get()) {
				sequenceNumsToRestore.put(kinesisSequenceNumber.f0, kinesisSequenceNumber.f1);
			}

			LOG.info("Setting restore state in the FlinkKinesisConsumer. Using the following offsets: {}",
				sequenceNumsToRestore);
		}
	} else {
		LOG.info("No restore state for FlinkKinesisConsumer.");
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:24,代码来源:FlinkKinesisConsumer.java


示例3: beforeTest

import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Before
public void beforeTest() throws Exception {

	OperatorStateStore mockStore = Mockito.mock(OperatorStateStore.class);
	FunctionInitializationContext mockContext = Mockito.mock(FunctionInitializationContext.class);
	Mockito.when(mockContext.getOperatorStateStore()).thenReturn(mockStore);
	Mockito.when(mockStore.getSerializableListState(any(String.class))).thenReturn(null);

	source = new RMQTestSource();
	source.initializeState(mockContext);
	source.open(config);

	messageId = 0;
	generateCorrelationIds = true;

	sourceThread = new Thread(new Runnable() {
		@Override
		public void run() {
			try {
				source.run(new DummySourceContext());
			} catch (Exception e) {
				exception = e;
			}
		}
	});
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:27,代码来源:RMQSourceTest.java


示例4: initializeState

import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {

	if (broadcast) {
		this.counterPartitions = context
				.getOperatorStateStore()
				.getUnionListState(new ListStateDescriptor<>("counter_partitions", IntSerializer.INSTANCE));
	} else {
		this.counterPartitions = context
				.getOperatorStateStore()
				.getListState(new ListStateDescriptor<>("counter_partitions", IntSerializer.INSTANCE));
	}

	if (context.isRestored()) {
		for (int v : counterPartitions.get()) {
			counter += v;
		}
		checkCorrectRestore[getRuntimeContext().getIndexOfThisSubtask()] = counter;
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:21,代码来源:RescalingITCase.java


示例5: initializeState

import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
	Preconditions.checkState(this.checkpointedState == null,
		"The " + getClass().getSimpleName() + " has already been initialized.");

	this.checkpointedState = context.getOperatorStateStore().getListState(
		new ListStateDescriptor<>(
			"from-elements-state",
			IntSerializer.INSTANCE
		)
	);

	if (context.isRestored()) {
		List<Integer> retrievedStates = new ArrayList<>();
		for (Integer entry : this.checkpointedState.get()) {
			retrievedStates.add(entry);
		}

		// given that the parallelism of the function is 1, we can only have 1 state
		Preconditions.checkArgument(retrievedStates.size() == 1,
			getClass().getSimpleName() + " retrieved invalid state.");

		this.numElementsToSkip = retrievedStates.get(0);
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:26,代码来源:FromElementsFunction.java


示例6: initializeState

import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {

	OperatorStateStore stateStore = context.getOperatorStateStore();
	offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);

	if (context.isRestored()) {
		restoreToOffset = new HashMap<>();
		for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsStateForCheckpoint.get()) {
			restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1);
		}

		LOG.info("Setting restore state in the FlinkKafkaConsumer.");
		if (LOG.isDebugEnabled()) {
			LOG.debug("Using the following offsets: {}", restoreToOffset);
		}
	} else {
		LOG.info("No restore state for FlinkKafkaConsumer.");
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:21,代码来源:FlinkKafkaConsumerBase.java


示例7: initializeState

import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
public void initializeState(FunctionInitializationContext context) throws Exception {
    LOG.info("initializeState...");
    TypeInformation<Tuple2<LogstoreShardMeta, String>> shardsStateTypeInfo = new TupleTypeInfo<Tuple2<LogstoreShardMeta, String>>(
            TypeInformation.of(LogstoreShardMeta.class),
            TypeInformation.of(String.class));

    cursorStateForCheckpoint = context.getOperatorStateStore().getUnionListState(
            new ListStateDescriptor(curcorStateStoreName, shardsStateTypeInfo));

    if (context.isRestored()) {
        if (cursorsToRestore == null) {
            cursorsToRestore = new HashMap<LogstoreShardMeta, String>();
            for (Tuple2<LogstoreShardMeta, String> cursor : cursorStateForCheckpoint.get()) {
                LOG.info("initializeState, project: {}, logstore: {}, shard: {}, checkpoint: {}", logProject, logStore, cursor.f0.toString(), cursor.f1);
                cursorsToRestore.put(cursor.f0, cursor.f1);
                if (consumerGroupName != null && logClient != null) {
                    logClient.updateCheckpoint(logProject, logStore, consumerGroupName, "flinkTask-" + getRuntimeContext().getIndexOfThisSubtask() + "Of" + getRuntimeContext().getNumberOfParallelSubtasks(), cursor.f0.getShardId(), cursor.f1);
                }
            }

            LOG.info("Setting restore state in the FlinkLogConsumer. Using the following offsets: {}",
                    cursorsToRestore);
        }
    }
    else {
        LOG.info("No restore state for FlinkLogConsumer.");
    }
}
 
开发者ID:aliyun,项目名称:aliyun-log-flink-connector,代码行数:29,代码来源:FlinkLogConsumer.java


示例8: initializeState

import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    Preconditions.checkArgument(this.restoredBucketStates == null, "The operator has already been initialized.");

    try {
        initFileSystem();
    } catch (IOException e) {
        LOG.error("Error while creating FileSystem when initializing the state of the TODBucketingSink.", e);
        throw new RuntimeException("Error while creating FileSystem when initializing the state of the TODBucketingSink.", e);
    }

    if (this.refTruncate == null) {
        this.refTruncate = reflectTruncate(fs);
    }

    OperatorStateStore stateStore = context.getOperatorStateStore();
    restoredBucketStates = stateStore.getSerializableListState("bucket-states");

    int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
    if (context.isRestored()) {
        LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);

        for (State<T> recoveredState : restoredBucketStates.get()) {
            handleRestoredBucketState(recoveredState);
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} idx {} restored {}", getClass().getSimpleName(), subtaskIndex, recoveredState);
            }
        }
    } else {
        LOG.info("No state to restore for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
    }
}
 
开发者ID:breakEval13,项目名称:rocketmq-flink-plugin,代码行数:33,代码来源:TODBucketingSink.java


示例9: initializeState

import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
	if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
		LOG.warn("Using {} semantic, but checkpointing is not enabled. Switching to {} semantic.", semantic, Semantic.NONE);
		semantic = Semantic.NONE;
	}

	nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
		NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
	transactionalIdsGenerator = new TransactionalIdsGenerator(
		getRuntimeContext().getTaskName(),
		getRuntimeContext().getIndexOfThisSubtask(),
		getRuntimeContext().getNumberOfParallelSubtasks(),
		kafkaProducersPoolSize,
		SAFE_SCALE_DOWN_FACTOR);

	if (semantic != Semantic.EXACTLY_ONCE) {
		nextTransactionalIdHint = null;
	} else {
		ArrayList<NextTransactionalIdHint> transactionalIdHints = Lists.newArrayList(nextTransactionalIdHintState.get());
		if (transactionalIdHints.size() > 1) {
			throw new IllegalStateException(
				"There should be at most one next transactional id hint written by the first subtask");
		} else if (transactionalIdHints.size() == 0) {
			nextTransactionalIdHint = new NextTransactionalIdHint(0, 0);

			// this means that this is either:
			// (1) the first execution of this application
			// (2) previous execution has failed before first checkpoint completed
			//
			// in case of (2) we have to abort all previous transactions
			abortTransactions(transactionalIdsGenerator.generateIdsToAbort());
		} else {
			nextTransactionalIdHint = transactionalIdHints.get(0);
		}
	}

	super.initializeState(context);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:40,代码来源:FlinkKafkaProducer011.java


示例10: initializeState

import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
	Preconditions.checkArgument(this.restoredBucketStates == null,
		"The " + getClass().getSimpleName() + " has already been initialized.");

	try {
		initFileSystem();
	} catch (IOException e) {
		LOG.error("Error while creating FileSystem when initializing the state of the RollingSink.", e);
		throw new RuntimeException("Error while creating FileSystem when initializing the state of the RollingSink.", e);
	}

	if (this.refTruncate == null) {
		this.refTruncate = reflectTruncate(fs);
	}

	OperatorStateStore stateStore = context.getOperatorStateStore();
	restoredBucketStates = stateStore.getSerializableListState("rolling-states");

	int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
	if (context.isRestored()) {
		LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);

		for (BucketState bucketState : restoredBucketStates.get()) {
			handleRestoredBucketState(bucketState);
		}

		if (LOG.isDebugEnabled()) {
			LOG.debug("{} (taskIdx= {}) restored {}", getClass().getSimpleName(), subtaskIndex, bucketState);
		}
	} else {
		LOG.info("No state to restore for the {} (taskIdx= {}).", getClass().getSimpleName(), subtaskIndex);
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:35,代码来源:RollingSink.java


示例11: initializeState

import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
	Preconditions.checkArgument(this.restoredBucketStates == null, "The operator has already been initialized.");

	try {
		initFileSystem();
	} catch (IOException e) {
		LOG.error("Error while creating FileSystem when initializing the state of the BucketingSink.", e);
		throw new RuntimeException("Error while creating FileSystem when initializing the state of the BucketingSink.", e);
	}

	if (this.refTruncate == null) {
		this.refTruncate = reflectTruncate(fs);
	}

	OperatorStateStore stateStore = context.getOperatorStateStore();
	restoredBucketStates = stateStore.getSerializableListState("bucket-states");

	int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
	if (context.isRestored()) {
		LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);

		for (State<T> recoveredState : restoredBucketStates.get()) {
			handleRestoredBucketState(recoveredState);
			if (LOG.isDebugEnabled()) {
				LOG.debug("{} idx {} restored {}", getClass().getSimpleName(), subtaskIndex, recoveredState);
			}
		}
	} else {
		LOG.info("No state to restore for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:33,代码来源:BucketingSink.java


示例12: initializeState

import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
	pojoClass = getRuntimeContext().getUserCodeClassLoader().loadClass(POJO_NAME);

	fieldA = pojoClass.getDeclaredField("a");
	fieldA.setAccessible(true);

	if (hasBField) {
		fieldB = pojoClass.getDeclaredField("b");
		fieldB.setAccessible(true);
	}

	if (keyed) {
		keyedValueState = context.getKeyedStateStore().getState(
			new ValueStateDescriptor<>("keyedValueState", (Class<Object>) pojoClass));
		keyedMapState = context.getKeyedStateStore().getMapState(
			new MapStateDescriptor<>("keyedMapState", (Class<Object>) pojoClass, (Class<Object>) pojoClass));
		keyedListState = context.getKeyedStateStore().getListState(
			new ListStateDescriptor<>("keyedListState", (Class<Object>) pojoClass));

		ReduceFunction<Object> reduceFunction = new FirstValueReducer<>();
		keyedReducingState = context.getKeyedStateStore().getReducingState(
			new ReducingStateDescriptor<>("keyedReducingState", reduceFunction, (Class<Object>) pojoClass));
	} else {
		partitionableListState = context.getOperatorStateStore().getListState(
			new ListStateDescriptor<>("partitionableListState", (Class<Object>) pojoClass));
		unionListState = context.getOperatorStateStore().getUnionListState(
			new ListStateDescriptor<>("unionListState", (Class<Object>) pojoClass));
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:31,代码来源:PojoSerializerUpgradeTest.java


示例13: initializeState

import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
	ValueStateDescriptor<Integer> descriptor =
		new ValueStateDescriptor<>(
			"seenCountState",
			TypeInformation.of(new TypeHint<Integer>() {}),
			0);
	alreadySeen = context.getKeyedStateStore().getState(descriptor);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:10,代码来源:JobManagerHACheckpointRecoveryITCase.java


示例14: initializeState

import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {

	Preconditions.checkState(this.checkpointedState == null,
		"The " + getClass().getSimpleName() + " has already been initialized.");

	this.checkpointedState = context.getOperatorStateStore().getListState(
		new ListStateDescriptor<>(
			"stateful-sequence-source-state",
			LongSerializer.INSTANCE
		)
	);

	this.valuesToEmit = new ArrayDeque<>();
	if (context.isRestored()) {
		// upon restoring

		for (Long v : this.checkpointedState.get()) {
			this.valuesToEmit.add(v);
		}
	} else {
		// the first time the job is executed

		final int stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
		final int taskIdx = getRuntimeContext().getIndexOfThisSubtask();
		final long congruence = start + taskIdx;

		long totalNoOfElements = Math.abs(end - start + 1);
		final int baseSize = safeDivide(totalNoOfElements, stepSize);
		final int toCollect = (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;

		for (long collected = 0; collected < toCollect; collected++) {
			this.valuesToEmit.add(collected * stepSize + congruence);
		}
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:37,代码来源:StatefulSequenceSource.java


示例15: initializeState

import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
	Preconditions.checkState(this.checkpointedState == null,
		"The " + getClass().getSimpleName() + " has already been initialized.");

	this.checkpointedState = context
		.getOperatorStateStore()
		.getSerializableListState("message-acknowledging-source-state");

	this.idsForCurrentCheckpoint = new HashSet<>(64);
	this.pendingCheckpoints = new ArrayDeque<>();
	this.idsProcessedButNotAcknowledged = new HashSet<>();

	if (context.isRestored()) {
		LOG.info("Restoring state for the {}.", getClass().getSimpleName());

		List<SerializedCheckpointData[]> retrievedStates = new ArrayList<>();
		for (SerializedCheckpointData[] entry : this.checkpointedState.get()) {
			retrievedStates.add(entry);
		}

		// given that the parallelism of the function is 1, we can only have at most 1 state
		Preconditions.checkArgument(retrievedStates.size() == 1,
			getClass().getSimpleName() + " retrieved invalid state.");

		pendingCheckpoints = SerializedCheckpointData.toDeque(retrievedStates.get(0), idSerializer);
		// build a set which contains all processed ids. It may be used to check if we have
		// already processed an incoming message.
		for (Tuple2<Long, Set<UId>> checkpoint : pendingCheckpoints) {
			idsProcessedButNotAcknowledged.addAll(checkpoint.f1);
		}
	} else {
		LOG.info("No state to restore for the {}.", getClass().getSimpleName());
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:36,代码来源:MessageAcknowledgingSourceBase.java


示例16: initializeState

import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
	serializableListState = context
			.getOperatorStateStore()
			.getListState(new ListStateDescriptor<>("test-state", IntSerializer.INSTANCE));
	if (context.isRestored()) {
		Iterator<Integer> integers = serializableListState.get().iterator();
		int act = integers.next();
		Assert.assertEquals(42, act);
		Assert.assertFalse(integers.hasNext());
		wasRestored = true;
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:14,代码来源:WrappingFunctionSnapshotRestoreTest.java


示例17: initializeState

import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
	Preconditions.checkArgument(this.restoredBucketStates == null,
		"The " + getClass().getSimpleName() + " has already been initialized.");

	initFileSystem();

	if (this.refTruncate == null) {
		this.refTruncate = reflectTruncate(fs);
	}

	OperatorStateStore stateStore = context.getOperatorStateStore();
	restoredBucketStates = stateStore.getSerializableListState("rolling-states");

	int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
	if (context.isRestored()) {
		LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);

		for (BucketState bucketState : restoredBucketStates.get()) {
			handleRestoredBucketState(bucketState);
		}

		if (LOG.isDebugEnabled()) {
			LOG.debug("{} (taskIdx= {}) restored {}", getClass().getSimpleName(), subtaskIndex, bucketState);
		}
	} else {
		LOG.info("No state to restore for the {} (taskIdx= {}).", getClass().getSimpleName(), subtaskIndex);
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:30,代码来源:RollingSink.java


示例18: initializeState

import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
	Preconditions.checkArgument(this.restoredBucketStates == null, "The operator has already been initialized.");

	initFileSystem();

	if (this.refTruncate == null) {
		this.refTruncate = reflectTruncate(fs);
	}

	OperatorStateStore stateStore = context.getOperatorStateStore();
	restoredBucketStates = stateStore.getSerializableListState("bucket-states");

	int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
	if (context.isRestored()) {
		LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);

		for (State<T> recoveredState : restoredBucketStates.get()) {
			handleRestoredBucketState(recoveredState);
			if (LOG.isDebugEnabled()) {
				LOG.debug("{} idx {} restored {}", getClass().getSimpleName(), subtaskIndex, recoveredState);
			}
		}
	} else {
		LOG.info("No state to restore for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:28,代码来源:BucketingSink.java


示例19: initializeState

import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
@Override
public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
  //nothing to initialize, as in flight documents are completely flushed during checkpointing
}
 
开发者ID:awslabs,项目名称:flink-stream-processing-refarch,代码行数:5,代码来源:ElasticsearchJestSink.java


示例20: initializeState

import org.apache.flink.runtime.state.FunctionInitializationContext; //导入依赖的package包/类
public void initializeState(FunctionInitializationContext context) throws Exception {
    // do nothing
}
 
开发者ID:aliyun,项目名称:aliyun-log-flink-connector,代码行数:4,代码来源:FlinkLogProducer.java



注:本文中的org.apache.flink.runtime.state.FunctionInitializationContext类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java Command类代码示例发布时间:2022-05-23
下一篇:
Java StreamReaderBufferProcessor类代码示例发布时间:2022-05-23
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap