• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Java FunctionSnapshotContext类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.apache.flink.runtime.state.FunctionSnapshotContext的典型用法代码示例。如果您正苦于以下问题:Java FunctionSnapshotContext类的具体用法?Java FunctionSnapshotContext怎么用?Java FunctionSnapshotContext使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



FunctionSnapshotContext类属于org.apache.flink.runtime.state包,在下文中一共展示了FunctionSnapshotContext类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: snapshotState

import org.apache.flink.runtime.state.FunctionSnapshotContext; //导入依赖的package包/类
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
	super.snapshotState(context);

	nextTransactionalIdHintState.clear();
	// To avoid duplication only first subtask keeps track of next transactional id hint. Otherwise all of the
	// subtasks would write exactly same information.
	if (getRuntimeContext().getIndexOfThisSubtask() == 0 && semantic == Semantic.EXACTLY_ONCE) {
		checkState(nextTransactionalIdHint != null, "nextTransactionalIdHint must be set for EXACTLY_ONCE");
		long nextFreeTransactionalId = nextTransactionalIdHint.nextFreeTransactionalId;

		// If we scaled up, some (unknown) subtask must have created new transactional ids from scratch. In that
		// case we adjust nextFreeTransactionalId by the range of transactionalIds that could be used for this
		// scaling up.
		if (getRuntimeContext().getNumberOfParallelSubtasks() > nextTransactionalIdHint.lastParallelism) {
			nextFreeTransactionalId += getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize;
		}

		nextTransactionalIdHintState.add(new NextTransactionalIdHint(
			getRuntimeContext().getNumberOfParallelSubtasks(),
			nextFreeTransactionalId));
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:24,代码来源:FlinkKafkaProducer011.java


示例2: snapshotState

import org.apache.flink.runtime.state.FunctionSnapshotContext; //导入依赖的package包/类
@Override
public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
	// check for asynchronous errors and fail the checkpoint if necessary
	checkErroneous();

	if (flushOnCheckpoint) {
		// flushing is activated: We need to wait until pendingRecords is 0
		flush();
		synchronized (pendingRecordsLock) {
			if (pendingRecords != 0) {
				throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecords);
			}

			// if the flushed requests has errors, we should propagate it also and fail the checkpoint
			checkErroneous();
		}
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:19,代码来源:FlinkKafkaProducerBase.java


示例3: snapshotState

import org.apache.flink.runtime.state.FunctionSnapshotContext; //导入依赖的package包/类
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
	Preconditions.checkNotNull(restoredBucketStates,
		"The " + getClass().getSimpleName() + " has not been properly initialized.");

	int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();

	if (isWriterOpen) {
		bucketState.currentFile = currentPartPath.toString();
		bucketState.currentFileValidLength = writer.flush();
	}

	synchronized (bucketState.pendingFilesPerCheckpoint) {
		bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
	}
	bucketState.pendingFiles = new ArrayList<>();

	restoredBucketStates.clear();
	restoredBucketStates.add(bucketState);

	if (LOG.isDebugEnabled()) {
		LOG.debug("{} (taskIdx={}) checkpointed {}.", getClass().getSimpleName(), subtaskIdx, bucketState);
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:25,代码来源:RollingSink.java


示例4: snapshotState

import org.apache.flink.runtime.state.FunctionSnapshotContext; //导入依赖的package包/类
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {

	counterPartitions.clear();

	checkCorrectSnapshot[getRuntimeContext().getIndexOfThisSubtask()] = counter;

	int div = counter / NUM_PARTITIONS;
	int mod = counter % NUM_PARTITIONS;

	for (int i = 0; i < NUM_PARTITIONS; ++i) {
		int partitionValue = div;
		if (mod > 0) {
			--mod;
			++partitionValue;
		}
		counterPartitions.add(partitionValue);
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:20,代码来源:RescalingITCase.java


示例5: snapshotState

import org.apache.flink.runtime.state.FunctionSnapshotContext; //导入依赖的package包/类
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
	// this is like the pre-commit of a 2-phase-commit transaction
	// we are ready to commit and remember the transaction

	checkState(currentTransactionHolder != null, "bug: no transaction object when performing state snapshot");

	long checkpointId = context.getCheckpointId();
	LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'", name(), context.getCheckpointId(), currentTransactionHolder);

	preCommit(currentTransactionHolder.handle);
	pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
	LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);

	currentTransactionHolder = beginTransactionInternal();
	LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);

	state.clear();
	state.add(new State<>(
		this.currentTransactionHolder,
		new ArrayList<>(pendingCommitTransactions.values()),
		userContext));
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:24,代码来源:TwoPhaseCommitSinkFunction.java


示例6: snapshotState

import org.apache.flink.runtime.state.FunctionSnapshotContext; //导入依赖的package包/类
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
	Preconditions.checkState(this.checkpointedState != null,
		"The " + getClass().getSimpleName() + " has not been properly initialized.");

	if (LOG.isDebugEnabled()) {
		LOG.debug("{} checkpointing: Messages: {}, checkpoint id: {}, timestamp: {}",
			idsForCurrentCheckpoint, context.getCheckpointId(), context.getCheckpointTimestamp());
	}

	pendingCheckpoints.addLast(new Tuple2<>(context.getCheckpointId(), idsForCurrentCheckpoint));
	idsForCurrentCheckpoint = new HashSet<>(64);

	this.checkpointedState.clear();
	this.checkpointedState.add(SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer));
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:17,代码来源:MessageAcknowledgingSourceBase.java


示例7: snapshotState

import org.apache.flink.runtime.state.FunctionSnapshotContext; //导入依赖的package包/类
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
	Preconditions.checkNotNull(restoredBucketStates,
		"The " + getClass().getSimpleName() + " has not been properly initialized.");

	int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
	
	if (isWriterOpen) {
		bucketState.currentFile = currentPartPath.toString();
		bucketState.currentFileValidLength = writer.flush();
	}

	synchronized (bucketState.pendingFilesPerCheckpoint) {
		bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
	}
	bucketState.pendingFiles = new ArrayList<>();

	restoredBucketStates.clear();
	restoredBucketStates.add(bucketState);

	if (LOG.isDebugEnabled()) {
		LOG.debug("{} (taskIdx={}) checkpointed {}.", getClass().getSimpleName(), subtaskIdx, bucketState);
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:25,代码来源:RollingSink.java


示例8: snapshotState

import org.apache.flink.runtime.state.FunctionSnapshotContext; //导入依赖的package包/类
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
  do {
    try {
      flushDocumentBuffer();
    } catch (IOException e) {
      //if the request fails, that's fine, just retry on the next iteration
    }
  } while (! documentBuffer.isEmpty());
}
 
开发者ID:awslabs,项目名称:flink-stream-processing-refarch,代码行数:11,代码来源:ElasticsearchJestSink.java


示例9: snapshotState

import org.apache.flink.runtime.state.FunctionSnapshotContext; //导入依赖的package包/类
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");

    restoredBucketStates.clear();

    synchronized (state.bucketStates) {
        int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();

        for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
            BucketState<T> bucketState = bucketStateEntry.getValue();

            if (bucketState.isWriterOpen) {
                bucketState.currentFileValidLength = bucketState.writer.flush();
            }

            synchronized (bucketState.pendingFilesPerCheckpoint) {
                bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
            }
            bucketState.pendingFiles = new ArrayList<>();
        }
        restoredBucketStates.add(state);

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
        }
    }
}
 
开发者ID:breakEval13,项目名称:rocketmq-flink-plugin,代码行数:29,代码来源:TODBucketingSink.java


示例10: snapshotState

import org.apache.flink.runtime.state.FunctionSnapshotContext; //导入依赖的package包/类
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
	checkErrorAndRethrow();

	if (flushOnCheckpoint) {
		do {
			bulkProcessor.flush();
			checkErrorAndRethrow();
		} while (numPendingRequests.get() != 0);
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:12,代码来源:ElasticsearchSinkBase.java


示例11: snapshotState

import org.apache.flink.runtime.state.FunctionSnapshotContext; //导入依赖的package包/类
@Override
public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
	isFlushed = false;

	super.snapshotState(ctx);

	// if the snapshot implementation doesn't wait until all pending records are flushed, we should fail the test
	if (flushOnCheckpoint && !isFlushed) {
		throw new RuntimeException("Flushing is enabled; snapshots should be blocked until all pending records are flushed");
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:12,代码来源:FlinkKafkaProducerBaseTest.java


示例12: snapshotState

import org.apache.flink.runtime.state.FunctionSnapshotContext; //导入依赖的package包/类
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
	// check for asynchronous errors and fail the checkpoint if necessary
	checkAndPropagateAsyncError();

	flushSync();
	if (producer.getOutstandingRecordsCount() > 0) {
		throw new IllegalStateException(
			"Number of outstanding records must be zero at this point: " + producer.getOutstandingRecordsCount());
	}

	// if the flushed requests has errors, we should propagate it also and fail the checkpoint
	checkAndPropagateAsyncError();
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:15,代码来源:FlinkKinesisProducer.java


示例13: snapshotState

import org.apache.flink.runtime.state.FunctionSnapshotContext; //导入依赖的package包/类
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
	isFlushed = false;

	super.snapshotState(context);

	// if the snapshot implementation doesn't wait until all pending records are flushed, we should fail the test
	if (!isFlushed) {
		throw new RuntimeException("Flushing is enabled; snapshots should be blocked until all pending records are flushed");
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:12,代码来源:FlinkKinesisProducerTest.java


示例14: snapshotState

import org.apache.flink.runtime.state.FunctionSnapshotContext; //导入依赖的package包/类
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
	Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");

	restoredBucketStates.clear();

	synchronized (state.bucketStates) {
		int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();

		for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
			BucketState<T> bucketState = bucketStateEntry.getValue();

			if (bucketState.isWriterOpen) {
				bucketState.currentFileValidLength = bucketState.writer.flush();
			}

			synchronized (bucketState.pendingFilesPerCheckpoint) {
				bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
			}
			bucketState.pendingFiles = new ArrayList<>();
		}
		restoredBucketStates.add(state);

		if (LOG.isDebugEnabled()) {
			LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
		}
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:29,代码来源:BucketingSink.java


示例15: snapshotState

import org.apache.flink.runtime.state.FunctionSnapshotContext; //导入依赖的package包/类
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
	Preconditions.checkState(this.checkpointedState != null,
		"The " + getClass().getSimpleName() + " state has not been properly initialized.");

	this.checkpointedState.clear();
	for (Long v : this.valuesToEmit) {
		this.checkpointedState.add(v);
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:11,代码来源:StatefulSequenceSource.java


示例16: snapshotState

import org.apache.flink.runtime.state.FunctionSnapshotContext; //导入依赖的package包/类
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
	Preconditions.checkState(this.checkpointedState != null,
		"The " + getClass().getSimpleName() + " has not been properly initialized.");

	this.checkpointedState.clear();
	this.checkpointedState.add(this.numElementsEmitted);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:9,代码来源:FromElementsFunction.java


示例17: snapshotState

import org.apache.flink.runtime.state.FunctionSnapshotContext; //导入依赖的package包/类
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
	Preconditions.checkState(this.checkpointedState != null,
		"The " + getClass().getSimpleName() + " state has not been properly initialized.");

	this.checkpointedState.clear();
	this.checkpointedState.add(this.globalModificationTime);

	if (LOG.isDebugEnabled()) {
		LOG.debug("{} checkpointed {}.", getClass().getSimpleName(), globalModificationTime);
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:13,代码来源:ContinuousFileMonitoringFunction.java


示例18: snapshotState

import org.apache.flink.runtime.state.FunctionSnapshotContext; //导入依赖的package包/类
@Override
public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
	if (flushOnCheckpoint) {
		// flushing is activated: We need to wait until pendingRecords is 0
		flush();
		synchronized (pendingRecordsLock) {
			if (pendingRecords != 0) {
				throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecords);
			}
			// pending records count is 0. We can now confirm the checkpoint
		}
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:14,代码来源:FlinkKafkaProducerBase.java


示例19: snapshotState

import org.apache.flink.runtime.state.FunctionSnapshotContext; //导入依赖的package包/类
@Override
public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
	// call the actual snapshot state
	super.snapshotState(ctx);
	// notify test that snapshotting has been done
	snapshottingFinished.set(true);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:8,代码来源:AtLeastOnceProducerTest.java


示例20: snapshotState

import org.apache.flink.runtime.state.FunctionSnapshotContext; //导入依赖的package包/类
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    if(logProducer != null) {
        logProducer.flush();
        Thread.sleep(logProducer.getProducerConfig().packageTimeoutInMS);
    }
}
 
开发者ID:aliyun,项目名称:aliyun-log-flink-connector,代码行数:7,代码来源:FlinkLogProducer.java



注:本文中的org.apache.flink.runtime.state.FunctionSnapshotContext类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java EventHandler类代码示例发布时间:2022-05-23
下一篇:
Java ODataService类代码示例发布时间:2022-05-23
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap