I'm using the Flink FileSystem SQL Connector to read events from Kafka and write them to S3 (using MinIO). Here is my code:
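from pyflink.datastream import StreamExecutionEnvironment, FsStateBackend
from pyflink.table import StreamTableEnvironment, TableConfig

exec_env = StreamExecutionEnvironment.get_execution_environment()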
exec_env.set_parallelism(1)
# start a checkpoint every 10 s
exec_env.enable_checkpointing(10000)
exec_env.set_state_backend(FsStateBackend("s3://test-bucket/checkpoints/"))
t_config = TableConfig()
t_env = StreamTableEnvironment.create(exec_env, t_config)
INPUT_TABLE = "source"
INPUT_TOPIC = "Rides"
LOCAL_KAFKA = 'kafka:9092'
OUTPUT_TABLE = "sink"
ddl_source = f"""
CREATE TABLE {INPUT_TABLE} (
`rideId` BIGINT,
`isStart` BOOLEAN,
`eventTime` STRING,
`lon` FLOAT,
`lat` FLOAT,
`psgCnt` INTEGER,
`taxiId` BIGINT
) WITH (
'connector' = 'kafka',
'topic' = '{INPUT_TOPIC}',
'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
'format' = 'json'
)
"""
ddl_sink = f"""
CREATE TABLE {OUTPUT_TABLE} (
`rideId` BIGINT,
`isStart` BOOLEAN,
`eventTime` STRING,
`lon` FLOAT,
`lat` FLOAT,
`psgCnt` INTEGER,
`taxiId` BIGINT
) WITH (
'connector' = 'filesystem',
'path' = 's3://test-bucket/kafka_output',
'format' = 'parquet'
)
"""
# register the source and sink tables (sql_update is deprecated in 1.11 in favor of execute_sql)
t_env.execute_sql(ddl_source)
t_env.execute_sql(ddl_sink)
t_env.execute_sql(f"""
INSERT INTO {OUTPUT_TABLE}
SELECT *
FROM {INPUT_TABLE}
""")
I'm using Flink 1.11.3 and flink-s3-fs-hadoop 1.11.3. I have copied flink-s3-fs-hadoop-1.11.3.jar into the plugins folder:
cp /opt/flink/lib/flink-s3-fs-hadoop-1.11.3.jar /opt/flink/plugins/s3-fs-hadoop/;
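To double-check the plugin layout after the copy, something like the following minimal sketch could be used. It lists jars that sit in their own subfolder under the plugins directory, which is the layout Flink's plugin mechanism expects. The helper name `find_plugin_jars` and its default prefix are my own illustration, not part of Flink.

```python
from pathlib import Path


def find_plugin_jars(plugins_dir, name_prefix="flink-s3-fs-hadoop"):
    """Return jars whose name starts with name_prefix and that sit
    exactly one level below plugins_dir (plugins/<plugin-name>/<jar>)."""
    root = Path(plugins_dir)
    # "*/*.jar" only matches jars inside a direct subfolder,
    # so a jar dropped straight into plugins_dir is ignored.
    return sorted(
        jar for jar in root.glob("*/*.jar")
        if jar.name.startswith(name_prefix)
    )
```

Calling `find_plugin_jars("/opt/flink/plugins")` on the machine that runs Flink should then return the path of the copied jar; an empty list would suggest the jar is missing or not in its own subfolder.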
I have also added the following settings to flink-conf.yaml:
state.backend: filesystem
state.checkpoints.dir: s3://test-bucket/checkpoints/
s3.endpoint: http://127.0.0.1:9000
s3.path.style.access: true
s3.access-key: minio
s3.secret-key: minio123
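To rule out a typo in these settings, here is a minimal sketch that reads the file and reports which S3-related keys are absent. It assumes the file contains only flat `key: value` lines like the ones above; the function names and the `REQUIRED_KEYS` list are hypothetical choices for illustration, not a Flink API.

```python
# Keys from the snippet above that this sketch treats as required (an assumption).
REQUIRED_KEYS = (
    "s3.endpoint",
    "s3.path.style.access",
    "s3.access-key",
    "s3.secret-key",
)


def parse_flat_conf(text):
    """Parse flat 'key: value' lines into a dict, skipping blanks and comments."""
    conf = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first ": " only, so URLs such as http://host:9000 stay intact.
        key, sep, value = line.partition(": ")
        if sep:
            conf[key.strip()] = value.strip()
    return conf


def missing_s3_keys(conf):
    """Return the required S3 keys that are not present in conf."""
    return [key for key in REQUIRED_KEYS if key not in conf]
```

For example, `missing_s3_keys(parse_flat_conf(open("/opt/flink/conf/flink-conf.yaml").read()))` would return an empty list if all four keys are set (the path is an assumption about where the file lives).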
MinIO is running properly and I have created 'test-bucket' in it. When I run this job, it is never submitted and the Flink Dashboard stays in some sort of waiting state. After 15-20 minutes I get the following exception:
pyflink.util.exceptions.TableException: Failed to execute sql
What seems to be the problem here?