Switch to Point-in-Time with search_after instead of using scroll APIs

Co-authored-by: Seth Michael Larson <seth.larson@elastic.co>
P. Sai Vinay 2021-08-08 02:35:33 +05:30 committed by GitHub
parent 8f84a315be
commit 30876c8899
6 changed files with 129 additions and 81 deletions


@@ -16,7 +16,7 @@
 # under the License.
 from abc import ABC, abstractmethod
-from typing import TYPE_CHECKING, List, Optional, Union
+from typing import TYPE_CHECKING, Dict, List, Optional, Union
 from eland import SortOrder
@@ -91,17 +91,17 @@ class SortFieldAction(PostProcessingAction):
 class SortFieldAction(PostProcessingAction):
-    def __init__(self, sort_params_string: str) -> None:
+    def __init__(self, sort_params: Dict[str, str]) -> None:
         super().__init__("sort_field")
-        if sort_params_string is None:
-            raise ValueError("Expected valid string")
+        if sort_params is None:
+            raise ValueError("Expected valid dictionary")

         # Split string
-        sort_field, _, sort_order = sort_params_string.partition(":")
+        sort_field, sort_order = list(sort_params.items())[0]
         if not sort_field or sort_order not in ("asc", "desc"):
             raise ValueError(
-                f"Expected ES sort params string (e.g. _doc:desc). Got '{sort_params_string}'"
+                f"Expected ES sort params dictionary (e.g. {{'_doc': 'desc'}}). Got '{sort_params}'"
             )

         self._sort_field = sort_field
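For orientation, here is a minimal sketch of how the reworked SortFieldAction is driven after this change; the construction below is illustrative rather than taken from the diff. Sort parameters are now passed as a single-entry dictionary instead of the old "field:order" string.

from eland.actions import SortFieldAction

# Sort params are now a {field: order} dict instead of a "field:order" string.
action = SortFieldAction({"_doc": "desc"})

# Invalid orders are rejected up front by the validation shown above.
try:
    SortFieldAction({"_doc": "descending"})
except ValueError as err:
    print(err)  # Expected ES sort params dictionary (e.g. {'_doc': 'desc'}). Got ...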


@@ -41,7 +41,8 @@ DEFAULT_NUM_ROWS_DISPLAYED = 60
 DEFAULT_CHUNK_SIZE = 10000
 DEFAULT_CSV_BATCH_OUTPUT_SIZE = 10000
 DEFAULT_PROGRESS_REPORTING_NUM_ROWS = 10000
-DEFAULT_ES_MAX_RESULT_WINDOW = 10000  # index.max_result_window
+DEFAULT_SEARCH_SIZE = 5000
+DEFAULT_PIT_KEEP_ALIVE = "3m"
 DEFAULT_PAGINATION_SIZE = 5000  # for composite aggregations
 PANDAS_VERSION: Tuple[int, ...] = tuple(
     int(part) for part in pd.__version__.split(".") if part.isdigit()


@@ -626,7 +626,7 @@
         Operations:
          tasks: [('boolean_filter': ('boolean_filter': {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}})), ('tail': ('sort_field': '_doc', 'count': 5))]
          size: 5
-         sort_params: _doc:desc
+         sort_params: {'_doc': 'desc'}
          _source: ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin']
          body: {'query': {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}}}
          post_processing: [('sort_index')]
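The same output is user-visible through DataFrame.es_info(). A hedged way to reproduce it, assuming a local cluster and the Kibana "flights" sample index (both hypothetical here):

import eland as ed

# Hypothetical local cluster and the "flights" sample data index.
df = ed.DataFrame("http://localhost:9200", "flights")
filtered = df[df.FlightDelayMin > 60].tail(5)

# es_info() now reports sort_params as a dict, e.g. {'_doc': 'desc'},
# rather than the old "_doc:desc" string.
print(filtered.es_info())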


@@ -33,13 +33,14 @@ from typing import (
 import numpy as np
 import pandas as pd  # type: ignore
-from elasticsearch.helpers import scan
+from elasticsearch.exceptions import NotFoundError

-from eland.actions import PostProcessingAction, SortFieldAction
+from eland.actions import PostProcessingAction
 from eland.common import (
     DEFAULT_CSV_BATCH_OUTPUT_SIZE,
-    DEFAULT_ES_MAX_RESULT_WINDOW,
     DEFAULT_PAGINATION_SIZE,
+    DEFAULT_PIT_KEEP_ALIVE,
+    DEFAULT_SEARCH_SIZE,
     SortOrder,
     build_pd_series,
     elasticsearch_date_to_pandas_date,
@@ -1224,7 +1225,9 @@
     ) -> None:
         query_params, post_processing = self._resolve_tasks(query_compiler)

-        size, sort_params = Operations._query_params_to_size_and_sort(query_params)
+        result_size, sort_params = Operations._query_params_to_size_and_sort(
+            query_params
+        )
         script_fields = query_params.script_fields

         query = Query(query_params.query)
@@ -1233,58 +1236,22 @@
         if script_fields is not None:
             body["script_fields"] = script_fields

-        # Only return requested field_names
+        # Only return requested field_names and add them to body
         _source = query_compiler.get_field_names(include_scripted_fields=False)
-        if _source:
-            # For query_compiler._client.search we could add _source
-            # as a parameter, or add this value in body.
-            #
-            # If _source is a parameter it is encoded into to the url.
-            #
-            # If _source is a large number of fields (1000+) then this can result in an
-            # extremely long url and a `too_long_frame_exception`. Therefore, add
-            # _source to the body rather than as a _source parameter
-            body["_source"] = _source
-        else:
-            body["_source"] = False
+        body["_source"] = _source if _source else False

-        es_results: Any = None
-
-        # If size=None use scan not search - then post sort results when in df
-        # If size>10000 use scan
-        is_scan = False
-        if size is not None and size <= DEFAULT_ES_MAX_RESULT_WINDOW:
-            if size > 0:
-                es_results = query_compiler._client.search(
-                    index=query_compiler._index_pattern,
-                    size=size,
-                    sort=sort_params,
-                    body=body,
-                )
-        else:
-            is_scan = True
-            es_results = scan(
-                client=query_compiler._client,
-                index=query_compiler._index_pattern,
-                query=body,
-            )
-            # create post sort
-            if sort_params is not None:
-                post_processing.append(SortFieldAction(sort_params))
-
-        if is_scan:
-            while True:
-                partial_result, df = query_compiler._es_results_to_pandas(
-                    es_results, collector.batch_size(), collector.show_progress
-                )
-                df = self._apply_df_post_processing(df, post_processing)
-                collector.collect(df)
-                if not partial_result:
-                    break
-        else:
-            partial_result, df = query_compiler._es_results_to_pandas(es_results)
-            df = self._apply_df_post_processing(df, post_processing)
-            collector.collect(df)
+        if sort_params:
+            body["sort"] = [sort_params]
+
+        es_results = list(
+            search_after_with_pit(
+                query_compiler=query_compiler, body=body, max_number_of_hits=result_size
+            )
+        )
+
+        _, df = query_compiler._es_results_to_pandas(es_results)
+        df = self._apply_df_post_processing(df, post_processing)
+        collector.collect(df)

     def index_count(self, query_compiler: "QueryCompiler", field: str) -> int:
         # field is the index field so count values
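The new code path delegates paging to search_after_with_pit, added further down in this file. As a standalone reference, here is a sketch of the point-in-time plus search_after pattern against the raw Elasticsearch client; the host, index name, page size, and keep-alive values are assumptions for illustration.

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # hypothetical cluster

# Open a point in time so every page sees the same snapshot of the index.
pit_id = es.open_point_in_time(index="flights", keep_alive="3m")["id"]
body = {
    "size": 5000,                # page size, analogous to DEFAULT_SEARCH_SIZE
    "track_total_hits": False,   # hit counts aren't needed just to page
    "sort": [{"_doc": "asc"}],   # _doc order is the cheapest stable sort
    "pit": {"id": pit_id, "keep_alive": "3m"},
}

try:
    while True:
        resp = es.search(body=body)
        hits = resp["hits"]["hits"]
        if not hits:
            break
        # ... consume hits here ...
        body["pit"]["id"] = resp.get("pit_id", body["pit"]["id"])
        body["search_after"] = hits[-1]["sort"]  # resume after the last hit
finally:
    es.close_point_in_time(body={"id": body["pit"]["id"]})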
@@ -1379,13 +1346,12 @@
     @staticmethod
     def _query_params_to_size_and_sort(
         query_params: QueryParams,
-    ) -> Tuple[Optional[int], Optional[str]]:
+    ) -> Tuple[Optional[int], Optional[Dict[str, str]]]:
         sort_params = None
         if query_params.sort_field and query_params.sort_order:
-            sort_params = (
-                f"{query_params.sort_field}:"
-                f"{SortOrder.to_string(query_params.sort_order)}"
-            )
+            sort_params = {
+                query_params.sort_field: SortOrder.to_string(query_params.sort_order)
+            }

         size = query_params.size
         return size, sort_params
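A small illustration of the new return shape; the field name below is hypothetical, and the helper now builds a dictionary instead of concatenating a "field:order" string.

from eland.common import SortOrder

# Mirrors _query_params_to_size_and_sort for sort_field="timestamp", sort_order=SortOrder.DESC.
sort_field, sort_order = "timestamp", SortOrder.DESC
sort_params = {sort_field: SortOrder.to_string(sort_order)}
print(sort_params)  # {'timestamp': 'desc'}  -- previously the string "timestamp:desc"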
@@ -1541,3 +1507,93 @@ class PandasDataFrameCollector:
     @property
     def show_progress(self) -> bool:
         return self._show_progress
+
+
+def search_after_with_pit(
+    query_compiler: "QueryCompiler",
+    body: Dict[str, Any],
+    max_number_of_hits: Optional[int],
+) -> Generator[Dict[str, Any], None, None]:
+    """
+    Generator that opens a point in time, pages through the search API with
+    'search_after', and yields individual documents.
+
+    Parameters
+    ----------
+    query_compiler:
+        An instance of query_compiler
+    body:
+        body for search API
+    max_number_of_hits: Optional[int]
+        Maximum number of documents to yield, set to 'None' to
+        yield all documents.
+
+    Examples
+    --------
+    >>> results = list(search_after_with_pit(query_compiler, body, 2))  # doctest: +SKIP
+    [{'_index': 'flights', '_type': '_doc', '_id': '0', '_score': None, '_source': {...}, 'sort': [...]},
+     {'_index': 'flights', '_type': '_doc', '_id': '1', '_score': None, '_source': {...}, 'sort': [...]}]
+    """
+    # No documents, no reason to send a search.
+    if max_number_of_hits == 0:
+        return
+
+    hits_yielded = 0  # Track the total number of hits yielded.
+    pit_id: Optional[str] = None
+    try:
+        pit_id = query_compiler._client.open_point_in_time(
+            index=query_compiler._index_pattern, keep_alive=DEFAULT_PIT_KEEP_ALIVE
+        )["id"]
+
+        # Modify the search with the new point in time ID and keep-alive time.
+        body["pit"] = {"id": pit_id, "keep_alive": DEFAULT_PIT_KEEP_ALIVE}
+
+        # Use the default search size
+        body.setdefault("size", DEFAULT_SEARCH_SIZE)
+
+        # Improves performance by not tracking # of hits. We only
+        # care about the hit itself for these queries.
+        body.setdefault("track_total_hits", False)
+
+        # Pagination with 'search_after' must have a 'sort' setting.
+        # Using '_doc:asc' is the most efficient as it reads documents
+        # in the order that they're written on disk in Lucene.
+        body.setdefault("sort", [{"_doc": "asc"}])
+
+        while max_number_of_hits is None or hits_yielded < max_number_of_hits:
+            resp = query_compiler._client.search(body=body)
+            hits: List[Dict[str, Any]] = resp["hits"]["hits"]
+
+            # The point in time ID can change between searches so we
+            # need to keep the next search up-to-date
+            pit_id = resp.get("pit_id", pit_id)
+            body["pit"]["id"] = pit_id
+
+            # If we didn't receive any hits it means we've reached the end.
+            if not hits:
+                break
+
+            # Calculate which hits should be yielded from this batch
+            if max_number_of_hits is None:
+                hits_to_yield = len(hits)
+            else:
+                hits_to_yield = min(len(hits), max_number_of_hits - hits_yielded)
+
+            # Yield the hits we need to and then track the total number.
+            yield from hits[:hits_to_yield]
+            hits_yielded += hits_to_yield
+
+            # Set the 'search_after' for the next request
+            # to be the last sort value for this set of hits.
+            body["search_after"] = hits[-1]["sort"]
+    finally:
+        # We want to cleanup the point in time if we allocated one
+        # to keep our memory footprint low.
+        if pit_id is not None:
+            try:
+                query_compiler._client.close_point_in_time(body={"id": pit_id})
+            except NotFoundError:
+                # If a point in time is already closed Elasticsearch throws NotFoundError
+                pass
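A hedged usage sketch of the new generator from inside eland; query_compiler here stands for an already-constructed QueryCompiler, and the query body and cap are illustrative. Note the defaults set above: '_doc' ascending sort for disk-order reads, track_total_hits disabled, and the PIT id refreshed from every response.

# Illustrative only: query_compiler is an existing eland QueryCompiler instance.
body = {"query": {"match_all": {}}, "_source": ["timestamp", "FlightDelayMin"]}

# Yield at most 10,000 documents; pass max_number_of_hits=None to drain the index.
for hit in search_after_with_pit(
    query_compiler=query_compiler, body=body, max_number_of_hits=10_000
):
    print(hit["_id"], hit["_source"])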


@@ -147,14 +147,14 @@ class QueryCompiler:
     def _es_results_to_pandas(
         self,
-        results: Dict[Any, Any],
+        results: List[Dict[str, Any]],
         batch_size: Optional[int] = None,
         show_progress: bool = False,
     ) -> "pd.Dataframe":
         """
         Parameters
         ----------
-        results: dict
+        results: List[Dict[str, Any]]
             Elasticsearch results from self.client.search

         Returns
@@ -251,18 +251,9 @@
         rows = []
         index = []

-        if isinstance(results, dict):
-            iterator = results["hits"]["hits"]
-
-            if batch_size is not None:
-                raise NotImplementedError(
-                    "Can not specify batch_size with dict results"
-                )
-        else:
-            iterator = results
-
         i = 0
-        for hit in iterator:
+        for hit in results:
             i = i + 1

             if "_source" in hit:
@@ -338,10 +329,10 @@
             is_source_field = False
             pd_dtype = "object"

-            if not is_source_field and type(x) is dict:
+            if not is_source_field and isinstance(x, dict):
                 for a in x:
                     flatten(x[a], name + a + ".")

-            elif not is_source_field and type(x) is list:
+            elif not is_source_field and isinstance(x, list):
                 for a in x:
                     flatten(a, name)

             elif is_source_field:  # only print source fields from mappings
@@ -357,7 +348,7 @@
             # Elasticsearch can have multiple values for a field. These are represented as lists, so
             # create lists for this pivot (see notes above)
             if field_name in out:
-                if type(out[field_name]) is not list:
+                if not isinstance(out[field_name], list):
                     field_as_list = [out[field_name]]
                     out[field_name] = field_as_list
                 out[field_name].append(x)
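The flatten helper touched above walks nested '_source' objects recursively. Here is a minimal standalone sketch of that idea using the same isinstance checks; field-name resolution and dtype handling from the index mappings are omitted, and the sample document is made up.

from typing import Any, Dict, Optional

def flatten_source(x: Any, name: str = "", out: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Flatten a nested Elasticsearch '_source' into dotted field names."""
    if out is None:
        out = {}
    if isinstance(x, dict):
        for key in x:
            flatten_source(x[key], name + key + ".", out)
    elif isinstance(x, list):
        # Multi-valued fields: flatten each element under the same name.
        for item in x:
            flatten_source(item, name, out)
    else:
        field_name = name.rstrip(".")
        if field_name in out:
            # Collect repeated values for the same field into a list.
            if not isinstance(out[field_name], list):
                out[field_name] = [out[field_name]]
            out[field_name].append(x)
        else:
            out[field_name] = x
    return out

print(flatten_source({"dest": {"city": "Amsterdam", "codes": ["AMS", "EHAM"]}}))
# {'dest.city': 'Amsterdam', 'dest.codes': ['AMS', 'EHAM']}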


@@ -1418,7 +1418,7 @@
     "Operations:\n",
     " tasks: [('boolean_filter': ('boolean_filter': {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}})), ('tail': ('sort_field': '_doc', 'count': 5))]\n",
     " size: 5\n",
-    " sort_params: _doc:desc\n",
+    " sort_params: {'_doc': 'desc'}\n",
     " _source: ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin']\n",
     " body: {'query': {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}}}\n",
     " post_processing: [('sort_index')]\n",
@@ -1452,4 +1452,4 @@
 },
 "nbformat": 4,
 "nbformat_minor": 4
 }