This article collects typical usage examples of the Java class storm.kafka.KafkaSpout. If you are wondering what KafkaSpout is for, how to use it, or what working code looks like, the selected examples below may help.
The KafkaSpout class belongs to the storm.kafka package. The 20 code examples below are sorted by popularity by default.
Example 1: configureKafkaSpout
import storm.kafka.KafkaSpout; // import the required package/class
public static void configureKafkaSpout(TopologyBuilder builder, String zkHostString, String kafkaTopic,
                                       String kafkaStartOffset, int parallelismHint, String spoutName,
                                       String spoutScheme) {
    LOG.info("KAFKASPOUT: Configuring the KafkaSpout");
    // Configure the KafkaSpout
    SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zkHostString),
            kafkaTopic,                     // Kafka topic to read from
            "/" + kafkaTopic,               // Root path in Zookeeper for the spout to store consumer offsets
            UUID.randomUUID().toString());  // ID for storing consumer offsets in Zookeeper
    try {
        spoutConfig.scheme = new SchemeAsMultiScheme(getSchemeFromClassName(spoutScheme));
    } catch (Exception e) {
        // Pass the exception to the logger so the stack trace ends up in the log
        LOG.error("ERROR: Unable to create instance of scheme: " + spoutScheme, e);
    }
    setKafkaOffset(spoutConfig, kafkaStartOffset);
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
    // Add the spout to the topology
    builder.setSpout(spoutName, kafkaSpout, parallelismHint);
}
Developer: sakserv, Project: storm-topology-examples, Lines of code: 26, Source: ConfigureKafkaSpout.java
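Example 1 calls a helper named getSchemeFromClassName whose body is not shown. The following is a minimal sketch of how such a helper could work, assuming it loads the class by name and calls its no-argument constructor. The class ClassNameInstantiator and its method newInstance are hypothetical names, and java.util.List stands in for Storm's Scheme interface so that the sketch runs without Storm on the classpath.

```java
import java.util.List;

/**
 * Hypothetical stand-in for the getSchemeFromClassName helper used in Example 1:
 * loads a class by name and instantiates it through its no-arg constructor.
 */
final class ClassNameInstantiator {

    private ClassNameInstantiator() {}

    /**
     * Creates an instance of className after checking that it is a subtype of expectedType.
     * Throws IllegalArgumentException if the class is of the wrong type or cannot be created.
     */
    static <T> T newInstance(String className, Class<T> expectedType) {
        try {
            Class<?> raw = Class.forName(className);
            if (!expectedType.isAssignableFrom(raw)) {
                throw new IllegalArgumentException(
                        className + " is not a " + expectedType.getName());
            }
            // asSubclass narrows the type safely before the constructor is invoked
            return raw.asSubclass(expectedType).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        // List plays the role of the Scheme interface in this sketch
        List<?> list = newInstance("java.util.ArrayList", List.class);
        System.out.println(list.getClass().getName()); // prints "java.util.ArrayList"
    }
}
```

Failing fast with an exception, rather than logging and continuing as Example 1 does, avoids silently running the spout with whatever scheme SpoutConfig had before the assignment.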
Example 2: testOpen
import storm.kafka.KafkaSpout; // import the required package/class
@Test
public void testOpen(
        final @Injectable SpoutConfig spoutConfig,
        final @Injectable Map conf,
        final @Injectable TopologyContext context,
        final @Injectable SpoutOutputCollector collector,
        final @Injectable KafkaSpout kafkaSpout) throws Exception {
    spout.rateLimiter = null;
    spout.kafkaSpout = kafkaSpout;
    new Expectations(spout) {{
        spout.setupKafkaSpout();
    }};
    // With the default rate, open() is expected to leave the rate limiter unset
    spout.permitsPerSecond = spout.DEFAULT_PERMITS_PER_SECOND;
    spout.open(conf, context, collector);
    Assert.assertNull(spout.rateLimiter);
    // With an explicit rate, open() is expected to create a rate limiter
    spout.permitsPerSecond = 1D;
    spout.open(conf, context, collector);
    Assert.assertNotNull(spout.rateLimiter);
}
Developer: boozallen, Project: cognition, Lines of code: 24, Source: StormKafkaSpoutTest.java
Example 3: buildTopology
import storm.kafka.KafkaSpout; // import the required package/class
public StormTopology buildTopology(Properties properties) {
    // Load properties for the storm topology
    String kafkaTopic = properties.getProperty("kafka.topic");
    SpoutConfig kafkaConfig = new SpoutConfig(kafkaBrokerHosts, kafkaTopic, "", "storm");
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    TopologyBuilder builder = new TopologyBuilder();
    // Specific audit logs analysis bolts
    AuditLoginsCounterBolt loginCounterbolt = new AuditLoginsCounterBolt();
    AuditParserBolt auditParserBolt = new AuditParserBolt();
    // Elastic search bolt
    TupleMapper tupleMapper = new DefaultTupleMapper();
    ElasticSearchBolt elasticSearchBolt = new ElasticSearchBolt(tupleMapper);
    // Topology scheme: KafkaSpout -> auditParserBolt -> loginCounterBolt -> elasticSearchBolt
    builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);
    builder.setBolt("ParseBolt", auditParserBolt, 1).shuffleGrouping("KafkaSpout");
    builder.setBolt("CountBolt", loginCounterbolt, 1).shuffleGrouping("ParseBolt");
    builder.setBolt("ElasticSearchBolt", elasticSearchBolt, 1)
            .fieldsGrouping("CountBolt", new Fields("id", "index", "type", "document"));
    return builder.createTopology();
}
Developer: mvalleavila, Project: Kafka-Storm-ElasticSearch, Lines of code: 27, Source: AuditActiveLoginsTopology.java
Example 4: getTopology
import storm.kafka.KafkaSpout; // import the required package/class
@Override
public StormTopology getTopology(Config config) {
    final int spoutNum = BenchmarkUtils.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
    final int viewBoltNum = BenchmarkUtils.getInt(config, VIEW_NUM, DEFAULT_VIEW_BOLT_NUM);
    final int cntBoltNum = BenchmarkUtils.getInt(config, COUNT_NUM, DEFAULT_COUNT_BOLT_NUM);
    spout = new KafkaSpout(KafkaUtils.getSpoutConfig(
            config, new SchemeAsMultiScheme(new StringScheme())));
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(SPOUT_ID, spout, spoutNum);
    builder.setBolt(VIEW_ID, new PageViewBolt(Item.URL, Item.ONE), viewBoltNum)
            .localOrShuffleGrouping(SPOUT_ID);
    builder.setBolt(COUNT_ID, new WordCount.Count(), cntBoltNum)
            .fieldsGrouping(VIEW_ID, new Fields(Item.URL.toString()));
    return builder.createTopology();
}
Developer: manuzhang, Project: storm-benchmark, Lines of code: 19, Source: PageViewCount.java
Example 5: getTopology
import storm.kafka.KafkaSpout; // import the required package/class
@Override
public StormTopology getTopology(Config config) {
    final int spoutNum = BenchmarkUtils.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
    final int pvBoltNum = BenchmarkUtils.getInt(config, VIEW_NUM, DEFAULT_PV_BOLT_NUM);
    final int filterBoltNum = BenchmarkUtils.getInt(config, FILTER_NUM, DEFAULT_FILTER_BOLT_NUM);
    spout = new KafkaSpout(KafkaUtils.getSpoutConfig(
            config, new SchemeAsMultiScheme(new StringScheme())));
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(SPOUT_ID, spout, spoutNum);
    builder.setBolt(VIEW_ID, new PageViewBolt(Item.STATUS, Item.ALL), pvBoltNum)
            .localOrShuffleGrouping(SPOUT_ID);
    builder.setBolt(FILTER_ID, new FilterBolt<Integer>(404), filterBoltNum)
            .fieldsGrouping(VIEW_ID, new Fields(Item.STATUS.toString()));
    return builder.createTopology();
}
Developer: manuzhang, Project: storm-benchmark, Lines of code: 17, Source: DataClean.java
Example 6: getTopology
import storm.kafka.KafkaSpout; // import the required package/class
@Override
public StormTopology getTopology(Config config) {
    final int spoutNum = BenchmarkUtils.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
    final int matBoltNum = BenchmarkUtils.getInt(config, FM_NUM, DEFAULT_MAT_BOLT_NUM);
    final int cntBoltNum = BenchmarkUtils.getInt(config, CM_NUM, DEFAULT_CNT_BOLT_NUM);
    final String ptnString = (String) Utils.get(config, PATTERN_STRING, DEFAULT_PATTERN_STR);
    spout = new KafkaSpout(KafkaUtils.getSpoutConfig(
            config, new SchemeAsMultiScheme(new StringScheme())));
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(SPOUT_ID, spout, spoutNum);
    builder.setBolt(FM_ID, new FindMatchingSentence(ptnString), matBoltNum)
            .localOrShuffleGrouping(SPOUT_ID);
    builder.setBolt(CM_ID, new CountMatchingSentence(), cntBoltNum)
            .fieldsGrouping(FM_ID, new Fields(FindMatchingSentence.FIELDS));
    return builder.createTopology();
}
Developer: manuzhang, Project: storm-benchmark, Lines of code: 20, Source: Grep.java
Example 7: getTopology
import storm.kafka.KafkaSpout; // import the required package/class
@Override
public StormTopology getTopology(Config config) {
    final int spoutNum = BenchmarkUtils.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
    final int pvBoltNum = BenchmarkUtils.getInt(config, VIEW_NUM, DEFAULT_PV_BOLT_NUM);
    final int uvBoltNum = BenchmarkUtils.getInt(config, UNIQUER_NUM, DEFAULT_UV_BOLT_NUM);
    final int winLen = BenchmarkUtils.getInt(config, WINDOW_LENGTH, DEFAULT_WINDOW_LENGTH_IN_SEC);
    final int emitFreq = BenchmarkUtils.getInt(config, EMIT_FREQ, DEFAULT_EMIT_FREQ_IN_SEC);
    spout = new KafkaSpout(KafkaUtils.getSpoutConfig(
            config, new SchemeAsMultiScheme(new StringScheme())));
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(SPOUT_ID, spout, spoutNum);
    builder.setBolt(VIEW_ID, new PageViewBolt(Item.URL, Item.USER), pvBoltNum)
            .localOrShuffleGrouping(SPOUT_ID);
    builder.setBolt(UNIQUER_ID, new UniqueVisitorBolt(winLen, emitFreq), uvBoltNum)
            .fieldsGrouping(VIEW_ID, new Fields(Item.URL.toString()));
    return builder.createTopology();
}
Developer: manuzhang, Project: storm-benchmark, Lines of code: 20, Source: UniqueVisitor.java
Example 8: configureKafkaSpout
import storm.kafka.KafkaSpout; // import the required package/class
public static void configureKafkaSpout(TopologyBuilder builder, String zkHostString, String kafkaTopic, String kafkaStartOffset) {
    // Configure the KafkaSpout
    SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zkHostString),
            kafkaTopic,                     // Kafka topic to read from
            "/" + kafkaTopic,               // Root path in Zookeeper for the spout to store consumer offsets
            UUID.randomUUID().toString());  // ID for storing consumer offsets in Zookeeper
    //spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    spoutConfig.scheme = new SchemeAsMultiScheme(new JsonScheme());
    // Allow for passing in an offset time
    // startOffsetTime has a bug that ignores the special -2 value
    // Compare string contents with equals(); == only compares object references
    if ("-2".equals(kafkaStartOffset)) {
        spoutConfig.forceFromStart = true;
    } else if (kafkaStartOffset != null) {
        spoutConfig.startOffsetTime = Long.parseLong(kafkaStartOffset);
    }
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
    // Add the spout to the topology
    builder.setSpout("kafkaspout", kafkaSpout, 1);
}
Developer: sakserv, Project: storm-kafka-hdfs-starter, Lines of code: 24, Source: ConfigureKafkaSpout.java
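The start-offset branch in Example 8 is easy to get wrong, because a string that arrives at runtime (from a properties file or the command line) is generally not the same object as the literal "-2". The sketch below isolates that logic so it can be run without Storm or Kafka. StartOffsetParser and OffsetSettings are hypothetical names; OffsetSettings only mirrors the two SpoutConfig fields touched in the example, and an empty startOffsetTime is an assumption meaning "leave the config value untouched".

```java
import java.util.OptionalLong;

/** Stand-alone sketch of the start-offset handling from Example 8 (hypothetical helper). */
final class StartOffsetParser {

    /** Value of kafka.api.OffsetRequest.EarliestTime(): start from the earliest offset. */
    static final String EARLIEST = "-2";

    /** Hypothetical holder mirroring SpoutConfig.forceFromStart and SpoutConfig.startOffsetTime. */
    static final class OffsetSettings {
        final boolean forceFromStart;
        final OptionalLong startOffsetTime; // empty means: keep whatever the config already has

        OffsetSettings(boolean forceFromStart, OptionalLong startOffsetTime) {
            this.forceFromStart = forceFromStart;
            this.startOffsetTime = startOffsetTime;
        }
    }

    private StartOffsetParser() {}

    /** Applies the same three-way decision as Example 8 to a raw setting string. */
    static OffsetSettings parse(String raw) {
        if (raw == null) {
            return new OffsetSettings(false, OptionalLong.empty());
        }
        String value = raw.trim();
        if (EARLIEST.equals(value)) {
            return new OffsetSettings(true, OptionalLong.empty());
        }
        // Throws NumberFormatException for non-numeric input, like the original code
        return new OffsetSettings(false, OptionalLong.of(Long.parseLong(value)));
    }

    public static void main(String[] args) {
        String runtimeValue = new String("-2"); // simulates a value that is not an interned literal
        System.out.println(runtimeValue == "-2");      // prints "false": reference comparison
        System.out.println("-2".equals(runtimeValue)); // prints "true": content comparison
        System.out.println(parse(runtimeValue).forceFromStart); // prints "true"
    }
}
```

Putting the literal first in "-2".equals(value) also makes the comparison safe when the value is null.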
Example 9: initializeKafkaSpout
import storm.kafka.KafkaSpout; // import the required package/class
private boolean initializeKafkaSpout(String name) {
    try {
        BrokerHosts zk = new ZkHosts(config.getString("kafka.zk"));
        String input_topic = config.getString("spout.kafka.topic");
        SpoutConfig kafkaConfig = new SpoutConfig(zk, input_topic, "",
                input_topic);
        kafkaConfig.scheme = new SchemeAsMultiScheme(new RawScheme());
        kafkaConfig.forceFromStart = true;
        // -1 is Kafka's sentinel for the latest offset (kafka.api.OffsetRequest.LatestTime())
        kafkaConfig.startOffsetTime = -1;
        builder.setSpout(name, new KafkaSpout(kafkaConfig),
                config.getInt("spout.kafka.parallelism.hint")).setNumTasks(
                config.getInt("spout.kafka.num.tasks"));
    } catch (Exception e) {
        e.printStackTrace();
        // Exit with a non-zero status so callers can detect the failure
        System.exit(1);
    }
    return true;
}
Developer: OpenSOC, Project: opensoc-streaming, Lines of code: 23, Source: TopologyRunner.java
Example 10: main
import storm.kafka.KafkaSpout; // import the required package/class
public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();
    SpoutConfig spoutConf = new SpoutConfig(new ZkHosts("localhost:2181", "/brokers"), "test", "/kafkastorm", "KafkaSpout");
    spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
    spoutConf.forceFromStart = true;
    builder.setSpout("KafkaSpout", new KafkaSpout(spoutConf), 3);
    builder.setBolt("KafkaBolt", new PrinterBolt(), 3).shuffleGrouping("KafkaSpout");
    Config conf = new Config();
    // conf.setDebug(true);
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("kafka-test", conf, builder.createTopology());
    Utils.sleep(60000);
    cluster.shutdown();
}
Developer: ogidogi, Project: kafka-storm-hive, Lines of code: 20, Source: KafkaStormTopology.java
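Example 10 passes a parallelism hint of 3 to setSpout. How much of that parallelism is useful depends on the number of partitions in the topic. The sketch below assumes that partitions are handed out round-robin by task index, which is an assumption about storm-kafka internals and not something the example shows. PartitionAssignmentSketch and partitionsForTask are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of a round-robin partition-to-task assignment (assumed, not taken from the example). */
final class PartitionAssignmentSketch {

    private PartitionAssignmentSketch() {}

    /** Returns the partition numbers that the task at taskIndex would read. */
    static List<Integer> partitionsForTask(int totalTasks, int taskIndex, int numPartitions) {
        if (totalTasks <= 0 || taskIndex < 0 || taskIndex >= totalTasks) {
            throw new IllegalArgumentException("taskIndex must be in [0, totalTasks)");
        }
        List<Integer> assigned = new ArrayList<>();
        // Task i takes partitions i, i + totalTasks, i + 2 * totalTasks, ...
        for (int p = taskIndex; p < numPartitions; p += totalTasks) {
            assigned.add(p);
        }
        return assigned;
    }

    public static void main(String[] args) {
        int totalTasks = 3;     // parallelism hint from Example 10
        int numPartitions = 2;  // hypothetical partition count of the "test" topic
        for (int task = 0; task < totalTasks; task++) {
            System.out.println("task " + task + " -> "
                    + partitionsForTask(totalTasks, task, numPartitions));
        }
        // prints:
        // task 0 -> [0]
        // task 1 -> [1]
        // task 2 -> []
    }
}
```

Under this assumption, spout tasks beyond the partition count receive nothing to read, so a parallelism hint larger than the number of partitions adds no throughput.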
Example 11: main
import storm.kafka.KafkaSpout; // import the required package/class
public static void main(String[] args) {
    Config conf = new Config();
    // conf.put(Config.TOPOLOGY_DEBUG,true);
    conf.put(StormElasticSearchConstants.ES_CLUSTER_NAME, "elasticsearch");
    conf.put(StormElasticSearchConstants.ES_HOST, "localhost");
    conf.put(StormElasticSearchConstants.ES_PORT, 9300);
    ZkHosts zooHosts = new ZkHosts("localhost:50003");
    TopologyBuilder builder = new TopologyBuilder();
    SpoutConfig spoutConfig = new SpoutConfig(zooHosts, "test", "", "STORM-ID");
    //spoutConfig.scheme=new StringScheme();
    // spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    KafkaSpout spout1 = new KafkaSpout(spoutConfig);
    builder.setSpout("source", spout1, 1);
    builder.setBolt("echo", new EchoBolt(), 1).shuffleGrouping("source");
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("basic_primitives", conf,
            builder.createTopology());
}
Developer: Produban, Project: openbus, Lines of code: 24, Source: kafkaSpoutTest.java
Example 12: main
import storm.kafka.KafkaSpout; // import the required package/class
/**
 * @param args
 * http://www.programcreek.com/java-api-examples/index.php?api=storm.kafka.KafkaSpout
 */
public static void main(String[] args) {
    try {
        // Configure the spout and its parallelism; the parallelism controls how many
        // threads run this component in the cluster
        String zkhost = "wxb-1:2181,wxb-2:2181,wxb-3:2181";
        String topic = "order";
        String groupId = "id";
        int spoutNum = 3;
        int boltNum = 1;
        ZkHosts zkHosts = new ZkHosts(zkhost); // ZooKeeper ensemble used by Kafka
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, "/order", groupId); // create /order /id
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", kafkaSpout, spoutNum);
        builder.setBolt("check", new CheckOrderBolt(), boltNum).shuffleGrouping("spout");
        builder.setBolt("counter", new CounterBolt(), boltNum).shuffleGrouping("check");
        Config config = new Config();
        config.setDebug(true);
        if (args != null && args.length > 0) {
            config.setNumWorkers(2);
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {
            config.setMaxTaskParallelism(2);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Wordcount-Topology", config, builder.createTopology());
            Thread.sleep(500000);
            cluster.shutdown();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Developer: realxujiang, Project: storm-kafka-examples, Lines of code: 43, Source: CounterTopology.java
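The comment "create /order /id" in Example 12 refers to where the spout keeps its consumer offsets in ZooKeeper. The sketch below assumes the layout zkRoot + "/" + id + "/partition_N" for the per-partition offset node. That layout is an assumption about storm-kafka internals and is not shown in any of the examples. OffsetPathSketch and committedPath are hypothetical names.

```java
/** Sketch of how the third and fourth SpoutConfig arguments could map to a ZooKeeper path. */
final class OffsetPathSketch {

    private OffsetPathSketch() {}

    /**
     * Builds the assumed offset node path for one partition.
     * zkRoot is the third SpoutConfig argument, consumerId the fourth.
     */
    static String committedPath(String zkRoot, String consumerId, int partition) {
        return zkRoot + "/" + consumerId + "/partition_" + partition;
    }

    public static void main(String[] args) {
        // Values from Example 12
        System.out.println(committedPath("/order", "id", 0)); // prints "/order/id/partition_0"
        // Values from Examples 3 and 20, which pass an empty zkRoot
        System.out.println(committedPath("", "storm", 1));    // prints "/storm/partition_1"
    }
}
```

If this layout holds, the choice of id matters for restarts: a fixed id such as "id" or "storm" lets a resubmitted topology find its previous offsets, while UUID.randomUUID().toString(), used in Examples 1, 8 and 14, produces a new path on every submission.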
Example 13: configureKafkaSpout
import storm.kafka.KafkaSpout; // import the required package/class
public int configureKafkaSpout(TopologyBuilder builder) {
    KafkaSpout kafkaSpout = constructKafkaSpout();
    //int spoutCount = Integer.valueOf(topologyConfig.getProperty("spout.thread.count"));
    //int boltCount = Integer.valueOf(topologyConfig.getProperty("bolt.thread.count"));
    // Hard-coded for the test; the commented-out lines read the values from the topology config
    int spoutCount = 1;
    int boltCount = 1;
    builder.setSpout("kafkaSpout", kafkaSpout, spoutCount);
    return boltCount;
}
Developer: bucaojit, Project: RealEstate-Streaming, Lines of code: 13, Source: PhoenixTest.java
Example 14: getKafkaSpout
import storm.kafka.KafkaSpout; // import the required package/class
public KafkaSpout getKafkaSpout() {
    LOG.info("KAFKASPOUT: Configuring the Kafka Spout");
    // Create the initial spoutConfig
    SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zookeeperConnectionString),
            kafkaTopic,                     // Kafka topic to read from
            "/" + kafkaTopic,               // Root path in Zookeeper for the spout to store consumer offsets
            UUID.randomUUID().toString());  // ID for storing consumer offsets in Zookeeper
    // Set the scheme
    try {
        spoutConfig.scheme = new SchemeAsMultiScheme(getSchemeFromClassName(spoutSchemeClass));
    } catch (Exception e) {
        // Pass the exception to the logger so the stack trace ends up in the log
        LOG.error("ERROR: Unable to create instance of scheme: " + spoutSchemeClass, e);
    }
    // Set the offset
    setKafkaOffset(spoutConfig, kafkaStartOffset);
    // Create the kafkaSpout
    return new KafkaSpout(spoutConfig);
}
Developer: sakserv, Project: storm-kafka-hdfs-example, Lines of code: 26, Source: KafkaSpoutConfigBuilder.java
Example 15: configureKafkaSpout
import storm.kafka.KafkaSpout; // import the required package/class
public int configureKafkaSpout(TopologyBuilder builder) {
    KafkaSpout kafkaSpout = constructKafkaSpout();
    // parseInt returns a primitive int directly and avoids boxing
    int spoutCount = Integer.parseInt(topologyConfig.getProperty("spout.thread.count"));
    int boltCount = Integer.parseInt(topologyConfig.getProperty("bolt.thread.count"));
    builder.setSpout("kafkaSpout", kafkaSpout, spoutCount);
    return boltCount;
}
Developer: DhruvKumar, Project: iot-masterclass, Lines of code: 10, Source: TruckEventProcessorKafkaTopology.java
Example 16: main
import storm.kafka.KafkaSpout; // import the required package/class
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
    String zkIp = "localhost";
    String nimbusHost = "sandbox.hortonworks.com";
    String zookeeperHost = zkIp + ":2181";
    ZkHosts zkHosts = new ZkHosts(zookeeperHost);
    List<String> zkServers = new ArrayList<String>();
    zkServers.add(zkIp);
    SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "spertus-flight-events", "/spertus-flights-events", "flight_id");
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
    kafkaConfig.zkServers = zkServers;
    // Overrides the zkRoot value passed to the constructor above
    kafkaConfig.zkRoot = "/spertus-flight-events";
    kafkaConfig.zkPort = 2181;
    kafkaConfig.forceFromStart = true;
    KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("flight-events", kafkaSpout, 1);
    builder.setBolt("flight-stats", new GetFlightStatsBolt(), 1).shuffleGrouping("flight-events");
    Map conf = new HashMap();
    conf.put(backtype.storm.Config.TOPOLOGY_WORKERS, 4);
    conf.put(backtype.storm.Config.TOPOLOGY_DEBUG, true);
    if (args != null && args.length > 0) {
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    } else {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("flight-topology", conf, builder.createTopology());
    }
}
Developer: mspertus, Project: Big-Data-tutorial, Lines of code: 36, Source: FlightTopology.java
Example 17: createTopology
import storm.kafka.KafkaSpout; // import the required package/class
private static StormTopology createTopology()
{
    SpoutConfig kafkaConf = new SpoutConfig(
            new ZkHosts(Properties.getString("rts.storm.zkhosts")),
            KAFKA_TOPIC,
            "/kafka",
            "KafkaSpout");
    kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
    TopologyBuilder topology = new TopologyBuilder();
    topology.setSpout("kafka_spout", new KafkaSpout(kafkaConf), 4);
    topology.setBolt("twitter_filter", new TwitterFilterBolt(), 4)
            .shuffleGrouping("kafka_spout");
    topology.setBolt("text_filter", new TextFilterBolt(), 4)
            .shuffleGrouping("twitter_filter");
    topology.setBolt("stemming", new StemmingBolt(), 4)
            .shuffleGrouping("text_filter");
    topology.setBolt("positive", new PositiveSentimentBolt(), 4)
            .shuffleGrouping("stemming");
    topology.setBolt("negative", new NegativeSentimentBolt(), 4)
            .shuffleGrouping("stemming");
    topology.setBolt("join", new JoinSentimentsBolt(), 4)
            .fieldsGrouping("positive", new Fields("tweet_id"))
            .fieldsGrouping("negative", new Fields("tweet_id"));
    topology.setBolt("score", new SentimentScoringBolt(), 4)
            .shuffleGrouping("join");
    topology.setBolt("hdfs", new HDFSBolt(), 4)
            .shuffleGrouping("score");
    topology.setBolt("nodejs", new NodeNotifierBolt(), 4)
            .shuffleGrouping("score");
    return topology.createTopology();
}
Developer: zdata-inc, Project: StormSampleProject, Lines of code: 41, Source: SentimentAnalysisTopology.java
Example 18: main
import storm.kafka.KafkaSpout; // import the required package/class
public static void main(String[] args) throws Exception {
    String kafkaZk = "zookeeper:2181"; // change it to your zookeeper server
    BrokerHosts brokerHosts = new ZkHosts(kafkaZk);
    SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "name_of_kafka_topic", "", "test"); // change it to the name of your kafka topic
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    kafkaConfig.forceFromStart = true;
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("stream", new KafkaSpout(kafkaConfig), 1);
    builder.setBolt("split", new SplitterBolt(), 8).shuffleGrouping("stream");
    builder.setBolt("counter", new CounterBolt(), 10).customGrouping("split", new PartialKeyGrouping());
    builder.setBolt("aggregator", new AggregatorBolt(), 1).fieldsGrouping("counter", new Fields("word"));
    Config conf = new Config();
    conf.setDebug(false);
    conf.setMaxSpoutPending(100);
    // conf.setMessageTimeoutSecs(300); // optionally increase the timeout for tuples
    if (args != null && args.length > 0) {
        conf.setNumWorkers(10);
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    } else {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", conf, builder.createTopology());
        Utils.sleep(15000000);
        cluster.killTopology("test");
        cluster.shutdown();
    }
}
Developer: gdfm, Project: partial-key-grouping, Lines of code: 31, Source: WordCountPartialKeyGrouping.java
Example 19: main
import storm.kafka.KafkaSpout; // import the required package/class
public static void main(String[] args) throws AlreadyAliveException,
        InvalidTopologyException {
    logger.info("begin to running recsys.");
    BrokerHosts brokerHosts = new ZkHosts(Constants.kafka_zk_address);
    SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, Constants.kafka_topic, Constants.kafka_zk_root, Constants.kafka_id);
    Config conf = new Config();
    Map<String, String> map = new HashMap<String, String>();
    map.put("metadata.broker.list", Constants.kakfa_broker_list);
    map.put("serializer.class", "kafka.serializer.StringEncoder");
    conf.put("kafka.broker.properties", map);
    // conf.put("topic", "topic2");
    spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new KafkaSpout(spoutConfig));
    builder.setBolt("bolt", new HBaseStoreBolt()).shuffleGrouping("spout");
    // builder.setBolt("kafkabolt", new KafkaBolt<String, Integer>()).shuffleGrouping("bolt");
    if (!islocal) {
        conf.setNumWorkers(3);
        StormSubmitter.submitTopology(Constants.storm_topology_name, conf, builder.createTopology());
    } else {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(Constants.storm_topology_name, conf, builder.createTopology());
        Utils.sleep(100000);
        cluster.killTopology(Constants.storm_topology_name);
        cluster.shutdown();
    }
    logger.info("run recsys finish.");
}
Developer: bytegriffin, Project: recsys-online, Lines of code: 32, Source: Recsys.java
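Example 19 plugs a custom MessageScheme into the spout, but the class itself is not shown. A scheme's job is to turn the raw bytes of a Kafka message into the values of a tuple and to name the tuple's fields. The sketch below shows that decoding step without depending on Storm. Utf8MessageDecoder, its methods and the field name "msg" are all hypothetical, and the real MessageScheme may decode its payload differently.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

/** Storm-free sketch of what a simple string scheme does with a Kafka message payload. */
final class Utf8MessageDecoder {

    /** Hypothetical name of the single output field. */
    static final String FIELD_NAME = "msg";

    private Utf8MessageDecoder() {}

    /** Decodes the payload as UTF-8 and returns it as a one-element tuple. */
    static List<Object> deserialize(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8);
        return Collections.<Object>singletonList(text);
    }

    /** Names the fields of the tuple produced by deserialize. */
    static List<String> outputFields() {
        return Collections.singletonList(FIELD_NAME);
    }

    public static void main(String[] args) {
        byte[] payload = "order-42".getBytes(StandardCharsets.UTF_8);
        System.out.println(outputFields() + " = " + deserialize(payload)); // prints "[msg] = [order-42]"
    }
}
```

A downstream bolt would then read the value by field name, which is why the scheme's field names and the names used in fieldsGrouping calls have to agree.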
Example 20: buildTopology
import storm.kafka.KafkaSpout; // import the required package/class
public StormTopology buildTopology(Properties properties) {
    // Load properties for the storm topology
    String kafkaTopic = properties.getProperty("kafka.topic");
    String hbaseTable = properties.getProperty("hbase.table.name");
    String hbaseColumnFamily = properties.getProperty("hbase.column.family");
    SpoutConfig kafkaConfig = new SpoutConfig(kafkaBrokerHosts, kafkaTopic, "",
            "storm");
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    TopologyBuilder builder = new TopologyBuilder();
    SimpleHBaseMapper hBaseMapper = new SimpleHBaseMapper()
            .withRowKeyField("host-user")
            .withCounterFields(new Fields("count"))
            .withColumnFamily(hbaseColumnFamily);
    // HbaseBolt(tableName, hbaseMapper)
    HBaseBolt hbaseBolt = new HBaseBolt(hbaseTable, hBaseMapper);
    AuditLoginsCounterBolt loginCounterbolt = new AuditLoginsCounterBolt(hbaseTable);
    AuditBolt auditParserBolt = new AuditBolt();
    builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);
    builder.setBolt("ParseBolt", auditParserBolt, 1).shuffleGrouping("KafkaSpout");
    builder.setBolt("CountBolt", loginCounterbolt, 1).shuffleGrouping("ParseBolt");
    builder.setBolt("HBaseBolt", hbaseBolt, 1).fieldsGrouping("CountBolt",
            new Fields("host-user"));
    return builder.createTopology();
}
Developer: mvalleavila, Project: StormTopology-AuditActiveLogins, Lines of code: 31, Source: AuditActiveLoginsTopology.java
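Note: The storm.kafka.KafkaSpout examples in this article were collected from source-code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects contributed by their respective developers. Copyright remains with the original authors, and distribution and use are subject to each project's license. Do not reproduce without permission.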