Make ML libraries optional, fix type issues

This commit is contained in:
Seth Michael Larson 2020-05-14 09:31:01 -05:00 committed by GitHub
parent bfd0ee6f90
commit d2047aa51a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 672 additions and 515 deletions

View File

@ -35,4 +35,4 @@ docker run \
--name eland-test-runner \
--rm \
elastic/eland \
nox -s test-${PYTHON_VERSION}
nox -s lint test-${PYTHON_VERSION}

View File

@ -178,7 +178,7 @@ class MatchAllFilter(QueryFilter):
class RandomScoreFilter(QueryFilter):
def __init__(self, query: QueryFilter, random_state: int) -> None:
def __init__(self, query: BooleanFilter, random_state: int) -> None:
q = MatchAllFilter() if query.empty() else query
seed = {}

View File

@ -6,7 +6,7 @@ import base64
import gzip
import json
from abc import ABC
from typing import List, Dict, Any, Optional
from typing import Sequence, Dict, Any, Optional
def add_if_exists(d: Dict[str, Any], k: str, v: Any) -> None:
@ -17,9 +17,9 @@ def add_if_exists(d: Dict[str, Any], k: str, v: Any) -> None:
class ModelSerializer(ABC):
def __init__(
self,
feature_names: List[str],
feature_names: Sequence[str],
target_type: Optional[str] = None,
classification_labels: Optional[List[str]] = None,
classification_labels: Optional[Sequence[str]] = None,
):
self._target_type = target_type
self._feature_names = feature_names
@ -33,7 +33,7 @@ class ModelSerializer(ABC):
return d
@property
def feature_names(self) -> List[str]:
def feature_names(self) -> Sequence[str]:
return self._feature_names
def serialize_model(self) -> Dict[str, Any]:
@ -84,10 +84,10 @@ class TreeNode:
class Tree(ModelSerializer):
def __init__(
self,
feature_names: List[str],
feature_names: Sequence[str],
target_type: Optional[str] = None,
tree_structure: Optional[List[TreeNode]] = None,
classification_labels: Optional[List[str]] = None,
tree_structure: Optional[Sequence[TreeNode]] = None,
classification_labels: Optional[Sequence[str]] = None,
):
super().__init__(
feature_names=feature_names,
@ -107,12 +107,12 @@ class Tree(ModelSerializer):
class Ensemble(ModelSerializer):
def __init__(
self,
feature_names: List[str],
trained_models: List[ModelSerializer],
feature_names: Sequence[str],
trained_models: Sequence[ModelSerializer],
output_aggregator: Dict[str, Any],
target_type: Optional[str] = None,
classification_labels: Optional[List[str]] = None,
classification_weights: Optional[List[float]] = None,
classification_labels: Optional[Sequence[str]] = None,
classification_weights: Optional[Sequence[float]] = None,
):
super().__init__(
feature_names=feature_names,

View File

@ -1,435 +0,0 @@
# Licensed to Elasticsearch B.V under one or more agreements.
# Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
# See the LICENSE file in the project root for more information
from typing import List, Union, Optional
import numpy as np
from eland.ml._optional import import_optional_dependency
from eland.ml._model_serializer import Tree, TreeNode, Ensemble
sklearn = import_optional_dependency("sklearn")
xgboost = import_optional_dependency("xgboost")
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.utils.validation import check_is_fitted
from xgboost import Booster, XGBRegressor, XGBClassifier
class ModelTransformer:
def __init__(
self,
model,
feature_names: List[str],
classification_labels: Optional[List[str]] = None,
classification_weights: Optional[List[float]] = None,
):
self._feature_names = feature_names
self._model = model
self._classification_labels = classification_labels
self._classification_weights = classification_weights
def is_supported(self):
return isinstance(
self._model,
(
DecisionTreeClassifier,
DecisionTreeRegressor,
RandomForestRegressor,
RandomForestClassifier,
XGBClassifier,
XGBRegressor,
Booster,
),
)
class SKLearnTransformer(ModelTransformer):
"""
Base class for SKLearn transformers.
warning: Should not use this class directly. Use derived classes instead
"""
def __init__(
self,
model,
feature_names: List[str],
classification_labels: Optional[List[str]] = None,
classification_weights: Optional[List[float]] = None,
):
"""
Base class for SKLearn transformations
:param model: sklearn trained model
:param feature_names: The feature names for the model
:param classification_labels: Optional classification labels (if not encoded in the model)
:param classification_weights: Optional classification weights
"""
super().__init__(
model, feature_names, classification_labels, classification_weights
)
self._node_decision_type = "lte"
def build_tree_node(self, node_index: int, node_data: dict, value) -> TreeNode:
"""
This builds out a TreeNode class given the sklearn tree node definition.
Node decision types are defaulted to "lte" to match the behavior of SKLearn
:param node_index: The node index
:param node_data: Opaque node data contained in the sklearn tree state
:param value: Opaque node value (i.e. leaf/node values) from tree state
:return: TreeNode object
"""
if value.shape[0] != 1:
raise ValueError(
f"unexpected multiple values returned from leaf node '{node_index}'"
)
if node_data[0] == -1: # is leaf node
if (
value.shape[1] == 1
): # classification requires more than one value, so assume regression
leaf_value = float(value[0][0])
else:
# the classification value, which is the index of the largest value
leaf_value = int(np.argmax(value))
return TreeNode(
node_index,
decision_type=self._node_decision_type,
leaf_value=leaf_value,
)
else:
return TreeNode(
node_index,
decision_type=self._node_decision_type,
left_child=int(node_data[0]),
right_child=int(node_data[1]),
split_feature=int(node_data[2]),
threshold=float(node_data[3]),
)
class SKLearnDecisionTreeTransformer(SKLearnTransformer):
"""
class for transforming SKLearn decision tree models into Tree model formats supported by Elasticsearch.
"""
def __init__(
self,
model: Union[DecisionTreeRegressor, DecisionTreeClassifier],
feature_names: List[str],
classification_labels: Optional[List[str]] = None,
):
"""
Transforms a Decision Tree model (Regressor|Classifier) into a ES Supported Tree format
:param model: fitted decision tree model
:param feature_names: model feature names
:param classification_labels: Optional classification labels
"""
super().__init__(model, feature_names, classification_labels)
def transform(self) -> Tree:
"""
Transform the provided model into an ES supported Tree object
:return: Tree object for ES storage and use
"""
target_type = (
"regression"
if isinstance(self._model, DecisionTreeRegressor)
else "classification"
)
check_is_fitted(self._model, ["tree_"])
tree_classes = None
if self._classification_labels:
tree_classes = self._classification_labels
if isinstance(self._model, DecisionTreeClassifier):
check_is_fitted(self._model, ["classes_"])
if tree_classes is None:
tree_classes = [str(c) for c in self._model.classes_]
nodes = []
tree_state = self._model.tree_.__getstate__()
for i in range(len(tree_state["nodes"])):
nodes.append(
self.build_tree_node(i, tree_state["nodes"][i], tree_state["values"][i])
)
return Tree(self._feature_names, target_type, nodes, tree_classes)
class SKLearnForestTransformer(SKLearnTransformer):
"""
Base class for transforming SKLearn forest models into Ensemble model formats supported by Elasticsearch.
warning: do not use this class directly. Use a derived class instead
"""
def __init__(
self,
model: Union[RandomForestClassifier, RandomForestRegressor],
feature_names: List[str],
classification_labels: Optional[List[str]] = None,
classification_weights: Optional[List[float]] = None,
):
super().__init__(
model, feature_names, classification_labels, classification_weights
)
def build_aggregator_output(self) -> dict:
raise NotImplementedError("build_aggregator_output must be implemented")
def determine_target_type(self) -> str:
raise NotImplementedError("determine_target_type must be implemented")
def transform(self) -> Ensemble:
check_is_fitted(self._model, ["estimators_"])
estimators = self._model.estimators_
ensemble_classes = None
if self._classification_labels:
ensemble_classes = self._classification_labels
if isinstance(self._model, RandomForestClassifier):
check_is_fitted(self._model, ["classes_"])
if ensemble_classes is None:
ensemble_classes = [str(c) for c in self._model.classes_]
ensemble_models = [
SKLearnDecisionTreeTransformer(m, self._feature_names).transform()
for m in estimators
]
return Ensemble(
self._feature_names,
ensemble_models,
self.build_aggregator_output(),
target_type=self.determine_target_type(),
classification_labels=ensemble_classes,
classification_weights=self._classification_weights,
)
class SKLearnForestRegressorTransformer(SKLearnForestTransformer):
"""
Class for transforming RandomForestRegressor models into an ensemble model supported by Elasticsearch
"""
def __init__(self, model: RandomForestRegressor, feature_names: List[str]):
super().__init__(model, feature_names)
def build_aggregator_output(self) -> dict:
return {
"weighted_sum": {
"weights": [1.0 / len(self._model.estimators_)]
* len(self._model.estimators_),
}
}
def determine_target_type(self) -> str:
return "regression"
class SKLearnForestClassifierTransformer(SKLearnForestTransformer):
"""
Class for transforming RandomForestClassifier models into an ensemble model supported by Elasticsearch
"""
def __init__(
self,
model: RandomForestClassifier,
feature_names: List[str],
classification_labels: Optional[List[str]] = None,
):
super().__init__(model, feature_names, classification_labels)
def build_aggregator_output(self) -> dict:
return {"weighted_mode": {"num_classes": len(self._model.classes_)}}
def determine_target_type(self) -> str:
return "classification"
class XGBoostForestTransformer(ModelTransformer):
"""
Base class for transforming XGBoost models into ensemble models supported by Elasticsearch
warning: do not use directly. Use a derived classes instead
"""
def __init__(
self,
model: Booster,
feature_names: List[str],
base_score: float = 0.5,
objective: str = "reg:squarederror",
classification_labels: Optional[List[str]] = None,
classification_weights: Optional[List[float]] = None,
):
super().__init__(
model, feature_names, classification_labels, classification_weights
)
self._node_decision_type = "lt"
self._base_score = base_score
self._objective = objective
def get_feature_id(self, feature_id: str) -> int:
if feature_id[0] == "f":
try:
return int(feature_id[1:])
except ValueError:
raise RuntimeError(f"Unable to interpret '{feature_id}'")
else:
try:
return int(feature_id)
except ValueError:
raise RuntimeError(f"Unable to interpret '{feature_id}'")
def extract_node_id(self, node_id: str, curr_tree: int) -> int:
t_id, n_id = node_id.split("-")
if t_id is None or n_id is None:
raise RuntimeError(
f"cannot determine node index or tree from '{node_id}' for tree {curr_tree}"
)
try:
t_id = int(t_id)
n_id = int(n_id)
if t_id != curr_tree:
raise RuntimeError(
f"extracted tree id {t_id} does not match current tree {curr_tree}"
)
return n_id
except ValueError:
raise RuntimeError(
f"cannot determine node index or tree from '{node_id}' for tree {curr_tree}"
)
def build_tree_node(self, row, curr_tree: int) -> TreeNode:
node_index = row["Node"]
if row["Feature"] == "Leaf":
return TreeNode(node_idx=node_index, leaf_value=float(row["Gain"]))
else:
return TreeNode(
node_idx=node_index,
decision_type=self._node_decision_type,
left_child=self.extract_node_id(row["Yes"], curr_tree),
right_child=self.extract_node_id(row["No"], curr_tree),
threshold=float(row["Split"]),
split_feature=self.get_feature_id(row["Feature"]),
)
def build_tree(self, nodes: List[TreeNode]) -> Tree:
return Tree(feature_names=self._feature_names, tree_structure=nodes)
def build_base_score_stump(self) -> Tree:
return Tree(
feature_names=self._feature_names,
tree_structure=[TreeNode(0, leaf_value=self._base_score)],
)
def build_forest(self) -> List[Tree]:
"""
This builds out the forest of trees as described by XGBoost into a format
supported by Elasticsearch
:return: A list of Tree objects
"""
self.check_model_booster()
tree_table = self._model.trees_to_dataframe()
transformed_trees = []
curr_tree = None
tree_nodes = []
for _, row in tree_table.iterrows():
if row["Tree"] != curr_tree:
if len(tree_nodes) > 0:
transformed_trees.append(self.build_tree(tree_nodes))
curr_tree = row["Tree"]
tree_nodes = []
tree_nodes.append(self.build_tree_node(row, curr_tree))
# add last tree
if len(tree_nodes) > 0:
transformed_trees.append(self.build_tree(tree_nodes))
# We add this stump as XGBoost adds the base_score to the regression outputs
if self._objective.partition(":")[0] == "reg":
transformed_trees.append(self.build_base_score_stump())
return transformed_trees
def build_aggregator_output(self) -> dict:
raise NotImplementedError("build_aggregator_output must be implemented")
def determine_target_type(self) -> str:
raise NotImplementedError("determine_target_type must be implemented")
def is_objective_supported(self) -> bool:
return False
def check_model_booster(self):
# xgboost v1 made booster default to 'None' meaning 'gbtree'
if self._model.booster not in {"dart", "gbtree", None}:
raise ValueError(
f"booster must exist and be of type 'dart' or "
f"'gbtree', was {self._model.booster!r}"
)
def transform(self) -> Ensemble:
self.check_model_booster()
if not self.is_objective_supported():
raise ValueError(f"Unsupported objective '{self._objective}'")
forest = self.build_forest()
return Ensemble(
feature_names=self._feature_names,
trained_models=forest,
output_aggregator=self.build_aggregator_output(),
classification_labels=self._classification_labels,
classification_weights=self._classification_weights,
target_type=self.determine_target_type(),
)
class XGBoostRegressorTransformer(XGBoostForestTransformer):
def __init__(self, model: XGBRegressor, feature_names: List[str]):
# XGBRegressor.base_score defaults to 0.5.
base_score = model.base_score
if base_score is None:
base_score = 0.5
super().__init__(
model.get_booster(), feature_names, base_score, model.objective
)
def determine_target_type(self) -> str:
return "regression"
def is_objective_supported(self) -> bool:
return self._objective in {
"reg:squarederror",
"reg:linear",
"reg:squaredlogerror",
"reg:logistic",
}
def build_aggregator_output(self) -> dict:
return {"weighted_sum": {}}
class XGBoostClassifierTransformer(XGBoostForestTransformer):
def __init__(
self,
model: XGBClassifier,
feature_names: List[str],
classification_labels: Optional[List[str]] = None,
):
super().__init__(
model.get_booster(),
feature_names,
model.base_score,
model.objective,
classification_labels,
)
def determine_target_type(self) -> str:
return "classification"
def is_objective_supported(self) -> bool:
return self._objective in {"binary:logistic", "binary:hinge"}
def build_aggregator_output(self) -> dict:
return {"logistic_regression": {}}

View File

@ -6,29 +6,26 @@ from typing import Union, List, Optional, Tuple, TYPE_CHECKING, cast
import numpy as np # type: ignore
from eland.common import es_version
from eland.ml._model_transformers import (
SKLearnDecisionTreeTransformer,
SKLearnForestRegressorTransformer,
SKLearnForestClassifierTransformer,
XGBoostRegressorTransformer,
XGBoostClassifierTransformer,
)
from eland.ml._model_serializer import ModelSerializer
from eland.ml._optional import import_optional_dependency
from eland.ml.ml_model import MLModel
sklearn = import_optional_dependency("sklearn")
xgboost = import_optional_dependency("xgboost")
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor # type: ignore
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor # type: ignore
from xgboost import XGBRegressor, XGBClassifier # type: ignore
from .ml_model import MLModel
from .transformers import get_model_transformer
from ..common import es_version
if TYPE_CHECKING:
from elasticsearch import Elasticsearch # type: ignore # noqa: F401
# Try importing each ML lib separately so mypy users don't have to
# have both installed to use type-checking.
try:
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor # type: ignore # noqa: F401
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor # type: ignore # noqa: F401
except ImportError:
pass
try:
from xgboost import XGBRegressor, XGBClassifier # type: ignore # noqa: F401
except ImportError:
pass
class ImportedMLModel(MLModel):
"""
@ -99,12 +96,12 @@ class ImportedMLModel(MLModel):
es_client: Union[str, List[str], Tuple[str, ...], "Elasticsearch"],
model_id: str,
model: Union[
DecisionTreeClassifier,
DecisionTreeRegressor,
RandomForestRegressor,
RandomForestClassifier,
XGBClassifier,
XGBRegressor,
"DecisionTreeClassifier",
"DecisionTreeRegressor",
"RandomForestRegressor",
"RandomForestClassifier",
"XGBClassifier",
"XGBRegressor",
],
feature_names: List[str],
classification_labels: Optional[List[str]] = None,
@ -114,42 +111,15 @@ class ImportedMLModel(MLModel):
super().__init__(es_client, model_id)
self._feature_names = feature_names
self._model_type = None
serializer: ModelSerializer # type def
# Transform model
if isinstance(model, DecisionTreeRegressor):
serializer = SKLearnDecisionTreeTransformer(
model, feature_names
).transform()
self._model_type = MLModel.TYPE_REGRESSION
elif isinstance(model, DecisionTreeClassifier):
serializer = SKLearnDecisionTreeTransformer(
model, feature_names, classification_labels
).transform()
self._model_type = MLModel.TYPE_CLASSIFICATION
elif isinstance(model, RandomForestRegressor):
serializer = SKLearnForestRegressorTransformer(
model, feature_names
).transform()
self._model_type = MLModel.TYPE_REGRESSION
elif isinstance(model, RandomForestClassifier):
serializer = SKLearnForestClassifierTransformer(
model, feature_names, classification_labels
).transform()
self._model_type = MLModel.TYPE_CLASSIFICATION
elif isinstance(model, XGBRegressor):
serializer = XGBoostRegressorTransformer(model, feature_names).transform()
self._model_type = MLModel.TYPE_REGRESSION
elif isinstance(model, XGBClassifier):
serializer = XGBoostClassifierTransformer(
model, feature_names, classification_labels
).transform()
self._model_type = MLModel.TYPE_CLASSIFICATION
else:
raise NotImplementedError(
f"ML model of type {type(model)}, not currently implemented"
)
transformer = get_model_transformer(
model,
feature_names=feature_names,
classification_labels=classification_labels,
classification_weights=classification_weights,
)
self._model_type = transformer.model_type
serializer = transformer.transform()
if overwrite:
self.delete_model()

View File

@ -0,0 +1,71 @@
# Licensed to Elasticsearch B.V under one or more agreements.
# Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
# See the LICENSE file in the project root for more information
import inspect
from typing import Any, Dict, Type
from .base import ModelTransformer
__all__ = ["get_model_transformer"]
_MODEL_TRANSFORMERS: Dict[type, Type[ModelTransformer]] = {}
def get_model_transformer(model: Any, **kwargs: Any) -> ModelTransformer:
"""Creates a ModelTransformer for a given model or raises an exception if one is not available"""
for model_type, transformer in _MODEL_TRANSFORMERS.items():
if isinstance(model, model_type):
# Filter out kwargs that aren't applicable to the specific 'ModelTransformer'
accepted_kwargs = {
param for param in inspect.signature(transformer.__init__).parameters
}
kwargs = {k: v for k, v in kwargs.items() if k in accepted_kwargs}
return transformer(model, **kwargs)
raise NotImplementedError(
f"ML model of type {type(model)}, not currently implemented"
)
try:
from .sklearn import (
SKLearnDecisionTreeTransformer,
SKLearnForestClassifierTransformer,
SKLearnForestRegressorTransformer,
SKLearnForestTransformer,
SKLearnTransformer,
_MODEL_TRANSFORMERS as _SKLEARN_MODEL_TRANSFORMERS,
)
__all__ += [
"SKLearnDecisionTreeTransformer",
"SKLearnForestClassifierTransformer",
"SKLearnForestRegressorTransformer",
"SKLearnForestTransformer",
"SKLearnTransformer",
]
_MODEL_TRANSFORMERS.update(_SKLEARN_MODEL_TRANSFORMERS)
except ImportError:
pass
try:
from .xgboost import (
XGBoostClassifierTransformer,
XGBClassifier,
XGBoostForestTransformer,
XGBoostRegressorTransformer,
XGBRegressor,
_MODEL_TRANSFORMERS as _XGBOOST_MODEL_TRANSFORMERS,
)
__all__ += [
"XGBoostClassifierTransformer",
"XGBClassifier",
"XGBoostForestTransformer",
"XGBoostRegressorTransformer",
"XGBRegressor",
]
_MODEL_TRANSFORMERS.update(_XGBOOST_MODEL_TRANSFORMERS)
except ImportError:
pass

View File

@ -0,0 +1,27 @@
# Licensed to Elasticsearch B.V under one or more agreements.
# Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
# See the LICENSE file in the project root for more information
from typing import Sequence, Optional, Any
from .._model_serializer import ModelSerializer
class ModelTransformer:
def __init__(
self,
model: Any,
feature_names: Sequence[str],
classification_labels: Optional[Sequence[str]] = None,
classification_weights: Optional[Sequence[float]] = None,
):
self._feature_names = feature_names
self._model = model
self._classification_labels = classification_labels
self._classification_weights = classification_weights
def transform(self) -> ModelSerializer:
raise NotImplementedError()
@property
def model_type(self) -> str:
raise NotImplementedError()

View File

@ -0,0 +1,245 @@
# Licensed to Elasticsearch B.V under one or more agreements.
# Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
# See the LICENSE file in the project root for more information
import numpy as np # type: ignore
from typing import Optional, Sequence, Union, Dict, Any, Type, Tuple
from .base import ModelTransformer
from ..ml_model import MLModel
from .._optional import import_optional_dependency
from .._model_serializer import Ensemble, Tree, TreeNode
import_optional_dependency("sklearn", on_version="warn")
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor # type: ignore
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor # type: ignore
from sklearn.utils.validation import check_is_fitted # type: ignore
class SKLearnTransformer(ModelTransformer):
"""
Base class for SKLearn transformers.
warning: Should not use this class directly. Use derived classes instead
"""
def __init__(
self,
model: Any,
feature_names: Sequence[str],
classification_labels: Optional[Sequence[str]] = None,
classification_weights: Optional[Sequence[float]] = None,
):
"""
Base class for SKLearn transformations
:param model: sklearn trained model
:param feature_names: The feature names for the model
:param classification_labels: Optional classification labels (if not encoded in the model)
:param classification_weights: Optional classification weights
"""
super().__init__(
model, feature_names, classification_labels, classification_weights
)
self._node_decision_type = "lte"
def build_tree_node(
self,
node_index: int,
node_data: Tuple[Union[int, float], ...],
value: np.ndarray,
) -> TreeNode:
"""
This builds out a TreeNode class given the sklearn tree node definition.
Node decision types are defaulted to "lte" to match the behavior of SKLearn
:param node_index: The node index
:param node_data: Opaque node data contained in the sklearn tree state
:param value: Opaque node value (i.e. leaf/node values) from tree state
:return: TreeNode object
"""
if value.shape[0] != 1:
raise ValueError(
f"unexpected multiple values returned from leaf node '{node_index}'"
)
if node_data[0] == -1: # is leaf node
if (
value.shape[1] == 1
): # classification requires more than one value, so assume regression
leaf_value = float(value[0][0])
else:
# the classification value, which is the index of the largest value
leaf_value = int(np.argmax(value))
return TreeNode(
node_index,
decision_type=self._node_decision_type,
leaf_value=leaf_value,
)
else:
return TreeNode(
node_index,
decision_type=self._node_decision_type,
left_child=int(node_data[0]),
right_child=int(node_data[1]),
split_feature=int(node_data[2]),
threshold=float(node_data[3]),
)
class SKLearnDecisionTreeTransformer(SKLearnTransformer):
"""
class for transforming SKLearn decision tree models into Tree model formats supported by Elasticsearch.
"""
def __init__(
self,
model: Union[DecisionTreeRegressor, DecisionTreeClassifier],
feature_names: Sequence[str],
classification_labels: Optional[Sequence[str]] = None,
):
"""
Transforms a Decision Tree model (Regressor|Classifier) into a ES Supported Tree format
:param model: fitted decision tree model
:param feature_names: model feature names
:param classification_labels: Optional classification labels
"""
super().__init__(model, feature_names, classification_labels)
def transform(self) -> Tree:
"""
Transform the provided model into an ES supported Tree object
:return: Tree object for ES storage and use
"""
target_type = (
"regression"
if isinstance(self._model, DecisionTreeRegressor)
else "classification"
)
check_is_fitted(self._model, ["tree_"])
tree_classes = None
if self._classification_labels:
tree_classes = self._classification_labels
if isinstance(self._model, DecisionTreeClassifier):
check_is_fitted(self._model, ["classes_"])
if tree_classes is None:
tree_classes = [str(c) for c in self._model.classes_]
nodes = []
tree_state = self._model.tree_.__getstate__()
for i in range(len(tree_state["nodes"])):
nodes.append(
self.build_tree_node(i, tree_state["nodes"][i], tree_state["values"][i])
)
return Tree(self._feature_names, target_type, nodes, tree_classes)
@property
def model_type(self) -> str:
return (
MLModel.TYPE_REGRESSION
if isinstance(self._model, DecisionTreeRegressor)
else MLModel.TYPE_CLASSIFICATION
)
class SKLearnForestTransformer(SKLearnTransformer):
"""
Base class for transforming SKLearn forest models into Ensemble model formats supported by Elasticsearch.
warning: do not use this class directly. Use a derived class instead
"""
def __init__(
self,
model: Union[RandomForestClassifier, RandomForestRegressor],
feature_names: Sequence[str],
classification_labels: Optional[Sequence[str]] = None,
classification_weights: Optional[Sequence[float]] = None,
):
super().__init__(
model, feature_names, classification_labels, classification_weights
)
def build_aggregator_output(self) -> Dict[str, Any]:
raise NotImplementedError("build_aggregator_output must be implemented")
def determine_target_type(self) -> str:
raise NotImplementedError("determine_target_type must be implemented")
def transform(self) -> Ensemble:
check_is_fitted(self._model, ["estimators_"])
estimators = self._model.estimators_
ensemble_classes = None
if self._classification_labels:
ensemble_classes = self._classification_labels
if isinstance(self._model, RandomForestClassifier):
check_is_fitted(self._model, ["classes_"])
if ensemble_classes is None:
ensemble_classes = [str(c) for c in self._model.classes_]
ensemble_models: Sequence[Tree] = [
SKLearnDecisionTreeTransformer(m, self._feature_names).transform()
for m in estimators
]
return Ensemble(
self._feature_names,
ensemble_models,
self.build_aggregator_output(),
target_type=self.determine_target_type(),
classification_labels=ensemble_classes,
classification_weights=self._classification_weights,
)
class SKLearnForestRegressorTransformer(SKLearnForestTransformer):
"""
Class for transforming RandomForestRegressor models into an ensemble model supported by Elasticsearch
"""
def __init__(self, model: RandomForestRegressor, feature_names: Sequence[str]):
super().__init__(model, feature_names)
def build_aggregator_output(self) -> Dict[str, Any]:
return {
"weighted_sum": {
"weights": [1.0 / len(self._model.estimators_)]
* len(self._model.estimators_),
}
}
def determine_target_type(self) -> str:
return "regression"
@property
def model_type(self) -> str:
return MLModel.TYPE_REGRESSION
class SKLearnForestClassifierTransformer(SKLearnForestTransformer):
"""
Class for transforming RandomForestClassifier models into an ensemble model supported by Elasticsearch
"""
def __init__(
self,
model: RandomForestClassifier,
feature_names: Sequence[str],
classification_labels: Optional[Sequence[str]] = None,
):
super().__init__(model, feature_names, classification_labels)
def build_aggregator_output(self) -> Dict[str, Any]:
return {"weighted_mode": {"num_classes": len(self._model.classes_)}}
def determine_target_type(self) -> str:
return "classification"
@property
def model_type(self) -> str:
return MLModel.TYPE_CLASSIFICATION
_MODEL_TRANSFORMERS: Dict[type, Type[ModelTransformer]] = {
DecisionTreeRegressor: SKLearnDecisionTreeTransformer,
DecisionTreeClassifier: SKLearnDecisionTreeTransformer,
RandomForestRegressor: SKLearnForestRegressorTransformer,
RandomForestClassifier: SKLearnForestClassifierTransformer,
}

View File

@ -0,0 +1,217 @@
# Licensed to Elasticsearch B.V under one or more agreements.
# Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
# See the LICENSE file in the project root for more information
from typing import Optional, List, Dict, Any, Type
from .base import ModelTransformer
import pandas as pd # type: ignore
from .._model_serializer import Ensemble, Tree, TreeNode
from ..ml_model import MLModel
from .._optional import import_optional_dependency
import_optional_dependency("xgboost", on_version="warn")
from xgboost import Booster, XGBRegressor, XGBClassifier # type: ignore
class XGBoostForestTransformer(ModelTransformer):
"""
Base class for transforming XGBoost models into ensemble models supported by Elasticsearch
warning: do not use directly. Use a derived classes instead
"""
def __init__(
self,
model: Booster,
feature_names: List[str],
base_score: float = 0.5,
objective: str = "reg:squarederror",
classification_labels: Optional[List[str]] = None,
classification_weights: Optional[List[float]] = None,
):
super().__init__(
model, feature_names, classification_labels, classification_weights
)
self._node_decision_type = "lt"
self._base_score = base_score
self._objective = objective
def get_feature_id(self, feature_id: str) -> int:
if feature_id[0] == "f":
try:
return int(feature_id[1:])
except ValueError:
raise RuntimeError(f"Unable to interpret '{feature_id}'")
else:
try:
return int(feature_id)
except ValueError:
raise RuntimeError(f"Unable to interpret '{feature_id}'")
def extract_node_id(self, node_id: str, curr_tree: int) -> int:
t_id, n_id = node_id.split("-")
if t_id is None or n_id is None:
raise RuntimeError(
f"cannot determine node index or tree from '{node_id}' for tree {curr_tree}"
)
try:
l_id = int(t_id)
r_id = int(n_id)
if l_id != curr_tree:
raise RuntimeError(
f"extracted tree id {l_id} does not match current tree {curr_tree}"
)
return r_id
except ValueError:
raise RuntimeError(
f"cannot determine node index or tree from '{node_id}' for tree {curr_tree}"
)
def build_tree_node(self, row: pd.Series, curr_tree: int) -> TreeNode:
node_index = row["Node"]
if row["Feature"] == "Leaf":
return TreeNode(node_idx=node_index, leaf_value=float(row["Gain"]))
else:
return TreeNode(
node_idx=node_index,
decision_type=self._node_decision_type,
left_child=self.extract_node_id(row["Yes"], curr_tree),
right_child=self.extract_node_id(row["No"], curr_tree),
threshold=float(row["Split"]),
split_feature=self.get_feature_id(row["Feature"]),
)
def build_tree(self, nodes: List[TreeNode]) -> Tree:
return Tree(feature_names=self._feature_names, tree_structure=nodes)
def build_base_score_stump(self) -> Tree:
return Tree(
feature_names=self._feature_names,
tree_structure=[TreeNode(0, leaf_value=self._base_score)],
)
def build_forest(self) -> List[Tree]:
"""
This builds out the forest of trees as described by XGBoost into a format
supported by Elasticsearch
:return: A list of Tree objects
"""
self.check_model_booster()
tree_table: pd.DataFrame = self._model.trees_to_dataframe()
transformed_trees = []
curr_tree: Optional[Any] = None
tree_nodes: List[TreeNode] = []
for _, row in tree_table.iterrows():
if row["Tree"] != curr_tree:
if len(tree_nodes) > 0:
transformed_trees.append(self.build_tree(tree_nodes))
curr_tree = row["Tree"]
tree_nodes = []
tree_nodes.append(self.build_tree_node(row, curr_tree))
# add last tree
if len(tree_nodes) > 0:
transformed_trees.append(self.build_tree(tree_nodes))
# We add this stump as XGBoost adds the base_score to the regression outputs
if self._objective.partition(":")[0] == "reg":
transformed_trees.append(self.build_base_score_stump())
return transformed_trees
def build_aggregator_output(self) -> Dict[str, Any]:
raise NotImplementedError("build_aggregator_output must be implemented")
def determine_target_type(self) -> str:
raise NotImplementedError("determine_target_type must be implemented")
def is_objective_supported(self) -> bool:
return False
def check_model_booster(self) -> None:
# xgboost v1 made booster default to 'None' meaning 'gbtree'
if self._model.booster not in {"dart", "gbtree", None}:
raise ValueError(
f"booster must exist and be of type 'dart' or "
f"'gbtree', was {self._model.booster!r}"
)
def transform(self) -> Ensemble:
self.check_model_booster()
if not self.is_objective_supported():
raise ValueError(f"Unsupported objective '{self._objective}'")
forest = self.build_forest()
return Ensemble(
feature_names=self._feature_names,
trained_models=forest,
output_aggregator=self.build_aggregator_output(),
classification_labels=self._classification_labels,
classification_weights=self._classification_weights,
target_type=self.determine_target_type(),
)
class XGBoostRegressorTransformer(XGBoostForestTransformer):
def __init__(self, model: XGBRegressor, feature_names: List[str]):
# XGBRegressor.base_score defaults to 0.5.
base_score = model.base_score
if base_score is None:
base_score = 0.5
super().__init__(
model.get_booster(), feature_names, base_score, model.objective
)
def determine_target_type(self) -> str:
return "regression"
def is_objective_supported(self) -> bool:
return self._objective in {
"reg:squarederror",
"reg:linear",
"reg:squaredlogerror",
"reg:logistic",
}
def build_aggregator_output(self) -> Dict[str, Any]:
return {"weighted_sum": {}}
@property
def model_type(self) -> str:
return MLModel.TYPE_REGRESSION
class XGBoostClassifierTransformer(XGBoostForestTransformer):
def __init__(
self,
model: XGBClassifier,
feature_names: List[str],
classification_labels: Optional[List[str]] = None,
):
super().__init__(
model.get_booster(),
feature_names,
model.base_score,
model.objective,
classification_labels,
)
def determine_target_type(self) -> str:
return "classification"
def is_objective_supported(self) -> bool:
return self._objective in {"binary:logistic", "binary:hinge"}
def build_aggregator_output(self) -> Dict[str, Any]:
return {"logistic_regression": {}}
@property
def model_type(self) -> str:
return MLModel.TYPE_CLASSIFICATION
_MODEL_TRANSFORMERS: Dict[type, Type[ModelTransformer]] = {
XGBRegressor: XGBoostRegressorTransformer,
XGBClassifier: XGBoostClassifierTransformer,
}

View File

@ -14,6 +14,7 @@ if TYPE_CHECKING:
from .filter import BooleanFilter # noqa: F401
from .query_compiler import QueryCompiler # noqa: F401
from .operations import QueryParams # noqa: F401
from .index import Index # noqa: F401
RESOLVED_TASK_TYPE = Tuple["QueryParams", List["PostProcessingAction"]]
@ -50,7 +51,7 @@ class Task(ABC):
class SizeTask(Task):
def __init__(self, task_type: str, index, count: int):
def __init__(self, task_type: str, index: "Index", count: int):
super().__init__(task_type)
self._sort_field = index.sort_field
self._count = min(len(index), count)
@ -62,7 +63,7 @@ class SizeTask(Task):
class HeadTask(SizeTask):
def __init__(self, index, count: int):
def __init__(self, index: "Index", count: int):
super().__init__("head", index, count)
def __repr__(self) -> str:
@ -109,7 +110,7 @@ class HeadTask(SizeTask):
class TailTask(SizeTask):
def __init__(self, index, count: int):
def __init__(self, index: "Index", count: int):
super().__init__("tail", index, count)
def resolve_task(
@ -172,7 +173,7 @@ class TailTask(SizeTask):
class SampleTask(SizeTask):
def __init__(self, index, count: int, random_state: int):
def __init__(self, index: "Index", count: int, random_state: int):
super().__init__("sample", index, count)
self._random_state = random_state

View File

@ -2,17 +2,49 @@
# Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
# See the LICENSE file in the project root for more information
import pytest
import numpy as np
from sklearn import datasets
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from xgboost import XGBRegressor, XGBClassifier
from eland.ml import ImportedMLModel
from eland.tests import ES_TEST_CLIENT
try:
from sklearn import datasets
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
HAS_SKLEARN = True
except ImportError:
HAS_SKLEARN = False
try:
from xgboost import XGBRegressor, XGBClassifier
HAS_XGBOOST = True
except ImportError:
HAS_XGBOOST = False
requires_sklearn = pytest.mark.skipif(
not HAS_SKLEARN, reason="This test requires 'scikit-learn' package to run"
)
requires_xgboost = pytest.mark.skipif(
not HAS_XGBOOST, reason="This test requires 'xgboost' package to run"
)
requires_no_ml_extras = pytest.mark.skipif(
HAS_SKLEARN or HAS_XGBOOST,
reason="This test requires 'scikit-learn' and 'xgboost' to not be installed",
)
@requires_no_ml_extras
def test_import_ml_model_when_dependencies_are_not_available():
from eland.ml import MLModel, ImportedMLModel # noqa: F401
class TestImportedMLModel:
@requires_sklearn
def test_decision_tree_classifier(self):
# Train model
training_data = datasets.make_classification(n_features=5)
@ -37,6 +69,7 @@ class TestImportedMLModel:
# Clean up
es_model.delete_model()
@requires_sklearn
def test_decision_tree_regressor(self):
# Train model
training_data = datasets.make_regression(n_features=5)
@ -61,6 +94,7 @@ class TestImportedMLModel:
# Clean up
es_model.delete_model()
@requires_sklearn
def test_random_forest_classifier(self):
# Train model
training_data = datasets.make_classification(n_features=5)
@ -85,6 +119,7 @@ class TestImportedMLModel:
# Clean up
es_model.delete_model()
@requires_sklearn
def test_random_forest_regressor(self):
# Train model
training_data = datasets.make_regression(n_features=5)
@ -109,6 +144,7 @@ class TestImportedMLModel:
# Clean up
es_model.delete_model()
@requires_xgboost
def test_xgb_classifier(self):
# Train model
training_data = datasets.make_classification(n_features=5)
@ -133,6 +169,7 @@ class TestImportedMLModel:
# Clean up
es_model.delete_model()
@requires_xgboost
def test_xgb_regressor(self):
# Train model
training_data = datasets.make_regression(n_features=5)
@ -158,6 +195,7 @@ class TestImportedMLModel:
# Clean up
es_model.delete_model()
@requires_xgboost
def test_predict_single_feature_vector(self):
# Train model
training_data = datasets.make_regression(n_features=1)

View File

@ -29,8 +29,13 @@ TYPED_FILES = {
"eland/index.py",
"eland/query.py",
"eland/tasks.py",
"eland/ml/__init__.py",
"eland/ml/_model_serializer.py",
"eland/ml/imported_ml_model.py",
"eland/ml/transformers/__init__.py",
"eland/ml/transformers/base.py",
"eland/ml/transformers/sklearn.py",
"eland/ml/transformers/xgboost.py",
}
@ -76,6 +81,24 @@ def test(session):
session.run("python", "-m", "eland.tests.setup_tests")
session.run("pytest", "--doctest-modules", *(session.posargs or ("eland/",)))
session.run("python", "-m", "pip", "uninstall", "--yes", "scikit-learn", "xgboost")
session.run("pytest", "eland/tests/ml/")
@nox.session(python=["3.6", "3.7", "3.8"], name="test-ml-deps")
def test_ml_deps(session):
def session_uninstall(*deps):
session.run("python", "-m", "pip", "uninstall", "--yes", *deps)
session.install("-r", "requirements-dev.txt")
session.run("python", "-m", "eland.tests.setup_tests")
session_uninstall("xgboost", "scikit-learn")
session.run("pytest", *(session.posargs or ("eland/tests/ml/",)))
session.install(".[scikit-learn]")
session.run("pytest", *(session.posargs or ("eland/tests/ml/",)))
@nox.session(reuse_venv=True)
def docs(session):