Demo day notebook + minor updates added

This commit is contained in:
Stephen Dodson 2019-08-15 12:26:58 +00:00
parent ef289bfe78
commit 337bef1c5d
10 changed files with 7250 additions and 20 deletions

View File

@ -227,6 +227,9 @@ class NDFrame(BasePandasDataset):
raise NotImplementedError("Only sum of numeric fields is implemented") raise NotImplementedError("Only sum of numeric fields is implemented")
return self._query_compiler.max() return self._query_compiler.max()
def nunique(self):
return self._query_compiler.nunique()
def _hist(self, num_bins): def _hist(self, num_bins):
return self._query_compiler._hist(num_bins) return self._query_compiler._hist(num_bins)

View File

@ -185,12 +185,12 @@ class Operations:
raise NotImplementedError("Can not count field matches if size is set {}".format(size)) raise NotImplementedError("Can not count field matches if size is set {}".format(size))
columns = self.get_columns() columns = self.get_columns()
if columns is None:
numeric_source_fields = query_compiler._mappings.numeric_source_fields(columns) columns = query_compiler._mappings.source_fields()
body = Query(query_params['query']) body = Query(query_params['query'])
for field in numeric_source_fields: for field in columns:
body.metric_aggs(field, func, field) body.metric_aggs(field, func, field)
response = query_compiler._client.search( response = query_compiler._client.search(
@ -200,10 +200,10 @@ class Operations:
results = {} results = {}
for field in numeric_source_fields: for field in columns:
results[field] = response['aggregations'][field]['value'] results[field] = response['aggregations'][field]['value']
s = pd.Series(data=results, index=numeric_source_fields) s = pd.Series(data=results, index=columns)
return s return s

View File

@ -33,6 +33,9 @@ class BooleanFilter(object):
return True return True
return False return False
def __repr__(self):
return str(self._filter)
@property @property
def subtree(self): def subtree(self):
if 'bool' in self._filter: if 'bool' in self._filter:

View File

@ -63,7 +63,7 @@ def ed_hist_frame(ed_df, column=None, by=None, grid=True, xlabelsize=None,
# pandas / plotting / _core.py: 2410 # pandas / plotting / _core.py: 2410
# ax.hist(data[col].dropna().values, bins=bins, **kwds) # ax.hist(data[col].dropna().values, bins=bins, **kwds)
ax.hist(ed_df_bins[col][:-1], bins=ed_df_bins[col], weights=ed_df_weights[col]) ax.hist(ed_df_bins[col][:-1], bins=ed_df_bins[col], weights=ed_df_weights[col], **kwds)
ax.set_title(col) ax.set_title(col)
ax.grid(grid) ax.grid(grid)

View File

@ -471,4 +471,6 @@ class ElandQueryCompiler(BaseQueryCompiler):
return result return result
#def isna(self):

View File

@ -20,7 +20,7 @@ import warnings
import pandas as pd import pandas as pd
from eland import NDFrame from eland import NDFrame
from eland.operators import Equal, Greater, ScriptFilter from eland.operators import NotFilter, Equal, Greater, Less, GreaterEqual, LessEqual, ScriptFilter
class Series(NDFrame): class Series(NDFrame):
@ -164,6 +164,37 @@ class Series(NDFrame):
else: else:
raise NotImplementedError(other, type(other)) raise NotImplementedError(other, type(other))
def __lt__(self, other):
if isinstance(other, Series):
# Need to use scripted query to compare to values
painless = "doc['{}'].value < doc['{}'].value".format(self.name, other.name)
return ScriptFilter(painless)
elif isinstance(other, (int, float)):
return Less(field=self.name, value=other)
else:
raise NotImplementedError(other, type(other))
def __ge__(self, other):
if isinstance(other, Series):
# Need to use scripted query to compare to values
painless = "doc['{}'].value >= doc['{}'].value".format(self.name, other.name)
return ScriptFilter(painless)
elif isinstance(other, (int, float)):
return GreaterEqual(field=self.name, value=other)
else:
raise NotImplementedError(other, type(other))
def __le__(self, other):
if isinstance(other, Series):
# Need to use scripted query to compare to values
painless = "doc['{}'].value <= doc['{}'].value".format(self.name, other.name)
return ScriptFilter(painless)
elif isinstance(other, (int, float)):
return LessEqual(field=self.name, value=other)
else:
raise NotImplementedError(other, type(other))
def __eq__(self, other): def __eq__(self, other):
if isinstance(other, Series): if isinstance(other, Series):
# Need to use scripted query to compare to values # Need to use scripted query to compare to values
@ -171,5 +202,19 @@ class Series(NDFrame):
return ScriptFilter(painless) return ScriptFilter(painless)
elif isinstance(other, (int, float)): elif isinstance(other, (int, float)):
return Equal(field=self.name, value=other) return Equal(field=self.name, value=other)
elif isinstance(other, str):
return Equal(field=self.name, value=other)
else:
raise NotImplementedError(other, type(other))
def __ne__(self, other):
if isinstance(other, Series):
# Need to use scripted query to compare to values
painless = "doc['{}'].value != doc['{}'].value".format(self.name, other.name)
return ScriptFilter(painless)
elif isinstance(other, (int, float)):
return NotFilter(Equal(field=self.name, value=other))
elif isinstance(other, str):
return NotFilter(Equal(field=self.name, value=other))
else: else:
raise NotImplementedError(other, type(other)) raise NotImplementedError(other, type(other))

23
eland/tests/DEMO.md Normal file
View File

@ -0,0 +1,23 @@
https://docs.google.com/presentation/d/1A3S5aIJC8SuEbi80PhEzyxTUNMjWJ7-_Om92yU9p3yo/edit#slide=id.g5f8a4bcb09_0_3
https://www.kaggle.com/pmarcelino/comprehensive-data-exploration-with-python
https://nbviewer.jupyter.org/github/parente/nbestimate/blob/master/estimate.ipynb
https://stackoverflow.blog/2017/09/14/python-growing-quickly/
https://github.com/elastic/eland
http://localhost:8889/notebooks/eland/tests/demo_day_20190815.ipynb
http://localhost:5601/app/kibana#/dev_tools/console?_g=()
devtool console:
```
GET _cat/indices
# Clean demo
DELETE ed_jetbeats_routes
# Demo day schema
GET flights
GET flights/_search
GET ed_jetbeats_routes
GET ed_jetbeats_routes/_search
```

View File

@ -13,13 +13,10 @@ import numpy as np
class TestDataFrameNUnique(TestData): class TestDataFrameNUnique(TestData):
def test_nunique1(self): def test_nunique1(self):
ed_ecommerce = self.ed_ecommerce() ed_flights = self.ed_flights()
pd_ecommerce = self.pd_ecommerce() pd_flights = self.pd_flights()
print(pd_ecommerce.dtypes) print(pd_flights.dtypes)
print(ed_ecommerce.dtypes) print(ed_flights.dtypes)
#ed_nunique = ed_ecommerce.nunique() print(ed_flights.nunique())
pd_selection = pd_ecommerce.drop(columns=['category'])
pd_nunique = pd_selection.nunique(axis=1)
print(pd_nunique, type(pd_nunique))

View File

@ -39,10 +39,8 @@ class TestDataFrameQuery(TestData):
assert_pandas_eland_frame_equal(pd_q2, ed_q2) assert_pandas_eland_frame_equal(pd_q2, ed_q2)
assert_pandas_eland_frame_equal(pd_q3, ed_q3) assert_pandas_eland_frame_equal(pd_q3, ed_q3)
def test_query2(self): pd_q4 = pd_df[(pd_df.A > 2) & (pd_df.B > 3)]
ed_flights = self.ed_flights() ed_q4 = ed_df[(ed_df.A > 2) & (ed_df.B > 3)]
pd_flights = self.pd_flights()
cancelled = pd_flights[pd_flights.Cancelled == True] assert_pandas_eland_frame_equal(pd_q4, ed_q4)
print(cancelled.groupby(['DestWeather']).count())

File diff suppressed because one or more lines are too long