本文整理汇总了Java中org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext类的典型用法代码示例。如果您正苦于以下问题:Java SourceContext类的具体用法?Java SourceContext怎么用?Java SourceContext使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
SourceContext类属于org.apache.flink.streaming.api.functions.source.SourceFunction包,在下文中一共展示了SourceContext类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: Kafka08Fetcher
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; //导入依赖的package包/类
public Kafka08Fetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> seedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext,
KeyedDeserializationSchema<T> deserializer,
Properties kafkaProperties,
long autoCommitInterval,
boolean useMetrics) throws Exception {
super(
sourceContext,
seedPartitionsWithInitialOffsets,
watermarksPeriodic,
watermarksPunctuated,
runtimeContext.getProcessingTimeService(),
runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
runtimeContext.getUserCodeClassLoader(),
useMetrics);
this.deserializer = checkNotNull(deserializer);
this.kafkaConfig = checkNotNull(kafkaProperties);
this.runtimeContext = runtimeContext;
this.invalidOffsetBehavior = getInvalidOffsetBehavior(kafkaProperties);
this.autoCommitInterval = autoCommitInterval;
}
开发者ID:axbaretto,项目名称:flink,代码行数:27,代码来源:Kafka08Fetcher.java
示例2: TestFetcher
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; //导入依赖的package包/类
protected TestFetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> assignedPartitionsWithStartOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval) throws Exception {
super(
sourceContext,
assignedPartitionsWithStartOffsets,
watermarksPeriodic,
watermarksPunctuated,
processingTimeProvider,
autoWatermarkInterval,
TestFetcher.class.getClassLoader(),
false);
}
开发者ID:axbaretto,项目名称:flink,代码行数:18,代码来源:AbstractFetcherTest.java
示例3: TestFetcher
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; //导入依赖的package包/类
protected TestFetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> assignedPartitionsWithStartOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval) throws Exception
{
super(
sourceContext,
assignedPartitionsWithStartOffsets,
watermarksPeriodic,
watermarksPunctuated,
processingTimeProvider,
autoWatermarkInterval,
TestFetcher.class.getClassLoader(),
false);
}
开发者ID:axbaretto,项目名称:flink,代码行数:19,代码来源:AbstractFetcherTimestampsTest.java
示例4: testRunExecuteFixedNumber
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; //导入依赖的package包/类
@SuppressWarnings("unchecked")
@Test
public void testRunExecuteFixedNumber() throws Exception {
final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
declarer.declare(new Fields("dummy"));
PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments()
.thenReturn(declarer);
final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
when(taskContext.getTaskName()).thenReturn("name");
final IRichSpout spout = mock(IRichSpout.class);
final int numberOfCalls = this.r.nextInt(50);
final SpoutWrapper<?> spoutWrapper = new SpoutWrapper<Object>(spout,
numberOfCalls);
spoutWrapper.setRuntimeContext(taskContext);
spoutWrapper.run(mock(SourceContext.class));
verify(spout, times(numberOfCalls)).nextTuple();
}
开发者ID:axbaretto,项目名称:flink,代码行数:22,代码来源:SpoutWrapperTest.java
示例5: runAndExecuteFiniteSpout
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; //导入依赖的package包/类
@SuppressWarnings("unchecked")
@Test
public void runAndExecuteFiniteSpout() throws Exception {
final FiniteSpout stormSpout = mock(FiniteSpout.class);
when(stormSpout.reachedEnd()).thenReturn(false, false, false, true, false, false, true);
final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
when(taskContext.getTaskName()).thenReturn("name");
final SpoutWrapper<?> wrapper = new SpoutWrapper<Object>(stormSpout);
wrapper.setRuntimeContext(taskContext);
wrapper.run(mock(SourceContext.class));
verify(stormSpout, times(3)).nextTuple();
}
开发者ID:axbaretto,项目名称:flink,代码行数:17,代码来源:SpoutWrapperTest.java
示例6: runAndExecuteFiniteSpout2
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; //导入依赖的package包/类
@SuppressWarnings("unchecked")
@Test
public void runAndExecuteFiniteSpout2() throws Exception {
final FiniteSpout stormSpout = mock(FiniteSpout.class);
when(stormSpout.reachedEnd()).thenReturn(true, false, true, false, true, false, true);
final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
when(taskContext.getTaskName()).thenReturn("name");
final SpoutWrapper<?> wrapper = new SpoutWrapper<Object>(stormSpout);
wrapper.setRuntimeContext(taskContext);
wrapper.run(mock(SourceContext.class));
verify(stormSpout, never()).nextTuple();
}
开发者ID:axbaretto,项目名称:flink,代码行数:17,代码来源:SpoutWrapperTest.java
示例7: ReceiverActor
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; //导入依赖的package包/类
public ReceiverActor(SourceContext<Object> ctx,
String urlOfPublisher,
boolean autoAck) {
this.ctx = ctx;
this.urlOfPublisher = urlOfPublisher;
this.autoAck = autoAck;
}
开发者ID:apache,项目名称:bahir-flink,代码行数:8,代码来源:ReceiverActor.java
示例8: Kafka010Fetcher
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; //导入依赖的package包/类
public Kafka010Fetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval,
ClassLoader userCodeClassLoader,
String taskNameWithSubtasks,
MetricGroup metricGroup,
KeyedDeserializationSchema<T> deserializer,
Properties kafkaProperties,
long pollTimeout,
boolean useMetrics) throws Exception {
super(
sourceContext,
assignedPartitionsWithInitialOffsets,
watermarksPeriodic,
watermarksPunctuated,
processingTimeProvider,
autoWatermarkInterval,
userCodeClassLoader,
taskNameWithSubtasks,
metricGroup,
deserializer,
kafkaProperties,
pollTimeout,
useMetrics);
}
开发者ID:axbaretto,项目名称:flink,代码行数:30,代码来源:Kafka010Fetcher.java
示例9: PeriodicWatermarkEmitter
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; //导入依赖的package包/类
PeriodicWatermarkEmitter(
List<KafkaTopicPartitionState<KPH>> allPartitions,
SourceContext<?> emitter,
ProcessingTimeService timerService,
long autoWatermarkInterval) {
this.allPartitions = checkNotNull(allPartitions);
this.emitter = checkNotNull(emitter);
this.timerService = checkNotNull(timerService);
this.interval = autoWatermarkInterval;
this.lastWatermarkTimestamp = Long.MIN_VALUE;
}
开发者ID:axbaretto,项目名称:flink,代码行数:12,代码来源:AbstractFetcher.java
示例10: testSpoutStormCollectorWithTaskId
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; //导入依赖的package包/类
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testSpoutStormCollectorWithTaskId() throws InstantiationException, IllegalAccessException {
for (int numberOfAttributes = 0; numberOfAttributes < 25; ++numberOfAttributes) {
final SourceContext flinkCollector = mock(SourceContext.class);
final int taskId = 42;
final String streamId = "streamId";
HashMap<String, Integer> attributes = new HashMap<String, Integer>();
attributes.put(streamId, numberOfAttributes);
SpoutCollector<?> collector = new SpoutCollector(attributes, taskId, flinkCollector);
final Values tuple = new Values();
final Tuple flinkTuple = Tuple.getTupleClass(numberOfAttributes + 1).newInstance();
for (int i = 0; i < numberOfAttributes; ++i) {
tuple.add(new Integer(this.r.nextInt()));
flinkTuple.setField(tuple.get(i), i);
}
flinkTuple.setField(taskId, numberOfAttributes);
final List<Integer> taskIds;
final Object messageId = new Integer(this.r.nextInt());
taskIds = collector.emit(streamId, tuple, messageId);
Assert.assertNull(taskIds);
verify(flinkCollector).collect(flinkTuple);
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:33,代码来源:SpoutCollectorTest.java
示例11: testTooManyAttributes
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; //导入依赖的package包/类
@SuppressWarnings("unchecked")
@Test(expected = UnsupportedOperationException.class)
public void testTooManyAttributes() {
HashMap<String, Integer> attributes = new HashMap<String, Integer>();
attributes.put("", 26);
new SpoutCollector<Object>(attributes, -1, mock(SourceContext.class));
}
开发者ID:axbaretto,项目名称:flink,代码行数:9,代码来源:SpoutCollectorTest.java
示例12: testTooManyAttributesWithTaskId
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; //导入依赖的package包/类
@SuppressWarnings("unchecked")
@Test(expected = UnsupportedOperationException.class)
public void testTooManyAttributesWithTaskId() {
HashMap<String, Integer> attributes = new HashMap<String, Integer>();
attributes.put("", 25);
new SpoutCollector<Object>(attributes, 42, mock(SourceContext.class));
}
开发者ID:axbaretto,项目名称:flink,代码行数:9,代码来源:SpoutCollectorTest.java
示例13: testRawStreamWithTaskId
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; //导入依赖的package包/类
@SuppressWarnings("unchecked")
@Test(expected = UnsupportedOperationException.class)
public void testRawStreamWithTaskId() {
HashMap<String, Integer> attributes = new HashMap<String, Integer>();
attributes.put("", -1);
new SpoutCollector<Object>(attributes, 42, mock(SourceContext.class));
}
开发者ID:axbaretto,项目名称:flink,代码行数:9,代码来源:SpoutCollectorTest.java
示例14: Kafka08Fetcher
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; //导入依赖的package包/类
public Kafka08Fetcher(
SourceContext<T> sourceContext,
List<KafkaTopicPartition> assignedPartitions,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext,
KeyedDeserializationSchema<T> deserializer,
Properties kafkaProperties,
long invalidOffsetBehavior,
long autoCommitInterval,
boolean useMetrics) throws Exception
{
super(
sourceContext,
assignedPartitions,
watermarksPeriodic,
watermarksPunctuated,
runtimeContext.getProcessingTimeService(),
runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
runtimeContext.getUserCodeClassLoader(),
useMetrics);
this.deserializer = checkNotNull(deserializer);
this.kafkaConfig = checkNotNull(kafkaProperties);
this.runtimeContext = runtimeContext;
this.invalidOffsetBehavior = invalidOffsetBehavior;
this.autoCommitInterval = autoCommitInterval;
this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
// initially, all these partitions are not assigned to a specific broker connection
for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
unassignedPartitionsQueue.add(partition);
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:35,代码来源:Kafka08Fetcher.java
示例15: Kafka010Fetcher
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; //导入依赖的package包/类
public Kafka010Fetcher(
SourceContext<T> sourceContext,
List<KafkaTopicPartition> assignedPartitions,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval,
ClassLoader userCodeClassLoader,
boolean enableCheckpointing,
String taskNameWithSubtasks,
MetricGroup metricGroup,
KeyedDeserializationSchema<T> deserializer,
Properties kafkaProperties,
long pollTimeout,
boolean useMetrics) throws Exception
{
super(
sourceContext,
assignedPartitions,
watermarksPeriodic,
watermarksPunctuated,
processingTimeProvider,
autoWatermarkInterval,
userCodeClassLoader,
enableCheckpointing,
taskNameWithSubtasks,
metricGroup,
deserializer,
kafkaProperties,
pollTimeout,
useMetrics);
}
开发者ID:axbaretto,项目名称:flink,代码行数:33,代码来源:Kafka010Fetcher.java
示例16: PeriodicWatermarkEmitter
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; //导入依赖的package包/类
PeriodicWatermarkEmitter(
KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions,
SourceContext<?> emitter,
ProcessingTimeService timerService,
long autoWatermarkInterval)
{
this.allPartitions = checkNotNull(allPartitions);
this.emitter = checkNotNull(emitter);
this.timerService = checkNotNull(timerService);
this.interval = autoWatermarkInterval;
this.lastWatermarkTimestamp = Long.MIN_VALUE;
}
开发者ID:axbaretto,项目名称:flink,代码行数:13,代码来源:AbstractFetcher.java
示例17: TestFetcher
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; //导入依赖的package包/类
protected TestFetcher(
SourceContext<T> sourceContext,
List<KafkaTopicPartition> assignedPartitions,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval) throws Exception
{
super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, processingTimeProvider, autoWatermarkInterval, TestFetcher.class.getClassLoader(), false);
}
开发者ID:axbaretto,项目名称:flink,代码行数:11,代码来源:AbstractFetcherTimestampsTest.java
示例18: Kafka09Fetcher
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; //导入依赖的package包/类
public Kafka09Fetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval,
ClassLoader userCodeClassLoader,
String taskNameWithSubtasks,
MetricGroup metricGroup,
KeyedDeserializationSchema<T> deserializer,
Properties kafkaProperties,
long pollTimeout,
boolean useMetrics) throws Exception {
super(
sourceContext,
assignedPartitionsWithInitialOffsets,
watermarksPeriodic,
watermarksPunctuated,
processingTimeProvider,
autoWatermarkInterval,
userCodeClassLoader,
useMetrics);
this.deserializer = deserializer;
this.handover = new Handover();
final MetricGroup kafkaMetricGroup = metricGroup.addGroup(KAFKA_CONSUMER_METRICS_GROUP);
addOffsetStateGauge(kafkaMetricGroup);
this.consumerThread = new KafkaConsumerThread(
LOG,
handover,
kafkaProperties,
unassignedPartitionsQueue,
kafkaMetricGroup,
createCallBridge(),
getFetcherName() + " for " + taskNameWithSubtasks,
pollTimeout,
useMetrics);
}
开发者ID:axbaretto,项目名称:flink,代码行数:42,代码来源:Kafka09Fetcher.java
示例19: AbstractFetcher
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; //导入依赖的package包/类
protected AbstractFetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> seedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval,
ClassLoader userCodeClassLoader,
boolean useMetrics) throws Exception {
this.sourceContext = checkNotNull(sourceContext);
this.checkpointLock = sourceContext.getCheckpointLock();
this.useMetrics = useMetrics;
this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
// figure out what we watermark mode we will be using
this.watermarksPeriodic = watermarksPeriodic;
this.watermarksPunctuated = watermarksPunctuated;
if (watermarksPeriodic == null) {
if (watermarksPunctuated == null) {
// simple case, no watermarks involved
timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
} else {
timestampWatermarkMode = PUNCTUATED_WATERMARKS;
}
} else {
if (watermarksPunctuated == null) {
timestampWatermarkMode = PERIODIC_WATERMARKS;
} else {
throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks");
}
}
this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
// initialize subscribed partition states with seed partitions
this.subscribedPartitionStates = createPartitionStateHolders(
seedPartitionsWithInitialOffsets,
timestampWatermarkMode,
watermarksPeriodic,
watermarksPunctuated,
userCodeClassLoader);
// check that all seed partition states have a defined offset
for (KafkaTopicPartitionState partitionState : subscribedPartitionStates) {
if (!partitionState.isOffsetDefined()) {
throw new IllegalArgumentException("The fetcher was assigned seed partitions with undefined initial offsets.");
}
}
// all seed partitions are not assigned yet, so should be added to the unassigned partitions queue
for (KafkaTopicPartitionState<KPH> partition : subscribedPartitionStates) {
unassignedPartitionsQueue.add(partition);
}
// if we have periodic watermarks, kick off the interval scheduler
if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
@SuppressWarnings("unchecked")
PeriodicWatermarkEmitter periodicEmitter = new PeriodicWatermarkEmitter(
subscribedPartitionStates,
sourceContext,
processingTimeProvider,
autoWatermarkInterval);
periodicEmitter.start();
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:68,代码来源:AbstractFetcher.java
示例20: testRunPrepare
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; //导入依赖的package包/类
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testRunPrepare() throws Exception {
final StormConfig stormConfig = new StormConfig();
stormConfig.put(this.r.nextInt(), this.r.nextInt());
final Configuration flinkConfig = new Configuration();
flinkConfig.setInteger("testKey", this.r.nextInt());
final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
.thenReturn(flinkConfig);
final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
when(taskContext.getTaskName()).thenReturn("name");
final IRichSpout spout = mock(IRichSpout.class);
SpoutWrapper spoutWrapper = new SpoutWrapper(spout);
spoutWrapper.setRuntimeContext(taskContext);
spoutWrapper.cancel();
// test without configuration
spoutWrapper.run(mock(SourceContext.class));
verify(spout).open(any(Map.class), any(TopologyContext.class),
any(SpoutOutputCollector.class));
// test with StormConfig
spoutWrapper.run(mock(SourceContext.class));
verify(spout).open(eq(stormConfig), any(TopologyContext.class),
any(SpoutOutputCollector.class));
// test with Configuration
final TestDummySpout testSpout = new TestDummySpout();
spoutWrapper = new SpoutWrapper(testSpout);
spoutWrapper.setRuntimeContext(taskContext);
spoutWrapper.cancel();
spoutWrapper.run(mock(SourceContext.class));
for (Entry<String, String> entry : flinkConfig.toMap().entrySet()) {
Assert.assertEquals(entry.getValue(), testSpout.config.get(entry.getKey()));
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:43,代码来源:SpoutWrapperTest.java
注:本文中的org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论