As of today, Spark doesn't provide any method that can do this for you, so you have to create your own. Let's say your data looks like this:
import random
random.seed(1)
df = sc.parallelize([(
    random.choice([0.0, 1.0]),
    random.choice(["a", "b", "c"]),
    random.choice(["foo", "bar"]),
    random.randint(0, 100),
    random.random(),
) for _ in range(100)]).toDF(["label", "x1", "x2", "x3", "x4"])
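For reference, the inferred schema should come out like this (the types follow from the Python values above):

df.printSchema()
# root
#  |-- label: double (nullable = true)
#  |-- x1: string (nullable = true)
#  |-- x2: string (nullable = true)
#  |-- x3: long (nullable = true)
#  |-- x4: double (nullable = true)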
The data is then processed using the following pipeline:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
indexers = [
    StringIndexer(inputCol=c, outputCol="{}_idx".format(c))
    for c in ["x1", "x2"]]

encoders = [
    OneHotEncoder(
        inputCol=idx.getOutputCol(),
        outputCol="{0}_enc".format(idx.getOutputCol()))
    for idx in indexers]

assembler = VectorAssembler(
    inputCols=[enc.getOutputCol() for enc in encoders] + ["x3", "x4"],
    outputCol="features")

pipeline = Pipeline(
    stages=indexers + encoders + [assembler, LinearRegression()])
model = pipeline.fit(df)
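A quick caveat before extracting anything: the p-values used below are only computed by the "normal" solver. The default solver="auto" should resolve to it for a small problem like this one, but if lrm.summary.pValues raises an error, request it explicitly:

pipeline = Pipeline(
    stages=indexers + encoders + [
        assembler, LinearRegression(solver="normal")])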
Get the LinearRegressionModel:
lrm = model.stages[-1]
Transform the data:
transformed = model.transform(df)
Extract and flatten ML attributes:
from itertools import chain
attrs = sorted(
    (attr["idx"], attr["name"])
    for attr in chain(*transformed
        .schema[lrm.summary.featuresCol]
        .metadata["ml_attr"]["attrs"]
        .values()))
and map them to the output:
[(name, lrm.summary.pValues[idx]) for idx, name in attrs]
[('x1_idx_enc_a', 0.26400012641279824),
('x1_idx_enc_c', 0.06320192217171572),
('x2_idx_enc_foo', 0.40447778902400433),
('x3', 0.1081883594783335),
('x4', 0.4545851609776568)]
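Note that pValues also covers the intercept, appended after the coefficients, so the mapping above simply skips it; if you want it as well, it should be the last entry (assuming the default fitIntercept=True):

lrm.summary.pValues[-1]  # p-value of the intercept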
[(name, lrm.coefficients[idx]) for idx, name in attrs]
[('x1_idx_enc_a', 0.13874401585637453),
('x1_idx_enc_c', 0.23498565469334595),
('x2_idx_enc_foo', -0.083558932128022873),
('x3', 0.0030186112903237442),
('x4', -0.12951394186593695)]
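If you need this more than once, you can fold the steps above into a small helper. A sketch (the function name is mine; it assumes a fitted Pipeline whose last stage is a LinearRegressionModel trained with the normal solver):

from itertools import chain

def features_with_stats(model, transformed):
    """Return (name, coefficient, p-value) for each assembled feature."""
    lrm = model.stages[-1]
    attrs = sorted(
        (attr["idx"], attr["name"])
        for attr in chain(*transformed
            .schema[lrm.summary.featuresCol]
            .metadata["ml_attr"]["attrs"]
            .values()))
    return [(name, lrm.coefficients[idx], lrm.summary.pValues[idx])
            for idx, name in attrs]

features_with_stats(model, transformed)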