apache kafka - ClickHouse JSON parse exception: Cannot parse input: expected ',' before

I'm trying to add JSON data to ClickHouse from Kafka. Here's simplified JSON:

{
  ...
   "sendAddress":{
      "sendCommChannelTypeId":4,
      "sendCommChannelTypeCode":"SMS",
      "sendAddress":"789345345945"},
   ...
}

Here are the steps: create the target table in ClickHouse, create a second table using the Kafka engine, and create a MATERIALIZED VIEW that connects the two tables (and thereby connects ClickHouse to Kafka).

Creating the first table:

CREATE TABLE tab 
(
    ...

    sendAddress Tuple (sendCommChannelTypeId Int32, sendCommChannelTypeCode String, sendAddress String),
     ...

)Engine = MergeTree()
PARTITION BY applicationId
ORDER BY (applicationId);

Creating the second table with the Kafka engine and its SETTINGS:

CREATE TABLE tab_kfk
(
    ...
    sendAddress Tuple (sendCommChannelTypeId Int32, sendCommChannelTypeCode String, sendAddress String),
    ...
)ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
       kafka_topic_list = 'topk2',
       kafka_group_name = 'group1',
       kafka_format = 'JSONEachRow',
       kafka_row_delimiter = '\n';

Creating the MATERIALIZED VIEW:

CREATE MATERIALIZED VIEW tab_mv TO tab AS
SELECT ... sendAddress, ...
FROM tab_kfk;

Then I try to SELECT all or specific columns from the first table, tab, and get nothing. The logs contain the JSON parse exception quoted in the title (Cannot parse input: expected ',' before ...).
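For example, a check along these lines (a representative sketch that uses only the columns shown above):

SELECT count() FROM tab;
SELECT applicationId, sendAddress FROM tab;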

OK, so I tried adding '[]' around the curly braces in sendAddress, like this:

"authkey":"some_value",
   "sendAddress":[{
      "sendCommChannelTypeId":4,
      "sendCommChannelTypeCode":"SMS",
      "sendAddress":"789345345945"
   }]

But I still get an error, just a slightly different one. What should I do to fix this problem? Thanks!


1 Reply


There are 3 ways to fix it:

  1. Don't use nested objects; flatten the messages before inserting them into the Kafka topic, for example like this (a matching flat table schema is sketched after the example):
{
    ..
    "authkey":"key",
    "sendAddress_CommChannelTypeId":4,
    "sendAddress_CommChannelTypeCode":"SMS",
    "sendAddress":"789345345945",
    ..
}
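A minimal sketch of a Kafka-engine table matching that flattened layout (column names taken from the example above; other columns elided):

CREATE TABLE tab_kfk
(
    applicationId Int32,
    authkey String,
    sendAddress_CommChannelTypeId Int32,
    sendAddress_CommChannelTypeCode String,
    sendAddress String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
         kafka_topic_list = 'topk2',
         kafka_group_name = 'group1',
         kafka_format = 'JSONEachRow';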
  2. Use a Nested data structure, which requires changing both the JSON-message schema and the table schema:
{
    ..
    "authkey":"key",
    "sendAddress.sendCommChannelTypeId":[4],
    "sendAddress.sendCommChannelTypeCode":["SMS"],
    "sendAddress.sendAddress":["789345345945"],
    ..
}
CREATE TABLE tab_kfk
(
    applicationId Int32,
    ..
    sendAddress Nested(
        sendCommChannelTypeId Int32,
        sendCommChannelTypeCode String,
        sendAddress String),
    ..
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
       kafka_topic_list = 'topk2',
       kafka_group_name = 'group1',
       kafka_format = 'JSONEachRow',
       kafka_row_delimiter = '\n',
       input_format_import_nested_json = 1 /* <--- */

Take into account the setting input_format_import_nested_json.
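With a Nested column the values arrive as parallel arrays; a query sketch, assuming the target table tab carries the same Nested column:

SELECT
    applicationId,
    sendAddress.sendCommChannelTypeCode[1] AS channelCode, /* first element of the array */
    sendAddress.sendAddress[1]             AS address
FROM tab;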

  3. Interpret the input JSON message as a string and parse it manually (see GitHub issue #16969):
CREATE TABLE tab_kfk
(
    message String
)
ENGINE = Kafka
SETTINGS 
    ..
    kafka_format = 'JSONAsString', /* <--- */
    ..

CREATE MATERIALIZED VIEW tab_mv TO tab 
AS
SELECT 
    ..
    JSONExtractString(message, 'authkey') AS authkey,
    JSONExtract(message, 'sendAddress', 'Tuple(Int32,String,String)') AS sendAddress,
    ..
FROM tab_kfk;
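The extraction can be checked against a literal message first; a quick sketch using the sample JSON from the question:

SELECT JSONExtract(
    '{"authkey":"some_value","sendAddress":{"sendCommChannelTypeId":4,"sendCommChannelTypeCode":"SMS","sendAddress":"789345345945"}}',
    'sendAddress',
    'Tuple(Int32, String, String)') AS sendAddress;
/* expected: (4,'SMS','789345345945') */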
