
Java BinderHeaders Class Code Examples


This article collects typical usage examples of org.springframework.cloud.stream.binder.BinderHeaders in Java. If you are wondering how the BinderHeaders class is used in practice, the selected examples below may help.



The BinderHeaders class belongs to the org.springframework.cloud.stream.binder package. 20 code examples of the class are shown below, sorted by popularity by default.

Example 1: createProducerMessageHandler

import org.springframework.cloud.stream.binder.BinderHeaders; // import the required package/class
@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
		ExtendedProducerProperties<KinesisProducerProperties> producerProperties, MessageChannel errorChannel) {

	KinesisMessageHandler kinesisMessageHandler = new KinesisMessageHandler(this.amazonKinesis);
	kinesisMessageHandler.setSync(producerProperties.getExtension().isSync());
	kinesisMessageHandler.setSendTimeout(producerProperties.getExtension().getSendTimeout());
	kinesisMessageHandler.setStream(destination.getName());
	if (producerProperties.isPartitioned()) {
		kinesisMessageHandler
				.setPartitionKeyExpressionString("'partitionKey-' + headers." + BinderHeaders.PARTITION_HEADER);
	}
	kinesisMessageHandler.setFailureChannel(errorChannel);
	kinesisMessageHandler.setBeanFactory(getBeanFactory());

	return kinesisMessageHandler;
}
 
Developer: spring-cloud, Project: spring-cloud-stream-binder-aws-kinesis, Lines of code: 18, Source: KinesisMessageChannelBinder.java


Example 2: RedisMessageChannelBinder

import org.springframework.cloud.stream.binder.BinderHeaders; // import the required package/class
public RedisMessageChannelBinder(RedisConnectionFactory connectionFactory, String... headersToMap) {
	Assert.notNull(connectionFactory, "connectionFactory must not be null");
	this.connectionFactory = connectionFactory;
	StringRedisTemplate template = new StringRedisTemplate(connectionFactory);
	template.afterPropertiesSet();
	this.redisOperations = template;
	if (headersToMap != null && headersToMap.length > 0) {
		String[] combinedHeadersToMap =
				Arrays.copyOfRange(BinderHeaders.STANDARD_HEADERS, 0, BinderHeaders.STANDARD_HEADERS.length
						+ headersToMap.length);
		System.arraycopy(headersToMap, 0, combinedHeadersToMap, BinderHeaders.STANDARD_HEADERS.length,
				headersToMap.length);
		this.headersToMap = combinedHeadersToMap;
	}
	else {
		this.headersToMap = BinderHeaders.STANDARD_HEADERS;
	}
	this.errorAdapter = new RedisQueueOutboundChannelAdapter(
			parser.parseExpression("headers['" + ERROR_HEADER + "']"), connectionFactory);
}
 
Developer: spring-cloud, Project: spring-cloud-stream-binder-redis, Lines of code: 21, Source: RedisMessageChannelBinder.java
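
The header-merging step in Example 2 can be isolated into a small helper. The following is a minimal, dependency-free sketch, not the binder's own code: `HeaderMerge`, `combine`, and the contents of the `STANDARD_HEADERS` array are hypothetical stand-ins chosen for illustration (the real `BinderHeaders.STANDARD_HEADERS` values may differ). It uses `Arrays.copyOf`, which pads the enlarged copy with `null` slots in the same way `Arrays.copyOfRange` does in the original when the upper bound exceeds the source length.

```java
import java.util.Arrays;

class HeaderMerge {

    // Hypothetical stand-in for BinderHeaders.STANDARD_HEADERS; the real values may differ.
    static final String[] STANDARD_HEADERS = {"correlationId", "sequenceSize", "sequenceNumber"};

    // Returns the standard headers followed by any extra headers.
    // When no extras are given, the standard array itself is returned (no copy).
    static String[] combine(String[] standard, String... extra) {
        if (extra == null || extra.length == 0) {
            return standard;
        }
        // Enlarged copy: the tail is filled with nulls until arraycopy overwrites it.
        String[] combined = Arrays.copyOf(standard, standard.length + extra.length);
        System.arraycopy(extra, 0, combined, standard.length, extra.length);
        return combined;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(combine(STANDARD_HEADERS, "foo", "bar")));
    }
}
```

Returning the shared array when there are no extras mirrors the original constructor, so callers should treat the result as read-only.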


Example 3: ProducerConfigurationMessageHandler

import org.springframework.cloud.stream.binder.BinderHeaders; // import the required package/class
ProducerConfigurationMessageHandler(KafkaTemplate<byte[], byte[]> kafkaTemplate, String topic,
		ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
		ProducerFactory<byte[], byte[]> producerFactory) {
	super(kafkaTemplate);
	setTopicExpression(new LiteralExpression(topic));
	setMessageKeyExpression(producerProperties.getExtension().getMessageKeyExpression());
	setBeanFactory(KafkaMessageChannelBinder.this.getBeanFactory());
	if (producerProperties.isPartitioned()) {
		SpelExpressionParser parser = new SpelExpressionParser();
		setPartitionIdExpression(parser.parseExpression("headers." + BinderHeaders.PARTITION_HEADER));
	}
	if (producerProperties.getExtension().isSync()) {
		setSync(true);
	}
	this.producerFactory = producerFactory;
}
 
Developer: spring-cloud, Project: spring-cloud-stream-binder-kafka, Lines of code: 17, Source: KafkaMessageChannelBinder.java


Example 4: finishPreSend

import org.springframework.cloud.stream.binder.BinderHeaders; // import the required package/class
/**
 * This is strictly to support 1.3 semantics where BINDER_ORIGINAL_CONTENT_TYPE header
 * needs to be set for certain cases and String payloads needs to be converted to byte[].
 *
 * Factored out of what was left of MessageSerializationUtils.
 */
// deprecated at the get go as a reminder to remove at v3.0
@Deprecated
private Message<?> finishPreSend(Message<?> message) {
	String oct = message.getHeaders().containsKey(MessageHeaders.CONTENT_TYPE) ? message.getHeaders().get(MessageHeaders.CONTENT_TYPE).toString() : null;
	String ct = oct;
	if (message.getPayload() instanceof String) {
		ct = JavaClassMimeTypeUtils.mimeTypeFromObject(message.getPayload(), ObjectUtils.nullSafeToString(oct)).toString();
	}
	MessageValues messageValues = new MessageValues(message);
	Object payload = message.getPayload();
	if (payload instanceof String) {
		payload = ((String)payload).getBytes(StandardCharsets.UTF_8);
	}

	messageValues.setPayload(payload);
	if (ct != null && !ct.equals(oct)) {
		messageValues.put(MessageHeaders.CONTENT_TYPE, ct);
		messageValues.put(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE, oct);
	}
	return messageValues.toMessage();
}
 
Developer: spring-cloud, Project: spring-cloud-stream, Lines of code: 28, Source: MessageConverterConfigurer.java


Example 5: preSend

import org.springframework.cloud.stream.binder.BinderHeaders; // import the required package/class
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
	if (!message.getHeaders().containsKey(BinderHeaders.PARTITION_OVERRIDE)) {
		int partition = this.partitionHandler.determinePartition(message);
		return MessageConverterConfigurer.this.messageBuilderFactory
				.fromMessage(message)
				.setHeader(BinderHeaders.PARTITION_HEADER, partition).build();
	}
	else {
		return MessageConverterConfigurer.this.messageBuilderFactory
				.fromMessage(message)
				.setHeader(BinderHeaders.PARTITION_HEADER,
						message.getHeaders()
								.get(BinderHeaders.PARTITION_OVERRIDE))
				.removeHeader(BinderHeaders.PARTITION_OVERRIDE).build();
	}
}
 
Developer: spring-cloud, Project: spring-cloud-stream, Lines of code: 18, Source: MessageConverterConfigurer.java
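
The branching in Example 5 amounts to a simple rule: an explicit override header wins over the computed partition, and the override is removed before the message goes out. The sketch below models that rule with plain maps instead of Spring `Message` objects. The class name, the method name, and the two header-name strings are assumptions made for illustration; real code should use the `BinderHeaders` constants rather than string literals.

```java
import java.util.HashMap;
import java.util.Map;

class PartitionOverrideSketch {

    // Assumed header names, for illustration only; use the BinderHeaders constants in real code.
    static final String PARTITION_HEADER = "scst_partition";
    static final String PARTITION_OVERRIDE = "scst_partitionOverride";

    // Returns a copy of the headers in which the partition header is resolved
    // and the override header (if any) has been removed.
    static Map<String, Object> resolvePartition(Map<String, Object> headers, int computedPartition) {
        Map<String, Object> result = new HashMap<>(headers);
        if (result.containsKey(PARTITION_OVERRIDE)) {
            // The override takes precedence over the computed partition.
            result.put(PARTITION_HEADER, result.remove(PARTITION_OVERRIDE));
        }
        else {
            result.put(PARTITION_HEADER, computedPartition);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(PARTITION_OVERRIDE, 123);
        Map<String, Object> resolved = resolvePartition(headers, 0);
        System.out.println(resolved.get(PARTITION_HEADER));
    }
}
```

Copying the map before modifying it reflects the fact that Spring messages are immutable: the interceptor in Example 5 builds a new message rather than changing the incoming one.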


Example 6: testConfigureInputChannelWithLegacyContentType

import org.springframework.cloud.stream.binder.BinderHeaders; // import the required package/class
@Test
public void testConfigureInputChannelWithLegacyContentType() {
	BindingServiceProperties props = new BindingServiceProperties();
	BindingProperties bindingProps = new BindingProperties();
	bindingProps.setContentType("foo/bar");
	props.setBindings(Collections.singletonMap("foo", bindingProps));
	CompositeMessageConverterFactory converterFactory = new CompositeMessageConverterFactory(
			Collections.<MessageConverter>emptyList(), null);
	MessageConverterConfigurer configurer = new MessageConverterConfigurer(props, converterFactory);
	QueueChannel in = new QueueChannel();
	configurer.configureInputChannel(in, "foo");
	Foo foo = new Foo();
	in.send(
			MessageBuilder.withPayload(foo)
				.setHeader(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE, "application/json")
				.setHeader(BinderHeaders.SCST_VERSION, "1.x")
				.build());
	Message<?> received = in.receive(0);
	assertThat(received).isNotNull();
	assertThat(received.getPayload()).isEqualTo(foo);
	assertThat(received.getHeaders().get(MessageHeaders.CONTENT_TYPE).toString()).isEqualTo("application/json");
}
 
Developer: spring-cloud, Project: spring-cloud-stream, Lines of code: 23, Source: MessageConverterConfigurerTests.java


Example 7: testOriginalContentTypeIsRetrievedForLegacyContentHeaderType

import org.springframework.cloud.stream.binder.BinderHeaders; // import the required package/class
@Test
public void testOriginalContentTypeIsRetrievedForLegacyContentHeaderType() throws Exception {
	final CountDownLatch latch = new CountDownLatch(1);
	MessageHandler messageHandler = new MessageHandler() {
		@Override
		public void handleMessage(Message<?> message) throws MessagingException {
			assertThat(message.getPayload()).isInstanceOf(String.class);
			assertThat(message.getPayload()).isEqualTo("{\"message\":\"Hi\"}");
			assertThat(message.getHeaders().get(MessageHeaders.CONTENT_TYPE).toString()).isEqualTo("application/json");
			latch.countDown();
		}
	};
	testSink.input().subscribe(messageHandler);
	testSink.input().send(MessageBuilder.withPayload("{\"message\":\"Hi\"}".getBytes())
						.setHeader(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE, "application/json")
						.setHeader(BinderHeaders.SCST_VERSION, "1.x")
						.build());
	assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
	testSink.input().unsubscribe(messageHandler);
}
 
Developer: spring-cloud, Project: spring-cloud-stream, Lines of code: 21, Source: LegacyContentTypeTests.java


Example 8: headersToMap

import org.springframework.cloud.stream.binder.BinderHeaders; // import the required package/class
private static String[] headersToMap(KinesisBinderConfigurationProperties configurationProperties) {
	Assert.notNull(configurationProperties, "'configurationProperties' must not be null");
	if (ObjectUtils.isEmpty(configurationProperties.getHeaders())) {
		return BinderHeaders.STANDARD_HEADERS;
	}
	else {
		String[] combinedHeadersToMap = Arrays.copyOfRange(BinderHeaders.STANDARD_HEADERS, 0,
				BinderHeaders.STANDARD_HEADERS.length + configurationProperties.getHeaders().length);
		System.arraycopy(configurationProperties.getHeaders(), 0, combinedHeadersToMap,
				BinderHeaders.STANDARD_HEADERS.length, configurationProperties.getHeaders().length);
		return combinedHeadersToMap;
	}
}
 
Developer: spring-cloud, Project: spring-cloud-stream-binder-aws-kinesis, Lines of code: 14, Source: KinesisMessageChannelBinder.java


Example 9: convert

import org.springframework.cloud.stream.binder.BinderHeaders; // import the required package/class
protected PubSubMessage convert(Message<?> message) throws Exception {
	String encodedHeaders = encodeHeaders(message.getHeaders());
	String topic = producerProperties.isPartitioned() ? topics
			.get((Integer) message.getHeaders().get(BinderHeaders.PARTITION_HEADER))
			.name() : topics.get(0).name();
	PubSubMessage pubSubMessage = new PubSubMessage(
			com.google.cloud.pubsub.Message
					.builder(ByteArray.copyFrom((byte[]) message.getPayload()))
					.addAttribute(PubSubBinder.SCST_HEADERS, encodedHeaders).build(),
			topic);
	return pubSubMessage;
}
 
Developer: viniciusccarvalho, Project: spring-cloud-stream-binder-pubsub, Lines of code: 13, Source: PubSubMessageHandler.java


Example 10: deserializePayload

import org.springframework.cloud.stream.binder.BinderHeaders; // import the required package/class
/**
 * Will *only* deserialize payload if its 'contentType' is 'text/* or application/json' or Java/Kryo serialized.
 * While this would naturally happen via MessageConverters at the time of handler method
 * invocation, doing it here also is strictly to support behavior established
 * in previous versions of SCSt. One of these cases is return payload as String if contentType is text or json.
 * Also to support certain type of assumptions on type-less handlers (i.e., handle(?) vs. handle(Foo));
 */
private Object deserializePayload(Message<?> message, MimeType contentTypeToUse) {
	Object payload = null;

	if ("text".equalsIgnoreCase(contentTypeToUse.getType()) || equalTypeAndSubType(MimeTypeUtils.APPLICATION_JSON, contentTypeToUse)) {
		payload = new String((byte[])message.getPayload(), StandardCharsets.UTF_8);
	}
	else {
		message = MessageBuilder.fromMessage(message).setHeader(MessageHeaders.CONTENT_TYPE, contentTypeToUse).build();
		MessageConverter converter = equalTypeAndSubType(MessageConverterUtils.X_JAVA_SERIALIZED_OBJECT, contentTypeToUse)
				? compositeMessageConverterFactory.getMessageConverterForType(contentTypeToUse)
						: compositeMessageConverterFactory.getMessageConverterForAllRegistered();
		String targetClassName = contentTypeToUse.getParameter("type");
		Class<?> targetClass = null;
		if (StringUtils.hasText(targetClassName)) {
			try {
				targetClass = Class.forName(targetClassName, false, Thread.currentThread().getContextClassLoader());
			}
			catch (Exception e) {
				throw new IllegalStateException("Failed to determine class name for contentType: "
						+ message.getHeaders().get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE), e);
			}
		}

		Assert.isTrue(!(equalTypeAndSubType(MessageConverterUtils.X_JAVA_OBJECT, contentTypeToUse) && targetClass == null),
				"Cannot deserialize into message since 'contentType` is not "
					+ "encoded with the actual target type."
					+ "Consider 'application/x-java-object; type=foo.bar.MyClass'");
		payload = converter.fromMessage(message, targetClass);
	}
	return payload;
}
 
Developer: spring-cloud, Project: spring-cloud-stream, Lines of code: 39, Source: MessageConverterConfigurer.java


Example 11: testPartitionHeader

import org.springframework.cloud.stream.binder.BinderHeaders; // import the required package/class
@Test
public void testPartitionHeader() throws Exception {
	this.testSource.output().send(MessageBuilder.withPayload("{\"message\":\"Hi\"}").build());
	Message<?> message = this.messageCollector.forChannel(testSource.output()).poll(1, TimeUnit.SECONDS);
	assertThat(message.getHeaders().get(BinderHeaders.PARTITION_HEADER)).isEqualTo(0);
	assertNull(message.getHeaders().get(BinderHeaders.PARTITION_OVERRIDE));
}
 
Developer: spring-cloud, Project: spring-cloud-stream, Lines of code: 8, Source: MessageChannelConfigurerTests.java


Example 12: testPartitionHeaderWithPartitionOverride

import org.springframework.cloud.stream.binder.BinderHeaders; // import the required package/class
@Test
public void testPartitionHeaderWithPartitionOverride() throws Exception {
	this.testSource.output().send(MessageBuilder.withPayload("{\"message\":\"Hi\"}")
			.setHeader(BinderHeaders.PARTITION_OVERRIDE, 123).build());
	Message<?> message = this.messageCollector.forChannel(testSource.output()).poll(1, TimeUnit.SECONDS);
	assertThat(message.getHeaders().get(BinderHeaders.PARTITION_HEADER)).isEqualTo(123);
	assertNull(message.getHeaders().get(BinderHeaders.PARTITION_OVERRIDE));
}
 
Developer: spring-cloud, Project: spring-cloud-stream, Lines of code: 9, Source: MessageChannelConfigurerTests.java


Example 13: testOriginalContentTypeHeaderOnly

import org.springframework.cloud.stream.binder.BinderHeaders; // import the required package/class
@Test
public void testOriginalContentTypeHeaderOnly() throws Exception {
	User specificRecord = new User();
	specificRecord.setName("joe");
	Schema v1 = new Schema.Parser().parse(AvroMessageConverterSerializationTests.class
			.getClassLoader().getResourceAsStream("schemas/user.avsc"));
	GenericRecord genericRecord = new GenericData.Record(v1);
	genericRecord.put("name", "joe");
	SchemaRegistryClient client = new DefaultSchemaRegistryClient();
	client.register("user", "avro", v1.toString());
	AvroSchemaRegistryClientMessageConverter converter = new AvroSchemaRegistryClientMessageConverter(
			client, new NoOpCacheManager());
	converter.setDynamicSchemaGenerationEnabled(false);
	converter.afterPropertiesSet();
	ByteArrayOutputStream baos = new ByteArrayOutputStream();
	DatumWriter<User> writer = new SpecificDatumWriter<>(User.class);
	Encoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
	writer.write(specificRecord, encoder);
	encoder.flush();
	Message source = MessageBuilder.withPayload(baos.toByteArray())
			.setHeader(MessageHeaders.CONTENT_TYPE,
					MimeTypeUtils.APPLICATION_OCTET_STREAM)
			.setHeader(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE,
					"application/vnd.user.v1+avro")
			.build();
	Object converted = converter.fromMessage(source, User.class);
	Assert.assertNotNull(converted);
	Assert.assertEquals(specificRecord.getName().toString(),
			((User) converted).getName().toString());

}
 
Developer: spring-cloud, Project: spring-cloud-stream, Lines of code: 32, Source: AvroMessageConverterSerializationTests.java


Example 14: testPartitionedModuleJava

import org.springframework.cloud.stream.binder.BinderHeaders; // import the required package/class
@Test
@Override
public void testPartitionedModuleJava() throws Exception {
	KinesisTestBinder binder = getBinder();

	ExtendedConsumerProperties<KinesisConsumerProperties> consumerProperties = createConsumerProperties();
	consumerProperties.setConcurrency(2);
	consumerProperties.setInstanceCount(3);
	consumerProperties.setInstanceIndex(0);
	consumerProperties.setPartitioned(true);

	final List<Message<?>> results = new ArrayList<>();
	final CountDownLatch receiveLatch = new CountDownLatch(3);

	MessageHandler receivingHandler = message -> {
		results.add(message);
		receiveLatch.countDown();
	};

	DirectChannel input0 = createBindableChannel("test.input0J", new BindingProperties());
	input0.subscribe(receivingHandler);

	Binding<MessageChannel> input0Binding = binder.bindConsumer("partJ.0", "testPartitionedModuleJava", input0,
			consumerProperties);

	consumerProperties.setInstanceIndex(1);

	DirectChannel input1 = createBindableChannel("test.input1J", new BindingProperties());
	input1.subscribe(receivingHandler);

	Binding<MessageChannel> input1Binding = binder.bindConsumer("partJ.0", "testPartitionedModuleJava", input1,
			consumerProperties);

	consumerProperties.setInstanceIndex(2);

	DirectChannel input2 = createBindableChannel("test.input2J", new BindingProperties());
	input2.subscribe(receivingHandler);

	Binding<MessageChannel> input2Binding = binder.bindConsumer("partJ.0", "testPartitionedModuleJava", input2,
			consumerProperties);

	ExtendedProducerProperties<KinesisProducerProperties> producerProperties = createProducerProperties();
	producerProperties.setPartitionKeyExtractorClass(PartitionTestSupport.class);
	producerProperties.setPartitionSelectorClass(PartitionTestSupport.class);
	producerProperties.setPartitionCount(3);

	DirectChannel output = createBindableChannel("test.output",
			createProducerBindingProperties(producerProperties));

	Binding<MessageChannel> outputBinding = binder.bindProducer("partJ.0", output, producerProperties);
	if (usesExplicitRouting()) {
		Object endpoint = extractEndpoint(outputBinding);
		assertThat(getEndpointRouting(endpoint))
				.contains(getExpectedRoutingBaseDestination("partJ.0", "testPartitionedModuleJava")
						+ "-' + headers['" + BinderHeaders.PARTITION_HEADER + "']");
	}

	output.send(new GenericMessage<>(2));
	output.send(new GenericMessage<>(1));
	output.send(new GenericMessage<>(0));

	assertThat(receiveLatch.await(20, TimeUnit.SECONDS)).isTrue();

	assertThat(results).extracting("payload").containsExactlyInAnyOrder("0", "1", "2");

	input0Binding.unbind();
	input1Binding.unbind();
	input2Binding.unbind();
	outputBinding.unbind();
}
 
Developer: spring-cloud, Project: spring-cloud-stream-binder-aws-kinesis, Lines of code: 71, Source: KinesisBinderTests.java


Example 15: assertMessageReceive

import org.springframework.cloud.stream.binder.BinderHeaders; // import the required package/class
private void assertMessageReceive(QueueChannel moduleInputChannel, String payload) {
	Message<?> inbound = receive(moduleInputChannel);
	assertNotNull(inbound);
	assertEquals(payload, new String((byte[])inbound.getPayload()));
	assertNull(inbound.getHeaders().get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE));
}
 
Developer: spring-cloud, Project: spring-cloud-stream-binder-redis, Lines of code: 7, Source: RawModeRedisBinderTests.java


Example 16: testTrustedPackages

import org.springframework.cloud.stream.binder.BinderHeaders; // import the required package/class
@SuppressWarnings({"rawtypes", "unchecked"})
@Test
public void testTrustedPackages() throws Exception {
	Binder binder = getBinder();

	BindingProperties producerBindingProperties = createProducerBindingProperties(createProducerProperties());
	DirectChannel moduleOutputChannel = createBindableChannel("output", producerBindingProperties);

	ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
	consumerProperties.getExtension().setTrustedPackages(new String[]{"org.springframework.util"});

	DirectChannel moduleInputChannel = createBindableChannel("input", createConsumerBindingProperties(consumerProperties));

	Binding<MessageChannel> producerBinding = binder.bindProducer("bar.0", moduleOutputChannel,
			producerBindingProperties.getProducer());

	Binding<MessageChannel> consumerBinding = binder.bindConsumer("bar.0",
			"testSendAndReceiveNoOriginalContentType", moduleInputChannel, consumerProperties);
	binderBindUnbindLatency();

	Message<?> message = org.springframework.integration.support.MessageBuilder.withPayload("foo")
			.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
			.setHeader("foo", MimeTypeUtils.TEXT_PLAIN)
			.build();

	moduleOutputChannel.send(message);
	CountDownLatch latch = new CountDownLatch(1);
	AtomicReference<Message<String>> inboundMessageRef = new AtomicReference<>();
	moduleInputChannel.subscribe(message1 -> {
		try {
			inboundMessageRef.set((Message<String>) message1);
		}
		finally {
			latch.countDown();
		}
	});
	Assert.isTrue(latch.await(5, TimeUnit.SECONDS), "Failed to receive message");


	Assertions.assertThat(inboundMessageRef.get()).isNotNull();
	Assertions.assertThat(inboundMessageRef.get().getPayload()).isEqualTo("foo");
	Assertions.assertThat(inboundMessageRef.get().getHeaders().get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE)).isNull();
	Assertions.assertThat(inboundMessageRef.get().getHeaders().get(MessageHeaders.CONTENT_TYPE))
			.isEqualTo(MimeTypeUtils.TEXT_PLAIN);
	Assertions.assertThat(inboundMessageRef.get().getHeaders().get("foo")).isInstanceOf(MimeType.class);
	MimeType actual = (MimeType) inboundMessageRef.get().getHeaders().get("foo");
	Assertions.assertThat(actual).isEqualTo(MimeTypeUtils.TEXT_PLAIN);
	producerBinding.unbind();
	consumerBinding.unbind();
}
 
Developer: spring-cloud, Project: spring-cloud-stream-binder-kafka, Lines of code: 51, Source: KafkaBinderTests.java


Example 17: createProducerMessageHandler

import org.springframework.cloud.stream.binder.BinderHeaders; // import the required package/class
@Override
protected MessageHandler createProducerMessageHandler(final ProducerDestination producerDestination,
		ExtendedProducerProperties<RabbitProducerProperties> producerProperties, MessageChannel errorChannel) {
	Assert.state(!HeaderMode.embeddedHeaders.equals(producerProperties.getHeaderMode()),
			"the RabbitMQ binder does not support embedded headers since RabbitMQ supports headers natively");
	String prefix = producerProperties.getExtension().getPrefix();
	String exchangeName = producerDestination.getName();
	String destination = StringUtils.isEmpty(prefix) ? exchangeName : exchangeName.substring(prefix.length());
	final AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(
			buildRabbitTemplate(producerProperties.getExtension(), errorChannel != null));
	endpoint.setExchangeName(producerDestination.getName());
	RabbitProducerProperties extendedProperties = producerProperties.getExtension();
	boolean expressionInterceptorNeeded = expressionInterceptorNeeded(extendedProperties);
	String routingKeyExpression = extendedProperties.getRoutingKeyExpression();
	if (!producerProperties.isPartitioned()) {
		if (routingKeyExpression == null) {
			endpoint.setRoutingKey(destination);
		}
		else {
			if (expressionInterceptorNeeded) {
				endpoint.setRoutingKeyExpressionString("headers['"
						+ RabbitExpressionEvaluatingInterceptor.ROUTING_KEY_HEADER + "']");
			}
			else {
				endpoint.setRoutingKeyExpressionString(routingKeyExpression);
			}
		}
	}
	else {
		if (routingKeyExpression == null) {
			endpoint.setRoutingKeyExpressionString(buildPartitionRoutingExpression(destination, false));
		}
		else {
			if (expressionInterceptorNeeded) {
				endpoint.setRoutingKeyExpressionString(buildPartitionRoutingExpression("headers['"
						+ RabbitExpressionEvaluatingInterceptor.ROUTING_KEY_HEADER + "']", true));
			}
			else {
				endpoint.setRoutingKeyExpressionString(buildPartitionRoutingExpression(routingKeyExpression,
						true));
			}
		}
	}
	if (extendedProperties.getDelayExpression() != null) {
		if (expressionInterceptorNeeded) {
			endpoint.setDelayExpressionString("headers['"
					+ RabbitExpressionEvaluatingInterceptor.DELAY_HEADER + "']");
		}
		else {
			endpoint.setDelayExpressionString(extendedProperties.getDelayExpression());
		}
	}
	DefaultAmqpHeaderMapper mapper = DefaultAmqpHeaderMapper.outboundMapper();
	List<String> headerPatterns = new ArrayList<>(extendedProperties.getHeaderPatterns().length + 1);
	headerPatterns.add("!" + BinderHeaders.PARTITION_HEADER);
	headerPatterns.addAll(Arrays.asList(extendedProperties.getHeaderPatterns()));
	mapper.setRequestHeaderNames(headerPatterns.toArray(new String[headerPatterns.size()]));
	endpoint.setHeaderMapper(mapper);
	endpoint.setDefaultDeliveryMode(extendedProperties.getDeliveryMode());
	endpoint.setBeanFactory(this.getBeanFactory());
	if (errorChannel != null) {
		checkConnectionFactoryIsErrorCapable();
		endpoint.setReturnChannel(errorChannel);
		endpoint.setConfirmNackChannel(errorChannel);
		endpoint.setConfirmCorrelationExpressionString("#root");
		endpoint.setErrorMessageStrategy(new DefaultErrorMessageStrategy());
	}
	endpoint.afterPropertiesSet();
	return endpoint;
}
 
Developer: spring-cloud, Project: spring-cloud-stream-binder-rabbit, Lines of code: 71, Source: RabbitMessageChannelBinder.java


Example 18: buildPartitionRoutingExpression

import org.springframework.cloud.stream.binder.BinderHeaders; // import the required package/class
private String buildPartitionRoutingExpression(String expressionRoot, boolean rootIsExpression) {
	return rootIsExpression
			? expressionRoot + " + '-' + headers['" + BinderHeaders.PARTITION_HEADER + "']"
			: "'" + expressionRoot + "-' + headers['" + BinderHeaders.PARTITION_HEADER + "']";
}
 
Developer: spring-cloud, Project: spring-cloud-stream-binder-rabbit, Lines of code: 6, Source: RabbitMessageChannelBinder.java
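
To see what Example 18 actually produces, it helps to print the resulting expression strings for both branches. The sketch below reproduces the string concatenation in a standalone class. `RoutingExpressionSketch`, the sample inputs, and the literal value given to `PARTITION_HEADER` are assumptions for illustration; the real method reads the header name from `BinderHeaders.PARTITION_HEADER`.

```java
class RoutingExpressionSketch {

    // Assumed header name, for illustration only; the original uses BinderHeaders.PARTITION_HEADER.
    static final String PARTITION_HEADER = "scst_partition";

    // Same concatenation as in Example 18: a literal root is wrapped in quotes,
    // while an expression root is left as-is and joined with a '-' literal.
    static String buildPartitionRoutingExpression(String expressionRoot, boolean rootIsExpression) {
        return rootIsExpression
                ? expressionRoot + " + '-' + headers['" + PARTITION_HEADER + "']"
                : "'" + expressionRoot + "-' + headers['" + PARTITION_HEADER + "']";
    }

    public static void main(String[] args) {
        // Literal destination name.
        System.out.println(buildPartitionRoutingExpression("orders", false));
        // prints: 'orders-' + headers['scst_partition']

        // Root that is itself an expression.
        System.out.println(buildPartitionRoutingExpression("headers['rk']", true));
        // prints: headers['rk'] + '-' + headers['scst_partition']
    }
}
```

In both cases the result is an expression string that, once evaluated against a message, appends the partition index to the routing key.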


Example 19: checkRkExpressionForPartitionedModuleSpEL

import org.springframework.cloud.stream.binder.BinderHeaders; // import the required package/class
@Override
protected void checkRkExpressionForPartitionedModuleSpEL(Object endpoint) {
	assertThat(getEndpointRouting(endpoint)).contains(getExpectedRoutingBaseDestination("'part.0'", "test")
			+ " + '-' + headers['" + BinderHeaders.PARTITION_HEADER + "']");
}
 
Developer: spring-cloud, Project: spring-cloud-stream-binder-rabbit, Lines of code: 6, Source: RabbitBinderTests.java


Example 20: preSend

import org.springframework.cloud.stream.binder.BinderHeaders; // import the required package/class
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
	Class<?> targetClass = null;
	MessageConverter converter = null;
	MimeType contentType = message.getHeaders().containsKey(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE)
				? MimeType.valueOf((String)message.getHeaders().get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE))
						: contentTypeResolver.resolve(message.getHeaders());

	if (contentType != null){
		if (equalTypeAndSubType(MessageConverterUtils.X_JAVA_SERIALIZED_OBJECT, contentType) ||
				equalTypeAndSubType(MessageConverterUtils.X_JAVA_OBJECT, contentType)) {
			// for Java and Kryo de-serialization we need to reset the content type
			message = MessageBuilder.fromMessage(message).setHeader(MessageHeaders.CONTENT_TYPE, contentType).build();
			converter = equalTypeAndSubType(MessageConverterUtils.X_JAVA_SERIALIZED_OBJECT, contentType)
					? converterFactory.getMessageConverterForType(contentType)
							: converterFactory.getMessageConverterForAllRegistered();
			String targetClassName = contentType.getParameter("type");
			if (StringUtils.hasText(targetClassName)) {
				try {
					targetClass = Class.forName(targetClassName, false, Thread.currentThread().getContextClassLoader());
				}
				catch (Exception e) {
					throw new IllegalStateException("Failed to determine class name for contentType: "
							+ message.getHeaders().get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE), e);
				}
			}
		}

	}

	Object payload;
	if (converter != null){
		Assert.isTrue(!(equalTypeAndSubType(MessageConverterUtils.X_JAVA_OBJECT, contentType) && targetClass == null),
				"Cannot deserialize into message since 'contentType` is not "
					+ "encoded with the actual target type."
					+ "Consider 'application/x-java-object; type=foo.bar.MyClass'");
		payload = converter.fromMessage(message, targetClass);
	}
	else {
		MimeType deserializeContentType = contentTypeResolver.resolve(message.getHeaders());
		if (deserializeContentType == null) {
			deserializeContentType = contentType;
		}
		payload = deserializeContentType == null ? message.getPayload() : this.deserializePayload(message.getPayload(), deserializeContentType);
	}
	message = MessageBuilder.withPayload(payload)
			.copyHeaders(message.getHeaders())
			.setHeader(MessageHeaders.CONTENT_TYPE, contentType)
			.removeHeader(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE)
			.build();
	return message;
}
 
Developer: spring-cloud, Project: spring-cloud-stream, Lines of code: 53, Source: TestSupportBinder.java



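Note: The org.springframework.cloud.stream.binder.BinderHeaders examples in this article were collected from source-code and documentation hosting platforms such as GitHub and MSDocs. The snippets were selected from open-source projects, and copyright of the source code belongs to the original authors. Refer to each project's license before distributing or using the code. Do not reproduce without permission.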

