This article collects typical usage examples of the Java class org.apache.kafka.common.record.TimestampType. If you are wondering how TimestampType is used in practice, the curated examples here may help.
The TimestampType class belongs to the org.apache.kafka.common.record package. Twenty code examples are shown below, sorted by popularity by default.
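For orientation, here is a minimal, self-contained sketch of what such an enum looks like. `TimestampKind` is a hypothetical stand-in written for this page so that the snippet compiles without the Kafka client library on the classpath. The constant names `NO_TIMESTAMP_TYPE` and `CREATE_TIME` appear in the examples on this page. `LOG_APPEND_TIME`, the numeric ids, and the display names are taken from memory of the Kafka source and should be checked against the version you use.

```java
// Hypothetical stand-in for org.apache.kafka.common.record.TimestampType,
// written so this sketch compiles without kafka-clients on the classpath.
enum TimestampKind {
    NO_TIMESTAMP_TYPE(-1, "NoTimestampType"),
    CREATE_TIME(0, "CreateTime"),
    LOG_APPEND_TIME(1, "LogAppendTime");

    final int id;
    final String displayName;

    TimestampKind(int id, String displayName) {
        this.id = id;
        this.displayName = displayName;
    }

    // Look up a constant by its display name; unknown names are rejected.
    static TimestampKind forName(String name) {
        for (TimestampKind kind : values()) {
            if (kind.displayName.equals(name)) {
                return kind;
            }
        }
        throw new IllegalArgumentException("Unknown timestamp type: " + name);
    }

    @Override
    public String toString() {
        return displayName;
    }
}
```

With the real class, the equivalent lookup would be `TimestampType.forName("CreateTime")`.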
Example 1: testPutWhenPartitioningOnMessageTimeWhenNoTimestampType
import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Test(expected = ConnectException.class)
public void testPutWhenPartitioningOnMessageTimeWhenNoTimestampType() {
final String topic = "test-topic";
Map<String, String> properties = propertiesFactory.getProperties();
properties.put(BigQuerySinkConfig.TOPICS_CONFIG, topic);
properties.put(BigQuerySinkConfig.DATASETS_CONFIG, ".*=scratch");
properties.put(BigQuerySinkTaskConfig.BIGQUERY_MESSAGE_TIME_PARTITIONING_CONFIG, "true");
BigQuery bigQuery = mock(BigQuery.class);
SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class);
InsertAllResponse insertAllResponse = mock(InsertAllResponse.class);
when(bigQuery.insertAll(anyObject())).thenReturn(insertAllResponse);
when(insertAllResponse.hasErrors()).thenReturn(false);
BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null);
testTask.initialize(sinkTaskContext);
testTask.start(properties);
testTask.put(Collections.singletonList(spoofSinkRecord(topic, "value", "message text", TimestampType.NO_TIMESTAMP_TYPE, null)));
}
Developer ID: wepay, Project: kafka-connect-bigquery, Lines of code: 23, Source: BigQuerySinkTaskTest.java
Example 2: createBatch
import org.apache.kafka.common.record.TimestampType; // import the required package/class
private ByteBuffer createBatch(int batchSize) {
byte[] value = new byte[messageSize];
final ByteBuffer buf = ByteBuffer.allocate(
AbstractRecords.estimateSizeInBytesUpperBound(messageVersion, compressionType, new byte[0], value,
Record.EMPTY_HEADERS) * batchSize
);
final MemoryRecordsBuilder builder =
MemoryRecords.builder(buf, messageVersion, compressionType, TimestampType.CREATE_TIME, startingOffset);
for (int i = 0; i < batchSize; ++i) {
switch (bytes) {
case ONES:
Arrays.fill(value, (byte) 1);
break;
case RANDOM:
random.nextBytes(value);
break;
}
builder.append(0, null, value);
}
return builder.build().buffer();
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 25, Source: RecordBatchIterationBenchmark.java
Example 3: testMissingTimestampPropagation
import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Test
public void testMissingTimestampPropagation() throws Exception {
expectInitializeTask();
expectConsumerPoll(1, RecordBatch.NO_TIMESTAMP, TimestampType.CREATE_TIME);
expectConversionAndTransformation(1);
Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
sinkTask.put(EasyMock.capture(records));
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration();
SinkRecord record = records.getValue().iterator().next();
// We expect null for a missing timestamp; the sentinel value RecordBatch.NO_TIMESTAMP is specific to Kafka's API
assertEquals(null, record.timestamp());
assertEquals(TimestampType.CREATE_TIME, record.timestampType());
PowerMock.verifyAll();
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 25, Source: WorkerSinkTaskTest.java
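The test above expects a record polled with the "no timestamp" sentinel to surface as a `null` timestamp on the sink record. A minimal sketch of that kind of conversion follows. `TimestampSentinel` and `toNullable` are hypothetical names invented for this page, not the Kafka Connect implementation. The sentinel value of -1 and the rejection of other negative values are assumptions.

```java
// Hypothetical helper for illustration; not the Kafka Connect implementation.
final class TimestampSentinel {
    // Assumed sentinel: -1 stands for "no timestamp".
    static final long NO_TIMESTAMP = -1L;

    private TimestampSentinel() {
    }

    // Maps the primitive sentinel to null so callers see "absent" rather than -1.
    static Long toNullable(long timestamp) {
        if (timestamp == NO_TIMESTAMP) {
            return null;
        }
        if (timestamp < 0) {
            // Assumption: any other negative value is treated as invalid input.
            throw new IllegalArgumentException("Invalid timestamp: " + timestamp);
        }
        return timestamp;
    }
}
```

Returning a boxed `Long` lets downstream code distinguish a missing timestamp from a real one without having to know the sentinel.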
Example 4: testTimestampPropagation
import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Test
public void testTimestampPropagation() throws Exception {
final Long timestamp = System.currentTimeMillis();
final TimestampType timestampType = TimestampType.CREATE_TIME;
expectInitializeTask();
expectConsumerPoll(1, timestamp, timestampType);
expectConversionAndTransformation(1);
Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
sinkTask.put(EasyMock.capture(records));
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration();
SinkRecord record = records.getValue().iterator().next();
assertEquals(timestamp, record.timestamp());
assertEquals(timestampType, record.timestampType());
PowerMock.verifyAll();
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 27, Source: WorkerSinkTaskTest.java
Example 5: expectConsumerPoll
import org.apache.kafka.common.record.TimestampType; // import the required package/class
private void expectConsumerPoll(final int numMessages, final long timestamp, final TimestampType timestampType) {
EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
new IAnswer<ConsumerRecords<byte[], byte[]>>() {
@Override
public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
for (int i = 0; i < numMessages; i++)
records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned + i, timestamp, timestampType, 0L, 0, 0, RAW_KEY, RAW_VALUE));
recordsReturned += numMessages;
return new ConsumerRecords<>(
numMessages > 0 ?
Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), records) :
Collections.<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>emptyMap()
);
}
});
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 18, Source: WorkerSinkTaskTest.java
Example 6: testReloadOnStart
import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Test
public void testReloadOnStart() throws Exception {
expectConfigure();
expectStart(Arrays.asList(
new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE.array()),
new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE.array()),
new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array()),
new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array())
));
expectStop();
PowerMock.replayAll();
store.configure(DEFAULT_DISTRIBUTED_CONFIG);
store.start();
HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data");
assertEquals(TP0_VALUE_NEW, data.get(TP0_KEY));
assertEquals(TP1_VALUE_NEW, data.get(TP1_KEY));
store.stop();
PowerMock.verifyAll();
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 24, Source: KafkaOffsetBackingStoreTest.java
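The test above replays four records and expects the store to end up holding only the newer value for each key. The following sketch shows that "last write wins" replay using only the JDK. `OffsetLogReplay` and `Entry` are hypothetical names for illustration and do not reflect how `KafkaOffsetBackingStore` is actually implemented.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of replaying a log into a key/value map.
final class OffsetLogReplay {
    // Minimal record: a key and a value, held in log order by the caller.
    static final class Entry {
        final ByteBuffer key;
        final ByteBuffer value;

        Entry(String key, String value) {
            this.key = bytes(key);
            this.value = bytes(value);
        }
    }

    private OffsetLogReplay() {
    }

    static ByteBuffer bytes(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    // Replays entries oldest first; a later entry replaces an earlier one
    // whose key has equal content (ByteBuffer.equals compares remaining bytes).
    static Map<ByteBuffer, ByteBuffer> replay(List<Entry> log) {
        Map<ByteBuffer, ByteBuffer> data = new HashMap<>();
        for (Entry entry : log) {
            data.put(entry.key, entry.value);
        }
        return data;
    }
}
```

Using `ByteBuffer` keys works here because its `equals` and `hashCode` depend on content, but the buffers must not be repositioned while they sit in the map.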
Example 7: testExtractMetadataTimestamp
import org.apache.kafka.common.record.TimestampType; // import the required package/class
void testExtractMetadataTimestamp(TimestampExtractor extractor) {
final long metadataTimestamp = 42;
final long timestamp = extractor.extract(
new ConsumerRecord<>(
"anyTopic",
0,
0,
metadataTimestamp,
TimestampType.NO_TIMESTAMP_TYPE,
0,
0,
0,
null,
null),
0
);
assertThat(timestamp, is(metadataTimestamp));
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 21, Source: TimestampExtractorTest.java
Example 8: logAndSkipOnInvalidTimestamp
import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Test
public void logAndSkipOnInvalidTimestamp() {
final long invalidMetadataTimestamp = -42;
final TimestampExtractor extractor = new LogAndSkipOnInvalidTimestamp();
final long timestamp = extractor.extract(
new ConsumerRecord<>(
"anyTopic",
0,
0,
invalidMetadataTimestamp,
TimestampType.NO_TIMESTAMP_TYPE,
0,
0,
0,
null,
null),
0
);
assertThat(timestamp, is(invalidMetadataTimestamp));
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 23, Source: LogAndSkipOnInvalidTimestampTest.java
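Examples 7 and 8 together suggest a pattern: a valid metadata timestamp is returned unchanged, while an invalid (negative) one is passed to a policy that, in the "log and skip" case, returns the negative value as is. Below is a rough sketch of that pattern under stated assumptions. All type names are hypothetical and are not the Kafka Streams classes. Collecting warnings in a list stands in for real logging.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical policy hook, invoked only for invalid (negative) timestamps.
interface InvalidTimestampPolicy {
    long onInvalid(long recordTimestamp, long previousTimestamp);
}

// "Log and skip": record a warning and hand the negative value back,
// which the caller is assumed to interpret as "drop this record".
final class LogAndSkipPolicy implements InvalidTimestampPolicy {
    final List<String> warnings = new ArrayList<>();

    @Override
    public long onInvalid(long recordTimestamp, long previousTimestamp) {
        warnings.add("Skipping record with invalid timestamp " + recordTimestamp);
        return recordTimestamp;
    }
}

// Returns the record's own timestamp when valid, else defers to the policy.
final class MetadataTimestampExtractor {
    private final InvalidTimestampPolicy policy;

    MetadataTimestampExtractor(InvalidTimestampPolicy policy) {
        this.policy = policy;
    }

    long extract(long recordTimestamp, long previousTimestamp) {
        if (recordTimestamp < 0) {
            return policy.onInvalid(recordTimestamp, previousTimestamp);
        }
        return recordTimestamp;
    }
}
```

Keeping the policy separate from the extractor means other reactions, such as failing fast or reusing the previous timestamp, can be swapped in without touching the extraction logic.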
Example 9: ConsumerRecord
import org.apache.kafka.common.record.TimestampType; // import the required package/class
/**
* Creates a record to be received from a specified topic and partition
*
* @param topic The topic this record is received from
* @param partition The partition of the topic this record is received from
* @param offset The offset of this record in the corresponding Kafka partition
* @param timestamp The timestamp of the record.
* @param timestampType The timestamp type
* @param checksum The checksum (CRC32) of the full record
* @param serializedKeySize The length of the serialized key
* @param serializedValueSize The length of the serialized value
* @param key The key of the record, if one exists (null is allowed)
* @param value The record contents
* @param headers The headers of the record.
*/
public ConsumerRecord(String topic,
int partition,
long offset,
long timestamp,
TimestampType timestampType,
Long checksum,
int serializedKeySize,
int serializedValueSize,
K key,
V value,
Headers headers) {
if (topic == null)
throw new IllegalArgumentException("Topic cannot be null");
this.topic = topic;
this.partition = partition;
this.offset = offset;
this.timestamp = timestamp;
this.timestampType = timestampType;
this.checksum = checksum;
this.serializedKeySize = serializedKeySize;
this.serializedValueSize = serializedValueSize;
this.key = key;
this.value = value;
this.headers = headers;
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 41, Source: ConsumerRecord.java
Example 10: testAppendedChecksumMagicV0AndV1
import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Test
public void testAppendedChecksumMagicV0AndV1() {
for (byte magic : Arrays.asList(MAGIC_VALUE_V0, MAGIC_VALUE_V1)) {
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(128), magic,
CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), builder, now);
byte[] key = "hi".getBytes();
byte[] value = "there".getBytes();
FutureRecordMetadata future = batch.tryAppend(now, key, value, Record.EMPTY_HEADERS, null, now);
assertNotNull(future);
byte attributes = LegacyRecord.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME);
long expectedChecksum = LegacyRecord.computeChecksum(magic, attributes, now, key, value);
assertEquals(expectedChecksum, future.checksumOrNull().longValue());
}
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 17, Source: ProducerBatchTest.java
Example 11: testSimpleMock
import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Test
public void testSimpleMock() {
consumer.subscribe(Arrays.asList("test"), new NoOpConsumerRebalanceListener());
assertEquals(0, consumer.poll(1000).count());
consumer.rebalance(Arrays.asList(new TopicPartition("test", 0), new TopicPartition("test", 1)));
// Mock consumers need to seek manually since they cannot automatically reset offsets
HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
beginningOffsets.put(new TopicPartition("test", 0), 0L);
beginningOffsets.put(new TopicPartition("test", 1), 0L);
consumer.updateBeginningOffsets(beginningOffsets);
consumer.seek(new TopicPartition("test", 0), 0);
ConsumerRecord<String, String> rec1 = new ConsumerRecord<String, String>("test", 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key1", "value1");
ConsumerRecord<String, String> rec2 = new ConsumerRecord<String, String>("test", 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key2", "value2");
consumer.addRecord(rec1);
consumer.addRecord(rec2);
ConsumerRecords<String, String> recs = consumer.poll(1);
Iterator<ConsumerRecord<String, String>> iter = recs.iterator();
assertEquals(rec1, iter.next());
assertEquals(rec2, iter.next());
assertFalse(iter.hasNext());
assertEquals(2L, consumer.position(new TopicPartition("test", 0)));
consumer.commitSync();
assertEquals(2L, consumer.committed(new TopicPartition("test", 0)).offset());
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 25, Source: MockConsumerTest.java
Example 12: setup
import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Before
public void setup() throws Exception {
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
client.setNode(node);
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 1L);
builder.append(0L, "key".getBytes(), "value-1".getBytes());
builder.append(0L, "key".getBytes(), "value-2".getBytes());
builder.append(0L, "key".getBytes(), "value-3".getBytes());
records = builder.build();
builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 4L);
builder.append(0L, "key".getBytes(), "value-4".getBytes());
builder.append(0L, "key".getBytes(), "value-5".getBytes());
nextRecords = builder.build();
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 17, Source: FetcherTest.java
Example 13: testFetchResponseMetrics
import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Test
public void testFetchResponseMetrics() {
subscriptions.assignFromUser(singleton(tp1));
subscriptions.seek(tp1, 0);
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg));
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
TimestampType.CREATE_TIME, 0L);
for (int v = 0; v < 3; v++)
builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
MemoryRecords records = builder.build();
int expectedBytes = 0;
for (Record record : records.records())
expectedBytes += record.sizeInBytes();
fetchRecords(tp1, records, Errors.NONE, 100L, 0);
assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
assertEquals(3, recordsCountAverage.value(), EPSILON);
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 24, Source: FetcherTest.java
Example 14: testFetchResponseMetricsPartialResponse
import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Test
public void testFetchResponseMetricsPartialResponse() {
subscriptions.assignFromUser(singleton(tp1));
subscriptions.seek(tp1, 1);
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg));
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
TimestampType.CREATE_TIME, 0L);
for (int v = 0; v < 3; v++)
builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
MemoryRecords records = builder.build();
int expectedBytes = 0;
for (Record record : records.records()) {
if (record.offset() >= 1)
expectedBytes += record.sizeInBytes();
}
fetchRecords(tp1, records, Errors.NONE, 100L, 0);
assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
assertEquals(2, recordsCountAverage.value(), EPSILON);
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 26, Source: FetcherTest.java
Example 15: testOldConstructor
import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Test
@SuppressWarnings("deprecation")
public void testOldConstructor() {
String topic = "topic";
int partition = 0;
long offset = 23;
String key = "key";
String value = "value";
ConsumerRecord<String, String> record = new ConsumerRecord<>(topic, partition, offset, key, value);
assertEquals(topic, record.topic());
assertEquals(partition, record.partition());
assertEquals(offset, record.offset());
assertEquals(key, record.key());
assertEquals(value, record.value());
assertEquals(TimestampType.NO_TIMESTAMP_TYPE, record.timestampType());
assertEquals(ConsumerRecord.NO_TIMESTAMP, record.timestamp());
assertEquals(ConsumerRecord.NULL_CHECKSUM, record.checksum());
assertEquals(ConsumerRecord.NULL_SIZE, record.serializedKeySize());
assertEquals(ConsumerRecord.NULL_SIZE, record.serializedValueSize());
assertEquals(new RecordHeaders(), record.headers());
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 23, Source: ConsumerRecordTest.java
Example 16: iterator
import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Test
public void iterator() throws Exception {
Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new LinkedHashMap<>();
String topic = "topic";
records.put(new TopicPartition(topic, 0), new ArrayList<ConsumerRecord<Integer, String>>());
ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, "value1");
ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, "value2");
records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2));
records.put(new TopicPartition(topic, 2), new ArrayList<ConsumerRecord<Integer, String>>());
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
Iterator<ConsumerRecord<Integer, String>> iter = consumerRecords.iterator();
int c = 0;
for (; iter.hasNext(); c++) {
ConsumerRecord<Integer, String> record = iter.next();
assertEquals(1, record.partition());
assertEquals(topic, record.topic());
assertEquals(c, record.offset());
}
assertEquals(2, c);
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 25, Source: ConsumerRecordsTest.java
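The iterator test above builds a map with two empty partitions and one partition holding two records, then expects iteration to yield exactly those two records in offset order. The sketch below reproduces the same behavior with plain collections. `PartitionedRecords.flatten` is a hypothetical helper that copies eagerly into a list, whereas the real `ConsumerRecords` is assumed to iterate lazily.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of iterating records grouped by partition.
final class PartitionedRecords {
    private PartitionedRecords() {
    }

    // Flattens per-partition lists in the map's iteration order;
    // partitions with empty lists contribute nothing to the result.
    static <T> List<T> flatten(Map<Integer, List<T>> recordsByPartition) {
        List<T> all = new ArrayList<>();
        for (List<T> partitionRecords : recordsByPartition.values()) {
            all.addAll(partitionRecords);
        }
        return all;
    }
}
```

Passing a `LinkedHashMap`, as the test does, keeps the partition order deterministic.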
Example 17: fetchResponse
import org.apache.kafka.common.record.TimestampType; // import the required package/class
private FetchResponse fetchResponse(Map<TopicPartition, FetchInfo> fetches) {
LinkedHashMap<TopicPartition, PartitionData> tpResponses = new LinkedHashMap<>();
for (Map.Entry<TopicPartition, FetchInfo> fetchEntry : fetches.entrySet()) {
TopicPartition partition = fetchEntry.getKey();
long fetchOffset = fetchEntry.getValue().offset;
int fetchCount = fetchEntry.getValue().count;
final MemoryRecords records;
if (fetchCount == 0) {
records = MemoryRecords.EMPTY;
} else {
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
TimestampType.CREATE_TIME, fetchOffset);
for (int i = 0; i < fetchCount; i++)
builder.append(0L, ("key-" + i).getBytes(), ("value-" + i).getBytes());
records = builder.build();
}
tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE, 0, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L,
null, records));
}
return new FetchResponse(tpResponses, 0);
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 22, Source: KafkaConsumerTest.java
Example 18: produceRequestV3ShouldContainOnlyOneRecordBatch
import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Test(expected = InvalidRecordException.class)
public void produceRequestV3ShouldContainOnlyOneRecordBatch() {
ByteBuffer buffer = ByteBuffer.allocate(256);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
builder.append(10L, null, "a".getBytes());
builder.close();
builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 1L);
builder.append(11L, "1".getBytes(), "b".getBytes());
builder.append(12L, null, "c".getBytes());
builder.close();
buffer.flip();
Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
produceData.put(new TopicPartition("test", 0), MemoryRecords.readableRecords(buffer));
new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 5000, produceData).build().toStruct();
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 19, Source: RequestResponseTest.java
Example 19: test
import org.apache.kafka.common.record.TimestampType; // import the required package/class
@Test
public void test() throws InterruptedException {
Map<String, String> sinkProperties = new HashMap<>();
FluentdSinkTask task = new FluentdSinkTask();
task.initialize(PowerMock.createMock(SinkTaskContext.class));
//sinkProperties.put(FluentdSinkConnectorConfig.FLUENTD_CLIENT_MAX_BUFFER_BYTES, "100000");
task.start(sinkProperties);
final String topic = "testtopic";
final String value = "{\"message\":\"This is a test message\"}";
SinkRecord sinkRecord = new SinkRecord(
topic,
1,
Schema.STRING_SCHEMA,
topic,
null,
value,
0,
System.currentTimeMillis(),
TimestampType.NO_TIMESTAMP_TYPE
);
task.put(Collections.singleton(sinkRecord));
TimeUnit.SECONDS.sleep(1);
EventEntry eventEntry = queue.poll();
Assert.assertNotNull(eventEntry);
Assert.assertEquals(value, eventEntry.getRecord().asMapValue().toJson());
}
Developer ID: fluent, Project: kafka-connect-fluentd, Lines of code: 27, Source: FluentdSinkTaskTest.java
Example 20: ConsumerRecord
import org.apache.kafka.common.record.TimestampType; // import the required package/class
/**
* Creates a record to be received from a specified topic and partition
*
* @param topic The topic this record is received from
* @param partition The partition of the topic this record is received from
* @param offset The offset of this record in the corresponding Kafka partition
* @param timestamp The timestamp of the record.
* @param timestampType The timestamp type
* @param checksum The checksum (CRC32) of the full record
* @param serializedKeySize The length of the serialized key
* @param serializedValueSize The length of the serialized value
* @param key The key of the record, if one exists (null is allowed)
* @param value The record contents
*/
public ConsumerRecord(String topic,
int partition,
long offset,
long timestamp,
TimestampType timestampType,
long checksum,
int serializedKeySize,
int serializedValueSize,
K key,
V value) {
if (topic == null)
throw new IllegalArgumentException("Topic cannot be null");
this.topic = topic;
this.partition = partition;
this.offset = offset;
this.timestamp = timestamp;
this.timestampType = timestampType;
this.checksum = checksum;
this.serializedKeySize = serializedKeySize;
this.serializedValueSize = serializedValueSize;
this.key = key;
this.value = value;
}
Developer ID: txazo, Project: kafka, Lines of code: 38, Source: ConsumerRecord.java
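Note: The org.apache.kafka.common.record.TimestampType examples in this article were compiled from GitHub, MSDocs, and similar source-code and documentation platforms. The snippets were selected from open-source projects, and copyright remains with the original authors. For distribution and use, refer to each project's license. Do not reproduce without permission.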