
Java Headers Class Code Examples


This article collects typical usage examples of the Java class org.apache.kafka.common.header.Headers. If you are wondering how the Headers class is used in practice, the selected examples below may help.



The Headers class belongs to the org.apache.kafka.common.header package. This article shows 20 code examples of the Headers class, sorted by popularity by default.

Example 1: publish

import org.apache.kafka.common.header.Headers; // import the required package/class
@Override
public void publish(String topic, String key, T value) {
    validator.validate(value);

    StopWatch watch = new StopWatch();
    byte[] message = writer.toJSON(value);
    try {
        ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, key, message);
        Headers headers = record.headers();
        headers.add(KafkaHeaders.HEADER_CLIENT_IP, Strings.bytes(Network.localHostAddress()));
        if (logManager.appName != null)
            headers.add(KafkaHeaders.HEADER_CLIENT, Strings.bytes(logManager.appName));
        linkContext(headers);
        producer.send(record);
    } finally {
        long elapsedTime = watch.elapsedTime();
        ActionLogContext.track("kafka", elapsedTime);   // the Kafka producer sends messages in the background; tracking here mainly counts how many messages an action sent
        logger.debug("publish, topic={}, key={}, message={}, elapsedTime={}", topic, key, LogParam.of(message), elapsedTime);
    }
}
 
Developer ID: neowu, Project: core-ng-project, Lines of code: 21, Source: KafkaMessagePublisher.java


Example 2: extract_second_no_context

import org.apache.kafka.common.header.Headers; // import the required package/class
@Test
public void extract_second_no_context() {
  MockSpan span = mockTracer.buildSpan("first").start();
  Headers headers = new RecordHeaders();
  assertTrue(headers.toArray().length == 0);

  // inject first
  TracingKafkaUtils.inject(span.context(), headers, mockTracer);
  int headersLength = headers.toArray().length;
  assertTrue(headersLength > 0);

  // check second
  MockSpan.MockContext spanContext2 = (MockContext) TracingKafkaUtils
      .extractSpanContext(headers, mockTracer);
  assertNull(spanContext2);
}
 
Developer ID: opentracing-contrib, Project: java-kafka-client, Lines of code: 17, Source: TracingKafkaUtilsTest.java


Example 3: serialize

import org.apache.kafka.common.header.Headers; // import the required package/class
/**
 * @throws StreamsException if both old and new values of data are null, or if
 * both values are not null
 */
@Override
public byte[] serialize(String topic, Headers headers, Change<T> data) {
    byte[] serializedKey;

    // exactly one of the old / new values must be non-null
    if (data.newValue != null) {
        if (data.oldValue != null)
            throw new StreamsException("Both old and new values are not null (" + data.oldValue
                    + " : " + data.newValue + ") in ChangeSerializer, which is not allowed.");

        serializedKey = inner.serialize(topic, headers, data.newValue);
    } else {
        if (data.oldValue == null)
            throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");

        serializedKey = inner.serialize(topic, headers, data.oldValue);
    }

    ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + NEWFLAG_SIZE);
    buf.put(serializedKey);
    buf.put((byte) (data.newValue != null ? 1 : 0));

    return buf.array();
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 29, Source: ChangedSerializer.java
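
The serializer above writes the inner payload followed by a single flag byte (1 for a new value, 0 for an old value). The following is a minimal sketch of that framing using only the JDK. The class name `ChangeFlagFraming` and its helper methods are hypothetical, and `NEWFLAG_SIZE = 1` is an assumption inferred from the single `buf.put((byte) ...)` call in the snippet.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper, not part of Kafka: illustrates the "payload + flag byte" layout.
class ChangeFlagFraming {
    // Assumption: the flag occupies exactly one trailing byte.
    static final int NEWFLAG_SIZE = 1;

    // Appends the flag byte: 1 when the payload is the new value, 0 when it is the old value.
    static byte[] frame(byte[] payload, boolean isNewValue) {
        ByteBuffer buf = ByteBuffer.allocate(payload.length + NEWFLAG_SIZE);
        buf.put(payload);
        buf.put((byte) (isNewValue ? 1 : 0));
        return buf.array();
    }

    // Reads the trailing flag byte.
    static boolean isNewValue(byte[] framed) {
        return framed[framed.length - NEWFLAG_SIZE] != 0;
    }

    // Returns a copy of the bytes that precede the flag.
    static byte[] payload(byte[] framed) {
        return Arrays.copyOfRange(framed, 0, framed.length - NEWFLAG_SIZE);
    }

    public static void main(String[] args) {
        byte[] framed = frame(new byte[] {10, 20, 30}, true);
        System.out.println(framed.length);                    // prints 4
        System.out.println(isNewValue(framed));               // prints true
        System.out.println(Arrays.toString(payload(framed))); // prints [10, 20, 30]
    }
}
```

Keeping the flag at the end means the reader can locate it from the array length alone, without a length prefix for the payload.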


Example 4: ConsumerRecord

import org.apache.kafka.common.header.Headers; // import the required package/class
/**
 * Creates a record to be received from a specified topic and partition
 *
 * @param topic The topic this record is received from
 * @param partition The partition of the topic this record is received from
 * @param offset The offset of this record in the corresponding Kafka partition
 * @param timestamp The timestamp of the record.
 * @param timestampType The timestamp type
 * @param checksum The checksum (CRC32) of the full record
 * @param serializedKeySize The length of the serialized key
 * @param serializedValueSize The length of the serialized value
 * @param key The key of the record, if one exists (null is allowed)
 * @param value The record contents
 * @param headers The headers of the record.
 */
public ConsumerRecord(String topic,
                      int partition,
                      long offset,
                      long timestamp,
                      TimestampType timestampType,
                      Long checksum,
                      int serializedKeySize,
                      int serializedValueSize,
                      K key,
                      V value,
                      Headers headers) {
    if (topic == null)
        throw new IllegalArgumentException("Topic cannot be null");
    this.topic = topic;
    this.partition = partition;
    this.offset = offset;
    this.timestamp = timestamp;
    this.timestampType = timestampType;
    this.checksum = checksum;
    this.serializedKeySize = serializedKeySize;
    this.serializedValueSize = serializedValueSize;
    this.key = key;
    this.value = value;
    this.headers = headers;
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 41, Source: ConsumerRecord.java


Example 5: reconsumeLater

import org.apache.kafka.common.header.Headers; // import the required package/class
private void reconsumeLater(ConsumerRecord<String, byte[]> consumeRecord) throws InterruptedException, ExecutionException {

		// copy every header into headerList except RETRY_COUNT
		Headers headers = consumeRecord.headers();
		List<Header> headerList = new ArrayList<Header>(8);
		Iterator<Header> iterator = headers.iterator();
		Integer retryCount = -1;
		boolean hasOrignalHeader = false;
		while (iterator.hasNext()) {
			Header next = iterator.next();
			if (next.key().equals(RETRY_COUNT_KEY)) {
				retryCount = serializer.deserialize(next.value());
				continue;
			}
			
			if(next.key().equals(ORGINAL_TOPIC)){
				hasOrignalHeader = true;
			}
			headerList.add(next);
		}
		
		// add RETRY_COUNT to header
		retryCount++;
		headerList.add(new RecordHeader(RETRY_COUNT_KEY, serializer.serialization(retryCount)));
		
		if(!hasOrignalHeader){
			headerList.add(new RecordHeader(ORGINAL_TOPIC, serializer.serialization(consumeRecord.topic())));
		}

		// send message to corresponding queue according to retry times
		String retryTopic = calcRetryTopic(consumeRecord.topic(), retryCount);
		
		ProducerRecord<String, byte[]> record = new ProducerRecord<>(retryTopic,
				consumeRecord.partition() % retryQueuePartitionCount.get(retryTopic), null, consumeRecord.key(),
				consumeRecord.value(), headerList);
		Future<RecordMetadata> publishKafkaMessage = retryQueueMsgProducer.publishKafkaMessage(record);
		publishKafkaMessage.get();
	}
 
Developer ID: QNJR-GROUP, Project: EasyTransaction, Lines of code: 39, Source: KafkaEasyTransMsgConsumerImpl.java
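
The method above starts the retry count at -1 when no retry header is present, so the first retry is numbered 0. Header values are raw bytes, so the count has to be encoded somehow; the project's own `serializer` is not shown here. The sketch below assumes a 4-byte big-endian int encoding purely for illustration, and `RetryCountCodec` is a hypothetical name.

```java
import java.nio.ByteBuffer;

// Hypothetical codec: the real project uses its own serializer, which is not shown in the snippet.
class RetryCountCodec {
    // Encodes the count as a 4-byte big-endian int (ByteBuffer's default byte order).
    static byte[] encode(int retryCount) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(retryCount).array();
    }

    // Decodes a value previously produced by encode.
    static int decode(byte[] value) {
        return ByteBuffer.wrap(value).getInt();
    }

    // Computes the next count; a missing header (null) counts as -1, so the first retry is 0.
    static int next(byte[] currentHeaderValue) {
        int current = currentHeaderValue == null ? -1 : decode(currentHeaderValue);
        return current + 1;
    }

    public static void main(String[] args) {
        int first = next(null);
        int second = next(encode(first));
        System.out.println(first + " " + second); // prints 0 1
    }
}
```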


Example 6: HeadersMapExtractAdapter

import org.apache.kafka.common.header.Headers; // import the required package/class
HeadersMapExtractAdapter(Headers headers, boolean second) {
  for (Header header : headers) {
    if (second) {
      if (header.key().startsWith("second_span_")) {
        map.put(header.key().replaceFirst("^second_span_", ""),
            new String(header.value(), StandardCharsets.UTF_8));
      }
    } else {
      map.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
    }
  }
}
 
Developer ID: opentracing-contrib, Project: java-kafka-client, Lines of code: 13, Source: HeadersMapExtractAdapter.java
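
The constructor above decodes header values as UTF-8 and, in "second" mode, keeps only keys that start with `second_span_`, stripping that prefix. The sketch below reproduces the filtering logic with plain JDK types so it runs without Kafka on the classpath. Using a list of `Map.Entry<String, byte[]>` as a stand-in for `Headers` is an assumption made for illustration, and `PrefixedHeaderView` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in: a list of (key, bytes) entries plays the role of Kafka's Headers.
class PrefixedHeaderView {
    static final String PREFIX = "second_span_";

    // Builds a key-to-text map; in "second" mode only prefixed keys are kept, with the prefix removed.
    static Map<String, String> extract(List<Map.Entry<String, byte[]>> headers, boolean second) {
        Map<String, String> map = new HashMap<>();
        for (Map.Entry<String, byte[]> header : headers) {
            String key = header.getKey();
            String value = new String(header.getValue(), StandardCharsets.UTF_8);
            if (!second) {
                map.put(key, value);
            } else if (key.startsWith(PREFIX)) {
                map.put(key.substring(PREFIX.length()), value);
            }
        }
        return map;
    }

    // Convenience factory that encodes the value as UTF-8 bytes.
    static Map.Entry<String, byte[]> header(String key, String value) {
        return new SimpleImmutableEntry<>(key, value.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, byte[]>> headers = new ArrayList<>();
        headers.add(header("traceid", "1"));
        headers.add(header("second_span_traceid", "2"));
        System.out.println(extract(headers, true)); // prints {traceid=2}
    }
}
```

Encoding and decoding with an explicit charset on both sides avoids depending on the platform default.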


Example 7: inject

import org.apache.kafka.common.header.Headers; // import the required package/class
@Test
public void inject() {
  MockSpan span = mockTracer.buildSpan("test").start();
  Headers headers = new RecordHeaders();
  assertTrue(headers.toArray().length == 0);

  TracingKafkaUtils.inject(span.context(), headers, mockTracer);

  assertTrue(headers.toArray().length > 0);
}
 
Developer ID: opentracing-contrib, Project: java-kafka-client, Lines of code: 11, Source: TracingKafkaUtilsTest.java


Example 8: extract

import org.apache.kafka.common.header.Headers; // import the required package/class
@Test
public void extract() {
  MockSpan span = mockTracer.buildSpan("test").start();
  Headers headers = new RecordHeaders();
  TracingKafkaUtils.inject(span.context(), headers, mockTracer);

  MockSpan.MockContext spanContext = (MockSpan.MockContext) TracingKafkaUtils
      .extract(headers, mockTracer);

  assertEquals(span.context().spanId(), spanContext.spanId());
  assertEquals(span.context().traceId(), spanContext.traceId());
}
 
Developer ID: opentracing-contrib, Project: java-kafka-client, Lines of code: 13, Source: TracingKafkaUtilsTest.java


Example 9: extract_no_context

import org.apache.kafka.common.header.Headers; // import the required package/class
@Test
public void extract_no_context() {
  Headers headers = new RecordHeaders();

  // first
  MockSpan.MockContext spanContext = (MockSpan.MockContext) TracingKafkaUtils
      .extract(headers, mockTracer);
  assertNull(spanContext);

  // second
  MockSpan.MockContext spanContext2 = (MockContext) TracingKafkaUtils
      .extractSpanContext(headers, mockTracer);
  assertNull(spanContext2);
}
 
Developer ID: opentracing-contrib, Project: java-kafka-client, Lines of code: 15, Source: TracingKafkaUtilsTest.java


Example 10: inject_and_extract_two_contexts

import org.apache.kafka.common.header.Headers; // import the required package/class
@Test
public void inject_and_extract_two_contexts() {
  MockSpan span = mockTracer.buildSpan("first").start();
  Headers headers = new RecordHeaders();
  assertTrue(headers.toArray().length == 0);

  // inject first
  TracingKafkaUtils.inject(span.context(), headers, mockTracer);
  int headersLength = headers.toArray().length;
  assertTrue(headersLength > 0);

  // inject second
  MockSpan span2 = mockTracer.buildSpan("second").asChildOf(span.context()).start();
  TracingKafkaUtils.injectSecond(span2.context(), headers, mockTracer);
  assertTrue(headers.toArray().length > headersLength);

  // check first
  MockSpan.MockContext spanContext = (MockSpan.MockContext) TracingKafkaUtils
      .extract(headers, mockTracer);
  assertEquals(span.context().spanId(), spanContext.spanId());
  assertEquals(span.context().traceId(), spanContext.traceId());

  // check second
  MockSpan.MockContext spanContext2 = (MockContext) TracingKafkaUtils
      .extractSpanContext(headers, mockTracer);
  assertEquals(span2.context().spanId(), spanContext2.spanId());
  assertEquals(span2.context().traceId(), spanContext2.traceId());
  assertEquals(spanContext.traceId(), spanContext2.traceId());
  assertNotEquals(spanContext.spanId(), spanContext2.spanId());
}
 
Developer ID: opentracing-contrib, Project: java-kafka-client, Lines of code: 31, Source: TracingKafkaUtilsTest.java


Example 11: deserialize

import org.apache.kafka.common.header.Headers; // import the required package/class
@Override
public Change<T> deserialize(String topic, Headers headers, byte[] data) {

    byte[] bytes = new byte[data.length - NEWFLAG_SIZE];

    System.arraycopy(data, 0, bytes, 0, bytes.length);

    if (ByteBuffer.wrap(data).get(data.length - NEWFLAG_SIZE) != 0) {
        return new Change<>(inner.deserialize(topic, headers, bytes), null);
    } else {
        return new Change<>(null, inner.deserialize(topic, headers, bytes));
    }
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 14, Source: ChangedDeserializer.java


Example 12: deserializeKey

import org.apache.kafka.common.header.Headers; // import the required package/class
@Override
public Object deserializeKey(final String topic, final Headers headers, final byte[] data) {
    if (keyThrowsException) {
        throw new RuntimeException();
    }
    return key;
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 8, Source: SourceNodeRecordDeserializerTest.java


Example 13: deserializeValue

import org.apache.kafka.common.header.Headers; // import the required package/class
@Override
public Object deserializeValue(final String topic, final Headers headers, final byte[] data) {
    if (valueThrowsException) {
        throw new RuntimeException();
    }
    return value;
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 8, Source: SourceNodeRecordDeserializerTest.java


Example 14: remove

import org.apache.kafka.common.header.Headers; // import the required package/class
@Override
public Headers remove(String key) throws IllegalStateException {
    canWrite();
    checkKey(key);
    Iterator<Header> iterator = iterator();
    while (iterator.hasNext()) {
        if (iterator.next().key().equals(key)) {
            iterator.remove();
        }
    }
    return this;
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 13, Source: RecordHeaders.java
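
The `remove` method above deletes matching entries through `iterator.remove()` instead of removing from the collection directly while looping. The sketch below shows the same pattern on a plain `List<String>` of keys; the list is only a stand-in for the header collection, and `RemoveByKey` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration of removing every matching element during iteration.
class RemoveByKey {
    // Removes every element equal to key, in place, and returns how many were removed.
    static int removeAll(List<String> keys, String key) {
        int removed = 0;
        Iterator<String> iterator = keys.iterator();
        while (iterator.hasNext()) {
            if (iterator.next().equals(key)) {
                iterator.remove(); // removal goes through the iterator, so iteration stays valid
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>(Arrays.asList("a", "b", "a"));
        System.out.println(removeAll(keys, "a")); // prints 2
        System.out.println(keys);                 // prints [b]
    }
}
```

Because the loop does not stop at the first match, all entries sharing the key are removed, which matters when the same key appears more than once.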


Example 15: testAdd

import org.apache.kafka.common.header.Headers; // import the required package/class
@Test
public void testAdd() {
    Headers headers = new RecordHeaders();
    headers.add(new RecordHeader("key", "value".getBytes()));

    Header header = headers.iterator().next();
    assertHeader("key", "value", header);

    headers.add(new RecordHeader("key2", "value2".getBytes()));

    assertHeader("key2", "value2", headers.lastHeader("key2"));
    assertEquals(2, getCount(headers));
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 14, Source: RecordHeadersTest.java


Example 16: testRemove

import org.apache.kafka.common.header.Headers; // import the required package/class
@Test
public void testRemove() {
    Headers headers = new RecordHeaders();
    headers.add(new RecordHeader("key", "value".getBytes()));

    assertTrue(headers.iterator().hasNext());

    headers.remove("key");

    assertFalse(headers.iterator().hasNext());
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 12, Source: RecordHeadersTest.java


Example 17: testLastHeader

import org.apache.kafka.common.header.Headers; // import the required package/class
@Test
public void testLastHeader() {
    Headers headers = new RecordHeaders();
    headers.add(new RecordHeader("key", "value".getBytes()));
    headers.add(new RecordHeader("key", "value2".getBytes()));
    headers.add(new RecordHeader("key", "value3".getBytes()));

    assertHeader("key", "value3", headers.lastHeader("key"));
    assertEquals(3, getCount(headers));

}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 12, Source: RecordHeadersTest.java


Example 18: getCount

import org.apache.kafka.common.header.Headers; // import the required package/class
private int getCount(Headers headers) {
    int count = 0;
    Iterator<Header> headerIterator = headers.iterator();
    while (headerIterator.hasNext()) {
        headerIterator.next();
        count++;
    }
    return count;
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 10, Source: RecordHeadersTest.java


Example 19: handle

import org.apache.kafka.common.header.Headers; // import the required package/class
private <T> void handle(String topic, KafkaMessageListener.MessageHandlerHolder<T> holder, List<ConsumerRecord<String, byte[]>> records, double longProcessThresholdInNano) {
    for (ConsumerRecord<String, byte[]> record : records) {
        logManager.begin("=== message handling begin ===");
        ActionLog actionLog = logManager.currentActionLog();
        try {
            actionLog.action("topic/" + topic);
            actionLog.context("topic", topic);
            actionLog.context("handler", holder.handler.getClass().getCanonicalName());
            actionLog.context("key", record.key());
            logger.debug("message={}", LogParam.of(record.value()));

            T message = holder.reader.fromJSON(record.value());

            Headers headers = record.headers();
            actionLog.refId(header(headers, KafkaHeaders.HEADER_REF_ID));
            String client = header(headers, KafkaHeaders.HEADER_CLIENT);
            if (client != null) actionLog.context("client", client);
            String clientIP = header(headers, KafkaHeaders.HEADER_CLIENT_IP);
            if (clientIP != null) actionLog.context("clientIP", clientIP);
            if ("true".equals(header(headers, KafkaHeaders.HEADER_TRACE))) {
                actionLog.trace = true;
            }

            holder.validator.validate(message);

            holder.handler.handle(record.key(), message);
        } catch (Throwable e) {
            logManager.logError(e);
        } finally {
            long elapsedTime = actionLog.elapsedTime();
            if (elapsedTime > longProcessThresholdInNano) {
                logger.warn(Markers.errorCode("LONG_PROCESS"), "took too long to process message, elapsedTime={}", elapsedTime);
            }
            logManager.end("=== message handling end ===");
        }
    }
}
 
Developer ID: neowu, Project: core-ng-project, Lines of code: 38, Source: KafkaMessageListenerThread.java


Example 20: linkContext

import org.apache.kafka.common.header.Headers; // import the required package/class
private void linkContext(Headers headers) {
    ActionLog actionLog = logManager.currentActionLog();
    if (actionLog == null) return;

    headers.add(KafkaHeaders.HEADER_REF_ID, Strings.bytes(actionLog.refId()));
    if (actionLog.trace) headers.add(KafkaHeaders.HEADER_TRACE, Strings.bytes("true"));
}
 
Developer ID: neowu, Project: core-ng-project, Lines of code: 8, Source: KafkaMessagePublisher.java



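Note: The org.apache.kafka.common.header.Headers examples in this article were collected from source-code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects, and copyright remains with the original authors. For distribution and use, refer to each project's License. Do not reprint without permission.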

