• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Java NewTopic类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.apache.kafka.clients.admin.NewTopic的典型用法代码示例。如果您正苦于以下问题:Java NewTopic类的具体用法?Java NewTopic怎么用?Java NewTopic使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



NewTopic类属于org.apache.kafka.clients.admin包,在下文中一共展示了NewTopic类的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: createTopic

import org.apache.kafka.clients.admin.NewTopic; //导入依赖的package包/类
/**
 * Creates a topic in Kafka. If the topic already exists this does nothing.
 * @param topicName - the namespace name to create.
 * @param partitions - the number of partitions to create.
 */
public void createTopic(final String topicName, final int partitions) {
    final short replicationFactor = 1;

    // Create admin client
    try (final AdminClient adminClient = KafkaAdminClient.create(buildDefaultClientConfig())) {
        try {
            // Define topic
            final NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);

            // Create topic, which is async call.
            final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));

            // Since the call is Async, Lets wait for it to complete.
            createTopicsResult.values().get(topicName).get();
        } catch (InterruptedException | ExecutionException e) {
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw new RuntimeException(e.getMessage(), e);
            }
            // TopicExistsException - Swallow this exception, just means the topic already exists.
        }
    }
}
 
开发者ID:salesforce,项目名称:kafka-junit,代码行数:28,代码来源:KafkaTestServer.java


示例2: setupAndCreateKafkaBasedLog

import org.apache.kafka.clients.admin.NewTopic; //导入依赖的package包/类
KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
    Map<String, Object> producerProps = new HashMap<>();
    producerProps.putAll(config.originals());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.putAll(config.originals());
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    Map<String, Object> adminProps = new HashMap<>(config.originals());
    NewTopic topicDescription = TopicAdmin.defineTopic(topic).
            compacted().
            partitions(1).
            replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG)).
            build();

    return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminProps);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:22,代码来源:KafkaConfigBackingStore.java


示例3: returnNullWithApiVersionMismatch

import org.apache.kafka.clients.admin.NewTopic; //导入依赖的package包/类
/**
 * 0.10.x clients can't talk with 0.9.x brokers, and 0.10.0.0 introduced the new protocol with API versions.
 * That means we can simulate an API version mismatch.
 *
 * @throws Exception
 */
@Test
public void returnNullWithApiVersionMismatch() {
    final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
    boolean internal = false;
    Cluster cluster = createCluster(1);
    try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) {
        env.kafkaClient().setNode(cluster.controller());
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
        env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
        env.kafkaClient().prepareResponse(createTopicResponseWithUnsupportedVersion(newTopic));
        TopicAdmin admin = new TopicAdmin(null, env.adminClient());
        admin.createTopic(newTopic);
        fail();
    } catch (UnsupportedVersionException e) {
        // expected
    }
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:24,代码来源:TopicAdminTest.java


示例4: shouldCreateOneTopicWhenProvidedMultipleDefinitionsWithSameTopicName

import org.apache.kafka.clients.admin.NewTopic; //导入依赖的package包/类
@Test
public void shouldCreateOneTopicWhenProvidedMultipleDefinitionsWithSameTopicName() {
    NewTopic newTopic1 = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
    NewTopic newTopic2 = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
    Cluster cluster = createCluster(1);
    try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) {
        env.kafkaClient().setNode(cluster.controller());
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
        env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
        env.kafkaClient().prepareResponse(createTopicResponse(newTopic1));
        TopicAdmin admin = new TopicAdmin(null, env.adminClient());
        Set<String> newTopicNames = admin.createTopics(newTopic1, newTopic2);
        assertEquals(1, newTopicNames.size());
        assertEquals(newTopic2.name(), newTopicNames.iterator().next());
    }
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:17,代码来源:TopicAdminTest.java


示例5: createTopicResponseWithAlreadyExists

import org.apache.kafka.clients.admin.NewTopic; //导入依赖的package包/类
private CreateTopicsResponse createTopicResponseWithAlreadyExists(NewTopic... topics) {
    return createTopicResponse(new ApiError(Errors.TOPIC_ALREADY_EXISTS, "Topic already exists"), topics);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:4,代码来源:TopicAdminTest.java


示例6: createTopicResponseWithUnsupportedVersion

import org.apache.kafka.clients.admin.NewTopic; //导入依赖的package包/类
private CreateTopicsResponse createTopicResponseWithUnsupportedVersion(NewTopic... topics) {
    return createTopicResponse(new ApiError(Errors.UNSUPPORTED_VERSION, "This version of the API is not supported"), topics);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:4,代码来源:TopicAdminTest.java


示例7: createTopic

import org.apache.kafka.clients.admin.NewTopic; //导入依赖的package包/类
@Override
public void createTopic(String topicName, int partitionsNum, short replicationFactor)
{
    NewTopic newTopic = new NewTopic(topicName, partitionsNum, replicationFactor);
    CreateTopicsResult result = kafkaAdminClient.createTopics(Collections.singletonList(newTopic));
    KafkaFuture future = result.values().get(topicName);
    try {
        future.get();
    }
    catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}
 
开发者ID:dbiir,项目名称:paraflow,代码行数:14,代码来源:DefaultProducer.java


示例8: configure

import org.apache.kafka.clients.admin.NewTopic; //导入依赖的package包/类
@Override
public void configure(final WorkerConfig config) {
    this.topic = config.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG);
    if (topic.equals(""))
        throw new ConfigException("Must specify topic for connector status.");

    Map<String, Object> producerProps = new HashMap<>();
    producerProps.putAll(config.originals());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class

    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.putAll(config.originals());
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    Map<String, Object> adminProps = new HashMap<>(config.originals());
    NewTopic topicDescription = TopicAdmin.defineTopic(topic).
            compacted().
            partitions(config.getInt(DistributedConfig.STATUS_STORAGE_PARTITIONS_CONFIG)).
            replicationFactor(config.getShort(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG)).
            build();

    Callback<ConsumerRecord<String, byte[]>> readCallback = new Callback<ConsumerRecord<String, byte[]>>() {
        @Override
        public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
            read(record);
        }
    };
    this.kafkaLog = createKafkaBasedLog(topic, producerProps, consumerProps, readCallback, topicDescription, adminProps);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:33,代码来源:KafkaStatusBackingStore.java


示例9: createKafkaBasedLog

import org.apache.kafka.clients.admin.NewTopic; //导入依赖的package包/类
private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
                                                          Map<String, Object> consumerProps,
                                                          Callback<ConsumerRecord<String, byte[]>> consumedCallback,
                                                          final NewTopic topicDescription, final Map<String, Object> adminProps) {
    Runnable createTopics = new Runnable() {
        @Override
        public void run() {
            try (TopicAdmin admin = new TopicAdmin(adminProps)) {
                admin.createTopics(topicDescription);
            }
        }
    };
    return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, time, createTopics);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:15,代码来源:KafkaStatusBackingStore.java


示例10: configure

import org.apache.kafka.clients.admin.NewTopic; //导入依赖的package包/类
@Override
public void configure(final WorkerConfig config) {
    String topic = config.getString(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG);
    if (topic.equals(""))
        throw new ConfigException("Offset storage topic must be specified");

    data = new HashMap<>();

    Map<String, Object> producerProps = new HashMap<>();
    producerProps.putAll(config.originals());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.putAll(config.originals());
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    Map<String, Object> adminProps = new HashMap<>(config.originals());
    NewTopic topicDescription = TopicAdmin.defineTopic(topic).
            compacted().
            partitions(config.getInt(DistributedConfig.OFFSET_STORAGE_PARTITIONS_CONFIG)).
            replicationFactor(config.getShort(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG)).
            build();

    offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminProps);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:29,代码来源:KafkaOffsetBackingStore.java


示例11: createKafkaBasedLog

import org.apache.kafka.clients.admin.NewTopic; //导入依赖的package包/类
private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
                                                          Map<String, Object> consumerProps,
                                                          Callback<ConsumerRecord<byte[], byte[]>> consumedCallback,
                                                          final NewTopic topicDescription, final Map<String, Object> adminProps) {
    Runnable createTopics = new Runnable() {
        @Override
        public void run() {
            try (TopicAdmin admin = new TopicAdmin(adminProps)) {
                admin.createTopics(topicDescription);
            }
        }
    };
    return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM, createTopics);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:15,代码来源:KafkaOffsetBackingStore.java


示例12: createKafkaBasedLog

import org.apache.kafka.clients.admin.NewTopic; //导入依赖的package包/类
private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
                                                          Map<String, Object> consumerProps,
                                                          Callback<ConsumerRecord<String, byte[]>> consumedCallback,
                                                          final NewTopic topicDescription, final Map<String, Object> adminProps) {
    Runnable createTopics = new Runnable() {
        @Override
        public void run() {
            try (TopicAdmin admin = new TopicAdmin(adminProps)) {
                admin.createTopics(topicDescription);
            }
        }
    };
    return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM, createTopics);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:15,代码来源:KafkaConfigBackingStore.java


示例13: shouldNotCreateTopicWhenItAlreadyExists

import org.apache.kafka.clients.admin.NewTopic; //导入依赖的package包/类
@Test
public void shouldNotCreateTopicWhenItAlreadyExists() {
    NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
    Cluster cluster = createCluster(1);
    try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) {
        env.kafkaClient().setNode(cluster.controller());
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
        env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
        env.kafkaClient().prepareResponse(createTopicResponseWithAlreadyExists(newTopic));
        TopicAdmin admin = new TopicAdmin(null, env.adminClient());
        boolean created = admin.createTopic(newTopic);
        assertFalse(created);
    }
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:15,代码来源:TopicAdminTest.java


示例14: shouldCreateTopicWhenItDoesNotExist

import org.apache.kafka.clients.admin.NewTopic; //导入依赖的package包/类
@Test
public void shouldCreateTopicWhenItDoesNotExist() {
    NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
    Cluster cluster = createCluster(1);
    try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) {
        env.kafkaClient().setNode(cluster.controller());
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
        env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
        env.kafkaClient().prepareResponse(createTopicResponse(newTopic));
        TopicAdmin admin = new TopicAdmin(null, env.adminClient());
        boolean created = admin.createTopic(newTopic);
        assertTrue(created);
    }
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:15,代码来源:TopicAdminTest.java


示例15: createTopicResponse

import org.apache.kafka.clients.admin.NewTopic; //导入依赖的package包/类
private CreateTopicsResponse createTopicResponse(ApiError error, NewTopic... topics) {
    if (error == null) error = new ApiError(Errors.NONE, "");
    Map<String, ApiError> topicResults = new HashMap<>();
    for (NewTopic topic : topics) {
        topicResults.put(topic.name(), error);
    }
    return new CreateTopicsResponse(topicResults);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:9,代码来源:TopicAdminTest.java


示例16: invokeCreateTopic

import org.apache.kafka.clients.admin.NewTopic; //导入依赖的package包/类
private void invokeCreateTopic(String topic, int partitions, int replicationFactor) throws Throwable {

		NewTopic newTopic = new NewTopic(topic, partitions,
				(short) replicationFactor);
		CreateTopicsResult topics = adminClient.createTopics(Collections.singletonList(newTopic));
		topics.all().get(DEFAULT_OPERATION_TIMEOUT, TimeUnit.SECONDS);
	}
 
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-kafka,代码行数:8,代码来源:KafkaBinderTests.java


示例17: createTopic

import org.apache.kafka.clients.admin.NewTopic; //导入依赖的package包/类
@Override
public void createTopic(final String topic,
                        final int numPartitions,
                        final short replicatonFactor,
                        final Map<String, String> configs) {
  if (isTopicExists(topic)) {
    Map<String, TopicDescription> topicDescriptions = describeTopics(Collections.singletonList(topic));
    TopicDescription topicDescription = topicDescriptions.get(topic);
    if (topicDescription.partitions().size() != numPartitions ||
        topicDescription.partitions().get(0).replicas().size() < replicatonFactor) {
      throw new KafkaTopicException(String.format(
          "Topic '%s' does not conform to the requirements Partitions:%d v %d. Replication: %d v %d", topic,
          topicDescription.partitions().size(), numPartitions,
          topicDescription.partitions().get(0).replicas().size(), replicatonFactor
      ));
    }
    // Topic with the partitons and replicas exists, reuse it!
    log.debug("Did not create topic {} with {} partitions and replication-factor {} since it already exists", topic,
        numPartitions, replicatonFactor);
    return;
  }
  NewTopic newTopic = new NewTopic(topic, numPartitions, replicatonFactor);
  newTopic.configs(configs);
  try {
    log.info("Creating topic '{}'", topic);
    adminClient.createTopics(Collections.singleton(newTopic)).all().get();

  } catch (InterruptedException | ExecutionException e) {
    throw new KafkaResponseGetFailedException("Failed to guarantee existence of topic " +
        topic, e);
  }
}
 
开发者ID:confluentinc,项目名称:ksql,代码行数:33,代码来源:KafkaTopicClientImpl.java


示例18: build

import org.apache.kafka.clients.admin.NewTopic; //导入依赖的package包/类
/**
 * Build the {@link NewTopic} representation.
 *
 * @return the topic description; never null
 */
public NewTopic build() {
    return new NewTopic(name, numPartitions, replicationFactor).configs(configs);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:9,代码来源:TopicAdmin.java


示例19: createTopic

import org.apache.kafka.clients.admin.NewTopic; //导入依赖的package包/类
/**
 * Attempt to create the topic described by the given definition, returning true if the topic was created or false
 * if the topic already existed.
 *
 * @param topic the specification of the topic
 * @return true if the topic was created or false if the topic already existed.
 * @throws ConnectException            if an error occurs, the operation takes too long, or the thread is interrupted while
 *                                     attempting to perform this operation
 * @throws UnsupportedVersionException if the broker does not support the necessary APIs to perform this request
 */
public boolean createTopic(NewTopic topic) {
    if (topic == null) return false;
    Set<String> newTopicNames = createTopics(topic);
    return newTopicNames.contains(topic.name());
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:16,代码来源:TopicAdmin.java



注:本文中的org.apache.kafka.clients.admin.NewTopic类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java WSDLBoundPortType类代码示例发布时间:2022-05-23
下一篇:
Java SpeechResult类代码示例发布时间:2022-05-23
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap