
python - Correct usage of threading (ThreadPoolExecutor) with Celery as scheduled task

I want to kindly ask you for a bit of advice. I have a Python script in my Django project which gets some objects from the database. These objects are cars, so the script fetches the URL of an external site, gets the car images, downloads them via the requests library, converts them to WEBP format, applies a watermark, and then saves the images with a foreign-key relation to example_object. Some additional data is also fetched with Selenium. Because there are a lot of images, I am using threading to make this method faster. Here is the method:

import logging

from django.core.files.images import ImageFile

# ExampleData1 and ExampleData2 are assumed to live in the same models module
from my_project.models import ExampleData1, ExampleData2, Image


def process_images(example_object):
    # Helper defined elsewhere: scrapes additional data for this object with Selenium
    data_selenium = requests_get_data(example_object.name)

    # Update some related data
    example_data1, created = ExampleData1.objects.get_or_create(name=data_selenium[1])
    example_data2, created = ExampleData2.objects.get_or_create(title=data_selenium[2])

    example_object.example_data1 = example_data1
    example_object.example_data2 = example_data2
    example_object.save()

    # Download the images, then watermark them and convert them to WEBP
    down_images = download_images(data_selenium[0], example_object.x1, example_object.x2, example_object.x3, example_object.x4)
    images_webp = watermark_and_convert(down_images)

    # Save every processed image to the DB with a foreign key to example_object;
    # open the file in a context manager so the handle is closed afterwards
    for image in images_webp:
        img = Image(example_object=example_object)
        with open(image, 'rb') as f:
            img.image.save(image.name, ImageFile(f), save=True)

    # Delete the temporary files
    delete_images(down_images, images_webp)

    logging.info(f'example_object {example_object.name} updated')
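
The helpers used above (requests_get_data, download_images, watermark_and_convert, delete_images) are defined elsewhere in my project. Purely as an illustration of the watermark/WEBP step, a hypothetical watermark_and_convert built on Pillow could look roughly like this (the watermark path and the quality setting are assumptions, not my real code):

from pathlib import Path

from PIL import Image as PILImage


def watermark_and_convert(image_paths, watermark_path='watermark.png'):
    # Hypothetical sketch only -- the real helper is not shown in the question
    watermark = PILImage.open(watermark_path).convert('RGBA')
    webp_paths = []
    for path in image_paths:
        img = PILImage.open(path).convert('RGBA')
        # Paste the watermark in the bottom-right corner, using its alpha channel as mask
        position = (img.width - watermark.width, img.height - watermark.height)
        img.paste(watermark, position, watermark)
        out_path = Path(path).with_suffix('.webp')
        img.convert('RGB').save(out_path, 'WEBP', quality=80)
        webp_paths.append(out_path)
    return webp_paths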

As I said, I am using threading to process this task:

import concurrent.futures
import datetime

TIME_BEGIN = datetime.datetime.now()


def task_process_images():
    setup_logging()
    cleanup()
    example_objects = MyModel.objects.filter(some_column=MyModel.Xxxx.YYYY)
    # Process each object in its own thread; leaving the with-block
    # waits until all submitted jobs have finished
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = []
        for my_obj in example_objects:
            futures.append(executor.submit(process_images, my_obj))

    print(f'Finished in {datetime.datetime.now() - TIME_BEGIN}')
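
One thing worth noting about the snippet above: the submitted futures are never inspected, so an exception raised inside process_images would be swallowed silently. A minimal variation of the executor block, using only the standard concurrent.futures API, that surfaces such errors could look like this:

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(process_images, my_obj) for my_obj in example_objects]
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()  # re-raises any exception from the worker thread
            except Exception:
                logging.exception('process_images failed')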

This works very well. I have decided it would be great to run this as a scheduled task via Celery. And here my question comes: when I create, for example, a @shared_task for my task_process_images() method and start it, will the threading still work properly? When I tried it, I could see only one ForkPoolWorker-8 in the Celery output, but it really does process several objects at a time, so I am just not 100% sure whether I am doing this right. I am starting my Celery instance via the command from the Celery documentation:

celery -A tasks worker --loglevel=INFO
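
For illustration, a minimal sketch of the @shared_task wrapper and beat schedule I have in mind could look like this; the module name tasks, the broker URL and the cron times are placeholders, not values from my project:

# tasks.py (sketch)
from celery import Celery, shared_task
from celery.schedules import crontab

app = Celery('tasks', broker='redis://localhost:6379/0')  # placeholder broker


@shared_task
def scheduled_process_images():
    # Thin wrapper so Celery can schedule the existing function
    task_process_images()


app.conf.beat_schedule = {
    'process-images-nightly': {
        'task': 'tasks.scheduled_process_images',
        'schedule': crontab(hour=3, minute=0),  # placeholder schedule
    },
}

With a configuration like this, the worker started above would execute the task whenever celery -A tasks beat enqueues it.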

I would be glad for any advice.

Thank you.

Question from: https://stackoverflow.com/questions/66047414/correct-usage-of-threading-threadpoolexecutor-with-celery-as-scheduled-task


1 Answer

Waiting for answers
