diff --git a/eland/operations.py b/eland/operations.py index 0fad8ef..48ff43f 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -18,6 +18,7 @@ import copy import warnings from collections import defaultdict +from datetime import datetime from typing import ( TYPE_CHECKING, Any, @@ -39,6 +40,7 @@ from eland.actions import PostProcessingAction from eland.common import ( DEFAULT_PAGINATION_SIZE, DEFAULT_PIT_KEEP_ALIVE, + DEFAULT_PROGRESS_REPORTING_NUM_ROWS, DEFAULT_SEARCH_SIZE, SortOrder, build_pd_series, @@ -1198,9 +1200,24 @@ class Operations: def to_pandas( self, query_compiler: "QueryCompiler", show_progress: bool = False ) -> pd.DataFrame: - df = self._es_results(query_compiler, show_progress) - return df + df_list: List[pd.DataFrame] = [] + i = 0 + for df in self.search_yield_pandas_dataframes(query_compiler=query_compiler): + if show_progress: + i = i + df.shape[0] + if i % DEFAULT_PROGRESS_REPORTING_NUM_ROWS == 0: + print(f"{datetime.now()}: read {i} rows") + df_list.append(df) + + if show_progress: + print(f"{datetime.now()}: read {i} rows") + + # pd.concat() can't handle an empty list + # because there aren't defined columns. + if not df_list: + return query_compiler._empty_pd_ef() + return pd.concat(df_list) def to_csv( self, @@ -1208,8 +1225,9 @@ class Operations: show_progress: bool = False, **kwargs: Union[bool, str], ) -> Optional[str]: - df = self._es_results(query_compiler, show_progress) - return df.to_csv(**kwargs) # type: ignore[no-any-return] + return self.to_pandas( # type: ignore[no-any-return] + query_compiler=query_compiler, show_progress=show_progress + ).to_csv(**kwargs) def search_yield_pandas_dataframes( self, query_compiler: "QueryCompiler" @@ -1241,42 +1259,6 @@ class Operations: df = self._apply_df_post_processing(df, post_processing) yield df - def _es_results( - self, query_compiler: "QueryCompiler", show_progress: bool = False - ) -> pd.DataFrame: - query_params, post_processing = self._resolve_tasks(query_compiler) - - result_size, sort_params = Operations._query_params_to_size_and_sort( - query_params - ) - - script_fields = query_params.script_fields - query = Query(query_params.query) - - body = query.to_search_body() - if script_fields is not None: - body["script_fields"] = script_fields - - # Only return requested field_names and add them to body - _source = query_compiler.get_field_names(include_scripted_fields=False) - body["_source"] = _source if _source else False - - if sort_params: - body["sort"] = [sort_params] - - es_results: List[Dict[str, Any]] = sum( - _search_yield_hits( - query_compiler=query_compiler, body=body, max_number_of_hits=result_size - ), - [], - ) - - df = query_compiler._es_results_to_pandas( - results=es_results, show_progress=show_progress - ) - df = self._apply_df_post_processing(df, post_processing) - return df - def index_count(self, query_compiler: "QueryCompiler", field: str) -> int: # field is the index field so count values query_params, post_processing = self._resolve_tasks(query_compiler) diff --git a/eland/query_compiler.py b/eland/query_compiler.py index 4a81804..8c8b562 100644 --- a/eland/query_compiler.py +++ b/eland/query_compiler.py @@ -16,7 +16,6 @@ # under the License. import copy -from datetime import datetime from typing import ( TYPE_CHECKING, Any, @@ -33,11 +32,7 @@ from typing import ( import numpy as np import pandas as pd # type: ignore -from eland.common import ( - DEFAULT_PROGRESS_REPORTING_NUM_ROWS, - elasticsearch_date_to_pandas_date, - ensure_es_client, -) +from eland.common import elasticsearch_date_to_pandas_date, ensure_es_client from eland.field_mappings import FieldMappings from eland.filter import BooleanFilter, QueryFilter from eland.index import Index @@ -149,7 +144,6 @@ class QueryCompiler: def _es_results_to_pandas( self, results: List[Dict[str, Any]], - show_progress: bool = False, ) -> "pd.Dataframe": """ Parameters @@ -274,10 +268,6 @@ class QueryCompiler: # flatten row to map correctly to 2D DataFrame rows.append(self._flatten_dict(row, field_mapping_cache)) - if show_progress: - if i % DEFAULT_PROGRESS_REPORTING_NUM_ROWS == 0: - print(f"{datetime.now()}: read {i} rows") - # Create pandas DataFrame df = pd.DataFrame(data=rows, index=index) @@ -299,9 +289,6 @@ class QueryCompiler: if len(self.columns) > 1: df = df[self.columns] - if show_progress: - print(f"{datetime.now()}: read {i} rows") - return df def _flatten_dict(self, y, field_mapping_cache: "FieldMappingCache"): @@ -383,7 +370,7 @@ class QueryCompiler: self, self.index.es_index_field, items ) - def _empty_pd_ef(self): + def _empty_pd_ef(self) -> "pd.DataFrame": # Return an empty dataframe with correct columns and dtypes df = pd.DataFrame() for c, d in zip(self.dtypes.index, self.dtypes.values):