diff --git a/eland/dataframe.py b/eland/dataframe.py
index 53649b8..dd47a59 100644
--- a/eland/dataframe.py
+++ b/eland/dataframe.py
@@ -6,6 +6,9 @@ import numpy as np
 import pandas as pd
 from pandas.compat import StringIO
 from pandas.core.common import apply_if_callable, is_bool_indexer
+from pandas.core.dtypes.common import (
+    is_list_like
+)
 from pandas.io.common import _expand_user, _stringify_path
 from pandas.io.formats import console
 from pandas.io.formats import format as fmt
@@ -431,6 +434,36 @@ class DataFrame(NDFrame):
     def _reduce_dimension(self, query_compiler):
         return Series(query_compiler=query_compiler)
 
+    def to_csv(self, path_or_buf=None, sep=",", na_rep='', float_format=None,
+               columns=None, header=True, index=True, index_label=None,
+               mode='w', encoding=None, compression='infer', quoting=None,
+               quotechar='"', line_terminator=None, chunksize=None,
+               tupleize_cols=None, date_format=None, doublequote=True,
+               escapechar=None, decimal='.'):
+        kwargs = {
+            "path_or_buf": path_or_buf,
+            "sep": sep,
+            "na_rep": na_rep,
+            "float_format": float_format,
+            "columns": columns,
+            "header": header,
+            "index": index,
+            "index_label": index_label,
+            "mode": mode,
+            "encoding": encoding,
+            "compression": compression,
+            "quoting": quoting,
+            "quotechar": quotechar,
+            "line_terminator": line_terminator,
+            "chunksize": chunksize,
+            "tupleize_cols": tupleize_cols,
+            "date_format": date_format,
+            "doublequote": doublequote,
+            "escapechar": escapechar,
+            "decimal": decimal,
+        }
+        return self._query_compiler.to_csv(**kwargs)
+
     def _to_pandas(self):
         return self._query_compiler.to_pandas()
 
@@ -469,53 +502,45 @@ class DataFrame(NDFrame):
     def keys(self):
         return self.columns
 
-    def to_csv(
-        self,
-        path_or_buf=None,
-        sep=",",
-        na_rep="",
-        float_format=None,
-        columns=None,
-        header=True,
-        index=True,
-        index_label=None,
-        mode="w",
-        encoding=None,
-        compression="infer",
-        quoting=None,
-        quotechar='"',
-        line_terminator=None,
-        chunksize=None,
-        tupleize_cols=None,
-        date_format=None,
-        doublequote=True,
-        escapechar=None,
-        decimal=".",
-        *args,
-        **kwargs
-    ):
-        kwargs = {
-            "path_or_buf": path_or_buf,
-            "sep": sep,
-            "na_rep": na_rep,
-            "float_format": float_format,
-            "columns": columns,
-            "header": header,
-            "index": index,
-            "index_label": index_label,
-            "mode": mode,
-            "encoding": encoding,
-            "compression": compression,
-            "quoting": quoting,
-            "quotechar": quotechar,
-            "line_terminator": line_terminator,
-            "chunksize": chunksize,
-            "tupleize_cols": tupleize_cols,
-            "date_format": date_format,
-            "doublequote": doublequote,
-            "escapechar": escapechar,
-            "decimal": decimal,
-        }
+    def aggregate(self, func, axis=0, *args, **kwargs):
+        """
+        Aggregate using one or more operations over the specified axis.
+
+        Parameters
+        ----------
+        func : function, str, list or dict
+            Function to use for aggregating the data. If a function, must either
+            work when passed a DataFrame or when passed to DataFrame.apply.
+
+            Accepted combinations are:
+
+            - function
+            - string function name
+            - list of functions and/or function names, e.g. ``[np.sum, 'mean']``
+            - dict of axis labels -> functions, function names or list of such.
+        axis : {0 or 'index', 1 or 'columns'}, default 0
+            If 0 or 'index': apply function to each column.
+            If 1 or 'columns': apply function to each row.
+        *args
+            Positional arguments to pass to `func`.
+        **kwargs
+            Keyword arguments to pass to `func`.
+
+        Returns
+        -------
+        DataFrame, Series or scalar
+            If DataFrame.agg is called with a single function, returns a Series.
+            If DataFrame.agg is called with several functions, returns a DataFrame.
+            If Series.agg is called with a single function, returns a scalar.
+            If Series.agg is called with several functions, returns a Series.
+        """
+        axis = self._get_axis_number(axis)
+
+        if axis == 1:
+            raise NotImplementedError("Aggregating via index not currently implemented - needs index transform")
+
+        # currently we only support a subset of functions that aggregate columns.
+        # ['count', 'mad', 'max', 'mean', 'median', 'min', 'mode', 'quantile',
+        #  'rank', 'sem', 'skew', 'sum', 'std', 'var', 'nunique']
+        # TODO - wire these up to the query compiler; until then fail loudly
+        # rather than silently returning None
+        raise NotImplementedError("DataFrame.aggregate is not yet implemented")
+
+    agg = aggregate
+
     hist = gfx.ed_hist_frame
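For context, a minimal usage sketch of the `to_csv` method this hunk adds. The `localhost` host and `flights` index are illustrative assumptions, not part of this change:

```python
import eland as ed

# hypothetical cluster and index, for illustration only
df = ed.DataFrame('localhost', 'flights')

# intended to mirror pandas.DataFrame.to_csv, but streaming documents from
# Elasticsearch in batches so the full index never has to fit in memory
df.to_csv('flights.csv')
```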
diff --git a/eland/operations.py b/eland/operations.py
index 8b226cb..7ec29c2 100644
--- a/eland/operations.py
+++ b/eland/operations.py
@@ -324,6 +324,48 @@ class Operations:
         return df
 
     def to_pandas(self, query_compiler):
+        class PandasDataFrameCollector:
+            def collect(self, df):
+                self.df = df
+
+            def batch_size(self):
+                return None
+
+        collector = PandasDataFrameCollector()
+
+        self._es_results(query_compiler, collector)
+
+        return collector.df
+
+    def to_csv(self, query_compiler, **kwargs):
+        class PandasToCSVCollector:
+            def __init__(self, **kwargs):
+                self.kwargs = kwargs
+                self.ret = None
+                self.first_time = True
+
+            def collect(self, df):
+                # If this is the first time we collect results, then write header,
+                # otherwise don't write header and append results
+                if self.first_time:
+                    self.first_time = False
+                    df.to_csv(**self.kwargs)
+                else:
+                    # Don't write header, and change mode to append
+                    self.kwargs['header'] = False
+                    self.kwargs['mode'] = 'a'
+                    df.to_csv(**self.kwargs)
+
+            def batch_size(self):
+                # By default, read 10000 docs to csv
+                batch_size = 10000
+                return batch_size
+
+        collector = PandasToCSVCollector(**kwargs)
+
+        self._es_results(query_compiler, collector)
+
+        return collector.ret
+
+    def _es_results(self, query_compiler, collector):
         query_params, post_processing = self._resolve_tasks()
 
         size, sort_params = Operations._query_params_to_size_and_sort(query_params)
@@ -337,6 +379,7 @@ class Operations:
 
         # If size=None use scan not search - then post sort results when in df
         # If size>10000 use scan
+        is_scan = False
         if size is not None and size <= 10000:
             if size > 0:
                 es_results = query_compiler._client.search(
@@ -346,6 +389,7 @@ class Operations:
                     body=body.to_search_body(),
                     _source=columns)
         else:
+            is_scan = True
             es_results = query_compiler._client.scan(
                 index=query_compiler._index_pattern,
                 query=body.to_search_body(),
@@ -354,9 +398,17 @@ class Operations:
         if sort_params is not None:
             post_processing.append(self._sort_params_to_postprocessing(sort_params))
 
-        df = query_compiler._es_results_to_pandas(es_results)
-
-        return self._apply_df_post_processing(df, post_processing)
+        if is_scan:
+            while True:
+                partial_result, df = query_compiler._es_results_to_pandas(es_results, collector.batch_size())
+                df = self._apply_df_post_processing(df, post_processing)
+                collector.collect(df)
+                if not partial_result:
+                    break
+        else:
+            partial_result, df = query_compiler._es_results_to_pandas(es_results)
+            df = self._apply_df_post_processing(df, post_processing)
+            collector.collect(df)
 
     def iloc(self, index, columns):
         # index and columns are indexers
@@ -639,7 +691,8 @@ class Operations:
 
     def _resolve_post_processing_task(self, item, query_params, post_processing):
         # Just do this in post-processing
-        post_processing.append(item)
+        if item[0] != 'columns':
+            post_processing.append(item)
 
         return query_params, post_processing
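This refactor turns result fetching into a push model: `_es_results` now drives any object that exposes `collect(df)` and `batch_size()`. A sketch of that contract with a hypothetical third collector (not part of this change):

```python
class RowCountCollector:
    """Counts result rows without materialising the whole result set."""

    def __init__(self):
        self.rows = 0

    def collect(self, df):
        # called by _es_results once per batch it converts to a DataFrame
        self.rows += len(df)

    def batch_size(self):
        # docs per batch when scanning; None means collect everything at once
        return 10000
```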
diff --git a/eland/query_compiler.py b/eland/query_compiler.py
index db2e1f5..92fd6a8 100644
--- a/eland/query_compiler.py
+++ b/eland/query_compiler.py
@@ -6,6 +6,10 @@ from eland import Index
 from eland import Mappings
 from eland import Operations
 
+from pandas.core.dtypes.common import (
+    is_list_like
+)
+
 from pandas.core.indexes.numeric import Int64Index
 from pandas.core.indexes.range import RangeIndex
 
@@ -37,9 +41,6 @@ class ElandQueryCompiler(BaseQueryCompiler):
     in the first/last n fields.
 
     A way to mitigate this would be to post process this drop
-
-    TODO -
-
     """
 
     def __init__(self,
@@ -95,7 +96,7 @@ class ElandQueryCompiler(BaseQueryCompiler):
 
     # END Index, columns, and dtypes objects
 
-    def _es_results_to_pandas(self, results):
+    def _es_results_to_pandas(self, results, batch_size=None):
         """
         Parameters
         ----------
@@ -182,17 +183,25 @@ class ElandQueryCompiler(BaseQueryCompiler):
         TODO - an option here is to use Elasticsearch's multi-field matching instead of pandas treatment of lists (which isn't great)
         NOTE - using this lists is generally not a good way to use this API
         """
+        partial_result = False
+
         if results is None:
-            return self._empty_pd_ef()
+            return partial_result, self._empty_pd_ef()
 
         rows = []
         index = []
         if isinstance(results, dict):
             iterator = results['hits']['hits']
+
+            if batch_size is not None:
+                raise NotImplementedError("Can not specify batch_size with dict results")
         else:
             iterator = results
 
+        i = 0
         for hit in iterator:
+            i = i + 1
+
             row = hit['_source']
 
             # get index value - can be _id or can be field value in source
@@ -205,6 +214,11 @@ class ElandQueryCompiler(BaseQueryCompiler):
             # flatten row to map correctly to 2D DataFrame
             rows.append(self._flatten_dict(row))
 
+            if batch_size is not None:
+                if i >= batch_size:
+                    partial_result = True
+                    break
+
         # Create pandas DataFrame
         df = pd.DataFrame(data=rows, index=index)
 
@@ -221,62 +235,7 @@ class ElandQueryCompiler(BaseQueryCompiler):
         # Sort columns in mapping order
         df = df[self.columns]
 
-        return df
-
-    def _to_csv(self, results, **kwargs):
-        # Very similar to _es_results_to_pandas except we create partial pandas.DataFrame
-        # and write these to csv
-
-        # Use chunksize in kwargs do determine size of partial data frame
-        if 'chunksize' in kwargs:
-            chunksize = kwargs['chunksize']
-        else:
-            # If no default chunk, set to 1000
-            chunksize = 1000
-
-        if results is None:
-            return self._empty_pd_ef()
-
-        rows = []
-        index = []
-        if isinstance(results, dict):
-            iterator = results['hits']['hits']
-        else:
-            iterator = results
-
-        i = 0
-        for hit in iterator:
-            row = hit['_source']
-
-            # get index value - can be _id or can be field value in source
-            if self._index.is_source_field:
-                index_field = row[self._index.index_field]
-            else:
-                index_field = hit[self._index.index_field]
-            index.append(index_field)
-
-            # flatten row to map correctly to 2D DataFrame
-            rows.append(self._flatten_dict(row))
-
-            i = i + 1
-            if i % chunksize == 0:
-                # Create pandas DataFrame
-                df = pd.DataFrame(data=rows, index=index)
-
-                # _source may not contain all columns in the mapping
-                # therefore, fill in missing columns
-                # (note this returns self.columns NOT IN df.columns)
-                missing_columns = list(set(self.columns) - set(df.columns))
-
-                for missing in missing_columns:
-                    is_source_field, pd_dtype = self._mappings.source_field_pd_dtype(missing)
-                    df[missing] = None
-                    df[missing].astype(pd_dtype)
-
-                # Sort columns in mapping order
-                df = df[self.columns]
-
-        return df
+        return partial_result, df
 
     def _flatten_dict(self, y):
         out = {}
@@ -301,6 +260,13 @@ class ElandQueryCompiler(BaseQueryCompiler):
 
                 # Coerce types - for now just datetime
                 if pd_dtype == 'datetime64[ns]':
+                    # TODO - this doesn't work for certain ES date formats
+                    # e.g. "@timestamp" : {
+                    #        "type" : "date",
+                    #        "format" : "epoch_millis"
+                    #      }
+                    # 1484053499256 - we need to check the ES type and format and
+                    # add conversions like: pd.to_datetime(x, unit='ms')
                     x = pd.to_datetime(x)
 
         # Elasticsearch can have multiple values for a field. These are represented as lists, so
@@ -383,6 +349,15 @@ class ElandQueryCompiler(BaseQueryCompiler):
         """
         return self._operations.to_pandas(self)
 
+    # To CSV
+    def to_csv(self, **kwargs):
+        """Serialises the Eland DataFrame to csv.
+
+        Returns:
+            If path_or_buf is None, returns the resulting csv format as a string. Otherwise returns None.
+        """
+        return self._operations.to_csv(self, **kwargs)
+
     # __getitem__ methods
     def getitem_column_array(self, key, numeric=False):
         """Get column data for target labels.
@@ -457,3 +432,33 @@ class ElandQueryCompiler(BaseQueryCompiler):
 
     def _hist(self, num_bins):
         return self._operations.hist(self, num_bins)
+
+    def apply(self, func, axis, *args, **kwargs):
+        """Apply func across given axis.
+
+        Args:
+            func: The function to apply.
+            axis: Target axis to apply the function along.
+
+        Returns:
+            A new QueryCompiler.
+        """
+        if callable(func):
+            return self._callable_func(func, axis, *args, **kwargs)
+        elif isinstance(func, dict):
+            return self._dict_func(func, axis, *args, **kwargs)
+        elif is_list_like(func):
+            return self._list_like_func(func, axis, *args, **kwargs)
+        else:
+            # fail loudly rather than silently returning None
+            raise TypeError("{} is not callable, a dict or list-like".format(func))
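The datetime TODO above can be made concrete: `pd.to_datetime` treats a bare integer as nanoseconds since the epoch, so an `epoch_millis` value from Elasticsearch parses to a nonsense date unless the unit is given explicitly:

```python
import pandas as pd

# 1484053499256 is the epoch_millis example from the TODO above
pd.to_datetime(1484053499256)             # 1970-01-01 00:24:44.053499256 - read as nanoseconds
pd.to_datetime(1484053499256, unit='ms')  # 2017-01-10 13:04:59.256000 - correct for epoch_millis
```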
diff --git a/eland/tests/__init__.py b/eland/tests/__init__.py
index 1be1a2a..f5dbb7c 100644
--- a/eland/tests/__init__.py
+++ b/eland/tests/__init__.py
@@ -102,7 +102,6 @@ FLIGHTS_DF_FILE_NAME = ROOT_DIR + '/flights_df.json.gz'
 FLIGHTS_SMALL_INDEX_NAME = 'flights_small'
 FLIGHTS_SMALL_MAPPING = FLIGHTS_MAPPING
 FLIGHTS_SMALL_FILE_NAME = ROOT_DIR + '/flights_small.json.gz'
-FLIGHTS_SMALL_DF_FILE_NAME = ROOT_DIR + '/flights_small_df.json.gz'
 
 ECOMMERCE_INDEX_NAME = 'ecommerce'
 ECOMMERCE_MAPPING = {
     "mappings" : {
diff --git a/eland/tests/dataframe/results/.gitignore b/eland/tests/dataframe/results/.gitignore
new file mode 100644
index 0000000..e69de29
diff --git a/eland/tests/dataframe/test_aggs_pytest.py b/eland/tests/dataframe/test_aggs_pytest.py
index b03847c..6affb79 100644
--- a/eland/tests/dataframe/test_aggs_pytest.py
+++ b/eland/tests/dataframe/test_aggs_pytest.py
@@ -12,6 +12,46 @@ class TestDataFrameAggs(TestData):
         pd_flights = self.pd_flights()
         ed_flights = self.ed_flights()
 
+        pd_numerics = pd_flights.select_dtypes(include=[np.number])
+        print(pd_numerics.columns)
+
+        print(pd_numerics.agg('abs'))         # all rows
+        print(pd_numerics.agg('all'))         # columns True/False
+        print(pd_numerics.agg('any'))         # columns True/False
+        print(pd_numerics.agg('corr'))        # matrix col/col
+        print(pd_numerics.agg('count'))       # columns count
+        print(pd_numerics.agg('cov'))         # matrix col/col
+        print(pd_numerics.agg('cummax'))      # all rows
+        print(pd_numerics.agg('cummin'))      # all rows
+        print(pd_numerics.agg('cumprod'))     # all rows
+        print(pd_numerics.agg('cumsum'))      # all rows
+        print(pd_numerics.agg('describe'))    # describe
+        print(pd_numerics.agg('diff'))        # all rows
+        print(pd_numerics.agg('kurt'))        # col
+        print(pd_numerics.agg('mad'))         # col
+        print('MAX')
+        print(pd_numerics.agg('max'))         # col
+        print(pd_numerics.agg('mean'))        # col
+        print(pd_numerics.agg('median'))      # col
+        print(pd_numerics.agg('min'))         # col
+        print(pd_numerics.agg('mode'))        # col
+        print(pd_numerics.agg('pct_change'))  # all rows
+        print(pd_numerics.agg('prod'))        # all rows
+        print(pd_numerics.agg('quantile'))    # col
+        print(pd_numerics.agg('rank'))        # col
+        print(pd_numerics.agg('round'))       # all rows
+        print('SEM')
+        print(pd_numerics.agg('sem'))         # col
+        print(pd_numerics.agg('skew'))        # col
+        print(pd_numerics.agg('sum'))         # col
+        print(pd_numerics.agg('std'))         # col
+        print(pd_numerics.agg('var'))         # col
+        print(pd_numerics.agg('nunique'))     # col
+
+        print(pd_numerics.agg(np.sqrt))       # all rows
+
+        return
+
         pd_sum_min = pd_flights.select_dtypes(include=[np.number]).agg(['sum', 'min'])
         print(type(pd_sum_min))
         with pd.option_context('display.max_rows', None, 'display.max_columns', None):
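The exploration above sorts pandas aggregations into column reducers (candidates for Elasticsearch aggregations) and row-wise transforms that would require fetching every document. The return-shape rules quoted in the `aggregate` docstring are easy to confirm in plain pandas:

```python
import pandas as pd

df = pd.DataFrame({'a': [1.0, 2.0], 'b': [3.0, 4.0]})

s = df.agg('sum')           # single function -> Series indexed by column
m = df.agg(['sum', 'min'])  # list of functions -> DataFrame, one row per function

print(type(s))  # <class 'pandas.core.series.Series'>
print(type(m))  # <class 'pandas.core.frame.DataFrame'>
```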
diff --git a/eland/tests/dataframe/test_to_csv_pytest.py b/eland/tests/dataframe/test_to_csv_pytest.py
index a6deabe..b2b0911 100644
--- a/eland/tests/dataframe/test_to_csv_pytest.py
+++ b/eland/tests/dataframe/test_to_csv_pytest.py
@@ -1,14 +1,43 @@
 # File called _pytest for PyCharm compatability
-import numpy as np
 import pandas as pd
 
-import eland as ed
-from eland.tests.common import ELASTICSEARCH_HOST
 from eland.tests.common import TestData
 
+from pandas.util.testing import assert_frame_equal
+
+import ast
+
 
 class TestDataFrameToCSV(TestData):
 
-    def test_to_csv(self):
-        print("TODO")
+    def test_to_csv_head(self):
+        ed_flights = self.ed_flights().head()
+        pd_flights = self.pd_flights().head()
+
+        ed_flights.to_csv('results/test_to_csv_head.csv')
+
+        # Converting back from csv is messy as pd_flights is created from a json file
+        pd_from_csv = pd.read_csv('results/test_to_csv_head.csv', index_col=0, converters={
+            'DestLocation': lambda x: ast.literal_eval(x),
+            'OriginLocation': lambda x: ast.literal_eval(x)})
+        pd_from_csv.index = pd_from_csv.index.map(str)
+        pd_from_csv.timestamp = pd.to_datetime(pd_from_csv.timestamp)
+
+        assert_frame_equal(pd_flights, pd_from_csv)
+
+    def test_to_csv_full(self):
+        # Test is slow as it covers the full dataset, but it is useful because it
+        # exercises the scan path for result sets over 10000 docs
+        ed_flights = self.ed_flights()
+        pd_flights = self.pd_flights()
+
+        ed_flights.to_csv('results/test_to_csv_full.csv')
+
+        # Converting back from csv is messy as pd_flights is created from a json file
+        pd_from_csv = pd.read_csv('results/test_to_csv_full.csv', index_col=0, converters={
+            'DestLocation': lambda x: ast.literal_eval(x),
+            'OriginLocation': lambda x: ast.literal_eval(x)})
+        pd_from_csv.index = pd_from_csv.index.map(str)
+        pd_from_csv.timestamp = pd.to_datetime(pd_from_csv.timestamp)
+
+        assert_frame_equal(pd_flights, pd_from_csv)
diff --git a/eland/tests/flights.json.gz b/eland/tests/flights.json.gz
index 9a7b962..24a9e90 100644
Binary files a/eland/tests/flights.json.gz and b/eland/tests/flights.json.gz differ
diff --git a/eland/tests/flights_df.json.gz b/eland/tests/flights_df.json.gz
index 0cb2e29..10a8fae 100644
Binary files a/eland/tests/flights_df.json.gz and b/eland/tests/flights_df.json.gz differ
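The converters in the tests above exist because csv stringifies list-valued fields such as `DestLocation`; `ast.literal_eval` restores them on the way back in. A standalone illustration of the round trip:

```python
import ast
from io import StringIO

import pandas as pd

df = pd.DataFrame({'DestLocation': [[41.9, 12.5]]})
csv = df.to_csv(index=False)

# without the converter the value comes back as the string "[41.9, 12.5]"
back = pd.read_csv(StringIO(csv), converters={'DestLocation': ast.literal_eval})
assert back.DestLocation[0] == [41.9, 12.5]
```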