diff --git a/MANIFEST.in b/MANIFEST.in index 1aba38f..42eb410 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1 +1 @@ -include LICENSE +include LICENSE.txt diff --git a/README.md b/README.md index 12deb50..d112b48 100644 --- a/README.md +++ b/README.md @@ -34,14 +34,15 @@ max 400140.000000 246.000000 5.000000 ## Connecting to Elasticsearch Cloud -```python +``` +>>> import eland as ed +>>> from elasticsearch import Elasticsearch + >>> es = Elasticsearch(cloud_id="", http_auth=('','')) >>> es.info() {'name': 'instance-0000000000', 'cluster_name': 'bf900cfce5684a81bca0be0cce5913bc', 'cluster_uuid': 'xLPvrV3jQNeadA7oM4l1jA', 'version': {'number': '7.4.2', 'build_flavor': 'default', 'build_type': 'tar', 'build_hash': '2f90bbf7b93631e52bafb59b3b049cb44ec25e96', 'build_date': '2019-10-28T20:40:44.881551Z', 'build_snapshot': False, 'lucene_version': '8.2.0', 'minimum_wire_compatibility_version': '6.8.0', 'minimum_index_compatibility_version': '6.0.0-beta1'}, 'tagline': 'You Know, for Search'} ->>> import eland as ed - >>> df = ed.read_es(es, 'reviews') ``` diff --git a/eland/actions.py b/eland/actions.py new file mode 100644 index 0000000..286726d --- /dev/null +++ b/eland/actions.py @@ -0,0 +1,87 @@ +from abc import ABC, abstractmethod + +# -------------------------------------------------------------------------------------------------------------------- # +# PostProcessingActions # +# -------------------------------------------------------------------------------------------------------------------- # +class PostProcessingAction(ABC): + def __init__(self, action_type): + """ + Abstract class for postprocessing actions + + Parameters + ---------- + action_type: str + The action type (e.g. sort_index, head etc.) 
+ """ + self._action_type = action_type + + @property + def type(self): + return self._action_type + + @abstractmethod + def resolve_action(self, df): + pass + + @abstractmethod + def __repr__(self): + pass + +class SortIndexAction(PostProcessingAction): + def __init__(self): + super().__init__("sort_index") + + def resolve_action(self, df): + return df.sort_index() + + def __repr__(self): + return "('{}')".format(self.type) + +class HeadAction(PostProcessingAction): + def __init__(self, count): + super().__init__("head") + + self._count = count + + def resolve_action(self, df): + return df.head(self._count) + + def __repr__(self): + return "('{}': ('count': {}))".format(self.type, self._count) + + +class TailAction(PostProcessingAction): + def __init__(self, count): + super().__init__("tail") + + self._count = count + + def resolve_action(self, df): + return df.tail(self._count) + + def __repr__(self): + return "('{}': ('count': {}))".format(self.type, self._count) + + +class SortFieldAction(PostProcessingAction): + def __init__(self, sort_params_string): + super().__init__("sort_field") + + if sort_params_string is None: + raise ValueError("Expected valid string") + + # Split string + sort_params = sort_params_string.split(":") + if len(sort_params) != 2: + raise ValueError("Expected ES sort params string (e.g. _doc:desc). 
Got '{}'".format(sort_params_string)) + + self._sort_field = sort_params[0] + self._sort_order = Operations.SortOrder.from_string(sort_params[1]) + + def resolve_action(self, df): + if self._sort_order == Operations.SortOrder.ASC: + return df.sort_values(self._sort_field, True) + return df.sort_values(self._sort_field, False) + + def __repr__(self): + return "('{}': ('sort_field': '{}', 'sort_order': {}))".format(self.type, self._sort_field, self._sort_order) diff --git a/eland/dataframe.py b/eland/dataframe.py index 2f01aef..6f7707d 100644 --- a/eland/dataframe.py +++ b/eland/dataframe.py @@ -385,12 +385,14 @@ class DataFrame(NDFrame): [27 rows x 5 columns] Operations: - tasks: [('boolean_filter', {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}}), ('field_names', ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin']), ('tail', ('_doc', 5))] + tasks: [('boolean_filter': ('boolean_filter': {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}})), ('tail': ('sort_field': '_doc', 'count': 5))] size: 5 sort_params: _doc:desc _source: ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin'] body: {'query': {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}}, 'aggs': {}} - post_processing: ['sort_index'] + post_processing: [('sort_index')] + 'field_to_display_names': {} + 'display_to_field_names': {} """ buf = StringIO() diff --git a/eland/ndframe.py b/eland/ndframe.py index 6a7ddeb..5d6fd61 100644 --- a/eland/ndframe.py +++ b/eland/ndframe.py @@ -1,7 +1,7 @@ """ NDFrame --------- -Base class for eland.DataFrame and eland.Series. +Abstract base class for eland.DataFrame and eland.Series. The underlying data resides in Elasticsearch and the API aligns as much as possible with pandas APIs. @@ -24,6 +24,7 @@ only Elasticsearch aggregatable fields can be aggregated or grouped. 
""" import sys +from abc import ABC import pandas as pd from pandas.core.dtypes.common import is_list_like @@ -32,7 +33,7 @@ from pandas.util._validators import validate_bool_kwarg from eland import ElandQueryCompiler -class NDFrame: +class NDFrame(ABC): def __init__(self, client=None, diff --git a/eland/operations.py b/eland/operations.py index 5868854..9b1fc44 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -1,11 +1,12 @@ import copy -from enum import Enum -import numpy as np import pandas as pd from eland import Index from eland import Query +from eland.actions import SortFieldAction +from eland.tasks import HeadTask, TailTask, BooleanFilterTask, ArithmeticOpFieldsTask, QueryTermsTask, \ + QueryIdsTask, SortOrder, SizeTask class Operations: @@ -20,87 +21,46 @@ class Operations: This is maintained as a 'task graph' (inspired by dask) (see https://docs.dask.org/en/latest/spec.html) """ - - class SortOrder(Enum): - ASC = 0 - DESC = 1 - - @staticmethod - def reverse(order): - if order == Operations.SortOrder.ASC: - return Operations.SortOrder.DESC - - return Operations.SortOrder.ASC - - @staticmethod - def to_string(order): - if order == Operations.SortOrder.ASC: - return "asc" - - return "desc" - - @staticmethod - def from_string(order): - if order == "asc": - return Operations.SortOrder.ASC - - return Operations.SortOrder.DESC - - def __init__(self, tasks=None): + def __init__(self, tasks=None, field_names=None): if tasks is None: self._tasks = [] else: self._tasks = tasks + self._field_names = field_names def __constructor__(self, *args, **kwargs): return type(self)(*args, **kwargs) def copy(self): - return self.__constructor__(tasks=copy.deepcopy(self._tasks)) + return self.__constructor__(tasks=copy.deepcopy(self._tasks), field_names=copy.deepcopy(self._field_names)) def head(self, index, n): # Add a task that is an ascending sort with size=n - task = ('head', (index.sort_field, n)) + task = HeadTask(index.sort_field, n) self._tasks.append(task) 
def tail(self, index, n): # Add a task that is descending sort with size=n - task = ('tail', (index.sort_field, n)) + task = TailTask(index.sort_field, n) self._tasks.append(task) def arithmetic_op_fields(self, field_name, op_name, left_field, right_field, op_type=None): - if op_type: - task = ('arithmetic_op_fields', (field_name, (op_name, (left_field, right_field))), op_type) - else: - task = ('arithmetic_op_fields', (field_name, (op_name, (left_field, right_field)))) # Set this as a column we want to retrieve self.set_field_names([field_name]) + task = ArithmeticOpFieldsTask(field_name, op_name, left_field, right_field, op_type) self._tasks.append(task) def set_field_names(self, field_names): - # Setting field_names at different phases of the task list may result in different - # operations. So instead of setting field_names once, set when it happens in call chain if not isinstance(field_names, list): field_names = list(field_names) - # TODO - field_name renaming - # TODO - validate we are setting field_names to a subset of last field_names? - task = ('field_names', field_names) - self._tasks.append(task) - # Iterate backwards through task list looking for last 'field_names' task - for task in reversed(self._tasks): - if task[0] == 'field_names': - return task[1] - return None + self._field_names = field_names + + return self._field_names def get_field_names(self): - # Iterate backwards through task list looking for last 'field_names' task - for task in reversed(self._tasks): - if task[0] == 'field_names': - return task[1] - - return None + return self._field_names def __repr__(self): return repr(self._tasks) @@ -248,7 +208,9 @@ class Operations: results = {} - for key, value in aggregatable_field_names.items(): + for key in aggregatable_field_names.keys(): + # key is aggregatable field, value is label + # e.g. 
key=category.keyword, value=category for bucket in response['aggregations'][key]['buckets']: results[bucket['key']] = bucket['doc_count'] @@ -597,7 +559,7 @@ class Operations: _source=field_names) # create post sort if sort_params is not None: - post_processing.append(self._sort_params_to_postprocessing(sort_params)) + post_processing.append(SortFieldAction(sort_params)) if is_scan: while True: @@ -611,11 +573,6 @@ class Operations: df = self._apply_df_post_processing(df, post_processing) collector.collect(df) - def iloc(self, index, field_names): - # index and field_names are indexers - task = ('iloc', (index, field_names)) - self._tasks.append(task) - def index_count(self, query_compiler, field): # field is the index field so count values query_params, post_processing = self._resolve_tasks() @@ -671,28 +628,16 @@ class Operations: # b not in ['a','b','c'] # For now use term queries if field == Index.ID_INDEX_FIELD: - task = ('query_ids', ('must_not', items)) + task = QueryIdsTask(False, items) else: - task = ('query_terms', ('must_not', (field, items))) + task = QueryTermsTask(False, field, items) self._tasks.append(task) - @staticmethod - def _sort_params_to_postprocessing(input): - # Split string - sort_params = input.split(":") - - query_sort_field = sort_params[0] - query_sort_order = Operations.SortOrder.from_string(sort_params[1]) - - task = ('sort_field', (query_sort_field, query_sort_order)) - - return task - @staticmethod def _query_params_to_size_and_sort(query_params): sort_params = None if query_params['query_sort_field'] and query_params['query_sort_order']: - sort_params = query_params['query_sort_field'] + ":" + Operations.SortOrder.to_string( + sort_params = query_params['query_sort_field'] + ":" + SortOrder.to_string( query_params['query_sort_order']) size = query_params['query_size'] @@ -703,37 +648,16 @@ class Operations: def _count_post_processing(post_processing): size = None for action in post_processing: - if action[0] == 'head' or 
action[0] == 'tail': - if size is None or action[1][1] < size: - size = action[1][1] + if isinstance(action, SizeTask): + if size is None or action.size() < size: + size = action.size() return size @staticmethod def _apply_df_post_processing(df, post_processing): for action in post_processing: - if action == 'sort_index': - df = df.sort_index() - elif action[0] == 'head': - df = df.head(action[1][1]) - elif action[0] == 'tail': - df = df.tail(action[1][1]) - elif action[0] == 'sort_field': - sort_field = action[1][0] - sort_order = action[1][1] - if sort_order == Operations.SortOrder.ASC: - df = df.sort_values(sort_field, True) - else: - df = df.sort_values(sort_field, False) - elif action[0] == 'iloc': - index_indexer = action[1][0] - field_name_indexer = action[1][1] - if index_indexer is None: - index_indexer = slice(None) - if field_name_indexer is None: - field_name_indexer = slice(None) - df = df.iloc[index_indexer, field_name_indexer] - # field_names could be in here (and we ignore it) + df = action.resolve_action(df) return df @@ -752,337 +676,7 @@ class Operations: post_processing = [] for task in self._tasks: - if task[0] == 'head': - query_params, post_processing = self._resolve_head(task, query_params, post_processing) - elif task[0] == 'tail': - query_params, post_processing = self._resolve_tail(task, query_params, post_processing) - elif task[0] == 'iloc': - query_params, post_processing = self._resolve_iloc(task, query_params, post_processing) - elif task[0] == 'query_ids': - query_params, post_processing = self._resolve_query_ids(task, query_params, post_processing) - elif task[0] == 'query_terms': - query_params, post_processing = self._resolve_query_terms(task, query_params, post_processing) - elif task[0] == 'boolean_filter': - query_params, post_processing = self._resolve_boolean_filter(task, query_params, post_processing) - elif task[0] == 'arithmetic_op_fields': - query_params, post_processing = self._resolve_arithmetic_op_fields(task, 
query_params, post_processing) - else: # a lot of operations simply post-process the dataframe - put these straight through - query_params, post_processing = self._resolve_post_processing_task(task, query_params, post_processing) - - return query_params, post_processing - - @staticmethod - def _resolve_head(item, query_params, post_processing): - # head - sort asc, size n - # |12345-------------| - query_sort_field = item[1][0] - query_sort_order = Operations.SortOrder.ASC - query_size = item[1][1] - - # If we are already postprocessing the query results, we just get 'head' of these - # (note, currently we just append another head, we don't optimise by - # overwriting previous head) - if len(post_processing) > 0: - post_processing.append(item) - return query_params, post_processing - - if query_params['query_sort_field'] is None: - query_params['query_sort_field'] = query_sort_field - # if it is already sorted we use existing field - - if query_params['query_sort_order'] is None: - query_params['query_sort_order'] = query_sort_order - # if it is already sorted we get head of existing order - - if query_params['query_size'] is None: - query_params['query_size'] = query_size - else: - # truncate if head is smaller - if query_size < query_params['query_size']: - query_params['query_size'] = query_size - - return query_params, post_processing - - @staticmethod - def _resolve_tail(item, query_params, post_processing): - # tail - sort desc, size n, post-process sort asc - # |-------------12345| - query_sort_field = item[1][0] - query_sort_order = Operations.SortOrder.DESC - query_size = item[1][1] - - # If this is a tail of a tail adjust settings and return - if query_params['query_size'] is not None and \ - query_params['query_sort_order'] == query_sort_order and \ - post_processing == ['sort_index']: - if query_size < query_params['query_size']: - query_params['query_size'] = query_size - return query_params, post_processing - - # If we are already postprocessing the 
query results, just get 'tail' of these - # (note, currently we just append another tail, we don't optimise by - # overwriting previous tail) - if len(post_processing) > 0: - post_processing.append(item) - return query_params, post_processing - - # If results are already constrained, just get 'tail' of these - # (note, currently we just append another tail, we don't optimise by - # overwriting previous tail) - if query_params['query_size'] is not None: - post_processing.append(item) - return query_params, post_processing - else: - query_params['query_size'] = query_size - if query_params['query_sort_field'] is None: - query_params['query_sort_field'] = query_sort_field - if query_params['query_sort_order'] is None: - query_params['query_sort_order'] = query_sort_order - else: - # reverse sort order - query_params['query_sort_order'] = Operations.SortOrder.reverse(query_sort_order) - - post_processing.append('sort_index') - - return query_params, post_processing - - @staticmethod - def _resolve_iloc(item, query_params, post_processing): - # tail - sort desc, size n, post-process sort asc - # |---4--7-9---------| - - # This is a list of items we return via an integer index - int_index = item[1][0] - if int_index is not None: - last_item = int_index.max() - - # If we have a query_size we do this post processing - if query_params['query_size'] is not None: - post_processing.append(item) - return query_params, post_processing - - # size should be > last item - query_params['query_size'] = last_item + 1 - post_processing.append(item) - - return query_params, post_processing - - @staticmethod - def _resolve_query_ids(item, query_params, post_processing): - # task = ('query_ids', ('must_not', items)) - must_clause = item[1][0] - ids = item[1][1] - - if must_clause == 'must': - query_params['query'].ids(ids, must=True) - else: - query_params['query'].ids(ids, must=False) - - return query_params, post_processing - - @staticmethod - def _resolve_query_terms(item, 
query_params, post_processing): - # task = ('query_terms', ('must_not', (field, terms))) - must_clause = item[1][0] - field = item[1][1][0] - terms = item[1][1][1] - - if must_clause == 'must': - query_params['query'].terms(field, terms, must=True) - else: - query_params['query'].terms(field, terms, must=False) - - return query_params, post_processing - - @staticmethod - def _resolve_boolean_filter(item, query_params, post_processing): - # task = ('boolean_filter', object) - boolean_filter = item[1] - - query_params['query'].update_boolean_filter(boolean_filter) - - return query_params, post_processing - - def _resolve_arithmetic_op_fields(self, item, query_params, post_processing): - # task = ('arithmetic_op_fields', (field_name, (op_name, (left_field, right_field)))) - field_name = item[1][0] - op_name = item[1][1][0] - left_field = item[1][1][1][0] - right_field = item[1][1][1][1] - - try: - op_type = item[2] - except IndexError: - op_type = None - - # https://www.elastic.co/guide/en/elasticsearch/painless/current/painless-api-reference-shared-java-lang.html#painless-api-reference-shared-Math - if not op_type: - if isinstance(left_field, str) and isinstance(right_field, str): - """ - (if op_name = '__truediv__') - - "script_fields": { - "field_name": { - "script": { - "source": "doc[left_field].value / doc[right_field].value" - } - } - } - """ - if op_name == '__add__': - source = "doc['{0}'].value + doc['{1}'].value".format(left_field, right_field) - elif op_name == '__truediv__': - source = "doc['{0}'].value / doc['{1}'].value".format(left_field, right_field) - elif op_name == '__floordiv__': - source = "Math.floor(doc['{0}'].value / doc['{1}'].value)".format(left_field, right_field) - elif op_name == '__pow__': - source = "Math.pow(doc['{0}'].value, doc['{1}'].value)".format(left_field, right_field) - elif op_name == '__mod__': - source = "doc['{0}'].value % doc['{1}'].value".format(left_field, right_field) - elif op_name == '__mul__': - source = 
"doc['{0}'].value * doc['{1}'].value".format(left_field, right_field) - elif op_name == '__sub__': - source = "doc['{0}'].value - doc['{1}'].value".format(left_field, right_field) - else: - raise NotImplementedError("Not implemented operation '{0}'".format(op_name)) - - if query_params['query_script_fields'] is None: - query_params['query_script_fields'] = {} - query_params['query_script_fields'][field_name] = { - 'script': { - 'source': source - } - } - elif isinstance(left_field, str) and np.issubdtype(np.dtype(type(right_field)), np.number): - """ - (if op_name = '__truediv__') - - "script_fields": { - "field_name": { - "script": { - "source": "doc[left_field].value / right_field" - } - } - } - """ - if op_name == '__add__': - source = "doc['{0}'].value + {1}".format(left_field, right_field) - elif op_name == '__truediv__': - source = "doc['{0}'].value / {1}".format(left_field, right_field) - elif op_name == '__floordiv__': - source = "Math.floor(doc['{0}'].value / {1})".format(left_field, right_field) - elif op_name == '__pow__': - source = "Math.pow(doc['{0}'].value, {1})".format(left_field, right_field) - elif op_name == '__mod__': - source = "doc['{0}'].value % {1}".format(left_field, right_field) - elif op_name == '__mul__': - source = "doc['{0}'].value * {1}".format(left_field, right_field) - elif op_name == '__sub__': - source = "doc['{0}'].value - {1}".format(left_field, right_field) - else: - raise NotImplementedError("Not implemented operation '{0}'".format(op_name)) - elif np.issubdtype(np.dtype(type(left_field)), np.number) and isinstance(right_field, str): - """ - (if op_name = '__truediv__') - - "script_fields": { - "field_name": { - "script": { - "source": "left_field / doc['right_field'].value" - } - } - } - """ - if op_name == '__add__': - source = "{0} + doc['{1}'].value".format(left_field, right_field) - elif op_name == '__truediv__': - source = "{0} / doc['{1}'].value".format(left_field, right_field) - elif op_name == '__floordiv__': - source 
= "Math.floor({0} / doc['{1}'].value)".format(left_field, right_field) - elif op_name == '__pow__': - source = "Math.pow({0}, doc['{1}'].value)".format(left_field, right_field) - elif op_name == '__mod__': - source = "{0} % doc['{1}'].value".format(left_field, right_field) - elif op_name == '__mul__': - source = "{0} * doc['{1}'].value".format(left_field, right_field) - elif op_name == '__sub__': - source = "{0} - doc['{1}'].value".format(left_field, right_field) - else: - raise NotImplementedError("Not implemented operation '{0}'".format(op_name)) - - else: - raise TypeError("Types for operation inconsistent {} {} {}", type(left_field), type(right_field), op_name) - - elif op_type[0] == "string": - # we need to check the type of string addition - if op_type[1] == "s": - """ - (if op_name = '__add__') - - "script_fields": { - "field_name": { - "script": { - "source": "doc[left_field].value + doc[right_field].value" - } - } - } - """ - if op_name == '__add__': - source = "doc['{0}'].value + doc['{1}'].value".format(left_field, right_field) - else: - raise NotImplementedError("Not implemented operation '{0}'".format(op_name)) - - elif op_type[1] == "r": - if isinstance(left_field, str) and isinstance(right_field, str): - """ - (if op_name = '__add__') - - "script_fields": { - "field_name": { - "script": { - "source": "doc[left_field].value + right_field" - } - } - } - """ - if op_name == '__add__': - source = "doc['{0}'].value + '{1}'".format(left_field, right_field) - else: - raise NotImplementedError("Not implemented operation '{0}'".format(op_name)) - - elif op_type[1] == 'l': - if isinstance(left_field, str) and isinstance(right_field, str): - """ - (if op_name = '__add__') - - "script_fields": { - "field_name": { - "script": { - "source": "left_field + doc[right_field].value" - } - } - } - """ - if op_name == '__add__': - source = "'{0}' + doc['{1}'].value".format(left_field, right_field) - else: - raise NotImplementedError("Not implemented operation 
'{0}'".format(op_name)) - - if query_params['query_script_fields'] is None: - query_params['query_script_fields'] = {} - query_params['query_script_fields'][field_name] = { - 'script': { - 'source': source - } - } - - return query_params, post_processing - - @staticmethod - def _resolve_post_processing_task(item, query_params, post_processing): - # Just do this in post-processing - if item[0] != 'field_names': - post_processing.append(item) + query_params, post_processing = task.resolve_task(query_params, post_processing) return query_params, post_processing @@ -1121,5 +715,5 @@ class Operations: buf.write(" post_processing: {0}\n".format(post_processing)) def update_query(self, boolean_filter): - task = ('boolean_filter', boolean_filter) + task = BooleanFilterTask(boolean_filter) self._tasks.append(task) diff --git a/eland/query_compiler.py b/eland/query_compiler.py index b5bd00f..85361e0 100644 --- a/eland/query_compiler.py +++ b/eland/query_compiler.py @@ -461,6 +461,7 @@ class ElandQueryCompiler: self._index.info_es(buf) self._mappings.info_es(buf) self._operations.info_es(buf) + self._name_mapper.info_es(buf) def describe(self): return self._operations.describe(self) @@ -548,7 +549,7 @@ class ElandQueryCompiler: else: raise ValueError( "Can not perform arithmetic operations on non aggregatable fields" - "One of [{}, {}] is not aggregatable.".format(self.name, right.name) + "One of [{}, {}] is not aggregatable.".format(self_field, right_field) ) def arithmetic_op_fields(self, new_field_name, op, left_field, right_field, op_type=None): @@ -648,6 +649,9 @@ class ElandQueryCompiler: display_to_field_names=self._display_to_field_names.copy() ) + def info_es(self, buf): + buf.write("'field_to_display_names': {}\n".format(self._field_to_display_names)) + buf.write("'display_to_field_names': {}\n".format(self._display_to_field_names)) def elasticsearch_date_to_pandas_date(value: Union[int, str], date_format: str) -> pd.Timestamp: """ diff --git a/eland/series.py 
b/eland/series.py index 7e1d7de..8caf6ad 100644 --- a/eland/series.py +++ b/eland/series.py @@ -215,7 +215,7 @@ class Series(NDFrame): Returns ------- pandas.Series - number of occurences of each value in the column + number of occurrences of each value in the column See Also -------- diff --git a/eland/tasks.py b/eland/tasks.py new file mode 100644 index 0000000..c939b02 --- /dev/null +++ b/eland/tasks.py @@ -0,0 +1,438 @@ +from abc import ABC, abstractmethod +from enum import Enum + +import numpy as np + +from eland.actions import HeadAction, TailAction, SortIndexAction + + +class SortOrder(Enum): + ASC = 0 + DESC = 1 + + @staticmethod + def reverse(order): + if order == SortOrder.ASC: + return SortOrder.DESC + + return SortOrder.ASC + + @staticmethod + def to_string(order): + if order == SortOrder.ASC: + return "asc" + + return "desc" + + @staticmethod + def from_string(order): + if order == "asc": + return SortOrder.ASC + + return SortOrder.DESC + + +# -------------------------------------------------------------------------------------------------------------------- # +# Tasks # +# -------------------------------------------------------------------------------------------------------------------- # +class Task(ABC): + """ + Abstract class for tasks + + Parameters + ---------- + task_type: str + The task type (e.g. head, tail etc.) 
+ """ + + def __init__(self, task_type): + self._task_type = task_type + + @property + def type(self): + return self._task_type + + @abstractmethod + def resolve_task(self, query_params, post_processing): + pass + + @abstractmethod + def __repr__(self): + pass + + +class SizeTask(Task): + def __init__(self, task_type): + super().__init__(task_type) + + @abstractmethod + def size(self): + # must override + pass + + +class HeadTask(SizeTask): + def __init__(self, sort_field, count): + super().__init__("head") + + # Add a task that is an ascending sort with size=count + self._sort_field = sort_field + self._count = count + + def __repr__(self): + return "('{}': ('sort_field': '{}', 'count': {}))".format(self._task_type, self._sort_field, self._count) + + def resolve_task(self, query_params, post_processing): + # head - sort asc, size n + # |12345-------------| + query_sort_field = self._sort_field + query_sort_order = SortOrder.ASC + query_size = self._count + + # If we are already postprocessing the query results, we just get 'head' of these + # (note, currently we just append another head, we don't optimise by + # overwriting previous head) + if len(post_processing) > 0: + post_processing.append(HeadAction(self._count)) + return query_params, post_processing + + if query_params['query_sort_field'] is None: + query_params['query_sort_field'] = query_sort_field + # if it is already sorted we use existing field + + if query_params['query_sort_order'] is None: + query_params['query_sort_order'] = query_sort_order + # if it is already sorted we get head of existing order + + if query_params['query_size'] is None: + query_params['query_size'] = query_size + else: + # truncate if head is smaller + if query_size < query_params['query_size']: + query_params['query_size'] = query_size + + return query_params, post_processing + + def size(self): + return self._count + + +class TailTask(SizeTask): + def __init__(self, sort_field, count): + super().__init__("tail") + + # Add a 
+        task that is descending sort with size=count
+        """
+        self._sort_field = sort_field
+        self._count = count
+
+    def __repr__(self):
+        return "('{}': ('sort_field': '{}', 'count': {}))".format(self._task_type, self._sort_field, self._count)
+
+    def resolve_task(self, query_params, post_processing):
+        # tail - sort desc, size n, post-process sort asc
+        # |-------------12345|
+        query_sort_field = self._sort_field
+        query_sort_order = SortOrder.DESC
+        query_size = self._count
+
+        # If this is a tail of a tail adjust settings and return
+        if query_params['query_size'] is not None and \
+                query_params['query_sort_order'] == query_sort_order and \
+                post_processing == ['sort_index']:
+            if query_size < query_params['query_size']:
+                query_params['query_size'] = query_size
+            return query_params, post_processing
+
+        # If we are already postprocessing the query results, just get 'tail' of these
+        # (note, currently we just append another tail, we don't optimise by
+        # overwriting previous tail)
+        if len(post_processing) > 0:
+            post_processing.append(TailAction(self._count))
+            return query_params, post_processing
+
+        # If results are already constrained, just get 'tail' of these
+        # (note, currently we just append another tail, we don't optimise by
+        # overwriting previous tail)
+        if query_params['query_size'] is not None:
+            post_processing.append(TailAction(self._count))
+            return query_params, post_processing
+        else:
+            query_params['query_size'] = query_size
+            if query_params['query_sort_field'] is None:
+                query_params['query_sort_field'] = query_sort_field
+            if query_params['query_sort_order'] is None:
+                query_params['query_sort_order'] = query_sort_order
+            else:
+                # reverse sort order
+                query_params['query_sort_order'] = SortOrder.reverse(query_sort_order)
+
+            post_processing.append(SortIndexAction())
+
+        return query_params, post_processing
+
+    def size(self):
+        return self._count
+
+
+class QueryIdsTask(Task):
+    def __init__(self, must, ids):
+        """
+        Parameters
+        ----------
+        must: bool
+            Include or exclude these ids (must/must_not)
+
+        ids: list
+            ids for the filter
+        """
+        super().__init__("query_ids")
+
+        self._must = must
+        self._ids = ids
+
+    def resolve_task(self, query_params, post_processing):
+        query_params['query'].ids(self._ids, must=self._must)
+
+        return query_params, post_processing
+
+    def __repr__(self):
+        return "('{}': ('must': {}, 'ids': {}))".format(self._task_type, self._must, self._ids)
+
+
+class QueryTermsTask(Task):
+    def __init__(self, must, field, terms):
+        """
+        Parameters
+        ----------
+        must: bool
+            Include or exclude these ids (must/must_not)
+
+        field: str
+            field_name to filter
+
+        terms: list
+            field_values for filter
+        """
+        super().__init__("query_terms")
+
+        self._must = must
+        self._field = field
+        self._terms = terms
+
+    def __repr__(self):
+        return "('{}': ('must': {}, 'field': '{}', 'terms': {}))".format(self._task_type, self._must, self._field,
+                                                                         self._terms)
+
+    def resolve_task(self, query_params, post_processing):
+        query_params['query'].terms(self._field, self._terms, must=self._must)
+
+        return query_params, post_processing
+
+
+class BooleanFilterTask(Task):
+    def __init__(self, boolean_filter):
+        """
+        Parameters
+        ----------
+        boolean_filter: BooleanFilter or str
+            The filter to apply
+        """
+        super().__init__("boolean_filter")
+
+        self._boolean_filter = boolean_filter
+
+    def __repr__(self):
+        return "('{}': ('boolean_filter': {}))".format(self._task_type, repr(self._boolean_filter))
+
+    def resolve_task(self, query_params, post_processing):
+        query_params['query'].update_boolean_filter(self._boolean_filter)
+
+        return query_params, post_processing
+
+
+class ArithmeticOpFieldsTask(Task):
+    def __init__(self, field_name, op_name, left_field, right_field, op_type):
+        super().__init__("arithmetic_op_fields")
+
+        self._field_name = field_name
+        self._op_name = op_name
+        self._left_field = left_field
+        self._right_field = right_field
+        self._op_type = op_type
+
+    def __repr__(self):
+        return "('{}': (" \
+               "'field_name': {}, " \
+               "'op_name': {}, " \
+               "'left_field': {}, " \
+               "'right_field': {}, " \
+               "'op_type': {}" \
+               "))" \
+            .format(self._task_type, self._field_name, self._op_name, self._left_field, self._right_field,
+                    self._op_type)
+
+    def resolve_task(self, query_params, post_processing):
+        # https://www.elastic.co/guide/en/elasticsearch/painless/current/painless-api-reference-shared-java-lang.html#painless-api-reference-shared-Math
+        if not self._op_type:
+            if isinstance(self._left_field, str) and isinstance(self._right_field, str):
+                """
+                (if op_name = '__truediv__')
+
+                "script_fields": {
+                    "field_name": {
+                        "script": {
+                            "source": "doc[left_field].value / doc[right_field].value"
+                        }
+                    }
+                }
+                """
+                if self._op_name == '__add__':
+                    source = "doc['{0}'].value + doc['{1}'].value".format(self._left_field, self._right_field)
+                elif self._op_name == '__truediv__':
+                    source = "doc['{0}'].value / doc['{1}'].value".format(self._left_field, self._right_field)
+                elif self._op_name == '__floordiv__':
+                    source = "Math.floor(doc['{0}'].value / doc['{1}'].value)".format(self._left_field,
+                                                                                     self._right_field)
+                elif self._op_name == '__pow__':
+                    source = "Math.pow(doc['{0}'].value, doc['{1}'].value)".format(self._left_field, self._right_field)
+                elif self._op_name == '__mod__':
+                    source = "doc['{0}'].value % doc['{1}'].value".format(self._left_field, self._right_field)
+                elif self._op_name == '__mul__':
+                    source = "doc['{0}'].value * doc['{1}'].value".format(self._left_field, self._right_field)
+                elif self._op_name == '__sub__':
+                    source = "doc['{0}'].value - doc['{1}'].value".format(self._left_field, self._right_field)
+                else:
+                    raise NotImplementedError("Not implemented operation '{0}'".format(self._op_name))
+
+                if query_params['query_script_fields'] is None:
+                    query_params['query_script_fields'] = {}
+                query_params['query_script_fields'][self._field_name] = {
+                    'script': {
+                        'source': source
+                    }
+                }
+            elif isinstance(self._left_field, str) and np.issubdtype(np.dtype(type(self._right_field)), np.number):
+                """
+                (if self._op_name = '__truediv__')
+
+                "script_fields": {
+                    "field_name": {
+                        "script": {
+                            "source": "doc[self._left_field].value / self._right_field"
+                        }
+                    }
+                }
+                """
+                if self._op_name == '__add__':
+                    source = "doc['{0}'].value + {1}".format(self._left_field, self._right_field)
+                elif self._op_name == '__truediv__':
+                    source = "doc['{0}'].value / {1}".format(self._left_field, self._right_field)
+                elif self._op_name == '__floordiv__':
+                    source = "Math.floor(doc['{0}'].value / {1})".format(self._left_field, self._right_field)
+                elif self._op_name == '__pow__':
+                    source = "Math.pow(doc['{0}'].value, {1})".format(self._left_field, self._right_field)
+                elif self._op_name == '__mod__':
+                    source = "doc['{0}'].value % {1}".format(self._left_field, self._right_field)
+                elif self._op_name == '__mul__':
+                    source = "doc['{0}'].value * {1}".format(self._left_field, self._right_field)
+                elif self._op_name == '__sub__':
+                    source = "doc['{0}'].value - {1}".format(self._left_field, self._right_field)
+                else:
+                    raise NotImplementedError("Not implemented operation '{0}'".format(self._op_name))
+            elif np.issubdtype(np.dtype(type(self._left_field)), np.number) and isinstance(self._right_field, str):
+                """
+                (if self._op_name = '__truediv__')
+
+                "script_fields": {
+                    "field_name": {
+                        "script": {
+                            "source": "self._left_field / doc['self._right_field'].value"
+                        }
+                    }
+                }
+                """
+                if self._op_name == '__add__':
+                    source = "{0} + doc['{1}'].value".format(self._left_field, self._right_field)
+                elif self._op_name == '__truediv__':
+                    source = "{0} / doc['{1}'].value".format(self._left_field, self._right_field)
+                elif self._op_name == '__floordiv__':
+                    source = "Math.floor({0} / doc['{1}'].value)".format(self._left_field, self._right_field)
+                elif self._op_name == '__pow__':
+                    source = "Math.pow({0}, doc['{1}'].value)".format(self._left_field, self._right_field)
+                elif self._op_name == '__mod__':
+                    source = "{0} % doc['{1}'].value".format(self._left_field, self._right_field)
+                elif self._op_name == '__mul__':
+                    source = "{0} * doc['{1}'].value".format(self._left_field, self._right_field)
+                elif self._op_name == '__sub__':
+                    source = "{0} - doc['{1}'].value".format(self._left_field, self._right_field)
+                else:
+                    raise NotImplementedError("Not implemented operation '{0}'".format(self._op_name))
+
+            else:
+                raise TypeError("Types for operation inconsistent {} {} {}", type(self._left_field),
+                                type(self._right_field), self._op_name)
+
+        elif self._op_type[0] == "string":
+            # we need to check the type of string addition
+            if self._op_type[1] == "s":
+                """
+                (if self._op_name = '__add__')
+
+                "script_fields": {
+                    "field_name": {
+                        "script": {
+                            "source": "doc[self._left_field].value + doc[self._right_field].value"
+                        }
+                    }
+                }
+                """
+                if self._op_name == '__add__':
+                    source = "doc['{0}'].value + doc['{1}'].value".format(self._left_field, self._right_field)
+                else:
+                    raise NotImplementedError("Not implemented operation '{0}'".format(self._op_name))
+
+            elif self._op_type[1] == "r":
+                if isinstance(self._left_field, str) and isinstance(self._right_field, str):
+                    """
+                    (if self._op_name = '__add__')
+
+                    "script_fields": {
+                        "field_name": {
+                            "script": {
+                                "source": "doc[self._left_field].value + self._right_field"
+                            }
+                        }
+                    }
+                    """
+                    if self._op_name == '__add__':
+                        source = "doc['{0}'].value + '{1}'".format(self._left_field, self._right_field)
+                    else:
+                        raise NotImplementedError("Not implemented operation '{0}'".format(self._op_name))
+
+            elif self._op_type[1] == 'l':
+                if isinstance(self._left_field, str) and isinstance(self._right_field, str):
+                    """
+                    (if self._op_name = '__add__')
+
+                    "script_fields": {
+                        "field_name": {
+                            "script": {
+                                "source": "self._left_field + doc[self._right_field].value"
+                            }
+                        }
+                    }
+                    """
+                    if self._op_name == '__add__':
+                        source = "'{0}' + doc['{1}'].value".format(self._left_field, self._right_field)
+                    else:
+                        raise NotImplementedError("Not implemented operation '{0}'".format(self._op_name))
+
+        if query_params['query_script_fields'] is None:
+            query_params['query_script_fields'] = {}
+        query_params['query_script_fields'][self._field_name] = {
+            'script': {
+                'source': source
+            }
+        }
+
+        return query_params, post_processing
diff --git a/eland/tests/dataframe/test_aggs_pytest.py b/eland/tests/dataframe/test_aggs_pytest.py
index 0086515..a118079 100644
--- a/eland/tests/dataframe/test_aggs_pytest.py
+++ b/eland/tests/dataframe/test_aggs_pytest.py
@@ -27,3 +27,23 @@ class TestDataFrameAggs(TestData):
         print(ed_sum_min_std.dtypes)
 
         assert_almost_equal(pd_sum_min_std, ed_sum_min_std, check_less_precise=True)
+
+    def test_terms_aggs(self):
+        pd_flights = self.pd_flights()
+        ed_flights = self.ed_flights()
+
+        pd_sum_min = pd_flights.select_dtypes(include=[np.number]).agg(['sum', 'min'])
+        ed_sum_min = ed_flights.select_dtypes(include=[np.number]).agg(['sum', 'min'])
+
+        # Eland returns all float values for all metric aggs, pandas can return int
+        # TODO - investigate this more
+        pd_sum_min = pd_sum_min.astype('float64')
+        assert_almost_equal(pd_sum_min, ed_sum_min)
+
+        pd_sum_min_std = pd_flights.select_dtypes(include=[np.number]).agg(['sum', 'min', 'std'])
+        ed_sum_min_std = ed_flights.select_dtypes(include=[np.number]).agg(['sum', 'min', 'std'])
+
+        print(pd_sum_min_std.dtypes)
+        print(ed_sum_min_std.dtypes)
+
+        assert_almost_equal(pd_sum_min_std, ed_sum_min_std, check_less_precise=True)
diff --git a/eland/tests/dataframe/test_head_tail_pytest.py b/eland/tests/dataframe/test_head_tail_pytest.py
index f342ac2..0860382 100644
--- a/eland/tests/dataframe/test_head_tail_pytest.py
+++ b/eland/tests/dataframe/test_head_tail_pytest.py
@@ -85,3 +85,10 @@ class TestDataFrameHeadTail(TestData):
         ed_head_0 = ed_flights.head(0)
         pd_head_0 = pd_flights.head(0)
         assert_pandas_eland_frame_equal(pd_head_0, ed_head_0)
+
+    def test_doc_test_tail(self):
+        df = self.ed_flights()
+        df = df[(df.OriginAirportID == 'AMS') & (df.FlightDelayMin > 60)]
+        df = df[['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin']]
+        df = df.tail()
+        print(df)
diff --git a/eland/tests/series/test_str_arithmetics_pytest.py b/eland/tests/series/test_str_arithmetics_pytest.py
index d5b8341..67769b2 100644
--- a/eland/tests/series/test_str_arithmetics_pytest.py
+++ b/eland/tests/series/test_str_arithmetics_pytest.py
@@ -20,8 +20,7 @@ class TestSeriesArithmetics(TestData):
         with pytest.raises(TypeError):
             assert self.ed_ecommerce()['total_quantity'] + self.ed_ecommerce()['currency']
 
-    def test_str_add_ser(self):
-
+    def test_ser_add_ser(self):
         edadd = self.ed_ecommerce()['customer_first_name'] + self.ed_ecommerce()['customer_last_name']
         pdadd = self.pd_ecommerce()['customer_first_name'] + self.pd_ecommerce()['customer_last_name']
 
@@ -33,12 +32,31 @@ class TestSeriesArithmetics(TestData):
 
         assert_pandas_eland_series_equal(pdadd, edadd)
 
-    def test_ser_add_ser(self):
+    def test_str_add_ser(self):
         edadd = "The last name is: " + self.ed_ecommerce()['customer_last_name']
         pdadd = "The last name is: " + self.pd_ecommerce()['customer_last_name']
 
         assert_pandas_eland_series_equal(pdadd, edadd)
 
+    def test_bad_str_add_ser(self):
+        # TODO encode special characters better
+        # Elasticsearch accepts this, but it will cause problems
+        edadd = " *" + self.ed_ecommerce()['customer_last_name']
+        pdadd = " *" + self.pd_ecommerce()['customer_last_name']
+
+        assert_pandas_eland_series_equal(pdadd, edadd)
+
+
+    def test_ser_add_str_add_ser(self):
+        pdadd = self.pd_ecommerce()['customer_first_name'] + self.pd_ecommerce()['customer_last_name']
+        print(pdadd.name)
+        edadd = self.ed_ecommerce()['customer_first_name'] + self.ed_ecommerce()['customer_last_name']
+        print(edadd.name)
+
+        print(edadd.info_es())
+
+        assert_pandas_eland_series_equal(pdadd, edadd)
+
     def test_non_aggregatable_add_str(self):
         with pytest.raises(ValueError):
             assert self.ed_ecommerce()['customer_gender'] + "is the gender"
diff --git a/eland/tests/series/test_value_counts_pytest.py b/eland/tests/series/test_value_counts_pytest.py
index 97effbf..a43d8a8 100644
--- a/eland/tests/series/test_value_counts_pytest.py
+++ b/eland/tests/series/test_value_counts_pytest.py
@@ -47,6 +47,14 @@ class TestSeriesValueCounts(TestData):
             assert ed_s.value_counts(es_size=-9)
 
     def test_value_counts_non_aggregatable(self):
+        ed_s = self.ed_ecommerce()['customer_first_name']
+        pd_s = self.pd_ecommerce()['customer_first_name']
+
+        pd_vc = pd_s.value_counts().head(20).sort_index()
+        ed_vc = ed_s.value_counts(es_size=20).sort_index()
+
+        assert_series_equal(pd_vc, ed_vc)
+
         ed_s = self.ed_ecommerce()['customer_gender']
         with pytest.raises(ValueError):
             assert ed_s.value_counts()