
minio - Apache Flink 1.11 Streaming Sink to S3

I'm using the Flink FileSystem SQL Connector to read events from Kafka and write them to S3 (using MinIO). Here is my code:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.state_backend import FsStateBackend
from pyflink.table import StreamTableEnvironment, TableConfig

exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
# start a checkpoint every 10 s
exec_env.enable_checkpointing(10000)
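# note: the streaming filesystem sink only commits finished part files on
# checkpoint completion, so checkpointing must be enabled for the S3 sink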
exec_env.set_state_backend(FsStateBackend("s3://test-bucket/checkpoints/"))
t_config = TableConfig()
t_env = StreamTableEnvironment.create(exec_env, t_config)

INPUT_TABLE = "source"
INPUT_TOPIC = "Rides"
LOCAL_KAFKA = 'kafka:9092'
OUTPUT_TABLE = "sink"

ddl_source = f"""
       CREATE TABLE {INPUT_TABLE} (
           `rideId` BIGINT,
           `isStart` BOOLEAN,
           `eventTime` STRING,
           `lon` FLOAT,
           `lat` FLOAT,
           `psgCnt` INTEGER,
           `taxiId` BIGINT
       ) WITH (
           'connector' = 'kafka',
           'topic' = '{INPUT_TOPIC}',
           'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
           'format' = 'json'
       )
   """

ddl_sink = f"""
       CREATE TABLE {OUTPUT_TABLE} (
           `rideId` BIGINT,
           `isStart` BOOLEAN,
           `eventTime` STRING,
           `lon` FLOAT,
           `lat` FLOAT,
           `psgCnt` INTEGER,
           `taxiId` BIGINT
       ) WITH (
           'connector' = 'filesystem',
           'path' = 's3://test-bucket/kafka_output',
           'format' = 'parquet'
       )
   """

# sql_update is deprecated in Flink 1.11 in favor of execute_sql,
# but it still executes the DDL
t_env.sql_update(ddl_source)
t_env.sql_update(ddl_sink)

table_result = t_env.execute_sql(f"""
    INSERT INTO {OUTPUT_TABLE}
    SELECT *
    FROM {INPUT_TABLE}
""")

I'm using Flink 1.11.3 and flink-s3-fs-hadoop 1.11.3. I have copied flink-s3-fs-hadoop-1.11.3.jar into its own subdirectory under the plugins folder:

mkdir -p /opt/flink/plugins/s3-fs-hadoop
cp /opt/flink/lib/flink-s3-fs-hadoop-1.11.3.jar /opt/flink/plugins/s3-fs-hadoop/

I have also added the following configuration to flink-conf.yaml:

    state.backend: filesystem
    state.checkpoints.dir: s3://test-bucket/checkpoints/
    s3.endpoint: http://127.0.0.1:9000
    s3.path.style.access: true
    s3.access-key: minio
    s3.secret-key: minio123
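One detail worth flagging: Kafka is addressed by the service name kafka:9092, which suggests everything runs on a Docker network; in that case 127.0.0.1:9000 inside the Flink containers points at the container itself rather than at MinIO, and S3 requests would hang until they time out. A sketch of the endpoint keyed to a Compose service name (minio is a hypothetical service name, not from the original setup):

    s3.endpoint: http://minio:9000    # hypothetical service name; must be reachable from JobManager and TaskManagers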

MinIO is running properly, and I have created the test-bucket in MinIO. When I run this job, the job submission doesn't happen and the Flink Dashboard goes into some sort of waiting state. After 15-20 minutes I get the following exception:

pyflink.util.exceptions.TableException: Failed to execute sql

What seems to be the problem here?



1 Reply

Waiting for an expert to reply.
