So I'm trying to run a big web scraping job (6M+ websites) with Python + Celery + Redis.
I'm running on a big box (ml.m5.16xlarge: 64 vCPUs + 256 GB RAM), and I'm noticing that the longer the workers run, the higher the CPU usage climbs and the slower the data gets processed.
On the first batch of 100k sites, CPU usage averages 20-30% and the batch takes ~4 minutes to complete. On the second batch, CPU usage jumps to about 60-80% and it takes ~5 minutes. By the third batch it's pegged at 100% CPU and gets considerably slower with every batch after that. By about the 20th batch, a single batch can take anywhere from 20 to 40 minutes, versus ~4 minutes for the first.
I know I'm doing something wrong with how I've set up the workers that's causing this; I just don't know what. Here's how I'm spawning the workers:
celery multi start 63 -A tasks -P gevent --autoscale=50,5
I tried running a single worker with far more gevent greenlets, but that took much longer to run and only used about 1% of the CPU.
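For reference, that single-worker attempt was roughly the following (a sketch; the exact --concurrency value here is illustrative, not the number I actually used):

celery -A tasks worker -P gevent --concurrency=1000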
I'm only grabbing the landing page of each site, so my celery task/setup looks like this:
from celery import Celery

redis_url = 'redis://localhost:6379/0'
CELERYD_MAX_TASKS_PER_CHILD = 20

celery = Celery('{}_SCRAPER'.format('FRONTPAGE_WEB'), broker=redis_url)
celery.conf.worker_max_tasks_per_child = CELERYD_MAX_TASKS_PER_CHILD
celery.conf.result_backend = redis_url
celery.conf.update(
    task_soft_time_limit=60,
    task_time_limit=80,
)

@celery.task(bind=True, name='scrape')
def scrape(self, url):
    # scraper is a module-level HTTP client shared by every task in the process
    global scraper
    try:
        return {'url': url, 'pageText': get_text(scraper.get(url, timeout=10).text)}
    except Exception as e:
        return {'url': url, 'pageText': 'Error: {}'.format(str(e))}
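For completeness, scraper and get_text aren't shown above; they amount to roughly the following (a simplified sketch, assuming a shared requests session and BeautifulSoup for the text extraction):

import requests
from bs4 import BeautifulSoup

# One HTTP session per worker process, reused across tasks
scraper = requests.Session()

def get_text(html):
    # Strip the markup and return the visible page text
    return BeautifulSoup(html, 'html.parser').get_text(separator=' ', strip=True)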
Then the app loop looks like this:
import json
import time

import numpy as np

from tasks import scrape, celery

# urls, s3, bucket, and redis_conn are defined earlier in the script
batch_size = 100000
bottom_idx = 0

for idx in np.arange(bottom_idx + batch_size, len(urls) + 1, batch_size):
    # Reset tasks
    tasks = []
    # Batch urls
    batch_urls = urls[bottom_idx:idx]
    for url in batch_urls:
        tasks.append(scrape.apply_async(args=[url]))
    # Block until the batch is done
    task_state = [task.ready() for task in tasks]
    while not all(task_state):
        time.sleep(15)
        task_state = [task.ready() for task in tasks]
        print('\n{} tasks completed. Batch #: {}, time: {}'.format(
            len([b for b in task_state if b]), idx,
            time.strftime('%Y-%m-%d %H:%M:%S')), end='')
    # Compile the data and write it out
    batch = [task.result for task in tasks if not isinstance(task.result, Exception)]
    s3.Bucket(bucket).put_object(Key='frontpages/batch-{}'.format(idx), Body=json.dumps(batch))
    redis_conn.flushdb()
    celery.control.purge()
    # Set new bottom idx
    bottom_idx = idx
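In case it's relevant, the s3, bucket, and redis_conn objects the loop uses are just the standard boto3/redis clients, set up roughly like this (the bucket name and connection details here are placeholders):

import boto3
import redis

s3 = boto3.resource('s3')
bucket = 'my-frontpage-bucket'  # placeholder; the real bucket name is configured elsewhere
redis_conn = redis.Redis(host='localhost', port=6379, db=0)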