Feature/refactor tasks (#83)

* Significant refactor of task list in operations.py

Classes based on composite pattern replace tuples for
tasks.

* Addressing review comments for eland/operations.py

* Minor update to review fixes

* Minor fix for some better handling of non-aggregatable fields: https://github.com/elastic/eland/issues/71

* Test for non-aggregatable value_counts

* Refactoring tasks/actions

* Removing debug and fixing doctest
This commit is contained in:
stevedodson 2019-12-06 08:46:43 +00:00 committed by GitHub
parent f263e21b8a
commit f06219f0ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 625 additions and 445 deletions

View File

@ -1 +1 @@
include LICENSE
include LICENSE.txt

View File

@ -34,14 +34,15 @@ max 400140.000000 246.000000 5.000000
## Connecting to Elasticsearch Cloud
```python
>>> import eland as ed
>>> from elasticsearch import Elasticsearch
>>> es = Elasticsearch(cloud_id="<cloud_id>", http_auth=('<user>','<password>'))
>>> es.info()
{'name': 'instance-0000000000', 'cluster_name': 'bf900cfce5684a81bca0be0cce5913bc', 'cluster_uuid': 'xLPvrV3jQNeadA7oM4l1jA', 'version': {'number': '7.4.2', 'build_flavor': 'default', 'build_type': 'tar', 'build_hash': '2f90bbf7b93631e52bafb59b3b049cb44ec25e96', 'build_date': '2019-10-28T20:40:44.881551Z', 'build_snapshot': False, 'lucene_version': '8.2.0', 'minimum_wire_compatibility_version': '6.8.0', 'minimum_index_compatibility_version': '6.0.0-beta1'}, 'tagline': 'You Know, for Search'}
>>> import eland as ed
>>> df = ed.read_es(es, 'reviews')
```

87
eland/actions.py Normal file
View File

@ -0,0 +1,87 @@
from abc import ABC, abstractmethod
# -------------------------------------------------------------------------------------------------------------------- #
# PostProcessingActions #
# -------------------------------------------------------------------------------------------------------------------- #
class PostProcessingAction(ABC):
    """Base class for actions applied to a pandas DataFrame after query results arrive."""

    def __init__(self, action_type):
        """
        Parameters
        ----------
        action_type: str
            The action type (e.g. sort_index, head etc.)
        """
        self._action_type = action_type

    @property
    def type(self):
        # str: the action type identifier given at construction
        return self._action_type

    @abstractmethod
    def resolve_action(self, df):
        """Apply this action to *df* and return the resulting DataFrame."""

    @abstractmethod
    def __repr__(self):
        """Return a compact textual description of the action."""
class SortIndexAction(PostProcessingAction):
    """Post-processing action that re-sorts a DataFrame by its index."""

    def __init__(self):
        super().__init__("sort_index")

    def resolve_action(self, df):
        # Restore index order (e.g. after a descending sort used for tail)
        return df.sort_index()

    def __repr__(self):
        return f"('{self.type}')"
class HeadAction(PostProcessingAction):
    """Post-processing action that keeps only the first *count* rows."""

    def __init__(self, count):
        super().__init__("head")
        self._count = count

    def resolve_action(self, df):
        return df.head(self._count)

    def __repr__(self):
        return f"('{self.type}': ('count': {self._count}))"
class TailAction(PostProcessingAction):
    """Post-processing action that keeps only the last *count* rows."""

    def __init__(self, count):
        super().__init__("tail")
        self._count = count

    def resolve_action(self, df):
        return df.tail(self._count)

    def __repr__(self):
        return f"('{self.type}': ('count': {self._count}))"
class SortFieldAction(PostProcessingAction):
    """
    Post-processing action that sorts a DataFrame by a single field.

    Parameters
    ----------
    sort_params_string: str
        Elasticsearch-style sort parameter of the form "<field>:<order>"
        (e.g. "_doc:desc").

    Raises
    ------
    ValueError
        If the string is None or not of the form "<field>:<order>".
    """

    def __init__(self, sort_params_string):
        super().__init__("sort_field")
        if sort_params_string is None:
            raise ValueError("Expected valid string")
        # Split "<field>:<order>" into its two components
        sort_params = sort_params_string.split(":")
        if len(sort_params) != 2:
            raise ValueError("Expected ES sort params string (e.g. _doc:desc). Got '{}'".format(sort_params_string))
        self._sort_field = sort_params[0]
        # Normalise the order locally; anything other than "asc" sorts descending.
        # (fix: the original referenced Operations.SortOrder, a name not defined
        # or imported in this module, which raised NameError on construction)
        self._sort_order = "asc" if sort_params[1] == "asc" else "desc"

    def resolve_action(self, df):
        # fix: `ascending` must be passed by keyword - the second positional
        # parameter of DataFrame.sort_values is `axis`, not `ascending`
        return df.sort_values(self._sort_field, ascending=(self._sort_order == "asc"))

    def __repr__(self):
        return "('{}': ('sort_field': '{}', 'sort_order': {}))".format(self.type, self._sort_field, self._sort_order)

View File

@ -385,12 +385,14 @@ class DataFrame(NDFrame):
<BLANKLINE>
[27 rows x 5 columns]
Operations:
tasks: [('boolean_filter', {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}}), ('field_names', ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin']), ('tail', ('_doc', 5))]
tasks: [('boolean_filter': ('boolean_filter': {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}})), ('tail': ('sort_field': '_doc', 'count': 5))]
size: 5
sort_params: _doc:desc
_source: ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin']
body: {'query': {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}}, 'aggs': {}}
post_processing: ['sort_index']
post_processing: [('sort_index')]
'field_to_display_names': {}
'display_to_field_names': {}
<BLANKLINE>
"""
buf = StringIO()

View File

@ -1,7 +1,7 @@
"""
NDFrame
---------
Base class for eland.DataFrame and eland.Series.
Abstract base class for eland.DataFrame and eland.Series.
The underlying data resides in Elasticsearch and the API aligns as much as
possible with pandas APIs.
@ -24,6 +24,7 @@ only Elasticsearch aggregatable fields can be aggregated or grouped.
"""
import sys
from abc import ABC
import pandas as pd
from pandas.core.dtypes.common import is_list_like
@ -32,7 +33,7 @@ from pandas.util._validators import validate_bool_kwarg
from eland import ElandQueryCompiler
class NDFrame:
class NDFrame(ABC):
def __init__(self,
client=None,

View File

@ -1,11 +1,12 @@
import copy
from enum import Enum
import numpy as np
import pandas as pd
from eland import Index
from eland import Query
from eland.actions import SortFieldAction
from eland.tasks import HeadTask, TailTask, BooleanFilterTask, ArithmeticOpFieldsTask, QueryTermsTask, \
QueryIdsTask, SortOrder, SizeTask
class Operations:
@ -20,87 +21,46 @@ class Operations:
This is maintained as a 'task graph' (inspired by dask)
(see https://docs.dask.org/en/latest/spec.html)
"""
class SortOrder(Enum):
ASC = 0
DESC = 1
@staticmethod
def reverse(order):
if order == Operations.SortOrder.ASC:
return Operations.SortOrder.DESC
return Operations.SortOrder.ASC
@staticmethod
def to_string(order):
if order == Operations.SortOrder.ASC:
return "asc"
return "desc"
@staticmethod
def from_string(order):
if order == "asc":
return Operations.SortOrder.ASC
return Operations.SortOrder.DESC
def __init__(self, tasks=None):
def __init__(self, tasks=None, field_names=None):
if tasks is None:
self._tasks = []
else:
self._tasks = tasks
self._field_names = field_names
def __constructor__(self, *args, **kwargs):
return type(self)(*args, **kwargs)
def copy(self):
return self.__constructor__(tasks=copy.deepcopy(self._tasks))
return self.__constructor__(tasks=copy.deepcopy(self._tasks), field_names=copy.deepcopy(self._field_names))
def head(self, index, n):
# Add a task that is an ascending sort with size=n
task = ('head', (index.sort_field, n))
task = HeadTask(index.sort_field, n)
self._tasks.append(task)
def tail(self, index, n):
# Add a task that is descending sort with size=n
task = ('tail', (index.sort_field, n))
task = TailTask(index.sort_field, n)
self._tasks.append(task)
def arithmetic_op_fields(self, field_name, op_name, left_field, right_field, op_type=None):
if op_type:
task = ('arithmetic_op_fields', (field_name, (op_name, (left_field, right_field))), op_type)
else:
task = ('arithmetic_op_fields', (field_name, (op_name, (left_field, right_field))))
# Set this as a column we want to retrieve
self.set_field_names([field_name])
task = ArithmeticOpFieldsTask(field_name, op_name, left_field, right_field, op_type)
self._tasks.append(task)
def set_field_names(self, field_names):
# Setting field_names at different phases of the task list may result in different
# operations. So instead of setting field_names once, set when it happens in call chain
if not isinstance(field_names, list):
field_names = list(field_names)
# TODO - field_name renaming
# TODO - validate we are setting field_names to a subset of last field_names?
task = ('field_names', field_names)
self._tasks.append(task)
# Iterate backwards through task list looking for last 'field_names' task
for task in reversed(self._tasks):
if task[0] == 'field_names':
return task[1]
return None
self._field_names = field_names
return self._field_names
def get_field_names(self):
# Iterate backwards through task list looking for last 'field_names' task
for task in reversed(self._tasks):
if task[0] == 'field_names':
return task[1]
return None
return self._field_names
def __repr__(self):
return repr(self._tasks)
@ -248,7 +208,9 @@ class Operations:
results = {}
for key, value in aggregatable_field_names.items():
for key in aggregatable_field_names.keys():
# key is aggregatable field, value is label
# e.g. key=category.keyword, value=category
for bucket in response['aggregations'][key]['buckets']:
results[bucket['key']] = bucket['doc_count']
@ -597,7 +559,7 @@ class Operations:
_source=field_names)
# create post sort
if sort_params is not None:
post_processing.append(self._sort_params_to_postprocessing(sort_params))
post_processing.append(SortFieldAction(sort_params))
if is_scan:
while True:
@ -611,11 +573,6 @@ class Operations:
df = self._apply_df_post_processing(df, post_processing)
collector.collect(df)
def iloc(self, index, field_names):
# index and field_names are indexers
task = ('iloc', (index, field_names))
self._tasks.append(task)
def index_count(self, query_compiler, field):
# field is the index field so count values
query_params, post_processing = self._resolve_tasks()
@ -671,28 +628,16 @@ class Operations:
# b not in ['a','b','c']
# For now use term queries
if field == Index.ID_INDEX_FIELD:
task = ('query_ids', ('must_not', items))
task = QueryIdsTask(False, items)
else:
task = ('query_terms', ('must_not', (field, items)))
task = QueryTermsTask(False, field, items)
self._tasks.append(task)
@staticmethod
def _sort_params_to_postprocessing(input):
# Split string
sort_params = input.split(":")
query_sort_field = sort_params[0]
query_sort_order = Operations.SortOrder.from_string(sort_params[1])
task = ('sort_field', (query_sort_field, query_sort_order))
return task
@staticmethod
def _query_params_to_size_and_sort(query_params):
sort_params = None
if query_params['query_sort_field'] and query_params['query_sort_order']:
sort_params = query_params['query_sort_field'] + ":" + Operations.SortOrder.to_string(
sort_params = query_params['query_sort_field'] + ":" + SortOrder.to_string(
query_params['query_sort_order'])
size = query_params['query_size']
@ -703,37 +648,16 @@ class Operations:
def _count_post_processing(post_processing):
size = None
for action in post_processing:
if action[0] == 'head' or action[0] == 'tail':
if size is None or action[1][1] < size:
size = action[1][1]
if isinstance(action, SizeTask):
if size is None or action.size() < size:
size = action.size()
return size
@staticmethod
def _apply_df_post_processing(df, post_processing):
for action in post_processing:
if action == 'sort_index':
df = df.sort_index()
elif action[0] == 'head':
df = df.head(action[1][1])
elif action[0] == 'tail':
df = df.tail(action[1][1])
elif action[0] == 'sort_field':
sort_field = action[1][0]
sort_order = action[1][1]
if sort_order == Operations.SortOrder.ASC:
df = df.sort_values(sort_field, True)
else:
df = df.sort_values(sort_field, False)
elif action[0] == 'iloc':
index_indexer = action[1][0]
field_name_indexer = action[1][1]
if index_indexer is None:
index_indexer = slice(None)
if field_name_indexer is None:
field_name_indexer = slice(None)
df = df.iloc[index_indexer, field_name_indexer]
# field_names could be in here (and we ignore it)
df = action.resolve_action(df)
return df
@ -752,337 +676,7 @@ class Operations:
post_processing = []
for task in self._tasks:
if task[0] == 'head':
query_params, post_processing = self._resolve_head(task, query_params, post_processing)
elif task[0] == 'tail':
query_params, post_processing = self._resolve_tail(task, query_params, post_processing)
elif task[0] == 'iloc':
query_params, post_processing = self._resolve_iloc(task, query_params, post_processing)
elif task[0] == 'query_ids':
query_params, post_processing = self._resolve_query_ids(task, query_params, post_processing)
elif task[0] == 'query_terms':
query_params, post_processing = self._resolve_query_terms(task, query_params, post_processing)
elif task[0] == 'boolean_filter':
query_params, post_processing = self._resolve_boolean_filter(task, query_params, post_processing)
elif task[0] == 'arithmetic_op_fields':
query_params, post_processing = self._resolve_arithmetic_op_fields(task, query_params, post_processing)
else: # a lot of operations simply post-process the dataframe - put these straight through
query_params, post_processing = self._resolve_post_processing_task(task, query_params, post_processing)
return query_params, post_processing
@staticmethod
def _resolve_head(item, query_params, post_processing):
# head - sort asc, size n
# |12345-------------|
query_sort_field = item[1][0]
query_sort_order = Operations.SortOrder.ASC
query_size = item[1][1]
# If we are already postprocessing the query results, we just get 'head' of these
# (note, currently we just append another head, we don't optimise by
# overwriting previous head)
if len(post_processing) > 0:
post_processing.append(item)
return query_params, post_processing
if query_params['query_sort_field'] is None:
query_params['query_sort_field'] = query_sort_field
# if it is already sorted we use existing field
if query_params['query_sort_order'] is None:
query_params['query_sort_order'] = query_sort_order
# if it is already sorted we get head of existing order
if query_params['query_size'] is None:
query_params['query_size'] = query_size
else:
# truncate if head is smaller
if query_size < query_params['query_size']:
query_params['query_size'] = query_size
return query_params, post_processing
@staticmethod
def _resolve_tail(item, query_params, post_processing):
# tail - sort desc, size n, post-process sort asc
# |-------------12345|
query_sort_field = item[1][0]
query_sort_order = Operations.SortOrder.DESC
query_size = item[1][1]
# If this is a tail of a tail adjust settings and return
if query_params['query_size'] is not None and \
query_params['query_sort_order'] == query_sort_order and \
post_processing == ['sort_index']:
if query_size < query_params['query_size']:
query_params['query_size'] = query_size
return query_params, post_processing
# If we are already postprocessing the query results, just get 'tail' of these
# (note, currently we just append another tail, we don't optimise by
# overwriting previous tail)
if len(post_processing) > 0:
post_processing.append(item)
return query_params, post_processing
# If results are already constrained, just get 'tail' of these
# (note, currently we just append another tail, we don't optimise by
# overwriting previous tail)
if query_params['query_size'] is not None:
post_processing.append(item)
return query_params, post_processing
else:
query_params['query_size'] = query_size
if query_params['query_sort_field'] is None:
query_params['query_sort_field'] = query_sort_field
if query_params['query_sort_order'] is None:
query_params['query_sort_order'] = query_sort_order
else:
# reverse sort order
query_params['query_sort_order'] = Operations.SortOrder.reverse(query_sort_order)
post_processing.append('sort_index')
return query_params, post_processing
@staticmethod
def _resolve_iloc(item, query_params, post_processing):
# tail - sort desc, size n, post-process sort asc
# |---4--7-9---------|
# This is a list of items we return via an integer index
int_index = item[1][0]
if int_index is not None:
last_item = int_index.max()
# If we have a query_size we do this post processing
if query_params['query_size'] is not None:
post_processing.append(item)
return query_params, post_processing
# size should be > last item
query_params['query_size'] = last_item + 1
post_processing.append(item)
return query_params, post_processing
@staticmethod
def _resolve_query_ids(item, query_params, post_processing):
# task = ('query_ids', ('must_not', items))
must_clause = item[1][0]
ids = item[1][1]
if must_clause == 'must':
query_params['query'].ids(ids, must=True)
else:
query_params['query'].ids(ids, must=False)
return query_params, post_processing
@staticmethod
def _resolve_query_terms(item, query_params, post_processing):
# task = ('query_terms', ('must_not', (field, terms)))
must_clause = item[1][0]
field = item[1][1][0]
terms = item[1][1][1]
if must_clause == 'must':
query_params['query'].terms(field, terms, must=True)
else:
query_params['query'].terms(field, terms, must=False)
return query_params, post_processing
@staticmethod
def _resolve_boolean_filter(item, query_params, post_processing):
# task = ('boolean_filter', object)
boolean_filter = item[1]
query_params['query'].update_boolean_filter(boolean_filter)
return query_params, post_processing
def _resolve_arithmetic_op_fields(self, item, query_params, post_processing):
# task = ('arithmetic_op_fields', (field_name, (op_name, (left_field, right_field))))
field_name = item[1][0]
op_name = item[1][1][0]
left_field = item[1][1][1][0]
right_field = item[1][1][1][1]
try:
op_type = item[2]
except IndexError:
op_type = None
# https://www.elastic.co/guide/en/elasticsearch/painless/current/painless-api-reference-shared-java-lang.html#painless-api-reference-shared-Math
if not op_type:
if isinstance(left_field, str) and isinstance(right_field, str):
"""
(if op_name = '__truediv__')
"script_fields": {
"field_name": {
"script": {
"source": "doc[left_field].value / doc[right_field].value"
}
}
}
"""
if op_name == '__add__':
source = "doc['{0}'].value + doc['{1}'].value".format(left_field, right_field)
elif op_name == '__truediv__':
source = "doc['{0}'].value / doc['{1}'].value".format(left_field, right_field)
elif op_name == '__floordiv__':
source = "Math.floor(doc['{0}'].value / doc['{1}'].value)".format(left_field, right_field)
elif op_name == '__pow__':
source = "Math.pow(doc['{0}'].value, doc['{1}'].value)".format(left_field, right_field)
elif op_name == '__mod__':
source = "doc['{0}'].value % doc['{1}'].value".format(left_field, right_field)
elif op_name == '__mul__':
source = "doc['{0}'].value * doc['{1}'].value".format(left_field, right_field)
elif op_name == '__sub__':
source = "doc['{0}'].value - doc['{1}'].value".format(left_field, right_field)
else:
raise NotImplementedError("Not implemented operation '{0}'".format(op_name))
if query_params['query_script_fields'] is None:
query_params['query_script_fields'] = {}
query_params['query_script_fields'][field_name] = {
'script': {
'source': source
}
}
elif isinstance(left_field, str) and np.issubdtype(np.dtype(type(right_field)), np.number):
"""
(if op_name = '__truediv__')
"script_fields": {
"field_name": {
"script": {
"source": "doc[left_field].value / right_field"
}
}
}
"""
if op_name == '__add__':
source = "doc['{0}'].value + {1}".format(left_field, right_field)
elif op_name == '__truediv__':
source = "doc['{0}'].value / {1}".format(left_field, right_field)
elif op_name == '__floordiv__':
source = "Math.floor(doc['{0}'].value / {1})".format(left_field, right_field)
elif op_name == '__pow__':
source = "Math.pow(doc['{0}'].value, {1})".format(left_field, right_field)
elif op_name == '__mod__':
source = "doc['{0}'].value % {1}".format(left_field, right_field)
elif op_name == '__mul__':
source = "doc['{0}'].value * {1}".format(left_field, right_field)
elif op_name == '__sub__':
source = "doc['{0}'].value - {1}".format(left_field, right_field)
else:
raise NotImplementedError("Not implemented operation '{0}'".format(op_name))
elif np.issubdtype(np.dtype(type(left_field)), np.number) and isinstance(right_field, str):
"""
(if op_name = '__truediv__')
"script_fields": {
"field_name": {
"script": {
"source": "left_field / doc['right_field'].value"
}
}
}
"""
if op_name == '__add__':
source = "{0} + doc['{1}'].value".format(left_field, right_field)
elif op_name == '__truediv__':
source = "{0} / doc['{1}'].value".format(left_field, right_field)
elif op_name == '__floordiv__':
source = "Math.floor({0} / doc['{1}'].value)".format(left_field, right_field)
elif op_name == '__pow__':
source = "Math.pow({0}, doc['{1}'].value)".format(left_field, right_field)
elif op_name == '__mod__':
source = "{0} % doc['{1}'].value".format(left_field, right_field)
elif op_name == '__mul__':
source = "{0} * doc['{1}'].value".format(left_field, right_field)
elif op_name == '__sub__':
source = "{0} - doc['{1}'].value".format(left_field, right_field)
else:
raise NotImplementedError("Not implemented operation '{0}'".format(op_name))
else:
raise TypeError("Types for operation inconsistent {} {} {}", type(left_field), type(right_field), op_name)
elif op_type[0] == "string":
# we need to check the type of string addition
if op_type[1] == "s":
"""
(if op_name = '__add__')
"script_fields": {
"field_name": {
"script": {
"source": "doc[left_field].value + doc[right_field].value"
}
}
}
"""
if op_name == '__add__':
source = "doc['{0}'].value + doc['{1}'].value".format(left_field, right_field)
else:
raise NotImplementedError("Not implemented operation '{0}'".format(op_name))
elif op_type[1] == "r":
if isinstance(left_field, str) and isinstance(right_field, str):
"""
(if op_name = '__add__')
"script_fields": {
"field_name": {
"script": {
"source": "doc[left_field].value + right_field"
}
}
}
"""
if op_name == '__add__':
source = "doc['{0}'].value + '{1}'".format(left_field, right_field)
else:
raise NotImplementedError("Not implemented operation '{0}'".format(op_name))
elif op_type[1] == 'l':
if isinstance(left_field, str) and isinstance(right_field, str):
"""
(if op_name = '__add__')
"script_fields": {
"field_name": {
"script": {
"source": "left_field + doc[right_field].value"
}
}
}
"""
if op_name == '__add__':
source = "'{0}' + doc['{1}'].value".format(left_field, right_field)
else:
raise NotImplementedError("Not implemented operation '{0}'".format(op_name))
if query_params['query_script_fields'] is None:
query_params['query_script_fields'] = {}
query_params['query_script_fields'][field_name] = {
'script': {
'source': source
}
}
return query_params, post_processing
@staticmethod
def _resolve_post_processing_task(item, query_params, post_processing):
# Just do this in post-processing
if item[0] != 'field_names':
post_processing.append(item)
query_params, post_processing = task.resolve_task(query_params, post_processing)
return query_params, post_processing
@ -1121,5 +715,5 @@ class Operations:
buf.write(" post_processing: {0}\n".format(post_processing))
def update_query(self, boolean_filter):
task = ('boolean_filter', boolean_filter)
task = BooleanFilterTask(boolean_filter)
self._tasks.append(task)

View File

@ -461,6 +461,7 @@ class ElandQueryCompiler:
self._index.info_es(buf)
self._mappings.info_es(buf)
self._operations.info_es(buf)
self._name_mapper.info_es(buf)
def describe(self):
return self._operations.describe(self)
@ -548,7 +549,7 @@ class ElandQueryCompiler:
else:
raise ValueError(
"Can not perform arithmetic operations on non aggregatable fields"
"One of [{}, {}] is not aggregatable.".format(self.name, right.name)
"One of [{}, {}] is not aggregatable.".format(self_field, right_field)
)
def arithmetic_op_fields(self, new_field_name, op, left_field, right_field, op_type=None):
@ -648,6 +649,9 @@ class ElandQueryCompiler:
display_to_field_names=self._display_to_field_names.copy()
)
def info_es(self, buf):
buf.write("'field_to_display_names': {}\n".format(self._field_to_display_names))
buf.write("'display_to_field_names': {}\n".format(self._display_to_field_names))
def elasticsearch_date_to_pandas_date(value: Union[int, str], date_format: str) -> pd.Timestamp:
"""

View File

@ -215,7 +215,7 @@ class Series(NDFrame):
Returns
-------
pandas.Series
number of occurences of each value in the column
number of occurrences of each value in the column
See Also
--------

438
eland/tasks.py Normal file
View File

@ -0,0 +1,438 @@
from abc import ABC, abstractmethod
from enum import Enum
import numpy as np
from eland.actions import HeadAction, TailAction, SortIndexAction
class SortOrder(Enum):
    """Direction of an Elasticsearch sort clause."""

    ASC = 0
    DESC = 1

    @staticmethod
    def reverse(order):
        """Return the opposite direction to *order*."""
        return SortOrder.DESC if order == SortOrder.ASC else SortOrder.ASC

    @staticmethod
    def to_string(order):
        """Render *order* as the ES string "asc" / "desc"."""
        return "asc" if order == SortOrder.ASC else "desc"

    @staticmethod
    def from_string(order):
        """Parse "asc" to ASC; any other string maps to DESC."""
        return SortOrder.ASC if order == "asc" else SortOrder.DESC
# -------------------------------------------------------------------------------------------------------------------- #
# Tasks #
# -------------------------------------------------------------------------------------------------------------------- #
class Task(ABC):
    """
    Abstract base class for operations in the task graph.

    Parameters
    ----------
    task_type: str
        The task type (e.g. head, tail etc.)
    """

    def __init__(self, task_type):
        self._task_type = task_type

    @property
    def type(self):
        # str: the task type identifier given at construction
        return self._task_type

    @abstractmethod
    def resolve_task(self, query_params, post_processing):
        """Fold this task into *query_params* / *post_processing* and return both."""

    @abstractmethod
    def __repr__(self):
        """Return a compact textual description of the task."""
class SizeTask(Task):
    """A task that constrains the number of rows returned (e.g. head/tail)."""

    def __init__(self, task_type):
        super().__init__(task_type)

    @abstractmethod
    def size(self):
        """Return the row limit this task imposes; subclasses must override."""
class HeadTask(SizeTask):
    """
    Task returning the first *count* rows of the index.

    Resolved as an ascending sort on *sort_field* with size=count where
    possible; otherwise deferred to a post-processing HeadAction.
    """
    def __init__(self, sort_field, count):
        super().__init__("head")
        # Add a task that is an ascending sort with size=count
        self._sort_field = sort_field
        self._count = count
    def __repr__(self):
        return "('{}': ('sort_field': '{}', 'count': {}))".format(self._task_type, self._sort_field, self._count)
    def resolve_task(self, query_params, post_processing):
        """
        Fold this head into the ES query parameters when the query is still
        unconstrained, otherwise append a HeadAction to post_processing.
        Returns the (query_params, post_processing) pair.
        """
        # head - sort asc, size n
        # |12345-------------|
        query_sort_field = self._sort_field
        query_sort_order = SortOrder.ASC
        query_size = self._count
        # If we are already postprocessing the query results, we just get 'head' of these
        # (note, currently we just append another head, we don't optimise by
        # overwriting previous head)
        if len(post_processing) > 0:
            post_processing.append(HeadAction(self._count))
            return query_params, post_processing
        if query_params['query_sort_field'] is None:
            query_params['query_sort_field'] = query_sort_field
        # if it is already sorted we use existing field
        if query_params['query_sort_order'] is None:
            query_params['query_sort_order'] = query_sort_order
        # if it is already sorted we get head of existing order
        if query_params['query_size'] is None:
            query_params['query_size'] = query_size
        else:
            # truncate if head is smaller
            if query_size < query_params['query_size']:
                query_params['query_size'] = query_size
        return query_params, post_processing
    def size(self):
        # Row limit this task imposes (consumed by Operations._count_post_processing)
        return self._count
class TailTask(SizeTask):
    """
    Task returning the last *count* rows of the index.

    Resolved as a descending sort on *sort_field* with size=count plus a
    post-processing SortIndexAction to restore ascending order; deferred to
    a post-processing TailAction when the query is already constrained.
    """

    def __init__(self, sort_field, count):
        super().__init__("tail")
        # Add a task that is descending sort with size=count
        self._sort_field = sort_field
        self._count = count

    def __repr__(self):
        return "('{}': ('sort_field': '{}', 'count': {}))".format(self._task_type, self._sort_field, self._count)

    def resolve_task(self, query_params, post_processing):
        """
        Fold this tail into the ES query parameters when possible, otherwise
        append a TailAction to post_processing.
        Returns the (query_params, post_processing) pair.
        """
        # tail - sort desc, size n, post-process sort asc
        # |-------------12345|
        query_sort_field = self._sort_field
        query_sort_order = SortOrder.DESC
        query_size = self._count
        # If this is a tail of a tail adjust settings and return
        # (fix: post_processing now holds PostProcessingAction objects, so the
        # old comparison `post_processing == ['sort_index']` could never be
        # True; detect a lone SortIndexAction instead)
        if query_params['query_size'] is not None and \
                query_params['query_sort_order'] == query_sort_order and \
                (len(post_processing) == 1 and isinstance(post_processing[0], SortIndexAction)):
            if query_size < query_params['query_size']:
                query_params['query_size'] = query_size
            return query_params, post_processing
        # If we are already postprocessing the query results, just get 'tail' of these
        # (note, currently we just append another tail, we don't optimise by
        # overwriting previous tail)
        if len(post_processing) > 0:
            post_processing.append(TailAction(self._count))
            return query_params, post_processing
        # If results are already constrained, just get 'tail' of these
        # (note, currently we just append another tail, we don't optimise by
        # overwriting previous tail)
        if query_params['query_size'] is not None:
            post_processing.append(TailAction(self._count))
            return query_params, post_processing
        else:
            query_params['query_size'] = query_size
        if query_params['query_sort_field'] is None:
            query_params['query_sort_field'] = query_sort_field
        if query_params['query_sort_order'] is None:
            query_params['query_sort_order'] = query_sort_order
        else:
            # reverse sort order
            query_params['query_sort_order'] = SortOrder.reverse(query_sort_order)
        post_processing.append(SortIndexAction())
        return query_params, post_processing

    def size(self):
        # Row limit this task imposes (consumed by Operations._count_post_processing)
        return self._count
class QueryIdsTask(Task):
    """Filter the query by document _id values (include or exclude)."""

    def __init__(self, must, ids):
        """
        Parameters
        ----------
        must: bool
            Include or exclude these ids (must/must_not)
        ids: list
            ids for the filter
        """
        super().__init__("query_ids")
        self._must = must
        self._ids = ids

    def resolve_task(self, query_params, post_processing):
        # Delegate to the Query object carried in query_params
        query_params['query'].ids(self._ids, must=self._must)
        return query_params, post_processing

    def __repr__(self):
        return f"('{self._task_type}': ('must': {self._must}, 'ids': {self._ids}))"
class QueryTermsTask(Task):
    """Filter the query by a terms clause on a single field (include or exclude)."""

    def __init__(self, must, field, terms):
        """
        Parameters
        ----------
        must: bool
            Include or exclude these ids (must/must_not)
        field: str
            field_name to filter
        terms: list
            field_values for filter
        """
        super().__init__("query_terms")
        self._must = must
        self._field = field
        self._terms = terms

    def __repr__(self):
        return f"('{self._task_type}': ('must': {self._must}, 'field': '{self._field}', 'terms': {self._terms}))"

    def resolve_task(self, query_params, post_processing):
        # Delegate to the Query object carried in query_params
        query_params['query'].terms(self._field, self._terms, must=self._must)
        return query_params, post_processing
class BooleanFilterTask(Task):
    """Apply a pre-built boolean filter to the query."""

    def __init__(self, boolean_filter):
        """
        Parameters
        ----------
        boolean_filter: BooleanFilter or str
            The filter to apply
        """
        super().__init__("boolean_filter")
        self._boolean_filter = boolean_filter

    def __repr__(self):
        return f"('{self._task_type}': ('boolean_filter': {self._boolean_filter!r}))"

    def resolve_task(self, query_params, post_processing):
        # Merge the filter into the Query object carried in query_params
        query_params['query'].update_boolean_filter(self._boolean_filter)
        return query_params, post_processing
class ArithmeticOpFieldsTask(Task):
    """Task that materialises ``left <op> right`` as an Elasticsearch
    painless ``script_fields`` entry named ``field_name``.

    Operands may be field names (``str`` -> rendered as ``doc['f'].value``)
    or, for numeric operations, numeric literals (inlined into the script).
    ``op_type`` is falsy for numeric ops, or ``("string", variant)`` for
    string concatenation where ``variant`` is ``'s'`` (series + series),
    ``'r'`` (series + literal) or ``'l'`` (literal + series).
    """

    # Painless source templates for the supported numeric dunder ops.
    # {0}/{1} are the rendered left/right operands.
    # https://www.elastic.co/guide/en/elasticsearch/painless/current/painless-api-reference-shared-java-lang.html#painless-api-reference-shared-Math
    _NUMERIC_OP_TEMPLATES = {
        '__add__': "{0} + {1}",
        '__truediv__': "{0} / {1}",
        '__floordiv__': "Math.floor({0} / {1})",
        '__pow__': "Math.pow({0}, {1})",
        '__mod__': "{0} % {1}",
        '__mul__': "{0} * {1}",
        '__sub__': "{0} - {1}",
    }

    def __init__(self, field_name, op_name, left_field, right_field, op_type):
        """
        Parameters
        ----------
        field_name: str
            Name of the resulting script field.
        op_name: str
            Dunder name of the operation (e.g. '__add__').
        left_field, right_field: str or numeric
            Operands; a str is a field name, anything else a literal.
        op_type: None/falsy or tuple
            Falsy for numeric ops, ("string", 's'|'r'|'l') for string ops.
        """
        super().__init__("arithmetic_op_fields")
        self._field_name = field_name
        self._op_name = op_name
        self._left_field = left_field
        self._right_field = right_field
        self._op_type = op_type

    def __repr__(self):
        return "('{}': (" \
               "'field_name': {}, " \
               "'op_name': {}, " \
               "'left_field': {}, " \
               "'right_field': {}, " \
               "'op_type': {}" \
               "))" \
            .format(self._task_type, self._field_name, self._op_name, self._left_field, self._right_field,
                    self._op_type)

    def resolve_task(self, query_params, post_processing):
        """Add the painless script field to the query parameters.

        Raises
        ------
        TypeError
            If the operand types are inconsistent with the operation.
        NotImplementedError
            If the operation is not supported.
        """
        source = self._painless_source()

        if query_params['query_script_fields'] is None:
            query_params['query_script_fields'] = {}
        query_params['query_script_fields'][self._field_name] = {
            'script': {
                'source': source
            }
        }

        return query_params, post_processing

    def _painless_source(self):
        # Dispatch on op_type: falsy -> numeric arithmetic, ("string", ...)
        # -> string concatenation. Anything else is unsupported (previously
        # this path fell through and raised a confusing NameError on an
        # unbound 'source').
        if not self._op_type:
            return self._numeric_source()
        if self._op_type[0] == "string":
            return self._string_source()
        raise NotImplementedError("Not implemented operation type '{0}'".format(self._op_type))

    @staticmethod
    def _is_field(operand):
        # A str operand is an Elasticsearch field name; anything else a literal.
        return isinstance(operand, str)

    @staticmethod
    def _is_numeric_literal(operand):
        return np.issubdtype(np.dtype(type(operand)), np.number)

    @classmethod
    def _render_operand(cls, operand):
        # Field names become doc-value accesses; literals are inlined as-is.
        if cls._is_field(operand):
            return "doc['{0}'].value".format(operand)
        return "{0}".format(operand)

    def _numeric_source(self):
        # e.g. (op_name '__truediv__', fields 'a','b'):
        #   "doc['a'].value / doc['b'].value"
        left_is_field = self._is_field(self._left_field)
        right_is_field = self._is_field(self._right_field)

        # At least one operand must be a field, and any literal operand
        # must be numeric.
        valid = (left_is_field or right_is_field) and \
                (left_is_field or self._is_numeric_literal(self._left_field)) and \
                (right_is_field or self._is_numeric_literal(self._right_field))
        if not valid:
            # BUG FIX: format the message (previously the format args were
            # passed as extra TypeError arguments and never interpolated).
            raise TypeError("Types for operation inconsistent {} {} {}".format(
                type(self._left_field), type(self._right_field), self._op_name))

        try:
            template = self._NUMERIC_OP_TEMPLATES[self._op_name]
        except KeyError:
            raise NotImplementedError("Not implemented operation '{0}'".format(self._op_name))
        return template.format(self._render_operand(self._left_field),
                               self._render_operand(self._right_field))

    def _string_source(self):
        # Only concatenation is defined for strings.
        if self._op_name != '__add__':
            raise NotImplementedError("Not implemented operation '{0}'".format(self._op_name))

        variant = self._op_type[1]
        if variant == 's':
            # series + series: "doc['a'].value + doc['b'].value"
            return "doc['{0}'].value + doc['{1}'].value".format(self._left_field, self._right_field)
        if variant == 'r' and self._is_field(self._left_field) and isinstance(self._right_field, str):
            # series + string literal: "doc['a'].value + 'suffix'"
            # TODO encode special characters in the literal better
            return "doc['{0}'].value + '{1}'".format(self._left_field, self._right_field)
        if variant == 'l' and isinstance(self._left_field, str) and self._is_field(self._right_field):
            # string literal + series: "'prefix' + doc['b'].value"
            return "'{0}' + doc['{1}'].value".format(self._left_field, self._right_field)

        # BUG FIX: previously these invalid combinations left 'source'
        # unbound and crashed later with NameError.
        raise TypeError("Types for operation inconsistent {} {} {}".format(
            type(self._left_field), type(self._right_field), self._op_name))

View File

@ -27,3 +27,23 @@ class TestDataFrameAggs(TestData):
print(ed_sum_min_std.dtypes)
assert_almost_equal(pd_sum_min_std, ed_sum_min_std, check_less_precise=True)
    def test_terms_aggs(self):
        # Compare pandas vs eland metric aggregations over all numeric columns
        # of the flights dataset.
        pd_flights = self.pd_flights()
        ed_flights = self.ed_flights()
        pd_sum_min = pd_flights.select_dtypes(include=[np.number]).agg(['sum', 'min'])
        ed_sum_min = ed_flights.select_dtypes(include=[np.number]).agg(['sum', 'min'])
        # Eland returns all float values for all metric aggs, pandas can return int
        # TODO - investigate this more
        pd_sum_min = pd_sum_min.astype('float64')
        assert_almost_equal(pd_sum_min, ed_sum_min)
        pd_sum_min_std = pd_flights.select_dtypes(include=[np.number]).agg(['sum', 'min', 'std'])
        ed_sum_min_std = ed_flights.select_dtypes(include=[np.number]).agg(['sum', 'min', 'std'])
        print(pd_sum_min_std.dtypes)
        print(ed_sum_min_std.dtypes)
        # std is an estimate in Elasticsearch, so only compare loosely
        assert_almost_equal(pd_sum_min_std, ed_sum_min_std, check_less_precise=True)

View File

@ -85,3 +85,10 @@ class TestDataFrameHeadTail(TestData):
ed_head_0 = ed_flights.head(0)
pd_head_0 = pd_flights.head(0)
assert_pandas_eland_frame_equal(pd_head_0, ed_head_0)
    def test_doc_test_tail(self):
        # Mirrors the documentation example: filter, project columns, tail().
        # Only checks that the chain runs without error (prints the result).
        df = self.ed_flights()
        df = df[(df.OriginAirportID == 'AMS') & (df.FlightDelayMin > 60)]
        df = df[['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin']]
        df = df.tail()
        print(df)

View File

@ -20,8 +20,7 @@ class TestSeriesArithmetics(TestData):
with pytest.raises(TypeError):
assert self.ed_ecommerce()['total_quantity'] + self.ed_ecommerce()['currency']
def test_str_add_ser(self):
def test_ser_add_ser(self):
edadd = self.ed_ecommerce()['customer_first_name'] + self.ed_ecommerce()['customer_last_name']
pdadd = self.pd_ecommerce()['customer_first_name'] + self.pd_ecommerce()['customer_last_name']
@ -33,12 +32,31 @@ class TestSeriesArithmetics(TestData):
assert_pandas_eland_series_equal(pdadd, edadd)
def test_ser_add_ser(self):
    def test_str_add_ser(self):
        # str literal + Series: the literal is prepended to every value.
        edadd = "The last name is: " + self.ed_ecommerce()['customer_last_name']
        pdadd = "The last name is: " + self.pd_ecommerce()['customer_last_name']
        assert_pandas_eland_series_equal(pdadd, edadd)
    def test_bad_str_add_ser(self):
        # TODO encode special characters better
        # Elasticsearch accepts this, but it will cause problems
        edadd = " *" + self.ed_ecommerce()['customer_last_name']
        pdadd = " *" + self.pd_ecommerce()['customer_last_name']
        assert_pandas_eland_series_equal(pdadd, edadd)
    def test_ser_add_str_add_ser(self):
        # Series + Series concatenation; prints names/info_es for debugging.
        pdadd = self.pd_ecommerce()['customer_first_name'] + self.pd_ecommerce()['customer_last_name']
        print(pdadd.name)
        edadd = self.ed_ecommerce()['customer_first_name'] + self.ed_ecommerce()['customer_last_name']
        print(edadd.name)
        print(edadd.info_es())
        assert_pandas_eland_series_equal(pdadd, edadd)
    def test_non_aggregatable_add_str(self):
        # 'customer_gender' is expected to be non-aggregatable in this
        # mapping, so concatenation must raise ValueError.
        with pytest.raises(ValueError):
            assert self.ed_ecommerce()['customer_gender'] + "is the gender"

View File

@ -47,6 +47,14 @@ class TestSeriesValueCounts(TestData):
assert ed_s.value_counts(es_size=-9)
    def test_value_counts_non_aggregatable(self):
        # 'customer_first_name' supports value_counts here (presumably via an
        # aggregatable keyword sub-field -- TODO confirm against the mapping),
        # while 'customer_gender' must raise ValueError.
        ed_s = self.ed_ecommerce()['customer_first_name']
        pd_s = self.pd_ecommerce()['customer_first_name']
        pd_vc = pd_s.value_counts().head(20).sort_index()
        ed_vc = ed_s.value_counts(es_size=20).sort_index()
        assert_series_equal(pd_vc, ed_vc)
        ed_s = self.ed_ecommerce()['customer_gender']
        with pytest.raises(ValueError):
            assert ed_s.value_counts()