
Kafka message key as a field/column in HDFS

I use Debezium's key.field.name option (on the ByLogicalTableRouter SMT) in my MySQL source connector to add a field to the message key on my topic.

A message looks like this after landing on the topic:

{"id":20,"__PKtableowner":"reviewDB.review.search_user_02"}:{"before":null,"after":{"search_user_all_shards.Value":{"id":20,"name":{"string":"oliver"},"email":{"string":"[email protected]"},"department":{"string":"sales"},"modified":"2021-01-06T09:27:02Z"}},"source":{"version":"1.0.2.Final","connector":"mysql","name":"reviewDB","ts_ms":1609925222000,"snapshot":{"string":"false"},"db":"review","table":{"string":"search_user_02"},"server_id":1,"gtid":null,"file":"binlog.000002","pos":14630,"row":0,"thread":{"long":13},"query":null},"op":"c","ts_ms":{"long":1609925222956}}

Here the key is

{"id":20,"__PKtableowner":"reviewDB.review.search_user_02"}

and the value is

{"before":null,"after":{"search_user_all_shards.Value":{"id":20,"name":{"string":"oliver"},"email":{"string":"[email protected]"},"department":{"string":"sales"},"modified":"2021-01-06T09:27:02Z"}},"source":{"version":"1.0.2.Final","connector":"mysql","name":"reviewDB","ts_ms":1609925222000,"snapshot":{"string":"false"},"db":"review","table":{"string":"search_user_02"},"server_id":1,"gtid":null,"file":"binlog.000002","pos":14630,"row":0,"thread":{"long":13},"query":null},"op":"c","ts_ms":{"long":1609925222956}}

In my HDFS sink connector I need to surface the message-key field "__PKtableowner":"reviewDB.review.search_user_02" as a column/field in HDFS or Hive.

The only SMT I found is ValueToKey, but it doesn't fit my use case, since it reads from the value rather than from the message key. I've also tried InsertField, CreateKey, ExtractField, and almost every other transformation listed here, with no luck: https://docs.confluent.io/platform/current/connect/transforms/valuetokey.html

I'm looking for a KeyToValue kind of SMT (a sketch of what I have in mind is below), or any other workaround.
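For illustration, this is roughly the custom transform I mean, in case nothing off-the-shelf exists. It's a minimal sketch only: the package, class, and "field" property names are my own invention, and it assumes both the key and the value are schema-backed Structs (as they are with Avro and Schema Registry):

package com.example.kafka.smt;  // hypothetical package

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

/**
 * Hypothetical "KeyToValue" SMT: copies one field from the record key
 * into the record value, so a sink (e.g. HDFS/Hive) sees it as a column.
 * Assumes both key and value are schema-backed Structs.
 */
public class KeyToValue<R extends ConnectRecord<R>> implements Transformation<R> {

    public static final String FIELD_CONFIG = "field";

    private static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                    "Name of the key field to copy into the value.");

    private String fieldName;

    @Override
    public void configure(Map<String, ?> configs) {
        fieldName = new SimpleConfig(CONFIG_DEF, configs).getString(FIELD_CONFIG);
    }

    @Override
    public R apply(R record) {
        Struct key = Requirements.requireStruct(record.key(), "KeyToValue");
        Struct value = Requirements.requireStruct(record.value(), "KeyToValue");

        // Rebuild the value schema with one extra field taken from the key.
        // (A real implementation would cache this per input schema.)
        SchemaBuilder builder = SchemaBuilder.struct().name(value.schema().name());
        for (Field f : value.schema().fields()) {
            builder.field(f.name(), f.schema());
        }
        builder.field(fieldName, key.schema().field(fieldName).schema());
        Schema newSchema = builder.build();

        // Copy the existing value fields, then append the key field.
        Struct newValue = new Struct(newSchema);
        for (Field f : value.schema().fields()) {
            newValue.put(f.name(), value.get(f));
        }
        newValue.put(fieldName, key.get(fieldName));

        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                newSchema, newValue, record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
    }
}

If something like this were built and dropped on the Connect plugin path, it would be chained after unwrap in the sink (e.g. "transforms.keyToValue.field": "__PKtableowner"), since ExtractNewRecordState replaces the whole value and would otherwise drop the copied field.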

Below are my source and sink configurations. Source:

{
  "name": "REVIEW__MYSQL__search_user__source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.history.kafka.topic": "review.search_user_logs",
    "database.history.consumer.max.block.ms": "3000",
    "include.schema.changes": "false",
    "database.history.consumer.session.timeout.ms": "30000",
    "database.history.kafka.consumer.group": "compose-connect-group",
    "snapshot.new.tables": "parallel",
    "database.history.kafka.sasl.mechanism": "GSSAPI",
    "database.whitelist": "review",
    "database.history.producer.sasl.mechanism": "GSSAPI",
    "database.user": "root",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "time.precision.mode": "connect",
    "database.server.name": "reviewDB",
    "database.port": "3306",
    "database.history.consumer.heartbeat.interval.ms": "1000",
    "min.row.count.to.stream.results": "0",
    "database.hostname": "mysql",
    "database.password": "example",
    "database.history.consumer.sasl.mechanism": "GSSAPI",
    "snapshot.mode": "when_needed",
    "table.whitelist": "review.search_user_(.*)",
    "transforms": "Reroute",
    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute.topic.regex": "reviewDB.review.search_user_(.*)",
    "transforms.Reroute.topic.replacement": "search_user_all_shards",
    "transforms.Reroute.key.field.name": "__PKtableowner"
  }
}

Sink:

{ "name": "REVIEW__MYSQL__search_user__sink",
  "config":
  {
      "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
      "topics.dir": "/_incr_files",
      "flush.size": 1,
      "tasks.max": 1,
      "timezone": "UTC",
      "rotate.interval.ms": 5000,
      "locale": "en",
      "hadoop.home": "/etc/hadoop",
      "logs.dir": "/_incr_files_wal",
      "hive.integration": "false",
      "partition.duration.ms": "20000",
      "hadoop.conf.dir": "/etc/hadoop",
      "topics": "search_user_all_shards",
      "hdfs.url": "hdfs://namenode:9000",
      "transforms": "unwrap,insertTopicOffset,insertTimeStamp",
      "transforms.insertTimeStamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
      "transforms.unwrap.drop.tombstones": "true",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.delete.handling.mode": "rewrite",
      "transforms.insertTimeStamp.timestamp.field": "spdb_landing_timestamp",
      "transforms.insertTopicOffset.offset.field": "spdb_topic_offset",
      "transforms.insertTopicOffset.type": "org.apache.kafka.connect.transforms.InsertField$Value",
      "schema.compatibility": "NONE",
      "path.format": "'partition'=YYYY-MM-dd-HH",
      "partitioner.class": "io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner"
  }
}


1 Reply


Since your key is a Struct, the best option I'm aware of is this SMT, which effectively wraps the key and value into a new, nested value:

https://github.com/jcustenborder/kafka-connect-transform-archive
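For reference, wiring it into your sink might look like this (an untested sketch; it assumes the jar from that repo is on the Connect plugin path). Note that the Archive transform nests the original key and value under new "key" and "value" fields, so the downstream unwrap/insert transforms would need re-ordering or adjusting:

      "transforms": "unwrap,archive,insertTopicOffset,insertTimeStamp",
      "transforms.archive.type": "com.github.jcustenborder.kafka.connect.archive.Archive"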

