This article collects typical usage examples of the Java class storm.kafka.trident.OpaqueTridentKafkaSpout. If you want to know how OpaqueTridentKafkaSpout is used in practice, how to use it, or what real-world usage looks like, the selected code examples below may help.
OpaqueTridentKafkaSpout belongs to the storm.kafka.trident package. Below are 10 code examples of the OpaqueTridentKafkaSpout class, sorted by popularity by default.
Example 1: buildTopology
import storm.kafka.trident.OpaqueTridentKafkaSpout; // import the required package/class
public static StormTopology buildTopology(Config conf, LocalDRPC drpc) {
TridentTopology topology = new TridentTopology();
//Kafka Spout
BrokerHosts zk = new ZkHosts(conf.get(CrawlerConfig.KAFKA_CONSUMER_HOST_NAME) + ":" +conf.get(CrawlerConfig.KAFKA_CONSUMER_HOST_PORT));
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zk, (String) conf.get(CrawlerConfig.KAFKA_TOPIC_DOCUMENT_NAME));
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(kafkaConfig);
//ElasticSearch Persistent State
Settings esSettings = ImmutableSettings.settingsBuilder()
.put("storm.elasticsearch.cluster.name", conf.get(CrawlerConfig.ELASTICSEARCH_CLUSTER_NAME))
.put("storm.elasticsearch.hosts", conf.get(CrawlerConfig.ELASTICSEARCH_HOST_NAME) + ":" + conf.get(CrawlerConfig.ELASTICSEARCH_HOST_PORT))
.build();
StateFactory esStateFactory = new ESIndexState.Factory<JSONObject>(new ClientFactory.NodeClient(esSettings.getAsMap()), JSONObject.class);
TridentState esStaticState = topology.newStaticState(esStateFactory);
String esIndex = (String)(conf.get(CrawlerConfig.ELASTICSEARCH_INDEX_NAME));
topology.newStream("docstream",spout)
.each( new Fields("str"), new SplitDocStreamArgs(), new Fields("filename", "task", "user", "content"))
.each( new Fields("filename", "task", "user"), new PrintFilter("Kafka"))
.each( new Fields("filename","task","user","content"), new PrepareDocForElasticSearch(), new Fields("index","type","id","source") )
.partitionPersist(esStateFactory, new Fields("index","type","id","source"), new ESIndexUpdater<String>(new ESTridentTupleMapper()), new Fields());
return topology.build();
}
Developer ID: preems, Project: realtime-event-processing, Lines: 28, Source: DocEventProcessingTopology.java
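Examples 1, 4 and 5 build the ZooKeeper connection string for ZkHosts by concatenating a host and a port read from the topology Config. If either key is missing, that concatenation quietly produces a string such as "null:null", and the problem only shows up later when the spout tries to connect. The sketch below is a hypothetical helper (ZkConnectString and the key names used with it are assumptions, not part of storm-kafka) that validates both values first; it accepts a plain Map<String, Object>, which Storm's Config extends.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of storm-kafka): builds the "host:port"
// string that the examples pass to new ZkHosts(...).
final class ZkConnectString {
    private ZkConnectString() {}

    static String from(Map<String, Object> conf, String hostKey, String portKey) {
        Object host = conf.get(hostKey);
        Object port = conf.get(portKey);
        if (host == null || port == null) {
            // Fail fast instead of building "null:..." strings that only fail at connect time
            throw new IllegalArgumentException("missing config key: " + (host == null ? hostKey : portKey));
        }
        // Accept the port as an Integer or as a String such as "2181"
        int p = Integer.parseInt(port.toString().trim());
        if (p < 1 || p > 65535) {
            throw new IllegalArgumentException("port out of range: " + p);
        }
        return host.toString().trim() + ":" + p;
    }
}
```

Usage would look like new ZkHosts(ZkConnectString.from(conf, CrawlerConfig.KAFKA_CONSUMER_HOST_NAME, CrawlerConfig.KAFKA_CONSUMER_HOST_PORT)), assuming those constants hold the config key names.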
Example 2: getTridentKafkaEmitter
import storm.kafka.trident.OpaqueTridentKafkaSpout; // import the required package/class
public static OpaqueTridentKafkaSpout getTridentKafkaEmitter(String zkConnString, String topicName, Map topologyConfig) {
BrokerHosts hosts = new ZkHosts(zkConnString);
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(hosts, topicName);
//topologyConfig.put("topology.spout.max.batch.size", 1);
//kafkaConfig.forceFromStart = true;
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
return new OpaqueTridentKafkaSpout(kafkaConfig);
}
Developer ID: Parth-Brahmbhatt, Project: storm-smoke-test, Lines: 9, Source: TridentConnectorUtil.java
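Every example here sets kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme()), and the topologies then read each message from a field named "str". As far as we know, "str" is the output field declared by storm-kafka's StringScheme, which decodes the message payload as UTF-8; SchemeAsMultiScheme wraps that single Scheme so one Kafka message yields one tuple. The storm-free sketch below only imitates the decoding step for illustration; StringSchemeSketch is a hypothetical name, and the real class's method signatures differ between storm-kafka versions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

// Illustrative stand-in for StringScheme (hypothetical, not the library class):
// decode the raw Kafka payload as UTF-8 and emit it as a one-field tuple.
final class StringSchemeSketch {
    // Field name that downstream steps read via new Fields("str")
    static final String FIELD = "str";

    private StringSchemeSketch() {}

    static List<Object> deserialize(byte[] payload) {
        String value = new String(payload, StandardCharsets.UTF_8);
        return Collections.<Object>singletonList(value);
    }
}
```

This is why the first .each(...) in Examples 1, 4 and 5 takes new Fields("str") as its input.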
Example 3: main
import storm.kafka.trident.OpaqueTridentKafkaSpout; // import the required package/class
public static void main(String... args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
// starting to build topology
TridentTopology topology = new TridentTopology();
// Kafka as an opaque trident spout
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpoutBuilder(Conf.zookeeper, Conf.inputTopic).build();
Stream stream = topology.newStream(kafkaSpout, spout);
// mapping transaction messages to pairs: (person,amount)
Stream atomicTransactions = stream.each(strF, Functions.mapToPersonAmount, personAmountF);
// print each (person, amount) pair
atomicTransactions.each(personAmountF, Functions.printlnFunction, emptyF);
// aggregating transactions and mapping to Kafka messages
Stream transactionsGroupped = atomicTransactions.groupBy(personF)
.persistentAggregate(new MemoryMapState.Factory(), amountF, new Sum(), sumF).newValuesStream()
.each(personSumF, Functions.mapToKafkaMessage, keyMessageF);
// Kafka as the output state: produce messages to outputTopic
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory() //
.withKafkaTopicSelector(new DefaultTopicSelector(Conf.outputTopic)) //
.withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>(key, message));
transactionsGroupped.partitionPersist(stateFactory, keyMessageF, new TridentKafkaUpdater(), emptyF);
// submitting topology to local cluster
new LocalCluster().submitTopology(kafkaAccountsTopology, topologyConfig(), topology.build());
// waiting a while, then running Kafka producer
Sleep.seconds(5);
KafkaProduceExample.start(20);
}
Developer ID: dzikowski, Project: simple-kafka-storm-java, Lines: 35, Source: KafkaStormTridentExample.java
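Example 3 feeds an opaque Kafka spout into persistentAggregate(..., new Sum(), ...). With an opaque spout, a failed batch may be replayed under the same transaction id but with different contents, so the state cannot simply skip a txid it has already applied. Trident's opaque state handles this by storing, per key, the txid of the last applied batch together with the value before and after that batch (its OpaqueValue). The class below is a simplified, self-contained sketch of that bookkeeping for a running sum; it is not Trident's code, and OpaqueSum is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of opaque-transactional bookkeeping for a per-key sum
// (modelled loosely on the idea behind Trident's OpaqueValue; not library code).
final class OpaqueSum {
    // Per key: txid of the last applied batch, value before it, value after it
    private static final class Entry {
        long txid;
        long prev;
        long curr;

        Entry(long txid, long prev, long curr) {
            this.txid = txid;
            this.prev = prev;
            this.curr = curr;
        }
    }

    private final Map<String, Entry> store = new HashMap<>();

    // Applies one batch's partial sum for a key
    void apply(long batchTxid, String key, long batchSum) {
        Entry e = store.get(key);
        if (e == null) {
            store.put(key, new Entry(batchTxid, 0L, batchSum));
        } else if (e.txid == batchTxid) {
            // Replay of the last batch (contents may differ): recompute from the value before it
            e.curr = e.prev + batchSum;
        } else {
            // New batch: the current value becomes the new baseline
            e.prev = e.curr;
            e.curr = e.curr + batchSum;
            e.txid = batchTxid;
        }
    }

    long get(String key) {
        Entry e = store.get(key);
        return e == null ? 0L : e.curr;
    }
}
```

For example, applying 10 in txid 1 and 5 in txid 2 gives 15; if txid 2 is replayed and now sums to 7, the result becomes 10 + 7 = 17 instead of double-counting.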
Example 4: buildTopology
import storm.kafka.trident.OpaqueTridentKafkaSpout; // import the required package/class
public static StormTopology buildTopology(Config conf, LocalDRPC localDrpc) {
TridentTopology topology = new TridentTopology();
//Kafka Spout
BrokerHosts zk = new ZkHosts(conf.get(CrawlerConfig.KAFKA_CONSUMER_HOST_NAME) + ":" +conf.get(CrawlerConfig.KAFKA_CONSUMER_HOST_PORT));
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zk, (String) conf.get(CrawlerConfig.KAFKA_TOPIC_NAME));
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(kafkaConfig);
//ElasticSearch Persistent State
Settings esSettings = ImmutableSettings.settingsBuilder()
.put("storm.elasticsearch.cluster.name", conf.get(CrawlerConfig.ELASTICSEARCH_CLUSTER_NAME))
.put("storm.elasticsearch.hosts", conf.get(CrawlerConfig.ELASTICSEARCH_HOST_NAME) + ":" + conf.get(CrawlerConfig.ELASTICSEARCH_HOST_PORT))
.build();
StateFactory esStateFactory = new ESIndexState.Factory<String>(new ClientFactory.NodeClient(esSettings.getAsMap()), String.class);
TridentState esStaticState = topology.newStaticState(esStateFactory);
//Topology
topology.newStream("crawlKafkaSpout", spout).parallelismHint(5)
//Splits url and depth information on receiving from Kafka
.each(new Fields("str"), new SplitKafkaInput(), new Fields("url", "depth"))
//Bloom Filter. Filters already crawled URLs
.each(new Fields("url"), new URLFilter())
//Download and Parse Webpage
.each(new Fields("url"), new GetAdFreeWebPage(), new Fields("content_html", "title", "href"))//TODO Optimize
//Add href URLs to the Kafka queue
.each(new Fields("href", "depth"), new KafkaProducerFilter())//TODO Replace with kafka persistent state.
//Insert to Elasticsearch
.each(new Fields("url", "content_html", "title"), new PrepareForElasticSearch(), new Fields("index", "type", "id", "source"))
.partitionPersist(esStateFactory, new Fields("index", "type", "id", "source"), new ESIndexUpdater<String>(new ESTridentTupleMapper()))
;
//DRPC
topology.newDRPCStream("search", localDrpc)
.each(new Fields("args"), new SplitDRPCArgs(), new Fields("query_input"))
.each(new Fields("query_input"), new BingAutoSuggest(0), new Fields("query_preProcessed"))//TODO return List of expanded query
.each(new Fields("query_preProcessed"), new PrepareSearchQuery(), new Fields("query", "indices", "types"))
.groupBy(new Fields("query", "indices", "types"))
.stateQuery(esStaticState, new Fields("query", "indices", "types"), new QuerySearchIndexQuery(), new Fields("results"))
;
return topology.build();
}
Developer ID: skalmadka, Project: web-crawler, Lines: 44, Source: WebCrawlerTopology.java
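The URLFilter in Example 4 is described as a Bloom filter that drops already-crawled URLs, but its implementation is not shown. The sketch below is a generic minimal Bloom filter, not the project's code; the bit-array size, the number of hash functions, and the use of String.hashCode plus FNV-1a for double hashing are assumptions chosen for illustration. A Bloom filter never reports an added URL as unseen, but it can occasionally report an unseen URL as seen, so a few new pages may be skipped.

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;

// Minimal Bloom filter for "have we probably crawled this URL already?" checks.
// Derives k bit positions from two base hashes via double hashing: h1 + i * h2.
final class UrlBloomFilter {
    private final BitSet bits;
    private final int size;
    private final int hashes;

    UrlBloomFilter(int size, int hashes) {
        this.bits = new BitSet(size);
        this.size = size;
        this.hashes = hashes;
    }

    // 32-bit FNV-1a over the UTF-8 bytes, used as the second base hash
    private static int fnv1a(String s) {
        int h = 0x811C9DC5;
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xFF);
            h *= 0x01000193;
        }
        return h;
    }

    private int position(String url, int i) {
        int h1 = url.hashCode();
        int h2 = fnv1a(url);
        // floorMod keeps the index non-negative even if the sum overflows
        return Math.floorMod(h1 + i * h2, size);
    }

    void add(String url) {
        for (int i = 0; i < hashes; i++) {
            bits.set(position(url, i));
        }
    }

    // false: definitely not added; true: probably added (false positives possible)
    boolean mightContain(String url) {
        for (int i = 0; i < hashes; i++) {
            if (!bits.get(position(url, i))) {
                return false;
            }
        }
        return true;
    }
}
```

Design note: if a filter like this is kept in memory inside a Trident function, each task only sees the URLs routed to it, and its contents are lost on restart. A partitionBy(new Fields("url")) before the filter, or an external store, would likely be needed for a topology-wide check.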
Example 5: buildTopology
import storm.kafka.trident.OpaqueTridentKafkaSpout; // import the required package/class
public static StormTopology buildTopology(Config conf, LocalDRPC localDrpc) {
TridentTopology topology = new TridentTopology();
//Kafka Spout
BrokerHosts zk = new ZkHosts(conf.get(CrawlerConfig.KAFKA_CONSUMER_HOST_NAME) + ":" +conf.get(CrawlerConfig.KAFKA_CONSUMER_HOST_PORT));
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zk, (String) conf.get(CrawlerConfig.KAFKA_TOPIC_NAME));
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
//kafkaConfig.ignoreZkOffsets=true;
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(kafkaConfig);
//ElasticSearch Persistent State
Settings esSettings = ImmutableSettings.settingsBuilder()
.put("storm.elasticsearch.cluster.name", conf.get(CrawlerConfig.ELASTICSEARCH_CLUSTER_NAME))
.put("storm.elasticsearch.hosts", conf.get(CrawlerConfig.ELASTICSEARCH_HOST_NAME) + ":" + conf.get(CrawlerConfig.ELASTICSEARCH_HOST_PORT))
.build();
StateFactory esStateFactory = new ESIndexState.Factory<JSONObject>(new ClientFactory.NodeClient(esSettings.getAsMap()), JSONObject.class);
TridentState esStaticState = topology.newStaticState(esStateFactory);
//Topology
topology.newStream("crawlKafkaSpout", spout).parallelismHint(5)
//Split each Kafka message into url, depth, task and user
.each(new Fields("str"), new SplitFunction(), new Fields("url", "depth", "task", "user"))
.each(new Fields("str"), new PrintFilter("Kafka"))
//Bloom filter: drop URLs that were already crawled
.each(new Fields("url", "task"), new URLFilter())
//Download and Parse Webpage
.each(new Fields("url"), new GetAdFreeWebPage(), new Fields("content_html", "title", "href"))
//Send the URLs found on the page to the Kafka queue
.each(new Fields("href", "depth", "task", "user"), new KafkaProducerFilter())
//Insert to Elasticsearch
.each(new Fields("url", "content_html", "title", "task", "user"), new PrepareForElasticSearch(), new Fields("index", "type", "id", "source"))
.partitionPersist(esStateFactory, new Fields("index", "type", "id", "source"), new ESIndexUpdater<String>(new ESTridentTupleMapper()), new Fields())
;
//DRPC
topology.newDRPCStream("search", localDrpc)
.each(new Fields("args"), new SplitDRPCArgs(), new Fields("query_input", "task"))
.each(new Fields("query_input"), new BingAutoSuggest(0), new Fields("query_preProcessed"))
.each(new Fields("query_preProcessed", "task"), new PrepareSearchQuery(), new Fields("query", "indices", "types"))
.groupBy(new Fields("query", "indices", "types"))
.stateQuery(esStaticState, new Fields("query", "indices", "types"), new QuerySearchIndexQuery(), new Fields("results"))
;
return topology.build();
}
Developer ID: preems, Project: realtime-event-processing, Lines: 46, Source: URLEventProcessingTopology.java
Example 6: build
import storm.kafka.trident.OpaqueTridentKafkaSpout; // import the required package/class
public OpaqueTridentKafkaSpout build() {
BrokerHosts zk = new ZkHosts(zookeeper);
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, topic);
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
return new OpaqueTridentKafkaSpout(spoutConf);
}
Developer ID: dzikowski, Project: simple-kafka-storm-java, Lines: 7, Source: OpaqueTridentKafkaSpoutBuilder.java
Example 7: getOpaquePartitionedTridentSpout
import storm.kafka.trident.OpaqueTridentKafkaSpout; // import the required package/class
@SuppressWarnings("rawtypes")
public IOpaquePartitionedTridentSpout<GlobalPartitionInformation, Partition, Map> getOpaquePartitionedTridentSpout() {
return new OpaqueTridentKafkaSpout(config);
}
Developer ID: Produban, Project: openbus, Lines: 5, Source: BrokerSpout.java
Example 8: getOpaquePartitionedTridentSpout
import storm.kafka.trident.OpaqueTridentKafkaSpout; // import the required package/class
@SuppressWarnings("rawtypes")
public IOpaquePartitionedTridentSpout<GlobalPartitionInformation, Partition, Map> getOpaquePartitionedTridentSpout() {
return new OpaqueTridentKafkaSpout(config);
}
Developer ID: Produban, Project: openbus, Lines: 5, Source: BrokerSpout.java
Example 9: getOpaquePartitionedTridentSpout
import storm.kafka.trident.OpaqueTridentKafkaSpout; // import the required package/class
public IOpaquePartitionedTridentSpout<GlobalPartitionInformation, Partition, Map> getOpaquePartitionedTridentSpout() {
return new OpaqueTridentKafkaSpout(config);
}
Developer ID: Produban, Project: openbus, Lines: 4, Source: BrokerSpout.java
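Examples 7 to 9 return the spout as IOpaquePartitionedTridentSpout<GlobalPartitionInformation, Partition, Map>: the first type parameter describes the set of partitions, the second a single Kafka partition, and the third (a Map) is the metadata stored for each emitted batch. As far as we can tell, storm-kafka's emitter records entries such as "offset" and "nextOffset" in that map, so the next batch for a partition can continue where the previous one ended. The sketch below illustrates only that idea; PartitionBatchMeta, its parameters and the batch-size cap are hypothetical and much simpler than the library's emitter.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified per-partition batch metadata for an opaque partitioned spout.
// Each batch records where it started ("offset") and where the next one should start ("nextOffset").
final class PartitionBatchMeta {
    private PartitionBatchMeta() {}

    // lastMeta == null means this partition has not emitted a batch yet
    static Map<String, Object> nextBatch(Map<String, Object> lastMeta, long startIfNew,
                                         long latestAvailable, int maxBatchSize) {
        long start = (lastMeta == null) ? startIfNew : (Long) lastMeta.get("nextOffset");
        // Cap the batch at maxBatchSize messages and never read past the latest available offset
        long end = Math.max(start, Math.min(start + maxBatchSize, latestAvailable));
        Map<String, Object> meta = new HashMap<>();
        meta.put("offset", start);
        meta.put("nextOffset", end);
        return meta;
    }
}
```

With 250 messages available and a cap of 100, successive calls yield the ranges [0, 100), [100, 200) and [200, 250).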
Example 10: builder
import storm.kafka.trident.OpaqueTridentKafkaSpout; // import the required package/class
/**
* Build the tridentKafkaSpout.
*
* @return Trident spout of kafka.
*/
public OpaqueTridentKafkaSpout builder() {
return new OpaqueTridentKafkaSpout(_kafkaConfig);
}
Developer ID: redBorder, Project: rb-bi, Lines: 9, Source: TridentKafkaSpout.java
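Note: The storm.kafka.trident.OpaqueTridentKafkaSpout examples in this article were collected from source code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects contributed by various developers, and copyright of the source code remains with the original authors. Please refer to each project's license before distributing or using the code, and do not republish without permission.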