
Java TridentKafkaConfig Class Code Examples


This article collects typical usage examples of the Java class storm.kafka.trident.TridentKafkaConfig. If you are wondering how the TridentKafkaConfig class is used in practice, the selected examples below may help.



The TridentKafkaConfig class belongs to the storm.kafka.trident package. The 15 code examples below are sorted by popularity by default.

Example 1: buildTopology

import storm.kafka.trident.TridentKafkaConfig; // import the required package/class
public static StormTopology buildTopology(String hdfsUrl) {
    TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(new ZkHosts(ZKHOST, "/brokers"), KAFKA_TOPIC);
    tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new RawScheme());
    tridentKafkaConfig.startOffsetTime = -1; // -1 = latest offset: read only the newest messages from Kafka

    TransactionalTridentKafkaSpout tridentKafkaSpout = new TransactionalTridentKafkaSpout(tridentKafkaConfig);

    TridentTopology topology = new TridentTopology();

    Stream stream = topology.newStream("stream", tridentKafkaSpout);

    FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(HDFS_OUT_PATH).withPrefix("trident").withExtension(".txt");
    FileRotationPolicy rotationPolicy = new FileSizeCountRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB, 10);
    HdfsState.Options seqOpts = new HdfsState.HdfsFileOptions().withFileNameFormat(fileNameFormat)
            .withRecordFormat(new DelimitedRecordFormat().withFieldDelimiter("|").withFields(new Fields("json")))
            .withRotationPolicy(rotationPolicy).withFsUrl(hdfsUrl)
            // .addRotationAction(new MoveFileAction().toDestination(HDFS_ROTATE_PATH));
            // .addRotationAction(new AddSuffixFileAction().withSuffix("-processed"));
            .addRotationAction(new MD5FileAction());
    StateFactory factory = new HdfsStateFactory().withOptions(seqOpts);

    stream.each(new Fields("bytes"), new JacksonJsonParser(), new Fields("json")).partitionPersist(factory, new Fields("json"),
            new HdfsUpdater(), new Fields());

    return topology.build();
}
 
Developer ID: ogidogi, Project: kafka-storm-hive, Lines of code: 27, Source: HDFSSequenceTopology.java


Example 2: TridentKafkaSpout

import storm.kafka.trident.TridentKafkaConfig; // import the required package/class
/**
 * Constructor
 *
 * @param config Config file to read properties from
 * @param section Section of the kafka config file to read properties from.
 */
public TridentKafkaSpout(ConfigData config, String section) {
    _kafkaConfig = new TridentKafkaConfig(new ZkHosts(config.getZkHost()), config.getTopic(section), "stormKafka");
    _kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    _kafkaConfig.bufferSizeBytes = config.getFetchSizeKafka();
    _kafkaConfig.fetchSizeBytes = config.getFetchSizeKafka();
    _kafkaConfig.forceFromStart = false;
}
 
Developer ID: redBorder, Project: rb-bi, Lines of code: 14, Source: TridentKafkaSpout.java


Example 3: TridentKafkaSpoutNmsp

import storm.kafka.trident.TridentKafkaConfig; // import the required package/class
/**
 * Constructor
 *
 * @param config Config file to read properties from
 * @param section Section of the kafka config file to read properties from.
 */
public TridentKafkaSpoutNmsp(ConfigData config, String section) {
    _kafkaConfig = new TridentKafkaConfig(new ZkHosts(config.getZkHost()), config.getTopic(section), "stormKafka");
    _kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    _kafkaConfig.bufferSizeBytes = config.getFetchSizeKafkaNmsp();
    _kafkaConfig.fetchSizeBytes = config.getFetchSizeKafkaNmsp();
    _kafkaConfig.forceFromStart = false;
}
 
Developer ID: redBorder, Project: rb-bi, Lines of code: 14, Source: TridentKafkaSpoutNmsp.java


Example 4: TridentKafkaSpoutLocation

import storm.kafka.trident.TridentKafkaConfig; // import the required package/class
/**
 * Constructor
 *
 * @param config Config file to read properties from
 * @param section Section of the kafka config file to read properties from.
 */
public TridentKafkaSpoutLocation(ConfigData config, String section) {
    _kafkaConfig = new TridentKafkaConfig(new ZkHosts(config.getZkHost()), config.getTopic(section), "stormKafka");
    _kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    _kafkaConfig.bufferSizeBytes = config.getFetchSizeKafkaLocation();
    _kafkaConfig.fetchSizeBytes = config.getFetchSizeKafkaLocation();
    _kafkaConfig.forceFromStart = false;
}
 
Developer ID: redBorder, Project: rb-bi, Lines of code: 14, Source: TridentKafkaSpoutLocation.java


Example 5: createKafkaSpout

import storm.kafka.trident.TridentKafkaConfig; // import the required package/class
/**
 * Creates a transactional kafka spout that consumes any new data published to "test" topic.
 * <p/>
 * For more info on transactional spouts
 * see "Transactional spouts" section in
 * <a href="https://storm.apache.org/documentation/Trident-state"> Trident state</a> doc.
 *
 * @return a transactional trident kafka spout.
 */
private TransactionalTridentKafkaSpout createKafkaSpout() {
    ZkHosts hosts = new ZkHosts(zkUrl);
    TridentKafkaConfig config = new TridentKafkaConfig(hosts, KAFKA_TOPIC);
    config.scheme = new SchemeAsMultiScheme(new StringScheme());

    // Consume new data from the topic
    config.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
    return new TransactionalTridentKafkaSpout(config);
}
 
Developer ID: desp0916, Project: LearnStorm, Lines of code: 19, Source: TridentKafkaWordCount.java


Example 6: buildTopology

import storm.kafka.trident.TridentKafkaConfig; // import the required package/class
public static StormTopology buildTopology(Config conf, LocalDRPC drpc) {

    TridentTopology topology = new TridentTopology();

    //Kafka Spout
    BrokerHosts zk = new ZkHosts(conf.get(CrawlerConfig.KAFKA_CONSUMER_HOST_NAME) + ":" + conf.get(CrawlerConfig.KAFKA_CONSUMER_HOST_PORT));
    TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zk, (String) conf.get(CrawlerConfig.KAFKA_TOPIC_DOCUMENT_NAME));
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(kafkaConfig);

    //ElasticSearch Persistent State
    Settings esSettings = ImmutableSettings.settingsBuilder()
            .put("storm.elasticsearch.cluster.name", conf.get(CrawlerConfig.ELASTICSEARCH_CLUSTER_NAME))
            .put("storm.elasticsearch.hosts", conf.get(CrawlerConfig.ELASTICSEARCH_HOST_NAME) + ":" + conf.get(CrawlerConfig.ELASTICSEARCH_HOST_PORT))
            .build();
    StateFactory esStateFactory = new ESIndexState.Factory<JSONObject>(new ClientFactory.NodeClient(esSettings.getAsMap()), JSONObject.class);
    TridentState esStaticState = topology.newStaticState(esStateFactory);

    String esIndex = (String) (conf.get(CrawlerConfig.ELASTICSEARCH_INDEX_NAME));
    topology.newStream("docstream", spout)
            .each(new Fields("str"), new SplitDocStreamArgs(), new Fields("filename", "task", "user", "content"))
            .each(new Fields("filename", "task", "user"), new PrintFilter("Kafka"))
            .each(new Fields("filename", "task", "user", "content"), new PrepareDocForElasticSearch(), new Fields("index", "type", "id", "source"))
            .partitionPersist(esStateFactory, new Fields("index", "type", "id", "source"), new ESIndexUpdater<String>(new ESTridentTupleMapper()), new Fields());

    return topology.build();
}
 
Developer ID: preems, Project: realtime-event-processing, Lines of code: 28, Source: DocEventProcessingTopology.java


Example 7: getTridentKafkaEmitter

import storm.kafka.trident.TridentKafkaConfig; // import the required package/class
public static OpaqueTridentKafkaSpout getTridentKafkaEmitter(String zkConnString, String topicName, Map topologyConfig) {
    BrokerHosts hosts = new ZkHosts(zkConnString);
    TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(hosts, topicName);
    //topologyConfig.put("topology.spout.max.batch.size", 1);
    //kafkaConfig.forceFromStart = true;
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    return new OpaqueTridentKafkaSpout(kafkaConfig);
}
 
Developer ID: Parth-Brahmbhatt, Project: storm-smoke-test, Lines of code: 9, Source: TridentConnectorUtil.java


Example 8: getTridentKafkaConfig

import storm.kafka.trident.TridentKafkaConfig; // import the required package/class
public static TridentKafkaConfig getTridentKafkaConfig(Map options, MultiScheme scheme) {
  String zkServers = (String) Utils.get(options, ZOOKEEPER_SERVERS, "localhost:2181") ;
  String kafkaRoot = (String) Utils.get(options, KAFKA_ROOT_PATH, "/kafka");
  String connectString = zkServers + kafkaRoot;

  BrokerHosts hosts = new ZkHosts(connectString);
  String topic = (String) Utils.get(options, TOPIC, DEFAULT_TOPIC);
  String appId = (String) Utils.get(options, CLIENT_ID, "storm-app");

  TridentKafkaConfig config = new TridentKafkaConfig(hosts, topic, appId);
  config.scheme = scheme;
  return config;
}
 
Developer ID: manuzhang, Project: storm-benchmark, Lines of code: 14, Source: KafkaUtils.java
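
The getTridentKafkaConfig example above forms the ZooKeeper connect string by concatenating the server list with the Kafka root path, so its defaults produce localhost:2181/kafka. The following is a minimal plain-Java sketch of that concatenation. The class ZkConnectStrings and its method withRoot are hypothetical names used only for illustration, and the handling of an empty root or a missing leading slash is an added assumption that the original code does not include.

```java
// Hypothetical helper for illustration; not part of storm-kafka or ZooKeeper.
final class ZkConnectStrings {
    private ZkConnectStrings() {}

    // Appends a root (chroot) path once, after the full comma-separated server list.
    static String withRoot(String zkServers, String kafkaRoot) {
        // Assumption: an empty or "/" root means "no chroot", so nothing is appended.
        if (kafkaRoot == null || kafkaRoot.isEmpty() || kafkaRoot.equals("/")) {
            return zkServers;
        }
        // Assumption: tolerate a root given without its leading slash.
        String root = kafkaRoot.startsWith("/") ? kafkaRoot : "/" + kafkaRoot;
        return zkServers + root;
    }

    public static void main(String[] args) {
        System.out.println(withRoot("localhost:2181", "/kafka"));   // localhost:2181/kafka
        System.out.println(withRoot("zk1:2181,zk2:2181", "kafka")); // zk1:2181,zk2:2181/kafka
    }
}
```

The resulting string is what would be handed to new ZkHosts(connectString) in the example above.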


Example 9: BrokerSpout

import storm.kafka.trident.TridentKafkaConfig; // import the required package/class
public BrokerSpout(String kafkaTopic, String zookeeperHosts, String idClient, boolean forceFromStart) {
    zhost = new ZkHosts(zookeeperHosts);
    config = new TridentKafkaConfig(zhost, kafkaTopic); // optional third parameter: idClient
    config.forceFromStart = forceFromStart;
    if (!forceFromStart) {
        config.startOffsetTime = -1; // -1 = latest offset
    }
    LOG.info("BrokerSpout. zookeeperHosts: " + zookeeperHosts + " topic: " + kafkaTopic + " idClient: " + idClient);
}
 
Developer ID: Produban, Project: openbus, Lines of code: 12, Source: BrokerSpout.java


Example 10: BrokerSpout

import storm.kafka.trident.TridentKafkaConfig; // import the required package/class
public BrokerSpout(String kafkaTopic, String zookeeperHosts, String idClient, boolean forceFromStart) {
    zhost = new ZkHosts(zookeeperHosts);
    config = new TridentKafkaConfig(zhost, kafkaTopic, idClient);
    config.forceFromStart = forceFromStart;

    LOG.info("BrokerSpout. zookeeperHosts: " + zookeeperHosts + " topic: " + kafkaTopic + " idClient: " + idClient);
}
 
Developer ID: Produban, Project: openbus, Lines of code: 8, Source: BrokerSpout.java


Example 11: buildTopology

import storm.kafka.trident.TridentKafkaConfig; // import the required package/class
public static StormTopology buildTopology(Config conf, LocalDRPC localDrpc) {
    TridentTopology topology = new TridentTopology();

    //Kafka Spout
    BrokerHosts zk = new ZkHosts(conf.get(CrawlerConfig.KAFKA_CONSUMER_HOST_NAME) + ":" +conf.get(CrawlerConfig.KAFKA_CONSUMER_HOST_PORT));
    TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zk, (String) conf.get(CrawlerConfig.KAFKA_TOPIC_NAME));
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(kafkaConfig);

    //ElasticSearch Persistent State
    Settings esSettings = ImmutableSettings.settingsBuilder()
            .put("storm.elasticsearch.cluster.name", conf.get(CrawlerConfig.ELASTICSEARCH_CLUSTER_NAME))
            .put("storm.elasticsearch.hosts", conf.get(CrawlerConfig.ELASTICSEARCH_HOST_NAME) + ":" + conf.get(CrawlerConfig.ELASTICSEARCH_HOST_PORT))
            .build();
    StateFactory esStateFactory = new ESIndexState.Factory<String>(new ClientFactory.NodeClient(esSettings.getAsMap()), String.class);
    TridentState esStaticState = topology.newStaticState(esStateFactory);

    //Topology
    topology.newStream("crawlKafkaSpout", spout).parallelismHint(5)
            //Splits url and depth information on receiving from Kafka
            .each(new Fields("str"), new SplitKafkaInput(), new Fields("url", "depth"))
            //Bloom Filter. Filters already crawled URLs
            .each(new Fields("url"), new URLFilter())
            //Download and Parse Webpage
            .each(new Fields("url"), new GetAdFreeWebPage(), new Fields("content_html", "title", "href"))//TODO Optimize
            //Add href URLs to Kafka queue
            .each(new Fields("href", "depth"), new KafkaProducerFilter())//TODO Replace with kafka persistent state.
            //Insert to Elasticsearch
            .each(new Fields("url", "content_html", "title"), new PrepareForElasticSearch(), new Fields("index", "type", "id", "source"))
            .partitionPersist(esStateFactory, new Fields("index", "type", "id", "source"), new ESIndexUpdater<String>(new ESTridentTupleMapper()))
    ;

    //DRPC
    topology.newDRPCStream("search", localDrpc)
            .each(new Fields("args"), new SplitDRPCArgs(), new Fields("query_input"))
            .each(new Fields("query_input"), new BingAutoSuggest(0), new Fields("query_preProcessed"))//TODO return List of expanded query
            .each(new Fields("query_preProcessed"), new PrepareSearchQuery(), new Fields("query", "indices", "types"))
            .groupBy(new Fields("query", "indices", "types"))
            .stateQuery(esStaticState, new Fields("query", "indices", "types"), new QuerySearchIndexQuery(), new Fields("results"))
    ;

    return topology.build();
}
 
Developer ID: skalmadka, Project: web-crawler, Lines of code: 44, Source: WebCrawlerTopology.java


Example 12: buildTopology

import storm.kafka.trident.TridentKafkaConfig; // import the required package/class
public static StormTopology buildTopology(Config conf, LocalDRPC localDrpc) {
    TridentTopology topology = new TridentTopology();

    //Kafka Spout
    BrokerHosts zk = new ZkHosts(conf.get(CrawlerConfig.KAFKA_CONSUMER_HOST_NAME) + ":" +conf.get(CrawlerConfig.KAFKA_CONSUMER_HOST_PORT));
    TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zk, (String) conf.get(CrawlerConfig.KAFKA_TOPIC_NAME));
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    //kafkaConfig.ignoreZkOffsets=true;
    OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(kafkaConfig);

    //ElasticSearch Persistent State
    Settings esSettings = ImmutableSettings.settingsBuilder()
            .put("storm.elasticsearch.cluster.name", conf.get(CrawlerConfig.ELASTICSEARCH_CLUSTER_NAME))
            .put("storm.elasticsearch.hosts", conf.get(CrawlerConfig.ELASTICSEARCH_HOST_NAME) + ":" + conf.get(CrawlerConfig.ELASTICSEARCH_HOST_PORT))
            .build();
    StateFactory esStateFactory = new ESIndexState.Factory<JSONObject>(new ClientFactory.NodeClient(esSettings.getAsMap()), JSONObject.class);
    TridentState esStaticState = topology.newStaticState(esStateFactory);

    //Topology
    topology.newStream("crawlKafkaSpout", spout).parallelismHint(5)
            //Splits words on receiving from Kafka
            .each(new Fields("str"), new SplitFunction(), new Fields("url", "depth", "task", "user"))
            .each(new Fields("str"), new PrintFilter("Kafka"))
            //Bloom Filter, Filters already crawled URLs
            .each(new Fields("url", "task"), new URLFilter())
            //Download and Parse Webpage
            .each(new Fields("url"), new GetAdFreeWebPage(), new Fields("content_html", "title", "href"))
            //Sending URLs present in the page into the kafka queue.
            .each(new Fields("href", "depth", "task", "user"), new KafkaProducerFilter())
            //Insert to Elasticsearch
            .each(new Fields("url", "content_html", "title", "task", "user"), new PrepareForElasticSearch(), new Fields("index", "type", "id", "source"))
            .partitionPersist(esStateFactory, new Fields("index", "type", "id", "source"), new ESIndexUpdater<String>(new ESTridentTupleMapper()), new Fields())
            ;

    //DRPC
    topology.newDRPCStream("search", localDrpc)
            .each(new Fields("args"), new SplitDRPCArgs(), new Fields("query_input", "task"))
            .each(new Fields("query_input"), new BingAutoSuggest(0), new Fields("query_preProcessed"))
            .each(new Fields("query_preProcessed", "task"), new PrepareSearchQuery(), new Fields("query", "indices", "types"))
            .groupBy(new Fields("query", "indices", "types"))
            .stateQuery(esStaticState, new Fields("query", "indices", "types"), new QuerySearchIndexQuery(), new Fields("results"))
            ;

    return topology.build();
}
 
Developer ID: preems, Project: realtime-event-processing, Lines of code: 46, Source: URLEventProcessingTopology.java


Example 13: build

import storm.kafka.trident.TridentKafkaConfig; // import the required package/class
public OpaqueTridentKafkaSpout build() {
	BrokerHosts zk = new ZkHosts(zookeeper);
	TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, topic);
	spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
	return new OpaqueTridentKafkaSpout(spoutConf);
}
 
Developer ID: dzikowski, Project: simple-kafka-storm-java, Lines of code: 7, Source: OpaqueTridentKafkaSpoutBuilder.java


Example 14: testTweetSpout

import storm.kafka.trident.TridentKafkaConfig; // import the required package/class
public static TransactionalTridentKafkaSpout testTweetSpout(BrokerHosts hosts) {
    TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(hosts, "test", "storm");
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    return new TransactionalTridentKafkaSpout(kafkaConfig);
}
 
Developer ID: eshioji, Project: trident-tutorial, Lines of code: 6, Source: TestUtils.java


Example 15: BrokerSpout

import storm.kafka.trident.TridentKafkaConfig; // import the required package/class
public BrokerSpout(String kafkaTopic, String zookeeperHosts, String idClient, boolean forceFromStart) {
    zhost = new ZkHosts(zookeeperHosts);
    config = new TridentKafkaConfig(zhost, kafkaTopic); // optional third parameter: idClient
    config.forceFromStart = forceFromStart;
    config.startOffsetTime = -2; // -2 = earliest offset

    LOG.info("BrokerSpout. zookeeperHosts: " + zookeeperHosts + " topic: " + kafkaTopic + " idClient: " + idClient);
}
 
Developer ID: Produban, Project: openbus, Lines of code: 11, Source: BrokerSpout.java
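
Several of the snippets above assign a bare -1 or -2 to startOffsetTime, while createKafkaSpout uses kafka.api.OffsetRequest.LatestTime() instead. In Kafka's old Scala API, LatestTime() returns -1 and EarliestTime() returns -2. The sketch below only names those two values so the intent is easier to read. The class OffsetSentinels and its describe method are hypothetical and belong to neither Kafka nor storm-kafka, and treating any other value as a timestamp is an assumption about how such values are typically interpreted.

```java
// Hypothetical helper for illustration; not part of Kafka or storm-kafka.
final class OffsetSentinels {
    // Same value as kafka.api.OffsetRequest.LatestTime() in the old Kafka Scala API.
    static final long LATEST = -1L;
    // Same value as kafka.api.OffsetRequest.EarliestTime() in the old Kafka Scala API.
    static final long EARLIEST = -2L;

    private OffsetSentinels() {}

    // Returns a short label for a value assigned to startOffsetTime.
    static String describe(long startOffsetTime) {
        if (startOffsetTime == LATEST) {
            return "latest";
        }
        if (startOffsetTime == EARLIEST) {
            return "earliest";
        }
        // Assumption: any other value is treated as a timestamp in milliseconds.
        return "timestamp " + startOffsetTime;
    }

    public static void main(String[] args) {
        System.out.println(describe(-1L)); // latest
        System.out.println(describe(-2L)); // earliest
    }
}
```

In a real topology it is usually clearer to call the kafka.api.OffsetRequest methods directly, as createKafkaSpout does, than to define separate constants.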



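Note: The storm.kafka.trident.TridentKafkaConfig examples in this article were collected from GitHub, MSDocs, and similar source-code and documentation hosting platforms. The snippets were selected from open-source projects contributed by their respective developers. Copyright remains with the original authors, and distribution and use should follow each project's license. Do not reproduce without permission.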

