Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
112 views
in Technique[技术] by (71.8m points)

python - Function as a parameter in celery task

I have an app, where I want to use multiprocessing with celery. I get errors every time I try to run it, so now I try to make a simple version just to reach normal process of working.

celery_module/my_celery.py:

from celery import Celery

app1 = Celery('cel_tasks',
              broker='redis://localhost:6379/0',
              backend='redis://localhost',
              include=['celery_module.tasks.my_tasks'],
              )

app1.conf.update(
    task_serializer='pickle',
    accept_content=['pickle', 'json']
)

celery_module/tasks/my_tasks.py:

from celery_module.my_celery import *

@app1.task
def do_simple(func):
    return func()*10

src/celery_launch.py:

import celery_module.tasks.my_tasks
from src.func_serialization import deserialize_function

    def start_simple(des_func):
        des_func = deserialize_function(des_func)
        new_func = celery_module.tasks.my_tasks.do_simple.delay(des_func)
        return new_func

src/func_serialization.py:

import marshal
import types


def serialize_function(function):
    serialized_func = marshal.dumps(function.__code__)
    return serialized_func


def deserialize_function(function_obj):
    func_repr = marshal.loads(function_obj)
    deserialized_function = types.FunctionType(func_repr, globals())
    return deserialized_function

src/app.py:

from src.celery_launch import start_simple
from src.func_serialization import serialize_function

@staticmethod
def simple_func():
    x = 5
    return x*10

def some_function(self):  #here is some logic of my app basically, but now here is only test function, just because it's easier to call the func from UI
    start_simple(serialize_function(self.simple_func))

So I call the some_function from app.py. As I pass a function in a start_simple(), I serialize it. Then start_simple() deserializes my simple_func() and sends it as a parameter to a celery task do_simple.

So the error I get is: kombu.exceptions.EncodeError: Can't pickle <function simple_func at 0x7f3066806d30>: attribute lookup simple_func on src.func_serialization failed

How could I fix it?

question from:https://stackoverflow.com/questions/65845657/function-as-a-parameter-in-celery-task

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)
Waitting for answers

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...