Java FlinkKafkaProducer010 Class Code Examples


This article collects typical usage examples of the Java class org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010. If you are wondering what FlinkKafkaProducer010 is for, or how to use it in practice, the curated code examples below should help.



The FlinkKafkaProducer010 class belongs to the org.apache.flink.streaming.connectors.kafka package. Seven code examples of the class are shown below, ordered by popularity by default.
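
Before the collected examples, here is a minimal, self-contained sketch of the two common ways to wire FlinkKafkaProducer010 into a job: as an ordinary sink via addSink(), and via the static writeToKafkaWithTimestamps() helper, which also writes Flink record timestamps into Kafka and returns a configuration object for tuning failure handling. This sketch is not taken from any of the projects below; the topic name and broker address are placeholders, and depending on your Flink version, SimpleStringSchema may live in org.apache.flink.api.common.serialization instead.

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class FlinkKafkaProducer010Sketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.fromElements("a", "b", "c"); // placeholder input

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker

        // Pattern 1: attach the producer as an ordinary sink
        stream.addSink(new FlinkKafkaProducer010<>("my-topic", new SimpleStringSchema(), props));

        // Pattern 2: helper that also propagates Flink record timestamps to Kafka;
        // it returns a configuration object for tuning failure handling
        FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<String> config =
                FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic",
                        new SimpleStringSchema(), props);
        config.setLogFailuresOnly(false);  // fail the job on send errors instead of only logging them
        config.setFlushOnCheckpoint(true); // wait for in-flight records at checkpoints (at-least-once)

        env.execute("FlinkKafkaProducer010 sketch");
    }
}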

Example 1: writeEnrichedStream

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; // import the required package/class
private static void writeEnrichedStream(DataStream<AisMessage> enrichedAisMessagesStream,
    String parsingConfig, boolean writeOutputStreamToFile, String outputLineDelimiter,
    String outputPath, String outputStreamTopic) throws IOException {

  if (writeOutputStreamToFile) {
    enrichedAisMessagesStream.map(new AisMessagesToCsvMapper(outputLineDelimiter)).writeAsText(
        outputPath, WriteMode.OVERWRITE);
    return;
  }

  // Write to Kafka
  Properties producerProps = AppUtils.getKafkaProducerProperties();

  FlinkKafkaProducer010Configuration<AisMessage> myProducerConfig =
      FlinkKafkaProducer010.writeToKafkaWithTimestamps(enrichedAisMessagesStream,
          outputStreamTopic, new AisMessageCsvSchema(parsingConfig, outputLineDelimiter),
          producerProps);
  myProducerConfig.setLogFailuresOnly(false);
  myProducerConfig.setFlushOnCheckpoint(true);

}
 
Developer: ehabqadah | Project: in-situ-processing-datAcron | Lines of code: 22 | Source: InSituProcessingApp.java


Example 2: main

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; // import the required package/class
public static void main(String[] args) throws Exception {
    // Read parameters from command line
    final ParameterTool params = ParameterTool.fromArgs(args);

    if(params.getNumberOfParameters() < 4) {
        System.out.println("\nUsage: FlinkReadKafka --read-topic <topic> --write-topic <topic> --bootstrap.servers <kafka brokers> --group.id <groupid>");
        return;
    }


    // setup streaming environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
    env.enableCheckpointing(300000); // 300 seconds
    env.getConfig().setGlobalJobParameters(params);

    DataStream<String> messageStream = env
            .addSource(new FlinkKafkaConsumer010<>(
                    params.getRequired("read-topic"),
                    new SimpleStringSchema(),
                    params.getProperties())).name("Read from Kafka");

    // setup table environment
    StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(env);


    // Write JSON payload back to Kafka topic
    messageStream.addSink(new FlinkKafkaProducer010<>(
                params.getRequired("write-topic"),
                new SimpleStringSchema(),
                params.getProperties())).name("Write To Kafka");

    env.execute("FlinkReadWriteKafka");
}
 
Developer: kgorman | Project: TrafficAnalyzer | Lines of code: 35 | Source: FlinkReadWriteKafka.java


Example 3: main

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; // import the required package/class
public static void main(String[] args) throws Exception {

		TaxiRideCleansingParameterParser params = new TaxiRideCleansingParameterParser();
		// TODO: refactor this method
		if(!params.parseParams(args)){
			final String dataFilePath = params.getDataFilePath();

			// get an ExecutionEnvironment
			StreamExecutionEnvironment env =
					StreamExecutionEnvironment.getExecutionEnvironment();
			// configure event-time processing
			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

			// get the taxi ride data stream
			DataStream<TaxiRide> rides = env.addSource(
					new TaxiRideSource(dataFilePath, MAX_EVENT_DELAY_DEFAULT, SERVING_SPEED_FACTOR_DEFAULT));

			TaxiRideCleansing taxiRideCleansing = new TaxiRideCleansing();
			DataStream<TaxiRide> filteredRides = taxiRideCleansing.execute(rides);

			filteredRides.addSink(new FlinkKafkaProducer010<>(
					"localhost:9092",      // Kafka broker host:port
					"cleansedRides",       // Topic to write to
					new TaxiRideSchema())  // Serializer (provided as util)
			);

//			filteredRides.print();
			env.execute("Running Taxi Ride Cleansing");
		}
	}
 
Developer: dineshtrivedi | Project: flink-java-project | Lines of code: 31 | Source: TaxiRideCleansingRunner.java


Example 4: configuration

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; // import the required package/class
public static void configuration(DataStream<String> stream, String topic, Properties properties) {

        // using Apache Kafka as a sink for serialized generic output
        FlinkKafkaProducer010.FlinkKafkaProducer010Configuration kafkaConfig = FlinkKafkaProducer010
                .writeToKafkaWithTimestamps(
                        stream,
                        topic,
                        new SimpleStringSchema(),
                        properties
        );
        kafkaConfig.setLogFailuresOnly(false);
        kafkaConfig.setFlushOnCheckpoint(true);
    }
 
Developer: ProjectEmber | Project: project-ember | Lines of code: 14 | Source: EmberKafkaProducer.java
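
A hypothetical call site for the helper above (the original snippet does not include one; the input stream, topic name, and broker address are placeholders):

// Hypothetical usage, inside a main() method; names are placeholders.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.fromElements("on", "off"); // placeholder input
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
EmberKafkaProducer.configuration(stream, "ember-output", props); // hypothetical topic name
env.execute("Ember producer usage sketch");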


Example 5: configuration

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; // import the required package/class
public static void configuration(DataStream<StreetLamp> stream, Properties properties) {

        // using Apache Kafka as a sink for serialized control messages
        FlinkKafkaProducer010.FlinkKafkaProducer010Configuration kafkaConfig = FlinkKafkaProducer010
                .writeToKafkaWithTimestamps(
                        stream,
                        "control",
                        new ControlSerializationSchema(),
                        properties
                );

        kafkaConfig.setLogFailuresOnly(false);
        kafkaConfig.setFlushOnCheckpoint(true);
    }
 
Developer: ProjectEmber | Project: project-ember | Lines of code: 15 | Source: EmberKafkaControlSink.java


Example 6: main

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; // import the required package/class
/**
 * The main entry method
 * 
 */
public static void main(String[] args) throws Exception {

  String checkPointsPath =
      Paths.get(configs.getStringProp("flinkCheckPointsPath") + "/" + System.currentTimeMillis())
          .toUri().toString();


  int parallelism = configs.getIntProp("parallelism");
  String inputHdfsFile = configs.getStringProp("inputHDFSFilePath");
  String outputTopicName = configs.getStringProp("outputHDFSKafkaTopic");

  // Set up the execution environment
  final StreamExecutionEnvironment env =
      new StreamExecutionEnvBuilder().setParallelism(parallelism).setStateBackend(checkPointsPath)
          .build();
  // Read the HDFS file
  DataStreamSource<String> inputTextStream =
      env.readTextFile(inputHdfsFile).setParallelism(parallelism);

  FlinkKafkaProducer010Configuration<String> myProducerConfig =
      FlinkKafkaProducer010.writeToKafkaWithTimestamps(inputTextStream, outputTopicName,
          new SimpleStringSchema(), AppUtils.getKafkaProducerProperties());


  myProducerConfig.setLogFailuresOnly(false);
  myProducerConfig.setFlushOnCheckpoint(true);


  System.out.println(env.getExecutionPlan());

  JobExecutionResult executionResult = null;

  try {
    executionResult = env.execute(" HDFS to Kafka stream producer");
  } catch (Exception e) {
    System.out.println(e.getMessage());
  }

  // Guard against a null result when execution failed
  if (executionResult != null) {
    System.out.println("Full execution time=" + executionResult.getNetRuntime(TimeUnit.MINUTES));
  }
}
 
Developer: ehabqadah | Project: in-situ-processing-datAcron | Lines of code: 45 | Source: HdfsToKafkaProducer.java


Example 7: main

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; // import the required package/class
public static void main(String[] args) throws Exception {
	// parse input arguments
	final ParameterTool parameterTool = ParameterTool.fromArgs(args);

	if (parameterTool.getNumberOfParameters() < 5) {
		System.out.println("Missing parameters!\n" +
				"Usage: Kafka --input-topic <topic> --output-topic <topic> " +
				"--bootstrap.servers <kafka brokers> " +
				"--zookeeper.connect <zk quorum> --group.id <some id> [--prefix <prefix>]");
		return;
	}

	String prefix = parameterTool.get("prefix", "PREFIX:");

	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.getConfig().disableSysoutLogging();
	env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
	env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
	env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface

	DataStream<String> input = env
			.addSource(new FlinkKafkaConsumer010<>(
					parameterTool.getRequired("input-topic"),
					new SimpleStringSchema(),
					parameterTool.getProperties()))
			.map(new PrefixingMapper(prefix));

	input.addSink(
			new FlinkKafkaProducer010<>(
					parameterTool.getRequired("output-topic"),
					new SimpleStringSchema(),
					parameterTool.getProperties()));

	env.execute("Kafka 0.10 Example");
}
 
Developer: axbaretto | Project: flink | Lines of code: 39 | Source: Kafka010Example.java



Note: The org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010 class examples in this article were collected from source code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects contributed by their developers, and copyright remains with the original authors. For distribution and use, please refer to the license of the corresponding project; do not reproduce without permission.

