From ef289bfe787fe36ad723825fa2779a3f75705a23 Mon Sep 17 00:00:00 2001 From: Stephen Dodson Date: Wed, 14 Aug 2019 14:44:04 +0000 Subject: [PATCH] Adding partial DataFrame.query support Only > and == currently implemented for PoC. 'query' language not supported yet. --- eland/dataframe.py | 36 ++++ eland/groupby.py | 22 +++ eland/operations.py | 137 ++++++++++++++ eland/operators/__init__.py | 3 + eland/operators/filter.py | 148 +++++++++++++++ eland/query.py | 62 ++++-- eland/query_compiler.py | 10 + eland/series.py | 26 ++- eland/tests/dataframe/test_aggs_pytest.py | 61 ++---- eland/tests/dataframe/test_datetime_pytest.py | 44 +++++ eland/tests/dataframe/test_query_pytest.py | 48 +++++ eland/tests/operators/__init__.py | 0 .../tests/operators/test_operators_pytest.py | 177 ++++++++++++++++++ 13 files changed, 705 insertions(+), 69 deletions(-) create mode 100644 eland/groupby.py create mode 100644 eland/operators/__init__.py create mode 100644 eland/operators/filter.py create mode 100644 eland/tests/dataframe/test_datetime_pytest.py create mode 100644 eland/tests/dataframe/test_query_pytest.py create mode 100644 eland/tests/operators/__init__.py create mode 100644 eland/tests/operators/test_operators_pytest.py diff --git a/eland/dataframe.py b/eland/dataframe.py index dd47a59..858ba37 100644 --- a/eland/dataframe.py +++ b/eland/dataframe.py @@ -4,6 +4,8 @@ from distutils.version import LooseVersion import numpy as np import pandas as pd +import pandas.compat as compat +import six from pandas.compat import StringIO from pandas.core.common import apply_if_callable, is_bool_indexer from pandas.core.dtypes.common import ( @@ -17,6 +19,7 @@ from pandas.io.formats.printing import pprint_thing import eland.plotting as gfx from eland import NDFrame from eland import Series +from eland.operators import BooleanFilter, ScriptFilter class DataFrame(NDFrame): @@ -373,6 +376,10 @@ class DataFrame(NDFrame): return self._getitem_array(key) elif isinstance(key, DataFrame): return self.where(key) + elif isinstance(key, BooleanFilter): + return DataFrame( + query_compiler=self._query_compiler._update_query(key) + ) else: return self._getitem_column(key) @@ -502,6 +509,11 @@ class DataFrame(NDFrame): def keys(self): return self.columns + def groupby(self, by=None, axis=0, *args, **kwargs): + axis = self._get_axis_number(axis) + + if axis == 1: + raise NotImplementedError("Aggregating via index not currently implemented - needs index transform") def aggregate(self, func, axis=0, *args, **kwargs): """ @@ -540,7 +552,31 @@ class DataFrame(NDFrame): # currently we only support a subset of functions that aggregate columns. # ['count', 'mad', 'max', 'mean', 'median', 'min', 'mode', 'quantile', 'rank', 'sem', 'skew', 'sum', 'std', 'var', 'nunique'] + if isinstance(func, compat.string_types): + # wrap in list + func = [func] + return self._query_compiler.aggs(func) + elif is_list_like(func): + # we have a list! + return self._query_compiler.aggs(func) agg = aggregate hist = gfx.ed_hist_frame + + def query(self, expr, inplace=False, **kwargs): + """Queries the Dataframe with a boolean expression + + Returns: + A new DataFrame if inplace=False + """ + if isinstance(expr, BooleanFilter): + return DataFrame( + query_compiler=self._query_compiler._update_query(key) + ) + elif isinstance(expr, six.string_types): + return DataFrame( + query_compiler=self._query_compiler._update_query(ScriptFilter(expr)) + ) + else: + raise NotImplementedError(expr, type(expr)) diff --git a/eland/groupby.py b/eland/groupby.py new file mode 100644 index 0000000..2233523 --- /dev/null +++ b/eland/groupby.py @@ -0,0 +1,22 @@ +""" +GroupBy +--------- +Define the SeriesGroupBy, DataFrameGroupBy, and PanelGroupBy +classes that hold the groupby interfaces (and some implementations). + +These are user facing as the result of the ``df.groupby(...)`` operations, +which here returns a DataFrameGroupBy object. +""" + +from eland import NDFrame + + +class DataFrameGroupBy(NDFrame): + + def __init__(self, + df, + by): + super().__init__( + query_compiler=df._query_compiler.copy() + ) + self._query_compiler.groupby_agg(by) diff --git a/eland/operations.py b/eland/operations.py index 7ec29c2..22f01ce 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -1,5 +1,6 @@ import copy from enum import Enum +from io import StringIO import pandas as pd import numpy as np @@ -278,6 +279,128 @@ class Operations: return df_bins, df_weights + @staticmethod + def _map_pd_aggs_to_es_aggs(pd_aggs): + """ + Args: + pd_aggs - list of pandas aggs (e.g. ['mad', 'min', 'std'] etc.) + + Returns: + ed_aggs - list of corresponding es_aggs (e.g. ['median_absolute_deviation', 'min', 'std'] etc.) + + Pandas supports a lot of options here, and these options generally work on text and numerics in pandas. + Elasticsearch has metric aggs and terms aggs so will have different behaviour. + + Pandas aggs that return columns (as opposed to transformed rows): + + all + any + count + mad + max + mean + median + min + mode + quantile + rank + sem + skew + sum + std + var + nunique + """ + ed_aggs = [] + for pd_agg in pd_aggs: + if pd_agg == 'count': + ed_aggs.append('count') + elif pd_agg == 'mad': + ed_aggs.append('median_absolute_deviation') + elif pd_agg == 'max': + ed_aggs.append('max') + elif pd_agg == 'mean': + ed_aggs.append('avg') + elif pd_agg == 'median': + ed_aggs.append(('percentiles', '50.0')) + elif pd_agg == 'min': + ed_aggs.append('min') + elif pd_agg == 'mode': + # We could do this via top term + raise NotImplementedError(pd_agg, " not currently implemented") + elif pd_agg == 'quantile': + # TODO + raise NotImplementedError(pd_agg, " not currently implemented") + elif pd_agg == 'rank': + # TODO + raise NotImplementedError(pd_agg, " not currently implemented") + elif pd_agg == 'sem': + # TODO + raise NotImplementedError(pd_agg, " not currently implemented") + elif pd_agg == 'sum': + ed_aggs.append('sum') + elif pd_agg == 'std': + ed_aggs.append(('extended_stats', 'std_deviation')) + elif pd_agg == 'var': + ed_aggs.append(('extended_stats', 'variance')) + else: + raise NotImplementedError(pd_agg, " not currently implemented") + + # TODO - we can optimise extended_stats here as if we have 'count' and 'std' extended_stats would + # return both in one call + + return ed_aggs + + def aggs(self, query_compiler, pd_aggs): + query_params, post_processing = self._resolve_tasks() + + size = self._size(query_params, post_processing) + if size is not None: + raise NotImplementedError("Can not count field matches if size is set {}".format(size)) + + columns = self.get_columns() + + body = Query(query_params['query']) + + # convert pandas aggs to ES equivalent + es_aggs = self._map_pd_aggs_to_es_aggs(pd_aggs) + + for field in columns: + for es_agg in es_aggs: + # If we have multiple 'extended_stats' etc. here we simply NOOP on 2nd call + if isinstance(es_agg, tuple): + body.metric_aggs(es_agg[0] + '_' + field, es_agg[0], field) + else: + body.metric_aggs(es_agg + '_' + field, es_agg, field) + + response = query_compiler._client.search( + index=query_compiler._index_pattern, + size=0, + body=body.to_search_body()) + + """ + Results are like (for 'sum', 'min') + + AvgTicketPrice DistanceKilometers DistanceMiles FlightDelayMin + sum 8.204365e+06 9.261629e+07 5.754909e+07 618150 + min 1.000205e+02 0.000000e+00 0.000000e+00 0 + """ + results = {} + + for field in columns: + values = list() + for es_agg in es_aggs: + if isinstance(es_agg, tuple): + values.append(response['aggregations'][es_agg[0] + '_' + field][es_agg[1]]) + else: + values.append(response['aggregations'][es_agg + '_' + field]['value']) + + results[field] = values + + df = pd.DataFrame(data=results, index=pd_aggs) + + return df + def describe(self, query_compiler): query_params, post_processing = self._resolve_tasks() @@ -566,6 +689,8 @@ class Operations: query_params, post_processing = self._resolve_query_ids(task, query_params, post_processing) elif task[0] == 'query_terms': query_params, post_processing = self._resolve_query_terms(task, query_params, post_processing) + elif task[0] == 'boolean_filter': + query_params, post_processing = self._resolve_boolean_filter(task, query_params, post_processing) else: # a lot of operations simply post-process the dataframe - put these straight through query_params, post_processing = self._resolve_post_processing_task(task, query_params, post_processing) @@ -689,6 +814,14 @@ class Operations: return query_params, post_processing + def _resolve_boolean_filter(self, item, query_params, post_processing): + # task = ('boolean_filter', object) + boolean_filter = item[1] + + query_params['query'].update_boolean_filter(boolean_filter) + + return query_params, post_processing + def _resolve_post_processing_task(self, item, query_params, post_processing): # Just do this in post-processing if item[0] != 'columns': @@ -722,3 +855,7 @@ class Operations: buf.write("\tsort_params: {0}\n".format(sort_params)) buf.write("\tcolumns: {0}\n".format(columns)) buf.write("\tpost_processing: {0}\n".format(post_processing)) + + def update_query(self, boolean_filter): + task = ('boolean_filter', boolean_filter) + self._tasks.append(task) diff --git a/eland/operators/__init__.py b/eland/operators/__init__.py new file mode 100644 index 0000000..02fc01f --- /dev/null +++ b/eland/operators/__init__.py @@ -0,0 +1,3 @@ +# -*- coding: UTF-8 -*- + +from eland.operators.filter import * diff --git a/eland/operators/filter.py b/eland/operators/filter.py new file mode 100644 index 0000000..2a6c1fb --- /dev/null +++ b/eland/operators/filter.py @@ -0,0 +1,148 @@ +# Derived from pandasticsearch filters + +# Es filter builder for BooleanCond +class BooleanFilter(object): + def __init__(self, *args): + self._filter = None + + def __and__(self, x): + # Combine results + if isinstance(self, AndFilter): + self.subtree['must'].append(x.subtree) + return self + elif isinstance(x, AndFilter): + x.subtree['must'].append(self.subtree) + return x + return AndFilter(self, x) + + def __or__(self, x): + # Combine results + if isinstance(self, OrFilter): + self.subtree['should'].append(x.subtree) + return self + elif isinstance(x, OrFilter): + x.subtree['should'].append(self.subtree) + return x + return OrFilter(self, x) + + def __invert__(self): + return NotFilter(self) + + def empty(self): + if self._filter is None: + return True + return False + + @property + def subtree(self): + if 'bool' in self._filter: + return self._filter['bool'] + else: + return self._filter + + def build(self): + return self._filter + + +# Binary operator +class AndFilter(BooleanFilter): + def __init__(self, *args): + [isinstance(x, BooleanFilter) for x in args] + super(AndFilter, self).__init__() + self._filter = {'bool': {'must': [x.build() for x in args]}} + + +class OrFilter(BooleanFilter): + def __init__(self, *args): + [isinstance(x, BooleanFilter) for x in args] + super(OrFilter, self).__init__() + self._filter = {'bool': {'should': [x.build() for x in args]}} + + +class NotFilter(BooleanFilter): + def __init__(self, x): + assert isinstance(x, BooleanFilter) + super(NotFilter, self).__init__() + self._filter = {'bool': {'must_not': x.build()}} + + +# LeafBooleanFilter +class GreaterEqual(BooleanFilter): + def __init__(self, field, value): + super(GreaterEqual, self).__init__() + self._filter = {'range': {field: {'gte': value}}} + + +class Greater(BooleanFilter): + def __init__(self, field, value): + super(Greater, self).__init__() + self._filter = {'range': {field: {'gt': value}}} + + +class LessEqual(BooleanFilter): + def __init__(self, field, value): + super(LessEqual, self).__init__() + self._filter = {'range': {field: {'lte': value}}} + + +class Less(BooleanFilter): + def __init__(self, field, value): + super(Less, self).__init__() + self._filter = {'range': {field: {'lt': value}}} + + +class Equal(BooleanFilter): + def __init__(self, field, value): + super(Equal, self).__init__() + self._filter = {'term': {field: value}} + + +class IsIn(BooleanFilter): + def __init__(self, field, value): + super(IsIn, self).__init__() + assert isinstance(value, list) + if field == 'ids': + self._filter = {'ids': {'values': value}} + else: + self._filter = {'terms': {field: value}} + + +class Like(BooleanFilter): + def __init__(self, field, value): + super(Like, self).__init__() + self._filter = {'wildcard': {field: value}} + + +class Rlike(BooleanFilter): + def __init__(self, field, value): + super(Rlike, self).__init__() + self._filter = {'regexp': {field: value}} + + +class Startswith(BooleanFilter): + def __init__(self, field, value): + super(Startswith, self).__init__() + self._filter = {'prefix': {field: value}} + + +class IsNull(BooleanFilter): + def __init__(self, field): + super(IsNull, self).__init__() + self._filter = {'missing': {'field': field}} + + +class NotNull(BooleanFilter): + def __init__(self, field): + super(NotNull, self).__init__() + self._filter = {'exists': {'field': field}} + + +class ScriptFilter(BooleanFilter): + def __init__(self, inline, lang=None, params=None): + super(ScriptFilter, self).__init__() + script = {'inline': inline} + if lang is not None: + script['lang'] = lang + if params is not None: + script['params'] = params + self._filter = {'script': {'script': script}} \ No newline at end of file diff --git a/eland/query.py b/eland/query.py index 0b12d6f..2f67b1d 100644 --- a/eland/query.py +++ b/eland/query.py @@ -1,6 +1,8 @@ import warnings from copy import deepcopy +from eland.operators import BooleanFilter, NotNull, IsNull, IsIn + class Query: """ @@ -12,7 +14,7 @@ class Query: def __init__(self, query=None): if query is None: - self._query = self._query_template() + self._query = BooleanFilter() self._aggs = {} else: # Deep copy the incoming query so we can change it @@ -25,9 +27,15 @@ class Query: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-exists-query.html """ if must: - self._query['bool']['must'].append({'exists': {'field': field}}) + if self._query.empty(): + self._query = NotNull(field) + else: + self._query = self._query & NotNull(field) else: - self._query['bool']['must_not'].append({'exists': {'field': field}}) + if self._query.empty(): + self._query = IsNull(field) + else: + self._query = self._query & IsNull(field) def ids(self, items, must=True): """ @@ -35,9 +43,15 @@ class Query: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-ids-query.html """ if must: - self._query['bool']['must'].append({'ids': {'values': items}}) + if self._query.empty(): + self._query = IsIn('ids', items) + else: + self._query = self._query & IsIn('ids', items) else: - self._query['bool']['must_not'].append({'ids': {'values': items}}) + if self._query.empty(): + self._query = ~(IsIn('ids', items)) + else: + self._query = self._query & ~(IsIn('ids', items)) def terms(self, field, items, must=True): """ @@ -45,9 +59,15 @@ class Query: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-terms-query.html """ if must: - self._query['bool']['must'].append({'terms': {field: items}}) + if self._query.empty(): + self._query = IsIn(field, items) + else: + self._query = self._query & IsIn(field, items) else: - self._query['bool']['must_not'].append({'terms': {field: items}}) + if self._query.empty(): + self._query = ~(IsIn(field, items)) + else: + self._query = self._query & ~(IsIn(field, items)) def metric_aggs(self, name, func, field): """ @@ -83,7 +103,7 @@ class Query: min = min_aggs[field] max = max_aggs[field] - interval = (max - min)/num_bins + interval = (max - min) / num_bins agg = { "histogram": { @@ -94,25 +114,27 @@ class Query: self._aggs[name] = agg def to_search_body(self): - body = {"query": self._query, "aggs": self._aggs} + if self._query.empty(): + body = {"aggs": self._aggs} + else: + body = {"query": self._query.build(), "aggs": self._aggs} return body def to_count_body(self): if len(self._aggs) > 0: warnings.warn('Requesting count for agg query {}', self) - body = {"query": self._query} + if self._query.empty(): + body = None + else: + body = {"query": self._query.build()} return body + def update_boolean_filter(self, boolean_filter): + if self._query.empty(): + self._query = boolean_filter + else: + self._query = self._query & boolean_filter + def __repr__(self): return repr(self.to_search_body()) - - @staticmethod - def _query_template(): - template = { - "bool": { - "must": [], - "must_not": [] - } - } - return deepcopy(template) diff --git a/eland/query_compiler.py b/eland/query_compiler.py index 92fd6a8..7dacd9c 100644 --- a/eland/query_compiler.py +++ b/eland/query_compiler.py @@ -407,6 +407,9 @@ class ElandQueryCompiler(BaseQueryCompiler): return result + def aggs(self, func): + return self._operations.aggs(self, func) + def count(self): return self._operations.count(self) def mean(self): @@ -461,4 +464,11 @@ class ElandQueryCompiler(BaseQueryCompiler): else: pass + def _update_query(self, boolean_filter): + result = self.copy() + + result._operations.update_query(boolean_filter) + + return result + diff --git a/eland/series.py b/eland/series.py index 89381af..8100070 100644 --- a/eland/series.py +++ b/eland/series.py @@ -15,10 +15,12 @@ Based on NDFrame which underpins eland.1DataFrame """ -import pandas as pd import warnings +import pandas as pd + from eland import NDFrame +from eland.operators import Equal, Greater, ScriptFilter class Series(NDFrame): @@ -130,7 +132,7 @@ class Series(NDFrame): max_rows = 60 # Create a slightly bigger dataframe than display - temp_df = self._build_repr_df(max_rows+1, None) + temp_df = self._build_repr_df(max_rows + 1, None) if isinstance(temp_df, pd.DataFrame): temp_df = temp_df[self.name] temp_str = repr(temp_df) @@ -151,3 +153,23 @@ class Series(NDFrame): def _to_pandas(self): return self._query_compiler.to_pandas()[self.name] + + def __gt__(self, other): + if isinstance(other, Series): + # Need to use scripted query to compare to values + painless = "doc['{}'].value > doc['{}'].value".format(self.name, other.name) + return ScriptFilter(painless) + elif isinstance(other, (int, float)): + return Greater(field=self.name, value=other) + else: + raise NotImplementedError(other, type(other)) + + def __eq__(self, other): + if isinstance(other, Series): + # Need to use scripted query to compare to values + painless = "doc['{}'].value == doc['{}'].value".format(self.name, other.name) + return ScriptFilter(painless) + elif isinstance(other, (int, float)): + return Equal(field=self.name, value=other) + else: + raise NotImplementedError(other, type(other)) diff --git a/eland/tests/dataframe/test_aggs_pytest.py b/eland/tests/dataframe/test_aggs_pytest.py index 6affb79..afdaa74 100644 --- a/eland/tests/dataframe/test_aggs_pytest.py +++ b/eland/tests/dataframe/test_aggs_pytest.py @@ -2,6 +2,7 @@ import numpy as np import pandas as pd +from pandas.util.testing import (assert_almost_equal) from eland.tests.common import TestData @@ -12,52 +13,18 @@ class TestDataFrameAggs(TestData): pd_flights = self.pd_flights() ed_flights = self.ed_flights() - pd_numerics = pd_flights.select_dtypes(include=[np.number]) - print(pd_numerics.columns) - print(pd_numerics.agg('abs')) # all rows - print(pd_numerics.agg('all')) # columns True/False - print(pd_numerics.agg('any')) # columns True/False - print(pd_numerics.agg('corr')) # matrix col/col - print(pd_numerics.agg('count')) # columns count - print(pd_numerics.agg('cov')) # matrix col/col - print(pd_numerics.agg('cummax')) # all rows - print(pd_numerics.agg('cummin')) # all rows - print(pd_numerics.agg('cumprod')) # all rows - print(pd_numerics.agg('cumsum')) # all rows - print(pd_numerics.agg('describe')) # describe - print(pd_numerics.agg('diff')) # all rows - print(pd_numerics.agg('kurt')) # ?> - print(pd_numerics.agg('mad')) # col - print('MAX') - print(pd_numerics.agg('max')) # col - print(pd_numerics.agg('mean')) # col - print(pd_numerics.agg('median')) # col - print(pd_numerics.agg('min')) # col - print(pd_numerics.agg('mode')) # col - print(pd_numerics.agg('pct_change')) # all rows - print(pd_numerics.agg('prod')) # all rows - print(pd_numerics.agg('quantile')) # col - print(pd_numerics.agg('rank')) # col - print(pd_numerics.agg('round')) # all rows - print('SEM') - print(pd_numerics.agg('sem')) # col - print(pd_numerics.agg('skew')) # col - print(pd_numerics.agg('sum')) # col - print(pd_numerics.agg('std')) # col - print(pd_numerics.agg('var')) # col - print(pd_numerics.agg('nunique')) # col - - print(pd_numerics.aggs(np.sqrt)) # all rows - - - return - pd_sum_min = pd_flights.select_dtypes(include=[np.number]).agg(['sum', 'min']) - print(type(pd_sum_min)) - with pd.option_context('display.max_rows', None, 'display.max_columns', None): - print(pd_sum_min) - ed_sum_min = ed_flights.select_dtypes(include=[np.number]).agg(['sum', 'min']) - print(type(ed_sum_min)) - with pd.option_context('display.max_rows', None, 'display.max_columns', None): - print(ed_sum_min) + + # Eland returns all float values for all metric aggs, pandas can return int + # TODO - investigate this more + pd_sum_min = pd_sum_min.astype('float64') + assert_almost_equal(pd_sum_min, ed_sum_min) + + pd_sum_min_std = pd_flights.select_dtypes(include=[np.number]).agg(['sum', 'min', 'std']) + ed_sum_min_std = ed_flights.select_dtypes(include=[np.number]).agg(['sum', 'min', 'std']) + + print(pd_sum_min_std.dtypes) + print(ed_sum_min_std.dtypes) + + assert_almost_equal(pd_sum_min_std, ed_sum_min_std, check_less_precise=True) diff --git a/eland/tests/dataframe/test_datetime_pytest.py b/eland/tests/dataframe/test_datetime_pytest.py new file mode 100644 index 0000000..16c0b39 --- /dev/null +++ b/eland/tests/dataframe/test_datetime_pytest.py @@ -0,0 +1,44 @@ +# File called _pytest for PyCharm compatability + +import numpy as np +import pandas as pd + +import eland as ed +from eland.tests.common import ELASTICSEARCH_HOST +from eland.tests.common import TestData + + +class TestDataFrameDateTime(TestData): + + def test_datetime_to_ms(self): + df = pd.DataFrame(data={'A': np.random.rand(3), + 'B': 1, + 'C': 'foo', + 'D': pd.Timestamp('20190102'), + 'E': [1.0, 2.0, 3.0], + 'F': False, + 'G': [1, 2, 3]}, + index=['0', '1', '2']) + + expected_mappings = {'mappings': { + 'properties': {'A': {'type': 'double'}, + 'B': {'type': 'long'}, + 'C': {'type': 'keyword'}, + 'D': {'type': 'date'}, + 'E': {'type': 'double'}, + 'F': {'type': 'boolean'}, + 'G': {'type': 'long'}}}} + + mappings = ed.Mappings._generate_es_mappings(df) + + assert expected_mappings == mappings + + # Now create index + index_name = 'eland_test_generate_es_mappings' + + ed.pandas_to_es(df, ELASTICSEARCH_HOST, index_name, if_exists="replace", refresh=True) + + ed_df = ed.DataFrame(ELASTICSEARCH_HOST, index_name) + ed_df_head = ed_df.head() + + # assert_frame_equal(df, ed_df_head) diff --git a/eland/tests/dataframe/test_query_pytest.py b/eland/tests/dataframe/test_query_pytest.py new file mode 100644 index 0000000..1467078 --- /dev/null +++ b/eland/tests/dataframe/test_query_pytest.py @@ -0,0 +1,48 @@ +# File called _pytest for PyCharm compatability + +import pandas as pd + +import eland as ed +from eland.tests.common import ELASTICSEARCH_HOST +from eland.tests.common import TestData +from eland.tests.common import assert_pandas_eland_frame_equal + + +class TestDataFrameQuery(TestData): + + def test_query1(self): + # Examples from: + # https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.query.html + pd_df = pd.DataFrame({'A': range(1, 6), 'B': range(10, 0, -2), 'C': range(10, 5, -1)}, + index=['0', '1', '2', '3', '4']) + + # Now create index + index_name = 'eland_test_query1' + + ed.pandas_to_es(pd_df, ELASTICSEARCH_HOST, index_name, if_exists="replace", refresh=True) + ed_df = ed.DataFrame(ELASTICSEARCH_HOST, index_name) + + assert_pandas_eland_frame_equal(pd_df, ed_df) + + pd_df.info() + ed_df.info() + + pd_q1 = pd_df[pd_df.A > 2] + pd_q2 = pd_df[pd_df.A > pd_df.B] + pd_q3 = pd_df[pd_df.B == pd_df.C] + + ed_q1 = ed_df[ed_df.A > 2] + ed_q2 = ed_df[ed_df.A > ed_df.B] + ed_q3 = ed_df[ed_df.B == ed_df.C] + + assert_pandas_eland_frame_equal(pd_q1, ed_q1) + assert_pandas_eland_frame_equal(pd_q2, ed_q2) + assert_pandas_eland_frame_equal(pd_q3, ed_q3) + + def test_query2(self): + ed_flights = self.ed_flights() + pd_flights = self.pd_flights() + + cancelled = pd_flights[pd_flights.Cancelled == True] + + print(cancelled.groupby(['DestWeather']).count()) diff --git a/eland/tests/operators/__init__.py b/eland/tests/operators/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/eland/tests/operators/test_operators_pytest.py b/eland/tests/operators/test_operators_pytest.py new file mode 100644 index 0000000..bab87ca --- /dev/null +++ b/eland/tests/operators/test_operators_pytest.py @@ -0,0 +1,177 @@ +# -*- coding: UTF-8 -*- +from eland.operators import * + + +class TestOperators(): + def test_leaf_boolean_filter(self): + assert GreaterEqual('a', 2).build() == {"range": {"a": {"gte": 2}}} + assert LessEqual('a', 2).build() == {"range": {"a": {"lte": 2}}} + assert Less('a', 2).build() == {"range": {"a": {"lt": 2}}} + assert Equal('a', 2).build() == {"term": {"a": 2}} + exp = Equal('a', 2) + assert (~exp).build()['bool'], {"must_not": {"term": {"a": 2}}} + assert Greater('a', 2).build() == {"range": {"a": {"gt": 2}}} + assert IsIn('a', [1, 2, 3]).build() == {'terms': {'a': [1, 2, 3]}} + assert Like('a', 'a*b').build() == {'wildcard': {'a': 'a*b'}} + assert Rlike('a', 'a*b').build() == {'regexp': {'a': 'a*b'}} + assert Startswith('a', 'jj').build() == {'prefix': {'a': 'jj'}} + assert IsNull('a').build() == {'missing': {'field': 'a'}} + assert NotNull('a').build() == {'exists': {'field': 'a'}} + assert ScriptFilter('doc["num1"].value > params.param1', params={'param1': 5}).build() == { + 'script': {'script': {'inline': 'doc["num1"].value > params.param1', 'params': {'param1': 5}}}} + assert IsIn('ids', [1, 2, 3]).build() == {'ids': {'values': [1, 2, 3]}} + + def test_and_none(self): + exp = None + exp = exp & Less('b', 3) + print(exp.build()) + + def test_and_filter1(self): + exp = GreaterEqual('a', 2) & Less('b', 3) + assert exp.build() == {'bool': {'must': [{'range': {'a': {'gte': 2}}}, {'range': {'b': {'lt': 3}}}]}} + + def test_and_filter2(self): + exp = GreaterEqual('a', 2) & Less('b', 3) & Equal('c', 4) + assert exp.build() == \ + { + 'bool': { + 'must': [ + {'range': {'a': {'gte': 2}}}, + {'range': {'b': {'lt': 3}}}, + {'term': {'c': 4}} + ] + } + } + + def test_and_filter3(self): + exp = GreaterEqual('a', 2) & (Less('b', 3) & Equal('c', 4)) + assert exp.build() == \ + { + 'bool': { + 'must': [ + {'range': {'b': {'lt': 3}}}, + {'term': {'c': 4}}, + {'range': {'a': {'gte': 2}}} + ] + } + } + + def test_or_filter1(self): + exp = GreaterEqual('a', 2) | Less('b', 3) + assert exp.build() == \ + { + 'bool': { + 'should': [ + {'range': {'a': {'gte': 2}}}, + {'range': {'b': {'lt': 3}}} + ] + } + } + + def test_or_filter2(self): + exp = GreaterEqual('a', 2) | Less('b', 3) | Equal('c', 4) + assert exp.build() == \ + { + 'bool': { + 'should': [ + {'range': {'a': {'gte': 2}}}, + {'range': {'b': {'lt': 3}}}, + {'term': {'c': 4}} + ] + } + } + + def test_or_filter3(self): + exp = GreaterEqual('a', 2) | (Less('b', 3) | Equal('c', 4)) + assert exp.build() == \ + { + 'bool': { + 'should': [ + {'range': {'b': {'lt': 3}}}, + {'term': {'c': 4}}, + {'range': {'a': {'gte': 2}}} + ] + } + } + + def test_not_filter(self): + exp = ~GreaterEqual('a', 2) + assert exp.build() == \ + { + 'bool': { + 'must_not': {'range': {'a': {'gte': 2}}} + } + } + + def test_not_not_filter(self): + exp = ~~GreaterEqual('a', 2) + + assert exp.build() == \ + { + 'bool': { + 'must_not': { + 'bool': { + 'must_not': {'range': {'a': {'gte': 2}}} + } + } + } + } + + def test_not_and_filter(self): + exp = ~(GreaterEqual('a', 2) & Less('b', 3)) + assert exp.build() == \ + { + 'bool': { + 'must_not': { + 'bool': { + 'must': [ + {'range': {'a': {'gte': 2}}}, + {'range': {'b': {'lt': 3}}} + ] + } + } + } + } + + def test_and_or_filter(self): + exp = GreaterEqual('a', 2) & (Less('b', 3) | Equal('c', 4)) + assert exp.build() == \ + { + 'bool': { + 'must': [ + {'range': {'a': {'gte': 2}}}, + { + 'bool': { + 'should': [ + {'range': {'b': {'lt': 3}}}, + {'term': {'c': 4}} + ] + } + } + ] + } + } + + def test_and_not_or_filter(self): + exp = GreaterEqual('a', 2) & ~(Less('b', 3) | Equal('c', 4)) + assert exp.build() == \ + { + 'bool': { + 'must': [ + {'range': {'a': {'gte': 2}}}, + { + 'bool': { + 'must_not': { + 'bool': { + 'should': [ + {'range': {'b': {'lt': 3}}}, + {'term': {'c': 4}} + ] + } + + } + } + } + ] + } + }