diff --git a/eland/common.py b/eland/common.py index 8bb8796..a9078f8 100644 --- a/eland/common.py +++ b/eland/common.py @@ -110,10 +110,11 @@ def elasticsearch_date_to_pandas_date( or epoch_millis. """ - if date_format is None: + if date_format is None or isinstance(value, (int, float)): try: - value = int(value) - return pd.to_datetime(value, unit="ms") + return pd.to_datetime( + value, unit="s" if date_format == "epoch_second" else "ms" + ) except ValueError: return pd.to_datetime(value) elif date_format == "epoch_millis": diff --git a/eland/field_mappings.py b/eland/field_mappings.py index aa58bac..18c6484 100644 --- a/eland/field_mappings.py +++ b/eland/field_mappings.py @@ -27,7 +27,16 @@ from pandas.core.dtypes.common import ( is_string_dtype, ) from pandas.core.dtypes.inference import is_list_like -from typing import NamedTuple, Optional, Mapping, Dict, Any, TYPE_CHECKING, List, Set +from typing import ( + NamedTuple, + Optional, + Mapping, + Dict, + Any, + TYPE_CHECKING, + List, + Set, +) if TYPE_CHECKING: from elasticsearch import Elasticsearch @@ -82,6 +91,9 @@ class Field(NamedTuple): return np.dtype(self.pd_dtype) def is_es_agg_compatible(self, es_agg) -> bool: + # Unpack the actual aggregation if this is 'extended_stats' + if isinstance(es_agg, tuple): + es_agg = es_agg[1] # Cardinality works for all types # Numerics and bools work for all aggs if es_agg == "cardinality" or self.is_numeric or self.is_bool: @@ -91,6 +103,13 @@ class Field(NamedTuple): return True return False + @property + def nan_value(self) -> Any: + """Returns NaN for any field except datetimes which use NaT""" + if self.is_timestamp: + return pd.NaT + return np.float64(np.NaN) + class FieldMappings: """ diff --git a/eland/operations.py b/eland/operations.py index 6a6b519..170e711 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -242,9 +242,9 @@ class Operations: values = [] for es_agg, pd_agg in zip(es_aggs, pd_aggs): - # If the field and agg aren't compatible we add a NaN + # If the field and agg aren't compatible we add a NaN/NaT if not field.is_es_agg_compatible(es_agg): - values.append(np.float64(np.NaN)) + values.append(field.nan_value) continue if isinstance(es_agg, tuple): @@ -284,21 +284,25 @@ class Operations: else: agg_value = response["aggregations"][ f"{es_agg}_{field.es_field_name}" - ] - if "value_as_string" in agg_value and field.is_timestamp: - agg_value = elasticsearch_date_to_pandas_date( - agg_value["value_as_string"], field.es_date_format - ) - else: - agg_value = agg_value["value"] - - # These aggregations maintain the column datatype - if pd_agg in ("max", "min"): - agg_value = field.np_dtype.type(agg_value) + ]["value"] # Null usually means there were no results. if agg_value is None: - agg_value = np.float64(np.NaN) + agg_value = field.nan_value + + # Cardinality is always either NaN or integer. + elif pd_agg == "nunique": + agg_value = int(agg_value) + + # If this is a non-null timestamp field convert to a pd.Timestamp() + elif field.is_timestamp: + agg_value = elasticsearch_date_to_pandas_date( + agg_value, field.es_date_format + ) + + # These aggregations maintain the column datatype + elif pd_agg in ("max", "min"): + agg_value = field.np_dtype.type(agg_value) values.append(agg_value) diff --git a/eland/tests/dataframe/test_metrics_pytest.py b/eland/tests/dataframe/test_metrics_pytest.py index dbf5049..702e9f4 100644 --- a/eland/tests/dataframe/test_metrics_pytest.py +++ b/eland/tests/dataframe/test_metrics_pytest.py @@ -17,6 +17,9 @@ # File called _pytest for PyCharm compatibility +import pytest +import numpy as np +import pandas as pd from pandas.testing import assert_series_equal from eland.tests.common import TestData @@ -26,13 +29,25 @@ class TestDataFrameMetrics(TestData): funcs = ["max", "min", "mean", "sum"] extended_funcs = ["median", "mad", "var", "std"] - def test_flights_metrics(self): + @pytest.mark.parametrize("numeric_only", [False, None]) + def test_flights_metrics(self, numeric_only): pd_flights = self.pd_flights() ed_flights = self.ed_flights() for func in self.funcs: - pd_metric = getattr(pd_flights, func)(numeric_only=True) - ed_metric = getattr(ed_flights, func)(numeric_only=True) + # Pandas v1.0 doesn't support mean() on datetime + # Pandas and Eland don't support sum() on datetime + if not numeric_only: + dtype_include = ( + [np.number, np.datetime64] + if func not in ("mean", "sum") + else [np.number] + ) + pd_flights = pd_flights.select_dtypes(include=dtype_include) + ed_flights = ed_flights.select_dtypes(include=dtype_include) + + pd_metric = getattr(pd_flights, func)(numeric_only=numeric_only) + ed_metric = getattr(ed_flights, func)(numeric_only=numeric_only) assert_series_equal(pd_metric, ed_metric) @@ -144,3 +159,49 @@ class TestDataFrameMetrics(TestData): getattr(ed_ecommerce, func)(numeric_only=True), check_less_precise=True, ) + + def test_flights_datetime_metrics_agg(self): + ed_timestamps = self.ed_flights()[["timestamp"]] + expected_values = { + "timestamp": { + "min": pd.Timestamp("2018-01-01 00:00:00"), + "mean": pd.Timestamp("2018-01-21 19:20:45.564438232"), + "max": pd.Timestamp("2018-02-11 23:50:12"), + "mad": pd.NaT, + "median": pd.NaT, + "std": pd.NaT, + "sum": pd.NaT, + "var": pd.NaT, + "nunique": 12236, + } + } + + ed_metrics = ed_timestamps.agg(self.funcs + self.extended_funcs + ["nunique"]) + assert ed_metrics.to_dict() == expected_values + + @pytest.mark.parametrize("agg", ["mean", "min", "max"]) + def test_flights_datetime_metrics_single_agg(self, agg): + ed_timestamps = self.ed_flights()[["timestamp"]] + expected_values = { + "min": pd.Timestamp("2018-01-01 00:00:00"), + "mean": pd.Timestamp("2018-01-21 19:20:45.564438232"), + "max": pd.Timestamp("2018-02-11 23:50:12"), + "nunique": 12236, + } + ed_metric = ed_timestamps.agg([agg]) + + assert ed_metric.dtypes["timestamp"] == np.dtype("datetime64[ns]") + assert ed_metric["timestamp"][0] == expected_values[agg] + + @pytest.mark.parametrize("agg", ["mean", "min", "max"]) + def test_flights_datetime_metrics_agg_func(self, agg): + ed_timestamps = self.ed_flights()[["timestamp"]] + expected_values = { + "min": pd.Timestamp("2018-01-01 00:00:00"), + "mean": pd.Timestamp("2018-01-21 19:20:45.564438232"), + "max": pd.Timestamp("2018-02-11 23:50:12"), + } + ed_metric = getattr(ed_timestamps, agg)(numeric_only=False) + + assert ed_metric.dtype == np.dtype("datetime64[ns]") + assert ed_metric[0] == expected_values[agg] diff --git a/eland/tests/series/test_metrics_pytest.py b/eland/tests/series/test_metrics_pytest.py index 20923fc..d24a796 100644 --- a/eland/tests/series/test_metrics_pytest.py +++ b/eland/tests/series/test_metrics_pytest.py @@ -17,7 +17,10 @@ # File called _pytest for PyCharm compatability +import pytest +import pandas as pd import numpy as np +from datetime import timedelta from eland.tests.common import TestData @@ -50,11 +53,12 @@ class TestSeriesMetrics(TestData): pd_metric = getattr(pd_flights, func)() ed_metric = getattr(ed_flights, func)() - if hasattr(pd_metric, "floor"): - pd_metric = pd_metric.floor("S") # floor or pandas mean with have ns - if func == "nunique": + print(pd_metric, ed_metric) self.assert_almost_equal_for_agg(func, pd_metric, ed_metric) + elif func == "mean": + offset = timedelta(seconds=0.001) + assert (ed_metric - offset) < pd_metric < (ed_metric + offset) else: assert pd_metric == ed_metric @@ -84,3 +88,15 @@ class TestSeriesMetrics(TestData): pd_metric = getattr(pd_ecommerce, func)() ed_metric = getattr(ed_ecommerce, func)() self.assert_almost_equal_for_agg(func, pd_metric, ed_metric) + + @pytest.mark.parametrize("agg", ["mean", "min", "max"]) + def test_flights_datetime_metrics_agg(self, agg): + ed_timestamps = self.ed_flights()["timestamp"] + expected_values = { + "min": pd.Timestamp("2018-01-01 00:00:00"), + "mean": pd.Timestamp("2018-01-21 19:20:45.564438232"), + "max": pd.Timestamp("2018-02-11 23:50:12"), + } + ed_metric = getattr(ed_timestamps, agg)() + + assert ed_metric == expected_values[agg]