This article collects typical usage examples of the Java class org.apache.kafka.clients.admin.NewTopic. If you are wondering how the NewTopic class is used in practice, the curated examples below may help.
The NewTopic class belongs to the org.apache.kafka.clients.admin package. A total of 19 code examples are shown below, sorted by popularity by default.
Example 1: createTopic
import org.apache.kafka.clients.admin.NewTopic; // import the required package/class
/**
* Creates a topic in Kafka. If the topic already exists this does nothing.
* @param topicName - the name of the topic to create.
* @param partitions - the number of partitions to create.
*/
public void createTopic(final String topicName, final int partitions) {
final short replicationFactor = 1;
// Create admin client
try (final AdminClient adminClient = KafkaAdminClient.create(buildDefaultClientConfig())) {
try {
// Define topic
final NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
// Create the topic; this is an asynchronous call.
final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
// Since the call is asynchronous, wait for it to complete.
createTopicsResult.values().get(topicName).get();
} catch (InterruptedException e) {
// Restore the interrupt flag before propagating.
Thread.currentThread().interrupt();
throw new RuntimeException(e.getMessage(), e);
} catch (ExecutionException e) {
if (!(e.getCause() instanceof TopicExistsException)) {
throw new RuntimeException(e.getMessage(), e);
}
// TopicExistsException: swallow it, since it just means the topic already exists.
}
}
}
Developer: salesforce, Project: kafka-junit, Lines of code: 28, Source: KafkaTestServer.java
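The pattern in Example 1 (wait on the asynchronous result, treat "already exists" as success, rethrow everything else) can be sketched without a broker. The block below is only an illustration under stated assumptions: `CompletableFuture` stands in for Kafka's `KafkaFuture`, and `AlreadyExistsException` and `awaitCreation` are hypothetical names, not part of the Kafka API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class IdempotentCreateSketch {

    /** Hypothetical stand-in for Kafka's TopicExistsException. */
    static class AlreadyExistsException extends RuntimeException {
        AlreadyExistsException(String message) {
            super(message);
        }
    }

    /**
     * Waits for the creation future to finish.
     * Returns true if the resource was created, false if it already existed;
     * any other failure is rethrown as a RuntimeException.
     */
    static boolean awaitCreation(Future<Void> creation) {
        try {
            creation.get();
            return true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for creation", e);
        } catch (ExecutionException e) {
            // The real failure is wrapped as the cause of the ExecutionException.
            if (e.getCause() instanceof AlreadyExistsException) {
                return false;
            }
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> created = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> duplicate = new CompletableFuture<>();
        duplicate.completeExceptionally(new AlreadyExistsException("topic exists"));

        System.out.println(awaitCreation(created));   // prints "true"
        System.out.println(awaitCreation(duplicate)); // prints "false"
    }
}
```

Handling `InterruptedException` separately avoids calling `getCause()` on an exception that has no cause, and keeps the thread's interrupt status intact.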
Example 2: setupAndCreateKafkaBasedLog
import org.apache.kafka.clients.admin.NewTopic; // import the required package/class
KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
Map<String, Object> producerProps = new HashMap<>();
producerProps.putAll(config.originals());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.putAll(config.originals());
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
Map<String, Object> adminProps = new HashMap<>(config.originals());
NewTopic topicDescription = TopicAdmin.defineTopic(topic).
compacted().
partitions(1).
replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG)).
build();
return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminProps);
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 22, Source: KafkaConfigBackingStore.java
Example 3: returnNullWithApiVersionMismatch
import org.apache.kafka.clients.admin.NewTopic; // import the required package/class
/**
* 0.10.x clients can't talk with 0.9.x brokers, and 0.10.0.0 introduced the new protocol with API versions.
* That means we can simulate an API version mismatch.
*/
@Test
public void returnNullWithApiVersionMismatch() {
final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
Cluster cluster = createCluster(1);
try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) {
env.kafkaClient().setNode(cluster.controller());
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
env.kafkaClient().prepareResponse(createTopicResponseWithUnsupportedVersion(newTopic));
TopicAdmin admin = new TopicAdmin(null, env.adminClient());
admin.createTopic(newTopic);
fail();
} catch (UnsupportedVersionException e) {
// expected
}
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 24, Source: TopicAdminTest.java
Example 4: shouldCreateOneTopicWhenProvidedMultipleDefinitionsWithSameTopicName
import org.apache.kafka.clients.admin.NewTopic; // import the required package/class
@Test
public void shouldCreateOneTopicWhenProvidedMultipleDefinitionsWithSameTopicName() {
NewTopic newTopic1 = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
NewTopic newTopic2 = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
Cluster cluster = createCluster(1);
try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) {
env.kafkaClient().setNode(cluster.controller());
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
env.kafkaClient().prepareResponse(createTopicResponse(newTopic1));
TopicAdmin admin = new TopicAdmin(null, env.adminClient());
Set<String> newTopicNames = admin.createTopics(newTopic1, newTopic2);
assertEquals(1, newTopicNames.size());
assertEquals(newTopic2.name(), newTopicNames.iterator().next());
}
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 17, Source: TopicAdminTest.java
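The test in Example 4 expects two definitions with the same name to yield a single created topic. One plausible way to get that behavior is to key the definitions by name before sending the request. The sketch below assumes the last definition for a name wins; `TopicSpec` and `dedupeByName` are hypothetical stand-ins, not the actual TopicAdmin implementation.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TopicDedupSketch {

    /** Hypothetical, minimal stand-in for NewTopic: a name and a partition count. */
    static final class TopicSpec {
        final String name;
        final int partitions;

        TopicSpec(String name, int partitions) {
            this.name = name;
            this.partitions = partitions;
        }
    }

    /** Collapses definitions that share a name; the last definition for a name wins. */
    static Map<String, TopicSpec> dedupeByName(List<TopicSpec> specs) {
        Map<String, TopicSpec> byName = new LinkedHashMap<>();
        for (TopicSpec spec : specs) {
            byName.put(spec.name, spec); // a later spec replaces an earlier one
        }
        return byName;
    }

    public static void main(String[] args) {
        Map<String, TopicSpec> unique = dedupeByName(Arrays.asList(
                new TopicSpec("myTopic", 1),
                new TopicSpec("myTopic", 3),
                new TopicSpec("other", 1)));

        System.out.println(unique.keySet());                  // prints "[myTopic, other]"
        System.out.println(unique.get("myTopic").partitions); // prints "3"
    }
}
```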
Example 5: createTopicResponseWithAlreadyExists
import org.apache.kafka.clients.admin.NewTopic; // import the required package/class
private CreateTopicsResponse createTopicResponseWithAlreadyExists(NewTopic... topics) {
return createTopicResponse(new ApiError(Errors.TOPIC_ALREADY_EXISTS, "Topic already exists"), topics);
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 4, Source: TopicAdminTest.java
Example 6: createTopicResponseWithUnsupportedVersion
import org.apache.kafka.clients.admin.NewTopic; // import the required package/class
private CreateTopicsResponse createTopicResponseWithUnsupportedVersion(NewTopic... topics) {
return createTopicResponse(new ApiError(Errors.UNSUPPORTED_VERSION, "This version of the API is not supported"), topics);
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 4, Source: TopicAdminTest.java
Example 7: createTopic
import org.apache.kafka.clients.admin.NewTopic; // import the required package/class
@Override
public void createTopic(String topicName, int partitionsNum, short replicationFactor)
{
NewTopic newTopic = new NewTopic(topicName, partitionsNum, replicationFactor);
CreateTopicsResult result = kafkaAdminClient.createTopics(Collections.singletonList(newTopic));
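KafkaFuture<Void> future = result.values().get(topicName);
try {
future.get();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt(); // restore the interrupt flag
e.printStackTrace();
}
catch (ExecutionException e) {
// Topic creation failed; the underlying error is the cause of this exception.
e.printStackTrace();
}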
}
Developer: dbiir, Project: paraflow, Lines of code: 14, Source: DefaultProducer.java
Example 8: configure
import org.apache.kafka.clients.admin.NewTopic; // import the required package/class
@Override
public void configure(final WorkerConfig config) {
this.topic = config.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG);
if (topic.equals(""))
throw new ConfigException("Must specify topic for connector status.");
Map<String, Object> producerProps = new HashMap<>();
producerProps.putAll(config.originals());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.putAll(config.originals());
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
Map<String, Object> adminProps = new HashMap<>(config.originals());
NewTopic topicDescription = TopicAdmin.defineTopic(topic).
compacted().
partitions(config.getInt(DistributedConfig.STATUS_STORAGE_PARTITIONS_CONFIG)).
replicationFactor(config.getShort(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG)).
build();
Callback<ConsumerRecord<String, byte[]>> readCallback = new Callback<ConsumerRecord<String, byte[]>>() {
@Override
public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
read(record);
}
};
this.kafkaLog = createKafkaBasedLog(topic, producerProps, consumerProps, readCallback, topicDescription, adminProps);
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 33, Source: KafkaStatusBackingStore.java
Example 9: createKafkaBasedLog
import org.apache.kafka.clients.admin.NewTopic; // import the required package/class
private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
Map<String, Object> consumerProps,
Callback<ConsumerRecord<String, byte[]>> consumedCallback,
final NewTopic topicDescription, final Map<String, Object> adminProps) {
Runnable createTopics = new Runnable() {
@Override
public void run() {
try (TopicAdmin admin = new TopicAdmin(adminProps)) {
admin.createTopics(topicDescription);
}
}
};
return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, time, createTopics);
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 15, Source: KafkaStatusBackingStore.java
Example 10: configure
import org.apache.kafka.clients.admin.NewTopic; // import the required package/class
@Override
public void configure(final WorkerConfig config) {
String topic = config.getString(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG);
if (topic.equals(""))
throw new ConfigException("Offset storage topic must be specified");
data = new HashMap<>();
Map<String, Object> producerProps = new HashMap<>();
producerProps.putAll(config.originals());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.putAll(config.originals());
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
Map<String, Object> adminProps = new HashMap<>(config.originals());
NewTopic topicDescription = TopicAdmin.defineTopic(topic).
compacted().
partitions(config.getInt(DistributedConfig.OFFSET_STORAGE_PARTITIONS_CONFIG)).
replicationFactor(config.getShort(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG)).
build();
offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminProps);
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 29, Source: KafkaOffsetBackingStore.java
Example 11: createKafkaBasedLog
import org.apache.kafka.clients.admin.NewTopic; // import the required package/class
private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
Map<String, Object> consumerProps,
Callback<ConsumerRecord<byte[], byte[]>> consumedCallback,
final NewTopic topicDescription, final Map<String, Object> adminProps) {
Runnable createTopics = new Runnable() {
@Override
public void run() {
try (TopicAdmin admin = new TopicAdmin(adminProps)) {
admin.createTopics(topicDescription);
}
}
};
return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM, createTopics);
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 15, Source: KafkaOffsetBackingStore.java
Example 12: createKafkaBasedLog
import org.apache.kafka.clients.admin.NewTopic; // import the required package/class
private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
Map<String, Object> consumerProps,
Callback<ConsumerRecord<String, byte[]>> consumedCallback,
final NewTopic topicDescription, final Map<String, Object> adminProps) {
Runnable createTopics = new Runnable() {
@Override
public void run() {
try (TopicAdmin admin = new TopicAdmin(adminProps)) {
admin.createTopics(topicDescription);
}
}
};
return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM, createTopics);
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 15, Source: KafkaConfigBackingStore.java
Example 13: shouldNotCreateTopicWhenItAlreadyExists
import org.apache.kafka.clients.admin.NewTopic; // import the required package/class
@Test
public void shouldNotCreateTopicWhenItAlreadyExists() {
NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
Cluster cluster = createCluster(1);
try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) {
env.kafkaClient().setNode(cluster.controller());
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
env.kafkaClient().prepareResponse(createTopicResponseWithAlreadyExists(newTopic));
TopicAdmin admin = new TopicAdmin(null, env.adminClient());
boolean created = admin.createTopic(newTopic);
assertFalse(created);
}
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 15, Source: TopicAdminTest.java
Example 14: shouldCreateTopicWhenItDoesNotExist
import org.apache.kafka.clients.admin.NewTopic; // import the required package/class
@Test
public void shouldCreateTopicWhenItDoesNotExist() {
NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
Cluster cluster = createCluster(1);
try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) {
env.kafkaClient().setNode(cluster.controller());
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
env.kafkaClient().prepareResponse(createTopicResponse(newTopic));
TopicAdmin admin = new TopicAdmin(null, env.adminClient());
boolean created = admin.createTopic(newTopic);
assertTrue(created);
}
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 15, Source: TopicAdminTest.java
Example 15: createTopicResponse
import org.apache.kafka.clients.admin.NewTopic; // import the required package/class
private CreateTopicsResponse createTopicResponse(ApiError error, NewTopic... topics) {
if (error == null) error = new ApiError(Errors.NONE, "");
Map<String, ApiError> topicResults = new HashMap<>();
for (NewTopic topic : topics) {
topicResults.put(topic.name(), error);
}
return new CreateTopicsResponse(topicResults);
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 9, Source: TopicAdminTest.java
Example 16: invokeCreateTopic
import org.apache.kafka.clients.admin.NewTopic; // import the required package/class
private void invokeCreateTopic(String topic, int partitions, int replicationFactor) throws Throwable {
NewTopic newTopic = new NewTopic(topic, partitions,
(short) replicationFactor);
CreateTopicsResult topics = adminClient.createTopics(Collections.singletonList(newTopic));
topics.all().get(DEFAULT_OPERATION_TIMEOUT, TimeUnit.SECONDS);
}
Developer: spring-cloud, Project: spring-cloud-stream-binder-kafka, Lines of code: 8, Source: KafkaBinderTests.java
Example 17: createTopic
import org.apache.kafka.clients.admin.NewTopic; // import the required package/class
@Override
public void createTopic(final String topic,
final int numPartitions,
final short replicationFactor,
final Map<String, String> configs) {
if (isTopicExists(topic)) {
Map<String, TopicDescription> topicDescriptions = describeTopics(Collections.singletonList(topic));
TopicDescription topicDescription = topicDescriptions.get(topic);
if (topicDescription.partitions().size() != numPartitions ||
topicDescription.partitions().get(0).replicas().size() < replicationFactor) {
throw new KafkaTopicException(String.format(
"Topic '%s' does not conform to the requirements Partitions:%d v %d. Replication: %d v %d", topic,
topicDescription.partitions().size(), numPartitions,
topicDescription.partitions().get(0).replicas().size(), replicationFactor
));
}
// A topic with the required partitions and replicas already exists, so reuse it.
log.debug("Did not create topic {} with {} partitions and replication-factor {} since it already exists", topic,
numPartitions, replicationFactor);
return;
}
NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor);
newTopic.configs(configs);
try {
log.info("Creating topic '{}'", topic);
adminClient.createTopics(Collections.singleton(newTopic)).all().get();
} catch (InterruptedException | ExecutionException e) {
throw new KafkaResponseGetFailedException("Failed to guarantee existence of topic " +
topic, e);
}
}
Developer: confluentinc, Project: ksql, Lines of code: 33, Source: KafkaTopicClientImpl.java
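The reuse rule in Example 17 is easy to misread: the partition count must match exactly, while the replica count only has to be at least the requested replication factor. A minimal sketch of that predicate follows; the class and method names are hypothetical and the check takes plain integers rather than a `TopicDescription`.

```java
public class TopicConformanceSketch {

    /**
     * Returns true when an existing topic can be reused:
     * partitions must match exactly, replicas must be at least the required count.
     */
    static boolean conforms(int actualPartitions, int actualReplicas,
                            int requiredPartitions, int requiredReplicas) {
        return actualPartitions == requiredPartitions
                && actualReplicas >= requiredReplicas;
    }

    public static void main(String[] args) {
        System.out.println(conforms(3, 3, 3, 2)); // prints "true": extra replicas are accepted
        System.out.println(conforms(2, 3, 3, 2)); // prints "false": partition count differs
        System.out.println(conforms(3, 1, 3, 2)); // prints "false": too few replicas
    }
}
```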
Example 18: build
import org.apache.kafka.clients.admin.NewTopic; // import the required package/class
/**
* Build the {@link NewTopic} representation.
*
* @return the topic description; never null
*/
public NewTopic build() {
return new NewTopic(name, numPartitions, replicationFactor).configs(configs);
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 9, Source: TopicAdmin.java
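Several examples above chain `defineTopic(...).compacted().partitions(...).replicationFactor(...).build()`. The sketch below shows how a fluent builder of that shape might be put together. It is an illustration only: `TopicSpec` and `Builder` are hypothetical classes, the defaults of one partition and replication factor 1 are assumptions, and `compacted()` is assumed to set the topic config `cleanup.policy=compact`.

```java
import java.util.HashMap;
import java.util.Map;

public class TopicBuilderSketch {

    /** Hypothetical immutable result type standing in for NewTopic. */
    static final class TopicSpec {
        final String name;
        final int numPartitions;
        final short replicationFactor;
        final Map<String, String> configs;

        TopicSpec(String name, int numPartitions, short replicationFactor, Map<String, String> configs) {
            this.name = name;
            this.numPartitions = numPartitions;
            this.replicationFactor = replicationFactor;
            this.configs = configs;
        }
    }

    /** Fluent builder; each setter returns this so calls can be chained. */
    static final class Builder {
        private final String name;
        private int numPartitions = 1;        // assumed default
        private short replicationFactor = 1;  // assumed default
        private final Map<String, String> configs = new HashMap<>();

        Builder(String name) {
            this.name = name;
        }

        Builder partitions(int numPartitions) {
            this.numPartitions = numPartitions;
            return this;
        }

        Builder replicationFactor(short replicationFactor) {
            this.replicationFactor = replicationFactor;
            return this;
        }

        Builder compacted() {
            // Assumption: compaction is requested through the cleanup.policy topic config.
            configs.put("cleanup.policy", "compact");
            return this;
        }

        TopicSpec build() {
            // Copy the configs so later builder changes do not leak into the built spec.
            return new TopicSpec(name, numPartitions, replicationFactor, new HashMap<>(configs));
        }
    }

    static Builder defineTopic(String name) {
        return new Builder(name);
    }

    public static void main(String[] args) {
        TopicSpec spec = defineTopic("myTopic").compacted().partitions(4).replicationFactor((short) 2).build();
        System.out.println(spec.name + " " + spec.numPartitions + " " + spec.replicationFactor + " " + spec.configs);
    }
}
```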
Example 19: createTopic
import org.apache.kafka.clients.admin.NewTopic; // import the required package/class
/**
* Attempt to create the topic described by the given definition, returning true if the topic was created or false
* if the topic already existed.
*
* @param topic the specification of the topic
* @return true if the topic was created or false if the topic already existed.
* @throws ConnectException if an error occurs, the operation takes too long, or the thread is interrupted while
* attempting to perform this operation
* @throws UnsupportedVersionException if the broker does not support the necessary APIs to perform this request
*/
public boolean createTopic(NewTopic topic) {
if (topic == null) return false;
Set<String> newTopicNames = createTopics(topic);
return newTopicNames.contains(topic.name());
}
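Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 16, Source: TopicAdmin.java
Note: The org.apache.kafka.clients.admin.NewTopic examples in this article were compiled from GitHub, MSDocs, and other source-code and documentation hosting platforms. The snippets were selected from open-source projects, and copyright in the source code belongs to the original authors. For distribution and use, refer to the License of the corresponding project. Do not reproduce without permission.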