本文整理汇总了Java中org.apache.flink.api.common.restartstrategy.RestartStrategies类的典型用法代码示例。如果您正苦于以下问题:Java RestartStrategies类的具体用法?Java RestartStrategies怎么用?Java RestartStrategies使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
RestartStrategies类属于org.apache.flink.api.common.restartstrategy包,在下文中一共展示了RestartStrategies类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: run
import org.apache.flink.api.common.restartstrategy.RestartStrategies; //导入依赖的package包/类
/**
* Parses the {@link StreamingAppConfiguration} found inside the referenced configuration file,
* validates the contents and passes it on to {@link StreamingAppRuntime#run(StreamingAppConfiguration)}
* for further processing (to be implemented by extending class)
* @param args
* The list of arguments received from the command-line interface
* @param messageOutputStream
* The output stream to export messages (error, info, ...) into
* @param configurationType
* The expected configuration structure
* @param streamExecutionEnvironment
* Externally provided execution environment
* @throws Exception
* Thrown in case anything fails during application ramp up
*/
protected void run(final String[] args, final OutputStream messageOutputStream, final Class<T> configurationType, final StreamExecutionEnvironment streamExecutionEnvironment) throws Exception {
CommandLine cl = parseCommandLine(args);
final OutputStream stream = (messageOutputStream != null ? messageOutputStream : System.out);
if(!validateCommandLine(cl, stream)) {
printUsage(stream);
return;
}
T configuration = new ObjectMapper().readValue(new File(cl.getOptionValue(CLI_CONFIG_FILE)), configurationType);
if(!validateConfiguration(configuration, stream)) {
printUsage(stream);
return;
}
this.executionEnvironment = streamExecutionEnvironment;
this.executionEnvironment.setParallelism(configuration.getParallelism());
this.executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(configuration.getExecutionRetries(), 1000));
this.applicationName = configuration.getApplicationName();
this.applicationDescription = configuration.getApplicationDescription();
run(configuration);
}
开发者ID:ottogroup,项目名称:flink-operator-library,代码行数:39,代码来源:StreamingAppRuntime.java
示例2: main
import org.apache.flink.api.common.restartstrategy.RestartStrategies; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
// parse arguments
ParameterTool params = ParameterTool.fromPropertiesFile(args[0]);
// create streaming environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable event time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// enable fault-tolerance
env.enableCheckpointing(1000);
// enable restarts
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, 500L));
env.setStateBackend(new FsStateBackend("file:///home/robert/flink-workdir/flink-streaming-etl/state-backend"));
// run each operator separately
env.disableOperatorChaining();
// get data from Kafka
Properties kParams = params.getProperties();
kParams.setProperty("group.id", UUID.randomUUID().toString());
DataStream<ObjectNode> inputStream = env.addSource(new FlinkKafkaConsumer09<>(params.getRequired("topic"), new JSONDeserializationSchema(), kParams)).name("Kafka 0.9 Source")
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ObjectNode>(Time.minutes(1L)) {
@Override
public long extractTimestamp(ObjectNode jsonNodes) {
return jsonNodes.get("timestamp_ms").asLong();
}
}).name("Timestamp extractor");
// filter out records without lang field
DataStream<ObjectNode> tweetsWithLang = inputStream.filter(jsonNode -> jsonNode.has("user") && jsonNode.get("user").has("lang")).name("Filter records without 'lang' field");
// select only lang = "en" tweets
DataStream<ObjectNode> englishTweets = tweetsWithLang.filter(jsonNode -> jsonNode.get("user").get("lang").asText().equals("en")).name("Select 'lang'=en tweets");
// write to file system
RollingSink<ObjectNode> rollingSink = new RollingSink<>(params.get("sinkPath", "/home/robert/flink-workdir/flink-streaming-etl/rolling-sink"));
rollingSink.setBucketer(new DateTimeBucketer("yyyy-MM-dd-HH-mm")); // do a bucket for each minute
englishTweets.addSink(rollingSink).name("Rolling FileSystem Sink");
// build aggregates (count per language) using window (10 seconds tumbling):
DataStream<Tuple3<Long, String, Long>> languageCounts = tweetsWithLang.keyBy(jsonNode -> jsonNode.get("user").get("lang").asText())
.timeWindow(Time.seconds(10))
.apply(new Tuple3<>(0L, "", 0L), new JsonFoldCounter(), new CountEmitter()).name("Count per Langauage (10 seconds tumbling)");
// write window aggregate to ElasticSearch
List<InetSocketAddress> transportNodes = ImmutableList.of(new InetSocketAddress(InetAddress.getByName("localhost"), 9300));
ElasticsearchSink<Tuple3<Long, String, Long>> elasticsearchSink = new ElasticsearchSink<>(params.toMap(), transportNodes, new ESRequest());
languageCounts.addSink(elasticsearchSink).name("ElasticSearch2 Sink");
// word-count on the tweet stream
DataStream<Tuple2<Date, List<Tuple2<String, Long>>>> topWordCount = tweetsWithLang
// get text from tweets
.map(tweet -> tweet.get("text").asText()).name("Get text from Tweets")
// split text into (word, 1) tuples
.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {
String[] splits = s.split(" ");
for (String sp : splits) {
collector.collect(new Tuple2<>(sp, 1L));
}
}
}).name("Tokenize words")
// group by word
.keyBy(0)
// build 1 min windows, compute every 10 seconds --> count word frequency
.timeWindow(Time.minutes(1L), Time.seconds(10L)).apply(new WordCountingWindow()).name("Count word frequency (1 min, 10 sec sliding window)")
// build top n every 10 seconds
.timeWindowAll(Time.seconds(10L)).apply(new TopNWords(10)).name("TopN Window (10s)");
// write top Ns to Kafka topic
topWordCount.addSink(new FlinkKafkaProducer09<>(params.getRequired("wc-topic"), new ListSerSchema(), params.getProperties())).name("Write topN to Kafka");
env.execute("Streaming ETL");
}
开发者ID:rmetzger,项目名称:flink-streaming-etl,代码行数:82,代码来源:StreamingETL.java
示例3: createJobGraph
import org.apache.flink.api.common.restartstrategy.RestartStrategies; //导入依赖的package包/类
private JobGraph createJobGraph(ExecutionMode mode) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
env.setRestartStrategy(RestartStrategies.noRestart());
env.setStateBackend(new MemoryStateBackend());
switch (mode) {
case MIGRATE:
createMigrationJob(env);
break;
case RESTORE:
createRestoredJob(env);
break;
}
return StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
}
开发者ID:axbaretto,项目名称:flink,代码行数:18,代码来源:AbstractOperatorRestoreTestBase.java
示例4: createJobGraph
import org.apache.flink.api.common.restartstrategy.RestartStrategies; //导入依赖的package包/类
/**
* Creates a streaming JobGraph from the StreamEnvironment.
*/
private JobGraph createJobGraph(
int parallelism,
int numberOfRetries,
long restartDelay) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
env.disableOperatorChaining();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(numberOfRetries, restartDelay));
env.getConfig().disableSysoutLogging();
DataStream<Integer> stream = env
.addSource(new InfiniteTestSource())
.shuffle()
.map(new StatefulCounter());
stream.addSink(new DiscardingSink<Integer>());
return env.getStreamGraph().getJobGraph();
}
开发者ID:axbaretto,项目名称:flink,代码行数:24,代码来源:SavepointITCase.java
示例5: runCheckpointedProgram
import org.apache.flink.api.common.restartstrategy.RestartStrategies; //导入依赖的package包/类
/**
* Runs the following program the test program defined in {@link #testProgram(StreamExecutionEnvironment)}
* followed by the checks in {@link #postSubmit}.
*/
@Test
public void runCheckpointedProgram() throws Exception {
try {
TestStreamEnvironment env = new TestStreamEnvironment(cluster, PARALLELISM);
env.setParallelism(PARALLELISM);
env.enableCheckpointing(500);
env.getConfig().disableSysoutLogging();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
testProgram(env);
TestUtils.tryExecute(env, "Fault Tolerance Test");
postSubmit();
}
catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:25,代码来源:StreamFaultToleranceTestBase.java
示例6: main
import org.apache.flink.api.common.restartstrategy.RestartStrategies; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
// define the dataflow
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 1000));
env.readFileStream("input/", 60000, FileMonitoringFunction.WatchType.ONLY_NEW_FILES)
.addSink(new DiscardingSink<String>());
// generate a job graph
final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
File jobGraphFile = new File(params.get("output", "job.graph"));
try (FileOutputStream output = new FileOutputStream(jobGraphFile);
ObjectOutputStream obOutput = new ObjectOutputStream(output)){
obOutput.writeObject(jobGraph);
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:19,代码来源:StreamingNoop.java
示例7: main
import org.apache.flink.api.common.restartstrategy.RestartStrategies; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
if (parameterTool.getNumberOfParameters() < 2) {
System.out.println("Missing parameters!");
System.out.println("Usage: Kafka --topic <topic> --bootstrap.servers <kafka brokers>");
return;
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
// very simple data generator
DataStream<String> messageStream = env.addSource(new SourceFunction<String>() {
private static final long serialVersionUID = 6369260445318862378L;
public boolean running = true;
@Override
public void run(SourceContext<String> ctx) throws Exception {
long i = 0;
while (this.running) {
ctx.collect("Element - " + i++);
Thread.sleep(500);
}
}
@Override
public void cancel() {
running = false;
}
});
// write data into Kafka
messageStream.addSink(new FlinkKafkaProducer08<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));
env.execute("Write into Kafka example");
}
开发者ID:axbaretto,项目名称:flink,代码行数:38,代码来源:WriteIntoKafka.java
示例8: getSimpleJob
import org.apache.flink.api.common.restartstrategy.RestartStrategies; //导入依赖的package包/类
private static JobGraph getSimpleJob() throws IOException {
JobVertex task = new JobVertex("Test task");
task.setParallelism(1);
task.setMaxParallelism(1);
task.setInvokableClass(NoOpInvokable.class);
JobGraph jg = new JobGraph(new JobID(), "Test Job", task);
jg.setAllowQueuedScheduling(true);
jg.setScheduleMode(ScheduleMode.EAGER);
ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
jg.setExecutionConfig(executionConfig);
return jg;
}
开发者ID:axbaretto,项目名称:flink,代码行数:17,代码来源:MiniClusterITCase.java
示例9: testAutomaticRestartingWhenCheckpointing
import org.apache.flink.api.common.restartstrategy.RestartStrategies; //导入依赖的package包/类
/**
* Tests that in a streaming use case where checkpointing is enabled, a
* fixed delay with Integer.MAX_VALUE retries is instantiated if no other restart
* strategy has been specified.
*/
@Test
public void testAutomaticRestartingWhenCheckpointing() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(500);
env.fromElements(1).print();
StreamGraph graph = env.getStreamGraph();
JobGraph jobGraph = graph.getJobGraph();
RestartStrategies.RestartStrategyConfiguration restartStrategy =
jobGraph.getSerializedExecutionConfig().deserializeValue(getClass().getClassLoader()).getRestartStrategy();
Assert.assertNotNull(restartStrategy);
Assert.assertTrue(restartStrategy instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration);
Assert.assertEquals(Integer.MAX_VALUE, ((RestartStrategies.FixedDelayRestartStrategyConfiguration) restartStrategy).getRestartAttempts());
}
开发者ID:axbaretto,项目名称:flink,代码行数:23,代码来源:RestartStrategyTest.java
示例10: testNoRestartingWhenCheckpointingAndExplicitExecutionRetriesZero
import org.apache.flink.api.common.restartstrategy.RestartStrategies; //导入依赖的package包/类
/**
* Checks that in a streaming use case where checkpointing is enabled and the number
* of execution retries is set to 0, restarting is deactivated.
*/
@Test
public void testNoRestartingWhenCheckpointingAndExplicitExecutionRetriesZero() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(500);
env.setNumberOfExecutionRetries(0);
env.fromElements(1).print();
StreamGraph graph = env.getStreamGraph();
JobGraph jobGraph = graph.getJobGraph();
RestartStrategies.RestartStrategyConfiguration restartStrategy =
jobGraph.getSerializedExecutionConfig().deserializeValue(getClass().getClassLoader()).getRestartStrategy();
Assert.assertNotNull(restartStrategy);
Assert.assertTrue(restartStrategy instanceof RestartStrategies.NoRestartStrategyConfiguration);
}
开发者ID:axbaretto,项目名称:flink,代码行数:22,代码来源:RestartStrategyTest.java
示例11: testFixedRestartingWhenCheckpointingAndExplicitExecutionRetriesNonZero
import org.apache.flink.api.common.restartstrategy.RestartStrategies; //导入依赖的package包/类
/**
* Checks that in a streaming use case where checkpointing is enabled and the number
* of execution retries is set to 42 and the delay to 1337, fixed delay restarting is used.
*/
@Test
public void testFixedRestartingWhenCheckpointingAndExplicitExecutionRetriesNonZero() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(500);
env.setNumberOfExecutionRetries(42);
env.getConfig().setExecutionRetryDelay(1337);
env.fromElements(1).print();
StreamGraph graph = env.getStreamGraph();
JobGraph jobGraph = graph.getJobGraph();
RestartStrategies.RestartStrategyConfiguration restartStrategy =
jobGraph.getSerializedExecutionConfig().deserializeValue(getClass().getClassLoader()).getRestartStrategy();
Assert.assertNotNull(restartStrategy);
Assert.assertTrue(restartStrategy instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration);
Assert.assertEquals(42, ((RestartStrategies.FixedDelayRestartStrategyConfiguration) restartStrategy).getRestartAttempts());
Assert.assertEquals(1337, ((RestartStrategies.FixedDelayRestartStrategyConfiguration) restartStrategy).getDelayBetweenAttemptsInterval().toMilliseconds());
}
开发者ID:axbaretto,项目名称:flink,代码行数:25,代码来源:RestartStrategyTest.java
示例12: main
import org.apache.flink.api.common.restartstrategy.RestartStrategies; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(1000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000));
env.setStateBackend(new FsStateBackend("file:///" + System.getProperty("java.io.tmpdir") + "/flink/backend"));
CassandraSink<Tuple2<String, Integer>> sink = CassandraSink.addSink(env.addSource(new MySource()))
.setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
.enableWriteAheadLog()
.setClusterBuilder(new ClusterBuilder() {
@Override
public Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint("127.0.0.1").build();
}
})
.build();
sink.name("Cassandra Sink").disableChaining().setParallelism(1).uid("hello");
env.execute();
}
开发者ID:axbaretto,项目名称:flink,代码行数:23,代码来源:CassandraTupleWriteAheadSinkExample.java
示例13: StreamExecutionEnvBuilder
import org.apache.flink.api.common.restartstrategy.RestartStrategies; //导入依赖的package包/类
public StreamExecutionEnvBuilder() throws IOException {
// setup the environment with default values
// set up streaming execution environment
env = StreamExecutionEnvironment.getExecutionEnvironment();
// configure event-time characteristics
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// enable check pointing every 3 minutes
env.enableCheckpointing(10 * 60 * 1000);
// generate a Watermark every second
setAutoWatermarkInterval(1000);
env.setBufferTimeout(1000);
env.getConfig().setExecutionMode(ExecutionMode.PIPELINED);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(DEFAULT_NUMBER_JOB_RESTART,
Time.of(10, TimeUnit.SECONDS)));
}
开发者ID:ehabqadah,项目名称:in-situ-processing-datAcron,代码行数:16,代码来源:StreamExecutionEnvBuilder.java
示例14: main
import org.apache.flink.api.common.restartstrategy.RestartStrategies; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
// Read parameters from command line
final ParameterTool params = ParameterTool.fromArgs(args);
if(params.getNumberOfParameters() < 4) {
System.out.println("\nUsage: FlinkReadKafka --read-topic <topic> --write-topic <topic> --bootstrap.servers <kafka brokers> --group.id <groupid>");
return;
}
// setup streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
env.enableCheckpointing(300000); // 300 seconds
env.getConfig().setGlobalJobParameters(params);
DataStream<String> messageStream = env
.addSource(new FlinkKafkaConsumer010<>(
params.getRequired("read-topic"),
new SimpleStringSchema(),
params.getProperties())).name("Read from Kafka");
// setup table environment
StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(env);
// Write JSON payload back to Kafka topic
messageStream.addSink(new FlinkKafkaProducer010<>(
params.getRequired("write-topic"),
new SimpleStringSchema(),
params.getProperties())).name("Write To Kafka");
env.execute("FlinkReadWriteKafka");
}
开发者ID:kgorman,项目名称:TrafficAnalyzer,代码行数:35,代码来源:FlinkReadWriteKafka.java
示例15: exactlyOnceWriteSimulator
import org.apache.flink.api.common.restartstrategy.RestartStrategies; //导入依赖的package包/类
public void exactlyOnceWriteSimulator(final StreamId outStreamId, final StreamUtils streamUtils, int numElements) throws Exception {
final int checkpointInterval = 100;
final int restartAttempts = 1;
final long delayBetweenAttempts = 0L;
//30 sec timeout for all
final long txTimeout = 30 * 1000;
final long txTimeoutMax = 30 * 1000;
final long txTimeoutGracePeriod = 30 * 1000;
final String jobName = "ExactlyOnceSimulator";
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
env.enableCheckpointing(checkpointInterval);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts, delayBetweenAttempts));
// Pravega Writer
FlinkPravegaWriter<Integer> pravegaExactlyOnceWriter = streamUtils.newExactlyOnceWriter(outStreamId,
Integer.class, new IdentityRouter<>());
env
.addSource(new IntegerCounterSourceGenerator(numElements))
.map(new FailingIdentityMapper<>(numElements / parallelism / 2))
.rebalance()
.addSink(pravegaExactlyOnceWriter);
env.execute(jobName);
}
开发者ID:pravega,项目名称:nautilus-samples,代码行数:33,代码来源:EventCounterApp.java
示例16: standardReadWriteSimulator
import org.apache.flink.api.common.restartstrategy.RestartStrategies; //导入依赖的package包/类
public void standardReadWriteSimulator(final StreamId inStreamId, final StreamId outStreamId, final StreamUtils streamUtils, int numElements) throws Exception {
final int checkpointInterval = 100;
final int taskFailureRestartAttempts = 1;
final long delayBetweenRestartAttempts = 0L;
final long startTime = 0L;
final String jobName = "standardReadWriteSimulator";
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
env.enableCheckpointing(checkpointInterval);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(taskFailureRestartAttempts, delayBetweenRestartAttempts));
// the Pravega reader
final FlinkPravegaReader<Integer> pravegaSource = streamUtils.getFlinkPravegaParams().newReader(inStreamId, startTime, Integer.class);
// Pravega Writer
FlinkPravegaWriter<Integer> pravegaWriter = streamUtils.getFlinkPravegaParams().newWriter(outStreamId, Integer.class, new IdentityRouter<>());
pravegaWriter.setPravegaWriterMode(PravegaWriterMode.ATLEAST_ONCE);
DataStream<Integer> stream = env.addSource(pravegaSource).map(new IdentityMapper<>());
stream.addSink(pravegaWriter);
stream.addSink(new IntSequenceExactlyOnceValidator(numElements));
env.execute(jobName);
}
开发者ID:pravega,项目名称:nautilus-samples,代码行数:30,代码来源:EventCounterApp.java
示例17: main
import org.apache.flink.api.common.restartstrategy.RestartStrategies; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
// Parse command line parameters
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String host = parameterTool.getRequired("host");
int port = Integer.valueOf(parameterTool.getRequired("port"));
// Setup the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
env.setParallelism(1);
// Stream of updates to subscriptions, partitioned by tweetId, read from socket
DataStream<TweetSubscription> filterUpdateStream = env.socketTextStream(host, port)
.map(stringToTweetSubscription())
.keyBy(TweetSubscription.getKeySelector());
// TweetImpression stream, partitioned by tweetId
DataStream<TweetImpression> tweetStream = env.addSource(new TweetSourceFunction(false), "TweetImpression Source")
.keyBy(TweetImpression.getKeySelector());
// Run the tweet impressions past the filters and emit those that customers have requested
DataStream<CustomerImpression> filteredStream = tweetStream
.connect(filterUpdateStream)
.flatMap(new TweetSubscriptionFilterFunction());
// Create a seperate sink for each customer
DataStreamSink<CustomerImpression>[] customerSinks = setupCustomerSinks(filteredStream);
// Run it
env.execute();
}
开发者ID:jgrier,项目名称:FilteringExample,代码行数:34,代码来源:TweetImpressionFilteringJob.java
示例18: testCheckpointing1
import org.apache.flink.api.common.restartstrategy.RestartStrategies; //导入依赖的package包/类
/**
* Test the checkpoint behavior of the HTM operator.
* @throws Exception
*/
@Test
public void testCheckpointing1() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(1000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
DataStream<TestHarness.DayDemoRecord> source = env
.addSource(new TestHarness.DayDemoRecordSourceFunction(5, true))
.broadcast();
DataStream<Tuple3<Integer,Double,Double>> result =
HTM.learn(source, new TestHarness.DayDemoNetworkFactory())
.select(new InferenceSelectFunction<TestHarness.DayDemoRecord, Tuple3<Integer,Double,Double>>() {
@Override
public Tuple3<Integer,Double,Double> select(Tuple2<TestHarness.DayDemoRecord,NetworkInference> inference) throws Exception {
return new Tuple3<>(
inference.f0.dayOfWeek,
(Double) inference.f1.getClassification("dayOfWeek").getMostProbableValue(1),
inference.f1.getAnomalyScore());
}
});
result.print();
env.execute();
}
开发者ID:htm-community,项目名称:flink-htm,代码行数:32,代码来源:HTMIntegrationTest.java
示例19: testCheckpointWithKeyedStream
import org.apache.flink.api.common.restartstrategy.RestartStrategies; //导入依赖的package包/类
/**
* Test the checkpoint behavior of the HTM operator.
* @throws Exception
*/
@Test
public void testCheckpointWithKeyedStream() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(5000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
DataStream<TestHarness.DayDemoRecord> source = env
.addSource(new TestHarness.DayDemoRecordSourceFunction(2, true))
.keyBy("dayOfWeek");
DataStream<Tuple3<Integer,Double,Double>> result =
HTM.learn(source, new TestHarness.DayDemoNetworkFactory())
.select(new InferenceSelectFunction<TestHarness.DayDemoRecord, Tuple3<Integer,Double,Double>>() {
@Override
public Tuple3<Integer,Double,Double> select(Tuple2<TestHarness.DayDemoRecord,NetworkInference> inference) throws Exception {
return new Tuple3<>(
inference.f0.dayOfWeek,
(Double) inference.f1.getClassification("dayOfWeek").getMostProbableValue(1),
inference.f1.getAnomalyScore());
}
});
result.print();
env.execute();
}
开发者ID:htm-community,项目名称:flink-htm,代码行数:32,代码来源:HTMIntegrationTest.java
示例20: main
import org.apache.flink.api.common.restartstrategy.RestartStrategies; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
// parse input arguments
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
if (parameterTool.getNumberOfParameters() < 4) {
System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> " +
"--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>");
return;
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface
DataStream<String> messageStream = env
.addSource(new FlinkKafkaConsumer08<>(
parameterTool.getRequired("topic"),
new SimpleStringSchema(),
parameterTool.getProperties()));
// write kafka stream to standard out.
messageStream.print();
env.execute("Read from Kafka example");
}
开发者ID:axbaretto,项目名称:flink,代码行数:28,代码来源:ReadFromKafka.java
注:本文中的org.apache.flink.api.common.restartstrategy.RestartStrategies类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论