
Java FlinkKafkaConsumer08 Class Code Examples


This article collects typical usage examples of the Java class org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08. If you are wondering what the FlinkKafkaConsumer08 class is for and how to use it, the curated class code examples below should help.



The FlinkKafkaConsumer08 class belongs to the org.apache.flink.streaming.connectors.kafka package. Eleven code examples of the class are shown below, ordered by popularity.
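
Before diving into the examples, here is a minimal, self-contained sketch of the basic pattern they all share: configure consumer properties, create a FlinkKafkaConsumer08, and add it as a source. The topic name, broker address, and group id below are placeholder values.

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class MinimalKafka08Source {
	public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // Kafka brokers
		props.setProperty("zookeeper.connect", "localhost:2181"); // required by the 0.8 consumer
		props.setProperty("group.id", "example-group");           // consumer group id

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// arguments: topic name, deserialization schema, consumer properties
		DataStream<String> stream = env.addSource(
				new FlinkKafkaConsumer08<>("example-topic", new SimpleStringSchema(), props));

		stream.print();
		env.execute("Minimal FlinkKafkaConsumer08 job");
	}
}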

Example 1: main

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; // import the required package/class
public static void main(String[] args) throws Exception {
	Properties properties = new Properties();
	properties.setProperty("bootstrap.servers", "localhost:9092");
	properties.setProperty("zookeeper.connect", "localhost:2181");
	properties.setProperty("group.id", "test");
	properties.setProperty("auto.offset.reset", "latest");
	FlinkKafkaConsumer08<DeviceEvent> flinkKafkaConsumer08 = new FlinkKafkaConsumer08<>("device-data",
			new DeviceSchema(), properties);

	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	DataStream<DeviceEvent> messageStream = env.addSource(flinkKafkaConsumer08);
	
	Map<String, String> config = new HashMap<>();
	config.put("cluster.name", "my-application");
	// This instructs the sink to emit after every element; otherwise elements would be buffered
	config.put("bulk.flush.max.actions", "1");

	List<InetSocketAddress> transportAddresses = new ArrayList<>();
	transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));

	messageStream.addSink(new ElasticsearchSink<DeviceEvent>(config, transportAddresses, new ESSink()));
	env.execute();
}
 
Author: PacktPublishing | Project: Practical-Real-time-Processing-and-Analytics | Lines: 24 | Source: FlinkESConnector.java
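
Example 1 depends on two project-specific helpers that the excerpt does not show: DeviceSchema (a DeserializationSchema for DeviceEvent) and ESSink (the ElasticsearchSinkFunction). As a rough, hypothetical sketch of what DeviceSchema might look like, assuming a CSV wire format (the real format is defined in the book's project):

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

// Hypothetical reconstruction; the real DeviceSchema and DeviceEvent live in the
// PacktPublishing project and may differ.
public class DeviceSchema implements DeserializationSchema<DeviceEvent> {

	@Override
	public DeviceEvent deserialize(byte[] message) throws IOException {
		// Assumption: events arrive as "phoneNumber,bin,bout,timestamp" CSV strings.
		String[] f = new String(message, StandardCharsets.UTF_8).split(",");
		return new DeviceEvent(Long.parseLong(f[0]), Integer.parseInt(f[1]),
				Integer.parseInt(f[2]), Long.parseLong(f[3]));
	}

	@Override
	public boolean isEndOfStream(DeviceEvent nextElement) {
		return false; // the Kafka stream is unbounded
	}

	@Override
	public TypeInformation<DeviceEvent> getProducedType() {
		return TypeExtractor.getForClass(DeviceEvent.class);
	}
}

// Minimal stub of the event type, matching the fields that Example 4 later
// parses from the same topic; the project's real POJO is not shown.
class DeviceEvent {
	public long phoneNumber;
	public int bin;
	public int bout;
	public long timestamp;

	public DeviceEvent() {}

	public DeviceEvent(long phoneNumber, int bin, int bout, long timestamp) {
		this.phoneNumber = phoneNumber;
		this.bin = bin;
		this.bout = bout;
		this.timestamp = timestamp;
	}
}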


Example 2: main

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; // import the required package/class
public static void main(String[] args) throws Exception {
    ParameterTool tool = ParameterTool.fromArgs(args);

    String topic = tool.getRequired("kafka.topic");

    Properties kafkaConsumerProps = new Properties();
    kafkaConsumerProps.setProperty("bootstrap.servers", tool.getRequired("kafkabroker"));
    kafkaConsumerProps.setProperty("group.id", tool.getRequired("kafka.groupId"));
    kafkaConsumerProps.setProperty("zookeeper.connect", tool.get("zookeeper.host", "localhost:2181"));
    kafkaConsumerProps.setProperty("auto.offset.reset", tool.getBoolean("from-beginning", false) ? "smallest" : "largest");

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> textStream = env
            .addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), kafkaConsumerProps));

    textStream.flatMap(new LineSplitter())
        .keyBy(0)
        .sum(1)
        .print();

    env.execute("WordCount from Kafka Example");
}
 
Author: godatadriven | Project: flink-streaming-xke | Lines: 24 | Source: KafkaStreamingWordCount.java
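
The LineSplitter used above is also not shown in the excerpt. A plausible sketch, assuming the conventional Flink word-count tokenizer (the real class lives in the godatadriven project and may differ):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
	@Override
	public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
		// Emit (word, 1) for every word; keyBy(0).sum(1) then counts per word.
		for (String word : line.toLowerCase().split("\\W+")) {
			if (!word.isEmpty()) {
				out.collect(new Tuple2<>(word, 1));
			}
		}
	}
}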


Example 3: main

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; // import the required package/class
public static void main(String[] args) throws Exception {
	Properties properties = new Properties();
	properties.setProperty("bootstrap.servers", "localhost:9092");
	properties.setProperty("zookeeper.connect", "localhost:2181");
	properties.setProperty("group.id", "test");
	properties.setProperty("auto.offset.reset", "latest");  
	FlinkKafkaConsumer08<String> flinkKafkaConsumer08 = new FlinkKafkaConsumer08<>("flink-test",
			new SimpleStringSchema(), properties);

	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	DataStream<String> messageStream = env.addSource(flinkKafkaConsumer08);

	// print() writes the contents of the stream to the TaskManager's
	// standard-out stream.
	// The rebalance() call repartitions the data so that all machines
	// see the messages (for example, when "num kafka partitions" <
	// "num flink operators").
	messageStream.rebalance().map(new MapFunction<String, String>() {
		private static final long serialVersionUID = -6867736771747690202L;

		@Override
		public String map(String value) throws Exception {
			return "Kafka and Flink says: " + value;
		}
	}).print();

	env.execute();
}
 
Author: PacktPublishing | Project: Practical-Real-time-Processing-and-Analytics | Lines: 30 | Source: FlinkKafkaSourceExample.java


Example 4: main

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; // import the required package/class
public static void main(String[] args) throws Exception {
	Properties properties = new Properties();
	properties.setProperty("bootstrap.servers", "localhost:9092");
	properties.setProperty("zookeeper.connect", "localhost:2181");
	properties.setProperty("group.id", "test");
	properties.setProperty("auto.offset.reset", "latest");
	FlinkKafkaConsumer08<String> flinkKafkaConsumer08 = new FlinkKafkaConsumer08<>("device-data",
			new SimpleStringSchema(), properties);

	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	DataStream<Tuple4<Long,Integer,Integer,Long>> messageStream = env.addSource(flinkKafkaConsumer08).map(new MapFunction<String, Tuple4<Long,Integer,Integer,Long>>() {

		private static final long serialVersionUID = 4723214570372887208L;

		@Override
		public Tuple4<Long,Integer,Integer,Long> map(String input) throws Exception {
			String[] inputSplits = input.split(",");
			return Tuple4.of(Long.parseLong(inputSplits[0]), Integer.parseInt(inputSplits[1]), Integer.parseInt(inputSplits[2]), Long.parseLong(inputSplits[3]));
		}
	});

	CassandraSink.addSink(messageStream).setQuery("INSERT INTO tdr.packet_tdr (phone_number, bin, bout, timestamp) values (?, ?, ?, ?);")
			.setClusterBuilder(new ClusterBuilder() {
				private static final long serialVersionUID = 1L;

				@Override
				public Cluster buildCluster(Cluster.Builder builder) {
					return builder.addContactPoint("127.0.0.1").build();
				}
			}).build();
	env.execute();
}
 
Author: PacktPublishing | Project: Practical-Real-time-Processing-and-Analytics | Lines: 33 | Source: FlinkCassandraConnector.java
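
The INSERT statement above assumes the tdr.packet_tdr table already exists. For completeness, here is a hypothetical matching schema created with the DataStax Java driver; the column types mirror the Tuple4<Long, Integer, Integer, Long> fields, while the primary key is a guess (the project's actual DDL is not shown):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class CreatePacketTdrTable {
	public static void main(String[] args) {
		try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
			 Session session = cluster.connect()) {
			session.execute("CREATE KEYSPACE IF NOT EXISTS tdr WITH replication = "
					+ "{'class': 'SimpleStrategy', 'replication_factor': 1};");
			// Column types chosen to match the Tuple4<Long,Integer,Integer,Long> fields.
			session.execute("CREATE TABLE IF NOT EXISTS tdr.packet_tdr ("
					+ "phone_number bigint, bin int, bout int, timestamp bigint, "
					+ "PRIMARY KEY (phone_number, timestamp));");
		}
	}
}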


Example 5: main

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; // import the required package/class
public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get CLI parameters
        ParameterTool parameters = ParameterTool.fromArgs(args);
        String topic = parameters.getRequired("topic");
        String groupId = parameters.get("group-id", "flink-kafka-consumer");
        String propertiesFile = parameters.getRequired("env");
        ParameterTool envProperties = ParameterTool.fromPropertiesFile(propertiesFile);
        String schemaRegistryUrl = envProperties.getRequired("registry_url");
        String bootstrapServers = envProperties.getRequired("brokers");
        String zookeeperConnect = envProperties.getRequired("zookeeper");

        // set up the Kafka source
        ConfluentAvroDeserializationSchema deserSchema = new ConfluentAvroDeserializationSchema(schemaRegistryUrl);
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", bootstrapServers);
        kafkaProps.setProperty("zookeeper.connect", zookeeperConnect);
        kafkaProps.setProperty("group.id", groupId);
        FlinkKafkaConsumer08<String> flinkKafkaConsumer = new FlinkKafkaConsumer08<String>(topic, deserSchema, kafkaProps);

        DataStream<String> kafkaStream = env.addSource(flinkKafkaConsumer);

        DataStream<Integer> counts = kafkaStream
                .map(new MapFunction<String, Integer>() {
                    public Integer map(String s) throws Exception {
                        return 1;
                    }
                })
                .timeWindowAll(Time.seconds(3))
                .sum(0);

        counts.print();

        env.execute("Flink Kafka Java Example");
    }
 
Author: seanpquig | Project: flink-streaming-confluent | Lines: 38 | Source: FlinkKafkaExample.java
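
ConfluentAvroDeserializationSchema is a custom class from the seanpquig project, not part of Flink. A rough sketch of how such a schema could wrap Confluent's KafkaAvroDeserializer is shown below; the field names, the lazy initialization, and the toString() rendering of the decoded record are all assumptions:

import java.util.Collections;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;

public class ConfluentAvroDeserializationSchema implements DeserializationSchema<String> {

	private final String schemaRegistryUrl;
	private transient KafkaAvroDeserializer deserializer; // not serializable; built lazily per task

	public ConfluentAvroDeserializationSchema(String schemaRegistryUrl) {
		this.schemaRegistryUrl = schemaRegistryUrl;
	}

	@Override
	public String deserialize(byte[] message) {
		if (deserializer == null) {
			deserializer = new KafkaAvroDeserializer();
			deserializer.configure(
					Collections.singletonMap("schema.registry.url", schemaRegistryUrl), false);
		}
		// The schema id is embedded in the message, so the topic argument is unused here.
		return deserializer.deserialize(null, message).toString();
	}

	@Override
	public boolean isEndOfStream(String nextElement) {
		return false;
	}

	@Override
	public TypeInformation<String> getProducedType() {
		return BasicTypeInfo.STRING_TYPE_INFO;
	}
}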


Example 6: main

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; // import the required package/class
public static void main(String[] args) throws Exception {
    Properties kafkaConsumerProps = new Properties();
    kafkaConsumerProps.setProperty("bootstrap.servers", "localhost:9092");
    kafkaConsumerProps.setProperty("group.id", "test-consumer-group");
    kafkaConsumerProps.setProperty("zookeeper.connect", "localhost:2181");

    // set up the execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Uses the SimpleConsumer API of Kafka internally. Offsets are committed to ZK by Flink.
    DataStream<String> textStream = env
            .addSource(new FlinkKafkaConsumer08<>("yourTopic", new SimpleStringSchema(), kafkaConsumerProps));


    /*
     *
     * then, transform the resulting DataStream<String> using operations
     * like
     * 	.filter()
     * 	.flatMap()
     * 	.print()
     *
     * and many more.
     * Have a look at the programming guide for the Java API:
     *
     * https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html
     *
     */

    // execute program
    env.execute("Flink Streaming Java API Skeleton");
}
 
Author: godatadriven | Project: flink-streaming-xke | Lines: 33 | Source: StreamingSkeleton.java
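
For illustration, here is one hypothetical way to fill in the skeleton above, combining the operations the comment block lists (filter, flatMap, print). This fragment would replace the comment block inside main(); the returns(...) call is needed because Java lambdas erase the Tuple2 type parameters:

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Hypothetical fill-in of the skeleton's transformation step:
textStream
        .filter(line -> !line.trim().isEmpty())                // drop blank messages
        .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
            for (String word : line.split("\\s+")) {
                out.collect(new Tuple2<>(word, 1));            // emit (word, 1) pairs
            }
        })
        .returns(new TypeHint<Tuple2<String, Integer>>() {})   // restore erased generic types
        .print();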


Example 7: main

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; // import the required package/class
public static void main(String[] args) throws Exception {
    ParameterTool tool = ParameterTool.fromArgs(args);

    String topic = tool.getRequired("kafka.topic");

    Properties kafkaConsumerProps = new Properties();
    kafkaConsumerProps.setProperty("bootstrap.servers", tool.getRequired("kafkabroker"));
    kafkaConsumerProps.setProperty("group.id", tool.getRequired("kafka.groupId"));
    kafkaConsumerProps.setProperty("zookeeper.connect", tool.get("zookeeper.host", "localhost:2181"));
    kafkaConsumerProps.setProperty("auto.offset.reset", tool.getBoolean("from-beginning", false) ? "smallest" : "largest");

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

    DataStream<String> textStream = env
            .addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), kafkaConsumerProps));

    SlidingEventTimeWindows window = SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(30));

    textStream.flatMap(new LineSplitter())
        .keyBy(0)
        .sum(1)
        .windowAll(window)
        .maxBy(1)
        .writeAsText("file:///Users/abij/projects/tryouts/flink-streaming/flink-streaming-results.log", OVERWRITE);

    env.execute("SlidingWindow WordCount");
}
 
Author: godatadriven | Project: flink-streaming-xke | Lines: 29 | Source: SlidingWindowingWordCount.java


Example 8: run

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; // import the required package/class
@Override
public void run() {
	try {
		result = FlinkKafkaConsumer08.getPartitionsForTopic(topics, properties);
	}
	catch (Throwable t) {
		this.error = t;
	}
}
 
Author: axbaretto | Project: flink | Lines: 10 | Source: PartitionInfoFetcher.java
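
This run() method comes from PartitionInfoFetcher, a small helper thread in Flink's own Kafka connector tests. Below is a hypothetical reconstruction of the enclosing class, inferred from the snippet; the field types follow from what FlinkKafkaConsumer08.getPartitionsForTopic returns in that code base, but names and details may differ from Flink's actual test code:

import java.util.List;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;

// Hypothetical reconstruction of the enclosing test helper.
class PartitionInfoFetcher extends Thread {

	private final List<String> topics;
	private final Properties properties;

	private volatile List<KafkaTopicPartitionLeader> result; // filled by run()
	private volatile Throwable error;                        // non-null if the lookup failed

	PartitionInfoFetcher(List<String> topics, Properties properties) {
		this.topics = topics;
		this.properties = properties;
	}

	@Override
	public void run() {
		try {
			result = FlinkKafkaConsumer08.getPartitionsForTopic(topics, properties);
		}
		catch (Throwable t) {
			this.error = t;
		}
	}

	// Callers join() the thread, then check error before reading result.
	public List<KafkaTopicPartitionLeader> getPartitions() throws Exception {
		if (error != null) {
			throw new Exception("Failed to fetch partitions for topics " + topics, error);
		}
		return result;
	}
}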


Example 9: main

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; // import the required package/class
public static void main(String[] args) throws Exception {
	// parse input arguments
	final ParameterTool parameterTool = ParameterTool.fromArgs(args);

	if (parameterTool.getNumberOfParameters() < 4) {
		System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> " +
				"--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>");
		return;
	}

	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.getConfig().disableSysoutLogging();
	env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
	env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
	env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface

	DataStream<String> messageStream = env
			.addSource(new FlinkKafkaConsumer08<>(
					parameterTool.getRequired("topic"),
					new SimpleStringSchema(),
					parameterTool.getProperties()));

	// write kafka stream to standard out.
	messageStream.print();

	env.execute("Read from Kafka example");
}
 
Author: axbaretto | Project: flink | Lines: 28 | Source: ReadFromKafka.java
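
Note how parameterTool.getProperties() forwards every CLI argument to the consumer as a Kafka property, which is why the usage message asks for argument names that match Kafka's own configuration keys. A hypothetical launch would look like: --topic test --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id my-group (all placeholder values).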


Example 10: main

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; // import the required package/class
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();

    props.put("bootstrap.servers", "localhost:9092");
    props.put("zookeeper.connect", "localhost:2181");

    props.put("group.id", "test");

    props.put("auto.offset.reset", "smallest");
    props.put("partition.assignment.strategy", "range");

    DataStream<String> dataStream = env
            .addSource(new FlinkKafkaConsumer08<String>(
                    SparkStreamingConsumer.KAFKA_TOPIC,
                    new SimpleStringSchema(), props));

    dataStream
            .map(json -> {
                SparkStreamingConsumer.Message message = gson.fromJson(json, SparkStreamingConsumer.Message.class);

                // time delay emulation
                int count = 0;
                byte[] array = message.getUid().getBytes();
                for (int j = 0; j < SparkStreamingConsumer.delayFactor; j++) {
                    for (int i = 0; i < array.length; i++) {
                        if (array[0] == array[i]) count++;
                    }
                }

                return new DataTuple(json, message, System.currentTimeMillis(), 1L, count);
            }).returns(DataTuple.class)

            //.keyBy(x -> x.f1.getUid())
            .keyBy(x -> 1) // only one partition

            .timeWindow(Time.milliseconds(SparkStreamingConsumer.TIME))
            .reduce((x1, x2) -> new DataTuple(
                    x1.f0,
                    x1.f1,
                    x1.f2,
                    x1.f3 + x2.f3,
                    x1.f4))
            .map(x -> {
                return "***************************************************************************"
                        + "\nProcessing time: " + Long.toString(System.currentTimeMillis() - x.f2)
                        + "\nExpected time: " + Long.toString(SparkStreamingConsumer.TIME)
                        + "\nProcessed messages: " + Long.toString(x.f3)
                        + "\nMessage example: " + x.f0
                        + "\nRecovered json: " + x.f1
                        ;
            })
            .print();

    env.execute();
}
 
Author: rssdev10 | Project: spark-kafka-streaming | Lines: 58 | Source: FlinkStreamingConsumer.java
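
DataTuple, SparkStreamingConsumer.Message, and the gson instance are project-specific pieces of this benchmark. Judging from the constructor call and the f0..f4 field accesses, DataTuple is presumably a Tuple5 subclass along these lines (a guess, not the project's actual code):

import org.apache.flink.api.java.tuple.Tuple5;

// Hypothetical reconstruction inferred from the usage above.
public class DataTuple extends Tuple5<String, SparkStreamingConsumer.Message, Long, Long, Integer> {

	public DataTuple() {
		// Flink tuple types need a public no-arg constructor
	}

	public DataTuple(String json, SparkStreamingConsumer.Message message,
					 Long receiveTimeMillis, Long messageCount, Integer delayChecksum) {
		super(json, message, receiveTimeMillis, messageCount, delayChecksum);
	}
}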


Example 11: main

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; // import the required package/class
public static void main(String[] args) throws Exception {

	// create execution environment
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setParallelism(1);

	Properties properties = new Properties();
	properties.setProperty("bootstrap.servers", "localhost:9092");
	// only required for Kafka 0.8
	properties.setProperty("zookeeper.connect", "localhost:2181");
	properties.setProperty("group.id", "consumer3");

	DataStream<Tuple2<String, Float>> messageStream = env
			.addSource(new FlinkKafkaConsumer08<>("appA", new SimpleStringSchema(), properties))
			.flatMap(new StreamToTuple())
			.keyBy(0)
			//.timeWindow(Time.seconds(5))
			.sum(1);

	//messageStream.print();

	messageStream.rebalance().addSink(new CassandraDataSink());

	System.out.println(env.getExecutionPlan());

	env.execute();
}
 
Author: tuhingupta | Project: kappa-streaming | Lines: 32 | Source: KafkaReader.java
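
StreamToTuple and CassandraDataSink are project-specific classes that the excerpt does not show. A hypothetical sketch of StreamToTuple, assuming the "appA" topic carries "key,value" CSV messages (the real message format is defined in the kappa-streaming project):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class StreamToTuple implements FlatMapFunction<String, Tuple2<String, Float>> {
	@Override
	public void flatMap(String message, Collector<Tuple2<String, Float>> out) {
		// Assumption: each message is "key,floatValue"; malformed records are skipped.
		String[] parts = message.split(",");
		if (parts.length == 2) {
			out.collect(new Tuple2<>(parts[0], Float.parseFloat(parts[1])));
		}
	}
}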



Note: the org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08 examples in this article were collected from open-source projects on GitHub, MSDocs, and similar code-hosting platforms. Copyright of each snippet remains with its original authors; consult the corresponding project's license before redistributing or reusing the code.

