diff --git a/eland/dataframe.py b/eland/dataframe.py index 5bead9d..c14e152 100644 --- a/eland/dataframe.py +++ b/eland/dataframe.py @@ -383,7 +383,8 @@ class DataFrame(NDFrame): tasks: [('boolean_filter', {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}}), ('field_names', ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin']), ('tail', ('_doc', 5))] size: 5 sort_params: _doc:desc - field_names: ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin'] + _source: ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin'] + body: {'query': {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}}, 'aggs': {}} post_processing: ['sort_index'] """ diff --git a/eland/mappings.py b/eland/mappings.py index 48140eb..dabbcef 100644 --- a/eland/mappings.py +++ b/eland/mappings.py @@ -1,5 +1,6 @@ import warnings +import numpy as np import pandas as pd from pandas.core.dtypes.common import (is_float_dtype, is_bool_dtype, is_integer_dtype, is_datetime_or_timedelta_dtype, is_string_dtype) @@ -454,13 +455,13 @@ class Mappings: """ if include_bool == True: df = self._mappings_capabilities[(self._mappings_capabilities._source == True) & - ((self._mappings_capabilities.pd_dtype == 'int64') | - (self._mappings_capabilities.pd_dtype == 'float64') | - (self._mappings_capabilities.pd_dtype == 'bool'))] + ((self._mappings_capabilities.pd_dtype == 'int64') | + (self._mappings_capabilities.pd_dtype == 'float64') | + (self._mappings_capabilities.pd_dtype == 'bool'))] else: df = self._mappings_capabilities[(self._mappings_capabilities._source == True) & - ((self._mappings_capabilities.pd_dtype == 'int64') | - (self._mappings_capabilities.pd_dtype == 'float64'))] + ((self._mappings_capabilities.pd_dtype == 'int64') | + (self._mappings_capabilities.pd_dtype == 'float64'))] # if field_names exists, filter index with field_names if field_names is not None: # reindex adds NA for non-existing field_names (non-numeric), so drop these after reindex @@ -493,13 +494,14 @@ class Mappings: Returns ------- dtypes: pd.Series - Source field name + pd_dtype + Source field name + pd_dtype as np.dtype """ if field_names is not None: return pd.Series( - {key: self._source_field_pd_dtypes[key] for key in field_names}) + {key: np.dtype(self._source_field_pd_dtypes[key]) for key in field_names}) - return pd.Series(self._source_field_pd_dtypes) + return pd.Series( + {key: np.dtype(value) for key, value in self._source_field_pd_dtypes.items()}) def info_es(self, buf): buf.write("Mappings:\n") diff --git a/eland/operations.py b/eland/operations.py index 20dfe14..9c7dfdc 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -77,7 +77,7 @@ class Operations: def set_field_names(self, field_names): # Setting field_names at different phases of the task list may result in different # operations. So instead of setting field_names once, set when it happens in call chain - if type(field_names) is not list: + if not isinstance(field_names, list): field_names = list(field_names) # TODO - field_name renaming @@ -538,6 +538,7 @@ class Operations: return collector.ret + def _es_results(self, query_compiler, collector): query_params, post_processing = self._resolve_tasks() @@ -989,9 +990,16 @@ class Operations: size, sort_params = Operations._query_params_to_size_and_sort(query_params) field_names = self.get_field_names() + script_fields = query_params['query_script_fields'] + query = Query(query_params['query']) + body = query.to_search_body() + if script_fields is not None: + body['script_fields'] = script_fields + buf.write(" size: {0}\n".format(size)) buf.write(" sort_params: {0}\n".format(sort_params)) - buf.write(" field_names: {0}\n".format(field_names)) + buf.write(" _source: {0}\n".format(field_names)) + buf.write(" body: {0}\n".format(body)) buf.write(" post_processing: {0}\n".format(post_processing)) def update_query(self, boolean_filter): diff --git a/eland/query_compiler.py b/eland/query_compiler.py index e057807..cf642ab 100644 --- a/eland/query_compiler.py +++ b/eland/query_compiler.py @@ -1,7 +1,5 @@ import pandas as pd -from pandas.core.dtypes.common import ( - is_list_like -) +import numpy as np from eland import Client from eland import Index @@ -300,6 +298,15 @@ class ElandQueryCompiler: else: out[field_name] = x else: + # Script fields end up here + + # Elasticsearch returns 'Infinity' as a string for np.inf values. + # Map this to a numeric value to avoid this whole Series being classed as an object + # TODO - create a lookup for script fields and dtypes to only map 'Infinity' + # if the field is numeric. This implementation will currently map + # any script field with "Infinity" as a string to np.inf + if x == 'Infinity': + x = np.inf out[name[:-1]] = x flatten(y) @@ -600,6 +607,5 @@ class ElandQueryCompiler: def copy(self): return self.__constructor__( field_to_display_names=self._field_to_display_names.copy(), - display_to_field_names = self._display_to_field_names.copy() + display_to_field_names=self._display_to_field_names.copy() ) - diff --git a/eland/series.py b/eland/series.py index 4b56450..3d364e1 100644 --- a/eland/series.py +++ b/eland/series.py @@ -19,6 +19,8 @@ import sys import warnings from io import StringIO +import numpy as np + import pandas as pd from pandas.io.common import _expand_user, _stringify_path @@ -140,7 +142,9 @@ class Series(NDFrame): def rename(self, new_name): """ Rename name of series. Only column rename is supported. This does not change the underlying - Elasticsearch index, but adds a soft link from the new name (column) to the Elasticsearch field name + Elasticsearch index, but adds a symbolic link from the new name (column) to the Elasticsearch field name. + + For instance, if a field was called 'tot_quan' it could be renamed 'Total Quantity'. Parameters ---------- @@ -358,6 +362,11 @@ class Series(NDFrame): def _to_pandas(self): return self._query_compiler.to_pandas()[self.name] + @property + def _dtype(self): + # DO NOT MAKE PUBLIC (i.e. def dtype) as this breaks query eval implementation + return self._query_compiler.dtypes[0] + def __gt__(self, other): if isinstance(other, Series): # Need to use scripted query to compare to values @@ -745,9 +754,15 @@ class Series(NDFrame): a == Series, b == numeric """ if isinstance(right, Series): - # Check compatibility + # Check compatibility of Elasticsearch cluster self._query_compiler.check_arithmetics(right._query_compiler) + # Check compatibility of dtypes + # either not a number? + if not (np.issubdtype(self._dtype, np.number) and np.issubdtype(right._dtype, np.number)): + # TODO - support limited ops on strings https://github.com/elastic/eland/issues/65 + raise TypeError("Unsupported operation: '{}' {} '{}'".format(self._dtype, method_name, right._dtype)) + new_field_name = "{0}_{1}_{2}".format(self.name, method_name, right.name) # Compatible, so create new Series @@ -756,12 +771,12 @@ class Series(NDFrame): series.name = None return series - elif isinstance(right, (int, float)): # TODO extend to numpy types + elif np.issubdtype(np.dtype(type(right)), np.number): # allow np types new_field_name = "{0}_{1}_{2}".format(self.name, method_name, str(right).replace('.', '_')) # Compatible, so create new Series series = Series(query_compiler=self._query_compiler.arithmetic_op_fields( - new_field_name, method_name, self.name, float(right))) # force rhs to float + new_field_name, method_name, self.name, right)) # name of Series remains original name series.name = self.name @@ -769,8 +784,7 @@ class Series(NDFrame): return series else: raise TypeError( - "Can only perform arithmetic operation on selected types " - "{0} != {1} for {2}".format(type(self), type(right), method_name) + "unsupported operand type(s) for '{}' {} '{}'".format(type(self), method_name, type(right)) ) def max(self): diff --git a/eland/tests/dataframe/test_dtypes_pytest.py b/eland/tests/dataframe/test_dtypes_pytest.py index 2db1734..9ba44ff 100644 --- a/eland/tests/dataframe/test_dtypes_pytest.py +++ b/eland/tests/dataframe/test_dtypes_pytest.py @@ -16,6 +16,9 @@ class TestDataFrameDtypes(TestData): assert_series_equal(pd_flights.dtypes, ed_flights.dtypes) + for i in range(0, len(pd_flights.dtypes)-1): + assert type(pd_flights.dtypes[i]) == type(ed_flights.dtypes[i]) + def test_flights_select_dtypes(self): ed_flights = self.ed_flights_small() pd_flights = self.pd_flights_small() diff --git a/eland/tests/series/test_arithmetics_pytest.py b/eland/tests/series/test_arithmetics_pytest.py index d13595f..d57703a 100644 --- a/eland/tests/series/test_arithmetics_pytest.py +++ b/eland/tests/series/test_arithmetics_pytest.py @@ -1,11 +1,10 @@ # File called _pytest for PyCharm compatability -import eland as ed -from eland.tests.common import TestData, assert_pandas_eland_series_equal -from pandas.util.testing import assert_series_equal import pytest import numpy as np +from eland.tests.common import TestData, assert_pandas_eland_series_equal + class TestSeriesArithmetics(TestData): @@ -45,7 +44,63 @@ class TestSeriesArithmetics(TestData): ed_series = getattr(ed_df['taxful_total_price'], op)(10.56) assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True) + pd_series = getattr(pd_df['taxful_total_price'], op)(np.float32(1.879)) + ed_series = getattr(ed_df['taxful_total_price'], op)(np.float32(1.879)) + assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True) + pd_series = getattr(pd_df['taxful_total_price'], op)(int(8)) ed_series = getattr(ed_df['taxful_total_price'], op)(int(8)) assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True) + def test_supported_series_dtypes_ops(self): + pd_df = self.pd_ecommerce().head(100) + ed_df = self.ed_ecommerce().head(100) + + # Test some specific operations that are and aren't supported + numeric_ops = ['__add__', + '__truediv__', + '__floordiv__', + '__pow__', + '__mod__', + '__mul__', + '__sub__'] + + non_string_numeric_ops = ['__add__', + '__truediv__', + '__floordiv__', + '__pow__', + '__mod__', + '__sub__'] + # __mul__ is supported for int * str in pandas + + # float op float + for op in numeric_ops: + pd_series = getattr(pd_df['taxful_total_price'], op)(pd_df['taxless_total_price']) + ed_series = getattr(ed_df['taxful_total_price'], op)(ed_df['taxless_total_price']) + assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True) + + # int op float + for op in numeric_ops: + pd_series = getattr(pd_df['total_quantity'], op)(pd_df['taxless_total_price']) + ed_series = getattr(ed_df['total_quantity'], op)(ed_df['taxless_total_price']) + assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True) + + # float op int + for op in numeric_ops: + pd_series = getattr(pd_df['taxful_total_price'], op)(pd_df['total_quantity']) + ed_series = getattr(ed_df['taxful_total_price'], op)(ed_df['total_quantity']) + assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True) + + # str op int (throws) + for op in non_string_numeric_ops: + with pytest.raises(TypeError): + pd_series = getattr(pd_df['currency'], op)(pd_df['total_quantity']) + with pytest.raises(TypeError): + ed_series = getattr(ed_df['currency'], op)(ed_df['total_quantity']) + + # int op str (throws) + for op in non_string_numeric_ops: + with pytest.raises(TypeError): + pd_series = getattr(pd_df['total_quantity'], op)(pd_df['currency']) + with pytest.raises(TypeError): + ed_series = getattr(ed_df['total_quantity'], op)(ed_df['currency'])