Optimize to_pandas() internally to improve performance

Co-authored-by: Seth Michael Larson <seth.larson@elastic.co>
This commit is contained in:
P. Sai Vinay 2021-10-13 23:53:04 +05:30 committed by GitHub
parent 6088f2e39d
commit 704c8982bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 24 additions and 55 deletions

View File

@@ -18,6 +18,7 @@
import copy import copy
import warnings import warnings
from collections import defaultdict from collections import defaultdict
from datetime import datetime
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
Any, Any,
@@ -39,6 +40,7 @@ from eland.actions import PostProcessingAction
from eland.common import ( from eland.common import (
DEFAULT_PAGINATION_SIZE, DEFAULT_PAGINATION_SIZE,
DEFAULT_PIT_KEEP_ALIVE, DEFAULT_PIT_KEEP_ALIVE,
DEFAULT_PROGRESS_REPORTING_NUM_ROWS,
DEFAULT_SEARCH_SIZE, DEFAULT_SEARCH_SIZE,
SortOrder, SortOrder,
build_pd_series, build_pd_series,
@@ -1198,9 +1200,24 @@ class Operations:
def to_pandas(
    self, query_compiler: "QueryCompiler", show_progress: bool = False
) -> pd.DataFrame:
    """Collect all query results into a single pandas DataFrame.

    Streams result batches from ``search_yield_pandas_dataframes`` and
    concatenates them once at the end, optionally printing a timestamped
    progress line as rows arrive.

    Parameters
    ----------
    query_compiler : QueryCompiler
        Compiler holding the Elasticsearch query/field state.
    show_progress : bool
        If True, print ``<timestamp>: read <n> rows`` whenever the running
        row count crosses a reporting boundary, and once more at the end.

    Returns
    -------
    pd.DataFrame
        All hits as one DataFrame; when there are no results, an empty
        frame with the correct columns and dtypes (via ``_empty_pd_ef``).
    """
    df_list: List[pd.DataFrame] = []
    i = 0  # cumulative number of rows read so far
    for df in self.search_yield_pandas_dataframes(query_compiler=query_compiler):
        if show_progress:
            # Report whenever the cumulative count crosses a reporting
            # boundary. A plain ``i % N == 0`` check would fire only when
            # the running total lands exactly on a multiple of N, which
            # arbitrary batch sizes (``df.shape[0]``) don't guarantee.
            previous_rows = i
            i += df.shape[0]
            if (
                i // DEFAULT_PROGRESS_REPORTING_NUM_ROWS
                > previous_rows // DEFAULT_PROGRESS_REPORTING_NUM_ROWS
            ):
                print(f"{datetime.now()}: read {i} rows")
        df_list.append(df)
    if show_progress:
        print(f"{datetime.now()}: read {i} rows")
    # pd.concat() can't handle an empty list because there aren't
    # defined columns, so fall back to an empty frame with the schema.
    if not df_list:
        return query_compiler._empty_pd_ef()
    return pd.concat(df_list)
def to_csv(
    self,
    query_compiler: "QueryCompiler",
    show_progress: bool = False,
    **kwargs: Union[bool, str],
) -> Optional[str]:
    """Serialize the query results to CSV.

    Materializes the full result set through ``to_pandas`` and then
    delegates serialization to ``pandas.DataFrame.to_csv``; ``kwargs``
    are forwarded to it unchanged (so the return value follows pandas'
    convention: a string when no path is given, otherwise ``None``).
    """
    frame = self.to_pandas(
        query_compiler=query_compiler, show_progress=show_progress
    )
    return frame.to_csv(**kwargs)  # type: ignore[no-any-return]
def search_yield_pandas_dataframes( def search_yield_pandas_dataframes(
self, query_compiler: "QueryCompiler" self, query_compiler: "QueryCompiler"
@@ -1241,42 +1259,6 @@ class Operations:
df = self._apply_df_post_processing(df, post_processing) df = self._apply_df_post_processing(df, post_processing)
yield df yield df
def _es_results(
    self, query_compiler: "QueryCompiler", show_progress: bool = False
) -> pd.DataFrame:
    """Run the compiled Elasticsearch query and return all hits as a DataFrame.

    Builds the search body (query, optional script fields, requested
    ``_source`` fields, sort), fetches every page of hits eagerly, converts
    them to pandas, and applies any queued post-processing actions.

    Parameters
    ----------
    query_compiler : QueryCompiler
        Compiler holding the Elasticsearch query/field state.
    show_progress : bool
        Passed through to ``_es_results_to_pandas`` for progress printing.

    Returns
    -------
    pd.DataFrame
        All hits after post-processing.
    """
    # Resolve the queued tasks into query parameters plus the
    # post-processing actions that must run on the pandas side.
    query_params, post_processing = self._resolve_tasks(query_compiler)
    result_size, sort_params = Operations._query_params_to_size_and_sort(
        query_params
    )
    script_fields = query_params.script_fields
    query = Query(query_params.query)
    body = query.to_search_body()
    if script_fields is not None:
        body["script_fields"] = script_fields
    # Only return requested field_names and add them to body
    _source = query_compiler.get_field_names(include_scripted_fields=False)
    # ``_source: False`` tells Elasticsearch to return no source at all
    # when there are no plain (non-scripted) fields to fetch.
    body["_source"] = _source if _source else False
    if sort_params:
        body["sort"] = [sort_params]
    # ``_search_yield_hits`` yields one list of hits per page;
    # ``sum(..., [])`` flattens those pages into a single list.
    # NOTE(review): this materializes every hit in memory at once.
    es_results: List[Dict[str, Any]] = sum(
        _search_yield_hits(
            query_compiler=query_compiler, body=body, max_number_of_hits=result_size
        ),
        [],
    )
    df = query_compiler._es_results_to_pandas(
        results=es_results, show_progress=show_progress
    )
    df = self._apply_df_post_processing(df, post_processing)
    return df
def index_count(self, query_compiler: "QueryCompiler", field: str) -> int: def index_count(self, query_compiler: "QueryCompiler", field: str) -> int:
# field is the index field so count values # field is the index field so count values
query_params, post_processing = self._resolve_tasks(query_compiler) query_params, post_processing = self._resolve_tasks(query_compiler)

View File

@ -16,7 +16,6 @@
# under the License. # under the License.
import copy import copy
from datetime import datetime
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
Any, Any,
@ -33,11 +32,7 @@ from typing import (
import numpy as np import numpy as np
import pandas as pd # type: ignore import pandas as pd # type: ignore
from eland.common import ( from eland.common import elasticsearch_date_to_pandas_date, ensure_es_client
DEFAULT_PROGRESS_REPORTING_NUM_ROWS,
elasticsearch_date_to_pandas_date,
ensure_es_client,
)
from eland.field_mappings import FieldMappings from eland.field_mappings import FieldMappings
from eland.filter import BooleanFilter, QueryFilter from eland.filter import BooleanFilter, QueryFilter
from eland.index import Index from eland.index import Index
@ -149,7 +144,6 @@ class QueryCompiler:
def _es_results_to_pandas( def _es_results_to_pandas(
self, self,
results: List[Dict[str, Any]], results: List[Dict[str, Any]],
show_progress: bool = False,
) -> "pd.Dataframe": ) -> "pd.Dataframe":
""" """
Parameters Parameters
@ -274,10 +268,6 @@ class QueryCompiler:
# flatten row to map correctly to 2D DataFrame # flatten row to map correctly to 2D DataFrame
rows.append(self._flatten_dict(row, field_mapping_cache)) rows.append(self._flatten_dict(row, field_mapping_cache))
if show_progress:
if i % DEFAULT_PROGRESS_REPORTING_NUM_ROWS == 0:
print(f"{datetime.now()}: read {i} rows")
# Create pandas DataFrame # Create pandas DataFrame
df = pd.DataFrame(data=rows, index=index) df = pd.DataFrame(data=rows, index=index)
@ -299,9 +289,6 @@ class QueryCompiler:
if len(self.columns) > 1: if len(self.columns) > 1:
df = df[self.columns] df = df[self.columns]
if show_progress:
print(f"{datetime.now()}: read {i} rows")
return df return df
def _flatten_dict(self, y, field_mapping_cache: "FieldMappingCache"): def _flatten_dict(self, y, field_mapping_cache: "FieldMappingCache"):
@ -383,7 +370,7 @@ class QueryCompiler:
self, self.index.es_index_field, items self, self.index.es_index_field, items
) )
def _empty_pd_ef(self): def _empty_pd_ef(self) -> "pd.DataFrame":
# Return an empty dataframe with correct columns and dtypes # Return an empty dataframe with correct columns and dtypes
df = pd.DataFrame() df = pd.DataFrame()
for c, d in zip(self.dtypes.index, self.dtypes.values): for c, d in zip(self.dtypes.index, self.dtypes.values):