Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
654 views
in Technique[技术] by (71.8m points)

python - Callback for celery apply_async

I use celery in my application to run periodic tasks. Let's see simple example below

from myqueue import Queue
@perodic_task(run_every=timedelta(minutes=1))
def process_queue():
    queue = Queue()
    uid, questions = queue.pop()
    if uid is None:
        return

    job = group(do_stuff(q) for q in questions)
    job.apply_async()

def do_stuff(question):
    try:
        ...
    except:
        ...
        raise

As you can see in the example above, i use celery to run async task, but (since it's a queue) i need to do queue.fail(uid) in case of exception in do_stuff or queue.ack(uid) otherwise. In this situation it would be very clear and usefull to have some callback from my task in both cases - on_failure and on_success.

I saw some documentation, but never seen practices of using callbacks with apply_async. Is it possible to do that?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Subclass the Task class and overload the on_success and on_failure functions:

class CallbackTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        pass

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        pass


@celery.task(base=CallbackTask)  # this does the trick
def add(x, y):
    return x + y

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...