
Java HdfsBolt Class Code Examples


This article collects typical usage examples of the Java class org.apache.storm.hdfs.bolt.HdfsBolt. If you are wondering how the HdfsBolt class is used in practice, the selected examples below may help.

The HdfsBolt class belongs to the org.apache.storm.hdfs.bolt package. The 13 code examples shown below are sorted by popularity by default.

Example 1: createHdfsBolt

import org.apache.storm.hdfs.bolt.HdfsBolt; // import the required package/class
/**
 * Create bolt which will persist ticks to HDFS.
 */
private static HdfsBolt createHdfsBolt() {

  // Use "|" instead of "," for field delimiter:
  RecordFormat format = new DelimitedRecordFormat()
    .withFieldDelimiter("|");
  // Sync the filesystem after every 100 tuples:
  SyncPolicy syncPolicy = new CountSyncPolicy(100);

  // Rotate files when they reach 5MB:
  FileRotationPolicy rotationPolicy = 
    new FileSizeRotationPolicy(5.0f, Units.MB);

  // Write records to <user>/stock-ticks/ directory in HDFS:
  FileNameFormat fileNameFormat = new DefaultFileNameFormat()
    .withPath("stock-ticks/");

  HdfsBolt hdfsBolt = new HdfsBolt()
    .withFsUrl("hdfs://localhost:8020")
    .withFileNameFormat(fileNameFormat)
    .withRecordFormat(format)
    .withRotationPolicy(rotationPolicy)
    .withSyncPolicy(syncPolicy);

  return hdfsBolt;
}
 
Developer: amitchmca, Project: hadooparchitecturebook, Lines of code: 29, Source file: MovingAvgLocalTopologyRunner.java
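
The sync and rotation policies in Example 1 decide when the bolt flushes data and when it starts a new file. The sketch below is a Storm-free model of that decision logic, added only for illustration. `PolicySketch`, `CountSync`, and `SizeRotation` are hypothetical names, not part of storm-hdfs. It assumes a count-based policy fires once the configured number of tuples has been seen, and a size-based policy fires once the bytes written to the current file reach the limit; the real classes may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;

/** Storm-free model of count-based sync and size-based rotation (hypothetical names). */
public class PolicySketch {

    /** Signals a sync once `count` tuples have been seen since the last reset. */
    static final class CountSync {
        private final int count;
        private int seen = 0;

        CountSync(int count) { this.count = count; }

        boolean mark() { return ++seen >= count; }

        void reset() { seen = 0; }
    }

    /** Signals a rotation once the bytes written to the current file reach `maxBytes`. */
    static final class SizeRotation {
        private final long maxBytes;
        private long written = 0;

        SizeRotation(long maxBytes) { this.maxBytes = maxBytes; }

        boolean mark(long bytesJustWritten) {
            written += bytesJustWritten;
            return written >= maxBytes;
        }

        void reset() { written = 0; }
    }

    /** Returns the 1-based tuple indices at which a sync would be triggered. */
    public static List<Integer> syncPoints(int tuples, int syncEvery) {
        CountSync sync = new CountSync(syncEvery);
        List<Integer> points = new ArrayList<>();
        for (int i = 1; i <= tuples; i++) {
            if (sync.mark()) {
                points.add(i);
                sync.reset();
            }
        }
        return points;
    }

    /** Returns how many rotations occur when `records` records of `recordBytes` each are written. */
    public static int rotations(int records, int recordBytes, long maxBytes) {
        SizeRotation rotation = new SizeRotation(maxBytes);
        int count = 0;
        for (int i = 0; i < records; i++) {
            if (rotation.mark(recordBytes)) {
                count++;
                rotation.reset();
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // 250 tuples with a sync every 100 tuples
        System.out.println(syncPoints(250, 100));
        // ten 1 MB records with a 5 MB rotation limit
        System.out.println(rotations(10, 1024 * 1024, 5L * 1024 * 1024));
    }
}
```

Under these assumptions, a smaller sync count means more frequent flushes (less buffered data at risk, more filesystem calls), while a smaller rotation size produces more, smaller files.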


Example 2: buildHdfsBolt

import org.apache.storm.hdfs.bolt.HdfsBolt; // import the required package/class
private static HdfsBolt buildHdfsBolt(String hdfsUrl,String prefix, Fields fields){
    // use " : " instead of "," for field delimiter
    RecordFormat format = new DelimitedRecordFormat()
            .withFieldDelimiter(" : ").withFields(fields);

    // sync the filesystem after every 1k tuples
    SyncPolicy syncPolicy = new CountSyncPolicy(1000);

    // rotate files
    FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

    FileNameFormat fileNameFormat = new DefaultFileNameFormat()
            .withPath("/storm/").withPrefix(prefix).withExtension(".seq");

    HdfsBolt hdfsBolt = new HdfsBolt()
            .withFsUrl(hdfsUrl)
            .withFileNameFormat(fileNameFormat)
            .withRecordFormat(format)
            .withRotationPolicy(rotationPolicy)
            .withSyncPolicy(syncPolicy)
            .withRetryCount(5)
            .addRotationAction(new MoveStormToLogAction().withDestination("/log"));

    return hdfsBolt;
}
 
Developer: wuzhongdehua, Project: fksm, Lines of code: 26, Source file: KafkaTopology.java


Example 3: createHdfsBolt

import org.apache.storm.hdfs.bolt.HdfsBolt; // import the required package/class
private static HdfsBolt createHdfsBolt() {
    // use "|" instead of "," for field delimiter
    RecordFormat format = new DelimitedRecordFormat()
            .withFieldDelimiter("|");

    // sync the filesystem after every 1k tuples
    SyncPolicy syncPolicy = new CountSyncPolicy(1000);

    // rotate files when they reach 5MB
    FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);

    FileNameFormat fileNameFormat = new DefaultFileNameFormat()
            .withPath(Properties.getString("sa.storm.hdfs_output_file"));

    return new HdfsBolt()
            .withFsUrl(Properties.getString("sa.storm.hdfs_url"))
            .withFileNameFormat(fileNameFormat)
            .withRecordFormat(format)
            .withRotationPolicy(rotationPolicy)
            .withSyncPolicy(syncPolicy);
}
 
Developer: mayconbordin, Project: erad2016-streamprocessing, Lines of code: 22, Source file: SentimentAnalysisTopology.java
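
The examples so far configure a delimited record format with a custom field delimiter. As a rough illustration of what such a format produces, the sketch below joins a tuple's values with the field delimiter and ends the record with a record delimiter. `DelimitedRecordSketch` and its `format` method are hypothetical and do not use Storm; the newline record delimiter is an assumption made for this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.StringJoiner;

/** Storm-free illustration of delimited record formatting (hypothetical helper). */
public class DelimitedRecordSketch {

    /**
     * Joins the values with fieldDelimiter, appends recordDelimiter,
     * and returns the UTF-8 bytes that would be written to the file.
     */
    public static byte[] format(List<?> values, String fieldDelimiter, String recordDelimiter) {
        StringJoiner joiner = new StringJoiner(fieldDelimiter, "", recordDelimiter);
        for (Object value : values) {
            joiner.add(String.valueOf(value));
        }
        return joiner.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // A made-up stock tick with three fields, written with "|" as the delimiter
        byte[] record = format(List.of("AAPL", 101.5, 300), "|", "\n");
        System.out.print(new String(record, StandardCharsets.UTF_8));
    }
}
```

Choosing a delimiter such as "|" is mainly useful when field values may themselves contain commas.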


Example 4: getHdfsBolt

import org.apache.storm.hdfs.bolt.HdfsBolt; // import the required package/class
public HdfsBolt getHdfsBolt() {

        LOG.info("HDFSBOLT: Configuring the HdfsBolt");

        // Define the RecordFormat, SyncPolicy, and FileNameFormat
        RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(fieldDelimiter);
        SyncPolicy syncPolicy = new CountSyncPolicy(syncCount);
        FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(outputLocation);

        // Configure the Bolt
        return new HdfsBolt()
                .withFsUrl(hdfsDefaultFs)
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(fileRotationPolicy)
                .withSyncPolicy(syncPolicy);

    }
 
Developer: sakserv, Project: storm-kafka-hdfs-example, Lines of code: 19, Source file: HdfsBoltConfigBuilder.java


Example 5: getHdfsBolt

import org.apache.storm.hdfs.bolt.HdfsBolt; // import the required package/class
public static HdfsBolt getHdfsBolt(String fsUrl, String srcDir, String rotationDir) {
    // sync the filesystem after every tuple
    SyncPolicy syncPolicy = new CountSyncPolicy(1);

    FileNameFormat fileNameFormat = new DefaultFileNameFormat()
            .withPath(srcDir)
            .withExtension(".txt");

    RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(",");
    FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1f, FileSizeRotationPolicy.Units.KB);

    HdfsBolt bolt = new HdfsBolt()
            .withFsUrl(fsUrl)
            .withFileNameFormat(fileNameFormat)
            .withRecordFormat(format)
            .withSyncPolicy(syncPolicy)
            .withRotationPolicy(rotationPolicy)
            .addRotationAction(new MoveFileAction().toDestination(rotationDir));

    return bolt;
}
 
Developer: Parth-Brahmbhatt, Project: storm-smoke-test, Lines of code: 22, Source file: ConnectorUtil.java


Example 6: configureHdfsBolt

import org.apache.storm.hdfs.bolt.HdfsBolt; // import the required package/class
public static void configureHdfsBolt(TopologyBuilder builder, 
                                     String delimiter, 
                                     String outputPath, 
                                     String hdfsUri,
                                     String hdfsBoltName, 
                                     String spoutName,
                                     int parallelismHint,
                                     FileRotationPolicy rotationPolicy,
                                     int syncCount) {
    
    LOG.info("HDFSBOLT: Configuring the HdfsBolt");
    
    // Define the RecordFormat, SyncPolicy, and FileNameFormat
    RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(delimiter);
    SyncPolicy syncPolicy = new CountSyncPolicy(syncCount);
    FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(outputPath);
    
    // Configure the Bolt
    HdfsBolt bolt = new HdfsBolt()
            .withFsUrl(hdfsUri)
            .withFileNameFormat(fileNameFormat)
            .withRecordFormat(format)
            .withRotationPolicy(rotationPolicy)
            .withSyncPolicy(syncPolicy);
    
    // Set the Bolt
    builder.setBolt(hdfsBoltName, bolt, parallelismHint).shuffleGrouping(spoutName);

}
 
Developer: sakserv, Project: storm-topology-examples, Lines of code: 30, Source file: ConfigureHdfsBolt.java


Example 7: configureHdfsBolt

import org.apache.storm.hdfs.bolt.HdfsBolt; // import the required package/class
public static void configureHdfsBolt(TopologyBuilder builder, String delimiter, String outputPath, String hdfsUri) {
    RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(delimiter);
    SyncPolicy syncPolicy = new CountSyncPolicy(1000);
    //FileRotationPolicy rotationPolicy = new TimedRotationPolicy(300, TimedRotationPolicy.TimeUnit.SECONDS);
    FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1, FileSizeRotationPolicy.Units.KB);
    FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(outputPath);
    HdfsBolt bolt = new HdfsBolt()
            .withFsUrl(hdfsUri)
            .withFileNameFormat(fileNameFormat)
            .withRecordFormat(format)
            .withRotationPolicy(rotationPolicy)
            .withSyncPolicy(syncPolicy);
    builder.setBolt("hdfsbolt", bolt, 1).shuffleGrouping("kafkaspout");

}
 
Developer: sakserv, Project: storm-kafka-hdfs-starter, Lines of code: 16, Source file: ConfigureHdfsBolt.java


Example 8: main

import org.apache.storm.hdfs.bolt.HdfsBolt; // import the required package/class
public static void main(String[] args) {
    try{
        String zkhost = "wxb-1:2181,wxb-2:2181,wxb-3:2181";
        String topic = "order";
        String groupId = "id";
        int spoutNum = 3;
        int boltNum = 1;
        ZkHosts zkHosts = new ZkHosts(zkhost); // ZooKeeper ensemble used by Kafka
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, "/order", groupId);  // create /order /id
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        // HDFS bolt
        // use "|" instead of "," for field delimiter
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter("|");

        // sync the filesystem after every 1k tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);

        // rotate files when they reach 5MB
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);
        // FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimedRotationPolicy.TimeUnit.MINUTES);

        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/tmp/").withPrefix("order_").withExtension(".log");

        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://wxb-1:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", kafkaSpout, spoutNum);
        builder.setBolt("check", new CheckOrderBolt(), boltNum).shuffleGrouping("spout");
        builder.setBolt("counter", new CounterBolt(),boltNum).shuffleGrouping("check");
        builder.setBolt("hdfs", hdfsBolt,boltNum).shuffleGrouping("counter");

        Config config = new Config();
        config.setDebug(true);

        if(args!=null && args.length > 0) {
            config.setNumWorkers(2);
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {
            config.setMaxTaskParallelism(2);

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Wordcount-Topology", config, builder.createTopology());

            Thread.sleep(500000);

            cluster.shutdown();
        }
    }catch (Exception e) {
        e.printStackTrace();
    }
}
 
Developer: realxujiang, Project: storm-kafka-examples, Lines of code: 61, Source file: HdfsTopology.java


Example 9: main

import org.apache.storm.hdfs.bolt.HdfsBolt; // import the required package/class
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException, AuthorizationException {
        com.typesafe.config.Config config = ConfigPropertieUtil.getInstance();
        // ZooKeeper connection string
        String zks = config.getString("kafka.zk.hosts");
        // topic name
        String topic = config.getString("kafka.app.topic");
        // ZooKeeper path used by Kafka
        String zkRoot = config.getString("kafka.zk.broker.path");
        // ZooKeeper host IPs
        String zkHosts = config.getString("kafka.zk.servers");
        // ZooKeeper port
        Integer zkPort = config.getInt("kafka.zk.port");
        // number of Storm workers
        Integer numWorkers = config.getInt("storm.topology.workers");
        // HDFS URL
        String hdfsUrl = config.getString("hdfs.url");

        // Storm window settings
        Integer windowDuration = config.getInt("storm.topology.window.duration");
        Integer windowLag = config.getInt("storm.topology.window.lag");
        Integer watermarkInterval = config.getInt("storm.topology.watermark.interval");
        String id = config.getString("kafka.zk.kafka.group");

        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConf.zkServers = Arrays.asList(zkHosts.split(","));
        spoutConf.zkPort = zkPort;

        HdfsBolt hdfsCallerBolt = buildHdfsBolt(hdfsUrl, "log_caller_info_", new Fields("caller_info"));
        HdfsBolt hdfsRespParamBolt = buildHdfsBolt(hdfsUrl, "log_resp_param_", new Fields("resp_param"));
        HdfsBolt hdfsRespTimeBolt = buildHdfsBolt(hdfsUrl, "log_resp_time_", new Fields("resp_time"));
        HdfsBolt hdfsServiceInfoBolt = buildHdfsBolt(hdfsUrl, "log_service_info_", new Fields("service_info"));

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);
        builder.setBolt("json-analysis", new KafkaJsonAnalysis(), 4).shuffleGrouping("kafka-reader");
        // Window operation driven by the log timestamps, with a window lag (2s in the original setup)
        builder.setBolt("ss-sliding-window",
                new ServerServiceWindowBolt()
                        .withTumblingWindow(new Duration(windowDuration, TimeUnit.SECONDS))
                        .withWatermarkInterval(new Duration(watermarkInterval, TimeUnit.SECONDS))
                .withLag(new Duration(windowLag, TimeUnit.SECONDS)).withTimestampField("timestamp"), 4).fieldsGrouping("json-analysis", new Fields("server_service"));
//        builder.setBolt("json-analysis", new StatisticsTimeLineBold(), 5).shuffleGrouping("sliding-window");
        builder.setBolt("hdfs-bolt", new SaveHDFSBold(), 1).shuffleGrouping("json-analysis");
        builder.setBolt("hdfs-caller-bolt", hdfsCallerBolt, 1).fieldsGrouping("hdfs-bolt", new Fields("caller_info"));
        builder.setBolt("hdfs-resp-param-bolt", hdfsRespParamBolt, 1).fieldsGrouping("hdfs-bolt", new Fields("resp_param"));
        builder.setBolt("hdfs-resp-time-bolt", hdfsRespTimeBolt, 1).fieldsGrouping("hdfs-bolt", new Fields("resp_time"));
        builder.setBolt("hdfs-service-info-bolt", hdfsServiceInfoBolt, 1).fieldsGrouping("hdfs-bolt", new Fields("service_info"));

//        builder.setBolt("redis-remove-bolt", new RemoveRedisBolt(), 1);

        Config conf = new Config();
        //conf.setDebug(true);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(numWorkers);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            String name = KafkaTopology.class.getSimpleName();
            conf.setMaxTaskParallelism(numWorkers);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }
    }
 
Developer: wuzhongdehua, Project: fksm, Lines of code: 67, Source file: KafkaTopology.java


Example 10: configureHDFSBolt

import org.apache.storm.hdfs.bolt.HdfsBolt; // import the required package/class
public void configureHDFSBolt(TopologyBuilder builder) {
  // Load the HDFS and Hive settings from the topology configuration

  String rootPath = topologyConfig.getProperty("hdfs.path");
  String prefix = topologyConfig.getProperty("hdfs.file.prefix");
  String fsUrl = topologyConfig.getProperty("hdfs.url");
  String sourceMetastoreUrl = topologyConfig.getProperty("hive.metastore.url");
  String hiveStagingTableName = topologyConfig.getProperty("hive.staging.table.name");
  String databaseName = topologyConfig.getProperty("hive.database.name");
  Float rotationTimeInMinutes = Float.valueOf(topologyConfig.getProperty("hdfs.file.rotation.time.minutes"));

  RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(",");

  //Synchronize data buffer with the filesystem every 1000 tuples
  SyncPolicy syncPolicy = new CountSyncPolicy(1000);

  // Rotate data files when they reach five MB
  //FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

  //Rotate every X minutes
  FileTimeRotationPolicy rotationPolicy = new FileTimeRotationPolicy(rotationTimeInMinutes, FileTimeRotationPolicy
      .Units.MINUTES);

  //Hive Partition Action
  HiveTablePartitionAction hivePartitionAction = new HiveTablePartitionAction(sourceMetastoreUrl,
      hiveStagingTableName, databaseName, fsUrl);

  //MoveFileAction moveFileAction = new MoveFileAction().toDestination(rootPath + "/working");


  FileNameFormat fileNameFormat = new DefaultFileNameFormat()
      .withPath(rootPath + "/staging")
      .withPrefix(prefix);

  // Instantiate the HdfsBolt
  HdfsBolt hdfsBolt = new HdfsBolt()
      .withFsUrl(fsUrl)
      .withFileNameFormat(fileNameFormat)
      .withRecordFormat(format)
      .withRotationPolicy(rotationPolicy)
      .withSyncPolicy(syncPolicy)
      .addRotationAction(hivePartitionAction);

  int hdfsBoltCount = Integer.valueOf(topologyConfig.getProperty("hdfsbolt.thread.count"));
  builder.setBolt("hdfs_bolt", hdfsBolt, hdfsBoltCount).shuffleGrouping("kafkaSpout");
}
 
Developer: DhruvKumar, Project: iot-masterclass, Lines of code: 47, Source file: TruckEventProcessorKafkaTopology.java


Example 11: main

import org.apache.storm.hdfs.bolt.HdfsBolt; // import the required package/class
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {

		String zkIp = "localhost";

		String nimbusHost = "sandbox.hortonworks.com";

		String zookeeperHost = zkIp +":2181";

		ZkHosts zkHosts = new ZkHosts(zookeeperHost);
		List<String> zkServers = new ArrayList<String>();
		zkServers.add(zkIp);
		SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "spertus-weather-events", "/spertus-weather-events","test_id");
		kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
		kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
		kafkaConfig.zkServers = zkServers;
		kafkaConfig.zkRoot = "/spertus-weather-events";
		kafkaConfig.zkPort = 2181;
		kafkaConfig.forceFromStart = true;
		KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);

		TopologyBuilder builder = new TopologyBuilder();

		HdfsBolt hdfsBolt = new HdfsBolt().withFsUrl("hdfs://sandbox.hortonworks.com:8020")
				.withFileNameFormat(new DefaultFileNameFormat().withPath("/tmp/test"))
				.withRecordFormat(new DelimitedRecordFormat().withFieldDelimiter("|"))
				.withSyncPolicy(new CountSyncPolicy(10))
				.withRotationPolicy(new FileSizeRotationPolicy(5.0f, Units.MB));
		builder.setSpout("raw-weather-events", kafkaSpout, 1);
		builder.setBolt("filter-airports", new FilterAirportsBolt(), 1).shuffleGrouping("raw-weather-events");
		//        builder.setBolt("test-bolt", new TestBolt(), 1).shuffleGrouping("raw-weather-events");
		//        builder.setBolt("hdfs-bolt", hdfsBolt, 1).shuffleGrouping("raw-weather-events");


		Map conf = new HashMap();
		conf.put(backtype.storm.Config.TOPOLOGY_WORKERS, 4);
		conf.put(backtype.storm.Config.TOPOLOGY_DEBUG, true);
		if (args != null && args.length > 0) {
			StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
		}   else {
			LocalCluster cluster = new LocalCluster();
			cluster.submitTopology("weather-topology", conf, builder.createTopology());
		}
	}
 
Developer: mspertus, Project: Big-Data-tutorial, Lines of code: 44, Source file: WeatherTopology.java


Example 12: initializeHDFSBolt

import org.apache.storm.hdfs.bolt.HdfsBolt; // import the required package/class
private boolean initializeHDFSBolt(String topology_name, String name) {
	try {

		String messageUpstreamComponent = messageComponents
				.get(messageComponents.size() - 1);

		System.out.println("[OpenSOC] ------" + name
				+ " is initializing from " + messageUpstreamComponent);

		RecordFormat format = new DelimitedRecordFormat()
				.withFieldDelimiter(
						config.getString("bolt.hdfs.field.delimiter")
								.toString()).withFields(
						new Fields("message"));

		// sync the file system after every x number of tuples
		SyncPolicy syncPolicy = new CountSyncPolicy(Integer.valueOf(config
				.getString("bolt.hdfs.batch.size").toString()));

		// rotate files when they reach certain size
		FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(
				Float.valueOf(config.getString(
						"bolt.hdfs.file.rotation.size.in.mb").toString()),
				Units.MB);

		FileNameFormat fileNameFormat = new DefaultFileNameFormat()
				.withPath(config.getString("bolt.hdfs.wip.file.path")
						.toString());

		// Post rotate action
		MoveFileAction moveFileAction = (new MoveFileAction())
				.toDestination(config.getString(
						"bolt.hdfs.finished.file.path").toString());

		HdfsBolt hdfsBolt = new HdfsBolt()
				.withFsUrl(
						config.getString("bolt.hdfs.file.system.url")
								.toString())
				.withFileNameFormat(fileNameFormat)
				.withRecordFormat(format)
				.withRotationPolicy(rotationPolicy)
				.withSyncPolicy(syncPolicy)
				.addRotationAction(moveFileAction);
		if (config.getString("bolt.hdfs.compression.codec.class") != null) {
			hdfsBolt.withCompressionCodec(config.getString(
					"bolt.hdfs.compression.codec.class").toString());
		}

		builder.setBolt(name, hdfsBolt,
				config.getInt("bolt.hdfs.parallelism.hint"))
				.shuffleGrouping(messageUpstreamComponent, "message")
				.setNumTasks(config.getInt("bolt.hdfs.num.tasks"));

	} catch (Exception e) {
		e.printStackTrace();
		System.exit(1); // exit with a non-zero status because initialization failed
	}

	return true;
}
 
Developer: OpenSOC, Project: opensoc-streaming, Lines of code: 61, Source file: TopologyRunner.java


Example 13: configureHDFSBolt

import org.apache.storm.hdfs.bolt.HdfsBolt; // import the required package/class
public void configureHDFSBolt(TopologyBuilder builder) {
	// Load the HDFS and Hive settings from the topology configuration
	
	String rootPath = topologyConfig.getProperty("hdfs.path");
	String prefix = topologyConfig.getProperty("hdfs.file.prefix");
	String fsUrl = topologyConfig.getProperty("hdfs.url");
	String sourceMetastoreUrl = topologyConfig.getProperty("hive.metastore.url");
	String hiveStagingTableName = topologyConfig.getProperty("hive.staging.table.name");
	String databaseName = topologyConfig.getProperty("hive.database.name");
	Float rotationTimeInMinutes = Float.valueOf(topologyConfig.getProperty("hdfs.file.rotation.time.minutes"));
	
	RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(",");

	//Synchronize data buffer with the filesystem every 1000 tuples
	SyncPolicy syncPolicy = new CountSyncPolicy(1000);

	// Rotate data files when they reach five MB
	//FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
	
	//Rotate every X minutes
	FileTimeRotationPolicy rotationPolicy = new FileTimeRotationPolicy(rotationTimeInMinutes, FileTimeRotationPolicy.Units.MINUTES);
	
	//Hive Partition Action
	HiveTablePartitionAction hivePartitionAction = new HiveTablePartitionAction(sourceMetastoreUrl, hiveStagingTableName, databaseName, fsUrl);
	
	//MoveFileAction moveFileAction = new MoveFileAction().toDestination(rootPath + "/working");


	
	FileNameFormat fileNameFormat = new DefaultFileNameFormat()
			.withPath(rootPath + "/staging")
			.withPrefix(prefix);

	// Instantiate the HdfsBolt
	HdfsBolt hdfsBolt = new HdfsBolt()
			 .withFsUrl(fsUrl)
	         .withFileNameFormat(fileNameFormat)
	         .withRecordFormat(format)
	         .withRotationPolicy(rotationPolicy)
	         .withSyncPolicy(syncPolicy)
	         .addRotationAction(hivePartitionAction);
	
	int hdfsBoltCount = Integer.valueOf(topologyConfig.getProperty("hdfsbolt.thread.count"));
	builder.setBolt("hdfs_bolt", hdfsBolt, hdfsBoltCount).shuffleGrouping("kafkaSpout");
}
 
Developer: patw, Project: storm-sample, Lines of code: 46, Source file: TruckEventProcessorKafkaTopology.java



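Note: The org.apache.storm.hdfs.bolt.HdfsBolt examples in this article were collected from GitHub, MSDocs, and other source-code and documentation platforms. The snippets were selected from open-source projects, and copyright remains with the original authors. Refer to each project's License before distributing or using the code. Do not reproduce without permission.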

