本文整理汇总了Java中org.springframework.kafka.test.utils.KafkaTestUtils类的典型用法代码示例。如果您正苦于以下问题:Java KafkaTestUtils类的具体用法?Java KafkaTestUtils怎么用?Java KafkaTestUtils使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
KafkaTestUtils类属于org.springframework.kafka.test.utils包,在下文中一共展示了KafkaTestUtils类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: nullKey
import org.springframework.kafka.test.utils.KafkaTestUtils; //导入依赖的package包/类
/**
 * Sends a record without a key to the "messages" topic and verifies, through
 * {@code createConsumer}, that it is received.
 *
 * @throws Exception if the consumer helper is interrupted while waiting for the record
 */
@Test
public void nullKey() throws Exception {
    Producer<Integer, String> producer = createProducer();
    try {
        // Two-argument constructor: topic and value only, no key.
        ProducerRecord<Integer, String> record = new ProducerRecord<>("messages", "test");
        producer.send(record);

        final CountDownLatch latch = new CountDownLatch(1);
        // Passing null as the expected key; the helper builds its own consumer properties.
        createConsumer(latch, null);
    } finally {
        // Release the producer even when the consumer-side assertion fails.
        producer.close();
    }
}
开发者ID:opentracing-contrib,项目名称:java-kafka-client,代码行数:17,代码来源:TracingKafkaTest.java
示例2: receiveAndValidateFoo
import org.springframework.kafka.test.utils.KafkaTestUtils; //导入依赖的package包/类
/**
 * Publishes one JSON payload to the "foos" topic and validates the single record
 * read back from the "counts-id" topic.
 *
 * @param context the application context (not used by this method)
 * @throws Exception if sending, receiving or parsing the record value fails
 */
private void receiveAndValidateFoo(ConfigurableApplicationContext context) throws Exception {
    Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
    DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
    try {
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
        template.setDefaultTopic("foos");
        template.sendDefault("{\"id\":\"123\"}");

        ConsumerRecord<Integer, String> cr = KafkaTestUtils.getSingleRecord(consumer, "counts-id");
        // assertThat(boolean) without a chained check never fails; assert on the actual values.
        assertThat(cr.key()).isEqualTo(123);

        ObjectMapper om = new ObjectMapper();
        Long aLong = om.readValue(cr.value(), Long.class);
        assertThat(aLong).isEqualTo(1L);
    } finally {
        // Close the producer created by the factory so the test does not leak it.
        pf.destroy();
    }
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-kafka,代码行数:14,代码来源:KStreamBinderPojoInputAndPrimitiveTypeOutputTests.java
示例3: setUp
import org.springframework.kafka.test.utils.KafkaTestUtils; //导入依赖的package包/类
/**
 * Builds the {@code template} used by the tests and waits until every registered
 * listener container has been assigned its partitions.
 *
 * @throws Exception if waiting for the partition assignment fails
 */
@Before
public void setUp() throws Exception {
    // producer configuration pointing at the embedded broker
    String brokers = AllSpringKafkaTests.embeddedKafka.getBrokersAsString();
    Map<String, Object> producerConfig = KafkaTestUtils.senderProps(brokers);
    ProducerFactory<String, String> factory =
            new DefaultKafkaProducerFactory<String, String>(producerConfig);

    // template that sends to the receiver topic by default
    template = new KafkaTemplate<>(factory);
    template.setDefaultTopic(AllSpringKafkaTests.RECEIVER_TOPIC);

    // block until each listener container has its partitions
    for (MessageListenerContainer listenerContainer
            : kafkaListenerEndpointRegistry.getListenerContainers()) {
        ContainerTestUtils.waitForAssignment(
                listenerContainer, AllSpringKafkaTests.embeddedKafka.getPartitionsPerTopic());
    }
}
开发者ID:code-not-found,项目名称:spring-kafka,代码行数:23,代码来源:SpringKafkaReceiverTest.java
示例4: setUp
import org.springframework.kafka.test.utils.KafkaTestUtils; //导入依赖的package包/类
/**
 * Creates the {@code template} that sends to {@code RECEIVER_TOPIC} and waits for
 * all listener containers in the registry to receive their partition assignment.
 *
 * @throws Exception if waiting for the partition assignment fails
 */
@Before
public void setUp() throws Exception {
    // sender properties derived from the embedded broker addresses
    String brokerList = embeddedKafka.getBrokersAsString();
    Map<String, Object> senderConfig = KafkaTestUtils.senderProps(brokerList);
    ProducerFactory<String, String> senderFactory =
            new DefaultKafkaProducerFactory<String, String>(senderConfig);

    template = new KafkaTemplate<>(senderFactory);
    template.setDefaultTopic(RECEIVER_TOPIC);

    // do not start sending before the listeners are ready
    for (MessageListenerContainer each : kafkaListenerEndpointRegistry.getListenerContainers()) {
        ContainerTestUtils.waitForAssignment(each, embeddedKafka.getPartitionsPerTopic());
    }
}
开发者ID:code-not-found,项目名称:spring-kafka,代码行数:23,代码来源:SpringKafkaReceiverTest.java
示例5: createConsumer
import org.springframework.kafka.test.utils.KafkaTestUtils; //导入依赖的package包/类
/**
 * Consumes from the "messages" topic on a background thread until {@code latch}
 * reaches zero, checking each record's span context, value and (optionally) key.
 *
 * @param latch counted down once for every record that passes the checks
 * @param key expected record key, or {@code null} to skip the key comparison
 * @throws InterruptedException if the calling thread is interrupted while waiting on the latch
 */
private void createConsumer(final CountDownLatch latch, final Integer key)
        throws InterruptedException {
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    final Map<String, Object> consumerProps = KafkaTestUtils
            .consumerProps("sampleRawConsumer", "false", embeddedKafka);
    consumerProps.put("auto.offset.reset", "earliest");

    executorService.execute(() -> {
        KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
        try {
            TracingKafkaConsumer<Integer, String> tracingKafkaConsumer = new TracingKafkaConsumer<>(
                    kafkaConsumer, mockTracer);
            tracingKafkaConsumer.subscribe(Collections.singletonList("messages"));

            while (latch.getCount() > 0) {
                ConsumerRecords<Integer, String> records = tracingKafkaConsumer.poll(100);
                for (ConsumerRecord<Integer, String> record : records) {
                    SpanContext spanContext = TracingKafkaUtils
                            .extractSpanContext(record.headers(), mockTracer);
                    assertNotNull(spanContext);
                    assertEquals("test", record.value());
                    if (key != null) {
                        assertEquals(key, record.key());
                    }
                    tracingKafkaConsumer.commitSync();
                    latch.countDown();
                }
            }
        } finally {
            // Close even when an assertion above throws, so the consumer is not leaked.
            kafkaConsumer.close();
        }
    });

    boolean consumed = false;
    try {
        consumed = latch.await(30, TimeUnit.SECONDS);
    } finally {
        if (consumed) {
            // Worker leaves its loop on its own; let it finish closing the consumer.
            executorService.shutdown();
        } else {
            // Latch never reached zero: attempt to stop the worker instead of
            // leaving it polling for the rest of the test run.
            executorService.shutdownNow();
        }
    }
    assertTrue(consumed);
}
开发者ID:opentracing-contrib,项目名称:java-kafka-client,代码行数:36,代码来源:TracingKafkaTest.java
示例6: test
import org.springframework.kafka.test.utils.KafkaTestUtils; //导入依赖的package包/类
/**
 * Sends one record to "stream-test", runs a Kafka Streams topology that maps it to
 * "stream-out" using a tracing client supplier, and verifies that three spans are reported.
 *
 * @throws Exception if waiting for the spans times out or a client call fails
 */
@Test
public void test() throws Exception {
    Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);

    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-app");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, senderProps.get("bootstrap.servers"));
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    Producer<Integer, String> producer = createProducer();
    try {
        ProducerRecord<Integer, String> record = new ProducerRecord<>("stream-test", 1, "test");
        producer.send(record);

        final Serde<String> stringSerde = Serdes.String();
        final Serde<Integer> intSerde = Serdes.Integer();

        KStreamBuilder builder = new KStreamBuilder();
        KStream<Integer, String> kStream = builder
                .stream(intSerde, stringSerde, "stream-test");
        kStream.map((key, value) -> new KeyValue<>(key, value + "map")).to("stream-out");

        KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(config),
                new TracingKafkaClientSupplier(mockTracer));
        try {
            streams.start();
            await().atMost(15, TimeUnit.SECONDS).until(reportedSpansSize(), equalTo(3));
        } finally {
            // Stop the streams instance even when the await above times out.
            streams.close();
        }
    } finally {
        // Closed after the streams instance, matching the original shutdown order.
        producer.close();
    }

    List<MockSpan> spans = mockTracer.finishedSpans();
    assertEquals(3, spans.size());
    checkSpans(spans);
    assertNull(mockTracer.activeSpan());
}
开发者ID:opentracing-contrib,项目名称:java-kafka-client,代码行数:39,代码来源:TracingKafkaStreamsTest.java
示例7: consumerFactory
import org.springframework.kafka.test.utils.KafkaTestUtils; //导入依赖的package包/类
/**
 * Exposes a tracing consumer factory that wraps a default Kafka consumer factory
 * configured from the embedded broker, with "auto.offset.reset" set to "earliest".
 *
 * @return the tracing-enabled consumer factory
 */
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
    final Map<String, Object> props =
            KafkaTestUtils.consumerProps("sampleRawConsumer", "false", embeddedKafka);
    props.put("auto.offset.reset", "earliest");

    DefaultKafkaConsumerFactory<Integer, String> delegate = new DefaultKafkaConsumerFactory<>(props);
    return new TracingConsumerFactory<>(delegate, tracer());
}
开发者ID:opentracing-contrib,项目名称:java-kafka-client,代码行数:9,代码来源:TestConfiguration.java
示例8: setup
import org.springframework.kafka.test.utils.KafkaTestUtils; //导入依赖的package包/类
/**
 * Starts a listener container on {@code TEST_TOPIC}, waits for its partition
 * assignment, and then prepares the {@code template} that sends to the same topic.
 *
 * @throws Exception if waiting for the partition assignment fails
 */
@Before
public void setup() throws Exception {
    // consumer side: listener container on the test topic
    Map<String, Object> consumerConfig = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
    DefaultKafkaConsumerFactory<String, String> consumerFactory =
            new DefaultKafkaConsumerFactory<>(consumerConfig);
    container = new KafkaMessageListenerContainer<>(consumerFactory, new ContainerProperties(TEST_TOPIC));

    // received records are logged and collected in a method-local queue
    final BlockingQueue<ConsumerRecord<String, String>> received = new LinkedBlockingQueue<>();
    MessageListener<String, String> listener = message -> {
        log.error("Message received: " + message);
        received.add(message);
    };
    container.setupMessageListener(listener);
    container.start();
    ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());

    // producer side: template with the test topic as default
    Map<String, Object> producerConfig = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());
    ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerConfig);
    template = new KafkaTemplate<>(producerFactory);
    template.setDefaultTopic(TEST_TOPIC);
}
开发者ID:underscorenico,项目名称:skeleton-oms-java,代码行数:21,代码来源:HelloProcessTest.java
示例9: consumerProps
import org.springframework.kafka.test.utils.KafkaTestUtils; //导入依赖的package包/类
/**
 * Builds consumer properties from the embedded broker and overrides the auto-commit
 * interval, session timeout and both deserializers.
 *
 * @param keyDeserializer value stored under the key-deserializer setting
 * @param valueDeserializer value stored under the value-deserializer setting
 * @return the resulting consumer configuration map
 */
private Map<String, Object> consumerProps(Object keyDeserializer, Object valueDeserializer) {
    Map<String, Object> config = KafkaTestUtils.consumerProps("group", "true", kafkaEmbedded);

    // timing overrides
    config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");

    // caller-supplied deserializers
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);

    return config;
}
开发者ID:rmap-project,项目名称:rmap,代码行数:12,代码来源:SimpleKafkaIT.java
示例10: senderProps
import org.springframework.kafka.test.utils.KafkaTestUtils; //导入依赖的package包/类
/**
 * Builds producer properties from the embedded broker addresses and overrides the
 * retry, batching, linger, buffer and serializer settings.
 *
 * @param keySerializer value stored under the key-serializer setting
 * @param valueSerializer value stored under the value-serializer setting
 * @return the resulting producer configuration map
 */
private Map<String, Object> senderProps(Object keySerializer, Object valueSerializer) {
    String brokers = kafkaEmbedded.getBrokersAsString();
    Map<String, Object> config = KafkaTestUtils.senderProps(brokers);

    // delivery and batching overrides
    config.put(ProducerConfig.RETRIES_CONFIG, 0);
    config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    config.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

    // caller-supplied serializers
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);

    return config;
}
开发者ID:rmap-project,项目名称:rmap,代码行数:12,代码来源:SimpleKafkaIT.java
示例11: setUp
import org.springframework.kafka.test.utils.KafkaTestUtils; //导入依赖的package包/类
/**
 * Creates the shared {@code consumer} (group "group-id", "auto.offset.reset" set to
 * "earliest") and attaches it to the embedded "counts-id" topic.
 *
 * @throws Exception if attaching the consumer to the embedded topic fails
 */
@BeforeClass
public static void setUp() throws Exception {
    Map<String, Object> config = KafkaTestUtils.consumerProps("group-id", "false", embeddedKafka);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    consumer = new DefaultKafkaConsumerFactory<String, String>(config).createConsumer();
    embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "counts-id");
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-kafka,代码行数:9,代码来源:KstreamBinderPojoInputStringOutputIntegrationTests.java
示例12: receiveAndValidateFoo
import org.springframework.kafka.test.utils.KafkaTestUtils; //导入依赖的package包/类
/**
 * Publishes one JSON payload to the "foos" topic and checks that the single record
 * read from "counts-id" contains the expected product count text.
 *
 * @param context the application context (not used by this method)
 * @throws Exception if sending or receiving the record fails
 */
private void receiveAndValidateFoo(ConfigurableApplicationContext context) throws Exception {
    Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
    DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
    try {
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
        template.setDefaultTopic("foos");
        template.sendDefault("{\"id\":\"123\"}");

        ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer, "counts-id");
        // Assert on the string itself so a failure reports the actual record value.
        assertThat(cr.value()).contains("Count for product with ID 123: 1");
    } finally {
        // Close the producer created by the factory so the test does not leak it.
        pf.destroy();
    }
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-kafka,代码行数:10,代码来源:KstreamBinderPojoInputStringOutputIntegrationTests.java
示例13: setUp
import org.springframework.kafka.test.utils.KafkaTestUtils; //导入依赖的package包/类
/**
 * Creates the shared {@code consumer} (group "group", "auto.offset.reset" set to
 * "earliest") and attaches it to the embedded "counts" topic.
 *
 * @throws Exception if attaching the consumer to the embedded topic fails
 */
@BeforeClass
public static void setUp() throws Exception {
    Map<String, Object> config = KafkaTestUtils.consumerProps("group", "false", embeddedKafka);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    consumer = new DefaultKafkaConsumerFactory<String, String>(config).createConsumer();
    embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "counts");
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-kafka,代码行数:9,代码来源:KStreamBinderWordCountIntegrationTests.java
示例14: receiveAndValidate
import org.springframework.kafka.test.utils.KafkaTestUtils; //导入依赖的package包/类
/**
 * Publishes the word "foobar" to the "words" topic and checks that the single record
 * read from "counts" contains the expected word/count JSON fragment.
 *
 * @param context the application context (not used by this method)
 * @throws Exception if sending or receiving the record fails
 */
private void receiveAndValidate(ConfigurableApplicationContext context) throws Exception {
    Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
    DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
    try {
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
        template.setDefaultTopic("words");
        template.sendDefault("foobar");

        ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer, "counts");
        // Assert on the string itself so a failure reports the actual record value.
        assertThat(cr.value()).contains("\"word\":\"foobar\",\"count\":1");
    } finally {
        // Close the producer created by the factory so the test does not leak it.
        pf.destroy();
    }
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-kafka,代码行数:10,代码来源:KStreamBinderWordCountIntegrationTests.java
示例15: setUp
import org.springframework.kafka.test.utils.KafkaTestUtils; //导入依赖的package包/类
/**
 * Creates the shared {@code consumer} (group "group-id", "auto.offset.reset" set to
 * "earliest") and attaches it to the embedded "counts-id" topic.
 *
 * @throws Exception if attaching the consumer to the embedded topic fails
 */
@BeforeClass
public static void setUp() throws Exception {
    Map<String, Object> config = KafkaTestUtils.consumerProps("group-id", "false", embeddedKafka);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    consumer = new DefaultKafkaConsumerFactory<Integer, String>(config).createConsumer();
    embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "counts-id");
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-kafka,代码行数:10,代码来源:KStreamBinderPojoInputAndPrimitiveTypeOutputTests.java
示例16: receiveAndValidateFoo
import org.springframework.kafka.test.utils.KafkaTestUtils; //导入依赖的package包/类
/**
 * Publishes one JSON payload to the "foos" topic, checks the record read from
 * "counts-id", and then verifies the product stock exposed by the {@code Foo} bean.
 *
 * @param context the application context from which the {@code Foo} bean is looked up
 * @throws Exception if sending or receiving the record fails
 */
private void receiveAndValidateFoo(ConfigurableApplicationContext context) throws Exception {
    Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
    DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
    try {
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
        template.setDefaultTopic("foos");
        template.sendDefault("{\"id\":\"123\"}");

        ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer, "counts-id");
        assertThat(cr.value()).contains("Count for product with ID 123: 1");

        ProductCountApplication.Foo foo = context.getBean(ProductCountApplication.Foo.class);
        // assertThat(boolean) without a chained check never fails; compare the value itself.
        assertThat(foo.getProductStock(123)).isEqualTo(1L);
    } finally {
        // Close the producer created by the factory so the test does not leak it.
        pf.destroy();
    }
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-kafka,代码行数:13,代码来源:KStreamInteractiveQueryIntegrationTests.java
示例17: testMessagesOverKafka
import org.springframework.kafka.test.utils.KafkaTestUtils; //导入依赖的package包/类
/**
 * Sends "bar" to the "input" topic and verifies that exactly one record with the
 * value "BAR" arrives on the "output" topic.
 *
 * @throws Exception if sending or consuming fails
 */
@Test
public void testMessagesOverKafka() throws Exception {
    // Explicit charset so the payload bytes do not depend on the platform default.
    this.template.send("input", "bar".getBytes(java.nio.charset.StandardCharsets.UTF_8));

    Consumer<byte[], String> consumer = this.consumerFactory.createConsumer();
    try {
        kafkaEmbedded.consumeFromAnEmbeddedTopic(consumer, "output");
        ConsumerRecords<byte[], String> replies = KafkaTestUtils.getRecords(consumer);
        assertThat(replies.count()).isEqualTo(1);

        Iterator<ConsumerRecord<byte[], String>> iterator = replies.iterator();
        assertThat(iterator.next().value()).isEqualTo("BAR");
    } finally {
        // Release the consumer whether or not the assertions pass.
        consumer.close();
    }
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-samples,代码行数:15,代码来源:ToUpperCaseProcessorIntTests.java
示例18: setUp
import org.springframework.kafka.test.utils.KafkaTestUtils; //导入依赖的package包/类
/**
 * Starts a listener container on {@code SENDER_TOPIC} that stores every received
 * record in {@code records}, then waits for the container's partition assignment.
 *
 * @throws Exception if waiting for the partition assignment fails
 */
@Before
public void setUp() throws Exception {
    // consumer factory configured from the embedded broker
    Map<String, Object> consumerConfig = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka);
    DefaultKafkaConsumerFactory<String, String> factory =
            new DefaultKafkaConsumerFactory<>(consumerConfig);

    // listener container bound to the topic under test
    container = new KafkaMessageListenerContainer<>(factory, new ContainerProperties(SENDER_TOPIC));

    // queue that collects whatever the listener receives
    records = new LinkedBlockingQueue<>();

    MessageListener<String, String> listener = new MessageListener<String, String>() {
        @Override
        public void onMessage(ConsumerRecord<String, String> record) {
            LOGGER.debug("test-listener received message='{}'", record.toString());
            records.add(record);
        }
    };
    container.setupMessageListener(listener);

    // start listening and wait until partitions are assigned
    container.start();
    ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
}
开发者ID:code-not-found,项目名称:spring-kafka,代码行数:35,代码来源:SpringKafkaSenderTest.java
示例19: createProducer
import org.springframework.kafka.test.utils.KafkaTestUtils; //导入依赖的package包/类
/**
 * Creates a Kafka producer configured from the embedded broker and wraps it in a
 * tracing producer bound to {@code mockTracer}.
 *
 * @return the tracing producer
 */
private Producer<Integer, String> createProducer() {
    Map<String, Object> producerConfig = KafkaTestUtils.producerProps(embeddedKafka);
    return new TracingKafkaProducer<>(new KafkaProducer<Integer, String>(producerConfig), mockTracer);
}
开发者ID:opentracing-contrib,项目名称:java-kafka-client,代码行数:6,代码来源:TracingKafkaTest.java
示例20: producerFactory
import org.springframework.kafka.test.utils.KafkaTestUtils; //导入依赖的package包/类
/**
 * Exposes a tracing producer factory that wraps a default Kafka producer factory
 * configured from the embedded broker.
 *
 * @return the tracing-enabled producer factory
 */
@Bean
public ProducerFactory<Integer, String> producerFactory() {
    Map<String, Object> producerConfig = KafkaTestUtils.producerProps(embeddedKafka);
    DefaultKafkaProducerFactory<Integer, String> delegate =
            new DefaultKafkaProducerFactory<>(producerConfig);
    return new TracingProducerFactory<>(delegate, tracer());
}
开发者ID:opentracing-contrib,项目名称:java-kafka-client,代码行数:6,代码来源:TestConfiguration.java
注:本文中的org.springframework.kafka.test.utils.KafkaTestUtils类示例整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论