本文整理汇总了Java中storm.kafka.trident.TridentKafkaConfig类的典型用法代码示例。如果您正苦于以下问题:Java TridentKafkaConfig类的具体用法?Java TridentKafkaConfig怎么用?Java TridentKafkaConfig使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
TridentKafkaConfig类属于storm.kafka.trident包,在下文中一共展示了TridentKafkaConfig类的15个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: buildTopology
import storm.kafka.trident.TridentKafkaConfig; //导入依赖的package包/类
public static StormTopology buildTopology(String hdfsUrl) {
TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(new ZkHosts(ZKHOST, "/brokers"), KAFKA_TOPIC);
tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new RawScheme());
tridentKafkaConfig.startOffsetTime = -1; // forceStartOffsetTime(-1); //Read latest messages from Kafka
TransactionalTridentKafkaSpout tridentKafkaSpout = new TransactionalTridentKafkaSpout(tridentKafkaConfig);
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("stream", tridentKafkaSpout);
FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(HDFS_OUT_PATH).withPrefix("trident").withExtension(".txt");
FileRotationPolicy rotationPolicy = new FileSizeCountRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB, 10);
HdfsState.Options seqOpts = new HdfsState.HdfsFileOptions().withFileNameFormat(fileNameFormat)
.withRecordFormat(new DelimitedRecordFormat().withFieldDelimiter("|").withFields(new Fields("json")))
.withRotationPolicy(rotationPolicy).withFsUrl(hdfsUrl)
// .addRotationAction(new MoveFileAction().toDestination(HDFS_ROTATE_PATH));
// .addRotationAction(new AddSuffixFileAction().withSuffix("-processed"));
.addRotationAction(new MD5FileAction());
StateFactory factory = new HdfsStateFactory().withOptions(seqOpts);
stream.each(new Fields("bytes"), new JacksonJsonParser(), new Fields("json")).partitionPersist(factory, new Fields("json"),
new HdfsUpdater(), new Fields());
return topology.build();
}
开发者ID:ogidogi,项目名称:kafka-storm-hive,代码行数:27,代码来源:HDFSSequenceTopology.java
示例2: TridentKafkaSpout
import storm.kafka.trident.TridentKafkaConfig; //导入依赖的package包/类
/**
* Constructor
*
* @param config Config file to read properties from
* @param section Section of the kafka config file to read properties from.
*/
public TridentKafkaSpout(ConfigData config, String section) {
_kafkaConfig = new TridentKafkaConfig(new ZkHosts(config.getZkHost()), config.getTopic(section), "stormKafka");
_kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
_kafkaConfig.bufferSizeBytes = config.getFetchSizeKafka();
_kafkaConfig.fetchSizeBytes = config.getFetchSizeKafka();
_kafkaConfig.forceFromStart = false;
}
开发者ID:redBorder,项目名称:rb-bi,代码行数:14,代码来源:TridentKafkaSpout.java
示例3: TridentKafkaSpoutNmsp
import storm.kafka.trident.TridentKafkaConfig; //导入依赖的package包/类
/**
* Constructor
*
* @param config Config file to read properties from
* @param section Section of the kafka config file to read properties from.
*/
public TridentKafkaSpoutNmsp(ConfigData config, String section) {
_kafkaConfig = new TridentKafkaConfig(new ZkHosts(config.getZkHost()), config.getTopic(section), "stormKafka");
_kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
_kafkaConfig.bufferSizeBytes = config.getFetchSizeKafkaNmsp();
_kafkaConfig.fetchSizeBytes = config.getFetchSizeKafkaNmsp();
_kafkaConfig.forceFromStart = false;
}
开发者ID:redBorder,项目名称:rb-bi,代码行数:14,代码来源:TridentKafkaSpoutNmsp.java
示例4: TridentKafkaSpoutLocation
import storm.kafka.trident.TridentKafkaConfig; //导入依赖的package包/类
/**
* Constructor
*
* @param config Config file to read properties from
* @param section Section of the kafka config file to read properties from.
*/
public TridentKafkaSpoutLocation(ConfigData config, String section) {
_kafkaConfig = new TridentKafkaConfig(new ZkHosts(config.getZkHost()), config.getTopic(section), "stormKafka");
_kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
_kafkaConfig.bufferSizeBytes = config.getFetchSizeKafkaLocation();
_kafkaConfig.fetchSizeBytes = config.getFetchSizeKafkaLocation();
_kafkaConfig.forceFromStart = false;
}
开发者ID:redBorder,项目名称:rb-bi,代码行数:14,代码来源:TridentKafkaSpoutLocation.java
示例5: createKafkaSpout
import storm.kafka.trident.TridentKafkaConfig; //导入依赖的package包/类
/**
* Creates a transactional kafka spout that consumes any new data published to "test" topic.
* <p/>
* For more info on transactional spouts
* see "Transactional spouts" section in
* <a href="https://storm.apache.org/documentation/Trident-state"> Trident state</a> doc.
*
* @return a transactional trident kafka spout.
*/
private TransactionalTridentKafkaSpout createKafkaSpout() {
ZkHosts hosts = new ZkHosts(zkUrl);
TridentKafkaConfig config = new TridentKafkaConfig(hosts, KAFKA_TOPIC);
config.scheme = new SchemeAsMultiScheme(new StringScheme());
// Consume new data from the topic
config.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
return new TransactionalTridentKafkaSpout(config);
}
开发者ID:desp0916,项目名称:LearnStorm,代码行数:19,代码来源:TridentKafkaWordCount.java
示例6: buildTopology
import storm.kafka.trident.TridentKafkaConfig; //导入依赖的package包/类
public static StormTopology buildTopology(Config conf, LocalDRPC drpc) {
TridentTopology topology = new TridentTopology();
//Kafka Spout
BrokerHosts zk = new ZkHosts(conf.get(CrawlerConfig.KAFKA_CONSUMER_HOST_NAME) + ":" +conf.get(CrawlerConfig.KAFKA_CONSUMER_HOST_PORT));
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zk, (String) conf.get(CrawlerConfig.KAFKA_TOPIC_DOCUMENT_NAME));
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(kafkaConfig);
//ElasticSearch Persistent State
Settings esSettings = ImmutableSettings.settingsBuilder()
.put("storm.elasticsearch.cluster.name", conf.get(CrawlerConfig.ELASTICSEARCH_CLUSTER_NAME))
.put("storm.elasticsearch.hosts", conf.get(CrawlerConfig.ELASTICSEARCH_HOST_NAME) + ":" + conf.get(CrawlerConfig.ELASTICSEARCH_HOST_PORT))
.build();
StateFactory esStateFactory = new ESIndexState.Factory<JSONObject>(new ClientFactory.NodeClient(esSettings.getAsMap()), JSONObject.class);
TridentState esStaticState = topology.newStaticState(esStateFactory);
String esIndex = (String)(conf.get(CrawlerConfig.ELASTICSEARCH_INDEX_NAME));
topology.newStream("docstream",spout)
.each( new Fields("str"), new SplitDocStreamArgs(), new Fields("filename", "task", "user", "content"))
.each( new Fields("filename", "task", "user"), new PrintFilter("Kafka"))
.each( new Fields("filename","task","user","content"), new PrepareDocForElasticSearch(), new Fields("index","type","id","source") )
.partitionPersist(esStateFactory, new Fields("index","type","id","source"), new ESIndexUpdater<String>(new ESTridentTupleMapper()), new Fields());
return topology.build();
}
开发者ID:preems,项目名称:realtime-event-processing,代码行数:28,代码来源:DocEventProcessingTopology.java
示例7: getTridentKafkaEmitter
import storm.kafka.trident.TridentKafkaConfig; //导入依赖的package包/类
public static OpaqueTridentKafkaSpout getTridentKafkaEmitter(String zkConnString, String topicName, Map topologyConfig) {
BrokerHosts hosts = new ZkHosts(zkConnString);
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(hosts, topicName);
//topologyConfig.put("topology.spout.max.batch.size", 1);
//kafkaConfig.forceFromStart = true;
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
return new OpaqueTridentKafkaSpout(kafkaConfig);
}
开发者ID:Parth-Brahmbhatt,项目名称:storm-smoke-test,代码行数:9,代码来源:TridentConnectorUtil.java
示例8: getTridentKafkaConfig
import storm.kafka.trident.TridentKafkaConfig; //导入依赖的package包/类
public static TridentKafkaConfig getTridentKafkaConfig(Map options, MultiScheme scheme) {
String zkServers = (String) Utils.get(options, ZOOKEEPER_SERVERS, "localhost:2181") ;
String kafkaRoot = (String) Utils.get(options, KAFKA_ROOT_PATH, "/kafka");
String connectString = zkServers + kafkaRoot;
BrokerHosts hosts = new ZkHosts(connectString);
String topic = (String) Utils.get(options, TOPIC, DEFAULT_TOPIC);
String appId = (String) Utils.get(options, CLIENT_ID, "storm-app");
TridentKafkaConfig config = new TridentKafkaConfig(hosts, topic, appId);
config.scheme = scheme;
return config;
}
开发者ID:manuzhang,项目名称:storm-benchmark,代码行数:14,代码来源:KafkaUtils.java
示例9: BrokerSpout
import storm.kafka.trident.TridentKafkaConfig; //导入依赖的package包/类
public BrokerSpout(String kafkaTopic, String zookeeperHosts, String idClient, boolean forceFromStart) {
zhost = new ZkHosts(zookeeperHosts);
config = new TridentKafkaConfig(zhost, kafkaTopic); // 3er parametro
// idClient
config.forceFromStart = forceFromStart;
if (! forceFromStart){
config.startOffsetTime = -1;
}
LOG.info("BrokerSpout. zookeperHosts: " + zookeeperHosts + " topic: " + kafkaTopic + " idClient: " + idClient);
}
开发者ID:Produban,项目名称:openbus,代码行数:12,代码来源:BrokerSpout.java
示例10: BrokerSpout
import storm.kafka.trident.TridentKafkaConfig; //导入依赖的package包/类
public BrokerSpout(String kafkaTopic, String zookeeperHosts, String idClient, boolean forceFromStart) {
zhost = new ZkHosts(zookeeperHosts);
config = new TridentKafkaConfig(zhost, kafkaTopic, idClient);
config.forceFromStart = forceFromStart;
LOG.info("BrokerSpout. zookeperHosts: " + zookeeperHosts + " topic: " + kafkaTopic + " idClient: " + idClient);
}
开发者ID:Produban,项目名称:openbus,代码行数:8,代码来源:BrokerSpout.java
示例11: buildTopology
import storm.kafka.trident.TridentKafkaConfig; //导入依赖的package包/类
public static StormTopology buildTopology(Config conf, LocalDRPC localDrpc) {
TridentTopology topology = new TridentTopology();
//Kafka Spout
BrokerHosts zk = new ZkHosts(conf.get(CrawlerConfig.KAFKA_CONSUMER_HOST_NAME) + ":" +conf.get(CrawlerConfig.KAFKA_CONSUMER_HOST_PORT));
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zk, (String) conf.get(CrawlerConfig.KAFKA_TOPIC_NAME));
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(kafkaConfig);
//ElasticSearch Persistent State
Settings esSettings = ImmutableSettings.settingsBuilder()
.put("storm.elasticsearch.cluster.name", conf.get(CrawlerConfig.ELASTICSEARCH_CLUSTER_NAME))
.put("storm.elasticsearch.hosts", conf.get(CrawlerConfig.ELASTICSEARCH_HOST_NAME) + ":" + conf.get(CrawlerConfig.ELASTICSEARCH_HOST_PORT))
.build();
StateFactory esStateFactory = new ESIndexState.Factory<String>(new ClientFactory.NodeClient(esSettings.getAsMap()), String.class);
TridentState esStaticState = topology.newStaticState(esStateFactory);
//Topology
topology.newStream("crawlKafkaSpout", spout).parallelismHint(5)
//Splits url and depth information on receiving from Kafka
.each(new Fields("str"), new SplitKafkaInput(), new Fields("url", "depth"))
//Bloom Filter. Filters already crawled URLs
.each(new Fields("url"), new URLFilter())
//Download and Parse Webpage
.each(new Fields("url"), new GetAdFreeWebPage(), new Fields("content_html", "title", "href"))//TODO Optimize
//Add Href URls to Kafka queue
.each(new Fields("href", "depth"), new KafkaProducerFilter())//TODO Replace with kafka persistent state.
//Insert to Elasticsearch
.each(new Fields("url", "content_html", "title"), new PrepareForElasticSearch(), new Fields("index", "type", "id", "source"))
.partitionPersist(esStateFactory, new Fields("index", "type", "id", "source"), new ESIndexUpdater<String>(new ESTridentTupleMapper()))
;
//DRPC
topology.newDRPCStream("search", localDrpc)
.each(new Fields("args"), new SplitDRPCArgs(), new Fields("query_input"))
.each(new Fields("query_input"), new BingAutoSuggest(0), new Fields("query_preProcessed"))//TODO return List of expanded query
.each(new Fields("query_preProcessed"), new PrepareSearchQuery(), new Fields("query", "indices", "types"))
.groupBy(new Fields("query", "indices", "types"))
.stateQuery(esStaticState, new Fields("query", "indices", "types"), new QuerySearchIndexQuery(), new Fields("results"))
;
return topology.build();
}
开发者ID:skalmadka,项目名称:web-crawler,代码行数:44,代码来源:WebCrawlerTopology.java
示例12: buildTopology
import storm.kafka.trident.TridentKafkaConfig; //导入依赖的package包/类
public static StormTopology buildTopology(Config conf, LocalDRPC localDrpc) {
TridentTopology topology = new TridentTopology();
//Kafka Spout
BrokerHosts zk = new ZkHosts(conf.get(CrawlerConfig.KAFKA_CONSUMER_HOST_NAME) + ":" +conf.get(CrawlerConfig.KAFKA_CONSUMER_HOST_PORT));
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zk, (String) conf.get(CrawlerConfig.KAFKA_TOPIC_NAME));
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
//kafkaConfig.ignoreZkOffsets=true;
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(kafkaConfig);
//ElasticSearch Persistent State
Settings esSettings = ImmutableSettings.settingsBuilder()
.put("storm.elasticsearch.cluster.name", conf.get(CrawlerConfig.ELASTICSEARCH_CLUSTER_NAME))
.put("storm.elasticsearch.hosts", conf.get(CrawlerConfig.ELASTICSEARCH_HOST_NAME) + ":" + conf.get(CrawlerConfig.ELASTICSEARCH_HOST_PORT))
.build();
StateFactory esStateFactory = new ESIndexState.Factory<JSONObject>(new ClientFactory.NodeClient(esSettings.getAsMap()), JSONObject.class);
TridentState esStaticState = topology.newStaticState(esStateFactory);
//Topology
topology.newStream("crawlKafkaSpout", spout).parallelismHint(5)
//Splits words on receiving from Kafka
.each(new Fields("str"), new SplitFunction(), new Fields("url", "depth", "task", "user"))
.each(new Fields("str"), new PrintFilter("Kafka"))
//Bloom Filter, Filters already crawled URLs
.each(new Fields("url", "task"), new URLFilter())
//Download and Parse Webpage
.each(new Fields("url"), new GetAdFreeWebPage(), new Fields("content_html", "title", "href"))
//Sending URLs present in the page into the kafka queue.
.each(new Fields("href", "depth", "task", "user"), new KafkaProducerFilter())
//Insert to Elasticsearch
.each(new Fields("url", "content_html", "title", "task", "user"), new PrepareForElasticSearch(), new Fields("index", "type", "id", "source"))
.partitionPersist(esStateFactory, new Fields("index", "type", "id", "source"), new ESIndexUpdater<String>(new ESTridentTupleMapper()), new Fields())
;
//DRPC
topology.newDRPCStream("search", localDrpc)
.each(new Fields("args"), new SplitDRPCArgs(), new Fields("query_input", "task"))
.each(new Fields("query_input"), new BingAutoSuggest(0), new Fields("query_preProcessed"))
.each(new Fields("query_preProcessed", "task"), new PrepareSearchQuery(), new Fields("query", "indices", "types"))
.groupBy(new Fields("query", "indices", "types"))
.stateQuery(esStaticState, new Fields("query", "indices", "types"), new QuerySearchIndexQuery(), new Fields("results"))
;
return topology.build();
}
开发者ID:preems,项目名称:realtime-event-processing,代码行数:46,代码来源:URLEventProcessingTopology.java
示例13: build
import storm.kafka.trident.TridentKafkaConfig; //导入依赖的package包/类
public OpaqueTridentKafkaSpout build() {
BrokerHosts zk = new ZkHosts(zookeeper);
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, topic);
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
return new OpaqueTridentKafkaSpout(spoutConf);
}
开发者ID:dzikowski,项目名称:simple-kafka-storm-java,代码行数:7,代码来源:OpaqueTridentKafkaSpoutBuilder.java
示例14: testTweetSpout
import storm.kafka.trident.TridentKafkaConfig; //导入依赖的package包/类
public static TransactionalTridentKafkaSpout testTweetSpout(BrokerHosts hosts) {
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(hosts, "test", "storm");
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
return new TransactionalTridentKafkaSpout(kafkaConfig);
}
开发者ID:eshioji,项目名称:trident-tutorial,代码行数:6,代码来源:TestUtils.java
示例15: BrokerSpout
import storm.kafka.trident.TridentKafkaConfig; //导入依赖的package包/类
public BrokerSpout(String kafkaTopic, String zookeeperHosts, String idClient, boolean forceFromStart) {
zhost = new ZkHosts(zookeeperHosts);
config = new TridentKafkaConfig(zhost, kafkaTopic); //3er parametro idClient
config.forceFromStart = forceFromStart;
config.startOffsetTime=-2;
LOG.info("BrokerSpout. zookeperHosts: " + zookeeperHosts + " topic: " + kafkaTopic + " idClient: " + idClient);
}
开发者ID:Produban,项目名称:openbus,代码行数:11,代码来源:BrokerSpout.java
注:本文中的storm.kafka.trident.TridentKafkaConfig类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论