Adds Support for Series.value_counts() (#49)

* adds support for series.value_counts

* adds docs for series.value_counts

* adds tests for series.value_counts

* updates keyerror language

* adds es docs as an external source

* adds parameters for metrics and terms aggs

* adds 2 tests to check for exceptions

* explains the size parameter

* removes print statements from tests

* checks that es_size is a positive integer

* implements assert_series_equal
This commit is contained in:
Michael Hirsch 2019-11-19 11:27:15 -05:00 committed by GitHub
parent 885a0a4aba
commit 9c9ca90c0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 193 additions and 17 deletions

View File

@ -59,7 +59,8 @@ except ImportError:
extlinks = {
'pandas_api_docs': ('https://pandas.pydata.org/pandas-docs/version/0.25.1/reference/api/%s.html', ''),
'pandas_user_guide': ('https://pandas.pydata.org/pandas-docs/stable/user_guide/%s.html', 'Pandas User Guide/')
'pandas_user_guide': ('https://pandas.pydata.org/pandas-docs/stable/user_guide/%s.html', 'Pandas User Guide/'),
'es_api_docs': ('https://www.elastic.co/guide/en/elasticsearch/reference/current/%s.html', '')
}
numpydoc_attributes_as_param_list = False

View File

@ -30,10 +30,11 @@ In general, the data resides in elasticsearch and not in memory, which allows el
* :doc:`reference/io`
* :doc:`reference/general_utility_functions`
* :doc:`reference/dataframe`
* :doc:`reference/series`
* :doc:`reference/index`
* :doc:`reference/indexing`
* :doc:`implementation/index`
* :doc:`implementation/details`
* :doc:`implementation/dataframe_supported`

View File

@ -0,0 +1,6 @@
eland.Series.value_counts
===========================
.. currentmodule:: eland
.. automethod:: Series.value_counts

View File

@ -13,4 +13,5 @@ methods. All classes and functions exposed in ``eland.*`` namespace are public.
io
general_utility_functions
dataframe
series
indexing

View File

@ -0,0 +1,13 @@
.. _api.series:
=========
Series
=========
.. currentmodule:: eland
Computations / descriptive stats
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. autosummary::
:toctree: api/
Series.value_counts

View File

@ -720,7 +720,7 @@ class DataFrame(NDFrame):
def _getitem_column(self, key):
if key not in self.columns:
raise KeyError("Requested column is not in the DataFrame {}".format(key))
raise KeyError("Requested column [{}] is not in the DataFrame.".format(key))
s = self._reduce_dimension(self._query_compiler.getitem_column_array([key]))
return s

View File

@ -130,12 +130,27 @@ class Operations:
return self._metric_aggs(query_compiler, 'min')
def nunique(self, query_compiler):
return self._terms_aggs(query_compiler, 'cardinality')
return self._metric_aggs(query_compiler, 'cardinality', field_types='aggregatable')
def value_counts(self, query_compiler, es_size):
    """
    Return value counts for the column via an Elasticsearch 'terms'
    aggregation limited to ``es_size`` buckets.

    Parameters
    ----------
    query_compiler: ElandQueryCompiler
        Compiler holding the column/mapping/client state.
    es_size: int
        Number of term buckets Elasticsearch should return.

    Returns
    -------
    pandas.Series
        Bucket keys mapped to document counts (see ``_terms_aggs``).
    """
    return self._terms_aggs(query_compiler, 'terms', es_size)
def hist(self, query_compiler, bins):
    """
    Build histogram data for the query's columns by delegating to
    ``self._hist_aggs`` (implementation not shown in this view).

    Parameters
    ----------
    query_compiler: ElandQueryCompiler
        Compiler holding the column/mapping/client state.
    bins: int
        Number of histogram buckets requested.
    """
    return self._hist_aggs(query_compiler, bins)
def _metric_aggs(self, query_compiler, func):
def _metric_aggs(self, query_compiler, func, field_types=None):
"""
Parameters
----------
field_types: str, default None
If `aggregatable`, use only columns whose fields in Elasticsearch are aggregatable.
If `None`, use only numeric fields.
Returns
-------
pandas.Series
Series containing results of `func` applied to the column(s)
"""
query_params, post_processing = self._resolve_tasks()
size = self._size(query_params, post_processing)
@ -144,11 +159,17 @@ class Operations:
columns = self.get_columns()
numeric_source_fields = query_compiler._mappings.numeric_source_fields(columns)
body = Query(query_params['query'])
for field in numeric_source_fields:
# some metrics aggs (including cardinality) work on all aggregatable fields
# therefore we include an optional field_types parameter on operations
# that call _metric_aggs
if field_types=='aggregatable':
source_fields = query_compiler._mappings.aggregatable_columns(columns)
else:
source_fields = query_compiler._mappings.numeric_source_fields(columns)
for field in source_fields:
body.metric_aggs(field, func, field)
response = query_compiler._client.search(
@ -164,18 +185,32 @@ class Operations:
# }
results = {}
for field in numeric_source_fields:
results[field] = response['aggregations'][field]['value']
if field_types=='aggregatable':
for key, value in source_fields.items():
results[value] = response['aggregations'][key]['value']
else:
for field in source_fields:
results[field] = response['aggregations'][field]['value']
# Return single value if this is a series
# if len(numeric_source_fields) == 1:
# return np.float64(results[numeric_source_fields[0]])
s = pd.Series(data=results, index=numeric_source_fields)
s = pd.Series(data=results, index=results.keys())
return s
def _terms_aggs(self, query_compiler, func):
def _terms_aggs(self, query_compiler, func, es_size=None):
"""
Parameters
----------
es_size: int, default None
Parameter used by Series.value_counts()
Returns
-------
pandas.Series
Series containing results of `func` applied to the column(s)
"""
query_params, post_processing = self._resolve_tasks()
size = self._size(query_params, post_processing)
@ -190,7 +225,7 @@ class Operations:
body = Query(query_params['query'])
for field in aggregatable_columns.keys():
body.metric_aggs(field, func, field)
body.terms_aggs(field, func, field, es_size=es_size)
response = query_compiler._client.search(
index=query_compiler._index_pattern,
@ -200,9 +235,15 @@ class Operations:
results = {}
for key, value in aggregatable_columns.items():
results[value] = response['aggregations'][key]['value']
for bucket in response['aggregations'][columns[0]]['buckets']:
results[bucket['key']] = bucket['doc_count']
s = pd.Series(data=results, index=results.keys())
try:
name = columns[0]
except IndexError:
name = None
s = pd.Series(data=results, index=results.keys(), name=name)
return s
@ -379,7 +420,7 @@ class Operations:
"""
Results are like (for 'sum', 'min')
AvgTicketPrice DistanceKilometers DistanceMiles FlightDelayMin
sum 8.204365e+06 9.261629e+07 5.754909e+07 618150
min 1.000205e+02 0.000000e+00 0.000000e+00 0

View File

@ -69,6 +69,27 @@ class Query:
else:
self._query = self._query & ~(IsIn(field, items))
def terms_aggs(self, name, func, field, es_size):
    """
    Add a bucket (terms) aggregation to the request body, e.g.

    "aggs": {
        "name": {
            "terms": {
                "field": "Airline",
                "size": 10
            }
        }
    }

    Parameters
    ----------
    name: str
        Key under which the aggregation is registered in the body.
    func: str
        Aggregation type, e.g. 'terms'.
    field: str
        Elasticsearch field to aggregate on.
    es_size: int or None
        Number of buckets to return. When None the key is omitted so
        Elasticsearch applies its own default — sending ``"size": null``
        is rejected by Elasticsearch. (``_terms_aggs`` defaults
        ``es_size`` to None, so this guard matters.)
    """
    params = {"field": field}
    if es_size is not None:
        params["size"] = es_size
    self._aggs[name] = {func: params}
def metric_aggs(self, name, func, field):
"""
Add metric agg e.g

View File

@ -411,6 +411,9 @@ class ElandQueryCompiler:
def nunique(self):
return self._operations.nunique(self)
def value_counts(self, es_size):
    """
    Delegate to ``Operations.value_counts``; ``es_size`` caps the number
    of term buckets Elasticsearch returns.
    """
    return self._operations.value_counts(self, es_size)
def info_es(self, buf):
buf.write("index_pattern: {index_pattern}\n".format(index_pattern=self._index_pattern))

View File

@ -104,6 +104,48 @@ class Series(NDFrame):
def tail(self, n=5):
return Series(query_compiler=self._query_compiler.tail(n))
def value_counts(self, es_size=10):
    """
    Return the value counts for the specified field.

    **Note we can only do this for aggregatable Elasticsearch fields - (in general) numeric and keyword rather than text fields**

    TODO - implement remainder of pandas arguments

    Parameters
    ----------
    es_size: int, default 10
        Number of buckets to return counts for, automatically sorts by count descending.
        This parameter is specific to `eland`, and determines how many term buckets
        elasticsearch should return out of the overall terms list.

    Returns
    -------
    pandas.Series
        Number of occurrences of each value in the column.

    Raises
    ------
    TypeError
        If ``es_size`` is not an int (bool is rejected explicitly).
    ValueError
        If ``es_size`` is zero or negative.

    See Also
    --------
    :pandas_api_docs:`pandas.Series.value_counts`
    :es_api_docs:`search-aggregations-bucket-terms-aggregation`

    Examples
    --------
    >>> df = ed.DataFrame('localhost', 'flights')
    >>> df['Carrier'].value_counts()
    Logstash Airways    3331
    JetBeats            3274
    Kibana Airlines     3234
    ES-Air              3220
    Name: Carrier, dtype: int64
    """
    # bool is a subclass of int, so isinstance(True, int) is True;
    # reject it explicitly so value_counts(es_size=True) fails clearly.
    if not isinstance(es_size, int) or isinstance(es_size, bool):
        raise TypeError("es_size must be a positive integer.")
    if es_size <= 0:
        raise ValueError("es_size must be a positive integer.")
    return self._query_compiler.value_counts(es_size)
# ----------------------------------------------------------------------
# Rendering Methods
def __repr__(self):

View File

@ -0,0 +1,47 @@
# File called _pytest for PyCharm compatibility
import eland as ed
from eland.tests.common import TestData
from pandas.util.testing import assert_series_equal
import pytest
class TestSeriesValueCounts(TestData):
    """Tests for eland Series.value_counts() against the pandas reference."""

    def test_value_counts(self):
        # Default es_size (10) covers all 4 carriers, so results should
        # match pandas exactly.
        pd_s = self.pd_flights()['Carrier']
        ed_s = self.ed_flights()['Carrier']

        pd_vc = pd_s.value_counts()
        ed_vc = ed_s.value_counts()

        assert_series_equal(pd_vc, ed_vc)

    def test_value_counts_size(self):
        # es_size=N should equal the top-N slice of the pandas result
        # (terms aggs sort by count descending, as pandas does).
        pd_s = self.pd_flights()['Carrier']
        ed_s = self.ed_flights()['Carrier']

        pd_vc = pd_s.value_counts()[:1]
        ed_vc = ed_s.value_counts(es_size=1)

        assert_series_equal(pd_vc, ed_vc)

    def test_value_counts_keyerror(self):
        ed_f = self.ed_flights()
        # NOTE: no `assert` inside pytest.raises — the context manager
        # itself fails the test if the exception is not raised.
        with pytest.raises(KeyError):
            ed_f['not_a_column'].value_counts()

    def test_value_counts_dataframe(self):
        # value_counts() is a series method, should raise AttributeError
        # if called on a DataFrame
        ed_f = self.ed_flights()
        with pytest.raises(AttributeError):
            ed_f.value_counts()

    def test_value_counts_non_int(self):
        ed_s = self.ed_flights()['Carrier']
        with pytest.raises(TypeError):
            ed_s.value_counts(es_size='foo')

    def test_value_counts_non_positive_int(self):
        ed_s = self.ed_flights()['Carrier']
        with pytest.raises(ValueError):
            ed_s.value_counts(es_size=-9)