From b936e98012c983d9bdf5da313faf874e08862300 Mon Sep 17 00:00:00 2001 From: Seth Michael Larson Date: Thu, 29 Oct 2020 13:16:42 -0500 Subject: [PATCH] Allow dict in es_type_overrides, text fields by default get keyword sub-field --- eland/field_mappings.py | 32 ++++++---------- eland/operations.py | 5 +++ eland/tests/dataframe/test_utils_pytest.py | 5 ++- eland/tests/etl/test_pandas_to_eland.py | 44 ++++++++++++++++++++++ 4 files changed, 64 insertions(+), 22 deletions(-) diff --git a/eland/field_mappings.py b/eland/field_mappings.py index 00cb2f2..e2c3b57 100644 --- a/eland/field_mappings.py +++ b/eland/field_mappings.py @@ -26,6 +26,7 @@ from typing import ( Optional, Set, Tuple, + Union, ) import numpy as np @@ -515,26 +516,7 @@ class FieldMappings: ------- mapping : str """ - - """ - "mappings" : { - "properties" : { - "AvgTicketPrice" : { - "type" : "float" - }, - "Cancelled" : { - "type" : "boolean" - }, - "Carrier" : { - "type" : "keyword" - }, - "Dest" : { - "type" : "keyword" - } - } - } - """ - es_dtype: str + es_dtype: Union[str, Dict[str, Any]] mapping_props: Dict[str, Any] = {} @@ -550,10 +532,18 @@ class FieldMappings: for column, dtype in dataframe.dtypes.iteritems(): if es_type_overrides is not None and column in es_type_overrides: es_dtype = es_type_overrides[column] + if es_dtype == "text": + es_dtype = { + "type": "text", + "fields": {"keyword": {"type": "keyword"}}, + } else: es_dtype = FieldMappings._pd_dtype_to_es_dtype(dtype) - mapping_props[column] = {"type": es_dtype} + if isinstance(es_dtype, str): + mapping_props[column] = {"type": es_dtype} + else: + mapping_props[column] = es_dtype return {"mappings": {"properties": mapping_props}} diff --git a/eland/operations.py b/eland/operations.py index ea376ae..2a12986 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -598,6 +598,11 @@ class Operations: # Construct Query for by_field in by_fields: + if by_field.aggregatable_es_field_name is None: + raise ValueError( + f"Cannot use {by_field.column!r} with groupby() because " + f"it has no aggregatable fields in Elasticsearch" + ) # groupby fields will be term aggregations body.composite_agg_bucket_terms( name=f"groupby_{by_field.column}", diff --git a/eland/tests/dataframe/test_utils_pytest.py b/eland/tests/dataframe/test_utils_pytest.py index 9c9f3d0..6dfcc22 100644 --- a/eland/tests/dataframe/test_utils_pytest.py +++ b/eland/tests/dataframe/test_utils_pytest.py @@ -112,7 +112,10 @@ class TestDataFrameUtils(TestData): "E": {"type": "double"}, "F": {"type": "boolean"}, "G": {"type": "long"}, - "H": {"type": "text"}, + "H": { + "type": "text", + "fields": {"keyword": {"type": "keyword"}}, + }, "I": {"type": "geo_point"}, } } diff --git a/eland/tests/etl/test_pandas_to_eland.py b/eland/tests/etl/test_pandas_to_eland.py index e4a054a..7f8596e 100644 --- a/eland/tests/etl/test_pandas_to_eland.py +++ b/eland/tests/etl/test_pandas_to_eland.py @@ -196,3 +196,47 @@ class TestPandasToEland: # Assert that the value 128 caused the index error assert "Value [128] is out of range for a byte" in str(e.value) + + def test_pandas_to_eland_text_inserts_keyword(self): + es = ES_TEST_CLIENT + df1 = pandas_to_eland( + pd_df, + es_client=es, + es_dest_index="test-index", + es_if_exists="append", + es_refresh=True, + es_type_overrides={ + "c": "text", + "b": {"type": "float"}, + "d": {"type": "text"}, + }, + ) + assert es.indices.get_mapping(index="test-index") == { + "test-index": { + "mappings": { + "properties": { + "a": {"type": "long"}, + "b": {"type": "float"}, + "c": { + "fields": {"keyword": {"type": "keyword"}}, + "type": "text", + }, + "d": {"type": "text"}, + } + } + } + } + + # 'c' is aggregatable on 'keyword' + assert df1.groupby("c").mean().to_dict() == { + "a": {"A": 1.0, "B": 2.0, "C": 3.0}, + "b": {"A": 1.0, "B": 2.0, "C": 3.0}, + } + + # 'd' isn't aggregatable because it's missing the 'keyword' + with pytest.raises(ValueError) as e: + df1.groupby("d").mean() + assert str(e.value) == ( + "Cannot use 'd' with groupby() because it has " + "no aggregatable fields in Elasticsearch" + )