
Java OpaqueTridentKafkaSpout Class Code Examples


This article collects typical usage examples of the Java class storm.kafka.trident.OpaqueTridentKafkaSpout. If you have been wondering what OpaqueTridentKafkaSpout is for, how to use it, or what real-world usage looks like, the curated code examples below should help.



The OpaqueTridentKafkaSpout class belongs to the storm.kafka.trident package. Ten code examples of the class are presented below, sorted by popularity by default.
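
Before working through the examples, here is a minimal construction sketch distilled from the pattern they all share: a ZkHosts pointing at ZooKeeper, a TridentKafkaConfig naming the topic, a StringScheme so each Kafka message arrives as a single "str" field, and the spout itself. The ZooKeeper address and topic name are placeholders, not values taken from any of the projects below.

import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.BrokerHosts;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.OpaqueTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;

public static OpaqueTridentKafkaSpout buildSpout() {
    // Placeholder ZooKeeper address and topic name.
    BrokerHosts zk = new ZkHosts("localhost:2181");
    TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zk, "my-topic");
    // Deserialize each Kafka message into a single tuple field named "str".
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    return new OpaqueTridentKafkaSpout(kafkaConfig);
}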

Example 1: buildTopology

import storm.kafka.trident.OpaqueTridentKafkaSpout; // import the required package/class
public static StormTopology buildTopology(Config conf, LocalDRPC drpc) {

    TridentTopology topology = new TridentTopology();

    //Kafka Spout
    BrokerHosts zk = new ZkHosts(conf.get(CrawlerConfig.KAFKA_CONSUMER_HOST_NAME) + ":" + conf.get(CrawlerConfig.KAFKA_CONSUMER_HOST_PORT));
    TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zk, (String) conf.get(CrawlerConfig.KAFKA_TOPIC_DOCUMENT_NAME));
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(kafkaConfig);

    //ElasticSearch Persistent State
    Settings esSettings = ImmutableSettings.settingsBuilder()
            .put("storm.elasticsearch.cluster.name", conf.get(CrawlerConfig.ELASTICSEARCH_CLUSTER_NAME))
            .put("storm.elasticsearch.hosts", conf.get(CrawlerConfig.ELASTICSEARCH_HOST_NAME) + ":" + conf.get(CrawlerConfig.ELASTICSEARCH_HOST_PORT))
            .build();
    StateFactory esStateFactory = new ESIndexState.Factory<JSONObject>(new ClientFactory.NodeClient(esSettings.getAsMap()), JSONObject.class);
    TridentState esStaticState = topology.newStaticState(esStateFactory);

    String esIndex = (String) (conf.get(CrawlerConfig.ELASTICSEARCH_INDEX_NAME));
    topology.newStream("docstream", spout)
            .each(new Fields("str"), new SplitDocStreamArgs(), new Fields("filename", "task", "user", "content"))
            .each(new Fields("filename", "task", "user"), new PrintFilter("Kafka"))
            .each(new Fields("filename", "task", "user", "content"), new PrepareDocForElasticSearch(), new Fields("index", "type", "id", "source"))
            .partitionPersist(esStateFactory, new Fields("index", "type", "id", "source"), new ESIndexUpdater<String>(new ESTridentTupleMapper()), new Fields());

    return topology.build();
}
 
Developer: preems | Project: realtime-event-processing | Lines: 28 | Source file: DocEventProcessingTopology.java


Example 2: getTridentKafkaEmitter

import storm.kafka.trident.OpaqueTridentKafkaSpout; // import the required package/class
public static OpaqueTridentKafkaSpout getTridentKafkaEmitter(String zkConnString, String topicName, Map topologyConfig) {
    BrokerHosts hosts = new ZkHosts(zkConnString);
    TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(hosts, topicName);
    //topologyConfig.put("topology.spout.max.batch.size", 1);
    //kafkaConfig.forceFromStart = true;
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    return new OpaqueTridentKafkaSpout(kafkaConfig);
}
 
Developer: Parth-Brahmbhatt | Project: storm-smoke-test | Lines: 9 | Source file: TridentConnectorUtil.java


Example 3: main

import storm.kafka.trident.OpaqueTridentKafkaSpout; // import the required package/class
public static void main(String... args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {

    // starting to build topology
    TridentTopology topology = new TridentTopology();

    // Kafka as an opaque trident spout
    OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpoutBuilder(Conf.zookeeper, Conf.inputTopic).build();
    Stream stream = topology.newStream(kafkaSpout, spout);

    // mapping transaction messages to pairs: (person, amount)
    Stream atomicTransactions = stream.each(strF, Functions.mapToPersonAmount, personAmountF);

    // printing each (person, amount) pair
    atomicTransactions.each(personAmountF, Functions.printlnFunction, emptyF);

    // aggregating transactions per person and mapping the sums to Kafka messages
    Stream transactionsGroupped = atomicTransactions.groupBy(personF)
            .persistentAggregate(new MemoryMapState.Factory(), amountF, new Sum(), sumF).newValuesStream()
            .each(personSumF, Functions.mapToKafkaMessage, keyMessageF);

    // Kafka as a bolt -- producing to outputTopic
    TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory() //
            .withKafkaTopicSelector(new DefaultTopicSelector(Conf.outputTopic)) //
            .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>(key, message));
    transactionsGroupped.partitionPersist(stateFactory, keyMessageF, new TridentKafkaUpdater(), emptyF);

    // submitting topology to local cluster
    new LocalCluster().submitTopology(kafkaAccountsTopology, topologyConfig(), topology.build());

    // waiting a while, then running Kafka producer
    Sleep.seconds(5);
    KafkaProduceExample.start(20);
}
 
Developer: dzikowski | Project: simple-kafka-storm-java | Lines: 35 | Source file: KafkaStormTridentExample.java
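
Example 3 leans on statically imported constants (kafkaSpout, strF, personAmountF, and so on) that the snippet itself does not declare. Their real definitions live elsewhere in the simple-kafka-storm-java project; the following reconstruction is only an assumption inferred from how each name is used above, shown so the example reads as self-contained.

import backtype.storm.tuple.Fields;

// Hypothetical reconstruction of Example 3's statically imported constants;
// inferred from usage, not copied from the original project.
static final String kafkaSpout = "kafkaSpout";                        // stream name
static final String kafkaAccountsTopology = "kafkaAccountsTopology";  // topology name
static final String key = "key", message = "message";                 // Kafka tuple field names
static final Fields strF = new Fields("str");                         // raw Kafka message
static final Fields personF = new Fields("person");
static final Fields amountF = new Fields("amount");
static final Fields personAmountF = new Fields("person", "amount");
static final Fields sumF = new Fields("sum");
static final Fields personSumF = new Fields("person", "sum");
static final Fields keyMessageF = new Fields(key, message);
static final Fields emptyF = new Fields();                            // no output fields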


Example 4: buildTopology

import storm.kafka.trident.OpaqueTridentKafkaSpout; // import the required package/class
public static StormTopology buildTopology(Config conf, LocalDRPC localDrpc) {
    TridentTopology topology = new TridentTopology();

    //Kafka Spout
    BrokerHosts zk = new ZkHosts(conf.get(CrawlerConfig.KAFKA_CONSUMER_HOST_NAME) + ":" +conf.get(CrawlerConfig.KAFKA_CONSUMER_HOST_PORT));
    TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zk, (String) conf.get(CrawlerConfig.KAFKA_TOPIC_NAME));
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(kafkaConfig);

    //ElasticSearch Persistent State
    Settings esSettings = ImmutableSettings.settingsBuilder()
            .put("storm.elasticsearch.cluster.name", conf.get(CrawlerConfig.ELASTICSEARCH_CLUSTER_NAME))
            .put("storm.elasticsearch.hosts", conf.get(CrawlerConfig.ELASTICSEARCH_HOST_NAME) + ":" + conf.get(CrawlerConfig.ELASTICSEARCH_HOST_PORT))
            .build();
    StateFactory esStateFactory = new ESIndexState.Factory<String>(new ClientFactory.NodeClient(esSettings.getAsMap()), String.class);
    TridentState esStaticState = topology.newStaticState(esStateFactory);

    //Topology
    topology.newStream("crawlKafkaSpout", spout).parallelismHint(5)
             //Splits url and depth information on receiving from Kafka
            .each(new Fields("str"), new SplitKafkaInput(), new Fields("url", "depth"))
            //Bloom Filter. Filters already crawled URLs
            .each(new Fields("url"), new URLFilter())
            //Download and Parse Webpage
            .each(new Fields("url"), new GetAdFreeWebPage(), new Fields("content_html", "title", "href"))//TODO Optimize
            //Add Href URls to Kafka queue
            .each(new Fields("href", "depth"), new KafkaProducerFilter())//TODO Replace with kafka persistent state.
            //Insert to Elasticsearch
            .each(new Fields("url", "content_html", "title"), new PrepareForElasticSearch(), new Fields("index", "type", "id", "source"))
            .partitionPersist(esStateFactory, new Fields("index", "type", "id", "source"), new ESIndexUpdater<String>(new ESTridentTupleMapper()))
    ;

    //DRPC
    topology.newDRPCStream("search", localDrpc)
            .each(new Fields("args"), new SplitDRPCArgs(), new Fields("query_input"))
            .each(new Fields("query_input"), new BingAutoSuggest(0), new Fields("query_preProcessed"))//TODO return List of expanded query
            .each(new Fields("query_preProcessed"), new PrepareSearchQuery(), new Fields("query", "indices", "types"))
            .groupBy(new Fields("query", "indices", "types"))
            .stateQuery(esStaticState, new Fields("query", "indices", "types"), new QuerySearchIndexQuery(), new Fields("results"))
    ;

    return topology.build();
}
 
Developer: skalmadka | Project: web-crawler | Lines: 44 | Source file: WebCrawlerTopology.java


Example 5: buildTopology

import storm.kafka.trident.OpaqueTridentKafkaSpout; // import the required package/class
public static StormTopology buildTopology(Config conf, LocalDRPC localDrpc) {
    TridentTopology topology = new TridentTopology();

    //Kafka Spout
    BrokerHosts zk = new ZkHosts(conf.get(CrawlerConfig.KAFKA_CONSUMER_HOST_NAME) + ":" +conf.get(CrawlerConfig.KAFKA_CONSUMER_HOST_PORT));
    TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zk, (String) conf.get(CrawlerConfig.KAFKA_TOPIC_NAME));
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    //kafkaConfig.ignoreZkOffsets=true;
    OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(kafkaConfig);

    //ElasticSearch Persistent State
    Settings esSettings = ImmutableSettings.settingsBuilder()
            .put("storm.elasticsearch.cluster.name", conf.get(CrawlerConfig.ELASTICSEARCH_CLUSTER_NAME))
            .put("storm.elasticsearch.hosts", conf.get(CrawlerConfig.ELASTICSEARCH_HOST_NAME) + ":" + conf.get(CrawlerConfig.ELASTICSEARCH_HOST_PORT))
            .build();
    StateFactory esStateFactory = new ESIndexState.Factory<JSONObject>(new ClientFactory.NodeClient(esSettings.getAsMap()), JSONObject.class);
    TridentState esStaticState = topology.newStaticState(esStateFactory);

    //Topology
    topology.newStream("crawlKafkaSpout", spout).parallelismHint(5)
            //Splits words on receiving from Kafka
            .each(new Fields("str"), new SplitFunction(), new Fields("url", "depth", "task", "user"))
            .each(new Fields("str"), new PrintFilter("Kafka"))
            //Bloom Filter, Filters already crawled URLs
            .each(new Fields("url", "task"), new URLFilter())
            //Download and Parse Webpage
            .each(new Fields("url"), new GetAdFreeWebPage(), new Fields("content_html", "title", "href"))
            //Sending URLs present in the page into the kafka queue.
            .each(new Fields("href", "depth", "task", "user"), new KafkaProducerFilter())
            //Insert to Elasticsearch
            .each(new Fields("url", "content_html", "title", "task", "user"), new PrepareForElasticSearch(), new Fields("index", "type", "id", "source"))
            .partitionPersist(esStateFactory, new Fields("index", "type", "id", "source"), new ESIndexUpdater<String>(new ESTridentTupleMapper()), new Fields())
            ;

    //DRPC
    topology.newDRPCStream("search", localDrpc)
            .each(new Fields("args"), new SplitDRPCArgs(), new Fields("query_input", "task"))
            .each(new Fields("query_input"), new BingAutoSuggest(0), new Fields("query_preProcessed"))
            .each(new Fields("query_preProcessed", "task"), new PrepareSearchQuery(), new Fields("query", "indices", "types"))
            .groupBy(new Fields("query", "indices", "types"))
            .stateQuery(esStaticState, new Fields("query", "indices", "types"), new QuerySearchIndexQuery(), new Fields("results"))
            ;

    return topology.build();
}
 
Developer: preems | Project: realtime-event-processing | Lines: 46 | Source file: URLEventProcessingTopology.java


Example 6: build

import storm.kafka.trident.OpaqueTridentKafkaSpout; // import the required package/class
public OpaqueTridentKafkaSpout build() {
	BrokerHosts zk = new ZkHosts(zookeeper);
	TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, topic);
	spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
	return new OpaqueTridentKafkaSpout(spoutConf);
}
 
Developer: dzikowski | Project: simple-kafka-storm-java | Lines: 7 | Source file: OpaqueTridentKafkaSpoutBuilder.java
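
This is the OpaqueTridentKafkaSpoutBuilder invoked in Example 3 above via new OpaqueTridentKafkaSpoutBuilder(Conf.zookeeper, Conf.inputTopic).build().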


Example 7: getOpaquePartitionedTridentSpout

import storm.kafka.trident.OpaqueTridentKafkaSpout; // import the required package/class
@SuppressWarnings("rawtypes")
public IOpaquePartitionedTridentSpout<GlobalPartitionInformation, Partition, Map> getOpaquePartitionedTridentSpout() {
    return new OpaqueTridentKafkaSpout(config);
}
 
Developer: Produban | Project: openbus | Lines: 5 | Source file: BrokerSpout.java


Example 8: getOpaquePartitionedTridentSpout

import storm.kafka.trident.OpaqueTridentKafkaSpout; // import the required package/class
@SuppressWarnings("rawtypes")
public IOpaquePartitionedTridentSpout<GlobalPartitionInformation, Partition, Map> getOpaquePartitionedTridentSpout() {
    return new OpaqueTridentKafkaSpout(config);
}
 
Developer: Produban | Project: openbus | Lines: 5 | Source file: BrokerSpout.java


Example 9: getOpaquePartitionedTridentSpout

import storm.kafka.trident.OpaqueTridentKafkaSpout; // import the required package/class
public IOpaquePartitionedTridentSpout<GlobalPartitionInformation, Partition, Map> getOpaquePartitionedTridentSpout() {
    return new OpaqueTridentKafkaSpout(config);
}
 
Developer: Produban | Project: openbus | Lines: 4 | Source file: BrokerSpout.java


Example 10: builder

import storm.kafka.trident.OpaqueTridentKafkaSpout; // import the required package/class
/**
 * Build the tridentKafkaSpout.
 *
 * @return Trident spout of kafka.
 */
public OpaqueTridentKafkaSpout builder() {
    return new OpaqueTridentKafkaSpout(_kafkaConfig);
}
 
Developer: redBorder | Project: rb-bi | Lines: 9 | Source file: TridentKafkaSpout.java
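
Example 10's builder() depends on a _kafkaConfig field initialized elsewhere in the enclosing TridentKafkaSpout class. For context, here is a minimal sketch of how such a wrapper could be wired up; the constructor and its arguments are assumptions, not the actual rb-bi implementation.

// Minimal sketch of a wrapper around Example 10's builder(); the constructor
// and its arguments are assumed, not copied from the rb-bi project.
public class TridentKafkaSpout {

    private final TridentKafkaConfig _kafkaConfig;

    public TridentKafkaSpout(String zkConnect, String topic) {
        BrokerHosts hosts = new ZkHosts(zkConnect);              // assumed ZooKeeper-based broker discovery
        _kafkaConfig = new TridentKafkaConfig(hosts, topic);
        _kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    }

    public OpaqueTridentKafkaSpout builder() {
        return new OpaqueTridentKafkaSpout(_kafkaConfig);
    }
}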



Note: The storm.kafka.trident.OpaqueTridentKafkaSpout examples in this article were compiled from source code and documentation hosted on platforms such as GitHub and MSDocs. The snippets were selected from open-source projects contributed by their respective developers; copyright remains with the original authors, and distribution or use should follow each project's license. Do not reproduce without permission.

