Added DataFrame.to_csv - tests still failing

This commit is contained in:
Stephen Dodson 2019-08-09 07:54:44 +00:00
parent c6e0c5b92b
commit 49bad292d3
9 changed files with 269 additions and 118 deletions

View File

@@ -6,6 +6,9 @@ import numpy as np
import pandas as pd
from pandas.compat import StringIO
from pandas.core.common import apply_if_callable, is_bool_indexer
from pandas.core.dtypes.common import (
is_list_like
)
from pandas.io.common import _expand_user, _stringify_path
from pandas.io.formats import console
from pandas.io.formats import format as fmt
@@ -431,6 +434,36 @@ class DataFrame(NDFrame):
def _reduce_dimension(self, query_compiler):
return Series(query_compiler=query_compiler)
def to_csv(self, path_or_buf=None, sep=",", na_rep='', float_format=None,
columns=None, header=True, index=True, index_label=None,
mode='w', encoding=None, compression='infer', quoting=None,
quotechar='"', line_terminator=None, chunksize=None,
tupleize_cols=None, date_format=None, doublequote=True,
escapechar=None, decimal='.'):
kwargs = {
"path_or_buf": path_or_buf,
"sep": sep,
"na_rep": na_rep,
"float_format": float_format,
"columns": columns,
"header": header,
"index": index,
"index_label": index_label,
"mode": mode,
"encoding": encoding,
"compression": compression,
"quoting": quoting,
"quotechar": quotechar,
"line_terminator": line_terminator,
"chunksize": chunksize,
"tupleize_cols": tupleize_cols,
"date_format": date_format,
"doublequote": doublequote,
"escapechar": escapechar,
"decimal": decimal,
}
return self._query_compiler.to_csv(**kwargs)
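
For orientation, a minimal usage sketch of the new method (not part of this commit; the host, index name and output path are assumptions, and the constructor call follows the pattern used in eland's examples):

import eland as ed

# Hypothetical example: 'localhost' and the 'flights' index are placeholders.
# to_csv forwards its keyword arguments to the query compiler, which streams
# batches of documents out of Elasticsearch and appends them to the file.
df = ed.DataFrame('localhost', 'flights')
df.to_csv('flights.csv', index=True)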
def _to_pandas(self):
return self._query_compiler.to_pandas()
@@ -469,53 +502,45 @@ class DataFrame(NDFrame):
def keys(self):
return self.columns
def to_csv(
self,
path_or_buf=None,
sep=",",
na_rep="",
float_format=None,
columns=None,
header=True,
index=True,
index_label=None,
mode="w",
encoding=None,
compression="infer",
quoting=None,
quotechar='"',
line_terminator=None,
chunksize=None,
tupleize_cols=None,
date_format=None,
doublequote=True,
escapechar=None,
decimal=".",
*args,
**kwargs
):
kwargs = {
"path_or_buf": path_or_buf,
"sep": sep,
"na_rep": na_rep,
"float_format": float_format,
"columns": columns,
"header": header,
"index": index,
"index_label": index_label,
"mode": mode,
"encoding": encoding,
"compression": compression,
"quoting": quoting,
"quotechar": quotechar,
"line_terminator": line_terminator,
"chunksize": chunksize,
"tupleize_cols": tupleize_cols,
"date_format": date_format,
"doublequote": doublequote,
"escapechar": escapechar,
"decimal": decimal,
}
def aggregate(self, func, axis=0, *args, **kwargs):
"""
Aggregate using one or more operations over the specified axis.
Parameters
----------
func : function, str, list or dict
Function to use for aggregating the data. If a function, must either
work when passed a %(klass)s or when passed to %(klass)s.apply.
Accepted combinations are:
- function
- string function name
- list of functions and/or function names, e.g. ``[np.sum, 'mean']``
- dict of axis labels -> functions, function names or list of such.
%(axis)s
*args
Positional arguments to pass to `func`.
**kwargs
Keyword arguments to pass to `func`.
Returns
-------
DataFrame, Series or scalar
if DataFrame.agg is called with a single function, returns a Series
if DataFrame.agg is called with several functions, returns a DataFrame
if Series.agg is called with single function, returns a scalar
if Series.agg is called with several functions, returns a Series
"""
axis = self._get_axis_number(axis)
if axis == 1:
raise NotImplementedError("Aggregating via index not currently implemented - needs index transform")
# currently we only support a subset of functions that aggregate columns.
# ['count', 'mad', 'max', 'mean', 'median', 'min', 'mode', 'quantile', 'rank', 'sem', 'skew', 'sum', 'std', 'var', 'nunique']
agg = aggregate
hist = gfx.ed_hist_frame
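
To make the accepted combinations in the docstring above concrete, a short plain-pandas sketch (illustrative only, not part of the commit):

import numpy as np
import pandas as pd

df = pd.DataFrame({'a': [1, 2, 3], 'b': [4.0, 5.0, 6.0]})
print(df.agg('sum'))                      # single function name -> Series
print(df.agg([np.sum, 'min']))            # list of functions -> DataFrame
print(df.agg({'a': 'max', 'b': 'mean'}))  # dict of column -> function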

View File

@@ -324,6 +324,48 @@ class Operations:
return df
def to_pandas(self, query_compiler):
class PandasDataFrameCollector:
def collect(self, df):
self.df = df
def batch_size(self):
return None
collector = PandasDataFrameCollector()
self._es_results(query_compiler, collector)
return collector.df
def to_csv(self, query_compiler, **kwargs):
class PandasToCSVCollector:
def __init__(self, **kwargs):
self.kwargs = kwargs
self.ret = None
self.first_time = True
def collect(self, df):
# If this is the first time we collect results, then write header, otherwise don't write header
# and append results
if self.first_time:
self.first_time = False
df.to_csv(**self.kwargs)
else:
# Don't write header, and change mode to append
self.kwargs['header'] = False
self.kwargs['mode'] = 'a'
df.to_csv(**self.kwargs)
def batch_size(self):
# By default read 10000 docs to csv
batch_size = 10000
return batch_size
collector = PandasToCSVCollector(**kwargs)
self._es_results(query_compiler, collector)
return collector.ret
def _es_results(self, query_compiler, collector):
query_params, post_processing = self._resolve_tasks()
size, sort_params = Operations._query_params_to_size_and_sort(query_params)
@@ -337,6 +379,7 @@ class Operations:
# If size=None use scan not search - then post sort results when in df
# If size>10000 use scan
is_scan = False
if size is not None and size <= 10000:
if size > 0:
es_results = query_compiler._client.search(
@@ -346,6 +389,7 @@ class Operations:
body=body.to_search_body(),
_source=columns)
else:
is_scan = True
es_results = query_compiler._client.scan(
index=query_compiler._index_pattern,
query=body.to_search_body(),
@@ -354,9 +398,17 @@ class Operations:
if sort_params is not None:
post_processing.append(self._sort_params_to_postprocessing(sort_params))
df = query_compiler._es_results_to_pandas(es_results)
return self._apply_df_post_processing(df, post_processing)
if is_scan:
while True:
partial_result, df = query_compiler._es_results_to_pandas(es_results, collector.batch_size())
df = self._apply_df_post_processing(df, post_processing)
collector.collect(df)
if partial_result == False:
break
else:
partial_result, df = query_compiler._es_results_to_pandas(es_results)
df = self._apply_df_post_processing(df, post_processing)
collector.collect(df)
def iloc(self, index, columns):
# index and columns are indexers
@@ -639,7 +691,8 @@ class Operations:
def _resolve_post_processing_task(self, item, query_params, post_processing):
# Just do this in post-processing
post_processing.append(item)
if item[0] != 'columns':
post_processing.append(item)
return query_params, post_processing
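
The two collector classes above share a small informal protocol: collect(df) receives each batch of results as a pandas DataFrame, and batch_size() tells the scan loop how many documents to pull per batch. A standalone sketch of the same idea, with hypothetical names and simplified from the committed code:

import pandas as pd

class CSVCollector:
    """Write the first batch with a header, then append later batches without one."""
    def __init__(self, path):
        self.path = path
        self.first_time = True

    def batch_size(self):
        # pull at most this many documents per batch
        return 10000

    def collect(self, df):
        if self.first_time:
            self.first_time = False
            df.to_csv(self.path, mode='w', header=True)
        else:
            df.to_csv(self.path, mode='a', header=False)

# Two toy batches stand in for scan results here.
collector = CSVCollector('out.csv')
for batch in (pd.DataFrame({'a': [1, 2]}), pd.DataFrame({'a': [3, 4]})):
    collector.collect(batch)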

View File

@@ -6,6 +6,10 @@ from eland import Index
from eland import Mappings
from eland import Operations
from pandas.core.dtypes.common import (
is_list_like
)
from pandas.core.indexes.numeric import Int64Index
from pandas.core.indexes.range import RangeIndex
@@ -37,9 +41,6 @@ class ElandQueryCompiler(BaseQueryCompiler):
in the first/last n fields.
A way to mitigate this would be to post process this drop - TODO
"""
def __init__(self,
@@ -95,7 +96,7 @@ class ElandQueryCompiler(BaseQueryCompiler):
# END Index, columns, and dtypes objects
def _es_results_to_pandas(self, results):
def _es_results_to_pandas(self, results, batch_size=None):
"""
Parameters
----------
@@ -182,17 +183,25 @@ class ElandQueryCompiler(BaseQueryCompiler):
TODO - an option here is to use Elasticsearch's multi-field matching instead of pandas treatment of lists (which isn't great)
NOTE - using lists like this is generally not a good way to use this API
"""
partial_result = False
if results is None:
return self._empty_pd_ef()
return partial_result, self._empty_pd_ef()
rows = []
index = []
if isinstance(results, dict):
iterator = results['hits']['hits']
if batch_size is not None:
raise NotImplementedError("Can not specify batch_size with dict results")
else:
iterator = results
i = 0
for hit in iterator:
i = i + 1
row = hit['_source']
# get index value - can be _id or can be field value in source
@@ -205,6 +214,11 @@ class ElandQueryCompiler(BaseQueryCompiler):
# flatten row to map correctly to 2D DataFrame
rows.append(self._flatten_dict(row))
if batch_size is not None:
if i >= batch_size:
partial_result = True
break
# Create pandas DataFrame
df = pd.DataFrame(data=rows, index=index)
@@ -221,62 +235,7 @@ class ElandQueryCompiler(BaseQueryCompiler):
# Sort columns in mapping order
df = df[self.columns]
return df
return partial_result, df
def _to_csv(self, results, **kwargs):
# Very similar to _es_results_to_pandas except we create partial pandas.DataFrame
# and write these to csv
# Use chunksize in kwargs to determine size of partial data frame
if 'chunksize' in kwargs:
chunksize = kwargs['chunksize']
else:
# If no chunksize is given, default to 1000
chunksize = 1000
if results is None:
return self._empty_pd_ef()
rows = []
index = []
if isinstance(results, dict):
iterator = results['hits']['hits']
else:
iterator = results
i = 0
for hit in iterator:
row = hit['_source']
# get index value - can be _id or can be field value in source
if self._index.is_source_field:
index_field = row[self._index.index_field]
else:
index_field = hit[self._index.index_field]
index.append(index_field)
# flatten row to map correctly to 2D DataFrame
rows.append(self._flatten_dict(row))
i = i + 1
if i % chunksize == 0:
# Create pandas DataFrame
df = pd.DataFrame(data=rows, index=index)
# _source may not contain all columns in the mapping
# therefore, fill in missing columns
# (note this returns self.columns NOT IN df.columns)
missing_columns = list(set(self.columns) - set(df.columns))
for missing in missing_columns:
is_source_field, pd_dtype = self._mappings.source_field_pd_dtype(missing)
df[missing] = None
df[missing].astype(pd_dtype)
# Sort columns in mapping order
df = df[self.columns]
return df
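
The batch_size/partial_result contract used by _es_results_to_pandas above can be shown in isolation with a hypothetical helper (not the committed code): pull up to batch_size items and report whether more may remain.

from itertools import islice

def take_batch(iterator, batch_size):
    # Pull up to batch_size items; partial=True means the caller should ask again.
    rows = list(islice(iterator, batch_size))
    partial = len(rows) == batch_size
    return partial, rows

hits = iter(range(25))
while True:
    partial, rows = take_batch(hits, 10)
    print(len(rows), partial)  # 10 True, 10 True, 5 False
    if not partial:
        break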
def _flatten_dict(self, y):
out = {}
@@ -301,6 +260,13 @@ class ElandQueryCompiler(BaseQueryCompiler):
# Coerce types - for now just datetime
if pd_dtype == 'datetime64[ns]':
# TODO - this doesn't work for certain ES date formats
# e.g. "@timestamp" : {
# "type" : "date",
# "format" : "epoch_millis"
# }
# 1484053499256 - we need to check ES type and format and add conversions like:
# pd.to_datetime(x, unit='ms')
x = pd.to_datetime(x)
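
A one-line illustration of the conversion the TODO above refers to (a sketch, not part of the commit):

import pandas as pd

# An "epoch_millis" field arrives as an integer count of milliseconds,
# so pd.to_datetime needs unit='ms' rather than the default string parsing.
print(pd.to_datetime(1484053499256, unit='ms'))  # 2017-01-10 13:04:59.256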
# Elasticsearch can have multiple values for a field. These are represented as lists, so
@@ -383,6 +349,15 @@ class ElandQueryCompiler(BaseQueryCompiler):
"""
return self._operations.to_pandas(self)
# To CSV
def to_csv(self, **kwargs):
"""Serialises Eland Dataframe to CSV
Returns:
If path_or_buf is None, returns the resulting csv format as a string. Otherwise returns None.
"""
return self._operations.to_csv(self, **kwargs)
# __getitem__ methods
def getitem_column_array(self, key, numeric=False):
"""Get column data for target labels.
@@ -457,3 +432,33 @@ class ElandQueryCompiler(BaseQueryCompiler):
def _hist(self, num_bins):
return self._operations.hist(self, num_bins)
def apply(self, func, axis, *args, **kwargs):
"""Apply func across given axis.
Args:
func: The function to apply.
axis: Target axis to apply the function along.
Returns:
A new QueryCompiler.
"""
"""Apply func across given axis.
Args:
func: The function to apply.
axis: Target axis to apply the function along.
Returns:
A new PandasQueryCompiler.
"""
if callable(func):
return self._callable_func(func, axis, *args, **kwargs)
elif isinstance(func, dict):
return self._dict_func(func, axis, *args, **kwargs)
elif is_list_like(func):
return self._list_like_func(func, axis, *args, **kwargs)
else:
pass
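
The dispatch above hinges on is_list_like; a quick plain-pandas sketch of how it classifies typical func arguments (illustrative only):

import numpy as np
from pandas.core.dtypes.common import is_list_like

print(is_list_like(['sum', 'min']))    # True  -> the _list_like_func path
print(is_list_like(np.array([1, 2])))  # True
print(is_list_like('sum'))             # False -> falls through to the else branch
print(is_list_like({'a': 'max'}))      # True, but dicts are handled by the earlier branch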

View File

@@ -102,7 +102,6 @@ FLIGHTS_DF_FILE_NAME = ROOT_DIR + '/flights_df.json.gz'
FLIGHTS_SMALL_INDEX_NAME = 'flights_small'
FLIGHTS_SMALL_MAPPING = FLIGHTS_MAPPING
FLIGHTS_SMALL_FILE_NAME = ROOT_DIR + '/flights_small.json.gz'
FLIGHTS_SMALL_DF_FILE_NAME = ROOT_DIR + '/flights_small_df.json.gz'
ECOMMERCE_INDEX_NAME = 'ecommerce'
ECOMMERCE_MAPPING = { "mappings" : {

View File

View File

@@ -12,6 +12,46 @@ class TestDataFrameAggs(TestData):
pd_flights = self.pd_flights()
ed_flights = self.ed_flights()
pd_numerics = pd_flights.select_dtypes(include=[np.number])
print(pd_numerics.columns)
print(pd_numerics.agg('abs')) # all rows
print(pd_numerics.agg('all')) # columns True/False
print(pd_numerics.agg('any')) # columns True/False
print(pd_numerics.agg('corr')) # matrix col/col
print(pd_numerics.agg('count')) # columns count
print(pd_numerics.agg('cov')) # matrix col/col
print(pd_numerics.agg('cummax')) # all rows
print(pd_numerics.agg('cummin')) # all rows
print(pd_numerics.agg('cumprod')) # all rows
print(pd_numerics.agg('cumsum')) # all rows
print(pd_numerics.agg('describe')) # describe
print(pd_numerics.agg('diff')) # all rows
print(pd_numerics.agg('kurt')) # ?>
print(pd_numerics.agg('mad')) # col
print('MAX')
print(pd_numerics.agg('max')) # col
print(pd_numerics.agg('mean')) # col
print(pd_numerics.agg('median')) # col
print(pd_numerics.agg('min')) # col
print(pd_numerics.agg('mode')) # col
print(pd_numerics.agg('pct_change')) # all rows
print(pd_numerics.agg('prod')) # all rows
print(pd_numerics.agg('quantile')) # col
print(pd_numerics.agg('rank')) # col
print(pd_numerics.agg('round')) # all rows
print('SEM')
print(pd_numerics.agg('sem')) # col
print(pd_numerics.agg('skew')) # col
print(pd_numerics.agg('sum')) # col
print(pd_numerics.agg('std')) # col
print(pd_numerics.agg('var')) # col
print(pd_numerics.agg('nunique')) # col
print(pd_numerics.aggs(np.sqrt)) # all rows
return
pd_sum_min = pd_flights.select_dtypes(include=[np.number]).agg(['sum', 'min'])
print(type(pd_sum_min))
with pd.option_context('display.max_rows', None, 'display.max_columns', None):

View File

@@ -1,14 +1,43 @@
# File called _pytest for PyCharm compatibility
import numpy as np
import pandas as pd
import eland as ed
from eland.tests.common import ELASTICSEARCH_HOST
from eland.tests.common import TestData
from pandas.util.testing import (assert_equal, assert_frame_equal)
import ast
class TestDataFrameToCSV(TestData):
def test_to_csv(self):
print("TODO")
def test_to_csv_head(self):
ed_flights = self.ed_flights().head()
pd_flights = self.pd_flights().head()
ed_flights.to_csv('results/test_to_csv_head.csv')
# Converting back from csv is messy as pd_flights is created from a json file
pd_from_csv = pd.read_csv('results/test_to_csv_head.csv', index_col=0, converters={
'DestLocation': lambda x: ast.literal_eval(x),
'OriginLocation': lambda x: ast.literal_eval(x)})
pd_from_csv.index = pd_from_csv.index.map(str)
pd_from_csv.timestamp = pd.to_datetime(pd_from_csv.timestamp)
assert_frame_equal(pd_flights, pd_from_csv)
def test_to_csv_full(self):
# Test is slow as it's for the full dataset, but it is useful as it goes over 10000 docs
ed_flights = self.ed_flights()
pd_flights = self.pd_flights()
ed_flights.to_csv('results/test_to_csv_full.csv')
# Converting back from csv is messy as pd_flights is created from a json file
pd_from_csv = pd.read_csv('results/test_to_csv_full.csv', index_col=0, converters={
'DestLocation': lambda x: ast.literal_eval(x),
'OriginLocation': lambda x: ast.literal_eval(x)})
pd_from_csv.index = pd_from_csv.index.map(str)
pd_from_csv.timestamp = pd.to_datetime(pd_from_csv.timestamp)
assert_frame_equal(pd_flights, pd_from_csv)

Binary file not shown.

Binary file not shown.