Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
878 views
in Technique[技术] by (71.8m points)

python - Celery: is there a way to write custom JSON Encoder/Decoder?

I have some objects I want to send to celery tasks on my application. Those objects are obviously not json serializable using the default json library. Is there a way to make celery serialize/de-serialize those objects with custom JSON Encoder/Decoder?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

A bit late here, but you should be able to define a custom encoder and decoder by registering them in the kombu serializer registry, as in the docs: http://docs.celeryproject.org/en/latest/userguide/calling.html#serializers.

For example, the following is a custom datetime serializer/deserializer (subclassing python's builtin json module) for Django:


myjson.py (put it in the same folder of your settings.py file)

import json
from datetime import datetime
from time import mktime

class MyEncoder(json.JSONEncoder):   
    def default(self, obj):
        if isinstance(obj, datetime):
            return {
                '__type__': '__datetime__', 
                'epoch': int(mktime(obj.timetuple()))
            }
        else:
            return json.JSONEncoder.default(self, obj)

def my_decoder(obj):
    if '__type__' in obj:
        if obj['__type__'] == '__datetime__':
            return datetime.fromtimestamp(obj['epoch'])
    return obj

# Encoder function      
def my_dumps(obj):
    return json.dumps(obj, cls=MyEncoder)

# Decoder function
def my_loads(obj):
    return json.loads(obj, object_hook=my_decoder)


settings.py

# Register your new serializer methods into kombu
from kombu.serialization import register
from .myjson import my_dumps, my_loads

register('myjson', my_dumps, my_loads, 
    content_type='application/x-myjson',
    content_encoding='utf-8') 

# Tell celery to use your new serializer:
CELERY_ACCEPT_CONTENT = ['myjson']
CELERY_TASK_SERIALIZER = 'myjson'
CELERY_RESULT_SERIALIZER = 'myjson'

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...