From 7e5f0d39136865eeab95fc03a21e88c342e8c381 Mon Sep 17 00:00:00 2001 From: Seth Michael Larson Date: Wed, 1 Apr 2020 18:00:22 -0500 Subject: [PATCH] Add DataFrame.es_query() to query Elasticsearch directly --- .../api/eland.DataFrame.es_query.rst | 6 ++ docs/source/reference/dataframe.rst | 15 ++-- eland/dataframe.py | 56 +++++++++++++++ eland/filter.py | 6 ++ eland/query.py | 21 ++---- eland/query_compiler.py | 4 ++ eland/tests/dataframe/test_es_query_pytest.py | 68 +++++++++++++++++++ 7 files changed, 155 insertions(+), 21 deletions(-) create mode 100644 docs/source/reference/api/eland.DataFrame.es_query.rst create mode 100644 eland/tests/dataframe/test_es_query_pytest.py diff --git a/docs/source/reference/api/eland.DataFrame.es_query.rst b/docs/source/reference/api/eland.DataFrame.es_query.rst new file mode 100644 index 0000000..e0e7879 --- /dev/null +++ b/docs/source/reference/api/eland.DataFrame.es_query.rst @@ -0,0 +1,6 @@ +eland.DataFrame.es_query +======================== + +.. currentmodule:: eland + +.. automethod:: DataFrame.es_query diff --git a/docs/source/reference/dataframe.rst b/docs/source/reference/dataframe.rst index 4757c0c..9da5a29 100644 --- a/docs/source/reference/dataframe.rst +++ b/docs/source/reference/dataframe.rst @@ -76,6 +76,14 @@ Plotting DataFrame.hist +Elasticsearch Functions +~~~~~~~~~~~~~~~~~~~~~~~ +.. autosummary:: + :toctree: api/ + + DataFrame.info_es + DataFrame.es_query + Serialization / IO / conversion ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. autosummary:: @@ -86,10 +94,3 @@ Serialization / IO / conversion DataFrame.to_csv DataFrame.to_html DataFrame.to_string - -Elasticsearch utilities -~~~~~~~~~~~~~~~~~~~~~~~ -.. autosummary:: - :toctree: api/ - - DataFrame.info_es diff --git a/eland/dataframe.py b/eland/dataframe.py index 4a95cf0..c434567 100644 --- a/eland/dataframe.py +++ b/eland/dataframe.py @@ -569,6 +569,62 @@ class DataFrame(NDFrame): return buf.getvalue() + def es_query(self, query): + """Applies an Elasticsearch DSL query to the current DataFrame. + + Parameters + ---------- + query: + Dictionary of the Elasticsearch DSL query to apply + + Returns + ------- + eland.DataFrame: + eland DataFrame with the query applied + + Examples + -------- + + Apply a `geo-distance query`_ to a dataset with a geo-point column ``geoip.location``. + + .. _geo-distance query: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-geo-distance-query.html + + >>> df = ed.DataFrame('localhost', 'ecommerce', columns=['customer_first_name', 'geoip.city_name']) + >>> df.es_query({"bool": {"filter": {"geo_distance": {"distance": "1km", "geoip.location": [55.3, 25.3]}}}}).head() + customer_first_name geoip.city_name + 1 Mary Dubai + 9 Rabbia Al Dubai + 10 Rabbia Al Dubai + 22 Mary Dubai + 30 Robbie Dubai + + [5 rows x 2 columns] + + If using an occurrence like ``must`` or ``filter`` you must + nest it within ``bool``: + + .. code-block:: python + + # Correct: + df.es_query({ + "bool": { + "filter": {...} + } + }) + + # Incorrect, needs to be nested under 'bool': + df.es_query({ + "filter": {...} + }) + """ + # Unpack the {'query': ...} which some + # users may use due to documentation. + if not isinstance(query, dict): + raise TypeError("'query' must be of type 'dict'") + if tuple(query) == ("query",): + query = query["query"] + return DataFrame(query_compiler=self._query_compiler.es_query(query)) + def _index_summary(self): # Print index summary e.g. # Index: 103 entries, 0 to 102 diff --git a/eland/filter.py b/eland/filter.py index 592fe9d..fb2bbf2 100644 --- a/eland/filter.py +++ b/eland/filter.py @@ -178,3 +178,9 @@ class ScriptFilter(BooleanFilter): if params is not None: script["params"] = params self._filter = {"script": {"script": script}} + + +class QueryFilter(BooleanFilter): + def __init__(self, query): + super().__init__() + self._filter = query diff --git a/eland/query.py b/eland/query.py index 3ab814c..701e6ad 100644 --- a/eland/query.py +++ b/eland/query.py @@ -135,27 +135,20 @@ class Query: self._aggs[name] = agg def to_search_body(self): - if self._query.empty(): - if self._aggs: - body = {"aggs": self._aggs} - else: - body = {} - else: - if self._aggs: - body = {"query": self._query.build(), "aggs": self._aggs} - else: - body = {"query": self._query.build()} + body = {} + if self._aggs: + body["aggs"] = self._aggs + if not self._query.empty(): + body["query"] = self._query.build() return body def to_count_body(self): if len(self._aggs) > 0: warnings.warn("Requesting count for agg query {}", self) if self._query.empty(): - body = None + return None else: - body = {"query": self._query.build()} - - return body + return {"query": self._query.build()} def update_boolean_filter(self, boolean_filter): if self._query.empty(): diff --git a/eland/query_compiler.py b/eland/query_compiler.py index 7b0bd72..6a7aae0 100644 --- a/eland/query_compiler.py +++ b/eland/query_compiler.py @@ -25,6 +25,7 @@ from eland import ( from eland import FieldMappings from eland import Index from eland import Operations +from eland.filter import QueryFilter class QueryCompiler: @@ -397,6 +398,9 @@ class QueryCompiler: return result + def es_query(self, query): + return self._update_query(QueryFilter(query)) + # To/From Pandas def to_pandas(self, show_progress=False): """Converts Eland DataFrame to Pandas DataFrame. diff --git a/eland/tests/dataframe/test_es_query_pytest.py b/eland/tests/dataframe/test_es_query_pytest.py new file mode 100644 index 0000000..9e9d3ce --- /dev/null +++ b/eland/tests/dataframe/test_es_query_pytest.py @@ -0,0 +1,68 @@ +# Copyright 2019 Elasticsearch BV +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# File called _pytest for PyCharm compatability + +import pytest +from eland.tests.common import TestData +from eland.tests.common import assert_eland_frame_equal + + +class TestDataEsQuery(TestData): + def test_flights_match_query(self): + ed_flights = self.ed_flights() + + left = ed_flights.es_query({"match": {"OriginCityName": "Rome"}})[ + ed_flights["Carrier"] == "Kibana Airlines" + ] + + right = ed_flights[ed_flights["Carrier"] == "Kibana Airlines"].es_query( + {"match": {"OriginCityName": "Rome"}} + ) + + assert len(left) > 0 + assert_eland_frame_equal(left, right) + + def test_es_query_allows_query_in_dict(self): + ed_flights = self.ed_flights() + + left = ed_flights.es_query({"match": {"OriginCityName": "Rome"}}) + right = ed_flights.es_query({"query": {"match": {"OriginCityName": "Rome"}}}) + + assert len(left) > 0 + assert_eland_frame_equal(left, right) + + def test_es_query_geo_location(self): + df = self.ed_ecommerce() + cur_nearby = df.es_query( + { + "bool": { + "filter": { + "geo_distance": { + "distance": "10km", + "geoip.location": {"lon": 55.3, "lat": 25.3}, + } + } + } + } + )["currency"].value_counts() + + assert cur_nearby["EUR"] == 476 + + @pytest.mark.parametrize("query", [(), [], 1, True]) + def test_es_query_wrong_type(self, query): + ed_flights = self.ed_flights_small() + + with pytest.raises(TypeError): + ed_flights.es_query(query)