Java DefaultKafkaConsumerFactory Class Code Examples


This article collects typical usage examples of org.springframework.kafka.core.DefaultKafkaConsumerFactory in Java. If you are wondering how the DefaultKafkaConsumerFactory class is used in practice, the examples selected here may help.

The DefaultKafkaConsumerFactory class belongs to the org.springframework.kafka.core package. Twenty code examples of the class are shown below, sorted by popularity by default.

Example 1: createSystemConsumer

import org.springframework.kafka.core.DefaultKafkaConsumerFactory; // import the required package/class
private void createSystemConsumer(String name, MessageListener<String, String> consumeEvent) {
    log.info("Creating kafka consumer for topic {}", name);
    ContainerProperties containerProps = new ContainerProperties(name);

    Map<String, Object> props = kafkaProperties.buildConsumerProperties();
    if (name.equals(applicationProperties.getKafkaSystemTopic())) {
        props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
    }
    ConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(props);

    ConcurrentMessageListenerContainer<String, String> container =
        new ConcurrentMessageListenerContainer<>(factory, containerProps);
    container.setupMessageListener(consumeEvent);
    container.start();
    log.info("Successfully created kafka consumer for topic {}", name);
}
 
Developer: xm-online, Project: xm-uaa, Lines of code: 17, Source: ApplicationStartup.java
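
Example 1 replaces the configured group id with a random UUID only when the topic is the system topic. In Kafka, consumers in different groups each receive every record, so a per-instance group id presumably lets every application instance see all system messages rather than sharing partitions with its peers. The selection logic can be sketched with the JDK alone. The class name GroupIdChooser, the method consumerProps, and the topic and group names are hypothetical and are not part of xm-uaa or Spring Kafka; "group.id" is the key behind ConsumerConfig.GROUP_ID_CONFIG.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical illustration of the group-id selection in Example 1.
public class GroupIdChooser {

    // Returns a copy of the base properties. When the topic is the system
    // topic, "group.id" is replaced with a random UUID so that this consumer
    // joins a group of its own; otherwise the configured group id is kept.
    public static Map<String, Object> consumerProps(Map<String, Object> base,
                                                    String topic,
                                                    String systemTopic) {
        Map<String, Object> props = new HashMap<>(base);
        if (topic.equals(systemTopic)) {
            props.put("group.id", UUID.randomUUID().toString());
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> base = new HashMap<>();
        base.put("group.id", "uaa"); // assumed configured group id

        // System topic: a fresh random group id on every call.
        System.out.println(consumerProps(base, "system_queue", "system_queue").get("group.id"));
        // Any other topic: the configured group id is left untouched.
        System.out.println(consumerProps(base, "other_topic", "system_queue").get("group.id"));
    }
}
```

The resulting map is what would be handed to new DefaultKafkaConsumerFactory<>(props) in the original method.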


Example 2: consumerFactory

import org.springframework.kafka.core.DefaultKafkaConsumerFactory; // import the required package/class
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> properties = new HashMap<>();

    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, group);
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

    return new DefaultKafkaConsumerFactory<>(properties);
}
 
Developer: cwenao, Project: springboot_cwenao, Lines of code: 17, Source: KafkaConsumerConfig.java


Example 3: createConsumerFactory

import org.springframework.kafka.core.DefaultKafkaConsumerFactory; // import the required package/class
private ConsumerFactory<?, ?> createConsumerFactory(String group) {
	if (defaultConsumerFactory != null) {
		return defaultConsumerFactory;
	}
	Map<String, Object> props = new HashMap<>();
	props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
	props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
	if (!ObjectUtils.isEmpty(binderConfigurationProperties.getConsumerConfiguration())) {
		props.putAll(binderConfigurationProperties.getConsumerConfiguration());
	}
	if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
				this.binderConfigurationProperties.getKafkaConnectionString());
	}
	props.put("group.id", group);
	return new DefaultKafkaConsumerFactory<>(props);
}
 
Developer: spring-cloud, Project: spring-cloud-stream-binder-kafka, Lines of code: 18, Source: KafkaBinderMetrics.java


Example 4: createKafkaConsumerFactory

import org.springframework.kafka.core.DefaultKafkaConsumerFactory; // import the required package/class
private ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous, String consumerGroup,
		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
	Map<String, Object> props = new HashMap<>();
	props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
	props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
	props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
	props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
	props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, anonymous ? "latest" : "earliest");
	props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);

	if (!ObjectUtils.isEmpty(configurationProperties.getConsumerConfiguration())) {
		props.putAll(configurationProperties.getConsumerConfiguration());
	}
	if (ObjectUtils.isEmpty(props.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))) {
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
	}
	if (!ObjectUtils.isEmpty(consumerProperties.getExtension().getConfiguration())) {
		props.putAll(consumerProperties.getExtension().getConfiguration());
	}
	if (!ObjectUtils.isEmpty(consumerProperties.getExtension().getStartOffset())) {
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
				consumerProperties.getExtension().getStartOffset().name());
	}

	return new DefaultKafkaConsumerFactory<>(props);
}
 
Developer: spring-cloud, Project: spring-cloud-stream-binder-kafka, Lines of code: 27, Source: KafkaMessageChannelBinder.java
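
Example 4 builds its property map in layers, and because each layer is written with put or putAll, later layers override earlier ones: built-in defaults, then the binder-wide consumer configuration, then a fallback for the broker address, then the per-binding configuration, and finally an explicit start offset. A minimal sketch of that precedence with plain maps follows. The class LayeredConsumerProps, the method merge, and its parameters are hypothetical stand-ins for the binder's property objects, and the fallback check is simplified to containsKey; the string keys are the standard Kafka consumer property names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the override order used in Example 4.
public class LayeredConsumerProps {

    public static Map<String, Object> merge(boolean anonymous,
                                            Map<String, Object> binderConfig,
                                            Map<String, Object> bindingConfig,
                                            String startOffset,
                                            String fallbackBrokers) {
        Map<String, Object> props = new HashMap<>();

        // 1. Built-in defaults.
        props.put("enable.auto.commit", false);
        props.put("auto.offset.reset", anonymous ? "latest" : "earliest");

        // 2. Binder-wide configuration overrides the defaults.
        props.putAll(binderConfig);

        // 3. Fall back to the binder's broker list only if none was configured.
        if (!props.containsKey("bootstrap.servers")) {
            props.put("bootstrap.servers", fallbackBrokers);
        }

        // 4. Per-binding configuration overrides everything so far.
        props.putAll(bindingConfig);

        // 5. An explicit start offset wins over any auto.offset.reset set above.
        if (startOffset != null) {
            props.put("auto.offset.reset", startOffset);
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> binderConfig = new HashMap<>();
        binderConfig.put("auto.offset.reset", "none");
        Map<String, Object> bindingConfig = new HashMap<>();
        bindingConfig.put("max.poll.records", 50);

        // Prints the merged map; the explicit start offset replaces "none".
        System.out.println(merge(true, binderConfig, bindingConfig, "earliest", "localhost:9092"));
    }
}
```

Writing the layers in this order means a per-binding setting never needs special handling to beat a binder-wide one; the last write to a key is the value the factory receives.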


Example 5: healthIndicator

import org.springframework.kafka.core.DefaultKafkaConsumerFactory; // import the required package/class
@Bean
KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder) {
	Map<String, Object> props = new HashMap<>();
	props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
	props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
	if (!ObjectUtils.isEmpty(configurationProperties.getConsumerConfiguration())) {
		props.putAll(configurationProperties.getConsumerConfiguration());
	}
	if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
	}
	ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
	KafkaBinderHealthIndicator indicator = new KafkaBinderHealthIndicator(kafkaMessageChannelBinder,
			consumerFactory);
	indicator.setTimeout(this.configurationProperties.getHealthTimeout());
	return indicator;
}
 
Developer: spring-cloud, Project: spring-cloud-stream-binder-kafka, Lines of code: 18, Source: KafkaBinderConfiguration.java


Example 6: testKafkaHealthIndicatorProperties

import org.springframework.kafka.core.DefaultKafkaConsumerFactory; // import the required package/class
@Test
public void testKafkaHealthIndicatorProperties() {
	assertNotNull(this.kafkaBinderHealthIndicator);
	Field consumerFactoryField = ReflectionUtils.findField(KafkaBinderHealthIndicator.class, "consumerFactory",
			ConsumerFactory.class);
	ReflectionUtils.makeAccessible(consumerFactoryField);
	DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) ReflectionUtils.getField(
			consumerFactoryField, this.kafkaBinderHealthIndicator);
	Field configField = ReflectionUtils.findField(DefaultKafkaConsumerFactory.class, "configs", Map.class);
	ReflectionUtils.makeAccessible(configField);
	Map<String, Object> configs = (Map<String, Object>) ReflectionUtils.getField(configField, consumerFactory);
	assertTrue(configs.containsKey("bootstrap.servers"));
	List<String> bootstrapServers = new ArrayList<>();
	bootstrapServers.add("10.98.09.199:9092");
	bootstrapServers.add("10.98.09.196:9092");
	assertTrue(((List<String>) configs.get("bootstrap.servers")).containsAll(bootstrapServers));
}
 
Developer: spring-cloud, Project: spring-cloud-stream-binder-kafka, Lines of code: 18, Source: KafkaBinderAutoConfigurationPropertiesTest.java


Example 7: consumerFactory

import org.springframework.kafka.core.DefaultKafkaConsumerFactory; // import the required package/class
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
  final Map<String, Object> consumerProps = KafkaTestUtils
      .consumerProps("sampleRawConsumer", "false", embeddedKafka);
  consumerProps.put("auto.offset.reset", "earliest");

  return new TracingConsumerFactory<>(new DefaultKafkaConsumerFactory<>(consumerProps), tracer());
}
 
Developer: opentracing-contrib, Project: java-kafka-client, Lines of code: 9, Source: TestConfiguration.java


Example 8: consumerFactory

import org.springframework.kafka.core.DefaultKafkaConsumerFactory; // import the required package/class
public ConsumerFactory<String, String> consumerFactory(String groupId) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}
 
Developer: ebi-wp, Project: kafka-streams-api-websockets, Lines of code: 9, Source: KafkaConsumerConfig.java


Example 9: consumerFactory

import org.springframework.kafka.core.DefaultKafkaConsumerFactory; // import the required package/class
/**
 * Creates the consumer factory from the loaded consumer configuration.
 *
 * @return a consumer factory with String keys and JSON-deserialized Student values
 */
@Bean
public ConsumerFactory<String, Student> consumerFactory() {

	return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
			new StringDeserializer(), new JsonDeserializer<>(
					Student.class));
}
 
Developer: sarojrout, Project: spring-tutorial, Lines of code: 13, Source: StudentConsumerConfig.java


Example 10: createCommandConsumer

import org.springframework.kafka.core.DefaultKafkaConsumerFactory; // import the required package/class
private void createCommandConsumer(String name) {
    log.info("Creating kafka command consumer for topic {}", name);
    ContainerProperties containerProps = new ContainerProperties(name);

    Map<String, Object> props = kafkaProperties.buildConsumerProperties();
    props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
    ConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(props);

    ConcurrentMessageListenerContainer<String, String> container =
        new ConcurrentMessageListenerContainer<>(factory, containerProps);
    container.setupMessageListener((MessageListener<String, String>) commandConsumer::consumeEvent);
    container.start();
    log.info("Successfully created kafka command consumer for topic {}", name);
}
 
Developer: xm-online, Project: xm-ms-timeline, Lines of code: 15, Source: ApplicationStartup.java


Example 11: setup

import org.springframework.kafka.core.DefaultKafkaConsumerFactory; // import the required package/class
@Before
public void setup() throws Exception {
	Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
	DefaultKafkaConsumerFactory<String, String> cf =
		new DefaultKafkaConsumerFactory<>(consumerProps);
	ContainerProperties containerProperties = new ContainerProperties(TEST_TOPIC);
	container = new KafkaMessageListenerContainer<>(cf, containerProperties);
	final BlockingQueue<ConsumerRecord<String, String>> records = new LinkedBlockingQueue<>();
	container.setupMessageListener((MessageListener<String, String>) record -> {
		log.error("Message received: " + record);
		records.add(record);
	});
	container.start();
	ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
	Map<String, Object> senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());
	ProducerFactory<String, String> pf =
		new DefaultKafkaProducerFactory<>(senderProps);
	template = new KafkaTemplate<>(pf);
	template.setDefaultTopic(TEST_TOPIC);
}
 
Developer: underscorenico, Project: skeleton-oms-java, Lines of code: 21, Source: HelloProcessTest.java


Example 12: createContainerForDto

import org.springframework.kafka.core.DefaultKafkaConsumerFactory; // import the required package/class
private KafkaMessageListenerContainer<Integer, IndexDTO> createContainerForDto(
        ContainerProperties containerProps, Object keyDeser, Object valDeser) {
    Map<String, Object> props = consumerProps(keyDeser, valDeser);
    DefaultKafkaConsumerFactory<Integer, IndexDTO> cf =
            new DefaultKafkaConsumerFactory<>(props);
    KafkaMessageListenerContainer<Integer, IndexDTO> container =
            new KafkaMessageListenerContainer<>(cf, containerProps);
    return container;
}
 
Developer: rmap-project, Project: rmap, Lines of code: 10, Source: SimpleKafkaIT.java


Example 13: createContainer

import org.springframework.kafka.core.DefaultKafkaConsumerFactory; // import the required package/class
private KafkaMessageListenerContainer<Integer, String> createContainer(
        ContainerProperties containerProps, Object keyDeser, Object valDeser) {
    Map<String, Object> props = consumerProps(keyDeser, valDeser);
    DefaultKafkaConsumerFactory<Integer, String> cf =
            new DefaultKafkaConsumerFactory<>(props);
    KafkaMessageListenerContainer<Integer, String> container =
            new KafkaMessageListenerContainer<>(cf, containerProps);
    return container;
}
 
Developer: rmap-project, Project: rmap, Lines of code: 10, Source: SimpleKafkaIT.java


Example 14: consumerFactory

import org.springframework.kafka.core.DefaultKafkaConsumerFactory; // import the required package/class
private ConsumerFactory<byte[], byte[]> consumerFactory() {
	Map<String, Object> props = new HashMap<>();
	KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
	props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
	props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
	props.put(ConsumerConfig.GROUP_ID_CONFIG, "TEST-CONSUMER-GROUP");
	Deserializer<byte[]> valueDecoder = new ByteArrayDeserializer();
	Deserializer<byte[]> keyDecoder = new ByteArrayDeserializer();

	return new DefaultKafkaConsumerFactory<>(props, keyDecoder, valueDecoder);
}
 
Developer: spring-cloud, Project: spring-cloud-stream-binder-kafka, Lines of code: 12, Source: KafkaBinderTests.java


Example 15: getKafkaConsumer

import org.springframework.kafka.core.DefaultKafkaConsumerFactory; // import the required package/class
private KafkaConsumer getKafkaConsumer(Binding binding) {
	DirectFieldAccessor bindingAccessor = new DirectFieldAccessor(binding);
	KafkaMessageDrivenChannelAdapter adapter = (KafkaMessageDrivenChannelAdapter) bindingAccessor
			.getPropertyValue("lifecycle");
	DirectFieldAccessor adapterAccessor = new DirectFieldAccessor(adapter);
	ConcurrentMessageListenerContainer messageListenerContainer =
			(ConcurrentMessageListenerContainer) adapterAccessor.getPropertyValue("messageListenerContainer");
	DirectFieldAccessor containerAccessor = new DirectFieldAccessor(messageListenerContainer);
	DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) containerAccessor
			.getPropertyValue("consumerFactory");
	return (KafkaConsumer) consumerFactory.createConsumer();
}
 
Developer: spring-cloud, Project: spring-cloud-stream-binder-kafka, Lines of code: 13, Source: KafkaBinderTests.java


Example 16: setUp

import org.springframework.kafka.core.DefaultKafkaConsumerFactory; // import the required package/class
@BeforeClass
public static void setUp() throws Exception {
	Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group-id", "false", embeddedKafka);
	consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
	DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
	consumer = cf.createConsumer();
	embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "counts-id");
}
 
Developer: spring-cloud, Project: spring-cloud-stream-binder-kafka, Lines of code: 9, Source: KstreamBinderPojoInputStringOutputIntegrationTests.java


Example 17: setUp

import org.springframework.kafka.core.DefaultKafkaConsumerFactory; // import the required package/class
@BeforeClass
public static void setUp() throws Exception {
	Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group", "false", embeddedKafka);
	consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
	DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
	consumer = cf.createConsumer();
	embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "counts");
}
 
Developer: spring-cloud, Project: spring-cloud-stream-binder-kafka, Lines of code: 9, Source: KStreamBinderWordCountIntegrationTests.java


Example 18: setUp

import org.springframework.kafka.core.DefaultKafkaConsumerFactory; // import the required package/class
@BeforeClass
public static void setUp() throws Exception {
	Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group-id", "false", embeddedKafka);
	//consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class.getName());
	consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
	DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
	consumer = cf.createConsumer();
	embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "counts-id");
}
 
Developer: spring-cloud, Project: spring-cloud-stream-binder-kafka, Lines of code: 10, Source: KStreamBinderPojoInputAndPrimitiveTypeOutputTests.java


Example 19: consumerFactory

import org.springframework.kafka.core.DefaultKafkaConsumerFactory; // import the required package/class
@Bean
public ConsumerFactory<String, Foo> consumerFactory() {
	Map<String, Object> props = new HashMap<>();
	props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configProperties.getBrokerAddress());
	props.put(ConsumerConfig.GROUP_ID_CONFIG, "s1pGroup");
	props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
	props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
	props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
	JsonDeserializer<Foo> jsonDeserializer = new JsonDeserializer<>(Foo.class);
	return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), jsonDeserializer);
}
 
Developer: SpringOnePlatform2016, Project: grussell-spring-kafka, Lines of code: 12, Source: JsonConfiguration.java


Example 20: consumerFactory

import org.springframework.kafka.core.DefaultKafkaConsumerFactory; // import the required package/class
@Bean
public ConsumerFactory<String, DefaultAvMessage> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(
            consumerConfigs(),
            new StringDeserializer(),
            new JsonDeserializer<>(DefaultAvMessage.class)
    );
}
 
Developer: dvoraka, Project: av-service, Lines of code: 9, Source: KafkaFileClientConfig.java



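Note: The org.springframework.kafka.core.DefaultKafkaConsumerFactory examples in this article were collected from GitHub, MSDocs, and other source-code and documentation hosting platforms. The snippets were selected from open-source projects, and copyright in the source code belongs to the original authors. For distribution and use, refer to the License of the corresponding project. Do not reproduce without permission.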

