本文整理汇总了Java中org.apache.kafka.common.header.Headers类的典型用法代码示例。如果您正苦于以下问题：Java Headers类的具体用法？Java Headers怎么用？Java Headers使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
Headers类属于org.apache.kafka.common.header包,在下文中一共展示了Headers类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: publish
import org.apache.kafka.common.header.Headers; //导入依赖的package包/类
/**
 * Validates {@code value}, serializes it to JSON and hands it to the Kafka producer.
 *
 * <p>The outgoing record carries the local host address as the client IP header, the
 * application name as the client header (only when one is configured), and whatever
 * {@code linkContext} adds from the current action log.
 *
 * @param topic the topic to publish to
 * @param key   the record key
 * @param value the message to validate, serialize and send
 */
@Override
public void publish(String topic, String key, T value) {
    validator.validate(value);
    StopWatch watch = new StopWatch();
    byte[] payload = writer.toJSON(value);
    try {
        ProducerRecord<String, byte[]> outgoing = new ProducerRecord<>(topic, key, payload);
        Headers recordHeaders = outgoing.headers();
        recordHeaders.add(KafkaHeaders.HEADER_CLIENT_IP, Strings.bytes(Network.localHostAddress()));
        if (logManager.appName != null) {
            recordHeaders.add(KafkaHeaders.HEADER_CLIENT, Strings.bytes(logManager.appName));
        }
        linkContext(recordHeaders);
        producer.send(outgoing);
    } finally {
        long elapsedTime = watch.elapsedTime();
        // The producer sends in the background, so this mainly counts how many
        // messages were sent within the current action rather than timing the send.
        ActionLogContext.track("kafka", elapsedTime);
        logger.debug("publish, topic={}, key={}, message={}, elapsedTime={}", topic, key, LogParam.of(payload), elapsedTime);
    }
}
开发者ID:neowu,项目名称:core-ng-project,代码行数:21,代码来源:KafkaMessagePublisher.java
示例2: extract_second_no_context
import org.apache.kafka.common.header.Headers; //导入依赖的package包/类
/**
 * Injecting only the first span context must add headers, yet
 * {@code extractSpanContext} must still return {@code null}.
 */
@Test
public void extract_second_no_context() {
    MockSpan firstSpan = mockTracer.buildSpan("first").start();
    Headers recordHeaders = new RecordHeaders();
    int initialCount = recordHeaders.toArray().length;
    assertTrue(initialCount == 0);

    // Inject the first context only.
    TracingKafkaUtils.inject(firstSpan.context(), recordHeaders, mockTracer);
    assertTrue(recordHeaders.toArray().length > 0);

    // No second context was injected, so nothing should be extracted for it.
    MockSpan.MockContext secondContext =
        (MockContext) TracingKafkaUtils.extractSpanContext(recordHeaders, mockTracer);
    assertNull(secondContext);
}
开发者ID:opentracing-contrib,项目名称:java-kafka-client,代码行数:17,代码来源:TracingKafkaUtilsTest.java
示例3: serialize
import org.apache.kafka.common.header.Headers; //导入依赖的package包/类
/**
 * Serializes whichever of the old / new value is present with the inner serializer and
 * appends a one-byte flag: {@code 1} when the new value was serialized, {@code 0} when
 * the old value was.
 *
 * @throws StreamsException if both old and new values of data are null, or if
 * both values are not null
 */
@Override
public byte[] serialize(String topic, Headers headers, Change<T> data) {
    final boolean hasNewValue = data.newValue != null;
    final boolean hasOldValue = data.oldValue != null;

    // Exactly one of the two values must be set.
    if (hasNewValue && hasOldValue) {
        throw new StreamsException("Both old and new values are not null (" + data.oldValue
            + " : " + data.newValue + ") in ChangeSerializer, which is not allowed.");
    }
    if (!hasNewValue && !hasOldValue) {
        throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
    }

    final byte[] serializedValue =
        inner.serialize(topic, headers, hasNewValue ? data.newValue : data.oldValue);

    final ByteBuffer out = ByteBuffer.allocate(serializedValue.length + NEWFLAG_SIZE);
    out.put(serializedValue);
    out.put((byte) (hasNewValue ? 1 : 0));
    return out.array();
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:29,代码来源:ChangedSerializer.java
示例4: ConsumerRecord
import org.apache.kafka.common.header.Headers; //导入依赖的package包/类
/**
 * Builds a consumer record for the given topic and partition, storing every argument
 * in the corresponding field.
 *
 * @param topic               topic the record was received from; must not be {@code null}
 * @param partition           partition of the topic the record was received from
 * @param offset              offset of the record within its Kafka partition
 * @param timestamp           timestamp of the record
 * @param timestampType       type of the timestamp
 * @param checksum            checksum (CRC32) of the full record
 * @param serializedKeySize   length of the serialized key
 * @param serializedValueSize length of the serialized value
 * @param key                 key of the record, if one exists ({@code null} is allowed)
 * @param value               record contents
 * @param headers             headers of the record
 * @throws IllegalArgumentException if {@code topic} is {@code null}
 */
public ConsumerRecord(String topic, int partition, long offset,
                      long timestamp, TimestampType timestampType, Long checksum,
                      int serializedKeySize, int serializedValueSize,
                      K key, V value, Headers headers) {
    if (topic == null) {
        throw new IllegalArgumentException("Topic cannot be null");
    }

    // Where the record came from.
    this.topic = topic;
    this.partition = partition;
    this.offset = offset;

    // Record metadata.
    this.timestamp = timestamp;
    this.timestampType = timestampType;
    this.checksum = checksum;
    this.serializedKeySize = serializedKeySize;
    this.serializedValueSize = serializedValueSize;

    // Record payload.
    this.key = key;
    this.value = value;
    this.headers = headers;
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:41,代码来源:ConsumerRecord.java
示例5: reconsumeLater
import org.apache.kafka.common.header.Headers; //导入依赖的package包/类
/**
 * Republishes a consumed record to a retry topic and waits for the publish future to complete.
 *
 * <p>All headers are forwarded except the retry counter, which is re-added with its value
 * incremented (starting from 0 when the incoming record had none). The original topic header
 * is added when the incoming record does not already carry one.
 *
 * @param consumeRecord the record to send to the retry queue
 * @throws InterruptedException if interrupted while waiting on the publish future
 * @throws ExecutionException   if the publish future completes exceptionally
 */
private void reconsumeLater(ConsumerRecord<String, byte[]> consumeRecord) throws InterruptedException, ExecutionException {
    Integer retryCount = -1;
    boolean originalTopicPresent = false;
    List<Header> forwardedHeaders = new ArrayList<>(8);

    // Copy every header except RETRY_COUNT, remembering its value for the increment below.
    for (Header current : consumeRecord.headers()) {
        String headerKey = current.key();
        if (headerKey.equals(RETRY_COUNT_KEY)) {
            retryCount = serializer.deserialize(current.value());
            continue;
        }
        if (headerKey.equals(ORGINAL_TOPIC)) {
            originalTopicPresent = true;
        }
        forwardedHeaders.add(current);
    }

    // Re-attach the incremented retry counter.
    retryCount++;
    forwardedHeaders.add(new RecordHeader(RETRY_COUNT_KEY, serializer.serialization(retryCount)));
    if (!originalTopicPresent) {
        forwardedHeaders.add(new RecordHeader(ORGINAL_TOPIC, serializer.serialization(consumeRecord.topic())));
    }

    // The retry topic is derived from the source topic and the retry count.
    String retryTopic = calcRetryTopic(consumeRecord.topic(), retryCount);
    ProducerRecord<String, byte[]> retryRecord = new ProducerRecord<>(
        retryTopic,
        consumeRecord.partition() % retryQueuePartitionCount.get(retryTopic),
        null,
        consumeRecord.key(),
        consumeRecord.value(),
        forwardedHeaders);

    retryQueueMsgProducer.publishKafkaMessage(retryRecord).get();
}
开发者ID:QNJR-GROUP,项目名称:EasyTransaction,代码行数:39,代码来源:KafkaEasyTransMsgConsumerImpl.java
示例6: HeadersMapExtractAdapter
import org.apache.kafka.common.header.Headers; //导入依赖的package包/类
/**
 * Copies record headers into the backing map as UTF-8 strings.
 *
 * @param headers the record headers to read
 * @param second  when {@code false}, every header is copied under its own key; when
 *                {@code true}, only headers whose key starts with {@code second_span_}
 *                are copied, with that prefix stripped from the key
 */
HeadersMapExtractAdapter(Headers headers, boolean second) {
    for (Header entry : headers) {
        String headerKey = entry.key();
        if (!second) {
            map.put(headerKey, new String(entry.value(), StandardCharsets.UTF_8));
            continue;
        }
        // Second-span mode: skip anything that is not prefixed.
        if (headerKey.startsWith("second_span_")) {
            String strippedKey = headerKey.replaceFirst("^second_span_", "");
            map.put(strippedKey, new String(entry.value(), StandardCharsets.UTF_8));
        }
    }
}
开发者ID:opentracing-contrib,项目名称:java-kafka-client,代码行数:13,代码来源:HeadersMapExtractAdapter.java
示例7: inject
import org.apache.kafka.common.header.Headers; //导入依赖的package包/类
/** Injecting a span context into empty headers must add at least one header. */
@Test
public void inject() {
    MockSpan testSpan = mockTracer.buildSpan("test").start();
    Headers recordHeaders = new RecordHeaders();
    int countBefore = recordHeaders.toArray().length;
    assertTrue(countBefore == 0);

    TracingKafkaUtils.inject(testSpan.context(), recordHeaders, mockTracer);

    int countAfter = recordHeaders.toArray().length;
    assertTrue(countAfter > 0);
}
开发者ID:opentracing-contrib,项目名称:java-kafka-client,代码行数:11,代码来源:TracingKafkaUtilsTest.java
示例8: extract
import org.apache.kafka.common.header.Headers; //导入依赖的package包/类
/** A context injected into headers must be extracted with the same span and trace ids. */
@Test
public void extract() {
    MockSpan testSpan = mockTracer.buildSpan("test").start();
    Headers recordHeaders = new RecordHeaders();
    TracingKafkaUtils.inject(testSpan.context(), recordHeaders, mockTracer);

    MockSpan.MockContext extracted =
        (MockSpan.MockContext) TracingKafkaUtils.extract(recordHeaders, mockTracer);

    assertEquals(testSpan.context().spanId(), extracted.spanId());
    assertEquals(testSpan.context().traceId(), extracted.traceId());
}
开发者ID:opentracing-contrib,项目名称:java-kafka-client,代码行数:13,代码来源:TracingKafkaUtilsTest.java
示例9: extract_no_context
import org.apache.kafka.common.header.Headers; //导入依赖的package包/类
/** With empty headers, neither the first nor the second context can be extracted. */
@Test
public void extract_no_context() {
    Headers emptyHeaders = new RecordHeaders();

    // First context.
    MockSpan.MockContext firstContext =
        (MockSpan.MockContext) TracingKafkaUtils.extract(emptyHeaders, mockTracer);
    assertNull(firstContext);

    // Second context.
    MockSpan.MockContext secondContext =
        (MockContext) TracingKafkaUtils.extractSpanContext(emptyHeaders, mockTracer);
    assertNull(secondContext);
}
开发者ID:opentracing-contrib,项目名称:java-kafka-client,代码行数:15,代码来源:TracingKafkaUtilsTest.java
示例10: inject_and_extract_two_contexts
import org.apache.kafka.common.header.Headers; //导入依赖的package包/类
/**
 * Injects a first context and a child "second" context into the same headers and checks
 * that each is extracted independently: same trace id, different span ids.
 */
@Test
public void inject_and_extract_two_contexts() {
    MockSpan firstSpan = mockTracer.buildSpan("first").start();
    Headers recordHeaders = new RecordHeaders();
    assertTrue(recordHeaders.toArray().length == 0);

    // Inject the first context.
    TracingKafkaUtils.inject(firstSpan.context(), recordHeaders, mockTracer);
    int countAfterFirst = recordHeaders.toArray().length;
    assertTrue(countAfterFirst > 0);

    // Inject the second context, a child of the first.
    MockSpan secondSpan = mockTracer.buildSpan("second").asChildOf(firstSpan.context()).start();
    TracingKafkaUtils.injectSecond(secondSpan.context(), recordHeaders, mockTracer);
    assertTrue(recordHeaders.toArray().length > countAfterFirst);

    // The first context round-trips unchanged.
    MockSpan.MockContext firstExtracted =
        (MockSpan.MockContext) TracingKafkaUtils.extract(recordHeaders, mockTracer);
    assertEquals(firstSpan.context().spanId(), firstExtracted.spanId());
    assertEquals(firstSpan.context().traceId(), firstExtracted.traceId());

    // The second context round-trips unchanged.
    MockSpan.MockContext secondExtracted =
        (MockContext) TracingKafkaUtils.extractSpanContext(recordHeaders, mockTracer);
    assertEquals(secondSpan.context().spanId(), secondExtracted.spanId());
    assertEquals(secondSpan.context().traceId(), secondExtracted.traceId());

    // Both belong to one trace but are distinct spans.
    assertEquals(firstExtracted.traceId(), secondExtracted.traceId());
    assertNotEquals(firstExtracted.spanId(), secondExtracted.spanId());
}
开发者ID:opentracing-contrib,项目名称:java-kafka-client,代码行数:31,代码来源:TracingKafkaUtilsTest.java
示例11: deserialize
import org.apache.kafka.common.header.Headers; //导入依赖的package包/类
/**
 * Splits {@code data} into a payload and a trailing flag of {@code NEWFLAG_SIZE} bytes,
 * then deserializes the payload with the inner deserializer.
 *
 * @return a {@code Change} whose first constructor argument holds the deserialized payload
 *         when the flag byte is non-zero, and whose second argument holds it otherwise;
 *         the other argument is {@code null}
 */
@Override
public Change<T> deserialize(String topic, Headers headers, byte[] data) {
    final int payloadLength = data.length - NEWFLAG_SIZE;
    final byte[] payload = new byte[payloadLength];
    System.arraycopy(data, 0, payload, 0, payloadLength);

    // The flag sits immediately after the payload.
    final boolean flagSet = ByteBuffer.wrap(data).get(payloadLength) != 0;
    if (!flagSet) {
        return new Change<>(null, inner.deserialize(topic, headers, payload));
    }
    return new Change<>(inner.deserialize(topic, headers, payload), null);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:14,代码来源:ChangedDeserializer.java
示例12: deserializeKey
import org.apache.kafka.common.header.Headers; //导入依赖的package包/类
/**
 * Test stub: returns the preconfigured key and ignores all arguments.
 *
 * @throws RuntimeException when {@code keyThrowsException} is set
 */
@Override
public Object deserializeKey(final String topic, final Headers headers, final byte[] data) {
    if (!keyThrowsException) {
        return key;
    }
    throw new RuntimeException();
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:8,代码来源:SourceNodeRecordDeserializerTest.java
示例13: deserializeValue
import org.apache.kafka.common.header.Headers; //导入依赖的package包/类
/**
 * Test stub: returns the preconfigured value and ignores all arguments.
 *
 * @throws RuntimeException when {@code valueThrowsException} is set
 */
@Override
public Object deserializeValue(final String topic, final Headers headers, final byte[] data) {
    if (!valueThrowsException) {
        return value;
    }
    throw new RuntimeException();
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:8,代码来源:SourceNodeRecordDeserializerTest.java
示例14: remove
import org.apache.kafka.common.header.Headers; //导入依赖的package包/类
/**
 * Removes every header whose key equals {@code key}.
 *
 * @param key the key of the headers to remove; checked by {@code checkKey}
 * @return this instance, to allow chaining
 * @throws IllegalStateException declared by the signature; presumably raised by
 *         {@code canWrite()} when the headers cannot be modified — confirm against its body
 */
@Override
public Headers remove(String key) throws IllegalStateException {
    canWrite();
    checkKey(key);
    for (Iterator<Header> it = iterator(); it.hasNext(); ) {
        Header candidate = it.next();
        if (candidate.key().equals(key)) {
            it.remove();
        }
    }
    return this;
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:13,代码来源:RecordHeaders.java
示例15: testAdd
import org.apache.kafka.common.header.Headers; //导入依赖的package包/类
/** Added headers are visible through iteration and {@code lastHeader}, and are counted. */
@Test
public void testAdd() {
    Headers recordHeaders = new RecordHeaders();

    recordHeaders.add(new RecordHeader("key", "value".getBytes()));
    assertHeader("key", "value", recordHeaders.iterator().next());

    recordHeaders.add(new RecordHeader("key2", "value2".getBytes()));
    Header second = recordHeaders.lastHeader("key2");
    assertHeader("key2", "value2", second);

    assertEquals(2, getCount(recordHeaders));
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:14,代码来源:RecordHeadersTest.java
示例16: testRemove
import org.apache.kafka.common.header.Headers; //导入依赖的package包/类
/** Removing the only header by key leaves the headers empty. */
@Test
public void testRemove() {
    Headers recordHeaders = new RecordHeaders();
    recordHeaders.add(new RecordHeader("key", "value".getBytes()));
    boolean presentBefore = recordHeaders.iterator().hasNext();
    assertTrue(presentBefore);

    recordHeaders.remove("key");

    boolean presentAfter = recordHeaders.iterator().hasNext();
    assertFalse(presentAfter);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:12,代码来源:RecordHeadersTest.java
示例17: testLastHeader
import org.apache.kafka.common.header.Headers; //导入依赖的package包/类
/** With several headers under one key, {@code lastHeader} returns the most recently added. */
@Test
public void testLastHeader() {
    Headers recordHeaders = new RecordHeaders();
    recordHeaders.add(new RecordHeader("key", "value".getBytes()));
    recordHeaders.add(new RecordHeader("key", "value2".getBytes()));
    recordHeaders.add(new RecordHeader("key", "value3".getBytes()));

    Header latest = recordHeaders.lastHeader("key");
    assertHeader("key", "value3", latest);

    // All three duplicates are retained.
    assertEquals(3, getCount(recordHeaders));
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:12,代码来源:RecordHeadersTest.java
示例18: getCount
import org.apache.kafka.common.header.Headers; //导入依赖的package包/类
/**
 * Counts the headers by iterating over them.
 *
 * @param headers the headers to count
 * @return the number of elements the iterator yields
 */
private int getCount(Headers headers) {
    int total = 0;
    for (Header ignored : headers) {
        total++;
    }
    return total;
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:10,代码来源:RecordHeadersTest.java
示例19: handle
import org.apache.kafka.common.header.Headers; //导入依赖的package包/类
/**
 * Processes each record in its own action log: records context, parses the JSON body,
 * copies tracing headers into the action log, validates the message and calls the handler.
 *
 * <p>Any {@link Throwable} raised for a record is reported through {@code logManager.logError}
 * and does not stop the remaining records from being processed.
 *
 * @param topic                      topic the records belong to
 * @param holder                     reader, validator and handler for the message type
 * @param records                    records to process, in order
 * @param longProcessThresholdInNano elapsed time above which a LONG_PROCESS warning is logged
 */
private <T> void handle(String topic, KafkaMessageListener.MessageHandlerHolder<T> holder, List<ConsumerRecord<String, byte[]>> records, double longProcessThresholdInNano) {
    for (ConsumerRecord<String, byte[]> record : records) {
        logManager.begin("=== message handling begin ===");
        ActionLog actionLog = logManager.currentActionLog();
        try {
            actionLog.action("topic/" + topic);
            actionLog.context("topic", topic);
            actionLog.context("handler", holder.handler.getClass().getCanonicalName());
            actionLog.context("key", record.key());

            logger.debug("message={}", LogParam.of(record.value()));
            T body = holder.reader.fromJSON(record.value());

            // Carry the publisher's context over from the record headers.
            Headers recordHeaders = record.headers();
            actionLog.refId(header(recordHeaders, KafkaHeaders.HEADER_REF_ID));

            String client = header(recordHeaders, KafkaHeaders.HEADER_CLIENT);
            if (client != null) {
                actionLog.context("client", client);
            }

            String clientIP = header(recordHeaders, KafkaHeaders.HEADER_CLIENT_IP);
            if (clientIP != null) {
                actionLog.context("clientIP", clientIP);
            }

            boolean traceRequested = "true".equals(header(recordHeaders, KafkaHeaders.HEADER_TRACE));
            if (traceRequested) {
                actionLog.trace = true;
            }

            holder.validator.validate(body);
            holder.handler.handle(record.key(), body);
        } catch (Throwable e) {
            logManager.logError(e);
        } finally {
            long elapsed = actionLog.elapsedTime();
            if (elapsed > longProcessThresholdInNano) {
                logger.warn(Markers.errorCode("LONG_PROCESS"), "took too long to process message, elapsedTime={}", elapsed);
            }
            logManager.end("=== message handling end ===");
        }
    }
}
开发者ID:neowu,项目名称:core-ng-project,代码行数:38,代码来源:KafkaMessageListenerThread.java
示例20: linkContext
import org.apache.kafka.common.header.Headers; //导入依赖的package包/类
/**
 * Copies the current action log's reference id into the headers, plus a trace header set
 * to {@code "true"} when tracing is on. Does nothing when there is no current action log.
 *
 * @param headers the record headers to add to
 */
private void linkContext(Headers headers) {
    ActionLog current = logManager.currentActionLog();
    if (current != null) {
        headers.add(KafkaHeaders.HEADER_REF_ID, Strings.bytes(current.refId()));
        if (current.trace) {
            headers.add(KafkaHeaders.HEADER_TRACE, Strings.bytes("true"));
        }
    }
}
开发者ID:neowu,项目名称:core-ng-project,代码行数:8,代码来源:KafkaMessagePublisher.java
注：本文中的org.apache.kafka.common.header.Headers类示例整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论