From 30876c88996c9ddd768a213ea3b355fd3d0adfa4 Mon Sep 17 00:00:00 2001
From: "P. Sai Vinay" <33659563+V1NAY8@users.noreply.github.com>
Date: Sun, 8 Aug 2021 02:35:33 +0530
Subject: [PATCH] Switch to Point-in-Time with search_after instead of using
 scroll APIs

Co-authored-by: Seth Michael Larson
---
 eland/actions.py                        |  12 +-
 eland/common.py                         |   3 +-
 eland/dataframe.py                      |   2 +-
 eland/operations.py                     | 168 ++++++++++++++++--------
 eland/query_compiler.py                 |  21 +--
 tests/notebook/test_demo_notebook.ipynb |   4 +-
 6 files changed, 129 insertions(+), 81 deletions(-)

diff --git a/eland/actions.py b/eland/actions.py
index d9f4306..f20955c 100644
--- a/eland/actions.py
+++ b/eland/actions.py
@@ -16,7 +16,7 @@
 # under the License.
 
 from abc import ABC, abstractmethod
-from typing import TYPE_CHECKING, List, Optional, Union
+from typing import TYPE_CHECKING, Dict, List, Optional, Union
 
 from eland import SortOrder
 
@@ -91,17 +91,17 @@ class TailAction(PostProcessingAction):
 
 
 class SortFieldAction(PostProcessingAction):
-    def __init__(self, sort_params_string: str) -> None:
+    def __init__(self, sort_params: Dict[str, str]) -> None:
         super().__init__("sort_field")
 
-        if sort_params_string is None:
-            raise ValueError("Expected valid string")
+        if sort_params is None:
+            raise ValueError("Expected valid dictionary")
 
-        # Split string
-        sort_field, _, sort_order = sort_params_string.partition(":")
+        # Unpack the single field/order pair from the dictionary
+        sort_field, sort_order = list(sort_params.items())[0]
         if not sort_field or sort_order not in ("asc", "desc"):
             raise ValueError(
-                f"Expected ES sort params string (e.g. _doc:desc). Got '{sort_params_string}'"
+                f"Expected ES sort params dictionary (e.g. {{'_doc': 'desc'}}). Got '{sort_params}'"
             )
 
         self._sort_field = sort_field
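Note for reviewers: SortFieldAction now takes the Elasticsearch sort as a
single-entry dictionary rather than a "field:order" string. A minimal sketch
of the new calling convention (usage illustration only, not part of the patch):

    from eland.actions import SortFieldAction

    # One field/direction pair, mirroring an ES sort clause
    action = SortFieldAction({"_doc": "desc"})

    # Anything other than "asc"/"desc" is rejected up front
    SortFieldAction({"_doc": "descending"})  # raises ValueError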
diff --git a/eland/common.py b/eland/common.py
index 3fbaccc..08665c6 100644
--- a/eland/common.py
+++ b/eland/common.py
@@ -41,7 +41,8 @@ DEFAULT_NUM_ROWS_DISPLAYED = 60
 DEFAULT_CHUNK_SIZE = 10000
 DEFAULT_CSV_BATCH_OUTPUT_SIZE = 10000
 DEFAULT_PROGRESS_REPORTING_NUM_ROWS = 10000
-DEFAULT_ES_MAX_RESULT_WINDOW = 10000  # index.max_result_window
+DEFAULT_SEARCH_SIZE = 5000
+DEFAULT_PIT_KEEP_ALIVE = "3m"
 DEFAULT_PAGINATION_SIZE = 5000  # for composite aggregations
 PANDAS_VERSION: Tuple[int, ...] = tuple(
     int(part) for part in pd.__version__.split(".") if part.isdigit()
diff --git a/eland/dataframe.py b/eland/dataframe.py
index fca8145..1b4c37d 100644
--- a/eland/dataframe.py
+++ b/eland/dataframe.py
@@ -626,7 +626,7 @@ class DataFrame(NDFrame):
         Operations:
          tasks: [('boolean_filter': ('boolean_filter': {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}})), ('tail': ('sort_field': '_doc', 'count': 5))]
          size: 5
-         sort_params: _doc:desc
+         sort_params: {'_doc': 'desc'}
          _source: ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin']
          body: {'query': {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}}}
          post_processing: [('sort_index')]
diff --git a/eland/operations.py b/eland/operations.py
index 64e6e9d..c72d7e0 100644
--- a/eland/operations.py
+++ b/eland/operations.py
@@ -33,13 +33,14 @@ from typing import (
 
 import numpy as np
 import pandas as pd  # type: ignore
-from elasticsearch.helpers import scan
+from elasticsearch.exceptions import NotFoundError
 
-from eland.actions import PostProcessingAction, SortFieldAction
+from eland.actions import PostProcessingAction
 from eland.common import (
     DEFAULT_CSV_BATCH_OUTPUT_SIZE,
-    DEFAULT_ES_MAX_RESULT_WINDOW,
     DEFAULT_PAGINATION_SIZE,
+    DEFAULT_PIT_KEEP_ALIVE,
+    DEFAULT_SEARCH_SIZE,
     SortOrder,
     build_pd_series,
     elasticsearch_date_to_pandas_date,
@@ -1224,7 +1225,9 @@
     ) -> None:
         query_params, post_processing = self._resolve_tasks(query_compiler)
 
-        size, sort_params = Operations._query_params_to_size_and_sort(query_params)
+        result_size, sort_params = Operations._query_params_to_size_and_sort(
+            query_params
+        )
 
         script_fields = query_params.script_fields
         query = Query(query_params.query)
@@ -1233,58 +1236,22 @@
         if script_fields is not None:
             body["script_fields"] = script_fields
 
-        # Only return requested field_names
+        # Only return requested field_names and add them to body
         _source = query_compiler.get_field_names(include_scripted_fields=False)
-        if _source:
-            # For query_compiler._client.search we could add _source
-            # as a parameter, or add this value in body.
-            #
-            # If _source is a parameter it is encoded into to the url.
-            #
-            # If _source is a large number of fields (1000+) then this can result in an
-            # extremely long url and a `too_long_frame_exception`. Therefore, add
-            # _source to the body rather than as a _source parameter
-            body["_source"] = _source
-        else:
-            body["_source"] = False
+        body["_source"] = _source if _source else False
 
-        es_results: Any = None
+        if sort_params:
+            body["sort"] = [sort_params]
 
-        # If size=None use scan not search - then post sort results when in df
-        # If size>10000 use scan
-        is_scan = False
-        if size is not None and size <= DEFAULT_ES_MAX_RESULT_WINDOW:
-            if size > 0:
-                es_results = query_compiler._client.search(
-                    index=query_compiler._index_pattern,
-                    size=size,
-                    sort=sort_params,
-                    body=body,
-                )
-        else:
-            is_scan = True
-            es_results = scan(
-                client=query_compiler._client,
-                index=query_compiler._index_pattern,
-                query=body,
-            )
-            # create post sort
-            if sort_params is not None:
-                post_processing.append(SortFieldAction(sort_params))
+        es_results = list(
+            search_after_with_pit(
+                query_compiler=query_compiler, body=body, max_number_of_hits=result_size
+            )
+        )
 
-        if is_scan:
-            while True:
-                partial_result, df = query_compiler._es_results_to_pandas(
-                    es_results, collector.batch_size(), collector.show_progress
-                )
-                df = self._apply_df_post_processing(df, post_processing)
-                collector.collect(df)
-                if not partial_result:
-                    break
-        else:
-            partial_result, df = query_compiler._es_results_to_pandas(es_results)
-            df = self._apply_df_post_processing(df, post_processing)
-            collector.collect(df)
+        _, df = query_compiler._es_results_to_pandas(es_results)
+        df = self._apply_df_post_processing(df, post_processing)
+        collector.collect(df)
 
     def index_count(self, query_compiler: "QueryCompiler", field: str) -> int:
         # field is the index field so count values
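Note for reviewers: with sort expressed as a dictionary, _read_dataframe above
embeds it directly in the search body instead of passing it as a separate sort
parameter. A sketch of the resulting request body, with an illustrative field
name that is not taken from this patch:

    body = {"query": {"match_all": {}}}
    sort_params = {"timestamp": "desc"}  # as returned by _query_params_to_size_and_sort
    if sort_params:
        body["sort"] = [sort_params]
    # body == {"query": {"match_all": {}}, "sort": [{"timestamp": "desc"}]}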
@@ -1379,13 +1346,12 @@
     @staticmethod
     def _query_params_to_size_and_sort(
         query_params: QueryParams,
-    ) -> Tuple[Optional[int], Optional[str]]:
+    ) -> Tuple[Optional[int], Optional[Dict[str, str]]]:
         sort_params = None
         if query_params.sort_field and query_params.sort_order:
-            sort_params = (
-                f"{query_params.sort_field}:"
-                f"{SortOrder.to_string(query_params.sort_order)}"
-            )
+            sort_params = {
+                query_params.sort_field: SortOrder.to_string(query_params.sort_order)
+            }
 
         size = query_params.size
         return size, sort_params
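Note for reviewers: the hunk below adds the helper that replaces the scroll
API. It implements the standard point-in-time plus 'search_after' pagination
loop. For reference, a self-contained sketch of the same pattern against a
bare elasticsearch-py client (the index name, page size, and client
construction are assumptions, not part of this patch):

    from elasticsearch import Elasticsearch

    es = Elasticsearch("http://localhost:9200")  # assumed local cluster
    pit_id = es.open_point_in_time(index="flights", keep_alive="3m")["id"]
    try:
        search_after = None
        while True:
            body = {
                "size": 1000,
                "sort": [{"_doc": "asc"}],
                "pit": {"id": pit_id, "keep_alive": "3m"},
                "track_total_hits": False,
            }
            if search_after is not None:
                body["search_after"] = search_after
            hits = es.search(body=body)["hits"]["hits"]
            if not hits:
                break  # no more pages
            for hit in hits:
                ...  # process each document's '_source' here
            # Resume from the last hit's sort values on the next page.
            # (The helper below also refreshes the PIT id from each response.)
            search_after = hits[-1]["sort"]
    finally:
        es.close_point_in_time(body={"id": pit_id})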
@@ -1541,3 +1507,93 @@
     @property
     def show_progress(self) -> bool:
         return self._show_progress
+
+
+def search_after_with_pit(
+    query_compiler: "QueryCompiler",
+    body: Dict[str, Any],
+    max_number_of_hits: Optional[int],
+) -> Generator[Dict[str, Any], None, None]:
+    """
+    Generator which opens a point in time (PIT), then paginates the
+    search API with 'search_after', yielding each document individually.
+
+    Parameters
+    ----------
+    query_compiler:
+        An instance of QueryCompiler
+    body:
+        Body for the search API
+    max_number_of_hits: Optional[int]
+        Maximum number of documents to yield, set to 'None' to
+        yield all documents.
+
+    Examples
+    --------
+    >>> results = list(search_after_with_pit(query_compiler, body, 2))  # doctest: +SKIP
+    [{'_index': 'flights', '_type': '_doc', '_id': '0', '_score': None, '_source': {...}, 'sort': [...]},
+     {'_index': 'flights', '_type': '_doc', '_id': '1', '_score': None, '_source': {...}, 'sort': [...]}]
+
+    """
+    # No documents, no reason to send a search.
+    if max_number_of_hits == 0:
+        return
+
+    hits_yielded = 0  # Track the total number of hits yielded.
+    pit_id: Optional[str] = None
+    try:
+        pit_id = query_compiler._client.open_point_in_time(
+            index=query_compiler._index_pattern, keep_alive=DEFAULT_PIT_KEEP_ALIVE
+        )["id"]
+
+        # Modify the search with the new point in time ID and keep-alive time.
+        body["pit"] = {"id": pit_id, "keep_alive": DEFAULT_PIT_KEEP_ALIVE}
+
+        # Use the default search size.
+        body.setdefault("size", DEFAULT_SEARCH_SIZE)
+
+        # Improves performance by not tracking the total number of hits.
+        # We only care about the hits themselves for these queries.
+        body.setdefault("track_total_hits", False)
+
+        # Pagination with 'search_after' must have a 'sort' setting.
+        # Sorting by '_doc' ascending is the most efficient, as it reads
+        # documents in the order they were written to disk in Lucene.
+        body.setdefault("sort", [{"_doc": "asc"}])
+
+        while max_number_of_hits is None or hits_yielded < max_number_of_hits:
+            resp = query_compiler._client.search(body=body)
+            hits: List[Dict[str, Any]] = resp["hits"]["hits"]
+
+            # The point in time ID can change between searches, so we
+            # need to keep the next search up-to-date.
+            pit_id = resp.get("pit_id", pit_id)
+            body["pit"]["id"] = pit_id
+
+            # If we didn't receive any hits it means we've reached the end.
+            if not hits:
+                break
+
+            # Calculate which hits should be yielded from this batch.
+            if max_number_of_hits is None:
+                hits_to_yield = len(hits)
+            else:
+                hits_to_yield = min(len(hits), max_number_of_hits - hits_yielded)
+
+            # Yield the hits we need to and then track the total number.
+            yield from hits[:hits_to_yield]
+            hits_yielded += hits_to_yield
+
+            # Set the 'search_after' for the next request
+            # to be the last sort value for this set of hits.
+            body["search_after"] = hits[-1]["sort"]
+
+    finally:
+        # We want to clean up the point in time if we allocated one
+        # to keep our memory footprint low.
+        if pit_id is not None:
+            try:
+                query_compiler._client.close_point_in_time(body={"id": pit_id})
+            except NotFoundError:
+                # If a point in time is already closed, Elasticsearch throws NotFoundError
+                pass
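Note for reviewers: since pagination now happens in search_after_with_pit,
_es_results_to_pandas (updated below) receives a flat list of hit dictionaries
instead of a raw search response. A sketch of the shape it expects, with
illustrative field values and an assumed existing QueryCompiler instance:

    hits = [
        {"_index": "flights", "_id": "0", "_source": {"FlightDelayMin": 65}, "sort": [0]},
        {"_index": "flights", "_id": "1", "_source": {"FlightDelayMin": 75}, "sort": [1]},
    ]
    _, df = query_compiler._es_results_to_pandas(hits)  # query_compiler assumed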
diff --git a/eland/query_compiler.py b/eland/query_compiler.py
index cefdef1..34fbc54 100644
--- a/eland/query_compiler.py
+++ b/eland/query_compiler.py
@@ -147,14 +147,14 @@ class QueryCompiler:
 
     def _es_results_to_pandas(
         self,
-        results: Dict[Any, Any],
+        results: List[Dict[str, Any]],
         batch_size: Optional[int] = None,
         show_progress: bool = False,
     ) -> "pd.Dataframe":
         """
         Parameters
         ----------
-        results: dict
+        results: List[Dict[str, Any]]
             Elasticsearch results from self.client.search
 
         Returns
@@ -251,18 +251,9 @@ class QueryCompiler:
         rows = []
         index = []
 
-        if isinstance(results, dict):
-            iterator = results["hits"]["hits"]
-
-            if batch_size is not None:
-                raise NotImplementedError(
-                    "Can not specify batch_size with dict results"
-                )
-        else:
-            iterator = results
-
         i = 0
-        for hit in iterator:
+        for hit in results:
             i = i + 1
 
             if "_source" in hit:
@@ -338,10 +329,10 @@ class QueryCompiler:
                     is_source_field = False
                     pd_dtype = "object"
 
-                if not is_source_field and type(x) is dict:
+                if not is_source_field and isinstance(x, dict):
                     for a in x:
                         flatten(x[a], name + a + ".")
-                elif not is_source_field and type(x) is list:
+                elif not is_source_field and isinstance(x, list):
                     for a in x:
                         flatten(a, name)
                 elif is_source_field:  # only print source fields from mappings
@@ -357,7 +348,7 @@ class QueryCompiler:
                     # Elasticsearch can have multiple values for a field. These are represented as lists, so
                     # create lists for this pivot (see notes above)
                     if field_name in out:
-                        if type(out[field_name]) is not list:
+                        if not isinstance(out[field_name], list):
                             field_as_list = [out[field_name]]
                             out[field_name] = field_as_list
                         out[field_name].append(x)
diff --git a/tests/notebook/test_demo_notebook.ipynb b/tests/notebook/test_demo_notebook.ipynb
index 5ebbfc5..1e6dcdc 100644
--- a/tests/notebook/test_demo_notebook.ipynb
+++ b/tests/notebook/test_demo_notebook.ipynb
@@ -1418,7 +1418,7 @@
        "Operations:\n",
        " tasks: [('boolean_filter': ('boolean_filter': {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}})), ('tail': ('sort_field': '_doc', 'count': 5))]\n",
        " size: 5\n",
-       " sort_params: _doc:desc\n",
+       " sort_params: {'_doc': 'desc'}\n",
        " _source: ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin']\n",
        " body: {'query': {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}}}\n",
        " post_processing: [('sort_index')]\n",
@@ -1452,4 +1452,4 @@
 },
 "nbformat": 4,
 "nbformat_minor": 4
-}
\ No newline at end of file
+}