Merge pull request #23 from stevedodson/master

Adding partial DataFrame.query support
This commit is contained in:
stevedodson 2019-08-14 14:45:45 +00:00 committed by GitHub
commit e34816144d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 705 additions and 69 deletions

View File

@ -4,6 +4,8 @@ from distutils.version import LooseVersion
import numpy as np import numpy as np
import pandas as pd import pandas as pd
import pandas.compat as compat
import six
from pandas.compat import StringIO from pandas.compat import StringIO
from pandas.core.common import apply_if_callable, is_bool_indexer from pandas.core.common import apply_if_callable, is_bool_indexer
from pandas.core.dtypes.common import ( from pandas.core.dtypes.common import (
@ -17,6 +19,7 @@ from pandas.io.formats.printing import pprint_thing
import eland.plotting as gfx import eland.plotting as gfx
from eland import NDFrame from eland import NDFrame
from eland import Series from eland import Series
from eland.operators import BooleanFilter, ScriptFilter
class DataFrame(NDFrame): class DataFrame(NDFrame):
@ -373,6 +376,10 @@ class DataFrame(NDFrame):
return self._getitem_array(key) return self._getitem_array(key)
elif isinstance(key, DataFrame): elif isinstance(key, DataFrame):
return self.where(key) return self.where(key)
elif isinstance(key, BooleanFilter):
return DataFrame(
query_compiler=self._query_compiler._update_query(key)
)
else: else:
return self._getitem_column(key) return self._getitem_column(key)
@ -502,6 +509,11 @@ class DataFrame(NDFrame):
def keys(self): def keys(self):
return self.columns return self.columns
def groupby(self, by=None, axis=0, *args, **kwargs):
axis = self._get_axis_number(axis)
if axis == 1:
raise NotImplementedError("Aggregating via index not currently implemented - needs index transform")
def aggregate(self, func, axis=0, *args, **kwargs): def aggregate(self, func, axis=0, *args, **kwargs):
""" """
@ -540,7 +552,31 @@ class DataFrame(NDFrame):
# currently we only support a subset of functions that aggregate columns. # currently we only support a subset of functions that aggregate columns.
# ['count', 'mad', 'max', 'mean', 'median', 'min', 'mode', 'quantile', 'rank', 'sem', 'skew', 'sum', 'std', 'var', 'nunique'] # ['count', 'mad', 'max', 'mean', 'median', 'min', 'mode', 'quantile', 'rank', 'sem', 'skew', 'sum', 'std', 'var', 'nunique']
if isinstance(func, compat.string_types):
# wrap in list
func = [func]
return self._query_compiler.aggs(func)
elif is_list_like(func):
# we have a list!
return self._query_compiler.aggs(func)
agg = aggregate agg = aggregate
hist = gfx.ed_hist_frame hist = gfx.ed_hist_frame
def query(self, expr, inplace=False, **kwargs):
"""Queries the Dataframe with a boolean expression
Returns:
A new DataFrame if inplace=False
"""
if isinstance(expr, BooleanFilter):
return DataFrame(
query_compiler=self._query_compiler._update_query(key)
)
elif isinstance(expr, six.string_types):
return DataFrame(
query_compiler=self._query_compiler._update_query(ScriptFilter(expr))
)
else:
raise NotImplementedError(expr, type(expr))

22
eland/groupby.py Normal file
View File

@ -0,0 +1,22 @@
"""
GroupBy
---------
Define the SeriesGroupBy, DataFrameGroupBy, and PanelGroupBy
classes that hold the groupby interfaces (and some implementations).
These are user facing as the result of the ``df.groupby(...)`` operations,
which here returns a DataFrameGroupBy object.
"""
from eland import NDFrame
class DataFrameGroupBy(NDFrame):
def __init__(self,
df,
by):
super().__init__(
query_compiler=df._query_compiler.copy()
)
self._query_compiler.groupby_agg(by)

View File

@ -1,5 +1,6 @@
import copy import copy
from enum import Enum from enum import Enum
from io import StringIO
import pandas as pd import pandas as pd
import numpy as np import numpy as np
@ -278,6 +279,128 @@ class Operations:
return df_bins, df_weights return df_bins, df_weights
@staticmethod
def _map_pd_aggs_to_es_aggs(pd_aggs):
"""
Args:
pd_aggs - list of pandas aggs (e.g. ['mad', 'min', 'std'] etc.)
Returns:
ed_aggs - list of corresponding es_aggs (e.g. ['median_absolute_deviation', 'min', 'std'] etc.)
Pandas supports a lot of options here, and these options generally work on text and numerics in pandas.
Elasticsearch has metric aggs and terms aggs so will have different behaviour.
Pandas aggs that return columns (as opposed to transformed rows):
all
any
count
mad
max
mean
median
min
mode
quantile
rank
sem
skew
sum
std
var
nunique
"""
ed_aggs = []
for pd_agg in pd_aggs:
if pd_agg == 'count':
ed_aggs.append('count')
elif pd_agg == 'mad':
ed_aggs.append('median_absolute_deviation')
elif pd_agg == 'max':
ed_aggs.append('max')
elif pd_agg == 'mean':
ed_aggs.append('avg')
elif pd_agg == 'median':
ed_aggs.append(('percentiles', '50.0'))
elif pd_agg == 'min':
ed_aggs.append('min')
elif pd_agg == 'mode':
# We could do this via top term
raise NotImplementedError(pd_agg, " not currently implemented")
elif pd_agg == 'quantile':
# TODO
raise NotImplementedError(pd_agg, " not currently implemented")
elif pd_agg == 'rank':
# TODO
raise NotImplementedError(pd_agg, " not currently implemented")
elif pd_agg == 'sem':
# TODO
raise NotImplementedError(pd_agg, " not currently implemented")
elif pd_agg == 'sum':
ed_aggs.append('sum')
elif pd_agg == 'std':
ed_aggs.append(('extended_stats', 'std_deviation'))
elif pd_agg == 'var':
ed_aggs.append(('extended_stats', 'variance'))
else:
raise NotImplementedError(pd_agg, " not currently implemented")
# TODO - we can optimise extended_stats here as if we have 'count' and 'std' extended_stats would
# return both in one call
return ed_aggs
def aggs(self, query_compiler, pd_aggs):
query_params, post_processing = self._resolve_tasks()
size = self._size(query_params, post_processing)
if size is not None:
raise NotImplementedError("Can not count field matches if size is set {}".format(size))
columns = self.get_columns()
body = Query(query_params['query'])
# convert pandas aggs to ES equivalent
es_aggs = self._map_pd_aggs_to_es_aggs(pd_aggs)
for field in columns:
for es_agg in es_aggs:
# If we have multiple 'extended_stats' etc. here we simply NOOP on 2nd call
if isinstance(es_agg, tuple):
body.metric_aggs(es_agg[0] + '_' + field, es_agg[0], field)
else:
body.metric_aggs(es_agg + '_' + field, es_agg, field)
response = query_compiler._client.search(
index=query_compiler._index_pattern,
size=0,
body=body.to_search_body())
"""
Results are like (for 'sum', 'min')
AvgTicketPrice DistanceKilometers DistanceMiles FlightDelayMin
sum 8.204365e+06 9.261629e+07 5.754909e+07 618150
min 1.000205e+02 0.000000e+00 0.000000e+00 0
"""
results = {}
for field in columns:
values = list()
for es_agg in es_aggs:
if isinstance(es_agg, tuple):
values.append(response['aggregations'][es_agg[0] + '_' + field][es_agg[1]])
else:
values.append(response['aggregations'][es_agg + '_' + field]['value'])
results[field] = values
df = pd.DataFrame(data=results, index=pd_aggs)
return df
def describe(self, query_compiler): def describe(self, query_compiler):
query_params, post_processing = self._resolve_tasks() query_params, post_processing = self._resolve_tasks()
@ -566,6 +689,8 @@ class Operations:
query_params, post_processing = self._resolve_query_ids(task, query_params, post_processing) query_params, post_processing = self._resolve_query_ids(task, query_params, post_processing)
elif task[0] == 'query_terms': elif task[0] == 'query_terms':
query_params, post_processing = self._resolve_query_terms(task, query_params, post_processing) query_params, post_processing = self._resolve_query_terms(task, query_params, post_processing)
elif task[0] == 'boolean_filter':
query_params, post_processing = self._resolve_boolean_filter(task, query_params, post_processing)
else: # a lot of operations simply post-process the dataframe - put these straight through else: # a lot of operations simply post-process the dataframe - put these straight through
query_params, post_processing = self._resolve_post_processing_task(task, query_params, post_processing) query_params, post_processing = self._resolve_post_processing_task(task, query_params, post_processing)
@ -689,6 +814,14 @@ class Operations:
return query_params, post_processing return query_params, post_processing
def _resolve_boolean_filter(self, item, query_params, post_processing):
# task = ('boolean_filter', object)
boolean_filter = item[1]
query_params['query'].update_boolean_filter(boolean_filter)
return query_params, post_processing
def _resolve_post_processing_task(self, item, query_params, post_processing): def _resolve_post_processing_task(self, item, query_params, post_processing):
# Just do this in post-processing # Just do this in post-processing
if item[0] != 'columns': if item[0] != 'columns':
@ -722,3 +855,7 @@ class Operations:
buf.write("\tsort_params: {0}\n".format(sort_params)) buf.write("\tsort_params: {0}\n".format(sort_params))
buf.write("\tcolumns: {0}\n".format(columns)) buf.write("\tcolumns: {0}\n".format(columns))
buf.write("\tpost_processing: {0}\n".format(post_processing)) buf.write("\tpost_processing: {0}\n".format(post_processing))
def update_query(self, boolean_filter):
task = ('boolean_filter', boolean_filter)
self._tasks.append(task)

View File

@ -0,0 +1,3 @@
# -*- coding: UTF-8 -*-
from eland.operators.filter import *

148
eland/operators/filter.py Normal file
View File

@ -0,0 +1,148 @@
# Derived from pandasticsearch filters
# Es filter builder for BooleanCond
class BooleanFilter(object):
def __init__(self, *args):
self._filter = None
def __and__(self, x):
# Combine results
if isinstance(self, AndFilter):
self.subtree['must'].append(x.subtree)
return self
elif isinstance(x, AndFilter):
x.subtree['must'].append(self.subtree)
return x
return AndFilter(self, x)
def __or__(self, x):
# Combine results
if isinstance(self, OrFilter):
self.subtree['should'].append(x.subtree)
return self
elif isinstance(x, OrFilter):
x.subtree['should'].append(self.subtree)
return x
return OrFilter(self, x)
def __invert__(self):
return NotFilter(self)
def empty(self):
if self._filter is None:
return True
return False
@property
def subtree(self):
if 'bool' in self._filter:
return self._filter['bool']
else:
return self._filter
def build(self):
return self._filter
# Binary operator
class AndFilter(BooleanFilter):
def __init__(self, *args):
[isinstance(x, BooleanFilter) for x in args]
super(AndFilter, self).__init__()
self._filter = {'bool': {'must': [x.build() for x in args]}}
class OrFilter(BooleanFilter):
def __init__(self, *args):
[isinstance(x, BooleanFilter) for x in args]
super(OrFilter, self).__init__()
self._filter = {'bool': {'should': [x.build() for x in args]}}
class NotFilter(BooleanFilter):
def __init__(self, x):
assert isinstance(x, BooleanFilter)
super(NotFilter, self).__init__()
self._filter = {'bool': {'must_not': x.build()}}
# LeafBooleanFilter
class GreaterEqual(BooleanFilter):
def __init__(self, field, value):
super(GreaterEqual, self).__init__()
self._filter = {'range': {field: {'gte': value}}}
class Greater(BooleanFilter):
def __init__(self, field, value):
super(Greater, self).__init__()
self._filter = {'range': {field: {'gt': value}}}
class LessEqual(BooleanFilter):
def __init__(self, field, value):
super(LessEqual, self).__init__()
self._filter = {'range': {field: {'lte': value}}}
class Less(BooleanFilter):
def __init__(self, field, value):
super(Less, self).__init__()
self._filter = {'range': {field: {'lt': value}}}
class Equal(BooleanFilter):
def __init__(self, field, value):
super(Equal, self).__init__()
self._filter = {'term': {field: value}}
class IsIn(BooleanFilter):
def __init__(self, field, value):
super(IsIn, self).__init__()
assert isinstance(value, list)
if field == 'ids':
self._filter = {'ids': {'values': value}}
else:
self._filter = {'terms': {field: value}}
class Like(BooleanFilter):
def __init__(self, field, value):
super(Like, self).__init__()
self._filter = {'wildcard': {field: value}}
class Rlike(BooleanFilter):
def __init__(self, field, value):
super(Rlike, self).__init__()
self._filter = {'regexp': {field: value}}
class Startswith(BooleanFilter):
def __init__(self, field, value):
super(Startswith, self).__init__()
self._filter = {'prefix': {field: value}}
class IsNull(BooleanFilter):
def __init__(self, field):
super(IsNull, self).__init__()
self._filter = {'missing': {'field': field}}
class NotNull(BooleanFilter):
def __init__(self, field):
super(NotNull, self).__init__()
self._filter = {'exists': {'field': field}}
class ScriptFilter(BooleanFilter):
def __init__(self, inline, lang=None, params=None):
super(ScriptFilter, self).__init__()
script = {'inline': inline}
if lang is not None:
script['lang'] = lang
if params is not None:
script['params'] = params
self._filter = {'script': {'script': script}}

View File

@ -1,6 +1,8 @@
import warnings import warnings
from copy import deepcopy from copy import deepcopy
from eland.operators import BooleanFilter, NotNull, IsNull, IsIn
class Query: class Query:
""" """
@ -12,7 +14,7 @@ class Query:
def __init__(self, query=None): def __init__(self, query=None):
if query is None: if query is None:
self._query = self._query_template() self._query = BooleanFilter()
self._aggs = {} self._aggs = {}
else: else:
# Deep copy the incoming query so we can change it # Deep copy the incoming query so we can change it
@ -25,9 +27,15 @@ class Query:
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-exists-query.html https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-exists-query.html
""" """
if must: if must:
self._query['bool']['must'].append({'exists': {'field': field}}) if self._query.empty():
self._query = NotNull(field)
else: else:
self._query['bool']['must_not'].append({'exists': {'field': field}}) self._query = self._query & NotNull(field)
else:
if self._query.empty():
self._query = IsNull(field)
else:
self._query = self._query & IsNull(field)
def ids(self, items, must=True): def ids(self, items, must=True):
""" """
@ -35,9 +43,15 @@ class Query:
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-ids-query.html https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-ids-query.html
""" """
if must: if must:
self._query['bool']['must'].append({'ids': {'values': items}}) if self._query.empty():
self._query = IsIn('ids', items)
else: else:
self._query['bool']['must_not'].append({'ids': {'values': items}}) self._query = self._query & IsIn('ids', items)
else:
if self._query.empty():
self._query = ~(IsIn('ids', items))
else:
self._query = self._query & ~(IsIn('ids', items))
def terms(self, field, items, must=True): def terms(self, field, items, must=True):
""" """
@ -45,9 +59,15 @@ class Query:
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-terms-query.html https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-terms-query.html
""" """
if must: if must:
self._query['bool']['must'].append({'terms': {field: items}}) if self._query.empty():
self._query = IsIn(field, items)
else: else:
self._query['bool']['must_not'].append({'terms': {field: items}}) self._query = self._query & IsIn(field, items)
else:
if self._query.empty():
self._query = ~(IsIn(field, items))
else:
self._query = self._query & ~(IsIn(field, items))
def metric_aggs(self, name, func, field): def metric_aggs(self, name, func, field):
""" """
@ -94,25 +114,27 @@ class Query:
self._aggs[name] = agg self._aggs[name] = agg
def to_search_body(self): def to_search_body(self):
body = {"query": self._query, "aggs": self._aggs} if self._query.empty():
body = {"aggs": self._aggs}
else:
body = {"query": self._query.build(), "aggs": self._aggs}
return body return body
def to_count_body(self): def to_count_body(self):
if len(self._aggs) > 0: if len(self._aggs) > 0:
warnings.warn('Requesting count for agg query {}', self) warnings.warn('Requesting count for agg query {}', self)
body = {"query": self._query} if self._query.empty():
body = None
else:
body = {"query": self._query.build()}
return body return body
def update_boolean_filter(self, boolean_filter):
if self._query.empty():
self._query = boolean_filter
else:
self._query = self._query & boolean_filter
def __repr__(self): def __repr__(self):
return repr(self.to_search_body()) return repr(self.to_search_body())
@staticmethod
def _query_template():
template = {
"bool": {
"must": [],
"must_not": []
}
}
return deepcopy(template)

View File

@ -407,6 +407,9 @@ class ElandQueryCompiler(BaseQueryCompiler):
return result return result
def aggs(self, func):
return self._operations.aggs(self, func)
def count(self): def count(self):
return self._operations.count(self) return self._operations.count(self)
def mean(self): def mean(self):
@ -461,4 +464,11 @@ class ElandQueryCompiler(BaseQueryCompiler):
else: else:
pass pass
def _update_query(self, boolean_filter):
result = self.copy()
result._operations.update_query(boolean_filter)
return result

View File

@ -15,10 +15,12 @@ Based on NDFrame which underpins eland.1DataFrame
""" """
import pandas as pd
import warnings import warnings
import pandas as pd
from eland import NDFrame from eland import NDFrame
from eland.operators import Equal, Greater, ScriptFilter
class Series(NDFrame): class Series(NDFrame):
@ -151,3 +153,23 @@ class Series(NDFrame):
def _to_pandas(self): def _to_pandas(self):
return self._query_compiler.to_pandas()[self.name] return self._query_compiler.to_pandas()[self.name]
def __gt__(self, other):
if isinstance(other, Series):
# Need to use scripted query to compare to values
painless = "doc['{}'].value > doc['{}'].value".format(self.name, other.name)
return ScriptFilter(painless)
elif isinstance(other, (int, float)):
return Greater(field=self.name, value=other)
else:
raise NotImplementedError(other, type(other))
def __eq__(self, other):
if isinstance(other, Series):
# Need to use scripted query to compare to values
painless = "doc['{}'].value == doc['{}'].value".format(self.name, other.name)
return ScriptFilter(painless)
elif isinstance(other, (int, float)):
return Equal(field=self.name, value=other)
else:
raise NotImplementedError(other, type(other))

View File

@ -2,6 +2,7 @@
import numpy as np import numpy as np
import pandas as pd import pandas as pd
from pandas.util.testing import (assert_almost_equal)
from eland.tests.common import TestData from eland.tests.common import TestData
@ -12,52 +13,18 @@ class TestDataFrameAggs(TestData):
pd_flights = self.pd_flights() pd_flights = self.pd_flights()
ed_flights = self.ed_flights() ed_flights = self.ed_flights()
pd_numerics = pd_flights.select_dtypes(include=[np.number])
print(pd_numerics.columns)
print(pd_numerics.agg('abs')) # all rows
print(pd_numerics.agg('all')) # columns True/False
print(pd_numerics.agg('any')) # columns True/False
print(pd_numerics.agg('corr')) # matrix col/col
print(pd_numerics.agg('count')) # columns count
print(pd_numerics.agg('cov')) # matrix col/col
print(pd_numerics.agg('cummax')) # all rows
print(pd_numerics.agg('cummin')) # all rows
print(pd_numerics.agg('cumprod')) # all rows
print(pd_numerics.agg('cumsum')) # all rows
print(pd_numerics.agg('describe')) # describe
print(pd_numerics.agg('diff')) # all rows
print(pd_numerics.agg('kurt')) # ?>
print(pd_numerics.agg('mad')) # col
print('MAX')
print(pd_numerics.agg('max')) # col
print(pd_numerics.agg('mean')) # col
print(pd_numerics.agg('median')) # col
print(pd_numerics.agg('min')) # col
print(pd_numerics.agg('mode')) # col
print(pd_numerics.agg('pct_change')) # all rows
print(pd_numerics.agg('prod')) # all rows
print(pd_numerics.agg('quantile')) # col
print(pd_numerics.agg('rank')) # col
print(pd_numerics.agg('round')) # all rows
print('SEM')
print(pd_numerics.agg('sem')) # col
print(pd_numerics.agg('skew')) # col
print(pd_numerics.agg('sum')) # col
print(pd_numerics.agg('std')) # col
print(pd_numerics.agg('var')) # col
print(pd_numerics.agg('nunique')) # col
print(pd_numerics.aggs(np.sqrt)) # all rows
return
pd_sum_min = pd_flights.select_dtypes(include=[np.number]).agg(['sum', 'min']) pd_sum_min = pd_flights.select_dtypes(include=[np.number]).agg(['sum', 'min'])
print(type(pd_sum_min))
with pd.option_context('display.max_rows', None, 'display.max_columns', None):
print(pd_sum_min)
ed_sum_min = ed_flights.select_dtypes(include=[np.number]).agg(['sum', 'min']) ed_sum_min = ed_flights.select_dtypes(include=[np.number]).agg(['sum', 'min'])
print(type(ed_sum_min))
with pd.option_context('display.max_rows', None, 'display.max_columns', None): # Eland returns all float values for all metric aggs, pandas can return int
print(ed_sum_min) # TODO - investigate this more
pd_sum_min = pd_sum_min.astype('float64')
assert_almost_equal(pd_sum_min, ed_sum_min)
pd_sum_min_std = pd_flights.select_dtypes(include=[np.number]).agg(['sum', 'min', 'std'])
ed_sum_min_std = ed_flights.select_dtypes(include=[np.number]).agg(['sum', 'min', 'std'])
print(pd_sum_min_std.dtypes)
print(ed_sum_min_std.dtypes)
assert_almost_equal(pd_sum_min_std, ed_sum_min_std, check_less_precise=True)

View File

@ -0,0 +1,44 @@
# File called _pytest for PyCharm compatability
import numpy as np
import pandas as pd
import eland as ed
from eland.tests.common import ELASTICSEARCH_HOST
from eland.tests.common import TestData
class TestDataFrameDateTime(TestData):
def test_datetime_to_ms(self):
df = pd.DataFrame(data={'A': np.random.rand(3),
'B': 1,
'C': 'foo',
'D': pd.Timestamp('20190102'),
'E': [1.0, 2.0, 3.0],
'F': False,
'G': [1, 2, 3]},
index=['0', '1', '2'])
expected_mappings = {'mappings': {
'properties': {'A': {'type': 'double'},
'B': {'type': 'long'},
'C': {'type': 'keyword'},
'D': {'type': 'date'},
'E': {'type': 'double'},
'F': {'type': 'boolean'},
'G': {'type': 'long'}}}}
mappings = ed.Mappings._generate_es_mappings(df)
assert expected_mappings == mappings
# Now create index
index_name = 'eland_test_generate_es_mappings'
ed.pandas_to_es(df, ELASTICSEARCH_HOST, index_name, if_exists="replace", refresh=True)
ed_df = ed.DataFrame(ELASTICSEARCH_HOST, index_name)
ed_df_head = ed_df.head()
# assert_frame_equal(df, ed_df_head)

View File

@ -0,0 +1,48 @@
# File called _pytest for PyCharm compatability
import pandas as pd
import eland as ed
from eland.tests.common import ELASTICSEARCH_HOST
from eland.tests.common import TestData
from eland.tests.common import assert_pandas_eland_frame_equal
class TestDataFrameQuery(TestData):
def test_query1(self):
# Examples from:
# https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.query.html
pd_df = pd.DataFrame({'A': range(1, 6), 'B': range(10, 0, -2), 'C': range(10, 5, -1)},
index=['0', '1', '2', '3', '4'])
# Now create index
index_name = 'eland_test_query1'
ed.pandas_to_es(pd_df, ELASTICSEARCH_HOST, index_name, if_exists="replace", refresh=True)
ed_df = ed.DataFrame(ELASTICSEARCH_HOST, index_name)
assert_pandas_eland_frame_equal(pd_df, ed_df)
pd_df.info()
ed_df.info()
pd_q1 = pd_df[pd_df.A > 2]
pd_q2 = pd_df[pd_df.A > pd_df.B]
pd_q3 = pd_df[pd_df.B == pd_df.C]
ed_q1 = ed_df[ed_df.A > 2]
ed_q2 = ed_df[ed_df.A > ed_df.B]
ed_q3 = ed_df[ed_df.B == ed_df.C]
assert_pandas_eland_frame_equal(pd_q1, ed_q1)
assert_pandas_eland_frame_equal(pd_q2, ed_q2)
assert_pandas_eland_frame_equal(pd_q3, ed_q3)
def test_query2(self):
ed_flights = self.ed_flights()
pd_flights = self.pd_flights()
cancelled = pd_flights[pd_flights.Cancelled == True]
print(cancelled.groupby(['DestWeather']).count())

View File

View File

@ -0,0 +1,177 @@
# -*- coding: UTF-8 -*-
from eland.operators import *
class TestOperators():
def test_leaf_boolean_filter(self):
assert GreaterEqual('a', 2).build() == {"range": {"a": {"gte": 2}}}
assert LessEqual('a', 2).build() == {"range": {"a": {"lte": 2}}}
assert Less('a', 2).build() == {"range": {"a": {"lt": 2}}}
assert Equal('a', 2).build() == {"term": {"a": 2}}
exp = Equal('a', 2)
assert (~exp).build()['bool'], {"must_not": {"term": {"a": 2}}}
assert Greater('a', 2).build() == {"range": {"a": {"gt": 2}}}
assert IsIn('a', [1, 2, 3]).build() == {'terms': {'a': [1, 2, 3]}}
assert Like('a', 'a*b').build() == {'wildcard': {'a': 'a*b'}}
assert Rlike('a', 'a*b').build() == {'regexp': {'a': 'a*b'}}
assert Startswith('a', 'jj').build() == {'prefix': {'a': 'jj'}}
assert IsNull('a').build() == {'missing': {'field': 'a'}}
assert NotNull('a').build() == {'exists': {'field': 'a'}}
assert ScriptFilter('doc["num1"].value > params.param1', params={'param1': 5}).build() == {
'script': {'script': {'inline': 'doc["num1"].value > params.param1', 'params': {'param1': 5}}}}
assert IsIn('ids', [1, 2, 3]).build() == {'ids': {'values': [1, 2, 3]}}
def test_and_none(self):
exp = None
exp = exp & Less('b', 3)
print(exp.build())
def test_and_filter1(self):
exp = GreaterEqual('a', 2) & Less('b', 3)
assert exp.build() == {'bool': {'must': [{'range': {'a': {'gte': 2}}}, {'range': {'b': {'lt': 3}}}]}}
def test_and_filter2(self):
exp = GreaterEqual('a', 2) & Less('b', 3) & Equal('c', 4)
assert exp.build() == \
{
'bool': {
'must': [
{'range': {'a': {'gte': 2}}},
{'range': {'b': {'lt': 3}}},
{'term': {'c': 4}}
]
}
}
def test_and_filter3(self):
exp = GreaterEqual('a', 2) & (Less('b', 3) & Equal('c', 4))
assert exp.build() == \
{
'bool': {
'must': [
{'range': {'b': {'lt': 3}}},
{'term': {'c': 4}},
{'range': {'a': {'gte': 2}}}
]
}
}
def test_or_filter1(self):
exp = GreaterEqual('a', 2) | Less('b', 3)
assert exp.build() == \
{
'bool': {
'should': [
{'range': {'a': {'gte': 2}}},
{'range': {'b': {'lt': 3}}}
]
}
}
def test_or_filter2(self):
exp = GreaterEqual('a', 2) | Less('b', 3) | Equal('c', 4)
assert exp.build() == \
{
'bool': {
'should': [
{'range': {'a': {'gte': 2}}},
{'range': {'b': {'lt': 3}}},
{'term': {'c': 4}}
]
}
}
def test_or_filter3(self):
exp = GreaterEqual('a', 2) | (Less('b', 3) | Equal('c', 4))
assert exp.build() == \
{
'bool': {
'should': [
{'range': {'b': {'lt': 3}}},
{'term': {'c': 4}},
{'range': {'a': {'gte': 2}}}
]
}
}
def test_not_filter(self):
exp = ~GreaterEqual('a', 2)
assert exp.build() == \
{
'bool': {
'must_not': {'range': {'a': {'gte': 2}}}
}
}
def test_not_not_filter(self):
exp = ~~GreaterEqual('a', 2)
assert exp.build() == \
{
'bool': {
'must_not': {
'bool': {
'must_not': {'range': {'a': {'gte': 2}}}
}
}
}
}
def test_not_and_filter(self):
exp = ~(GreaterEqual('a', 2) & Less('b', 3))
assert exp.build() == \
{
'bool': {
'must_not': {
'bool': {
'must': [
{'range': {'a': {'gte': 2}}},
{'range': {'b': {'lt': 3}}}
]
}
}
}
}
def test_and_or_filter(self):
exp = GreaterEqual('a', 2) & (Less('b', 3) | Equal('c', 4))
assert exp.build() == \
{
'bool': {
'must': [
{'range': {'a': {'gte': 2}}},
{
'bool': {
'should': [
{'range': {'b': {'lt': 3}}},
{'term': {'c': 4}}
]
}
}
]
}
}
def test_and_not_or_filter(self):
exp = GreaterEqual('a', 2) & ~(Less('b', 3) | Equal('c', 4))
assert exp.build() == \
{
'bool': {
'must': [
{'range': {'a': {'gte': 2}}},
{
'bool': {
'must_not': {
'bool': {
'should': [
{'range': {'b': {'lt': 3}}},
{'term': {'c': 4}}
]
}
}
}
}
]
}
}