本文整理汇总了Java中org.springframework.cloud.stream.binder.BinderHeaders类的典型用法代码示例。如果您正苦于以下问题:Java BinderHeaders类的具体用法?Java BinderHeaders怎么用?Java BinderHeaders使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
BinderHeaders类属于org.springframework.cloud.stream.binder包,在下文中一共展示了BinderHeaders类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: createProducerMessageHandler
import org.springframework.cloud.stream.binder.BinderHeaders; //导入依赖的package包/类
@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties<KinesisProducerProperties> producerProperties, MessageChannel errorChannel) {
KinesisMessageHandler kinesisMessageHandler = new KinesisMessageHandler(this.amazonKinesis);
kinesisMessageHandler.setSync(producerProperties.getExtension().isSync());
kinesisMessageHandler.setSendTimeout(producerProperties.getExtension().getSendTimeout());
kinesisMessageHandler.setStream(destination.getName());
if (producerProperties.isPartitioned()) {
kinesisMessageHandler
.setPartitionKeyExpressionString("'partitionKey-' + headers." + BinderHeaders.PARTITION_HEADER);
}
kinesisMessageHandler.setFailureChannel(errorChannel);
kinesisMessageHandler.setBeanFactory(getBeanFactory());
return kinesisMessageHandler;
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-aws-kinesis,代码行数:18,代码来源:KinesisMessageChannelBinder.java
示例2: RedisMessageChannelBinder
import org.springframework.cloud.stream.binder.BinderHeaders; //导入依赖的package包/类
public RedisMessageChannelBinder(RedisConnectionFactory connectionFactory, String... headersToMap) {
Assert.notNull(connectionFactory, "connectionFactory must not be null");
this.connectionFactory = connectionFactory;
StringRedisTemplate template = new StringRedisTemplate(connectionFactory);
template.afterPropertiesSet();
this.redisOperations = template;
if (headersToMap != null && headersToMap.length > 0) {
String[] combinedHeadersToMap =
Arrays.copyOfRange(BinderHeaders.STANDARD_HEADERS, 0, BinderHeaders.STANDARD_HEADERS.length
+ headersToMap.length);
System.arraycopy(headersToMap, 0, combinedHeadersToMap, BinderHeaders.STANDARD_HEADERS.length,
headersToMap.length);
this.headersToMap = combinedHeadersToMap;
}
else {
this.headersToMap = BinderHeaders.STANDARD_HEADERS;
}
this.errorAdapter = new RedisQueueOutboundChannelAdapter(
parser.parseExpression("headers['" + ERROR_HEADER + "']"), connectionFactory);
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-redis,代码行数:21,代码来源:RedisMessageChannelBinder.java
示例3: ProducerConfigurationMessageHandler
import org.springframework.cloud.stream.binder.BinderHeaders; //导入依赖的package包/类
ProducerConfigurationMessageHandler(KafkaTemplate<byte[], byte[]> kafkaTemplate, String topic,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
ProducerFactory<byte[], byte[]> producerFactory) {
super(kafkaTemplate);
setTopicExpression(new LiteralExpression(topic));
setMessageKeyExpression(producerProperties.getExtension().getMessageKeyExpression());
setBeanFactory(KafkaMessageChannelBinder.this.getBeanFactory());
if (producerProperties.isPartitioned()) {
SpelExpressionParser parser = new SpelExpressionParser();
setPartitionIdExpression(parser.parseExpression("headers." + BinderHeaders.PARTITION_HEADER));
}
if (producerProperties.getExtension().isSync()) {
setSync(true);
}
this.producerFactory = producerFactory;
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-kafka,代码行数:17,代码来源:KafkaMessageChannelBinder.java
示例4: finishPreSend
import org.springframework.cloud.stream.binder.BinderHeaders; //导入依赖的package包/类
/**
* This is strictly to support 1.3 semantics where BINDER_ORIGINAL_CONTENT_TYPE header
* needs to be set for certain cases and String payloads needs to be converted to byte[].
*
* Factored out of what was left of MessageSerializationUtils.
*/
// deprecated at the get go as a reminder to remove at v3.0
@Deprecated
private Message<?> finishPreSend(Message<?> message) {
String oct = message.getHeaders().containsKey(MessageHeaders.CONTENT_TYPE) ? message.getHeaders().get(MessageHeaders.CONTENT_TYPE).toString() : null;
String ct = oct;
if (message.getPayload() instanceof String) {
ct = JavaClassMimeTypeUtils.mimeTypeFromObject(message.getPayload(), ObjectUtils.nullSafeToString(oct)).toString();
}
MessageValues messageValues = new MessageValues(message);
Object payload = message.getPayload();
if (payload instanceof String) {
payload = ((String)payload).getBytes(StandardCharsets.UTF_8);
}
messageValues.setPayload(payload);
if (ct != null && !ct.equals(oct)) {
messageValues.put(MessageHeaders.CONTENT_TYPE, ct);
messageValues.put(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE, oct);
}
return messageValues.toMessage();
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:28,代码来源:MessageConverterConfigurer.java
示例5: preSend
import org.springframework.cloud.stream.binder.BinderHeaders; //导入依赖的package包/类
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
if (!message.getHeaders().containsKey(BinderHeaders.PARTITION_OVERRIDE)) {
int partition = this.partitionHandler.determinePartition(message);
return MessageConverterConfigurer.this.messageBuilderFactory
.fromMessage(message)
.setHeader(BinderHeaders.PARTITION_HEADER, partition).build();
}
else {
return MessageConverterConfigurer.this.messageBuilderFactory
.fromMessage(message)
.setHeader(BinderHeaders.PARTITION_HEADER,
message.getHeaders()
.get(BinderHeaders.PARTITION_OVERRIDE))
.removeHeader(BinderHeaders.PARTITION_OVERRIDE).build();
}
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:18,代码来源:MessageConverterConfigurer.java
示例6: testConfigureInputChannelWithLegacyContentType
import org.springframework.cloud.stream.binder.BinderHeaders; //导入依赖的package包/类
@Test
public void testConfigureInputChannelWithLegacyContentType() {
BindingServiceProperties props = new BindingServiceProperties();
BindingProperties bindingProps = new BindingProperties();
bindingProps.setContentType("foo/bar");
props.setBindings(Collections.singletonMap("foo", bindingProps));
CompositeMessageConverterFactory converterFactory = new CompositeMessageConverterFactory(
Collections.<MessageConverter>emptyList(), null);
MessageConverterConfigurer configurer = new MessageConverterConfigurer(props, converterFactory);
QueueChannel in = new QueueChannel();
configurer.configureInputChannel(in, "foo");
Foo foo = new Foo();
in.send(
MessageBuilder.withPayload(foo)
.setHeader(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE, "application/json")
.setHeader(BinderHeaders.SCST_VERSION, "1.x")
.build());
Message<?> received = in.receive(0);
assertThat(received).isNotNull();
assertThat(received.getPayload()).isEqualTo(foo);
assertThat(received.getHeaders().get(MessageHeaders.CONTENT_TYPE).toString()).isEqualTo("application/json");
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:23,代码来源:MessageConverterConfigurerTests.java
示例7: testOriginalContentTypeIsRetrievedForLegacyContentHeaderType
import org.springframework.cloud.stream.binder.BinderHeaders; //导入依赖的package包/类
@Test
public void testOriginalContentTypeIsRetrievedForLegacyContentHeaderType() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
MessageHandler messageHandler = new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
assertThat(message.getPayload()).isInstanceOf(String.class);
assertThat(message.getPayload()).isEqualTo("{\"message\":\"Hi\"}");
assertThat(message.getHeaders().get(MessageHeaders.CONTENT_TYPE).toString()).isEqualTo("application/json");
latch.countDown();
}
};
testSink.input().subscribe(messageHandler);
testSink.input().send(MessageBuilder.withPayload("{\"message\":\"Hi\"}".getBytes())
.setHeader(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE, "application/json")
.setHeader(BinderHeaders.SCST_VERSION, "1.x")
.build());
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
testSink.input().unsubscribe(messageHandler);
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:21,代码来源:LegacyContentTypeTests.java
示例8: headersToMap
import org.springframework.cloud.stream.binder.BinderHeaders; //导入依赖的package包/类
private static String[] headersToMap(KinesisBinderConfigurationProperties configurationProperties) {
Assert.notNull(configurationProperties, "'configurationProperties' must not be null");
if (ObjectUtils.isEmpty(configurationProperties.getHeaders())) {
return BinderHeaders.STANDARD_HEADERS;
}
else {
String[] combinedHeadersToMap = Arrays.copyOfRange(BinderHeaders.STANDARD_HEADERS, 0,
BinderHeaders.STANDARD_HEADERS.length + configurationProperties.getHeaders().length);
System.arraycopy(configurationProperties.getHeaders(), 0, combinedHeadersToMap,
BinderHeaders.STANDARD_HEADERS.length, configurationProperties.getHeaders().length);
return combinedHeadersToMap;
}
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-aws-kinesis,代码行数:14,代码来源:KinesisMessageChannelBinder.java
示例9: convert
import org.springframework.cloud.stream.binder.BinderHeaders; //导入依赖的package包/类
protected PubSubMessage convert(Message<?> message) throws Exception {
String encodedHeaders = encodeHeaders(message.getHeaders());
String topic = producerProperties.isPartitioned() ? topics
.get((Integer) message.getHeaders().get(BinderHeaders.PARTITION_HEADER))
.name() : topics.get(0).name();
PubSubMessage pubSubMessage = new PubSubMessage(
com.google.cloud.pubsub.Message
.builder(ByteArray.copyFrom((byte[]) message.getPayload()))
.addAttribute(PubSubBinder.SCST_HEADERS, encodedHeaders).build(),
topic);
return pubSubMessage;
}
开发者ID:viniciusccarvalho,项目名称:spring-cloud-stream-binder-pubsub,代码行数:13,代码来源:PubSubMessageHandler.java
示例10: deserializePayload
import org.springframework.cloud.stream.binder.BinderHeaders; //导入依赖的package包/类
/**
* Will *only* deserialize payload if its 'contentType' is 'text/* or application/json' or Java/Kryo serialized.
* While this would naturally happen via MessageConverters at the time of handler method
* invocation, doing it here also is strictly to support behavior established
* in previous versions of SCSt. One of these cases is return payload as String if contentType is text or json.
* Also to support certain type of assumptions on type-less handlers (i.e., handle(?) vs. handle(Foo));
*/
private Object deserializePayload(Message<?> message, MimeType contentTypeToUse) {
Object payload = null;
if ("text".equalsIgnoreCase(contentTypeToUse.getType()) || equalTypeAndSubType(MimeTypeUtils.APPLICATION_JSON, contentTypeToUse)) {
payload = new String((byte[])message.getPayload(), StandardCharsets.UTF_8);
}
else {
message = MessageBuilder.fromMessage(message).setHeader(MessageHeaders.CONTENT_TYPE, contentTypeToUse).build();
MessageConverter converter = equalTypeAndSubType(MessageConverterUtils.X_JAVA_SERIALIZED_OBJECT, contentTypeToUse)
? compositeMessageConverterFactory.getMessageConverterForType(contentTypeToUse)
: compositeMessageConverterFactory.getMessageConverterForAllRegistered();
String targetClassName = contentTypeToUse.getParameter("type");
Class<?> targetClass = null;
if (StringUtils.hasText(targetClassName)) {
try {
targetClass = Class.forName(targetClassName, false, Thread.currentThread().getContextClassLoader());
}
catch (Exception e) {
throw new IllegalStateException("Failed to determine class name for contentType: "
+ message.getHeaders().get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE), e);
}
}
Assert.isTrue(!(equalTypeAndSubType(MessageConverterUtils.X_JAVA_OBJECT, contentTypeToUse) && targetClass == null),
"Cannot deserialize into message since 'contentType` is not "
+ "encoded with the actual target type."
+ "Consider 'application/x-java-object; type=foo.bar.MyClass'");
payload = converter.fromMessage(message, targetClass);
}
return payload;
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:39,代码来源:MessageConverterConfigurer.java
示例11: testPartitionHeader
import org.springframework.cloud.stream.binder.BinderHeaders; //导入依赖的package包/类
@Test
public void testPartitionHeader() throws Exception {
this.testSource.output().send(MessageBuilder.withPayload("{\"message\":\"Hi\"}").build());
Message<?> message = this.messageCollector.forChannel(testSource.output()).poll(1, TimeUnit.SECONDS);
assertThat(message.getHeaders().get(BinderHeaders.PARTITION_HEADER).equals(0));
assertNull(message.getHeaders().get(BinderHeaders.PARTITION_OVERRIDE));
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:8,代码来源:MessageChannelConfigurerTests.java
示例12: testPartitionHeaderWithPartitionOverride
import org.springframework.cloud.stream.binder.BinderHeaders; //导入依赖的package包/类
@Test
public void testPartitionHeaderWithPartitionOverride() throws Exception {
this.testSource.output().send(MessageBuilder.withPayload("{\"message\":\"Hi\"}")
.setHeader(BinderHeaders.PARTITION_OVERRIDE, 123).build());
Message<?> message = this.messageCollector.forChannel(testSource.output()).poll(1, TimeUnit.SECONDS);
assertThat(message.getHeaders().get(BinderHeaders.PARTITION_HEADER).equals(123));
assertNull(message.getHeaders().get(BinderHeaders.PARTITION_OVERRIDE));
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:9,代码来源:MessageChannelConfigurerTests.java
示例13: testOriginalContentTypeHeaderOnly
import org.springframework.cloud.stream.binder.BinderHeaders; //导入依赖的package包/类
@Test
public void testOriginalContentTypeHeaderOnly() throws Exception {
User specificRecord = new User();
specificRecord.setName("joe");
Schema v1 = new Schema.Parser().parse(AvroMessageConverterSerializationTests.class
.getClassLoader().getResourceAsStream("schemas/user.avsc"));
GenericRecord genericRecord = new GenericData.Record(v1);
genericRecord.put("name", "joe");
SchemaRegistryClient client = new DefaultSchemaRegistryClient();
client.register("user", "avro", v1.toString());
AvroSchemaRegistryClientMessageConverter converter = new AvroSchemaRegistryClientMessageConverter(
client, new NoOpCacheManager());
converter.setDynamicSchemaGenerationEnabled(false);
converter.afterPropertiesSet();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DatumWriter<User> writer = new SpecificDatumWriter<>(User.class);
Encoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
writer.write(specificRecord, encoder);
encoder.flush();
Message source = MessageBuilder.withPayload(baos.toByteArray())
.setHeader(MessageHeaders.CONTENT_TYPE,
MimeTypeUtils.APPLICATION_OCTET_STREAM)
.setHeader(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE,
"application/vnd.user.v1+avro")
.build();
Object converted = converter.fromMessage(source, User.class);
Assert.assertNotNull(converted);
Assert.assertEquals(specificRecord.getName().toString(),
((User) converted).getName().toString());
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:32,代码来源:AvroMessageConverterSerializationTests.java
示例14: testPartitionedModuleJava
import org.springframework.cloud.stream.binder.BinderHeaders; //导入依赖的package包/类
@Test
@Override
public void testPartitionedModuleJava() throws Exception {
KinesisTestBinder binder = getBinder();
ExtendedConsumerProperties<KinesisConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setConcurrency(2);
consumerProperties.setInstanceCount(3);
consumerProperties.setInstanceIndex(0);
consumerProperties.setPartitioned(true);
final List<Message<?>> results = new ArrayList<>();
final CountDownLatch receiveLatch = new CountDownLatch(3);
MessageHandler receivingHandler = message -> {
results.add(message);
receiveLatch.countDown();
};
DirectChannel input0 = createBindableChannel("test.input0J", new BindingProperties());
input0.subscribe(receivingHandler);
Binding<MessageChannel> input0Binding = binder.bindConsumer("partJ.0", "testPartitionedModuleJava", input0,
consumerProperties);
consumerProperties.setInstanceIndex(1);
DirectChannel input1 = createBindableChannel("test.input1J", new BindingProperties());
input1.subscribe(receivingHandler);
Binding<MessageChannel> input1Binding = binder.bindConsumer("partJ.0", "testPartitionedModuleJava", input1,
consumerProperties);
consumerProperties.setInstanceIndex(2);
DirectChannel input2 = createBindableChannel("test.input2J", new BindingProperties());
input2.subscribe(receivingHandler);
Binding<MessageChannel> input2Binding = binder.bindConsumer("partJ.0", "testPartitionedModuleJava", input2,
consumerProperties);
ExtendedProducerProperties<KinesisProducerProperties> producerProperties = createProducerProperties();
producerProperties.setPartitionKeyExtractorClass(PartitionTestSupport.class);
producerProperties.setPartitionSelectorClass(PartitionTestSupport.class);
producerProperties.setPartitionCount(3);
DirectChannel output = createBindableChannel("test.output",
createProducerBindingProperties(producerProperties));
Binding<MessageChannel> outputBinding = binder.bindProducer("partJ.0", output, producerProperties);
if (usesExplicitRouting()) {
Object endpoint = extractEndpoint(outputBinding);
assertThat(getEndpointRouting(endpoint))
.contains(getExpectedRoutingBaseDestination("partJ.0", "testPartitionedModuleJava")
+ "-' + headers['" + BinderHeaders.PARTITION_HEADER + "']");
}
output.send(new GenericMessage<>(2));
output.send(new GenericMessage<>(1));
output.send(new GenericMessage<>(0));
assertThat(receiveLatch.await(20, TimeUnit.SECONDS)).isTrue();
assertThat(results).extracting("payload").containsExactlyInAnyOrder("0", "1", "2");
input0Binding.unbind();
input1Binding.unbind();
input2Binding.unbind();
outputBinding.unbind();
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-aws-kinesis,代码行数:71,代码来源:KinesisBinderTests.java
示例15: assertMessageReceive
import org.springframework.cloud.stream.binder.BinderHeaders; //导入依赖的package包/类
private void assertMessageReceive(QueueChannel moduleInputChannel, String payload) {
Message<?> inbound = receive(moduleInputChannel);
assertNotNull(inbound);
assertEquals(payload, new String((byte[])inbound.getPayload()));
assertNull(inbound.getHeaders().get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE));
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-redis,代码行数:7,代码来源:RawModeRedisBinderTests.java
示例16: testTrustedPackages
import org.springframework.cloud.stream.binder.BinderHeaders; //导入依赖的package包/类
@SuppressWarnings({"rawtypes", "unchecked"})
@Test
public void testTrustedPackages() throws Exception {
Binder binder = getBinder();
BindingProperties producerBindingProperties = createProducerBindingProperties(createProducerProperties());
DirectChannel moduleOutputChannel = createBindableChannel("output", producerBindingProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.getExtension().setTrustedPackages(new String[]{"org.springframework.util"});
DirectChannel moduleInputChannel = createBindableChannel("input", createConsumerBindingProperties(consumerProperties));
Binding<MessageChannel> producerBinding = binder.bindProducer("bar.0", moduleOutputChannel,
producerBindingProperties.getProducer());
Binding<MessageChannel> consumerBinding = binder.bindConsumer("bar.0",
"testSendAndReceiveNoOriginalContentType", moduleInputChannel, consumerProperties);
binderBindUnbindLatency();
Message<?> message = org.springframework.integration.support.MessageBuilder.withPayload("foo")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.setHeader("foo", MimeTypeUtils.TEXT_PLAIN)
.build();
moduleOutputChannel.send(message);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Message<String>> inboundMessageRef = new AtomicReference<>();
moduleInputChannel.subscribe(message1 -> {
try {
inboundMessageRef.set((Message<String>) message1);
}
finally {
latch.countDown();
}
});
Assert.isTrue(latch.await(5, TimeUnit.SECONDS), "Failed to receive message");
Assertions.assertThat(inboundMessageRef.get()).isNotNull();
Assertions.assertThat(inboundMessageRef.get().getPayload()).isEqualTo("foo");
Assertions.assertThat(inboundMessageRef.get().getHeaders().get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE)).isNull();
Assertions.assertThat(inboundMessageRef.get().getHeaders().get(MessageHeaders.CONTENT_TYPE))
.isEqualTo(MimeTypeUtils.TEXT_PLAIN);
Assertions.assertThat(inboundMessageRef.get().getHeaders().get("foo")).isInstanceOf(MimeType.class);
MimeType actual = (MimeType) inboundMessageRef.get().getHeaders().get("foo");
Assertions.assertThat(actual).isEqualTo(MimeTypeUtils.TEXT_PLAIN);
producerBinding.unbind();
consumerBinding.unbind();
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-kafka,代码行数:51,代码来源:KafkaBinderTests.java
示例17: createProducerMessageHandler
import org.springframework.cloud.stream.binder.BinderHeaders; //导入依赖的package包/类
@Override
protected MessageHandler createProducerMessageHandler(final ProducerDestination producerDestination,
ExtendedProducerProperties<RabbitProducerProperties> producerProperties, MessageChannel errorChannel) {
Assert.state(!HeaderMode.embeddedHeaders.equals(producerProperties.getHeaderMode()),
"the RabbitMQ binder does not support embedded headers since RabbitMQ supports headers natively");
String prefix = producerProperties.getExtension().getPrefix();
String exchangeName = producerDestination.getName();
String destination = StringUtils.isEmpty(prefix) ? exchangeName : exchangeName.substring(prefix.length());
final AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(
buildRabbitTemplate(producerProperties.getExtension(), errorChannel != null));
endpoint.setExchangeName(producerDestination.getName());
RabbitProducerProperties extendedProperties = producerProperties.getExtension();
boolean expressionInterceptorNeeded = expressionInterceptorNeeded(extendedProperties);
String routingKeyExpression = extendedProperties.getRoutingKeyExpression();
if (!producerProperties.isPartitioned()) {
if (routingKeyExpression == null) {
endpoint.setRoutingKey(destination);
}
else {
if (expressionInterceptorNeeded) {
endpoint.setRoutingKeyExpressionString("headers['"
+ RabbitExpressionEvaluatingInterceptor.ROUTING_KEY_HEADER + "']");
}
else {
endpoint.setRoutingKeyExpressionString(routingKeyExpression);
}
}
}
else {
if (routingKeyExpression == null) {
endpoint.setRoutingKeyExpressionString(buildPartitionRoutingExpression(destination, false));
}
else {
if (expressionInterceptorNeeded) {
endpoint.setRoutingKeyExpressionString(buildPartitionRoutingExpression("headers['"
+ RabbitExpressionEvaluatingInterceptor.ROUTING_KEY_HEADER + "']", true));
}
else {
endpoint.setRoutingKeyExpressionString(buildPartitionRoutingExpression(routingKeyExpression,
true));
}
}
}
if (extendedProperties.getDelayExpression() != null) {
if (expressionInterceptorNeeded) {
endpoint.setDelayExpressionString("headers['"
+ RabbitExpressionEvaluatingInterceptor.DELAY_HEADER + "']");
}
else {
endpoint.setDelayExpressionString(extendedProperties.getDelayExpression());
}
}
DefaultAmqpHeaderMapper mapper = DefaultAmqpHeaderMapper.outboundMapper();
List<String> headerPatterns = new ArrayList<>(extendedProperties.getHeaderPatterns().length + 1);
headerPatterns.add("!" + BinderHeaders.PARTITION_HEADER);
headerPatterns.addAll(Arrays.asList(extendedProperties.getHeaderPatterns()));
mapper.setRequestHeaderNames(headerPatterns.toArray(new String[headerPatterns.size()]));
endpoint.setHeaderMapper(mapper);
endpoint.setDefaultDeliveryMode(extendedProperties.getDeliveryMode());
endpoint.setBeanFactory(this.getBeanFactory());
if (errorChannel != null) {
checkConnectionFactoryIsErrorCapable();
endpoint.setReturnChannel(errorChannel);
endpoint.setConfirmNackChannel(errorChannel);
endpoint.setConfirmCorrelationExpressionString("#root");
endpoint.setErrorMessageStrategy(new DefaultErrorMessageStrategy());
}
endpoint.afterPropertiesSet();
return endpoint;
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-rabbit,代码行数:71,代码来源:RabbitMessageChannelBinder.java
示例18: buildPartitionRoutingExpression
import org.springframework.cloud.stream.binder.BinderHeaders; //导入依赖的package包/类
private String buildPartitionRoutingExpression(String expressionRoot, boolean rootIsExpression) {
return rootIsExpression
? expressionRoot + " + '-' + headers['" + BinderHeaders.PARTITION_HEADER + "']"
: "'" + expressionRoot + "-' + headers['" + BinderHeaders.PARTITION_HEADER + "']";
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-rabbit,代码行数:6,代码来源:RabbitMessageChannelBinder.java
示例19: checkRkExpressionForPartitionedModuleSpEL
import org.springframework.cloud.stream.binder.BinderHeaders; //导入依赖的package包/类
@Override
protected void checkRkExpressionForPartitionedModuleSpEL(Object endpoint) {
assertThat(getEndpointRouting(endpoint)).contains(getExpectedRoutingBaseDestination("'part.0'", "test")
+ " + '-' + headers['" + BinderHeaders.PARTITION_HEADER + "']");
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-rabbit,代码行数:6,代码来源:RabbitBinderTests.java
示例20: preSend
import org.springframework.cloud.stream.binder.BinderHeaders; //导入依赖的package包/类
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
Class<?> targetClass = null;
MessageConverter converter = null;
MimeType contentType = message.getHeaders().containsKey(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE)
? MimeType.valueOf((String)message.getHeaders().get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE))
: contentTypeResolver.resolve(message.getHeaders());
if (contentType != null){
if (equalTypeAndSubType(MessageConverterUtils.X_JAVA_SERIALIZED_OBJECT, contentType) ||
equalTypeAndSubType(MessageConverterUtils.X_JAVA_OBJECT, contentType)) {
// for Java and Kryo de-serialization we need to reset the content type
message = MessageBuilder.fromMessage(message).setHeader(MessageHeaders.CONTENT_TYPE, contentType).build();
converter = equalTypeAndSubType(MessageConverterUtils.X_JAVA_SERIALIZED_OBJECT, contentType)
? converterFactory.getMessageConverterForType(contentType)
: converterFactory.getMessageConverterForAllRegistered();
String targetClassName = contentType.getParameter("type");
if (StringUtils.hasText(targetClassName)) {
try {
targetClass = Class.forName(targetClassName, false, Thread.currentThread().getContextClassLoader());
}
catch (Exception e) {
throw new IllegalStateException("Failed to determine class name for contentType: "
+ message.getHeaders().get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE), e);
}
}
}
}
Object payload;
if (converter != null){
Assert.isTrue(!(equalTypeAndSubType(MessageConverterUtils.X_JAVA_OBJECT, contentType) && targetClass == null),
"Cannot deserialize into message since 'contentType` is not "
+ "encoded with the actual target type."
+ "Consider 'application/x-java-object; type=foo.bar.MyClass'");
payload = converter.fromMessage(message, targetClass);
}
else {
MimeType deserializeContentType = contentTypeResolver.resolve(message.getHeaders());
if (deserializeContentType == null) {
deserializeContentType = contentType;
}
payload = deserializeContentType == null ? message.getPayload() : this.deserializePayload(message.getPayload(), deserializeContentType);
}
message = MessageBuilder.withPayload(payload)
.copyHeaders(message.getHeaders())
.setHeader(MessageHeaders.CONTENT_TYPE, contentType)
.removeHeader(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE)
.build();
return message;
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:53,代码来源:TestSupportBinder.java
注:本文中的org.springframework.cloud.stream.binder.BinderHeaders类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论