Add DataFrame.es_query() to query Elasticsearch directly

This commit is contained in:
Seth Michael Larson 2020-04-01 18:00:22 -05:00 committed by Seth Michael Larson
parent 38251ddf08
commit 7e5f0d3913
7 changed files with 155 additions and 21 deletions

View File

@ -0,0 +1,6 @@
eland.DataFrame.es_query
========================
.. currentmodule:: eland
.. automethod:: DataFrame.es_query

View File

@ -76,6 +76,14 @@ Plotting
DataFrame.hist
Elasticsearch Functions
~~~~~~~~~~~~~~~~~~~~~~~
.. autosummary::
:toctree: api/
DataFrame.info_es
DataFrame.es_query
Serialization / IO / conversion
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. autosummary::
@ -86,10 +94,3 @@ Serialization / IO / conversion
DataFrame.to_csv
DataFrame.to_html
DataFrame.to_string
Elasticsearch utilities
~~~~~~~~~~~~~~~~~~~~~~~
.. autosummary::
:toctree: api/
DataFrame.info_es

View File

@ -569,6 +569,62 @@ class DataFrame(NDFrame):
return buf.getvalue()
def es_query(self, query):
"""Applies an Elasticsearch DSL query to the current DataFrame.
Parameters
----------
query:
Dictionary of the Elasticsearch DSL query to apply
Returns
-------
eland.DataFrame:
eland DataFrame with the query applied
Examples
--------
Apply a `geo-distance query`_ to a dataset with a geo-point column ``geoip.location``.
.. _geo-distance query: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-geo-distance-query.html
>>> df = ed.DataFrame('localhost', 'ecommerce', columns=['customer_first_name', 'geoip.city_name'])
>>> df.es_query({"bool": {"filter": {"geo_distance": {"distance": "1km", "geoip.location": [55.3, 25.3]}}}}).head()
customer_first_name geoip.city_name
1 Mary Dubai
9 Rabbia Al Dubai
10 Rabbia Al Dubai
22 Mary Dubai
30 Robbie Dubai
<BLANKLINE>
[5 rows x 2 columns]
If using an occurrence like ``must`` or ``filter`` you must
nest it within ``bool``:
.. code-block:: python
# Correct:
df.es_query({
"bool": {
"filter": {...}
}
})
# Incorrect, needs to be nested under 'bool':
df.es_query({
"filter": {...}
})
"""
# Unpack the {'query': ...} which some
# users may use due to documentation.
if not isinstance(query, dict):
raise TypeError("'query' must be of type 'dict'")
if tuple(query) == ("query",):
query = query["query"]
return DataFrame(query_compiler=self._query_compiler.es_query(query))
def _index_summary(self):
# Print index summary e.g.
# Index: 103 entries, 0 to 102

View File

@ -178,3 +178,9 @@ class ScriptFilter(BooleanFilter):
if params is not None:
script["params"] = params
self._filter = {"script": {"script": script}}
class QueryFilter(BooleanFilter):
def __init__(self, query):
super().__init__()
self._filter = query

View File

@ -135,27 +135,20 @@ class Query:
self._aggs[name] = agg
def to_search_body(self):
if self._query.empty():
if self._aggs:
body = {"aggs": self._aggs}
else:
body = {}
else:
if self._aggs:
body = {"query": self._query.build(), "aggs": self._aggs}
else:
body = {"query": self._query.build()}
body = {}
if self._aggs:
body["aggs"] = self._aggs
if not self._query.empty():
body["query"] = self._query.build()
return body
def to_count_body(self):
if len(self._aggs) > 0:
warnings.warn("Requesting count for agg query {}", self)
if self._query.empty():
body = None
return None
else:
body = {"query": self._query.build()}
return body
return {"query": self._query.build()}
def update_boolean_filter(self, boolean_filter):
if self._query.empty():

View File

@ -25,6 +25,7 @@ from eland import (
from eland import FieldMappings
from eland import Index
from eland import Operations
from eland.filter import QueryFilter
class QueryCompiler:
@ -397,6 +398,9 @@ class QueryCompiler:
return result
def es_query(self, query):
return self._update_query(QueryFilter(query))
# To/From Pandas
def to_pandas(self, show_progress=False):
"""Converts Eland DataFrame to Pandas DataFrame.

View File

@ -0,0 +1,68 @@
# Copyright 2019 Elasticsearch BV
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# File called _pytest for PyCharm compatability
import pytest
from eland.tests.common import TestData
from eland.tests.common import assert_eland_frame_equal
class TestDataEsQuery(TestData):
def test_flights_match_query(self):
ed_flights = self.ed_flights()
left = ed_flights.es_query({"match": {"OriginCityName": "Rome"}})[
ed_flights["Carrier"] == "Kibana Airlines"
]
right = ed_flights[ed_flights["Carrier"] == "Kibana Airlines"].es_query(
{"match": {"OriginCityName": "Rome"}}
)
assert len(left) > 0
assert_eland_frame_equal(left, right)
def test_es_query_allows_query_in_dict(self):
ed_flights = self.ed_flights()
left = ed_flights.es_query({"match": {"OriginCityName": "Rome"}})
right = ed_flights.es_query({"query": {"match": {"OriginCityName": "Rome"}}})
assert len(left) > 0
assert_eland_frame_equal(left, right)
def test_es_query_geo_location(self):
df = self.ed_ecommerce()
cur_nearby = df.es_query(
{
"bool": {
"filter": {
"geo_distance": {
"distance": "10km",
"geoip.location": {"lon": 55.3, "lat": 25.3},
}
}
}
}
)["currency"].value_counts()
assert cur_nearby["EUR"] == 476
@pytest.mark.parametrize("query", [(), [], 1, True])
def test_es_query_wrong_type(self, query):
ed_flights = self.ed_flights_small()
with pytest.raises(TypeError):
ed_flights.es_query(query)