Yield list of hits from _search_yield_hits() instead of individual hits

Seth Michael Larson 2021-08-17 12:16:10 -05:00 committed by GitHub
parent 011bf29816
commit e4f88a34a6
2 changed files with 28 additions and 28 deletions
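
In short, the private generator now yields whole pages of hits rather than one hit at a time, and never yields an empty page. A minimal sketch of the behavioral difference, using hypothetical stand-ins rather than eland's real generators (which query the Elasticsearch search API through a QueryCompiler):

    from typing import Any, Dict, Generator, List

    Hit = Dict[str, Any]

    def yield_individual_hits(pages: List[List[Hit]]) -> Generator[Hit, None, None]:
        # Old behavior: every hit is yielded on its own.
        for page in pages:
            yield from page

    def yield_hit_batches(pages: List[List[Hit]]) -> Generator[List[Hit], None, None]:
        # New behavior: each non-empty page is yielded as one batch.
        for page in pages:
            if page:
                yield page

    pages = [[{"_id": "0"}, {"_id": "1"}], [], [{"_id": "2"}]]
    assert list(yield_individual_hits(pages)) == [{"_id": "0"}, {"_id": "1"}, {"_id": "2"}]
    assert list(yield_hit_batches(pages)) == [[{"_id": "0"}, {"_id": "1"}], [{"_id": "2"}]]

Yielding batches lets callers process a whole page of results at once instead of iterating hit by hit.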

View File

@@ -1234,13 +1234,14 @@ class Operations:
         if sort_params:
             body["sort"] = [sort_params]
 
-        es_results = list(
-            search_yield_hits(
+        es_results: List[Dict[str, Any]] = sum(
+            _search_yield_hits(
                 query_compiler=query_compiler, body=body, max_number_of_hits=result_size
-            )
+            ),
+            [],
         )
 
-        _, df = query_compiler._es_results_to_pandas(
+        df = query_compiler._es_results_to_pandas(
             results=es_results, show_progress=show_progress
         )
         df = self._apply_df_post_processing(df, post_processing)
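
The `sum(..., [])` call above flattens the yielded batches back into a single list of hits: `sum` with a list start value concatenates lists, though each step copies the accumulator. A quick illustration, including the linear-time `itertools.chain` alternative that would avoid the quadratic copying if it mattered here:

    import itertools

    batches = [[{"_id": "0"}, {"_id": "1"}], [{"_id": "2"}]]

    flat = sum(batches, [])  # [] as the start value, so the lists are concatenated
    assert flat == [{"_id": "0"}, {"_id": "1"}, {"_id": "2"}]

    # Equivalent result without repeated copying:
    assert list(itertools.chain.from_iterable(batches)) == flat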
@@ -1447,14 +1448,16 @@ def quantile_to_percentile(quantile: Union[int, float]) -> float:
     return float(min(100, max(0, quantile * 100)))
 
 
-def search_yield_hits(
+def _search_yield_hits(
     query_compiler: "QueryCompiler",
     body: Dict[str, Any],
     max_number_of_hits: Optional[int],
-) -> Generator[Dict[str, Any], None, None]:
+) -> Generator[List[Dict[str, Any]], None, None]:
     """
     This is a generator used to initialize point in time API and query the
-    search API and return generator which yields an individual documents
+    search API and return generator which yields batches of hits as they
+    come in. No empty batches will be yielded, if there are no hits then
+    no batches will be yielded instead.
 
     Parameters
     ----------
@@ -1469,8 +1472,8 @@ def search_yield_hits(
     Examples
     --------
     >>> results = list(search_yield_hits(query_compiler, body, 2)) # doctest: +SKIP
-    [{'_index': 'flights', '_type': '_doc', '_id': '0', '_score': None, '_source': {...}, 'sort': [...]},
-    {'_index': 'flights', '_type': '_doc', '_id': '1', '_score': None, '_source': {...}, 'sort': [...]}]
+    [[{'_index': 'flights', '_type': '_doc', '_id': '0', '_score': None, '_source': {...}, 'sort': [...]},
+    {'_index': 'flights', '_type': '_doc', '_id': '1', '_score': None, '_source': {...}, 'sort': [...]}]]
     """
     # Make a copy of 'body' to avoid mutating it outside this function.
     body = body.copy()
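
As the updated doctest shows, each item from the generator is now itself a list. A usage sketch of the new contract (not runnable standalone; `query_compiler`, `body`, and `process_batch` are placeholders):

    for batch in _search_yield_hits(
        query_compiler=query_compiler, body=body, max_number_of_hits=1000
    ):
        # Each batch is a non-empty list of hit dicts.
        process_batch(batch)  # hypothetical downstream consumer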
@@ -1500,7 +1503,7 @@ def _search_with_scroll(
     query_compiler: "QueryCompiler",
     body: Dict[str, Any],
     max_number_of_hits: Optional[int],
-) -> Generator[Dict[str, Any], None, None]:
+) -> Generator[List[Dict[str, Any]], None, None]:
     # No documents, no reason to send a search.
     if max_number_of_hits == 0:
         return
@@ -1533,8 +1536,11 @@ def _search_with_scroll(
         hits_to_yield = min(len(hits), max_number_of_hits - hits_yielded)
 
         # Yield the hits we need to and then track the total number.
-        yield from hits[:hits_to_yield]
-        hits_yielded += hits_to_yield
+        # Never yield an empty list as that makes things simpler for
+        # downstream consumers.
+        if hits and hits_to_yield > 0:
+            yield hits[:hits_to_yield]
+            hits_yielded += hits_to_yield
 
         # Retrieve the next set of results
         resp = client.scroll(
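
Both search strategies in this file share the same capping logic: take only as many hits as are still needed, and skip the yield entirely when nothing remains. Extracted into a standalone sketch (variable names assumed from the diff, pagination plumbing elided):

    from typing import Generator, Iterable, List, Optional

    def yield_capped_batches(
        pages: Iterable[list], max_number_of_hits: Optional[int]
    ) -> Generator[List, None, None]:
        hits_yielded = 0
        for hits in pages:
            if max_number_of_hits is None:
                hits_to_yield = len(hits)
            else:
                hits_to_yield = min(len(hits), max_number_of_hits - hits_yielded)
            # Never yield an empty list, mirroring the change above.
            if hits and hits_to_yield > 0:
                yield hits[:hits_to_yield]
                hits_yielded += hits_to_yield
            if max_number_of_hits is not None and hits_yielded >= max_number_of_hits:
                return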
@@ -1555,7 +1561,7 @@ def _search_with_pit_and_search_after(
     query_compiler: "QueryCompiler",
     body: Dict[str, Any],
     max_number_of_hits: Optional[int],
-) -> Generator[Dict[str, Any], None, None]:
+) -> Generator[List[Dict[str, Any]], None, None]:
 
     # No documents, no reason to send a search.
     if max_number_of_hits == 0:
@@ -1602,8 +1608,11 @@ def _search_with_pit_and_search_after(
         hits_to_yield = min(len(hits), max_number_of_hits - hits_yielded)
 
         # Yield the hits we need to and then track the total number.
-        yield from hits[:hits_to_yield]
-        hits_yielded += hits_to_yield
+        # Never yield an empty list as that makes things simpler for
+        # downstream consumers.
+        if hits and hits_to_yield > 0:
+            yield hits[:hits_to_yield]
+            hits_yielded += hits_to_yield
 
         # Set the 'search_after' for the next request
         # to be the last sort value for this set of hits.
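
For context, point-in-time plus search_after pagination works by feeding the sort values of the last hit on each page into the next request. A schematic sketch, with the transport call abstracted away since eland's client plumbing is not shown in this diff:

    def paginate_with_search_after(run_search, body):
        # 'run_search' is an assumed callable that posts 'body' to the
        # Elasticsearch _search API and returns the decoded response.
        while True:
            resp = run_search(body)
            hits = resp["hits"]["hits"]
            if not hits:
                return
            yield hits
            # Resume after the last hit of this page.
            body["search_after"] = hits[-1]["sort"]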

View File

@@ -148,7 +148,6 @@ class QueryCompiler:
     def _es_results_to_pandas(
         self,
         results: List[Dict[str, Any]],
-        batch_size: Optional[int] = None,
         show_progress: bool = False,
     ) -> "pd.Dataframe":
         """
"""
@@ -239,10 +238,8 @@ class QueryCompiler:
         (which isn't great)
         NOTE - using this lists is generally not a good way to use this API
         """
-        partial_result = False
-
-        if results is None:
-            return partial_result, self._empty_pd_ef()
+        if not results:
+            return self._empty_pd_ef()
 
         # This is one of the most performance critical areas of eland, and it repeatedly calls
         # self._mappings.field_name_pd_dtype and self._mappings.date_field_format
@@ -253,8 +250,7 @@ class QueryCompiler:
         index = []
 
-        i = 0
-        for hit in results:
-            i = i + 1
+        for i, hit in enumerate(results, 1):
 
             if "_source" in hit:
                 row = hit["_source"]
@@ -277,11 +273,6 @@ class QueryCompiler:
                 # flatten row to map correctly to 2D DataFrame
                 rows.append(self._flatten_dict(row, field_mapping_cache))
 
-            if batch_size is not None:
-                if i >= batch_size:
-                    partial_result = True
-                    break
-
             if show_progress:
                 if i % DEFAULT_PROGRESS_REPORTING_NUM_ROWS == 0:
                     print(f"{datetime.now()}: read {i} rows")
@@ -310,7 +301,7 @@ class QueryCompiler:
         if show_progress:
             print(f"{datetime.now()}: read {i} rows")
 
-        return partial_result, df
+        return df
 
     def _flatten_dict(self, y, field_mapping_cache: "FieldMappingCache"):
         out = {}
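
With `batch_size` and `partial_result` gone, `_es_results_to_pandas` returns the DataFrame directly, and the hand-rolled row counter becomes a one-based `enumerate`. The two counting forms are equivalent:

    hits = ["a", "b", "c"]

    # Before: manual counter incremented at the top of the loop body
    i = 0
    for hit in hits:
        i = i + 1

    # After: enumerate starting at 1 keeps 'i' equal to rows read so far
    for i, hit in enumerate(hits, 1):
        pass

    assert i == len(hits)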