I got it working with the following libraries:
pip install confluent-avro
pip install kafka-python
And the code:
from kafka import KafkaConsumer
from confluent_avro import AvroKeyValueSerde, SchemaRegistry
from confluent_avro.schema_registry import HTTPBasicAuth

KAFKA_TOPIC = "SOME-TOPIC"

# Client for the schema registry that holds the topic's key and value schemas
registry_client = SchemaRegistry(
    "https://...",
    HTTPBasicAuth("USER", "PASSWORD"),
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
)
avroSerde = AvroKeyValueSerde(registry_client, KAFKA_TOPIC)

consumer = KafkaConsumer(
    KAFKA_TOPIC,
    # ... other connection parameters ...
    auto_offset_reset="earliest",
)

for msg in consumer:
    v = avroSerde.value.deserialize(msg.value)
    k = avroSerde.key.deserialize(msg.key)
    print(msg.offset, msg.partition, k, v)
    break  # stop after the first message
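
If deserialization fails, it can help to check whether the raw bytes are actually in the Confluent wire format, which is what a schema-registry serde expects: one magic byte (0), a 4-byte big-endian schema ID, then the Avro payload. Below is a minimal sketch of that check using only the standard library. The helper name `split_confluent_frame` and the sample bytes are my own for illustration and are not part of confluent_avro or kafka-python.

```python
import struct

MAGIC_BYTE = 0  # first byte of every Confluent-framed message


def split_confluent_frame(raw):
    """Split raw Kafka bytes into (schema_id, avro_payload).

    Hypothetical debugging helper, not a confluent_avro API.
    Returns None for a missing key/value (e.g. a tombstone).
    """
    if raw is None:
        return None
    if len(raw) < 5:
        raise ValueError("message too short to carry a Confluent header")
    # ">bI": big-endian, 1 signed byte (magic) + 4-byte unsigned int (schema ID)
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}; not Confluent-framed?")
    return schema_id, raw[5:]


# Made-up sample: magic byte, schema ID 42, then some payload bytes
framed = b"\x00" + (42).to_bytes(4, "big") + b"\x06foo"
schema_id, payload = split_confluent_frame(framed)
print(schema_id, payload)
```

Inside the consumer loop you could call `split_confluent_frame(msg.value)` before deserializing to see which schema ID a message refers to. A "magic byte" error usually means the producer did not write through a schema-registry serializer.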
Reference: https://pypi.org/project/confluent_avro/