Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
926 views
in Technique[技术] by (71.8m points)

java - Read parquet data from AWS s3 bucket

I need read parquet data from aws s3. If I use aws sdk for this I can get inputstream like this:

S3Object object = s3Client.getObject(new GetObjectRequest(bucketName, bucketKey));
InputStream inputStream = object.getObjectContent();

But the apache parquet reader uses only local file like this:

ParquetReader<Group> reader =
                    ParquetReader.builder(new GroupReadSupport(), new Path(file.getAbsolutePath()))
                            .withConf(conf)
                            .build();
reader.read()

So I don't know how parse input stream for parquet file. For example for csv files there is CSVParser which uses inputstream.

I know solution to use spark for this goal. Like this:

SparkSession spark = SparkSession
                .builder()
                .getOrCreate();
Dataset<Row> ds = spark.read().parquet("s3a://bucketName/file.parquet");

But I cannot use spark.

Could anyone tell me any solutions for read parquet data from s3?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)
String SCHEMA_TEMPLATE = "{" +
                        ""type": "record",
" +
                        "    "name": "schema",
" +
                        "    "fields": [
" +
                        "        {"name": "timeStamp", "type": "string"},
" +
                        "        {"name": "temperature", "type": "double"},
" +
                        "        {"name": "pressure", "type": "double"}
" +
                        "    ]" +
                        "}";
String PATH_SCHEMA = "s3a";
Path internalPath = new Path(PATH_SCHEMA, bucketName, folderName);
Schema schema = new Schema.Parser().parse(SCHEMA_TEMPLATE);
Configuration configuration = new Configuration();
AvroReadSupport.setRequestedProjection(configuration, schema);
ParquetReader<GenericRecord> = AvroParquetReader.GenericRecord>builder(internalPath).withConf(configuration).build();
GenericRecord genericRecord = parquetReader.read();

while(genericRecord != null) {
        Map<String, String> valuesMap = new HashMap<>();
        genericRecord.getSchema().getFields().forEach(field -> valuesMap.put(field.name(), genericRecord.get(field.name()).toString()));

        genericRecord = parquetReader.read();
}

Gradle dependencies

    compile 'com.amazonaws:aws-java-sdk:1.11.213'
    compile 'org.apache.parquet:parquet-avro:1.9.0'
    compile 'org.apache.parquet:parquet-hadoop:1.9.0'
    compile 'org.apache.hadoop:hadoop-common:2.8.1'
    compile 'org.apache.hadoop:hadoop-aws:2.8.1'
    compile 'org.apache.hadoop:hadoop-client:2.8.1'

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...