diff --git a/eland/__init__.py b/eland/__init__.py index d8305a6..f82d933 100644 --- a/eland/__init__.py +++ b/eland/__init__.py @@ -1,5 +1,7 @@ from .utils import * -from .dataframe import * from .client import * -from .mappings import * +from .ndframe import * from .index import * +from .mappings import * +from .dataframe import * +from .series import * diff --git a/eland/dataframe.py b/eland/dataframe.py index bf2532f..35bfd4d 100644 --- a/eland/dataframe.py +++ b/eland/dataframe.py @@ -26,18 +26,18 @@ only Elasticsearch aggregatable fields can be aggregated or grouped. import sys import pandas as pd -from elasticsearch_dsl import Search -from pandas.compat import StringIO -from pandas.core import common as com -from pandas.io.common import _expand_user, _stringify_path + from pandas.io.formats import format as fmt from pandas.io.formats.printing import pprint_thing +from pandas.compat import StringIO +from pandas.io.common import _expand_user, _stringify_path from pandas.io.formats import console -import eland as ed +from eland import NDFrame +from eland import Index -class DataFrame(): +class DataFrame(NDFrame): """ pandas.DataFrame like API that proxies into Elasticsearch index(es). @@ -49,9 +49,6 @@ class DataFrame(): index_pattern : str An Elasticsearch index pattern. This can contain wildcards (e.g. filebeat-*). - operations: list of operation - A list of Elasticsearch analytics operations e.g. filter, aggregations etc. - See Also -------- @@ -83,229 +80,14 @@ class DataFrame(): index_pattern, mappings=None, index_field=None): - - self._client = ed.Client(client) - self._index_pattern = index_pattern - - # Get and persist mappings, this allows us to correctly - # map returned types from Elasticsearch to pandas datatypes - if mappings is None: - self._mappings = ed.Mappings(self._client, self._index_pattern) - else: - self._mappings = mappings - - self._index = ed.Index(index_field) - - def _es_results_to_pandas(self, results): - """ - Parameters - ---------- - results: dict - Elasticsearch results from self.client.search - - Returns - ------- - df: pandas.DataFrame - _source values extracted from results and mapped to pandas DataFrame - dtypes are mapped via Mapping object - - Notes - ----- - Fields containing lists in Elasticsearch don't map easily to pandas.DataFrame - For example, an index with mapping: - ``` - "mappings" : { - "properties" : { - "group" : { - "type" : "keyword" - }, - "user" : { - "type" : "nested", - "properties" : { - "first" : { - "type" : "keyword" - }, - "last" : { - "type" : "keyword" - } - } - } - } - } - ``` - Adding a document: - ``` - "_source" : { - "group" : "amsterdam", - "user" : [ - { - "first" : "John", - "last" : "Smith" - }, - { - "first" : "Alice", - "last" : "White" - } - ] - } - ``` - (https://www.elastic.co/guide/en/elasticsearch/reference/current/nested.html) - this would be transformed internally (in Elasticsearch) into a document that looks more like this: - ``` - { - "group" : "amsterdam", - "user.first" : [ "alice", "john" ], - "user.last" : [ "smith", "white" ] - } - ``` - When mapping this a pandas data frame we mimic this transformation. 
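As an aside, the flattening described above can be sketched in isolation. This is a rough illustration only; the helper name and sample document are invented here, and the real implementation additionally consults the Mappings object for source-field checks and dtype coercion:

```python
# Simplified sketch of the nested-document flattening described above.
# It only shows the dotted-key naming and the multiple-values-become-lists
# behaviour; Mappings lookups and dtype coercion are omitted.
def flatten_source(source):
    out = {}

    def _flatten(value, name=''):
        if isinstance(value, dict):
            for key, item in value.items():
                _flatten(item, name + key + '.')
        elif isinstance(value, list):
            for item in value:
                _flatten(item, name)
        else:
            field = name[:-1]
            if field in out:
                # second value for the same field -> promote to a list
                if not isinstance(out[field], list):
                    out[field] = [out[field]]
                out[field].append(value)
            else:
                out[field] = value

    _flatten(source)
    return out


doc = {
    "group": "amsterdam",
    "user": [
        {"first": "John", "last": "Smith"},
        {"first": "Alice", "last": "White"},
    ],
}

print(flatten_source(doc))
# {'group': 'amsterdam', 'user.first': ['John', 'Alice'], 'user.last': ['Smith', 'White']}
```

Each flattened dict then becomes one row of the resulting pandas.DataFrame, indexed either by _id or by the configured index field.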
- - Similarly, if a list is added to Elasticsearch: - ``` - PUT my_index/_doc/1 - { - "list" : [ - 0, 1, 2 - ] - } - ``` - The mapping is: - ``` - "mappings" : { - "properties" : { - "user" : { - "type" : "long" - } - } - } - ``` - TODO - explain how lists are handled (https://www.elastic.co/guide/en/elasticsearch/reference/current/array.html) - TODO - an option here is to use Elasticsearch's multi-field matching instead of pandas treatment of lists (which isn't great) - NOTE - using this lists is generally not a good way to use this API - """ - - def flatten_dict(y): - out = {} - - def flatten(x, name=''): - # We flatten into source fields e.g. if type=geo_point - # location: {lat=52.38, lon=4.90} - if name == '': - is_source_field = False - pd_dtype = 'object' - else: - is_source_field, pd_dtype = self._mappings.source_field_pd_dtype(name[:-1]) - - if not is_source_field and type(x) is dict: - for a in x: - flatten(x[a], name + a + '.') - elif not is_source_field and type(x) is list: - for a in x: - flatten(a, name) - elif is_source_field == True: # only print source fields from mappings (TODO - not so efficient for large number of fields and filtered mapping) - field_name = name[:-1] - - # Coerce types - for now just datetime - if pd_dtype == 'datetime64[ns]': - x = pd.to_datetime(x) - - # Elasticsearch can have multiple values for a field. These are represented as lists, so - # create lists for this pivot (see notes above) - if field_name in out: - if type(out[field_name]) is not list: - l = [out[field_name]] - out[field_name] = l - out[field_name].append(x) - else: - out[field_name] = x - - flatten(y) - - return out - - rows = [] - index = [] - for hit in results['hits']['hits']: - row = hit['_source'] - - # get index value - can be _id or can be field value in source - if self._index.is_source_field: - index_field = row[self._index.index_field] - else: - index_field = hit[self._index.index_field] - index.append(index_field) - - # flatten row to map correctly to 2D DataFrame - rows.append(flatten_dict(row)) - - # Create pandas DataFrame - df = pd.DataFrame(data=rows, index=index) - - # _source may not contain all columns in the mapping - # therefore, fill in missing columns - # (note this returns self.columns NOT IN df.columns) - missing_columns = list(set(self.columns) - set(df.columns)) - - for missing in missing_columns: - is_source_field, pd_dtype = self._mappings.source_field_pd_dtype(missing) - df[missing] = None - df[missing].astype(pd_dtype) - - # Sort columns in mapping order - df = df[self.columns] - - return df + # python 3 syntax + super().__init__(client, index_pattern, mappings=mappings, index_field=index_field) def head(self, n=5): - sort_params = self._index.sort_field + ":asc" - - results = self._client.search(index=self._index_pattern, size=n, sort=sort_params) - - return self._es_results_to_pandas(results) + return super()._head(n) def tail(self, n=5): - sort_params = self._index.sort_field + ":desc" - - results = self._client.search(index=self._index_pattern, size=n, sort=sort_params) - - df = self._es_results_to_pandas(results) - - # reverse order (index ascending) - return df.sort_index() - - def describe(self): - numeric_source_fields = self._mappings.numeric_source_fields() - - # for each field we compute: - # count, mean, std, min, 25%, 50%, 75%, max - search = Search(using=self._client, index=self._index_pattern).extra(size=0) - - for field in numeric_source_fields: - search.aggs.metric('extended_stats_' + field, 'extended_stats', field=field) - 
search.aggs.metric('percentiles_' + field, 'percentiles', field=field) - - response = search.execute() - - results = {} - - for field in numeric_source_fields: - values = [] - values.append(response.aggregations['extended_stats_' + field]['count']) - values.append(response.aggregations['extended_stats_' + field]['avg']) - values.append(response.aggregations['extended_stats_' + field]['std_deviation']) - values.append(response.aggregations['extended_stats_' + field]['min']) - values.append(response.aggregations['percentiles_' + field]['values']['25.0']) - values.append(response.aggregations['percentiles_' + field]['values']['50.0']) - values.append(response.aggregations['percentiles_' + field]['values']['75.0']) - values.append(response.aggregations['extended_stats_' + field]['max']) - - # if not None - if (values.count(None) < len(values)): - results[field] = values - - df = pd.DataFrame(data=results, index=['count', 'mean', 'std', 'min', '25%', '50%', '75%', 'max']) - - return df + return super()._tail(n) def info(self, verbose=None, buf=None, max_cols=None, memory_usage=None, null_counts=None): @@ -325,7 +107,7 @@ class DataFrame(): lines = [] lines.append(str(type(self))) - lines.append(self.index_summary()) + lines.append(self._index_summary()) if len(self.columns) == 0: lines.append('Empty {name}'.format(name=type(self).__name__)) @@ -437,18 +219,14 @@ class DataFrame(): @property def columns(self): - return pd.Index(self._mappings.source_fields()) - - @property - def index(self): - return self._index + return super()._columns def set_index(self, index_field): copy = self.copy() - copy._index = ed.Index(index_field) + copy._index = Index(index_field) return copy - def index_summary(self): + def _index_summary(self): head = self.head(1).index[0] tail = self.tail(1).index[0] index_summary = ', %s to %s' % (pprint_thing(head), @@ -457,13 +235,6 @@ class DataFrame(): name = "Index" return '%s: %s entries%s' % (name, len(self), index_summary) - @property - def dtypes(self): - return self._mappings.dtypes() - - def get_dtype_counts(self): - return self._mappings.get_dtype_counts() - def count(self): """ Count non-NA cells for each column (TODO row) @@ -487,24 +258,9 @@ class DataFrame(): return count - def index_count(self): - """ - Returns - ------- - index_count: int - Count of docs where index_field exists - """ - exists_query = {"query": {"exists": {"field": self._index.index_field}}} + def describe(self): + return super()._describe() - index_count = self._client.count(index=self._index_pattern, body=exists_query) - - return index_count - - def _filter_by_columns(self, columns): - # Return new eland.DataFrame with modified mappings - mappings = ed.Mappings(mappings=self._mappings, columns=columns) - - return DataFrame(self._client, self._index_pattern, mappings=mappings) def __getitem__(self, key): # NOTE: there is a difference between pandas here. @@ -544,17 +300,14 @@ class DataFrame(): else: raise TypeError('__getitem__ arguments invalid: [{0}]'.format(key)) - return self._filter_by_columns(columns) + mappings = self._filter_mappings(columns) - def __len__(self): - """ - Returns length of info axis, but here we use the index. 
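To make the column-selection behaviour of __getitem__ concrete, here is a hedged usage sketch. The index and field names ('flights', 'Carrier', 'AvgTicketPrice') are hypothetical, and the client setup mirrors the class docstring example; selecting columns only narrows the mappings, no documents are fetched until an operation such as head() runs:

```python
import eland as ed
from elasticsearch import Elasticsearch

client = ed.Client(Elasticsearch())      # as in the docstring examples
df = ed.DataFrame(client, 'flights')     # hypothetical index name

# __getitem__ builds a new eland.DataFrame over filtered mappings;
# no search is issued at this point.
subset = df[['Carrier', 'AvgTicketPrice']]

# Only here is a search executed (size=5, sorted on the index field).
print(subset.head())
```

Note that in this version a single label (df['Carrier']) also appears to come back as a one-column eland.DataFrame rather than a pandas.Series, per the NOTE in __getitem__.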
- """ - return self._client.count(index=self._index_pattern) + # Return new eland.DataFrame with modified mappings + return DataFrame(self._client, self._index_pattern, mappings=mappings) def copy(self): # TODO - test and validate...may need deep copying - return ed.DataFrame(self._client, + return DataFrame(self._client, self._index_pattern, self._mappings, self._index) @@ -590,73 +343,37 @@ class DataFrame(): if max_rows == None: max_rows = pd.get_option('display.max_rows') - sdf = self.__fake_dataframe__(max_rows=max_rows+1) - - _show_dimensions = show_dimensions + df = self._fake_head_tail_df(max_rows=max_rows+1) if buf is not None: _buf = _expand_user(_stringify_path(buf)) else: _buf = StringIO() - sdf.to_string(buf=_buf, columns=columns, - col_space=col_space, na_rep=na_rep, - formatters=formatters, - float_format=float_format, - sparsify=sparsify, justify=justify, - index_names=index_names, - header=header, index=index, - max_rows=max_rows, - max_cols=max_cols, - show_dimensions=False, # print this outside of this call - decimal=decimal, - line_width=line_width) + df.to_string(buf=_buf, columns=columns, + col_space=col_space, na_rep=na_rep, + formatters=formatters, + float_format=float_format, + sparsify=sparsify, justify=justify, + index_names=index_names, + header=header, index=index, + max_rows=max_rows, + max_cols=max_cols, + show_dimensions=False, # print this outside of this call + decimal=decimal, + line_width=line_width) - if _show_dimensions: + # Our fake dataframe has incorrect number of rows (max_rows*2+1) - write out + # the correct number of rows + if show_dimensions: _buf.write("\n\n[{nrows} rows x {ncols} columns]" - .format(nrows=self.index_count(), ncols=len(self.columns))) + .format(nrows=self._index_count(), ncols=len(self.columns))) if buf is None: result = _buf.getvalue() return result - def __fake_dataframe__(self, max_rows=1): - head_rows = int(max_rows / 2) + max_rows % 2 - tail_rows = max_rows - head_rows - - head = self.head(head_rows) - tail = self.tail(tail_rows) - - num_rows = len(self) - - if (num_rows > max_rows): - # If we have a lot of rows, create a SparseDataFrame and use - # pandas to_string logic - # NOTE: this sparse DataFrame can't be used as the middle - # section is all NaNs. However, it gives us potentially a nice way - # to use the pandas IO methods. - # TODO - if data is indexed by time series, return top/bottom of - # time series, rather than first max_rows items - """ - if tail_rows > 0: - locations = [0, num_rows - tail_rows] - lengths = [head_rows, tail_rows] - else: - locations = [0] - lengths = [head_rows] - - sdf = pd.DataFrame({item: pd.SparseArray(data=head[item], - sparse_index= - BlockIndex( - num_rows, locations, lengths)) - for item in self.columns}) - """ - return pd.concat([head, tail]) - - - return pd.concat([head, tail]) - # From pandas.DataFrame def _put_str(s, space): diff --git a/eland/ndframe.py b/eland/ndframe.py new file mode 100644 index 0000000..6f52e1c --- /dev/null +++ b/eland/ndframe.py @@ -0,0 +1,343 @@ +""" +NDFrame +--------- +Base class for eland.DataFrame and eland.Series. + +The underlying data resides in Elasticsearch and the API aligns as much as +possible with pandas APIs. + +This allows the eland.DataFrame to access large datasets stored in Elasticsearch, +without storing the dataset in local memory. + +Implementation Details +---------------------- + +Elasticsearch indexes can be configured in many different ways, and these indexes +utilise different data structures to pandas. 
+ +eland.DataFrame operations that return individual rows (e.g. df.head()) return +_source data. If _source is not enabled, this data is not accessible. + +Similarly, only Elasticsearch searchable fields can be searched or filtered, and +only Elasticsearch aggregatable fields can be aggregated or grouped. + +""" +import pandas as pd +from elasticsearch_dsl import Search + +import eland as ed + + +class NDFrame(): + """ + pandas.DataFrame/Series like API that proxies into Elasticsearch index(es). + + Parameters + ---------- + client : eland.Client + A reference to a Elasticsearch python client + + index_pattern : str + An Elasticsearch index pattern. This can contain wildcards (e.g. filebeat-*). + + See Also + -------- + + """ + + def __init__(self, + client, + index_pattern, + mappings=None, + index_field=None): + + self._client = ed.Client(client) + self._index_pattern = index_pattern + + # Get and persist mappings, this allows us to correctly + # map returned types from Elasticsearch to pandas datatypes + if mappings is None: + self._mappings = ed.Mappings(self._client, self._index_pattern) + else: + self._mappings = mappings + + self._index = ed.Index(index_field) + + def _es_results_to_pandas(self, results): + """ + Parameters + ---------- + results: dict + Elasticsearch results from self.client.search + + Returns + ------- + df: pandas.DataFrame + _source values extracted from results and mapped to pandas DataFrame + dtypes are mapped via Mapping object + + Notes + ----- + Fields containing lists in Elasticsearch don't map easily to pandas.DataFrame + For example, an index with mapping: + ``` + "mappings" : { + "properties" : { + "group" : { + "type" : "keyword" + }, + "user" : { + "type" : "nested", + "properties" : { + "first" : { + "type" : "keyword" + }, + "last" : { + "type" : "keyword" + } + } + } + } + } + ``` + Adding a document: + ``` + "_source" : { + "group" : "amsterdam", + "user" : [ + { + "first" : "John", + "last" : "Smith" + }, + { + "first" : "Alice", + "last" : "White" + } + ] + } + ``` + (https://www.elastic.co/guide/en/elasticsearch/reference/current/nested.html) + this would be transformed internally (in Elasticsearch) into a document that looks more like this: + ``` + { + "group" : "amsterdam", + "user.first" : [ "alice", "john" ], + "user.last" : [ "smith", "white" ] + } + ``` + When mapping this a pandas data frame we mimic this transformation. + + Similarly, if a list is added to Elasticsearch: + ``` + PUT my_index/_doc/1 + { + "list" : [ + 0, 1, 2 + ] + } + ``` + The mapping is: + ``` + "mappings" : { + "properties" : { + "user" : { + "type" : "long" + } + } + } + ``` + TODO - explain how lists are handled (https://www.elastic.co/guide/en/elasticsearch/reference/current/array.html) + TODO - an option here is to use Elasticsearch's multi-field matching instead of pandas treatment of lists (which isn't great) + NOTE - using this lists is generally not a good way to use this API + """ + def flatten_dict(y): + out = {} + + def flatten(x, name=''): + # We flatten into source fields e.g. 
if type=geo_point + # location: {lat=52.38, lon=4.90} + if name == '': + is_source_field = False + pd_dtype = 'object' + else: + is_source_field, pd_dtype = self._mappings.source_field_pd_dtype(name[:-1]) + + if not is_source_field and type(x) is dict: + for a in x: + flatten(x[a], name + a + '.') + elif not is_source_field and type(x) is list: + for a in x: + flatten(a, name) + elif is_source_field == True: # only print source fields from mappings (TODO - not so efficient for large number of fields and filtered mapping) + field_name = name[:-1] + + # Coerce types - for now just datetime + if pd_dtype == 'datetime64[ns]': + x = pd.to_datetime(x) + + # Elasticsearch can have multiple values for a field. These are represented as lists, so + # create lists for this pivot (see notes above) + if field_name in out: + if type(out[field_name]) is not list: + l = [out[field_name]] + out[field_name] = l + out[field_name].append(x) + else: + out[field_name] = x + + flatten(y) + + return out + + rows = [] + index = [] + for hit in results['hits']['hits']: + row = hit['_source'] + + # get index value - can be _id or can be field value in source + if self._index.is_source_field: + index_field = row[self._index.index_field] + else: + index_field = hit[self._index.index_field] + index.append(index_field) + + # flatten row to map correctly to 2D DataFrame + rows.append(flatten_dict(row)) + + # Create pandas DataFrame + df = pd.DataFrame(data=rows, index=index) + + # _source may not contain all columns in the mapping + # therefore, fill in missing columns + # (note this returns self.columns NOT IN df.columns) + missing_columns = list(set(self._columns) - set(df.columns)) + + for missing in missing_columns: + is_source_field, pd_dtype = self._mappings.source_field_pd_dtype(missing) + df[missing] = None + df[missing].astype(pd_dtype) + + # Sort columns in mapping order + df = df[self._columns] + + return df + + def _head(self, n=5): + """ + Protected method that returns head as pandas.DataFrame. + + Returns + ------- + _head + pandas.DataFrame of top N values + """ + sort_params = self._index.sort_field + ":asc" + + results = self._client.search(index=self._index_pattern, size=n, sort=sort_params) + + return self._es_results_to_pandas(results) + + def _tail(self, n=5): + """ + Protected method that returns tail as pandas.DataFrame. 
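In terms of raw Elasticsearch calls, _head and _tail reduce to roughly the following. The index name is hypothetical and '_id' stands in for whatever sort field the Index object is configured with:

```python
from elasticsearch import Elasticsearch

es = Elasticsearch()

# head(n): ascending sort on the index/sort field, size=n documents
head_hits = es.search(index='flights', size=5, sort='_id:asc')

# tail(n): the same search sorted descending; the resulting pandas frame is
# then re-sorted ascending with sort_index() before being returned
tail_hits = es.search(index='flights', size=5, sort='_id:desc')
```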
+ + Returns + ------- + _tail + pandas.DataFrame of last N values + """ + sort_params = self._index.sort_field + ":desc" + + results = self._client.search(index=self._index_pattern, size=n, sort=sort_params) + + df = self._es_results_to_pandas(results) + + # reverse order (index ascending) + return df.sort_index() + + def _describe(self): + numeric_source_fields = self._mappings.numeric_source_fields() + + # for each field we compute: + # count, mean, std, min, 25%, 50%, 75%, max + search = Search(using=self._client, index=self._index_pattern).extra(size=0) + + for field in numeric_source_fields: + search.aggs.metric('extended_stats_' + field, 'extended_stats', field=field) + search.aggs.metric('percentiles_' + field, 'percentiles', field=field) + + response = search.execute() + + results = {} + + for field in numeric_source_fields: + values = list() + values.append(response.aggregations['extended_stats_' + field]['count']) + values.append(response.aggregations['extended_stats_' + field]['avg']) + values.append(response.aggregations['extended_stats_' + field]['std_deviation']) + values.append(response.aggregations['extended_stats_' + field]['min']) + values.append(response.aggregations['percentiles_' + field]['values']['25.0']) + values.append(response.aggregations['percentiles_' + field]['values']['50.0']) + values.append(response.aggregations['percentiles_' + field]['values']['75.0']) + values.append(response.aggregations['extended_stats_' + field]['max']) + + # if not None + if values.count(None) < len(values): + results[field] = values + + df = pd.DataFrame(data=results, index=['count', 'mean', 'std', 'min', '25%', '50%', '75%', 'max']) + + return df + + def _filter_mappings(self, columns): + mappings = ed.Mappings(mappings=self._mappings, columns=columns) + + return mappings + + @property + def index(self): + return self._index + + @property + def dtypes(self): + return self._mappings.dtypes() + + @property + def _columns(self): + return pd.Index(self._mappings.source_fields()) + + def get_dtype_counts(self): + return self._mappings.get_dtype_counts() + + + def _index_count(self): + """ + Returns + ------- + index_count: int + Count of docs where index_field exists + """ + exists_query = {"query": {"exists": {"field": self._index.index_field}}} + + index_count = self._client.count(index=self._index_pattern, body=exists_query) + + return index_count + + def __len__(self): + """ + Returns length of info axis, but here we use the index. + """ + return self._client.count(index=self._index_pattern) + + def _fake_head_tail_df(self, max_rows=1): + """ + Create a 'fake' pd.DataFrame of the entire ed.DataFrame + by concat head and tail. Used for display. + """ + head_rows = int(max_rows / 2) + max_rows % 2 + tail_rows = max_rows - head_rows + + head = self._head(head_rows) + tail = self._tail(tail_rows) + + return head.append(tail) diff --git a/eland/series.py b/eland/series.py new file mode 100644 index 0000000..735da96 --- /dev/null +++ b/eland/series.py @@ -0,0 +1,396 @@ +""" +Series +--------- +One-dimensional ndarray with axis labels (including time series). + +The underlying data resides in Elasticsearch and the API aligns as much as +possible with pandas.DataFrame API. + +This allows the eland.Series to access large datasets stored in Elasticsearch, +without storing the dataset in local memory. 
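Before moving on to Series: for reference, the _describe implementation above issues, per numeric field, an aggregation request along these lines. The field name is hypothetical and the exact JSON is whatever elasticsearch_dsl serialises, so treat this as a sketch of the shape rather than the literal payload:

```python
body = {
    "size": 0,
    "aggs": {
        "extended_stats_AvgTicketPrice": {
            "extended_stats": {"field": "AvgTicketPrice"}
        },
        "percentiles_AvgTicketPrice": {
            "percentiles": {"field": "AvgTicketPrice"}
        },
    },
}
# count/mean/std/min/max are read from the extended_stats aggregation,
# 25%/50%/75% from the percentiles aggregation's '25.0'/'50.0'/'75.0' keys.
```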
+
+Implementation Details
+----------------------
+Based on NDFrame which underpins eland.DataFrame
+
+"""
+import sys
+
+import pandas as pd
+import pandas.compat as compat
+from pandas.compat import StringIO
+from pandas.core.dtypes.common import (
+    is_categorical_dtype)
+from pandas.io.formats import format as fmt
+from pandas.io.formats.printing import pprint_thing
+
+from eland import Index
+from eland import NDFrame
+
+
+class Series(NDFrame):
+    """
+    pandas.Series like API that proxies into Elasticsearch index(es).
+
+    Parameters
+    ----------
+    client : eland.Client
+        A reference to an Elasticsearch python client
+
+    index_pattern : str
+        An Elasticsearch index pattern. This can contain wildcards (e.g. filebeat-*).
+
+    field_name : str
+        The field to base the series on
+
+    See Also
+    --------
+
+    Examples
+    --------
+
+    import eland as ed
+    client = ed.Client(Elasticsearch())
+    df = ed.DataFrame(client, 'reviews', 'date')
+    df.head()
+       reviewerId  vendorId  rating              date
+    0           0         0       5  2006-04-07 17:08
+    1           1         1       5  2006-05-04 12:16
+    2           2         2       4  2006-04-21 12:26
+    3           3         3       5  2006-04-18 15:48
+    4           3         4       5  2006-04-18 15:49
+
+    Notice that the types are based on Elasticsearch mappings
+
+    Notes
+    -----
+    If the Elasticsearch index is deleted or index mappings are changed after this
+    object is created, the object is not rebuilt and so inconsistencies can occur.
+
+    """
+
+    def __init__(self,
+                 client,
+                 index_pattern,
+                 field_name,
+                 mappings=None,
+                 index_field=None):
+        # python 3 syntax
+        super().__init__(client, index_pattern, mappings=mappings, index_field=index_field)
+
+        # now select column (field_name)
+        self._mappings = self._filter_mappings([field_name])
+
+    def head(self, n=5):
+        return self._df_to_series(super()._head(n))
+
+    def tail(self, n=5):
+        return self._df_to_series(super()._tail(n))
+
+    def info(self, verbose=None, buf=None, max_cols=None, memory_usage=None,
+             null_counts=None):
+        """
+        Print a concise summary of a DataFrame.
+
+        This method prints information about a DataFrame including
+        the index dtype and column dtypes, non-null values and memory usage.
+
+        This copies a lot of code from pandas.DataFrame.info as it is difficult
+        to split out the appropriate code or creating a SparseDataFrame gives
+        incorrect results on types and counts.
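Tying the Series pieces above together, a hedged usage sketch; host, index and field names are hypothetical, and passing a host string as the client follows the pattern used in the new tests:

```python
import eland as ed

s = ed.Series('localhost', 'flights', 'Carrier')   # hypothetical host and index

carriers = s.head(5)      # a pandas.Series, produced via _df_to_series (df.iloc[:, 0])
print(type(carriers))     # <class 'pandas.core.series.Series'>
print(carriers.name)      # 'Carrier'
```

Under the hood this is just an NDFrame whose mappings have been filtered to the single field passed to the constructor.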
+ """ + if buf is None: # pragma: no cover + buf = sys.stdout + + lines = [] + + lines.append(str(type(self))) + lines.append(self._index_summary()) + + if len(self.columns) == 0: + lines.append('Empty {name}'.format(name=type(self).__name__)) + fmt.buffer_put_lines(buf, lines) + return + + cols = self.columns + + # hack + if max_cols is None: + max_cols = pd.get_option('display.max_info_columns', + len(self.columns) + 1) + + max_rows = pd.get_option('display.max_info_rows', len(self) + 1) + + if null_counts is None: + show_counts = ((len(self.columns) <= max_cols) and + (len(self) < max_rows)) + else: + show_counts = null_counts + exceeds_info_cols = len(self.columns) > max_cols + + def _verbose_repr(): + lines.append('Data columns (total %d columns):' % + len(self.columns)) + space = max(len(pprint_thing(k)) for k in self.columns) + 4 + counts = None + + tmpl = "{count}{dtype}" + if show_counts: + counts = self.count() + if len(cols) != len(counts): # pragma: no cover + raise AssertionError( + 'Columns must equal counts ' + '({cols:d} != {counts:d})'.format( + cols=len(cols), counts=len(counts))) + tmpl = "{count} non-null {dtype}" + + dtypes = self.dtypes + for i, col in enumerate(self._columns): + dtype = dtypes.iloc[i] + col = pprint_thing(col) + + count = "" + if show_counts: + count = counts.iloc[i] + + lines.append(_put_str(col, space) + tmpl.format(count=count, + dtype=dtype)) + + def _non_verbose_repr(): + lines.append(self._columns._summary(name='Columns')) + + def _sizeof_fmt(num, size_qualifier): + # returns size in human readable format + for x in ['bytes', 'KB', 'MB', 'GB', 'TB']: + if num < 1024.0: + return ("{num:3.1f}{size_q} " + "{x}".format(num=num, size_q=size_qualifier, x=x)) + num /= 1024.0 + return "{num:3.1f}{size_q} {pb}".format(num=num, + size_q=size_qualifier, + pb='PB') + + if verbose: + _verbose_repr() + elif verbose is False: # specifically set to False, not nesc None + _non_verbose_repr() + else: + if exceeds_info_cols: + _non_verbose_repr() + else: + _verbose_repr() + + counts = self.get_dtype_counts() + dtypes = ['{k}({kk:d})'.format(k=k[0], kk=k[1]) for k + in sorted(counts.items())] + lines.append('dtypes: {types}'.format(types=', '.join(dtypes))) + + if memory_usage is None: + memory_usage = pd.get_option('display.memory_usage') + if memory_usage: + # append memory usage of df to display + size_qualifier = '' + + # TODO - this is different from pd.DataFrame as we shouldn't + # really hold much in memory. For now just approximate with getsizeof + ignore deep + mem_usage = sys.getsizeof(self) + lines.append("memory usage: {mem}\n".format( + mem=_sizeof_fmt(mem_usage, size_qualifier))) + + fmt.buffer_put_lines(buf, lines) + + @property + def shape(self): + """ + Return a tuple representing the dimensionality of the DataFrame. 
+ + Returns + ------- + shape: tuple + 0 - number of rows + 1 - number of columns + """ + num_rows = len(self) + num_columns = len(self._columns) + + return num_rows, num_columns + + @property + def set_index(self, index_field): + copy = self.copy() + copy._index = Index(index_field) + return copy + + def _index_summary(self): + head = self.head(1).index[0] + tail = self.tail(1).index[0] + index_summary = ', %s to %s' % (pprint_thing(head), + pprint_thing(tail)) + + name = "Index" + return '%s: %s entries%s' % (name, len(self), index_summary) + + def count(self): + """ + Count non-NA cells for each column (TODO row) + + Counts are based on exists queries against ES + + This is inefficient, as it creates N queries (N is number of fields). + + An alternative approach is to use value_count aggregations. However, they have issues in that: + 1. They can only be used with aggregatable fields (e.g. keyword not text) + 2. For list fields they return multiple counts. E.g. tags=['elastic', 'ml'] returns value_count=2 + for a single document. + """ + counts = {} + for field in self._mappings.source_fields(): + exists_query = {"query": {"exists": {"field": field}}} + field_exists_count = self._client.count(index=self._index_pattern, body=exists_query) + counts[field] = field_exists_count + + count = pd.Series(data=counts, index=self._mappings.source_fields()) + + return count + + def describe(self): + return super()._describe() + + def _df_to_series(self, df): + return df.iloc[:, 0] + + # ---------------------------------------------------------------------- + # Rendering Methods + def __repr__(self): + """ + From pandas + """ + buf = StringIO() + + max_rows = pd.get_option("display.max_rows") + + self.to_string(buf=buf, na_rep='NaN', float_format=None, header=True, index=True, length=False, + dtype=False, name=False, max_rows=max_rows) + + return buf.getvalue() + + def to_string(self, buf=None, na_rep='NaN', + float_format=None, header=True, + index=True, length=True, dtype=True, + name=True, max_rows=None): + """ + From pandas + + Render a string representation of the Series. + + Parameters + ---------- + buf : StringIO-like, optional + buffer to write to + na_rep : string, optional + string representation of NAN to use, default 'NaN' + float_format : one-parameter function, optional + formatter function to apply to columns' elements if they are floats + default None + header : boolean, default True + Add the Series header (index name) + index : bool, optional + Add index (row) labels, default True + length : boolean, default False + Add the Series length + dtype : boolean, default False + Add the Series dtype + name : boolean, default False + Add the Series name if not None + max_rows : int, optional + Maximum number of rows to show before truncating. If None, show + all. 
+ + Returns + ------- + formatted : string (if not buffer passed) + """ + if max_rows == None: + max_rows = pd.get_option("display.max_rows") + + df = self._fake_head_tail_df(max_rows=max_rows + 1) + + s = self._df_to_series(df) + + formatter = Series.SeriesFormatter(s, len(self), name=name, length=length, + header=header, index=index, + dtype=dtype, na_rep=na_rep, + float_format=float_format, + max_rows=max_rows) + result = formatter.to_string() + + # catch contract violations + if not isinstance(result, compat.text_type): + raise AssertionError("result must be of type unicode, type" + " of result is {0!r}" + "".format(result.__class__.__name__)) + + if buf is None: + return result + else: + try: + buf.write(result) + except AttributeError: + with open(buf, 'w') as f: + f.write(result) + + class SeriesFormatter(fmt.SeriesFormatter): + """ + A hacked overridden version of pandas.io.formats.SeriesFormatter that writes correct length + """ + + def __init__(self, series, series_length, buf=None, length=True, header=True, index=True, + na_rep='NaN', name=False, float_format=None, dtype=True, + max_rows=None): + super().__init__(series, buf=buf, length=length, header=header, index=index, + na_rep=na_rep, name=name, float_format=float_format, dtype=dtype, + max_rows=max_rows) + self._series_length = series_length + + def _get_footer(self): + """ + Overridden with length change + (from pandas 0.24.2 io.formats.SeriesFormatter) + """ + name = self.series.name + footer = '' + + if getattr(self.series.index, 'freq', None) is not None: + footer += 'Freq: {freq}'.format(freq=self.series.index.freqstr) + + if self.name is not False and name is not None: + if footer: + footer += ', ' + + series_name = pprint_thing(name, + escape_chars=('\t', '\r', '\n')) + footer += ("Name: {sname}".format(sname=series_name) + if name is not None else "") + + if (self.length is True or + (self.length == 'truncate' and self.truncate_v)): + if footer: + footer += ', ' + footer += 'Length: {length}'.format(length=self._series_length) + + if self.dtype is not False and self.dtype is not None: + name = getattr(self.tr_series.dtype, 'name', None) + if name: + if footer: + footer += ', ' + footer += 'dtype: {typ}'.format(typ=pprint_thing(name)) + + # level infos are added to the end and in a new line, like it is done + # for Categoricals + if is_categorical_dtype(self.tr_series.dtype): + level_info = self.tr_series._values._repr_categories_info() + if footer: + footer += "\n" + footer += level_info + + return compat.text_type(footer) diff --git a/eland/tests/client/test_mappings_pytest.py b/eland/tests/client/test_mappings_pytest.py index c56994f..5e50f17 100644 --- a/eland/tests/client/test_mappings_pytest.py +++ b/eland/tests/client/test_mappings_pytest.py @@ -16,7 +16,7 @@ class TestMapping(TestData): assert TEST_MAPPING1_EXPECTED_DF.index.tolist() == mappings.all_fields() - assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mappings.mappings_capabilities['es_dtype'])) + assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mappings._mappings_capabilities['es_dtype'])) assert TEST_MAPPING1_EXPECTED_SOURCE_FIELD_COUNT == mappings.count_source_fields() @@ -24,7 +24,7 @@ class TestMapping(TestData): mappings = ed.Mappings(ed.Client(ELASTICSEARCH_HOST), TEST_MAPPING1_INDEX_NAME) assert TEST_MAPPING1_EXPECTED_DF.index.tolist() == mappings.all_fields() - assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mappings.mappings_capabilities['es_dtype'])) + assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, 
pd.DataFrame(mappings._mappings_capabilities['es_dtype'])) assert TEST_MAPPING1_EXPECTED_SOURCE_FIELD_COUNT == mappings.count_source_fields() # Pick 1 source field @@ -43,7 +43,7 @@ class TestMapping(TestData): # Check original is still ok assert TEST_MAPPING1_EXPECTED_DF.index.tolist() == mappings.all_fields() - assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mappings.mappings_capabilities['es_dtype'])) + assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mappings._mappings_capabilities['es_dtype'])) assert TEST_MAPPING1_EXPECTED_SOURCE_FIELD_COUNT == mappings.count_source_fields() def test_dtypes(self): diff --git a/eland/tests/dataframe/test_indexing_pytest.py b/eland/tests/dataframe/test_basics_pytest.py similarity index 99% rename from eland/tests/dataframe/test_indexing_pytest.py rename to eland/tests/dataframe/test_basics_pytest.py index d3565d7..73e5fb1 100644 --- a/eland/tests/dataframe/test_indexing_pytest.py +++ b/eland/tests/dataframe/test_basics_pytest.py @@ -7,7 +7,7 @@ import io from pandas.util.testing import ( assert_series_equal, assert_frame_equal) -class TestDataFrameIndexing(TestData): +class TestDataFrameBasics(TestData): def test_mapping(self): ed_flights_mappings = pd.DataFrame(self.ed_flights()._mappings._mappings_capabilities diff --git a/eland/tests/series/test_basics_pytest.py b/eland/tests/series/test_basics_pytest.py new file mode 100644 index 0000000..861b8bf --- /dev/null +++ b/eland/tests/series/test_basics_pytest.py @@ -0,0 +1,32 @@ +# File called _pytest for PyCharm compatability +from eland.tests.common import TestData + +import pandas as pd +import eland as ed +import io + +from eland.tests import ELASTICSEARCH_HOST +from eland.tests import FLIGHTS_INDEX_NAME + +from pandas.util.testing import ( + assert_series_equal, assert_frame_equal) + +class TestSeriesBasics(TestData): + + def test_head_tail(self): + pd_s = self.pd_flights()['Carrier'] + ed_s = ed.Series(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME, 'Carrier') + + pd_s_head = pd_s.head(10) + ed_s_head = ed_s.head(10) + + assert_series_equal(pd_s_head, ed_s_head) + + pd_s_tail = pd_s.tail(10) + ed_s_tail = ed_s.tail(10) + + assert_series_equal(pd_s_tail, ed_s_tail) + + def test_print(self): + ed_s = ed.Series(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME, 'timestamp') + print(ed_s.to_string())
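A closing note on the SeriesFormatter override added above: to_string renders a frame that only contains the head and tail rows actually fetched, so the stock pandas formatter would report that small row count in its footer. The subclass is handed the real series length instead. A sketch of the effect, with a hypothetical host and index and illustrative output comments:

```python
import eland as ed

s = ed.Series('localhost', 'flights', 'timestamp')   # hypothetical host and index

print(s.to_string(max_rows=5, length=True))
# The body shows only the handful of head/tail rows fetched from Elasticsearch,
# but the footer's 'Length: ...' is the true document count: the overridden
# _get_footer writes the series_length passed in from len(self) rather than
# the length of the truncated series it is formatting.
```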