Merge pull request #64 from stevedodson/feature/arithmetics

Series arithmetics, series metric aggs, series docs
This commit is contained in:
stevedodson 2019-11-25 16:17:12 +00:00 committed by GitHub
commit 5ce315f55c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
62 changed files with 2294 additions and 226 deletions

View File

@ -0,0 +1,6 @@
eland.DataFrame.to_numpy
========================
.. currentmodule:: eland
.. automethod:: DataFrame.to_numpy

View File

@ -0,0 +1,6 @@
eland.DataFrame.values
======================
.. currentmodule:: eland
.. autoattribute:: DataFrame.values

View File

@ -0,0 +1,6 @@
eland.Series.add
================
.. currentmodule:: eland
.. automethod:: Series.add

View File

@ -0,0 +1,6 @@
eland.Series.describe
=====================
.. currentmodule:: eland
.. automethod:: Series.describe

View File

@ -0,0 +1,6 @@
eland.Series.div
================
.. currentmodule:: eland
.. automethod:: Series.div

View File

@ -0,0 +1,6 @@
eland.Series.empty
==================
.. currentmodule:: eland
.. autoattribute:: Series.empty

View File

@ -0,0 +1,6 @@
eland.Series.floordiv
=====================
.. currentmodule:: eland
.. automethod:: Series.floordiv

View File

@ -0,0 +1,6 @@
eland.Series.head
=================
.. currentmodule:: eland
.. automethod:: Series.head

View File

@ -0,0 +1,6 @@
eland.Series.index
==================
.. currentmodule:: eland
.. autoattribute:: Series.index

View File

@ -0,0 +1,6 @@
eland.Series.info_es
====================
.. currentmodule:: eland
.. automethod:: Series.info_es

View File

@ -0,0 +1,6 @@
eland.Series.max
================
.. currentmodule:: eland
.. automethod:: Series.max

View File

@ -0,0 +1,6 @@
eland.Series.mean
=================
.. currentmodule:: eland
.. automethod:: Series.mean

View File

@ -0,0 +1,6 @@
eland.Series.min
================
.. currentmodule:: eland
.. automethod:: Series.min

View File

@ -0,0 +1,6 @@
eland.Series.mod
================
.. currentmodule:: eland
.. automethod:: Series.mod

View File

@ -0,0 +1,6 @@
eland.Series.mul
================
.. currentmodule:: eland
.. automethod:: Series.mul

View File

@ -0,0 +1,6 @@
eland.Series.name
=================
.. currentmodule:: eland
.. autoattribute:: Series.name

View File

@ -0,0 +1,6 @@
eland.Series.nunique
====================
.. currentmodule:: eland
.. automethod:: Series.nunique

View File

@ -0,0 +1,6 @@
eland.Series.pow
================
.. currentmodule:: eland
.. automethod:: Series.pow

View File

@ -0,0 +1,6 @@
eland.Series.radd
=================
.. currentmodule:: eland
.. automethod:: Series.radd

View File

@ -0,0 +1,6 @@
eland.Series.rdiv
=================
.. currentmodule:: eland
.. automethod:: Series.rdiv

View File

@ -0,0 +1,6 @@
eland.Series.rename
===================
.. currentmodule:: eland
.. automethod:: Series.rename

View File

@ -0,0 +1,6 @@
eland.Series.rfloordiv
======================
.. currentmodule:: eland
.. automethod:: Series.rfloordiv

View File

@ -0,0 +1,6 @@
eland.Series.rmod
=================
.. currentmodule:: eland
.. automethod:: Series.rmod

View File

@ -0,0 +1,6 @@
eland.Series.rmul
=================
.. currentmodule:: eland
.. automethod:: Series.rmul

View File

@ -0,0 +1,6 @@
eland.Series.rpow
=================
.. currentmodule:: eland
.. automethod:: Series.rpow

View File

@ -0,0 +1,6 @@
eland.Series
============
.. currentmodule:: eland
.. autoclass:: Series

View File

@ -0,0 +1,6 @@
eland.Series.rsub
=================
.. currentmodule:: eland
.. automethod:: Series.rsub

View File

@ -0,0 +1,6 @@
eland.Series.rtruediv
=====================
.. currentmodule:: eland
.. automethod:: Series.rtruediv

View File

@ -0,0 +1,6 @@
eland.Series.shape
==================
.. currentmodule:: eland
.. autoattribute:: Series.shape

View File

@ -0,0 +1,6 @@
eland.Series.sub
================
.. currentmodule:: eland
.. automethod:: Series.sub

View File

@ -0,0 +1,6 @@
eland.Series.sum
================
.. currentmodule:: eland
.. automethod:: Series.sum

View File

@ -0,0 +1,6 @@
eland.Series.tail
=================
.. currentmodule:: eland
.. automethod:: Series.tail

View File

@ -0,0 +1,6 @@
eland.Series.to_numpy
=====================
.. currentmodule:: eland
.. automethod:: Series.to_numpy

View File

@ -0,0 +1,6 @@
eland.Series.to_string
======================
.. currentmodule:: eland
.. automethod:: Series.to_string

View File

@ -0,0 +1,6 @@
eland.Series.truediv
====================
.. currentmodule:: eland
.. automethod:: Series.truediv

View File

@ -1,5 +1,5 @@
eland.Series.value_counts
===========================
=========================
.. currentmodule:: eland

View File

@ -23,6 +23,7 @@ Attributes and underlying data
DataFrame.columns
DataFrame.dtypes
DataFrame.select_dtypes
DataFrame.values
DataFrame.empty
DataFrame.shape
@ -81,6 +82,7 @@ Serialization / IO / conversion
:toctree: api/
DataFrame.info
DataFrame.to_numpy
DataFrame.to_csv
DataFrame.to_html
DataFrame.to_string
@ -91,5 +93,3 @@ Elasticsearch utilities
:toctree: api/
DataFrame.info_es

View File

@ -5,9 +5,86 @@ Series
=========
.. currentmodule:: eland
Constructor
~~~~~~~~~~~
.. autosummary::
:toctree: api/
Series
Attributes and underlying data
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
**Axes**
.. autosummary::
:toctree: api/
Series.index
Series.shape
Series.name
Series.empty
Indexing, iteration
~~~~~~~~~~~~~~~~~~~
.. autosummary::
:toctree: api/
Series.head
Series.tail
Binary operator functions
~~~~~~~~~~~~~~~~~~~~~~~~~
.. autosummary::
:toctree: api/
Series.add
Series.sub
Series.mul
Series.div
Series.truediv
Series.floordiv
Series.mod
Series.pow
Series.radd
Series.rsub
Series.rmul
Series.rdiv
Series.rtruediv
Series.rfloordiv
Series.rmod
Series.rpow
Computations / descriptive stats
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. autosummary::
:toctree: api/
Series.describe
Series.max
Series.mean
Series.min
Series.sum
Series.nunique
Series.value_counts
Reindexing / selection / label manipulation
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. autosummary::
:toctree: api/
Series.rename
Serialization / IO / conversion
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. autosummary::
:toctree: api/
Series.to_string
Series.to_numpy
Elasticsearch utilities
~~~~~~~~~~~~~~~~~~~~~~~
.. autosummary::
:toctree: api/
Series.info_es

View File

@ -1,5 +1,6 @@
from __future__ import absolute_import
from eland.common import *
from eland.client import *
from eland.filter import *
from eland.index import *

eland/common.py (Normal file, 8 lines)
View File

@ -0,0 +1,8 @@
# Default number of rows displayed (different to pandas where ALL could be displayed)
DEFAULT_NUM_ROWS_DISPLAYED = 60
def docstring_parameter(*sub):
def dec(obj):
obj.__doc__ = obj.__doc__.format(*sub)
return obj
return dec
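
A minimal usage sketch of docstring_parameter above (show_rows is a hypothetical function, used only for illustration):

from eland.common import DEFAULT_NUM_ROWS_DISPLAYED, docstring_parameter

@docstring_parameter(DEFAULT_NUM_ROWS_DISPLAYED)
def show_rows():
    """Display at most {0} rows by default."""

print(show_rows.__doc__)  # Display at most 60 rows by default.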

View File

@ -18,15 +18,7 @@ import eland.plotting as gfx
from eland import NDFrame
from eland import Series
from eland.filter import BooleanFilter, ScriptFilter
# Default number of rows displayed (different to pandas where ALL could be displayed)
DEFAULT_NUM_ROWS_DISPLAYED = 60
def docstring_parameter(*sub):
def dec(obj):
obj.__doc__ = obj.__doc__.format(*sub)
return obj
return dec
from eland.common import DEFAULT_NUM_ROWS_DISPLAYED, docstring_parameter
class DataFrame(NDFrame):
@ -98,7 +90,6 @@ class DataFrame(NDFrame):
<BLANKLINE>
[5 rows x 2 columns]
"""
def __init__(self,
client=None,
index_pattern=None,
@ -389,10 +380,11 @@ class DataFrame(NDFrame):
<BLANKLINE>
[27 rows x 5 columns]
Operations:
tasks: [('boolean_filter', {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}}), ('columns', ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin']), ('tail', ('_doc', 5))]
tasks: [('boolean_filter', {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}}), ('field_names', ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin']), ('tail', ('_doc', 5))]
size: 5
sort_params: _doc:desc
columns: ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin']
_source: ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin']
body: {'query': {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}}, 'aggs': {}}
post_processing: ['sort_index']
<BLANKLINE>
"""
@ -527,7 +519,12 @@ class DataFrame(NDFrame):
else:
_verbose_repr()
# pandas 0.25.1 uses get_dtype_counts() here. This
# returns a Series with strings as the index NOT dtypes.
# Therefore, to get consistent ordering we need to
# align types with the pandas method.
counts = self.dtypes.value_counts()
counts.index = counts.index.astype(str)
dtypes = ['{k}({kk:d})'.format(k=k[0], kk=k[1]) for k
in sorted(counts.items())]
lines.append('dtypes: {types}'.format(types=', '.join(dtypes)))
@ -586,7 +583,7 @@ class DataFrame(NDFrame):
max_rows = 1
# Create a slightly bigger dataframe than display
df = self._build_repr_df(max_rows + 1, max_cols)
df = self._build_repr(max_rows + 1)
if buf is not None:
_buf = _expand_user(_stringify_path(buf))
@ -651,7 +648,7 @@ class DataFrame(NDFrame):
max_rows = 1
# Create a slightly bigger dataframe than display
df = self._build_repr_df(max_rows + 1, max_cols)
df = self._build_repr(max_rows + 1)
if buf is not None:
_buf = _expand_user(_stringify_path(buf))
@ -1064,3 +1061,66 @@ class DataFrame(NDFrame):
return self._getitem(key)
else:
return default
@property
def values(self):
"""
Not implemented.
In pandas this returns a Numpy representation of the DataFrame. This would involve scan/scrolling the
entire index.
If this is required, call ``ed.eland_to_pandas(ed_df).values``, *but beware this will scan/scroll the entire
Elasticsearch index(es) into memory.*
See Also
--------
:pandas_api_docs:`pandas.DataFrame.values`
eland_to_pandas
to_numpy
"""
return self.to_numpy()
def to_numpy(self):
"""
Not implemented.
In pandas this returns a Numpy representation of the DataFrame. This would involve scan/scrolling the
entire index.
If this is required, call ``ed.eland_to_pandas(ed_df).values``, *but beware this will scan/scroll the entire
Elasticsearch index(es) into memory.*
See Also
--------
:pandas_api_docs:`pandas.DataFrame.to_numpy`
eland_to_pandas
Examples
--------
>>> ed_df = ed.DataFrame('localhost', 'flights', columns=['AvgTicketPrice', 'Carrier']).head(5)
>>> pd_df = ed.eland_to_pandas(ed_df)
>>> print("type(ed_df)={0}\\ntype(pd_df)={1}".format(type(ed_df), type(pd_df)))
type(ed_df)=<class 'eland.dataframe.DataFrame'>
type(pd_df)=<class 'pandas.core.frame.DataFrame'>
>>> ed_df
AvgTicketPrice Carrier
0 841.265642 Kibana Airlines
1 882.982662 Logstash Airways
2 190.636904 Logstash Airways
3 181.694216 Kibana Airlines
4 730.041778 Kibana Airlines
<BLANKLINE>
[5 rows x 2 columns]
>>> pd_df.values
array([[841.2656419677076, 'Kibana Airlines'],
[882.9826615595518, 'Logstash Airways'],
[190.6369038508356, 'Logstash Airways'],
[181.69421554118, 'Kibana Airlines'],
[730.041778346198, 'Kibana Airlines']], dtype=object)
"""
raise AttributeError(
"This method would scan/scroll the entire Elasticsearch index(s) into memory. "
"If this is explicitly required, and there is sufficient memory, call `ed.eland_to_pandas(ed_df).values`"
)
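
A short sketch of the behaviour documented above (assuming a local cluster with the flights test index): both values and to_numpy refuse to materialise the index and point the caller at eland_to_pandas instead.

import eland as ed

ed_df = ed.DataFrame('localhost', 'flights')

try:
    ed_df.to_numpy()  # raises rather than scan/scrolling the whole index
except AttributeError as e:
    print(e)

# Explicit opt-in: scan/scroll into pandas first, then take .values
np_values = ed.eland_to_pandas(ed_df).values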

View File

@ -1,5 +1,6 @@
import warnings
import numpy as np
import pandas as pd
from pandas.core.dtypes.common import (is_float_dtype, is_bool_dtype, is_integer_dtype, is_datetime_or_timedelta_dtype,
is_string_dtype)
@ -182,7 +183,7 @@ class Mappings:
"""
all_fields_caps_fields = all_fields_caps['fields']
columns = ['_source', 'es_dtype', 'pd_dtype', 'searchable', 'aggregatable']
field_names = ['_source', 'es_dtype', 'pd_dtype', 'searchable', 'aggregatable']
capability_matrix = {}
for field, field_caps in all_fields_caps_fields.items():
@ -208,7 +209,7 @@ class Mappings:
format(field, vv['non_searchable_indices']),
UserWarning)
capability_matrix_df = pd.DataFrame.from_dict(capability_matrix, orient='index', columns=columns)
capability_matrix_df = pd.DataFrame.from_dict(capability_matrix, orient='index', columns=field_names)
return capability_matrix_df.sort_index()
@ -325,14 +326,14 @@ class Mappings:
mappings = {}
mappings['properties'] = {}
for column_name, dtype in dataframe.dtypes.iteritems():
if geo_points is not None and column_name in geo_points:
for field_name, dtype in dataframe.dtypes.iteritems():
if geo_points is not None and field_name in geo_points:
es_dtype = 'geo_point'
else:
es_dtype = Mappings._pd_dtype_to_es_dtype(dtype)
mappings['properties'][column_name] = {}
mappings['properties'][column_name]['type'] = es_dtype
mappings['properties'][field_name] = {}
mappings['properties'][field_name]['type'] = es_dtype
return {"mappings": mappings}
@ -407,12 +408,12 @@ class Mappings:
return is_source_field
def aggregatable_columns(self, columns=None):
def aggregatable_field_names(self, field_names=None):
"""
Return a dict of aggregatable columns from all columns or columns list
Return a dict of aggregatable field names, either from all field names or from the given field_names list
{'customer_full_name': 'customer_full_name.keyword', ...}
Logic here is that column names are '_source' fields and keyword fields
Logic here is that field_name names are '_source' fields and keyword fields
may be nested beneath the field. E.g.
customer_full_name: text
customer_full_name.keyword: keyword
@ -424,28 +425,28 @@ class Mappings:
dict
e.g. {'customer_full_name': 'customer_full_name.keyword', ...}
"""
if columns is None:
columns = self.source_fields()
if field_names is None:
field_names = self.source_fields()
aggregatables = {}
for column in columns:
capabilities = self.field_capabilities(column)
for field_name in field_names:
capabilities = self.field_capabilities(field_name)
if capabilities['aggregatable']:
aggregatables[column] = column
aggregatables[field_name] = field_name
else:
# Try 'column.keyword'
column_keyword = column + '.keyword'
capabilities = self.field_capabilities(column_keyword)
# Try 'field_name.keyword'
field_name_keyword = field_name + '.keyword'
capabilities = self.field_capabilities(field_name_keyword)
if capabilities['aggregatable']:
aggregatables[column_keyword] = column
aggregatables[field_name_keyword] = field_name
else:
# Aggregations not supported for this field
raise ValueError("Aggregations not supported for ", column)
raise ValueError("Aggregations not supported for ", field_name)
return aggregatables
def numeric_source_fields(self, columns, include_bool=True):
def numeric_source_fields(self, field_names, include_bool=True):
"""
Returns
-------
@ -461,10 +462,10 @@ class Mappings:
df = self._mappings_capabilities[(self._mappings_capabilities._source == True) &
((self._mappings_capabilities.pd_dtype == 'int64') |
(self._mappings_capabilities.pd_dtype == 'float64'))]
# if columns exists, filter index with columns
if columns is not None:
# reindex adds NA for non-existing columns (non-numeric), so drop these after reindex
df = df.reindex(columns)
# if field_names exists, filter index with field_names
if field_names is not None:
# reindex adds NA for non-existing field_names (non-numeric), so drop these after reindex
df = df.reindex(field_names)
df.dropna(inplace=True)
# return as list
@ -488,18 +489,19 @@ class Mappings:
"""
return len(self.source_fields())
def dtypes(self, columns=None):
def dtypes(self, field_names=None):
"""
Returns
-------
dtypes: pd.Series
Source field name + pd_dtype
Source field name + pd_dtype as np.dtype
"""
if columns is not None:
if field_names is not None:
return pd.Series(
{key: self._source_field_pd_dtypes[key] for key in columns})
{key: np.dtype(self._source_field_pd_dtypes[key]) for key in field_names})
return pd.Series(self._source_field_pd_dtypes)
return pd.Series(
{key: np.dtype(value) for key, value in self._source_field_pd_dtypes.items()})
def info_es(self, buf):
buf.write("Mappings:\n")

View File

@ -31,7 +31,6 @@ from pandas.util._validators import validate_bool_kwarg
from eland import ElandQueryCompiler
class NDFrame:
def __init__(self,
@ -49,9 +48,7 @@ class NDFrame:
A reference to an Elasticsearch Python client
"""
if query_compiler is None:
query_compiler = ElandQueryCompiler(client=client,
index_pattern=index_pattern,
columns=columns,
query_compiler = ElandQueryCompiler(client=client, index_pattern=index_pattern, field_names=columns,
index_field=index_field)
self._query_compiler = query_compiler
@ -67,6 +64,7 @@ class NDFrame:
See Also
--------
:pandas_api_docs:`pandas.DataFrame.index`
:pandas_api_docs:`pandas.Series.index`
Examples
--------
@ -74,6 +72,10 @@ class NDFrame:
>>> assert isinstance(df.index, ed.Index)
>>> df.index.index_field
'_id'
>>> s = df['Carrier']
>>> assert isinstance(s.index, ed.Index)
>>> s.index.index_field
'_id'
"""
return self._query_compiler.index
@ -106,9 +108,8 @@ class NDFrame:
"""
return self._query_compiler.dtypes
def _build_repr_df(self, num_rows, num_cols):
# Overriden version of BasePandasDataset._build_repr_df
# to avoid issues with concat
def _build_repr(self, num_rows):
# self could be Series or DataFrame
if len(self.index) <= num_rows:
return self._to_pandas()

View File

@ -1,6 +1,7 @@
import copy
from enum import Enum
import numpy as np
import pandas as pd
from eland import Index
@ -12,7 +13,7 @@ class Operations:
A collector of the queries and selectors we apply to queries to return the appropriate results.
For example,
- a list of the columns in the DataFrame (a subset of columns in the index)
- a list of the field_names in the DataFrame (a subset of field_names in the index)
- a size limit on the results (e.g. for head(n=5))
- a query to filter the results (e.g. df.A > 10)
@ -66,26 +67,34 @@ class Operations:
task = ('tail', (index.sort_field, n))
self._tasks.append(task)
def set_columns(self, columns):
# Setting columns at different phases of the task list may result in different
# operations. So instead of setting columns once, set when it happens in call chain
if type(columns) is not list:
columns = list(columns)
def arithmetic_op_fields(self, field_name, op_name, left_field, right_field):
task = ('arithmetic_op_fields', (field_name, (op_name, (left_field, right_field))))
# Set this as a field we want to retrieve
self.set_field_names([field_name])
# TODO - column renaming
# TODO - validate we are setting columns to a subset of last columns?
task = ('columns', columns)
self._tasks.append(task)
# Iterate backwards through task list looking for last 'columns' task
def set_field_names(self, field_names):
# Setting field_names at different phases of the task list may result in different
# operations. So instead of setting field_names once, set when it happens in call chain
if not isinstance(field_names, list):
field_names = list(field_names)
# TODO - field_name renaming
# TODO - validate we are setting field_names to a subset of last field_names?
task = ('field_names', field_names)
self._tasks.append(task)
# Iterate backwards through task list looking for last 'field_names' task
for task in reversed(self._tasks):
if task[0] == 'columns':
if task[0] == 'field_names':
return task[1]
return None
def get_columns(self):
# Iterate backwards through task list looking for last 'columns' task
def get_field_names(self):
# Iterate backwards through task list looking for last 'field_names' task
for task in reversed(self._tasks):
if task[0] == 'columns':
if task[0] == 'field_names':
return task[1]
return None
@ -103,8 +112,8 @@ class Operations:
"not supported {0} {1}"
.format(query_params, post_processing))
# Only return requested columns
fields = query_compiler.columns
# Only return requested field_names
fields = query_compiler.field_names
counts = {}
for field in fields:
@ -143,13 +152,13 @@ class Operations:
Parameters
----------
field_types: str, default None
if `aggregatable` use only columns whose fields in elasticseach are aggregatable.
if `aggregatable` use only field_names whose fields in elasticseach are aggregatable.
If `None`, use only numeric fields.
Returns
-------
pandas.Series
Series containing results of `func` applied to the column(s)
Series containing results of `func` applied to the field(s)
"""
query_params, post_processing = self._resolve_tasks()
@ -157,17 +166,17 @@ class Operations:
if size is not None:
raise NotImplementedError("Can not count field matches if size is set {}".format(size))
columns = self.get_columns()
field_names = self.get_field_names()
body = Query(query_params['query'])
# some metrics aggs (including cardinality) work on all aggregatable fields
# therefore we include an optional all parameter on operations
# that call _metric_aggs
if field_types=='aggregatable':
source_fields = query_compiler._mappings.aggregatable_columns(columns)
if field_types == 'aggregatable':
source_fields = query_compiler._mappings.aggregatable_field_names(field_names)
else:
source_fields = query_compiler._mappings.numeric_source_fields(columns)
source_fields = query_compiler._mappings.numeric_source_fields(field_names)
for field in source_fields:
body.metric_aggs(field, func, field)
@ -185,7 +194,7 @@ class Operations:
# }
results = {}
if field_types=='aggregatable':
if field_types == 'aggregatable':
for key, value in source_fields.items():
results[value] = response['aggregations'][key]['value']
else:
@ -209,7 +218,7 @@ class Operations:
Returns
-------
pandas.Series
Series containing results of `func` applied to the column(s)
Series containing results of `func` applied to the field(s)
"""
query_params, post_processing = self._resolve_tasks()
@ -217,14 +226,14 @@ class Operations:
if size is not None:
raise NotImplementedError("Can not count field matches if size is set {}".format(size))
columns = self.get_columns()
field_names = self.get_field_names()
# Get just aggregatable columns
aggregatable_columns = query_compiler._mappings.aggregatable_columns(columns)
# Get just aggregatable field_names
aggregatable_field_names = query_compiler._mappings.aggregatable_field_names(field_names)
body = Query(query_params['query'])
for field in aggregatable_columns.keys():
for field in aggregatable_field_names.keys():
body.terms_aggs(field, func, field, es_size=es_size)
response = query_compiler._client.search(
@ -234,12 +243,12 @@ class Operations:
results = {}
for key, value in aggregatable_columns.items():
for bucket in response['aggregations'][columns[0]]['buckets']:
for key, value in aggregatable_field_names.items():
for bucket in response['aggregations'][field_names[0]]['buckets']:
results[bucket['key']] = bucket['doc_count']
try:
name = columns[0]
name = field_names[0]
except IndexError:
name = None
@ -248,16 +257,16 @@ class Operations:
return s
def _hist_aggs(self, query_compiler, num_bins):
# Get histogram bins and weights for numeric columns
# Get histogram bins and weights for numeric field_names
query_params, post_processing = self._resolve_tasks()
size = self._size(query_params, post_processing)
if size is not None:
raise NotImplementedError("Can not count field matches if size is set {}".format(size))
columns = self.get_columns()
field_names = self.get_field_names()
numeric_source_fields = query_compiler._mappings.numeric_source_fields(columns)
numeric_source_fields = query_compiler._mappings.numeric_source_fields(field_names)
body = Query(query_params['query'])
@ -331,7 +340,7 @@ class Operations:
Pandas supports a lot of options here, and these options generally work on text and numerics in pandas.
Elasticsearch has metric aggs and terms aggs so will have different behaviour.
Pandas aggs that return columns (as opposed to transformed rows):
Pandas aggs that return field_names (as opposed to transformed rows):
all
any
@ -398,14 +407,14 @@ class Operations:
if size is not None:
raise NotImplementedError("Can not count field matches if size is set {}".format(size))
columns = self.get_columns()
field_names = self.get_field_names()
body = Query(query_params['query'])
# convert pandas aggs to ES equivalent
es_aggs = self._map_pd_aggs_to_es_aggs(pd_aggs)
for field in columns:
for field in field_names:
for es_agg in es_aggs:
# If we have multiple 'extended_stats' etc. here we simply NOOP on 2nd call
if isinstance(es_agg, tuple):
@ -427,7 +436,7 @@ class Operations:
"""
results = {}
for field in columns:
for field in field_names:
values = list()
for es_agg in es_aggs:
if isinstance(es_agg, tuple):
@ -448,9 +457,9 @@ class Operations:
if size is not None:
raise NotImplementedError("Can not count field matches if size is set {}".format(size))
columns = self.get_columns()
field_names = self.get_field_names()
numeric_source_fields = query_compiler._mappings.numeric_source_fields(columns, include_bool=False)
numeric_source_fields = query_compiler._mappings.numeric_source_fields(field_names, include_bool=False)
# for each field we compute:
# count, mean, std, min, 25%, 50%, 75%, max
@ -535,10 +544,15 @@ class Operations:
size, sort_params = Operations._query_params_to_size_and_sort(query_params)
body = Query(query_params['query'])
script_fields = query_params['query_script_fields']
query = Query(query_params['query'])
# Only return requested columns
columns = self.get_columns()
body = query.to_search_body()
if script_fields is not None:
body['script_fields'] = script_fields
# Only return requested field_names
field_names = self.get_field_names()
es_results = None
@ -547,18 +561,30 @@ class Operations:
is_scan = False
if size is not None and size <= 10000:
if size > 0:
try:
es_results = query_compiler._client.search(
index=query_compiler._index_pattern,
size=size,
sort=sort_params,
body=body.to_search_body(),
_source=columns)
body=body,
_source=field_names)
except:
# Catch ES error and print debug (currently to stdout)
error = {
'index': query_compiler._index_pattern,
'size': size,
'sort': sort_params,
'body': body,
'_source': field_names
}
print("Elasticsearch error:", error)
raise
else:
is_scan = True
es_results = query_compiler._client.scan(
index=query_compiler._index_pattern,
query=body.to_search_body(),
_source=columns)
query=body,
_source=field_names)
# create post sort
if sort_params is not None:
post_processing.append(self._sort_params_to_postprocessing(sort_params))
@ -575,9 +601,9 @@ class Operations:
df = self._apply_df_post_processing(df, post_processing)
collector.collect(df)
def iloc(self, index, columns):
# index and columns are indexers
task = ('iloc', (index, columns))
def iloc(self, index, field_names):
# index and field_names are indexers
task = ('iloc', (index, field_names))
self._tasks.append(task)
def index_count(self, query_compiler, field):
@ -691,13 +717,13 @@ class Operations:
df = df.sort_values(sort_field, False)
elif action[0] == 'iloc':
index_indexer = action[1][0]
column_indexer = action[1][1]
field_name_indexer = action[1][1]
if index_indexer is None:
index_indexer = slice(None)
if column_indexer is None:
column_indexer = slice(None)
df = df.iloc[index_indexer, column_indexer]
# columns could be in here (and we ignore it)
if field_name_indexer is None:
field_name_indexer = slice(None)
df = df.iloc[index_indexer, field_name_indexer]
# field_names could be in here (and we ignore it)
return df
@ -710,6 +736,7 @@ class Operations:
"query_sort_order": None,
"query_size": None,
"query_fields": None,
"query_script_fields": None,
"query": Query()}
post_processing = []
@ -727,6 +754,8 @@ class Operations:
query_params, post_processing = self._resolve_query_terms(task, query_params, post_processing)
elif task[0] == 'boolean_filter':
query_params, post_processing = self._resolve_boolean_filter(task, query_params, post_processing)
elif task[0] == 'arithmetic_op_fields':
query_params, post_processing = self._resolve_arithmetic_op_fields(task, query_params, post_processing)
else: # a lot of operations simply post-process the dataframe - put these straight through
query_params, post_processing = self._resolve_post_processing_task(task, query_params, post_processing)
@ -858,13 +887,128 @@ class Operations:
return query_params, post_processing
def _resolve_arithmetic_op_fields(self, item, query_params, post_processing):
# task = ('arithmetic_op_fields', (field_name, (op_name, (left_field, right_field))))
field_name = item[1][0]
op_name = item[1][1][0]
left_field = item[1][1][1][0]
right_field = item[1][1][1][1]
# https://www.elastic.co/guide/en/elasticsearch/painless/current/painless-api-reference-shared-java-lang.html#painless-api-reference-shared-Math
if isinstance(left_field, str) and isinstance(right_field, str):
"""
(if op_name = '__truediv__')
"script_fields": {
"field_name": {
"script": {
"source": "doc[left_field].value / doc[right_field].value"
}
}
}
"""
if op_name == '__add__':
source = "doc['{0}'].value + doc['{1}'].value".format(left_field, right_field)
elif op_name == '__truediv__':
source = "doc['{0}'].value / doc['{1}'].value".format(left_field, right_field)
elif op_name == '__floordiv__':
source = "Math.floor(doc['{0}'].value / doc['{1}'].value)".format(left_field, right_field)
elif op_name == '__pow__':
source = "Math.pow(doc['{0}'].value, doc['{1}'].value)".format(left_field, right_field)
elif op_name == '__mod__':
source = "doc['{0}'].value % doc['{1}'].value".format(left_field, right_field)
elif op_name == '__mul__':
source = "doc['{0}'].value * doc['{1}'].value".format(left_field, right_field)
elif op_name == '__sub__':
source = "doc['{0}'].value - doc['{1}'].value".format(left_field, right_field)
else:
raise NotImplementedError("Not implemented operation '{0}'".format(op_name))
if query_params['query_script_fields'] is None:
query_params['query_script_fields'] = {}
query_params['query_script_fields'][field_name] = {
'script': {
'source': source
}
}
elif isinstance(left_field, str) and np.issubdtype(np.dtype(type(right_field)), np.number):
"""
(if op_name = '__truediv__')
"script_fields": {
"field_name": {
"script": {
"source": "doc[left_field].value / right_field"
}
}
}
"""
if op_name == '__add__':
source = "doc['{0}'].value + {1}".format(left_field, right_field)
elif op_name == '__truediv__':
source = "doc['{0}'].value / {1}".format(left_field, right_field)
elif op_name == '__floordiv__':
source = "Math.floor(doc['{0}'].value / {1})".format(left_field, right_field)
elif op_name == '__pow__':
source = "Math.pow(doc['{0}'].value, {1})".format(left_field, right_field)
elif op_name == '__mod__':
source = "doc['{0}'].value % {1}".format(left_field, right_field)
elif op_name == '__mul__':
source = "doc['{0}'].value * {1}".format(left_field, right_field)
elif op_name == '__sub__':
source = "doc['{0}'].value - {1}".format(left_field, right_field)
else:
raise NotImplementedError("Not implemented operation '{0}'".format(op_name))
elif np.issubdtype(np.dtype(type(left_field)), np.number) and isinstance(right_field, str):
"""
(if op_name = '__truediv__')
"script_fields": {
"field_name": {
"script": {
"source": "left_field / doc['right_field'].value"
}
}
}
"""
if op_name == '__add__':
source = "{0} + doc['{1}'].value".format(left_field, right_field)
elif op_name == '__truediv__':
source = "{0} / doc['{1}'].value".format(left_field, right_field)
elif op_name == '__floordiv__':
source = "Math.floor({0} / doc['{1}'].value)".format(left_field, right_field)
elif op_name == '__pow__':
source = "Math.pow({0}, doc['{1}'].value)".format(left_field, right_field)
elif op_name == '__mod__':
source = "{0} % doc['{1}'].value".format(left_field, right_field)
elif op_name == '__mul__':
source = "{0} * doc['{1}'].value".format(left_field, right_field)
elif op_name == '__sub__':
source = "{0} - doc['{1}'].value".format(left_field, right_field)
else:
raise NotImplementedError("Not implemented operation '{0}'".format(op_name))
else:
raise TypeError("Types for operation inconsistent {} {} {}", type(left_field), type(right_field), op_name)
if query_params['query_script_fields'] is None:
query_params['query_script_fields'] = {}
query_params['query_script_fields'][field_name] = {
'script': {
'source': source
}
}
return query_params, post_processing
def _resolve_post_processing_task(self, item, query_params, post_processing):
# Just do this in post-processing
if item[0] != 'columns':
if item[0] != 'field_names':
post_processing.append(item)
return query_params, post_processing
def _size(self, query_params, post_processing):
# Shrink wrap code around checking if size parameter is set
size = query_params['query_size'] # can be None
@ -879,19 +1023,28 @@ class Operations:
# This can return None
return size
def info_es(self, buf):
buf.write("Operations:\n")
buf.write(" tasks: {0}\n".format(self._tasks))
query_params, post_processing = self._resolve_tasks()
size, sort_params = Operations._query_params_to_size_and_sort(query_params)
columns = self.get_columns()
field_names = self.get_field_names()
script_fields = query_params['query_script_fields']
query = Query(query_params['query'])
body = query.to_search_body()
if script_fields is not None:
body['script_fields'] = script_fields
buf.write(" size: {0}\n".format(size))
buf.write(" sort_params: {0}\n".format(sort_params))
buf.write(" columns: {0}\n".format(columns))
buf.write(" _source: {0}\n".format(field_names))
buf.write(" body: {0}\n".format(body))
buf.write(" post_processing: {0}\n".format(post_processing))
def update_query(self, boolean_filter):
task = ('boolean_filter', boolean_filter)
self._tasks.append(task)
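
Tying these tasks together: an 'arithmetic_op_fields' task resolves into query_params['query_script_fields'], which _es_results() then merges into the search body (body['script_fields'] = script_fields above). A hedged sketch of the body sent for a __truediv__ between two ecommerce fields, with a hypothetical result field name:

body = {
    'query': {'bool': {'must': []}},  # whatever earlier filter tasks resolved to
    'aggs': {},
    'script_fields': {
        'price_per_item': {  # hypothetical field_name chosen by the caller
            'script': {
                'source': "doc['taxful_total_price'].value / doc['total_quantity'].value"
            }
        }
    }
}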

View File

@ -15,10 +15,12 @@ class Query:
def __init__(self, query=None):
if query is None:
self._query = BooleanFilter()
self._script_fields = {}
self._aggs = {}
else:
# Deep copy the incoming query so we can change it
self._query = deepcopy(query._query)
self._script_fields = deepcopy(query._script_fields)
self._aggs = deepcopy(query._aggs)
def exists(self, field, must=True):
@ -157,5 +159,14 @@ class Query:
else:
self._query = self._query & boolean_filter
def arithmetic_op_fields(self, op_name, left_field, right_field):
# _script_fields is a plain dict (not a BooleanFilter), so it has no empty()/build();
# test emptiness via truthiness. op_name/left_field/right_field are not used here yet.
if not self._script_fields:
body = None
else:
body = {"script_fields": self._script_fields}
return body
def __repr__(self):
return repr(self.to_search_body())

View File

@ -1,7 +1,5 @@
import pandas as pd
from pandas.core.dtypes.common import (
is_list_like
)
import numpy as np
from eland import Client
from eland import Index
@ -38,12 +36,8 @@ class ElandQueryCompiler:
A way to mitigate this would be to post process this drop - TODO
"""
def __init__(self,
client=None,
index_pattern=None,
columns=None,
index_field=None,
operations=None):
def __init__(self, client=None, index_pattern=None, field_names=None, index_field=None, operations=None,
name_mapper=None):
self._client = Client(client)
self._index_pattern = index_pattern
@ -58,29 +52,54 @@ class ElandQueryCompiler:
else:
self._operations = operations
if columns is not None:
self.columns = columns
if field_names is not None:
self.field_names = field_names
if name_mapper is None:
self._name_mapper = ElandQueryCompiler.DisplayNameToFieldNameMapper()
else:
self._name_mapper = name_mapper
def _get_index(self):
return self._index
def _get_field_names(self):
field_names = self._operations.get_field_names()
if field_names is None:
# default to all
field_names = self._mappings.source_fields()
return pd.Index(field_names)
def _set_field_names(self, field_names):
self._operations.set_field_names(field_names)
field_names = property(_get_field_names, _set_field_names)
def _get_columns(self):
columns = self._operations.get_columns()
columns = self._operations.get_field_names()
if columns is None:
# default to all
columns = self._mappings.source_fields()
# map renames
columns = self._name_mapper.field_to_display_names(columns)
return pd.Index(columns)
def _set_columns(self, columns):
self._operations.set_columns(columns)
# map renames
columns = self._name_mapper.display_to_field_names(columns)
self._operations.set_field_names(columns)
columns = property(_get_columns, _set_columns)
index = property(_get_index)
@property
def dtypes(self):
columns = self._operations.get_columns()
columns = self._operations.get_field_names()
return self._mappings.dtypes(columns)
@ -194,6 +213,12 @@ class ElandQueryCompiler:
row = hit['_source']
# script_fields appear in 'fields'
if 'fields' in hit:
fields = hit['fields']
for key, value in fields.items():
row[key] = value
# get index value - can be _id or can be field value in source
if self._index.is_source_field:
index_field = row[self._index.index_field]
@ -212,16 +237,21 @@ class ElandQueryCompiler:
# Create pandas DataFrame
df = pd.DataFrame(data=rows, index=index)
# _source may not contain all columns in the mapping
# therefore, fill in missing columns
# (note this returns self.columns NOT IN df.columns)
missing_columns = list(set(self.columns) - set(df.columns))
# _source may not contain all field_names in the mapping
# therefore, fill in missing field_names
# (note this returns self.field_names NOT IN df.columns)
missing_field_names = list(set(self.field_names) - set(df.columns))
for missing in missing_columns:
for missing in missing_field_names:
is_source_field, pd_dtype = self._mappings.source_field_pd_dtype(missing)
df[missing] = pd.Series(dtype=pd_dtype)
# Rename columns
if not self._name_mapper.empty:
df.rename(columns=self._name_mapper.display_names_mapper(), inplace=True)
# Sort columns in mapping order
if len(self.columns) > 1:
df = df[self.columns]
return partial_result, df
@ -267,6 +297,17 @@ class ElandQueryCompiler:
out[field_name].append(x)
else:
out[field_name] = x
else:
# Script fields end up here
# Elasticsearch returns 'Infinity' as a string for np.inf values.
# Map this to a numeric value to avoid this whole Series being classed as an object
# TODO - create a lookup for script fields and dtypes to only map 'Infinity'
# if the field is numeric. This implementation will currently map
# any script field with "Infinity" as a string to np.inf
if x == 'Infinity':
x = np.inf
out[name[:-1]] = x
flatten(y)
@ -307,13 +348,18 @@ class ElandQueryCompiler:
return df
def copy(self):
return ElandQueryCompiler(
client=self._client,
index_pattern=self._index_pattern,
columns=None, # columns are embedded in operations
index_field=self._index.index_field,
operations=self._operations.copy()
)
return ElandQueryCompiler(client=self._client, index_pattern=self._index_pattern, field_names=None,
index_field=self._index.index_field, operations=self._operations.copy(),
name_mapper=self._name_mapper.copy())
def rename(self, renames, inplace=False):
if inplace:
self._name_mapper.rename_display_name(renames)
return self
else:
result = self.copy()
result._name_mapper.rename_display_name(renames)
return result
def head(self, n):
result = self.copy()
@ -364,14 +410,7 @@ class ElandQueryCompiler:
if numeric:
raise NotImplementedError("Not implemented yet...")
result._operations.set_columns(list(key))
return result
def view(self, index=None, columns=None):
result = self.copy()
result._operations.iloc(index, columns)
result._operations.set_field_names(list(key))
return result
@ -382,7 +421,7 @@ class ElandQueryCompiler:
if columns is not None:
# columns is a pandas.Index so we can use pandas drop feature
new_columns = self.columns.drop(columns)
result._operations.set_columns(new_columns.to_list())
result._operations.set_field_names(new_columns.to_list())
if index is not None:
result._operations.drop_index_values(self, self.index.index_field, index)
@ -433,3 +472,140 @@ class ElandQueryCompiler:
return result
def check_arithmetics(self, right):
"""
Compare 2 query_compilers to see if arithmetic operations can be performed by the NDFrame object.
This does very basic comparisons and ignores some of the complexities of incompatible task lists.
Raises an exception if incompatible.
Parameters
----------
right: ElandQueryCompiler
The query compiler to compare self to
Raises
------
TypeError, ValueError
If arithmetic operations aren't possible
"""
if not isinstance(right, ElandQueryCompiler):
raise TypeError(
"Incompatible types "
"{0} != {1}".format(type(self), type(right))
)
if self._client._es != right._client._es:
raise ValueError(
"Can not perform arithmetic operations across different clients"
"{0} != {1}".format(self._client._es, right._client._es)
)
if self._index.index_field != right._index.index_field:
raise ValueError(
"Can not perform arithmetic operations across different index fields "
"{0} != {1}".format(self._index.index_field, right._index.index_field)
)
if self._index_pattern != right._index_pattern:
raise ValueError(
"Can not perform arithmetic operations across different index patterns"
"{0} != {1}".format(self._index_pattern, right._index_pattern)
)
def arithmetic_op_fields(self, new_field_name, op, left_field, right_field):
result = self.copy()
result._operations.arithmetic_op_fields(new_field_name, op, left_field, right_field)
return result
"""
Internal class to deal with column renaming and script_fields
"""
class DisplayNameToFieldNameMapper:
def __init__(self,
field_to_display_names=None,
display_to_field_names=None):
if field_to_display_names is not None:
self._field_to_display_names = field_to_display_names
else:
self._field_to_display_names = dict()
if display_to_field_names is not None:
self._display_to_field_names = display_to_field_names
else:
self._display_to_field_names = dict()
def rename_display_name(self, renames):
for current_display_name, new_display_name in renames.items():
if current_display_name in self._display_to_field_names:
# has been renamed already - update name
field_name = self._display_to_field_names[current_display_name]
del self._display_to_field_names[current_display_name]
del self._field_to_display_names[field_name]
self._display_to_field_names[new_display_name] = field_name
self._field_to_display_names[field_name] = new_display_name
else:
# new rename - assume 'current_display_name' is 'field_name'
field_name = current_display_name
# if field_name is already mapped ignore
if field_name not in self._field_to_display_names:
self._display_to_field_names[new_display_name] = field_name
self._field_to_display_names[field_name] = new_display_name
def field_names_to_list(self):
return sorted(list(self._field_to_display_names.keys()))
def display_names_to_list(self):
return sorted(list(self._display_to_field_names.keys()))
# Return mapper values as dict
def display_names_mapper(self):
return self._field_to_display_names
@property
def empty(self):
return not self._display_to_field_names
def field_to_display_names(self, field_names):
if self.empty:
return field_names
display_names = []
for field_name in field_names:
if field_name in self._field_to_display_names:
display_name = self._field_to_display_names[field_name]
else:
display_name = field_name
display_names.append(display_name)
return display_names
def display_to_field_names(self, display_names):
if self.empty:
return display_names
field_names = []
for display_name in display_names:
if display_name in self._display_to_field_names:
field_name = self._display_to_field_names[display_name]
else:
field_name = display_name
field_names.append(field_name)
return field_names
def __constructor__(self, *args, **kwargs):
return type(self)(*args, **kwargs)
def copy(self):
return self.__constructor__(
field_to_display_names=self._field_to_display_names.copy(),
display_to_field_names=self._display_to_field_names.copy()
)
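
A minimal round-trip sketch of the mapper above, using only methods defined in this diff:

from eland import ElandQueryCompiler

mapper = ElandQueryCompiler.DisplayNameToFieldNameMapper()
mapper.rename_display_name({'Carrier': 'Airline'})

print(mapper.display_to_field_names(['Airline']))  # ['Carrier'] - names sent to Elasticsearch
print(mapper.field_to_display_names(['Carrier']))  # ['Airline'] - names shown to the user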

File diff suppressed because it is too large

View File

@ -279,10 +279,10 @@ ECOMMERCE_MAPPING = {"mappings": {
"type": "keyword"
},
"taxful_total_price": {
"type": "half_float"
"type": "float"
},
"taxless_total_price": {
"type": "half_float"
"type": "float"
},
"total_quantity": {
"type": "integer"

View File

View File

@ -0,0 +1,28 @@
# File called _pytest for PyCharm compatibility
from elasticsearch import Elasticsearch
import eland as ed
from eland.tests.common import TestData
import pytest
class TestClientEq(TestData):
def test_self_eq(self):
es = Elasticsearch('localhost')
client = ed.Client(es)
assert client != es
assert client == client
def test_non_self_ne(self):
es1 = Elasticsearch('localhost')
es2 = Elasticsearch('localhost')
client1 = ed.Client(es1)
client2 = ed.Client(es2)
assert client1 != client2

View File

@ -80,7 +80,7 @@ def assert_eland_frame_equal(left, right):
assert_frame_equal(left._to_pandas(), right._to_pandas())
def assert_pandas_eland_series_equal(left, right):
def assert_pandas_eland_series_equal(left, right, check_less_precise=False):
if not isinstance(left, pd.Series):
raise AssertionError("Expected type {exp_type}, found {act_type} instead".format(
exp_type='pd.Series', act_type=type(left)))
@ -90,4 +90,4 @@ def assert_pandas_eland_series_equal(left, right):
exp_type='ed.Series', act_type=type(right)))
# Use pandas tests to check similarity
assert_series_equal(left, right._to_pandas())
assert_series_equal(left, right._to_pandas(), check_less_precise=check_less_precise)

View File

@ -16,6 +16,9 @@ class TestDataFrameDtypes(TestData):
assert_series_equal(pd_flights.dtypes, ed_flights.dtypes)
for i in range(0, len(pd_flights.dtypes)):
assert type(pd_flights.dtypes[i]) == type(ed_flights.dtypes[i])
def test_flights_select_dtypes(self):
ed_flights = self.ed_flights_small()
pd_flights = self.pd_flights_small()

View File

@ -8,7 +8,7 @@ class TestMappingsAggregatables(TestData):
def test_ecommerce_all_aggregatables(self):
ed_ecommerce = self.ed_ecommerce()
aggregatables = ed_ecommerce._query_compiler._mappings.aggregatable_columns()
aggregatables = ed_ecommerce._query_compiler._mappings.aggregatable_field_names()
expected = {'category.keyword': 'category',
'currency': 'currency',
@ -67,6 +67,6 @@ class TestMappingsAggregatables(TestData):
'customer_first_name.keyword': 'customer_first_name',
'type': 'type', 'user': 'user'}
aggregatables = ed_ecommerce._query_compiler._mappings.aggregatable_columns(expected.values())
aggregatables = ed_ecommerce._query_compiler._mappings.aggregatable_field_names(expected.values())
assert expected == aggregatables

View File

@ -21,6 +21,6 @@ class TestMappingsDtypes(TestData):
pd_flights = self.pd_flights()[['Carrier', 'AvgTicketPrice', 'Cancelled']]
pd_dtypes = pd_flights.dtypes
ed_dtypes = ed_flights._query_compiler._mappings.dtypes(columns=['Carrier', 'AvgTicketPrice', 'Cancelled'])
ed_dtypes = ed_flights._query_compiler._mappings.dtypes(field_names=['Carrier', 'AvgTicketPrice', 'Cancelled'])
assert_series_equal(pd_dtypes, ed_dtypes)

View File

@ -13,13 +13,13 @@ class TestMappingsNumericSourceFields(TestData):
ed_flights = self.ed_flights()
pd_flights = self.pd_flights()
ed_numeric = ed_flights._query_compiler._mappings.numeric_source_fields(columns=None, include_bool=False)
ed_numeric = ed_flights._query_compiler._mappings.numeric_source_fields(field_names=None, include_bool=False)
pd_numeric = pd_flights.select_dtypes(include=np.number)
assert pd_numeric.columns.to_list() == ed_numeric
def test_ecommerce_selected_non_numeric_source_fields(self):
columns = ['category', 'currency', 'customer_birth_date', 'customer_first_name', 'user']
field_names = ['category', 'currency', 'customer_birth_date', 'customer_first_name', 'user']
"""
Note: none of these are numeric
category object
@ -29,16 +29,16 @@ class TestMappingsNumericSourceFields(TestData):
user object
"""
ed_ecommerce = self.ed_ecommerce()[columns]
pd_ecommerce = self.pd_ecommerce()[columns]
ed_ecommerce = self.ed_ecommerce()[field_names]
pd_ecommerce = self.pd_ecommerce()[field_names]
ed_numeric = ed_ecommerce._query_compiler._mappings.numeric_source_fields(columns=columns, include_bool=False)
ed_numeric = ed_ecommerce._query_compiler._mappings.numeric_source_fields(field_names=field_names, include_bool=False)
pd_numeric = pd_ecommerce.select_dtypes(include=np.number)
assert pd_numeric.columns.to_list() == ed_numeric
def test_ecommerce_selected_mixed_numeric_source_fields(self):
columns = ['category', 'currency', 'customer_birth_date', 'customer_first_name', 'total_quantity', 'user']
field_names = ['category', 'currency', 'customer_birth_date', 'customer_first_name', 'total_quantity', 'user']
"""
Note: one is numeric
@ -50,16 +50,16 @@ class TestMappingsNumericSourceFields(TestData):
user object
"""
ed_ecommerce = self.ed_ecommerce()[columns]
pd_ecommerce = self.pd_ecommerce()[columns]
ed_ecommerce = self.ed_ecommerce()[field_names]
pd_ecommerce = self.pd_ecommerce()[field_names]
ed_numeric = ed_ecommerce._query_compiler._mappings.numeric_source_fields(columns=columns, include_bool=False)
ed_numeric = ed_ecommerce._query_compiler._mappings.numeric_source_fields(field_names=field_names, include_bool=False)
pd_numeric = pd_ecommerce.select_dtypes(include=np.number)
assert pd_numeric.columns.to_list() == ed_numeric
def test_ecommerce_selected_all_numeric_source_fields(self):
columns = ['total_quantity', 'taxful_total_price', 'taxless_total_price']
field_names = ['total_quantity', 'taxful_total_price', 'taxless_total_price']
"""
Note: all are numeric
@ -68,10 +68,10 @@ class TestMappingsNumericSourceFields(TestData):
taxless_total_price float64
"""
ed_ecommerce = self.ed_ecommerce()[columns]
pd_ecommerce = self.pd_ecommerce()[columns]
ed_ecommerce = self.ed_ecommerce()[field_names]
pd_ecommerce = self.pd_ecommerce()[field_names]
ed_numeric = ed_ecommerce._query_compiler._mappings.numeric_source_fields(columns=columns, include_bool=False)
ed_numeric = ed_ecommerce._query_compiler._mappings.numeric_source_fields(field_names=field_names, include_bool=False)
pd_numeric = pd_ecommerce.select_dtypes(include=np.number)
assert pd_numeric.columns.to_list() == ed_numeric

View File

@ -0,0 +1,75 @@
# File called _pytest for PyCharm compatibility
import pandas as pd
from pandas.util.testing import assert_series_equal
from eland import ElandQueryCompiler
from eland.tests.common import TestData
class TestQueryCompilerRename(TestData):
def test_query_compiler_basic_rename(self):
field_names = []
display_names = []
mapper = ElandQueryCompiler.DisplayNameToFieldNameMapper()
assert field_names == mapper.field_names_to_list()
assert display_names == mapper.display_names_to_list()
field_names = ['a']
display_names = ['A']
update_A = {'a' : 'A'}
mapper.rename_display_name(update_A)
assert field_names == mapper.field_names_to_list()
assert display_names == mapper.display_names_to_list()
field_names = ['a', 'b']
display_names = ['A', 'B']
update_B = {'b' : 'B'}
mapper.rename_display_name(update_B)
assert field_names == mapper.field_names_to_list()
assert display_names == mapper.display_names_to_list()
field_names = ['a', 'b']
display_names = ['AA', 'B']
update_AA = {'A' : 'AA'}
mapper.rename_display_name(update_AA)
assert field_names == mapper.field_names_to_list()
assert display_names == mapper.display_names_to_list()
def test_query_compiler_basic_rename_columns(self):
columns = ['a', 'b', 'c', 'd']
mapper = ElandQueryCompiler.DisplayNameToFieldNameMapper()
display_names = ['A', 'b', 'c', 'd']
update_A = {'a' : 'A'}
mapper.rename_display_name(update_A)
assert display_names == mapper.field_to_display_names(columns)
# Invalid update
display_names = ['A', 'b', 'c', 'd']
update_ZZ = {'a' : 'ZZ'}
mapper.rename_display_name(update_ZZ)
assert display_names == mapper.field_to_display_names(columns)
display_names = ['AA', 'b', 'c', 'd']
update_AA = {'A' : 'AA'} # already renamed to 'A'
mapper.rename_display_name(update_AA)
assert display_names == mapper.field_to_display_names(columns)
display_names = ['AA', 'b', 'C', 'd']
update_AA_C = {'a' : 'AA', 'c' : 'C'} # 'a' rename ignored
mapper.rename_display_name(update_AA_C)
assert display_names == mapper.field_to_display_names(columns)

View File

@ -0,0 +1,204 @@
# File called _pytest for PyCharm compatibility
import pytest
import numpy as np
from eland.tests.common import TestData, assert_pandas_eland_series_equal
class TestSeriesArithmetics(TestData):
def test_ecommerce_series_invalid_div(self):
pd_df = self.pd_ecommerce()
ed_df = self.ed_ecommerce()
# eland / pandas == error
with pytest.raises(TypeError):
ed_series = ed_df['total_quantity'] / pd_df['taxful_total_price']
def test_ecommerce_series_basic_arithmetics(self):
pd_df = self.pd_ecommerce().head(100)
ed_df = self.ed_ecommerce().head(100)
ops = ['__add__',
'__truediv__',
'__floordiv__',
'__pow__',
'__mod__',
'__mul__',
'__sub__',
'add',
'truediv',
'floordiv',
'pow',
'mod',
'mul',
'sub']
for op in ops:
pd_series = getattr(pd_df['taxful_total_price'], op)(pd_df['total_quantity'])
ed_series = getattr(ed_df['taxful_total_price'], op)(ed_df['total_quantity'])
assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True)
pd_series = getattr(pd_df['taxful_total_price'], op)(10.56)
ed_series = getattr(ed_df['taxful_total_price'], op)(10.56)
assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True)
pd_series = getattr(pd_df['taxful_total_price'], op)(np.float32(1.879))
ed_series = getattr(ed_df['taxful_total_price'], op)(np.float32(1.879))
assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True)
pd_series = getattr(pd_df['taxful_total_price'], op)(int(8))
ed_series = getattr(ed_df['taxful_total_price'], op)(int(8))
assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True)
def test_supported_series_dtypes_ops(self):
pd_df = self.pd_ecommerce().head(100)
ed_df = self.ed_ecommerce().head(100)
# Test some specific operations that are and aren't supported
numeric_ops = ['__add__',
'__truediv__',
'__floordiv__',
'__pow__',
'__mod__',
'__mul__',
'__sub__']
non_string_numeric_ops = ['__add__',
'__truediv__',
'__floordiv__',
'__pow__',
'__mod__',
'__sub__']
# __mul__ is supported for int * str in pandas
# float op float
for op in numeric_ops:
pd_series = getattr(pd_df['taxful_total_price'], op)(pd_df['taxless_total_price'])
ed_series = getattr(ed_df['taxful_total_price'], op)(ed_df['taxless_total_price'])
assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True)
# int op float
for op in numeric_ops:
pd_series = getattr(pd_df['total_quantity'], op)(pd_df['taxless_total_price'])
ed_series = getattr(ed_df['total_quantity'], op)(ed_df['taxless_total_price'])
assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True)
# float op int
for op in numeric_ops:
pd_series = getattr(pd_df['taxful_total_price'], op)(pd_df['total_quantity'])
ed_series = getattr(ed_df['taxful_total_price'], op)(ed_df['total_quantity'])
assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True)
# str op int (throws)
for op in non_string_numeric_ops:
with pytest.raises(TypeError):
pd_series = getattr(pd_df['currency'], op)(pd_df['total_quantity'])
with pytest.raises(TypeError):
ed_series = getattr(ed_df['currency'], op)(ed_df['total_quantity'])
with pytest.raises(TypeError):
pd_series = getattr(pd_df['currency'], op)(1)
with pytest.raises(TypeError):
ed_series = getattr(ed_df['currency'], op)(1)
# int op str (throws)
for op in non_string_numeric_ops:
with pytest.raises(TypeError):
pd_series = getattr(pd_df['total_quantity'], op)(pd_df['currency'])
with pytest.raises(TypeError):
ed_series = getattr(ed_df['total_quantity'], op)(ed_df['currency'])
def test_ecommerce_series_basic_rarithmetics(self):
pd_df = self.pd_ecommerce().head(10)
ed_df = self.ed_ecommerce().head(10)
ops = ['__radd__',
'__rtruediv__',
'__rfloordiv__',
'__rpow__',
'__rmod__',
'__rmul__',
'__rsub__',
'radd',
'rtruediv',
'rfloordiv',
'rpow',
'rmod',
'rmul',
'rsub']
for op in ops:
pd_series = getattr(pd_df['taxful_total_price'], op)(pd_df['total_quantity'])
ed_series = getattr(ed_df['taxful_total_price'], op)(ed_df['total_quantity'])
assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True)
pd_series = getattr(pd_df['taxful_total_price'], op)(3.141)
ed_series = getattr(ed_df['taxful_total_price'], op)(3.141)
assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True)
pd_series = getattr(pd_df['taxful_total_price'], op)(np.float32(2.879))
ed_series = getattr(ed_df['taxful_total_price'], op)(np.float32(2.879))
assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True)
pd_series = getattr(pd_df['taxful_total_price'], op)(int(6))
ed_series = getattr(ed_df['taxful_total_price'], op)(int(6))
assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True)
def test_supported_series_dtypes_rops(self):
pd_df = self.pd_ecommerce().head(100)
ed_df = self.ed_ecommerce().head(100)
# Test some specific operations that are and aren't supported
numeric_ops = ['__radd__',
'__rtruediv__',
'__rfloordiv__',
'__rpow__',
'__rmod__',
'__rmul__',
'__rsub__']
non_string_numeric_ops = ['__radd__',
'__rtruediv__',
'__rfloordiv__',
'__rpow__',
'__rmod__',
'__rsub__']
# __rmul__ is supported for int * str in pandas
# float op float
for op in numeric_ops:
pd_series = getattr(pd_df['taxful_total_price'], op)(pd_df['taxless_total_price'])
ed_series = getattr(ed_df['taxful_total_price'], op)(ed_df['taxless_total_price'])
assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True)
# int op float
for op in numeric_ops:
pd_series = getattr(pd_df['total_quantity'], op)(pd_df['taxless_total_price'])
ed_series = getattr(ed_df['total_quantity'], op)(ed_df['taxless_total_price'])
assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True)
# float op int
for op in numeric_ops:
pd_series = getattr(pd_df['taxful_total_price'], op)(pd_df['total_quantity'])
ed_series = getattr(ed_df['taxful_total_price'], op)(ed_df['total_quantity'])
assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True)
# str op int (throws)
for op in non_string_numeric_ops:
with pytest.raises(TypeError):
pd_series = getattr(pd_df['currency'], op)(pd_df['total_quantity'])
with pytest.raises(TypeError):
ed_series = getattr(ed_df['currency'], op)(ed_df['total_quantity'])
with pytest.raises(TypeError):
pd_series = getattr(pd_df['currency'], op)(10.0)
with pytest.raises(TypeError):
ed_series = getattr(ed_df['currency'], op)(10.0)
# int op str (throws)
for op in non_string_numeric_ops:
with pytest.raises(TypeError):
pd_series = getattr(pd_df['total_quantity'], op)(pd_df['currency'])
with pytest.raises(TypeError):
ed_series = getattr(ed_df['total_quantity'], op)(ed_df['currency'])
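
The user-facing behaviour these tests pin down, as a sketch (assuming a local cluster and that the ecommerce test index is named 'ecommerce'):

import eland as ed

ed_df = ed.DataFrame('localhost', 'ecommerce')

# Evaluated server-side as a Painless script_field, not by pulling data into memory
price_per_item = ed_df['taxful_total_price'] / ed_df['total_quantity']
print(price_per_item.head(5))

# Mixing an eland Series with a pandas Series raises TypeError, as tested above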

View File

@ -0,0 +1,17 @@
# File called _pytest for PyCharm compatibility
from pandas.util.testing import assert_almost_equal
from eland.tests.common import TestData
import eland as ed
class TestSeriesInfoEs(TestData):
def test_flights_info_es(self):
ed_flights = self.ed_flights()['AvgTicketPrice']
# No assertion, just test it can be called
info_es = ed_flights.info_es()

View File

@ -0,0 +1,44 @@
# File called _pytest for PyCharm compatibility
from pandas.util.testing import assert_almost_equal
from eland.tests.common import TestData
import eland as ed
class TestSeriesMetrics(TestData):
funcs = ['max', 'min', 'mean', 'sum']
def test_flights_metrics(self):
pd_flights = self.pd_flights()['AvgTicketPrice']
ed_flights = self.ed_flights()['AvgTicketPrice']
for func in self.funcs:
pd_metric = getattr(pd_flights, func)()
ed_metric = getattr(ed_flights, func)()
assert_almost_equal(pd_metric, ed_metric, check_less_precise=True)
def test_ecommerce_selected_non_numeric_source_fields(self):
# None of these are numeric
column = 'category'
ed_ecommerce = self.ed_ecommerce()[column]
for func in self.funcs:
ed_metric = getattr(ed_ecommerce, func)()
assert ed_metric.empty
def test_ecommerce_selected_all_numeric_source_fields(self):
# All of these are numeric
columns = ['total_quantity', 'taxful_total_price', 'taxless_total_price']
for column in columns:
pd_ecommerce = self.pd_ecommerce()[column]
ed_ecommerce = self.ed_ecommerce()[column]
for func in self.funcs:
assert_almost_equal(getattr(pd_ecommerce, func)(), getattr(ed_ecommerce, func)(),
check_less_precise=True)
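
These Series metrics map directly to Elasticsearch metric aggregations; a sketch of the single-Series case (assuming the flights test index is named 'flights'):

import eland as ed

ed_prices = ed.DataFrame('localhost', 'flights')['AvgTicketPrice']

# Each of these issues one metric agg rather than scanning documents
print(ed_prices.min(), ed_prices.max(), ed_prices.mean(), ed_prices.sum())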

View File

@ -0,0 +1,32 @@
# File called _pytest for PyCharm compatibility
import eland as ed
from eland.tests import ELASTICSEARCH_HOST
from eland.tests import FLIGHTS_INDEX_NAME
from eland.tests.common import TestData
from eland.tests.common import assert_pandas_eland_series_equal
class TestSeriesName(TestData):
def test_name(self):
# deep copy the pandas Series, as setting .name would alter the cached reference frame
pd_series = self.pd_flights()['Carrier'].copy(deep=True)
ed_series = ed.Series(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME, 'Carrier')
assert_pandas_eland_series_equal(pd_series, ed_series)
assert ed_series.name == pd_series.name
pd_series.name = "renamed1"
ed_series.name = "renamed1"
assert_pandas_eland_series_equal(pd_series, ed_series)
assert ed_series.name == pd_series.name
pd_series.name = "renamed2"
ed_series.name = "renamed2"
assert_pandas_eland_series_equal(pd_series, ed_series)
assert ed_series.name == pd_series.name

View File

@ -0,0 +1,23 @@
# File called _pytest for PyCharm compatibility
import eland as ed
from eland.tests import ELASTICSEARCH_HOST
from eland.tests import FLIGHTS_INDEX_NAME
from eland.tests.common import TestData
from eland.tests.common import assert_pandas_eland_series_equal
class TestSeriesRename(TestData):
def test_rename(self):
pd_carrier = self.pd_flights()['Carrier']
ed_carrier = ed.Series(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME, 'Carrier')
assert_pandas_eland_series_equal(pd_carrier, ed_carrier)
pd_renamed = pd_carrier.rename("renamed")
ed_renamed = ed_carrier.rename("renamed")
assert_pandas_eland_series_equal(pd_renamed, ed_renamed)

View File

@ -1,13 +1,14 @@
# File called _pytest for PyCharm compatibility
import eland as ed
import pandas as pd
from eland.tests import ELASTICSEARCH_HOST
from eland.tests import FLIGHTS_INDEX_NAME
from eland.tests import FLIGHTS_INDEX_NAME, ECOMMERCE_INDEX_NAME
from eland.tests.common import TestData
class TestSeriesRepr(TestData):
def test_repr(self):
def test_repr_flights_carrier(self):
pd_s = self.pd_flights()['Carrier']
ed_s = ed.Series(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME, 'Carrier')
@ -15,3 +16,12 @@ class TestSeriesRepr(TestData):
ed_repr = repr(ed_s)
assert pd_repr == ed_repr
def test_repr_flights_carrier_5(self):
pd_s = self.pd_flights()['Carrier'].head(5)
ed_s = ed.Series(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME, 'Carrier').head(5)
pd_repr = repr(pd_s)
ed_repr = repr(ed_s)
assert pd_repr == ed_repr