本文整理汇总了Java中org.springframework.kafka.test.rule.KafkaEmbedded类的典型用法代码示例。如果您正苦于以下问题:Java KafkaEmbedded类的具体用法?Java KafkaEmbedded怎么用?Java KafkaEmbedded使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
KafkaEmbedded类属于org.springframework.kafka.test.rule包,在下文中一共展示了KafkaEmbedded类的6个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: kafkaBroker
import org.springframework.kafka.test.rule.KafkaEmbedded; //导入依赖的package包/类
@SuppressWarnings("serial")
public static KafkaEmbedded kafkaBroker(int port, int partitionCount, String logDirectory, String... topics) {
final int brokerCount = 1;
final boolean controlledShutdown = false;
LOG.debug("JUnit @Rule instantiating embedded Kafka broker: port [{}], topic(s) [{}], brokerCount [{}], partitionCount [{}], controlledShutdown [{}], log directory [{}]",
port, of(topics).collect(joining(", ")), brokerCount, partitionCount, controlledShutdown, logDirectory);
KafkaEmbedded embedded = new KafkaEmbedded(brokerCount, controlledShutdown, partitionCount, topics);
embedded.brokerProperties(new HashMap<String, String>() {
{
put("logs.dir", logDirectory);
}
});
embedded.setKafkaPorts(port);
LOG.debug("JUnit @Rule returning embedded Kafka broker instance [{}]", embedded);
return embedded;
}
开发者ID:rmap-project,项目名称:rmap,代码行数:21,代码来源:KafkaJunit4Bootstrapper.java
示例2: buildEmbeddedKafka
import org.springframework.kafka.test.rule.KafkaEmbedded; //导入依赖的package包/类
private static KafkaEmbedded buildEmbeddedKafka() {
//Build a list of all the topics to create and along thw way create a map for each of
//the topic types
Arrays.stream(StatisticType.values())
.forEachOrdered(type -> {
String inputTopic = TopicNameFactory.getStatisticTypedName(STATISTIC_EVENTS_TOPIC_PREFIX, type);
topics.add(inputTopic);
INPUT_TOPICS_MAP.put(type, inputTopic);
String badTopic = TopicNameFactory.getStatisticTypedName(BAD_STATISTIC_EVENTS_TOPIC_PREFIX, type);
topics.add(badTopic);
BAD_TOPICS_MAP.put(type, badTopic);
Arrays.stream(EventStoreTimeIntervalEnum.values())
.forEachOrdered(interval -> {
String rollupTopic = TopicNameFactory.getIntervalTopicName(
STATISTIC_ROLLUP_PERMS_TOPIC_PREFIX,
type,
interval);
topics.add(rollupTopic);
ROLLUP_TOPICS_MAP.computeIfAbsent(type, k -> new ArrayList<>()).add(rollupTopic);
});
});
// topics.forEach(topic -> LOGGER.info("Creating topic: {}", topic));
// return new KafkaEmbedded(1, true, 1, topics.toArray(new String[topics.size()]));
return new KafkaEmbedded(1, true, 1);
}
开发者ID:gchq,项目名称:stroom-stats,代码行数:31,代码来源:StatisticsFlatMappingServiceIT.java
示例3: createTopics
import org.springframework.kafka.test.rule.KafkaEmbedded; //导入依赖的package包/类
public static void createTopics(final KafkaEmbedded kafkaEmbedded, final String... topics) {
ZkUtils zkUtils = new ZkUtils(kafkaEmbedded.getZkClient(), null, false);
for (String topic : topics) {
LOGGER.info("Creating topic {}", topic);
AdminUtils.createTopic(zkUtils,
topic,
kafkaEmbedded.getPartitionsPerTopic(),
kafkaEmbedded.getBrokerAddresses().length,
new Properties(),
null);
}
}
开发者ID:gchq,项目名称:stroom-stats,代码行数:14,代码来源:KafkaEmbededUtils.java
示例4: deleteTopics
import org.springframework.kafka.test.rule.KafkaEmbedded; //导入依赖的package包/类
public static void deleteTopics(final KafkaEmbedded kafkaEmbedded, final String... topics) {
ZkUtils zkUtils = new ZkUtils(kafkaEmbedded.getZkClient(), null, false);
for (String topic : topics) {
if (AdminUtils.topicExists(zkUtils, topic)) {
LOGGER.info("Deleting topic {}", topic);
kafkaEmbedded.waitUntilSynced(topic, 0);
AdminUtils.deleteTopic(zkUtils, topic);
while (AdminUtils.topicExists(zkUtils, topic)) {
//wait for topic to die
}
kafkaEmbedded.waitUntilSynced(topic, 0);
}
}
}
开发者ID:gchq,项目名称:stroom-stats,代码行数:16,代码来源:KafkaEmbededUtils.java
示例5: kafka
import org.springframework.kafka.test.rule.KafkaEmbedded; //导入依赖的package包/类
@Bean
KafkaEmbedded kafka() {
KafkaEmbedded embedded = new KafkaEmbedded(1);
embedded.setKafkaPorts(9092);
return embedded;
}
开发者ID:olivergierke,项目名称:sos,代码行数:8,代码来源:MessagingConfiguration.java
示例6: deleteAndCreateTopics
import org.springframework.kafka.test.rule.KafkaEmbedded; //导入依赖的package包/类
public static void deleteAndCreateTopics(final KafkaEmbedded kafkaEmbedded, final String... topics) {
//attempt deletion first to ensure the topics don't already exist
deleteTopics(kafkaEmbedded, topics);
createTopics(kafkaEmbedded, topics);
}
开发者ID:gchq,项目名称:stroom-stats,代码行数:6,代码来源:KafkaEmbededUtils.java
注:本文中的org.springframework.kafka.test.rule.KafkaEmbedded类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论