Feature/show progress (#120)

* Adding show_progress debug option to eland_to_pandas

* Adding show_progress debug option to eland_to_pandas
This commit is contained in:
stevedodson 2020-01-29 12:59:48 +00:00 committed by GitHub
parent 409cb043c8
commit 2ca538c49d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 79 additions and 40 deletions

View File

@ -17,6 +17,11 @@ from enum import Enum
DEFAULT_NUM_ROWS_DISPLAYED = 60
DEFAULT_CHUNK_SIZE = 10000
DEFAULT_CSV_BATCH_OUTPUT_SIZE = 10000
DEFAULT_PROGRESS_REPORTING_NUM_ROWS = 10000
DEFAULT_ES_MAX_RESULT_WINDOW = 10000 # index.max_result_window
def docstring_parameter(*sub):
def dec(obj):

View File

@ -973,7 +973,7 @@ class DataFrame(NDFrame):
}
return self._query_compiler.to_csv(**kwargs)
def _to_pandas(self):
def _to_pandas(self, show_progress=False):
"""
Utility method to convert eland.Dataframe to pandas.Dataframe
@ -981,7 +981,7 @@ class DataFrame(NDFrame):
-------
pandas.DataFrame
"""
return self._query_compiler.to_pandas()
return self._query_compiler.to_pandas(show_progress=show_progress)
def _empty_pd_df(self):
return self._query_compiler._empty_pd_ef()

View File

@ -371,7 +371,7 @@ class NDFrame(ABC):
return self._query_compiler.describe()
@abstractmethod
def _to_pandas(self):
def _to_pandas(self, show_progress=False):
pass
@abstractmethod

View File

@ -17,7 +17,7 @@ from collections import OrderedDict
import pandas as pd
from eland import Index, SortOrder
from eland import Index, SortOrder, DEFAULT_CSV_BATCH_OUTPUT_SIZE, DEFAULT_ES_MAX_RESULT_WINDOW
from eland import Query
from eland.actions import SortFieldAction
from eland.tasks import HeadTask, TailTask, BooleanFilterTask, ArithmeticOpFieldsTask, QueryTermsTask, \
@ -491,54 +491,70 @@ class Operations:
return df
def to_pandas(self, query_compiler):
def to_pandas(self, query_compiler, show_progress=False):
class PandasDataFrameCollector:
def __init__(self):
self.df = None
def __init__(self, show_progress):
self._df = None
self._show_progress = show_progress
def collect(self, df):
self.df = df
# This collector does not batch data on output. Therefore, batch_size is fixed to None and this method
# is only called once.
if self._df is not None:
raise RuntimeError("Logic error in execution, this method must only be called once for this"
"collector - batch_size == None")
self._df = df
@staticmethod
def batch_size():
# Do not change (see notes on collect)
return None
collector = PandasDataFrameCollector()
@property
def show_progress(self):
return self._show_progress
collector = PandasDataFrameCollector(show_progress)
self._es_results(query_compiler, collector)
return collector.df
return collector._df
def to_csv(self, query_compiler, **kwargs):
def to_csv(self, query_compiler, show_progress=False, **kwargs):
class PandasToCSVCollector:
def __init__(self, **args):
self.args = args
self.ret = None
self.first_time = True
def __init__(self, show_progress, **args):
self._args = args
self._show_progress = show_progress
self._ret = None
self._first_time = True
def collect(self, df):
# If this is the first time we collect results, then write header, otherwise don't write header
# and append results
if self.first_time:
self.first_time = False
df.to_csv(**self.args)
if self._first_time:
self._first_time = False
df.to_csv(**self._args)
else:
# Don't write header, and change mode to append
self.args['header'] = False
self.args['mode'] = 'a'
df.to_csv(**self.args)
self._args['header'] = False
self._args['mode'] = 'a'
df.to_csv(**self._args)
@staticmethod
def batch_size():
# By default read 10000 docs to csv
batch_size = 10000
# By default read n docs and then dump to csv
batch_size = DEFAULT_CSV_BATCH_OUTPUT_SIZE
return batch_size
collector = PandasToCSVCollector(**kwargs)
@property
def show_progress(self):
return self._show_progress
collector = PandasToCSVCollector(show_progress, **kwargs)
self._es_results(query_compiler, collector)
return collector.ret
return collector._ret
def _es_results(self, query_compiler, collector):
query_params, post_processing = self._resolve_tasks(query_compiler)
@ -562,7 +578,7 @@ class Operations:
# If size=None use scan not search - then post sort results when in df
# If size>10000 use scan
is_scan = False
if size is not None and size <= 10000:
if size is not None and size <= DEFAULT_ES_MAX_RESULT_WINDOW:
if size > 0:
try:
es_results = query_compiler._client.search(
@ -594,7 +610,8 @@ class Operations:
if is_scan:
while True:
partial_result, df = query_compiler._es_results_to_pandas(es_results, collector.batch_size())
partial_result, df = query_compiler._es_results_to_pandas(es_results, collector.batch_size(),
collector.show_progress)
df = self._apply_df_post_processing(df, post_processing)
collector.collect(df)
if not partial_result:

View File

@ -14,12 +14,13 @@
import copy
import warnings
from collections import OrderedDict
from datetime import datetime
from typing import Union
import numpy as np
import pandas as pd
from eland import Client
from eland import Client, DEFAULT_PROGRESS_REPORTING_NUM_ROWS
from eland import FieldMappings
from eland import Index
from eland import Operations
@ -109,7 +110,7 @@ class QueryCompiler:
# END Index, columns, and dtypes objects
def _es_results_to_pandas(self, results, batch_size=None):
def _es_results_to_pandas(self, results, batch_size=None, show_progress=False):
"""
Parameters
----------
@ -248,6 +249,10 @@ class QueryCompiler:
partial_result = True
break
if show_progress:
if i % DEFAULT_PROGRESS_REPORTING_NUM_ROWS == 0:
print("{}: read {} rows".format(datetime.now(), i))
# Create pandas DataFrame
df = pd.DataFrame(data=rows, index=index)
@ -267,6 +272,9 @@ class QueryCompiler:
if len(self.columns) > 1:
df = df[self.columns]
if show_progress:
print("{}: read {} rows".format(datetime.now(), i))
return partial_result, df
def _flatten_dict(self, y, field_mapping_cache):
@ -381,13 +389,13 @@ class QueryCompiler:
return result
# To/From Pandas
def to_pandas(self):
def to_pandas(self, show_progress=False):
"""Converts Eland DataFrame to Pandas DataFrame.
Returns:
Pandas DataFrame
"""
return self._operations.to_pandas(self)
return self._operations.to_pandas(self, show_progress)
# To CSV
def to_csv(self, **kwargs):

View File

@ -387,8 +387,8 @@ class Series(NDFrame):
result = _buf.getvalue()
return result
def _to_pandas(self):
return self._query_compiler.to_pandas()[self.name]
def _to_pandas(self, show_progress=False):
return self._query_compiler.to_pandas(show_progress=show_progress)[self.name]
@property
def _dtype(self):

View File

@ -57,4 +57,7 @@ class TestDataFrameUtils(TestData):
def test_eland_to_pandas_performance(self):
# TODO quantify this
pd_df = ed.eland_to_pandas(self.ed_flights())
pd_df = ed.eland_to_pandas(self.ed_flights(), show_progress=True)
# This test calls the same method so is redundant
#assert_pandas_eland_frame_equal(pd_df, self.ed_flights())

View File

@ -17,12 +17,10 @@ import csv
import pandas as pd
from pandas.io.parsers import _c_parser_defaults
from eland import Client
from eland import Client, DEFAULT_CHUNK_SIZE
from eland import DataFrame
from eland import FieldMappings
DEFAULT_CHUNK_SIZE = 10000
def read_es(es_client, es_index_pattern):
"""
@ -58,7 +56,7 @@ def pandas_to_eland(pd_df,
es_refresh=False,
es_dropna=False,
es_geo_points=None,
chunksize = None):
chunksize=None):
"""
Append a pandas DataFrame to an Elasticsearch index.
Mainly used in testing.
@ -211,7 +209,7 @@ def pandas_to_eland(pd_df,
return ed_df
def eland_to_pandas(ed_df):
def eland_to_pandas(ed_df, show_progress=False):
"""
Convert an eland.Dataframe to a pandas.DataFrame
@ -222,6 +220,8 @@ def eland_to_pandas(ed_df):
----------
ed_df: eland.DataFrame
The source eland.Dataframe referencing the Elasticsearch index
show_progress: bool
If True, print progress information (number of rows read) to stdout. By default False.
Returns
-------
@ -258,12 +258,18 @@ def eland_to_pandas(ed_df):
<BLANKLINE>
[5 rows x 27 columns]
Convert `eland.DataFrame` to `pandas.DataFrame` and show progress every 10000 rows
>>> pd_df = ed.eland_to_pandas(ed.DataFrame('localhost', 'flights'), show_progress=True) # doctest: +SKIP
2020-01-29 12:43:36.572395: read 10000 rows
2020-01-29 12:43:37.309031: read 13059 rows
See Also
--------
eland.read_es: Create an eland.Dataframe from an Elasticsearch index
eland.pandas_to_eland: Create an eland.Dataframe from pandas.DataFrame
"""
return ed_df._to_pandas()
return ed_df._to_pandas(show_progress=show_progress)
def read_csv(filepath_or_buffer,