multithreading - Celery CPU usage creeping up over time with many workers

So I'm trying to run a big web scraping job (6m+ websites) with Python + Celery + Redis.

I'm running on a big box (ml.m5.16xlarge: 64 vCPUs + 256 GB RAM), and I'm noticing that the longer the workers run, the higher the CPU usage climbs and the slower the data gets processed.

On the first batch of 100k sites, it averages between 20-30% CPU usage and takes ~4 minutes to complete. On the second batch, CPU usage jumps to about 60-80% and it takes ~5 minutes. By the third batch, it's solidly at 100% CPU usage and gets considerably slower with every batch. By about the 20th batch, a single batch can take anywhere between 20-40 minutes, versus the 4 minutes the first one took.

I know I'm doing something wrong with how I set up the workers that is leading to this; I just don't know what. Here is how I'm spawning the workers:

celery multi start 63 -A tasks -P gevent --autoscale=50,5
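
If I'm reading the flags right, that is 63 worker processes, each autoscaling between 5 and 50 gevent greenlets, so up to 63 × 50 = 3,150 concurrent requests at peak on the 64 vCPUs.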

I tried running a single worker with far more gevent greenlets, but that took much longer to run and only used about 1% of the CPU.
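
The single-worker attempt looked roughly like this (the concurrency value shown is illustrative):

celery -A tasks worker -P gevent --concurrency=2000 --loglevel=info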

I'm only grabbing the landing page of each site, so my celery task/setup looks like this:

from celery import Celery

redis_url = 'redis://localhost:6379/0'
CELERYD_MAX_TASKS_PER_CHILD = 20

celery = Celery('{}_SCRAPER'.format('FRONTPAGE_WEB'), broker=redis_url)
celery.conf.worker_max_tasks_per_child = CELERYD_MAX_TASKS_PER_CHILD
celery.conf.result_backend = redis_url
celery.conf.update(
    task_soft_time_limit=60,
    task_time_limit=80,
)

@celery.task(bind=True, name='scrape')
def scrape(self, url):
    # scraper and get_text are module-level helpers (sketched below)
    global scraper
    try:
        return {'url': url, 'pageText': get_text(scraper.get(url, timeout=10).text)}
    except Exception as e:
        return {'url': url, 'pageText': 'Error: {}'.format(str(e))}
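
For completeness, scraper and get_text are module-level helpers; a simplified sketch of them (illustrative only, my real implementations differ) looks like:

# Simplified sketch of the module-level helpers used by the task above
# (illustrative only -- the real scraper and text extractor do more)
import re
import requests

scraper = requests.Session()  # one shared HTTP session, reused across tasks

def get_text(html):
    # crude text extraction: strip tags, then collapse whitespace
    return re.sub(r'\s+', ' ', re.sub(r'<[^>]+>', ' ', html)).strip()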

Then the app loop looks like this:

import json
import time

import numpy as np

from tasks import scrape, celery

# s3, bucket, redis_conn, and urls are set up earlier in the script
batch_size = 100000
bottom_idx = 0
for idx in np.arange(bottom_idx + batch_size, len(urls) + 1, batch_size):
    # Reset tasks
    tasks = []

    # Batch urls
    batch_urls = urls[bottom_idx:idx]
    for url in batch_urls:
        tasks.append(scrape.apply_async(args=[url]))

    # Block until the batch is done
    task_state = [task.ready() for task in tasks]
    while not all(task_state):
        time.sleep(15)
        task_state = [task.ready() for task in tasks]
        print('\r{} tasks completed. Batch #: {}, time: {}'.format(
            len([b for b in task_state if b]), idx,
            time.strftime('%Y-%m-%d %H:%M:%S')), end='')

    # Compile data and write it out
    batch = [task.result for task in tasks if not isinstance(task.result, Exception)]
    s3.Bucket(bucket).put_object(Key='frontpages/batch-{}'.format(idx), Body=json.dumps(batch))
    redis_conn.flushdb()
    celery.control.purge()

    # Set new bottom idx
    bottom_idx = idx
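
For what it's worth, the same batch wait could also be written with Celery's group primitive instead of the manual polling loop; a sketch:

from celery import group

# Sketch: submit one batch as a group and block until every result arrives.
# Errors are already caught inside scrape(), so each result is a dict either way.
job = group(scrape.s(url) for url in batch_urls)
batch = job.apply_async().join(propagate=False)

I'm currently using the manual loop above, so I haven't benchmarked this variant.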


1 Answer

Waiting for an expert to answer this.

