mirror of
https://github.com/elastic/eland.git
synced 2025-07-11 00:02:14 +08:00
Allow dict in es_type_overrides, text fields by default get keyword sub-field
This commit is contained in:
parent
cb4cd083c3
commit
b936e98012
@ -26,6 +26,7 @@ from typing import (
|
||||
Optional,
|
||||
Set,
|
||||
Tuple,
|
||||
Union,
|
||||
)
|
||||
|
||||
import numpy as np
|
||||
@ -515,26 +516,7 @@ class FieldMappings:
|
||||
-------
|
||||
mapping : str
|
||||
"""
|
||||
|
||||
"""
|
||||
"mappings" : {
|
||||
"properties" : {
|
||||
"AvgTicketPrice" : {
|
||||
"type" : "float"
|
||||
},
|
||||
"Cancelled" : {
|
||||
"type" : "boolean"
|
||||
},
|
||||
"Carrier" : {
|
||||
"type" : "keyword"
|
||||
},
|
||||
"Dest" : {
|
||||
"type" : "keyword"
|
||||
}
|
||||
}
|
||||
}
|
||||
"""
|
||||
es_dtype: str
|
||||
es_dtype: Union[str, Dict[str, Any]]
|
||||
|
||||
mapping_props: Dict[str, Any] = {}
|
||||
|
||||
@ -550,10 +532,18 @@ class FieldMappings:
|
||||
for column, dtype in dataframe.dtypes.iteritems():
|
||||
if es_type_overrides is not None and column in es_type_overrides:
|
||||
es_dtype = es_type_overrides[column]
|
||||
if es_dtype == "text":
|
||||
es_dtype = {
|
||||
"type": "text",
|
||||
"fields": {"keyword": {"type": "keyword"}},
|
||||
}
|
||||
else:
|
||||
es_dtype = FieldMappings._pd_dtype_to_es_dtype(dtype)
|
||||
|
||||
mapping_props[column] = {"type": es_dtype}
|
||||
if isinstance(es_dtype, str):
|
||||
mapping_props[column] = {"type": es_dtype}
|
||||
else:
|
||||
mapping_props[column] = es_dtype
|
||||
|
||||
return {"mappings": {"properties": mapping_props}}
|
||||
|
||||
|
@ -598,6 +598,11 @@ class Operations:
|
||||
|
||||
# Construct Query
|
||||
for by_field in by_fields:
|
||||
if by_field.aggregatable_es_field_name is None:
|
||||
raise ValueError(
|
||||
f"Cannot use {by_field.column!r} with groupby() because "
|
||||
f"it has no aggregatable fields in Elasticsearch"
|
||||
)
|
||||
# groupby fields will be term aggregations
|
||||
body.composite_agg_bucket_terms(
|
||||
name=f"groupby_{by_field.column}",
|
||||
|
@ -112,7 +112,10 @@ class TestDataFrameUtils(TestData):
|
||||
"E": {"type": "double"},
|
||||
"F": {"type": "boolean"},
|
||||
"G": {"type": "long"},
|
||||
"H": {"type": "text"},
|
||||
"H": {
|
||||
"type": "text",
|
||||
"fields": {"keyword": {"type": "keyword"}},
|
||||
},
|
||||
"I": {"type": "geo_point"},
|
||||
}
|
||||
}
|
||||
|
@ -196,3 +196,47 @@ class TestPandasToEland:
|
||||
|
||||
# Assert that the value 128 caused the index error
|
||||
assert "Value [128] is out of range for a byte" in str(e.value)
|
||||
|
||||
def test_pandas_to_eland_text_inserts_keyword(self):
|
||||
es = ES_TEST_CLIENT
|
||||
df1 = pandas_to_eland(
|
||||
pd_df,
|
||||
es_client=es,
|
||||
es_dest_index="test-index",
|
||||
es_if_exists="append",
|
||||
es_refresh=True,
|
||||
es_type_overrides={
|
||||
"c": "text",
|
||||
"b": {"type": "float"},
|
||||
"d": {"type": "text"},
|
||||
},
|
||||
)
|
||||
assert es.indices.get_mapping(index="test-index") == {
|
||||
"test-index": {
|
||||
"mappings": {
|
||||
"properties": {
|
||||
"a": {"type": "long"},
|
||||
"b": {"type": "float"},
|
||||
"c": {
|
||||
"fields": {"keyword": {"type": "keyword"}},
|
||||
"type": "text",
|
||||
},
|
||||
"d": {"type": "text"},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# 'c' is aggregatable on 'keyword'
|
||||
assert df1.groupby("c").mean().to_dict() == {
|
||||
"a": {"A": 1.0, "B": 2.0, "C": 3.0},
|
||||
"b": {"A": 1.0, "B": 2.0, "C": 3.0},
|
||||
}
|
||||
|
||||
# 'd' isn't aggregatable because it's missing the 'keyword'
|
||||
with pytest.raises(ValueError) as e:
|
||||
df1.groupby("d").mean()
|
||||
assert str(e.value) == (
|
||||
"Cannot use 'd' with groupby() because it has "
|
||||
"no aggregatable fields in Elasticsearch"
|
||||
)
|
||||
|
Loading…
x
Reference in New Issue
Block a user