mirror of
https://github.com/elastic/eland.git
synced 2025-07-11 00:02:14 +08:00
Add support for eland.Series.unique()
This commit is contained in:
parent
15a3007288
commit
76a52b7947
6
docs/sphinx/reference/api/eland.Series.unique.rst
Normal file
6
docs/sphinx/reference/api/eland.Series.unique.rst
Normal file
@ -0,0 +1,6 @@
|
||||
eland.Series.unique
|
||||
====================
|
||||
|
||||
.. currentmodule:: eland
|
||||
|
||||
.. automethod:: Series.unique
|
@ -78,6 +78,7 @@ Computations / Descriptive Stats
|
||||
Series.std
|
||||
Series.var
|
||||
Series.nunique
|
||||
Series.unique
|
||||
Series.value_counts
|
||||
Series.mode
|
||||
Series.quantile
|
||||
|
@ -800,6 +800,33 @@ class Operations:
|
||||
else:
|
||||
return df if is_dataframe else df.transpose().iloc[0]
|
||||
|
||||
def unique(self, query_compiler: "QueryCompiler") -> np.ndarray:
    """Return the sorted unique values of the single field backing an eland.Series.

    Runs a paginated composite terms aggregation against Elasticsearch, so
    values come back in ascending order (unlike pandas, which preserves
    first-seen order).

    Parameters
    ----------
    query_compiler:
        Query compiler for the Series; must resolve to exactly one source field.

    Returns
    -------
    np.ndarray
        Unique values converted to the field's pandas dtype.
        (Bug fix: the original annotation said ``pd.Series``, but the method
        builds and returns ``np.array(...)``.)
    """
    query_params, _ = self._resolve_tasks(query_compiler)
    body = Query(query_params.query)

    fields = query_compiler._mappings.all_source_fields()
    assert len(fields) == 1  # unique() is only supported for eland.Series
    field = fields[0]
    bucket_key = f"unique_{field.column}"

    # Bucket on the aggregatable form of the field (e.g. text -> keyword).
    body.composite_agg_bucket_terms(
        name=bucket_key,
        field=field.aggregatable_es_field_name,
    )

    # Start the composite aggregation; bucket_generator pages through it.
    body.composite_agg_start(size=DEFAULT_PAGINATION_SIZE, name="unique_buckets")

    # Flatten the per-page bucket lists into a single list.
    unique_buckets: List[Any] = sum(
        self.bucket_generator(query_compiler, body, agg_name="unique_buckets"), []  # type: ignore
    )

    return np.array(
        [bucket["key"][bucket_key] for bucket in unique_buckets],
        dtype=field.pd_dtype,
    )
|
||||
|
||||
def aggs_groupby(
|
||||
self,
|
||||
query_compiler: "QueryCompiler",
|
||||
@ -920,7 +947,9 @@ class Operations:
|
||||
size=DEFAULT_PAGINATION_SIZE, name="groupby_buckets", dropna=dropna
|
||||
)
|
||||
|
||||
for buckets in self.bucket_generator(query_compiler, body):
|
||||
for buckets in self.bucket_generator(
|
||||
query_compiler, body, agg_name="groupby_buckets"
|
||||
):
|
||||
# We receive the response row-wise
|
||||
for bucket in buckets:
|
||||
# groupby columns are added to result same way they are returned
|
||||
@ -984,7 +1013,7 @@ class Operations:
|
||||
|
||||
@staticmethod
|
||||
def bucket_generator(
|
||||
query_compiler: "QueryCompiler", body: "Query"
|
||||
query_compiler: "QueryCompiler", body: "Query", agg_name: str
|
||||
) -> Generator[Sequence[Dict[str, Any]], None, Sequence[Dict[str, Any]]]:
|
||||
"""
|
||||
This can be used for all groupby operations.
|
||||
@ -1015,7 +1044,7 @@ class Operations:
|
||||
)
|
||||
|
||||
# Pagination Logic
|
||||
composite_buckets: Dict[str, Any] = res["aggregations"]["groupby_buckets"]
|
||||
composite_buckets: Dict[str, Any] = res["aggregations"][agg_name]
|
||||
|
||||
after_key: Optional[Dict[str, Any]] = composite_buckets.get(
|
||||
"after_key", None
|
||||
@ -1028,7 +1057,7 @@ class Operations:
|
||||
yield buckets
|
||||
|
||||
body.composite_agg_after_key(
|
||||
name="groupby_buckets",
|
||||
name=agg_name,
|
||||
after_key=after_key,
|
||||
)
|
||||
else:
|
||||
|
@ -621,6 +621,9 @@ class QueryCompiler:
|
||||
self, ["nunique"], numeric_only=False
|
||||
)
|
||||
|
||||
def unique(self) -> "np.ndarray":
    """Return the unique values of the single-field Series.

    Delegates to ``Operations.unique``, which runs a composite terms
    aggregation and returns the values as a NumPy array in ascending
    order. (Bug fix: the original annotation said ``pd.Series``, but the
    delegated call produces an ``np.ndarray`` — the same array-like
    contract as ``pandas.Series.unique``.)
    """
    return self._operations.unique(self)
|
||||
|
||||
def mode(
|
||||
self,
|
||||
es_size: int,
|
||||
|
@ -1560,6 +1560,24 @@ class Series(NDFrame):
|
||||
results = super().nunique()
|
||||
return results.squeeze()
|
||||
|
||||
def unique(self) -> "np.ndarray":
    """
    Returns all unique values within a Series.

    Note that behavior is slightly different between pandas and Eland:
    pandas will return values in the order they're first seen and Eland
    returns values in sorted (ascending) order.

    Returns
    -------
    np.ndarray
        An ndarray of the unique values of the Series.
        (Bug fix: the original annotation and docstring said ``pd.Series``,
        but the query-compiler chain returns ``np.array(...)`` — matching
        pandas, whose ``Series.unique`` also returns an ndarray.)

    See Also
    --------
    :pandas_api_docs:`pandas.Series.unique`
    """
    return self._query_compiler.unique()
|
||||
|
||||
def var(self, numeric_only: Optional[bool] = None) -> pd.Series:
|
||||
"""
|
||||
Return variance for a Series
|
||||
|
@ -156,6 +156,30 @@ class TestSeriesMetrics(TestData):
|
||||
else:
|
||||
assert pd_quantile * 0.9 <= ed_quantile <= pd_quantile * 1.1
|
||||
|
||||
@pytest.mark.parametrize("column", ["FlightDelayMin", "dayOfWeek"])
def test_flights_unique_numeric(self, column):
    """unique() on numeric flight columns matches pandas, up to ordering."""
    pd_series = self.pd_flights()[column]
    ed_series = self.ed_flights()[column]

    # pandas preserves first-seen order while Elasticsearch buckets come
    # back sorted ascending, so sort the pandas result before comparing.
    expected = np.sort(pd_series.unique())
    actual = ed_series.unique()

    np.testing.assert_allclose(expected, actual)
|
||||
|
||||
@pytest.mark.parametrize("column", ["Cancelled", "DestCountry"])
def test_flights_unique_strings(self, column):
    """unique() on boolean/keyword flight columns matches pandas, up to ordering.

    Bug fix: the original called ``np.equal(pd_unique, ed_unique)`` and
    discarded the result, so the test asserted nothing. Use an asserting
    comparison so mismatches actually fail the test.
    """
    pd_flights = self.pd_flights()[column]
    ed_flights = self.ed_flights()[column]

    # pandas returns unique values in order of their appearance;
    # ES returns results in ascending order, hence sort the pandas
    # array to check equality.
    pd_unique = np.sort(pd_flights.unique())
    ed_unique = ed_flights.unique()

    np.testing.assert_array_equal(pd_unique, ed_unique)
|
||||
|
||||
@pytest.mark.parametrize("quantiles_list", [[np.array([1, 2])], ["1", 2]])
|
||||
def test_quantile_non_numeric_values(self, quantiles_list):
|
||||
ed_flights = self.ed_flights()["dayOfWeek"]
|
||||
|
Loading…
x
Reference in New Issue
Block a user