Add quantile() to DataFrame and Series

This commit is contained in:
P. Sai Vinay 2021-06-08 23:32:44 +05:30 committed by GitHub
parent aa9d60e7e7
commit e9c0b897f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 391 additions and 38 deletions

View File

@ -0,0 +1,6 @@
eland.DataFrame.quantile
========================
.. currentmodule:: eland
.. automethod:: DataFrame.quantile

View File

@ -0,0 +1,6 @@
eland.Series.quantile
=====================
.. currentmodule:: eland
.. automethod:: Series.quantile

View File

@ -99,6 +99,7 @@ Computations / Descriptive Stats
DataFrame.sum
DataFrame.nunique
DataFrame.mode
DataFrame.quantile
Reindexing / Selection / Label Manipulation
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

View File

@ -80,6 +80,7 @@ Computations / Descriptive Stats
Series.nunique
Series.value_counts
Series.mode
Series.quantile
Reindexing / Selection / Label Manipulation
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

View File

@ -1686,6 +1686,58 @@ class DataFrame(NDFrame):
numeric_only=numeric_only, dropna=True, is_dataframe=True, es_size=es_size
)
def quantile(
self,
q: Union[int, float, List[int], List[float]] = 0.5,
numeric_only: Optional[bool] = True,
) -> "pd.DataFrame":
"""
Used to calculate quantile for a given DataFrame.
Parameters
----------
q:
float or array like, default 0.5
Value between 0 <= q <= 1, the quantile(s) to compute.
numeric_only: {True, False, None} Default is True
Which datatype to be returned
- True: Returns all values as float64, NaN/NaT values are removed
- None: Returns all values as the same dtype where possible, NaN/NaT are removed
- False: Returns all values as the same dtype where possible, NaN/NaT are preserved
Returns
-------
pandas.DataFrame
quantile value for each column
See Also
--------
:pandas_api_docs:`pandas.DataFrame.quantile`
Examples
--------
>>> ed_df = ed.DataFrame('localhost', 'flights')
>>> ed_flights = ed_df.filter(["AvgTicketPrice", "FlightDelayMin", "dayOfWeek", "timestamp"])
>>> ed_flights.quantile() # doctest: +SKIP
AvgTicketPrice 640.387285
FlightDelayMin 0.000000
dayOfWeek 3.000000
Name: 0.5, dtype: float64
>>> ed_flights.quantile([.2, .5, .75]) # doctest: +SKIP
AvgTicketPrice FlightDelayMin dayOfWeek
0.20 361.040768 0.0 1.0
0.50 640.387285 0.0 3.0
0.75 842.213490 15.0 4.0
>>> ed_flights.quantile([.2, .5, .75], numeric_only=False) # doctest: +SKIP
AvgTicketPrice FlightDelayMin dayOfWeek timestamp
0.20 361.040768 0.0 1.0 2018-01-09 04:43:55.296587520
0.50 640.387285 0.0 3.0 2018-01-21 23:51:57.637076736
0.75 842.213490 15.0 4.0 2018-02-01 04:46:16.658119680
"""
return self._query_compiler.quantile(quantiles=q, numeric_only=numeric_only)
def query(self, expr) -> "DataFrame":
"""
Query the columns of a DataFrame with a boolean expression.

View File

@ -243,6 +243,7 @@ class Operations:
is_dataframe_agg: bool = False,
es_mode_size: Optional[int] = None,
dropna: bool = True,
percentiles: Optional[List[float]] = None,
) -> Dict[str, Any]:
"""
Used to calculate metric aggregations
@ -262,6 +263,8 @@ class Operations:
number of rows to return when multiple mode values are present.
dropna:
drop NaN/NaT for a dataframe
percentiles:
List of percentiles when 'quantile' agg is called. Otherwise it is None
Returns
-------
@ -283,7 +286,7 @@ class Operations:
body = Query(query_params.query)
# Convert pandas aggs to ES equivalent
es_aggs = self._map_pd_aggs_to_es_aggs(pd_aggs)
es_aggs = self._map_pd_aggs_to_es_aggs(pd_aggs, percentiles)
for field in fields:
for es_agg in es_aggs:
@ -293,25 +296,33 @@ class Operations:
# If we have multiple 'extended_stats' etc. here we simply NOOP on 2nd call
if isinstance(es_agg, tuple):
body.metric_aggs(
f"{es_agg[0]}_{field.es_field_name}",
es_agg[0],
field.aggregatable_es_field_name,
)
if es_agg[0] == "percentiles":
body.percentile_agg(
name=f"{es_agg[0]}_{field.es_field_name}",
field=field.es_field_name,
percents=es_agg[1],
)
else:
body.metric_aggs(
name=f"{es_agg[0]}_{field.es_field_name}",
func=es_agg[0],
field=field.aggregatable_es_field_name,
)
elif es_agg == "mode":
# TODO for dropna=False, Check If field is timestamp or boolean or numeric,
# then use missing parameter for terms aggregation.
body.terms_aggs(
f"{es_agg}_{field.es_field_name}",
"terms",
field.aggregatable_es_field_name,
es_mode_size,
name=f"{es_agg}_{field.es_field_name}",
func="terms",
field=field.aggregatable_es_field_name,
es_size=es_mode_size,
)
else:
body.metric_aggs(
f"{es_agg}_{field.es_field_name}",
es_agg,
field.aggregatable_es_field_name,
name=f"{es_agg}_{field.es_field_name}",
func=es_agg,
field=field.aggregatable_es_field_name,
)
response = query_compiler._client.search(
@ -333,6 +344,7 @@ class Operations:
response=response,
numeric_only=numeric_only,
is_dataframe_agg=is_dataframe_agg,
percentiles=percentiles,
)
def _terms_aggs(
@ -479,8 +491,9 @@ class Operations:
pd_aggs: List[str],
response: Dict[str, Any],
numeric_only: Optional[bool],
percentiles: Optional[List[float]] = None,
is_dataframe_agg: bool = False,
):
) -> Dict[str, List[Any]]:
"""
This method unpacks metric aggregations JSON response.
This can be called either directly on an aggs query
@ -495,15 +508,22 @@ class Operations:
pd_aggs:
a list of aggs
response:
a dict containing response from Elastic Search
a dict containing response from Elasticsearch
numeric_only:
return either numeric values or NaN/NaT
is_dataframe_agg:
- True then aggregation is called from dataframe
- False then aggregation is called from series
percentiles:
List of percentiles when 'quantile' agg is called. Otherwise it is None
Returns
-------
a dictionary on which agg caluculations are done.
"""
results: Dict[str, Any] = {}
percentile_values: List[float] = []
agg_value: Union[int, float]
for field in fields:
values = []
@ -529,10 +549,19 @@ class Operations:
# Pull multiple values from 'percentiles' result.
if es_agg[0] == "percentiles":
agg_value = agg_value["values"]
agg_value = agg_value[es_agg[1]]
agg_value = agg_value["values"] # Returns dictionary
if pd_agg == "median":
agg_value = agg_value["50.0"]
# Currently Pandas does the same
# If we call quantile it returns the same result as of median.
elif pd_agg == "quantile" and is_dataframe_agg:
agg_value = agg_value["50.0"]
else:
# Maintain order of percentiles
percentile_values = [agg_value[str(i)] for i in percentiles]
if not percentile_values and pd_agg not in ("quantile", "median"):
agg_value = agg_value[es_agg[1]]
# Need to convert 'Population' stddev and variance
# from Elasticsearch into 'Sample' stddev and variance
# which is what pandas uses.
@ -590,7 +619,7 @@ class Operations:
]
# Null usually means there were no results.
if not isinstance(agg_value, list) and (
if not isinstance(agg_value, (list, dict)) and (
agg_value is None or np.isnan(agg_value)
):
if is_dataframe_agg and not numeric_only:
@ -612,12 +641,19 @@ class Operations:
)
for value in agg_value
]
elif percentile_values:
percentile_values = [
elasticsearch_date_to_pandas_date(
value, field.es_date_format
)
for value in percentile_values
]
else:
agg_value = elasticsearch_date_to_pandas_date(
agg_value, field.es_date_format
)
# If numeric_only is False | None then maintain column datatype
elif not numeric_only:
elif not numeric_only and pd_agg != "quantile":
# we're only converting to bool for lossless aggs like min, max, and median.
if pd_agg in {"max", "min", "median", "sum", "mode"}:
# 'sum' isn't representable with bool, use int64
@ -626,14 +662,68 @@ class Operations:
else:
agg_value = field.np_dtype.type(agg_value)
values.append(agg_value)
if not percentile_values:
values.append(agg_value)
# If numeric_only is True and We only have a NaN type field then we check for empty.
if values:
results[field.column] = values if len(values) > 1 else values[0]
# This only runs when df.quantile() or series.quantile() is called
if percentile_values and not is_dataframe_agg:
results[f"{field.column}"] = percentile_values
return results
def quantile(
self,
query_compiler: "QueryCompiler",
pd_aggs: List[str],
quantiles: Union[int, float, List[int], List[float]],
is_dataframe: bool = True,
numeric_only: Optional[bool] = True,
) -> Union[pd.DataFrame, pd.Series]:
# To verify if quantile range falls between 0 to 1
def quantile_to_percentile(quantile: Any) -> float:
if isinstance(quantile, (int, float)):
quantile = float(quantile)
if quantile > 1 or quantile < 0:
raise ValueError(
f"quantile should be in range of 0 and 1, given {quantile}"
)
else:
raise TypeError("quantile should be of type int or float")
# quantile * 100 = percentile
# return float(...) because min(1.0) gives 1
return float(min(100, max(0, quantile * 100)))
percentiles = [
quantile_to_percentile(x)
for x in (
(quantiles,) if not isinstance(quantiles, (list, tuple)) else quantiles
)
]
result = self._metric_aggs(
query_compiler,
pd_aggs=pd_aggs,
percentiles=percentiles,
is_dataframe_agg=False,
numeric_only=numeric_only,
)
df = pd.DataFrame(
result,
index=[i / 100 for i in percentiles],
columns=result.keys(),
dtype=(np.float64 if numeric_only else None),
)
# Display Output same as pandas does
if isinstance(quantiles, float):
return df.squeeze()
else:
return df if is_dataframe else df.transpose().iloc[0]
def aggs_groupby(
self,
query_compiler: "QueryCompiler",
@ -821,10 +911,13 @@ class Operations:
return composite_buckets["buckets"]
@staticmethod
def _map_pd_aggs_to_es_aggs(pd_aggs):
def _map_pd_aggs_to_es_aggs(
pd_aggs: List[str], percentiles: Optional[List[float]] = None
) -> Union[List[str], List[Tuple[str, List[float]]]]:
"""
Args:
pd_aggs - list of pandas aggs (e.g. ['mad', 'min', 'std'] etc.)
percentiles - list of percentiles for 'quantile' agg
Returns:
ed_aggs - list of corresponding es_aggs (e.g. ['median_absolute_deviation', 'min', 'std'] etc.)
@ -885,7 +978,14 @@ class Operations:
elif pd_agg == "mad":
es_aggs.append("median_absolute_deviation")
elif pd_agg == "median":
es_aggs.append(("percentiles", "50.0"))
es_aggs.append(("percentiles", (50.0,)))
elif pd_agg == "quantile":
# None when 'quantile' is called in df.agg[...]
# Behaves same as median because pandas does the same.
if percentiles is not None:
es_aggs.append(("percentiles", tuple(percentiles)))
else:
es_aggs.append(("percentiles", (50.0,)))
elif pd_agg == "mode":
if len(pd_aggs) != 1:
@ -896,9 +996,6 @@ class Operations:
es_aggs.append("mode")
# Not implemented
elif pd_agg == "quantile":
# TODO
raise NotImplementedError(pd_agg, " not currently implemented")
elif pd_agg == "rank":
# TODO
raise NotImplementedError(pd_agg, " not currently implemented")

View File

@ -145,6 +145,24 @@ class Query:
agg = {func: {"field": field}}
self._aggs[name] = agg
def percentile_agg(self, name: str, field: str, percents: List[float]) -> None:
"""
Ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-percentile-aggregation.html
"aggs": {
"percentile_": {
"percentiles": {
"field": "AvgTicketPrice",
"percents": [95, 99, 99.0]
}
}
}
"""
agg = {"percentiles": {"field": field, "percents": percents}}
self._aggs[name] = agg
def composite_agg_bucket_terms(self, name: str, field: str) -> None:
"""
Add terms agg for composite aggregation
@ -242,7 +260,7 @@ class Query:
"""
Add's after_key to existing query to fetch next bunch of results
PARAMETERS
Parameters
----------
name: str
Name of the buckets

View File

@ -637,6 +637,35 @@ class QueryCompiler:
es_size=es_size,
)
def quantile(
self,
quantiles: Union[int, float, List[int], List[float]],
numeric_only: Optional[bool] = True,
is_dataframe: bool = True,
) -> Union[pd.DataFrame, pd.Series, Any]:
"""
Holds quantile object for both DataFrame and Series
Parameters
----------
quantiles:
list of quantiles for computation
numeric_only:
Flag used to filter numeric columns
is_dataframe:
To identify if quantile is called from Series or DataFrame
True: Called from DataFrame
False: Called from Series
"""
return self._operations.quantile(
self,
pd_aggs=["quantile"],
quantiles=quantiles,
numeric_only=numeric_only,
is_dataframe=is_dataframe,
)
def aggs_groupby(
self,
by: List[str],

View File

@ -35,7 +35,7 @@ import sys
import warnings
from collections.abc import Collection
from io import StringIO
from typing import TYPE_CHECKING, Any, Optional, Sequence, Tuple, Union
from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Tuple, Union
import numpy as np
import pandas as pd
@ -565,6 +565,45 @@ class Series(NDFrame):
notnull = notna
def quantile(
self, q: Union[int, float, List[int], List[float]] = 0.5
) -> Union[pd.Series, Any]:
"""
Used to calculate quantile for a given Series.
Parameters
----------
q:
float or array like, default 0.5
Value between 0 <= q <= 1, the quantile(s) to compute.
Returns
-------
pandas.Series or any single dtype
See Also
--------
:pandas_api_docs:`pandas.Series.quantile`
Examples
--------
>>> ed_flights = ed.DataFrame('localhost', 'flights')
>>> ed_flights["timestamp"].quantile([.2,.5,.75]) # doctest: +SKIP
0.20 2018-01-09 04:30:57.289159912
0.50 2018-01-21 23:39:27.031627441
0.75 2018-02-01 04:54:59.256136963
Name: timestamp, dtype: datetime64[ns]
>>> ed_flights["dayOfWeek"].quantile() # doctest: +SKIP
3.0
>>> ed_flights["timestamp"].quantile() # doctest: +SKIP
Timestamp('2018-01-22 00:12:48.844534180')
"""
return self._query_compiler.quantile(
quantiles=q, numeric_only=None, is_dataframe=False
)
@property
def ndim(self) -> int:
"""

View File

@ -217,7 +217,8 @@ class TestDataFrameMetrics(TestData):
assert ed_metric.dtype == np.dtype("datetime64[ns]")
assert_almost_equal(ed_metric[0], expected_values[agg])
def test_flights_datetime_metrics_median(self):
@pytest.mark.parametrize("agg", ["median", "quantile"])
def test_flights_datetime_metrics_median_quantile(self, agg):
ed_df = self.ed_flights_small()[["timestamp"]]
median = ed_df.median(numeric_only=False)[0]
@ -228,11 +229,11 @@ class TestDataFrameMetrics(TestData):
<= pd.to_datetime("2018-01-01 12:00:00.000")
)
median = ed_df.agg(["mean"])["timestamp"][0]
assert isinstance(median, pd.Timestamp)
agg_value = ed_df.agg([agg])["timestamp"][0]
assert isinstance(agg_value, pd.Timestamp)
assert (
pd.to_datetime("2018-01-01 10:00:00.000")
<= median
<= agg_value
<= pd.to_datetime("2018-01-01 12:00:00.000")
)
@ -446,3 +447,54 @@ class TestDataFrameMetrics(TestData):
assert_frame_equal(
pd_mode, ed_mode, check_dtype=(False if es_size == 1 else True)
)
@pytest.mark.parametrize("quantiles", [[0.2, 0.5], [0, 1], [0.75, 0.2, 0.1, 0.5]])
@pytest.mark.parametrize("numeric_only", [False, None])
def test_flights_quantile(self, quantiles, numeric_only):
pd_flights = self.pd_flights()
ed_flights = self.ed_flights()
pd_quantile = pd_flights.filter(
["AvgTicketPrice", "FlightDelayMin", "dayOfWeek"]
).quantile(q=quantiles, numeric_only=numeric_only)
ed_quantile = ed_flights.filter(
["AvgTicketPrice", "FlightDelayMin", "dayOfWeek"]
).quantile(q=quantiles, numeric_only=numeric_only)
assert_frame_equal(pd_quantile, ed_quantile, check_exact=False, rtol=2)
pd_quantile = pd_flights[["timestamp"]].quantile(
q=quantiles, numeric_only=numeric_only
)
ed_quantile = ed_flights[["timestamp"]].quantile(
q=quantiles, numeric_only=numeric_only
)
pd_timestamp = pd.to_numeric(pd_quantile.squeeze(), downcast="float")
ed_timestamp = pd.to_numeric(ed_quantile.squeeze(), downcast="float")
assert_series_equal(pd_timestamp, ed_timestamp, check_exact=False, rtol=2)
@pytest.mark.parametrize("quantiles", [5, [2, 1], -1.5, [1.2, 0.2]])
def test_flights_quantile_error(self, quantiles):
ed_flights = self.ed_flights().filter(self.filter_data)
match = f"quantile should be in range of 0 and 1, given {quantiles[0] if isinstance(quantiles, list) else quantiles}"
with pytest.raises(ValueError, match=match):
ed_flights[["timestamp"]].quantile(q=quantiles)
@pytest.mark.parametrize("numeric_only", [True, False, None])
def test_flights_agg_quantile(self, numeric_only):
pd_flights = self.pd_flights().filter(
["AvgTicketPrice", "FlightDelayMin", "dayOfWeek"]
)
ed_flights = self.ed_flights().filter(
["AvgTicketPrice", "FlightDelayMin", "dayOfWeek"]
)
pd_quantile = pd_flights.agg(["quantile", "min"], numeric_only=numeric_only)
ed_quantile = ed_flights.agg(["quantile", "min"], numeric_only=numeric_only)
assert_frame_equal(
pd_quantile, ed_quantile, check_exact=False, rtol=4, check_dtype=False
)

View File

@ -20,7 +20,19 @@ from eland.operations import Operations
def test_all_aggs():
es_aggs = Operations._map_pd_aggs_to_es_aggs(
["min", "max", "mean", "std", "var", "mad", "count", "nunique", "median"]
[
"min",
"max",
"mean",
"std",
"var",
"mad",
"count",
"nunique",
"median",
"quantile",
],
percentiles=[0.2, 0.5, 0.8],
)
assert es_aggs == [
@ -32,7 +44,15 @@ def test_all_aggs():
"median_absolute_deviation",
"value_count",
"cardinality",
("percentiles", "50.0"),
("percentiles", (50.0,)),
(
"percentiles",
(
0.2,
0.5,
0.8,
),
),
]
@ -50,3 +70,9 @@ def test_extended_stats_optimization():
es_aggs = Operations._map_pd_aggs_to_es_aggs(["count", pd_agg, "nunique"])
assert es_aggs == ["value_count", extended_es_agg, "cardinality"]
def test_percentiles_none():
es_aggs = Operations._map_pd_aggs_to_es_aggs(["count", "min", "quantile"])
assert es_aggs == ["value_count", "min", ("percentiles", (50.0,))]

View File

@ -105,14 +105,15 @@ class TestSeriesMetrics(TestData):
assert_almost_equal(ed_metric, expected_values[agg])
def test_flights_datetime_median_metric(self):
@pytest.mark.parametrize("agg", ["median", "quantile"])
def test_flights_datetime_median_metric(self, agg):
ed_series = self.ed_flights_small()["timestamp"]
median = ed_series.median()
assert isinstance(median, pd.Timestamp)
agg_value = getattr(ed_series, agg)()
assert isinstance(agg_value, pd.Timestamp)
assert (
pd.to_datetime("2018-01-01 10:00:00.000")
<= median
<= agg_value
<= pd.to_datetime("2018-01-01 12:00:00.000")
)
@ -137,3 +138,28 @@ class TestSeriesMetrics(TestData):
ed_mode = ed_series["order_date"].mode(es_size)
assert_series_equal(pd_mode, ed_mode)
@pytest.mark.parametrize(
"quantile_list", [0.2, 0.5, [0.2, 0.5], [0.75, 0.2, 0.1, 0.5]]
)
@pytest.mark.parametrize(
"column", ["AvgTicketPrice", "FlightDelayMin", "dayOfWeek"]
)
def test_flights_quantile(self, column, quantile_list):
pd_flights = self.pd_flights()[column]
ed_flights = self.ed_flights()[column]
pd_quantile = pd_flights.quantile(quantile_list)
ed_quantile = ed_flights.quantile(quantile_list)
if isinstance(quantile_list, list):
assert_series_equal(pd_quantile, ed_quantile, check_exact=False, rtol=2)
else:
assert pd_quantile * 0.9 <= ed_quantile <= pd_quantile * 1.1
@pytest.mark.parametrize("quantiles_list", [[np.array([1, 2])], ["1", 2]])
def test_quantile_non_numeric_values(self, quantiles_list):
ed_flights = self.ed_flights()["dayOfWeek"]
match = "quantile should be of type int or float"
with pytest.raises(TypeError, match=match):
ed_flights.quantile(q=quantiles_list)