
Java TimestampType Class Code Examples


This article collects typical usage examples of the Java class org.apache.kafka.common.record.TimestampType. If you are wondering what the TimestampType class is for, how to use it, or what real-world usage looks like, the curated examples below should help.



The TimestampType class belongs to the org.apache.kafka.common.record package. Twenty code examples of the class are shown below, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Java code examples.
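Before the examples, a quick orientation: TimestampType is a small enum with three constants, NO_TIMESTAMP_TYPE, CREATE_TIME, and LOG_APPEND_TIME, all of which appear in the examples below. The following minimal sketch shows how the constants can be inspected and looked up by name (based on the Kafka 0.11.x client; the public id/name fields and the forName lookup are assumptions to verify against your client version):

import org.apache.kafka.common.record.TimestampType;

public class TimestampTypeDemo {
    public static void main(String[] args) {
        // Print each constant with its numeric id and wire name:
        // -1 -> NoTimestampType, 0 -> CreateTime, 1 -> LogAppendTime
        for (TimestampType type : TimestampType.values()) {
            System.out.println(type.id + " -> " + type.name);
        }

        // Look up a type by its wire name, e.g. when handling a
        // message.timestamp.type-style config value (illustrative use only).
        TimestampType byName = TimestampType.forName("CreateTime");
        System.out.println(byName == TimestampType.CREATE_TIME); // true
    }
}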

Example 1: testPutWhenPartitioningOnMessageTimeWhenNoTimestampType

import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Test(expected = ConnectException.class)
public void testPutWhenPartitioningOnMessageTimeWhenNoTimestampType() {
  final String topic = "test-topic";

  Map<String, String> properties = propertiesFactory.getProperties();
  properties.put(BigQuerySinkConfig.TOPICS_CONFIG, topic);
  properties.put(BigQuerySinkConfig.DATASETS_CONFIG, ".*=scratch");
  properties.put(BigQuerySinkTaskConfig.BIGQUERY_MESSAGE_TIME_PARTITIONING_CONFIG, "true");

  BigQuery bigQuery = mock(BigQuery.class);
  SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class);
  InsertAllResponse insertAllResponse = mock(InsertAllResponse.class);

  when(bigQuery.insertAll(anyObject())).thenReturn(insertAllResponse);
  when(insertAllResponse.hasErrors()).thenReturn(false);

  BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null);
  testTask.initialize(sinkTaskContext);
  testTask.start(properties);

  testTask.put(Collections.singletonList(spoofSinkRecord(topic, "value", "message text", TimestampType.NO_TIMESTAMP_TYPE, null)));
}
 
Developer ID: wepay, Project: kafka-connect-bigquery, Lines of code: 23, Source: BigQuerySinkTaskTest.java


Example 2: createBatch

import org.apache.kafka.common.record.TimestampType; // import the required package/class
private ByteBuffer createBatch(int batchSize) {
    byte[] value = new byte[messageSize];
    final ByteBuffer buf = ByteBuffer.allocate(
        AbstractRecords.estimateSizeInBytesUpperBound(messageVersion, compressionType, new byte[0], value,
                Record.EMPTY_HEADERS) * batchSize
    );

    final MemoryRecordsBuilder builder =
        MemoryRecords.builder(buf, messageVersion, compressionType, TimestampType.CREATE_TIME, startingOffset);

    for (int i = 0; i < batchSize; ++i) {
        switch (bytes) {
            case ONES:
                Arrays.fill(value, (byte) 1);
                break;
            case RANDOM:
                random.nextBytes(value);
                break;
        }

        builder.append(0, null, value);
    }
    return builder.build().buffer();
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 25, Source: RecordBatchIterationBenchmark.java


Example 3: testMissingTimestampPropagation

import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Test
public void testMissingTimestampPropagation() throws Exception {
    expectInitializeTask();
    expectConsumerPoll(1, RecordBatch.NO_TIMESTAMP, TimestampType.CREATE_TIME);
    expectConversionAndTransformation(1);

    Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);

    sinkTask.put(EasyMock.capture(records));

    PowerMock.replayAll();

    workerTask.initialize(TASK_CONFIG);
    workerTask.initializeAndStart();
    workerTask.iteration();

    SinkRecord record = records.getValue().iterator().next();

    // we expect null for a missing timestamp; the sentinel value Record.NO_TIMESTAMP is part of Kafka's API, not of Connect's SinkRecord
    assertEquals(null, record.timestamp());
    assertEquals(TimestampType.CREATE_TIME, record.timestampType());

    PowerMock.verifyAll();
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 25, Source: WorkerSinkTaskTest.java


Example 4: testTimestampPropagation

import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Test
public void testTimestampPropagation() throws Exception {
    final Long timestamp = System.currentTimeMillis();
    final TimestampType timestampType = TimestampType.CREATE_TIME;

    expectInitializeTask();
    expectConsumerPoll(1, timestamp, timestampType);
    expectConversionAndTransformation(1);

    Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);

    sinkTask.put(EasyMock.capture(records));

    PowerMock.replayAll();

    workerTask.initialize(TASK_CONFIG);
    workerTask.initializeAndStart();
    workerTask.iteration();

    SinkRecord record = records.getValue().iterator().next();

    assertEquals(timestamp, record.timestamp());
    assertEquals(timestampType, record.timestampType());

    PowerMock.verifyAll();
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 27, Source: WorkerSinkTaskTest.java


Example 5: expectConsumerPoll

import org.apache.kafka.common.record.TimestampType; // import the required package/class
private void expectConsumerPoll(final int numMessages, final long timestamp, final TimestampType timestampType) {
    EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
            new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                @Override
                public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
                    List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
                    for (int i = 0; i < numMessages; i++)
                        records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned + i, timestamp, timestampType, 0L, 0, 0, RAW_KEY, RAW_VALUE));
                    recordsReturned += numMessages;
                    return new ConsumerRecords<>(
                            numMessages > 0 ?
                                    Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), records) :
                                    Collections.<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>emptyMap()
                    );
                }
            });
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 18, Source: WorkerSinkTaskTest.java


Example 6: testReloadOnStart

import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Test
public void testReloadOnStart() throws Exception {
    expectConfigure();
    expectStart(Arrays.asList(
            new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE.array()),
            new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE.array()),
            new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array()),
            new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array())
    ));
    expectStop();

    PowerMock.replayAll();

    store.configure(DEFAULT_DISTRIBUTED_CONFIG);
    store.start();
    HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data");
    assertEquals(TP0_VALUE_NEW, data.get(TP0_KEY));
    assertEquals(TP1_VALUE_NEW, data.get(TP1_KEY));

    store.stop();

    PowerMock.verifyAll();
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 24, Source: KafkaOffsetBackingStoreTest.java


Example 7: testExtractMetadataTimestamp

import org.apache.kafka.common.record.TimestampType; // import the required package/class
void testExtractMetadataTimestamp(TimestampExtractor extractor) {
    final long metadataTimestamp = 42;

    final long timestamp = extractor.extract(
        new ConsumerRecord<>(
            "anyTopic",
            0,
            0,
            metadataTimestamp,
            TimestampType.NO_TIMESTAMP_TYPE,
            0,
            0,
            0,
            null,
            null),
        0
    );

    assertThat(timestamp, is(metadataTimestamp));
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 21, Source: TimestampExtractorTest.java


Example 8: logAndSkipOnInvalidTimestamp

import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Test
public void logAndSkipOnInvalidTimestamp() {
    final long invalidMetadataTimestamp = -42;

    final TimestampExtractor extractor = new LogAndSkipOnInvalidTimestamp();
    final long timestamp = extractor.extract(
        new ConsumerRecord<>(
            "anyTopic",
            0,
            0,
            invalidMetadataTimestamp,
            TimestampType.NO_TIMESTAMP_TYPE,
            0,
            0,
            0,
            null,
            null),
        0
    );

    assertThat(timestamp, is(invalidMetadataTimestamp));
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 23, Source: LogAndSkipOnInvalidTimestampTest.java


Example 9: ConsumerRecord

import org.apache.kafka.common.record.TimestampType; // import the required package/class
/**
 * Creates a record to be received from a specified topic and partition
 *
 * @param topic The topic this record is received from
 * @param partition The partition of the topic this record is received from
 * @param offset The offset of this record in the corresponding Kafka partition
 * @param timestamp The timestamp of the record.
 * @param timestampType The timestamp type
 * @param checksum The checksum (CRC32) of the full record
 * @param serializedKeySize The length of the serialized key
 * @param serializedValueSize The length of the serialized value
 * @param key The key of the record, if one exists (null is allowed)
 * @param value The record contents
 * @param headers The headers of the record.
 */
public ConsumerRecord(String topic,
                      int partition,
                      long offset,
                      long timestamp,
                      TimestampType timestampType,
                      Long checksum,
                      int serializedKeySize,
                      int serializedValueSize,
                      K key,
                      V value,
                      Headers headers) {
    if (topic == null)
        throw new IllegalArgumentException("Topic cannot be null");
    this.topic = topic;
    this.partition = partition;
    this.offset = offset;
    this.timestamp = timestamp;
    this.timestampType = timestampType;
    this.checksum = checksum;
    this.serializedKeySize = serializedKeySize;
    this.serializedValueSize = serializedValueSize;
    this.key = key;
    this.value = value;
    this.headers = headers;
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 41, Source: ConsumerRecord.java
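For completeness, here is a hedged sketch of invoking Example 9's eleven-argument constructor directly. The topic, sizes, and payload are invented for illustration; ConsumerRecord.NULL_CHECKSUM and RecordHeaders are the same constant and class referenced in Example 15 below.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;

public class ConsumerRecordDemo {
    public static void main(String[] args) {
        // Build a record by hand, as a test might.
        ConsumerRecord<String, String> record = new ConsumerRecord<>(
                "demo-topic",                  // topic (hypothetical)
                0,                             // partition
                42L,                           // offset
                System.currentTimeMillis(),    // timestamp
                TimestampType.CREATE_TIME,     // timestamp type
                ConsumerRecord.NULL_CHECKSUM,  // no real CRC32 for a hand-built record
                3,                             // serialized key size
                5,                             // serialized value size
                "key",                         // key (null is allowed)
                "value",                       // value
                new RecordHeaders());          // empty headers
        System.out.println(record.timestampType() + " @ " + record.timestamp());
    }
}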


Example 10: testAppendedChecksumMagicV0AndV1

import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Test
public void testAppendedChecksumMagicV0AndV1() {
    for (byte magic : Arrays.asList(MAGIC_VALUE_V0, MAGIC_VALUE_V1)) {
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(128), magic,
                CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), builder, now);
        byte[] key = "hi".getBytes();
        byte[] value = "there".getBytes();

        FutureRecordMetadata future = batch.tryAppend(now, key, value, Record.EMPTY_HEADERS, null, now);
        assertNotNull(future);
        byte attributes = LegacyRecord.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME);
        long expectedChecksum = LegacyRecord.computeChecksum(magic, attributes, now, key, value);
        assertEquals(expectedChecksum, future.checksumOrNull().longValue());
    }
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 17, Source: ProducerBatchTest.java


Example 11: testSimpleMock

import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Test
public void testSimpleMock() {
    consumer.subscribe(Arrays.asList("test"), new NoOpConsumerRebalanceListener());
    assertEquals(0, consumer.poll(1000).count());
    consumer.rebalance(Arrays.asList(new TopicPartition("test", 0), new TopicPartition("test", 1)));
    // Mock consumers need to seek manually since they cannot automatically reset offsets
    HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
    beginningOffsets.put(new TopicPartition("test", 0), 0L);
    beginningOffsets.put(new TopicPartition("test", 1), 0L);
    consumer.updateBeginningOffsets(beginningOffsets);
    consumer.seek(new TopicPartition("test", 0), 0);
    ConsumerRecord<String, String> rec1 = new ConsumerRecord<String, String>("test", 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key1", "value1");
    ConsumerRecord<String, String> rec2 = new ConsumerRecord<String, String>("test", 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key2", "value2");
    consumer.addRecord(rec1);
    consumer.addRecord(rec2);
    ConsumerRecords<String, String> recs = consumer.poll(1);
    Iterator<ConsumerRecord<String, String>> iter = recs.iterator();
    assertEquals(rec1, iter.next());
    assertEquals(rec2, iter.next());
    assertFalse(iter.hasNext());
    assertEquals(2L, consumer.position(new TopicPartition("test", 0)));
    consumer.commitSync();
    assertEquals(2L, consumer.committed(new TopicPartition("test", 0)).offset());
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 25, Source: MockConsumerTest.java


Example 12: setup

import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Before
public void setup() throws Exception {
    metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
    client.setNode(node);

    MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 1L);
    builder.append(0L, "key".getBytes(), "value-1".getBytes());
    builder.append(0L, "key".getBytes(), "value-2".getBytes());
    builder.append(0L, "key".getBytes(), "value-3".getBytes());
    records = builder.build();

    builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 4L);
    builder.append(0L, "key".getBytes(), "value-4".getBytes());
    builder.append(0L, "key".getBytes(), "value-5".getBytes());
    nextRecords = builder.build();
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 17, Source: FetcherTest.java


Example 13: testFetchResponseMetrics

import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Test
public void testFetchResponseMetrics() {
    subscriptions.assignFromUser(singleton(tp1));
    subscriptions.seek(tp1, 0);

    Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
    KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
    KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg));

    MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
            TimestampType.CREATE_TIME, 0L);
    for (int v = 0; v < 3; v++)
        builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
    MemoryRecords records = builder.build();

    int expectedBytes = 0;
    for (Record record : records.records())
        expectedBytes += record.sizeInBytes();

    fetchRecords(tp1, records, Errors.NONE, 100L, 0);
    assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
    assertEquals(3, recordsCountAverage.value(), EPSILON);
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 24, Source: FetcherTest.java


Example 14: testFetchResponseMetricsPartialResponse

import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Test
public void testFetchResponseMetricsPartialResponse() {
    subscriptions.assignFromUser(singleton(tp1));
    subscriptions.seek(tp1, 1);

    Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
    KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
    KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg));

    MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
            TimestampType.CREATE_TIME, 0L);
    for (int v = 0; v < 3; v++)
        builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
    MemoryRecords records = builder.build();

    int expectedBytes = 0;
    for (Record record : records.records()) {
        if (record.offset() >= 1)
            expectedBytes += record.sizeInBytes();
    }

    fetchRecords(tp1, records, Errors.NONE, 100L, 0);
    assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
    assertEquals(2, recordsCountAverage.value(), EPSILON);
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 26, Source: FetcherTest.java


Example 15: testOldConstructor

import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Test
@SuppressWarnings("deprecation")
public void testOldConstructor() {
    String topic = "topic";
    int partition = 0;
    long offset = 23;
    String key = "key";
    String value = "value";

    ConsumerRecord<String, String> record = new ConsumerRecord<>(topic, partition, offset, key, value);
    assertEquals(topic, record.topic());
    assertEquals(partition, record.partition());
    assertEquals(offset, record.offset());
    assertEquals(key, record.key());
    assertEquals(value, record.value());
    assertEquals(TimestampType.NO_TIMESTAMP_TYPE, record.timestampType());
    assertEquals(ConsumerRecord.NO_TIMESTAMP, record.timestamp());
    assertEquals(ConsumerRecord.NULL_CHECKSUM, record.checksum());
    assertEquals(ConsumerRecord.NULL_SIZE, record.serializedKeySize());
    assertEquals(ConsumerRecord.NULL_SIZE, record.serializedValueSize());
    assertEquals(new RecordHeaders(), record.headers());
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 23, Source: ConsumerRecordTest.java


Example 16: iterator

import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Test
public void iterator() throws Exception {

    Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new LinkedHashMap<>();

    String topic = "topic";
    records.put(new TopicPartition(topic, 0), new ArrayList<ConsumerRecord<Integer, String>>());
    ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, "value1");
    ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, "value2");
    records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2));
    records.put(new TopicPartition(topic, 2), new ArrayList<ConsumerRecord<Integer, String>>());

    ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
    Iterator<ConsumerRecord<Integer, String>> iter = consumerRecords.iterator();

    int c = 0;
    for (; iter.hasNext(); c++) {
        ConsumerRecord<Integer, String> record = iter.next();
        assertEquals(1, record.partition());
        assertEquals(topic, record.topic());
        assertEquals(c, record.offset());
    }
    assertEquals(2, c);
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 25, Source: ConsumerRecordsTest.java


Example 17: fetchResponse

import org.apache.kafka.common.record.TimestampType; // import the required package/class
private FetchResponse fetchResponse(Map<TopicPartition, FetchInfo> fetches) {
    LinkedHashMap<TopicPartition, PartitionData> tpResponses = new LinkedHashMap<>();
    for (Map.Entry<TopicPartition, FetchInfo> fetchEntry : fetches.entrySet()) {
        TopicPartition partition = fetchEntry.getKey();
        long fetchOffset = fetchEntry.getValue().offset;
        int fetchCount = fetchEntry.getValue().count;
        final MemoryRecords records;
        if (fetchCount == 0) {
            records = MemoryRecords.EMPTY;
        } else {
            MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
                    TimestampType.CREATE_TIME, fetchOffset);
            for (int i = 0; i < fetchCount; i++)
                builder.append(0L, ("key-" + i).getBytes(), ("value-" + i).getBytes());
            records = builder.build();
        }
        tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE, 0, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L,
                null, records));
    }
    return new FetchResponse(tpResponses, 0);
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 22, Source: KafkaConsumerTest.java


Example 18: produceRequestV3ShouldContainOnlyOneRecordBatch

import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Test(expected = InvalidRecordException.class)
public void produceRequestV3ShouldContainOnlyOneRecordBatch() {
    ByteBuffer buffer = ByteBuffer.allocate(256);
    MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
    builder.append(10L, null, "a".getBytes());
    builder.close();

    builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 1L);
    builder.append(11L, "1".getBytes(), "b".getBytes());
    builder.append(12L, null, "c".getBytes());
    builder.close();

    buffer.flip();

    Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
    produceData.put(new TopicPartition("test", 0), MemoryRecords.readableRecords(buffer));
    new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 5000, produceData).build().toStruct();
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 19, Source: RequestResponseTest.java


Example 19: test

import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Test
public void test() throws InterruptedException {
    Map<String, String> sinkProperties = new HashMap<>();
    FluentdSinkTask task = new FluentdSinkTask();
    task.initialize(PowerMock.createMock(SinkTaskContext.class));
    //sinkProperties.put(FluentdSinkConnectorConfig.FLUENTD_CLIENT_MAX_BUFFER_BYTES, "100000");
    task.start(sinkProperties);
    final String topic = "testtopic";
    final String value = "{\"message\":\"This is a test message\"}";
    SinkRecord sinkRecord = new SinkRecord(
            topic,
            1,
            Schema.STRING_SCHEMA,
            topic,
            null,
            value,
            0,
            System.currentTimeMillis(),
            TimestampType.NO_TIMESTAMP_TYPE
    );
    task.put(Collections.singleton(sinkRecord));
    TimeUnit.SECONDS.sleep(1);
    EventEntry eventEntry = queue.poll();
    Assert.assertNotNull(eventEntry);
    Assert.assertEquals(value, eventEntry.getRecord().asMapValue().toJson());
}
 
Developer ID: fluent, Project: kafka-connect-fluentd, Lines of code: 27, Source: FluentdSinkTaskTest.java


Example 20: ConsumerRecord

import org.apache.kafka.common.record.TimestampType; // import the required package/class
/**
 * Creates a record to be received from a specified topic and partition
 *
 * @param topic The topic this record is received from
 * @param partition The partition of the topic this record is received from
 * @param offset The offset of this record in the corresponding Kafka partition
 * @param timestamp The timestamp of the record.
 * @param timestampType The timestamp type
 * @param checksum The checksum (CRC32) of the full record
 * @param serializedKeySize The length of the serialized key
 * @param serializedValueSize The length of the serialized value
 * @param key The key of the record, if one exists (null is allowed)
 * @param value The record contents
 */
public ConsumerRecord(String topic,
                      int partition,
                      long offset,
                      long timestamp,
                      TimestampType timestampType,
                      long checksum,
                      int serializedKeySize,
                      int serializedValueSize,
                      K key,
                      V value) {
    if (topic == null)
        throw new IllegalArgumentException("Topic cannot be null");
    this.topic = topic;
    this.partition = partition;
    this.offset = offset;
    this.timestamp = timestamp;
    this.timestampType = timestampType;
    this.checksum = checksum;
    this.serializedKeySize = serializedKeySize;
    this.serializedValueSize = serializedValueSize;
    this.key = key;
    this.value = value;
}
 
Developer ID: txazo, Project: kafka, Lines of code: 38, Source: ConsumerRecord.java



Note: The org.apache.kafka.common.record.TimestampType examples in this article were collected from source-code and documentation hosting platforms such as GitHub and MSDocs. The snippets are drawn from open-source projects contributed by many developers; copyright remains with the original authors, and any distribution or use must follow the corresponding project's license. Do not reproduce without permission.

