Mirror of https://github.com/elastic/eland.git (synced 2025-07-11 00:02:14 +08:00)
Commit 1625e979f5
@@ -6,6 +6,9 @@ import numpy as np
 import pandas as pd
 from pandas.compat import StringIO
 from pandas.core.common import apply_if_callable, is_bool_indexer
+from pandas.core.dtypes.common import (
+    is_list_like
+)
 from pandas.io.common import _expand_user, _stringify_path
 from pandas.io.formats import console
 from pandas.io.formats import format as fmt
@@ -431,6 +434,36 @@ class DataFrame(NDFrame):
     def _reduce_dimension(self, query_compiler):
         return Series(query_compiler=query_compiler)
 
+    def to_csv(self, path_or_buf=None, sep=",", na_rep='', float_format=None,
+               columns=None, header=True, index=True, index_label=None,
+               mode='w', encoding=None, compression='infer', quoting=None,
+               quotechar='"', line_terminator=None, chunksize=None,
+               tupleize_cols=None, date_format=None, doublequote=True,
+               escapechar=None, decimal='.'):
+        kwargs = {
+            "path_or_buf": path_or_buf,
+            "sep": sep,
+            "na_rep": na_rep,
+            "float_format": float_format,
+            "columns": columns,
+            "header": header,
+            "index": index,
+            "index_label": index_label,
+            "mode": mode,
+            "encoding": encoding,
+            "compression": compression,
+            "quoting": quoting,
+            "quotechar": quotechar,
+            "line_terminator": line_terminator,
+            "chunksize": chunksize,
+            "tupleize_cols": tupleize_cols,
+            "date_format": date_format,
+            "doublequote": doublequote,
+            "escapechar": escapechar,
+            "decimal": decimal,
+        }
+        return self._query_compiler.to_csv(**kwargs)
+
     def _to_pandas(self):
         return self._query_compiler.to_pandas()
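Note: the new DataFrame.to_csv mirrors the pandas signature and simply packs every argument into kwargs for the query compiler. A minimal usage sketch; the host and index names are illustrative assumptions, not part of this commit:

    import eland as ed

    # hypothetical cluster/index - adjust to your environment
    ed_df = ed.DataFrame('localhost', 'flights')
    ed_df.to_csv('flights.csv')  # forwarded to ElandQueryCompiler.to_csv(**kwargs)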
@@ -469,53 +502,45 @@ class DataFrame(NDFrame):
     def keys(self):
         return self.columns
 
-    def to_csv(
-        self,
-        path_or_buf=None,
-        sep=",",
-        na_rep="",
-        float_format=None,
-        columns=None,
-        header=True,
-        index=True,
-        index_label=None,
-        mode="w",
-        encoding=None,
-        compression="infer",
-        quoting=None,
-        quotechar='"',
-        line_terminator=None,
-        chunksize=None,
-        tupleize_cols=None,
-        date_format=None,
-        doublequote=True,
-        escapechar=None,
-        decimal=".",
-        *args,
-        **kwargs
-    ):
-        kwargs = {
-            "path_or_buf": path_or_buf,
-            "sep": sep,
-            "na_rep": na_rep,
-            "float_format": float_format,
-            "columns": columns,
-            "header": header,
-            "index": index,
-            "index_label": index_label,
-            "mode": mode,
-            "encoding": encoding,
-            "compression": compression,
-            "quoting": quoting,
-            "quotechar": quotechar,
-            "line_terminator": line_terminator,
-            "chunksize": chunksize,
-            "tupleize_cols": tupleize_cols,
-            "date_format": date_format,
-            "doublequote": doublequote,
-            "escapechar": escapechar,
-            "decimal": decimal,
-        }
-
+    def aggregate(self, func, axis=0, *args, **kwargs):
+        """
+        Aggregate using one or more operations over the specified axis.
+
+        Parameters
+        ----------
+        func : function, str, list or dict
+            Function to use for aggregating the data. If a function, must either
+            work when passed a %(klass)s or when passed to %(klass)s.apply.
+
+            Accepted combinations are:
+
+            - function
+            - string function name
+            - list of functions and/or function names, e.g. ``[np.sum, 'mean']``
+            - dict of axis labels -> functions, function names or list of such.
+        %(axis)s
+        *args
+            Positional arguments to pass to `func`.
+        **kwargs
+            Keyword arguments to pass to `func`.
+
+        Returns
+        -------
+        DataFrame, Series or scalar
+            if DataFrame.agg is called with a single function, returns a Series
+            if DataFrame.agg is called with several functions, returns a DataFrame
+            if Series.agg is called with single function, returns a scalar
+            if Series.agg is called with several functions, returns a Series
+        """
+        axis = self._get_axis_number(axis)
+
+        if axis == 1:
+            raise NotImplementedError("Aggregating via index not currently implemented - needs index transform")
+
+        # currently we only support a subset of functions that aggregate columns.
+        # ['count', 'mad', 'max', 'mean', 'median', 'min', 'mode', 'quantile', 'rank', 'sem', 'skew', 'sum', 'std', 'var', 'nunique']
+
+    agg = aggregate
+
     hist = gfx.ed_hist_frame
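Note: the return-type rules spelled out in the aggregate docstring follow pandas; a quick illustration with plain pandas (a sketch, not part of this commit):

    import pandas as pd

    pdf = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})
    pdf.agg('sum')            # single function   -> Series
    pdf.agg(['sum', 'min'])   # list of functions -> DataFrame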
@@ -324,6 +324,48 @@ class Operations:
         return df
 
     def to_pandas(self, query_compiler):
+        class PandasDataFrameCollector:
+            def collect(self, df):
+                self.df = df
+
+            def batch_size(self):
+                return None
+
+        collector = PandasDataFrameCollector()
+
+        self._es_results(query_compiler, collector)
+
+        return collector.df
+
+    def to_csv(self, query_compiler, **kwargs):
+        class PandasToCSVCollector:
+            def __init__(self, **kwargs):
+                self.kwargs = kwargs
+                self.ret = None
+                self.first_time = True
+
+            def collect(self, df):
+                # If this is the first time we collect results, then write header, otherwise don't write header
+                # and append results
+                if self.first_time:
+                    self.first_time = False
+                    df.to_csv(**self.kwargs)
+                else:
+                    # Don't write header, and change mode to append
+                    self.kwargs['header'] = False
+                    self.kwargs['mode'] = 'a'
+                    df.to_csv(**self.kwargs)
+
+            def batch_size(self):
+                # By default read 10000 docs to csv
+                batch_size = 10000
+                return batch_size
+
+        collector = PandasToCSVCollector(**kwargs)
+
+        self._es_results(query_compiler, collector)
+
+        return collector.ret
+
+    def _es_results(self, query_compiler, collector):
         query_params, post_processing = self._resolve_tasks()
 
         size, sort_params = Operations._query_params_to_size_and_sort(query_params)
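Note: to_pandas and to_csv now share _es_results through a small collector protocol - any object exposing collect(df) and batch_size() will work. A hypothetical third collector, shown only to illustrate the protocol:

    class RowCountCollector:
        # illustrative only - not part of this commit
        def __init__(self):
            self.rows = 0

        def collect(self, df):
            # called once per batch (or once in total when batch_size() returns None)
            self.rows += len(df)

        def batch_size(self):
            return 5000  # ask for partial DataFrames of up to 5000 docs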
@@ -337,6 +379,7 @@ class Operations:
 
         # If size=None use scan not search - then post sort results when in df
         # If size>10000 use scan
+        is_scan = False
         if size is not None and size <= 10000:
             if size > 0:
                 es_results = query_compiler._client.search(
@@ -346,6 +389,7 @@ class Operations:
                     body=body.to_search_body(),
                     _source=columns)
         else:
+            is_scan = True
             es_results = query_compiler._client.scan(
                 index=query_compiler._index_pattern,
                 query=body.to_search_body(),
@@ -354,9 +398,17 @@ class Operations:
         if sort_params is not None:
             post_processing.append(self._sort_params_to_postprocessing(sort_params))
 
-        df = query_compiler._es_results_to_pandas(es_results)
-
-        return self._apply_df_post_processing(df, post_processing)
+        if is_scan:
+            while True:
+                partial_result, df = query_compiler._es_results_to_pandas(es_results, collector.batch_size())
+                df = self._apply_df_post_processing(df, post_processing)
+                collector.collect(df)
+                if partial_result == False:
+                    break
+        else:
+            partial_result, df = query_compiler._es_results_to_pandas(es_results)
+            df = self._apply_df_post_processing(df, post_processing)
+            collector.collect(df)
 
     def iloc(self, index, columns):
         # index and columns are indexers
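Note: the is_scan branch drains the scan generator in collector-sized chunks until _es_results_to_pandas reports partial_result=False. The same control flow as a stand-alone sketch (names are illustrative, not part of this commit):

    import itertools

    def drain_in_batches(hits, batch_size):
        # mirror of the while-loop above: a full batch means there may be more hits
        while True:
            batch = list(itertools.islice(hits, batch_size))
            partial_result = len(batch) == batch_size
            yield partial_result, batch
            if not partial_result:
                break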
@@ -639,7 +691,8 @@ class Operations:
 
     def _resolve_post_processing_task(self, item, query_params, post_processing):
         # Just do this in post-processing
-        post_processing.append(item)
+        if item[0] != 'columns':
+            post_processing.append(item)
 
         return query_params, post_processing
@@ -6,6 +6,10 @@ from eland import Index
 from eland import Mappings
 from eland import Operations
 
+from pandas.core.dtypes.common import (
+    is_list_like
+)
+
 from pandas.core.indexes.numeric import Int64Index
 from pandas.core.indexes.range import RangeIndex
@@ -37,9 +41,6 @@ class ElandQueryCompiler(BaseQueryCompiler):
     in the first/last n fields.
 
     A way to mitigate this would be to post process this drop - TODO
-
-
-
     """
 
     def __init__(self,
@@ -95,7 +96,7 @@ class ElandQueryCompiler(BaseQueryCompiler):
 
     # END Index, columns, and dtypes objects
 
-    def _es_results_to_pandas(self, results):
+    def _es_results_to_pandas(self, results, batch_size=None):
        """
        Parameters
        ----------
@@ -182,17 +183,25 @@ class ElandQueryCompiler(BaseQueryCompiler):
         TODO - an option here is to use Elasticsearch's multi-field matching instead of pandas treatment of lists (which isn't great)
         NOTE - using this lists is generally not a good way to use this API
         """
+        partial_result = False
+
         if results is None:
-            return self._empty_pd_ef()
+            return partial_result, self._empty_pd_ef()
 
         rows = []
         index = []
         if isinstance(results, dict):
             iterator = results['hits']['hits']
+
+            if batch_size is not None:
+                raise NotImplementedError("Can not specify batch_size with dict results")
         else:
             iterator = results
 
+        i = 0
         for hit in iterator:
+            i = i + 1
+
             row = hit['_source']
 
             # get index value - can be _id or can be field value in source
@@ -205,6 +214,11 @@ class ElandQueryCompiler(BaseQueryCompiler):
             # flatten row to map correctly to 2D DataFrame
             rows.append(self._flatten_dict(row))
 
+            if batch_size is not None:
+                if i >= batch_size:
+                    partial_result = True
+                    break
+
         # Create pandas DataFrame
         df = pd.DataFrame(data=rows, index=index)
 
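Note: this early break relies on results being a generator - its state persists across calls, so the next call to _es_results_to_pandas(results, batch_size) resumes at the following document rather than restarting. Plain-Python illustration:

    import itertools

    g = iter(range(7))
    list(itertools.islice(g, 3))  # [0, 1, 2]
    list(itertools.islice(g, 3))  # [3, 4, 5] - resumes where the last slice stopped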
@@ -221,62 +235,7 @@ class ElandQueryCompiler(BaseQueryCompiler):
         # Sort columns in mapping order
         df = df[self.columns]
 
-        return df
-
-    def _to_csv(self, results, **kwargs):
-        # Very similar to _es_results_to_pandas except we create partial pandas.DataFrame
-        # and write these to csv
-
-        # Use chunksize in kwargs do determine size of partial data frame
-        if 'chunksize' in kwargs:
-            chunksize = kwargs['chunksize']
-        else:
-            # If no default chunk, set to 1000
-            chunksize = 1000
-
-        if results is None:
-            return self._empty_pd_ef()
-
-        rows = []
-        index = []
-        if isinstance(results, dict):
-            iterator = results['hits']['hits']
-        else:
-            iterator = results
-
-        i = 0
-        for hit in iterator:
-            row = hit['_source']
-
-            # get index value - can be _id or can be field value in source
-            if self._index.is_source_field:
-                index_field = row[self._index.index_field]
-            else:
-                index_field = hit[self._index.index_field]
-            index.append(index_field)
-
-            # flatten row to map correctly to 2D DataFrame
-            rows.append(self._flatten_dict(row))
-
-            i = i + 1
-            if i % chunksize == 0:
-                # Create pandas DataFrame
-                df = pd.DataFrame(data=rows, index=index)
-
-                # _source may not contain all columns in the mapping
-                # therefore, fill in missing columns
-                # (note this returns self.columns NOT IN df.columns)
-                missing_columns = list(set(self.columns) - set(df.columns))
-
-                for missing in missing_columns:
-                    is_source_field, pd_dtype = self._mappings.source_field_pd_dtype(missing)
-                    df[missing] = None
-                    df[missing].astype(pd_dtype)
-
-                # Sort columns in mapping order
-                df = df[self.columns]
-
-        return df
+        return partial_result, df
 
     def _flatten_dict(self, y):
         out = {}
@@ -301,6 +260,13 @@ class ElandQueryCompiler(BaseQueryCompiler):
 
         # Coerce types - for now just datetime
         if pd_dtype == 'datetime64[ns]':
+            # TODO - this doesn't work for certain ES date formats
+            # e.g. "@timestamp" : {
+            #   "type" : "date",
+            #   "format" : "epoch_millis"
+            # }
+            # 1484053499256 - we need to check ES type and format and add conversions like:
+            # pd.to_datetime(x, unit='ms')
             x = pd.to_datetime(x)
 
         # Elasticsearch can have multiple values for a field. These are represented as lists, so
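Note: the TODO refers to mappings whose date format is epoch_millis; those values arrive as integers and need a unit hint, which pd.to_datetime supports:

    import pandas as pd

    pd.to_datetime('2017-01-10T13:04:59')      # ISO 8601 strings parse by default
    pd.to_datetime(1484053499256, unit='ms')   # Timestamp('2017-01-10 13:04:59.256000')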
@@ -383,6 +349,15 @@ class ElandQueryCompiler(BaseQueryCompiler):
         """
         return self._operations.to_pandas(self)
 
+    # To CSV
+    def to_csv(self, **kwargs):
+        """Serialises Eland Dataframe to CSV
+
+        Returns:
+            If path_or_buf is None, returns the resulting csv format as a string. Otherwise returns None.
+        """
+        return self._operations.to_csv(self, **kwargs)
+
     # __getitem__ methods
     def getitem_column_array(self, key, numeric=False):
         """Get column data for target labels.
@@ -457,3 +432,33 @@ class ElandQueryCompiler(BaseQueryCompiler):
 
     def _hist(self, num_bins):
         return self._operations.hist(self, num_bins)
+
+    def apply(self, func, axis, *args, **kwargs):
+        """Apply func across given axis.
+
+        Args:
+            func: The function to apply.
+            axis: Target axis to apply the function along.
+
+        Returns:
+            A new QueryCompiler.
+        """
+        if callable(func):
+            return self._callable_func(func, axis, *args, **kwargs)
+        elif isinstance(func, dict):
+            return self._dict_func(func, axis, *args, **kwargs)
+        elif is_list_like(func):
+            return self._list_like_func(func, axis, *args, **kwargs)
+        else:
+            pass
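Note: in apply, the dict branch must precede the is_list_like branch because pandas treats dicts as list-like. A quick illustration of the dispatch predicate (sketch only):

    from pandas.core.dtypes.common import is_list_like

    is_list_like({'a': 'sum'})    # True - but the callable/dict branches run first
    is_list_like(['sum', 'min'])  # True  -> _list_like_func
    is_list_like('mean')          # False -> falls through to the bare else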
@@ -102,7 +102,6 @@ FLIGHTS_DF_FILE_NAME = ROOT_DIR + '/flights_df.json.gz'
 FLIGHTS_SMALL_INDEX_NAME = 'flights_small'
 FLIGHTS_SMALL_MAPPING = FLIGHTS_MAPPING
 FLIGHTS_SMALL_FILE_NAME = ROOT_DIR + '/flights_small.json.gz'
-FLIGHTS_SMALL_DF_FILE_NAME = ROOT_DIR + '/flights_small_df.json.gz'
 
 ECOMMERCE_INDEX_NAME = 'ecommerce'
 ECOMMERCE_MAPPING = { "mappings" : {
New empty file: eland/tests/dataframe/results/.gitignore (vendored)
@@ -12,6 +12,46 @@ class TestDataFrameAggs(TestData):
         pd_flights = self.pd_flights()
         ed_flights = self.ed_flights()
 
+        pd_numerics = pd_flights.select_dtypes(include=[np.number])
+        print(pd_numerics.columns)
+        print(pd_numerics.agg('abs'))  # all rows
+        print(pd_numerics.agg('all'))  # columns True/False
+        print(pd_numerics.agg('any'))  # columns True/False
+        print(pd_numerics.agg('corr'))  # matrix col/col
+        print(pd_numerics.agg('count'))  # columns count
+        print(pd_numerics.agg('cov'))  # matrix col/col
+        print(pd_numerics.agg('cummax'))  # all rows
+        print(pd_numerics.agg('cummin'))  # all rows
+        print(pd_numerics.agg('cumprod'))  # all rows
+        print(pd_numerics.agg('cumsum'))  # all rows
+        print(pd_numerics.agg('describe'))  # describe
+        print(pd_numerics.agg('diff'))  # all rows
+        print(pd_numerics.agg('kurt'))  # ?>
+        print(pd_numerics.agg('mad'))  # col
+        print('MAX')
+        print(pd_numerics.agg('max'))  # col
+        print(pd_numerics.agg('mean'))  # col
+        print(pd_numerics.agg('median'))  # col
+        print(pd_numerics.agg('min'))  # col
+        print(pd_numerics.agg('mode'))  # col
+        print(pd_numerics.agg('pct_change'))  # all rows
+        print(pd_numerics.agg('prod'))  # all rows
+        print(pd_numerics.agg('quantile'))  # col
+        print(pd_numerics.agg('rank'))  # col
+        print(pd_numerics.agg('round'))  # all rows
+        print('SEM')
+        print(pd_numerics.agg('sem'))  # col
+        print(pd_numerics.agg('skew'))  # col
+        print(pd_numerics.agg('sum'))  # col
+        print(pd_numerics.agg('std'))  # col
+        print(pd_numerics.agg('var'))  # col
+        print(pd_numerics.agg('nunique'))  # col
+
+        print(pd_numerics.agg(np.sqrt))  # all rows
+
+        return
+
         pd_sum_min = pd_flights.select_dtypes(include=[np.number]).agg(['sum', 'min'])
         print(type(pd_sum_min))
         with pd.option_context('display.max_rows', None, 'display.max_columns', None):
@@ -1,14 +1,43 @@
 # File called _pytest for PyCharm compatability
-
-import numpy as np
 import pandas as pd
-
-import eland as ed
-from eland.tests.common import ELASTICSEARCH_HOST
 from eland.tests.common import TestData
 
+from pandas.util.testing import (assert_equal, assert_frame_equal)
+
+import ast
+
 
 class TestDataFrameToCSV(TestData):
 
-    def test_to_csv(self):
-        print("TODO")
+    def test_to_csv_head(self):
+        ed_flights = self.ed_flights().head()
+        pd_flights = self.pd_flights().head()
+
+        ed_flights.to_csv('results/test_to_csv_head.csv')
+        # Converting back from csv is messy as pd_flights is created from a json file
+        pd_from_csv = pd.read_csv('results/test_to_csv_head.csv', index_col=0, converters={
+            'DestLocation': lambda x: ast.literal_eval(x),
+            'OriginLocation': lambda x: ast.literal_eval(x)})
+        pd_from_csv.index = pd_from_csv.index.map(str)
+        pd_from_csv.timestamp = pd.to_datetime(pd_from_csv.timestamp)
+
+        assert_frame_equal(pd_flights, pd_from_csv)
+
+    def test_to_csv_full(self):
+        # Test is slow as it's for the full dataset, but it is useful as it goes over 10000 docs
+        ed_flights = self.ed_flights()
+        pd_flights = self.pd_flights()
+
+        ed_flights.to_csv('results/test_to_csv_full.csv')
+        # Converting back from csv is messy as pd_flights is created from a json file
+        pd_from_csv = pd.read_csv('results/test_to_csv_full.csv', index_col=0, converters={
+            'DestLocation': lambda x: ast.literal_eval(x),
+            'OriginLocation': lambda x: ast.literal_eval(x)})
+        pd_from_csv.index = pd_from_csv.index.map(str)
+        pd_from_csv.timestamp = pd.to_datetime(pd_from_csv.timestamp)
+
+        assert_frame_equal(pd_flights, pd_from_csv)
Two binary files changed (not shown).