Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
145 views
in Technique[技术] by (71.8m points)

java - How to convert spark dataset to geomesa simplefeature and save it to cassandra (Could not find a SpatialRDDProvider)

I have CSV files of geo data. I use apache spark to import those files into a DataSet and then I want to use GeoMesa. So I need to convert the dataset to simplefeature and in term to save it to Cassandra as GeoMesa format

public class Main {

public static void main(String[] args) throws IOException {

    Map<String, String> dsProperties = new HashMap<String, String>();
    dsProperties.put("cassandra.keyspace", "t1");
    dsProperties.put("cassandra.catalog", "testgeo");
    dsProperties.put("cassandra.tableName", "testgeo");
    dsProperties.put("cassandra.contact.point", "localhost:9042");
    DataStore ds = DataStoreFinder.getDataStore(dsProperties);
    SimpleFeatureType sft = SimpleFeatureTypes.createType("testgeo", "geoid:Integer,geopoint:Point:srid=4326");
    ds.createSchema(sft);
    SparkSession spark = SparkSession.builder().appName("my-app").master("local[*]")
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .config("spark.kryo.registrator", "org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator")
            .getOrCreate();
    org.apache.spark.sql.SQLTypes.init(spark.sqlContext());

    StructType schema = new StructType()
            .add(new StructField("id", DataTypes.IntegerType, true, Metadata.empty()))
            .add(new StructField("dt", DataTypes.TimestampType, true, Metadata.empty()))
            .add(new StructField("lat", DataTypes.DoubleType, true, Metadata.empty()))
            .add(new StructField("lon", DataTypes.DoubleType, true, Metadata.empty()));

    Dataset<Row> df = spark.read().format("geomesa").option("header", true).option("inferSchema", true)
            .option("dateFormat", "yyyy-MM-dd HH:mm:ss").schema(schema).option("delimiter", ",")
            .csv("C:\Users\h6\Desktop\dta.csv");

    df.createOrReplaceTempView("testgeo");
    df = spark.sql("SELECT id as geoid, st_makePoint(lat, lon) as geopoint FROM testgeo");
    df.show();

    Map<String, String> tableProperties = new HashMap<String, String>();
    tableProperties.put("cassandra.keyspace", "t1");
    tableProperties.put("cassandra.catalog", "testgeo");
    tableProperties.put("cassandra.tableName", "testgeo");
    tableProperties.put("cassandra.contact.point", "localhost:9042");
    df.write().format("geomesa").option("geomesa.feature", "testgeo").options(tableProperties).save();
}

}

sample of data

id,date,lat,lon
1277,2008-02-02 13:30:49,116.31412,39.89454
1277,2008-02-02 13:34:51,116.32674,39.89577

i got error :

Exception in thread "main" java.lang.RuntimeException: Could not find a SpatialRDDProvider
at org.locationtech.geomesa.spark.GeoMesaSpark$$anonfun$apply$2.apply(GeoMesaSpark.scala:33)
at org.locationtech.geomesa.spark.GeoMesaSpark$$anonfun$apply$2.apply(GeoMesaSpark.scala:33)
at scala.Option.getOrElse(Option.scala:121)
at org.locationtech.geomesa.spark.GeoMesaSpark$.apply(GeoMesaSpark.scala:33)
at org.locationtech.geomesa.spark.GeoMesaDataSource.createRelation(GeoMesaSparkSQL.scala:206)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:472)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at org.test.spark.Main.main(Main.java:75)
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

There isn't currently a SpatialRDDProvider for Cassandra, so you need to use the generic 'GeoTools' one: https://www.geomesa.org/documentation/user/spark/providers.html#geotools-rdd-provider

In short, you need to add "geotools" -> "true" to your tableProperties map. You'll also need to ensure that the appropriate Cassandra data store JARs are on the Spark classpath.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...