
Java LongSerializer Class Code Examples


This article collects typical usage examples of the Java class org.apache.kafka.common.serialization.LongSerializer, drawn from real projects. If you are wondering what exactly LongSerializer does, how to use it, or simply want working examples, the curated class examples below should help.



The LongSerializer class belongs to the org.apache.kafka.common.serialization package. A total of 20 code examples of the class are shown below, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Java code examples.
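Before turning to the project examples, here is a minimal self-contained sketch (written for this article, not taken from any project below; the broker address and topic names are placeholders) showing the two common ways LongSerializer is used: calling serialize() directly, which encodes a Long as an 8-byte big-endian array, and registering it as the key serializer of a KafkaProducer.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class LongSerializerSketch {

    public static void main(String[] args) {
        // Direct use: serialize() turns a Long into an 8-byte big-endian array.
        // The topic argument is not used by LongSerializer itself.
        try (LongSerializer serializer = new LongSerializer()) {
            byte[] bytes = serializer.serialize("any-topic", 42L);
            System.out.println("serialized length = " + bytes.length); // prints 8
        }

        // Producer use: register LongSerializer as the key serializer
        // (values use StringSerializer here).
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (Producer<Long, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("example-topic", 1L, "hello"));
        }
    }
}

As Examples 1 and 8 below illustrate, the serializer can be supplied either as a constructor argument (new LongSerializer()) or by class name via the key.serializer property.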

Example 1: run

import org.apache.kafka.common.serialization.LongSerializer; // import the required package/class
public void run(Configuration configuration, Environment environment) throws Exception {
  final CollectorRegistry collectorRegistry = new CollectorRegistry();
  collectorRegistry.register(new DropwizardExports(environment.metrics()));
  environment.admin()
      .addServlet("metrics", new MetricsServlet(collectorRegistry))
      .addMapping("/metrics");

  final PrometheusMetricsReporter reporter = PrometheusMetricsReporter.newMetricsReporter()
      .withCollectorRegistry(collectorRegistry)
      .withConstLabel("service", getName())
      .build();

  final Tracer tracer = getTracer();
  final Tracer metricsTracer = io.opentracing.contrib.metrics.Metrics.decorate(tracer, reporter);
  GlobalTracer.register(metricsTracer);

  final DynamicFeature tracing = new ServerTracingDynamicFeature.Builder(metricsTracer).build();
  environment.jersey().register(tracing);

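  // Build an idempotent Kafka producer (acks=all) with Long keys and String values,
  // then wrap it in a TracingKafkaProducer so sends are reported to the tracer.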
  final Properties producerConfigs = new Properties();
  producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "tweets-kafka:9092");
  producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
  producerConfigs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
  final KafkaProducer<Long, String> kafkaProducer =
      new KafkaProducer<>(producerConfigs, new LongSerializer(), new StringSerializer());
  final Producer<Long, String> tracingKafkaProducer =
      new TracingKafkaProducer<>(kafkaProducer, metricsTracer);
  final ObjectMapper objectMapper = environment.getObjectMapper();
  final TweetEventRepository tweetRepository = new KafkaTweetEventRepository(tracingKafkaProducer, objectMapper);
  final TweetsService tweetsService = new TweetsService(tweetRepository);
  final TweetsResource tweetsResource = new TweetsResource(tweetsService);
  environment.jersey().register(tweetsResource);
}
 
Developer ID: jeqo, Project: talk-observing-distributed-systems, Lines: 34, Source: WorkerServiceApplication.java


Example 2: produceRecords

import org.apache.kafka.common.serialization.LongSerializer; // import the required package/class
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    Producer<Long, byte[]> producer = new KafkaProducer<>(properties);

    LongStream.rangeClosed(1, 100).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC, // topic
                            number, // key
                            String.format("record-%d", number).getBytes())) // value
            .forEach(producer::send);
    producer.close();
}
 
Developer ID: jeqo, Project: talk-kafka-messaging-logs, Lines: 18, Source: ProduceConsumeLongByteArrayRecord.java


Example 3: produceToStreamOne

import org.apache.kafka.common.serialization.LongSerializer; // import the required package/class
private void produceToStreamOne()
    throws ExecutionException, InterruptedException {
    IntegrationTestUtils.produceKeyValuesSynchronously(
        streamOneInput,
        Arrays.asList(
            new KeyValue<>(10L, 1),
            new KeyValue<>(5L, 2),
            new KeyValue<>(12L, 3),
            new KeyValue<>(15L, 4),
            new KeyValue<>(20L, 5),
            new KeyValue<Long, Integer>(70L, null)), // nulls should be filtered
        TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            LongSerializer.class,
            IntegerSerializer.class,
            new Properties()),
        mockTime);
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 19, Source: KStreamRepartitionJoinTest.java


Example 4: produceTopicValues

import org.apache.kafka.common.serialization.LongSerializer; // import the required package/class
private void produceTopicValues(final String topic) throws java.util.concurrent.ExecutionException, InterruptedException {
    IntegrationTestUtils.produceKeyValuesSynchronously(
            topic,
            Arrays.asList(
                    new KeyValue<>("a", 1L),
                    new KeyValue<>("b", 2L),
                    new KeyValue<>("c", 3L),
                    new KeyValue<>("d", 4L),
                    new KeyValue<>("e", 5L)),
            TestUtils.producerConfig(
                    CLUSTER.bootstrapServers(),
                    StringSerializer.class,
                    LongSerializer.class,
                    new Properties()),
            mockTime);
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 17, Source: GlobalKTableIntegrationTest.java


Example 5: produceGlobalTableValues

import org.apache.kafka.common.serialization.LongSerializer; // import the required package/class
private void produceGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
    IntegrationTestUtils.produceKeyValuesSynchronously(
            globalOne,
            Arrays.asList(
                    new KeyValue<>(1L, "F"),
                    new KeyValue<>(2L, "G"),
                    new KeyValue<>(3L, "H"),
                    new KeyValue<>(4L, "I"),
                    new KeyValue<>(5L, "J")),
            TestUtils.producerConfig(
                    CLUSTER.bootstrapServers(),
                    LongSerializer.class,
                    StringSerializer.class,
                    new Properties()),
            mockTime);
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 17, Source: GlobalKTableIntegrationTest.java


Example 6: setupConfigsAndUtils

import org.apache.kafka.common.serialization.LongSerializer; // import the required package/class
@BeforeClass
public static void setupConfigsAndUtils() throws Exception {
    PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
    PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
    PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
    PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-result-consumer");
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
    STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
    STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
    STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 24, Source: JoinIntegrationTest.java


Example 7: prepareInputData

import org.apache.kafka.common.serialization.LongSerializer; // import the required package/class
private void prepareInputData() throws Exception {
    CLUSTER.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);

    final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class);

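    // Advance the mocked clock before each send so every record carries a distinct timestamp.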
    mockTime.sleep(10);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "aaa")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(10);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "bbb")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(10);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ccc")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(10);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "ddd")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(10);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "eee")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(10);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "fff")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(1);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ggg")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(1);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "hhh")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(1);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "iii")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(1);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, mockTime.milliseconds());
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 27, Source: ResetIntegrationTest.java


Example 8: RealTimeTradeProducer

import org.apache.kafka.common.serialization.LongSerializer; // import the required package/class
private RealTimeTradeProducer(int index, String broker, String topic, int tradesPerSecond, int keysFrom, int keysTo) throws IOException,
        URISyntaxException {
    if (tradesPerSecond <= 0) {
        throw new RuntimeException("tradesPerSecond=" + tradesPerSecond);
    }
    this.index = index;
    this.topic = topic;
    this.tradesPerSecond = tradesPerSecond;
    tickers = new String[keysTo - keysFrom];
    Arrays.setAll(tickers, i -> "T-" + Integer.toString(i + keysFrom));
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", broker);
    props.setProperty("key.serializer", LongSerializer.class.getName());
    props.setProperty("value.serializer", TradeSerializer.class.getName());
    this.producer = new KafkaProducer<>(props);
}
 
Developer ID: hazelcast, Project: big-data-benchmark, Lines: 17, Source: RealTimeTradeProducer.java


Example 9: testSinkDisplayData

import org.apache.kafka.common.serialization.LongSerializer; // import the required package/class
@Test
public void testSinkDisplayData() {
  try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {
    KafkaIO.Write<Integer, Long> write = KafkaIO.<Integer, Long>write()
      .withBootstrapServers("myServerA:9092,myServerB:9092")
      .withTopic("myTopic")
      .withValueSerializer(LongSerializer.class)
      .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey));

    DisplayData displayData = DisplayData.from(write);

    assertThat(displayData, hasDisplayItem("topic", "myTopic"));
    assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServerA:9092,myServerB:9092"));
    assertThat(displayData, hasDisplayItem("retries", 3));
  }
}
 
Developer ID: apache, Project: beam, Lines: 17, Source: KafkaIOTest.java


Example 10: MockProducerWrapper

import org.apache.kafka.common.serialization.LongSerializer; // import the required package/class
MockProducerWrapper() {
  producerKey = String.valueOf(ThreadLocalRandom.current().nextLong());
  mockProducer = new MockProducer<Integer, Long>(
    false, // disable synchronous completion of send. see ProducerSendCompletionThread below.
    new IntegerSerializer(),
    new LongSerializer()) {

    // override flush() so that it does not complete all the waiting sends, giving a chance to
    // ProducerCompletionThread to inject errors.

    @Override
    public void flush() {
      while (completeNext()) {
        // there are some uncompleted records. let the completion thread handle them.
        try {
          Thread.sleep(10);
        } catch (InterruptedException e) {
          // ok to retry.
        }
      }
    }
  };

  // Add the producer to the global map so that producer factory function can access it.
  assertNull(MOCK_PRODUCER_MAP.putIfAbsent(producerKey, mockProducer));
}
 
Developer ID: apache, Project: beam, Lines: 27, Source: KafkaIOTest.java


Example 11: produceInitialGlobalTableValues

import org.apache.kafka.common.serialization.LongSerializer; // import the required package/class
private void produceInitialGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
    IntegrationTestUtils.produceKeyValuesSynchronously(
            globalOne,
            Arrays.asList(
                    new KeyValue<>(1L, "A"),
                    new KeyValue<>(2L, "B"),
                    new KeyValue<>(3L, "C"),
                    new KeyValue<>(4L, "D")),
            TestUtils.producerConfig(
                    CLUSTER.bootstrapServers(),
                    LongSerializer.class,
                    StringSerializer.class,
                    new Properties()),
            mockTime);
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 16, Source: GlobalKTableIntegrationTest.java


Example 12: writeInputData

import org.apache.kafka.common.serialization.LongSerializer; // import the required package/class
private void writeInputData(final List<KeyValue<Long, Long>> records) throws Exception {
    IntegrationTestUtils.produceKeyValuesSynchronously(
        MULTI_PARTITION_INPUT_TOPIC,
        records,
        TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
        CLUSTER.time
    );
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 9, Source: EosIntegrationTest.java


Example 13: TradeProducer

import org.apache.kafka.common.serialization.LongSerializer; // import the required package/class
private TradeProducer(String broker) {
    loadTickers();
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", broker);
    props.setProperty("key.serializer", LongSerializer.class.getName());
    props.setProperty("value.serializer", TradeSerializer.class.getName());
    producer = new KafkaProducer<>(props);
}
 
Developer ID: hazelcast, Project: big-data-benchmark, Lines: 9, Source: TradeProducer.java


Example 14: createProducer

import org.apache.kafka.common.serialization.LongSerializer; // import the required package/class
private static Producer<Object, Object> createProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "localhost:9092,localhost:9093,localhost:9094");
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaMDWProducer");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            LongSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());
    return new KafkaProducer<>(props);
}
 
Developer ID: CenturyLinkCloud, Project: mdw, Lines: 12, Source: KafkaAdapter.java


Example 15: testSink

import org.apache.kafka.common.serialization.LongSerializer; // import the required package/class
@Test
public void testSink() throws Exception {
  // Simply read from kafka source and write to kafka sink. Then verify the records
  // are correctly published to mock kafka producer.

  int numElements = 1000;

  try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {

    ProducerSendCompletionThread completionThread =
      new ProducerSendCompletionThread(producerWrapper.mockProducer).start();

    String topic = "test";

    p
      .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
          .withoutMetadata())
      .apply(KafkaIO.<Integer, Long>write()
          .withBootstrapServers("none")
          .withTopic(topic)
          .withKeySerializer(IntegerSerializer.class)
          .withValueSerializer(LongSerializer.class)
          .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));

    p.run();

    completionThread.shutdown();

    verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false);
  }
}
 
Developer ID: apache, Project: beam, Lines: 32, Source: KafkaIOTest.java


Example 16: testValuesSink

import org.apache.kafka.common.serialization.LongSerializer; // import the required package/class
@Test
public void testValuesSink() throws Exception {
  // similar to testSink(), but use values()' interface.

  int numElements = 1000;

  try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {

    ProducerSendCompletionThread completionThread =
      new ProducerSendCompletionThread(producerWrapper.mockProducer).start();

    String topic = "test";

    p
      .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
          .withoutMetadata())
      .apply(Values.<Long>create()) // there are no keys
      .apply(KafkaIO.<Integer, Long>write()
          .withBootstrapServers("none")
          .withTopic(topic)
          .withValueSerializer(LongSerializer.class)
          .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey))
          .values());

    p.run();

    completionThread.shutdown();

    verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, true);
  }
}
 
Developer ID: apache, Project: beam, Lines: 32, Source: KafkaIOTest.java


Example 17: runSimpleCopyTest

import org.apache.kafka.common.serialization.LongSerializer; // import the required package/class
private void runSimpleCopyTest(final int numberOfRestarts,
                               final String inputTopic,
                               final String throughTopic,
                               final String outputTopic) throws Exception {
    final KStreamBuilder builder = new KStreamBuilder();
    final KStream<Long, Long> input = builder.stream(inputTopic);
    KStream<Long, Long> output = input;
    if (throughTopic != null) {
        output = input.through(throughTopic);
    }
    output.to(outputTopic);

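    // Restart the exactly-once topology numberOfRestarts times; after each run, read back
    // only committed records (read_committed) and check they match the produced input.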
    for (int i = 0; i < numberOfRestarts; ++i) {
        final long factor = i;
        final KafkaStreams streams = new KafkaStreams(
            builder,
            StreamsTestUtils.getStreamsConfig(
                applicationId,
                CLUSTER.bootstrapServers(),
                Serdes.LongSerde.class.getName(),
                Serdes.LongSerde.class.getName(),
                new Properties() {
                    {
                        put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
                        put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
                    }
                }));

        try {
            streams.start();

            final List<KeyValue<Long, Long>> inputData = prepareData(factor * 100, factor * 100 + 10L, 0L, 1L);

            IntegrationTestUtils.produceKeyValuesSynchronously(
                inputTopic,
                inputData,
                TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
                CLUSTER.time
            );

            final List<KeyValue<Long, Long>> committedRecords
                = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                TestUtils.consumerConfig(
                    CLUSTER.bootstrapServers(),
                    CONSUMER_GROUP_ID,
                    LongDeserializer.class,
                    LongDeserializer.class,
                    new Properties() {
                        {
                            put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
                        }
                    }),
                inputTopic,
                inputData.size()
            );

            checkResultPerKey(committedRecords, inputData);
        } finally {
            streams.close();
        }
    }
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 63, Source: EosIntegrationTest.java


Example 18: shouldBeAbleToPerformMultipleTransactions

import org.apache.kafka.common.serialization.LongSerializer; // import the required package/class
@Test
public void shouldBeAbleToPerformMultipleTransactions() throws Exception {
    final KStreamBuilder builder = new KStreamBuilder();
    builder.stream(SINGLE_PARTITION_INPUT_TOPIC).to(SINGLE_PARTITION_OUTPUT_TOPIC);

    final KafkaStreams streams = new KafkaStreams(
        builder,
        StreamsTestUtils.getStreamsConfig(
            applicationId,
            CLUSTER.bootstrapServers(),
            Serdes.LongSerde.class.getName(),
            Serdes.LongSerde.class.getName(),
            new Properties() {
                {
                    put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
                }
            }));

    try {
        streams.start();

        final List<KeyValue<Long, Long>> firstBurstOfData = prepareData(0L, 5L, 0L);
        final List<KeyValue<Long, Long>> secondBurstOfData = prepareData(5L, 8L, 0L);

        IntegrationTestUtils.produceKeyValuesSynchronously(
            SINGLE_PARTITION_INPUT_TOPIC,
            firstBurstOfData,
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
            CLUSTER.time
        );

        final List<KeyValue<Long, Long>> firstCommittedRecords
            = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
            TestUtils.consumerConfig(
                CLUSTER.bootstrapServers(),
                CONSUMER_GROUP_ID,
                LongDeserializer.class,
                LongDeserializer.class,
                new Properties() {
                    {
                        put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
                    }
                }),
            SINGLE_PARTITION_OUTPUT_TOPIC,
            firstBurstOfData.size()
        );

        assertThat(firstCommittedRecords, equalTo(firstBurstOfData));

        IntegrationTestUtils.produceKeyValuesSynchronously(
            SINGLE_PARTITION_INPUT_TOPIC,
            secondBurstOfData,
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
            CLUSTER.time
        );

        final List<KeyValue<Long, Long>> secondCommittedRecords
            = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
            TestUtils.consumerConfig(
                CLUSTER.bootstrapServers(),
                CONSUMER_GROUP_ID,
                LongDeserializer.class,
                LongDeserializer.class,
                new Properties() {
                    {
                        put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
                    }
                }),
            SINGLE_PARTITION_OUTPUT_TOPIC,
            secondBurstOfData.size()
        );

        assertThat(secondCommittedRecords, equalTo(secondBurstOfData));
    } finally {
        streams.close();
    }
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 78, Source: EosIntegrationTest.java


Example 19: testKafkaBinderConfigurationWithKafkaProperties

import org.apache.kafka.common.serialization.LongSerializer; // import the required package/class
@Test
public void testKafkaBinderConfigurationWithKafkaProperties() throws Exception {
	assertNotNull(this.kafkaMessageChannelBinder);
	ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(
			new KafkaProducerProperties());
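	// Reflectively obtain the binder's private producer factory and inspect its config map
	// to verify that the Kafka properties from the Boot configuration were applied.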
	Method getProducerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("getProducerFactory",
			String.class, ExtendedProducerProperties.class);
	getProducerFactoryMethod.setAccessible(true);
	DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod
			.invoke(this.kafkaMessageChannelBinder, "foo", producerProperties);
	Field producerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaProducerFactory.class, "configs",
			Map.class);
	ReflectionUtils.makeAccessible(producerFactoryConfigField);
	Map<String, Object> producerConfigs = (Map<String, Object>) ReflectionUtils.getField(producerFactoryConfigField,
			producerFactory);
	assertTrue(producerConfigs.get("batch.size").equals(10));
	assertTrue(producerConfigs.get("key.serializer").equals(LongSerializer.class));
	assertTrue(producerConfigs.get("key.deserializer") == null);
	assertTrue(producerConfigs.get("value.serializer").equals(LongSerializer.class));
	assertTrue(producerConfigs.get("value.deserializer") == null);
	assertTrue(producerConfigs.get("compression.type").equals("snappy"));
	List<String> bootstrapServers = new ArrayList<>();
	bootstrapServers.add("10.98.09.199:9092");
	bootstrapServers.add("10.98.09.196:9092");
	assertTrue((((List<String>) producerConfigs.get("bootstrap.servers")).containsAll(bootstrapServers)));
	Method createKafkaConsumerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod(
			"createKafkaConsumerFactory", boolean.class, String.class, ExtendedConsumerProperties.class);
	createKafkaConsumerFactoryMethod.setAccessible(true);
	ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<>(
			new KafkaConsumerProperties());
	DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) createKafkaConsumerFactoryMethod
			.invoke(this.kafkaMessageChannelBinder, true, "test", consumerProperties);
	Field consumerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaConsumerFactory.class, "configs",
			Map.class);
	ReflectionUtils.makeAccessible(consumerFactoryConfigField);
	Map<String, Object> consumerConfigs = (Map<String, Object>) ReflectionUtils.getField(consumerFactoryConfigField,
			consumerFactory);
	assertTrue(consumerConfigs.get("key.deserializer").equals(LongDeserializer.class));
	assertTrue(consumerConfigs.get("key.serializer") == null);
	assertTrue(consumerConfigs.get("value.deserializer").equals(LongDeserializer.class));
	assertTrue(consumerConfigs.get("value.serialized") == null);
	assertTrue(consumerConfigs.get("group.id").equals("groupIdFromBootConfig"));
	assertTrue(consumerConfigs.get("auto.offset.reset").equals("earliest"));
	assertTrue((((List<String>) consumerConfigs.get("bootstrap.servers")).containsAll(bootstrapServers)));
}
 
Developer ID: spring-cloud, Project: spring-cloud-stream-binder-kafka, Lines: 46, Source: KafkaBinderAutoConfigurationPropertiesTest.java


Example 20: KafkaSubscriberBlackboxTest

import org.apache.kafka.common.serialization.LongSerializer; // import the required package/class
public KafkaSubscriberBlackboxTest() {
    super(new TestEnvironment());
    mockProducer = new MockProducer<Long, Long>(true, new LongSerializer(), new LongSerializer());
}
 
Developer ID: unicredit, Project: kafka-reactive-streams, Lines: 5, Source: KafkaSubscriberBlackboxTest.java



Note: The org.apache.kafka.common.serialization.LongSerializer class examples in this article were collected from source code and documentation hosted on GitHub, MSDocs, and similar platforms. The snippets are selected from open-source projects contributed by their respective authors; copyright remains with the original authors, and any use or redistribution must comply with each project's license. Do not repost without permission.

