Mirror of https://github.com/elastic/eland.git (synced 2025-07-11 00:02:14 +08:00)
Add support for es_if_exists='append' to pandas_to_eland()
parent ad2e012f1e
commit f63941014f
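For orientation, here is a minimal usage sketch of the behaviour this commit adds; the client URL and index name are placeholders chosen for the example, and the call shape mirrors the tests added below.

import pandas as pd
from elasticsearch import Elasticsearch
from eland import pandas_to_eland

# Placeholder connection and index name, for illustration only.
es = Elasticsearch("http://localhost:9200")

df = pd.DataFrame(
    {"a": [1, 2, 3], "b": [1.0, 2.0, 3.0]},
    index=["0", "1", "2"],
)

# With es_if_exists="append" the generated mapping is first checked for
# compatibility against any existing index mapping; if the index does not
# exist yet it is simply created.
pandas_to_eland(
    df,
    es_client=es,
    es_dest_index="demo-index",
    es_if_exists="append",
    es_refresh=True,
)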
eland/etl.py (17 changed lines)

@@ -9,7 +9,7 @@ import pandas as pd  # type: ignore
 from pandas.io.parsers import _c_parser_defaults  # type: ignore
 
 from eland import DataFrame
-from eland.field_mappings import FieldMappings
+from eland.field_mappings import FieldMappings, verify_mapping_compatibility
 from eland.common import ensure_es_client, DEFAULT_CHUNK_SIZE
 from eland.utils import deprecated_api
 from elasticsearch import Elasticsearch  # type: ignore
@@ -172,14 +172,23 @@ def pandas_to_eland(
             raise ValueError(
                 f"Could not create the index [{es_dest_index}] because it "
                 f"already exists. "
-                f"Change the if_exists parameter to "
+                f"Change the 'es_if_exists' parameter to "
                 f"'append' or 'replace' data."
             )
 
         elif es_if_exists == "replace":
             es_client.indices.delete(index=es_dest_index)
             es_client.indices.create(index=es_dest_index, body=mapping)
-        # elif if_exists == "append":
-        # TODO validate mapping are compatible
+
+        elif es_if_exists == "append":
+            dest_mapping = es_client.indices.get_mapping(index=es_dest_index)[
+                es_dest_index
+            ]
+            verify_mapping_compatibility(
+                ed_mapping=mapping,
+                es_mapping=dest_mapping,
+                es_type_overrides=es_type_overrides,
+            )
+
     else:
         es_client.indices.create(index=es_dest_index, body=mapping)
eland/field_mappings.py

@@ -14,12 +14,29 @@ from pandas.core.dtypes.common import (
     is_string_dtype,
 )
 from pandas.core.dtypes.inference import is_list_like
-from typing import NamedTuple, Optional, Mapping, Dict, TYPE_CHECKING
+from typing import NamedTuple, Optional, Mapping, Dict, Any, TYPE_CHECKING
 
 if TYPE_CHECKING:
     from eland import DataFrame
 
 
+ES_FLOAT_TYPES = {"double", "float", "half_float", "scaled_float"}
+ES_INTEGER_TYPES = {"long", "integer", "short", "byte"}
+ES_COMPATIBLE_TYPES = {
+    "double": ES_FLOAT_TYPES,
+    "scaled_float": ES_FLOAT_TYPES,
+    "float": ES_FLOAT_TYPES,
+    "half_float": ES_FLOAT_TYPES,
+    "long": ES_INTEGER_TYPES,
+    "integer": ES_INTEGER_TYPES,
+    "short": ES_INTEGER_TYPES,
+    "byte": ES_INTEGER_TYPES,
+    "date": {"date_nanos"},
+    "date_nanos": {"date"},
+    "keyword": {"text"},
+}
+
+
 class Field(NamedTuple):
     """Holds all information on a particular field in the mapping"""
 
@@ -436,7 +453,7 @@ class FieldMappings:
     @staticmethod
     def _generate_es_mappings(
         dataframe: "DataFrame", es_type_overrides: Optional[Mapping[str, str]] = None
-    ) -> Dict[str, str]:
+    ) -> Dict[str, Dict[str, Dict[str, Any]]]:
         """Given a pandas dataframe, generate the associated Elasticsearch mapping
 
         Parameters
@@ -471,17 +488,16 @@ class FieldMappings:
         }
         """
 
-        mappings = {"properties": {}}
+        mapping_props = {}
         for field_name_name, dtype in dataframe.dtypes.iteritems():
             if es_type_overrides is not None and field_name_name in es_type_overrides:
                 es_dtype = es_type_overrides[field_name_name]
             else:
                 es_dtype = FieldMappings._pd_dtype_to_es_dtype(dtype)
 
-            mappings["properties"][field_name_name] = {}
-            mappings["properties"][field_name_name]["type"] = es_dtype
+            mapping_props[field_name_name] = {"type": es_dtype}
 
-        return {"mappings": mappings}
+        return {"mappings": {"properties": mapping_props}}
 
     def aggregatable_field_name(self, display_name):
         """
@@ -751,3 +767,45 @@ class FieldMappings:
             renames[field_name] = display_name
 
         return renames
+
+
+def verify_mapping_compatibility(
+    ed_mapping: Mapping[str, Mapping[str, Mapping[str, Mapping[str, str]]]],
+    es_mapping: Mapping[str, Mapping[str, Mapping[str, Mapping[str, str]]]],
+    es_type_overrides: Optional[Mapping[str, str]] = None,
+) -> None:
+    """Given a mapping generated by Eland and an existing ES index mapping
+    attempt to see if the two are compatible. If not compatible raise ValueError
+    with a list of problems between the two to be reported to the user.
+    """
+    problems = []
+    es_type_overrides = es_type_overrides or {}
+
+    ed_mapping = ed_mapping["mappings"]["properties"]
+    es_mapping = es_mapping["mappings"]["properties"]
+
+    for key in sorted(es_mapping.keys()):
+        if key not in ed_mapping:
+            problems.append(f"- {key!r} is missing from DataFrame columns")
+
+    for key, key_def in sorted(ed_mapping.items()):
+        if key not in es_mapping:
+            problems.append(f"- {key!r} is missing from ES index mapping")
+            continue
+
+        key_type = es_type_overrides.get(key, key_def["type"])
+        es_key_type = es_mapping[key]["type"]
+        if key_type != es_key_type and es_key_type not in ES_COMPATIBLE_TYPES.get(
+            key_type, ()
+        ):
+            problems.append(
+                f"- {key!r} column type ({key_type!r}) not compatible with "
+                f"ES index mapping type ({es_key_type!r})"
+            )
+
+    if problems:
+        problems_message = "\n".join(problems)
+        raise ValueError(
+            f"DataFrame dtypes and Elasticsearch index mapping "
+            f"aren't compatible:\n{problems_message}"
+        )
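To illustrate the check added above (not part of the diff), the following sketch feeds verify_mapping_compatibility two hand-built mappings; the field name and types are invented for the example, and it assumes an eland build that includes this commit.

from eland.field_mappings import verify_mapping_compatibility

# 'long' and 'byte' both sit in ES_INTEGER_TYPES, so this passes silently.
verify_mapping_compatibility(
    ed_mapping={"mappings": {"properties": {"a": {"type": "long"}}}},
    es_mapping={"mappings": {"properties": {"a": {"type": "byte"}}}},
)

# A 'keyword' column against an integer field is collected as a problem
# and reported via ValueError, one "- ..." line per field.
try:
    verify_mapping_compatibility(
        ed_mapping={"mappings": {"properties": {"a": {"type": "keyword"}}}},
        es_mapping={"mappings": {"properties": {"a": {"type": "byte"}}}},
    )
except ValueError as err:
    print(err)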
@@ -50,7 +50,7 @@ FLIGHTS_MAPPING = {
             "OriginLocation": {"type": "geo_point"},
             "OriginRegion": {"type": "keyword"},
             "OriginWeather": {"type": "keyword"},
-            "dayOfWeek": {"type": "integer"},
+            "dayOfWeek": {"type": "byte"},
             "timestamp": {"type": "date", "format": "strict_date_hour_minute_second"},
         }
     }
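Related context for the fixture change above: under the ES_COMPATIBLE_TYPES table introduced in field_mappings.py, an existing 'byte' field still accepts a DataFrame integer column (whose generated type is 'long'), since both widths are in ES_INTEGER_TYPES. A minimal check, illustrative only and assuming an eland build that includes this commit:

from eland.field_mappings import ES_COMPATIBLE_TYPES

# verify_mapping_compatibility() asks whether the existing ES type ('byte')
# is an accepted alternative for the generated type ('long').
assert "byte" in ES_COMPATIBLE_TYPES["long"]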
eland/tests/etl/__init__.py (new file, 3 lines)

@@ -0,0 +1,3 @@
+# Licensed to Elasticsearch B.V under one or more agreements.
+# Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
+# See the LICENSE file in the project root for more information
eland/tests/etl/test_pandas_to_eland.py (new file, 183 lines)

@@ -0,0 +1,183 @@
+# Licensed to Elasticsearch B.V under one or more agreements.
+# Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
+# See the LICENSE file in the project root for more information
+
+from datetime import datetime, timedelta
+import pytest
+import pandas as pd
+from elasticsearch.helpers import BulkIndexError
+from eland import pandas_to_eland, DataFrame
+from eland.tests.common import (
+    ES_TEST_CLIENT,
+    assert_frame_equal,
+    assert_pandas_eland_frame_equal,
+)
+
+dt = datetime.utcnow()
+pd_df = pd.DataFrame(
+    {
+        "a": [1, 2, 3],
+        "b": [1.0, 2.0, 3.0],
+        "c": ["A", "B", "C"],
+        "d": [dt, dt + timedelta(1), dt + timedelta(2)],
+    },
+    index=["0", "1", "2"],
+)
+
+pd_df2 = pd.DataFrame({"Z": [3, 2, 1], "a": ["C", "D", "E"]}, index=["0", "1", "2"])
+
+
+@pytest.fixture(scope="function", autouse=True)
+def delete_test_index():
+    ES_TEST_CLIENT.indices.delete(index="test-index", ignore=404)
+    yield
+    ES_TEST_CLIENT.indices.delete(index="test-index", ignore=404)
+
+
+class TestPandasToEland:
+    def test_returns_eland_dataframe(self):
+        df = pandas_to_eland(
+            pd_df, es_client=ES_TEST_CLIENT, es_dest_index="test-index"
+        )
+
+        assert isinstance(df, DataFrame)
+        assert "es_index_pattern: test-index" in df.es_info()
+
+    def test_es_if_exists_fail(self):
+        pandas_to_eland(pd_df, es_client=ES_TEST_CLIENT, es_dest_index="test-index")
+
+        with pytest.raises(ValueError) as e:
+            pandas_to_eland(pd_df, es_client=ES_TEST_CLIENT, es_dest_index="test-index")
+
+        assert str(e.value) == (
+            "Could not create the index [test-index] because it "
+            "already exists. Change the 'es_if_exists' parameter "
+            "to 'append' or 'replace' data."
+        )
+
+    def test_es_if_exists_replace(self):
+        # Assert that 'replace' allows for creation
+        df1 = pandas_to_eland(
+            pd_df2,
+            es_client=ES_TEST_CLIENT,
+            es_dest_index="test-index",
+            es_if_exists="replace",
+            es_refresh=True,
+        ).to_pandas()
+        assert_frame_equal(pd_df2, df1)
+
+        # Assert that 'replace' will replace existing mapping and entries
+        df2 = pandas_to_eland(
+            pd_df,
+            es_client=ES_TEST_CLIENT,
+            es_dest_index="test-index",
+            es_if_exists="replace",
+            es_refresh=True,
+        )
+        assert_pandas_eland_frame_equal(pd_df, df2)
+
+        df3 = pandas_to_eland(
+            pd_df2,
+            es_client=ES_TEST_CLIENT,
+            es_dest_index="test-index",
+            es_if_exists="replace",
+            es_refresh=True,
+        ).to_pandas()
+        assert_frame_equal(df1, df3)
+
+    def test_es_if_exists_append(self):
+        df1 = pandas_to_eland(
+            pd_df,
+            es_client=ES_TEST_CLIENT,
+            es_dest_index="test-index",
+            es_if_exists="append",
+            es_refresh=True,
+            # We use 'short' here specifically so that the
+            # assumed type of 'long' is coerced into a 'short'
+            # by append mode.
+            es_type_overrides={"a": "short"},
+        )
+        assert_pandas_eland_frame_equal(pd_df, df1)
+        assert df1.shape == (3, 4)
+
+        pd_df2 = pd.DataFrame(
+            {
+                "a": [4, 5, 6],
+                "b": [-1.0, -2.0, -3.0],
+                "c": ["A", "B", "C"],
+                "d": [dt, dt - timedelta(1), dt - timedelta(2)],
+            },
+            index=["3", "4", "5"],
+        )
+        df2 = pandas_to_eland(
+            pd_df2,
+            es_client=ES_TEST_CLIENT,
+            es_dest_index="test-index",
+            es_if_exists="append",
+            es_refresh=True,
+        )
+
+        # Assert that the second pandas dataframe is actually appended
+        assert df2.shape == (6, 4)
+        pd_df3 = pd_df.append(pd_df2)
+        assert_pandas_eland_frame_equal(pd_df3, df2)
+
+    def test_es_if_exists_append_mapping_mismatch(self):
+        df1 = pandas_to_eland(
+            pd_df,
+            es_client=ES_TEST_CLIENT,
+            es_dest_index="test-index",
+            es_if_exists="append",
+            es_refresh=True,
+        )
+
+        with pytest.raises(ValueError) as e:
+            pandas_to_eland(
+                pd_df2,
+                es_client=ES_TEST_CLIENT,
+                es_dest_index="test-index",
+                es_if_exists="append",
+            )
+
+        assert str(e.value) == (
+            "DataFrame dtypes and Elasticsearch index mapping aren't compatible:\n"
+            "- 'b' is missing from DataFrame columns\n"
+            "- 'c' is missing from DataFrame columns\n"
+            "- 'd' is missing from DataFrame columns\n"
+            "- 'Z' is missing from ES index mapping\n"
+            "- 'a' column type ('keyword') not compatible with ES index mapping type ('long')"
+        )
+        # Assert that the index isn't modified
+        assert_pandas_eland_frame_equal(pd_df, df1)
+
+    def test_es_if_exists_append_es_type_coerce_error(self):
+        df1 = pandas_to_eland(
+            pd_df,
+            es_client=ES_TEST_CLIENT,
+            es_dest_index="test-index",
+            es_if_exists="append",
+            es_refresh=True,
+            es_type_overrides={"a": "byte"},
+        )
+        assert_pandas_eland_frame_equal(pd_df, df1)
+
+        pd_df_short = pd.DataFrame(
+            {
+                "a": [128],  # This value is too large for 'byte'
+                "b": [-1.0],
+                "c": ["A"],
+                "d": [dt],
+            },
+            index=["3"],
+        )
+
+        with pytest.raises(BulkIndexError) as e:
+            pandas_to_eland(
+                pd_df_short,
+                es_client=ES_TEST_CLIENT,
+                es_dest_index="test-index",
+                es_if_exists="append",
+            )
+
+        # Assert that the value 128 caused the index error
+        assert "Value [128] is out of range for a byte" in str(e.value)