Merge pull request #17 from stevedodson/master

Adding eland.DataFrame.hist
stevedodson 2019-07-31 10:01:52 +00:00 committed by GitHub
commit 62d244ff8a
25 changed files with 19025 additions and 27 deletions
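
For orientation, this is roughly how the new API is exercised end-to-end — a minimal sketch based on the tests in this PR, assuming a local Elasticsearch cluster with the demo 'flights' index loaded and matplotlib available:

    import matplotlib.pyplot as plt
    import eland as ed

    # Construct a DataFrame backed by an Elasticsearch index
    ed_flights = ed.DataFrame('localhost', 'flights')

    # hist() asks Elasticsearch for histogram aggregations (one per numeric
    # column) and plots the pre-binned results client-side
    ed_flights[['DistanceKilometers', 'FlightDelayMin']].hist(bins=10)
    plt.show()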

@@ -10,6 +10,7 @@ from .mappings import *
 from .query import *
 from .operations import *
 from .query_compiler import *
+from .plotting import *
 from .ndframe import *
 from .series import *
 from .dataframe import *

@@ -1,9 +1,9 @@
+import warnings
 import sys
-import warnings
-import pandas as pd
 import numpy as np
+import pandas as pd
+from distutils.version import LooseVersion
 from pandas.compat import StringIO
 from pandas.core.common import apply_if_callable, is_bool_indexer
 from pandas.io.common import _expand_user, _stringify_path
@@ -14,6 +14,7 @@ from pandas.io.formats.printing import pprint_thing

 from eland import NDFrame
 from eland import Series
+import eland.plotting as gfx


 class DataFrame(NDFrame):
     # This is effectively 2 constructors
@@ -74,6 +75,46 @@ class DataFrame(NDFrame):

         return buf.getvalue()

+    def _info_repr(self):
+        """
+        True if the repr should show the info view.
+        """
+        info_repr_option = (pd.get_option("display.large_repr") == "info")
+        return info_repr_option and not (self._repr_fits_horizontal_() and
+                                         self._repr_fits_vertical_())
+
+    def _repr_html_(self):
+        """
+        From pandas
+        """
+        try:
+            import IPython
+        except ImportError:
+            pass
+        else:
+            if LooseVersion(IPython.__version__) < LooseVersion('3.0'):
+                if console.in_qtconsole():
+                    # 'HTML output is disabled in QtConsole'
+                    return None
+
+        if self._info_repr():
+            buf = StringIO(u(""))
+            self.info(buf=buf)
+            # need to escape the <class>, should be the first line.
+            val = buf.getvalue().replace('<', r'&lt;', 1)
+            val = val.replace('>', r'&gt;', 1)
+            return '<pre>' + val + '</pre>'
+
+        if pd.get_option("display.notebook_repr_html"):
+            max_rows = pd.get_option("display.max_rows")
+            max_cols = pd.get_option("display.max_columns")
+            show_dimensions = pd.get_option("display.show_dimensions")
+
+            return self.to_html(max_rows=max_rows, max_cols=max_cols,
+                                show_dimensions=show_dimensions, notebook=True)
+        else:
+            return None
+
     def count(self):
         """
         Count non-NA cells for each column (TODO row)
@@ -89,7 +130,6 @@ class DataFrame(NDFrame):
         """
         return self._query_compiler.count()
-
     def info_es(self):
         buf = StringIO()
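
The notebook repr added above is driven entirely by standard pandas display options, for example:

    import pandas as pd

    # Force the info view: _info_repr() returns True for frames that do not
    # fit, and _repr_html_ emits the escaped output of info() instead
    pd.set_option('display.large_repr', 'info')

    # Disable HTML output entirely: _repr_html_ returns None and Jupyter
    # falls back to the plain-text repr
    pd.set_option('display.notebook_repr_html', False)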
@@ -222,6 +262,45 @@ class DataFrame(NDFrame):

         fmt.buffer_put_lines(buf, lines)

+    def to_html(self, buf=None, columns=None, col_space=None, header=True,
+                index=True, na_rep='NaN', formatters=None, float_format=None,
+                sparsify=None, index_names=True, justify=None, max_rows=None,
+                max_cols=None, show_dimensions=False, decimal='.',
+                bold_rows=True, classes=None, escape=True, notebook=False,
+                border=None, table_id=None, render_links=False):
+        """
+        From pandas - except we set max_rows default to avoid careless extraction of entire index
+        """
+        if max_rows is None:
+            warnings.warn("DataFrame.to_string called without max_rows set "
+                          "- this will return entire index results. "
+                          "Setting max_rows=60, overwrite if different behaviour is required.")
+            max_rows = 60
+
+        # Create a slightly bigger dataframe than display
+        df = self._build_repr_df(max_rows + 1, max_cols)
+
+        if buf is not None:
+            _buf = _expand_user(_stringify_path(buf))
+        else:
+            _buf = StringIO()
+
+        df.to_html(buf=_buf, columns=columns, col_space=col_space, header=header,
+                   index=index, na_rep=na_rep, formatters=formatters, float_format=float_format,
+                   sparsify=sparsify, index_names=index_names, justify=justify, max_rows=max_rows,
+                   max_cols=max_cols, show_dimensions=False, decimal=decimal,
+                   bold_rows=bold_rows, classes=classes, escape=escape, notebook=notebook,
+                   border=border, table_id=table_id, render_links=render_links)
+
+        # Our fake dataframe has incorrect number of rows (max_rows*2+1) - write out
+        # the correct number of rows
+        if show_dimensions:
+            _buf.write("\n<p>{nrows} rows x {ncols} columns</p>"
+                       .format(nrows=len(self.index), ncols=len(self.columns)))
+
+        if buf is None:
+            result = _buf.getvalue()
+            return result
+
     def to_string(self, buf=None, columns=None, col_space=None, header=True,
                   index=True, na_rep='NaN', formatters=None, float_format=None,
@@ -238,7 +317,7 @@ class DataFrame(NDFrame):
             max_rows = 60

         # Create a slightly bigger dataframe than display
-        df = self._build_repr_df(max_rows+1, max_cols)
+        df = self._build_repr_df(max_rows + 1, max_cols)

         if buf is not None:
             _buf = _expand_user(_stringify_path(buf))
@@ -295,7 +374,6 @@ class DataFrame(NDFrame):
         if key not in self.columns:
             raise KeyError("Requested column is not in the DataFrame {}".format(key))
         s = self._reduce_dimension(self._query_compiler.getitem_column_array([key]))
-        s._parent = self
         return s

     def _getitem_array(self, key):
@@ -345,7 +423,7 @@ class DataFrame(NDFrame):
         if not inplace:
             return DataFrame(query_compiler=new_query_compiler)
         else:
-            self._query_compiler=new_query_compiler
+            self._query_compiler = new_query_compiler

     def _reduce_dimension(self, query_compiler):
         return Series(query_compiler=query_compiler)
@@ -353,7 +431,31 @@ class DataFrame(NDFrame):
     def _to_pandas(self):
         return self._query_compiler.to_pandas()

+    def _empty_pd_df(self):
+        return self._query_compiler._empty_pd_ef()
+
     def squeeze(self, axis=None):
         return DataFrame(
             query_compiler=self._query_compiler.squeeze(axis)
         )

+    @property
+    def shape(self):
+        """
+        Return a tuple representing the dimensionality of the DataFrame.
+
+        Returns
+        -------
+        shape: tuple
+            0 - number of rows
+            1 - number of columns
+        """
+        num_rows = len(self)
+        num_columns = len(self.columns)
+
+        return num_rows, num_columns
+
+    def keys(self):
+        return self.columns
+
+    hist = gfx.ed_hist_frame
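
The last added line binds the module-level plotting function as an ordinary method: assigning a function to a class attribute means the instance is passed as the first argument, so ed_df.hist(bins=10) resolves to ed_hist_frame(ed_df, bins=10). The same pattern in isolation:

    class Frame:
        pass

    def hist_impl(frame, bins=10):
        # stand-in for ed_hist_frame
        return 'histogram of {} with {} bins'.format(frame, bins)

    # A function assigned to a class attribute becomes a method;
    # the instance arrives as the first positional argument
    Frame.hist = hist_impl

    f = Frame()
    print(f.hist(bins=5))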

@@ -53,7 +53,6 @@ class Index:

     # Make iterable
     def __next__(self):
         # TODO resolve this hack to make this 'iterable'
-        print("In Index.__next__")
         raise StopIteration()

     def __iter__(self):

@@ -403,7 +403,7 @@ class Mappings:

         return is_source_field

-    def numeric_source_fields(self, columns):
+    def numeric_source_fields(self, columns, include_bool=True):
         """
         Returns
         -------

@@ -94,7 +94,6 @@ class NDFrame(BasePandasDataset):

         Returns:
             The value of the attribute.
         """
-        print(key)
         try:
             return object.__getattribute__(self, key)
         except AttributeError as e:
@@ -228,5 +227,26 @@ class NDFrame(BasePandasDataset):
             raise NotImplementedError("Only sum of numeric fields is implemented")
         return self._query_compiler.max()

+    def _hist(self, num_bins):
+        return self._query_compiler._hist(num_bins)
+
     def describe(self):
         return self._query_compiler.describe()

+    def get(self, key, default=None):
+        """Get item from object for given key (DataFrame column, Panel
+        slice, etc.). Returns default value if not found.
+
+        Args:
+            key (DataFrame column, Panel slice) : the key for which value
+                to get
+
+        Returns:
+            value (type of items contained in object) : A value that is
+                stored at the key
+        """
+        if key in self.keys():
+            return self.__getitem__(key)
+        else:
+            return default
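
get() mirrors pandas dict-style access; a sketch of the intended behaviour (the flights fixture and column names are illustrative, taken from the tests in this PR):

    # ed_flights: an eland.DataFrame, e.g. the flights fixture
    carriers = ed_flights.get('Carrier')      # key exists: returns the column
    missing = ed_flights.get('NoSuchColumn')  # key missing: returns the default
    assert missing is None
    assert ed_flights.get('NoSuchColumn', default=-1) == -1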

@@ -2,7 +2,7 @@ import copy
 from enum import Enum

 import pandas as pd
-from elasticsearch_dsl import Search
+import numpy as np

 from eland import Index
 from eland import Query
@@ -127,6 +127,12 @@ class Operations:
     def min(self, query_compiler):
         return self._metric_aggs(query_compiler, 'min')

+    def nunique(self, query_compiler):
+        return self._terms_aggs(query_compiler, 'cardinality')
+
+    def hist(self, query_compiler, bins):
+        return self._hist_aggs(query_compiler, bins)
+
     def _metric_aggs(self, query_compiler, func):
         query_params, post_processing = self._resolve_tasks()
@@ -159,10 +165,116 @@ class Operations:
         for field in numeric_source_fields:
             results[field] = response['aggregations'][field]['value']

+        # Return single value if this is a series
+        #if len(numeric_source_fields) == 1:
+        #    return np.float64(results[numeric_source_fields[0]])
+
         s = pd.Series(data=results, index=numeric_source_fields)

         return s

+    def _terms_aggs(self, query_compiler, func):
+        query_params, post_processing = self._resolve_tasks()
+
+        size = self._size(query_params, post_processing)
+        if size is not None:
+            raise NotImplementedError("Can not count field matches if size is set {}".format(size))
+
+        columns = self.get_columns()
+
+        numeric_source_fields = query_compiler._mappings.numeric_source_fields(columns)
+
+        body = Query(query_params['query'])
+
+        for field in numeric_source_fields:
+            body.metric_aggs(field, func, field)
+
+        response = query_compiler._client.search(
+            index=query_compiler._index_pattern,
+            size=0,
+            body=body.to_search_body())
+
+        results = {}
+
+        for field in numeric_source_fields:
+            results[field] = response['aggregations'][field]['value']
+
+        s = pd.Series(data=results, index=numeric_source_fields)
+
+        return s
+
+    def _hist_aggs(self, query_compiler, num_bins):
+        # Get histogram bins and weights for numeric columns
+        query_params, post_processing = self._resolve_tasks()
+
+        size = self._size(query_params, post_processing)
+        if size is not None:
+            raise NotImplementedError("Can not count field matches if size is set {}".format(size))
+
+        columns = self.get_columns()
+
+        numeric_source_fields = query_compiler._mappings.numeric_source_fields(columns)
+
+        body = Query(query_params['query'])
+
+        min_aggs = self._metric_aggs(query_compiler, 'min')
+        max_aggs = self._metric_aggs(query_compiler, 'max')
+
+        for field in numeric_source_fields:
+            body.hist_aggs(field, field, min_aggs, max_aggs, num_bins)
+
+        response = query_compiler._client.search(
+            index=query_compiler._index_pattern,
+            size=0,
+            body=body.to_search_body())
+
+        # results are like
+        # "aggregations" : {
+        #     "DistanceKilometers" : {
+        #         "buckets" : [
+        #             {
+        #                 "key" : 0.0,
+        #                 "doc_count" : 2956
+        #             },
+        #             {
+        #                 "key" : 1988.1482421875,
+        #                 "doc_count" : 768
+        #             },
+        #             ...
+        bins = {}
+        weights = {}
+
+        # There is one more bin than weights
+        # len(bins) = len(weights) + 1
+        # bins = [  0.  36.  72. 108. 144. 180. 216. 252. 288. 324. 360.]
+        # len(bins) == 11
+        # weights = [10066., 263., 386., 264., 273., 390., 324., 438., 261., 394.]
+        # len(weights) == 10
+        # ES returns
+        # weights = [10066., 263., 386., 264., 273., 390., 324., 438., 261., 252., 142.]
+        # So sum last 2 buckets
+        for field in numeric_source_fields:
+            buckets = response['aggregations'][field]['buckets']
+
+            bins[field] = []
+            weights[field] = []
+
+            for bucket in buckets:
+                bins[field].append(bucket['key'])
+
+                if bucket == buckets[-1]:
+                    weights[field][-1] += bucket['doc_count']
+                else:
+                    weights[field].append(bucket['doc_count'])
+
+        df_bins = pd.DataFrame(data=bins)
+        df_weights = pd.DataFrame(data=weights)
+
+        return df_bins, df_weights
+
     def describe(self, query_compiler):
         query_params, post_processing = self._resolve_tasks()
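
The bucket-folding loop above is the subtle part: for num_bins bins, Elasticsearch returns num_bins + 1 buckets because the field maximum lands in a bucket of its own, while numpy-style histograms are right-inclusive in the last bin and expect len(bins) == len(weights) + 1. Re-running the arithmetic from the comment as a standalone check:

    # ES returns 11 keys and 11 doc counts for num_bins=10
    keys = [0., 36., 72., 108., 144., 180., 216., 252., 288., 324., 360.]
    counts = [10066., 263., 386., 264., 273., 390., 324., 438., 261., 252., 142.]

    bins, weights = [], []
    for key, count in zip(keys, counts):
        bins.append(key)
        if key == keys[-1]:
            weights[-1] += count   # fold the max-value bucket into the last bin
        else:
            weights.append(count)

    assert len(bins) == len(weights) + 1   # 11 bin edges, 10 weights
    assert weights[-1] == 252. + 142.      # == 394., matching the comment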
@@ -182,8 +294,6 @@ class Operations:
             body.metric_aggs('extended_stats_' + field, 'extended_stats', field)
             body.metric_aggs('percentiles_' + field, 'percentiles', field)

-        print(body.to_search_body())
-
         response = query_compiler._client.search(
             index=query_compiler._index_pattern,
             size=0,
@@ -220,9 +330,12 @@ class Operations:
         # Only return requested columns
         columns = self.get_columns()

+        es_results = None
+
         # If size=None use scan not search - then post sort results when in df
         # If size>10000 use scan
         if size is not None and size <= 10000:
+            if size > 0:
                 es_results = query_compiler._client.search(
                     index=query_compiler._index_pattern,
                     size=size,
@@ -290,7 +403,7 @@ class Operations:
         if field == Index.ID_INDEX_FIELD:
             body.ids(items, must=True)
         else:
-            body.terms(items, must=True)
+            body.terms(field, items, must=True)

         return query_compiler._client.count(index=query_compiler._index_pattern, body=body.to_count_body())
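
The one-line fix passes the field name through to the terms filter, so the generated query is keyed correctly. The intended body, with illustrative values, assuming a standard Elasticsearch terms query:

    field = 'Carrier'
    items = ['Kibana Airlines', 'ES-Air']

    # what body.terms(field, items, must=True) should contribute
    term_filter = {'terms': {field: items}}
    # {'terms': {'Carrier': ['Kibana Airlines', 'ES-Air']}}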
@@ -334,8 +447,6 @@ class Operations:

         return size, sort_params

-        1
-
     @staticmethod
     def _count_post_processing(post_processing):
         size = None

eland/plotting.py (new file)

@@ -0,0 +1,74 @@
+import numpy as np
+import pandas.core.common as com
+from pandas.core.dtypes.generic import (
+    ABCIndexClass)
+from pandas.plotting._core import (
+    _raise_if_no_mpl, _converter, grouped_hist, _subplots, _flatten, _set_ticks_props)
+
+
+def ed_hist_frame(ed_df, column=None, by=None, grid=True, xlabelsize=None,
+                  xrot=None, ylabelsize=None, yrot=None, ax=None, sharex=False,
+                  sharey=False, figsize=None, layout=None, bins=10, **kwds):
+    """
+    Derived from pandas.plotting._core.hist_frame 0.24.2
+
+    Ideally, we'd call hist_frame directly with histogram data,
+    but weights are applied to ALL series. For example, we can
+    plot a histogram of pre-binned data via:
+
+    counts, bins = np.histogram(data)
+    plt.hist(bins[:-1], bins, weights=counts)
+
+    However,
+
+    ax.hist(data[col].dropna().values, bins=bins, **kwds)
+
+    is for [col] and weights are a single array.
+
+    We therefore cut/paste code.
+    """
+    # Start with empty pandas data frame derived from
+    ed_df_bins, ed_df_weights = ed_df._hist(num_bins=bins)
+
+    _raise_if_no_mpl()
+    _converter._WARN = False
+    if by is not None:
+        raise NotImplementedError("TODO")
+        """
+        axes = grouped_hist(data, column=column, by=by, ax=ax, grid=grid,
+                            figsize=figsize, sharex=sharex, sharey=sharey,
+                            layout=layout, bins=bins, xlabelsize=xlabelsize,
+                            xrot=xrot, ylabelsize=ylabelsize,
+                            yrot=yrot, **kwds)
+        """
+        return axes
+
+    if column is not None:
+        if not isinstance(column, (list, np.ndarray, ABCIndexClass)):
+            column = [column]
+        ed_df_bins = ed_df_bins[column]
+        ed_df_weights = ed_df_weights[column]
+    naxes = len(ed_df_bins.columns)
+
+    fig, axes = _subplots(naxes=naxes, ax=ax, squeeze=False,
+                          sharex=sharex, sharey=sharey, figsize=figsize,
+                          layout=layout)
+    _axes = _flatten(axes)
+
+    for i, col in enumerate(com.try_sort(ed_df_bins.columns)):
+        ax = _axes[i]
+
+        # pandas code
+        # pandas / plotting / _core.py: 2410
+        # ax.hist(data[col].dropna().values, bins=bins, **kwds)
+
+        ax.hist(ed_df_bins[col][:-1], bins=ed_df_bins[col], weights=ed_df_weights[col])
+        ax.set_title(col)
+        ax.grid(grid)
+
+    _set_ticks_props(axes, xlabelsize=xlabelsize, xrot=xrot,
+                     ylabelsize=ylabelsize, yrot=yrot)
+    fig.subplots_adjust(wspace=0.3, hspace=0.3)
+
+    return axes
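
For reference, the trick the docstring describes: matplotlib can redraw a pre-binned histogram by passing the left bin edges as the "data" and the counts as weights, which is exactly what the ax.hist call above does with the ES-computed bins. A standalone demonstration:

    import numpy as np
    import matplotlib.pyplot as plt

    data = np.random.randn(1000)
    counts, bin_edges = np.histogram(data, bins=10)

    # Plot the left edge of each bin once, weighted by its count;
    # this reproduces the histogram without the raw data
    plt.hist(bin_edges[:-1], bins=bin_edges, weights=counts)
    plt.show()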

@@ -68,6 +68,31 @@ class Query:
         }
         self._aggs[name] = agg

+    def hist_aggs(self, name, field, min_aggs, max_aggs, num_bins):
+        """
+        Add histogram agg e.g.
+
+        "aggs": {
+            "name": {
+                "histogram": {
+                    "field": "AvgTicketPrice",
+                    "interval": (max_aggs[field] - min_aggs[field]) / num_bins
+                }
+            }
+        }
+        """
+        min = min_aggs[field]
+        max = max_aggs[field]
+
+        interval = (max - min) / num_bins
+
+        agg = {
+            "histogram": {
+                "field": field,
+                "interval": interval
+            }
+        }
+
+        self._aggs[name] = agg
+
     def to_search_body(self):
         body = {"query": self._query, "aggs": self._aggs}
         return body
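
Concretely, for a field spanning 0 to 360 with num_bins=10 the method emits an interval of 36, producing an aggregation body like this (field name illustrative):

    num_bins = 10
    min_val, max_val = 0.0, 360.0
    interval = (max_val - min_val) / num_bins  # 36.0

    aggs = {
        "DistanceKilometers": {
            "histogram": {
                "field": "DistanceKilometers",
                "interval": interval
            }
        }
    }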

@@ -152,6 +152,8 @@ class ElandQueryCompiler(BaseQueryCompiler):

         TODO - an option here is to use Elasticsearch's multi-field matching instead of pandas treatment of lists (which isn't great)
         NOTE - using this lists is generally not a good way to use this API
         """
+        if results is None:
+            return self._empty_pd_ef()

         def flatten_dict(y):
             out = {}
@@ -257,6 +259,13 @@ class ElandQueryCompiler(BaseQueryCompiler):
         """
         return self._operations.index_matches(self, self.index.index_field, items)

+    def _empty_pd_ef(self):
+        # Return an empty dataframe with correct columns and dtypes
+        df = pd.DataFrame()
+        for c, d in zip(self.columns, self.dtypes):
+            df[c] = pd.Series(dtype=d)
+        return df
+
     def copy(self):
         return self.__constructor__(
             client=self._client,
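
_empty_pd_ef builds a zero-row pandas frame that still carries the correct column names and dtypes, which is what makes empty results (e.g. the new head(0) test) comparable with pandas. The construction in isolation, with illustrative columns:

    import numpy as np
    import pandas as pd

    columns = ['Carrier', 'DistanceKilometers']
    dtypes = [np.dtype('object'), np.dtype('float64')]

    df = pd.DataFrame()
    for c, d in zip(columns, dtypes):
        df[c] = pd.Series(dtype=d)

    print(len(df))    # 0 rows
    print(df.dtypes)  # per-column dtypes preserved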
@@ -348,6 +357,8 @@ class ElandQueryCompiler(BaseQueryCompiler):
         return self._operations.min(self)

     def max(self):
         return self._operations.max(self)

+    def nunique(self):
+        return self._operations.nunique(self)
+
     def info_es(self, buf):
         buf.write("index_pattern: {index_pattern}\n".format(index_pattern=self._index_pattern))
@@ -358,3 +369,6 @@ class ElandQueryCompiler(BaseQueryCompiler):

     def describe(self):
         return self._operations.describe(self)

+    def _hist(self, num_bins):
+        return self._operations.hist(self, num_bins)

File diff suppressed because it is too large.

Binary file not shown.

@@ -4,7 +4,7 @@ from io import StringIO

 from eland.tests.common import TestData


-class TestDataFrameInfo(TestData):
+class TestDataFrameDescribe(TestData):

     def test_to_describe1(self):
         pd_flights = self.pd_flights()
@@ -13,6 +13,9 @@ class TestDataFrameInfo(TestData):
         pd_describe = pd_flights.describe()
         ed_describe = ed_flights.describe()

+        print(pd_describe)
+        print(ed_describe)
+
         # TODO - this fails now as ES aggregations are approximate
         # if ES percentile agg uses
         # "hdr": {

@@ -0,0 +1,23 @@
+# File called _pytest for PyCharm compatibility
+import pandas as pd
+
+import eland as ed
+
+from eland.tests.common import TestData
+from eland.tests.common import (
+    assert_pandas_eland_frame_equal,
+    assert_pandas_eland_series_equal
+)
+
+import numpy as np
+
+
+class TestDataFrameGet(TestData):
+
+    def test_get1(self):
+        ed_flights = self.ed_flights()
+        pd_flights = self.pd_flights()
+
+        ed_get0 = ed_flights.get('Carrier')
+        pd_get0 = pd_flights.get('Carrier')
+
+        print(ed_get0, type(ed_get0))
+        print(pd_get0, type(pd_get0))

@@ -80,3 +80,10 @@ class TestDataFrameHeadTail(TestData):
         pd_head_4 = pd_tail_5.head(4)
         assert_pandas_eland_frame_equal(pd_head_4, ed_head_4)

+    def test_head_0(self):
+        ed_flights = self.ed_flights()
+        pd_flights = self.pd_flights()
+
+        ed_head_0 = ed_flights.head(0)
+        pd_head_0 = pd_flights.head(0)
+
+        assert_pandas_eland_frame_equal(pd_head_0, ed_head_0)

@@ -0,0 +1,31 @@
+# File called _pytest for PyCharm compatibility
+import numpy as np
+import pandas as pd
+from pandas.util.testing import assert_almost_equal
+
+from eland.tests.common import TestData
+
+
+class TestDataFrameHist(TestData):
+
+    def test_to_hist1(self):
+        pd_flights = self.pd_flights()
+        ed_flights = self.ed_flights()
+
+        num_bins = 10
+
+        # pandas data
+        pd_distancekilometers = np.histogram(pd_flights['DistanceKilometers'], num_bins)
+        pd_flightdelaymin = np.histogram(pd_flights['FlightDelayMin'], num_bins)
+
+        pd_bins = pd.DataFrame(
+            {'DistanceKilometers': pd_distancekilometers[1], 'FlightDelayMin': pd_flightdelaymin[1]})
+        pd_weights = pd.DataFrame(
+            {'DistanceKilometers': pd_distancekilometers[0], 'FlightDelayMin': pd_flightdelaymin[0]})
+
+        ed_bins, ed_weights = ed_flights[['DistanceKilometers', 'FlightDelayMin']]._hist(num_bins=num_bins)
+
+        # Numbers are slightly different
+        assert_almost_equal(pd_bins, ed_bins)
+        assert_almost_equal(pd_weights, ed_weights)

@@ -3,7 +3,7 @@

 from eland.tests.common import TestData


-class TestDataFrameInfo(TestData):
+class TestDataFrameInfoEs(TestData):

     def test_to_info1(self):
         ed_flights = self.ed_flights()

@@ -6,7 +6,7 @@ from eland.tests.common import TestData
 from pandas.util.testing import assert_series_equal


-class TestDataFrameMean(TestData):
+class TestDataFrameMetrics(TestData):

     def test_to_mean(self):
         pd_flights = self.pd_flights()

@@ -0,0 +1,25 @@
+# File called _pytest for PyCharm compatibility
+import pandas as pd
+
+import eland as ed
+
+from eland.tests.common import TestData
+from eland.tests.common import (
+    assert_pandas_eland_frame_equal,
+    assert_pandas_eland_series_equal
+)
+
+import numpy as np
+
+
+class TestDataFrameNUnique(TestData):
+
+    def test_nunique1(self):
+        ed_ecommerce = self.ed_ecommerce()
+        pd_ecommerce = self.pd_ecommerce()
+
+        print(pd_ecommerce.dtypes)
+        print(ed_ecommerce.dtypes)
+
+        #ed_nunique = ed_ecommerce.nunique()
+
+        pd_selection = pd_ecommerce.drop(columns=['category'])
+        pd_nunique = pd_selection.nunique(axis=1)
+        print(pd_nunique, type(pd_nunique))

@@ -0,0 +1,17 @@
+# File called _pytest for PyCharm compatibility
+from eland.tests.common import TestData
+
+import eland as ed
+
+
+class TestDataFrameReviews(TestData):
+
+    def test_explore(self):
+        ed_reviews = ed.DataFrame('localhost', 'anonreviews')
+
+        print(ed_reviews.head())
+        print(ed_reviews.describe())
+        print(ed_reviews.info())
+        print(ed_reviews.hist(column="rating", bins=5))
+
+        #print(ed_reviews.head().info_es())

@@ -0,0 +1,26 @@
+# File called _pytest for PyCharm compatibility
+from eland.tests.common import TestData
+
+
+class TestDataFrameShape(TestData):
+
+    def test_to_shape1(self):
+        pd_ecommerce = self.pd_ecommerce()
+        ed_ecommerce = self.ed_ecommerce()
+
+        pd_shape = pd_ecommerce.shape
+        ed_shape = ed_ecommerce.shape
+
+        assert pd_shape == ed_shape
+
+    def test_to_shape2(self):
+        pd_flights = self.pd_flights()
+        ed_flights = self.ed_flights()
+
+        pd_shape = pd_flights.shape
+        ed_shape = ed_flights.shape
+
+        assert pd_shape == ed_shape

@@ -0,0 +1,44 @@
+# File called _pytest for PyCharm compatibility
+import numpy as np
+import pandas as pd
+
+import eland as ed
+
+from eland.tests.common import ELASTICSEARCH_HOST
+from eland.tests.common import TestData
+
+
+class TestDataFrameUtils(TestData):
+
+    def test_generate_es_mappings(self):
+        df = pd.DataFrame(data={'A': np.random.rand(3),
+                                'B': 1,
+                                'C': 'foo',
+                                'D': pd.Timestamp('20190102'),
+                                'E': [1.0, 2.0, 3.0],
+                                'F': False,
+                                'G': [1, 2, 3]},
+                          index=['0', '1', '2'])
+
+        expected_mappings = {'mappings': {
+            'properties': {'A': {'type': 'double'},
+                           'B': {'type': 'long'},
+                           'C': {'type': 'keyword'},
+                           'D': {'type': 'date'},
+                           'E': {'type': 'double'},
+                           'F': {'type': 'boolean'},
+                           'G': {'type': 'long'}}}}
+
+        mappings = ed.Mappings._generate_es_mappings(df)
+
+        assert expected_mappings == mappings
+
+        # Now create index
+        index_name = 'eland_test_generate_es_mappings'
+
+        ed.pandas_to_es(df, ELASTICSEARCH_HOST, index_name, if_exists="replace", refresh=True)
+
+        ed_df = ed.DataFrame(ELASTICSEARCH_HOST, index_name)
+        ed_df_head = ed_df.head()
+
+        # assert_frame_equal(df, ed_df_head)

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

@@ -0,0 +1,68 @@
+# File called _pytest for PyCharm compatibility
+from eland.tests.common import TestData
+
+from pandas.util.testing import assert_series_equal
+
+import numpy as np
+import pandas as pd
+
+
+class TestDataFrameHist(TestData):
+
+    def test_dataframe_hist1(self):
+        test_data = TestData()
+
+        pd_flights = test_data.pd_flights()[['DistanceKilometers', 'DistanceMiles', 'FlightDelayMin', 'FlightTimeHour']]
+        ed_flights = test_data.ed_flights()[['DistanceKilometers', 'DistanceMiles', 'FlightDelayMin', 'FlightTimeHour']]
+
+        """
+        pd_flights.hist(figsize=[10, 10])
+        #ed_flights.hist(figsize=[10, 10])
+
+        pd_min = pd_flights['DistanceKilometers'].min()
+        pd_max = pd_flights['DistanceKilometers'].max()
+
+        #ed_min = ed_flights['DistanceKilometers'].min()
+        #ed_max = ed_flights['DistanceKilometers'].max()
+
+        #num_bins = 10.0
+
+        #bins = np.linspace(ed_min, ed_max, num=num_bins+1)
+        #print(bins)
+        #print(np.diff(bins).mean())
+
+        #hist = ed_flights['DistanceKilometers'].hist(np.diff(bins).mean())
+
+        x = [2956., 768., 719., 2662., 2934., 1320., 641., 529., 426., 104.]
+        bins = [0., 1988.14823146, 3976.29646292, 5964.44469437, 7952.59292583, 9940.74115729,
+                11928.88938875, 13917.03762021, 15905.18585166, 17893.33408312, 19881.48231458]
+
+        print(len(x))
+        print(len(bins))
+
+        a = bins[0:10]
+
+        print(np.histogram(a, weights=x, bins=bins))
+
+        #counts, bins = np.histogram(data)
+        #plt.hist(bins[:-1], bins, weights=counts)
+        """
+
+        h1 = np.histogram(pd_flights['DistanceKilometers'], 10)
+        h2 = np.histogram(pd_flights['FlightDelayMin'], 10)
+
+        l1 = list(h1[0])
+        l2 = list(h2[0])
+
+        l1.append(0)
+        l2.append(0)
+
+        d = {'DistanceKilometers': h1[1],
+             'FlightDelayMin': h2[1]}
+
+        df = pd.DataFrame(data=d)
+
+        df.hist(weights=[l1, l2])

@@ -46,11 +46,11 @@ def pandas_to_es(df, es_params, destination_index, if_exists='fail', chunk_size=
             )
     elif if_exists == "replace":
         client.index_delete(index=destination_index)
-        client.index_create(index=destination_index, mapping=mapping)
+        client.index_create(index=destination_index, body=mapping)
     # elif if_exists == "append":
     #     TODO validate mapping is compatible
     else:
-        client.index_create(index=destination_index, mapping=mapping)
+        client.index_create(index=destination_index, body=mapping)

     # Now add data
     actions = []
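
The mapping= to body= change lines up with elasticsearch-py, where mappings and settings travel as the request body. A sketch of the underlying call (index name illustrative, elasticsearch-py 7.x style):

    from elasticsearch import Elasticsearch

    es = Elasticsearch('localhost')
    mapping = {'mappings': {'properties': {'A': {'type': 'double'}}}}

    # indices.create takes the mapping as `body`; there is no `mapping` kwarg
    es.indices.create(index='eland_test_index', body=mapping)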