Source code for pyspark.ml.connect.evaluation

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Any, Union, List, Tuple

import numpy as np
import pandas as pd

from pyspark import keyword_only
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import HasLabelCol, HasPredictionCol, HasProbabilityCol
from pyspark.ml.connect.base import Evaluator
from pyspark.ml.connect.io_utils import ParamsReadWrite
from pyspark.ml.connect.util import aggregate_dataframe
from pyspark.sql import DataFrame


class _TorchMetricEvaluator(Evaluator):
    metricName: Param[str] = Param(
        Params._dummy(),
        "metricName",
        "metric name for the regression evaluator, valid values are 'mse' and 'r2'",
        typeConverter=TypeConverters.toString,
    )

    def getMetricName(self) -> str:
        """
        Gets the value of metricName or its default value.

        .. versionadded:: 3.5.0
        """
        return self.getOrDefault(self.metricName)

    def _get_torch_metric(self) -> Any:
        raise NotImplementedError()

    def _get_input_cols(self) -> List[str]:
        raise NotImplementedError()

    def _get_metric_update_inputs(self, dataset: "pd.DataFrame") -> Tuple[Any, Any]:
        raise NotImplementedError()

    def _evaluate(self, dataset: Union["DataFrame", "pd.DataFrame"]) -> float:
        torch_metric = self._get_torch_metric()

        def local_agg_fn(pandas_df: "pd.DataFrame") -> "pd.DataFrame":
            torch_metric.update(*self._get_metric_update_inputs(pandas_df))
            return torch_metric

        def merge_agg_state(state1: Any, state2: Any) -> Any:
            state1.merge_state([state2])
            return state1

        def agg_state_to_result(state: Any) -> Any:
            return state.compute().item()

        return aggregate_dataframe(
            dataset,
            self._get_input_cols(),
            local_agg_fn,
            merge_agg_state,
            agg_state_to_result,
        )
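    # The three local callbacks above follow torcheval's aggregation protocol
    # over Spark partitions: each partition updates its own metric object,
    # partial states are merged pairwise with merge_state, and the merged
    # state is reduced to a Python float with compute().item(). A minimal
    # standalone sketch of that protocol (assumes torcheval is installed; the
    # sample values mirror the RegressionEvaluator doctest below, so the
    # result is 0.625):
    #
    #   import torch
    #   import torcheval.metrics as torchmetrics
    #
    #   m1 = torchmetrics.MeanSquaredError()
    #   m1.update(torch.tensor([2.0]), torch.tensor([1.0]))    # partition 1
    #   m2 = torchmetrics.MeanSquaredError()
    #   m2.update(torch.tensor([-1.5]), torch.tensor([-1.0]))  # partition 2
    #   m1.merge_state([m2])
    #   m1.compute().item()                                    # 0.625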


def _get_rmse_torchmetric() -> Any:
    import torch
    import torcheval.metrics as torchmetrics

    class _RootMeanSquaredError(torchmetrics.MeanSquaredError):
        def compute(self: Any) -> torch.Tensor:
            return torch.sqrt(super().compute())

    return _RootMeanSquaredError()
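# torcheval, as used here, does not expose a root-mean-squared-error metric
# directly, so the helper above derives one by overriding compute() to take
# the square root of the accumulated MSE; state updates and merge_state are
# inherited from MeanSquaredError, so distributed aggregation works unchanged.
# For the RegressionEvaluator doctest data below (mse == 0.625),
# metricName='rmse' would evaluate to sqrt(0.625) ≈ 0.7906.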


class RegressionEvaluator(_TorchMetricEvaluator, HasLabelCol, HasPredictionCol, ParamsReadWrite):
    """
    Evaluator for Regression, which expects input columns prediction and label.
    Supported metrics are 'rmse', 'mse' and 'r2'.

    .. versionadded:: 3.5.0

    Examples
    --------
    >>> from pyspark.ml.connect.evaluation import RegressionEvaluator
    >>> eva = RegressionEvaluator(metricName='mse')
    >>> dataset = spark.createDataFrame(
    ...     [(1.0, 2.0), (-1.0, -1.5)], schema=['label', 'prediction']
    ... )
    >>> eva.evaluate(dataset)
    0.625
    >>> eva.isLargerBetter()
    False
    """

    @keyword_only
    def __init__(
        self,
        *,
        metricName: str = "rmse",
        labelCol: str = "label",
        predictionCol: str = "prediction",
    ) -> None:
        """
        __init__(self, *, metricName='rmse', labelCol='label', predictionCol='prediction') -> None:
        """
        super().__init__()
        self._set(metricName=metricName, labelCol=labelCol, predictionCol=predictionCol)

    def _get_torch_metric(self) -> Any:
        import torcheval.metrics as torchmetrics

        metric_name = self.getOrDefault(self.metricName)

        if metric_name == "mse":
            return torchmetrics.MeanSquaredError()
        if metric_name == "r2":
            return torchmetrics.R2Score()
        if metric_name == "rmse":
            return _get_rmse_torchmetric()

        raise ValueError(f"Unsupported regressor evaluator metric name: {metric_name}")

    def _get_input_cols(self) -> List[str]:
        return [self.getPredictionCol(), self.getLabelCol()]

    def _get_metric_update_inputs(self, dataset: "pd.DataFrame") -> Tuple[Any, Any]:
        import torch

        preds_tensor = torch.tensor(dataset[self.getPredictionCol()].values)
        labels_tensor = torch.tensor(dataset[self.getLabelCol()].values)
        return preds_tensor, labels_tensor

    def isLargerBetter(self) -> bool:
        if self.getOrDefault(self.metricName) == "r2":
            return True
        return False
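

# Note: _evaluate accepts either a Spark DataFrame or a plain pandas DataFrame
# (see the Union type hint above), so the evaluator can also be exercised
# without an active Spark session. A minimal sketch, assuming torch and
# torcheval are installed locally (values mirror the doctest above):
#
#   import pandas as pd
#   pdf = pd.DataFrame({"label": [1.0, -1.0], "prediction": [2.0, -1.5]})
#   RegressionEvaluator(metricName="mse").evaluate(pdf)  # expected: 0.625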


class BinaryClassificationEvaluator(
    _TorchMetricEvaluator, HasLabelCol, HasProbabilityCol, ParamsReadWrite
):
    """
    Evaluator for binary classification, which expects input columns probability and label.
    Supported metrics are 'areaUnderROC' and 'areaUnderPR'.

    .. versionadded:: 3.5.0

    Examples
    --------
    >>> from pyspark.ml.connect.evaluation import BinaryClassificationEvaluator
    >>> eva = BinaryClassificationEvaluator(metricName='areaUnderPR')
    >>> dataset = spark.createDataFrame(
    ...     [(1, 0.6), (0, 0.55), (0, 0.1), (1, 0.6), (1, 0.4)],
    ...     schema=['label', 'probability']
    ... )
    >>> eva.evaluate(dataset)
    0.9166666865348816
    >>> eva.isLargerBetter()
    True
    """

    @keyword_only
    def __init__(
        self,
        *,
        metricName: str = "areaUnderROC",
        labelCol: str = "label",
        probabilityCol: str = "probability",
    ) -> None:
        """
        __init__(
            self, *, metricName='areaUnderROC', labelCol='label', probabilityCol='probability'
        ) -> None:
        """
        super().__init__()
        self._set(metricName=metricName, labelCol=labelCol, probabilityCol=probabilityCol)

    def _get_torch_metric(self) -> Any:
        import torcheval.metrics as torchmetrics

        metric_name = self.getOrDefault(self.metricName)

        if metric_name == "areaUnderROC":
            return torchmetrics.BinaryAUROC()
        if metric_name == "areaUnderPR":
            return torchmetrics.BinaryAUPRC()

        raise ValueError(f"Unsupported binary classification evaluator metric name: {metric_name}")

    def _get_input_cols(self) -> List[str]:
        return [self.getProbabilityCol(), self.getLabelCol()]

    def _get_metric_update_inputs(self, dataset: "pd.DataFrame") -> Tuple[Any, Any]:
        import torch

        values = np.stack(dataset[self.getProbabilityCol()].values)  # type: ignore[call-overload]
        preds_tensor = torch.tensor(values)
        if preds_tensor.dim() == 2:
            preds_tensor = preds_tensor[:, 1]
        labels_tensor = torch.tensor(dataset[self.getLabelCol()].values)
        return preds_tensor, labels_tensor

    def isLargerBetter(self) -> bool:
        return True
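

# Note: the probability column may hold either a single positive-class
# probability per row or a length-2 probability vector; when the stacked
# values form a 2-dimensional tensor, only column index 1 (the positive
# class) is fed to the torcheval metric, as handled in
# _get_metric_update_inputs above.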


class MulticlassClassificationEvaluator(
    _TorchMetricEvaluator, HasLabelCol, HasPredictionCol, ParamsReadWrite
):
    """
    Evaluator for multiclass classification, which expects input columns prediction and label.
    The supported metric is 'accuracy'.

    .. versionadded:: 3.5.0

    Examples
    --------
    >>> from pyspark.ml.connect.evaluation import MulticlassClassificationEvaluator
    >>> eva = MulticlassClassificationEvaluator(metricName='accuracy')
    >>> dataset = spark.createDataFrame(
    ...     [(1, 1), (0, 0), (2, 2), (1, 0), (2, 1)],
    ...     schema=['label', 'prediction']
    ... )
    >>> eva.evaluate(dataset)
    0.6000000238418579
    >>> eva.isLargerBetter()
    True
    """

    @keyword_only
    def __init__(
        self,
        *,
        metricName: str = "accuracy",
        labelCol: str = "label",
        predictionCol: str = "prediction",
    ) -> None:
        """
        __init__(
            self, *, metricName='accuracy', labelCol='label', predictionCol='prediction'
        ) -> None:
        """
        super().__init__()
        self._set(metricName=metricName, labelCol=labelCol, predictionCol=predictionCol)

    def _get_torch_metric(self) -> Any:
        import torcheval.metrics as torchmetrics

        metric_name = self.getOrDefault(self.metricName)

        if metric_name == "accuracy":
            return torchmetrics.MulticlassAccuracy()

        raise ValueError(
            f"Unsupported multiclass classification evaluator metric name: {metric_name}"
        )

    def _get_input_cols(self) -> List[str]:
        return [self.getPredictionCol(), self.getLabelCol()]

    def _get_metric_update_inputs(self, dataset: "pd.DataFrame") -> Tuple[Any, Any]:
        import torch

        preds_tensor = torch.tensor(dataset[self.getPredictionCol()].values)
        labels_tensor = torch.tensor(dataset[self.getLabelCol()].values)
        return preds_tensor, labels_tensor

    def isLargerBetter(self) -> bool:
        return True
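

# For reference, the accuracy in the doctest above follows directly from the
# data: predictions [1, 0, 2, 0, 1] match labels [1, 0, 2, 1, 2] on 3 of the
# 5 rows, and MulticlassAccuracy reports 3 / 5 = 0.6 (printed as the float32
# value 0.6000000238418579).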