diff --git a/docs/source/conf.py b/docs/source/conf.py
index 3e214b0..31b417f 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -59,7 +59,8 @@ except ImportError:
 
 extlinks = {
     'pandas_api_docs': ('https://pandas.pydata.org/pandas-docs/version/0.25.1/reference/api/%s.html', ''),
-    'pandas_user_guide': ('https://pandas.pydata.org/pandas-docs/stable/user_guide/%s.html', 'Pandas User Guide/')
+    'pandas_user_guide': ('https://pandas.pydata.org/pandas-docs/stable/user_guide/%s.html', 'Pandas User Guide/'),
+    'es_api_docs': ('https://www.elastic.co/guide/en/elasticsearch/reference/current/%s.html', '')
 }
 
 numpydoc_attributes_as_param_list = False
diff --git a/docs/source/index.rst b/docs/source/index.rst
index f8ba777..578bc75 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -30,10 +30,11 @@ In general, the data resides in elasticsearch and not in memory, which allows el
   * :doc:`reference/io`
   * :doc:`reference/general_utility_functions`
   * :doc:`reference/dataframe`
+  * :doc:`reference/series`
+  * :doc:`reference/index`
   * :doc:`reference/indexing`
 
 * :doc:`implementation/index`
 
   * :doc:`implementation/details`
   * :doc:`implementation/dataframe_supported`
-
diff --git a/docs/source/reference/api/eland.Series.value_counts.rst b/docs/source/reference/api/eland.Series.value_counts.rst
new file mode 100644
index 0000000..8d020b0
--- /dev/null
+++ b/docs/source/reference/api/eland.Series.value_counts.rst
@@ -0,0 +1,6 @@
+eland.Series.value_counts
+===========================
+
+.. currentmodule:: eland
+
+.. automethod:: Series.value_counts
diff --git a/docs/source/reference/index.rst b/docs/source/reference/index.rst
index b7c6bf4..fc78c40 100644
--- a/docs/source/reference/index.rst
+++ b/docs/source/reference/index.rst
@@ -13,4 +13,5 @@ methods. All classes and functions exposed in ``eland.*`` namespace are public.
    io
    general_utility_functions
    dataframe
+   series
    indexing
diff --git a/docs/source/reference/series.rst b/docs/source/reference/series.rst
new file mode 100644
index 0000000..cbc8898
--- /dev/null
+++ b/docs/source/reference/series.rst
@@ -0,0 +1,13 @@
+.. _api.series:
+
+=========
+Series
+=========
+.. currentmodule:: eland
+
+Computations / descriptive stats
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+.. autosummary::
+   :toctree: api/
+
+   Series.value_counts
\ No newline at end of file
diff --git a/eland/dataframe.py b/eland/dataframe.py
index 92d88a2..c724f6e 100644
--- a/eland/dataframe.py
+++ b/eland/dataframe.py
@@ -720,7 +720,7 @@ class DataFrame(NDFrame):
 
     def _getitem_column(self, key):
         if key not in self.columns:
-            raise KeyError("Requested column is not in the DataFrame {}".format(key))
+            raise KeyError("Requested column [{}] is not in the DataFrame.".format(key))
         s = self._reduce_dimension(self._query_compiler.getitem_column_array([key]))
         return s
 
diff --git a/eland/operations.py b/eland/operations.py
index aa1aea5..7f69446 100644
--- a/eland/operations.py
+++ b/eland/operations.py
@@ -130,12 +130,27 @@ class Operations:
         return self._metric_aggs(query_compiler, 'min')
 
     def nunique(self, query_compiler):
-        return self._terms_aggs(query_compiler, 'cardinality')
+        return self._metric_aggs(query_compiler, 'cardinality', field_types='aggregatable')
+
+    def value_counts(self, query_compiler, es_size):
+        return self._terms_aggs(query_compiler, 'terms', es_size)
 
     def hist(self, query_compiler, bins):
         return self._hist_aggs(query_compiler, bins)
 
-    def _metric_aggs(self, query_compiler, func):
+    def _metric_aggs(self, query_compiler, func, field_types=None):
+        """
+        Parameters
+        ----------
+        field_types: str, default None
+            If `aggregatable`, use only columns whose Elasticsearch fields are aggregatable.
+            If `None`, use only numeric fields.
+
+        Returns
+        -------
+        pandas.Series
+            Series containing results of `func` applied to the column(s)
+        """
         query_params, post_processing = self._resolve_tasks()
 
         size = self._size(query_params, post_processing)
@@ -144,11 +159,17 @@
 
         columns = self.get_columns()
 
-        numeric_source_fields = query_compiler._mappings.numeric_source_fields(columns)
-
         body = Query(query_params['query'])
 
-        for field in numeric_source_fields:
+        # some metric aggs (including cardinality) work on all aggregatable fields,
+        # so operations that call _metric_aggs can request this via the optional
+        # field_types parameter
+        if field_types == 'aggregatable':
+            source_fields = query_compiler._mappings.aggregatable_columns(columns)
+        else:
+            source_fields = query_compiler._mappings.numeric_source_fields(columns)
+
+        for field in source_fields:
             body.metric_aggs(field, func, field)
 
         response = query_compiler._client.search(
@@ -164,18 +185,32 @@
         # }
         results = {}
 
-        for field in numeric_source_fields:
-            results[field] = response['aggregations'][field]['value']
+        if field_types == 'aggregatable':
+            for key, value in source_fields.items():
+                results[value] = response['aggregations'][key]['value']
+        else:
+            for field in source_fields:
+                results[field] = response['aggregations'][field]['value']
 
         # Return single value if this is a series
         # if len(numeric_source_fields) == 1:
         #     return np.float64(results[numeric_source_fields[0]])
-
-        s = pd.Series(data=results, index=numeric_source_fields)
+        s = pd.Series(data=results, index=results.keys())
 
         return s
 
-    def _terms_aggs(self, query_compiler, func):
+    def _terms_aggs(self, query_compiler, func, es_size=None):
+        """
+        Parameters
+        ----------
+        es_size: int, default None
+            Parameter used by Series.value_counts()
+
+        Returns
+        -------
+        pandas.Series
+            Series containing results of `func` applied to the column(s)
+        """
        query_params, post_processing = self._resolve_tasks()
 
         size = self._size(query_params, post_processing)
@@ -190,7 +225,7 @@
         body = Query(query_params['query'])
 
         for field in aggregatable_columns.keys():
-            body.metric_aggs(field, func, field)
+            body.terms_aggs(field, func, field, es_size=es_size)
 
         response = query_compiler._client.search(
             index=query_compiler._index_pattern,
@@ -200,9 +235,15 @@
         results = {}
 
         for key, value in aggregatable_columns.items():
-            results[value] = response['aggregations'][key]['value']
+            for bucket in response['aggregations'][columns[0]]['buckets']:
+                results[bucket['key']] = bucket['doc_count']
 
-        s = pd.Series(data=results, index=results.keys())
+        try:
+            name = columns[0]
+        except IndexError:
+            name = None
+
+        s = pd.Series(data=results, index=results.keys(), name=name)
 
         return s
 
@@ -379,7 +420,7 @@
         """
         Results are like (for 'sum', 'min')
-        
+
              AvgTicketPrice  DistanceKilometers  DistanceMiles  FlightDelayMin
         sum    8.204365e+06        9.261629e+07   5.754909e+07          618150
         min    1.000205e+02        0.000000e+00   0.000000e+00               0
 
diff --git a/eland/query.py b/eland/query.py
index e7f02da..80a5161 100644
--- a/eland/query.py
+++ b/eland/query.py
@@ -69,6 +69,27 @@ class Query:
         else:
             self._query = self._query & ~(IsIn(field, items))
 
+    def terms_aggs(self, name, func, field, es_size):
+        """
+        Add terms agg e.g.
+
+        "aggs": {
+            "name": {
+                "terms": {
+                    "field": "Airline",
+                    "size": 10
+                }
+            }
+        }
+        """
+        agg = {
+            func: {
+                "field": field,
+                "size": es_size
+            }
+        }
+        self._aggs[name] = agg
+
     def metric_aggs(self, name, func, field):
         """
         Add metric agg e.g
diff --git a/eland/query_compiler.py b/eland/query_compiler.py
index 1d24c5a..1288ba3 100644
--- a/eland/query_compiler.py
+++ b/eland/query_compiler.py
@@ -411,6 +411,9 @@ class ElandQueryCompiler:
 
     def nunique(self):
         return self._operations.nunique(self)
 
+    def value_counts(self, es_size):
+        return self._operations.value_counts(self, es_size)
+
     def info_es(self, buf):
         buf.write("index_pattern: {index_pattern}\n".format(index_pattern=self._index_pattern))
diff --git a/eland/series.py b/eland/series.py
index d7a3b3b..6318028 100644
--- a/eland/series.py
+++ b/eland/series.py
@@ -104,6 +104,48 @@ class Series(NDFrame):
     def tail(self, n=5):
         return Series(query_compiler=self._query_compiler.tail(n))
 
+    def value_counts(self, es_size=10):
+        """
+        Return the value counts for the specified field.
+
+        **Note: this is only supported for aggregatable Elasticsearch fields - in general, numeric and keyword fields rather than text fields.**
+
+        TODO - implement remainder of pandas arguments
+
+        Parameters
+        ----------
+        es_size: int, default 10
+            Number of buckets to return counts for; results are sorted by count, descending.
+            This parameter is specific to `eland`, and determines how many term buckets
+            Elasticsearch should return out of the overall terms list.
+
+        Returns
+        -------
+        pandas.Series
+            Number of occurrences of each value in the column
+
+        See Also
+        --------
+        :pandas_api_docs:`pandas.Series.value_counts`
+        :es_api_docs:`search-aggregations-bucket-terms-aggregation`
+
+        Examples
+        --------
+        >>> df = ed.DataFrame('localhost', 'flights')
+        >>> df['Carrier'].value_counts()
+        Logstash Airways    3331
+        JetBeats            3274
+        Kibana Airlines     3234
+        ES-Air              3220
+        Name: Carrier, dtype: int64
+        """
+        if not isinstance(es_size, int):
+            raise TypeError("es_size must be a positive integer.")
+        if not es_size > 0:
+            raise ValueError("es_size must be a positive integer.")
+
+        return self._query_compiler.value_counts(es_size)
+
     # ----------------------------------------------------------------------
     # Rendering Methods
     def __repr__(self):
diff --git a/eland/tests/series/test_value_counts_pytest.py b/eland/tests/series/test_value_counts_pytest.py
new file mode 100644
index 0000000..f79bf4c
--- /dev/null
+++ b/eland/tests/series/test_value_counts_pytest.py
@@ -0,0 +1,47 @@
+# File called _pytest for PyCharm compatibility
+import eland as ed
+from eland.tests.common import TestData
+from pandas.util.testing import assert_series_equal
+import pytest
+
+
+class TestSeriesValueCounts(TestData):
+
+    def test_value_counts(self):
+        pd_s = self.pd_flights()['Carrier']
+        ed_s = self.ed_flights()['Carrier']
+
+        pd_vc = pd_s.value_counts()
+        ed_vc = ed_s.value_counts()
+
+        assert_series_equal(pd_vc, ed_vc)
+
+    def test_value_counts_size(self):
+        pd_s = self.pd_flights()['Carrier']
+        ed_s = self.ed_flights()['Carrier']
+
+        pd_vc = pd_s.value_counts()[:1]
+        ed_vc = ed_s.value_counts(es_size=1)
+
+        assert_series_equal(pd_vc, ed_vc)
+
+    def test_value_counts_keyerror(self):
+        ed_f = self.ed_flights()
+        with pytest.raises(KeyError):
+            assert ed_f['not_a_column'].value_counts()
+
+    def test_value_counts_dataframe(self):
+        # value_counts() is a Series method; it should raise AttributeError if called on a DataFrame
+        ed_f = self.ed_flights()
+        with pytest.raises(AttributeError):
+            assert ed_f.value_counts()
+
+    def test_value_counts_non_int(self):
+        ed_s = self.ed_flights()['Carrier']
+        with pytest.raises(TypeError):
+            assert ed_s.value_counts(es_size='foo')
+
+    def test_value_counts_non_positive_int(self):
+        ed_s = self.ed_flights()['Carrier']
+        with pytest.raises(ValueError):
+            assert ed_s.value_counts(es_size=-9)
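For reviewers, a minimal sketch of what the new Series.value_counts() path does under the hood: Query.terms_aggs() builds a single `terms` aggregation capped at `es_size` buckets, and Operations._terms_aggs() turns the returned buckets into a pandas.Series. The standalone reproduction below is an illustration only, not part of the change; it uses elasticsearch-py directly, and the 'localhost' host, 'flights' index and 'Carrier' field are simply taken from the docstring example above.

    from elasticsearch import Elasticsearch
    import pandas as pd

    es = Elasticsearch('localhost')
    es_size = 10

    # Equivalent of the body built via Query.terms_aggs(): one named terms agg,
    # no hits returned (size=0), bucket count capped at es_size.
    body = {
        "size": 0,
        "aggs": {
            "Carrier": {
                "terms": {
                    "field": "Carrier",
                    "size": es_size
                }
            }
        }
    }

    response = es.search(index='flights', body=body)

    # Mirror of the bucket loop added to Operations._terms_aggs():
    # bucket key -> doc_count, sorted by count descending by Elasticsearch.
    results = {bucket['key']: bucket['doc_count']
               for bucket in response['aggregations']['Carrier']['buckets']}

    print(pd.Series(results, name='Carrier'))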