Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
1.3k views
in Technique[技术] by (71.8m points)

scala - How to create a Schema file in Spark

I am trying to read a Schema file (which is a text file) and apply it to my CSV file without a header. Since I already have a schema file I don't want to use InferSchema option which is an overhead.

My input schema file looks like below,

"num IntegerType","letter StringType"

I am trying the below code to create a schema file,

val schema_file = spark.read.textFile("D:\Users\Documents\schemaFile.txt")
val struct_type = schema_file.flatMap(x => x.split(",")).map(b => (b.split(" ")(0).stripPrefix(""").asInstanceOf[String],b.split(" ")(1).stripSuffix(""").asInstanceOf[org.apache.spark.sql.types.DataType])).foreach(x=>println(x))

I am getting the error as below

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.types.DataType

- field (class: "org.apache.spark.sql.types.DataType", name: "_2") - root class: "scala.Tuple2"

and trying to use this as a schema file while using spark.read.csv like below and write it as an ORC file

  val df=spark.read
      .format("org.apache.spark.csv")
      .option("header", false)
      .option("inferSchema", true)
      .option("samplingRatio",0.01)
      .option("nullValue", "NULL")
      .option("delimiter","|")
      .schema(schema_file)
      .csv("D:\Users\sampleFile.txt")
      .toDF().write.format("orc").save("D:\Users\ORC")

Need help to convert a text file into a schema file and convert my input CSV file to ORC.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

To create a schema from a text file create a function to match the type and return DataType as

def getType(raw: String): DataType = {
  raw match {
    case "ByteType" => ByteType
    case "ShortType" => ShortType
    case "IntegerType" => IntegerType
    case "LongType" => LongType
    case "FloatType" => FloatType
    case "DoubleType" => DoubleType
    case "BooleanType" => BooleanType
    case "TimestampType" => TimestampType
    case _ => StringType
  }
}

Now create a schema by reading a schema file as

val schema = Source.fromFile("schema.txt").getLines().toList
  .flatMap(_.split(",")).map(_.replaceAll(""", "").split(" "))
  .map(x => StructField(x(0), getType(x(1)), true))

Now read the csv file as

spark.read
  .option("samplingRatio", "0.01")
  .option("delimiter", "|")
  .option("nullValue", "NULL")
  .schema(StructType(schema))
  .csv("data.csv")

Hope this helps!


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

1.4m articles

1.4m replys

5 comments

57.0k users

...