Java FlinkKafkaProducer09 Class Code Examples


This article collects typical usage examples of the Java class org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09. If you have been asking yourself how FlinkKafkaProducer09 works, how to use it, or where to find usage examples, the hand-picked class code examples below may help.



The FlinkKafkaProducer09 class belongs to the org.apache.flink.streaming.connectors.kafka package. Eight code examples of the class are presented below, sorted by popularity by default.
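
Before looking at the individual examples, note that the same few constructor overloads of FlinkKafkaProducer09 recur throughout them. The following minimal sketch was written for this article (it is not taken from any of the projects below; the broker address and topic name are placeholder values, and the SimpleStringSchema import path may differ across Flink versions); the partitioner overload is covered in Example 7:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

public class ProducerOverloadsSketch {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");

		// Overload 1: broker list, topic, serialization schema (see Examples 1 and 4)
		FlinkKafkaProducer09<String> byBrokerList =
				new FlinkKafkaProducer09<>("localhost:9092", "my-topic", new SimpleStringSchema());

		// Overload 2: topic, serialization schema, producer Properties (see Examples 5 and 6)
		FlinkKafkaProducer09<String> byProperties =
				new FlinkKafkaProducer09<>("my-topic", new SimpleStringSchema(), props);
	}
}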

Example 1: main

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09; // import the required package/class
public static void main(String[] args) throws Exception {

		final StreamExecutionEnvironment env =
				StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(1);

		// generate the sawtooth wave datapoints
		// a total of 40 steps, with 100 ms interval between each point = 4 sec period
		DataStream<UnkeyedDataPoint> originalSawTooth =
				env.addSource(new SawtoothSource(100, 40, 1));

		originalSawTooth.addSink(
				new FlinkKafkaProducer09<>(
						KAFKA_BROKER_SERVER,
						SAWTOOTH_TOPIC,
						new UnKeyedDataPointSchema()
				)
		);

		env.execute();
	}
 
Developer: flink-taiwan, Project: jcconf2016-workshop, Lines: 22, Source: ProduceSawtoothToKafka.java


Example 2: create

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09; // import the required package/class
/**
 * Creates a {@link FlinkKafkaProducer09} configured from the provided settings.
 * @return the configured producer instance
 */
public FlinkKafkaProducer09<T> create() {
	
	/////////////////////////////////////////////////////////////////////////
	// validate provided input
	if(StringUtils.isBlank(this.topic))
		throw new IllegalArgumentException("Missing required topic");
	if(StringUtils.isBlank(this.brokerList))
		throw new IllegalArgumentException("Missing required broker list");
	/////////////////////////////////////////////////////////////////////////

	if(!this.properties.isEmpty()) {
		Properties producerProperties = new Properties();
		producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerList);

		for(Object k : this.properties.keySet())
			producerProperties.put(k, this.properties.get(k));			
		return new FlinkKafkaProducer09<>(this.topic, this.serializationSchema, producerProperties);
	}
	return new FlinkKafkaProducer09<T>(this.brokerList, this.topic, this.serializationSchema);		
}
 
Developer: ottogroup, Project: flink-operator-library, Lines: 26, Source: KafkaProducerBuilder.java
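
When custom properties are registered with the builder, create() merges them into a fresh Properties object seeded with the broker list. The following sketch shows the direct equivalent of that branch (the acks setting is an illustrative assumption, not something the builder requires; topic and broker values are placeholders):

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

static FlinkKafkaProducer09<String> createWithCustomProperties() {
	Properties producerProperties = new Properties();
	producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
	producerProperties.put(ProducerConfig.ACKS_CONFIG, "all"); // illustrative extra producer setting
	return new FlinkKafkaProducer09<>("my-topic", new SimpleStringSchema(), producerProperties);
}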


Example 3: main

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09; // import the required package/class
public static void main(String[] args) throws Exception {
	// parse arguments
	ParameterTool params = ParameterTool.fromPropertiesFile(args[0]);

	// create streaming environment
	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	// enable event time processing
	env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

	// enable fault-tolerance
	env.enableCheckpointing(1000);

	// enable restarts
	env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, 500L));

	env.setStateBackend(new FsStateBackend("file:///home/robert/flink-workdir/flink-streaming-etl/state-backend"));

	// run each operator separately
	env.disableOperatorChaining();

	// get data from Kafka
	Properties kParams = params.getProperties();
	kParams.setProperty("group.id", UUID.randomUUID().toString());
	DataStream<ObjectNode> inputStream = env.addSource(new FlinkKafkaConsumer09<>(params.getRequired("topic"), new JSONDeserializationSchema(), kParams)).name("Kafka 0.9 Source")
		.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ObjectNode>(Time.minutes(1L)) {
			@Override
			public long extractTimestamp(ObjectNode jsonNodes) {
				return jsonNodes.get("timestamp_ms").asLong();
			}
		}).name("Timestamp extractor");

	// filter out records without lang field
	DataStream<ObjectNode> tweetsWithLang = inputStream.filter(jsonNode -> jsonNode.has("user") && jsonNode.get("user").has("lang")).name("Filter records without 'lang' field");

	// select only lang = "en" tweets
	DataStream<ObjectNode> englishTweets = tweetsWithLang.filter(jsonNode -> jsonNode.get("user").get("lang").asText().equals("en")).name("Select 'lang'=en tweets");

	// write to file system
	RollingSink<ObjectNode> rollingSink = new RollingSink<>(params.get("sinkPath", "/home/robert/flink-workdir/flink-streaming-etl/rolling-sink"));
	rollingSink.setBucketer(new DateTimeBucketer("yyyy-MM-dd-HH-mm")); // create one bucket per minute
	englishTweets.addSink(rollingSink).name("Rolling FileSystem Sink");

	// build aggregates (count per language) using window (10 seconds tumbling):
	DataStream<Tuple3<Long, String, Long>> languageCounts = tweetsWithLang.keyBy(jsonNode -> jsonNode.get("user").get("lang").asText())
		.timeWindow(Time.seconds(10))
		.apply(new Tuple3<>(0L, "", 0L), new JsonFoldCounter(), new CountEmitter()).name("Count per Language (10 seconds tumbling)");

	// write window aggregate to ElasticSearch
	List<InetSocketAddress> transportNodes = ImmutableList.of(new InetSocketAddress(InetAddress.getByName("localhost"), 9300));
	ElasticsearchSink<Tuple3<Long, String, Long>> elasticsearchSink = new ElasticsearchSink<>(params.toMap(), transportNodes, new ESRequest());

	languageCounts.addSink(elasticsearchSink).name("ElasticSearch2 Sink");

	// word-count on the tweet stream
	DataStream<Tuple2<Date, List<Tuple2<String, Long>>>> topWordCount = tweetsWithLang
		// get text from tweets
		.map(tweet -> tweet.get("text").asText()).name("Get text from Tweets")
		// split text into (word, 1) tuples
		.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
			@Override
			public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {
				String[] splits = s.split(" ");
				for (String sp : splits) {
					collector.collect(new Tuple2<>(sp, 1L));
				}
			}
		}).name("Tokenize words")
		// group by word
		.keyBy(0)
		// build 1 min windows, compute every 10 seconds --> count word frequency
		.timeWindow(Time.minutes(1L), Time.seconds(10L)).apply(new WordCountingWindow()).name("Count word frequency (1 min, 10 sec sliding window)")
		// build top n every 10 seconds
		.timeWindowAll(Time.seconds(10L)).apply(new TopNWords(10)).name("TopN Window (10s)");

	// write top Ns to Kafka topic
	topWordCount.addSink(new FlinkKafkaProducer09<>(params.getRequired("wc-topic"), new ListSerSchema(), params.getProperties())).name("Write topN to Kafka");

	env.execute("Streaming ETL");

}
 
Developer: rmetzger, Project: flink-streaming-etl, Lines: 82, Source: StreamingETL.java


Example 4: main

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09; // import the required package/class
public static void main(String[] args) throws Exception {
    final String input = "C:\\dev\\github\\clojured-taxi-rides\\resources\\datasets\\nycTaxiRides.gz";

    final int maxEventDelay = 60;       // events are out of order by max 60 seconds
    final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second

    // set up streaming execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // start the data generator
    DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor));

    DataStream<TaxiRide> filteredRides = rides
            // filter out rides that do not start or stop in NYC
            .filter(new NYCFilter());

    // write the filtered data to a Kafka sink
    filteredRides.addSink(new FlinkKafkaProducer09<>(
            LOCAL_KAFKA_BROKER,
            CLEANSED_RIDES_TOPIC,
            new TaxiRideSchema()));

    // run the cleansing pipeline
    env.execute("Taxi Ride Cleansing");
}
 
Developer: thr0n, Project: clojured-taxi-rides, Lines: 27, Source: RideCleansingToKafka.java


Example 5: main

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09; // import the required package/class
public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  Properties properties = new Properties();
  properties.setProperty("bootstrap.servers", "localhost:9092");

  DataStream<String> stream = env.addSource(new SimpleStringGenerator());
  stream.addSink(new FlinkKafkaProducer09<>("flink-demo", new SimpleStringSchema(), properties));

  env.execute();
}
 
Developer: tgrall, Project: kafka-flink-101, Lines: 12, Source: WriteToKafka.java
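
For completeness, here is a matching consumer-side sketch. It is not part of the original kafka-flink-101 snippet; it reuses the FlinkKafkaConsumer09 connector that appears in Examples 3 and 8, and the group.id value is an assumption:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  Properties properties = new Properties();
  properties.setProperty("bootstrap.servers", "localhost:9092");
  properties.setProperty("group.id", "flink-demo-reader"); // assumed consumer group id

  // Read back the same "flink-demo" topic that the producer above writes to
  DataStream<String> stream = env.addSource(
      new FlinkKafkaConsumer09<>("flink-demo", new SimpleStringSchema(), properties));
  stream.print(); // echo each record to stdout

  env.execute();
}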


Example 6: main

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09; // import the required package/class
public static void main(String[] args) throws Exception {
	// set up the streaming execution environment
	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	ParameterTool params = ParameterTool.fromPropertiesFile(args[0]);
	DataStream<String> twitterStreamString = env.addSource(new TwitterSource(params.getProperties()));
	DataStream<String> filteredStream = twitterStreamString.flatMap(new ParseJson());
	filteredStream.flatMap(new ThroughputLogger(5000L)).setParallelism(1);

	filteredStream.addSink(new FlinkKafkaProducer09<>("twitter", new SimpleStringSchema(), params.getProperties()));

	// execute program
	env.execute("Ingest data from Twitter to Kafka");
}
 
Developer: rmetzger, Project: flink-streaming-etl, Lines: 15, Source: TwitterIntoKafka.java


Example 7: createKafkaProducer

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09; // import the required package/class
@Override
protected FlinkKafkaProducerBase<Tuple2<Boolean, Row>> createKafkaProducer(String topic, Properties properties, SerializationSchema<Tuple2<Boolean, Row>> serializationSchema, FlinkKafkaPartitioner<Tuple2<Boolean, Row>> partitioner) {
	return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner);
}
 
Developer: datafibers-community, Project: df_data_service, Lines: 5, Source: Kafka09AvroTableSink.java
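
The fourth constructor argument here is Flink's pluggable partitioning hook. As an illustrative sketch (assuming Flink's bundled FlinkFixedPartitioner, which pins each parallel sink subtask to a single Kafka partition; topic and broker values are placeholders, and the class's package may vary by Flink version), a partitioner can be wired in like this:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

static FlinkKafkaProducer09<String> createWithFixedPartitioner() {
	Properties props = new Properties();
	props.setProperty("bootstrap.servers", "localhost:9092");

	// Each parallel subtask writes all of its records to one fixed partition of the topic
	return new FlinkKafkaProducer09<>(
			"my-topic",
			new SimpleStringSchema(),
			props,
			new FlinkFixedPartitioner<String>());
}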


Example 8: testFlinkSQL

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09; // import the required package/class
public static void testFlinkSQL() {

        LOG.info("Only Unit Testing Function is enabled");
        String resultFile = "/home/vagrant/test.txt";

        try {

            String jarPath = DFInitService.class.getProtectionDomain().getCodeSource().getLocation().getPath();
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 6123, jarPath)
                    .setParallelism(1);
            String kafkaTopic = "finance";
            String kafkaTopic_stage = "df_trans_stage_finance";
            String kafkaTopic_out = "df_trans_out_finance";



            StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092");
            properties.setProperty("group.id", "consumer3");

            // Internally convert JSON string to JSON - Begin
            DataStream<String> stream = env
                    .addSource(new FlinkKafkaConsumer09<>(kafkaTopic, new SimpleStringSchema(), properties));

            stream.map(new MapFunction<String, String>() {
                @Override
                public String map(String jsonString) throws Exception {
                    return jsonString.replaceAll("\\\\", "").replace("\"{", "{").replace("}\"","}");
                }
            }).addSink(new FlinkKafkaProducer09<String>("localhost:9092", kafkaTopic_stage, new SimpleStringSchema()));
            // Internally convert JSON string to JSON - End

            String[] fieldNames =  new String[] {"name"};
            Class<?>[] fieldTypes = new Class<?>[] {String.class};

            Kafka09AvroTableSource kafkaTableSource = new Kafka09AvroTableSource(
                    kafkaTopic_stage,
                    properties,
                    fieldNames,
                    fieldTypes);

            //kafkaTableSource.setFailOnMissingField(true);

            tableEnv.registerTableSource("Orders", kafkaTableSource);

            //Table result = tableEnv.sql("SELECT STREAM name FROM Orders");
            Table result = tableEnv.sql("SELECT name FROM Orders");

            Files.deleteIfExists(Paths.get(resultFile));

            // create a TableSink
            TableSink sink = new CsvTableSink(resultFile, "|");
            // write the result Table to the TableSink
            result.writeToSink(sink);

            env.execute("FlinkConsumer");

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
Developer: datafibers-community, Project: df_data_service, Lines: 63, Source: UnitTestSuiteFlink.java



Note: the org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09 examples in this article were compiled from source code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects contributed by many developers, and copyright remains with the original authors. For redistribution and use, refer to the corresponding project's license; do not reproduce without permission.

