There are two questions in your question: first, you want to know how to group the items together and second how they are processed.
In order to group them, you could create a group reader as Luca suggested or something like:
public class GroupReader<I> implements ItemReader<List<I>>{
private SingleItemPeekableItemReader<I> reader;
private ItemReader<I> peekReaderDelegate;
public void setReader(ItemReader<I> reader) {
peekReaderDelegate = reader;
}
@Override
public void afterPropertiesSet() throws Exception {
Assert.notNull(peekReaderDelegate, "The 'itemReader' may not be null");
this.reader= new SingleItemPeekableItemReader<I>();
this.reader.setDelegate(delegateReader);
}
@Override
public List<I> read() throws Exception {
State state = State.NEW;
List<I> group = null;
I item = null;
while (state != State.COMPLETE) {
item = reader.read();
switch (state) {
case NEW: {
if (item == null) {
// end reached
state = State.COMPLETE;
break;
}
group = new ArrayList<I>();
group.add(item);
state = State.READING;
I nextItem = reader.peek();
if (isItAKeyChange(item, nextItem)) {
state = State.COMPLETE;
}
break;
}
case READING: {
group.add(item);
// peek and check if there the peeked entry has a new date
I nextItem = peekEntry();
if (isItAKeyChange(item, nextItem)) {
state = State.COMPLETE;
}
break;
}
default: {
throw new org.springframework.expression.ParseException(groupCounter, "ParsingError: Reader is in an invalid state");
}
}
}
return group;
}
}
For every key, this reader will return a list with all elements matching this key. Therefore, the grouping ist done directly in the reader.
You cannot do that with a processor, as you described.
Your second question about multithreading.
Now, using a step does not necessarily mean, that the step is processed with several threads.
In order to do that, you need set an AsyncTaskExecutor and you have to set the throttle limit.
But if you do that, your reader must be threadsafe, or otherwise your grouping won't work. You could do that by simply defining the read method above as synchronized.
Another way could be to write a small SynchronizedWrapperReader, as suggested in this question: Parellel Processing Spring Batch StaxEventItemReader
Please note, depending on your target you are writing to, you probably also have to synchronize the writer, and if necessary to reorder the result.
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…