Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Spring Batch: Reading fixed-width file without line breaks

I have a fixed-length flat file containing records like the sample below. It has no delimiter, it contains special hex characters, and the data is spread across multiple lines. Each line is a constant 2000 bytes/characters, so I need to keep picking up the bytes 1-2000, 2001-4000, and so on. Within each record, the fields sit at fixed character positions.

Note: I don't want to read all 2000 characters of each line, only the fields at the given Range positions.

Customer.java

@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
public class Customer {
    private String firstValue;
    private String secondValue;
    private String thirdValue;
    private String fourthValue;
}

Java Bean

@Bean
public FlatFileItemReader<Customer> customerItemReader(){

    return new FlatFileItemReaderBuilder<Customer>()
            .name("customerItemReader")
            .linesToSkip(1)
            .resource(new ClassPathResource("/data/test.conv"))
            .fixedLength()
            .columns(new Range[] { new Range(3, 6), new Range(7, 13), new Range(14, 15), new Range(14, 15) })
            .names(new String[] { "firstValue", "secondValue", "thirdValue", "fourthValue" })
            .targetType(Customer.class)
            .build();
}

Error

org.springframework.batch.item.file.FlatFileParseException: Parsing error at line: 2 in resource=[class path resource [data/test.conv]], input=[560000000000411999999992052300000000D 0000        0000000000010000000100000040000000000000  00000000                    NYNNVX      N N 0      N004 000100000001000100000001000100000001000100000001000100000001000100000001000100000001                                                YNYNYYNNNNNYNNNN0004000000070000000300010000000000000000000000020000000000000000NN1N                         N00NNNND                                                                                                                                                       001NNN              00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000O840000000000000000000AN0201000000NNNC840                N  N00N A NN00400000000NNNNNUSAN       NNNN00000000000000NN141900INNNNNN   N                 N000000                 NN           200//0055//20000YNN MO    ?200528000000       !!B3K555800000001A****00             00             00             00             00             00             00             00             00             00             00             00             00             00             00             00             0005230000000000000000                   
                                                                                                                                              ]
    at org.springframework.batch.item.file.FlatFileItemReader.doRead(FlatFileItemReader.java:189) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:93) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:99) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:180) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:126) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:118) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:71) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410) [spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136) [spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319) [spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147) [spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140) [spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_171]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_171]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) [spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) [spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) [spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) [spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at com.sun.proxy.$Proxy57.run(Unknown Source) [na:na]
    at com.example.DatabaseOutputApplication.run(DatabaseOutputApplication.java:39) [classes/:na]
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:784) [spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:768) [spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:322) [spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) [spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
    at com.example.DatabaseOutputApplication.main(DatabaseOutputApplication.java:29) [classes/:na]
Caused by: org.springframework.batch.item.file.transform.IncorrectLineLengthException: Line is longer than max range 15
    at org.springframework.batch.item.file.transform.FixedLengthTokenizer.doTokenize(FixedLengthTokenizer.java:113) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.item.file.transform.AbstractLineTokenizer.tokenize(AbstractLineTokenizer.java:130) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.item.file.mapping.DefaultLineMapper.mapLine(DefaultLineMapper.java:43) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.item.file.FlatFileItemReader.doRead(FlatFileItemReader.java:185) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]

I also tried this

@Bean
public FlatFileItemReader<Customer> customerItemReader(){
    FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();
    tokenizer.setNames("firstValue", "secondValue", "thirdValue", "fourthValue", "fifthValue", "sixthValue", "seventhValue", "eighthValue", "ninethValue", "dummyRange");
    tokenizer.setColumns(
            new Range(3, 6), new Range(7, 13), new Range(14,15), new Range(16,24), new Range(25, 28), new Range(29,32), new Range(33, 36), new Range(1322, 1324),
            new Range(1406, 1408), new Range(1409));

    DefaultLineMapper<Customer> customerLineMapper = new DefaultLineMapper<>();
    customerLineMapper.setLineTokenizer(tokenizer);
    customerLineMapper.setFieldSetMapper(new CustomerFieldSetMapper());
    customerLineMapper.afterPropertiesSet()

1 Answer

The main problem here is that FlatFileItemReader assumes the input has line breaks, and yours doesn't. The clearest solution to me is to copy the FlatFileItemReader class and replace its readLine() method with one that reads a fixed number of characters per record. Unfortunately, because readLine() and most of the reader's state are private, you can't simply extend the class and override it. The modified lines are marked with CHANGE comments.
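As a minimal standalone sketch of that idea, using only java.io and no Spring classes: the names FixedWidthChunks, readRecord and readAll are hypothetical and only for illustration, and the 4-character record length is toy data standing in for the 2000 in the question. It loops on read() because a single call is allowed to return fewer characters than requested.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spring Batch.
final class FixedWidthChunks {

    /**
     * Reads up to recordLength characters. Returns null once the input is
     * exhausted; a shorter final fragment is returned as-is.
     */
    static String readRecord(Reader reader, int recordLength) throws IOException {
        char[] buffer = new char[recordLength];
        int filled = 0;
        while (filled < recordLength) {
            int n = reader.read(buffer, filled, recordLength - filled);
            if (n < 0) {
                break; // end of input
            }
            filled += n;
        }
        return filled == 0 ? null : new String(buffer, 0, filled);
    }

    /** Splits the whole input into consecutive records of recordLength characters. */
    static List<String> readAll(Reader reader, int recordLength) {
        List<String> records = new ArrayList<>();
        try {
            String record;
            while ((record = readRecord(reader, recordLength)) != null) {
                records.add(record);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return records;
    }

    public static void main(String[] args) {
        // Three 4-character records with no line breaks between them.
        System.out.println(readAll(new StringReader("AAAABBBBCCCC"), 4));
        // prints [AAAA, BBBB, CCCC]
    }
}
```

The full class below applies the same change inside a copy of Spring Batch's reader, so skipping, restart and line mapping keep working as before.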

package org.springframework.batch.item.file;

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.Charset;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.ReaderNotOpenException;
import org.springframework.batch.item.file.separator.RecordSeparatorPolicy;
import org.springframework.batch.item.file.separator.SimpleRecordSeparatorPolicy;
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.io.Resource;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;

/**
 * Modified version of {@link FlatFileItemReader} which reads in mainframe style files with fixed width and no line breaks.
 * Still a restartable {@link ItemReader} that reads lines from input {@link #setResource(Resource)}. Line is defined by the
 * {@link #setRecordSeparatorPolicy(RecordSeparatorPolicy)} and mapped to item using {@link #setLineMapper(LineMapper)}.
 * If an exception is thrown during line mapping it is rethrown as {@link FlatFileParseException} adding information
 * about the problematic line and its line number.
 *
 * @author Robert Kasanicky
 * @author Dean Clark
 */
public class FixedLengthFlatFileItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements ResourceAwareItemReaderItemStream<T>, InitializingBean {

    private static final Log logger = LogFactory.getLog(FixedLengthFlatFileItemReader.class);

    // default encoding for input files
    public static final String DEFAULT_CHARSET = Charset.defaultCharset().name();

    private RecordSeparatorPolicy recordSeparatorPolicy = new SimpleRecordSeparatorPolicy();

    private Resource resource;

    private BufferedReader reader;

    private int lineCount = 0;

    private String[] comments = new String[] { "#" };

    private boolean noInput = false;

    private String encoding = DEFAULT_CHARSET;

    private LineMapper<T> lineMapper;

    private int linesToSkip = 0;

    private LineCallbackHandler skippedLinesCallback;

    private boolean strict = true;

    private BufferedReaderFactory bufferedReaderFactory = new DefaultBufferedReaderFactory();

    // CHANGE: Added a variable to store Line Length
    private Integer lineLength;
    
    public FixedLengthFlatFileItemReader() {
        setName(ClassUtils.getShortName(FixedLengthFlatFileItemReader.class));
    }

    /**
    * In strict mode the reader will throw an exception on
    * {@link #open(org.springframework.batch.item.ExecutionContext)} if the input resource does not exist.
    * @param strict <code>true</code> by default
    */
    public void setStrict(final boolean strict) {
        this.strict = strict;
    }

    /**
    * @param skippedLinesCallback will be called for each one of the initial skipped lines before any items are read.
    */
    public void setSkippedLinesCallback(final LineCallbackHandler skippedLinesCallback) {
        this.skippedLinesCallback = skippedLinesCallback;
    }

    /**
    * Public setter for the number of lines to skip at the start of a file. Can be used if the file contains a header
    * without useful (column name) information, and without a comment delimiter at the beginning of the lines.
    *
    * @param linesToSkip the number of lines to skip
    */
    public void setLinesToSkip(final int linesToSkip) {
        this.linesToSkip = linesToSkip;
    }

    /**
    * Setter for line mapper. This property is required to be set.
    * @param lineMapper maps line to item
    */
    public void setLineMapper(final LineMapper<T> lineMapper) {
        this.lineMapper = lineMapper;
    }

    /**
    * Setter for the encoding for this input source. Default value is {@link #DEFAULT_CHARSET}.
    *
    * @param encoding a properties object which possibly contains the encoding for this input file;
    */
    public void setEncoding(final String encoding) {
        this.encoding = encoding;
    }

    /**
    * Factory for the {@link BufferedReader} that will be used to extract lines from the file. The default is fine for
    * plain text files, but this is a useful strategy for binary files where the standard BufferedReader from java.io
    * is limiting.
    *
    * @param bufferedReaderFactory the bufferedReaderFactory to set
    */
    public void setBufferedReaderFactory(final BufferedReaderFactory bufferedReaderFactory) {
        this.bufferedReaderFactory = bufferedReaderFactory;
    }

    /**
    * Setter for comment prefixes. Can be used to ignore header lines as well by using e.g. the first couple of column
    * names as a prefix.
    *
    * @param comments an array of comment line prefixes.
    */
    public void setComments(final String[] comments) {
        this.comments = new String[comments.length];
        System.arraycopy(comments, 0, this.comments, 0, comments.length);
    }

    /**
    * Public setter for the input resource.
    */
    @Override
    public void setResource(final Resource resource) {
        this.resource = resource;
    }

    /**
    * Public setter for the recordSeparatorPolicy. Used to determine where the line endings are and do things like
    * continue over a line ending if inside a quoted string.
    *
    * @param recordSeparatorPolicy the recordSeparatorPolicy to set
    */
    public void setRecordSeparatorPolicy(final RecordSeparatorPolicy recordSeparatorPolicy) {
        this.recordSeparatorPolicy = recordSeparatorPolicy;
    }

    /**
    * @return string corresponding to logical record according to
    * {@link #setRecordSeparatorPolicy(RecordSeparatorPolicy)} (might span multiple lines in file).
    */
    @Override
    protected T doRead() throws Exception {
        if (noInput) {
            return null;
        }

        final String line = readLine();

        if (line == null) {
            return null;
        } else {
            try {
                return lineMapper.mapLine(line, lineCount);
            } catch (final Exception ex) {
                throw new FlatFileParseException("Parsing error at line: " + lineCount + " in resource=["
                        + resource.getDescription() + "], input=[" + line + "]", ex, line, lineCount);
            }
        }
    }

    /**
    * @return the next fixed-length record, skipping comment lines
    */
    // CHANGE: Modified readLine() to pull in a set number of characters
    private String readLine() {

        if (reader == null) {
            throw new ReaderNotOpenException("Reader must be open before it can be read.");
        }

        String line = null;

        try {
            
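            // CHANGE: READ IN LINE BASED ON LINE LENGTH
            final char[] chars = new char[lineLength];

            // A single read() may return fewer characters than requested,
            // so keep reading until the record is full or the input ends.
            int charsRead = 0;
            while (charsRead < lineLength) {
                final int n = reader.read(chars, charsRead, lineLength - charsRead);
                if (n < 0) {
                    break;
                }
                charsRead += n;
            }
            if (charsRead == 0) {
                noInput = true;
                return null;
            }
            // Build the record from only the characters actually read
            line = new String(chars, 0, charsRead);
            // END CHANGE: READ IN LINE BASED ON LINE LENGTH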
            
            lineCount++;
            while (isComment(line)) {
                line = reader.readLine();
                if (line == null) {
                    return null;
                }
                lineCount++;
            }

            line = applyRecordSeparatorPolicy(line);
        } catch (final IOException e) {
            // Prevent IOException from recurring indefinitely
            // if client keeps catching and re-calling
            noInput = true;
            throw new NonTransientFlatFileException("Unable to read from resource: [" + resource + "]", e, line,
                    lineCount);
        }
        return line;
    }

    private boolean isComment(final String line) {
        for (final String prefix : comments) {
            if (line.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    @Override
    protected void doClose() throws Exception {
        lineCount = 0;
        if (reader != null) {
            reader.close();
        }
    }

    @Override
    protected void doOpen() throws Exception {
        Assert.notNull(resource, "Input resource must be set");
        Assert.notNull(recordSeparatorPolicy, "RecordSeparatorPolicy must be set");

        noInput = true;
        if (!resource.exists()) {
            if (strict) {
                throw new IllegalStateException("Input resource must exist (reader is in 'strict' mode): " + resource);
            }
            logger.warn("Input resource does not exist " + resource.getDescription());
            return;
        }

        if (!resource.isReadable()) {
            if (strict) {
                throw new IllegalStateException("Input resource must be readable (reader is in 'strict' mode): "
                        + resource);
            }
            logger.warn("Input resource is not readable " + resource.getDescription());
            return;
        }

        reader = bufferedReaderFactory.create(resource, encoding);
        for (int i = 0; i < linesToSkip; i++) {
            final String line = readLine();
            if (skippedLinesCallback != null) {
                skippedLinesCallback.handleLine(line);
            }
        }
        noInput = false;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(lineMapper, "LineMapper is required");
        // CHANGE: Added an assertion to verify Line Length was provided
        Assert.notNull(lineLength, "Line length is required");
    }

    @Override
    protected void jumpToItem(final int itemIndex) throws Exception {
        for (int i = 0; i < itemIndex; i++) {
            readLine();
        }
    }

    private String applyRecordSeparatorPolicy(String line) throws IOException {

        String record = line;
        while ((line != null) && !recordSeparatorPolicy.isEndOfRecord(record)) {
            line = this.reader.readLine();
            if (line == null) {
                if (StringUtils.hasText(record)) {
                    // A record was partially complete since it hasn't ended but
                    // the line is null
                    throw new FlatFileParseException("Unexpected end of file before record complete", record, lineCount);
                } else {
                    // Record has no text but it might still be post processed
                    // to something (skipping preProcess since that was already
                    // done)
                    break;
                }
            } else {
                lineCount++;
            }
            record = recordSeparatorPolicy.preProcess(record) + line;
        }

        return recordSeparatorPolicy.postProcess(record);

    }

    // CHANGE: Added a setter for Line Length
    public void setLineLength(final Integer lineLength) {
        this.lineLength = lineLength;
    }
    
}
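Once a full record is available, the question's "Line is longer than max range 15" error is a separate matter. As far as I know, FixedLengthTokenizer is strict by default and rejects a line that is longer than the highest range end unless the last range is open-ended, as new Range(1409) is in the second attempt. Turning strict mode off on the tokenizer should also avoid it. The sketch below imitates that behaviour in plain Java so it can be run without Spring. RangeSlicer and Span are hypothetical stand-ins, not Spring Batch classes, and the record is toy data.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a fixed-length tokenizer, not part of Spring Batch.
final class RangeSlicer {

    /** 1-based, inclusive column span, mirroring how Range(3, 6) is read. */
    static final class Span {
        final int min;
        final int max;

        Span(int min, int max) {
            this.min = min;
            this.max = max;
        }
    }

    /** Cuts the given spans out of a record; strict mode insists on an exact length. */
    static List<String> slice(String record, boolean strict, Span... spans) {
        int maxEnd = 0;
        for (Span span : spans) {
            maxEnd = Math.max(maxEnd, span.max);
        }
        if (strict && record.length() > maxEnd) {
            throw new IllegalArgumentException("Line is longer than max range " + maxEnd);
        }
        if (strict && record.length() < maxEnd) {
            throw new IllegalArgumentException("Line is shorter than max range " + maxEnd);
        }
        List<String> fields = new ArrayList<>();
        for (Span span : spans) {
            // Clamp to the record length so a short record yields short or empty fields.
            int start = Math.min(span.min - 1, record.length());
            int end = Math.min(span.max, record.length());
            fields.add(record.substring(start, end));
        }
        return fields;
    }

    public static void main(String[] args) {
        // Columns 1-2 "XX", 3-6 "0001", 7-13 "SMITH  ", 14-15 "NY", then unused data.
        String record = "XX" + "0001" + "SMITH  " + "NY" + "REST";
        System.out.println(slice(record, false, new Span(3, 6), new Span(7, 13), new Span(14, 15)));
    }
}
```

In non-strict mode the unused tail of the record is simply ignored, which matches the goal of reading only the fields at the given ranges.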
