Welcome to the OGeek Q&A Community for programmers and developers – Open, Learning and Sharing

machine learning - How to fix an NPE when converting a RasterFrameLayer to a Raster while writing out a GeoTiff?

I'm trying to convert a predicted RasterFrameLayer in RasterFrames into a GeoTiff file after training a machine learning model. With the demo data Elkton-VA from RasterFrames it works fine.
But with my own cropped Sentinel-2A tif containing an NDVI index (normalized from -1000 to 1000), it fails with a NullPointerException in the toRaster step.
It seems to be caused by the NoData values outside the ROI. The test data is here: geojson and log.
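A quick way to test that hypothesis (a sketch of mine, not from the original post, using the `retiled` DataFrame built later in the code and assuming the standard RasterFrames function `rf_is_no_data_tile` is in scope) is to count how many `prediction` tiles are null or entirely NoData before calling `toRaster`:

```scala
import org.apache.spark.sql.functions.{count, when}

// Count null tiles and all-NoData tiles among the assembled prediction tiles.
// A non-zero null_tiles count would explain the NPE in toRaster.
retiled.select(
  count(when($"prediction".isNull, 1)) as "null_tiles",
  count(when(rf_is_no_data_tile($"prediction"), 1)) as "nodata_tiles",
  count("*") as "total_tiles"
).show()
```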

GeoTrellis version: 3.3.0
RasterFrames version: 0.9.0


import geotrellis.proj4.LatLng
import geotrellis.raster._
import geotrellis.raster.io.geotiff.{MultibandGeoTiff, SinglebandGeoTiff}
import geotrellis.raster.io.geotiff.reader.GeoTiffReader
import geotrellis.raster.render.{ColorRamp, ColorRamps, Png}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.sql._
import org.locationtech.rasterframes._
import org.locationtech.rasterframes.ml.{NoDataFilter, TileExploder}

object ClassificiationRaster extends App {

  def readTiff(name: String) =  GeoTiffReader.readSingleband(getClass.getResource(s"/$name").getPath)

  def readMtbTiff(name: String): MultibandGeoTiff =  GeoTiffReader.readMultiband(getClass.getResource(s"/$name").getPath)

  implicit val spark = SparkSession.builder()
    .master("local[*]")
    .appName(getClass.getName)
    .withKryoSerialization
    .getOrCreate()
    .withRasterFrames

  import spark.implicits._

  val filenamePattern = "xiangfuqu_202003_mask_%s.tif"
  val bandNumbers = "ndvi".split(",").toSeq
  val bandColNames = bandNumbers.map(b => s"band_$b").toArray
  val tileSize = 256

  val joinedRF: RasterFrameLayer = bandNumbers
    .map { b => (b, filenamePattern.format(b)) }
    .map { case (b, f) => (b, readTiff(f)) }
    .map { case (b, t) => t.projectedRaster.toLayer(tileSize, tileSize, s"band_$b") }
    .reduce(_ spatialJoin _)
    .withCRS()
    .withExtent()

  val tlm = joinedRF.tileLayerMetadata.left.get

//  println(tlm.totalDimensions.cols)
//  println(tlm.totalDimensions.rows)

  joinedRF.printSchema()

  val targetCol = "label"

  val geojsonPath = "/Users/ethan/work/data/L2a10m4326/zds/test.geojson"
  spark.sparkContext.addFile(geojsonPath)
  import org.locationtech.rasterframes.datasource.geojson._

  val jsonDF: DataFrame = spark.read.geojson.load(geojsonPath)
  val label_df: DataFrame = jsonDF
    .select($"CLASS_ID", st_reproject($"geometry",LatLng,LatLng).alias("geometry"))
    .hint("broadcast")

  val df_joined = joinedRF.join(label_df, st_intersects(st_geometry($"extent"), $"geometry"))
    .withColumn("dims",rf_dimensions($"band_ndvi"))

  val df_labeled: DataFrame = df_joined.withColumn(
    "label",
    rf_rasterize($"geometry", st_geometry($"extent"), $"CLASS_ID", $"dims.cols", $"dims.rows")
  )

  df_labeled.printSchema()

  val tmp = df_labeled.filter(rf_tile_sum($"label") > 0).cache()

  val exploder = new TileExploder()

  val noDataFilter = new NoDataFilter().setInputCols(bandColNames :+ targetCol)

  val assembler = new VectorAssembler()
    .setInputCols(bandColNames)
    .setOutputCol("features")

  val classifier = new DecisionTreeClassifier()
    .setLabelCol(targetCol)
    .setFeaturesCol(assembler.getOutputCol)

  val pipeline = new Pipeline()
    .setStages(Array(exploder, noDataFilter, assembler, classifier))

  val evaluator = new MulticlassClassificationEvaluator()
    .setLabelCol(targetCol)
    .setPredictionCol("prediction")
    .setMetricName("f1")

  val paramGrid = new ParamGridBuilder()
    //.addGrid(classifier.maxDepth, Array(1, 2, 3, 4))
    .build()

  val trainer = new CrossValidator()
    .setEstimator(pipeline)
    .setEvaluator(evaluator)
    .setEstimatorParamMaps(paramGrid)
    .setNumFolds(4)

  val model = trainer.fit(tmp)

  val metrics = model.getEstimatorParamMaps
    .map(_.toSeq.map(p => s"${p.param.name} = ${p.value}"))
    .map(_.mkString(", "))
    .zip(model.avgMetrics)
  metrics.toSeq.toDF("params", "metric").show(false)

  val scored = model.bestModel.transform(joinedRF)

  scored.groupBy($"prediction" as "class").count().show

  scored.show(20)


  val retiled: DataFrame = scored.groupBy($"crs", $"extent").agg(
    rf_assemble_tile(
      $"column_index", $"row_index", $"prediction",
      tlm.tileCols, tlm.tileRows, IntConstantNoDataCellType
    )
  )

  val rf: RasterFrameLayer = retiled.toLayer(tlm)

  val raster: ProjectedRaster[Tile] = rf.toRaster($"prediction", 5848, 4189)

  SinglebandGeoTiff(raster.tile, tlm.extent, tlm.crs).write("/Users/ethan/project/IdeaProjects/learn/spark_ml_learn.git/src/main/resources/easy_b1.tif")

  val clusterColors = ColorRamp(
    ColorRamps.Viridis.toColorMap((0 until 1).toArray).colors
  )

//  val pngBytes = retiled.select(rf_render_png($"prediction", clusterColors)).first  //It can output the png.
//  retiled.tile.renderPng(clusterColors).write("/Users/ethan/project/IdeaProjects/learn/spark_ml_learn.git/src/main/resources/classified2.png")

//  Png(pngBytes).write("/Users/ethan/project/IdeaProjects/learn/spark_ml_learn.git/src/main/resources/classified2.png")

  spark.stop()
}


1 Reply

I suspect there is a bug in the way the toLayer extension method works; I will follow up with a bug report to the RasterFrames project. That will take a little more effort, I suspect.

Here is a possible workaround that is a little lower level. In this case it results in 25 non-overlapping GeoTiffs being written out.

import geotrellis.store.hadoop.{SerializableConfiguration, _}
import geotrellis.spark.Implicits._
import org.apache.hadoop.fs.Path

// Need this to write local files from spark
val hconf = SerializableConfiguration(spark.sparkContext.hadoopConfiguration)

ContextRDD(
    rf.toTileLayerRDD($"prediction")
      .left.get
      .filter{
        case (_: SpatialKey, null) => false  // remove any null Tiles
        case _ => true
      },
    tlm)
    .regrid(1024)  // Regrid the Tiles so that they are 1024 x 1024
    .toGeoTiffs()
    .foreach { case (sk: SpatialKey, gt: SinglebandGeoTiff) =>
        val path = new Path(new Path("file:///tmp/output"), s"${sk.col}_${sk.row}.tif")
        gt.write(path, hconf.value)
      }
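
A related variation (my own sketch, not part of the original answer): rather than filtering the null tiles out, one could substitute an all-NoData placeholder tile of the layer's tile dimensions, keeping the layer spatially dense so that every SpatialKey still carries a tile. The names `rf` and `tlm` are taken from the question's code.

```scala
import geotrellis.raster.{ArrayTile, IntConstantNoDataCellType, Tile}

// All-NoData placeholder tile matching the layer's tile dimensions.
val fillTile: Tile =
  ArrayTile.empty(IntConstantNoDataCellType, tlm.tileCols, tlm.tileRows)

// Replace null tiles instead of dropping them, so no keys disappear
// from the layer before it is regridded or mosaicked.
val dense = rf.toTileLayerRDD($"prediction")
  .left.get
  .mapValues(t => if (t == null) fillTile else t)
```

Whether filtering or filling is preferable depends on what the downstream step expects: `toGeoTiffs` is happy with gaps, while a single mosaicked raster generally wants a value for every cell.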
