Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


amazon s3 - "For an upload InputStream with no MD5 digest metadata, the markSupported() method must evaluate to true." in Spring Integration AWS

UPDATE: There is a bug in spring-integration-aws-2.3.4.

I am integrating SFTP (SftpStreamingMessageSource) as the source with S3 as the destination. My Spring Integration configuration looks similar to this:

    @Bean
    public S3MessageHandler.UploadMetadataProvider uploadMetadataProvider() {
        return (metadata, message) -> {
            if ( message.getPayload() instanceof DigestInputStream) {
                metadata.setContentType( MediaType.APPLICATION_JSON_VALUE );
                // The stream cannot be read here to compute the MD5 manually:
                // metadata.setContentMD5("BLABLA==");
                // This approach is wrong, because the digest is only complete once the stream has been fully read:
                // metadata.setContentMD5(BinaryUtils.toBase64(((DigestInputStream) message.getPayload()).getMessageDigest().digest()));
            }
        };
    }
    @Bean
    @InboundChannelAdapter(channel = "ftpStream")
    public MessageSource<InputStream> ftpSource(SftpRemoteFileTemplate template) {
        SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template);
        messageSource.setRemoteDirectory("foo");
        messageSource.setFilter(new AcceptAllFileListFilter<>());
        messageSource.setMaxFetchSize(1);
        messageSource.setLoggingEnabled(true);
        messageSource.setCountsEnabled(true);
        return messageSource;
    }
...
    @Bean
    @ServiceActivator(inputChannel = "ftpStream")
    public MessageHandler s3MessageHandler(AmazonS3 amazonS3, S3MessageHandler.UploadMetadataProvider uploadMetadataProvider) {
        S3MessageHandler messageHandler = new S3MessageHandler(amazonS3, "bucketName");
        messageHandler.setLoggingEnabled(true);
        messageHandler.setCountsEnabled(true);
        messageHandler.setCommand(S3MessageHandler.Command.UPLOAD);
        messageHandler.setUploadMetadataProvider(uploadMetadataProvider);
        messageHandler.setKeyExpression(new ValueExpression<>("key"));
        return messageHandler;
    }

After startup, I get the following error: "For an upload InputStream with no MD5 digest metadata, the markSupported() method must evaluate to true."

This happens because ftpSource produces an InputStream payload without mark/reset support. I also tried wrapping the InputStream in a BufferedInputStream using a @Transformer, for example:

    return new BufferedInputStream((InputStream) message.getPayload());

but without success: I then get "java.io.IOException: Stream closed", because S3MessageHandler:338 calls Md5Utils.md5AsBase64(inputStream), which closes the stream too early.

How can I generate the MD5 for all messages in Spring Integration AWS without pain?

I am using spring-integration-aws-2.3.4.RELEASE.

question from:https://stackoverflow.com/questions/65892759/for-an-upload-inputstream-with-no-md5-digest-metadata-the-marksupported-meth


1 Answer


The S3MessageHandler does this:

    if (payload instanceof InputStream) {
        InputStream inputStream = (InputStream) payload;
        if (metadata.getContentMD5() == null) {
            Assert.state(inputStream.markSupported(),
                    "For an upload InputStream with no MD5 digest metadata, "
                            + "the markSupported() method must evaluate to true.");
            String contentMd5 = Md5Utils.md5AsBase64(inputStream);
            metadata.setContentMD5(contentMd5);
            inputStream.reset();
        }
        putObjectRequest = new PutObjectRequest(bucketName, key, inputStream, metadata);
    }

The problem is that Md5Utils.md5AsBase64() closes the InputStream when it finishes, so the subsequent reset() fails. That is bad for us.
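To see the effect in isolation, here is a minimal JDK-only sketch. It does not use the AWS SDK: drainAndClose is a hypothetical stand-in for a utility that reads a stream to the end and then closes it, and the class and method names are made up for illustration.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

class ClosedStreamDemo {

    // Hypothetical stand-in for a utility that drains a stream and then closes it.
    static void drainAndClose(InputStream in) throws IOException {
        try (InputStream s = in) {
            byte[] buffer = new byte[1024];
            while (s.read(buffer) != -1) {
                // Discard the bytes; only the read-then-close behavior matters here.
            }
        }
    }

    // Returns true if reset() still works after the stream was drained and closed.
    static boolean resetWorksAfterClose(InputStream in) throws IOException {
        in.mark(Integer.MAX_VALUE);
        drainAndClose(in);
        try {
            in.reset();
            return true;
        } catch (IOException e) {
            return false; // e.g. "Stream closed" from BufferedInputStream
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "{\"k\":1}".getBytes(StandardCharsets.UTF_8);
        // A closed BufferedInputStream rejects reset(): prints false
        System.out.println(resetWorksAfterClose(new BufferedInputStream(new ByteArrayInputStream(data))));
        // ByteArrayInputStream.close() has no effect, so reset() still works: prints true
        System.out.println(resetWorksAfterClose(new ByteArrayInputStream(data)));
    }
}
```

This matches the "java.io.IOException: Stream closed" seen with the BufferedInputStream wrapper, and it shows why an in-memory stream is not affected.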

This is an omission on our side. Please raise a GH issue and we will fix it ASAP, or feel free to contribute a fix.

As a workaround, I suggest placing a transformer in front of this S3MessageHandler with code like this:

    return org.springframework.util.StreamUtils.copyToByteArray(inputStream);

This way the S3MessageHandler already receives a byte[] payload and takes a different processing branch, where the stream is a ByteArrayInputStream, whose close() has no effect:

    else if (payload instanceof byte[]) {
        byte[] payloadBytes = (byte[]) payload;
        InputStream inputStream = new ByteArrayInputStream(payloadBytes);
        if (metadata.getContentMD5() == null) {
            String contentMd5 = Md5Utils.md5AsBase64(inputStream);
            metadata.setContentMD5(contentMd5);
            inputStream.reset();
        }
        if (metadata.getContentLength() == 0) {
            metadata.setContentLength(payloadBytes.length);
        }
        putObjectRequest = new PutObjectRequest(bucketName, key, inputStream, metadata);
    }
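For reference, here is a rough JDK-only sketch of what the byte[] route amounts to: buffer the stream in memory, then compute the Base64-encoded MD5 from the bytes. It is an illustration under assumptions, not the handler's actual code. The class and method names are hypothetical, and toBytes only mimics the effect of StreamUtils.copyToByteArray without depending on Spring.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

class BufferedUploadSketch {

    // Drains the stream into memory; comparable in effect to StreamUtils.copyToByteArray(...).
    static byte[] toBytes(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[8192];
        int n;
        while ((n = in.read(buffer)) != -1) {
            out.write(buffer, 0, n);
        }
        return out.toByteArray();
    }

    // Base64-encoded MD5 of the payload, the encoding used for a Content-MD5 value.
    static String md5Base64(byte[] payload) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            return Base64.getEncoder().encodeToString(md5.digest(payload));
        } catch (NoSuchAlgorithmException e) {
            // MD5 is one of the algorithms every Java platform must provide.
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    public static void main(String[] args) throws IOException {
        InputStream source = new ByteArrayInputStream("{\"k\":1}".getBytes(StandardCharsets.UTF_8));
        byte[] payload = toBytes(source);
        System.out.println(payload.length + " bytes, Content-MD5: " + md5Base64(payload));
    }
}
```

The trade-off is that the whole file is held in memory before the upload starts, so this route suits files that comfortably fit in the heap.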


...