Getting started with pyspark.ml and the pipelines API, I find myself writing custom transformers for typical preprocessing tasks in order to use them in a pipeline. Examples:
from pyspark.ml import Pipeline, Transformer


class CustomTransformer(Transformer):
    # lazy workaround - a transformer needs to have these attributes
    _defaultParamMap = dict()
    _paramMap = dict()
    _params = dict()


class ColumnSelector(CustomTransformer):
    """Transformer that selects a subset of columns
    - to be used as pipeline stage"""

    def __init__(self, columns):
        self.columns = columns

    def _transform(self, data):
        return data.select(self.columns)


class ColumnRenamer(CustomTransformer):
    """Transformer that renames a single column."""

    def __init__(self, rename):
        self.rename = rename

    def _transform(self, data):
        (colNameBefore, colNameAfter) = self.rename
        return data.withColumnRenamed(colNameBefore, colNameAfter)


class NaDropper(CustomTransformer):
    """Drops rows with at least one not-a-number element."""

    def __init__(self, cols=None):
        self.cols = cols

    def _transform(self, data):
        dataAfterDrop = data.dropna(subset=self.cols)
        return dataAfterDrop


class ColumnCaster(CustomTransformer):
    """Casts a single column to the given type."""

    def __init__(self, col, toType):
        self.col = col
        self.toType = toType

    def _transform(self, data):
        return data.withColumn(self.col, data[self.col].cast(self.toType))
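
For context, this is roughly how I wire them into a pipeline. A minimal sketch: the DataFrame, column names, and stage order below are made up for illustration, not part of my actual job.

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline

spark = SparkSession.builder.getOrCreate()

# toy data - columns and values are hypothetical
df = spark.createDataFrame(
    [("a", "1", 3.5), ("b", "2", None)],
    ["id", "count", "score"],
)

pipeline = Pipeline(stages=[
    ColumnSelector(["id", "count", "score"]),  # keep only these columns
    NaDropper(cols=["score"]),                 # drop rows with a null score
    ColumnCaster("count", "int"),              # cast the string column to int
    ColumnRenamer(("score", "rating")),        # rename score -> rating
])

# all stages are plain Transformers, so fit() just assembles a PipelineModel
result = pipeline.fit(df).transform(df)
result.show()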
They work, but I was wondering whether this is a pattern or an antipattern: are such transformers a good way to work with the pipeline API? Did I need to implement them myself, or is equivalent functionality provided somewhere else?