本文整理汇总了Java中org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09类的典型用法代码示例。如果您正苦于以下问题:Java FlinkKafkaProducer09类的具体用法?Java FlinkKafkaProducer09怎么用?Java FlinkKafkaProducer09使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
FlinkKafkaProducer09类属于org.apache.flink.streaming.connectors.kafka包,在下文中一共展示了FlinkKafkaProducer09类的8个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: main
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// generate the sawtooth wave datapoints
// a total of 40 steps, with 100 ms interval between each point = 4 sec period
DataStream<UnkeyedDataPoint> originalSawTooth =
env.addSource(new SawtoothSource(100, 40, 1));
originalSawTooth.addSink(
new FlinkKafkaProducer09<>(
KAFKA_BROKER_SERVER,
SAWTOOTH_TOPIC,
new UnKeyedDataPointSchema()
)
);
env.execute();
}
开发者ID:flink-taiwan,项目名称:jcconf2016-workshop,代码行数:22,代码来源:ProduceSawtoothToKafka.java
示例2: create
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09; //导入依赖的package包/类
/**
* Create a {@link FlinkKafkaProducer09} depending on the provided settings
* @param version
* @return
*/
public FlinkKafkaProducer09<T> create() {
/////////////////////////////////////////////////////////////////////////
// validate provided input
if(StringUtils.isBlank(this.topic))
throw new IllegalArgumentException("Missing required topic");
if(StringUtils.isBlank(this.brokerList))
throw new IllegalArgumentException("Missing required broker list");
/////////////////////////////////////////////////////////////////////////
if(!this.properties.isEmpty()) {
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerList);
for(Object k : this.properties.keySet())
producerProperties.put(k, this.properties.get(k));
return new FlinkKafkaProducer09<>(this.topic, this.serializationSchema, producerProperties);
}
return new FlinkKafkaProducer09<T>(this.brokerList, this.topic, this.serializationSchema);
}
开发者ID:ottogroup,项目名称:flink-operator-library,代码行数:26,代码来源:KafkaProducerBuilder.java
示例3: main
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
// parse arguments
ParameterTool params = ParameterTool.fromPropertiesFile(args[0]);
// create streaming environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable event time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// enable fault-tolerance
env.enableCheckpointing(1000);
// enable restarts
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, 500L));
env.setStateBackend(new FsStateBackend("file:///home/robert/flink-workdir/flink-streaming-etl/state-backend"));
// run each operator separately
env.disableOperatorChaining();
// get data from Kafka
Properties kParams = params.getProperties();
kParams.setProperty("group.id", UUID.randomUUID().toString());
DataStream<ObjectNode> inputStream = env.addSource(new FlinkKafkaConsumer09<>(params.getRequired("topic"), new JSONDeserializationSchema(), kParams)).name("Kafka 0.9 Source")
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ObjectNode>(Time.minutes(1L)) {
@Override
public long extractTimestamp(ObjectNode jsonNodes) {
return jsonNodes.get("timestamp_ms").asLong();
}
}).name("Timestamp extractor");
// filter out records without lang field
DataStream<ObjectNode> tweetsWithLang = inputStream.filter(jsonNode -> jsonNode.has("user") && jsonNode.get("user").has("lang")).name("Filter records without 'lang' field");
// select only lang = "en" tweets
DataStream<ObjectNode> englishTweets = tweetsWithLang.filter(jsonNode -> jsonNode.get("user").get("lang").asText().equals("en")).name("Select 'lang'=en tweets");
// write to file system
RollingSink<ObjectNode> rollingSink = new RollingSink<>(params.get("sinkPath", "/home/robert/flink-workdir/flink-streaming-etl/rolling-sink"));
rollingSink.setBucketer(new DateTimeBucketer("yyyy-MM-dd-HH-mm")); // do a bucket for each minute
englishTweets.addSink(rollingSink).name("Rolling FileSystem Sink");
// build aggregates (count per language) using window (10 seconds tumbling):
DataStream<Tuple3<Long, String, Long>> languageCounts = tweetsWithLang.keyBy(jsonNode -> jsonNode.get("user").get("lang").asText())
.timeWindow(Time.seconds(10))
.apply(new Tuple3<>(0L, "", 0L), new JsonFoldCounter(), new CountEmitter()).name("Count per Langauage (10 seconds tumbling)");
// write window aggregate to ElasticSearch
List<InetSocketAddress> transportNodes = ImmutableList.of(new InetSocketAddress(InetAddress.getByName("localhost"), 9300));
ElasticsearchSink<Tuple3<Long, String, Long>> elasticsearchSink = new ElasticsearchSink<>(params.toMap(), transportNodes, new ESRequest());
languageCounts.addSink(elasticsearchSink).name("ElasticSearch2 Sink");
// word-count on the tweet stream
DataStream<Tuple2<Date, List<Tuple2<String, Long>>>> topWordCount = tweetsWithLang
// get text from tweets
.map(tweet -> tweet.get("text").asText()).name("Get text from Tweets")
// split text into (word, 1) tuples
.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {
String[] splits = s.split(" ");
for (String sp : splits) {
collector.collect(new Tuple2<>(sp, 1L));
}
}
}).name("Tokenize words")
// group by word
.keyBy(0)
// build 1 min windows, compute every 10 seconds --> count word frequency
.timeWindow(Time.minutes(1L), Time.seconds(10L)).apply(new WordCountingWindow()).name("Count word frequency (1 min, 10 sec sliding window)")
// build top n every 10 seconds
.timeWindowAll(Time.seconds(10L)).apply(new TopNWords(10)).name("TopN Window (10s)");
// write top Ns to Kafka topic
topWordCount.addSink(new FlinkKafkaProducer09<>(params.getRequired("wc-topic"), new ListSerSchema(), params.getProperties())).name("Write topN to Kafka");
env.execute("Streaming ETL");
}
开发者ID:rmetzger,项目名称:flink-streaming-etl,代码行数:82,代码来源:StreamingETL.java
示例4: main
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
final String input = "C:\\dev\\github\\clojured-taxi-rides\\resources\\datasets\\nycTaxiRides.gz";
final int maxEventDelay = 60; // events are out of order by max 60 seconds
final int servingSpeedFactor = 600; // events of 10 minute are served in 1 second
// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// start the data generator
DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor));
DataStream<TaxiRide> filteredRides = rides
// filter out rides that do not start or stop in NYC
.filter(new NYCFilter());
// write the filtered data to a Kafka sink
filteredRides.addSink(new FlinkKafkaProducer09<>(
LOCAL_KAFKA_BROKER,
CLEANSED_RIDES_TOPIC,
new TaxiRideSchema()));
// run the cleansing pipeline
env.execute("Taxi Ride Cleansing");
}
开发者ID:thr0n,项目名称:clojured-taxi-rides,代码行数:27,代码来源:RideCleansingToKafka.java
示例5: main
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
DataStream<String> stream = env.addSource(new SimpleStringGenerator());
stream.addSink(new FlinkKafkaProducer09<>("flink-demo", new SimpleStringSchema(), properties));
env.execute();
}
开发者ID:tgrall,项目名称:kafka-flink-101,代码行数:12,代码来源:WriteToKafka.java
示例6: main
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool params = ParameterTool.fromPropertiesFile(args[0]);
DataStream<String> twitterStreamString = env.addSource(new TwitterSource(params.getProperties()));
DataStream<String> filteredStream = twitterStreamString.flatMap(new ParseJson());
filteredStream.flatMap(new ThroughputLogger(5000L)).setParallelism(1);
filteredStream.addSink(new FlinkKafkaProducer09<>("twitter", new SimpleStringSchema(), params.getProperties()));
// execute program
env.execute("Ingest data from Twitter to Kafka");
}
开发者ID:rmetzger,项目名称:flink-streaming-etl,代码行数:15,代码来源:TwitterIntoKafka.java
示例7: createKafkaProducer
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09; //导入依赖的package包/类
@Override
protected FlinkKafkaProducerBase<Tuple2<Boolean, Row>> createKafkaProducer(String topic, Properties properties, SerializationSchema<Tuple2<Boolean, Row>> serializationSchema, FlinkKafkaPartitioner<Tuple2<Boolean, Row>> partitioner) {
return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner);
}
开发者ID:datafibers-community,项目名称:df_data_service,代码行数:5,代码来源:Kafka09AvroTableSink.java
示例8: testFlinkSQL
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09; //导入依赖的package包/类
public static void testFlinkSQL() {
LOG.info("Only Unit Testing Function is enabled");
String resultFile = "/home/vagrant/test.txt";
try {
String jarPath = DFInitService.class.getProtectionDomain().getCodeSource().getLocation().getPath();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 6123, jarPath)
.setParallelism(1);
String kafkaTopic = "finance";
String kafkaTopic_stage = "df_trans_stage_finance";
String kafkaTopic_out = "df_trans_out_finance";
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "consumer3");
// Internal covert Json String to Json - Begin
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer09<>(kafkaTopic, new SimpleStringSchema(), properties));
stream.map(new MapFunction<String, String>() {
@Override
public String map(String jsonString) throws Exception {
return jsonString.replaceAll("\\\\", "").replace("\"{", "{").replace("}\"","}");
}
}).addSink(new FlinkKafkaProducer09<String>("localhost:9092", kafkaTopic_stage, new SimpleStringSchema()));
// Internal covert Json String to Json - End
String[] fieldNames = new String[] {"name"};
Class<?>[] fieldTypes = new Class<?>[] {String.class};
Kafka09AvroTableSource kafkaTableSource = new Kafka09AvroTableSource(
kafkaTopic_stage,
properties,
fieldNames,
fieldTypes);
//kafkaTableSource.setFailOnMissingField(true);
tableEnv.registerTableSource("Orders", kafkaTableSource);
//Table result = tableEnv.sql("SELECT STREAM name FROM Orders");
Table result = tableEnv.sql("SELECT name FROM Orders");
Files.deleteIfExists(Paths.get(resultFile));
// create a TableSink
TableSink sink = new CsvTableSink(resultFile, "|");
// write the result Table to the TableSink
result.writeToSink(sink);
env.execute("FlinkConsumer");
} catch (Exception e) {
e.printStackTrace();
}
}
开发者ID:datafibers-community,项目名称:df_data_service,代码行数:63,代码来源:UnitTestSuiteFlink.java
注:本文中的org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论