diff --git a/eland/etl.py b/eland/etl.py index 10977e2..d590c61 100644 --- a/eland/etl.py +++ b/eland/etl.py @@ -9,7 +9,7 @@ import pandas as pd # type: ignore from pandas.io.parsers import _c_parser_defaults # type: ignore from eland import DataFrame -from eland.field_mappings import FieldMappings +from eland.field_mappings import FieldMappings, verify_mapping_compatibility from eland.common import ensure_es_client, DEFAULT_CHUNK_SIZE from eland.utils import deprecated_api from elasticsearch import Elasticsearch # type: ignore @@ -172,14 +172,23 @@ def pandas_to_eland( raise ValueError( f"Could not create the index [{es_dest_index}] because it " f"already exists. " - f"Change the if_exists parameter to " + f"Change the 'es_if_exists' parameter to " f"'append' or 'replace' data." ) + elif es_if_exists == "replace": es_client.indices.delete(index=es_dest_index) es_client.indices.create(index=es_dest_index, body=mapping) - # elif if_exists == "append": - # TODO validate mapping are compatible + + elif es_if_exists == "append": + dest_mapping = es_client.indices.get_mapping(index=es_dest_index)[ + es_dest_index + ] + verify_mapping_compatibility( + ed_mapping=mapping, + es_mapping=dest_mapping, + es_type_overrides=es_type_overrides, + ) else: es_client.indices.create(index=es_dest_index, body=mapping) diff --git a/eland/field_mappings.py b/eland/field_mappings.py index 2b8df8a..06e66bc 100644 --- a/eland/field_mappings.py +++ b/eland/field_mappings.py @@ -14,12 +14,29 @@ from pandas.core.dtypes.common import ( is_string_dtype, ) from pandas.core.dtypes.inference import is_list_like -from typing import NamedTuple, Optional, Mapping, Dict, TYPE_CHECKING +from typing import NamedTuple, Optional, Mapping, Dict, Any, TYPE_CHECKING if TYPE_CHECKING: from eland import DataFrame +ES_FLOAT_TYPES = {"double", "float", "half_float", "scaled_float"} +ES_INTEGER_TYPES = {"long", "integer", "short", "byte"} +ES_COMPATIBLE_TYPES = { + "double": ES_FLOAT_TYPES, + "scaled_float": ES_FLOAT_TYPES, + "float": ES_FLOAT_TYPES, + "half_float": ES_FLOAT_TYPES, + "long": ES_INTEGER_TYPES, + "integer": ES_INTEGER_TYPES, + "short": ES_INTEGER_TYPES, + "byte": ES_INTEGER_TYPES, + "date": {"date_nanos"}, + "date_nanos": {"date"}, + "keyword": {"text"}, +} + + class Field(NamedTuple): """Holds all information on a particular field in the mapping""" @@ -436,7 +453,7 @@ class FieldMappings: @staticmethod def _generate_es_mappings( dataframe: "DataFrame", es_type_overrides: Optional[Mapping[str, str]] = None - ) -> Dict[str, str]: + ) -> Dict[str, Dict[str, Dict[str, Any]]]: """Given a pandas dataframe, generate the associated Elasticsearch mapping Parameters @@ -471,17 +488,16 @@ class FieldMappings: } """ - mappings = {"properties": {}} + mapping_props = {} for field_name_name, dtype in dataframe.dtypes.iteritems(): if es_type_overrides is not None and field_name_name in es_type_overrides: es_dtype = es_type_overrides[field_name_name] else: es_dtype = FieldMappings._pd_dtype_to_es_dtype(dtype) - mappings["properties"][field_name_name] = {} - mappings["properties"][field_name_name]["type"] = es_dtype + mapping_props[field_name_name] = {"type": es_dtype} - return {"mappings": mappings} + return {"mappings": {"properties": mapping_props}} def aggregatable_field_name(self, display_name): """ @@ -751,3 +767,45 @@ class FieldMappings: renames[field_name] = display_name return renames + + +def verify_mapping_compatibility( + ed_mapping: Mapping[str, Mapping[str, Mapping[str, Mapping[str, str]]]], + es_mapping: Mapping[str, Mapping[str, Mapping[str, Mapping[str, str]]]], + es_type_overrides: Optional[Mapping[str, str]] = None, +) -> None: + """Given a mapping generated by Eland and an existing ES index mapping + attempt to see if the two are compatible. If not compatible raise ValueError + with a list of problems between the two to be reported to the user. + """ + problems = [] + es_type_overrides = es_type_overrides or {} + + ed_mapping = ed_mapping["mappings"]["properties"] + es_mapping = es_mapping["mappings"]["properties"] + + for key in sorted(es_mapping.keys()): + if key not in ed_mapping: + problems.append(f"- {key!r} is missing from DataFrame columns") + + for key, key_def in sorted(ed_mapping.items()): + if key not in es_mapping: + problems.append(f"- {key!r} is missing from ES index mapping") + continue + + key_type = es_type_overrides.get(key, key_def["type"]) + es_key_type = es_mapping[key]["type"] + if key_type != es_key_type and es_key_type not in ES_COMPATIBLE_TYPES.get( + key_type, () + ): + problems.append( + f"- {key!r} column type ({key_type!r}) not compatible with " + f"ES index mapping type ({es_key_type!r})" + ) + + if problems: + problems_message = "\n".join(problems) + raise ValueError( + f"DataFrame dtypes and Elasticsearch index mapping " + f"aren't compatible:\n{problems_message}" + ) diff --git a/eland/tests/__init__.py b/eland/tests/__init__.py index c3898bd..8f3b551 100644 --- a/eland/tests/__init__.py +++ b/eland/tests/__init__.py @@ -50,7 +50,7 @@ FLIGHTS_MAPPING = { "OriginLocation": {"type": "geo_point"}, "OriginRegion": {"type": "keyword"}, "OriginWeather": {"type": "keyword"}, - "dayOfWeek": {"type": "integer"}, + "dayOfWeek": {"type": "byte"}, "timestamp": {"type": "date", "format": "strict_date_hour_minute_second"}, } } diff --git a/eland/tests/etl/__init__.py b/eland/tests/etl/__init__.py new file mode 100644 index 0000000..1a3c439 --- /dev/null +++ b/eland/tests/etl/__init__.py @@ -0,0 +1,3 @@ +# Licensed to Elasticsearch B.V under one or more agreements. +# Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +# See the LICENSE file in the project root for more information diff --git a/eland/tests/etl/test_pandas_to_eland.py b/eland/tests/etl/test_pandas_to_eland.py new file mode 100644 index 0000000..552e3c2 --- /dev/null +++ b/eland/tests/etl/test_pandas_to_eland.py @@ -0,0 +1,183 @@ +# Licensed to Elasticsearch B.V under one or more agreements. +# Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +# See the LICENSE file in the project root for more information + +from datetime import datetime, timedelta +import pytest +import pandas as pd +from elasticsearch.helpers import BulkIndexError +from eland import pandas_to_eland, DataFrame +from eland.tests.common import ( + ES_TEST_CLIENT, + assert_frame_equal, + assert_pandas_eland_frame_equal, +) + +dt = datetime.utcnow() +pd_df = pd.DataFrame( + { + "a": [1, 2, 3], + "b": [1.0, 2.0, 3.0], + "c": ["A", "B", "C"], + "d": [dt, dt + timedelta(1), dt + timedelta(2)], + }, + index=["0", "1", "2"], +) + +pd_df2 = pd.DataFrame({"Z": [3, 2, 1], "a": ["C", "D", "E"]}, index=["0", "1", "2"]) + + +@pytest.fixture(scope="function", autouse=True) +def delete_test_index(): + ES_TEST_CLIENT.indices.delete(index="test-index", ignore=404) + yield + ES_TEST_CLIENT.indices.delete(index="test-index", ignore=404) + + +class TestPandasToEland: + def test_returns_eland_dataframe(self): + df = pandas_to_eland( + pd_df, es_client=ES_TEST_CLIENT, es_dest_index="test-index" + ) + + assert isinstance(df, DataFrame) + assert "es_index_pattern: test-index" in df.es_info() + + def test_es_if_exists_fail(self): + pandas_to_eland(pd_df, es_client=ES_TEST_CLIENT, es_dest_index="test-index") + + with pytest.raises(ValueError) as e: + pandas_to_eland(pd_df, es_client=ES_TEST_CLIENT, es_dest_index="test-index") + + assert str(e.value) == ( + "Could not create the index [test-index] because it " + "already exists. Change the 'es_if_exists' parameter " + "to 'append' or 'replace' data." + ) + + def test_es_if_exists_replace(self): + # Assert that 'replace' allows for creation + df1 = pandas_to_eland( + pd_df2, + es_client=ES_TEST_CLIENT, + es_dest_index="test-index", + es_if_exists="replace", + es_refresh=True, + ).to_pandas() + assert_frame_equal(pd_df2, df1) + + # Assert that 'replace' will replace existing mapping and entries + df2 = pandas_to_eland( + pd_df, + es_client=ES_TEST_CLIENT, + es_dest_index="test-index", + es_if_exists="replace", + es_refresh=True, + ) + assert_pandas_eland_frame_equal(pd_df, df2) + + df3 = pandas_to_eland( + pd_df2, + es_client=ES_TEST_CLIENT, + es_dest_index="test-index", + es_if_exists="replace", + es_refresh=True, + ).to_pandas() + assert_frame_equal(df1, df3) + + def test_es_if_exists_append(self): + df1 = pandas_to_eland( + pd_df, + es_client=ES_TEST_CLIENT, + es_dest_index="test-index", + es_if_exists="append", + es_refresh=True, + # We use 'short' here specifically so that the + # assumed type of 'long' is coerced into a 'short' + # by append mode. + es_type_overrides={"a": "short"}, + ) + assert_pandas_eland_frame_equal(pd_df, df1) + assert df1.shape == (3, 4) + + pd_df2 = pd.DataFrame( + { + "a": [4, 5, 6], + "b": [-1.0, -2.0, -3.0], + "c": ["A", "B", "C"], + "d": [dt, dt - timedelta(1), dt - timedelta(2)], + }, + index=["3", "4", "5"], + ) + df2 = pandas_to_eland( + pd_df2, + es_client=ES_TEST_CLIENT, + es_dest_index="test-index", + es_if_exists="append", + es_refresh=True, + ) + + # Assert that the second pandas dataframe is actually appended + assert df2.shape == (6, 4) + pd_df3 = pd_df.append(pd_df2) + assert_pandas_eland_frame_equal(pd_df3, df2) + + def test_es_if_exists_append_mapping_mismatch(self): + df1 = pandas_to_eland( + pd_df, + es_client=ES_TEST_CLIENT, + es_dest_index="test-index", + es_if_exists="append", + es_refresh=True, + ) + + with pytest.raises(ValueError) as e: + pandas_to_eland( + pd_df2, + es_client=ES_TEST_CLIENT, + es_dest_index="test-index", + es_if_exists="append", + ) + + assert str(e.value) == ( + "DataFrame dtypes and Elasticsearch index mapping aren't compatible:\n" + "- 'b' is missing from DataFrame columns\n" + "- 'c' is missing from DataFrame columns\n" + "- 'd' is missing from DataFrame columns\n" + "- 'Z' is missing from ES index mapping\n" + "- 'a' column type ('keyword') not compatible with ES index mapping type ('long')" + ) + # Assert that the index isn't modified + assert_pandas_eland_frame_equal(pd_df, df1) + + def test_es_if_exists_append_es_type_coerce_error(self): + df1 = pandas_to_eland( + pd_df, + es_client=ES_TEST_CLIENT, + es_dest_index="test-index", + es_if_exists="append", + es_refresh=True, + es_type_overrides={"a": "byte"}, + ) + assert_pandas_eland_frame_equal(pd_df, df1) + + pd_df_short = pd.DataFrame( + { + "a": [128], # This value is too large for 'byte' + "b": [-1.0], + "c": ["A"], + "d": [dt], + }, + index=["3"], + ) + + with pytest.raises(BulkIndexError) as e: + pandas_to_eland( + pd_df_short, + es_client=ES_TEST_CLIENT, + es_dest_index="test-index", + es_if_exists="append", + ) + + # Assert that the value 128 caused the index error + assert "Value [128] is out of range for a byte" in str(e.value)