本文整理汇总了Java中org.apache.kafka.common.serialization.IntegerDeserializer类的典型用法代码示例。如果您正苦于以下问题:Java IntegerDeserializer类的具体用法?Java IntegerDeserializer怎么用?Java IntegerDeserializer使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
IntegerDeserializer类属于org.apache.kafka.common.serialization包,在下文中一共展示了IntegerDeserializer类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: createDefaultMessageFormats
import org.apache.kafka.common.serialization.IntegerDeserializer; //导入依赖的package包/类
/**
 * Registers one default {@code MessageFormat} per built-in Kafka deserializer.
 *
 * <p>For every entry the repository is queried by name; an existing record is reused and
 * overwritten, otherwise a new one is created. Each record is then saved.
 */
private void createDefaultMessageFormats() {
    // Display name -> fully qualified deserializer class name.
    final Map<String, String> deserializerByName = new HashMap<>();
    deserializerByName.put("Short", ShortDeserializer.class.getName());
    deserializerByName.put("ByteArray", ByteArrayDeserializer.class.getName());
    deserializerByName.put("Bytes", BytesDeserializer.class.getName());
    deserializerByName.put("Double", DoubleDeserializer.class.getName());
    deserializerByName.put("Float", FloatDeserializer.class.getName());
    deserializerByName.put("Integer", IntegerDeserializer.class.getName());
    deserializerByName.put("Long", LongDeserializer.class.getName());
    deserializerByName.put("String", StringDeserializer.class.getName());

    for (final Map.Entry<String, String> format : deserializerByName.entrySet()) {
        final String formatName = format.getKey();

        // Reuse the stored record when one already exists under this name.
        final MessageFormat stored = messageFormatRepository.findByName(formatName);
        final MessageFormat target = (stored != null) ? stored : new MessageFormat();

        target.setName(formatName);
        target.setClasspath(format.getValue());
        target.setJar("n/a");
        target.setDefaultFormat(true);
        messageFormatRepository.save(target);
    }
}
开发者ID:SourceLabOrg,项目名称:kafka-webview,代码行数:28,代码来源:DataLoaderConfig.java
示例2: consumeRecords
import org.apache.kafka.common.serialization.IntegerDeserializer; //导入依赖的package包/类
/**
 * Subscribes to {@code TOPIC}, performs a single poll and prints the key and value of
 * every record returned.
 *
 * <p>The consumer is managed by try-with-resources so it is closed even when subscribing
 * or polling throws; previously an exception skipped the {@code close()} call.
 *
 * @param bootstrapServers value for the consumer's {@code bootstrap.servers} setting
 */
private static void consumeRecords(String bootstrapServers) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "string-consumer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    try (Consumer<Integer, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList(TOPIC));
        // One poll only; the argument is the poll timeout in milliseconds.
        ConsumerRecords<Integer, String> records = consumer.poll(10000);
        for (ConsumerRecord<Integer, String> record : records) {
            out.printf(
                "key = %s value = %s%n",
                record.key(),
                record.value());
        }
    }
}
开发者ID:jeqo,项目名称:talk-kafka-messaging-logs,代码行数:23,代码来源:ProduceConsumeIntegerStringRecord.java
示例3: consumeRecords
import org.apache.kafka.common.serialization.IntegerDeserializer; //导入依赖的package包/类
/**
 * Subscribes to {@code TOPIC}, performs a single poll and prints each record's key and
 * value followed by its topic, partition, offset, formatted timestamp and checksum.
 *
 * <p>The consumer is managed by try-with-resources so it is closed even when subscribing,
 * polling or printing throws; previously an exception skipped the {@code close()} call.
 *
 * @param bootstrapServers value for the consumer's {@code bootstrap.servers} setting
 */
private static void consumeRecords(String bootstrapServers) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "metadata-consumer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    try (Consumer<Integer, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList(TOPIC));
        // One poll only; the argument is the poll timeout in milliseconds.
        ConsumerRecords<Integer, String> records = consumer.poll(10000);
        for (ConsumerRecord<Integer, String> record : records) {
            System.out.printf("key = %s value = %s\t", record.key(), record.value());
            System.out.printf("ProducerRecord: topic=>%s partition=>%s offset=>%s timestamp=>%s checksum=>%s",
                record.topic(),
                record.partition(),
                record.offset(),
                FORMATTER.format(Instant.ofEpochMilli(record.timestamp())),
                record.checksum());
            // Terminates the line started by the two printf calls above.
            System.out.println();
        }
    }
}
开发者ID:jeqo,项目名称:talk-kafka-messaging-logs,代码行数:27,代码来源:ProduceConsumeRecordMetadata.java
示例4: receiveMessages
import org.apache.kafka.common.serialization.IntegerDeserializer; //导入依赖的package包/类
/**
 * Fetches values from {@code topic} via
 * {@code IntegrationTestUtils.waitUntilMinValuesRecordsReceived} and returns them sorted
 * in natural order.
 *
 * @param valueDeserializer instance whose class name is used as the value deserializer
 * @param numMessages       record count forwarded to the test utility
 * @param topic             topic to read from
 * @return the received values, sorted
 * @throws InterruptedException propagated from the test utility
 */
private List<String> receiveMessages(final Deserializer<?> valueDeserializer,
                                     final int numMessages, final String topic) throws InterruptedException {
    final int maxWaitMs = 60 * 1000;

    final Properties consumerProps = new Properties();
    consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kstream-test");
    consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // Keys are always integers; the value deserializer is chosen by the caller.
    final String keyDeserializerName = IntegerDeserializer.class.getName();
    consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerName);
    final String valueDeserializerName = valueDeserializer.getClass().getName();
    consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerName);

    final List<String> values = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
        consumerProps, topic, numMessages, maxWaitMs);
    Collections.sort(values);
    return values;
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:22,代码来源:KStreamRepartitionJoinTest.java
示例5: setProduceConsumeProperties
import org.apache.kafka.common.serialization.IntegerDeserializer; //导入依赖的package包/类
/**
 * Builds a single {@link Properties} object carrying both the producer and the consumer
 * settings used by the benchmark (integer keys, byte-array values).
 *
 * @param clientId value stored under the client-id setting
 * @return the populated properties
 */
private Properties setProduceConsumeProperties(final String clientId) {
    final Properties clientProps = new Properties();

    // --- producer settings ---
    clientProps.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
    clientProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
    clientProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    clientProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    // Large socket buffers matter on high-latency links (e.g. AWS); the default is
    // fine for local runs.
    clientProps.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES);

    // --- consumer settings ---
    clientProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    clientProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    clientProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    // Same reasoning as the send buffer above, applied to the receive side.
    clientProps.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
    clientProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);

    return clientProps;
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:19,代码来源:SimpleBenchmark.java
示例6: testSinkWithInteger
import org.apache.kafka.common.serialization.IntegerDeserializer; //导入依赖的package包/类
/**
 * Sends the integers 1..10, each incremented by one, through a {@code KafkaSink} and
 * waits for the consumer's completion callback.
 *
 * <p>The consumer predicate compares each value against a counter that starts at 2,
 * matching the incremented payloads.
 */
@Test
public void testSinkWithInteger() throws InterruptedException {
    final KafkaUsage kafka = new KafkaUsage();
    final String topicName = UUID.randomUUID().toString();
    final CountDownLatch consumed = new CountDownLatch(1);
    final AtomicInteger nextExpected = new AtomicInteger(2);

    kafka.consumeIntegers(
        topicName, 10, 10, TimeUnit.SECONDS,
        consumed::countDown,
        (key, value) -> value == nextExpected.getAndIncrement());

    final KafkaSink<Integer> kafkaSink = new KafkaSink<>(vertx,
        getKafkaConfig()
            .put("topic", topicName)
            .put("value.serializer", IntegerSerializer.class.getName())
            .put("value.deserializer", IntegerDeserializer.class.getName())
    );

    Source.from(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .transformPayload(payload -> payload + 1)
        .to(kafkaSink);

    final boolean finishedInTime = consumed.await(1, TimeUnit.MINUTES);
    assertThat(finishedInTime).isTrue();
}
开发者ID:cescoffier,项目名称:fluid,代码行数:26,代码来源:KafkaSinkTest.java
示例7: testSource
import org.apache.kafka.common.serialization.IntegerDeserializer; //导入依赖的package包/类
/**
 * Reads integers from a {@code KafkaSource}, increments each payload and collects the
 * results; then produces the values 0..9 and expects exactly 1..10 to be collected.
 */
@Test
public void testSource() throws InterruptedException {
    final KafkaUsage kafka = new KafkaUsage();
    final String topicName = UUID.randomUUID().toString();
    final List<Integer> collected = new ArrayList<>();

    final KafkaSource<Integer> kafkaSource = new KafkaSource<>(vertx,
        getKafkaConfig()
            .put("topic", topicName)
            .put("value.serializer", IntegerSerializer.class.getName())
            .put("value.deserializer", IntegerDeserializer.class.getName())
    );
    kafkaSource
        .transformPayload(payload -> payload + 1)
        .to(Sink.forEachPayload(collected::add));

    // Each produced record carries the next counter value, starting at 0.
    final AtomicInteger nextValue = new AtomicInteger();
    kafka.produceIntegers(10, null,
        () -> new ProducerRecord<>(topicName, nextValue.getAndIncrement()));

    await().atMost(1, TimeUnit.MINUTES).until(() -> collected.size() >= 10);
    assertThat(collected).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
开发者ID:cescoffier,项目名称:fluid,代码行数:23,代码来源:KafkaSourceTest.java
示例8: testCommonHeaders
import org.apache.kafka.common.serialization.IntegerDeserializer; //导入依赖的package包/类
/**
 * Produces one keyed record and checks that the data delivered by the source exposes a
 * non-null original Kafka record and a non-null key, then completes the async handle.
 *
 * @param context test context used to obtain the {@code Async} handle
 */
@Test
public void testCommonHeaders(TestContext context) throws InterruptedException {
    final Async done = context.async();
    final KafkaUsage kafka = new KafkaUsage();
    final String topicName = UUID.randomUUID().toString();

    final KafkaSource<Integer> kafkaSource = new KafkaSource<>(vertx,
        getKafkaConfig()
            .put("topic", topicName)
            .put("value.serializer", IntegerSerializer.class.getName())
            .put("value.deserializer", IntegerDeserializer.class.getName())
    );

    kafkaSource.to(data -> {
        KafkaConsumerRecord kafkaRecord = original(data);
        assertThat(kafkaRecord).isNotNull();
        assertThat(key(data)).isNotNull();
        done.complete();
        return complete();
    });

    kafka.produceIntegers(1, null,
        () -> new ProducerRecord<>(topicName, "key", 1));
}
开发者ID:cescoffier,项目名称:fluid,代码行数:26,代码来源:KafkaSourceTest.java
示例9: testMulticastWithBufferSize
import org.apache.kafka.common.serialization.IntegerDeserializer; //导入依赖的package包/类
/**
 * Creates a {@code KafkaSource} configured with {@code multicast.buffer.size} = 20 and
 * delegates the actual verification to {@code checkMulticast}.
 */
@Test
public void testMulticastWithBufferSize() throws InterruptedException {
    final KafkaUsage kafka = new KafkaUsage();
    final String topicName = UUID.randomUUID().toString();

    final KafkaSource<Integer> kafkaSource = new KafkaSource<>(
        vertx,
        getKafkaConfig()
            .put("topic", topicName)
            .put("value.serializer", IntegerSerializer.class.getName())
            .put("value.deserializer", IntegerDeserializer.class.getName())
            .put("multicast.buffer.size", 20));

    assertThat(kafkaSource).isNotNull();
    checkMulticast(kafka, topicName, kafkaSource);
}
开发者ID:cescoffier,项目名称:fluid,代码行数:18,代码来源:KafkaSourceTest.java
示例10: testMulticastWithTime
import org.apache.kafka.common.serialization.IntegerDeserializer; //导入依赖的package包/类
/**
 * Creates a {@code KafkaSource} configured with {@code multicast.buffer.period.ms} = 2000
 * and delegates the actual verification to {@code checkMulticast}.
 */
@Test
public void testMulticastWithTime() throws InterruptedException {
    final KafkaUsage kafka = new KafkaUsage();
    final String topicName = UUID.randomUUID().toString();

    final KafkaSource<Integer> kafkaSource = new KafkaSource<>(
        vertx,
        getKafkaConfig()
            .put("topic", topicName)
            .put("value.serializer", IntegerSerializer.class.getName())
            .put("value.deserializer", IntegerDeserializer.class.getName())
            .put("multicast.buffer.period.ms", 2000));

    assertThat(kafkaSource).isNotNull();
    checkMulticast(kafka, topicName, kafkaSource);
}
开发者ID:cescoffier,项目名称:fluid,代码行数:17,代码来源:KafkaSourceTest.java
示例11: testAutoCommit
import org.apache.kafka.common.serialization.IntegerDeserializer; //导入依赖的package包/类
/**
 * Starts a listener container on {@code topic3}/{@code topic4}, sends four messages to
 * {@code topic3} and waits until the listener has counted all four.
 *
 * <p>The container is stopped in a {@code finally} block so that a failed assertion, an
 * interrupted sleep or a send error does not leave the started container running.
 *
 * @throws Exception if sleeping or awaiting the latch is interrupted, or sending fails
 */
@Test
public void testAutoCommit() throws Exception {
    LOG.info("Start testAutoCommit");
    ContainerProperties containerProps = new ContainerProperties("topic3", "topic4");
    final CountDownLatch latch = new CountDownLatch(4);
    containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
        LOG.info("received: " + message);
        latch.countDown();
    });
    KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps,
        IntegerDeserializer.class, StringDeserializer.class);
    container.setBeanName("testAutoCommit");
    container.start();
    try {
        Thread.sleep(5000); // wait a bit for the container to start
        KafkaTemplate<Integer, String> template = createTemplate(IntegerSerializer.class, StringSerializer.class);
        template.setDefaultTopic("topic3");
        template.sendDefault(0, "foo");
        template.sendDefault(2, "bar");
        template.sendDefault(0, "baz");
        template.sendDefault(2, "qux");
        template.flush();
        assertTrue(latch.await(60, TimeUnit.SECONDS));
    } finally {
        // Always release the container, even when the test body fails.
        container.stop();
    }
    LOG.info("Stop testAutoCommit");
}
开发者ID:rmap-project,项目名称:rmap,代码行数:26,代码来源:SimpleKafkaIT.java
示例12: main
import org.apache.kafka.common.serialization.IntegerDeserializer; //导入依赖的package包/类
/**
 * Consumes records from a fixed topic forever, logging every record and synchronously
 * committing offsets after each poll.
 *
 * <p>The loop has no normal exit, so it can only end through an exception. The consumer
 * is held in try-with-resources so that it is closed in that case instead of being
 * leaked.
 *
 * @param args unused
 */
public static void main(String[] args) {
    String topic = "persistent://sample/standalone/ns/my-topic";

    Properties props = new Properties();
    props.put("bootstrap.servers", "pulsar://localhost:6650");
    props.put("group.id", "my-subscription-name");
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", IntegerDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    try (Consumer<Integer, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList(topic));

        while (true) {
            ConsumerRecords<Integer, String> records = consumer.poll(100);
            records.forEach(record -> {
                log.info("Received record: {}", record);
            });

            // Commit last offset
            consumer.commitSync();
        }
    }
}
开发者ID:apache,项目名称:incubator-pulsar,代码行数:24,代码来源:ConsumerExample.java
示例13: mkKafkaReadTransform
import org.apache.kafka.common.serialization.IntegerDeserializer; //导入依赖的package包/类
/**
 * Builds a {@code KafkaIO.Read} over two topics with 10 partitions each (20 in total);
 * {@code numElements} are assigned round-robin across those partitions.
 *
 * @param numElements   number of elements passed to the consumer factory
 * @param maxNumRecords upper bound applied via {@code withMaxNumRecords}
 * @param timestampFn   optional timestamp function; applied only when non-null
 * @return the configured read transform
 */
private static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
    int numElements,
    int maxNumRecords,
    @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {

    final List<String> topicNames = ImmutableList.of("topic_a", "topic_b");

    final KafkaIO.Read<Integer, Long> baseRead = KafkaIO.<Integer, Long>read()
        .withBootstrapServers("myServer1:9092,myServer2:9092")
        .withTopics(topicNames)
        .withConsumerFactoryFn(new ConsumerFactoryFn(
            topicNames, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions
        .withKeyDeserializer(IntegerDeserializer.class)
        .withValueDeserializer(LongDeserializer.class)
        .withMaxNumRecords(maxNumRecords);

    // Only attach the timestamp function when the caller supplied one.
    return (timestampFn == null) ? baseRead : baseRead.withTimestampFn(timestampFn);
}
开发者ID:apache,项目名称:beam,代码行数:27,代码来源:KafkaIOTest.java
示例14: testUnboundedSourceWithSingleTopic
import org.apache.kafka.common.serialization.IntegerDeserializer; //导入依赖的package包/类
/**
 * Variant of {@code testUnboundedSource} that reads from a single topic: builds the read
 * transform, extracts the values and runs the counting assertions on the pipeline.
 */
@Test
public void testUnboundedSourceWithSingleTopic() {
    final int numElements = 1000;
    final String topic = "my_topic";

    final KafkaIO.Read<Integer, Long> singleTopicRead = KafkaIO.<Integer, Long>read()
        .withBootstrapServers("none")
        .withTopic(topic)
        .withConsumerFactoryFn(new ConsumerFactoryFn(
            ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST))
        .withMaxNumRecords(numElements)
        .withKeyDeserializer(IntegerDeserializer.class)
        .withValueDeserializer(LongDeserializer.class);

    // Drop Kafka metadata and keys; only the Long values are asserted on.
    final PCollection<Long> values = p
        .apply(singleTopicRead.withoutMetadata())
        .apply(Values.<Long>create());

    addCountingAsserts(values, numElements);
    p.run();
}
开发者ID:apache,项目名称:beam,代码行数:24,代码来源:KafkaIOTest.java
示例15: run
import org.apache.kafka.common.serialization.IntegerDeserializer; //导入依赖的package包/类
/**
 * Worker loop: subscribes to {@code topic} with a {@code SeekingConsumerLogic} as the
 * subscribe callback and hands every polled batch to it until {@code finish} is set.
 *
 * <p>After the loop ends the logic is asked to optionally commit all offsets. Any
 * exception is logged rather than propagated.
 */
@Override
public void run() {
    logger.info("Worker thread started");
    try (Consumer<String, Integer> kafkaConsumer =
             new KafkaConsumer<>(configuration, new StringDeserializer(), new IntegerDeserializer())) {
        final SeekingConsumerLogic batchLogic =
            new SeekingConsumerLogic(kafkaConsumer, stateDao, messagesToChangeState, percentFailureProbability);
        kafkaConsumer.subscribe(Collections.singletonList(topic), batchLogic);

        while (!finish.get()) {
            final ConsumerRecords<String, Integer> batch = kafkaConsumer.poll(pollTimeout.toMillis());

            // Time only the processing step, not the poll itself.
            final long startedAtNanos = System.nanoTime();
            batchLogic.processMessages(batch);
            final long elapsedNanos = System.nanoTime() - startedAtNanos;

            logger.debug("Processing of poll batch finished: {} messages, {} ms",
                batch.count(), TimeUnit.NANOSECONDS.toMillis(elapsedNanos));
        }

        batchLogic.optionallyCommitAllOffsets();
    } catch (Exception e) {
        logger.error("Unexpected exception occurred: {}", e.toString(), e);
    }
    logger.info("Worker thread stopped");
}
开发者ID:avast,项目名称:kafka-tests,代码行数:26,代码来源:SeekingConsumer.java
示例16: run
import org.apache.kafka.common.serialization.IntegerDeserializer; //导入依赖的package包/类
/**
 * Worker loop: subscribes to {@code topic} (passing {@code this} as the subscribe
 * callback) and, until {@code finish} is set, marks every polled record as consumed in
 * {@code stateDao}.
 *
 * <p>Record keys are parsed as UUIDs. Any exception is logged rather than propagated.
 */
@Override
public void run() {
    logger.info("Worker thread started");
    try (Consumer<String, Integer> kafkaConsumer =
             new KafkaConsumer<>(configuration, new StringDeserializer(), new IntegerDeserializer())) {
        kafkaConsumer.subscribe(Collections.singletonList(topic), this);

        while (!finish.get()) {
            final ConsumerRecords<String, Integer> batch = kafkaConsumer.poll(pollTimeout.toMillis());

            // Time only the processing step, not the poll itself.
            final long startedAtNanos = System.nanoTime();
            for (ConsumerRecord<String, Integer> message : batch) {
                logger.trace("Message consumed: {}, {}, {}-{}/{}",
                    message.key(), message.value(), message.topic(), message.partition(), message.offset());
                stateDao.markConsume(ConsumerType.autocommit, UUID.fromString(message.key()), message.value());
            }
            final long elapsedNanos = System.nanoTime() - startedAtNanos;

            logger.debug("Processing of poll batch finished: {} messages, {} ms",
                batch.count(), TimeUnit.NANOSECONDS.toMillis(elapsedNanos));
        }
    } catch (Exception e) {
        logger.error("Unexpected exception occurred: {}", e.toString(), e);
    }
    logger.info("Worker thread stopped");
}
开发者ID:avast,项目名称:kafka-tests,代码行数:26,代码来源:AutoCommitConsumer.java
示例17: doWork
import org.apache.kafka.common.serialization.IntegerDeserializer; //导入依赖的package包/类
/**
 * Configures the Kafka consumer, subscribes it to the configured topic and forwards
 * every received record to {@code task} in an endless loop.
 *
 * <p>Fixes relative to the previous version:
 * <ul>
 *   <li>{@code key.deserializer} named the {@code IntegerSerializer} class; it now names
 *       {@code IntegerDeserializer}, matching the instance passed to the constructor.</li>
 *   <li>The misspelled key {@code fetch.max.wati.ms} is corrected to
 *       {@code fetch.max.wait.ms} so the value is actually applied.</li>
 *   <li>The consumer is closed when the loop terminates (which can only happen through
 *       an exception), as the sibling examples do.</li>
 * </ul>
 *
 * @param properties consumer properties; mutated in place with the settings below
 * @param task       callback invoked for every record
 */
public void doWork(Properties properties, ProteusTask task) {
    topicsList.add(ConsumerUtils.getTopicName(runnerProperties.getProperty("eu.proteus.kafkaTopic")));
    properties.put("bootstrap.servers", properties.get("com.treelogic.proteus.kafka.bootstrapServers"));
    properties.put("key.deserializer", IntegerDeserializer.class.getName());
    properties.put("value.deserializer", ProteusSerializer.class.getName());
    properties.put("group.id",
        "proteus-" + ConsumerUtils.getTopicName(runnerProperties.getProperty("eu.proteus.kafkaTopic")));
    properties.put("max.poll.records", 100);
    properties.put("session.timeout.ms", 60000);
    properties.put("request.timeout.ms", 80000);
    properties.put("fetch.max.wait.ms", 60000);
    properties.put("auto.offset.reset", "latest");
    kafkaConsumer = new KafkaConsumer<>(properties, new IntegerDeserializer(), new ProteusSerializer());
    kafkaConsumer.subscribe(topicsList);
    try {
        while (true) {
            // Blocks until records are available.
            ConsumerRecords<Integer, Measurement> records = kafkaConsumer.poll(Long.MAX_VALUE);
            for (ConsumerRecord<Integer, Measurement> record : records) {
                logger.info("Task " + this.getClass().getSimpleName() + " doing work for coil "
                    + record.value().getCoilID() + " on topic "
                    + ConsumerUtils.getTopicName(runnerProperties.getProperty("eu.proteus.kafkaTopic")));
                task.doWork(record.key(), record.value(), proteusBucket, topicsList);
            }
        }
    } finally {
        System.out.println("Cerrariamos la ejecución del hilo < "
            + this.runnerProperties.getProperty("eu.proteus.kafkaTopic") + " >");
        // Release the consumer's network resources and group membership.
        kafkaConsumer.close();
    }
}
开发者ID:proteus-h2020,项目名称:proteus-consumer-couchbase,代码行数:35,代码来源:Runner.java
示例18: main
import org.apache.kafka.common.serialization.IntegerDeserializer; //导入依赖的package包/类
/**
 * Demo entry point: consumes the {@code proteus-flatness} topic forever and prints every
 * record to standard output. The consumer is closed if the loop ends via an exception.
 *
 * @param args unused
 */
public static void main(String[] args) {
    ArrayList<String> topics = new ArrayList<>();
    HashMap<String, Object> consumerConfig = new HashMap<>();
    topics.add("proteus-flatness");

    consumerConfig.put("bootstrap.servers", "192.168.4.246:6667,192.168.4.247:6667,192.168.4.248:6667");
    consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
    consumerConfig.put("value.deserializer", ProteusSerializer.class.getName());
    consumerConfig.put("group.id", "proteus");

    // Explicit deserializer instances are handed to the consumer constructor.
    ProteusSerializer valueDeserializer = new ProteusSerializer();
    IntegerDeserializer integerKeys = new IntegerDeserializer();
    KafkaConsumer<Integer, Measurement> consumer =
        new KafkaConsumer<Integer, Measurement>(consumerConfig, integerKeys, valueDeserializer);
    consumer.subscribe(topics);

    try {
        for (;;) {
            ConsumerRecords<Integer, Measurement> batch = consumer.poll(1);
            for (ConsumerRecord<Integer, Measurement> message : batch) {
                System.out.println("traza");
                System.out.println(message);
            }
        }
    } finally {
        consumer.close();
    }
}
开发者ID:proteus-h2020,项目名称:proteus-consumer-couchbase,代码行数:34,代码来源:ExampleHSM.java
示例19: main
import org.apache.kafka.common.serialization.IntegerDeserializer; //导入依赖的package包/类
/**
 * Demo entry point: consumes the {@code proteus-realtime} topic forever and prints every
 * record to standard output. The consumer is closed if the loop ends via an exception.
 *
 * @param args unused
 */
public static void main(String[] args) {
    ArrayList<String> topics = new ArrayList<>();
    HashMap<String, Object> consumerConfig = new HashMap<>();
    topics.add("proteus-realtime");

    consumerConfig.put("bootstrap.servers", "192.168.4.246:6667,192.168.4.247:6667,192.168.4.248:6667");
    consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
    consumerConfig.put("value.deserializer", ProteusSerializer.class.getName());
    consumerConfig.put("group.id", "proteus");

    // Explicit deserializer instances are handed to the consumer constructor.
    KafkaConsumer<Integer, Measurement> consumer = new KafkaConsumer<Integer, Measurement>(
        consumerConfig, new IntegerDeserializer(), new ProteusSerializer());
    consumer.subscribe(topics);

    try {
        for (;;) {
            ConsumerRecords<Integer, Measurement> batch = consumer.poll(1);
            for (ConsumerRecord<Integer, Measurement> message : batch) {
                System.out.println("record realtime: " + message.toString());
            }
        }
    } finally {
        consumer.close();
    }
}
开发者ID:proteus-h2020,项目名称:proteus-consumer-couchbase,代码行数:32,代码来源:ExampleRealtime.java
示例20: shouldAggregate
import org.apache.kafka.common.serialization.IntegerDeserializer; //导入依赖的package包/类
/**
 * Aggregates the grouped stream into {@code outputTopic}, produces the input messages
 * before and after starting the streams application, and checks that the ten received
 * key/value pairs, once sorted, are A..E each with the values 1 and 2.
 *
 * @throws Exception propagated from message production, stream start-up or consumption
 */
@Test
public void shouldAggregate() throws Exception {
    produceMessages(mockTime.milliseconds());

    groupedStream
        .aggregate(initializer, aggregator, Serdes.Integer(), "aggregate-by-selected-key")
        .to(Serdes.String(), Serdes.Integer(), outputTopic);

    startStreams();
    produceMessages(mockTime.milliseconds());

    final List<KeyValue<String, Integer>> received =
        receiveMessages(new StringDeserializer(), new IntegerDeserializer(), 10);

    // Arrival order is not asserted; sort with the test class's own comparison.
    Collections.sort(received, new Comparator<KeyValue<String, Integer>>() {
        @Override
        public int compare(final KeyValue<String, Integer> left, final KeyValue<String, Integer> right) {
            return KStreamAggregationIntegrationTest.compare(left, right);
        }
    });

    final List<KeyValue<String, Integer>> expected = Arrays.asList(
        KeyValue.pair("A", 1),
        KeyValue.pair("A", 2),
        KeyValue.pair("B", 1),
        KeyValue.pair("B", 2),
        KeyValue.pair("C", 1),
        KeyValue.pair("C", 2),
        KeyValue.pair("D", 1),
        KeyValue.pair("D", 2),
        KeyValue.pair("E", 1),
        KeyValue.pair("E", 2));
    assertThat(received, is(expected));
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:40,代码来源:KStreamAggregationIntegrationTest.java
注:本文中的org.apache.kafka.common.serialization.IntegerDeserializer类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论