Add quantile() to DataFrameGroupBy

parent 7e8520a8ef
commit 5fe32a24df
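
This commit wires a quantile() aggregation through eland's groupby pipeline: a public DataFrameGroupBy.quantile() method, percentile handling in Operations, a pass-through in QueryCompiler, plus docs and tests. A minimal sketch of the resulting API, assuming a local Elasticsearch cluster populated with the example flights index (as in the docstring below):

    import eland as ed

    ed_flights = ed.DataFrame("localhost", "flights")

    # The median (q=0.5) is the default
    ed_flights.groupby("Cancelled").quantile()

    # A list of quantiles adds an extra, unnamed index level with one row per quantile
    ed_flights.groupby(["dayOfWeek", "Cancelled"]).quantile(q=[0.2, 0.5])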
@@ -0,0 +1,6 @@
+eland.groupby.DataFrameGroupBy.quantile
+=======================================
+
+.. currentmodule:: eland.groupby
+
+.. automethod:: DataFrameGroupBy.quantile
@@ -75,6 +75,7 @@ Function Application, GroupBy & Window
    DataFrameGroupBy.std
    DataFrameGroupBy.sum
    DataFrameGroupBy.var
+   DataFrameGroupBy.quantile
    GroupBy

 .. currentmodule:: eland
@@ -503,6 +503,86 @@ class DataFrameGroupBy(GroupBy):
             numeric_only=False,
         )

+    def quantile(
+        self, q: Union[int, float, List[int], List[float]] = 0.5
+    ) -> "pd.DataFrame":
+        """
+        Used to group by and calculate quantiles for a given DataFrame.
+
+        Parameters
+        ----------
+        q:
+            float or array-like, default 0.5
+            Value between 0 <= q <= 1, the quantile(s) to compute.
+
+        Returns
+        -------
+        pandas.DataFrame
+            quantile value for each grouped column
+
+        See Also
+        --------
+        :pandas_api_docs:`pandas.core.groupby.GroupBy.quantile`
+
+        Examples
+        --------
+        >>> ed_df = ed.DataFrame('localhost', 'flights')
+        >>> ed_flights = ed_df.filter(["AvgTicketPrice", "FlightDelayMin", "dayOfWeek", "Cancelled"])
+        >>> ed_flights.groupby(["dayOfWeek", "Cancelled"]).quantile() # doctest: +SKIP
+                             AvgTicketPrice  FlightDelayMin
+        dayOfWeek Cancelled
+        0         False          572.290384             0.0
+                  True           578.140564             0.0
+        1         False          567.980560             0.0
+                  True           582.618713             0.0
+        2         False          590.170986             0.0
+                  True           579.811890             0.0
+        3         False          574.131340             0.0
+                  True           572.852264             0.0
+        4         False          591.533699             0.0
+                  True           582.877014             0.0
+        5         False          791.622625             0.0
+                  True           793.362946             0.0
+        6         False          817.378523             0.0
+                  True           766.855530             0.0
+
+        >>> ed_flights.groupby(["dayOfWeek", "Cancelled"]).quantile(q=[.2, .5]) # doctest: +SKIP
+                                 AvgTicketPrice  FlightDelayMin
+        dayOfWeek Cancelled
+        0         False     0.2      319.925979             0.0
+                            0.5      572.290384             0.0
+                  True      0.2      325.704562             0.0
+                            0.5      578.140564             0.0
+        1         False     0.2      327.311007             0.0
+                            0.5      567.980560             0.0
+                  True      0.2      336.839572             0.0
+                            0.5      582.618713             0.0
+        2         False     0.2      332.323011             0.0
+                            0.5      590.170986             0.0
+                  True      0.2      314.472537             0.0
+                            0.5      579.811890             0.0
+        3         False     0.2      327.652659             0.0
+                            0.5      574.131340             0.0
+                  True      0.2      298.483032             0.0
+                            0.5      572.852264             0.0
+        4         False     0.2      314.290205             0.0
+                            0.5      591.533699             0.0
+                  True      0.2      325.024850             0.0
+                            0.5      582.877014             0.0
+        5         False     0.2      567.362137             0.0
+                            0.5      791.622625             0.0
+                  True      0.2      568.323944             0.0
+                            0.5      793.362946             0.0
+        6         False     0.2      568.489746             0.0
+                            0.5      817.378523             0.0
+                  True      0.2      523.890680             0.0
+                            0.5      766.855530             0.0
+        """
+        return self._query_compiler.aggs_groupby(
+            by=self._by, pd_aggs=["quantile"], quantiles=q, numeric_only=True
+        )
+
     def aggregate(
         self, func: Union[str, List[str]], numeric_only: Optional[bool] = False
     ) -> "pd.DataFrame":
@@ -493,6 +493,7 @@ class Operations:
         numeric_only: Optional[bool],
         percentiles: Optional[List[float]] = None,
         is_dataframe_agg: bool = False,
+        is_groupby: bool = False,
     ) -> Dict[str, List[Any]]:
         """
         This method unpacks metric aggregations JSON response.
@@ -554,7 +555,9 @@
                 agg_value = agg_value["50.0"]
             # Currently Pandas does the same
             # If we call quantile it returns the same result as of median.
-            elif pd_agg == "quantile" and is_dataframe_agg:
+            elif (
+                pd_agg == "quantile" and is_dataframe_agg and not is_groupby
+            ):
                 agg_value = agg_value["50.0"]
             else:
                 # Maintain order of percentiles
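
For reference, the "quantile" branches above unpack Elasticsearch's percentiles aggregation, whose response keys each computed value by its percentile rendered as a string. An illustrative fragment (shape only; the value is taken from the docstring example):

    # What agg_value looks like by the time this branch runs (assumed):
    agg_value = {"50.0": 572.290384}
    agg_value = agg_value["50.0"]  # matches pandas, where the default quantile equals the median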
@@ -668,8 +671,9 @@
             # If numeric_only is True and We only have a NaN type field then we check for empty.
             if values:
                 results[field.column] = values if len(values) > 1 else values[0]
-            # This only runs when df.quantile() or series.quantile() is called
-            if percentile_values and not is_dataframe_agg:
+            # This only runs when df.quantile() or series.quantile() or
+            # quantile from groupby is called
+            if percentile_values:
                 results[f"{field.column}"] = percentile_values

         return results
@@ -682,19 +686,6 @@
         is_dataframe: bool = True,
         numeric_only: Optional[bool] = True,
     ) -> Union[pd.DataFrame, pd.Series]:
-        # To verify if quantile range falls between 0 to 1
-        def quantile_to_percentile(quantile: Any) -> float:
-            if isinstance(quantile, (int, float)):
-                quantile = float(quantile)
-                if quantile > 1 or quantile < 0:
-                    raise ValueError(
-                        f"quantile should be in range of 0 and 1, given {quantile}"
-                    )
-            else:
-                raise TypeError("quantile should be of type int or float")
-            # quantile * 100 = percentile
-            # return float(...) because min(1.0) gives 1
-            return float(min(100, max(0, quantile * 100)))

         percentiles = [
             quantile_to_percentile(x)
@@ -730,6 +721,7 @@
         by: List[str],
         pd_aggs: List[str],
         dropna: bool = True,
+        quantiles: Optional[List[float]] = None,
         is_dataframe_agg: bool = False,
         numeric_only: Optional[bool] = True,
     ) -> pd.DataFrame:
@@ -751,6 +743,8 @@
             Know if groupby with aggregation or single agg is called.
         numeric_only:
             return either numeric values or NaN/NaT
+        quantiles:
+            List of quantiles when the 'quantile' agg is called, otherwise None.

         Returns
         -------
@@ -779,8 +773,19 @@
         # To return for creating multi-index on columns
         headers = [agg_field.column for agg_field in agg_fields]

+        percentiles: Optional[List[float]] = None
+        if quantiles:
+            percentiles = [
+                quantile_to_percentile(x)
+                for x in (
+                    (quantiles,)
+                    if not isinstance(quantiles, (list, tuple))
+                    else quantiles
+                )
+            ]
+
         # Convert pandas aggs to ES equivalent
-        es_aggs = self._map_pd_aggs_to_es_aggs(pd_aggs)
+        es_aggs = self._map_pd_aggs_to_es_aggs(pd_aggs=pd_aggs, percentiles=percentiles)

         # Construct Query
         for by_field in by_fields:
@@ -804,11 +809,18 @@

             # If we have multiple 'extended_stats' etc. here we simply NOOP on 2nd call
             if isinstance(es_agg, tuple):
-                body.metric_aggs(
-                    f"{es_agg[0]}_{agg_field.es_field_name}",
-                    es_agg[0],
-                    agg_field.aggregatable_es_field_name,
-                )
+                if es_agg[0] == "percentiles":
+                    body.percentile_agg(
+                        name=f"{es_agg[0]}_{agg_field.es_field_name}",
+                        field=agg_field.es_field_name,
+                        percents=es_agg[1],
+                    )
+                else:
+                    body.metric_aggs(
+                        f"{es_agg[0]}_{agg_field.es_field_name}",
+                        es_agg[0],
+                        agg_field.aggregatable_es_field_name,
+                    )
             else:
                 body.metric_aggs(
                     f"{es_agg}_{agg_field.es_field_name}",
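
Illustratively, for q=[0.2, 0.5] on AvgTicketPrice, the percentile_agg call above corresponds to an Elasticsearch percentiles aggregation of roughly this shape (a sketch; the actual request body is assembled by eland's Query helper):

    {
        "percentiles_AvgTicketPrice": {
            "percentiles": {
                "field": "AvgTicketPrice",
                "percents": [20.0, 50.0],
            }
        }
    }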
@@ -832,7 +844,12 @@
                 if by_field.is_timestamp and isinstance(bucket_key, int):
                     bucket_key = pd.to_datetime(bucket_key, unit="ms")

-                results[by_field.column].append(bucket_key)
+                if pd_aggs == ["quantile"] and len(percentiles) > 1:
+                    bucket_key = [bucket_key] * len(percentiles)
+
+                results[by_field.column].extend(
+                    bucket_key if isinstance(bucket_key, list) else [bucket_key]
+                )

             agg_calculation = self._unpack_metric_aggs(
                 fields=agg_fields,
@@ -840,18 +857,31 @@
                 pd_aggs=pd_aggs,
                 response={"aggregations": bucket},
                 numeric_only=numeric_only,
+                percentiles=percentiles,
                 # We set 'True' here because we want the value
                 # unpacking to always be in 'dataframe' mode.
                 is_dataframe_agg=True,
+                is_groupby=True,
             )

+            # To construct the index with quantiles
+            if pd_aggs == ["quantile"] and len(percentiles) > 1:
+                results[None].extend([i / 100 for i in percentiles])
+
             # Process the calculated agg values to response
             for key, value in agg_calculation.items():
                 if not isinstance(value, list):
                     results[key].append(value)
                     continue
-                for pd_agg, val in zip(pd_aggs, value):
-                    results[f"{key}_{pd_agg}"].append(val)
+                elif isinstance(value, list) and pd_aggs == ["quantile"]:
+                    results[f"{key}_{pd_aggs[0]}"].extend(value)
+                else:
+                    for pd_agg, val in zip(pd_aggs, value):
+                        results[f"{key}_{pd_agg}"].append(val)

+        # Just to keep the output identical to pandas, which uses an empty header here.
+        if pd_aggs == ["quantile"] and len(percentiles) > 1:
+            by = by + [None]
+
         agg_df = pd.DataFrame(results).set_index(by).sort_index()

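To make the bookkeeping concrete: with q=[0.2, 0.5], each group's bucket key is repeated once per quantile, and an unnamed (None) index level carries the quantile values. A standalone pandas sketch of the resulting frame for a single group, with values taken from the docstring example (this mimics the shape of 'results', not eland internals):

    import pandas as pd

    results = {
        "dayOfWeek": [0, 0],          # bucket key repeated len(percentiles) times
        "Cancelled": [False, False],
        None: [0.2, 0.5],             # unnamed level holding the quantiles
        "AvgTicketPrice_quantile": [319.925979, 572.290384],
    }
    by = ["dayOfWeek", "Cancelled", None]
    agg_df = pd.DataFrame(results).set_index(by).sort_index()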
@@ -1408,3 +1438,18 @@
     def update_query(self, boolean_filter):
         task = BooleanFilterTask(boolean_filter)
         self._tasks.append(task)
+
+
+def quantile_to_percentile(quantile: Union[int, float]) -> float:
+    # To verify the quantile falls in the range 0 to 1
+    if isinstance(quantile, (int, float)):
+        quantile = float(quantile)
+        if quantile > 1 or quantile < 0:
+            raise ValueError(
+                f"quantile should be in range of 0 and 1, given {quantile}"
+            )
+    else:
+        raise TypeError("quantile should be of type int or float")
+    # quantile * 100 = percentile
+    # return float(...) because for q=1.0, min(100, 100.0) gives the int 100
+    return float(min(100, max(0, quantile * 100)))
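
A quick illustration of the helper's behaviour:

    quantile_to_percentile(0.2)    # 20.0
    quantile_to_percentile(1)      # 100.0 -- ints are accepted, a float is returned
    quantile_to_percentile(1.5)    # raises ValueError
    quantile_to_percentile("0.5")  # raises TypeError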
@@ -673,11 +673,13 @@ class QueryCompiler:
         dropna: bool = True,
         is_dataframe_agg: bool = False,
         numeric_only: Optional[bool] = True,
+        quantiles: Union[int, float, List[int], List[float], None] = None,
     ) -> pd.DataFrame:
         return self._operations.aggs_groupby(
             self,
             by=by,
             pd_aggs=pd_aggs,
+            quantiles=quantiles,
             dropna=dropna,
             is_dataframe_agg=is_dataframe_agg,
             numeric_only=numeric_only,
@@ -230,3 +230,26 @@ class TestGroupbyDataFrame(TestData):
         match = "Currently mode is not supported for groupby"
         with pytest.raises(NotImplementedError, match=match):
             ed_flights.groupby("Cancelled").mode()
+
+    @pytest.mark.parametrize("dropna", [True, False])
+    @pytest.mark.parametrize(
+        ["func", "args"],
+        [
+            ("quantile", ()),
+            ("quantile", (0.55,)),
+            ("quantile", ([0.2, 0.4, 0.6, 0.8],)),
+        ],
+    )
+    @pytest.mark.parametrize("columns", ["Cancelled", ["dayOfWeek", "Cancelled"]])
+    def test_groupby_aggs_quantile(self, dropna, columns, func, args):
+        # Pandas applies numeric_only to these aggs only when used with groupby.
+
+        pd_flights = self.pd_flights().filter(self.filter_data)
+        ed_flights = self.ed_flights().filter(self.filter_data)
+
+        pd_groupby = getattr(pd_flights.groupby(columns, dropna=dropna), func)(*args)
+        ed_groupby = getattr(ed_flights.groupby(columns, dropna=dropna), func)(*args)
+        # Checking only values because dtypes are checked in aggs tests
+        assert_frame_equal(
+            pd_groupby, ed_groupby, check_exact=False, check_dtype=False, rtol=2
+        )
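
The parametrization exercises the default q, a scalar q, and a list of quantiles against both a single and a composite groupby key. The loose rtol is deliberate: Elasticsearch percentiles are approximate (TDigest-based), so only rough agreement with pandas is asserted. To run just these cases:

    python -m pytest -k test_groupby_aggs_quantile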