I have an app in which I want to use multiprocessing with Celery. I get errors every time I try to run it, so I have reduced it to a simple version just to get a basic working flow.
celery_module/my_celery.py:
from celery import Celery

app1 = Celery(
    'cel_tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost',
    include=['celery_module.tasks.my_tasks'],
)
# Use pickle for task arguments so non-JSON objects can be sent.
app1.conf.update(
    task_serializer='pickle',
    accept_content=['pickle', 'json'],
)
celery_module/tasks/my_tasks.py:
from celery_module.my_celery import app1

@app1.task
def do_simple(func):
    # Call the function received as an argument and scale its result.
    return func()*10
src/celery_launch.py:
import celery_module.tasks.my_tasks
from src.func_serialization import deserialize_function

def start_simple(des_func):
    # Rebuild the function from its marshalled bytes, then queue the task.
    des_func = deserialize_function(des_func)
    new_func = celery_module.tasks.my_tasks.do_simple.delay(des_func)
    return new_func
src/func_serialization.py:
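import marshal
import types

def serialize_function(function):
    # Only the code object is marshalled, not the function object itself.
    serialized_func = marshal.dumps(function.__code__)
    return serialized_func

def deserialize_function(function_obj):
    # Wrap the unmarshalled code object in a new function bound to this module's globals.
    func_repr = marshal.loads(function_obj)
    deserialized_function = types.FunctionType(func_repr, globals())
    return deserialized_function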
src/app.py:
from src.celery_launch import start_simple
from src.func_serialization import serialize_function

@staticmethod
def simple_func():
    x = 5
    return x*10

# Normally this holds some of my app's logic; for now it is only a test function, because it is easier to call it from the UI.
def some_function(self):
    start_simple(serialize_function(self.simple_func))
I call some_function from app.py. Before passing the function to start_simple(), I serialize it. start_simple() then deserializes simple_func() and sends it as an argument to the Celery task do_simple.
The error I get is:
kombu.exceptions.EncodeError: Can't pickle <function simple_func at 0x7f3066806d30>: attribute lookup simple_func on src.func_serialization failed
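For reference, here is a minimal sketch that seems to reproduce the same kind of failure without Celery or Redis. It assumes the root cause is that pickle stores plain functions by reference (module name plus qualified name) rather than by value, so a function object rebuilt with types.FunctionType cannot be resolved back to a module attribute. The two helpers are copied from src/func_serialization.py; everything runs in a single file purely for illustration, and the exact wording of the message may differ from the Celery traceback.

```python
import marshal
import pickle
import types


def serialize_function(function):
    # Same helper as in src/func_serialization.py: keep only the code object.
    return marshal.dumps(function.__code__)


def deserialize_function(function_obj):
    # Rebuild a brand-new function object around the unmarshalled code.
    return types.FunctionType(marshal.loads(function_obj), globals())


def simple_func():
    x = 5
    return x * 10


rebuilt = deserialize_function(serialize_function(simple_func))
print(rebuilt())  # the rebuilt function itself runs fine locally -> 50

# pickle tries to look the function up by name in its module; the rebuilt
# object is not what that lookup finds, so pickling is refused.
try:
    pickle.dumps(rebuilt)
    pickle_error = None
except pickle.PicklingError as exc:
    pickle_error = exc

print(type(pickle_error).__name__, pickle_error)
```

So the marshal round trip itself appears to work; it is the later pickling of the rebuilt function, which is what the 'pickle' task serializer has to do with the task argument, that fails.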
How could I fix it?
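One direction that might avoid the error, sketched here without Celery: pass the marshalled bytes themselves as the task argument and rebuild the function inside the task, since bytes pickle without any name lookup. In this sketch do_simple_from_bytes is a hypothetical stand-in for the body of the do_simple task, and the pickle round trip only imitates what the task serializer would do with the argument. It assumes the worker runs the same Python version as the caller (the marshal format is version-specific) and that the function uses no closures, default arguments, or module-level names, because only the code object travels.

```python
import marshal
import pickle
import types


def serialize_function(function):
    # Marshal only the code object, as in src/func_serialization.py.
    return marshal.dumps(function.__code__)


def deserialize_function(function_obj):
    # Rebuild a function around the unmarshalled code object.
    return types.FunctionType(marshal.loads(function_obj), globals())


def do_simple_from_bytes(payload):
    # Hypothetical stand-in for the task body: deserialize on the
    # receiving side, then call the function and scale the result.
    func = deserialize_function(payload)
    return func() * 10


def simple_func():
    x = 5
    return x * 10


payload = serialize_function(simple_func)

# Imitate the serializer: the argument is pickled on the way in
# and unpickled on the way out. Plain bytes survive this unchanged.
wire = pickle.dumps(payload)
result = do_simple_from_bytes(pickle.loads(wire))
print(result)  # 500
```

With this ordering start_simple() would no longer call deserialize_function() before .delay(); it would forward the bytes it receives.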
question from:
https://stackoverflow.com/questions/65845657/function-as-a-parameter-in-celery-task