diff --git a/eland/common.py b/eland/common.py index 053ebc4..4490571 100644 --- a/eland/common.py +++ b/eland/common.py @@ -17,6 +17,11 @@ from enum import Enum DEFAULT_NUM_ROWS_DISPLAYED = 60 +DEFAULT_CHUNK_SIZE = 10000 +DEFAULT_CSV_BATCH_OUTPUT_SIZE = 10000 +DEFAULT_PROGRESS_REPORTING_NUM_ROWS = 10000 +DEFAULT_ES_MAX_RESULT_WINDOW = 10000 # index.max_result_window + def docstring_parameter(*sub): def dec(obj): diff --git a/eland/dataframe.py b/eland/dataframe.py index f3c19b8..4150e19 100644 --- a/eland/dataframe.py +++ b/eland/dataframe.py @@ -973,7 +973,7 @@ class DataFrame(NDFrame): } return self._query_compiler.to_csv(**kwargs) - def _to_pandas(self): + def _to_pandas(self, show_progress=False): """ Utility method to convert eland.Dataframe to pandas.Dataframe @@ -981,7 +981,7 @@ class DataFrame(NDFrame): ------- pandas.DataFrame """ - return self._query_compiler.to_pandas() + return self._query_compiler.to_pandas(show_progress=show_progress) def _empty_pd_df(self): return self._query_compiler._empty_pd_ef() diff --git a/eland/ndframe.py b/eland/ndframe.py index b37769a..f14096c 100644 --- a/eland/ndframe.py +++ b/eland/ndframe.py @@ -371,7 +371,7 @@ class NDFrame(ABC): return self._query_compiler.describe() @abstractmethod - def _to_pandas(self): + def _to_pandas(self, show_progress=False): pass @abstractmethod diff --git a/eland/operations.py b/eland/operations.py index 4c8a618..378b38c 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -17,7 +17,7 @@ from collections import OrderedDict import pandas as pd -from eland import Index, SortOrder +from eland import Index, SortOrder, DEFAULT_CSV_BATCH_OUTPUT_SIZE, DEFAULT_ES_MAX_RESULT_WINDOW from eland import Query from eland.actions import SortFieldAction from eland.tasks import HeadTask, TailTask, BooleanFilterTask, ArithmeticOpFieldsTask, QueryTermsTask, \ @@ -491,54 +491,70 @@ class Operations: return df - def to_pandas(self, query_compiler): + def to_pandas(self, query_compiler, 
show_progress=False): class PandasDataFrameCollector: - def __init__(self): - self.df = None + def __init__(self, show_progress): + self._df = None + self._show_progress = show_progress def collect(self, df): - self.df = df + # This collector does not batch data on output. Therefore, batch_size is fixed to None and this method + # is only called once. + if self._df is not None: + raise RuntimeError("Logic error in execution, this method must only be called once for this " + "collector - batch_size == None") + self._df = df @staticmethod def batch_size(): + # Do not change (see notes on collect) return None - collector = PandasDataFrameCollector() + @property + def show_progress(self): + return self._show_progress + + collector = PandasDataFrameCollector(show_progress) self._es_results(query_compiler, collector) - return collector.df + return collector._df - def to_csv(self, query_compiler, **kwargs): + def to_csv(self, query_compiler, show_progress=False, **kwargs): class PandasToCSVCollector: - def __init__(self, **args): - self.args = args - self.ret = None - self.first_time = True + def __init__(self, show_progress, **args): + self._args = args + self._show_progress = show_progress + self._ret = None + self._first_time = True def collect(self, df): # If this is the first time we collect results, then write header, otherwise don't write header # and append results - if self.first_time: - self.first_time = False - df.to_csv(**self.args) + if self._first_time: + self._first_time = False + df.to_csv(**self._args) else: # Don't write header, and change mode to append - self.args['header'] = False - self.args['mode'] = 'a' - df.to_csv(**self.args) + self._args['header'] = False + self._args['mode'] = 'a' + df.to_csv(**self._args) @staticmethod def batch_size(): - # By default read 10000 docs to csv - batch_size = 10000 + # By default read n docs and then dump to csv + batch_size = DEFAULT_CSV_BATCH_OUTPUT_SIZE return batch_size - collector = 
PandasToCSVCollector(**kwargs) + @property + def show_progress(self): + return self._show_progress + + collector = PandasToCSVCollector(show_progress, **kwargs) self._es_results(query_compiler, collector) - return collector.ret + return collector._ret def _es_results(self, query_compiler, collector): query_params, post_processing = self._resolve_tasks(query_compiler) @@ -562,7 +578,7 @@ class Operations: # If size=None use scan not search - then post sort results when in df # If size>10000 use scan is_scan = False - if size is not None and size <= 10000: + if size is not None and size <= DEFAULT_ES_MAX_RESULT_WINDOW: if size > 0: try: es_results = query_compiler._client.search( @@ -594,7 +610,8 @@ class Operations: if is_scan: while True: - partial_result, df = query_compiler._es_results_to_pandas(es_results, collector.batch_size()) + partial_result, df = query_compiler._es_results_to_pandas(es_results, collector.batch_size(), + collector.show_progress) df = self._apply_df_post_processing(df, post_processing) collector.collect(df) if not partial_result: diff --git a/eland/query_compiler.py b/eland/query_compiler.py index 6dd3cdf..efcda69 100644 --- a/eland/query_compiler.py +++ b/eland/query_compiler.py @@ -14,12 +14,13 @@ import copy import warnings from collections import OrderedDict +from datetime import datetime from typing import Union import numpy as np import pandas as pd -from eland import Client +from eland import Client, DEFAULT_PROGRESS_REPORTING_NUM_ROWS from eland import FieldMappings from eland import Index from eland import Operations @@ -109,7 +110,7 @@ class QueryCompiler: # END Index, columns, and dtypes objects - def _es_results_to_pandas(self, results, batch_size=None): + def _es_results_to_pandas(self, results, batch_size=None, show_progress=False): """ Parameters ---------- @@ -248,6 +249,10 @@ class QueryCompiler: partial_result = True break + if show_progress: + if i % DEFAULT_PROGRESS_REPORTING_NUM_ROWS == 0: + print("{}: read {} 
rows".format(datetime.now(), i)) + # Create pandas DataFrame df = pd.DataFrame(data=rows, index=index) @@ -267,6 +272,9 @@ class QueryCompiler: if len(self.columns) > 1: df = df[self.columns] + if show_progress: + print("{}: read {} rows".format(datetime.now(), i)) + return partial_result, df def _flatten_dict(self, y, field_mapping_cache): @@ -381,13 +389,13 @@ class QueryCompiler: return result # To/From Pandas - def to_pandas(self): + def to_pandas(self, show_progress=False): """Converts Eland DataFrame to Pandas DataFrame. Returns: Pandas DataFrame """ - return self._operations.to_pandas(self) + return self._operations.to_pandas(self, show_progress) # To CSV def to_csv(self, **kwargs): diff --git a/eland/series.py b/eland/series.py index cc22845..17b1e5c 100644 --- a/eland/series.py +++ b/eland/series.py @@ -387,8 +387,8 @@ class Series(NDFrame): result = _buf.getvalue() return result - def _to_pandas(self): - return self._query_compiler.to_pandas()[self.name] + def _to_pandas(self, show_progress=False): + return self._query_compiler.to_pandas(show_progress=show_progress)[self.name] @property def _dtype(self): diff --git a/eland/tests/dataframe/test_utils_pytest.py b/eland/tests/dataframe/test_utils_pytest.py index 8edb1e5..aea7bd6 100644 --- a/eland/tests/dataframe/test_utils_pytest.py +++ b/eland/tests/dataframe/test_utils_pytest.py @@ -57,4 +57,7 @@ class TestDataFrameUtils(TestData): def test_eland_to_pandas_performance(self): # TODO quantify this - pd_df = ed.eland_to_pandas(self.ed_flights()) + pd_df = ed.eland_to_pandas(self.ed_flights(), show_progress=True) + + # This test calls the same method so is redundant + #assert_pandas_eland_frame_equal(pd_df, self.ed_flights()) diff --git a/eland/utils.py b/eland/utils.py index 721b994..c388f42 100644 --- a/eland/utils.py +++ b/eland/utils.py @@ -17,12 +17,10 @@ import csv import pandas as pd from pandas.io.parsers import _c_parser_defaults -from eland import Client +from eland import Client, DEFAULT_CHUNK_SIZE 
from eland import DataFrame from eland import FieldMappings -DEFAULT_CHUNK_SIZE = 10000 - def read_es(es_client, es_index_pattern): """ @@ -58,7 +56,7 @@ def pandas_to_eland(pd_df, es_refresh=False, es_dropna=False, es_geo_points=None, - chunksize = None): + chunksize=None): """ Append a pandas DataFrame to an Elasticsearch index. Mainly used in testing. @@ -211,7 +209,7 @@ def pandas_to_eland(pd_df, return ed_df -def eland_to_pandas(ed_df): +def eland_to_pandas(ed_df, show_progress=False): """ Convert an eland.Dataframe to a pandas.DataFrame @@ -222,6 +220,8 @@ ---------- ed_df: eland.DataFrame The source eland.Dataframe referencing the Elasticsearch index + show_progress: bool + If True, output progress of the operation to stdout. By default False. Returns ------- @@ -258,12 +258,18 @@ [5 rows x 27 columns] + Convert `eland.DataFrame` to `pandas.DataFrame` and show progress every 10000 rows + + >>> pd_df = ed.eland_to_pandas(ed.DataFrame('localhost', 'flights'), show_progress=True) # doctest: +SKIP + 2020-01-29 12:43:36.572395: read 10000 rows + 2020-01-29 12:43:37.309031: read 13059 rows + See Also -------- eland.read_es: Create an eland.Dataframe from an Elasticsearch index eland.pandas_to_eland: Create an eland.Dataframe from pandas.DataFrame """ - return ed_df._to_pandas() + return ed_df._to_pandas(show_progress=show_progress) def read_csv(filepath_or_buffer,