diff --git a/eland/dataframe.py b/eland/dataframe.py index 246600d..54c3fc0 100644 --- a/eland/dataframe.py +++ b/eland/dataframe.py @@ -5,6 +5,8 @@ import sys import warnings from io import StringIO +import re +from typing import Optional, Sequence, Union import numpy as np import pandas as pd @@ -383,7 +385,7 @@ class DataFrame(NDFrame): if labels is not None: if index is not None or columns is not None: raise ValueError("Cannot specify both 'labels' and 'index'/'columns'") - axis = pd.DataFrame()._get_axis_name(axis) + axis = pd.DataFrame._get_axis_name(axis) axes = {axis: labels} elif index is not None or columns is not None: axes, _ = pd.DataFrame()._construct_axes_from_arguments( @@ -1443,6 +1445,81 @@ class DataFrame(NDFrame): else: return default + def filter( + self, + items: Optional[Sequence[str]] = None, + like: Optional[str] = None, + regex: Optional[str] = None, + axis: Optional[Union[int, str]] = None, + ): + """ + Subset the dataframe rows or columns according to the specified index labels. + Note that this routine does not filter a dataframe on its + contents. The filter is applied to the labels of the index. + + Parameters + ---------- + items : list-like + Keep labels from axis which are in items. + like : str + Keep labels from axis for which "like in label == True". + regex : str (regular expression) + Keep labels from axis for which re.search(regex, label) == True. + axis : {0 or ‘index’, 1 or ‘columns’, None}, default None + The axis to filter on, expressed either as an index (int) or axis name (str). By default this is the info axis, ‘index’ for Series, ‘columns’ for DataFrame. + + Returns + ------- + eland.DataFrame + + See Also + -------- + :pandas_api_docs:`pandas.DataFrame.filter` + + Notes + ----- + The ``items``, ``like``, and ``regex`` parameters are + enforced to be mutually exclusive. + """ + filter_options_passed = sum([items is not None, bool(like), bool(regex)]) + if filter_options_passed > 1: + raise TypeError( + "Keyword arguments `items`, `like`, or `regex` " + "are mutually exclusive" + ) + elif filter_options_passed == 0: + raise TypeError("Must pass either 'items', 'like', or 'regex'") + + # axis defaults to 'columns' for DataFrame, 'index' for Series + if axis is None: + axis = "columns" + axis = pd.DataFrame._get_axis_name(axis) + + if axis == "index": + new_query_compiler = self._query_compiler.filter( + items=items, like=like, regex=regex + ) + return self._create_or_update_from_compiler( + new_query_compiler, inplace=False + ) + + else: # axis == "columns" + if items is not None: + # Pandas skips over columns that don't exist + # and maintains order of items=[...] + existing_columns = set(self.columns.to_list()) + return self[[column for column in items if column in existing_columns]] + + elif like is not None: + + def matcher(x): + return like in x + + else: + matcher = re.compile(regex).search + + return self[[column for column in self.columns if matcher(column)]] + @property def values(self): """ diff --git a/eland/operations.py b/eland/operations.py index bfd0ff4..361585e 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -527,6 +527,25 @@ class Operations: results = self._metric_aggs(query_compiler, pd_aggs, numeric_only=False) return pd.DataFrame(results, index=pd_aggs) + def filter(self, query_compiler, items=None, like=None, regex=None): + # This function is only called for axis='index', + # DataFrame.filter(..., axis="columns") calls .drop() + if items is not None: + self.filter_index_values( + query_compiler, field=query_compiler.index.es_index_field, items=items + ) + return + elif like is not None: + arg_name = "like" + else: + assert regex is not None + arg_name = "regex" + + raise NotImplementedError( + f".filter({arg_name}='...', axis='index') is currently not supported due " + f"to substring and regex operations not being available for Elasticsearch document IDs." + ) + def describe(self, query_compiler): query_params, post_processing = self._resolve_tasks(query_compiler) @@ -674,7 +693,7 @@ class Operations: # _source to the body rather than as a _source parameter body["_source"] = _source else: - _source = False + body["_source"] = False es_results = None @@ -794,6 +813,16 @@ class Operations: task = QueryTermsTask(False, field, items) self._tasks.append(task) + def filter_index_values(self, query_compiler, field, items): + # Basically .drop_index_values() except with must=True on tasks. + self._validate_index_operation(query_compiler, items) + + if field == Index.ID_INDEX_FIELD: + task = QueryIdsTask(True, items) + else: + task = QueryTermsTask(True, field, items) + self._tasks.append(task) + @staticmethod def _query_params_to_size_and_sort( query_params: QueryParams, diff --git a/eland/query.py b/eland/query.py index a04b5e6..604ea76 100644 --- a/eland/query.py +++ b/eland/query.py @@ -6,7 +6,14 @@ import warnings from copy import deepcopy from typing import Optional, Dict, List, Any -from eland.filter import RandomScoreFilter, BooleanFilter, NotNull, IsNull, IsIn +from eland.filter import ( + RandomScoreFilter, + BooleanFilter, + NotNull, + IsNull, + IsIn, + Rlike, +) class Query: @@ -75,6 +82,16 @@ class Query: else: self._query = self._query & ~(IsIn(field, items)) + def regexp(self, field: str, value: str) -> None: + """ + Add regexp query + https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-regexp-query.html + """ + if self._query.empty(): + self._query = Rlike(field, value) + else: + self._query = self._query & Rlike(field, value) + def terms_aggs(self, name: str, func: str, field: str, es_size: int) -> None: """ Add terms agg e.g diff --git a/eland/query_compiler.py b/eland/query_compiler.py index 9a0bf7b..a7622b6 100644 --- a/eland/query_compiler.py +++ b/eland/query_compiler.py @@ -469,6 +469,14 @@ class QueryCompiler: return result + def filter(self, items=None, like=None, regex=None): + # field will be es_index_field for DataFrames or the column for Series. + # This function is only called for axis='index', + # DataFrame.filter(..., axis="columns") calls .drop() + result = self.copy() + result._operations.filter(self, items=items, like=like, regex=regex) + return result + def aggs(self, func): return self._operations.aggs(self, func) diff --git a/eland/series.py b/eland/series.py index db25585..fd27ceb 100644 --- a/eland/series.py +++ b/eland/series.py @@ -21,6 +21,7 @@ Based on NDFrame which underpins eland.DataFrame import sys import warnings from io import StringIO +from typing import Optional, Union, Sequence import numpy as np import pandas as pd @@ -153,14 +154,14 @@ class Series(NDFrame): return num_rows, num_columns @property - def field_name(self): + def es_field_name(self): """ Returns ------- - field_name: str + es_field_name: str Return the Elasticsearch field name for this series """ - return self._query_compiler.field_names[0] + return self._query_compiler.get_field_names(include_scripted_fields=True)[0] def _get_name(self): return self._query_compiler.columns[0] @@ -526,6 +527,62 @@ class Series(NDFrame): """ return 1 + def filter( + self, + items: Optional[Sequence[str]] = None, + like: Optional[str] = None, + regex: Optional[str] = None, + axis: Optional[Union[int, str]] = None, + ) -> "Series": + """ + Subset the dataframe rows or columns according to the specified index labels. + Note that this routine does not filter a dataframe on its + contents. The filter is applied to the labels of the index. + + Parameters + ---------- + items : list-like + Keep labels from axis which are in items. + like : str + Keep labels from axis for which "like in label == True". + regex : str (regular expression) + Keep labels from axis for which re.search(regex, label) == True. + axis : {0 or ‘index’, 1 or ‘columns’, None}, default None + The axis to filter on, expressed either as an index (int) or axis name (str). + By default this is the info axis, ‘index’ for Series, ‘columns’ for DataFrame. + + Returns + ------- + eland.Series + + See Also + -------- + :pandas_api_docs:`pandas.Series.filter` + + Notes + ----- + The ``items``, ``like``, and ``regex`` parameters are + enforced to be mutually exclusive. + """ + filter_options_passed = sum([items is not None, bool(like), bool(regex)]) + if filter_options_passed > 1: + raise TypeError( + "Keyword arguments `items`, `like`, or `regex` " + "are mutually exclusive" + ) + elif filter_options_passed == 0: + raise TypeError("Must pass either 'items', 'like', or 'regex'") + + # axis defaults to 'columns' for DataFrame, 'index' for Series + if axis is None: + axis = "index" + pd.Series._get_axis_name(axis) + + new_query_compiler = self._query_compiler.filter( + items=items, like=like, regex=regex + ) + return Series(_query_compiler=new_query_compiler) + def es_info(self): buf = StringIO() diff --git a/eland/tasks.py b/eland/tasks.py index 1dfcf7b..8b240da 100644 --- a/eland/tasks.py +++ b/eland/tasks.py @@ -272,6 +272,37 @@ class QueryTermsTask(Task): ) +class QueryRegexpTask(Task): + def __init__(self, field: str, value: str): + """ + Parameters + ---------- + field: str + field_name to filter + + value: str + regular expression pattern for filter + """ + super().__init__("regexp") + + self._field = field + self._value = value + + def resolve_task( + self, + query_params: "QueryParams", + post_processing: List["PostProcessingAction"], + query_compiler: "QueryCompiler", + ) -> RESOLVED_TASK_TYPE: + query_params.query.regexp(self._field, self._value) + return query_params, post_processing + + def __repr__(self) -> str: + return ( + f"('{self._task_type}': ('field': '{self._field}', 'value': {self._value}))" + ) + + class BooleanFilterTask(Task): def __init__(self, boolean_filter: "BooleanFilter"): """ diff --git a/eland/tests/common.py b/eland/tests/common.py index 661d329..6b50660 100644 --- a/eland/tests/common.py +++ b/eland/tests/common.py @@ -41,6 +41,8 @@ _ed_ecommerce = ed.DataFrame(ES_TEST_CLIENT, ECOMMERCE_INDEX_NAME) class TestData: + client = ES_TEST_CLIENT + def pd_flights(self): return _pd_flights diff --git a/eland/tests/dataframe/test_drop_pytest.py b/eland/tests/dataframe/test_drop_pytest.py index a7e7b22..82504c6 100644 --- a/eland/tests/dataframe/test_drop_pytest.py +++ b/eland/tests/dataframe/test_drop_pytest.py @@ -34,3 +34,21 @@ class TestDataFrameDrop(TestData): ed_idx0 = ed_flights_small.drop(["1", "2"]) assert_pandas_eland_frame_equal(pd_idx0, ed_idx0) + + def test_flights_drop_all_columns(self): + ed_flights_small = self.ed_flights_small() + pd_flights_small = self.pd_flights_small() + + all_columns = ed_flights_small.columns + + pd_col0 = pd_flights_small.drop(labels=all_columns, axis=1) + pd_col1 = pd_flights_small.drop(columns=all_columns) + + ed_col0 = ed_flights_small.drop(labels=all_columns, axis=1) + ed_col1 = ed_flights_small.drop(columns=all_columns) + + assert_pandas_eland_frame_equal(pd_col0, ed_col0) + assert_pandas_eland_frame_equal(pd_col1, ed_col1) + + assert ed_col0.columns.equals(pd_col0.columns) + assert ed_col1.columns.equals(pd_col1.columns) diff --git a/eland/tests/dataframe/test_filter_pytest.py b/eland/tests/dataframe/test_filter_pytest.py new file mode 100644 index 0000000..5791f58 --- /dev/null +++ b/eland/tests/dataframe/test_filter_pytest.py @@ -0,0 +1,80 @@ +# Licensed to Elasticsearch B.V under one or more agreements. +# Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +# See the LICENSE file in the project root for more information + +# File called _pytest for PyCharm compatability + +import pytest +from eland.tests.common import TestData +from eland.tests.common import assert_pandas_eland_frame_equal + + +class TestDataFrameFilter(TestData): + def test_filter_arguments_mutually_exclusive(self): + ed_flights_small = self.ed_flights_small() + + with pytest.raises(TypeError): + ed_flights_small.filter(items=[], like="!", regex="!") + with pytest.raises(TypeError): + ed_flights_small.filter(items=[], regex="!") + with pytest.raises(TypeError): + ed_flights_small.filter(items=[], like="!") + with pytest.raises(TypeError): + ed_flights_small.filter(like="!", regex="!") + with pytest.raises(TypeError): + ed_flights_small.filter() + + @pytest.mark.parametrize( + "items", + [ + ["DestCountry", "Cancelled", "AvgTicketPrice"], + [], + ["notfound", "AvgTicketPrice"], + ], + ) + def test_flights_filter_columns_items(self, items): + ed_flights_small = self.ed_flights_small() + pd_flights_small = self.pd_flights_small() + + ed_df = ed_flights_small.filter(items=items) + pd_df = pd_flights_small.filter(items=items) + + assert_pandas_eland_frame_equal(pd_df, ed_df) + + @pytest.mark.parametrize("like", ["Flight", "Nope"]) + def test_flights_filter_columns_like(self, like): + ed_flights_small = self.ed_flights_small() + pd_flights_small = self.pd_flights_small() + + ed_df = ed_flights_small.filter(like=like) + pd_df = pd_flights_small.filter(like=like) + + assert_pandas_eland_frame_equal(pd_df, ed_df) + + @pytest.mark.parametrize("regex", ["^Flig", "^Flight.*r$", ".*", "^[^C]"]) + def test_flights_filter_columns_regex(self, regex): + ed_flights_small = self.ed_flights_small() + pd_flights_small = self.pd_flights_small() + + ed_df = ed_flights_small.filter(regex=regex) + pd_df = pd_flights_small.filter(regex=regex) + + assert_pandas_eland_frame_equal(pd_df, ed_df) + + @pytest.mark.parametrize("items", [[], ["20"], [str(x) for x in range(30)]]) + def test_flights_filter_index_items(self, items): + ed_flights_small = self.ed_flights_small() + pd_flights_small = self.pd_flights_small() + + ed_df = ed_flights_small.filter(items=items, axis=0) + pd_df = pd_flights_small.filter(items=items, axis=0) + + assert_pandas_eland_frame_equal(pd_df, ed_df) + + def test_flights_filter_index_like_and_regex(self): + ed_flights_small = self.ed_flights_small() + + with pytest.raises(NotImplementedError): + ed_flights_small.filter(like="2", axis=0) + with pytest.raises(NotImplementedError): + ed_flights_small.filter(regex="^2", axis=0) diff --git a/eland/tests/series/test_filter_pytest.py b/eland/tests/series/test_filter_pytest.py new file mode 100644 index 0000000..1bb79b3 --- /dev/null +++ b/eland/tests/series/test_filter_pytest.py @@ -0,0 +1,56 @@ +# Licensed to Elasticsearch B.V under one or more agreements. +# Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +# See the LICENSE file in the project root for more information + +# File called _pytest for PyCharm compatability + +import pytest +from eland.tests.common import TestData +from eland.tests.common import assert_pandas_eland_series_equal + + +class TestSeriesFilter(TestData): + def test_filter_arguments_mutually_exclusive(self): + ed_flights_small = self.ed_flights_small()["FlightDelayType"] + + with pytest.raises(TypeError): + ed_flights_small.filter(items=[], like="!", regex="!") + with pytest.raises(TypeError): + ed_flights_small.filter(items=[], regex="!") + with pytest.raises(TypeError): + ed_flights_small.filter(items=[], like="!") + with pytest.raises(TypeError): + ed_flights_small.filter(like="!", regex="!") + with pytest.raises(TypeError): + ed_flights_small.filter() + + def test_filter_columns_not_allowed_for_series(self): + ed_flights_small = self.ed_flights_small()["FlightDelayType"] + pd_flights_small = self.pd_flights_small()["FlightDelayType"] + + with pytest.raises(ValueError): + ed_flights_small.filter(regex=".*", axis="columns") + with pytest.raises(ValueError): + ed_flights_small.filter(regex=".*", axis=1) + with pytest.raises(ValueError): + pd_flights_small.filter(regex=".*", axis="columns") + with pytest.raises(ValueError): + pd_flights_small.filter(regex=".*", axis=1) + + @pytest.mark.parametrize("items", [[], ["20"], [str(x) for x in range(30)]]) + def test_flights_filter_index_items(self, items): + ed_flights_small = self.ed_flights_small()["FlightDelayType"] + pd_flights_small = self.pd_flights_small()["FlightDelayType"] + + ed_ser = ed_flights_small.filter(items=items, axis=0) + pd_ser = pd_flights_small.filter(items=items, axis=0) + + assert_pandas_eland_series_equal(pd_ser, ed_ser) + + def test_flights_filter_index_like_and_regex(self): + ed_flights_small = self.ed_flights_small()["FlightDelayType"] + + with pytest.raises(NotImplementedError): + ed_flights_small.filter(like="2", axis=0) + with pytest.raises(NotImplementedError): + ed_flights_small.filter(regex="^2", axis=0)