from kafka import KafkaConsumer
import json
import io
if __name__ == '__main__':
    # consumer = KafkaConsumer(
    #     'ldt_lm_mytable',
    #     bootstrap_servers = 'localhost:9092',
    #     auto_offset_reset = 'earliest',
    #     group_id = 'consumer_group_a')
    KAFKA_HOSTS = ['kafka:9092']
    KAFKA_VERSION = (0, 10)
    topic = "ldt_lm_mytable"
    consumer = KafkaConsumer(topic, bootstrap_servers=KAFKA_HOSTS, api_version=KAFKA_VERSION)
    for msg in consumer:
        print('Lead = {}'.format(json.loads(msg.value)))
Nothing is printed. The data is produced into the topic by Debezium using the Avro converter. I've tried some decoders I found on the internet, but none of them work. One of them looks like this:
bytes_reader = io.BytesIO(consumer)
decoder = avro.io.BinaryDecoder(bytes_reader)
reader = avro.io.DatumReader(schema)
decoded_data = reader.read(decoder)
In this decoder, where do I get the value of the 'schema' variable? How do I load the 'avro' package? And 'io.BytesIO' gives me this error:
Traceback (most recent call last):
  File "consumer.py", line 19, in <module>
    bytes_reader = io.BytesIO(consumer)
TypeError: a bytes-like object is required, not 'KafkaConsumer'
Thanks in advance!
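A possible starting point, offered as a sketch rather than a confirmed fix: the TypeError above comes from passing the KafkaConsumer object to io.BytesIO, which expects the bytes of a single message (msg.value inside the loop). As for 'schema': if Debezium is configured with Confluent's Avro converter and a Schema Registry (an assumption, the question does not say which converter is used), each message value is usually framed as one magic byte (0), a 4-byte big-endian schema ID, and then the Avro binary body. The writer schema can then be looked up in the registry by that ID (GET /schemas/ids/{id}) and passed to avro.io.DatumReader; the 'avro' package itself is installed from PyPI (pip install avro). The helper name split_confluent_frame and the sample bytes below are hypothetical and only illustrate the framing, using the standard library alone.

```python
import io
import struct

MAGIC_BYTE = 0  # first byte of the (assumed) Confluent wire format


def split_confluent_frame(raw):
    """Split a framed message into (schema_id, avro_payload).

    Assumes the Confluent wire format: 1 magic byte (0), a 4-byte
    big-endian schema ID, then the Avro binary body.
    """
    if len(raw) < 5:
        raise ValueError("message too short to carry a schema ID header")
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte: {}".format(magic))
    return schema_id, raw[5:]


# Hypothetical framed message: magic byte 0, schema ID 42, 3 payload bytes.
sample = b"\x00" + struct.pack(">I", 42) + b"\x02\x06\x08"
schema_id, payload = split_confluent_frame(sample)

# io.BytesIO takes the message bytes (msg.value in the consumer loop),
# not the KafkaConsumer object itself.
bytes_reader = io.BytesIO(payload)
print(schema_id, bytes_reader.read())
```

In the consumer loop this would be called as split_confluent_frame(msg.value). If the first byte is not 0, the messages are probably not in this framing and a different converter is in use.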
question from:
https://stackoverflow.com/questions/65919750/convert-avro-serialized-messages-into-json-using-python-consumer