本文整理汇总了Java中org.springframework.integration.dsl.support.Consumer类的典型用法代码示例。如果您正苦于以下问题:Java Consumer类的具体用法?Java Consumer怎么用?Java Consumer使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
Consumer类属于org.springframework.integration.dsl.support包,在下文中一共展示了Consumer类的12个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: ftpInboundFlow
import org.springframework.integration.dsl.support.Consumer; //导入依赖的package包/类
@Bean
public IntegrationFlow ftpInboundFlow(FtpSinkProperties properties, SessionFactory<FTPFile> ftpSessionFactory) {
FtpMessageHandlerSpec handlerSpec =
Ftp.outboundAdapter(new FtpRemoteFileTemplate(ftpSessionFactory), properties.getMode())
.remoteDirectory(properties.getRemoteDir())
.remoteFileSeparator(properties.getRemoteFileSeparator())
.autoCreateDirectory(properties.isAutoCreateDir())
.temporaryFileSuffix(properties.getTmpFileSuffix());
if (properties.getFilenameExpression() != null) {
handlerSpec.fileNameExpression(properties.getFilenameExpression().getExpressionString());
}
return IntegrationFlows.from(Sink.INPUT)
.handle(handlerSpec,
new Consumer<GenericEndpointSpec<FileTransferringMessageHandler<FTPFile>>>() {
@Override
public void accept(GenericEndpointSpec<FileTransferringMessageHandler<FTPFile>> e) {
e.autoStartup(false);
}
})
.get();
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-app-starters,代码行数:22,代码来源:FtpSinkConfiguration.java
示例2: ftpInboundFlow
import org.springframework.integration.dsl.support.Consumer; //导入依赖的package包/类
@Bean
public IntegrationFlow ftpInboundFlow(SftpSinkProperties properties, SessionFactory<LsEntry> ftpSessionFactory) {
SftpMessageHandlerSpec handlerSpec =
Sftp.outboundAdapter(new SftpRemoteFileTemplate(ftpSessionFactory), properties.getMode())
.remoteDirectory(properties.getRemoteDir())
.remoteFileSeparator(properties.getRemoteFileSeparator())
.autoCreateDirectory(properties.isAutoCreateDir())
.temporaryFileSuffix(properties.getTmpFileSuffix());
if (properties.getFilenameExpression() != null) {
handlerSpec.fileNameExpression(properties.getFilenameExpression().getExpressionString());
}
return IntegrationFlows.from(Sink.INPUT)
.handle(handlerSpec,
new Consumer<GenericEndpointSpec<FileTransferringMessageHandler<LsEntry>>>() {
@Override
public void accept(GenericEndpointSpec<FileTransferringMessageHandler<LsEntry>> e) {
e.autoStartup(false);
}
})
.get();
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-app-starters,代码行数:22,代码来源:SftpSinkConfiguration.java
示例3: pollingFlow
import org.springframework.integration.dsl.support.Consumer; //导入依赖的package包/类
@Bean
public IntegrationFlow pollingFlow() {
IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(jdbcMessageSource(),
new Consumer<SourcePollingChannelAdapterSpec>() {
@Override
public void accept(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec) {
sourcePollingChannelAdapterSpec.poller(poller);
}
});
if (this.properties.isSplit()) {
flowBuilder.split();
}
flowBuilder.channel(this.source.output());
return flowBuilder.get();
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-app-starters,代码行数:18,代码来源:JdbcSourceConfiguration.java
示例4: producer
import org.springframework.integration.dsl.support.Consumer; //导入依赖的package包/类
@Bean(name = OUTBOUND_ID)
IntegrationFlow producer() {
log.info("starting producer flow..");
return flowDefinition -> {
Consumer<KafkaProducerMessageHandlerSpec.ProducerMetadataSpec> producerMetadataSpecConsumer =
(KafkaProducerMessageHandlerSpec.ProducerMetadataSpec metadata) ->
metadata.async(true)
.batchNumMessages(10)
.valueClassType(String.class)
.<String>valueEncoder(String::getBytes);
KafkaProducerMessageHandlerSpec messageHandlerSpec =
Kafka.outboundChannelAdapter(props -> props.put("queue.buffering.max.ms", "15000"))
.messageKey(m -> m.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
.addProducer(this.kafkaConfig.getTopic(), this.kafkaConfig.getBrokerAddress(), producerMetadataSpecConsumer);
flowDefinition
.handle(messageHandlerSpec);
};
}
开发者ID:joshlong,项目名称:spring-and-kafka,代码行数:22,代码来源:DemoApplication.java
示例5: consumer
import org.springframework.integration.dsl.support.Consumer; //导入依赖的package包/类
@Bean
IntegrationFlow consumer() {
log.info("starting consumer..");
KafkaHighLevelConsumerMessageSourceSpec messageSourceSpec = Kafka.inboundChannelAdapter(
new ZookeeperConnect(this.kafkaConfig.getZookeeperAddress()))
.consumerProperties(props ->
props.put("auto.offset.reset", "smallest")
.put("auto.commit.interval.ms", "100"))
.addConsumer("myGroup", metadata -> metadata.consumerTimeout(100)
.topicStreamMap(m -> m.put(this.kafkaConfig.getTopic(), 1))
.maxMessages(10)
.valueDecoder(String::new));
Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer = e -> e.poller(p -> p.fixedDelay(100));
return IntegrationFlows
.from(messageSourceSpec, endpointConfigurer)
.<Map<String, List<String>>>handle((payload, headers) -> {
payload.entrySet().forEach(e -> log.info(e.getKey() + '=' + e.getValue()));
return null;
})
.get();
}
开发者ID:joshlong,项目名称:spring-and-kafka,代码行数:26,代码来源:DemoApplication.java
示例6: consumer
import org.springframework.integration.dsl.support.Consumer; //导入依赖的package包/类
@Bean IntegrationFlow consumer() {
KafkaHighLevelConsumerMessageSourceSpec messageSourceSpec = Kafka.inboundChannelAdapter(
new ZookeeperConnect("0.0.0.0:2181"))
.consumerProperties(props ->
props.put("auto.offset.reset", "smallest")
.put("auto.commit.interval.ms", "100"))
.addConsumer("myGroup", metadata -> metadata.consumerTimeout(100)
.topicStreamMap(m -> m.put("test-topic", 1))
.maxMessages(10)
.valueDecoder(String::new));
Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer = e -> e.poller(p -> p.fixedDelay(100));
return IntegrationFlows
.from(messageSourceSpec, endpointConfigurer)
.<Map<String, List<String>>>handle((payload, headers) -> {
payload.entrySet().forEach(e -> System.out.println(e.getKey() + '=' + e.getValue()));
return null;
})
.get();
}
开发者ID:jadekler,项目名称:git-java-kafka-example,代码行数:22,代码来源:ConsumerConfiguration.java
示例7: ftpInboundFlow
import org.springframework.integration.dsl.support.Consumer; //导入依赖的package包/类
@Bean
public IntegrationFlow ftpInboundFlow(SessionFactory<FTPFile> ftpSessionFactory, FtpSourceProperties properties,
FileConsumerProperties fileConsumerProperties) {
FtpInboundChannelAdapterSpec messageSourceBuilder = Ftp.inboundAdapter(ftpSessionFactory)
.preserveTimestamp(properties.isPreserveTimestamp())
.remoteDirectory(properties.getRemoteDir())
.remoteFileSeparator(properties.getRemoteFileSeparator())
.localDirectory(properties.getLocalDir())
.autoCreateLocalDirectory(properties.isAutoCreateLocalDir())
.temporaryFileSuffix(properties.getTmpFileSuffix())
.deleteRemoteFiles(properties.isDeleteRemoteFiles());
if (StringUtils.hasText(properties.getFilenamePattern())) {
messageSourceBuilder.filter(new FtpSimplePatternFileListFilter(properties.getFilenamePattern()));
}
else if (properties.getFilenameRegex() != null) {
messageSourceBuilder
.filter(new FtpRegexPatternFileListFilter(properties.getFilenameRegex()));
}
IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(messageSourceBuilder
, new Consumer<SourcePollingChannelAdapterSpec>() {
@Override
public void accept(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec) {
sourcePollingChannelAdapterSpec
.poller(FtpSourceConfiguration.this.defaultPoller);
}
});
return FileUtils.enhanceFlowForReadingMode(flowBuilder, fileConsumerProperties)
.channel(this.source.output())
.get();
}
开发者ID:spring-cloud-stream-app-starters,项目名称:ftp,代码行数:36,代码来源:FtpSourceConfiguration.java
示例8: sftpInboundFlow
import org.springframework.integration.dsl.support.Consumer; //导入依赖的package包/类
@Bean
public IntegrationFlow sftpInboundFlow(SessionFactory<LsEntry> sftpSessionFactory, SftpSourceProperties properties,
FileConsumerProperties fileConsumerProperties) {
SftpInboundChannelAdapterSpec messageSourceBuilder = Sftp.inboundAdapter(sftpSessionFactory)
.preserveTimestamp(properties.isPreserveTimestamp())
.remoteDirectory(properties.getRemoteDir())
.remoteFileSeparator(properties.getRemoteFileSeparator())
.localDirectory(properties.getLocalDir())
.autoCreateLocalDirectory(properties.isAutoCreateLocalDir())
.temporaryFileSuffix(properties.getTmpFileSuffix())
.deleteRemoteFiles(properties.isDeleteRemoteFiles());
if (StringUtils.hasText(properties.getFilenamePattern())) {
messageSourceBuilder.filter(new SftpSimplePatternFileListFilter(properties.getFilenamePattern()));
}
else if (properties.getFilenameRegex() != null) {
messageSourceBuilder
.filter(new SftpRegexPatternFileListFilter(properties.getFilenameRegex()));
}
IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(messageSourceBuilder
, new Consumer<SourcePollingChannelAdapterSpec>() {
@Override
public void accept(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec) {
sourcePollingChannelAdapterSpec
.poller(SftpSourceConfiguration.this.defaultPoller);
}
});
return FileUtils.enhanceFlowForReadingMode(flowBuilder, fileConsumerProperties)
.channel(this.source.output())
.get();
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-app-starters,代码行数:36,代码来源:SftpSourceConfiguration.java
示例9: fileSourceFlow
import org.springframework.integration.dsl.support.Consumer; //导入依赖的package包/类
@Bean
public IntegrationFlow fileSourceFlow() {
FileInboundChannelAdapterSpec messageSourceSpec = Files.inboundAdapter(new File(this.properties.getDirectory()));
if (StringUtils.hasText(this.properties.getFilenamePattern())) {
messageSourceSpec.patternFilter(this.properties.getFilenamePattern());
} else if (this.properties.getFilenameRegex() != null) {
messageSourceSpec.regexFilter(this.properties.getFilenameRegex().pattern());
}
if (this.properties.isPreventDuplicates()) {
messageSourceSpec.preventDuplicates();
}
IntegrationFlowBuilder flowBuilder = IntegrationFlows
.from(messageSourceSpec,
new Consumer<SourcePollingChannelAdapterSpec>() {
@Override
public void accept(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec) {
sourcePollingChannelAdapterSpec
.poller(defaultPoller);
}
});
return FileUtils.enhanceFlowForReadingMode(flowBuilder, this.fileConsumerProperties)
.channel(source.output())
.get();
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-app-starters,代码行数:30,代码来源:FileSourceConfiguration.java
示例10: producer
import org.springframework.integration.dsl.support.Consumer; //导入依赖的package包/类
@Bean(name = OUTBOUND_ID)
public IntegrationFlow producer() {
log.info("starting producer flow..");
return flowDefinition -> {
Consumer<KafkaProducerMessageHandlerSpec.ProducerMetadataSpec> producerMetadataSpecConsumer = (
KafkaProducerMessageHandlerSpec.ProducerMetadataSpec metadata) -> metadata
.async(true).batchNumMessages(5)
.valueClassType(String.class);
Consumer<PropertiesBuilder> producerProperties = props -> props
.put("queue.buffering.max.ms", "15000");
Function<Message<Object>, ?> messageKey = m -> m.getHeaders().get(
IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER);
KafkaProducerMessageHandlerSpec outboundChannelAdapter = Kafka
.outboundChannelAdapter(producerProperties);
String topic = this.kafkaConfig.getTopic();
String brokerAddress = this.kafkaConfig.getBrokerAddress();
KafkaProducerMessageHandlerSpec messageHandlerSpec = outboundChannelAdapter
.messageKey(messageKey).addProducer(topic, brokerAddress,
producerMetadataSpecConsumer);
flowDefinition.handle(messageHandlerSpec);
};
}
开发者ID:codecentric,项目名称:event-based-shopping-system,代码行数:28,代码来源:OrderEntryProducerConfiguration.java
示例11: consumer
import org.springframework.integration.dsl.support.Consumer; //导入依赖的package包/类
@Bean
IntegrationFlow consumer() {
log.info("starting consumer..");
KafkaHighLevelConsumerMessageSourceSpec messageSourceSpec = Kafka
.inboundChannelAdapter(
new ZookeeperConnect(this.kafkaConfig
.getZookeeperAddress()))
.consumerProperties(
props -> props.put("auto.offset.reset", "smallest")
.put("auto.commit.interval.ms", "100"))
.addConsumer(
"myGroup",
metadata -> metadata
.consumerTimeout(100)
.topicStreamMap(
m -> m.put(this.kafkaConfig.getTopic(),
1)).maxMessages(1)
.valueDecoder(String::new));
Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer = e -> e.poller(p -> p.fixedDelay(100));
return IntegrationFlows
.from(messageSourceSpec, endpointConfigurer)
.<Map<String, ConcurrentHashMap<String, String>>> handle(
(payload, headers) -> {
payload.entrySet().forEach(
e -> orderEntryService.createOrderEntryFromJson(e.getValue()));
return null;
}).get();
}
开发者ID:codecentric,项目名称:event-based-shopping-system,代码行数:33,代码来源:CommoditiesReservationConsumerConfiguration.java
示例12: getFlowBuilder
import org.springframework.integration.dsl.support.Consumer; //导入依赖的package包/类
/**
* Method to build Integration Flow for Mail. Suppress Warnings for
* MailInboundChannelAdapterSpec.
* @return Integration Flow object for Mail Source
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
private IntegrationFlowBuilder getFlowBuilder() {
IntegrationFlowBuilder flowBuilder;
URLName urlName = this.properties.getUrl();
if (this.properties.isIdleImap()) {
flowBuilder = getIdleImapFlow(urlName);
}
else {
MailInboundChannelAdapterSpec adapterSpec;
switch (urlName.getProtocol().toUpperCase()) {
case "IMAP":
case "IMAPS":
adapterSpec = getImapFlowBuilder(urlName);
break;
case "POP3":
case "POP3S":
adapterSpec = getPop3FlowBuilder(urlName);
break;
default:
throw new IllegalArgumentException(
"Unsupported mail protocol: " + urlName.getProtocol());
}
flowBuilder = IntegrationFlows.from(
adapterSpec.javaMailProperties(getJavaMailProperties(urlName))
.selectorExpression(this.properties.getExpression())
.shouldDeleteMessages(this.properties.isDelete()),
new Consumer<SourcePollingChannelAdapterSpec>() {
@Override
public void accept(
SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec) {
sourcePollingChannelAdapterSpec.poller(MailSourceConfiguration.this.defaultPoller);
}
});
}
return flowBuilder;
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-app-starters,代码行数:48,代码来源:MailSourceConfiguration.java
注:本文中的org.springframework.integration.dsl.support.Consumer类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论