diff --git a/.ci/run-repository.sh b/.ci/run-repository.sh index 2048431..6e92e8f 100755 --- a/.ci/run-repository.sh +++ b/.ci/run-repository.sh @@ -35,4 +35,4 @@ docker run \ --name eland-test-runner \ --rm \ elastic/eland \ - nox -s test-${PYTHON_VERSION} + nox -s lint test-${PYTHON_VERSION} diff --git a/eland/filter.py b/eland/filter.py index bd2801b..2f5800c 100644 --- a/eland/filter.py +++ b/eland/filter.py @@ -178,7 +178,7 @@ class MatchAllFilter(QueryFilter): class RandomScoreFilter(QueryFilter): - def __init__(self, query: QueryFilter, random_state: int) -> None: + def __init__(self, query: BooleanFilter, random_state: int) -> None: q = MatchAllFilter() if query.empty() else query seed = {} diff --git a/eland/ml/_model_serializer.py b/eland/ml/_model_serializer.py index 94ea694..f74922c 100644 --- a/eland/ml/_model_serializer.py +++ b/eland/ml/_model_serializer.py @@ -6,7 +6,7 @@ import base64 import gzip import json from abc import ABC -from typing import List, Dict, Any, Optional +from typing import Sequence, Dict, Any, Optional def add_if_exists(d: Dict[str, Any], k: str, v: Any) -> None: @@ -17,9 +17,9 @@ def add_if_exists(d: Dict[str, Any], k: str, v: Any) -> None: class ModelSerializer(ABC): def __init__( self, - feature_names: List[str], + feature_names: Sequence[str], target_type: Optional[str] = None, - classification_labels: Optional[List[str]] = None, + classification_labels: Optional[Sequence[str]] = None, ): self._target_type = target_type self._feature_names = feature_names @@ -33,7 +33,7 @@ class ModelSerializer(ABC): return d @property - def feature_names(self) -> List[str]: + def feature_names(self) -> Sequence[str]: return self._feature_names def serialize_model(self) -> Dict[str, Any]: @@ -84,10 +84,10 @@ class TreeNode: class Tree(ModelSerializer): def __init__( self, - feature_names: List[str], + feature_names: Sequence[str], target_type: Optional[str] = None, - tree_structure: Optional[List[TreeNode]] = None, - classification_labels: Optional[List[str]] = None, + tree_structure: Optional[Sequence[TreeNode]] = None, + classification_labels: Optional[Sequence[str]] = None, ): super().__init__( feature_names=feature_names, @@ -107,12 +107,12 @@ class Tree(ModelSerializer): class Ensemble(ModelSerializer): def __init__( self, - feature_names: List[str], - trained_models: List[ModelSerializer], + feature_names: Sequence[str], + trained_models: Sequence[ModelSerializer], output_aggregator: Dict[str, Any], target_type: Optional[str] = None, - classification_labels: Optional[List[str]] = None, - classification_weights: Optional[List[float]] = None, + classification_labels: Optional[Sequence[str]] = None, + classification_weights: Optional[Sequence[float]] = None, ): super().__init__( feature_names=feature_names, diff --git a/eland/ml/_model_transformers.py b/eland/ml/_model_transformers.py deleted file mode 100644 index abeac77..0000000 --- a/eland/ml/_model_transformers.py +++ /dev/null @@ -1,435 +0,0 @@ -# Licensed to Elasticsearch B.V under one or more agreements. -# Elasticsearch B.V licenses this file to you under the Apache 2.0 License. -# See the LICENSE file in the project root for more information - -from typing import List, Union, Optional - -import numpy as np - -from eland.ml._optional import import_optional_dependency -from eland.ml._model_serializer import Tree, TreeNode, Ensemble - -sklearn = import_optional_dependency("sklearn") -xgboost = import_optional_dependency("xgboost") - -from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor -from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor -from sklearn.utils.validation import check_is_fitted -from xgboost import Booster, XGBRegressor, XGBClassifier - - -class ModelTransformer: - def __init__( - self, - model, - feature_names: List[str], - classification_labels: Optional[List[str]] = None, - classification_weights: Optional[List[float]] = None, - ): - self._feature_names = feature_names - self._model = model - self._classification_labels = classification_labels - self._classification_weights = classification_weights - - def is_supported(self): - return isinstance( - self._model, - ( - DecisionTreeClassifier, - DecisionTreeRegressor, - RandomForestRegressor, - RandomForestClassifier, - XGBClassifier, - XGBRegressor, - Booster, - ), - ) - - -class SKLearnTransformer(ModelTransformer): - """ - Base class for SKLearn transformers. - warning: Should not use this class directly. Use derived classes instead - """ - - def __init__( - self, - model, - feature_names: List[str], - classification_labels: Optional[List[str]] = None, - classification_weights: Optional[List[float]] = None, - ): - """ - Base class for SKLearn transformations - - :param model: sklearn trained model - :param feature_names: The feature names for the model - :param classification_labels: Optional classification labels (if not encoded in the model) - :param classification_weights: Optional classification weights - """ - super().__init__( - model, feature_names, classification_labels, classification_weights - ) - self._node_decision_type = "lte" - - def build_tree_node(self, node_index: int, node_data: dict, value) -> TreeNode: - """ - This builds out a TreeNode class given the sklearn tree node definition. - - Node decision types are defaulted to "lte" to match the behavior of SKLearn - - :param node_index: The node index - :param node_data: Opaque node data contained in the sklearn tree state - :param value: Opaque node value (i.e. leaf/node values) from tree state - :return: TreeNode object - """ - if value.shape[0] != 1: - raise ValueError( - f"unexpected multiple values returned from leaf node '{node_index}'" - ) - if node_data[0] == -1: # is leaf node - if ( - value.shape[1] == 1 - ): # classification requires more than one value, so assume regression - leaf_value = float(value[0][0]) - else: - # the classification value, which is the index of the largest value - leaf_value = int(np.argmax(value)) - return TreeNode( - node_index, - decision_type=self._node_decision_type, - leaf_value=leaf_value, - ) - else: - return TreeNode( - node_index, - decision_type=self._node_decision_type, - left_child=int(node_data[0]), - right_child=int(node_data[1]), - split_feature=int(node_data[2]), - threshold=float(node_data[3]), - ) - - -class SKLearnDecisionTreeTransformer(SKLearnTransformer): - """ - class for transforming SKLearn decision tree models into Tree model formats supported by Elasticsearch. - """ - - def __init__( - self, - model: Union[DecisionTreeRegressor, DecisionTreeClassifier], - feature_names: List[str], - classification_labels: Optional[List[str]] = None, - ): - """ - Transforms a Decision Tree model (Regressor|Classifier) into a ES Supported Tree format - :param model: fitted decision tree model - :param feature_names: model feature names - :param classification_labels: Optional classification labels - """ - super().__init__(model, feature_names, classification_labels) - - def transform(self) -> Tree: - """ - Transform the provided model into an ES supported Tree object - :return: Tree object for ES storage and use - """ - target_type = ( - "regression" - if isinstance(self._model, DecisionTreeRegressor) - else "classification" - ) - check_is_fitted(self._model, ["tree_"]) - tree_classes = None - if self._classification_labels: - tree_classes = self._classification_labels - if isinstance(self._model, DecisionTreeClassifier): - check_is_fitted(self._model, ["classes_"]) - if tree_classes is None: - tree_classes = [str(c) for c in self._model.classes_] - nodes = [] - tree_state = self._model.tree_.__getstate__() - for i in range(len(tree_state["nodes"])): - nodes.append( - self.build_tree_node(i, tree_state["nodes"][i], tree_state["values"][i]) - ) - - return Tree(self._feature_names, target_type, nodes, tree_classes) - - -class SKLearnForestTransformer(SKLearnTransformer): - """ - Base class for transforming SKLearn forest models into Ensemble model formats supported by Elasticsearch. - - warning: do not use this class directly. Use a derived class instead - """ - - def __init__( - self, - model: Union[RandomForestClassifier, RandomForestRegressor], - feature_names: List[str], - classification_labels: Optional[List[str]] = None, - classification_weights: Optional[List[float]] = None, - ): - super().__init__( - model, feature_names, classification_labels, classification_weights - ) - - def build_aggregator_output(self) -> dict: - raise NotImplementedError("build_aggregator_output must be implemented") - - def determine_target_type(self) -> str: - raise NotImplementedError("determine_target_type must be implemented") - - def transform(self) -> Ensemble: - check_is_fitted(self._model, ["estimators_"]) - estimators = self._model.estimators_ - ensemble_classes = None - if self._classification_labels: - ensemble_classes = self._classification_labels - if isinstance(self._model, RandomForestClassifier): - check_is_fitted(self._model, ["classes_"]) - if ensemble_classes is None: - ensemble_classes = [str(c) for c in self._model.classes_] - ensemble_models = [ - SKLearnDecisionTreeTransformer(m, self._feature_names).transform() - for m in estimators - ] - return Ensemble( - self._feature_names, - ensemble_models, - self.build_aggregator_output(), - target_type=self.determine_target_type(), - classification_labels=ensemble_classes, - classification_weights=self._classification_weights, - ) - - -class SKLearnForestRegressorTransformer(SKLearnForestTransformer): - """ - Class for transforming RandomForestRegressor models into an ensemble model supported by Elasticsearch - """ - - def __init__(self, model: RandomForestRegressor, feature_names: List[str]): - super().__init__(model, feature_names) - - def build_aggregator_output(self) -> dict: - return { - "weighted_sum": { - "weights": [1.0 / len(self._model.estimators_)] - * len(self._model.estimators_), - } - } - - def determine_target_type(self) -> str: - return "regression" - - -class SKLearnForestClassifierTransformer(SKLearnForestTransformer): - """ - Class for transforming RandomForestClassifier models into an ensemble model supported by Elasticsearch - """ - - def __init__( - self, - model: RandomForestClassifier, - feature_names: List[str], - classification_labels: Optional[List[str]] = None, - ): - super().__init__(model, feature_names, classification_labels) - - def build_aggregator_output(self) -> dict: - return {"weighted_mode": {"num_classes": len(self._model.classes_)}} - - def determine_target_type(self) -> str: - return "classification" - - -class XGBoostForestTransformer(ModelTransformer): - """ - Base class for transforming XGBoost models into ensemble models supported by Elasticsearch - - warning: do not use directly. Use a derived classes instead - """ - - def __init__( - self, - model: Booster, - feature_names: List[str], - base_score: float = 0.5, - objective: str = "reg:squarederror", - classification_labels: Optional[List[str]] = None, - classification_weights: Optional[List[float]] = None, - ): - super().__init__( - model, feature_names, classification_labels, classification_weights - ) - self._node_decision_type = "lt" - self._base_score = base_score - self._objective = objective - - def get_feature_id(self, feature_id: str) -> int: - if feature_id[0] == "f": - try: - return int(feature_id[1:]) - except ValueError: - raise RuntimeError(f"Unable to interpret '{feature_id}'") - else: - try: - return int(feature_id) - except ValueError: - raise RuntimeError(f"Unable to interpret '{feature_id}'") - - def extract_node_id(self, node_id: str, curr_tree: int) -> int: - t_id, n_id = node_id.split("-") - if t_id is None or n_id is None: - raise RuntimeError( - f"cannot determine node index or tree from '{node_id}' for tree {curr_tree}" - ) - try: - t_id = int(t_id) - n_id = int(n_id) - if t_id != curr_tree: - raise RuntimeError( - f"extracted tree id {t_id} does not match current tree {curr_tree}" - ) - return n_id - except ValueError: - raise RuntimeError( - f"cannot determine node index or tree from '{node_id}' for tree {curr_tree}" - ) - - def build_tree_node(self, row, curr_tree: int) -> TreeNode: - node_index = row["Node"] - if row["Feature"] == "Leaf": - return TreeNode(node_idx=node_index, leaf_value=float(row["Gain"])) - else: - return TreeNode( - node_idx=node_index, - decision_type=self._node_decision_type, - left_child=self.extract_node_id(row["Yes"], curr_tree), - right_child=self.extract_node_id(row["No"], curr_tree), - threshold=float(row["Split"]), - split_feature=self.get_feature_id(row["Feature"]), - ) - - def build_tree(self, nodes: List[TreeNode]) -> Tree: - return Tree(feature_names=self._feature_names, tree_structure=nodes) - - def build_base_score_stump(self) -> Tree: - return Tree( - feature_names=self._feature_names, - tree_structure=[TreeNode(0, leaf_value=self._base_score)], - ) - - def build_forest(self) -> List[Tree]: - """ - This builds out the forest of trees as described by XGBoost into a format - supported by Elasticsearch - - :return: A list of Tree objects - """ - self.check_model_booster() - - tree_table = self._model.trees_to_dataframe() - transformed_trees = [] - curr_tree = None - tree_nodes = [] - for _, row in tree_table.iterrows(): - if row["Tree"] != curr_tree: - if len(tree_nodes) > 0: - transformed_trees.append(self.build_tree(tree_nodes)) - curr_tree = row["Tree"] - tree_nodes = [] - tree_nodes.append(self.build_tree_node(row, curr_tree)) - # add last tree - if len(tree_nodes) > 0: - transformed_trees.append(self.build_tree(tree_nodes)) - # We add this stump as XGBoost adds the base_score to the regression outputs - if self._objective.partition(":")[0] == "reg": - transformed_trees.append(self.build_base_score_stump()) - return transformed_trees - - def build_aggregator_output(self) -> dict: - raise NotImplementedError("build_aggregator_output must be implemented") - - def determine_target_type(self) -> str: - raise NotImplementedError("determine_target_type must be implemented") - - def is_objective_supported(self) -> bool: - return False - - def check_model_booster(self): - # xgboost v1 made booster default to 'None' meaning 'gbtree' - if self._model.booster not in {"dart", "gbtree", None}: - raise ValueError( - f"booster must exist and be of type 'dart' or " - f"'gbtree', was {self._model.booster!r}" - ) - - def transform(self) -> Ensemble: - self.check_model_booster() - - if not self.is_objective_supported(): - raise ValueError(f"Unsupported objective '{self._objective}'") - - forest = self.build_forest() - return Ensemble( - feature_names=self._feature_names, - trained_models=forest, - output_aggregator=self.build_aggregator_output(), - classification_labels=self._classification_labels, - classification_weights=self._classification_weights, - target_type=self.determine_target_type(), - ) - - -class XGBoostRegressorTransformer(XGBoostForestTransformer): - def __init__(self, model: XGBRegressor, feature_names: List[str]): - # XGBRegressor.base_score defaults to 0.5. - base_score = model.base_score - if base_score is None: - base_score = 0.5 - super().__init__( - model.get_booster(), feature_names, base_score, model.objective - ) - - def determine_target_type(self) -> str: - return "regression" - - def is_objective_supported(self) -> bool: - return self._objective in { - "reg:squarederror", - "reg:linear", - "reg:squaredlogerror", - "reg:logistic", - } - - def build_aggregator_output(self) -> dict: - return {"weighted_sum": {}} - - -class XGBoostClassifierTransformer(XGBoostForestTransformer): - def __init__( - self, - model: XGBClassifier, - feature_names: List[str], - classification_labels: Optional[List[str]] = None, - ): - super().__init__( - model.get_booster(), - feature_names, - model.base_score, - model.objective, - classification_labels, - ) - - def determine_target_type(self) -> str: - return "classification" - - def is_objective_supported(self) -> bool: - return self._objective in {"binary:logistic", "binary:hinge"} - - def build_aggregator_output(self) -> dict: - return {"logistic_regression": {}} diff --git a/eland/ml/imported_ml_model.py b/eland/ml/imported_ml_model.py index fc64381..137b655 100644 --- a/eland/ml/imported_ml_model.py +++ b/eland/ml/imported_ml_model.py @@ -6,29 +6,26 @@ from typing import Union, List, Optional, Tuple, TYPE_CHECKING, cast import numpy as np # type: ignore -from eland.common import es_version -from eland.ml._model_transformers import ( - SKLearnDecisionTreeTransformer, - SKLearnForestRegressorTransformer, - SKLearnForestClassifierTransformer, - XGBoostRegressorTransformer, - XGBoostClassifierTransformer, -) -from eland.ml._model_serializer import ModelSerializer -from eland.ml._optional import import_optional_dependency -from eland.ml.ml_model import MLModel - -sklearn = import_optional_dependency("sklearn") -xgboost = import_optional_dependency("xgboost") - -from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor # type: ignore -from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor # type: ignore -from xgboost import XGBRegressor, XGBClassifier # type: ignore +from .ml_model import MLModel +from .transformers import get_model_transformer +from ..common import es_version if TYPE_CHECKING: from elasticsearch import Elasticsearch # type: ignore # noqa: F401 + # Try importing each ML lib separately so mypy users don't have to + # have both installed to use type-checking. + try: + from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor # type: ignore # noqa: F401 + from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor # type: ignore # noqa: F401 + except ImportError: + pass + try: + from xgboost import XGBRegressor, XGBClassifier # type: ignore # noqa: F401 + except ImportError: + pass + class ImportedMLModel(MLModel): """ @@ -99,12 +96,12 @@ class ImportedMLModel(MLModel): es_client: Union[str, List[str], Tuple[str, ...], "Elasticsearch"], model_id: str, model: Union[ - DecisionTreeClassifier, - DecisionTreeRegressor, - RandomForestRegressor, - RandomForestClassifier, - XGBClassifier, - XGBRegressor, + "DecisionTreeClassifier", + "DecisionTreeRegressor", + "RandomForestRegressor", + "RandomForestClassifier", + "XGBClassifier", + "XGBRegressor", ], feature_names: List[str], classification_labels: Optional[List[str]] = None, @@ -114,42 +111,15 @@ class ImportedMLModel(MLModel): super().__init__(es_client, model_id) self._feature_names = feature_names - self._model_type = None - serializer: ModelSerializer # type def - # Transform model - if isinstance(model, DecisionTreeRegressor): - serializer = SKLearnDecisionTreeTransformer( - model, feature_names - ).transform() - self._model_type = MLModel.TYPE_REGRESSION - elif isinstance(model, DecisionTreeClassifier): - serializer = SKLearnDecisionTreeTransformer( - model, feature_names, classification_labels - ).transform() - self._model_type = MLModel.TYPE_CLASSIFICATION - elif isinstance(model, RandomForestRegressor): - serializer = SKLearnForestRegressorTransformer( - model, feature_names - ).transform() - self._model_type = MLModel.TYPE_REGRESSION - elif isinstance(model, RandomForestClassifier): - serializer = SKLearnForestClassifierTransformer( - model, feature_names, classification_labels - ).transform() - self._model_type = MLModel.TYPE_CLASSIFICATION - elif isinstance(model, XGBRegressor): - serializer = XGBoostRegressorTransformer(model, feature_names).transform() - self._model_type = MLModel.TYPE_REGRESSION - elif isinstance(model, XGBClassifier): - serializer = XGBoostClassifierTransformer( - model, feature_names, classification_labels - ).transform() - self._model_type = MLModel.TYPE_CLASSIFICATION - else: - raise NotImplementedError( - f"ML model of type {type(model)}, not currently implemented" - ) + transformer = get_model_transformer( + model, + feature_names=feature_names, + classification_labels=classification_labels, + classification_weights=classification_weights, + ) + self._model_type = transformer.model_type + serializer = transformer.transform() if overwrite: self.delete_model() diff --git a/eland/ml/transformers/__init__.py b/eland/ml/transformers/__init__.py new file mode 100644 index 0000000..0c56214 --- /dev/null +++ b/eland/ml/transformers/__init__.py @@ -0,0 +1,71 @@ +# Licensed to Elasticsearch B.V under one or more agreements. +# Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +# See the LICENSE file in the project root for more information + +import inspect +from typing import Any, Dict, Type +from .base import ModelTransformer + + +__all__ = ["get_model_transformer"] +_MODEL_TRANSFORMERS: Dict[type, Type[ModelTransformer]] = {} + + +def get_model_transformer(model: Any, **kwargs: Any) -> ModelTransformer: + """Creates a ModelTransformer for a given model or raises an exception if one is not available""" + for model_type, transformer in _MODEL_TRANSFORMERS.items(): + if isinstance(model, model_type): + # Filter out kwargs that aren't applicable to the specific 'ModelTransformer' + accepted_kwargs = { + param for param in inspect.signature(transformer.__init__).parameters + } + kwargs = {k: v for k, v in kwargs.items() if k in accepted_kwargs} + + return transformer(model, **kwargs) + + raise NotImplementedError( + f"ML model of type {type(model)}, not currently implemented" + ) + + +try: + from .sklearn import ( + SKLearnDecisionTreeTransformer, + SKLearnForestClassifierTransformer, + SKLearnForestRegressorTransformer, + SKLearnForestTransformer, + SKLearnTransformer, + _MODEL_TRANSFORMERS as _SKLEARN_MODEL_TRANSFORMERS, + ) + + __all__ += [ + "SKLearnDecisionTreeTransformer", + "SKLearnForestClassifierTransformer", + "SKLearnForestRegressorTransformer", + "SKLearnForestTransformer", + "SKLearnTransformer", + ] + _MODEL_TRANSFORMERS.update(_SKLEARN_MODEL_TRANSFORMERS) +except ImportError: + pass + +try: + from .xgboost import ( + XGBoostClassifierTransformer, + XGBClassifier, + XGBoostForestTransformer, + XGBoostRegressorTransformer, + XGBRegressor, + _MODEL_TRANSFORMERS as _XGBOOST_MODEL_TRANSFORMERS, + ) + + __all__ += [ + "XGBoostClassifierTransformer", + "XGBClassifier", + "XGBoostForestTransformer", + "XGBoostRegressorTransformer", + "XGBRegressor", + ] + _MODEL_TRANSFORMERS.update(_XGBOOST_MODEL_TRANSFORMERS) +except ImportError: + pass diff --git a/eland/ml/transformers/base.py b/eland/ml/transformers/base.py new file mode 100644 index 0000000..eb63c01 --- /dev/null +++ b/eland/ml/transformers/base.py @@ -0,0 +1,27 @@ +# Licensed to Elasticsearch B.V under one or more agreements. +# Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +# See the LICENSE file in the project root for more information + +from typing import Sequence, Optional, Any +from .._model_serializer import ModelSerializer + + +class ModelTransformer: + def __init__( + self, + model: Any, + feature_names: Sequence[str], + classification_labels: Optional[Sequence[str]] = None, + classification_weights: Optional[Sequence[float]] = None, + ): + self._feature_names = feature_names + self._model = model + self._classification_labels = classification_labels + self._classification_weights = classification_weights + + def transform(self) -> ModelSerializer: + raise NotImplementedError() + + @property + def model_type(self) -> str: + raise NotImplementedError() diff --git a/eland/ml/transformers/sklearn.py b/eland/ml/transformers/sklearn.py new file mode 100644 index 0000000..e61e45d --- /dev/null +++ b/eland/ml/transformers/sklearn.py @@ -0,0 +1,245 @@ +# Licensed to Elasticsearch B.V under one or more agreements. +# Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +# See the LICENSE file in the project root for more information + +import numpy as np # type: ignore +from typing import Optional, Sequence, Union, Dict, Any, Type, Tuple +from .base import ModelTransformer +from ..ml_model import MLModel +from .._optional import import_optional_dependency +from .._model_serializer import Ensemble, Tree, TreeNode + +import_optional_dependency("sklearn", on_version="warn") + +from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor # type: ignore +from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor # type: ignore +from sklearn.utils.validation import check_is_fitted # type: ignore + + +class SKLearnTransformer(ModelTransformer): + """ + Base class for SKLearn transformers. + warning: Should not use this class directly. Use derived classes instead + """ + + def __init__( + self, + model: Any, + feature_names: Sequence[str], + classification_labels: Optional[Sequence[str]] = None, + classification_weights: Optional[Sequence[float]] = None, + ): + """ + Base class for SKLearn transformations + + :param model: sklearn trained model + :param feature_names: The feature names for the model + :param classification_labels: Optional classification labels (if not encoded in the model) + :param classification_weights: Optional classification weights + """ + super().__init__( + model, feature_names, classification_labels, classification_weights + ) + self._node_decision_type = "lte" + + def build_tree_node( + self, + node_index: int, + node_data: Tuple[Union[int, float], ...], + value: np.ndarray, + ) -> TreeNode: + """ + This builds out a TreeNode class given the sklearn tree node definition. + + Node decision types are defaulted to "lte" to match the behavior of SKLearn + + :param node_index: The node index + :param node_data: Opaque node data contained in the sklearn tree state + :param value: Opaque node value (i.e. leaf/node values) from tree state + :return: TreeNode object + """ + if value.shape[0] != 1: + raise ValueError( + f"unexpected multiple values returned from leaf node '{node_index}'" + ) + if node_data[0] == -1: # is leaf node + if ( + value.shape[1] == 1 + ): # classification requires more than one value, so assume regression + leaf_value = float(value[0][0]) + else: + # the classification value, which is the index of the largest value + leaf_value = int(np.argmax(value)) + return TreeNode( + node_index, + decision_type=self._node_decision_type, + leaf_value=leaf_value, + ) + else: + return TreeNode( + node_index, + decision_type=self._node_decision_type, + left_child=int(node_data[0]), + right_child=int(node_data[1]), + split_feature=int(node_data[2]), + threshold=float(node_data[3]), + ) + + +class SKLearnDecisionTreeTransformer(SKLearnTransformer): + """ + class for transforming SKLearn decision tree models into Tree model formats supported by Elasticsearch. + """ + + def __init__( + self, + model: Union[DecisionTreeRegressor, DecisionTreeClassifier], + feature_names: Sequence[str], + classification_labels: Optional[Sequence[str]] = None, + ): + """ + Transforms a Decision Tree model (Regressor|Classifier) into a ES Supported Tree format + :param model: fitted decision tree model + :param feature_names: model feature names + :param classification_labels: Optional classification labels + """ + super().__init__(model, feature_names, classification_labels) + + def transform(self) -> Tree: + """ + Transform the provided model into an ES supported Tree object + :return: Tree object for ES storage and use + """ + target_type = ( + "regression" + if isinstance(self._model, DecisionTreeRegressor) + else "classification" + ) + check_is_fitted(self._model, ["tree_"]) + tree_classes = None + if self._classification_labels: + tree_classes = self._classification_labels + if isinstance(self._model, DecisionTreeClassifier): + check_is_fitted(self._model, ["classes_"]) + if tree_classes is None: + tree_classes = [str(c) for c in self._model.classes_] + nodes = [] + tree_state = self._model.tree_.__getstate__() + for i in range(len(tree_state["nodes"])): + nodes.append( + self.build_tree_node(i, tree_state["nodes"][i], tree_state["values"][i]) + ) + + return Tree(self._feature_names, target_type, nodes, tree_classes) + + @property + def model_type(self) -> str: + return ( + MLModel.TYPE_REGRESSION + if isinstance(self._model, DecisionTreeRegressor) + else MLModel.TYPE_CLASSIFICATION + ) + + +class SKLearnForestTransformer(SKLearnTransformer): + """ + Base class for transforming SKLearn forest models into Ensemble model formats supported by Elasticsearch. + + warning: do not use this class directly. Use a derived class instead + """ + + def __init__( + self, + model: Union[RandomForestClassifier, RandomForestRegressor], + feature_names: Sequence[str], + classification_labels: Optional[Sequence[str]] = None, + classification_weights: Optional[Sequence[float]] = None, + ): + super().__init__( + model, feature_names, classification_labels, classification_weights + ) + + def build_aggregator_output(self) -> Dict[str, Any]: + raise NotImplementedError("build_aggregator_output must be implemented") + + def determine_target_type(self) -> str: + raise NotImplementedError("determine_target_type must be implemented") + + def transform(self) -> Ensemble: + check_is_fitted(self._model, ["estimators_"]) + estimators = self._model.estimators_ + ensemble_classes = None + if self._classification_labels: + ensemble_classes = self._classification_labels + if isinstance(self._model, RandomForestClassifier): + check_is_fitted(self._model, ["classes_"]) + if ensemble_classes is None: + ensemble_classes = [str(c) for c in self._model.classes_] + ensemble_models: Sequence[Tree] = [ + SKLearnDecisionTreeTransformer(m, self._feature_names).transform() + for m in estimators + ] + return Ensemble( + self._feature_names, + ensemble_models, + self.build_aggregator_output(), + target_type=self.determine_target_type(), + classification_labels=ensemble_classes, + classification_weights=self._classification_weights, + ) + + +class SKLearnForestRegressorTransformer(SKLearnForestTransformer): + """ + Class for transforming RandomForestRegressor models into an ensemble model supported by Elasticsearch + """ + + def __init__(self, model: RandomForestRegressor, feature_names: Sequence[str]): + super().__init__(model, feature_names) + + def build_aggregator_output(self) -> Dict[str, Any]: + return { + "weighted_sum": { + "weights": [1.0 / len(self._model.estimators_)] + * len(self._model.estimators_), + } + } + + def determine_target_type(self) -> str: + return "regression" + + @property + def model_type(self) -> str: + return MLModel.TYPE_REGRESSION + + +class SKLearnForestClassifierTransformer(SKLearnForestTransformer): + """ + Class for transforming RandomForestClassifier models into an ensemble model supported by Elasticsearch + """ + + def __init__( + self, + model: RandomForestClassifier, + feature_names: Sequence[str], + classification_labels: Optional[Sequence[str]] = None, + ): + super().__init__(model, feature_names, classification_labels) + + def build_aggregator_output(self) -> Dict[str, Any]: + return {"weighted_mode": {"num_classes": len(self._model.classes_)}} + + def determine_target_type(self) -> str: + return "classification" + + @property + def model_type(self) -> str: + return MLModel.TYPE_CLASSIFICATION + + +_MODEL_TRANSFORMERS: Dict[type, Type[ModelTransformer]] = { + DecisionTreeRegressor: SKLearnDecisionTreeTransformer, + DecisionTreeClassifier: SKLearnDecisionTreeTransformer, + RandomForestRegressor: SKLearnForestRegressorTransformer, + RandomForestClassifier: SKLearnForestClassifierTransformer, +} diff --git a/eland/ml/transformers/xgboost.py b/eland/ml/transformers/xgboost.py new file mode 100644 index 0000000..3be0dcb --- /dev/null +++ b/eland/ml/transformers/xgboost.py @@ -0,0 +1,217 @@ +# Licensed to Elasticsearch B.V under one or more agreements. +# Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +# See the LICENSE file in the project root for more information + +from typing import Optional, List, Dict, Any, Type +from .base import ModelTransformer +import pandas as pd # type: ignore +from .._model_serializer import Ensemble, Tree, TreeNode +from ..ml_model import MLModel +from .._optional import import_optional_dependency + +import_optional_dependency("xgboost", on_version="warn") + +from xgboost import Booster, XGBRegressor, XGBClassifier # type: ignore + + +class XGBoostForestTransformer(ModelTransformer): + """ + Base class for transforming XGBoost models into ensemble models supported by Elasticsearch + + warning: do not use directly. Use a derived classes instead + """ + + def __init__( + self, + model: Booster, + feature_names: List[str], + base_score: float = 0.5, + objective: str = "reg:squarederror", + classification_labels: Optional[List[str]] = None, + classification_weights: Optional[List[float]] = None, + ): + super().__init__( + model, feature_names, classification_labels, classification_weights + ) + self._node_decision_type = "lt" + self._base_score = base_score + self._objective = objective + + def get_feature_id(self, feature_id: str) -> int: + if feature_id[0] == "f": + try: + return int(feature_id[1:]) + except ValueError: + raise RuntimeError(f"Unable to interpret '{feature_id}'") + else: + try: + return int(feature_id) + except ValueError: + raise RuntimeError(f"Unable to interpret '{feature_id}'") + + def extract_node_id(self, node_id: str, curr_tree: int) -> int: + t_id, n_id = node_id.split("-") + if t_id is None or n_id is None: + raise RuntimeError( + f"cannot determine node index or tree from '{node_id}' for tree {curr_tree}" + ) + try: + l_id = int(t_id) + r_id = int(n_id) + if l_id != curr_tree: + raise RuntimeError( + f"extracted tree id {l_id} does not match current tree {curr_tree}" + ) + return r_id + except ValueError: + raise RuntimeError( + f"cannot determine node index or tree from '{node_id}' for tree {curr_tree}" + ) + + def build_tree_node(self, row: pd.Series, curr_tree: int) -> TreeNode: + node_index = row["Node"] + if row["Feature"] == "Leaf": + return TreeNode(node_idx=node_index, leaf_value=float(row["Gain"])) + else: + return TreeNode( + node_idx=node_index, + decision_type=self._node_decision_type, + left_child=self.extract_node_id(row["Yes"], curr_tree), + right_child=self.extract_node_id(row["No"], curr_tree), + threshold=float(row["Split"]), + split_feature=self.get_feature_id(row["Feature"]), + ) + + def build_tree(self, nodes: List[TreeNode]) -> Tree: + return Tree(feature_names=self._feature_names, tree_structure=nodes) + + def build_base_score_stump(self) -> Tree: + return Tree( + feature_names=self._feature_names, + tree_structure=[TreeNode(0, leaf_value=self._base_score)], + ) + + def build_forest(self) -> List[Tree]: + """ + This builds out the forest of trees as described by XGBoost into a format + supported by Elasticsearch + + :return: A list of Tree objects + """ + self.check_model_booster() + + tree_table: pd.DataFrame = self._model.trees_to_dataframe() + transformed_trees = [] + curr_tree: Optional[Any] = None + tree_nodes: List[TreeNode] = [] + for _, row in tree_table.iterrows(): + if row["Tree"] != curr_tree: + if len(tree_nodes) > 0: + transformed_trees.append(self.build_tree(tree_nodes)) + curr_tree = row["Tree"] + tree_nodes = [] + tree_nodes.append(self.build_tree_node(row, curr_tree)) + # add last tree + if len(tree_nodes) > 0: + transformed_trees.append(self.build_tree(tree_nodes)) + # We add this stump as XGBoost adds the base_score to the regression outputs + if self._objective.partition(":")[0] == "reg": + transformed_trees.append(self.build_base_score_stump()) + return transformed_trees + + def build_aggregator_output(self) -> Dict[str, Any]: + raise NotImplementedError("build_aggregator_output must be implemented") + + def determine_target_type(self) -> str: + raise NotImplementedError("determine_target_type must be implemented") + + def is_objective_supported(self) -> bool: + return False + + def check_model_booster(self) -> None: + # xgboost v1 made booster default to 'None' meaning 'gbtree' + if self._model.booster not in {"dart", "gbtree", None}: + raise ValueError( + f"booster must exist and be of type 'dart' or " + f"'gbtree', was {self._model.booster!r}" + ) + + def transform(self) -> Ensemble: + self.check_model_booster() + + if not self.is_objective_supported(): + raise ValueError(f"Unsupported objective '{self._objective}'") + + forest = self.build_forest() + return Ensemble( + feature_names=self._feature_names, + trained_models=forest, + output_aggregator=self.build_aggregator_output(), + classification_labels=self._classification_labels, + classification_weights=self._classification_weights, + target_type=self.determine_target_type(), + ) + + +class XGBoostRegressorTransformer(XGBoostForestTransformer): + def __init__(self, model: XGBRegressor, feature_names: List[str]): + # XGBRegressor.base_score defaults to 0.5. + base_score = model.base_score + if base_score is None: + base_score = 0.5 + super().__init__( + model.get_booster(), feature_names, base_score, model.objective + ) + + def determine_target_type(self) -> str: + return "regression" + + def is_objective_supported(self) -> bool: + return self._objective in { + "reg:squarederror", + "reg:linear", + "reg:squaredlogerror", + "reg:logistic", + } + + def build_aggregator_output(self) -> Dict[str, Any]: + return {"weighted_sum": {}} + + @property + def model_type(self) -> str: + return MLModel.TYPE_REGRESSION + + +class XGBoostClassifierTransformer(XGBoostForestTransformer): + def __init__( + self, + model: XGBClassifier, + feature_names: List[str], + classification_labels: Optional[List[str]] = None, + ): + super().__init__( + model.get_booster(), + feature_names, + model.base_score, + model.objective, + classification_labels, + ) + + def determine_target_type(self) -> str: + return "classification" + + def is_objective_supported(self) -> bool: + return self._objective in {"binary:logistic", "binary:hinge"} + + def build_aggregator_output(self) -> Dict[str, Any]: + return {"logistic_regression": {}} + + @property + def model_type(self) -> str: + return MLModel.TYPE_CLASSIFICATION + + +_MODEL_TRANSFORMERS: Dict[type, Type[ModelTransformer]] = { + XGBRegressor: XGBoostRegressorTransformer, + XGBClassifier: XGBoostClassifierTransformer, +} diff --git a/eland/tasks.py b/eland/tasks.py index a1542fe..1dfcf7b 100644 --- a/eland/tasks.py +++ b/eland/tasks.py @@ -14,6 +14,7 @@ if TYPE_CHECKING: from .filter import BooleanFilter # noqa: F401 from .query_compiler import QueryCompiler # noqa: F401 from .operations import QueryParams # noqa: F401 + from .index import Index # noqa: F401 RESOLVED_TASK_TYPE = Tuple["QueryParams", List["PostProcessingAction"]] @@ -50,7 +51,7 @@ class Task(ABC): class SizeTask(Task): - def __init__(self, task_type: str, index, count: int): + def __init__(self, task_type: str, index: "Index", count: int): super().__init__(task_type) self._sort_field = index.sort_field self._count = min(len(index), count) @@ -62,7 +63,7 @@ class SizeTask(Task): class HeadTask(SizeTask): - def __init__(self, index, count: int): + def __init__(self, index: "Index", count: int): super().__init__("head", index, count) def __repr__(self) -> str: @@ -109,7 +110,7 @@ class HeadTask(SizeTask): class TailTask(SizeTask): - def __init__(self, index, count: int): + def __init__(self, index: "Index", count: int): super().__init__("tail", index, count) def resolve_task( @@ -172,7 +173,7 @@ class TailTask(SizeTask): class SampleTask(SizeTask): - def __init__(self, index, count: int, random_state: int): + def __init__(self, index: "Index", count: int, random_state: int): super().__init__("sample", index, count) self._random_state = random_state diff --git a/eland/tests/ml/test_imported_ml_model_pytest.py b/eland/tests/ml/test_imported_ml_model_pytest.py index 62acbc5..7dd14ac 100644 --- a/eland/tests/ml/test_imported_ml_model_pytest.py +++ b/eland/tests/ml/test_imported_ml_model_pytest.py @@ -2,17 +2,49 @@ # Elasticsearch B.V licenses this file to you under the Apache 2.0 License. # See the LICENSE file in the project root for more information +import pytest import numpy as np -from sklearn import datasets -from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier -from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor -from xgboost import XGBRegressor, XGBClassifier from eland.ml import ImportedMLModel from eland.tests import ES_TEST_CLIENT +try: + from sklearn import datasets + from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier + from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor + + HAS_SKLEARN = True +except ImportError: + HAS_SKLEARN = False + +try: + from xgboost import XGBRegressor, XGBClassifier + + HAS_XGBOOST = True +except ImportError: + HAS_XGBOOST = False + + +requires_sklearn = pytest.mark.skipif( + not HAS_SKLEARN, reason="This test requires 'scikit-learn' package to run" +) +requires_xgboost = pytest.mark.skipif( + not HAS_XGBOOST, reason="This test requires 'xgboost' package to run" +) +requires_no_ml_extras = pytest.mark.skipif( + HAS_SKLEARN or HAS_XGBOOST, + reason="This test requires 'scikit-learn' and 'xgboost' to not be installed", +) + + +@requires_no_ml_extras +def test_import_ml_model_when_dependencies_are_not_available(): + from eland.ml import MLModel, ImportedMLModel # noqa: F401 + + class TestImportedMLModel: + @requires_sklearn def test_decision_tree_classifier(self): # Train model training_data = datasets.make_classification(n_features=5) @@ -37,6 +69,7 @@ class TestImportedMLModel: # Clean up es_model.delete_model() + @requires_sklearn def test_decision_tree_regressor(self): # Train model training_data = datasets.make_regression(n_features=5) @@ -61,6 +94,7 @@ class TestImportedMLModel: # Clean up es_model.delete_model() + @requires_sklearn def test_random_forest_classifier(self): # Train model training_data = datasets.make_classification(n_features=5) @@ -85,6 +119,7 @@ class TestImportedMLModel: # Clean up es_model.delete_model() + @requires_sklearn def test_random_forest_regressor(self): # Train model training_data = datasets.make_regression(n_features=5) @@ -109,6 +144,7 @@ class TestImportedMLModel: # Clean up es_model.delete_model() + @requires_xgboost def test_xgb_classifier(self): # Train model training_data = datasets.make_classification(n_features=5) @@ -133,6 +169,7 @@ class TestImportedMLModel: # Clean up es_model.delete_model() + @requires_xgboost def test_xgb_regressor(self): # Train model training_data = datasets.make_regression(n_features=5) @@ -158,6 +195,7 @@ class TestImportedMLModel: # Clean up es_model.delete_model() + @requires_xgboost def test_predict_single_feature_vector(self): # Train model training_data = datasets.make_regression(n_features=1) diff --git a/noxfile.py b/noxfile.py index 31b3e7e..4dd47e9 100644 --- a/noxfile.py +++ b/noxfile.py @@ -29,8 +29,13 @@ TYPED_FILES = { "eland/index.py", "eland/query.py", "eland/tasks.py", + "eland/ml/__init__.py", "eland/ml/_model_serializer.py", "eland/ml/imported_ml_model.py", + "eland/ml/transformers/__init__.py", + "eland/ml/transformers/base.py", + "eland/ml/transformers/sklearn.py", + "eland/ml/transformers/xgboost.py", } @@ -76,6 +81,24 @@ def test(session): session.run("python", "-m", "eland.tests.setup_tests") session.run("pytest", "--doctest-modules", *(session.posargs or ("eland/",))) + session.run("python", "-m", "pip", "uninstall", "--yes", "scikit-learn", "xgboost") + session.run("pytest", "eland/tests/ml/") + + +@nox.session(python=["3.6", "3.7", "3.8"], name="test-ml-deps") +def test_ml_deps(session): + def session_uninstall(*deps): + session.run("python", "-m", "pip", "uninstall", "--yes", *deps) + + session.install("-r", "requirements-dev.txt") + session.run("python", "-m", "eland.tests.setup_tests") + + session_uninstall("xgboost", "scikit-learn") + session.run("pytest", *(session.posargs or ("eland/tests/ml/",))) + + session.install(".[scikit-learn]") + session.run("pytest", *(session.posargs or ("eland/tests/ml/",))) + @nox.session(reuse_venv=True) def docs(session):