
Java Consumer Class Code Examples


This article collects typical usage examples of the Java class org.springframework.integration.dsl.support.Consumer. If you are unsure what the Consumer class is for, or how it is used in practice, the examples below may help.



The Consumer class belongs to the org.springframework.integration.dsl.support package. Twelve code examples of the class are shown below, ordered by popularity.
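Before the examples: this Consumer is the Spring Integration Java DSL 1.x callback contract, a single-method interface the DSL invokes to let you configure a spec object (an endpoint, a poller, producer metadata, and so on). Because it has exactly one abstract method, a Java 8 lambda can replace the anonymous inner classes seen in several examples. The sketch below is self-contained for illustration: the local `Consumer` interface mirrors the Spring one, and `EndpointSpec`/`configure` are hypothetical stand-ins, not Spring APIs.

```java
public class ConsumerSketch {

    // Local mirror of org.springframework.integration.dsl.support.Consumer:
    // one abstract method, so lambdas qualify.
    @FunctionalInterface
    interface Consumer<T> {
        void accept(T t);
    }

    // Hypothetical stand-in for a DSL endpoint spec the consumer configures.
    static class EndpointSpec {
        boolean autoStartup = true;

        EndpointSpec autoStartup(boolean value) {
            this.autoStartup = value;
            return this;
        }
    }

    // DSL-style factory: builds a spec, then hands it to the callback.
    static EndpointSpec configure(Consumer<EndpointSpec> configurer) {
        EndpointSpec spec = new EndpointSpec();
        configurer.accept(spec);
        return spec;
    }

    public static void main(String[] args) {
        // Anonymous-class form, as in Examples 1-3 and 7-9:
        EndpointSpec a = configure(new Consumer<EndpointSpec>() {
            @Override
            public void accept(EndpointSpec e) {
                e.autoStartup(false);
            }
        });

        // Equivalent lambda form, as in Examples 4-6:
        EndpointSpec b = configure(e -> e.autoStartup(false));

        System.out.println(a.autoStartup + " " + b.autoStartup); // prints "false false"
    }
}
```

Both forms are interchangeable; the examples mix them only because they come from projects targeting different Java baselines.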

Example 1: ftpInboundFlow

import org.springframework.integration.dsl.support.Consumer; // import the required package/class
@Bean
public IntegrationFlow ftpInboundFlow(FtpSinkProperties properties, SessionFactory<FTPFile> ftpSessionFactory) {
	FtpMessageHandlerSpec handlerSpec =
		Ftp.outboundAdapter(new FtpRemoteFileTemplate(ftpSessionFactory), properties.getMode())
			.remoteDirectory(properties.getRemoteDir())
			.remoteFileSeparator(properties.getRemoteFileSeparator())
			.autoCreateDirectory(properties.isAutoCreateDir())
			.temporaryFileSuffix(properties.getTmpFileSuffix());
	if (properties.getFilenameExpression() != null) {
		handlerSpec.fileNameExpression(properties.getFilenameExpression().getExpressionString());
	}
	return IntegrationFlows.from(Sink.INPUT)
		.handle(handlerSpec,
			new Consumer<GenericEndpointSpec<FileTransferringMessageHandler<FTPFile>>>() {
				@Override
				public void accept(GenericEndpointSpec<FileTransferringMessageHandler<FTPFile>> e) {
					e.autoStartup(false);
				}
			})
		.get();
}
 
Source: spring-cloud/spring-cloud-stream-app-starters, FtpSinkConfiguration.java (22 lines)


Example 2: ftpInboundFlow

import org.springframework.integration.dsl.support.Consumer; // import the required package/class
@Bean
public IntegrationFlow ftpInboundFlow(SftpSinkProperties properties, SessionFactory<LsEntry> ftpSessionFactory) {
	SftpMessageHandlerSpec handlerSpec =
		Sftp.outboundAdapter(new SftpRemoteFileTemplate(ftpSessionFactory), properties.getMode())
			.remoteDirectory(properties.getRemoteDir())
			.remoteFileSeparator(properties.getRemoteFileSeparator())
			.autoCreateDirectory(properties.isAutoCreateDir())
			.temporaryFileSuffix(properties.getTmpFileSuffix());
	if (properties.getFilenameExpression() != null) {
		handlerSpec.fileNameExpression(properties.getFilenameExpression().getExpressionString());
	}
	return IntegrationFlows.from(Sink.INPUT)
		.handle(handlerSpec,
			new Consumer<GenericEndpointSpec<FileTransferringMessageHandler<LsEntry>>>() {
				@Override
				public void accept(GenericEndpointSpec<FileTransferringMessageHandler<LsEntry>> e) {
					e.autoStartup(false);
				}
			})
		.get();
}
 
Source: spring-cloud/spring-cloud-stream-app-starters, SftpSinkConfiguration.java (22 lines)


Example 3: pollingFlow

import org.springframework.integration.dsl.support.Consumer; // import the required package/class
@Bean
public IntegrationFlow pollingFlow() {
	IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(jdbcMessageSource(),
			new Consumer<SourcePollingChannelAdapterSpec>() {

				@Override
				public void accept(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec) {
					sourcePollingChannelAdapterSpec.poller(poller);
				}

			});
	if (this.properties.isSplit()) {
		flowBuilder.split();
	}
	flowBuilder.channel(this.source.output());
	return flowBuilder.get();
}
 
Source: spring-cloud/spring-cloud-stream-app-starters, JdbcSourceConfiguration.java (18 lines)


Example 4: producer

import org.springframework.integration.dsl.support.Consumer; // import the required package/class
@Bean(name = OUTBOUND_ID)
IntegrationFlow producer() {

    log.info("starting producer flow..");

    return flowDefinition -> {
        Consumer<KafkaProducerMessageHandlerSpec.ProducerMetadataSpec> producerMetadataSpecConsumer =
                (KafkaProducerMessageHandlerSpec.ProducerMetadataSpec metadata) ->
                        metadata.async(true)
                                .batchNumMessages(10)
                                .valueClassType(String.class)
                                .<String>valueEncoder(String::getBytes);

        KafkaProducerMessageHandlerSpec messageHandlerSpec =
                Kafka.outboundChannelAdapter(props -> props.put("queue.buffering.max.ms", "15000"))
                        .messageKey(m -> m.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
                        .addProducer(this.kafkaConfig.getTopic(), this.kafkaConfig.getBrokerAddress(), producerMetadataSpecConsumer);
        flowDefinition
                .handle(messageHandlerSpec);
    };
}
 
Source: joshlong/spring-and-kafka, DemoApplication.java (22 lines)


Example 5: consumer

import org.springframework.integration.dsl.support.Consumer; // import the required package/class
@Bean
IntegrationFlow consumer() {

    log.info("starting consumer..");

    KafkaHighLevelConsumerMessageSourceSpec messageSourceSpec = Kafka.inboundChannelAdapter(
            new ZookeeperConnect(this.kafkaConfig.getZookeeperAddress()))
            .consumerProperties(props ->
                    props.put("auto.offset.reset", "smallest")
                            .put("auto.commit.interval.ms", "100"))
            .addConsumer("myGroup", metadata -> metadata.consumerTimeout(100)
                    .topicStreamMap(m -> m.put(this.kafkaConfig.getTopic(), 1))
                    .maxMessages(10)
                    .valueDecoder(String::new));

    Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer = e -> e.poller(p -> p.fixedDelay(100));

    return IntegrationFlows
            .from(messageSourceSpec, endpointConfigurer)
            .<Map<String, List<String>>>handle((payload, headers) -> {
                payload.entrySet().forEach(e -> log.info(e.getKey() + '=' + e.getValue()));
                return null;
            })
            .get();
}
 
Source: joshlong/spring-and-kafka, DemoApplication.java (26 lines)


Example 6: consumer

import org.springframework.integration.dsl.support.Consumer; // import the required package/class
@Bean IntegrationFlow consumer() {
  KafkaHighLevelConsumerMessageSourceSpec messageSourceSpec = Kafka.inboundChannelAdapter(
      new ZookeeperConnect("0.0.0.0:2181"))
        .consumerProperties(props ->
            props.put("auto.offset.reset", "smallest")
                 .put("auto.commit.interval.ms", "100"))
        .addConsumer("myGroup", metadata -> metadata.consumerTimeout(100)
          .topicStreamMap(m -> m.put("test-topic", 1))
          .maxMessages(10)
          .valueDecoder(String::new));

  Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer = e -> e.poller(p -> p.fixedDelay(100));

  return IntegrationFlows
    .from(messageSourceSpec, endpointConfigurer)
    .<Map<String, List<String>>>handle((payload, headers) -> {
        payload.entrySet().forEach(e -> System.out.println(e.getKey() + '=' + e.getValue()));
        return null;
    })
    .get();
}
 
Source: jadekler/git-java-kafka-example, ConsumerConfiguration.java (22 lines)


Example 7: ftpInboundFlow

import org.springframework.integration.dsl.support.Consumer; // import the required package/class
@Bean
public IntegrationFlow ftpInboundFlow(SessionFactory<FTPFile> ftpSessionFactory, FtpSourceProperties properties,
		FileConsumerProperties fileConsumerProperties) {
	FtpInboundChannelAdapterSpec messageSourceBuilder = Ftp.inboundAdapter(ftpSessionFactory)
			.preserveTimestamp(properties.isPreserveTimestamp())
			.remoteDirectory(properties.getRemoteDir())
			.remoteFileSeparator(properties.getRemoteFileSeparator())
			.localDirectory(properties.getLocalDir())
			.autoCreateLocalDirectory(properties.isAutoCreateLocalDir())
			.temporaryFileSuffix(properties.getTmpFileSuffix())
			.deleteRemoteFiles(properties.isDeleteRemoteFiles());

	if (StringUtils.hasText(properties.getFilenamePattern())) {
		messageSourceBuilder.filter(new FtpSimplePatternFileListFilter(properties.getFilenamePattern()));
	}
	else if (properties.getFilenameRegex() != null) {
		messageSourceBuilder
				.filter(new FtpRegexPatternFileListFilter(properties.getFilenameRegex()));
	}

	IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(messageSourceBuilder
			, new Consumer<SourcePollingChannelAdapterSpec>() {

		@Override
		public void accept(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec) {
			sourcePollingChannelAdapterSpec
					.poller(FtpSourceConfiguration.this.defaultPoller);
		}

	});

	return FileUtils.enhanceFlowForReadingMode(flowBuilder, fileConsumerProperties)
			.channel(this.source.output())
			.get();
}
 
Source: spring-cloud-stream-app-starters/ftp, FtpSourceConfiguration.java (36 lines)


Example 8: sftpInboundFlow

import org.springframework.integration.dsl.support.Consumer; // import the required package/class
@Bean
public IntegrationFlow sftpInboundFlow(SessionFactory<LsEntry> sftpSessionFactory, SftpSourceProperties properties,
		FileConsumerProperties fileConsumerProperties) {
	SftpInboundChannelAdapterSpec messageSourceBuilder = Sftp.inboundAdapter(sftpSessionFactory)
			.preserveTimestamp(properties.isPreserveTimestamp())
			.remoteDirectory(properties.getRemoteDir())
			.remoteFileSeparator(properties.getRemoteFileSeparator())
			.localDirectory(properties.getLocalDir())
			.autoCreateLocalDirectory(properties.isAutoCreateLocalDir())
			.temporaryFileSuffix(properties.getTmpFileSuffix())
			.deleteRemoteFiles(properties.isDeleteRemoteFiles());

	if (StringUtils.hasText(properties.getFilenamePattern())) {
		messageSourceBuilder.filter(new SftpSimplePatternFileListFilter(properties.getFilenamePattern()));
	}
	else if (properties.getFilenameRegex() != null) {
		messageSourceBuilder
				.filter(new SftpRegexPatternFileListFilter(properties.getFilenameRegex()));
	}

	IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(messageSourceBuilder
			, new Consumer<SourcePollingChannelAdapterSpec>() {

		@Override
		public void accept(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec) {
			sourcePollingChannelAdapterSpec
					.poller(SftpSourceConfiguration.this.defaultPoller);
		}

	});

	return FileUtils.enhanceFlowForReadingMode(flowBuilder, fileConsumerProperties)
			.channel(this.source.output())
			.get();
}
 
Source: spring-cloud/spring-cloud-stream-app-starters, SftpSourceConfiguration.java (36 lines)


Example 9: fileSourceFlow

import org.springframework.integration.dsl.support.Consumer; // import the required package/class
@Bean
public IntegrationFlow fileSourceFlow() {
	FileInboundChannelAdapterSpec messageSourceSpec = Files.inboundAdapter(new File(this.properties.getDirectory()));

	if (StringUtils.hasText(this.properties.getFilenamePattern())) {
		messageSourceSpec.patternFilter(this.properties.getFilenamePattern());
	} else if (this.properties.getFilenameRegex() != null) {
		messageSourceSpec.regexFilter(this.properties.getFilenameRegex().pattern());
	}

	if (this.properties.isPreventDuplicates()) {
		messageSourceSpec.preventDuplicates();
	}

	IntegrationFlowBuilder flowBuilder = IntegrationFlows
			.from(messageSourceSpec,
					new Consumer<SourcePollingChannelAdapterSpec>() {

						@Override
						public void accept(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec) {
							sourcePollingChannelAdapterSpec
									.poller(defaultPoller);
						}

					});
	return FileUtils.enhanceFlowForReadingMode(flowBuilder, this.fileConsumerProperties)
			.channel(source.output())
			.get();
}
 
Source: spring-cloud/spring-cloud-stream-app-starters, FileSourceConfiguration.java (30 lines)


Example 10: producer

import org.springframework.integration.dsl.support.Consumer; // import the required package/class
@Bean(name = OUTBOUND_ID)
public IntegrationFlow producer() {

	log.info("starting producer flow..");

	return flowDefinition -> {
		Consumer<KafkaProducerMessageHandlerSpec.ProducerMetadataSpec> producerMetadataSpecConsumer = (
				KafkaProducerMessageHandlerSpec.ProducerMetadataSpec metadata) -> metadata
				.async(true).batchNumMessages(5)
				.valueClassType(String.class);

		Consumer<PropertiesBuilder> producerProperties = props -> props
				.put("queue.buffering.max.ms", "15000");
		Function<Message<Object>, ?> messageKey = m -> m.getHeaders().get(
				IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER);
		KafkaProducerMessageHandlerSpec outboundChannelAdapter = Kafka
				.outboundChannelAdapter(producerProperties);
		String topic = this.kafkaConfig.getTopic();
		String brokerAddress = this.kafkaConfig.getBrokerAddress();

		KafkaProducerMessageHandlerSpec messageHandlerSpec = outboundChannelAdapter
				.messageKey(messageKey).addProducer(topic, brokerAddress,
						producerMetadataSpecConsumer);

		flowDefinition.handle(messageHandlerSpec);
	};
}
 
Source: codecentric/event-based-shopping-system, OrderEntryProducerConfiguration.java (28 lines)


Example 11: consumer

import org.springframework.integration.dsl.support.Consumer; // import the required package/class
@Bean
IntegrationFlow consumer() {

	log.info("starting consumer..");

	KafkaHighLevelConsumerMessageSourceSpec messageSourceSpec = Kafka
			.inboundChannelAdapter(
					new ZookeeperConnect(this.kafkaConfig
							.getZookeeperAddress()))
			.consumerProperties(
					props -> props.put("auto.offset.reset", "smallest")
							.put("auto.commit.interval.ms", "100"))
			.addConsumer(
					"myGroup",
					metadata -> metadata
							.consumerTimeout(100)
							.topicStreamMap(
									m -> m.put(this.kafkaConfig.getTopic(),
											1)).maxMessages(1)
							.valueDecoder(String::new));

	Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer = e -> e.poller(p -> p.fixedDelay(100));

	return IntegrationFlows
			.from(messageSourceSpec, endpointConfigurer)
			.<Map<String, ConcurrentHashMap<String, String>>> handle(
					(payload, headers) -> {
						payload.entrySet().forEach(
								e -> orderEntryService.createOrderEntryFromJson(e.getValue()));
						return null;
					}).get();
}
 
Source: codecentric/event-based-shopping-system, CommoditiesReservationConsumerConfiguration.java (33 lines)


Example 12: getFlowBuilder

import org.springframework.integration.dsl.support.Consumer; // import the required package/class
/**
 * Method to build Integration Flow for Mail. Suppress Warnings for
 * MailInboundChannelAdapterSpec.
 * @return Integration Flow object for Mail Source
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
private IntegrationFlowBuilder getFlowBuilder() {

	IntegrationFlowBuilder flowBuilder;
	URLName urlName = this.properties.getUrl();

	if (this.properties.isIdleImap()) {
		flowBuilder = getIdleImapFlow(urlName);
	}
	else {

		MailInboundChannelAdapterSpec adapterSpec;
		switch (urlName.getProtocol().toUpperCase()) {
			case "IMAP":
			case "IMAPS":
				adapterSpec = getImapFlowBuilder(urlName);
				break;
			case "POP3":
			case "POP3S":
				adapterSpec = getPop3FlowBuilder(urlName);
				break;
			default:
				throw new IllegalArgumentException(
						"Unsupported mail protocol: " + urlName.getProtocol());
		}
		flowBuilder = IntegrationFlows.from(
				adapterSpec.javaMailProperties(getJavaMailProperties(urlName))
						.selectorExpression(this.properties.getExpression())
						.shouldDeleteMessages(this.properties.isDelete()),
				new Consumer<SourcePollingChannelAdapterSpec>() {

					@Override
					public void accept(
							SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec) {
						sourcePollingChannelAdapterSpec.poller(MailSourceConfiguration.this.defaultPoller);
					}

				});

	}
	return flowBuilder;
}
 
Source: spring-cloud/spring-cloud-stream-app-starters, MailSourceConfiguration.java (48 lines)
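A closing note on currency: the examples above target the standalone Spring Integration Java DSL 1.x, where this Consumer type existed mainly to support pre-Java-8 code. When the DSL was merged into the core framework in Spring Integration 5.0, it switched to the standard java.util.function.Consumer, so the same callback shape carries over unchanged. The sketch below illustrates this with the standard interface; the `PollerSpec`/`poller` stand-ins are hypothetical, not Spring APIs.

```java
import java.util.function.Consumer;

public class MigrationSketch {

    // Hypothetical stand-in for a DSL poller spec; not a Spring class.
    static class PollerSpec {
        long fixedDelayMs;

        PollerSpec fixedDelay(long ms) {
            this.fixedDelayMs = ms;
            return this;
        }
    }

    // DSL-style factory taking the standard java.util.function.Consumer
    // instead of the removed org.springframework.integration.dsl.support.Consumer.
    static PollerSpec poller(Consumer<PollerSpec> configurer) {
        PollerSpec spec = new PollerSpec();
        configurer.accept(spec);
        return spec;
    }

    public static void main(String[] args) {
        // Identical lambda shape to the 1.x examples above, e.g.
        // e -> e.poller(p -> p.fixedDelay(100)).
        PollerSpec spec = poller(p -> p.fixedDelay(100));
        System.out.println(spec.fixedDelayMs); // prints "100"
    }
}
```

In practice, migrating code like Examples 5 and 6 usually only requires swapping the import; anonymous Consumer classes and lambdas both continue to compile against the standard interface.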



Note: the org.springframework.integration.dsl.support.Consumer examples in this article were collected from open-source projects hosted on GitHub and documentation platforms such as MSDocs. The code snippets remain the copyright of their original authors; consult each project's license before redistributing or reusing them.

