From b99f25e4eea40578d75547abe8aa7b466b2cbc02 Mon Sep 17 00:00:00 2001 From: Stephen Dodson Date: Mon, 25 Nov 2019 15:00:02 +0000 Subject: [PATCH] Adding __r* operations and resolving issues with df.info() --- eland/dataframe.py | 5 ++ eland/operations.py | 81 ++++++++++++++----- eland/series.py | 55 ++++++++++++- eland/tests/series/test_arithmetics_pytest.py | 36 +++++++++ 4 files changed, 157 insertions(+), 20 deletions(-) diff --git a/eland/dataframe.py b/eland/dataframe.py index c14e152..b75fc1d 100644 --- a/eland/dataframe.py +++ b/eland/dataframe.py @@ -519,7 +519,12 @@ class DataFrame(NDFrame): else: _verbose_repr() + # pandas 0.25.1 uses get_dtype_counts() here. This + # returns a Series with strings as the index NOT dtypes. + # Therefore, to get consistent ordering we need to + # align types with pandas method. counts = self.dtypes.value_counts() + counts.index = counts.index.astype(str) dtypes = ['{k}({kk:d})'.format(k=k[0], kk=k[1]) for k in sorted(counts.items())] lines.append('dtypes: {types}'.format(types=', '.join(dtypes))) diff --git a/eland/operations.py b/eland/operations.py index 9c7dfdc..de681fa 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -1,6 +1,7 @@ import copy from enum import Enum +import numpy as np import pandas as pd from eland import Index @@ -172,7 +173,7 @@ class Operations: # some metrics aggs (including cardinality) work on all aggregatable fields # therefore we include an optional all parameter on operations # that call _metric_aggs - if field_types=='aggregatable': + if field_types == 'aggregatable': source_fields = query_compiler._mappings.aggregatable_field_names(field_names) else: source_fields = query_compiler._mappings.numeric_source_fields(field_names) @@ -193,7 +194,7 @@ class Operations: # } results = {} - if field_types=='aggregatable': + if field_types == 'aggregatable': for key, value in source_fields.items(): results[value] = response['aggregations'][key]['value'] else: @@ -538,7 +539,6 @@ class 
Operations: return collector.ret - def _es_results(self, query_compiler, collector): query_params, post_processing = self._resolve_tasks() @@ -561,12 +561,24 @@ class Operations: is_scan = False if size is not None and size <= 10000: if size > 0: - es_results = query_compiler._client.search( - index=query_compiler._index_pattern, - size=size, - sort=sort_params, - body=body, - _source=field_names) + try: + es_results = query_compiler._client.search( + index=query_compiler._index_pattern, + size=size, + sort=sort_params, + body=body, + _source=field_names) + except: + # Catch ES error and print debug (currently to stdout) + error = { + 'index': query_compiler._index_pattern, + 'size': size, + 'sort': sort_params, + 'body': body, + '_source': field_names + } + print("Elasticsearch error:", error) + raise else: is_scan = True es_results = query_compiler._client.scan( @@ -589,7 +601,6 @@ class Operations: df = self._apply_df_post_processing(df, post_processing) collector.collect(df) - def iloc(self, index, field_names): # index and field_names are indexers task = ('iloc', (index, field_names)) @@ -884,7 +895,7 @@ class Operations: right_field = item[1][1][1][1] # https://www.elastic.co/guide/en/elasticsearch/painless/current/painless-api-reference-shared-java-lang.html#painless-api-reference-shared-Math - if isinstance(right_field, str): + if isinstance(left_field, str) and isinstance(right_field, str): """ (if op_name = '__truediv__') @@ -913,7 +924,6 @@ class Operations: else: raise NotImplementedError("Not implemented operation '{0}'".format(op_name)) - if query_params['query_script_fields'] is None: query_params['query_script_fields'] = {} query_params['query_script_fields'][field_name] = { @@ -921,7 +931,7 @@ class Operations: 'source': source } } - else: + elif isinstance(left_field, str) and np.issubdtype(np.dtype(type(right_field)), np.number): """ (if op_name = '__truediv__') @@ -949,18 +959,48 @@ class Operations: source = "doc['{0}'].value - 
{1}".format(left_field, right_field) else: raise NotImplementedError("Not implemented operation '{0}'".format(op_name)) + elif np.issubdtype(np.dtype(type(left_field)), np.number) and isinstance(right_field, str): + """ + (if op_name = '__truediv__') - - if query_params['query_script_fields'] is None: - query_params['query_script_fields'] = {} - query_params['query_script_fields'][field_name] = { - 'script': { - 'source': source + "script_fields": { + "field_name": { + "script": { + "source": "left_field / doc['right_field'].value" + } } } + """ + if op_name == '__add__': + source = "{0} + doc['{1}'].value".format(left_field, right_field) + elif op_name == '__truediv__': + source = "{0} / doc['{1}'].value".format(left_field, right_field) + elif op_name == '__floordiv__': + source = "Math.floor({0} / doc['{1}'].value)".format(left_field, right_field) + elif op_name == '__pow__': + source = "Math.pow({0}, doc['{1}'].value)".format(left_field, right_field) + elif op_name == '__mod__': + source = "{0} % doc['{1}'].value".format(left_field, right_field) + elif op_name == '__mul__': + source = "{0} * doc['{1}'].value".format(left_field, right_field) + elif op_name == '__sub__': + source = "{0} - doc['{1}'].value".format(left_field, right_field) + else: + raise NotImplementedError("Not implemented operation '{0}'".format(op_name)) + else: + raise TypeError("Types for operation inconsistent {} {} {}", type(left_field), type(right_field), op_name) + + if query_params['query_script_fields'] is None: + query_params['query_script_fields'] = {} + query_params['query_script_fields'][field_name] = { + 'script': { + 'source': source + } + } return query_params, post_processing + def _resolve_post_processing_task(self, item, query_params, post_processing): # Just do this in post-processing if item[0] != 'field_names': @@ -968,6 +1008,7 @@ class Operations: return query_params, post_processing + def _size(self, query_params, post_processing): # Shrink wrap code around checking if 
size parameter is set size = query_params['query_size'] # can be None @@ -982,6 +1023,7 @@ class Operations: # This can return None return size + def info_es(self, buf): buf.write("Operations:\n") buf.write(" tasks: {0}\n".format(self._tasks)) @@ -1002,6 +1044,7 @@ class Operations: buf.write(" body: {0}\n".format(body)) buf.write(" post_processing: {0}\n".format(post_processing)) + def update_query(self, boolean_filter): task = ('boolean_filter', boolean_filter) self._tasks.append(task) diff --git a/eland/series.py b/eland/series.py index 3d364e1..0c1e546 100644 --- a/eland/series.py +++ b/eland/series.py @@ -499,6 +499,7 @@ class Series(NDFrame): """ return self._numeric_op(right, _get_method_name()) + def __truediv__(self, right): """ Return floating division of series and right, element-wise (binary operator truediv). @@ -528,7 +529,7 @@ class Series(NDFrame): 3 2 4 2 Name: total_quantity, dtype: int64 - >>> df.taxful_total_price / df.total_quantity + >>> df.taxful_total_price / df.total_quantity # doctest: +SKIP 0 18.490000 1 26.990000 2 99.989998 @@ -733,6 +734,21 @@ class Series(NDFrame): """ return self._numeric_op(right, _get_method_name()) + def __radd__(self, left): + return self._numeric_rop(left, _get_method_name()) + def __rtruediv__(self, left): + return self._numeric_rop(left, _get_method_name()) + def __rfloordiv__(self, left): + return self._numeric_rop(left, _get_method_name()) + def __rmod__(self, left): + return self._numeric_rop(left, _get_method_name()) + def __rmul__(self, left): + return self._numeric_rop(left, _get_method_name()) + def __rpow__(self, left): + return self._numeric_rop(left, _get_method_name()) + def __rsub__(self, left): + return self._numeric_rop(left, _get_method_name()) + add = __add__ div = __truediv__ divide = __truediv__ @@ -745,6 +761,18 @@ class Series(NDFrame): subtract = __sub__ truediv = __truediv__ + radd = __radd__ + rdiv = __rtruediv__ + rdivide = __rtruediv__ + rfloordiv = __rfloordiv__ + rmod = __rmod__ + 
rmul = __rmul__ + rmultiply = __rmul__ + rpow = __rpow__ + rsub = __rsub__ + rsubtract = __rsub__ + rtruediv = __rtruediv__ + def _numeric_op(self, right, method_name): """ return a op b @@ -787,6 +815,31 @@ class Series(NDFrame): "unsupported operand type(s) for '{}' {} '{}'".format(type(self), method_name, type(right)) ) + def _numeric_rop(self, left, method_name): + """ + e.g. 1 + ed.Series + """ + op_method_name = str(method_name).replace('__r', '__') + if isinstance(left, Series): + # if both are Series, reverse args and call normal op method and remove 'r' from radd etc. + return left._numeric_op(self, op_method_name) + elif np.issubdtype(np.dtype(type(left)), np.number): # allow np types + # Prefix new field name with 'f_' so it's a valid ES field name + new_field_name = "f_{0}_{1}_{2}".format(str(left).replace('.', '_'), op_method_name, self.name) + + # Compatible, so create new Series + series = Series(query_compiler=self._query_compiler.arithmetic_op_fields( + new_field_name, op_method_name, left, self.name)) + + # name of Series pinned to valid series (like pandas) + series.name = self.name + + return series + else: + raise TypeError( + "unsupported operand type(s) for '{}' {} '{}'".format(type(self), method_name, type(left)) + ) + def max(self): """ Return the maximum of the Series values diff --git a/eland/tests/series/test_arithmetics_pytest.py b/eland/tests/series/test_arithmetics_pytest.py index d57703a..7a50cc9 100644 --- a/eland/tests/series/test_arithmetics_pytest.py +++ b/eland/tests/series/test_arithmetics_pytest.py @@ -104,3 +104,39 @@ class TestSeriesArithmetics(TestData): pd_series = getattr(pd_df['total_quantity'], op)(pd_df['currency']) with pytest.raises(TypeError): ed_series = getattr(ed_df['total_quantity'], op)(ed_df['currency']) + + def test_ecommerce_series_basic_rarithmetics(self): + pd_df = self.pd_ecommerce().head(10) + ed_df = self.ed_ecommerce().head(10) + + ops = ['__radd__', + '__rtruediv__', + '__rfloordiv__', + '__rpow__', + 
'__rmod__', + '__rmul__', + '__rsub__', + 'radd', + 'rtruediv', + 'rfloordiv', + 'rpow', + 'rmod', + 'rmul', + 'rsub'] + + for op in ops: + pd_series = getattr(pd_df['taxful_total_price'], op)(pd_df['total_quantity']) + ed_series = getattr(ed_df['taxful_total_price'], op)(ed_df['total_quantity']) + assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True) + + pd_series = getattr(pd_df['taxful_total_price'], op)(3.141) + ed_series = getattr(ed_df['taxful_total_price'], op)(3.141) + assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True) + + pd_series = getattr(pd_df['taxful_total_price'], op)(np.float32(2.879)) + ed_series = getattr(ed_df['taxful_total_price'], op)(np.float32(2.879)) + assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True) + + pd_series = getattr(pd_df['taxful_total_price'], op)(int(6)) + ed_series = getattr(ed_df['taxful_total_price'], op)(int(6)) + assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True)