mirror of
https://github.com/elastic/eland.git
synced 2025-07-11 00:02:14 +08:00
Add [DataFrame, Series].filter()
This commit is contained in:
parent
890cf6dc97
commit
6000ea73d0
@ -5,6 +5,8 @@
|
||||
import sys
|
||||
import warnings
|
||||
from io import StringIO
|
||||
import re
|
||||
from typing import Optional, Sequence, Union
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
@ -383,7 +385,7 @@ class DataFrame(NDFrame):
|
||||
if labels is not None:
|
||||
if index is not None or columns is not None:
|
||||
raise ValueError("Cannot specify both 'labels' and 'index'/'columns'")
|
||||
axis = pd.DataFrame()._get_axis_name(axis)
|
||||
axis = pd.DataFrame._get_axis_name(axis)
|
||||
axes = {axis: labels}
|
||||
elif index is not None or columns is not None:
|
||||
axes, _ = pd.DataFrame()._construct_axes_from_arguments(
|
||||
@ -1443,6 +1445,81 @@ class DataFrame(NDFrame):
|
||||
else:
|
||||
return default
|
||||
|
||||
def filter(
|
||||
self,
|
||||
items: Optional[Sequence[str]] = None,
|
||||
like: Optional[str] = None,
|
||||
regex: Optional[str] = None,
|
||||
axis: Optional[Union[int, str]] = None,
|
||||
):
|
||||
"""
|
||||
Subset the dataframe rows or columns according to the specified index labels.
|
||||
Note that this routine does not filter a dataframe on its
|
||||
contents. The filter is applied to the labels of the index.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
items : list-like
|
||||
Keep labels from axis which are in items.
|
||||
like : str
|
||||
Keep labels from axis for which "like in label == True".
|
||||
regex : str (regular expression)
|
||||
Keep labels from axis for which re.search(regex, label) == True.
|
||||
axis : {0 or ‘index’, 1 or ‘columns’, None}, default None
|
||||
The axis to filter on, expressed either as an index (int) or axis name (str). By default this is the info axis, ‘index’ for Series, ‘columns’ for DataFrame.
|
||||
|
||||
Returns
|
||||
-------
|
||||
eland.DataFrame
|
||||
|
||||
See Also
|
||||
--------
|
||||
:pandas_api_docs:`pandas.DataFrame.filter`
|
||||
|
||||
Notes
|
||||
-----
|
||||
The ``items``, ``like``, and ``regex`` parameters are
|
||||
enforced to be mutually exclusive.
|
||||
"""
|
||||
filter_options_passed = sum([items is not None, bool(like), bool(regex)])
|
||||
if filter_options_passed > 1:
|
||||
raise TypeError(
|
||||
"Keyword arguments `items`, `like`, or `regex` "
|
||||
"are mutually exclusive"
|
||||
)
|
||||
elif filter_options_passed == 0:
|
||||
raise TypeError("Must pass either 'items', 'like', or 'regex'")
|
||||
|
||||
# axis defaults to 'columns' for DataFrame, 'index' for Series
|
||||
if axis is None:
|
||||
axis = "columns"
|
||||
axis = pd.DataFrame._get_axis_name(axis)
|
||||
|
||||
if axis == "index":
|
||||
new_query_compiler = self._query_compiler.filter(
|
||||
items=items, like=like, regex=regex
|
||||
)
|
||||
return self._create_or_update_from_compiler(
|
||||
new_query_compiler, inplace=False
|
||||
)
|
||||
|
||||
else: # axis == "columns"
|
||||
if items is not None:
|
||||
# Pandas skips over columns that don't exist
|
||||
# and maintains order of items=[...]
|
||||
existing_columns = set(self.columns.to_list())
|
||||
return self[[column for column in items if column in existing_columns]]
|
||||
|
||||
elif like is not None:
|
||||
|
||||
def matcher(x):
|
||||
return like in x
|
||||
|
||||
else:
|
||||
matcher = re.compile(regex).search
|
||||
|
||||
return self[[column for column in self.columns if matcher(column)]]
|
||||
|
||||
@property
|
||||
def values(self):
|
||||
"""
|
||||
|
@ -527,6 +527,25 @@ class Operations:
|
||||
results = self._metric_aggs(query_compiler, pd_aggs, numeric_only=False)
|
||||
return pd.DataFrame(results, index=pd_aggs)
|
||||
|
||||
def filter(self, query_compiler, items=None, like=None, regex=None):
|
||||
# This function is only called for axis='index',
|
||||
# DataFrame.filter(..., axis="columns") calls .drop()
|
||||
if items is not None:
|
||||
self.filter_index_values(
|
||||
query_compiler, field=query_compiler.index.es_index_field, items=items
|
||||
)
|
||||
return
|
||||
elif like is not None:
|
||||
arg_name = "like"
|
||||
else:
|
||||
assert regex is not None
|
||||
arg_name = "regex"
|
||||
|
||||
raise NotImplementedError(
|
||||
f".filter({arg_name}='...', axis='index') is currently not supported due "
|
||||
f"to substring and regex operations not being available for Elasticsearch document IDs."
|
||||
)
|
||||
|
||||
def describe(self, query_compiler):
|
||||
query_params, post_processing = self._resolve_tasks(query_compiler)
|
||||
|
||||
@ -674,7 +693,7 @@ class Operations:
|
||||
# _source to the body rather than as a _source parameter
|
||||
body["_source"] = _source
|
||||
else:
|
||||
_source = False
|
||||
body["_source"] = False
|
||||
|
||||
es_results = None
|
||||
|
||||
@ -794,6 +813,16 @@ class Operations:
|
||||
task = QueryTermsTask(False, field, items)
|
||||
self._tasks.append(task)
|
||||
|
||||
def filter_index_values(self, query_compiler, field, items):
|
||||
# Basically .drop_index_values() except with must=True on tasks.
|
||||
self._validate_index_operation(query_compiler, items)
|
||||
|
||||
if field == Index.ID_INDEX_FIELD:
|
||||
task = QueryIdsTask(True, items)
|
||||
else:
|
||||
task = QueryTermsTask(True, field, items)
|
||||
self._tasks.append(task)
|
||||
|
||||
@staticmethod
|
||||
def _query_params_to_size_and_sort(
|
||||
query_params: QueryParams,
|
||||
|
@ -6,7 +6,14 @@ import warnings
|
||||
from copy import deepcopy
|
||||
from typing import Optional, Dict, List, Any
|
||||
|
||||
from eland.filter import RandomScoreFilter, BooleanFilter, NotNull, IsNull, IsIn
|
||||
from eland.filter import (
|
||||
RandomScoreFilter,
|
||||
BooleanFilter,
|
||||
NotNull,
|
||||
IsNull,
|
||||
IsIn,
|
||||
Rlike,
|
||||
)
|
||||
|
||||
|
||||
class Query:
|
||||
@ -75,6 +82,16 @@ class Query:
|
||||
else:
|
||||
self._query = self._query & ~(IsIn(field, items))
|
||||
|
||||
def regexp(self, field: str, value: str) -> None:
|
||||
"""
|
||||
Add regexp query
|
||||
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-regexp-query.html
|
||||
"""
|
||||
if self._query.empty():
|
||||
self._query = Rlike(field, value)
|
||||
else:
|
||||
self._query = self._query & Rlike(field, value)
|
||||
|
||||
def terms_aggs(self, name: str, func: str, field: str, es_size: int) -> None:
|
||||
"""
|
||||
Add terms agg e.g
|
||||
|
@ -469,6 +469,14 @@ class QueryCompiler:
|
||||
|
||||
return result
|
||||
|
||||
def filter(self, items=None, like=None, regex=None):
|
||||
# field will be es_index_field for DataFrames or the column for Series.
|
||||
# This function is only called for axis='index',
|
||||
# DataFrame.filter(..., axis="columns") calls .drop()
|
||||
result = self.copy()
|
||||
result._operations.filter(self, items=items, like=like, regex=regex)
|
||||
return result
|
||||
|
||||
def aggs(self, func):
|
||||
return self._operations.aggs(self, func)
|
||||
|
||||
|
@ -21,6 +21,7 @@ Based on NDFrame which underpins eland.DataFrame
|
||||
import sys
|
||||
import warnings
|
||||
from io import StringIO
|
||||
from typing import Optional, Union, Sequence
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
@ -153,14 +154,14 @@ class Series(NDFrame):
|
||||
return num_rows, num_columns
|
||||
|
||||
@property
|
||||
def field_name(self):
|
||||
def es_field_name(self):
|
||||
"""
|
||||
Returns
|
||||
-------
|
||||
field_name: str
|
||||
es_field_name: str
|
||||
Return the Elasticsearch field name for this series
|
||||
"""
|
||||
return self._query_compiler.field_names[0]
|
||||
return self._query_compiler.get_field_names(include_scripted_fields=True)[0]
|
||||
|
||||
def _get_name(self):
|
||||
return self._query_compiler.columns[0]
|
||||
@ -526,6 +527,62 @@ class Series(NDFrame):
|
||||
"""
|
||||
return 1
|
||||
|
||||
def filter(
|
||||
self,
|
||||
items: Optional[Sequence[str]] = None,
|
||||
like: Optional[str] = None,
|
||||
regex: Optional[str] = None,
|
||||
axis: Optional[Union[int, str]] = None,
|
||||
) -> "Series":
|
||||
"""
|
||||
Subset the dataframe rows or columns according to the specified index labels.
|
||||
Note that this routine does not filter a dataframe on its
|
||||
contents. The filter is applied to the labels of the index.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
items : list-like
|
||||
Keep labels from axis which are in items.
|
||||
like : str
|
||||
Keep labels from axis for which "like in label == True".
|
||||
regex : str (regular expression)
|
||||
Keep labels from axis for which re.search(regex, label) == True.
|
||||
axis : {0 or ‘index’, 1 or ‘columns’, None}, default None
|
||||
The axis to filter on, expressed either as an index (int) or axis name (str).
|
||||
By default this is the info axis, ‘index’ for Series, ‘columns’ for DataFrame.
|
||||
|
||||
Returns
|
||||
-------
|
||||
eland.Series
|
||||
|
||||
See Also
|
||||
--------
|
||||
:pandas_api_docs:`pandas.Series.filter`
|
||||
|
||||
Notes
|
||||
-----
|
||||
The ``items``, ``like``, and ``regex`` parameters are
|
||||
enforced to be mutually exclusive.
|
||||
"""
|
||||
filter_options_passed = sum([items is not None, bool(like), bool(regex)])
|
||||
if filter_options_passed > 1:
|
||||
raise TypeError(
|
||||
"Keyword arguments `items`, `like`, or `regex` "
|
||||
"are mutually exclusive"
|
||||
)
|
||||
elif filter_options_passed == 0:
|
||||
raise TypeError("Must pass either 'items', 'like', or 'regex'")
|
||||
|
||||
# axis defaults to 'columns' for DataFrame, 'index' for Series
|
||||
if axis is None:
|
||||
axis = "index"
|
||||
pd.Series._get_axis_name(axis)
|
||||
|
||||
new_query_compiler = self._query_compiler.filter(
|
||||
items=items, like=like, regex=regex
|
||||
)
|
||||
return Series(_query_compiler=new_query_compiler)
|
||||
|
||||
def es_info(self):
|
||||
buf = StringIO()
|
||||
|
||||
|
@ -272,6 +272,37 @@ class QueryTermsTask(Task):
|
||||
)
|
||||
|
||||
|
||||
class QueryRegexpTask(Task):
|
||||
def __init__(self, field: str, value: str):
|
||||
"""
|
||||
Parameters
|
||||
----------
|
||||
field: str
|
||||
field_name to filter
|
||||
|
||||
value: str
|
||||
regular expression pattern for filter
|
||||
"""
|
||||
super().__init__("regexp")
|
||||
|
||||
self._field = field
|
||||
self._value = value
|
||||
|
||||
def resolve_task(
|
||||
self,
|
||||
query_params: "QueryParams",
|
||||
post_processing: List["PostProcessingAction"],
|
||||
query_compiler: "QueryCompiler",
|
||||
) -> RESOLVED_TASK_TYPE:
|
||||
query_params.query.regexp(self._field, self._value)
|
||||
return query_params, post_processing
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return (
|
||||
f"('{self._task_type}': ('field': '{self._field}', 'value': {self._value}))"
|
||||
)
|
||||
|
||||
|
||||
class BooleanFilterTask(Task):
|
||||
def __init__(self, boolean_filter: "BooleanFilter"):
|
||||
"""
|
||||
|
@ -41,6 +41,8 @@ _ed_ecommerce = ed.DataFrame(ES_TEST_CLIENT, ECOMMERCE_INDEX_NAME)
|
||||
|
||||
|
||||
class TestData:
|
||||
client = ES_TEST_CLIENT
|
||||
|
||||
def pd_flights(self):
|
||||
return _pd_flights
|
||||
|
||||
|
@ -34,3 +34,21 @@ class TestDataFrameDrop(TestData):
|
||||
ed_idx0 = ed_flights_small.drop(["1", "2"])
|
||||
|
||||
assert_pandas_eland_frame_equal(pd_idx0, ed_idx0)
|
||||
|
||||
def test_flights_drop_all_columns(self):
|
||||
ed_flights_small = self.ed_flights_small()
|
||||
pd_flights_small = self.pd_flights_small()
|
||||
|
||||
all_columns = ed_flights_small.columns
|
||||
|
||||
pd_col0 = pd_flights_small.drop(labels=all_columns, axis=1)
|
||||
pd_col1 = pd_flights_small.drop(columns=all_columns)
|
||||
|
||||
ed_col0 = ed_flights_small.drop(labels=all_columns, axis=1)
|
||||
ed_col1 = ed_flights_small.drop(columns=all_columns)
|
||||
|
||||
assert_pandas_eland_frame_equal(pd_col0, ed_col0)
|
||||
assert_pandas_eland_frame_equal(pd_col1, ed_col1)
|
||||
|
||||
assert ed_col0.columns.equals(pd_col0.columns)
|
||||
assert ed_col1.columns.equals(pd_col1.columns)
|
||||
|
80
eland/tests/dataframe/test_filter_pytest.py
Normal file
80
eland/tests/dataframe/test_filter_pytest.py
Normal file
@ -0,0 +1,80 @@
|
||||
# Licensed to Elasticsearch B.V under one or more agreements.
|
||||
# Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
|
||||
# See the LICENSE file in the project root for more information
|
||||
|
||||
# File called _pytest for PyCharm compatability
|
||||
|
||||
import pytest
|
||||
from eland.tests.common import TestData
|
||||
from eland.tests.common import assert_pandas_eland_frame_equal
|
||||
|
||||
|
||||
class TestDataFrameFilter(TestData):
|
||||
def test_filter_arguments_mutually_exclusive(self):
|
||||
ed_flights_small = self.ed_flights_small()
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
ed_flights_small.filter(items=[], like="!", regex="!")
|
||||
with pytest.raises(TypeError):
|
||||
ed_flights_small.filter(items=[], regex="!")
|
||||
with pytest.raises(TypeError):
|
||||
ed_flights_small.filter(items=[], like="!")
|
||||
with pytest.raises(TypeError):
|
||||
ed_flights_small.filter(like="!", regex="!")
|
||||
with pytest.raises(TypeError):
|
||||
ed_flights_small.filter()
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"items",
|
||||
[
|
||||
["DestCountry", "Cancelled", "AvgTicketPrice"],
|
||||
[],
|
||||
["notfound", "AvgTicketPrice"],
|
||||
],
|
||||
)
|
||||
def test_flights_filter_columns_items(self, items):
|
||||
ed_flights_small = self.ed_flights_small()
|
||||
pd_flights_small = self.pd_flights_small()
|
||||
|
||||
ed_df = ed_flights_small.filter(items=items)
|
||||
pd_df = pd_flights_small.filter(items=items)
|
||||
|
||||
assert_pandas_eland_frame_equal(pd_df, ed_df)
|
||||
|
||||
@pytest.mark.parametrize("like", ["Flight", "Nope"])
|
||||
def test_flights_filter_columns_like(self, like):
|
||||
ed_flights_small = self.ed_flights_small()
|
||||
pd_flights_small = self.pd_flights_small()
|
||||
|
||||
ed_df = ed_flights_small.filter(like=like)
|
||||
pd_df = pd_flights_small.filter(like=like)
|
||||
|
||||
assert_pandas_eland_frame_equal(pd_df, ed_df)
|
||||
|
||||
@pytest.mark.parametrize("regex", ["^Flig", "^Flight.*r$", ".*", "^[^C]"])
|
||||
def test_flights_filter_columns_regex(self, regex):
|
||||
ed_flights_small = self.ed_flights_small()
|
||||
pd_flights_small = self.pd_flights_small()
|
||||
|
||||
ed_df = ed_flights_small.filter(regex=regex)
|
||||
pd_df = pd_flights_small.filter(regex=regex)
|
||||
|
||||
assert_pandas_eland_frame_equal(pd_df, ed_df)
|
||||
|
||||
@pytest.mark.parametrize("items", [[], ["20"], [str(x) for x in range(30)]])
|
||||
def test_flights_filter_index_items(self, items):
|
||||
ed_flights_small = self.ed_flights_small()
|
||||
pd_flights_small = self.pd_flights_small()
|
||||
|
||||
ed_df = ed_flights_small.filter(items=items, axis=0)
|
||||
pd_df = pd_flights_small.filter(items=items, axis=0)
|
||||
|
||||
assert_pandas_eland_frame_equal(pd_df, ed_df)
|
||||
|
||||
def test_flights_filter_index_like_and_regex(self):
|
||||
ed_flights_small = self.ed_flights_small()
|
||||
|
||||
with pytest.raises(NotImplementedError):
|
||||
ed_flights_small.filter(like="2", axis=0)
|
||||
with pytest.raises(NotImplementedError):
|
||||
ed_flights_small.filter(regex="^2", axis=0)
|
56
eland/tests/series/test_filter_pytest.py
Normal file
56
eland/tests/series/test_filter_pytest.py
Normal file
@ -0,0 +1,56 @@
|
||||
# Licensed to Elasticsearch B.V under one or more agreements.
|
||||
# Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
|
||||
# See the LICENSE file in the project root for more information
|
||||
|
||||
# File called _pytest for PyCharm compatability
|
||||
|
||||
import pytest
|
||||
from eland.tests.common import TestData
|
||||
from eland.tests.common import assert_pandas_eland_series_equal
|
||||
|
||||
|
||||
class TestSeriesFilter(TestData):
|
||||
def test_filter_arguments_mutually_exclusive(self):
|
||||
ed_flights_small = self.ed_flights_small()["FlightDelayType"]
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
ed_flights_small.filter(items=[], like="!", regex="!")
|
||||
with pytest.raises(TypeError):
|
||||
ed_flights_small.filter(items=[], regex="!")
|
||||
with pytest.raises(TypeError):
|
||||
ed_flights_small.filter(items=[], like="!")
|
||||
with pytest.raises(TypeError):
|
||||
ed_flights_small.filter(like="!", regex="!")
|
||||
with pytest.raises(TypeError):
|
||||
ed_flights_small.filter()
|
||||
|
||||
def test_filter_columns_not_allowed_for_series(self):
|
||||
ed_flights_small = self.ed_flights_small()["FlightDelayType"]
|
||||
pd_flights_small = self.pd_flights_small()["FlightDelayType"]
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
ed_flights_small.filter(regex=".*", axis="columns")
|
||||
with pytest.raises(ValueError):
|
||||
ed_flights_small.filter(regex=".*", axis=1)
|
||||
with pytest.raises(ValueError):
|
||||
pd_flights_small.filter(regex=".*", axis="columns")
|
||||
with pytest.raises(ValueError):
|
||||
pd_flights_small.filter(regex=".*", axis=1)
|
||||
|
||||
@pytest.mark.parametrize("items", [[], ["20"], [str(x) for x in range(30)]])
|
||||
def test_flights_filter_index_items(self, items):
|
||||
ed_flights_small = self.ed_flights_small()["FlightDelayType"]
|
||||
pd_flights_small = self.pd_flights_small()["FlightDelayType"]
|
||||
|
||||
ed_ser = ed_flights_small.filter(items=items, axis=0)
|
||||
pd_ser = pd_flights_small.filter(items=items, axis=0)
|
||||
|
||||
assert_pandas_eland_series_equal(pd_ser, ed_ser)
|
||||
|
||||
def test_flights_filter_index_like_and_regex(self):
|
||||
ed_flights_small = self.ed_flights_small()["FlightDelayType"]
|
||||
|
||||
with pytest.raises(NotImplementedError):
|
||||
ed_flights_small.filter(like="2", axis=0)
|
||||
with pytest.raises(NotImplementedError):
|
||||
ed_flights_small.filter(regex="^2", axis=0)
|
Loading…
x
Reference in New Issue
Block a user