Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
418 views
in Technique[技术] by (71.8m points)

jpa with https request multithreading spring

I'm working with Spring JPA and HTTP POST requests: I fetch the data row by row and post each row to an API, and this works fine. However, I now have to process a very large number of rows, so I need to use multithreading, but I'm new to Java and Spring. How do I implement this so that 10 threads each read 1k rows at a time, in parallel?

I have read a little about multithreading with 10 threads, where each thread reads 1k rows at a time. I have around 10 million records in my database.

AccessingDataJpaApplication class :

@SpringBootApplication
public class AccessingDataJpaApplication implements CommandLineRunner {

    private static final Logger logger = LoggerFactory.getLogger(AccessingDataJpaApplication.class);
    @Autowired

    private Bulk_repositoryRepository bulk_repositoryRepository;


    public static void main(String[] args) {
        SpringApplication.run(AccessingDataJpaApplication.class);
    }
    Date currentDate = new Date();

    @Override
    public void run(String... args) throws Exception {
        RestTemplate restTemplate = new RestTemplate();
        HttpHeaders headers = new HttpHeaders();
        headers.setAccept(Arrays.asList(MediaType.APPLICATION_JSON));
        headers.setBasicAuth("user", "pass");

        while(true) {
            Date currentDate = new Date();
            logger.info("Just Started"); 
            for (Bulk_repository churnss : bulk_repositoryRepository.findAllByStatusAndCampTypeAndCampStartDateLessThanEqualAndCampEndDateGreaterThanEqual(0,2,currentDate,currentDate)) {
                System.out.print(churnss);
                logger.info(churnss.toString());
                AddOfferRequest AddOffer = new AddOfferRequest("113", churnss.getMsisdn(),churnss.getParam1());

                logger.info(AddOffer.toString());
                HttpEntity<AddOfferRequest> entity = new HttpEntity<AddOfferRequest>(AddOffer,headers);

                ResponseEntity<String> responseEntity = restTemplate.exchange(
                        "api link", HttpMethod.POST, entity, String.class);

                if(responseEntity.getStatusCode() == HttpStatus.OK){
                    String response = responseEntity.getBody();
                    churnss.setStatus(1);
                    churnss.setProcessDate(new Date());
                    churnss.setFulfilment_status(response);
                    logger.info(churnss.toString() + ", Response: " + response);
                    bulk_repositoryRepository.save(churnss);
                }else {
                    logger.warn("Record Id: " + churnss.getId() + ", Http Failed Response: " + responseEntity.getStatusCode());
                }
            }
            Thread.sleep(1000);
        }
    }

}

Bulk_repository class:

/**
 * JPA entity mapped to the {@code BULK_REPOSITORY} table.
 *
 * <p>Each field below is bound to the column named in its {@code @Column} annotation.
 * The runner code on this page filters rows by {@code status}, {@code campType},
 * {@code campStartDate} and {@code campEndDate}, and updates {@code status},
 * {@code processDate} and {@code fulfilment_status} after a successful API call.
 */
@Entity
@Table(name = "BULK_REPOSITORY")
public class Bulk_repository {

   // Primary key; the generation strategy is left to the persistence provider (AUTO).
   @Id
   @GeneratedValue(strategy=GenerationType.AUTO)
   @Column(name = "id")
   private long id;

   @Column(name = "msisdn")
   private String msisdn;

   @Column(name = "camp_start_date")   
   private Date campStartDate;

   @Column(name = "camp_end_date")
   private Date campEndDate;

   @Column(name = "camp_type")
   private int campType;

   @Column(name = "camp_cd")
   private String camp_cd;

   @Column(name = "status")
   private int status;

   @Column(name = "process_date")
   private Date processDate;

   @Column(name = "entry_date")
   private Date entryDate;

   @Column(name = "entry_user")
   private String entry_user;

   @Column(name = "param1")
   private String param1;

   @Column(name = "param2")
   private String param2;

   @Column(name = "param3")
   private String param3;

   @Column(name = "param4")
   private String param4;

   @Column(name = "param5")
   private String param5;

   @Column(name = "error_desc")
   private String error_desc;

   // NOTE(review): declared as int, but the runner code on this page passes the HTTP
   // response body (a String) to setFulfilment_status; the setter is not shown here,
   // so confirm the intended type.
   @Column(name = "fulfilment_status")
   private int fulfilment_status;
   ##then getter and setters and tostring

Bulk_repositoryRepository class :

/**
 * Spring Data repository for {@link Bulk_repository} entities with {@code Long} identifiers.
 */
public interface Bulk_repositoryRepository extends CrudRepository<Bulk_repository, Long> {

      // NOTE(review): a field declared in an interface is implicitly public static final,
      // so this value is fixed once when the interface is initialized and never refreshed.
      // No code shown alongside this interface reads it.
      Date today = new Date();
      // Query derived from the method name: status equals the first argument, campType equals
      // the second, campStartDate <= the third and campEndDate >= the fourth.
      List<Bulk_repository>findAllByStatusAndCampTypeAndCampStartDateLessThanEqualAndCampEndDateGreaterThanEqual(int status, int campType,Date today0, Date today1);
      // Looks up a single row by primitive id; returns the entity directly rather than an Optional.
      Bulk_repository findById(long id);
}

AddOfferRequest class :

public class AddOfferRequest {

    private String ChannelID="113";
    private String MSISDN;
    private String ServiceID;

    public AddOfferRequest() {
    }
    /**
     * Creates a request with explicit values for every field.
     *
     * @param channelID channel identifier; when {@code null}, the field keeps its
     *     declared default value
     * @param mSISDN subscriber number the offer is added to
     * @param serviceID identifier of the service/offer to add
     */
    public AddOfferRequest(String channelID,String mSISDN,String serviceID ) {
        // The argument used to be dropped silently, so every request carried the default channel.
        if (channelID != null) {
            this.ChannelID = channelID;
        }
        this.MSISDN = mSISDN;
        this.ServiceID = serviceID;
    }
    ## then getter and setter and tostring

i have created AsyncConfiguration class:

package com.example.accessingdatajpa;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;


/**
 * Enables Spring's asynchronous execution support and registers the thread pool
 * bean named {@code taskExecutor}.
 */
@Configuration
@EnableAsync
public class AsyncConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncConfiguration.class);

    /**
     * Builds the shared worker pool.
     *
     * <p>The return type is the concrete {@link ThreadPoolTaskExecutor} rather than
     * {@code Executor}, so the container can match this bean against every interface it
     * implements (for example {@code AsyncTaskExecutor} or {@code TaskExecutor}) when
     * resolving injection points. Callers that only need an {@code Executor} are unaffected.
     *
     * @return an initialized pool with 2 core/max threads and a queue of 1000 tasks
     */
    @Bean (name = "taskExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {
        LOGGER.debug("Creating Async Task Executor");
        final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // NOTE(review): the accompanying question asks for 10 worker threads; this pool is
        // capped at 2. Raise both values together if 10 is really the target.
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(1000);
        executor.setThreadNamePrefix("CarThread-");
        executor.initialize();
        return executor;
    }
}

But so far I can't understand how to combine the findBy query and the HTTP POST with multithreading.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

Rewrite your code. Instead of a List<Bulk_repository> return a Stream<Bulk_repository>. This will lazily load the records from the database, instead of trying to do everything at once.

Then use the TaskExecutor to execute the different requests per thread, just give a task to it and it will be executed when there is a free thread.

@SpringBootApplication
public class AccessingDataJpaApplication implements CommandLineRunner {

    private static final Logger logger = LoggerFactory.getLogger(AccessingDataJpaApplication.class);

    @Autowired
    private Bulk_repositoryRepository bulk_repositoryRepository;

    @Autowired
    private AsyncTaskExecutor executor;

    @Autowired
    private RestTemplate rest;

    public static void main(String[] args) {
        SpringApplication.run(AccessingDataJpaApplication.class);
    }

    @Override
    public void run(String... args) throws Exception {
        Date currentDate = new Date();

        Stream< Bulk_repository> results = Bulk_repository churnss : bulk_repositoryRepository.findAllByStatusAndCampTypeAndCampStartDateLessThanEqualAndCampEndDateGreaterThanEqual(0,2,currentDate,currentDate);

        results.forEach(it -> executor.submit(this.process(it)));
        Thread.sleep(1000);
    }

    private void process(RestTemplate rest, Bulk_repository churnss) {
      AddOfferRequest AddOffer = new AddOfferRequest("113", churnss.getMsisdn(),churnss.getParam1());

      HttpEntity<AddOfferRequest> entity = new HttpEntity<AddOfferRequest>(AddOffer,headers);

      try {
        ResponseEntity<String> responseEntity = restTemplate.exchange(
                        "api link", HttpMethod.POST, entity, String.class);
         if(responseEntity.getStatusCode() == HttpStatus.OK){
           String response = responseEntity.getBody();
           churnss.setStatus(1);
           churnss.setProcessDate(new Date());
           churnss.setFulfilment_status(response);
           bulk_repositoryRepository.save(churnss);
         }else {
           logger.warn("Record Id: {}, Http Failed Response: {}",churnss.getId(), responseEntity.getStatusCode());
                }
      } catch (RestClientException rce) {
          logger.warn("Record Id: {} Http Failed. ", churnss.getId(), rce);
      }               
    }

}

NOTE: This was typed from the top of my head and isn't tested. However should provide some guidance.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...