
Java KafkaSpout Class Code Examples


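This article collects typical usage examples of storm.kafka.KafkaSpout in Java. If you are wondering how the Java KafkaSpout class is used in practice, or are looking for working KafkaSpout examples, the selected class code examples here may help.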



The KafkaSpout class belongs to the storm.kafka package. Twenty code examples of the class are shown below, sorted by popularity by default.

Example 1: configureKafkaSpout

import storm.kafka.KafkaSpout; // import the required package/class
public static void configureKafkaSpout(TopologyBuilder builder, String zkHostString, String kafkaTopic, 
                                       String kafkaStartOffset, int parallelismHint, String spoutName,
                                       String spoutScheme) {

    LOG.info("KAFKASPOUT: Configuring the KafkaSpout");

    // Configure the KafkaSpout
    SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zkHostString),
            kafkaTopic,      // Kafka topic to read from
            "/" + kafkaTopic, // Root path in Zookeeper for the spout to store consumer offsets
            UUID.randomUUID().toString());  // ID for storing consumer offsets in Zookeeper
    try {
        spoutConfig.scheme = new SchemeAsMultiScheme(getSchemeFromClassName(spoutScheme));
    } catch(Exception e) {
        LOG.error("ERROR: Unable to create instance of scheme: " + spoutScheme);
        e.printStackTrace();
    }
    setKafkaOffset(spoutConfig, kafkaStartOffset);
    
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

    // Add the spout and bolt to the topology
    builder.setSpout(spoutName, kafkaSpout, parallelismHint);

}
 
Developer ID: sakserv, Project: storm-topology-examples, Lines of code: 26, Source file: ConfigureKafkaSpout.java
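
Example 1 calls getSchemeFromClassName, a project helper whose body is not shown on this page. The following is a minimal sketch of how such a helper might work, assuming the scheme class has a no-argument constructor. DemoScheme and DemoStringScheme are hypothetical stand-ins for the Storm types so that the snippet compiles without the Storm dependency; unlike Example 1, the sketch fails fast instead of logging the error and continuing.

```java
import java.nio.charset.StandardCharsets;

final class SchemeLoader {

    // Hypothetical stand-in for Storm's Scheme interface (not the real API).
    interface DemoScheme {
        String deserialize(byte[] payload);
    }

    // Hypothetical scheme that decodes each message as a UTF-8 string.
    static final class DemoStringScheme implements DemoScheme {
        @Override
        public String deserialize(byte[] payload) {
            return new String(payload, StandardCharsets.UTF_8);
        }
    }

    // Instantiates a scheme from its fully qualified class name via reflection.
    // Throws IllegalArgumentException if the class is missing, is not a
    // DemoScheme, or cannot be constructed without arguments.
    static DemoScheme getSchemeFromClassName(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            return clazz.asSubclass(DemoScheme.class)
                        .getDeclaredConstructor()
                        .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("Cannot create scheme: " + className, e);
        }
    }

    public static void main(String[] args) {
        DemoScheme scheme = getSchemeFromClassName(DemoStringScheme.class.getName());
        System.out.println(scheme.deserialize("hello".getBytes(StandardCharsets.UTF_8)));
    }
}
```

Failing fast here is a design choice: a topology that silently falls back to the default scheme can be harder to debug than one that refuses to start.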


Example 2: testOpen

import storm.kafka.KafkaSpout; // import the required package/class
@Test
public void testOpen(
        final @Injectable SpoutConfig spoutConfig,
        final @Injectable Map conf,
        final @Injectable TopologyContext context,
        final @Injectable SpoutOutputCollector collector,
        final @Injectable KafkaSpout kafkaSpout) throws Exception {

    spout.rateLimiter = null;
    spout.kafkaSpout = kafkaSpout;

    new Expectations(spout) {{
        spout.setupKafkaSpout();
    }};

    spout.permitsPerSecond = spout.DEFAULT_PERMITS_PER_SECOND;
    spout.open(conf, context, collector);
    Assert.assertNull(spout.rateLimiter);

    spout.permitsPerSecond = 1D;
    spout.open(conf, context, collector);
    Assert.assertNotNull(spout.rateLimiter);
}
 
Developer ID: boozallen, Project: cognition, Lines of code: 24, Source file: StormKafkaSpoutTest.java


Example 3: buildTopology

import storm.kafka.KafkaSpout; // import the required package/class
public StormTopology buildTopology(Properties properties) {
	
	// Load properties for the storm topology
	String kafkaTopic = properties.getProperty("kafka.topic");
	
	SpoutConfig kafkaConfig = new SpoutConfig(kafkaBrokerHosts, kafkaTopic, "",	"storm");
	kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
	TopologyBuilder builder = new TopologyBuilder();

	// Specific audit logs analysis bolts
	AuditLoginsCounterBolt loginCounterbolt = new AuditLoginsCounterBolt();
	AuditParserBolt auditParserBolt = new AuditParserBolt();
	
	// Elastic search bolt
	TupleMapper tupleMapper = new DefaultTupleMapper();
	ElasticSearchBolt elasticSearchBolt = new ElasticSearchBolt(tupleMapper);

	// Topology scheme: KafkaSpout -> auditParserBolt -> loginCounterBolt -> elasticSearchBolt
	builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);
	builder.setBolt("ParseBolt", auditParserBolt, 1).shuffleGrouping("KafkaSpout");
	builder.setBolt("CountBolt", loginCounterbolt, 1).shuffleGrouping("ParseBolt");
	builder.setBolt("ElasticSearchBolt", elasticSearchBolt, 1)
	.fieldsGrouping("CountBolt", new Fields("id", "index", "type", "document"));

	return builder.createTopology();
}
 
Developer ID: mvalleavila, Project: Kafka-Storm-ElasticSearch, Lines of code: 27, Source file: AuditActiveLoginsTopology.java


Example 4: getTopology

import storm.kafka.KafkaSpout; // import the required package/class
@Override
public StormTopology getTopology(Config config) {

  final int spoutNum = BenchmarkUtils.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
  final int viewBoltNum = BenchmarkUtils.getInt(config, VIEW_NUM, DEFAULT_VIEW_BOLT_NUM);
  final int cntBoltNum = BenchmarkUtils.getInt(config, COUNT_NUM, DEFAULT_COUNT_BOLT_NUM);

  spout = new KafkaSpout(KafkaUtils.getSpoutConfig(
          config, new SchemeAsMultiScheme(new StringScheme())));

  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout(SPOUT_ID, spout, spoutNum);
  builder.setBolt(VIEW_ID, new PageViewBolt(Item.URL, Item.ONE), viewBoltNum)
         .localOrShuffleGrouping(SPOUT_ID);
  builder.setBolt(COUNT_ID, new WordCount.Count(), cntBoltNum)
          .fieldsGrouping(VIEW_ID, new Fields(Item.URL.toString()));
  return builder.createTopology();
}
 
Developer ID: manuzhang, Project: storm-benchmark, Lines of code: 19, Source file: PageViewCount.java


Example 5: getTopology

import storm.kafka.KafkaSpout; // import the required package/class
@Override
public StormTopology getTopology(Config config) {
  final int spoutNum = BenchmarkUtils.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
  final int pvBoltNum = BenchmarkUtils.getInt(config, VIEW_NUM, DEFAULT_PV_BOLT_NUM);
  final int filterBoltNum = BenchmarkUtils.getInt(config, FILTER_NUM, DEFAULT_FILTER_BOLT_NUM);
  spout = new KafkaSpout(KafkaUtils.getSpoutConfig(
          config, new SchemeAsMultiScheme(new StringScheme())));

  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout(SPOUT_ID, spout, spoutNum);
  builder.setBolt(VIEW_ID, new PageViewBolt(Item.STATUS, Item.ALL), pvBoltNum)
          .localOrShuffleGrouping(SPOUT_ID);
  builder.setBolt(FILTER_ID, new FilterBolt<Integer>(404), filterBoltNum)
          .fieldsGrouping(VIEW_ID, new Fields(Item.STATUS.toString()));
  return builder.createTopology();
}
 
Developer ID: manuzhang, Project: storm-benchmark, Lines of code: 17, Source file: DataClean.java


Example 6: getTopology

import storm.kafka.KafkaSpout; // import the required package/class
@Override
public StormTopology getTopology(Config config) {

  final int spoutNum = BenchmarkUtils.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
  final int matBoltNum = BenchmarkUtils.getInt(config, FM_NUM, DEFAULT_MAT_BOLT_NUM);
  final int cntBoltNum = BenchmarkUtils.getInt(config, CM_NUM, DEFAULT_CNT_BOLT_NUM);
  final String ptnString = (String) Utils.get(config, PATTERN_STRING, DEFAULT_PATTERN_STR);

  spout = new KafkaSpout(KafkaUtils.getSpoutConfig(config, new SchemeAsMultiScheme(new StringScheme())));

  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout(SPOUT_ID, spout, spoutNum);
  builder.setBolt(FM_ID, new FindMatchingSentence(ptnString), matBoltNum)
          .localOrShuffleGrouping(SPOUT_ID);
  builder.setBolt(CM_ID, new CountMatchingSentence(), cntBoltNum)
          .fieldsGrouping(FM_ID, new Fields(FindMatchingSentence.FIELDS));

  return builder.createTopology();
}
 
Developer ID: manuzhang, Project: storm-benchmark, Lines of code: 20, Source file: Grep.java


Example 7: getTopology

import storm.kafka.KafkaSpout; // import the required package/class
@Override
public StormTopology getTopology(Config config) {

  final int spoutNum = BenchmarkUtils.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
  final int pvBoltNum = BenchmarkUtils.getInt(config, VIEW_NUM, DEFAULT_PV_BOLT_NUM);
  final int uvBoltNum = BenchmarkUtils.getInt(config, UNIQUER_NUM, DEFAULT_UV_BOLT_NUM);
  final int winLen = BenchmarkUtils.getInt(config, WINDOW_LENGTH, DEFAULT_WINDOW_LENGTH_IN_SEC);
  final int emitFreq = BenchmarkUtils.getInt(config, EMIT_FREQ, DEFAULT_EMIT_FREQ_IN_SEC);
  spout = new KafkaSpout(KafkaUtils.getSpoutConfig(
          config, new SchemeAsMultiScheme(new StringScheme())));

  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout(SPOUT_ID, spout, spoutNum);
  builder.setBolt(VIEW_ID, new PageViewBolt(Item.URL, Item.USER), pvBoltNum)
          .localOrShuffleGrouping(SPOUT_ID);
  builder.setBolt(UNIQUER_ID, new UniqueVisitorBolt(winLen, emitFreq), uvBoltNum)
          .fieldsGrouping(VIEW_ID, new Fields(Item.URL.toString()));
  return builder.createTopology();
}
 
Developer ID: manuzhang, Project: storm-benchmark, Lines of code: 20, Source file: UniqueVisitor.java


Example 8: configureKafkaSpout

import storm.kafka.KafkaSpout; // import the required package/class
public static void configureKafkaSpout(TopologyBuilder builder, String zkHostString, String kafkaTopic, String kafkaStartOffset) {

        // Configure the KafkaSpout
        SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zkHostString),
                kafkaTopic,      // Kafka topic to read from
                "/" + kafkaTopic, // Root path in Zookeeper for the spout to store consumer offsets
                UUID.randomUUID().toString());  // ID for storing consumer offsets in Zookeeper
        //spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConfig.scheme = new SchemeAsMultiScheme(new JsonScheme());

        // Allow for passing in an offset time
        // startOffsetTime has a bug that ignores the special -2 value
        if ("-2".equals(kafkaStartOffset)) {
            spoutConfig.forceFromStart = true;
        } else if (kafkaStartOffset != null) {
            spoutConfig.startOffsetTime = Long.parseLong(kafkaStartOffset);
        }
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        // Add the spout and bolt to the topology
        builder.setSpout("kafkaspout", kafkaSpout, 1);

    }
 
Developer ID: sakserv, Project: storm-kafka-hdfs-starter, Lines of code: 24, Source file: ConfigureKafkaSpout.java
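
When an offset string such as "-2" is mapped to spout settings, the check has to compare string values with equals, because == on strings compares object references and can be false for a value read at runtime. The sketch below isolates that mapping. OffsetSettings is a hypothetical holder that mirrors the two SpoutConfig fields used in Example 8 (it is not a Storm class), and a null startOffsetTime is this sketch's way of saying "leave the spout's default untouched".

```java
final class StartOffsets {

    // Hypothetical holder mirroring the two SpoutConfig fields set in Example 8.
    static final class OffsetSettings {
        boolean forceFromStart = false;
        Long startOffsetTime = null; // null: leave the spout's default untouched
    }

    // Maps the offset string to settings: "-2" forces a read from the start,
    // any other non-null value is parsed as an offset time.
    static OffsetSettings parse(String kafkaStartOffset) {
        OffsetSettings settings = new OffsetSettings();
        if (kafkaStartOffset == null) {
            return settings;
        }
        String trimmed = kafkaStartOffset.trim();
        if ("-2".equals(trimmed)) {
            settings.forceFromStart = true;
        } else {
            settings.startOffsetTime = Long.parseLong(trimmed);
        }
        return settings;
    }

    public static void main(String[] args) {
        String runtimeValue = new String("-2"); // simulates a value built at runtime
        System.out.println(runtimeValue == "-2");      // false: different objects
        System.out.println("-2".equals(runtimeValue)); // true: same characters
        System.out.println(parse(runtimeValue).forceFromStart); // true
    }
}
```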


Example 9: initializeKafkaSpout

import storm.kafka.KafkaSpout; // import the required package/class
private boolean initializeKafkaSpout(String name) {
	try {

		BrokerHosts zk = new ZkHosts(config.getString("kafka.zk"));
		String input_topic = config.getString("spout.kafka.topic");
		SpoutConfig kafkaConfig = new SpoutConfig(zk, input_topic, "",
				input_topic);
		kafkaConfig.scheme = new SchemeAsMultiScheme(new RawScheme());
		kafkaConfig.forceFromStart = true;
		kafkaConfig.startOffsetTime = -1;

		builder.setSpout(name, new KafkaSpout(kafkaConfig),
				config.getInt("spout.kafka.parallelism.hint")).setNumTasks(
				config.getInt("spout.kafka.num.tasks"));

	} catch (Exception e) {
		e.printStackTrace();
		System.exit(0);
	}

	return true;
}
 
Developer ID: OpenSOC, Project: opensoc-streaming, Lines of code: 23, Source file: TopologyRunner.java


Example 10: main

import storm.kafka.KafkaSpout; // import the required package/class
public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();

    SpoutConfig spoutConf = new SpoutConfig(new ZkHosts("localhost:2181", "/brokers"), "test", "/kafkastorm", "KafkaSpout");
    spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
    spoutConf.forceFromStart = true;

    builder.setSpout("KafkaSpout", new KafkaSpout(spoutConf), 3);
    builder.setBolt("KafkaBolt", new PrinterBolt(), 3).shuffleGrouping("KafkaSpout");

    Config conf = new Config();
    // conf.setDebug(true);

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("kafka-test", conf, builder.createTopology());

    Utils.sleep(60000);
    cluster.shutdown();
}
 
Developer ID: ogidogi, Project: kafka-storm-hive, Lines of code: 20, Source file: KafkaStormTopology.java


Example 11: main

import storm.kafka.KafkaSpout; // import the required package/class
public static void main(String[] args) {
	
	
	Config conf = new Config();
	// conf.put(Config.TOPOLOGY_DEBUG,true);
	conf.put(StormElasticSearchConstants.ES_CLUSTER_NAME,"elasticsearch");
	conf.put(StormElasticSearchConstants.ES_HOST,"localhost");
	conf.put(StormElasticSearchConstants.ES_PORT,9300);
	
	ZkHosts zooHosts = new ZkHosts("localhost:50003");
    TopologyBuilder builder = new TopologyBuilder();
    SpoutConfig spoutConfig = new SpoutConfig(zooHosts, "test", "", "STORM-ID");

    //spoutConfig.scheme=new StringScheme();
   // spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    KafkaSpout spout1 =  new KafkaSpout(spoutConfig);
    builder.setSpout("source", spout1, 1);
	builder.setBolt("echo", new EchoBolt(), 1).shuffleGrouping("source");

	LocalCluster cluster = new LocalCluster();
	cluster.submitTopology("basic_primitives", conf,
			builder.createTopology());
}
 
Developer ID: Produban, Project: openbus, Lines of code: 24, Source file: kafkaSpoutTest.java


Example 12: main

import storm.kafka.KafkaSpout; // import the required package/class
/**
 * @param args
 * http://www.programcreek.com/java-api-examples/index.php?api=storm.kafka.KafkaSpout
 */
public static void main(String[] args) {
	try{
		// Set up the spout and assign its parallelism, which controls how many threads this component gets in the cluster (6)
		String zkhost = "wxb-1:2181,wxb-2:2181,wxb-3:2181";
		String topic = "order";
		String groupId = "id";
		int spoutNum = 3;
		int boltNum = 1;
		ZkHosts zkHosts = new ZkHosts(zkhost); // the ZooKeeper ensemble used by Kafka
		SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, "/order", groupId);  // create /order /id
		spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
		KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", kafkaSpout, spoutNum);
		builder.setBolt("check", new CheckOrderBolt(), boltNum).shuffleGrouping("spout");
        builder.setBolt("counter", new CounterBolt(),boltNum).shuffleGrouping("check");

        Config config = new Config();
        config.setDebug(true);
        
        if(args!=null && args.length > 0) {
            config.setNumWorkers(2);
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {        
            config.setMaxTaskParallelism(2);

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Wordcount-Topology", config, builder.createTopology());

            Thread.sleep(500000);

            cluster.shutdown();
        }
	}catch (Exception e) {
		e.printStackTrace();
	}
}
 
Developer ID: realxujiang, Project: storm-kafka-examples, Lines of code: 43, Source file: CounterTopology.java
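
The third and fourth SpoutConfig arguments (ZooKeeper root path and consumer ID) decide where the spout keeps its consumer offsets, which is what the "create /order /id" comment in Example 12 hints at. The sketch below composes such a path, assuming the layout <zkRoot>/<id>/partition_<n>. That layout is an assumption about the storm-kafka version in use and should be verified against it; offsetPath is a hypothetical helper, not a Storm API.

```java
final class OffsetPaths {

    // Builds the assumed ZooKeeper node path for one partition's offset.
    // A trailing slash on zkRoot is dropped so the result has no "//".
    static String offsetPath(String zkRoot, String id, int partition) {
        String root = zkRoot.endsWith("/")
                ? zkRoot.substring(0, zkRoot.length() - 1)
                : zkRoot;
        return root + "/" + id + "/partition_" + partition;
    }

    public static void main(String[] args) {
        // Values taken from Example 12: zkRoot "/order", id "id".
        System.out.println(offsetPath("/order", "id", 0)); // /order/id/partition_0
        // An empty zkRoot, as used in several examples, puts the ID at the top level.
        System.out.println(offsetPath("", "storm", 3));    // /storm/partition_3
    }
}
```

Under this assumption, an ID generated with UUID.randomUUID() yields a new path on every start, so offsets committed by an earlier run would not be picked up; a fixed ID such as "storm" keeps the path stable across restarts.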


Example 13: configureKafkaSpout

import storm.kafka.KafkaSpout; // import the required package/class
public int configureKafkaSpout(TopologyBuilder builder) {
    KafkaSpout kafkaSpout = constructKafkaSpout();

    //int spoutCount = Integer.valueOf(topologyConfig.getProperty("spout.thread.count"));
    //int boltCount = Integer.valueOf(topologyConfig.getProperty("bolt.thread.count"));
    
    int spoutCount = Integer.valueOf(1);
    int boltCount = Integer.valueOf(1);

    builder.setSpout("kafkaSpout", kafkaSpout, spoutCount);
    return boltCount;
}
 
Developer ID: bucaojit, Project: RealEstate-Streaming, Lines of code: 13, Source file: PhoenixTest.java


Example 14: getKafkaSpout

import storm.kafka.KafkaSpout; // import the required package/class
public KafkaSpout getKafkaSpout() {

        LOG.info("KAFKASPOUT: Configuring the Kafka Spout");

        // Create the initial spoutConfig
        SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zookeeperConnectionString),
                kafkaTopic,      // Kafka topic to read from
                "/" + kafkaTopic, // Root path in Zookeeper for the spout to store consumer offsets
                UUID.randomUUID().toString());  // ID for storing consumer offsets in Zookeeper

        // Set the scheme
        try {
            spoutConfig.scheme = new SchemeAsMultiScheme(getSchemeFromClassName(spoutSchemeClass));
        } catch(Exception e) {
            LOG.error("ERROR: Unable to create instance of scheme: " + spoutSchemeClass);
            e.printStackTrace();
        }

        // Set the offset
        setKafkaOffset(spoutConfig, kafkaStartOffset);

        // Create the kafkaSpout
        return new KafkaSpout(spoutConfig);

    }
 
Developer ID: sakserv, Project: storm-kafka-hdfs-example, Lines of code: 26, Source file: KafkaSpoutConfigBuilder.java


Example 15: configureKafkaSpout

import storm.kafka.KafkaSpout; // import the required package/class
public int configureKafkaSpout(TopologyBuilder builder) {
  KafkaSpout kafkaSpout = constructKafkaSpout();

  int spoutCount = Integer.valueOf(topologyConfig.getProperty("spout.thread.count"));
  int boltCount = Integer.valueOf(topologyConfig.getProperty("bolt.thread.count"));

  builder.setSpout("kafkaSpout", kafkaSpout, spoutCount);
  return boltCount;
}
 
Developer ID: DhruvKumar, Project: iot-masterclass, Lines of code: 10, Source file: TruckEventProcessorKafkaTopology.java
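
Example 15 reads the spout and bolt parallelism from a Properties object with Integer.valueOf, which throws a NumberFormatException when the key is missing. A minimal sketch of a more forgiving lookup follows; getInt is a hypothetical helper (not part of Storm or of the project above) that falls back to a default when the property is absent or blank.

```java
import java.util.Properties;

final class TopologyProps {

    // Returns the integer value of a property, or defaultValue when the key
    // is absent or blank. A non-numeric value is reported with the key name.
    static int getInt(Properties props, String key, int defaultValue) {
        String raw = props.getProperty(key);
        if (raw == null || raw.trim().isEmpty()) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Property '" + key + "' is not an integer: " + raw, e);
        }
    }

    public static void main(String[] args) {
        Properties topologyConfig = new Properties();
        topologyConfig.setProperty("spout.thread.count", "4");

        int spoutCount = getInt(topologyConfig, "spout.thread.count", 1);
        int boltCount = getInt(topologyConfig, "bolt.thread.count", 1); // key not set
        System.out.println(spoutCount + " " + boltCount); // 4 1
    }
}
```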


Example 16: main

import storm.kafka.KafkaSpout; // import the required package/class
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {

		String zkIp = "localhost";

		String nimbusHost = "sandbox.hortonworks.com";

		String zookeeperHost = zkIp +":2181";

		ZkHosts zkHosts = new ZkHosts(zookeeperHost);
		List<String> zkServers = new ArrayList<String>();
		zkServers.add(zkIp);
		SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "spertus-flight-events", "/spertus-flights-events","flight_id");
		kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
		kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
		kafkaConfig.zkServers = zkServers;
		kafkaConfig.zkRoot = "/spertus-flight-events";
		kafkaConfig.zkPort = 2181;
		kafkaConfig.forceFromStart = true;
		KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);

		TopologyBuilder builder = new TopologyBuilder();

		builder.setSpout("flight-events", kafkaSpout, 1);
		builder.setBolt("flight-stats", new GetFlightStatsBolt(), 1).shuffleGrouping("flight-events");

		Map conf = new HashMap();
		conf.put(backtype.storm.Config.TOPOLOGY_WORKERS, 4);
		conf.put(backtype.storm.Config.TOPOLOGY_DEBUG, true);
		if (args != null && args.length > 0) {
			StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
		}   else {
			LocalCluster cluster = new LocalCluster();
			cluster.submitTopology("flight-topology", conf, builder.createTopology());
		}
	}
 
Developer ID: mspertus, Project: Big-Data-tutorial, Lines of code: 36, Source file: FlightTopology.java
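
Example 16 fills zkServers with bare host names and sets zkPort separately, while ZkHosts takes a single "host:port" connection string. The sketch below shows one way to derive both from the same string, so the two cannot drift apart. It assumes that every entry uses the same port and that the string has no chroot suffix; ZkConnectString is a hypothetical helper, not a Storm class.

```java
import java.util.ArrayList;
import java.util.List;

final class ZkConnectString {

    static final int DEFAULT_PORT = 2181; // conventional ZooKeeper client port

    // Extracts the host names from "host1:2181,host2:2181,...".
    static List<String> hosts(String connect) {
        List<String> hosts = new ArrayList<>();
        for (String entry : connect.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            hosts.add(colon >= 0 ? trimmed.substring(0, colon) : trimmed);
        }
        return hosts;
    }

    // Returns the port of the first entry that names one, else DEFAULT_PORT.
    static int port(String connect) {
        for (String entry : connect.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon >= 0) {
                return Integer.parseInt(trimmed.substring(colon + 1));
            }
        }
        return DEFAULT_PORT;
    }

    public static void main(String[] args) {
        String connect = "wxb-1:2181,wxb-2:2181,wxb-3:2181"; // string from Example 12
        System.out.println(hosts(connect)); // [wxb-1, wxb-2, wxb-3]
        System.out.println(port(connect));  // 2181
    }
}
```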


Example 17: createTopology

import storm.kafka.KafkaSpout; // import the required package/class
private static StormTopology createTopology()
{
    SpoutConfig kafkaConf = new SpoutConfig(
        new ZkHosts(Properties.getString("rts.storm.zkhosts")),
        KAFKA_TOPIC,
        "/kafka",
        "KafkaSpout");
    kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
    TopologyBuilder topology = new TopologyBuilder();

    topology.setSpout("kafka_spout", new KafkaSpout(kafkaConf), 4);

    topology.setBolt("twitter_filter", new TwitterFilterBolt(), 4)
            .shuffleGrouping("kafka_spout");

    topology.setBolt("text_filter", new TextFilterBolt(), 4)
            .shuffleGrouping("twitter_filter");

    topology.setBolt("stemming", new StemmingBolt(), 4)
            .shuffleGrouping("text_filter");

    topology.setBolt("positive", new PositiveSentimentBolt(), 4)
            .shuffleGrouping("stemming");
    topology.setBolt("negative", new NegativeSentimentBolt(), 4)
            .shuffleGrouping("stemming");

    topology.setBolt("join", new JoinSentimentsBolt(), 4)
            .fieldsGrouping("positive", new Fields("tweet_id"))
            .fieldsGrouping("negative", new Fields("tweet_id"));

    topology.setBolt("score", new SentimentScoringBolt(), 4)
            .shuffleGrouping("join");

    topology.setBolt("hdfs", new HDFSBolt(), 4)
            .shuffleGrouping("score");
    topology.setBolt("nodejs", new NodeNotifierBolt(), 4)
            .shuffleGrouping("score");

    return topology.createTopology();
}
 
Developer ID: zdata-inc, Project: StormSampleProject, Lines of code: 41, Source file: SentimentAnalysisTopology.java


Example 18: main

import storm.kafka.KafkaSpout; // import the required package/class
public static void main(String[] args) throws Exception {
    String kafkaZk = "zookeeper:2181"; // change it to your zookeeper server
    BrokerHosts brokerHosts = new ZkHosts(kafkaZk);

    SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "name_of_kafka_topic", "", "test"); // change it to the name of your kafka topic
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    kafkaConfig.forceFromStart = true;

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("stream", new KafkaSpout(kafkaConfig), 1);
    builder.setBolt("split", new SplitterBolt(), 8).shuffleGrouping("stream");
    builder.setBolt("counter", new CounterBolt(), 10).customGrouping("split", new PartialKeyGrouping());
    builder.setBolt("aggregator", new AggregatorBolt(), 1).fieldsGrouping("counter", new Fields("word"));

    Config conf = new Config();
    conf.setDebug(false);
    conf.setMaxSpoutPending(100);
    // conf.setMessageTimeoutSecs(300); // optionally increase the timeout for tuples

    if (args != null && args.length > 0) {
        conf.setNumWorkers(10);
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    } else {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", conf, builder.createTopology());
        Utils.sleep(15000000);
        cluster.killTopology("test");
        cluster.shutdown();
    }
}
 
Developer ID: gdfm, Project: partial-key-grouping, Lines of code: 31, Source file: WordCountPartialKeyGrouping.java


Example 19: main

import storm.kafka.KafkaSpout; // import the required package/class
public static void main(String[] args) throws AlreadyAliveException,
			InvalidTopologyException {
		logger.info("begin to running recsys.");
		BrokerHosts brokerHosts = new ZkHosts(Constants.kafka_zk_address);
		SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, Constants.kafka_topic, 	Constants.kafka_zk_root, Constants.kafka_id);

		Config conf = new Config();
		Map<String, String> map = new HashMap<String, String>();
		map.put("metadata.broker.list", Constants.kakfa_broker_list);
		map.put("serializer.class", "kafka.serializer.StringEncoder");
		conf.put("kafka.broker.properties", map);
//		conf.put("topic", "topic2");

		spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("spout", new KafkaSpout(spoutConfig));
		builder.setBolt("bolt", new HBaseStoreBolt()).shuffleGrouping("spout");
//		builder.setBolt("kafkabolt", new KafkaBolt<String, Integer>()).shuffleGrouping("bolt");

		if (!islocal) {
			conf.setNumWorkers(3);
			StormSubmitter.submitTopology(Constants.storm_topology_name, conf, builder.createTopology());
		} else {
			LocalCluster cluster = new LocalCluster();
			cluster.submitTopology(Constants.storm_topology_name, conf, builder.createTopology());
			Utils.sleep(100000);
			cluster.killTopology(Constants.storm_topology_name);
			cluster.shutdown();
		}
		logger.info("run recsys finish.");
	}
 
Developer ID: bytegriffin, Project: recsys-online, Lines of code: 32, Source file: Recsys.java


Example 20: buildTopology

import storm.kafka.KafkaSpout; // import the required package/class
public StormTopology buildTopology(Properties properties) {
	
	// Load properties for the storm topology
	String kafkaTopic = properties.getProperty("kafka.topic");
	String hbaseTable = properties.getProperty("hbase.table.name");
	String hbaseColumnFamily = properties.getProperty("hbase.column.family");
	
	SpoutConfig kafkaConfig = new SpoutConfig(kafkaBrokerHosts, kafkaTopic, "",
			"storm");
	kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
	TopologyBuilder builder = new TopologyBuilder();
	
	SimpleHBaseMapper hBaseMapper = new SimpleHBaseMapper()
			.withRowKeyField("host-user")
			.withCounterFields(new Fields("count"))
			.withColumnFamily(hbaseColumnFamily);

	// HbaseBolt(tableName, hbaseMapper)
	HBaseBolt hbaseBolt = new HBaseBolt(hbaseTable, hBaseMapper);
	AuditLoginsCounterBolt loginCounterbolt = new AuditLoginsCounterBolt(hbaseTable);
	AuditBolt auditParserBolt = new AuditBolt();

	builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);
	builder.setBolt("ParseBolt", auditParserBolt, 1).shuffleGrouping("KafkaSpout");
	builder.setBolt("CountBolt", loginCounterbolt, 1).shuffleGrouping("ParseBolt");
	builder.setBolt("HBaseBolt", hbaseBolt, 1).fieldsGrouping("CountBolt",
			new Fields("host-user"));

	return builder.createTopology();
}
 
Developer ID: mvalleavila, Project: StormTopology-AuditActiveLogins, Lines of code: 31, Source file: AuditActiveLoginsTopology.java



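Note: The storm.kafka.KafkaSpout examples in this article were collected from source-code and documentation hosting platforms such as Github/MSDocs. The snippets were selected from open-source projects, and copyright of the source code belongs to the original authors. For distribution and use, refer to the license of the corresponding project; do not reproduce without permission.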

