Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

apache spark - Writing R data frames returned from SparkR:::map

I am using SparkR:::map, and my function returns a large-ish R data frame for each input row, each of the same shape. I would like to write these data frames as Parquet files without 'collect'ing them. Can I map write.df over my output list? Can I get the worker tasks to write the Parquet files instead?

I now have a working example. I am happy with it, except that I did not expect the reduce to implicitly 'collect': I wanted to write the resulting DataFrame as Parquet.

Also, I'm not convinced that :::map actually does anything in parallel. Do I always need to call 'parallelize' as well?

#! /usr/bin/Rscript
library(SparkR, lib.loc="/opt/spark-1.5.1-bin-without-hadoop/R/lib")

source("jdbc-utils.R")

options(stringsAsFactors = FALSE)

# I dislike having these here but when I move them into main(), it breaks - the sqlContext drops.
assign("sc", sparkR.init(master = "spark://poc-master-1:7077",
                         sparkHome = "/opt/spark-1.5.1-bin-without-hadoop/",
                         appName = "Peter Spark test",
                         sparkEnvir = list(spark.executor.memory = "4G")), envir = .GlobalEnv)
assign("sqlContext", sparkRSQL.init(sc), envir = .GlobalEnv)

#### MAP function ####
run.model <- function(v) {
  x <- v$xs[1]
  y <- v$ys[1]
  startTime <- format(Sys.time(), "%F %T")
  xs <- c(1:x)
  endTime <- format(Sys.time(), "%F %T")
  hostname <- system("hostname", intern = TRUE)
  xys <- data.frame(xs,y,startTime,endTime,hostname,stringsAsFactors = FALSE)
  return(xys)
}

# HERE BE THE SCRIPT BIT
main <- function() {

  # Make unique identifiers for each run
  xs <- c(1:365)
  ys <- c(1:1)
  xys <- data.frame(xs,ys,stringsAsFactors = FALSE)

  # Convert to Spark dataframe for mapping
  sqlContext <- get("sqlContext", envir = .GlobalEnv)
  xys.sdf <- createDataFrame(sqlContext, xys)

  # Let Spark do what Spark does
  output.list <- SparkR:::map(xys.sdf, run.model)

  # Reduce gives us a single R dataframe, which may not be what we want.
  output.redux <- SparkR:::reduce(output.list, rbind)

  # Or you can have it as a list of data frames.
  output.col <- collect(output.list)

  return(NULL)
}

1 Reply

Assuming your data looks more or less like this:

rdd <- SparkR:::parallelize(sc, 1:5)
dfs <- SparkR:::map(rdd, function(x) mtcars[(x * 5):((x + 1) * 5), ])

and all columns have supported types, you can convert it to a row-wise format:

rows <- SparkR:::flatMap(dfs, function(x) {
  data <- as.list(x)
  args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
  do.call(mapply, append(args, data))
})
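
To see what the function passed to flatMap does, it can be run on a single data frame in plain R, with no Spark involved. This is a minimal sketch, assuming a three-row slice of mtcars as a stand-in for one element of dfs; the name to.rows is introduced here for illustration only.

```r
# One small data frame standing in for a single element of `dfs`.
df <- mtcars[1:3, c("mpg", "cyl", "gear")]

# Same body as the function passed to flatMap above.
to.rows <- function(x) {
  data <- as.list(x)  # one vector per column
  args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
  # mapply walks the column vectors in parallel and calls list() once per
  # row, so the result is a list with one named list per row.
  do.call(mapply, append(args, data))
}

rows <- to.rows(df)
str(rows[[1]])
```

Each element of rows is a named list holding one row. flatMap concatenates these per-data-frame lists, so the resulting RDD has one element per row, which is the layout passed to createDataFrame here.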

Then call createDataFrame:

sdf <- createDataFrame(sqlContext, rows)
head(sdf)

##    mpg cyl  disp  hp drat   wt  qsec vs am gear carb
## 1 18.7   8 360.0 175 3.15 3.44 17.02  0  0    3    2
## 2 18.1   6 225.0 105 2.76 3.46 20.22  1  0    3    1
## 3 14.3   8 360.0 245 3.21 3.57 15.84  0  0    3    4
## 4 24.4   4 146.7  62 3.69 3.19 20.00  1  0    4    2
## 5 22.8   4 140.8  95 3.92 3.15 22.90  1  0    4    2
## 6 19.2   6 167.6 123 3.92 3.44 18.30  1  0    4    4

printSchema(sdf)

## root
##  |-- mpg: double (nullable = true)
##  |-- cyl: double (nullable = true)
##  |-- disp: double (nullable = true)
##  |-- hp: double (nullable = true)
##  |-- drat: double (nullable = true)
##  |-- wt: double (nullable = true)
##  |-- qsec: double (nullable = true)
##  |-- vs: double (nullable = true)
##  |-- am: double (nullable = true)
##  |-- gear: double (nullable = true)
##  |-- carb: double (nullable = true)

Finally, use write.df / saveDF to write the result.
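
The write step could look roughly like the sketch below. It assumes the SparkR 1.5.x API used in this thread and an sdf built with createDataFrame as above. The helper name save.as.parquet and the output path are hypothetical and not taken from the question.

```r
# Hypothetical helper: writes a SparkR DataFrame as Parquet.
# Assumes the SparkR package (1.5.x API) is installed and a SparkContext
# and sqlContext have already been initialised.
save.as.parquet <- function(sdf, out.path) {
  # source = "parquet" selects the Parquet data source;
  # mode = "overwrite" replaces any existing output at out.path.
  SparkR::write.df(sdf, path = out.path, source = "parquet", mode = "overwrite")
}

# Usage (the path is a placeholder):
# save.as.parquet(sdf, "/tmp/model-output.parquet")
```

The write is issued on the distributed DataFrame itself, so the data frames do not have to be combined with rbind on the driver first.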

The problem is that you shouldn't use an internal API in the first place. One of the reasons it was removed from the initial release is that it is not robust enough to be used directly. It is also still unclear whether it will be supported, or even available, in the future. Just saying...

