mirror of
https://github.com/elastic/eland.git
synced 2025-07-11 00:02:14 +08:00
Feature/mapping cache (#103)
* Adding python 3.5 compatibility. Main issue is ordering of dictionaries. * Updating notebooks with 3.7 results. * Removing tempoorary code. * Defaulting to OrderedDict for python 3.5 + lint all code All code reformated by PyCharm and inspection results analysed. * Adding support for multiple arithmetic operations. Added new 'arithmetics' file to manage this process. More tests to be added + cleanup. * Signficant refactor to arithmetics and mappings. Work in progress. Tests don't pass. * Major refactor to Mappings. Field name mappings were stored in different places (Mappings, QueryCompiler, Operations) and needed to be keep in sync. With the addition of complex arithmetic operations this became complex and difficult to maintain. Therefore, all field naming is now in 'FieldMappings' which replaces 'Mappings'. Note this commit removes the cache for some of the mapped values and so the code is SIGNIFICANTLY slower on large indices. In addition, the addition of date_format to Mappings has been removed. This again added more unncessary complexity. * Adding OrderedDict for 3.5 compatibility * Fixes to ordering issues with 3.5 * Adding simple cache for mappings in flatten Improves performance significantly on large datasets (>10000 rows). * Adding updated notebooks (new info_es). All tests (doc + nbval + pytest) pass.
This commit is contained in:
parent
efe21a6d87
commit
903fbf0341
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@ -454,7 +454,7 @@ class FieldMappings:
|
||||
# <class 'dict'>: {'category.keyword': 'category', 'currency': 'currency', ...
|
||||
return OrderedDict(aggregatables[['aggregatable_es_field_name', 'es_field_name']].to_dict(orient='split')['data'])
|
||||
|
||||
def get_date_field_format(self, es_field_name):
|
||||
def date_field_format(self, es_field_name):
|
||||
"""
|
||||
Parameters
|
||||
----------
|
||||
|
@ -203,6 +203,11 @@ class QueryCompiler:
|
||||
if results is None:
|
||||
return partial_result, self._empty_pd_ef()
|
||||
|
||||
# This is one of the most performance critical areas of eland, and it repeatedly calls
|
||||
# self._mappings.field_name_pd_dtype and self._mappings.date_field_format
|
||||
# therefore create a simple cache for this data
|
||||
field_mapping_cache = FieldMappingCache(self._mappings)
|
||||
|
||||
rows = []
|
||||
index = []
|
||||
if isinstance(results, dict):
|
||||
@ -236,7 +241,7 @@ class QueryCompiler:
|
||||
index.append(index_field)
|
||||
|
||||
# flatten row to map correctly to 2D DataFrame
|
||||
rows.append(self._flatten_dict(row))
|
||||
rows.append(self._flatten_dict(row, field_mapping_cache))
|
||||
|
||||
if batch_size is not None:
|
||||
if i >= batch_size:
|
||||
@ -264,7 +269,7 @@ class QueryCompiler:
|
||||
|
||||
return partial_result, df
|
||||
|
||||
def _flatten_dict(self, y):
|
||||
def _flatten_dict(self, y, field_mapping_cache):
|
||||
out = OrderedDict()
|
||||
|
||||
def flatten(x, name=''):
|
||||
@ -275,10 +280,11 @@ class QueryCompiler:
|
||||
pd_dtype = 'object'
|
||||
else:
|
||||
try:
|
||||
pd_dtype = self._mappings.field_name_pd_dtype(name[:-1])
|
||||
pd_dtype = field_mapping_cache.field_name_pd_dtype(name[:-1])
|
||||
is_source_field = True
|
||||
except KeyError:
|
||||
is_source_field = False
|
||||
pd_dtype = 'object'
|
||||
|
||||
if not is_source_field and type(x) is dict:
|
||||
for a in x:
|
||||
@ -294,7 +300,7 @@ class QueryCompiler:
|
||||
if pd_dtype == 'datetime64[ns]':
|
||||
x = elasticsearch_date_to_pandas_date(
|
||||
x,
|
||||
self._mappings.get_date_field_format(field_name)
|
||||
field_mapping_cache.date_field_format(field_name)
|
||||
)
|
||||
|
||||
# Elasticsearch can have multiple values for a field. These are represented as lists, so
|
||||
@ -725,3 +731,37 @@ def elasticsearch_date_to_pandas_date(value: Union[int, str], date_format: str)
|
||||
Warning)
|
||||
# TODO investigate how we could generate this just once for a bulk read.
|
||||
return pd.to_datetime(value)
|
||||
|
||||
class FieldMappingCache:
|
||||
"""
|
||||
Very simple dict cache for field mappings. This improves performance > 3 times on large datasets as
|
||||
DataFrame access is slower than dict access.
|
||||
"""
|
||||
def __init__(self, mappings):
|
||||
self._mappings = mappings
|
||||
|
||||
self._field_name_pd_dtype = dict()
|
||||
self._date_field_format = dict()
|
||||
|
||||
def field_name_pd_dtype(self, es_field_name):
|
||||
if es_field_name in self._field_name_pd_dtype:
|
||||
return self._field_name_pd_dtype[es_field_name]
|
||||
|
||||
pd_dtype = self._mappings.field_name_pd_dtype(es_field_name)
|
||||
|
||||
# cache this
|
||||
self._field_name_pd_dtype[es_field_name] = pd_dtype
|
||||
|
||||
return pd_dtype
|
||||
|
||||
def date_field_format(self, es_field_name):
|
||||
if es_field_name in self._date_field_format:
|
||||
return self._date_field_format[es_field_name]
|
||||
|
||||
es_date_field_format = self._mappings.date_field_format(es_field_name)
|
||||
|
||||
# cache this
|
||||
self._date_field_format[es_field_name] = es_date_field_format
|
||||
|
||||
return es_date_field_format
|
||||
|
||||
|
@ -77,7 +77,7 @@ class TestDateTime(TestData):
|
||||
# print(buf.getvalue())
|
||||
|
||||
for format_name in self.time_formats.keys():
|
||||
es_date_format = ed_field_mappings.get_date_field_format(format_name)
|
||||
es_date_format = ed_field_mappings.date_field_format(format_name)
|
||||
|
||||
assert format_name == es_date_format
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user