Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


compression - Zip support in Apache Spark

I have read about Spark's support for gzip-compressed input files here, and I wonder whether the same support exists for other kinds of compressed files, such as .zip files. So far I have tried reading a file packed inside a zip archive, but Spark seems unable to read its contents successfully.

I have taken a look at the Hadoop-based newAPIHadoopFile and newAPIHadoopRDD methods, but so far I have not been able to get anything working.

In addition, Spark supports creating a partition for every file under a specified folder, like in the example below:

SparkConf SpkCnf = new SparkConf().setAppName("SparkApp")
                                  .setMaster("local[4]");

JavaSparkContext Ctx = new JavaSparkContext(SpkCnf);

JavaRDD<String> FirstRDD = Ctx.textFile("C:input").cache();

Where C:input points to a directory with multiple files.

If reading zipped files is possible, would it also be possible to pack all the files into a single compressed archive and keep the same pattern of one partition per file?


1 Reply


Spark supports compressed files by default

According to the Spark Programming Guide:

All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").

This can be expanded with information about which compression formats Hadoop supports, which can be checked by finding all classes that implement CompressionCodec:

name    | ext      | codec class
-------------------------------------------------------------
bzip2   | .bz2     | org.apache.hadoop.io.compress.BZip2Codec 
default | .deflate | org.apache.hadoop.io.compress.DefaultCodec 
deflate | .deflate | org.apache.hadoop.io.compress.DeflateCodec 
gzip    | .gz      | org.apache.hadoop.io.compress.GzipCodec 
lz4     | .lz4     | org.apache.hadoop.io.compress.Lz4Codec 
snappy  | .snappy  | org.apache.hadoop.io.compress.SnappyCodec

Source : List the available hadoop codecs
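
To make the table concrete: as far as I understand, Hadoop chooses a codec from the file name's suffix, which is why a .gz file is decompressed transparently while a .zip file is not. The sketch below is plain Java and only mirrors the table above; CodecTable and codecFor are hypothetical names for illustration, not Hadoop or Spark APIs, and the paths in the usage note are made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, not part of Hadoop or Spark: it only mirrors the table above.
final class CodecTable {

    // File extension -> codec class name, copied from the table above.
    // The table lists two classes for ".deflate"; this sketch keeps the first one.
    static final Map<String, String> BY_EXTENSION = new LinkedHashMap<>();

    static {
        BY_EXTENSION.put(".bz2", "org.apache.hadoop.io.compress.BZip2Codec");
        BY_EXTENSION.put(".deflate", "org.apache.hadoop.io.compress.DefaultCodec");
        BY_EXTENSION.put(".gz", "org.apache.hadoop.io.compress.GzipCodec");
        BY_EXTENSION.put(".lz4", "org.apache.hadoop.io.compress.Lz4Codec");
        BY_EXTENSION.put(".snappy", "org.apache.hadoop.io.compress.SnappyCodec");
    }

    // Returns the codec class name whose extension the path ends with, if any.
    static Optional<String> codecFor(String path) {
        for (Map.Entry<String, String> e : BY_EXTENSION.entrySet()) {
            if (path.endsWith(e.getKey())) {
                return Optional.of(e.getValue());
            }
        }
        // No matching suffix: the file would be treated as uncompressed.
        return Optional.empty();
    }
}
```

With this table, codecFor("/my/directory/logs.gz") resolves to GzipCodec, while codecFor("/my/directory/archive.zip") comes back empty, which matches the behaviour described in the question.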

So the formats above, and many more, can be read simply by calling:

sc.textFile(path)

Reading zip files in Spark

Unfortunately, zip is not on the supported list by default.

I have found a great article, Hadoop: Processing ZIP files in Map/Reduce, and some answers (example) explaining how to use the imported ZipFileInputFormat together with the sc.newAPIHadoopFile API. But this did not work for me.

My solution

Without any external dependencies, you can load your file with sc.binaryFiles and then decompress the PortableDataStream to read its content. This is the approach I have chosen.

import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream
import org.apache.spark.SparkContext
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD

implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal {

  def readFile(path: String,
               minPartitions: Int = sc.defaultMinPartitions): RDD[String] = {

    if (path.endsWith(".zip")) {
      sc.binaryFiles(path, minPartitions)
        .flatMap { case (name: String, content: PortableDataStream) =>
          val zis = new ZipInputStream(content.open())
          // Position the stream at the first entry; only that entry is read,
          // so this solution works only for a single file in the zip.
          zis.getNextEntry()
          val br = new BufferedReader(new InputStreamReader(zis))
          // Lazily read lines until the end of the entry.
          Stream.continually(br.readLine()).takeWhile(_ != null)
        }
    } else {
      // Anything else goes through the regular text reader.
      sc.textFile(path, minPartitions)
    }
  }
}

To use this implicit class, import it and call the readFile method on the SparkContext:

import com.github.atais.spark.Implicits.ZipSparkContext
sc.readFile(path)

The implicit class will then load your zip file properly and return an RDD[String], just as before.

Note: This only works for a single file in the zip archive!
To support multiple files in a zip archive, check this answer: https://stackoverflow.com/a/45958458/1549135
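
For the multi-file case, here is a minimal sketch in plain Java (the language of the question) of walking every entry of a zip stream. It uses only java.util.zip, so it runs without Spark; ZipLines, readAllEntries and sampleZip are hypothetical names for illustration and are not taken from the linked answer. UTF-8 text is an assumption.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

// Hypothetical helper for illustration; names are not from the original answer.
final class ZipLines {

    // Reads every non-directory entry of a zip stream and returns its lines,
    // keyed by entry name in archive order. Assumes UTF-8 text entries.
    static Map<String, List<String>> readAllEntries(InputStream in) {
        Map<String, List<String>> linesByEntry = new LinkedHashMap<>();
        try (ZipInputStream zis = new ZipInputStream(in)) {
            ZipEntry entry;
            while ((entry = zis.getNextEntry()) != null) {
                if (entry.isDirectory()) {
                    continue;
                }
                // A fresh reader per entry; it is deliberately not closed, because
                // closing it would also close the underlying ZipInputStream.
                BufferedReader reader =
                    new BufferedReader(new InputStreamReader(zis, StandardCharsets.UTF_8));
                List<String> lines = new ArrayList<>();
                String line;
                while ((line = reader.readLine()) != null) {
                    lines.add(line);
                }
                linesByEntry.put(entry.getName(), lines);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return linesByEntry;
    }

    // Builds a small two-entry archive in memory so the sketch needs no files.
    static byte[] sampleZip() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ZipOutputStream zos = new ZipOutputStream(bytes)) {
            zos.putNextEntry(new ZipEntry("a.txt"));
            zos.write("one\ntwo\n".getBytes(StandardCharsets.UTF_8));
            zos.closeEntry();
            zos.putNextEntry(new ZipEntry("b.txt"));
            zos.write("three\n".getBytes(StandardCharsets.UTF_8));
            zos.closeEntry();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        Map<String, List<String>> result =
            readAllEntries(new ByteArrayInputStream(sampleZip()));
        // Prints "a.txt -> [one, two]" and then "b.txt -> [three]".
        result.forEach((name, lines) -> System.out.println(name + " -> " + lines));
    }
}
```

In a Spark job, the same loop could presumably run inside the function passed to flatMap over the result of binaryFiles, with the PortableDataStream's open() supplying the InputStream. Note that binaryFiles yields one record per archive, so all entries of a single zip would initially land in the same partition; getting one partition per packed file would need an explicit repartition afterwards.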

