本文整理汇总了Java中org.apache.kafka.common.serialization.LongSerializer类的典型用法代码示例。如果您正苦于以下问题:Java LongSerializer类的具体用法?Java LongSerializer怎么用?Java LongSerializer使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
LongSerializer类属于org.apache.kafka.common.serialization包,在下文中一共展示了LongSerializer类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: run
import org.apache.kafka.common.serialization.LongSerializer; //导入依赖的package包/类
public void run(Configuration configuration, Environment environment) throws Exception {
final CollectorRegistry collectorRegistry = new CollectorRegistry();
collectorRegistry.register(new DropwizardExports(environment.metrics()));
environment.admin()
.addServlet("metrics", new MetricsServlet(collectorRegistry))
.addMapping("/metrics");
final PrometheusMetricsReporter reporter = PrometheusMetricsReporter.newMetricsReporter()
.withCollectorRegistry(collectorRegistry)
.withConstLabel("service", getName())
.build();
final Tracer tracer = getTracer();
final Tracer metricsTracer = io.opentracing.contrib.metrics.Metrics.decorate(tracer, reporter);
GlobalTracer.register(metricsTracer);
final DynamicFeature tracing = new ServerTracingDynamicFeature.Builder(metricsTracer).build();
environment.jersey().register(tracing);
final Properties producerConfigs = new Properties();
producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "tweets-kafka:9092");
producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfigs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
final KafkaProducer<Long, String> kafkaProducer =
new KafkaProducer<>(producerConfigs, new LongSerializer(), new StringSerializer());
final Producer<Long, String> tracingKafkaProducer =
new TracingKafkaProducer<>(kafkaProducer, metricsTracer);
final ObjectMapper objectMapper = environment.getObjectMapper();
final TweetEventRepository tweetRepository = new KafkaTweetEventRepository(tracingKafkaProducer, objectMapper);
final TweetsService tweetsService = new TweetsService(tweetRepository);
final TweetsResource tweetsResource = new TweetsResource(tweetsService);
environment.jersey().register(tweetsResource);
}
开发者ID:jeqo,项目名称:talk-observing-distributed-systems,代码行数:34,代码来源:WorkerServiceApplication.java
示例2: produceRecords
import org.apache.kafka.common.serialization.LongSerializer; //导入依赖的package包/类
private static void produceRecords(String bootstrapServers) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
Producer<Long, byte[]> producer = new KafkaProducer<>(properties);
LongStream.rangeClosed(1, 100).boxed()
.map(number ->
new ProducerRecord<>(
TOPIC, //topic
number, //key
String.format("record-%s", number.toString()).getBytes())) //value
.forEach(record -> producer.send(record));
producer.close();
}
开发者ID:jeqo,项目名称:talk-kafka-messaging-logs,代码行数:18,代码来源:ProduceConsumeLongByteArrayRecord.java
示例3: produceToStreamOne
import org.apache.kafka.common.serialization.LongSerializer; //导入依赖的package包/类
private void produceToStreamOne()
throws ExecutionException, InterruptedException {
IntegrationTestUtils.produceKeyValuesSynchronously(
streamOneInput,
Arrays.asList(
new KeyValue<>(10L, 1),
new KeyValue<>(5L, 2),
new KeyValue<>(12L, 3),
new KeyValue<>(15L, 4),
new KeyValue<>(20L, 5),
new KeyValue<Long, Integer>(70L, null)), // nulls should be filtered
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
LongSerializer.class,
IntegerSerializer.class,
new Properties()),
mockTime);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:19,代码来源:KStreamRepartitionJoinTest.java
示例4: produceTopicValues
import org.apache.kafka.common.serialization.LongSerializer; //导入依赖的package包/类
private void produceTopicValues(final String topic) throws java.util.concurrent.ExecutionException, InterruptedException {
IntegrationTestUtils.produceKeyValuesSynchronously(
topic,
Arrays.asList(
new KeyValue<>("a", 1L),
new KeyValue<>("b", 2L),
new KeyValue<>("c", 3L),
new KeyValue<>("d", 4L),
new KeyValue<>("e", 5L)),
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
StringSerializer.class,
LongSerializer.class,
new Properties()),
mockTime);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:17,代码来源:GlobalKTableIntegrationTest.java
示例5: produceGlobalTableValues
import org.apache.kafka.common.serialization.LongSerializer; //导入依赖的package包/类
private void produceGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
IntegrationTestUtils.produceKeyValuesSynchronously(
globalOne,
Arrays.asList(
new KeyValue<>(1L, "F"),
new KeyValue<>(2L, "G"),
new KeyValue<>(3L, "H"),
new KeyValue<>(4L, "I"),
new KeyValue<>(5L, "J")),
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
LongSerializer.class,
StringSerializer.class,
new Properties()),
mockTime);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:17,代码来源:GlobalKTableIntegrationTest.java
示例6: setupConfigsAndUtils
import org.apache.kafka.common.serialization.LongSerializer; //导入依赖的package包/类
@BeforeClass
public static void setupConfigsAndUtils() throws Exception {
PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
RESULT_CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-result-consumer");
RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:24,代码来源:JoinIntegrationTest.java
示例7: prepareInputData
import org.apache.kafka.common.serialization.LongSerializer; //导入依赖的package包/类
private void prepareInputData() throws Exception {
CLUSTER.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class);
mockTime.sleep(10);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "aaa")), producerConfig, mockTime.milliseconds());
mockTime.sleep(10);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "bbb")), producerConfig, mockTime.milliseconds());
mockTime.sleep(10);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ccc")), producerConfig, mockTime.milliseconds());
mockTime.sleep(10);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "ddd")), producerConfig, mockTime.milliseconds());
mockTime.sleep(10);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "eee")), producerConfig, mockTime.milliseconds());
mockTime.sleep(10);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "fff")), producerConfig, mockTime.milliseconds());
mockTime.sleep(1);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ggg")), producerConfig, mockTime.milliseconds());
mockTime.sleep(1);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "hhh")), producerConfig, mockTime.milliseconds());
mockTime.sleep(1);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "iii")), producerConfig, mockTime.milliseconds());
mockTime.sleep(1);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, mockTime.milliseconds());
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:27,代码来源:ResetIntegrationTest.java
示例8: RealTimeTradeProducer
import org.apache.kafka.common.serialization.LongSerializer; //导入依赖的package包/类
private RealTimeTradeProducer(int index, String broker, String topic, int tradesPerSecond, int keysFrom, int keysTo) throws IOException,
URISyntaxException {
if (tradesPerSecond <= 0) {
throw new RuntimeException("tradesPerSecond=" + tradesPerSecond);
}
this.index = index;
this.topic = topic;
this.tradesPerSecond = tradesPerSecond;
tickers = new String[keysTo - keysFrom];
Arrays.setAll(tickers, i -> "T-" + Integer.toString(i + keysFrom));
Properties props = new Properties();
props.setProperty("bootstrap.servers", broker);
props.setProperty("key.serializer", LongSerializer.class.getName());
props.setProperty("value.serializer", TradeSerializer.class.getName());
this.producer = new KafkaProducer<>(props);
}
开发者ID:hazelcast,项目名称:big-data-benchmark,代码行数:17,代码来源:RealTimeTradeProducer.java
示例9: testSinkDisplayData
import org.apache.kafka.common.serialization.LongSerializer; //导入依赖的package包/类
@Test
public void testSinkDisplayData() {
try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {
KafkaIO.Write<Integer, Long> write = KafkaIO.<Integer, Long>write()
.withBootstrapServers("myServerA:9092,myServerB:9092")
.withTopic("myTopic")
.withValueSerializer(LongSerializer.class)
.withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey));
DisplayData displayData = DisplayData.from(write);
assertThat(displayData, hasDisplayItem("topic", "myTopic"));
assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServerA:9092,myServerB:9092"));
assertThat(displayData, hasDisplayItem("retries", 3));
}
}
开发者ID:apache,项目名称:beam,代码行数:17,代码来源:KafkaIOTest.java
示例10: MockProducerWrapper
import org.apache.kafka.common.serialization.LongSerializer; //导入依赖的package包/类
MockProducerWrapper() {
producerKey = String.valueOf(ThreadLocalRandom.current().nextLong());
mockProducer = new MockProducer<Integer, Long>(
false, // disable synchronous completion of send. see ProducerSendCompletionThread below.
new IntegerSerializer(),
new LongSerializer()) {
// override flush() so that it does not complete all the waiting sends, giving a chance to
// ProducerCompletionThread to inject errors.
@Override
public void flush() {
while (completeNext()) {
// there are some uncompleted records. let the completion thread handle them.
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// ok to retry.
}
}
}
};
// Add the producer to the global map so that producer factory function can access it.
assertNull(MOCK_PRODUCER_MAP.putIfAbsent(producerKey, mockProducer));
}
开发者ID:apache,项目名称:beam,代码行数:27,代码来源:KafkaIOTest.java
示例11: produceInitialGlobalTableValues
import org.apache.kafka.common.serialization.LongSerializer; //导入依赖的package包/类
private void produceInitialGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
IntegrationTestUtils.produceKeyValuesSynchronously(
globalOne,
Arrays.asList(
new KeyValue<>(1L, "A"),
new KeyValue<>(2L, "B"),
new KeyValue<>(3L, "C"),
new KeyValue<>(4L, "D")),
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
LongSerializer.class,
StringSerializer.class,
new Properties()),
mockTime);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:16,代码来源:GlobalKTableIntegrationTest.java
示例12: writeInputData
import org.apache.kafka.common.serialization.LongSerializer; //导入依赖的package包/类
private void writeInputData(final List<KeyValue<Long, Long>> records) throws Exception {
IntegrationTestUtils.produceKeyValuesSynchronously(
MULTI_PARTITION_INPUT_TOPIC,
records,
TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
CLUSTER.time
);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:9,代码来源:EosIntegrationTest.java
示例13: TradeProducer
import org.apache.kafka.common.serialization.LongSerializer; //导入依赖的package包/类
private TradeProducer(String broker) {
loadTickers();
Properties props = new Properties();
props.setProperty("bootstrap.servers", broker);
props.setProperty("key.serializer", LongSerializer.class.getName());
props.setProperty("value.serializer", TradeSerializer.class.getName());
producer = new KafkaProducer<>(props);
}
开发者ID:hazelcast,项目名称:big-data-benchmark,代码行数:9,代码来源:TradeProducer.java
示例14: createProducer
import org.apache.kafka.common.serialization.LongSerializer; //导入依赖的package包/类
private static Producer<Object, Object> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092,localhost:9093,localhost:9094");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaMDWProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
return new KafkaProducer<>(props);
}
开发者ID:CenturyLinkCloud,项目名称:mdw,代码行数:12,代码来源:KafkaAdapter.java
示例15: testSink
import org.apache.kafka.common.serialization.LongSerializer; //导入依赖的package包/类
@Test
public void testSink() throws Exception {
// Simply read from kafka source and write to kafka sink. Then verify the records
// are correctly published to mock kafka producer.
int numElements = 1000;
try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {
ProducerSendCompletionThread completionThread =
new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
String topic = "test";
p
.apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
.withoutMetadata())
.apply(KafkaIO.<Integer, Long>write()
.withBootstrapServers("none")
.withTopic(topic)
.withKeySerializer(IntegerSerializer.class)
.withValueSerializer(LongSerializer.class)
.withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));
p.run();
completionThread.shutdown();
verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false);
}
}
开发者ID:apache,项目名称:beam,代码行数:32,代码来源:KafkaIOTest.java
示例16: testValuesSink
import org.apache.kafka.common.serialization.LongSerializer; //导入依赖的package包/类
@Test
public void testValuesSink() throws Exception {
// similar to testSink(), but use values()' interface.
int numElements = 1000;
try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {
ProducerSendCompletionThread completionThread =
new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
String topic = "test";
p
.apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
.withoutMetadata())
.apply(Values.<Long>create()) // there are no keys
.apply(KafkaIO.<Integer, Long>write()
.withBootstrapServers("none")
.withTopic(topic)
.withValueSerializer(LongSerializer.class)
.withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey))
.values());
p.run();
completionThread.shutdown();
verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, true);
}
}
开发者ID:apache,项目名称:beam,代码行数:32,代码来源:KafkaIOTest.java
示例17: runSimpleCopyTest
import org.apache.kafka.common.serialization.LongSerializer; //导入依赖的package包/类
private void runSimpleCopyTest(final int numberOfRestarts,
final String inputTopic,
final String throughTopic,
final String outputTopic) throws Exception {
final KStreamBuilder builder = new KStreamBuilder();
final KStream<Long, Long> input = builder.stream(inputTopic);
KStream<Long, Long> output = input;
if (throughTopic != null) {
output = input.through(throughTopic);
}
output.to(outputTopic);
for (int i = 0; i < numberOfRestarts; ++i) {
final long factor = i;
final KafkaStreams streams = new KafkaStreams(
builder,
StreamsTestUtils.getStreamsConfig(
applicationId,
CLUSTER.bootstrapServers(),
Serdes.LongSerde.class.getName(),
Serdes.LongSerde.class.getName(),
new Properties() {
{
put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
}
}));
try {
streams.start();
final List<KeyValue<Long, Long>> inputData = prepareData(factor * 100, factor * 100 + 10L, 0L, 1L);
IntegrationTestUtils.produceKeyValuesSynchronously(
inputTopic,
inputData,
TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
CLUSTER.time
);
final List<KeyValue<Long, Long>> committedRecords
= IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
TestUtils.consumerConfig(
CLUSTER.bootstrapServers(),
CONSUMER_GROUP_ID,
LongDeserializer.class,
LongDeserializer.class,
new Properties() {
{
put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
}
}),
inputTopic,
inputData.size()
);
checkResultPerKey(committedRecords, inputData);
} finally {
streams.close();
}
}
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:63,代码来源:EosIntegrationTest.java
示例18: shouldBeAbleToPerformMultipleTransactions
import org.apache.kafka.common.serialization.LongSerializer; //导入依赖的package包/类
@Test
public void shouldBeAbleToPerformMultipleTransactions() throws Exception {
final KStreamBuilder builder = new KStreamBuilder();
builder.stream(SINGLE_PARTITION_INPUT_TOPIC).to(SINGLE_PARTITION_OUTPUT_TOPIC);
final KafkaStreams streams = new KafkaStreams(
builder,
StreamsTestUtils.getStreamsConfig(
applicationId,
CLUSTER.bootstrapServers(),
Serdes.LongSerde.class.getName(),
Serdes.LongSerde.class.getName(),
new Properties() {
{
put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
}
}));
try {
streams.start();
final List<KeyValue<Long, Long>> firstBurstOfData = prepareData(0L, 5L, 0L);
final List<KeyValue<Long, Long>> secondBurstOfData = prepareData(5L, 8L, 0L);
IntegrationTestUtils.produceKeyValuesSynchronously(
SINGLE_PARTITION_INPUT_TOPIC,
firstBurstOfData,
TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
CLUSTER.time
);
final List<KeyValue<Long, Long>> firstCommittedRecords
= IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
TestUtils.consumerConfig(
CLUSTER.bootstrapServers(),
CONSUMER_GROUP_ID,
LongDeserializer.class,
LongDeserializer.class,
new Properties() {
{
put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
}
}),
SINGLE_PARTITION_OUTPUT_TOPIC,
firstBurstOfData.size()
);
assertThat(firstCommittedRecords, equalTo(firstBurstOfData));
IntegrationTestUtils.produceKeyValuesSynchronously(
SINGLE_PARTITION_INPUT_TOPIC,
secondBurstOfData,
TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
CLUSTER.time
);
final List<KeyValue<Long, Long>> secondCommittedRecords
= IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
TestUtils.consumerConfig(
CLUSTER.bootstrapServers(),
CONSUMER_GROUP_ID,
LongDeserializer.class,
LongDeserializer.class,
new Properties() {
{
put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
}
}),
SINGLE_PARTITION_OUTPUT_TOPIC,
secondBurstOfData.size()
);
assertThat(secondCommittedRecords, equalTo(secondBurstOfData));
} finally {
streams.close();
}
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:78,代码来源:EosIntegrationTest.java
示例19: testKafkaBinderConfigurationWithKafkaProperties
import org.apache.kafka.common.serialization.LongSerializer; //导入依赖的package包/类
@Test
public void testKafkaBinderConfigurationWithKafkaProperties() throws Exception {
assertNotNull(this.kafkaMessageChannelBinder);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(
new KafkaProducerProperties());
Method getProducerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("getProducerFactory",
String.class, ExtendedProducerProperties.class);
getProducerFactoryMethod.setAccessible(true);
DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod
.invoke(this.kafkaMessageChannelBinder, "foo", producerProperties);
Field producerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaProducerFactory.class, "configs",
Map.class);
ReflectionUtils.makeAccessible(producerFactoryConfigField);
Map<String, Object> producerConfigs = (Map<String, Object>) ReflectionUtils.getField(producerFactoryConfigField,
producerFactory);
assertTrue(producerConfigs.get("batch.size").equals(10));
assertTrue(producerConfigs.get("key.serializer").equals(LongSerializer.class));
assertTrue(producerConfigs.get("key.deserializer") == null);
assertTrue(producerConfigs.get("value.serializer").equals(LongSerializer.class));
assertTrue(producerConfigs.get("value.deserializer") == null);
assertTrue(producerConfigs.get("compression.type").equals("snappy"));
List<String> bootstrapServers = new ArrayList<>();
bootstrapServers.add("10.98.09.199:9092");
bootstrapServers.add("10.98.09.196:9092");
assertTrue((((List<String>) producerConfigs.get("bootstrap.servers")).containsAll(bootstrapServers)));
Method createKafkaConsumerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod(
"createKafkaConsumerFactory", boolean.class, String.class, ExtendedConsumerProperties.class);
createKafkaConsumerFactoryMethod.setAccessible(true);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<>(
new KafkaConsumerProperties());
DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) createKafkaConsumerFactoryMethod
.invoke(this.kafkaMessageChannelBinder, true, "test", consumerProperties);
Field consumerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaConsumerFactory.class, "configs",
Map.class);
ReflectionUtils.makeAccessible(consumerFactoryConfigField);
Map<String, Object> consumerConfigs = (Map<String, Object>) ReflectionUtils.getField(consumerFactoryConfigField,
consumerFactory);
assertTrue(consumerConfigs.get("key.deserializer").equals(LongDeserializer.class));
assertTrue(consumerConfigs.get("key.serializer") == null);
assertTrue(consumerConfigs.get("value.deserializer").equals(LongDeserializer.class));
assertTrue(consumerConfigs.get("value.serialized") == null);
assertTrue(consumerConfigs.get("group.id").equals("groupIdFromBootConfig"));
assertTrue(consumerConfigs.get("auto.offset.reset").equals("earliest"));
assertTrue((((List<String>) consumerConfigs.get("bootstrap.servers")).containsAll(bootstrapServers)));
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-kafka,代码行数:46,代码来源:KafkaBinderAutoConfigurationPropertiesTest.java
示例20: KafkaSubscriberBlackboxTest
import org.apache.kafka.common.serialization.LongSerializer; //导入依赖的package包/类
public KafkaSubscriberBlackboxTest() {
super(new TestEnvironment());
mockProducer = new MockProducer<Long, Long>(true, new LongSerializer(), new LongSerializer());
}
开发者ID:unicredit,项目名称:kafka-reactive-streams,代码行数:5,代码来源:KafkaSubscriberBlackboxTest.java
注:本文中的org.apache.kafka.common.serialization.LongSerializer类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论