Mirror of https://github.com/elastic/eland.git (synced 2025-07-11 00:02:14 +08:00)
This switches our sklearn.DecisionTreeClassifier serialization logic to account for multi-valued leaves in the tree. The key difference between our inference and DecisionTreeClassifier is that we run a softmax over the leaf values, whereas sklearn simply normalizes them. This means that the "probabilities" we return will differ from sklearn's.
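As a rough illustration of that difference (a minimal sketch, not part of the test file below; the leaf class counts are made up), compare sklearn-style normalization of a leaf's values with a softmax over the same values:

import numpy as np

# Hypothetical class counts stored at a single decision-tree leaf.
leaf_values = np.array([3.0, 1.0, 0.0])

# sklearn's DecisionTreeClassifier normalizes the counts into probabilities.
sklearn_probs = leaf_values / leaf_values.sum()    # [0.75, 0.25, 0.0]

# The Elasticsearch inference described above applies a softmax instead.
exp_values = np.exp(leaf_values - leaf_values.max())
softmax_probs = exp_values / exp_values.sum()      # ~[0.84, 0.11, 0.04]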
497 lines, 17 KiB, Python
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import numpy as np
import pytest

from eland.ml import MLModel
from tests import ES_TEST_CLIENT, ES_VERSION

try:
    from sklearn import datasets
    from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
    from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor

    HAS_SKLEARN = True
except ImportError:
    HAS_SKLEARN = False

try:
    from xgboost import XGBClassifier, XGBRegressor

    HAS_XGBOOST = True
except ImportError:
    HAS_XGBOOST = False

try:
    from lightgbm import LGBMClassifier, LGBMRegressor

    HAS_LIGHTGBM = True
except ImportError:
    HAS_LIGHTGBM = False


requires_sklearn = pytest.mark.skipif(
    not HAS_SKLEARN, reason="This test requires 'scikit-learn' package to run"
)
requires_xgboost = pytest.mark.skipif(
    not HAS_XGBOOST, reason="This test requires 'xgboost' package to run"
)
requires_no_ml_extras = pytest.mark.skipif(
    HAS_SKLEARN or HAS_XGBOOST,
    reason="This test requires 'scikit-learn' and 'xgboost' to not be installed",
)

requires_lightgbm = pytest.mark.skipif(
    not HAS_LIGHTGBM, reason="This test requires 'lightgbm' package to run"
)


def skip_if_multiclass_classifition():
    if ES_VERSION < (7, 7):
        raise pytest.skip(
            "Skipped because multiclass classification "
            "isn't supported on Elasticsearch 7.6"
        )


def random_rows(data, size):
    return data[np.random.randint(data.shape[0], size=size), :]


def check_prediction_equality(es_model: MLModel, py_model, test_data):
    # Get some test results
    test_results = py_model.predict(np.asarray(test_data))
    es_results = es_model.predict(test_data)
    np.testing.assert_almost_equal(test_results, es_results, decimal=2)


class TestMLModel:
    @requires_no_ml_extras
    def test_import_ml_model_when_dependencies_are_not_available(self):
        from eland.ml import MLModel  # noqa: F401

    @requires_sklearn
    def test_unpack_and_raise_errors_in_ingest_simulate(self, mocker):
        # Train model
        training_data = datasets.make_classification(n_features=5)
        classifier = DecisionTreeClassifier()
        classifier.fit(training_data[0], training_data[1])

        # Serialise the models to Elasticsearch
        feature_names = ["f0", "f1", "f2", "f3", "f4"]
        model_id = "test_decision_tree_classifier"
        test_data = [[0.1, 0.2, 0.3, -0.5, 1.0], [1.6, 2.1, -10, 50, -1.0]]

        es_model = MLModel.import_model(
            ES_TEST_CLIENT,
            model_id,
            classifier,
            feature_names,
            es_if_exists="replace",
            es_compress_model_definition=True,
        )

        # Mock the ingest.simulate API to return an error within {'docs': [...]}
        mock = mocker.patch.object(ES_TEST_CLIENT.ingest, "simulate")
        mock.return_value = {
            "docs": [
                {
                    "error": {
                        "type": "x_content_parse_exception",
                        "reason": "[1:1052] [inference_model_definition] failed to parse field [trained_model]",
                    }
                }
            ]
        }

        with pytest.raises(RuntimeError) as err:
            es_model.predict(test_data)

        assert repr(err.value) == (
            'RuntimeError("Failed to run prediction for model ID '
            "'test_decision_tree_classifier'\", {'type': 'x_content_parse_exception', "
            "'reason': '[1:1052] [inference_model_definition] failed to parse "
            "field [trained_model]'})"
        )

    @requires_sklearn
    @pytest.mark.parametrize("compress_model_definition", [True, False])
    @pytest.mark.parametrize("multi_class", [True, False])
    def test_decision_tree_classifier(self, compress_model_definition, multi_class):
        # Train model
        training_data = (
            datasets.make_classification(
                n_features=7,
                n_classes=3,
                n_clusters_per_class=2,
                n_informative=6,
                n_redundant=1,
            )
            if multi_class
            else datasets.make_classification(n_features=7)
        )
        classifier = DecisionTreeClassifier()
        classifier.fit(training_data[0], training_data[1])

        # Serialise the models to Elasticsearch
        feature_names = ["f0", "f1", "f2", "f3", "f4", "f5", "f6"]
        model_id = "test_decision_tree_classifier"

        es_model = MLModel.import_model(
            ES_TEST_CLIENT,
            model_id,
            classifier,
            feature_names,
            es_if_exists="replace",
            es_compress_model_definition=compress_model_definition,
        )

        # Get some test results
        check_prediction_equality(
            es_model, classifier, random_rows(training_data[0], 20)
        )

        # Clean up
        es_model.delete_model()

    @requires_sklearn
    @pytest.mark.parametrize("compress_model_definition", [True, False])
    def test_decision_tree_regressor(self, compress_model_definition):
        # Train model
        training_data = datasets.make_regression(n_features=5)
        regressor = DecisionTreeRegressor()
        regressor.fit(training_data[0], training_data[1])

        # Serialise the models to Elasticsearch
        feature_names = ["f0", "f1", "f2", "f3", "f4"]
        model_id = "test_decision_tree_regressor"

        es_model = MLModel.import_model(
            ES_TEST_CLIENT,
            model_id,
            regressor,
            feature_names,
            es_if_exists="replace",
            es_compress_model_definition=compress_model_definition,
        )
        # Get some test results
        check_prediction_equality(
            es_model, regressor, random_rows(training_data[0], 20)
        )

        # Clean up
        es_model.delete_model()

    @requires_sklearn
    @pytest.mark.parametrize("compress_model_definition", [True, False])
    def test_random_forest_classifier(self, compress_model_definition):
        # Train model
        training_data = datasets.make_classification(n_features=5)
        classifier = RandomForestClassifier()
        classifier.fit(training_data[0], training_data[1])

        # Serialise the models to Elasticsearch
        feature_names = ["f0", "f1", "f2", "f3", "f4"]
        model_id = "test_random_forest_classifier"

        es_model = MLModel.import_model(
            ES_TEST_CLIENT,
            model_id,
            classifier,
            feature_names,
            es_if_exists="replace",
            es_compress_model_definition=compress_model_definition,
        )
        # Get some test results
        check_prediction_equality(
            es_model, classifier, random_rows(training_data[0], 20)
        )

        # Clean up
        es_model.delete_model()

    @requires_sklearn
    @pytest.mark.parametrize("compress_model_definition", [True, False])
    def test_random_forest_regressor(self, compress_model_definition):
        # Train model
        training_data = datasets.make_regression(n_features=5)
        regressor = RandomForestRegressor()
        regressor.fit(training_data[0], training_data[1])

        # Serialise the models to Elasticsearch
        feature_names = ["f0", "f1", "f2", "f3", "f4"]
        model_id = "test_random_forest_regressor"

        es_model = MLModel.import_model(
            ES_TEST_CLIENT,
            model_id,
            regressor,
            feature_names,
            es_if_exists="replace",
            es_compress_model_definition=compress_model_definition,
        )
        # Get some test results
        check_prediction_equality(
            es_model, regressor, random_rows(training_data[0], 20)
        )

        match = f"Trained machine learning model {model_id} already exists"
        with pytest.raises(ValueError, match=match):
            MLModel.import_model(
                ES_TEST_CLIENT,
                model_id,
                regressor,
                feature_names,
                es_if_exists="fail",
                es_compress_model_definition=compress_model_definition,
            )

        # Clean up
        es_model.delete_model()

    @requires_xgboost
    @pytest.mark.parametrize("compress_model_definition", [True, False])
    @pytest.mark.parametrize("multi_class", [True, False])
    def test_xgb_classifier(self, compress_model_definition, multi_class):
        # test both multiple and binary classification
        if multi_class:
            skip_if_multiclass_classifition()
            training_data = datasets.make_classification(
                n_features=5, n_classes=3, n_informative=3
            )
            classifier = XGBClassifier(
                booster="gbtree", objective="multi:softmax", use_label_encoder=False
            )
        else:
            training_data = datasets.make_classification(n_features=5)
            classifier = XGBClassifier(booster="gbtree", use_label_encoder=False)

        # Train model
        classifier.fit(training_data[0], training_data[1])

        # Serialise the models to Elasticsearch
        feature_names = ["f0", "f1", "f2", "f3", "f4"]
        model_id = "test_xgb_classifier"

        es_model = MLModel.import_model(
            ES_TEST_CLIENT,
            model_id,
            classifier,
            feature_names,
            es_if_exists="replace",
            es_compress_model_definition=compress_model_definition,
        )
        # Get some test results
        check_prediction_equality(
            es_model, classifier, random_rows(training_data[0], 20)
        )

        # Clean up
        es_model.delete_model()

    @requires_xgboost
    @pytest.mark.parametrize(
        "objective", ["multi:softmax", "multi:softprob", "binary:logistic"]
    )
    @pytest.mark.parametrize("booster", ["gbtree", "dart"])
    def test_xgb_classifier_objectives_and_booster(self, objective, booster):
        # test both multiple and binary classification
        if objective.startswith("multi"):
            skip_if_multiclass_classifition()
            training_data = datasets.make_classification(
                n_features=5, n_classes=3, n_informative=3
            )
            classifier = XGBClassifier(
                booster=booster, objective=objective, use_label_encoder=False
            )
        else:
            training_data = datasets.make_classification(n_features=5)
            classifier = XGBClassifier(
                booster=booster, objective=objective, use_label_encoder=False
            )

        # Train model
        classifier.fit(training_data[0], training_data[1])

        # Serialise the models to Elasticsearch
        feature_names = ["feature0", "feature1", "feature2", "feature3", "feature4"]
        model_id = "test_xgb_classifier"

        es_model = MLModel.import_model(
            ES_TEST_CLIENT, model_id, classifier, feature_names, es_if_exists="replace"
        )
        # Get some test results
        check_prediction_equality(
            es_model, classifier, random_rows(training_data[0], 20)
        )

        # Clean up
        es_model.delete_model()

    @requires_xgboost
    @pytest.mark.parametrize("compress_model_definition", [True, False])
    @pytest.mark.parametrize(
        "objective",
        [
            "reg:squarederror",
            "reg:squaredlogerror",
            "reg:linear",
            "reg:logistic",
            "reg:pseudohubererror",
        ],
    )
    @pytest.mark.parametrize("booster", ["gbtree", "dart"])
    def test_xgb_regressor(self, compress_model_definition, objective, booster):
        # Train model
        training_data = datasets.make_regression(n_features=5)
        regressor = XGBRegressor(objective=objective, booster=booster)
        regressor.fit(
            training_data[0],
            np.exp(training_data[1] - np.max(training_data[1]))
            / sum(np.exp(training_data[1])),
        )

        # Serialise the models to Elasticsearch
        feature_names = ["f0", "f1", "f2", "f3", "f4"]
        model_id = "test_xgb_regressor"

        es_model = MLModel.import_model(
            ES_TEST_CLIENT,
            model_id,
            regressor,
            feature_names,
            es_if_exists="replace",
            es_compress_model_definition=compress_model_definition,
        )
        # Get some test results
        check_prediction_equality(
            es_model, regressor, random_rows(training_data[0], 20)
        )

        # Clean up
        es_model.delete_model()

    @requires_xgboost
    def test_predict_single_feature_vector(self):
        # Train model
        training_data = datasets.make_regression(n_features=1)
        regressor = XGBRegressor()
        regressor.fit(training_data[0], training_data[1])

        # Get some test results
        test_data = [[0.1]]
        test_results = regressor.predict(np.asarray(test_data))

        # Serialise the models to Elasticsearch
        feature_names = ["f0"]
        model_id = "test_xgb_regressor"

        es_model = MLModel.import_model(
            ES_TEST_CLIENT, model_id, regressor, feature_names, es_if_exists="replace"
        )

        # Single feature
        es_results = es_model.predict(test_data[0])

        np.testing.assert_almost_equal(test_results, es_results, decimal=2)

        # Clean up
        es_model.delete_model()

    @requires_lightgbm
    @pytest.mark.parametrize("compress_model_definition", [True, False])
    @pytest.mark.parametrize(
        "objective",
        ["regression", "regression_l1", "huber", "fair", "quantile", "mape"],
    )
    @pytest.mark.parametrize("booster", ["gbdt", "rf", "dart", "goss"])
    def test_lgbm_regressor(self, compress_model_definition, objective, booster):
        # Train model
        training_data = datasets.make_regression(n_features=5)
        if booster == "rf":
            regressor = LGBMRegressor(
                boosting_type=booster,
                objective=objective,
                bagging_fraction=0.5,
                bagging_freq=3,
            )
        else:
            regressor = LGBMRegressor(boosting_type=booster, objective=objective)
        regressor.fit(training_data[0], training_data[1])

        # Serialise the models to Elasticsearch
        feature_names = ["Column_0", "Column_1", "Column_2", "Column_3", "Column_4"]
        model_id = "test_lgbm_regressor"

        es_model = MLModel.import_model(
            ES_TEST_CLIENT,
            model_id,
            regressor,
            feature_names,
            es_if_exists="replace",
            es_compress_model_definition=compress_model_definition,
        )
        # Get some test results
        check_prediction_equality(
            es_model, regressor, random_rows(training_data[0], 20)
        )

        # Clean up
        es_model.delete_model()

    @requires_lightgbm
    @pytest.mark.parametrize("compress_model_definition", [True, False])
    @pytest.mark.parametrize("objective", ["binary", "multiclass", "multiclassova"])
    @pytest.mark.parametrize("booster", ["gbdt", "dart", "goss"])
    def test_lgbm_classifier_objectives_and_booster(
        self, compress_model_definition, objective, booster
    ):
        # test both multiple and binary classification
        if objective.startswith("multi"):
            skip_if_multiclass_classifition()
            training_data = datasets.make_classification(
                n_features=5, n_classes=3, n_informative=3
            )
            classifier = LGBMClassifier(boosting_type=booster, objective=objective)
        else:
            training_data = datasets.make_classification(n_features=5)
            classifier = LGBMClassifier(boosting_type=booster, objective=objective)

        # Train model
        classifier.fit(training_data[0], training_data[1])

        # Serialise the models to Elasticsearch
        feature_names = ["Column_0", "Column_1", "Column_2", "Column_3", "Column_4"]
        model_id = "test_lgbm_classifier"

        es_model = MLModel.import_model(
            ES_TEST_CLIENT,
            model_id,
            classifier,
            feature_names,
            es_if_exists="replace",
            es_compress_model_definition=compress_model_definition,
        )

        check_prediction_equality(
            es_model, classifier, random_rows(training_data[0], 20)
        )

        # Clean up
        es_model.delete_model()