Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


Apache Kafka - Convert Avro-serialized messages into JSON using a Python consumer

from kafka import KafkaConsumer
import json
import io

if __name__ == '__main__':

  # consumer = KafkaConsumer(
  #     'ldt_lm_mytable',
  #     bootstrap_servers = 'localhost:9092',
  #     auto_offset_reset = 'earliest',
  #     group_id = 'consumer_group_a')

  KAFKA_HOSTS = ['kafka:9092']
  KAFKA_VERSION = (0, 10)
  topic = "ldt_lm_mytable"

  consumer = KafkaConsumer(topic, bootstrap_servers=KAFKA_HOSTS, api_version=KAFKA_VERSION)

  for msg in consumer:
    print('Lead = {}'.format(json.loads(msg.value)))

Nothing is printed. The data is produced into the topic by Debezium using the Avro converter. I've tried some converters I found on the internet, but they don't work. One of them looks like this:

bytes_reader = io.BytesIO(consumer)
decoder = avro.io.BinaryDecoder(bytes_reader)
reader = avro.io.DatumReader(schema)
decoded_data = reader.read(decoder)

In this converter, where do I get the value of the 'schema' variable? How do I load the 'avro' package? And 'io.BytesIO' gives me an error like this:

Traceback (most recent call last):
  File "consumer.py", line 19, in <module>
    bytes_reader = io.BytesIO(consumer)
TypeError: a bytes-like object is required, not 'KafkaConsumer'

Thanks in advance!

question from:https://stackoverflow.com/questions/65919750/convert-avro-serialized-messages-into-json-using-python-consumer


1 Reply


Assuming the Debezium connector is using the standard io.confluent.connect.avro.AvroConverter in Kafka Connect, you need to use the Avro deserialiser that goes with the Confluent Schema Registry.
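
To see why json.loads and a bare avro DatumReader cannot read these messages, it helps to look at the framing. Assuming the Confluent converter is in use, each message value starts with a 5-byte header (a zero "magic" byte followed by a 4-byte big-endian schema ID) ahead of the Avro binary data. The schema ID is what gets looked up in the Schema Registry to obtain the 'schema' the question asks about. A minimal sketch of splitting that header off, where the helper name split_confluent_frame and the sample bytes are made up for illustration:

```python
import struct

MAGIC_BYTE = 0  # first byte of a Confluent-framed message value


def split_confluent_frame(raw: bytes):
    """Split a Confluent-framed Kafka value into (schema_id, avro_payload)."""
    if len(raw) < 5:
        raise ValueError("message too short to carry a Schema Registry header")
    # ">bI" = big-endian, 1 signed byte (magic) + 4-byte unsigned int (schema ID)
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte {}".format(magic))
    return schema_id, raw[5:]


# Hypothetical message: schema ID 42 followed by three bytes of Avro data.
framed = b"\x00" + (42).to_bytes(4, "big") + b"\x02\x06\x08"
schema_id, payload = split_confluent_frame(framed)
print(schema_id, payload)
```

In the question's loop the bytes to inspect would be msg.value, not the consumer object itself, which is what caused the TypeError. The Confluent deserialiser takes care of this header and the schema lookup for you.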

Here's an example consumer using the confluent-kafka Python client:

from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError


c = AvroConsumer({
    'bootstrap.servers': 'mybroker,mybroker2',
    'group.id': 'groupid',
    'schema.registry.url': 'http://127.0.0.1:8081'})

c.subscribe(['my_topic'])

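while True:
    try:
        # poll() returns None on timeout; otherwise the value is already Avro-decoded
        msg = c.poll(10)

    except SerializerError as e:
        # msg is not assigned when poll() raises, so report only the exception
        print("Message deserialization failed: {}".format(e))
        break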

    if msg is None:
        continue

    if msg.error():
        print("AvroConsumer error: {}".format(msg.error()))
        continue

    print(msg.value())

c.close()
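
Note that msg.value() in the consumer above is a decoded Python dict, not JSON text, so one more step is needed to get JSON. Below is a minimal sketch of that step. It assumes the decoded record may contain values that json.dumps cannot handle on its own (bytes, decimals, dates), which depends on your schema. The names record_to_json and _json_default and the sample record are hypothetical.

```python
import base64
import datetime
import decimal
import json


def _json_default(value):
    """Fallback for types json.dumps cannot serialise on its own."""
    if isinstance(value, bytes):
        # Assumption: base64 is an acceptable text form for binary fields.
        return base64.b64encode(value).decode("ascii")
    if isinstance(value, (datetime.datetime, datetime.date, datetime.time)):
        return value.isoformat()
    if isinstance(value, decimal.Decimal):
        return str(value)
    raise TypeError("cannot serialise {!r}".format(type(value)))


def record_to_json(record):
    """Turn a decoded Avro record (a dict) into a JSON string."""
    return json.dumps(record, default=_json_default, sort_keys=True)


# Hypothetical decoded record standing in for msg.value().
record = {"id": 1, "name": "lead", "blob": b"\x00\x01"}
print(record_to_json(record))
```

In the loop above, print(msg.value()) would become print(record_to_json(msg.value())).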
