From 15a1977dcfd645554470d9093c6ca82f78d22fe2 Mon Sep 17 00:00:00 2001
From: Seth Michael Larson
Date: Mon, 27 Apr 2020 15:16:48 -0500
Subject: [PATCH] Add agg compatibility logic to Field class

---
 .ci/run-repository.sh                         |   2 +-
 docs/requirements-docs.txt                    |   2 +-
 eland/common.py                               |  20 +-
 eland/dataframe.py                            |   8 +-
 eland/field_mappings.py                       |  94 +++-
 eland/ndframe.py                              |  30 ++
 eland/operations.py                           | 433 +++++++-----------
 eland/query_compiler.py                       |   7 +-
 eland/series.py                               |  84 +++-
 eland/tests/dataframe/test_dtypes_pytest.py   |   8 -
 eland/tests/dataframe/test_metrics_pytest.py  |  38 +-
 .../test_map_pd_aggs_to_es_aggs_pytest.py     |  39 ++
 eland/tests/series/test_dtype_pytest.py       |  22 +
 eland/tests/series/test_metrics_pytest.py     |  45 +-
 requirements-dev.txt                          |   2 +-
 requirements.txt                              |   2 +-
 setup.py                                      |   2 +-
 17 files changed, 490 insertions(+), 348 deletions(-)
 create mode 100644 eland/tests/operations/test_map_pd_aggs_to_es_aggs_pytest.py
 create mode 100644 eland/tests/series/test_dtype_pytest.py

diff --git a/.ci/run-repository.sh b/.ci/run-repository.sh
index 9aa913c..2048431 100755
--- a/.ci/run-repository.sh
+++ b/.ci/run-repository.sh
@@ -35,4 +35,4 @@ docker run \
   --name eland-test-runner \
   --rm \
   elastic/eland \
-  nox -s test
+  nox -s test-${PYTHON_VERSION}
diff --git a/docs/requirements-docs.txt b/docs/requirements-docs.txt
index 2de725e..22dd260 100644
--- a/docs/requirements-docs.txt
+++ b/docs/requirements-docs.txt
@@ -1,4 +1,4 @@
-elasticsearch>=7.0.5
+elasticsearch==7.7.0a2
 pandas>=1
 matplotlib
 pytest>=5.2.1
diff --git a/eland/common.py b/eland/common.py
index 016607f..4f547b8 100644
--- a/eland/common.py
+++ b/eland/common.py
@@ -5,8 +5,9 @@
 import re
 import warnings
 from enum import Enum
-from typing import Union, List, Tuple, cast, Callable, Any
+from typing import Union, List, Tuple, cast, Callable, Any, Optional, Dict
 
+import numpy as np  # type: ignore
 import pandas as pd  # type: ignore
 from elasticsearch import Elasticsearch  # type: ignore
 
@@ -19,6 +20,23 @@
 DEFAULT_PROGRESS_REPORTING_NUM_ROWS = 10000
 DEFAULT_ES_MAX_RESULT_WINDOW = 10000  # index.max_result_window
 
+with warnings.catch_warnings():
+    warnings.simplefilter("ignore")
+    EMPTY_SERIES_DTYPE = pd.Series().dtype
+
+
+def build_pd_series(
+    data: Dict[str, Any], dtype: Optional[np.dtype] = None, **kwargs: Any
+) -> pd.Series:
+    """Builds a pd.Series while squelching the warning
+    for unspecified dtype on empty series
+    """
+    dtype = dtype or (EMPTY_SERIES_DTYPE if not data else dtype)
+    if dtype is not None:
+        kwargs["dtype"] = dtype
+    return pd.Series(data, **kwargs)
+
+
 def docstring_parameter(*sub: Any) -> Callable[[Any], Any]:
     def dec(obj: Any) -> Any:
         obj.__doc__ = obj.__doc__.format(*sub)
diff --git a/eland/dataframe.py b/eland/dataframe.py
index a1e046c..7b84fc8 100644
--- a/eland/dataframe.py
+++ b/eland/dataframe.py
@@ -1280,11 +1280,11 @@ class DataFrame(NDFrame):
         Examples
         --------
         >>> df = ed.DataFrame('localhost', 'flights')
-        >>> df[['DistanceKilometers', 'AvgTicketPrice']].aggregate(['sum', 'min', 'std'])
+        >>> df[['DistanceKilometers', 'AvgTicketPrice']].aggregate(['sum', 'min', 'std']).astype(int)
              DistanceKilometers  AvgTicketPrice
-        sum        9.261629e+07    8.204365e+06
-        min        0.000000e+00    1.000205e+02
-        std        4.578263e+03    2.663867e+02
+        sum            92616288         8204364
+        min                   0             100
+        std                4578             266
         """
         axis = pd.DataFrame._get_axis_number(axis)
diff --git a/eland/field_mappings.py b/eland/field_mappings.py
index 5de21db..6ae6661 100644
--- a/eland/field_mappings.py
+++ b/eland/field_mappings.py
@@ -14,6 +14,48 @@ from pandas.core.dtypes.common import (
     is_string_dtype,
 )
 from pandas.core.dtypes.inference import is_list_like
+from typing import NamedTuple, Optional
+
+
+class Field(NamedTuple):
+    """Holds all information on a particular field in the mapping"""
+
+    index: str
+    es_field_name: str
+    is_source: bool
+    es_dtype: str
+    es_date_format: Optional[str]
+    pd_dtype: type
+    is_searchable: bool
+    is_aggregatable: bool
+    is_scripted: bool
+    aggregatable_es_field_name: str
+
+    @property
+    def is_numeric(self) -> bool:
+        return is_integer_dtype(self.pd_dtype) or is_float_dtype(self.pd_dtype)
+
+    @property
+    def is_timestamp(self) -> bool:
+        return is_datetime_or_timedelta_dtype(self.pd_dtype)
+
+    @property
+    def is_bool(self) -> bool:
+        return is_bool_dtype(self.pd_dtype)
+
+    @property
+    def np_dtype(self):
+        return np.dtype(self.pd_dtype)
+
+    def is_es_agg_compatible(self, es_agg):
+        # Cardinality works for all types
+        # Numerics and bools work for all aggs
+        if es_agg == "cardinality" or self.is_numeric or self.is_bool:
+            return True
+        # Timestamps also work for 'min', 'max' and 'avg'
+        if es_agg in {"min", "max", "avg"} and self.is_timestamp:
+            return True
+        return False
 
 
 class FieldMappings:
@@ -40,6 +82,23 @@ class FieldMappings:
         or es_field_name.keyword (if exists) or None
     """
 
+    ES_DTYPE_TO_PD_DTYPE = {
+        "text": "object",
+        "keyword": "object",
+        "long": "int64",
+        "integer": "int64",
+        "short": "int64",
+        "byte": "int64",
+        "binary": "int64",
+        "double": "float64",
+        "float": "float64",
+        "half_float": "float64",
+        "scaled_float": "float64",
+        "date": "datetime64[ns]",
+        "date_nanos": "datetime64[ns]",
+        "boolean": "bool",
+    }
+
     # the labels for each column (display_name is index)
     column_labels = [
         "es_field_name",
@@ -316,8 +375,8 @@ class FieldMappings:
         # return just source fields (as these are the only ones we display)
         return capability_matrix_df[capability_matrix_df.is_source].sort_index()
 
-    @staticmethod
-    def _es_dtype_to_pd_dtype(es_dtype):
+    @classmethod
+    def _es_dtype_to_pd_dtype(cls, es_dtype):
         """
         Mapping Elasticsearch types to pandas dtypes
         --------------------------------------------
 
         es_dtype | pd_dtype
         boolean | bool
 
         TODO - add additional mapping types
         """
-        es_dtype_to_pd_dtype = {
-            "text": "object",
-            "keyword": "object",
-            "long": "int64",
-            "integer": "int64",
-            "short": "int64",
-            "byte": "int64",
-            "binary": "int64",
-            "double": "float64",
-            "float": "float64",
-            "half_float": "float64",
-            "scaled_float": "float64",
-            "date": "datetime64[ns]",
-            "date_nanos": "datetime64[ns]",
-            "boolean": "bool",
-        }
-
-        if es_dtype in es_dtype_to_pd_dtype:
-            return es_dtype_to_pd_dtype[es_dtype]
-
-        # Return 'object' for all unsupported TODO - investigate how different types could be supported
-        return "object"
+        return cls.ES_DTYPE_TO_PD_DTYPE.get(es_dtype, "object")
 
     @staticmethod
     def _pd_dtype_to_es_dtype(pd_dtype):
@@ -591,6 +629,14 @@ class FieldMappings:
         pd_dtypes, es_field_names, es_date_formats = self.metric_source_fields()
         return es_field_names
 
+    def all_source_fields(self):
+        source_fields = []
+        for index, row in self._mappings_capabilities.iterrows():
+            row = row.to_dict()
+            row["index"] = index
+            source_fields.append(Field(**row))
+        return source_fields
+
     def metric_source_fields(self, include_bool=False, include_timestamp=False):
         """
         Returns
diff --git a/eland/ndframe.py b/eland/ndframe.py
index 85812b3..1b79f43 100644
--- a/eland/ndframe.py
+++ b/eland/ndframe.py
@@ -409,6 +409,36 @@ class NDFrame(ABC):
         """
         return self._query_compiler.nunique()
 
+    def mad(self, numeric_only=True):
+        """
+        Return median absolute deviation for each numeric column
+
+        Returns
+        -------
+        pandas.Series
+            The value of the median absolute deviation for each numeric column
+
+        See Also
+        --------
+        :pandas_api_docs:`pandas.DataFrame.mad`
+
+        Examples
+        --------
+        >>> df = ed.DataFrame('localhost', 'flights')
+        >>> df.mad() # doctest: +SKIP
+        AvgTicketPrice        213.368709
+        Cancelled               0.000000
+        DistanceKilometers   2946.168236
+        DistanceMiles        1830.987236
+        FlightDelay             0.000000
+        FlightDelayMin          0.000000
+        FlightTimeHour          3.819435
+        FlightTimeMin         229.142297
+        dayOfWeek               2.000000
+        dtype: float64
+        """
+        return self._query_compiler.mad(numeric_only=numeric_only)
+
     def _hist(self, num_bins):
         return self._query_compiler._hist(num_bins)
diff --git a/eland/operations.py b/eland/operations.py
index 893f767..d2f4b23 100644
--- a/eland/operations.py
+++ b/eland/operations.py
@@ -3,13 +3,12 @@
 # See the LICENSE file in the project root for more information
 
 import copy
+import typing
 import warnings
 from typing import Optional
 
 import numpy as np
-
 import pandas as pd
-from pandas.core.dtypes.common import is_datetime_or_timedelta_dtype
 
 from elasticsearch.helpers import scan
 
 from eland import Index
@@ -18,6 +17,7 @@ from eland.common import (
     DEFAULT_CSV_BATCH_OUTPUT_SIZE,
     DEFAULT_ES_MAX_RESULT_WINDOW,
     elasticsearch_date_to_pandas_date,
+    build_pd_series,
 )
 from eland.query import Query
 from eland.actions import SortFieldAction
@@ -31,15 +31,8 @@ from eland.tasks import (
     SizeTask,
 )
 
-with warnings.catch_warnings():
-    warnings.simplefilter("ignore")
-    EMPTY_SERIES_DTYPE = pd.Series().dtype
-
-
-def build_series(data, dtype=None, **kwargs):
-    out_dtype = EMPTY_SERIES_DTYPE if not data else dtype
-    s = pd.Series(data=data, index=data.keys(), dtype=out_dtype, **kwargs)
-    return s
+if typing.TYPE_CHECKING:
+    from eland.query_compiler import QueryCompiler
 
 
 class Operations:
@@ -122,45 +115,45 @@
             )["count"]
             counts[field] = field_exists_count
 
-        return pd.Series(data=counts, index=fields)
+        return build_pd_series(data=counts, index=fields)
 
     def mean(self, query_compiler, numeric_only=True):
-        return self._metric_aggs(query_compiler, "avg", numeric_only=numeric_only)
+        results = self._metric_aggs(query_compiler, ["mean"], numeric_only=numeric_only)
+        return build_pd_series(results, index=results.keys())
 
     def var(self, query_compiler, numeric_only=True):
-        return self._metric_aggs(
-            query_compiler, ("extended_stats", "variance"), numeric_only=numeric_only
-        )
+        results = self._metric_aggs(query_compiler, ["var"], numeric_only=numeric_only)
+        return build_pd_series(results, index=results.keys())
 
     def std(self, query_compiler, numeric_only=True):
-        return self._metric_aggs(
-            query_compiler,
-            ("extended_stats", "std_deviation"),
-            numeric_only=numeric_only,
-        )
+        results = self._metric_aggs(query_compiler, ["std"], numeric_only=numeric_only)
+        return build_pd_series(results, index=results.keys())
 
     def median(self, query_compiler, numeric_only=True):
-        return self._metric_aggs(
-            query_compiler, ("percentiles", "50.0"), numeric_only=numeric_only
+        results = self._metric_aggs(
+            query_compiler, ["median"], numeric_only=numeric_only
         )
+        return build_pd_series(results, index=results.keys())
 
     def sum(self, query_compiler, numeric_only=True):
-        return self._metric_aggs(query_compiler, "sum", numeric_only=numeric_only)
+        results = self._metric_aggs(query_compiler, ["sum"], numeric_only=numeric_only)
+        return build_pd_series(results, index=results.keys())
 
     def max(self, query_compiler, numeric_only=True):
-        return self._metric_aggs(
-            query_compiler, "max", numeric_only=numeric_only, keep_original_dtype=True
-        )
+        results = self._metric_aggs(query_compiler, ["max"], numeric_only=numeric_only)
+        return build_pd_series(results, index=results.keys())
 
     def min(self, query_compiler, numeric_only=True):
-        return self._metric_aggs(
-            query_compiler, "min", numeric_only=numeric_only, keep_original_dtype=True
-        )
+        results = self._metric_aggs(query_compiler, ["min"], numeric_only=numeric_only)
+        return build_pd_series(results, index=results.keys())
 
     def nunique(self, query_compiler):
-        return self._metric_aggs(
-            query_compiler, "cardinality", field_types="aggregatable"
-        )
+        results = self._metric_aggs(query_compiler, ["nunique"], numeric_only=False)
+        return build_pd_series(results, index=results.keys())
+
+    def mad(self, query_compiler, numeric_only=True):
+        results = self._metric_aggs(query_compiler, ["mad"], numeric_only=numeric_only)
+        return build_pd_series(results, index=results.keys())
 
     def value_counts(self, query_compiler, es_size):
         return self._terms_aggs(query_compiler, "terms", es_size)
@@ -168,28 +161,7 @@
     def hist(self, query_compiler, bins):
         return self._hist_aggs(query_compiler, bins)
 
-    def _metric_aggs(
-        self,
-        query_compiler,
-        func,
-        field_types=None,
-        numeric_only=None,
-        keep_original_dtype=False,
-    ):
-        """
-        Parameters
-        ----------
-        field_types: str, default None
-            if `aggregatable` use only field_names whose fields in elasticseach are aggregatable.
-            If `None`, use only numeric fields.
-        keep_original_dtype : bool, default False
-            if `True` the output values should keep the same domain as the input values, i.e. booleans should be booleans
-
-        Returns
-        -------
-        pandas.Series
-            Series containing results of `func` applied to the field_name(s)
-        """
+    def _metric_aggs(self, query_compiler: "QueryCompiler", pd_aggs, numeric_only=True):
         query_params, post_processing = self._resolve_tasks(query_compiler)
 
         size = self._size(query_params, post_processing)
@@ -198,152 +170,113 @@
         if size is not None:
             raise NotImplementedError(
                 f"Can not count field matches if size is set {size}"
             )
 
+        results = {}
+        fields = query_compiler._mappings.all_source_fields()
+        if numeric_only:
+            fields = [field for field in fields if (field.is_numeric or field.is_bool)]
+
         body = Query(query_params["query"])
 
-        results = {}
+        # Convert pandas aggs to ES equivalent
+        es_aggs = self._map_pd_aggs_to_es_aggs(pd_aggs)
 
-        # some metrics aggs (including cardinality) work on all aggregatable fields
-        # therefore we include an optional all parameter on operations
-        # that call _metric_aggs
-        if field_types == "aggregatable":
-            aggregatable_field_names = (
-                query_compiler._mappings.aggregatable_field_names()
-            )
+        for field in fields:
+            for es_agg in es_aggs:
+                if not field.is_es_agg_compatible(es_agg):
+                    continue
 
-            for field in aggregatable_field_names.keys():
-                body.metric_aggs(field, func, field)
-
-            response = query_compiler._client.search(
-                index=query_compiler._index_pattern, size=0, body=body.to_search_body()
-            )
-
-            # Results are of the form
-            # "aggregations" : {
-            #   "customer_full_name.keyword" : {
-            #     "value" : 10
-            #   }
-            # }
-
-            # map aggregatable (e.g. x.keyword) to field_name
-            for key, value in aggregatable_field_names.items():
-                results[value] = response["aggregations"][key]["value"]
-        else:
-            if numeric_only:
-                (
-                    pd_dtypes,
-                    source_fields,
-                    date_formats,
-                ) = query_compiler._mappings.metric_source_fields(include_bool=True)
-            else:
-                # The only non-numerics we support are bool and timestamps currently
-                # strings are not supported by metric aggs in ES
-                # TODO - sum isn't supported for Timestamp in pandas - although ES does attempt to do it
-                (
-                    pd_dtypes,
-                    source_fields,
-                    date_formats,
-                ) = query_compiler._mappings.metric_source_fields(
-                    include_bool=True, include_timestamp=True
-                )
-
-            for field in source_fields:
-                if isinstance(func, tuple):
-                    body.metric_aggs(func[0] + "_" + field, func[0], field)
-                else:
-                    body.metric_aggs(field, func, field)
+                # If we have multiple 'extended_stats' etc. here we simply NOOP on 2nd call
+                if isinstance(es_agg, tuple):
+                    body.metric_aggs(
+                        f"{es_agg[0]}_{field.es_field_name}",
+                        es_agg[0],
+                        field.aggregatable_es_field_name,
+                    )
+                else:
+                    body.metric_aggs(
+                        f"{es_agg}_{field.es_field_name}",
+                        es_agg,
+                        field.aggregatable_es_field_name,
+                    )
 
-            response = query_compiler._client.search(
-                index=query_compiler._index_pattern, size=0, body=body.to_search_body()
-            )
+        response = query_compiler._client.search(
+            index=query_compiler._index_pattern, size=0, body=body.to_search_body()
+        )
 
-            # Results are of the form
-            # "aggregations" : {
-            #   "AvgTicketPrice" : {
-            #     "value" : 628.2536888148849
-            #   },
-            #   "timestamp": {
-            #     "value": 1.5165624455644382E12,
-            #     "value_as_string": "2018-01-21T19:20:45.564Z"
-            #   }
-            # }
-            for pd_dtype, field, date_format in zip(
-                pd_dtypes, source_fields, date_formats
-            ):
-                if is_datetime_or_timedelta_dtype(pd_dtype):
-                    results[field] = elasticsearch_date_to_pandas_date(
-                        response["aggregations"][field]["value_as_string"], date_format
-                    )
-                elif keep_original_dtype:
-                    if isinstance(func, tuple):
-                        results = pd_dtype.type(
-                            response["aggregations"][func[0] + "_" + field][func[1]]
-                        )
-                    else:
-                        results[field] = pd_dtype.type(
-                            response["aggregations"][field]["value"]
-                        )
-                else:
-                    if isinstance(func, tuple):
-                        if func[0] == "percentiles":
-                            results[field] = response["aggregations"][
-                                "percentiles_" + field
-                            ]["values"]["50.0"]
+        """
+        Results are like (for 'sum', 'min')
 
-                            # If 0-length dataframe we get None here
-                            if results[field] is None:
-                                results[field] = np.float64(np.NaN)
-                        elif func[1] == "variance":
-                            # pandas computes the sample variance
-                            # Elasticsearch computes the population variance
-                            count = response["aggregations"][func[0] + "_" + field][
-                                "count"
-                            ]
+             AvgTicketPrice  DistanceKilometers  DistanceMiles  FlightDelayMin
+        sum    8.204365e+06        9.261629e+07   5.754909e+07          618150
+        min    1.000205e+02        0.000000e+00   0.000000e+00               0
+        """
+        for field in fields:
+            values = []
+            for es_agg, pd_agg in zip(es_aggs, pd_aggs):
 
-                            results[field] = response["aggregations"][
-                                func[0] + "_" + field
-                            ][func[1]]
+                # If the field and agg aren't compatible we add a NaN
+                if not field.is_es_agg_compatible(es_agg):
+                    values.append(np.float64(np.NaN))
+                    continue
 
-                            # transform population variance into sample variance
-                            if count <= 1:
-                                results[field] = np.float64(np.NaN)
-                            else:
-                                results[field] = count / (count - 1.0) * results[field]
-                        elif func[1] == "std_deviation":
-                            # pandas computes the sample std
-                            # Elasticsearch computes the population std
-                            count = response["aggregations"][func[0] + "_" + field][
-                                "count"
-                            ]
+                if isinstance(es_agg, tuple):
+                    agg_value = response["aggregations"][
+                        f"{es_agg[0]}_{field.es_field_name}"
+                    ]
 
-                            results[field] = response["aggregations"][
-                                func[0] + "_" + field
-                            ][func[1]]
+                    # Pull multiple values from 'percentiles' result.
+                    if es_agg[0] == "percentiles":
+                        agg_value = agg_value["values"]
 
-                            # transform population std into sample std
+                    agg_value = agg_value[es_agg[1]]
+
+                    # Need to convert 'Population' stddev and variance
+                    # from Elasticsearch into 'Sample' stddev and variance
+                    # which is what pandas uses.
+                    if es_agg[1] in ("std_deviation", "variance"):
+                        # Neither transformation works with count <=1
+                        count = response["aggregations"][
+                            f"{es_agg[0]}_{field.es_field_name}"
+                        ]["count"]
+
+                        # All of the below calculations result in NaN if count<=1
+                        if count <= 1:
+                            agg_value = np.float64(np.NaN)
+
+                        elif es_agg[1] == "variance":
+                            agg_value *= count / (count - 1.0)
+
+                        else:  # es_agg[1] == "std_deviation"
                             # sample_std=\sqrt{\frac{1}{N-1}\sum_{i=1}^N(x_i-\bar{x})^2}
                             # population_std=\sqrt{\frac{1}{N}\sum_{i=1}^N(x_i-\bar{x})^2}
                             # sample_std=\sqrt{\frac{N}{N-1}population_std^2}
-                            if count <= 1:
-                                results[field] = np.float64(np.NaN)
-                            else:
-                                results[field] = np.sqrt(
-                                    (count / (count - 1.0))
-                                    * results[field]
-                                    * results[field]
-                                )
-                        else:
-                            results[field] = response["aggregations"][
-                                func[0] + "_" + field
-                            ][func[1]]
+                            agg_value = np.sqrt(
+                                (count / (count - 1.0)) * agg_value * agg_value
+                            )
+                else:
+                    agg_value = response["aggregations"][
+                        f"{es_agg}_{field.es_field_name}"
+                    ]
+                    if "value_as_string" in agg_value and field.is_timestamp:
+                        agg_value = elasticsearch_date_to_pandas_date(
+                            agg_value["value_as_string"], field.es_date_format
+                        )
                     else:
-                        results[field] = response["aggregations"][field]["value"]
+                        agg_value = agg_value["value"]
 
-        # Return single value if this is a series
-        # if len(numeric_source_fields) == 1:
-        #     return np.float64(results[numeric_source_fields[0]])
-        s = build_series(results)
+                # These aggregations maintain the column datatype
+                if pd_agg in ("max", "min"):
+                    agg_value = field.np_dtype.type(agg_value)
 
-        return s
+                # Null usually means there were no results.
+                if agg_value is None:
+                    agg_value = np.float64(np.NaN)
+
+                values.append(agg_value)
+
+            results[field.index] = values if len(values) > 1 else values[0]
+
+        return results
 
     def _terms_aggs(self, query_compiler, func, es_size=None):
         """
@@ -391,9 +324,7 @@
         except IndexError:
             name = None
 
-        s = build_series(results, name=name)
-
-        return s
+        return build_pd_series(results, name=name)
 
     def _hist_aggs(self, query_compiler, num_bins):
         # Get histogram bins and weights for numeric field_names
@@ -409,8 +340,12 @@
 
         body = Query(query_params["query"])
 
-        min_aggs = self._metric_aggs(query_compiler, "min", numeric_only=True)
-        max_aggs = self._metric_aggs(query_compiler, "max", numeric_only=True)
+        results = self._metric_aggs(query_compiler, ["min", "max"], numeric_only=True)
+        min_aggs = {}
+        max_aggs = {}
+        for field, (min_agg, max_agg) in results.items():
+            min_aggs[field] = min_agg
+            max_aggs[field] = max_agg
 
         for field in numeric_source_fields:
            body.hist_aggs(field, field, min_aggs, max_aggs, num_bins)
@@ -476,7 +411,6 @@
 
         df_bins = pd.DataFrame(data=bins)
         df_weights = pd.DataFrame(data=weights)
-
         return df_bins, df_weights
 
     @staticmethod
@@ -511,20 +445,42 @@
         var
         nunique
         """
-        ed_aggs = []
+        # pd aggs that will be mapped to es aggs
+        # that can use 'extended_stats'.
+        extended_stats_pd_aggs = {"mean", "min", "max", "count", "sum", "var", "std"}
+        extended_stats_es_aggs = {"avg", "min", "max", "count", "sum"}
+        extended_stats_calls = 0
+
+        es_aggs = []
         for pd_agg in pd_aggs:
+            if pd_agg in extended_stats_pd_aggs:
+                extended_stats_calls += 1
+
+            # Aggs that are 'extended_stats' compatible
             if pd_agg == "count":
-                ed_aggs.append("count")
-            elif pd_agg == "mad":
-                ed_aggs.append("median_absolute_deviation")
+                es_aggs.append("count")
             elif pd_agg == "max":
-                ed_aggs.append("max")
-            elif pd_agg == "mean":
-                ed_aggs.append("avg")
-            elif pd_agg == "median":
-                ed_aggs.append(("percentiles", "50.0"))
+                es_aggs.append("max")
             elif pd_agg == "min":
-                ed_aggs.append("min")
+                es_aggs.append("min")
+            elif pd_agg == "mean":
+                es_aggs.append("avg")
+            elif pd_agg == "sum":
+                es_aggs.append("sum")
+            elif pd_agg == "std":
+                es_aggs.append(("extended_stats", "std_deviation"))
+            elif pd_agg == "var":
+                es_aggs.append(("extended_stats", "variance"))
+
+            # Aggs that aren't 'extended_stats' compatible
+            elif pd_agg == "nunique":
+                es_aggs.append("cardinality")
+            elif pd_agg == "mad":
+                es_aggs.append("median_absolute_deviation")
+            elif pd_agg == "median":
+                es_aggs.append(("percentiles", "50.0"))
+
+            # Not implemented
             elif pd_agg == "mode":
                 # We could do this via top term
                 raise NotImplementedError(pd_agg, " not currently implemented")
@@ -537,77 +493,24 @@
             elif pd_agg == "sem":
                 # TODO
                 raise NotImplementedError(pd_agg, " not currently implemented")
-            elif pd_agg == "sum":
-                ed_aggs.append("sum")
-            elif pd_agg == "std":
-                ed_aggs.append(("extended_stats", "std_deviation"))
-            elif pd_agg == "var":
-                ed_aggs.append(("extended_stats", "variance"))
             else:
                 raise NotImplementedError(pd_agg, " not currently implemented")
 
-        # TODO - we can optimise extended_stats here as if we have 'count' and 'std' extended_stats would
-        # return both in one call
+        # If two or more aggs compatible with 'extended_stats' are used we can
+        # piggy-back on that single aggregation.
+        if extended_stats_calls >= 2:
+            es_aggs = [
+                ("extended_stats", es_agg)
+                if es_agg in extended_stats_es_aggs
+                else es_agg
+                for es_agg in es_aggs
+            ]
 
-        return ed_aggs
+        return es_aggs
 
     def aggs(self, query_compiler, pd_aggs):
-        query_params, post_processing = self._resolve_tasks(query_compiler)
-
-        size = self._size(query_params, post_processing)
-        if size is not None:
-            raise NotImplementedError(
-                f"Can not count field matches if size is set {size}"
-            )
-
-        field_names = query_compiler.get_field_names(include_scripted_fields=False)
-
-        body = Query(query_params["query"])
-
-        # convert pandas aggs to ES equivalent
-        es_aggs = self._map_pd_aggs_to_es_aggs(pd_aggs)
-
-        for field in field_names:
-            for es_agg in es_aggs:
-                # If we have multiple 'extended_stats' etc. here we simply NOOP on 2nd call
-                if isinstance(es_agg, tuple):
-                    body.metric_aggs(es_agg[0] + "_" + field, es_agg[0], field)
-                else:
-                    body.metric_aggs(es_agg + "_" + field, es_agg, field)
-
-        response = query_compiler._client.search(
-            index=query_compiler._index_pattern, size=0, body=body.to_search_body()
-        )
-
-        """
-        Results are like (for 'sum', 'min')
-
-             AvgTicketPrice  DistanceKilometers  DistanceMiles  FlightDelayMin
-        sum    8.204365e+06        9.261629e+07   5.754909e+07          618150
-        min    1.000205e+02        0.000000e+00   0.000000e+00               0
-        """
-        results = {}
-
-        for field in field_names:
-            values = list()
-            for es_agg in es_aggs:
-                if isinstance(es_agg, tuple):
-                    agg_value = response["aggregations"][es_agg[0] + "_" + field]
-
-                    # Pull multiple values from 'percentiles' result.
-                    if es_agg[0] == "percentiles":
-                        agg_value = agg_value["values"]
-
-                    values.append(agg_value[es_agg[1]])
-                else:
-                    values.append(
-                        response["aggregations"][es_agg + "_" + field]["value"]
-                    )
-
-            results[field] = values
-
-        df = pd.DataFrame(data=results, index=pd_aggs)
-
-        return df
+        results = self._metric_aggs(query_compiler, pd_aggs, numeric_only=False)
+        return pd.DataFrame(results, index=pd_aggs)
 
     def describe(self, query_compiler):
         query_params, post_processing = self._resolve_tasks(query_compiler)
diff --git a/eland/query_compiler.py b/eland/query_compiler.py
index 919d914..8283eb1 100644
--- a/eland/query_compiler.py
+++ b/eland/query_compiler.py
@@ -66,13 +66,13 @@
             self._index_pattern = to_copy._index_pattern
             self._index = Index(self, to_copy._index.index_field)
             self._operations = copy.deepcopy(to_copy._operations)
-            self._mappings = copy.deepcopy(to_copy._mappings)
+            self._mappings: FieldMappings = copy.deepcopy(to_copy._mappings)
         else:
             self._client = ensure_es_client(client)
             self._index_pattern = index_pattern
             # Get and persist mappings, this allows us to correctly
             # map returned types from Elasticsearch to pandas datatypes
-            self._mappings = FieldMappings(
+            self._mappings: FieldMappings = FieldMappings(
                 client=self._client,
                 index_pattern=self._index_pattern,
                 display_names=display_names,
@@ -464,6 +464,9 @@
     def std(self, numeric_only=None):
         return self._operations.std(self, numeric_only=numeric_only)
 
+    def mad(self, numeric_only=None):
+        return self._operations.mad(self, numeric_only=numeric_only)
+
     def median(self, numeric_only=None):
         return self._operations.median(self, numeric_only=numeric_only)
diff --git a/eland/series.py b/eland/series.py
index 6e7594a..a809afb 100644
--- a/eland/series.py
+++ b/eland/series.py
@@ -1105,7 +1105,7 @@ class Series(NDFrame):
 
         Examples
         --------
-        >>> s = ed.Series('localhost', 'flights', name='AvgTicketPrice')
+        >>> s = ed.DataFrame('localhost', 'flights')['AvgTicketPrice']
         >>> int(s.max())
         1199
         """
@@ -1129,7 +1129,7 @@
 
         Examples
         --------
-        >>> s = ed.Series('localhost', 'flights', name='AvgTicketPrice')
+        >>> s = ed.DataFrame('localhost', 'flights')['AvgTicketPrice']
         >>> int(s.mean())
         628
         """
@@ -1153,7 +1153,7 @@
 
         Examples
         --------
-        >>> s = ed.Series('localhost', 'flights', name='AvgTicketPrice')
+        >>> s = ed.DataFrame('localhost', 'flights')['AvgTicketPrice']
         >>> int(s.min())
         100
         """
@@ -1177,7 +1177,7 @@
 
         Examples
         --------
-        >>> s = ed.Series('localhost', 'flights', name='AvgTicketPrice')
+        >>> s = ed.DataFrame('localhost', 'flights')['AvgTicketPrice']
         >>> int(s.sum())
         8204364
         """
@@ -1186,26 +1186,92 @@
 
     def nunique(self):
         """
-        Return the sum of the Series values
+        Return the number of unique values in a Series
 
         Returns
         -------
-        float
-            max value
+        int
+            Number of unique values
 
         See Also
         --------
-        :pandas_api_docs:`pandas.Series.sum`
+        :pandas_api_docs:`pandas.Series.nunique`
 
         Examples
         --------
-        >>> s = ed.Series('localhost', 'flights', name='Carrier')
+        >>> s = ed.DataFrame('localhost', 'flights')['Carrier']
         >>> s.nunique()
         4
         """
         results = super().nunique()
         return results.squeeze()
 
+    def var(self, numeric_only=None):
+        """
+        Return variance for a Series
+
+        Returns
+        -------
+        float
+            var value
+
+        See Also
+        --------
+        :pandas_api_docs:`pandas.Series.var`
+
+        Examples
+        --------
+        >>> s = ed.DataFrame('localhost', 'flights')['AvgTicketPrice']
+        >>> int(s.var())
+        70964
+        """
+        results = super().var(numeric_only=numeric_only)
+        return results.squeeze()
+
+    def std(self, numeric_only=None):
+        """
+        Return standard deviation for a Series
+
+        Returns
+        -------
+        float
+            std value
+
+        See Also
+        --------
+        :pandas_api_docs:`pandas.Series.std`
+
+        Examples
+        --------
+        >>> s = ed.DataFrame('localhost', 'flights')['AvgTicketPrice']
+        >>> int(s.std())
+        266
+        """
+        results = super().std(numeric_only=numeric_only)
+        return results.squeeze()
+
+    def mad(self, numeric_only=None):
+        """
+        Return median absolute deviation for a Series
+
+        Returns
+        -------
+        float
+            mad value
+
+        See Also
+        --------
+        :pandas_api_docs:`pandas.Series.mad`
+
+        Examples
+        --------
+        >>> s = ed.DataFrame('localhost', 'flights')['AvgTicketPrice']
+        >>> int(s.mad())
+        213
+        """
+        results = super().mad(numeric_only=numeric_only)
+        return results.squeeze()
+
     # def values TODO - not implemented as causes current implementation of query to fail
 
     def to_numpy(self):
diff --git a/eland/tests/dataframe/test_dtypes_pytest.py b/eland/tests/dataframe/test_dtypes_pytest.py
index 2dd5993..80cbb95 100644
--- a/eland/tests/dataframe/test_dtypes_pytest.py
+++ b/eland/tests/dataframe/test_dtypes_pytest.py
@@ -4,11 +4,9 @@
 
 # File called _pytest for PyCharm compatability
 
-import warnings
 import numpy as np
 from pandas.testing import assert_series_equal
 
-from eland.operations import build_series, EMPTY_SERIES_DTYPE
 from eland.tests.common import TestData
 from eland.tests.common import assert_pandas_eland_frame_equal
 
@@ -34,9 +32,3 @@
             pd_flights.select_dtypes(include=np.number),
             ed_flights.select_dtypes(include=np.number),
         )
-
-    def test_emtpy_series_dtypes(self):
-        with warnings.catch_warnings(record=True) as w:
-            s = build_series({})
-            assert s.dtype == EMPTY_SERIES_DTYPE
-            assert w == []
diff --git a/eland/tests/dataframe/test_metrics_pytest.py b/eland/tests/dataframe/test_metrics_pytest.py
index f2cd11d..c321bc3 100644
--- a/eland/tests/dataframe/test_metrics_pytest.py
+++ b/eland/tests/dataframe/test_metrics_pytest.py
@@ -11,7 +11,7 @@ from eland.tests.common import TestData
 
 class TestDataFrameMetrics(TestData):
     funcs = ["max", "min", "mean", "sum"]
-    extended_funcs = ["var", "std", "median"]
+    extended_funcs = ["median", "mad", "var", "std"]
 
     def test_flights_metrics(self):
         pd_flights = self.pd_flights()
@@ -29,40 +29,42 @@
 
         # Test on reduced set of data for more consistent
         # median behaviour + better var, std test for sample vs population
-        pd_flights = pd_flights[pd_flights.DestAirportID == "AMS"]
-        ed_flights = ed_flights[ed_flights.DestAirportID == "AMS"]
+        pd_flights = pd_flights[["AvgTicketPrice"]]
+        ed_flights = ed_flights[["AvgTicketPrice"]]
 
         for func in self.extended_funcs:
-            pd_metric = getattr(pd_flights, func)(numeric_only=True)
+            pd_metric = getattr(pd_flights, func)(
+                **({"numeric_only": True} if func != "mad" else {})
+            )
             ed_metric = getattr(ed_flights, func)(numeric_only=True)
 
-            assert_series_equal(
-                pd_metric, ed_metric, check_exact=False, check_less_precise=True
-            )
+            pd_value = pd_metric["AvgTicketPrice"]
+            ed_value = ed_metric["AvgTicketPrice"]
+            assert (ed_value * 0.9) <= pd_value <= (ed_value * 1.1)  # +/-10%
 
     def test_flights_extended_metrics_nan(self):
         pd_flights = self.pd_flights()
         ed_flights = self.ed_flights()
 
         # Test on single row to test NaN behaviour of sample std/variance
-        pd_flights_1 = pd_flights[pd_flights.FlightNum == "9HY9SWR"]
-        ed_flights_1 = ed_flights[ed_flights.FlightNum == "9HY9SWR"]
+        pd_flights_1 = pd_flights[pd_flights.FlightNum == "9HY9SWR"][["AvgTicketPrice"]]
+        ed_flights_1 = ed_flights[ed_flights.FlightNum == "9HY9SWR"][["AvgTicketPrice"]]
 
         for func in self.extended_funcs:
-            pd_metric = getattr(pd_flights_1, func)(numeric_only=True)
-            ed_metric = getattr(ed_flights_1, func)(numeric_only=True)
+            pd_metric = getattr(pd_flights_1, func)()
+            ed_metric = getattr(ed_flights_1, func)()
 
             assert_series_equal(
                 pd_metric, ed_metric, check_exact=False, check_less_precise=True
             )
 
         # Test on zero rows to test NaN behaviour of sample std/variance
-        pd_flights_0 = pd_flights[pd_flights.FlightNum == "XXX"]
-        ed_flights_0 = ed_flights[ed_flights.FlightNum == "XXX"]
+        pd_flights_0 = pd_flights[pd_flights.FlightNum == "XXX"][["AvgTicketPrice"]]
+        ed_flights_0 = ed_flights[ed_flights.FlightNum == "XXX"][["AvgTicketPrice"]]
 
         for func in self.extended_funcs:
-            pd_metric = getattr(pd_flights_0, func)(numeric_only=True)
-            ed_metric = getattr(ed_flights_0, func)(numeric_only=True)
+            pd_metric = getattr(pd_flights_0, func)()
+            ed_metric = getattr(ed_flights_0, func)()
 
             assert_series_equal(
                 pd_metric, ed_metric, check_exact=False, check_less_precise=True
diff --git a/eland/tests/operations/test_map_pd_aggs_to_es_aggs_pytest.py b/eland/tests/operations/test_map_pd_aggs_to_es_aggs_pytest.py
new file mode 100644
index 0000000..59c6f23
--- /dev/null
+++ b/eland/tests/operations/test_map_pd_aggs_to_es_aggs_pytest.py
@@ -0,0 +1,39 @@
+# Licensed to Elasticsearch B.V under one or more agreements.
+# Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
+# See the LICENSE file in the project root for more information
+
+from eland.operations import Operations
+
+
+def test_all_aggs():
+    es_aggs = Operations._map_pd_aggs_to_es_aggs(
+        ["min", "max", "mean", "std", "var", "mad", "count", "nunique", "median"]
+    )
+
+    assert es_aggs == [
+        ("extended_stats", "min"),
+        ("extended_stats", "max"),
+        ("extended_stats", "avg"),
+        ("extended_stats", "std_deviation"),
+        ("extended_stats", "variance"),
+        "median_absolute_deviation",
+        ("extended_stats", "count"),
+        "cardinality",
+        ("percentiles", "50.0"),
+    ]
+
+
+def test_extended_stats_optimization():
+    # Tests that when 'count' and an 'extended_stats' agg are used together
+    # that ('extended_stats', 'count') is used instead of 'count'.
+    es_aggs = Operations._map_pd_aggs_to_es_aggs(["count", "nunique"])
+    assert es_aggs == ["count", "cardinality"]
+
+    for pd_agg in ["var", "std"]:
+        extended_es_agg = Operations._map_pd_aggs_to_es_aggs([pd_agg])[0]
+
+        es_aggs = Operations._map_pd_aggs_to_es_aggs([pd_agg, "nunique"])
+        assert es_aggs == [extended_es_agg, "cardinality"]
+
+        es_aggs = Operations._map_pd_aggs_to_es_aggs(["count", pd_agg, "nunique"])
+        assert es_aggs == [("extended_stats", "count"), extended_es_agg, "cardinality"]
diff --git a/eland/tests/series/test_dtype_pytest.py b/eland/tests/series/test_dtype_pytest.py
new file mode 100644
index 0000000..77a13f6
--- /dev/null
+++ b/eland/tests/series/test_dtype_pytest.py
@@ -0,0 +1,22 @@
+# Licensed to Elasticsearch B.V under one or more agreements.
+# Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
+# See the LICENSE file in the project root for more information
+
+import numpy as np
+import warnings
+from eland.common import build_pd_series, EMPTY_SERIES_DTYPE
+
+
+def test_empty_series_dtypes():
+    with warnings.catch_warnings(record=True) as w:
+        s = build_pd_series({})
+        assert s.dtype == EMPTY_SERIES_DTYPE
+        assert w == []
+
+    # Ensure that a passed-in dtype isn't ignored
+    # even if the result is empty.
+    with warnings.catch_warnings(record=True) as w:
+        s = build_pd_series({}, dtype=np.int32)
+        assert np.int32 != EMPTY_SERIES_DTYPE
+        assert s.dtype == np.int32
+        assert w == []
diff --git a/eland/tests/series/test_metrics_pytest.py b/eland/tests/series/test_metrics_pytest.py
index f8d8b45..955c73c 100644
--- a/eland/tests/series/test_metrics_pytest.py
+++ b/eland/tests/series/test_metrics_pytest.py
@@ -10,17 +10,24 @@ from eland.tests.common import TestData
 
 class TestSeriesMetrics(TestData):
-    funcs = ["max", "min", "mean", "sum"]
-    timestamp_funcs = ["max", "min", "mean"]
+    all_funcs = ["max", "min", "mean", "sum", "nunique", "var", "std", "mad"]
+    timestamp_funcs = ["max", "min", "mean", "nunique"]
+
+    def assert_almost_equal_for_agg(self, func, pd_metric, ed_metric):
+        if func in ("nunique", "var", "mad"):
+            np.testing.assert_almost_equal(pd_metric, ed_metric, decimal=-3)
+        else:
+            np.testing.assert_almost_equal(pd_metric, ed_metric, decimal=2)
 
     def test_flights_metrics(self):
         pd_flights = self.pd_flights()["AvgTicketPrice"]
         ed_flights = self.ed_flights()["AvgTicketPrice"]
 
-        for func in self.funcs:
+        for func in self.all_funcs:
             pd_metric = getattr(pd_flights, func)()
             ed_metric = getattr(ed_flights, func)()
-            np.testing.assert_almost_equal(pd_metric, ed_metric, decimal=2)
+
+            self.assert_almost_equal_for_agg(func, pd_metric, ed_metric)
 
     def test_flights_timestamp(self):
         pd_flights = self.pd_flights()["timestamp"]
@@ -29,18 +36,27 @@
         for func in self.timestamp_funcs:
             pd_metric = getattr(pd_flights, func)()
             ed_metric = getattr(ed_flights, func)()
-            pd_metric = pd_metric.floor("S")  # floor or pandas mean with have ns
-            assert pd_metric == ed_metric
+
+            if hasattr(pd_metric, "floor"):
+                pd_metric = pd_metric.floor("S")  # floor, since pandas mean will have ns
+
+            if func == "nunique":
+                self.assert_almost_equal_for_agg(func, pd_metric, ed_metric)
+            else:
+                assert pd_metric == ed_metric
 
     def test_ecommerce_selected_non_numeric_source_fields(self):
-        # None of these are numeric
+        # None of these are numeric, will result in NaNs
         column = "category"
 
         ed_ecommerce = self.ed_ecommerce()[column]
 
-        for func in self.funcs:
+        for func in self.all_funcs:
+            if func == "nunique":  # nunique never returns 'NaN'
+                continue
+
             ed_metric = getattr(ed_ecommerce, func)()
-            assert ed_metric.empty
+            assert np.isnan(ed_metric)
 
     def test_ecommerce_selected_all_numeric_source_fields(self):
         # All of these are numeric
@@ -50,9 +67,7 @@
         pd_ecommerce = self.pd_ecommerce()[column]
         ed_ecommerce = self.ed_ecommerce()[column]
 
-        for func in self.funcs:
-            np.testing.assert_almost_equal(
-                getattr(pd_ecommerce, func)(),
-                getattr(ed_ecommerce, func)(),
-                decimal=2,
-            )
+        for func in self.all_funcs:
+            pd_metric = getattr(pd_ecommerce, func)()
+            ed_metric = getattr(ed_ecommerce, func)()
+            self.assert_almost_equal_for_agg(func, pd_metric, ed_metric)
diff --git a/requirements-dev.txt b/requirements-dev.txt
index e3b1ce7..0d246ab 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -1,4 +1,4 @@
-elasticsearch>=7.6.0
+elasticsearch==7.7.0a2
 pandas>=1
 matplotlib
 pytest>=5.2.1
diff --git a/requirements.txt b/requirements.txt
index f0da4b7..708e6aa 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,3 @@
-elasticsearch>=7.6.0
+elasticsearch==7.7.0a2
 pandas>=1
 matplotlib
diff --git a/setup.py b/setup.py
index ebe1c85..85d364a 100644
--- a/setup.py
+++ b/setup.py
@@ -175,6 +175,6 @@ setup(
     classifiers=CLASSIFIERS,
     keywords="elastic eland pandas python",
     packages=find_packages(include=["eland", "eland.*"]),
-    install_requires=["elasticsearch>=7.6, <8", "pandas>=1", "matplotlib"],
+    install_requires=["elasticsearch==7.7.0a2", "pandas>=1", "matplotlib", "numpy"],
    python_requires=">=3.6",
 )
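
Reviewer note (not part of the patch): the new Field.is_es_agg_compatible()
gate is easy to sanity-check by hand. A minimal sketch, assuming the patch is
applied; the Field below is constructed directly for illustration rather than
coming from FieldMappings.all_source_fields(), and 'timestamp' is just a
stand-in field name:

    >>> from eland.field_mappings import Field
    >>> timestamp = Field(
    ...     index="timestamp", es_field_name="timestamp", is_source=True,
    ...     es_dtype="date", es_date_format=None, pd_dtype="datetime64[ns]",
    ...     is_searchable=True, is_aggregatable=True, is_scripted=False,
    ...     aggregatable_es_field_name="timestamp",
    ... )
    >>> timestamp.is_es_agg_compatible("cardinality")  # works for every type
    True
    >>> timestamp.is_es_agg_compatible("avg")  # timestamps allow min/max/avg
    True
    >>> timestamp.is_es_agg_compatible("sum")
    False

_metric_aggs() applies the same check both when building the search body and
when unpacking the response, which is why an incompatible field/agg pair
surfaces as np.NaN in the results instead of raising.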
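The 'extended_stats' piggy-backing in _map_pd_aggs_to_es_aggs() is also easy
to observe from a REPL; a sketch assuming the patch is applied (these
expectations mirror the new test_map_pd_aggs_to_es_aggs_pytest.py):

    >>> from eland.operations import Operations
    >>> Operations._map_pd_aggs_to_es_aggs(["mean"])
    ['avg']
    >>> # Two or more compatible aggs fold into a single 'extended_stats'
    >>> # call, so Elasticsearch computes them all in one aggregation:
    >>> Operations._map_pd_aggs_to_es_aggs(["mean", "std"])
    [('extended_stats', 'avg'), ('extended_stats', 'std_deviation')]
    >>> # 'cardinality' is not part of 'extended_stats' and never folds:
    >>> Operations._map_pd_aggs_to_es_aggs(["mean", "nunique"])
    ['avg', 'cardinality']

Because the fold only happens when two or more compatible aggs are requested,
a lone 'mean' still maps to the cheaper plain 'avg' metric agg.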