This article collects typical usage examples of the Java class org.apache.storm.hdfs.bolt.HdfsBolt. If you have been wondering what HdfsBolt does and how to use it, the curated examples below should help.
The HdfsBolt class lives in the org.apache.storm.hdfs.bolt package. 13 code examples are shown below, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Java code samples.
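Each snippet below shows only the HdfsBolt import; a complete program also needs the format, rotation, and sync helper classes. A typical import set for these examples might look like the following sketch (verify the class names against your storm-hdfs version):
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;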
Example 1: createHdfsBolt
import org.apache.storm.hdfs.bolt.HdfsBolt; // import the required package/class
/**
 * Create a bolt which will persist ticks to HDFS.
 */
private static HdfsBolt createHdfsBolt() {
    // Use "|" instead of "," as the field delimiter:
    RecordFormat format = new DelimitedRecordFormat()
            .withFieldDelimiter("|");
    // Sync the filesystem after every 100 tuples:
    SyncPolicy syncPolicy = new CountSyncPolicy(100);
    // Rotate files when they reach 5MB:
    FileRotationPolicy rotationPolicy =
            new FileSizeRotationPolicy(5.0f, Units.MB);
    // Write records to the <user>/stock-ticks/ directory in HDFS:
    FileNameFormat fileNameFormat = new DefaultFileNameFormat()
            .withPath("stock-ticks/");
    HdfsBolt hdfsBolt = new HdfsBolt()
            .withFsUrl("hdfs://localhost:8020")
            .withFileNameFormat(fileNameFormat)
            .withRecordFormat(format)
            .withRotationPolicy(rotationPolicy)
            .withSyncPolicy(syncPolicy);
    return hdfsBolt;
}
Author: amitchmca | Project: hadooparchitecturebook | Lines: 29 | Source: MovingAvgLocalTopologyRunner.java
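A minimal sketch of wiring the bolt from Example 1 into a topology; the spout name "tick-spout" is hypothetical, not taken from the original project:
TopologyBuilder builder = new TopologyBuilder();
// "tick-spout" stands in for whatever spout emits the tick tuples.
builder.setBolt("hdfs-bolt", createHdfsBolt(), 1).shuffleGrouping("tick-spout");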
Example 2: buildHdfsBolt
import org.apache.storm.hdfs.bolt.HdfsBolt; // import the required package/class
private static HdfsBolt buildHdfsBolt(String hdfsUrl, String prefix, Fields fields) {
    // Use " : " as the field delimiter:
    RecordFormat format = new DelimitedRecordFormat()
            .withFieldDelimiter(" : ").withFields(fields);
    // Sync the filesystem after every 1000 tuples:
    SyncPolicy syncPolicy = new CountSyncPolicy(1000);
    // Rotate files when they reach 5MB:
    FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
    FileNameFormat fileNameFormat = new DefaultFileNameFormat()
            .withPath("/storm/").withPrefix(prefix).withExtension(".seq");
    HdfsBolt hdfsBolt = new HdfsBolt()
            .withFsUrl(hdfsUrl)
            .withFileNameFormat(fileNameFormat)
            .withRecordFormat(format)
            .withRotationPolicy(rotationPolicy)
            .withSyncPolicy(syncPolicy)
            .withRetryCount(5)
            .addRotationAction(new MoveStormToLogAction().withDestination("/log"));
    return hdfsBolt;
}
Author: wuzhongdehua | Project: fksm | Lines: 26 | Source: KafkaTopology.java
Example 3: createHdfsBolt
import org.apache.storm.hdfs.bolt.HdfsBolt; // import the required package/class
private static HdfsBolt createHdfsBolt() {
    // use "|" instead of "," for field delimiter
    RecordFormat format = new DelimitedRecordFormat()
            .withFieldDelimiter("|");
    // sync the filesystem after every 1k tuples
    SyncPolicy syncPolicy = new CountSyncPolicy(1000);
    // rotate files when they reach 5MB
    FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);
    FileNameFormat fileNameFormat = new DefaultFileNameFormat()
            .withPath(Properties.getString("sa.storm.hdfs_output_file"));
    return new HdfsBolt()
            .withFsUrl(Properties.getString("sa.storm.hdfs_url"))
            .withFileNameFormat(fileNameFormat)
            .withRecordFormat(format)
            .withRotationPolicy(rotationPolicy)
            .withSyncPolicy(syncPolicy);
}
Author: mayconbordin | Project: erad2016-streamprocessing | Lines: 22 | Source: SentimentAnalysisTopology.java
Example 4: getHdfsBolt
import org.apache.storm.hdfs.bolt.HdfsBolt; // import the required package/class
public HdfsBolt getHdfsBolt() {
    LOG.info("HDFSBOLT: Configuring the HdfsBolt");
    // Define the RecordFormat, SyncPolicy, and FileNameFormat
    RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(fieldDelimiter);
    SyncPolicy syncPolicy = new CountSyncPolicy(syncCount);
    FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(outputLocation);
    // Configure the Bolt
    return new HdfsBolt()
            .withFsUrl(hdfsDefaultFs)
            .withFileNameFormat(fileNameFormat)
            .withRecordFormat(format)
            .withRotationPolicy(fileRotationPolicy)
            .withSyncPolicy(syncPolicy);
}
Author: sakserv | Project: storm-kafka-hdfs-example | Lines: 19 | Source: HdfsBoltConfigBuilder.java
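The method above reads instance fields that are outside the excerpt. A sketch of the assumed surrounding class, with field names inferred from the method body; the default values (and the slf4j logger) are hypothetical:
public class HdfsBoltConfigBuilder {
    // Logger assumed to be org.slf4j; only LOG's presence is implied by the excerpt.
    private static final Logger LOG = LoggerFactory.getLogger(HdfsBoltConfigBuilder.class);
    // Inferred fields; the defaults below are hypothetical placeholders.
    private String fieldDelimiter = "|";
    private int syncCount = 1000;
    private String outputLocation = "/tmp/storm-output/";
    private String hdfsDefaultFs = "hdfs://localhost:8020";
    private FileRotationPolicy fileRotationPolicy =
            new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);

    // getHdfsBolt() as shown in Example 4 above.
}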
Example 5: getHdfsBolt
import org.apache.storm.hdfs.bolt.HdfsBolt; // import the required package/class
public static HdfsBolt getHdfsBolt(String fsUrl, String srcDir, String rotationDir) {
    // sync the filesystem after every tuple
    SyncPolicy syncPolicy = new CountSyncPolicy(1);
    FileNameFormat fileNameFormat = new DefaultFileNameFormat()
            .withPath(srcDir)
            .withExtension(".txt");
    RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(",");
    FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1f, FileSizeRotationPolicy.Units.KB);
    HdfsBolt bolt = new HdfsBolt()
            .withFsUrl(fsUrl)
            .withFileNameFormat(fileNameFormat)
            .withRecordFormat(format)
            .withSyncPolicy(syncPolicy)
            .withRotationPolicy(rotationPolicy)
            .addRotationAction(new MoveFileAction().toDestination(rotationDir));
    return bolt;
}
Author: Parth-Brahmbhatt | Project: storm-smoke-test | Lines: 22 | Source: ConnectorUtil.java
Example 6: configureHdfsBolt
import org.apache.storm.hdfs.bolt.HdfsBolt; // import the required package/class
public static void configureHdfsBolt(TopologyBuilder builder,
                                     String delimiter,
                                     String outputPath,
                                     String hdfsUri,
                                     String hdfsBoltName,
                                     String spoutName,
                                     int parallelismHint,
                                     FileRotationPolicy rotationPolicy,
                                     int syncCount) {
    LOG.info("HDFSBOLT: Configuring the HdfsBolt");
    // Define the RecordFormat, SyncPolicy, and FileNameFormat
    RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(delimiter);
    SyncPolicy syncPolicy = new CountSyncPolicy(syncCount);
    FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(outputPath);
    // Configure the Bolt
    HdfsBolt bolt = new HdfsBolt()
            .withFsUrl(hdfsUri)
            .withFileNameFormat(fileNameFormat)
            .withRecordFormat(format)
            .withRotationPolicy(rotationPolicy)
            .withSyncPolicy(syncPolicy);
    // Set the Bolt
    builder.setBolt(hdfsBoltName, bolt, parallelismHint).shuffleGrouping(spoutName);
}
Author: sakserv | Project: storm-topology-examples | Lines: 30 | Source: ConfigureHdfsBolt.java
Example 7: configureHdfsBolt
import org.apache.storm.hdfs.bolt.HdfsBolt; // import the required package/class
public static void configureHdfsBolt(TopologyBuilder builder, String delimiter, String outputPath, String hdfsUri) {
    RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(delimiter);
    SyncPolicy syncPolicy = new CountSyncPolicy(1000);
    // FileRotationPolicy rotationPolicy = new TimedRotationPolicy(300, TimedRotationPolicy.TimeUnit.SECONDS);
    FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1, FileSizeRotationPolicy.Units.KB);
    FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(outputPath);
    HdfsBolt bolt = new HdfsBolt()
            .withFsUrl(hdfsUri)
            .withFileNameFormat(fileNameFormat)
            .withRecordFormat(format)
            .withRotationPolicy(rotationPolicy)
            .withSyncPolicy(syncPolicy);
    builder.setBolt("hdfsbolt", bolt, 1).shuffleGrouping("kafkaspout");
}
Author: sakserv | Project: storm-kafka-hdfs-starter | Lines: 16 | Source: ConfigureHdfsBolt.java
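Example 7 leaves a TimedRotationPolicy commented out. As an alternative to size-based rotation, the same storm-hdfs API offers time-based rotation; a sketch with illustrative values:
// import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
// Rotate files every 5 minutes regardless of size:
FileRotationPolicy rotationPolicy =
        new TimedRotationPolicy(5.0f, TimedRotationPolicy.TimeUnit.MINUTES);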
Example 8: main
import org.apache.storm.hdfs.bolt.HdfsBolt; // import the required package/class
public static void main(String[] args) {
    try {
        String zkhost = "wxb-1:2181,wxb-2:2181,wxb-3:2181";
        String topic = "order";
        String groupId = "id";
        int spoutNum = 3;
        int boltNum = 1;
        ZkHosts zkHosts = new ZkHosts(zkhost); // the ZooKeeper ensemble Kafka registers with
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, "/order", groupId); // creates the /order and /id znodes
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        // HDFS bolt
        // use "|" instead of "," for field delimiter
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter("|");
        // sync the filesystem after every 1k tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);
        // rotate files when they reach 5MB
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);
        // FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimedRotationPolicy.TimeUnit.MINUTES);
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/tmp/").withPrefix("order_").withExtension(".log");
        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://wxb-1:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", kafkaSpout, spoutNum);
        builder.setBolt("check", new CheckOrderBolt(), boltNum).shuffleGrouping("spout");
        builder.setBolt("counter", new CounterBolt(), boltNum).shuffleGrouping("check");
        builder.setBolt("hdfs", hdfsBolt, boltNum).shuffleGrouping("counter");

        Config config = new Config();
        config.setDebug(true);
        if (args != null && args.length > 0) {
            config.setNumWorkers(2);
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {
            config.setMaxTaskParallelism(2);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Wordcount-Topology", config, builder.createTopology());
            Thread.sleep(500000);
            cluster.shutdown();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Author: realxujiang | Project: storm-kafka-examples | Lines: 61 | Source: HdfsTopology.java
Example 9: main
import org.apache.storm.hdfs.bolt.HdfsBolt; // import the required package/class
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException, AuthorizationException {
    com.typesafe.config.Config config = ConfigPropertieUtil.getInstance();
    // ZooKeeper hosts
    String zks = config.getString("kafka.zk.hosts");
    // Kafka topic name
    String topic = config.getString("kafka.app.topic");
    // ZooKeeper root path for the Kafka brokers
    String zkRoot = config.getString("kafka.zk.broker.path");
    // ZooKeeper server IPs
    String zkHosts = config.getString("kafka.zk.servers");
    // ZooKeeper port
    Integer zkPort = config.getInt("kafka.zk.port");
    // number of Storm workers
    Integer numWorkers = config.getInt("storm.topology.workers");
    // HDFS URL
    String hdfsUrl = config.getString("hdfs.url");
    // Storm window settings
    Integer windowDuration = config.getInt("storm.topology.window.duration");
    Integer windowLag = config.getInt("storm.topology.window.lag");
    Integer watermarkInterval = config.getInt("storm.topology.watermark.interval");
    String id = config.getString("kafka.zk.kafka.group");

    BrokerHosts brokerHosts = new ZkHosts(zks);
    SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
    spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
    spoutConf.zkServers = Arrays.asList(zkHosts.split(","));
    spoutConf.zkPort = zkPort;

    HdfsBolt hdfsCallerBolt = buildHdfsBolt(hdfsUrl, "log_caller_info_", new Fields("caller_info"));
    HdfsBolt hdfsRespParamBolt = buildHdfsBolt(hdfsUrl, "log_resp_param_", new Fields("resp_param"));
    HdfsBolt hdfsRespTimeBolt = buildHdfsBolt(hdfsUrl, "log_resp_time_", new Fields("resp_time"));
    HdfsBolt hdfsServiceInfoBolt = buildHdfsBolt(hdfsUrl, "log_service_info_", new Fields("service_info"));

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);
    builder.setBolt("json-analysis", new KafkaJsonAnalysis(), 4).shuffleGrouping("kafka-reader");
    // Tumbling window driven by the log event time, with a 2s window lag (windowLag)
    builder.setBolt("ss-sliding-window",
            new ServerServiceWindowBolt()
                    .withTumblingWindow(new Duration(windowDuration, TimeUnit.SECONDS))
                    .withWatermarkInterval(new Duration(watermarkInterval, TimeUnit.SECONDS))
                    .withLag(new Duration(windowLag, TimeUnit.SECONDS))
                    .withTimestampField("timestamp"), 4)
            .fieldsGrouping("json-analysis", new Fields("server_service"));
    // builder.setBolt("json-analysis", new StatisticsTimeLineBold(), 5).shuffleGrouping("sliding-window");
    builder.setBolt("hdfs-bolt", new SaveHDFSBold(), 1).shuffleGrouping("json-analysis");
    builder.setBolt("hdfs-caller-bolt", hdfsCallerBolt, 1).fieldsGrouping("hdfs-bolt", new Fields("caller_info"));
    builder.setBolt("hdfs-resp-param-bolt", hdfsRespParamBolt, 1).fieldsGrouping("hdfs-bolt", new Fields("resp_param"));
    builder.setBolt("hdfs-resp-time-bolt", hdfsRespTimeBolt, 1).fieldsGrouping("hdfs-bolt", new Fields("resp_time"));
    builder.setBolt("hdfs-service-info-bolt", hdfsServiceInfoBolt, 1).fieldsGrouping("hdfs-bolt", new Fields("service_info"));
    // builder.setBolt("redis-remove-bolt", new RemoveRedisBolt(), 1);

    Config conf = new Config();
    // conf.setDebug(true);
    if (args != null && args.length > 0) {
        conf.setNumWorkers(numWorkers);
        StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    } else {
        String name = KafkaTopology.class.getSimpleName();
        conf.setMaxTaskParallelism(numWorkers);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, conf, builder.createTopology());
        Thread.sleep(60000);
        cluster.shutdown();
    }
}
Author: wuzhongdehua | Project: fksm | Lines: 67 | Source: KafkaTopology.java
Example 10: configureHDFSBolt
import org.apache.storm.hdfs.bolt.HdfsBolt; // import the required package/class
public void configureHDFSBolt(TopologyBuilder builder) {
    String rootPath = topologyConfig.getProperty("hdfs.path");
    String prefix = topologyConfig.getProperty("hdfs.file.prefix");
    String fsUrl = topologyConfig.getProperty("hdfs.url");
    String sourceMetastoreUrl = topologyConfig.getProperty("hive.metastore.url");
    String hiveStagingTableName = topologyConfig.getProperty("hive.staging.table.name");
    String databaseName = topologyConfig.getProperty("hive.database.name");
    Float rotationTimeInMinutes = Float.valueOf(topologyConfig.getProperty("hdfs.file.rotation.time.minutes"));
    // Use comma as the field delimiter
    RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(",");
    // Synchronize the data buffer with the filesystem every 1000 tuples
    SyncPolicy syncPolicy = new CountSyncPolicy(1000);
    // Size-based rotation (5 MB) is left commented out in favor of time-based rotation:
    // FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
    // Rotate every X minutes
    FileTimeRotationPolicy rotationPolicy = new FileTimeRotationPolicy(rotationTimeInMinutes,
            FileTimeRotationPolicy.Units.MINUTES);
    // Hive partition action, registered as a rotation action below
    HiveTablePartitionAction hivePartitionAction = new HiveTablePartitionAction(sourceMetastoreUrl,
            hiveStagingTableName, databaseName, fsUrl);
    // MoveFileAction moveFileAction = new MoveFileAction().toDestination(rootPath + "/working");
    FileNameFormat fileNameFormat = new DefaultFileNameFormat()
            .withPath(rootPath + "/staging")
            .withPrefix(prefix);
    // Instantiate the HdfsBolt
    HdfsBolt hdfsBolt = new HdfsBolt()
            .withFsUrl(fsUrl)
            .withFileNameFormat(fileNameFormat)
            .withRecordFormat(format)
            .withRotationPolicy(rotationPolicy)
            .withSyncPolicy(syncPolicy)
            .addRotationAction(hivePartitionAction);
    int hdfsBoltCount = Integer.valueOf(topologyConfig.getProperty("hdfsbolt.thread.count"));
    builder.setBolt("hdfs_bolt", hdfsBolt, hdfsBoltCount).shuffleGrouping("kafkaSpout");
}
Author: DhruvKumar | Project: iot-masterclass | Lines: 47 | Source: TruckEventProcessorKafkaTopology.java
Example 11: main
import org.apache.storm.hdfs.bolt.HdfsBolt; // import the required package/class
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
    String zkIp = "localhost";
    String nimbusHost = "sandbox.hortonworks.com";
    String zookeeperHost = zkIp + ":2181";
    ZkHosts zkHosts = new ZkHosts(zookeeperHost);
    List<String> zkServers = new ArrayList<String>();
    zkServers.add(zkIp);
    SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "spertus-weather-events", "/spertus-weather-events", "test_id");
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
    kafkaConfig.zkServers = zkServers;
    kafkaConfig.zkRoot = "/spertus-weather-events";
    kafkaConfig.zkPort = 2181;
    kafkaConfig.forceFromStart = true;
    KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
    TopologyBuilder builder = new TopologyBuilder();
    HdfsBolt hdfsBolt = new HdfsBolt().withFsUrl("hdfs://sandbox.hortonworks.com:8020")
            .withFileNameFormat(new DefaultFileNameFormat().withPath("/tmp/test"))
            .withRecordFormat(new DelimitedRecordFormat().withFieldDelimiter("|"))
            .withSyncPolicy(new CountSyncPolicy(10))
            .withRotationPolicy(new FileSizeRotationPolicy(5.0f, Units.MB));
    builder.setSpout("raw-weather-events", kafkaSpout, 1);
    builder.setBolt("filter-airports", new FilterAirportsBolt(), 1).shuffleGrouping("raw-weather-events");
    // builder.setBolt("test-bolt", new TestBolt(), 1).shuffleGrouping("raw-weather-events");
    // builder.setBolt("hdfs-bolt", hdfsBolt, 1).shuffleGrouping("raw-weather-events");
    Map conf = new HashMap();
    conf.put(backtype.storm.Config.TOPOLOGY_WORKERS, 4);
    conf.put(backtype.storm.Config.TOPOLOGY_DEBUG, true);
    if (args != null && args.length > 0) {
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    } else {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("weather-topology", conf, builder.createTopology());
    }
}
Author: mspertus | Project: Big-Data-tutorial | Lines: 44 | Source: WeatherTopology.java
Example 12: initializeHDFSBolt
import org.apache.storm.hdfs.bolt.HdfsBolt; // import the required package/class
private boolean initializeHDFSBolt(String topology_name, String name) {
    try {
        String messageUpstreamComponent = messageComponents
                .get(messageComponents.size() - 1);
        System.out.println("[OpenSOC] ------" + name
                + " is initializing from " + messageUpstreamComponent);
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter(
                        config.getString("bolt.hdfs.field.delimiter").toString())
                .withFields(new Fields("message"));
        // sync the file system after every N tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(Integer.valueOf(
                config.getString("bolt.hdfs.batch.size").toString()));
        // rotate files when they reach a certain size
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(
                Float.valueOf(config.getString("bolt.hdfs.file.rotation.size.in.mb").toString()),
                Units.MB);
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath(config.getString("bolt.hdfs.wip.file.path").toString());
        // post-rotate action: move finished files out of the work-in-progress path
        MoveFileAction moveFileAction = new MoveFileAction()
                .toDestination(config.getString("bolt.hdfs.finished.file.path").toString());
        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl(config.getString("bolt.hdfs.file.system.url").toString())
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy)
                .addRotationAction(moveFileAction);
        if (config.getString("bolt.hdfs.compression.codec.class") != null) {
            hdfsBolt.withCompressionCodec(config.getString(
                    "bolt.hdfs.compression.codec.class").toString());
        }
        builder.setBolt(name, hdfsBolt,
                config.getInt("bolt.hdfs.parallelism.hint"))
                .shuffleGrouping(messageUpstreamComponent, "message")
                .setNumTasks(config.getInt("bolt.hdfs.num.tasks"));
    } catch (Exception e) {
        e.printStackTrace();
        System.exit(0);
    }
    return true;
}
Author: OpenSOC | Project: opensoc-streaming | Lines: 61 | Source: TopologyRunner.java
Example 13: configureHDFSBolt
import org.apache.storm.hdfs.bolt.HdfsBolt; // import the required package/class
public void configureHDFSBolt(TopologyBuilder builder) {
    String rootPath = topologyConfig.getProperty("hdfs.path");
    String prefix = topologyConfig.getProperty("hdfs.file.prefix");
    String fsUrl = topologyConfig.getProperty("hdfs.url");
    String sourceMetastoreUrl = topologyConfig.getProperty("hive.metastore.url");
    String hiveStagingTableName = topologyConfig.getProperty("hive.staging.table.name");
    String databaseName = topologyConfig.getProperty("hive.database.name");
    Float rotationTimeInMinutes = Float.valueOf(topologyConfig.getProperty("hdfs.file.rotation.time.minutes"));
    // Use comma as the field delimiter
    RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(",");
    // Synchronize the data buffer with the filesystem every 1000 tuples
    SyncPolicy syncPolicy = new CountSyncPolicy(1000);
    // Size-based rotation (5 MB) is left commented out in favor of time-based rotation:
    // FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
    // Rotate every X minutes
    FileTimeRotationPolicy rotationPolicy = new FileTimeRotationPolicy(rotationTimeInMinutes, FileTimeRotationPolicy.Units.MINUTES);
    // Hive partition action, registered as a rotation action below
    HiveTablePartitionAction hivePartitionAction = new HiveTablePartitionAction(sourceMetastoreUrl, hiveStagingTableName, databaseName, fsUrl);
    // MoveFileAction moveFileAction = new MoveFileAction().toDestination(rootPath + "/working");
    FileNameFormat fileNameFormat = new DefaultFileNameFormat()
            .withPath(rootPath + "/staging")
            .withPrefix(prefix);
    // Instantiate the HdfsBolt
    HdfsBolt hdfsBolt = new HdfsBolt()
            .withFsUrl(fsUrl)
            .withFileNameFormat(fileNameFormat)
            .withRecordFormat(format)
            .withRotationPolicy(rotationPolicy)
            .withSyncPolicy(syncPolicy)
            .addRotationAction(hivePartitionAction);
    int hdfsBoltCount = Integer.valueOf(topologyConfig.getProperty("hdfsbolt.thread.count"));
    builder.setBolt("hdfs_bolt", hdfsBolt, hdfsBoltCount).shuffleGrouping("kafkaSpout");
}
Author: patw | Project: storm-sample | Lines: 46 | Source: TruckEventProcessorKafkaTopology.java
Note: The org.apache.storm.hdfs.bolt.HdfsBolt examples in this article are collected from open-source projects hosted on GitHub, MSDocs, and similar source/documentation platforms. Copyright of each snippet remains with its original author; consult the corresponding project's license before distributing or reusing the code. Do not republish without permission.