def quantile(
    self, q: Union[int, float, List[int], List[float]] = 0.5
) -> "pd.DataFrame":
    """
    Compute quantile(s) of each aggregated column within every group.

    The aggregation is delegated to the query compiler with
    ``pd_aggs=["quantile"]`` and ``numeric_only=True``.

    Parameters
    ----------
    q:
        float or array like, default 0.5
        Value between 0 <= q <= 1, the quantile(s) to compute.

    Returns
    -------
    pandas.DataFrame
        quantile value for each grouped column

    See Also
    --------
    :pandas_api_docs:`pandas.core.groupby.GroupBy.quantile`

    Examples
    --------
    >>> ed_df = ed.DataFrame('localhost', 'flights')
    >>> ed_flights = ed_df.filter(["AvgTicketPrice", "FlightDelayMin", "dayOfWeek", "Cancelled"])
    >>> ed_flights.groupby(["dayOfWeek", "Cancelled"]).quantile() # doctest: +SKIP
                         AvgTicketPrice  FlightDelayMin
    dayOfWeek Cancelled
    0         False          572.290384             0.0
              True           578.140564             0.0
    1         False          567.980560             0.0
              True           582.618713             0.0
    2         False          590.170986             0.0
              True           579.811890             0.0
    3         False          574.131340             0.0
              True           572.852264             0.0
    4         False          591.533699             0.0
              True           582.877014             0.0
    5         False          791.622625             0.0
              True           793.362946             0.0
    6         False          817.378523             0.0
              True           766.855530             0.0

    >>> ed_flights.groupby(["dayOfWeek", "Cancelled"]).quantile(q=[.2, .5]) # doctest: +SKIP
                             AvgTicketPrice  FlightDelayMin
    dayOfWeek Cancelled
    0         False     0.2      319.925979             0.0
                        0.5      572.290384             0.0
              True      0.2      325.704562             0.0
                        0.5      578.140564             0.0
    1         False     0.2      327.311007             0.0
                        0.5      567.980560             0.0
              True      0.2      336.839572             0.0
                        0.5      582.618713             0.0
    2         False     0.2      332.323011             0.0
                        0.5      590.170986             0.0
              True      0.2      314.472537             0.0
                        0.5      579.811890             0.0
    3         False     0.2      327.652659             0.0
                        0.5      574.131340             0.0
              True      0.2      298.483032             0.0
                        0.5      572.852264             0.0
    4         False     0.2      314.290205             0.0
                        0.5      591.533699             0.0
              True      0.2      325.024850             0.0
                        0.5      582.877014             0.0
    5         False     0.2      567.362137             0.0
                        0.5      791.622625             0.0
              True      0.2      568.323944             0.0
                        0.5      793.362946             0.0
    6         False     0.2      568.489746             0.0
                        0.5      817.378523             0.0
              True      0.2      523.890680             0.0
                        0.5      766.855530             0.0

    """
    # Operations.aggs_groupby decides whether quantiles were requested with a
    # truthiness test (`if quantiles:`), so a bare 0 / 0.0 would be treated as
    # "not given" and fail later on `len(percentiles)`. A one-element list is
    # always truthy and is converted to exactly the same percentiles as the
    # scalar would be, so every other scalar behaves as before.
    quantiles = [q] if isinstance(q, (int, float)) else q

    return self._query_compiler.aggs_groupby(
        by=self._by, pd_aggs=["quantile"], quantiles=quantiles, numeric_only=True
    )
-493,6 +493,7 @@ class Operations: numeric_only: Optional[bool], percentiles: Optional[List[float]] = None, is_dataframe_agg: bool = False, + is_groupby: bool = False, ) -> Dict[str, List[Any]]: """ This method unpacks metric aggregations JSON response. @@ -554,7 +555,9 @@ class Operations: agg_value = agg_value["50.0"] # Currently Pandas does the same # If we call quantile it returns the same result as of median. - elif pd_agg == "quantile" and is_dataframe_agg: + elif ( + pd_agg == "quantile" and is_dataframe_agg and not is_groupby + ): agg_value = agg_value["50.0"] else: # Maintain order of percentiles @@ -668,8 +671,9 @@ class Operations: # If numeric_only is True and We only have a NaN type field then we check for empty. if values: results[field.column] = values if len(values) > 1 else values[0] - # This only runs when df.quantile() or series.quantile() is called - if percentile_values and not is_dataframe_agg: + # This only runs when df.quantile() or series.quantile() or + # quantile from groupby is called + if percentile_values: results[f"{field.column}"] = percentile_values return results @@ -682,19 +686,6 @@ class Operations: is_dataframe: bool = True, numeric_only: Optional[bool] = True, ) -> Union[pd.DataFrame, pd.Series]: - # To verify if quantile range falls between 0 to 1 - def quantile_to_percentile(quantile: Any) -> float: - if isinstance(quantile, (int, float)): - quantile = float(quantile) - if quantile > 1 or quantile < 0: - raise ValueError( - f"quantile should be in range of 0 and 1, given {quantile}" - ) - else: - raise TypeError("quantile should be of type int or float") - # quantile * 100 = percentile - # return float(...) 
because min(1.0) gives 1 - return float(min(100, max(0, quantile * 100))) percentiles = [ quantile_to_percentile(x) @@ -730,6 +721,7 @@ class Operations: by: List[str], pd_aggs: List[str], dropna: bool = True, + quantiles: Optional[List[float]] = None, is_dataframe_agg: bool = False, numeric_only: Optional[bool] = True, ) -> pd.DataFrame: @@ -751,6 +743,8 @@ class Operations: Know if groupby with aggregation or single agg is called. numeric_only: return either numeric values or NaN/NaT + quantiles: + List of quantiles when 'quantile' agg is called. Otherwise it is None Returns ------- @@ -779,8 +773,19 @@ class Operations: # To return for creating multi-index on columns headers = [agg_field.column for agg_field in agg_fields] + percentiles: Optional[List[str]] = None + if quantiles: + percentiles = [ + quantile_to_percentile(x) + for x in ( + (quantiles,) + if not isinstance(quantiles, (list, tuple)) + else quantiles + ) + ] + # Convert pandas aggs to ES equivalent - es_aggs = self._map_pd_aggs_to_es_aggs(pd_aggs) + es_aggs = self._map_pd_aggs_to_es_aggs(pd_aggs=pd_aggs, percentiles=percentiles) # Construct Query for by_field in by_fields: @@ -804,11 +809,18 @@ class Operations: # If we have multiple 'extended_stats' etc. 
here we simply NOOP on 2nd call if isinstance(es_agg, tuple): - body.metric_aggs( - f"{es_agg[0]}_{agg_field.es_field_name}", - es_agg[0], - agg_field.aggregatable_es_field_name, - ) + if es_agg[0] == "percentiles": + body.percentile_agg( + name=f"{es_agg[0]}_{agg_field.es_field_name}", + field=agg_field.es_field_name, + percents=es_agg[1], + ) + else: + body.metric_aggs( + f"{es_agg[0]}_{agg_field.es_field_name}", + es_agg[0], + agg_field.aggregatable_es_field_name, + ) else: body.metric_aggs( f"{es_agg}_{agg_field.es_field_name}", @@ -832,7 +844,12 @@ class Operations: if by_field.is_timestamp and isinstance(bucket_key, int): bucket_key = pd.to_datetime(bucket_key, unit="ms") - results[by_field.column].append(bucket_key) + if pd_aggs == ["quantile"] and len(percentiles) > 1: + bucket_key = [bucket_key] * len(percentiles) + + results[by_field.column].extend( + bucket_key if isinstance(bucket_key, list) else [bucket_key] + ) agg_calculation = self._unpack_metric_aggs( fields=agg_fields, @@ -840,18 +857,31 @@ class Operations: pd_aggs=pd_aggs, response={"aggregations": bucket}, numeric_only=numeric_only, + percentiles=percentiles, # We set 'True' here because we want the value # unpacking to always be in 'dataframe' mode. is_dataframe_agg=True, + is_groupby=True, ) + # to construct index with quantiles + if pd_aggs == ["quantile"] and len(percentiles) > 1: + results[None].extend([i / 100 for i in percentiles]) + # Process the calculated agg values to response for key, value in agg_calculation.items(): if not isinstance(value, list): results[key].append(value) continue - for pd_agg, val in zip(pd_aggs, value): - results[f"{key}_{pd_agg}"].append(val) + elif isinstance(value, list) and pd_aggs == ["quantile"]: + results[f"{key}_{pd_aggs[0]}"].extend(value) + else: + for pd_agg, val in zip(pd_aggs, value): + results[f"{key}_{pd_agg}"].append(val) + + # Just to maintain Output same as pandas with empty header. 
def quantile_to_percentile(quantile: Union[int, float]) -> float:
    """
    Validate a quantile and convert it to the equivalent percentile.

    Parameters
    ----------
    quantile:
        int or float in the closed range 0 <= quantile <= 1.

    Returns
    -------
    float
        ``quantile * 100``, always as a float between 0.0 and 100.0.

    Raises
    ------
    TypeError
        If ``quantile`` is neither an int nor a float.
    ValueError
        If ``quantile`` is NaN or lies outside the range 0 to 1.
    """
    if not isinstance(quantile, (int, float)):
        raise TypeError("quantile should be of type int or float")

    quantile = float(quantile)
    # Expressed as a positive range test so that NaN is rejected too: every
    # comparison against NaN is False, so an "out of range" test written as
    # `quantile > 1 or quantile < 0` lets NaN through, and the clamp below
    # would then quietly turn it into 0.0.
    if not 0 <= quantile <= 1:
        raise ValueError(
            f"quantile should be in range of 0 and 1, given {quantile}"
        )

    # quantile * 100 = percentile
    # min()/max() can hand back the int bound at the edges (e.g. 1.0 -> 100),
    # so the result is converted back to float.
    return float(min(100, max(0, quantile * 100)))
["Cancelled", ["dayOfWeek", "Cancelled"]]) + def test_groupby_aggs_quantile(self, dropna, columns, func, args): + # Pandas has numeric_only applicable for the above aggs with groupby only. + + pd_flights = self.pd_flights().filter(self.filter_data) + ed_flights = self.ed_flights().filter(self.filter_data) + + pd_groupby = getattr(pd_flights.groupby(columns, dropna=dropna), func)(*args) + ed_groupby = getattr(ed_flights.groupby(columns, dropna=dropna), func)(*args) + # checking only values because dtypes are checked in aggs tests + assert_frame_equal( + pd_groupby, ed_groupby, check_exact=False, check_dtype=False, rtol=2 + )