Partial implementation of hist - does not work

Backup push
Stephen Dodson 2019-07-12 15:24:32 +00:00
parent 9bf3505b7e
commit 1fa4d3fbe7
22 changed files with 18947 additions and 21 deletions

View File

@@ -10,6 +10,7 @@ from .mappings import *
from .query import *
from .operations import *
from .query_compiler import *
from .plotting import *
from .ndframe import *
from .series import *
from .dataframe import *

View File

@@ -1,9 +1,9 @@
import warnings
import sys

import pandas as pd
import numpy as np

from distutils.version import LooseVersion

from pandas.compat import StringIO
from pandas.core.common import apply_if_callable, is_bool_indexer
from pandas.io.common import _expand_user, _stringify_path
@@ -13,7 +13,7 @@ from pandas.io.formats.printing import pprint_thing
from eland import NDFrame
from eland import Series
from eland import hist_frame


class DataFrame(NDFrame):
    # This is effectively 2 constructors

@@ -74,6 +74,46 @@ class DataFrame(NDFrame):
        return buf.getvalue()
    def _info_repr(self):
        """
        True if the repr should show the info view.
        """
        info_repr_option = (pd.get_option("display.large_repr") == "info")
        return info_repr_option and not (self._repr_fits_horizontal_() and
                                         self._repr_fits_vertical_())
    def _repr_html_(self):
        """
        From pandas
        """
        try:
            import IPython
        except ImportError:
            pass
        else:
            if LooseVersion(IPython.__version__) < LooseVersion('3.0'):
                if console.in_qtconsole():
                    # 'HTML output is disabled in QtConsole'
                    return None

        if self._info_repr():
            buf = StringIO("")
            self.info(buf=buf)
            # need to escape the <class>, should be the first line.
            val = buf.getvalue().replace('<', r'&lt;', 1)
            val = val.replace('>', r'&gt;', 1)
            return '<pre>' + val + '</pre>'

        if pd.get_option("display.notebook_repr_html"):
            max_rows = pd.get_option("display.max_rows")
            max_cols = pd.get_option("display.max_columns")
            show_dimensions = pd.get_option("display.show_dimensions")

            return self.to_html(max_rows=max_rows, max_cols=max_cols,
                                show_dimensions=show_dimensions, notebook=True)
        else:
            return None
    def count(self):
        """
        Count non-NA cells for each column (TODO row)
@@ -89,7 +129,6 @@ class DataFrame(NDFrame):
        """
        return self._query_compiler.count()

    def info_es(self):
        buf = StringIO()
@@ -222,6 +261,45 @@ class DataFrame(NDFrame):
        fmt.buffer_put_lines(buf, lines)
    def to_html(self, buf=None, columns=None, col_space=None, header=True,
                index=True, na_rep='NaN', formatters=None, float_format=None,
                sparsify=None, index_names=True, justify=None, max_rows=None,
                max_cols=None, show_dimensions=False, decimal='.',
                bold_rows=True, classes=None, escape=True, notebook=False,
                border=None, table_id=None, render_links=False):
        """
        From pandas - except we set max_rows default to avoid careless extraction of entire index
        """
        if max_rows is None:
            warnings.warn("DataFrame.to_html called without max_rows set "
                          "- this will return entire index results. "
                          "Setting max_rows=60, overwrite if different behaviour is required.")
            max_rows = 60

        # Create a slightly bigger dataframe than display
        df = self._build_repr_df(max_rows + 1, max_cols)

        if buf is not None:
            _buf = _expand_user(_stringify_path(buf))
        else:
            _buf = StringIO()

        df.to_html(buf=_buf, columns=columns, col_space=col_space, header=header,
                   index=index, na_rep=na_rep, formatters=formatters, float_format=float_format,
                   sparsify=sparsify, index_names=index_names, justify=justify, max_rows=max_rows,
                   max_cols=max_cols, show_dimensions=False, decimal=decimal,
                   bold_rows=bold_rows, classes=classes, escape=escape, notebook=notebook,
                   border=border, table_id=table_id, render_links=render_links)

        # Our fake dataframe has incorrect number of rows (max_rows*2+1) - write out
        # the correct number of rows
        if show_dimensions:
            _buf.write("\n<p>{nrows} rows x {ncols} columns</p>"
                       .format(nrows=len(self.index), ncols=len(self.columns)))

        if buf is None:
            result = _buf.getvalue()
            return result
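A quick sketch of the intended use (host and index name are illustrative; the dimensions footer reports the full Elasticsearch index size, not the truncated preview):

import eland as ed

df = ed.DataFrame('localhost', 'flights')
html = df.to_html(max_rows=10, show_dimensions=True)
# the footer is written from len(df.index) and len(df.columns), e.g.
# <p>NNNN rows x NN columns</p>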
    def to_string(self, buf=None, columns=None, col_space=None, header=True,
                  index=True, na_rep='NaN', formatters=None, float_format=None,
@@ -238,7 +316,7 @@ class DataFrame(NDFrame):
            max_rows = 60

        # Create a slightly bigger dataframe than display
        df = self._build_repr_df(max_rows + 1, max_cols)

        if buf is not None:
            _buf = _expand_user(_stringify_path(buf))
@@ -295,7 +373,6 @@ class DataFrame(NDFrame):
        if key not in self.columns:
            raise KeyError("Requested column is not in the DataFrame {}".format(key))
        s = self._reduce_dimension(self._query_compiler.getitem_column_array([key]))
        s._parent = self
        return s

    def _getitem_array(self, key):
@@ -345,7 +422,7 @@ class DataFrame(NDFrame):
        if not inplace:
            return DataFrame(query_compiler=new_query_compiler)
        else:
            self._query_compiler = new_query_compiler

    def _reduce_dimension(self, query_compiler):
        return Series(query_compiler=query_compiler)
@@ -353,7 +430,31 @@ class DataFrame(NDFrame):
    def _to_pandas(self):
        return self._query_compiler.to_pandas()

    def _empty_pd_df(self):
        return self._query_compiler._empty_pd_ef()

    def squeeze(self, axis=None):
        return DataFrame(
            query_compiler=self._query_compiler.squeeze(axis)
        )

    @property
    def shape(self):
        """
        Return a tuple representing the dimensionality of the DataFrame.

        Returns
        -------
        shape: tuple
            0 - number of rows
            1 - number of columns
        """
        num_rows = len(self)
        num_columns = len(self.columns)

        return num_rows, num_columns

    def keys(self):
        return self.columns

    hist = hist_frame
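This binding mirrors how pandas attaches pandas.plotting._core.hist_frame as DataFrame.hist. A rough usage sketch (host and index are illustrative):

import eland as ed

df = ed.DataFrame('localhost', 'flights')
df.hist(column='DistanceKilometers', bins=10)  # delegates to eland.plotting.hist_frame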

View File

@@ -53,7 +53,6 @@ class Index:
    # Make iterable
    def __next__(self):
        # TODO resolve this hack to make this 'iterable'
        print("In Index.__next__")
        raise StopIteration()

    def __iter__(self):

View File

@@ -403,7 +403,7 @@ class Mappings:
        return is_source_field

    def numeric_source_fields(self, columns, include_bool=True):
        """
        Returns
        -------

View File

@@ -94,7 +94,6 @@ class NDFrame(BasePandasDataset):
        Returns:
            The value of the attribute.
        """
        print(key)
        try:
            return object.__getattribute__(self, key)
        except AttributeError as e:
@@ -228,5 +227,26 @@ class NDFrame(BasePandasDataset):
            raise NotImplementedError("Only sum of numeric fields is implemented")
        return self._query_compiler.max()

    def _hist(self, interval):
        return self._query_compiler._hist(interval)

    def describe(self):
        return self._query_compiler.describe()

    def get(self, key, default=None):
        """Get item from object for given key (DataFrame column, Panel
        slice, etc.). Returns default value if not found.

        Args:
            key (DataFrame column, Panel slice) : the key for which value
                to get

        Returns:
            value (type of items contained in object) : A value that is
                stored at the key
        """
        if key in self.keys():
            return self.__getitem__(key)
        else:
            return default
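A sketch of the expected behaviour (column names are illustrative; compare test_get1 further down):

carrier = ed_flights.get('Carrier')       # existing column -> eland Series
missing = ed_flights.get('NoSuchColumn')  # missing key -> default, i.e. None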

View File

@@ -2,6 +2,7 @@ import copy
from enum import Enum

import pandas as pd
import numpy as np

from eland import Index
from eland import Query

@@ -126,6 +127,12 @@ class Operations:
    def min(self, query_compiler):
        return self._metric_aggs(query_compiler, 'min')

    def nunique(self, query_compiler):
        return self._terms_aggs(query_compiler, 'cardinality')

    def hist(self, query_compiler, bins):
        return self._hist_aggs(query_compiler, bins)

    def _metric_aggs(self, query_compiler, func):
        query_params, post_processing = self._resolve_tasks()
@@ -155,6 +162,73 @@ class Operations:
        # }
        results = {}

        for field in numeric_source_fields:
            results[field] = response['aggregations'][field]['value']

        # Return single value if this is a series
        if len(numeric_source_fields) == 1:
            return np.float64(results[numeric_source_fields[0]])

        s = pd.Series(data=results, index=numeric_source_fields)

        return s

    def _terms_aggs(self, query_compiler, func):
        query_params, post_processing = self._resolve_tasks()

        size = self._size(query_params, post_processing)
        if size is not None:
            raise NotImplementedError("Can not count field matches if size is set {}".format(size))

        columns = self.get_columns()

        numeric_source_fields = query_compiler._mappings.numeric_source_fields(columns)

        body = Query(query_params['query'])

        for field in numeric_source_fields:
            body.metric_aggs(field, func, field)

        response = query_compiler._client.search(
            index=query_compiler._index_pattern,
            size=0,
            body=body.to_search_body())

        results = {}

        for field in numeric_source_fields:
            results[field] = response['aggregations'][field]['value']

        s = pd.Series(data=results, index=numeric_source_fields)

        return s

    def _hist_aggs(self, query_compiler, bins):
        query_params, post_processing = self._resolve_tasks()

        size = self._size(query_params, post_processing)
        if size is not None:
            raise NotImplementedError("Can not count field matches if size is set {}".format(size))

        columns = self.get_columns()

        numeric_source_fields = query_compiler._mappings.numeric_source_fields(columns)

        body = Query(query_params['query'])

        min_aggs = self._metric_aggs(query_compiler, 'min')
        max_aggs = self._metric_aggs(query_compiler, 'max')

        for field in numeric_source_fields:
            # Query.hist_aggs takes (name, field, ...); use the field name for both
            body.hist_aggs(field, field, min_aggs, max_aggs, bins)

        response = query_compiler._client.search(
            index=query_compiler._index_pattern,
            size=0,
            body=body.to_search_body())

        results = {}

        for field in numeric_source_fields:
            # NOTE: histogram aggs answer with 'buckets', not 'value' - one of
            # the reasons this commit is marked "does not work"
            results[field] = response['aggregations'][field]['buckets']
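For reference, a sketch of the search body _hist_aggs is building, assuming a single numeric field and the interval formula from the Query.hist_aggs docstring below (numbers are illustrative):

search_body = {
    "aggs": {
        "DistanceKilometers": {
            "histogram": {
                "field": "DistanceKilometers",
                "interval": (19881.5 - 0.0) / 10  # (max - min) / bins
            }
        }
    }
}
# Elasticsearch answers under ['aggregations']['DistanceKilometers']['buckets']
# with a list of {"key": <bin lower edge>, "doc_count": <count>} dicts.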
@@ -181,8 +255,6 @@ class Operations:
            body.metric_aggs('extended_stats_' + field, 'extended_stats', field)
            body.metric_aggs('percentiles_' + field, 'percentiles', field)

        print(body.to_search_body())

        response = query_compiler._client.search(
            index=query_compiler._index_pattern,
            size=0,
@@ -219,15 +291,18 @@ class Operations:
        # Only return requested columns
        columns = self.get_columns()

        es_results = None

        # If size=None use scan not search - then post sort results when in df
        # If size>10000 use scan
        if size is not None and size <= 10000:
            if size > 0:
                es_results = query_compiler._client.search(
                    index=query_compiler._index_pattern,
                    size=size,
                    sort=sort_params,
                    body=body.to_search_body(),
                    _source=columns)
        else:
            es_results = query_compiler._client.scan(
                index=query_compiler._index_pattern,

eland/plotting.py (new file, 51 lines)
View File

@@ -0,0 +1,51 @@
import numpy as np

import pandas.core.common as com
from pandas.core.dtypes.generic import (
    ABCIndexClass)
from pandas.plotting._core import (
    _raise_if_no_mpl, _converter, grouped_hist, _subplots, _flatten, _set_ticks_props)


def hist_frame(ed_df, column=None, by=None, grid=True, xlabelsize=None,
               xrot=None, ylabelsize=None, yrot=None, ax=None, sharex=False,
               sharey=False, figsize=None, layout=None, bins=10, **kwds):
    """
    Derived from pandas.plotting._core.hist_frame 0.24.2
    """
    # Start with an empty pandas dataframe derived from the eland one
    # (same columns and dtypes, zero rows)
    empty_pd_df = ed_df._empty_pd_df()

    _raise_if_no_mpl()
    _converter._WARN = False
    if by is not None:
        axes = grouped_hist(empty_pd_df, column=column, by=by, ax=ax, grid=grid,
                            figsize=figsize, sharex=sharex, sharey=sharey,
                            layout=layout, bins=bins, xlabelsize=xlabelsize,
                            xrot=xrot, ylabelsize=ylabelsize,
                            yrot=yrot, **kwds)
        return axes

    if column is not None:
        if not isinstance(column, (list, np.ndarray, ABCIndexClass)):
            column = [column]
        empty_pd_df = empty_pd_df[column]
    data = empty_pd_df._get_numeric_data()
    naxes = len(data.columns)  # numeric columns only, as in pandas

    fig, axes = _subplots(naxes=naxes, ax=ax, squeeze=False,
                          sharex=sharex, sharey=sharey, figsize=figsize,
                          layout=layout)
    _axes = _flatten(axes)

    for i, col in enumerate(com.try_sort(data.columns)):
        ax = _axes[i]
        ax.hist(empty_pd_df[col].dropna().values, bins=bins, **kwds)
        ax.set_title(col)
        ax.grid(grid)

    _set_ticks_props(axes, xlabelsize=xlabelsize, xrot=xrot,
                     ylabelsize=ylabelsize, yrot=yrot)
    fig.subplots_adjust(wspace=0.3, hspace=0.3)

    return axes
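Because the axes are populated from a zero-row frame, every histogram renders blank in this commit. A hypothetical next step is to draw the Elasticsearch bucket counts onto the axes via weights (names are illustrative):

# bin_edges: histogram agg bin boundaries; doc_counts: per-bucket counts
ax.hist(bin_edges[:-1], bins=bin_edges, weights=doc_counts, **kwds)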

View File

@@ -68,6 +68,19 @@ class Query:
        }
        self._aggs[name] = agg

    def hist_aggs(self, name, field, min_aggs, max_aggs, bins):
        """
        Add histogram agg e.g.

        "aggs": {
            "name": {
                "histogram": {
                    "field": "AvgTicketPrice",
                    "interval": (max_aggs[field] - min_aggs[field]) / bins
                }
            }
        }
        """

    def to_search_body(self):
        body = {"query": self._query, "aggs": self._aggs}
        return body

View File

@@ -152,6 +152,8 @@ class ElandQueryCompiler(BaseQueryCompiler):
        TODO - an option here is to use Elasticsearch's multi-field matching instead of pandas treatment of lists (which isn't great)
        NOTE - using lists like this is generally not a good way to use this API
        """
        if results is None:
            return self._empty_pd_ef()

        def flatten_dict(y):
            out = {}
@@ -257,6 +259,13 @@ class ElandQueryCompiler(BaseQueryCompiler):
        """
        return self._operations.index_matches(self, self.index.index_field, items)

    def _empty_pd_ef(self):
        # Return an empty dataframe with correct columns and dtypes
        df = pd.DataFrame()
        for c, d in zip(self.columns, self.dtypes):
            df[c] = pd.Series(dtype=d)
        return df
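A standalone illustration of this pattern (column names and dtypes are made up):

import numpy as np
import pandas as pd

df = pd.DataFrame()
for c, d in zip(['AvgTicketPrice', 'Cancelled'],
                [np.dtype('float64'), np.dtype('bool')]):
    df[c] = pd.Series(dtype=d)

print(df.dtypes)  # both columns present, correct dtypes
print(len(df))    # 0 rows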

    def copy(self):
        return self.__constructor__(
            client=self._client,
@@ -348,6 +357,8 @@ class ElandQueryCompiler(BaseQueryCompiler):
        return self._operations.min(self)

    def max(self):
        return self._operations.max(self)

    def nunique(self):
        return self._operations.nunique(self)

    def info_es(self, buf):
        buf.write("index_pattern: {index_pattern}\n".format(index_pattern=self._index_pattern))
@@ -358,3 +369,6 @@ class ElandQueryCompiler(BaseQueryCompiler):
    def describe(self):
        return self._operations.describe(self)

    def _hist(self, interval):
        return self._operations.hist(self, interval)

File diff suppressed because it is too large

Binary file not shown.

View File

@@ -13,6 +13,9 @@ class TestDataFrameInfo(TestData):
        pd_describe = pd_flights.describe()
        ed_describe = ed_flights.describe()

        print(pd_describe)
        print(ed_describe)

        # TODO - this fails now as ES aggregations are approximate
        # if ES percentile agg uses
        # "hdr": {

View File

@@ -0,0 +1,23 @@
# File called _pytest for PyCharm compatibility
import pandas as pd

import eland as ed

from eland.tests.common import TestData
from eland.tests.common import (
    assert_pandas_eland_frame_equal,
    assert_pandas_eland_series_equal
)

import numpy as np


class TestDataFrameGet(TestData):

    def test_get1(self):
        ed_flights = self.ed_flights()
        pd_flights = self.pd_flights()

        ed_get0 = ed_flights.get('Carrier')
        pd_get0 = pd_flights.get('Carrier')

        print(ed_get0, type(ed_get0))
        print(pd_get0, type(pd_get0))

View File

@@ -80,3 +80,10 @@ class TestDataFrameHeadTail(TestData):
        pd_head_4 = pd_tail_5.head(4)
        assert_pandas_eland_frame_equal(pd_head_4, ed_head_4)

    def test_head_0(self):
        ed_flights = self.ed_flights()
        pd_flights = self.pd_flights()

        ed_head_0 = ed_flights.head(0)
        pd_head_0 = pd_flights.head(0)

        assert_pandas_eland_frame_equal(pd_head_0, ed_head_0)

View File

@@ -0,0 +1,25 @@
# File called _pytest for PyCharm compatibility
import pandas as pd

import eland as ed

from eland.tests.common import TestData
from eland.tests.common import (
    assert_pandas_eland_frame_equal,
    assert_pandas_eland_series_equal
)

import numpy as np


class TestDataFrameNUnique(TestData):

    def test_nunique1(self):
        ed_ecommerce = self.ed_ecommerce()
        pd_ecommerce = self.pd_ecommerce()

        print(pd_ecommerce.dtypes)
        print(ed_ecommerce.dtypes)

        # ed_nunique = ed_ecommerce.nunique()

        pd_selection = pd_ecommerce.drop(columns=['category'])
        pd_nunique = pd_selection.nunique(axis=1)
        print(pd_nunique, type(pd_nunique))

View File

@@ -0,0 +1,17 @@
# File called _pytest for PyCharm compatibility
from eland.tests.common import TestData

import eland as ed


class TestDataFrameReviews(TestData):

    def test_explore(self):
        ed_reviews = ed.DataFrame('localhost', 'anonreviews')

        print(ed_reviews.head())
        print(ed_reviews.describe())
        print(ed_reviews.info())
        print(ed_reviews.hist(column="rating", bins=5))

        # print(ed_reviews.head().info_es())

View File

@@ -0,0 +1,26 @@
# File called _pytest for PyCharm compatibility
from eland.tests.common import TestData


class TestDataFrameShape(TestData):

    def test_to_shape1(self):
        pd_ecommerce = self.pd_ecommerce()
        ed_ecommerce = self.ed_ecommerce()

        pd_shape = pd_ecommerce.shape
        ed_shape = ed_ecommerce.shape

        assert pd_shape == ed_shape

    def test_to_shape2(self):
        pd_flights = self.pd_flights()
        ed_flights = self.ed_flights()

        pd_shape = pd_flights.shape
        ed_shape = ed_flights.shape

        assert pd_shape == ed_shape

View File

@@ -0,0 +1,44 @@
# File called _pytest for PyCharm compatibility
import numpy as np
import pandas as pd

import eland as ed

from eland.tests.common import ELASTICSEARCH_HOST
from eland.tests.common import TestData


class TestDataFrameUtils(TestData):

    def test_generate_es_mappings(self):
        df = pd.DataFrame(data={'A': np.random.rand(3),
                                'B': 1,
                                'C': 'foo',
                                'D': pd.Timestamp('20190102'),
                                'E': [1.0, 2.0, 3.0],
                                'F': False,
                                'G': [1, 2, 3]},
                          index=['0', '1', '2'])

        expected_mappings = {'mappings': {
            'properties': {'A': {'type': 'double'},
                           'B': {'type': 'long'},
                           'C': {'type': 'keyword'},
                           'D': {'type': 'date'},
                           'E': {'type': 'double'},
                           'F': {'type': 'boolean'},
                           'G': {'type': 'long'}}}}

        mappings = ed.Mappings._generate_es_mappings(df)

        assert expected_mappings == mappings

        # Now create index
        index_name = 'eland_test_generate_es_mappings'

        ed.pandas_to_es(df, ELASTICSEARCH_HOST, index_name, if_exists="replace", refresh=True)

        ed_df = ed.DataFrame(ELASTICSEARCH_HOST, index_name)
        ed_df_head = ed_df.head()

        # assert_frame_equal(df, ed_df_head)

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,46 @@
# File called _pytest for PyCharm compatibility
from eland.tests.common import TestData

from pandas.util.testing import assert_series_equal

import numpy as np


class TestDataFrameHist(TestData):

    def test_dataframe_hist1(self):
        test_data = TestData()

        pd_flights = test_data.pd_flights()[['DistanceKilometers', 'DistanceMiles', 'FlightDelayMin', 'FlightTimeHour']]
        ed_flights = test_data.ed_flights()[['DistanceKilometers', 'DistanceMiles', 'FlightDelayMin', 'FlightTimeHour']]

        pd_flights.hist(figsize=[10, 10])
        ed_flights.hist(figsize=[10, 10])

        pd_min = pd_flights['DistanceKilometers'].min()
        pd_max = pd_flights['DistanceKilometers'].max()

        ed_min = ed_flights['DistanceKilometers'].min()
        ed_max = ed_flights['DistanceKilometers'].max()

        num_bins = 10  # np.linspace expects an integer num, not 10.0
        bins = np.linspace(ed_min, ed_max, num=num_bins + 1)

        print(bins)
        print(np.diff(bins).mean())

        hist = ed_flights['DistanceKilometers'].hist(np.diff(bins).mean())

        x = [2956., 768., 719., 2662., 2934., 1320., 641., 529., 426., 104.]
        bins = [0., 1988.14823146, 3976.29646292, 5964.44469437, 7952.59292583,
                9940.74115729, 11928.88938875, 13917.03762021, 15905.18585166,
                17893.33408312, 19881.48231458]

        print(len(x))
        print(len(bins))

        a = bins[0:10]

        print(np.histogram(a, weights=x, bins=bins))
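The tail of this test sketches how Elasticsearch histogram buckets could be turned back into a plottable histogram: treat each bucket's lower edge as a sample weighted by its doc count. A standalone version with the same numbers:

import numpy as np

counts = [2956., 768., 719., 2662., 2934., 1320., 641., 529., 426., 104.]
edges = np.linspace(0., 19881.48231458, num=11)

hist, _ = np.histogram(edges[:-1], weights=counts, bins=edges)
assert list(hist) == counts  # the counts are recovered bin-for-bin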

View File

@@ -46,11 +46,11 @@ def pandas_to_es(df, es_params, destination_index, if_exists='fail', chunk_size=
        )
    elif if_exists == "replace":
        client.index_delete(index=destination_index)
        client.index_create(index=destination_index, body=mapping)
    # elif if_exists == "append":
        # TODO validate mapping is compatible
    else:
        client.index_create(index=destination_index, body=mapping)

    # Now add data
    actions = []
actions = [] actions = []