I am working with Spring Batch partitioning and a JdbcPagingItemReader, but I am only getting half of the records.
For example, when I expect 100 thousand records I get only about 50 thousand. What is going wrong?
How can I use a nested (inner) query with OraclePagingQueryProvider?
My Original Query
-- Returns one slice of the department rows, each joined (LEFT JOIN) with its
-- status description, its relation row and the related parent department.
-- The inner query numbers the rows with ROW_NUMBER(); the outer query keeps
-- only the rows whose number lies between the two bound values.
-- Note: because of PARTITION BY DEPT.CREATE_DATE the numbering restarts at 1
-- for every distinct CREATE_DATE, so rn is not a global row index.
SELECT q.*
FROM (SELECT DEPT.ID id,
DEPT.CREATOR createdby,
DEPT.CREATE_DATE createddate,
DEPT.UPDATED_BY updatedby,
DEPT.LAST_UPDATE_DATE updateddate,
DEPT.NAME name,
DEPT.STATUS status,
statusT.DESCR statusdesc,
REL.ROWID_DEPT1 rowidDEPT1,
REL.ROWID_DEPT2 rowidDEPT2,
DEPT2.DEPT_FROM_VAL parentcid,
DEPT2.NAME parentname,
ROW_NUMBER() OVER (PARTITION BY DEPT.CREATE_DATE ORDER BY DEPT.ID) AS rn
FROM TEST.DEPT_TABLE DEPT
LEFT JOIN TEST.STATUS_TABLE statusT
ON DEPT.STATUS = statusT.STATUS
LEFT JOIN TEST.C_REL_DEPT rel
ON DEPT.ID = REL.ROWID_DEPT2
LEFT JOIN TEST.DEPT_TABLE DEPT2
ON REL.ROWID_DEPT1 = DEPT2.ID) q
WHERE rn BETWEEN ? AND ?; // ? will be fromValue to toValue
Code:
/**
 * Spring Batch configuration of the partitioned {@code readCustomerJob}.
 *
 * <p>The job consists of one manager step ({@code customerStepOne}) that asks
 * {@link CustomerPartitioner} for row-number ranges and runs {@code slaveStep}
 * once per range. Each worker execution gets its own step-scoped
 * {@link JdbcPagingItemReader} restricted to its range.
 */
@Configuration
public class CustomerJob2 {

    /** Commit interval of the worker step and page size of the paging reader. */
    private static final int PAGE_SIZE = 3000;

    /** Grid size passed to the partitioner; also the cap on concurrent worker threads. */
    private static final int GRID_SIZE = 5;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /** Partitioner that produces the {@code fromValue}/{@code toValue} ranges. */
    @Bean
    public CustomerPartitioner customerPartitioner() {
        return new CustomerPartitioner();
    }

    /** The job: a single partitioned step, re-runnable thanks to the run-id incrementer. */
    @Bean("readCustomerJob")
    @Primary
    public Job readCustomerJob() throws Exception {
        return jobBuilderFactory.get("readCustomerJob")
                .incrementer(new RunIdIncrementer())
                .start(customerStepOne())
                .build();
    }

    /**
     * Manager step: fans {@code slaveStep} out over the partitions.
     *
     * <p>A plain {@code new SimpleAsyncTaskExecutor()} starts one thread per
     * submitted task with no upper bound, so every partition would run (and
     * hold a database connection) at the same time. The concurrency limit
     * keeps at most {@link #GRID_SIZE} workers active; further partitions wait
     * for a free slot.
     */
    @Bean
    public Step customerStepOne() throws Exception {
        SimpleAsyncTaskExecutor workerExecutor = new SimpleAsyncTaskExecutor();
        workerExecutor.setConcurrencyLimit(GRID_SIZE);

        return stepBuilderFactory.get("customerStepOne")
                .partitioner(slaveStep().getName(), customerPartitioner())
                .step(slaveStep())
                .gridSize(GRID_SIZE)
                .taskExecutor(workerExecutor)
                .build();
    }

    /**
     * Worker step executed once per partition: reads {@link Customer} rows in
     * chunks of {@link #PAGE_SIZE} and hands them to the writer.
     */
    @Bean
    public Step slaveStep() throws Exception {
        // The null arguments are placeholders only: the reader is @StepScope, so
        // the real values are resolved from the step execution context at run time.
        // NOTE(review): customerStepOneExecutionListener() is not declared in the
        // visible part of this class - confirm it exists.
        return stepBuilderFactory.get("slaveStep")
                .<Customer, Customer>chunk(PAGE_SIZE)
                .reader(pagingItemReader(null, null))
                .writer(customerWriter())
                .listener(customerStepOneExecutionListener())
                .build();
    }

    /**
     * Step-scoped paging reader limited to one partition's row-number range.
     *
     * @param fromValue first row number of the partition (from the step execution context)
     * @param toValue   last row number of the partition (from the step execution context)
     * @return a fully initialised reader for rows whose {@code MyRow} lies in the range
     * @throws Exception if the reader configuration is rejected by {@code afterPropertiesSet()}
     */
    @Bean(destroyMethod = "")
    @StepScope
    public JdbcPagingItemReader<Customer> pagingItemReader(
            @Value("#{stepExecutionContext['fromValue']}") Long fromValue,
            @Value("#{stepExecutionContext['toValue']}") Long toValue) throws Exception {
        System.out.println(" FROM = "+ fromValue + " TO VALUE ="+ toValue);

        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
        // NOTE(review): no dataSource field is declared in the visible part of
        // this class - confirm it is injected.
        reader.setDataSource(this.dataSource);
        reader.setRowMapper(new CustomerRowMapper());
        reader.setSaveState(false);
        reader.setPageSize(PAGE_SIZE);

        // The reader pages by "sort key greater than last value read", so the
        // sort key has to be a column of the result set.
        // NOTE(review): it should also be unique per row; duplicated ids (e.g.
        // produced by the LEFT JOINs) can be skipped at page boundaries - verify.
        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("id", Order.ASCENDING);

        OraclePagingQueryProvider queryProvider = new OraclePagingQueryProvider();
        // NOTE(review): every partition runs this query on its own. The MyRow
        // numbering is only identical across those executions when the window
        // ORDER BY column is unique in the joined result; otherwise ranges may
        // overlap or miss rows - a plausible cause of missing records, confirm.
        queryProvider.setSelectClause("q.* FROM ( SELECT Row_Number() OVER (ORDER BY party.ROWID_OBJECT) MyRow, "
                + " OTHER coumns in the Query");
        queryProvider.setFromClause("**** "
                + "LEFT JOIN ********* "
                + "LEFT JOIN ********* "
                + "LEFT JOIN ********* ) q ");
        // Bind the range as named parameters instead of concatenating it into the SQL.
        queryProvider.setWhereClause("MyRow BETWEEN :fromValue AND :toValue");
        queryProvider.setSortKeys(sortKeys);

        Map<String, Object> rangeParameters = new HashMap<>();
        rangeParameters.put("fromValue", fromValue);
        rangeParameters.put("toValue", toValue);
        reader.setParameterValues(rangeParameters);

        reader.setQueryProvider(queryProvider);
        reader.afterPropertiesSet();
        return reader;
    }

    /** Writer used by every worker step execution. */
    @Bean
    public CustomerWriter customerWriter() {
        return new CustomerWriter();
    }
}
Partition Logic
/**
 * Splits the customer result set into consecutive row-number ranges, one
 * {@link ExecutionContext} per range.
 *
 * <p>Each context carries {@code fromValue} and {@code toValue} (both stored
 * as {@code long}). The ranges are {@code 0..3000}, {@code 3001..6000},
 * {@code 6001..9000}, ... until the counted number of rows is covered; the
 * last range is not trimmed to the count.
 */
public class CustomerPartitioner implements Partitioner {

    private static final String CUSTOMER_CNT = "SELECT COUNT(party.IS) ***** COMPLEX JOIN";

    /** Number of row numbers covered by one partition. */
    private static final int RANGE_SIZE = 3000;

    @Autowired
    @Qualifier("edrJdbcTemplate")
    private JdbcTemplate jdbcTemplate;

    /**
     * Builds the partition map.
     *
     * @param gridSize requested number of partitions; not used here - the number
     *                 of partitions is derived from the row count and {@code RANGE_SIZE}
     * @return contexts keyed {@code "Thread--1"}, {@code "Thread--2"}, ...; empty when the count is 0
     * @throws IllegalStateException if the count query yields no value
     */
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Long custCnt = jdbcTemplate.queryForObject(CUSTOMER_CNT, Long.class);
        if (custCnt == null) {
            // Fail with a meaningful message instead of an unboxing NullPointerException.
            throw new IllegalStateException("Customer count query returned no value: " + CUSTOMER_CNT);
        }
        // Unbox once; the loop below works on primitives only.
        long totalRows = custCnt;

        Map<String, ExecutionContext> partitionMap = new HashMap<>();
        int counter = 0;
        for (long offset = 0; offset < totalRows; offset += RANGE_SIZE) {
            counter++;
            // The first range starts at 0, every later one right after the previous end.
            long fromValue = (offset == 0) ? 0 : offset + 1;
            long toValue = offset + RANGE_SIZE;

            ExecutionContext context = new ExecutionContext();
            context.putLong("fromValue", fromValue);
            context.putLong("toValue", toValue);
            partitionMap.put("Thread--" + counter, context);
        }
        return partitionMap;
    }
}
Here are the logs -
2020-06-22 22:44:14.750 INFO 15752 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=readCustomerJob]] launched with the following parameters: [{JobID=1592846054670, date=1592846054670}]
2020-06-22 22:44:14.790 INFO 15752 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [customerStepOne]
Cust Count = 1035483
FROM = 6001 TO VALUE =9000
FROM = 0 TO VALUE =3000
FROM = 3001 TO VALUE =6000
FROM = 9001 TO VALUE =12000
2020-06-22 22:44:15.874 DEBUG 15752 --- [cTaskExecutor-4] o.s.b.i.database.JdbcPagingItemReader : Reading page 0
2020-06-22 22:44:15.874 DEBUG 15752 --- [cTaskExecutor-1] o.s.b.i.database.JdbcPagingItemReader : Reading page 0
2020-06-22 22:44:15.874 DEBUG 15752 --- [cTaskExecutor-2] o.s.b.i.database.JdbcPagingItemReader : Reading page 0
2020-06-22 22:44:15.874 DEBUG 15752 --- [cTaskExecutor-3] o.s.b.i.database.JdbcPagingItemReader : Reading page 0
See the original question and its answers for more detail.