#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from multiprocessing.pool import ThreadPool
from typing import (
Any,
Callable,
Dict,
List,
Optional,
Sequence,
Tuple,
Union,
cast,
TYPE_CHECKING,
)
import numpy as np
import pandas as pd
from pyspark import keyword_only, since, inheritable_thread_target
from pyspark.ml.connect import Estimator, Model
from pyspark.ml.connect.base import Evaluator
from pyspark.ml.connect.io_utils import (
MetaAlgorithmReadWrite,
ParamsReadWrite,
)
from pyspark.ml.param import Params, Param, TypeConverters
from pyspark.ml.param.shared import HasParallelism, HasSeed
from pyspark.sql.functions import col, lit, rand
from pyspark.sql.dataframe import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.utils import is_remote
if TYPE_CHECKING:
from pyspark.ml._typing import ParamMap
class _ValidatorParams(HasSeed):
"""
    Common params for :py:class:`CrossValidator` and :py:class:`CrossValidatorModel`.
"""
estimator: Param[Estimator] = Param(
Params._dummy(), "estimator", "estimator to be cross-validated"
)
estimatorParamMaps: Param[List["ParamMap"]] = Param(
Params._dummy(), "estimatorParamMaps", "estimator param maps"
)
evaluator: Param[Evaluator] = Param(
Params._dummy(),
"evaluator",
"evaluator used to select hyper-parameters that maximize the validator metric",
)
@since("2.0.0")
def getEstimator(self) -> Estimator:
"""
Gets the value of estimator or its default value.
"""
return self.getOrDefault(self.estimator)
@since("2.0.0")
def getEstimatorParamMaps(self) -> List["ParamMap"]:
"""
Gets the value of estimatorParamMaps or its default value.
"""
return self.getOrDefault(self.estimatorParamMaps)
@since("2.0.0")
def getEvaluator(self) -> Evaluator:
"""
Gets the value of evaluator or its default value.
"""
return self.getOrDefault(self.evaluator)
class _CrossValidatorParams(_ValidatorParams):
"""
    Params for :py:class:`CrossValidator` and :py:class:`CrossValidatorModel`.

    .. versionadded:: 3.5.0
"""
numFolds: Param[int] = Param(
Params._dummy(),
"numFolds",
"number of folds for cross validation",
typeConverter=TypeConverters.toInt,
)
foldCol: Param[str] = Param(
Params._dummy(),
"foldCol",
"Param for the column name of user "
+ "specified fold number. Once this is specified, :py:class:`CrossValidator` "
+ "won't do random k-fold split. Note that this column should be integer type "
+ "with range [0, numFolds) and Spark will throw exception on out-of-range "
+ "fold numbers.",
typeConverter=TypeConverters.toString,
)
def __init__(self, *args: Any):
super(_CrossValidatorParams, self).__init__(*args)
self._setDefault(numFolds=3, foldCol="")
@since("1.4.0")
def getNumFolds(self) -> int:
"""
Gets the value of numFolds or its default value.
"""
return self.getOrDefault(self.numFolds)
@since("3.1.0")
def getFoldCol(self) -> str:
"""
Gets the value of foldCol or its default value.
"""
return self.getOrDefault(self.foldCol)
def _parallelFitTasks(
estimator: Estimator,
train: DataFrame,
evaluator: Evaluator,
validation: DataFrame,
epm: Sequence["ParamMap"],
) -> List[Callable[[], Tuple[int, float]]]:
"""
Creates a list of callables which can be called from different threads to fit and evaluate
an estimator in parallel. Each callable returns an `(index, metric)` pair.
Parameters
----------
est : :py:class:`pyspark.ml.baseEstimator`
he estimator to be fit.
train : :py:class:`pyspark.sql.DataFrame`
DataFrame, training data set, used for fitting.
eva : :py:class:`pyspark.ml.evaluation.Evaluator`
used to compute `metric`
validation : :py:class:`pyspark.sql.DataFrame`
DataFrame, validation data set, used for evaluation.
epm : :py:class:`collections.abc.Sequence`
Sequence of ParamMap, params maps to be used during fitting & evaluation.
collectSubModel : bool
Whether to collect sub model.
Returns
-------
tuple
(int, float), an index into `epm` and the associated metric value.
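
    Examples
    --------
    A minimal sketch of how the returned tasks are consumed (assuming `est`, `train`,
    `eva`, `validation`, and `epm` have already been constructed):

    >>> tasks = _parallelFitTasks(est, train, eva, validation, epm)  # doctest: +SKIP
    >>> for index, metric in map(lambda task: task(), tasks):  # doctest: +SKIP
    ...     print(index, metric)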
"""
active_session = SparkSession.getActiveSession()
if active_session is None:
raise RuntimeError(
"An active SparkSession is required for running cross validator fit tasks."
)
def get_single_task(index: int, param_map: Any) -> Callable[[], Tuple[int, float]]:
def single_task() -> Tuple[int, float]:
if not is_remote():
                # The active session is a thread-local variable; in a background thread it
                # is not set, so the following line sets it to the main thread's session.
active_session._jvm.SparkSession.setActiveSession( # type: ignore[union-attr]
active_session._jsparkSession
)
model = estimator.fit(train, param_map)
metric = evaluator.evaluate(
model.transform(validation, param_map) # type: ignore[union-attr]
)
return index, metric
return single_task
return [get_single_task(index, param_map) for index, param_map in enumerate(epm)]
class _CrossValidatorReadWrite(MetaAlgorithmReadWrite):
def _get_skip_saving_params(self) -> List[str]:
"""
Returns params to be skipped when saving metadata.
"""
return ["estimator", "estimatorParamMaps", "evaluator"]
def _save_meta_algorithm(self, root_path: str, node_path: List[str]) -> Dict[str, Any]:
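        # Persist the estimator, the evaluator and, for a fitted CrossValidatorModel,
        # the best model into dedicated child node paths, and embed the flattened
        # param maps directly in the metadata dict.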
metadata = self._get_metadata_to_save()
metadata[
"estimator"
] = self.getEstimator()._save_to_node_path( # type: ignore[attr-defined]
root_path, node_path + ["crossvalidator_estimator"]
)
metadata[
"evaluator"
] = self.getEvaluator()._save_to_node_path( # type: ignore[attr-defined]
root_path, node_path + ["crossvalidator_evaluator"]
)
metadata["estimator_param_maps"] = [
[
{"parent": param.parent, "name": param.name, "value": value}
for param, value in param_map.items()
]
for param_map in self.getEstimatorParamMaps() # type: ignore[attr-defined]
]
if isinstance(self, CrossValidatorModel):
metadata["avg_metrics"] = self.avgMetrics
metadata["std_metrics"] = self.stdMetrics
metadata["best_model"] = self.bestModel._save_to_node_path(
root_path, node_path + ["crossvalidator_best_model"]
)
return metadata
def _load_meta_algorithm(self, root_path: str, node_metadata: Dict[str, Any]) -> None:
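        # Restore the estimator and evaluator from their child node paths, then rebuild
        # each param map by resolving every param's parent uid through the estimator's
        # uid map.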
estimator = ParamsReadWrite._load_instance_from_metadata(
node_metadata["estimator"], root_path
)
self.set(self.estimator, estimator) # type: ignore[attr-defined]
evaluator = ParamsReadWrite._load_instance_from_metadata(
node_metadata["evaluator"], root_path
)
self.set(self.evaluator, evaluator) # type: ignore[attr-defined]
json_epm = node_metadata["estimator_param_maps"]
uid_to_instances = MetaAlgorithmReadWrite.get_uid_map(estimator)
epm = []
for json_param_map in json_epm:
param_map = {}
for json_param in json_param_map:
est = uid_to_instances[json_param["parent"]]
param = getattr(est, json_param["name"])
value = json_param["value"]
param_map[param] = value
epm.append(param_map)
self.set(self.estimatorParamMaps, epm) # type: ignore[attr-defined]
if isinstance(self, CrossValidatorModel):
self.avgMetrics = node_metadata["avg_metrics"]
self.stdMetrics = node_metadata["std_metrics"]
self.bestModel = ParamsReadWrite._load_instance_from_metadata(
node_metadata["best_model"], root_path
)
class CrossValidator(
Estimator["CrossValidatorModel"],
_CrossValidatorParams,
HasParallelism,
_CrossValidatorReadWrite,
):
"""
    K-fold cross validation performs model selection by splitting the dataset into a set of
    non-overlapping, randomly partitioned folds which are used as separate training and test
    datasets. For example, with k=3 folds, k-fold cross validation generates 3 (training, test)
    dataset pairs, each of which uses 2/3 of the data for training and 1/3 for testing. Each
    fold is used as the test set exactly once.

    .. versionadded:: 3.5.0

Examples
--------
>>> from pyspark.ml.connect.tuning import CrossValidator
>>> from pyspark.ml.connect.classification import LogisticRegression
>>> from pyspark.ml.connect.evaluation import BinaryClassificationEvaluator
>>> from pyspark.ml.tuning import ParamGridBuilder
>>> from sklearn.datasets import load_breast_cancer
>>> lor = LogisticRegression(maxIter=20, learningRate=0.01)
>>> ev = BinaryClassificationEvaluator()
>>> grid = ParamGridBuilder().addGrid(lor.maxIter, [2, 20]).build()
>>> cv = CrossValidator(estimator=lor, evaluator=ev, estimatorParamMaps=grid)
>>> sk_dataset = load_breast_cancer()
>>> train_dataset = spark.createDataFrame(
... zip(sk_dataset.data.tolist(), [int(t) for t in sk_dataset.target]),
... schema="features: array<double>, label: long",
... )
>>> cv_model = cv.fit(train_dataset)
>>> transformed_dataset = cv_model.transform(train_dataset.limit(10))
>>> cv_model.avgMetrics
[0.5527792527167658, 0.8348714668615984]
>>> cv_model.stdMetrics
[0.04902833489813031, 0.05247132866444953]
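
    A user-specified fold column can replace the random k-fold split (a sketch; the
    ``fold`` values below are illustrative and must be integers in ``[0, numFolds)``):

    >>> from pyspark.sql.functions import monotonically_increasing_id
    >>> train_with_fold = train_dataset.withColumn(
    ...     "fold", (monotonically_increasing_id() % 3).cast("int")
    ... )
    >>> cv_with_fold = CrossValidator(
    ...     estimator=lor, evaluator=ev, estimatorParamMaps=grid, foldCol="fold"
    ... )
    >>> cv_fold_model = cv_with_fold.fit(train_with_fold)  # doctest: +SKIP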
"""
_input_kwargs: Dict[str, Any]
@keyword_only
def __init__(
self,
*,
estimator: Optional[Estimator] = None,
estimatorParamMaps: Optional[List["ParamMap"]] = None,
evaluator: Optional[Evaluator] = None,
numFolds: int = 3,
seed: Optional[int] = None,
parallelism: int = 1,
foldCol: str = "",
) -> None:
"""
__init__(self, \\*, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,\
seed=None, parallelism=1, foldCol="")
"""
super(CrossValidator, self).__init__()
self._setDefault(parallelism=1)
kwargs = self._input_kwargs
self._set(**kwargs)
    @keyword_only
@since("3.5.0")
def setParams(
self,
*,
estimator: Optional[Estimator] = None,
estimatorParamMaps: Optional[List["ParamMap"]] = None,
evaluator: Optional[Evaluator] = None,
numFolds: int = 3,
seed: Optional[int] = None,
parallelism: int = 1,
foldCol: str = "",
) -> "CrossValidator":
"""
        setParams(self, \\*, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,\
                  seed=None, parallelism=1, foldCol=""):
Sets params for cross validator.
"""
kwargs = self._input_kwargs
return self._set(**kwargs)
[docs] @since("3.5.0")
def setEstimator(self, value: Estimator) -> "CrossValidator":
"""
Sets the value of :py:attr:`estimator`.
"""
return self._set(estimator=value)
[docs] @since("3.5.0")
def setEstimatorParamMaps(self, value: List["ParamMap"]) -> "CrossValidator":
"""
Sets the value of :py:attr:`estimatorParamMaps`.
"""
return self._set(estimatorParamMaps=value)
[docs] @since("3.5.0")
def setEvaluator(self, value: Evaluator) -> "CrossValidator":
"""
Sets the value of :py:attr:`evaluator`.
"""
return self._set(evaluator=value)
[docs] @since("3.5.0")
def setNumFolds(self, value: int) -> "CrossValidator":
"""
Sets the value of :py:attr:`numFolds`.
"""
return self._set(numFolds=value)
[docs] @since("3.5.0")
def setFoldCol(self, value: str) -> "CrossValidator":
"""
Sets the value of :py:attr:`foldCol`.
"""
return self._set(foldCol=value)
    def setSeed(self, value: int) -> "CrossValidator":
"""
Sets the value of :py:attr:`seed`.
"""
return self._set(seed=value)
    def setParallelism(self, value: int) -> "CrossValidator":
"""
Sets the value of :py:attr:`parallelism`.
"""
return self._set(parallelism=value)
@staticmethod
def _gen_avg_and_std_metrics(
metrics_all: List[List[float]],
) -> Tuple[List[float], List[float]]:
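        # metrics_all is indexed as [fold][param_map]; reducing over axis 0 yields one
        # average (and one standard deviation) per param map, aggregated across folds.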
avg_metrics = np.mean(metrics_all, axis=0)
std_metrics = np.std(metrics_all, axis=0)
return list(avg_metrics), list(std_metrics)
def _fit(self, dataset: Union[pd.DataFrame, DataFrame]) -> "CrossValidatorModel":
if isinstance(dataset, pd.DataFrame):
# TODO: support pandas dataframe fitting
raise NotImplementedError("Fitting pandas dataframe is not supported yet.")
est = self.getOrDefault(self.estimator)
epm = self.getOrDefault(self.estimatorParamMaps)
numModels = len(epm)
eva = self.getOrDefault(self.evaluator)
nFolds = self.getOrDefault(self.numFolds)
metrics_all = [[0.0] * numModels for i in range(nFolds)]
pool = ThreadPool(processes=min(self.getParallelism(), numModels))
datasets = self._kFold(dataset)
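        # For each fold, fit and evaluate every param map, running up to `parallelism`
        # tasks concurrently; metrics_all[i][j] is the metric of param map j on fold i.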
for i in range(nFolds):
validation = datasets[i][1].cache()
train = datasets[i][0].cache()
tasks = _parallelFitTasks(est, train, eva, validation, epm)
if not is_remote():
tasks = list(map(inheritable_thread_target, tasks))
for j, metric in pool.imap_unordered(lambda f: f(), tasks):
metrics_all[i][j] = metric
validation.unpersist()
train.unpersist()
metrics, std_metrics = CrossValidator._gen_avg_and_std_metrics(metrics_all)
if eva.isLargerBetter():
bestIndex = np.argmax(metrics)
else:
bestIndex = np.argmin(metrics)
bestModel = cast(Model, est.fit(dataset, epm[bestIndex]))
cv_model = self._copyValues(
CrossValidatorModel(
bestModel,
avgMetrics=metrics,
stdMetrics=std_metrics,
)
)
cv_model._resetUid(self.uid)
return cv_model
def _kFold(self, dataset: DataFrame) -> List[Tuple[DataFrame, DataFrame]]:
nFolds = self.getOrDefault(self.numFolds)
foldCol = self.getOrDefault(self.foldCol)
datasets = []
if not foldCol:
# Do random k-fold split.
seed = self.getOrDefault(self.seed)
h = 1.0 / nFolds
randCol = self.uid + "_rand"
df = dataset.select("*", rand(seed).alias(randCol))
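            # Rows whose random value falls in [i * h, (i + 1) * h) form the i-th
            # validation fold; all remaining rows form the matching training set.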
for i in range(nFolds):
validateLB = i * h
validateUB = (i + 1) * h
condition = (df[randCol] >= validateLB) & (df[randCol] < validateUB)
validation = df.filter(condition)
train = df.filter(~condition)
datasets.append((train, validation))
else:
# TODO:
# Add verification that foldCol column values are in range [0, nFolds)
for i in range(nFolds):
training = dataset.filter(col(foldCol) != lit(i))
validation = dataset.filter(col(foldCol) == lit(i))
if training.isEmpty():
raise ValueError("The training data at fold %s is empty." % i)
if validation.isEmpty():
raise ValueError("The validation data at fold %s is empty." % i)
datasets.append((training, validation))
return datasets
    def copy(self, extra: Optional["ParamMap"] = None) -> "CrossValidator":
        """
        Creates a copy of this instance with a randomly generated uid
        and some extra params. This copy creates a deep copy of
        the embedded paramMap and copies the embedded and extra parameters over.

        .. versionadded:: 3.5.0

        Parameters
        ----------
        extra : dict, optional
            Extra parameters to copy to the new instance

        Returns
        -------
        :py:class:`CrossValidator`
            Copy of this instance
        """
if extra is None:
extra = dict()
newCV = Params.copy(self, extra)
if self.isSet(self.estimator):
newCV.setEstimator(self.getEstimator().copy(extra))
# estimatorParamMaps remain the same
if self.isSet(self.evaluator):
newCV.setEvaluator(self.getEvaluator().copy(extra))
return newCV
class CrossValidatorModel(Model, _CrossValidatorParams, _CrossValidatorReadWrite):
"""
CrossValidatorModel contains the model with the highest average cross-validation
metric across folds and uses this model to transform input data. CrossValidatorModel
also tracks the metrics for each param map evaluated.

    .. versionadded:: 3.5.0
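
    Examples
    --------
    A minimal usage sketch (assuming ``cv_model`` and ``train_dataset`` were created as
    in the :py:class:`CrossValidator` example above; metric values are illustrative):

    >>> predictions = cv_model.transform(train_dataset.limit(10))  # doctest: +SKIP
    >>> cv_model.avgMetrics  # doctest: +SKIP
    [0.5527792527167658, 0.8348714668615984]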
"""
def __init__(
self,
bestModel: Optional[Model] = None,
avgMetrics: Optional[List[float]] = None,
stdMetrics: Optional[List[float]] = None,
) -> None:
super(CrossValidatorModel, self).__init__()
#: best model from cross validation
self.bestModel = bestModel
#: Average cross-validation metrics for each paramMap in
#: CrossValidator.estimatorParamMaps, in the corresponding order.
self.avgMetrics = avgMetrics or []
#: standard deviation of metrics for each paramMap in
#: CrossValidator.estimatorParamMaps, in the corresponding order.
self.stdMetrics = stdMetrics or []
def _transform(self, dataset: Union[DataFrame, pd.DataFrame]) -> Union[DataFrame, pd.DataFrame]:
return self.bestModel.transform(dataset)
    def copy(self, extra: Optional["ParamMap"] = None) -> "CrossValidatorModel":
        """
        Creates a copy of this instance with a randomly generated uid
        and some extra params. This copies the underlying bestModel,
        creates a deep copy of the embedded paramMap, and
        copies the embedded and extra parameters over.

        .. versionadded:: 3.5.0

        Parameters
        ----------
        extra : dict, optional
            Extra parameters to copy to the new instance

        Returns
        -------
        :py:class:`CrossValidatorModel`
            Copy of this instance
        """
if extra is None:
extra = dict()
bestModel = self.bestModel.copy(extra)
avgMetrics = list(self.avgMetrics)
stdMetrics = list(self.stdMetrics)
return self._copyValues(
CrossValidatorModel(bestModel, avgMetrics=avgMetrics, stdMetrics=stdMetrics),
extra=extra,
)