The minimum number of partitions is actually a lower bound set by the `SparkContext`. Since Spark uses Hadoop under the hood, the Hadoop `InputFormat` will still be the behaviour by default.
The first case should reflect `defaultParallelism` as mentioned here, which may differ depending on settings and hardware (number of cores, etc.). So unless you provide the number of slices, that first case is defined by the value of `sc.defaultParallelism`:
scala> sc.defaultParallelism
res0: Int = 6
scala> sc.parallelize(1 to 100).partitions.size
res1: Int = 6
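If you do provide the number of slices explicitly, it takes precedence over `defaultParallelism`. A minimal sketch (the value 42 is arbitrary, chosen only for illustration):

scala> sc.parallelize(1 to 100, 42).partitions.size
// should report 42, since an explicit numSlices overrides defaultParallelism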
As for the second case, with `sc.textFile`, the number of slices is by default the minimum number of partitions, which is equal to 2, as you can see in this section of code.
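For reference, the relevant definition in `SparkContext` is essentially the following (paraphrased from the Spark source):

def defaultMinPartitions: Int = math.min(defaultParallelism, 2)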
Thus, you should consider the following:
- `sc.parallelize` will take `numSlices` or `defaultParallelism`.
- `sc.textFile` will take the maximum between `minPartitions` and the number of splits computed from the Hadoop input size divided by the block size (see the sketch below).
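As a sketch of the `sc.textFile` side, you can pass `minPartitions` explicitly as the second argument (the file name and the value 16 are placeholders; for a splittable, uncompressed text file this typically yields at least that many partitions):

scala> val rdd = sc.textFile("somefile.txt", 16)
scala> rdd.getNumPartitions
// typically >= 16 for an uncompressed file large enough to be split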
Example:
Let's create some dummy text files:
fallocate -l 241m bigfile.txt
fallocate -l 4G hugefile.txt
This creates two files of 241MB and 4GB respectively.
We can see what happens when we read each of the files:
scala> val rdd = sc.textFile("bigfile.txt")
// rdd: org.apache.spark.rdd.RDD[String] = bigfile.txt MapPartitionsRDD[1] at textFile at <console>:27
scala> rdd.getNumPartitions
// res0: Int = 8
scala> val rdd2 = sc.textFile("hugefile.txt")
// rdd2: org.apache.spark.rdd.RDD[String] = hugefile.txt MapPartitionsRDD[3] at textFile at <console>:27
scala> rdd2.getNumPartitions
// res1: Int = 128
Both of them are actually `HadoopRDD`s:
scala> rdd.toDebugString
// res2: String =
// (8) bigfile.txt MapPartitionsRDD[1] at textFile at <console>:27 []
// | bigfile.txt HadoopRDD[0] at textFile at <console>:27 []
scala> rdd2.toDebugString
// res3: String =
// (128) hugefile.txt MapPartitionsRDD[3] at textFile at <console>:27 []
// | hugefile.txt HadoopRDD[2] at textFile at <console>:27 []
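Where do 8 and 128 come from? On a local filesystem the Hadoop block size defaults to 32MB (an assumption here, controlled by `fs.local.block.size`), and the number of splits is roughly the file size divided by that split size, rounded up. A back-of-the-envelope check:

scala> math.ceil(241.0 / 32).toInt   // 241MB file / 32MB splits
// res: Int = 8
scala> math.ceil(4096.0 / 32).toInt  // 4GB (4096MB) file / 32MB splits
// res: Int = 128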