diff --git a/docs/sphinx/reference/api/eland.Series.unique.rst b/docs/sphinx/reference/api/eland.Series.unique.rst
new file mode 100644
index 0000000..f5b1f6d
--- /dev/null
+++ b/docs/sphinx/reference/api/eland.Series.unique.rst
@@ -0,0 +1,6 @@
+eland.Series.unique
+====================
+
+.. currentmodule:: eland
+
+.. automethod:: Series.unique
diff --git a/docs/sphinx/reference/series.rst b/docs/sphinx/reference/series.rst
index 5dd0af7..0882e59 100644
--- a/docs/sphinx/reference/series.rst
+++ b/docs/sphinx/reference/series.rst
@@ -78,6 +78,7 @@ Computations / Descriptive Stats
    Series.std
    Series.var
    Series.nunique
+   Series.unique
    Series.value_counts
    Series.mode
    Series.quantile
diff --git a/eland/operations.py b/eland/operations.py
index 15c0174..2ff75f7 100644
--- a/eland/operations.py
+++ b/eland/operations.py
@@ -800,6 +800,33 @@ class Operations:
         else:
             return df if is_dataframe else df.transpose().iloc[0]
 
+    def unique(self, query_compiler: "QueryCompiler") -> np.ndarray:
+        query_params, _ = self._resolve_tasks(query_compiler)
+        body = Query(query_params.query)
+
+        fields = query_compiler._mappings.all_source_fields()
+        # unique() is only supported on eland.Series, which map to one field
+        assert len(fields) == 1
+        field = fields[0]
+        bucket_key = f"unique_{field.column}"
+
+        body.composite_agg_bucket_terms(
+            name=bucket_key,
+            field=field.aggregatable_es_field_name,
+        )
+
+        # Page over all buckets via a composite aggregation
+        body.composite_agg_start(size=DEFAULT_PAGINATION_SIZE, name="unique_buckets")
+
+        unique_buckets: List[Any] = sum(
+            self.bucket_generator(query_compiler, body, agg_name="unique_buckets"), []  # type: ignore
+        )
+
+        return np.array(
+            [bucket["key"][bucket_key] for bucket in unique_buckets],
+            dtype=field.pd_dtype,
+        )
+
     def aggs_groupby(
         self,
         query_compiler: "QueryCompiler",
@@ -920,7 +947,9 @@ class Operations:
             size=DEFAULT_PAGINATION_SIZE, name="groupby_buckets", dropna=dropna
         )
 
-        for buckets in self.bucket_generator(query_compiler, body):
+        for buckets in self.bucket_generator(
+            query_compiler, body, agg_name="groupby_buckets"
+        ):
             # We receive the response row-wise
             for bucket in buckets:
                 # groupby columns are added to the result in the same way they are returned
@@ -984,7 +1013,7 @@ class Operations:
 
     @staticmethod
     def bucket_generator(
-        query_compiler: "QueryCompiler", body: "Query"
+        query_compiler: "QueryCompiler", body: "Query", agg_name: str
     ) -> Generator[Sequence[Dict[str, Any]], None, Sequence[Dict[str, Any]]]:
         """
-        This can be used for all groupby operations.
+        This can be used for all composite aggregations (e.g. groupby, unique).
@@ -1015,7 +1044,7 @@ class Operations:
             )
 
             # Pagination Logic
-            composite_buckets: Dict[str, Any] = res["aggregations"]["groupby_buckets"]
+            composite_buckets: Dict[str, Any] = res["aggregations"][agg_name]
 
             after_key: Optional[Dict[str, Any]] = composite_buckets.get(
                 "after_key", None
@@ -1028,7 +1057,7 @@ class Operations:
                 yield buckets
 
                 body.composite_agg_after_key(
-                    name="groupby_buckets",
+                    name=agg_name,
                     after_key=after_key,
                 )
             else:
diff --git a/eland/query_compiler.py b/eland/query_compiler.py
index 8c8b562..6dc032d 100644
--- a/eland/query_compiler.py
+++ b/eland/query_compiler.py
@@ -621,6 +621,9 @@ class QueryCompiler:
             self, ["nunique"], numeric_only=False
         )
 
+    def unique(self) -> np.ndarray:
+        return self._operations.unique(self)
+
     def mode(
         self,
         es_size: int,
diff --git a/eland/series.py b/eland/series.py
index 2d7f6b4..0228c52 100644
--- a/eland/series.py
+++ b/eland/series.py
@@ -1560,6 +1560,24 @@ class Series(NDFrame):
         results = super().nunique()
         return results.squeeze()
 
+    def unique(self) -> np.ndarray:
+        """
+        Returns all unique values within a Series.
+        Note that behavior differs between pandas and Eland: pandas returns
+        values in the order they are first seen, Eland in sorted order.
+
+        Returns
+        -------
+        np.ndarray
+            The unique values of the Series, in sorted order.
+
+        See Also
+        --------
+        :pandas_api_docs:`pandas.Series.unique`
+
+        """
+        return self._query_compiler.unique()
+
     def var(self, numeric_only: Optional[bool] = None) -> pd.Series:
         """
         Return variance for a Series
diff --git a/tests/series/test_metrics_pytest.py b/tests/series/test_metrics_pytest.py
index 70c4aa1..1fde9ed 100644
--- a/tests/series/test_metrics_pytest.py
+++ b/tests/series/test_metrics_pytest.py
@@ -156,6 +156,30 @@ class TestSeriesMetrics(TestData):
         else:
             assert pd_quantile * 0.9 <= ed_quantile <= pd_quantile * 1.1
 
+    @pytest.mark.parametrize("column", ["FlightDelayMin", "dayOfWeek"])
+    def test_flights_unique_numeric(self, column):
+        pd_flights = self.pd_flights()[column]
+        ed_flights = self.ed_flights()[column]
+
+        # pandas returns unique values in order of appearance, Elasticsearch
+        # in ascending order, so sort the pandas result before comparing
+        pd_unique = np.sort(pd_flights.unique())
+        ed_unique = ed_flights.unique()
+
+        np.testing.assert_allclose(pd_unique, ed_unique)
+
+    @pytest.mark.parametrize("column", ["Cancelled", "DestCountry"])
+    def test_flights_unique_strings(self, column):
+        pd_flights = self.pd_flights()[column]
+        ed_flights = self.ed_flights()[column]
+
+        # pandas returns unique values in order of appearance, Elasticsearch
+        # in ascending order, so sort the pandas result before comparing
+        pd_unique = np.sort(pd_flights.unique())
+        ed_unique = ed_flights.unique()
+
+        np.testing.assert_array_equal(pd_unique, ed_unique)
+
     @pytest.mark.parametrize("quantiles_list", [[np.array([1, 2])], ["1", 2]])
     def test_quantile_non_numeric_values(self, quantiles_list):
         ed_flights = self.ed_flights()["dayOfWeek"]
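
Reviewer note: a quick end-to-end sketch of how the new API behaves, assuming a local Elasticsearch at http://localhost:9200 with the example flights dataset loaded (the host and index name are assumptions, not part of this change):

import eland as ed

df = ed.DataFrame("http://localhost:9200", "flights")

# Eland returns unique values in sorted order; pandas would return
# them in order of first appearance.
df["DestCountry"].unique()
# e.g. array(['AE', 'AR', 'AT', ...], dtype=object)

Returning a NumPy array rather than an eland object mirrors pandas, whose Series.unique() also returns an ndarray.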
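
For context on why bucket_generator() grew an agg_name parameter: both groupby and unique() now page through a composite aggregation, which, unlike a plain terms aggregation, is not capped at a fixed bucket count. Below is a minimal sketch of the equivalent request loop, assuming the elasticsearch-py 8.x client; the function name, host, index, and field names are illustrative assumptions, not eland's own code:

from typing import Any, Dict, Iterator, Optional

from elasticsearch import Elasticsearch


def iter_unique(
    es: Elasticsearch, index: str, field: str, page_size: int = 100
) -> Iterator[Any]:
    """Yield each distinct value of `field` via composite-agg paging."""
    after_key: Optional[Dict[str, Any]] = None
    while True:
        composite: Dict[str, Any] = {
            "size": page_size,
            "sources": [{field: {"terms": {"field": field}}}],
        }
        if after_key is not None:
            # Resume from the previous page, as composite_agg_after_key() does
            composite["after"] = after_key
        res = es.search(
            index=index,
            size=0,  # aggregation buckets only, no hits
            aggs={"unique_buckets": {"composite": composite}},
        )
        agg = res["aggregations"]["unique_buckets"]
        for bucket in agg["buckets"]:
            yield bucket["key"][field]
        # Elasticsearch omits "after_key" once the last page is returned
        after_key = agg.get("after_key")
        if after_key is None:
            break


# e.g. list(iter_unique(Elasticsearch("http://localhost:9200"), "flights", "DestCountry"))

Composite buckets always come back in ascending key order, which is why eland's unique() returns sorted values while pandas preserves first-seen order.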