Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
646 views
in Technique[技术] by (71.8m points)

python - Can't get kwargs data in function in Django Celery Beat

I'm implementing a reminder module in the application using Django celery-beat, I'm creating cron tab in periodic tasks and passing dictionary in kwargs parameter. It's successfully saved in Django periodic task table but when the scheduled task runs on time and calls the mentioned function, it's not getting kwargs data and through the exception.

settings.py

INSTALLED_APPS = [    
    'django_celery_beat',
]

# CELERY STUFF
CELERY_BROKER_URL = "redis://localhost:6379"
CELERY_RESULT_BACKEND = "redis://localhost:6379"

CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'

init.py

from .celery import app as celery_app

all = ("celery_app",)

celery.py

import os

from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings_local")

app = Celery("baseapp")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

Here is the task scheduler function which is creating periodic tasks:

def task_scheduler(raw_data):
try:
    if raw_data["start"] and raw_data["reminder"]:
        reminder_time = datetime.datetime.strptime(raw_data["start"], '%Y-%m-%dT%H:%M:%S.%fZ') - datetime.timedelta(minutes=raw_data["reminder"])
        reminder, _ = CrontabSchedule.objects.get_or_create(
            minute=reminder_time.minute,
            hour=reminder_time.hour,
            day_of_week="*",
            day_of_month=reminder_time.day,
            month_of_year=reminder_time.month,
        )
        PeriodicTask.objects.update_or_create(
            name=raw_data["summary"],
            task="reminder_notification",
            crontab=reminder,
            expire_seconds=7200,             
            kwargs=json.dumps({'test132':'123'}),                

        )
except Exception as error:
    print(error)

Here is a reminder notification function which is called when the task runs on time:

@shared_task(name="reminder_notification")
def reminder_notification(*args, **kwargs):
    print("hello task")

Here are tasks in the database: enter image description here

Here is the error:

[2021-01-07 06:13:00,021: ERROR/Beat] Message Error: Couldn't apply scheduled task celery15: reminder_notification() got an unexpected keyword argument 'test132'
['  File "/home/usman/Documents/venv/bin/celery", line 8, in <module>
    sys.exit(main())
', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/__main__.py", line 15, in main
    sys.exit(_main())
', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/bin/celery.py", line 213, in main
    return celery(auto_envvar_prefix="CELERY")
', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/click/decorators.py", line 21, in new_func
    return f(get_current_context(), *args, **kwargs)
', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/bin/base.py", line 132, in caller
    return f(ctx, *args, **kwargs)
', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/bin/worker.py", line 327, in worker
    worker.start()
', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/worker/worker.py", line 203, in start
    self.blueprint.start(self)
', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/bootsteps.py", line 365, in start
    return self.obj.start()
', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/billiard/process.py", line 124, in start
    self._popen = self._Popen(self)
', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/billiard/context.py", line 276, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/billiard/context.py", line 333, in _Popen
    return Popen(process_obj)
', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/billiard/popen_fork.py", line 24, in __init__
    self._launch(process_obj)
', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/billiard/popen_fork.py", line 79, in _launch
    code = process_obj._bootstrap()
', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/billiard/process.py", line 327, in _bootstrap
    self.run()
', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/beat.py", line 703, in run
    self.service.start(embedded_process=True)
', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/beat.py", line 627, in start
    interval = self.scheduler.tick()
', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/beat.py", line 339, in tick
    self.apply_entry(entry, producer=self.producer)
', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/beat.py", line 272, in apply_entry
    exc, traceback.format_stack(), exc_info=True)
']
Traceback (most recent call last):
  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/beat.py", line 386, in apply_async
    return task.apply_async(entry_args, entry_kwargs,
  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/app/task.py", line 526, in apply_async
    check_arguments(*(args or ()), **(kwargs or {}))
TypeError: reminder_notification() got an unexpected keyword argument 'test132'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/beat.py", line 269, in apply_entry
    result = self.apply_async(entry, producer=producer, advance=False)
  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/beat.py", line 394, in apply_async
    reraise(SchedulingError, SchedulingError(
  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/exceptions.py", line 104, in reraise
    raise value.with_traceback(tb)
  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/beat.py", line 386, in apply_async
    return task.apply_async(entry_args, entry_kwargs,
  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/app/task.py", line 526, in apply_async
    check_arguments(*(args or ()), **(kwargs or {}))
celery.beat.SchedulingError: Couldn't apply scheduled task celery15: reminder_notification() got an unexpected keyword argument 'test132'

What am I doing wrong?


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

I'm getting kwargs data successfully.

Solution:

The current running worker does not get the updated changes from the function, So we need to stop the worker and run again.

celery -A baseapp worker --beat --scheduler django --loglevel=info

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...