From 30df901fce6b8911a72eba507879867b3f2b8e2a Mon Sep 17 00:00:00 2001 From: Stephen Dodson Date: Mon, 1 Jul 2019 18:41:56 +0000 Subject: [PATCH 1/2] Introduction of eland.Series - big refactor Creation of eland.NDFrame as base class for DataFrame and Series --- eland/__init__.py | 6 +- eland/dataframe.py | 357 ++-------------- eland/ndframe.py | 343 +++++++++++++++ eland/series.py | 396 ++++++++++++++++++ eland/tests/client/test_mappings_pytest.py | 6 +- ...dexing_pytest.py => test_basics_pytest.py} | 2 +- eland/tests/series/test_basics_pytest.py | 32 ++ 7 files changed, 816 insertions(+), 326 deletions(-) create mode 100644 eland/ndframe.py create mode 100644 eland/series.py rename eland/tests/dataframe/{test_indexing_pytest.py => test_basics_pytest.py} (99%) create mode 100644 eland/tests/series/test_basics_pytest.py diff --git a/eland/__init__.py b/eland/__init__.py index d8305a6..f82d933 100644 --- a/eland/__init__.py +++ b/eland/__init__.py @@ -1,5 +1,7 @@ from .utils import * -from .dataframe import * from .client import * -from .mappings import * +from .ndframe import * from .index import * +from .mappings import * +from .dataframe import * +from .series import * diff --git a/eland/dataframe.py b/eland/dataframe.py index bf2532f..35bfd4d 100644 --- a/eland/dataframe.py +++ b/eland/dataframe.py @@ -26,18 +26,18 @@ only Elasticsearch aggregatable fields can be aggregated or grouped. import sys import pandas as pd -from elasticsearch_dsl import Search -from pandas.compat import StringIO -from pandas.core import common as com -from pandas.io.common import _expand_user, _stringify_path + from pandas.io.formats import format as fmt from pandas.io.formats.printing import pprint_thing +from pandas.compat import StringIO +from pandas.io.common import _expand_user, _stringify_path from pandas.io.formats import console -import eland as ed +from eland import NDFrame +from eland import Index -class DataFrame(): +class DataFrame(NDFrame): """ pandas.DataFrame like API that proxies into Elasticsearch index(es). @@ -49,9 +49,6 @@ class DataFrame(): index_pattern : str An Elasticsearch index pattern. This can contain wildcards (e.g. filebeat-*). - operations: list of operation - A list of Elasticsearch analytics operations e.g. filter, aggregations etc. 
- See Also -------- @@ -83,229 +80,14 @@ class DataFrame(): index_pattern, mappings=None, index_field=None): - - self._client = ed.Client(client) - self._index_pattern = index_pattern - - # Get and persist mappings, this allows us to correctly - # map returned types from Elasticsearch to pandas datatypes - if mappings is None: - self._mappings = ed.Mappings(self._client, self._index_pattern) - else: - self._mappings = mappings - - self._index = ed.Index(index_field) - - def _es_results_to_pandas(self, results): - """ - Parameters - ---------- - results: dict - Elasticsearch results from self.client.search - - Returns - ------- - df: pandas.DataFrame - _source values extracted from results and mapped to pandas DataFrame - dtypes are mapped via Mapping object - - Notes - ----- - Fields containing lists in Elasticsearch don't map easily to pandas.DataFrame - For example, an index with mapping: - ``` - "mappings" : { - "properties" : { - "group" : { - "type" : "keyword" - }, - "user" : { - "type" : "nested", - "properties" : { - "first" : { - "type" : "keyword" - }, - "last" : { - "type" : "keyword" - } - } - } - } - } - ``` - Adding a document: - ``` - "_source" : { - "group" : "amsterdam", - "user" : [ - { - "first" : "John", - "last" : "Smith" - }, - { - "first" : "Alice", - "last" : "White" - } - ] - } - ``` - (https://www.elastic.co/guide/en/elasticsearch/reference/current/nested.html) - this would be transformed internally (in Elasticsearch) into a document that looks more like this: - ``` - { - "group" : "amsterdam", - "user.first" : [ "alice", "john" ], - "user.last" : [ "smith", "white" ] - } - ``` - When mapping this a pandas data frame we mimic this transformation. - - Similarly, if a list is added to Elasticsearch: - ``` - PUT my_index/_doc/1 - { - "list" : [ - 0, 1, 2 - ] - } - ``` - The mapping is: - ``` - "mappings" : { - "properties" : { - "user" : { - "type" : "long" - } - } - } - ``` - TODO - explain how lists are handled (https://www.elastic.co/guide/en/elasticsearch/reference/current/array.html) - TODO - an option here is to use Elasticsearch's multi-field matching instead of pandas treatment of lists (which isn't great) - NOTE - using this lists is generally not a good way to use this API - """ - - def flatten_dict(y): - out = {} - - def flatten(x, name=''): - # We flatten into source fields e.g. if type=geo_point - # location: {lat=52.38, lon=4.90} - if name == '': - is_source_field = False - pd_dtype = 'object' - else: - is_source_field, pd_dtype = self._mappings.source_field_pd_dtype(name[:-1]) - - if not is_source_field and type(x) is dict: - for a in x: - flatten(x[a], name + a + '.') - elif not is_source_field and type(x) is list: - for a in x: - flatten(a, name) - elif is_source_field == True: # only print source fields from mappings (TODO - not so efficient for large number of fields and filtered mapping) - field_name = name[:-1] - - # Coerce types - for now just datetime - if pd_dtype == 'datetime64[ns]': - x = pd.to_datetime(x) - - # Elasticsearch can have multiple values for a field. 
These are represented as lists, so - # create lists for this pivot (see notes above) - if field_name in out: - if type(out[field_name]) is not list: - l = [out[field_name]] - out[field_name] = l - out[field_name].append(x) - else: - out[field_name] = x - - flatten(y) - - return out - - rows = [] - index = [] - for hit in results['hits']['hits']: - row = hit['_source'] - - # get index value - can be _id or can be field value in source - if self._index.is_source_field: - index_field = row[self._index.index_field] - else: - index_field = hit[self._index.index_field] - index.append(index_field) - - # flatten row to map correctly to 2D DataFrame - rows.append(flatten_dict(row)) - - # Create pandas DataFrame - df = pd.DataFrame(data=rows, index=index) - - # _source may not contain all columns in the mapping - # therefore, fill in missing columns - # (note this returns self.columns NOT IN df.columns) - missing_columns = list(set(self.columns) - set(df.columns)) - - for missing in missing_columns: - is_source_field, pd_dtype = self._mappings.source_field_pd_dtype(missing) - df[missing] = None - df[missing].astype(pd_dtype) - - # Sort columns in mapping order - df = df[self.columns] - - return df + # python 3 syntax + super().__init__(client, index_pattern, mappings=mappings, index_field=index_field) def head(self, n=5): - sort_params = self._index.sort_field + ":asc" - - results = self._client.search(index=self._index_pattern, size=n, sort=sort_params) - - return self._es_results_to_pandas(results) + return super()._head(n) def tail(self, n=5): - sort_params = self._index.sort_field + ":desc" - - results = self._client.search(index=self._index_pattern, size=n, sort=sort_params) - - df = self._es_results_to_pandas(results) - - # reverse order (index ascending) - return df.sort_index() - - def describe(self): - numeric_source_fields = self._mappings.numeric_source_fields() - - # for each field we compute: - # count, mean, std, min, 25%, 50%, 75%, max - search = Search(using=self._client, index=self._index_pattern).extra(size=0) - - for field in numeric_source_fields: - search.aggs.metric('extended_stats_' + field, 'extended_stats', field=field) - search.aggs.metric('percentiles_' + field, 'percentiles', field=field) - - response = search.execute() - - results = {} - - for field in numeric_source_fields: - values = [] - values.append(response.aggregations['extended_stats_' + field]['count']) - values.append(response.aggregations['extended_stats_' + field]['avg']) - values.append(response.aggregations['extended_stats_' + field]['std_deviation']) - values.append(response.aggregations['extended_stats_' + field]['min']) - values.append(response.aggregations['percentiles_' + field]['values']['25.0']) - values.append(response.aggregations['percentiles_' + field]['values']['50.0']) - values.append(response.aggregations['percentiles_' + field]['values']['75.0']) - values.append(response.aggregations['extended_stats_' + field]['max']) - - # if not None - if (values.count(None) < len(values)): - results[field] = values - - df = pd.DataFrame(data=results, index=['count', 'mean', 'std', 'min', '25%', '50%', '75%', 'max']) - - return df + return super()._tail(n) def info(self, verbose=None, buf=None, max_cols=None, memory_usage=None, null_counts=None): @@ -325,7 +107,7 @@ class DataFrame(): lines = [] lines.append(str(type(self))) - lines.append(self.index_summary()) + lines.append(self._index_summary()) if len(self.columns) == 0: lines.append('Empty {name}'.format(name=type(self).__name__)) @@ -437,18 +219,14 @@ 
class DataFrame(): @property def columns(self): - return pd.Index(self._mappings.source_fields()) - - @property - def index(self): - return self._index + return super()._columns def set_index(self, index_field): copy = self.copy() - copy._index = ed.Index(index_field) + copy._index = Index(index_field) return copy - def index_summary(self): + def _index_summary(self): head = self.head(1).index[0] tail = self.tail(1).index[0] index_summary = ', %s to %s' % (pprint_thing(head), @@ -457,13 +235,6 @@ class DataFrame(): name = "Index" return '%s: %s entries%s' % (name, len(self), index_summary) - @property - def dtypes(self): - return self._mappings.dtypes() - - def get_dtype_counts(self): - return self._mappings.get_dtype_counts() - def count(self): """ Count non-NA cells for each column (TODO row) @@ -487,24 +258,9 @@ class DataFrame(): return count - def index_count(self): - """ - Returns - ------- - index_count: int - Count of docs where index_field exists - """ - exists_query = {"query": {"exists": {"field": self._index.index_field}}} + def describe(self): + return super()._describe() - index_count = self._client.count(index=self._index_pattern, body=exists_query) - - return index_count - - def _filter_by_columns(self, columns): - # Return new eland.DataFrame with modified mappings - mappings = ed.Mappings(mappings=self._mappings, columns=columns) - - return DataFrame(self._client, self._index_pattern, mappings=mappings) def __getitem__(self, key): # NOTE: there is a difference between pandas here. @@ -544,17 +300,14 @@ class DataFrame(): else: raise TypeError('__getitem__ arguments invalid: [{0}]'.format(key)) - return self._filter_by_columns(columns) + mappings = self._filter_mappings(columns) - def __len__(self): - """ - Returns length of info axis, but here we use the index. 
- """ - return self._client.count(index=self._index_pattern) + # Return new eland.DataFrame with modified mappings + return DataFrame(self._client, self._index_pattern, mappings=mappings) def copy(self): # TODO - test and validate...may need deep copying - return ed.DataFrame(self._client, + return DataFrame(self._client, self._index_pattern, self._mappings, self._index) @@ -590,73 +343,37 @@ class DataFrame(): if max_rows == None: max_rows = pd.get_option('display.max_rows') - sdf = self.__fake_dataframe__(max_rows=max_rows+1) - - _show_dimensions = show_dimensions + df = self._fake_head_tail_df(max_rows=max_rows+1) if buf is not None: _buf = _expand_user(_stringify_path(buf)) else: _buf = StringIO() - sdf.to_string(buf=_buf, columns=columns, - col_space=col_space, na_rep=na_rep, - formatters=formatters, - float_format=float_format, - sparsify=sparsify, justify=justify, - index_names=index_names, - header=header, index=index, - max_rows=max_rows, - max_cols=max_cols, - show_dimensions=False, # print this outside of this call - decimal=decimal, - line_width=line_width) + df.to_string(buf=_buf, columns=columns, + col_space=col_space, na_rep=na_rep, + formatters=formatters, + float_format=float_format, + sparsify=sparsify, justify=justify, + index_names=index_names, + header=header, index=index, + max_rows=max_rows, + max_cols=max_cols, + show_dimensions=False, # print this outside of this call + decimal=decimal, + line_width=line_width) - if _show_dimensions: + # Our fake dataframe has incorrect number of rows (max_rows*2+1) - write out + # the correct number of rows + if show_dimensions: _buf.write("\n\n[{nrows} rows x {ncols} columns]" - .format(nrows=self.index_count(), ncols=len(self.columns))) + .format(nrows=self._index_count(), ncols=len(self.columns))) if buf is None: result = _buf.getvalue() return result - def __fake_dataframe__(self, max_rows=1): - head_rows = int(max_rows / 2) + max_rows % 2 - tail_rows = max_rows - head_rows - - head = self.head(head_rows) - tail = self.tail(tail_rows) - - num_rows = len(self) - - if (num_rows > max_rows): - # If we have a lot of rows, create a SparseDataFrame and use - # pandas to_string logic - # NOTE: this sparse DataFrame can't be used as the middle - # section is all NaNs. However, it gives us potentially a nice way - # to use the pandas IO methods. - # TODO - if data is indexed by time series, return top/bottom of - # time series, rather than first max_rows items - """ - if tail_rows > 0: - locations = [0, num_rows - tail_rows] - lengths = [head_rows, tail_rows] - else: - locations = [0] - lengths = [head_rows] - - sdf = pd.DataFrame({item: pd.SparseArray(data=head[item], - sparse_index= - BlockIndex( - num_rows, locations, lengths)) - for item in self.columns}) - """ - return pd.concat([head, tail]) - - - return pd.concat([head, tail]) - # From pandas.DataFrame def _put_str(s, space): diff --git a/eland/ndframe.py b/eland/ndframe.py new file mode 100644 index 0000000..6f52e1c --- /dev/null +++ b/eland/ndframe.py @@ -0,0 +1,343 @@ +""" +NDFrame +--------- +Base class for eland.DataFrame and eland.Series. + +The underlying data resides in Elasticsearch and the API aligns as much as +possible with pandas APIs. + +This allows the eland.DataFrame to access large datasets stored in Elasticsearch, +without storing the dataset in local memory. + +Implementation Details +---------------------- + +Elasticsearch indexes can be configured in many different ways, and these indexes +utilise different data structures to pandas. 
+ +eland.DataFrame operations that return individual rows (e.g. df.head()) return +_source data. If _source is not enabled, this data is not accessible. + +Similarly, only Elasticsearch searchable fields can be searched or filtered, and +only Elasticsearch aggregatable fields can be aggregated or grouped. + +""" +import pandas as pd +from elasticsearch_dsl import Search + +import eland as ed + + +class NDFrame(): + """ + pandas.DataFrame/Series like API that proxies into Elasticsearch index(es). + + Parameters + ---------- + client : eland.Client + A reference to a Elasticsearch python client + + index_pattern : str + An Elasticsearch index pattern. This can contain wildcards (e.g. filebeat-*). + + See Also + -------- + + """ + + def __init__(self, + client, + index_pattern, + mappings=None, + index_field=None): + + self._client = ed.Client(client) + self._index_pattern = index_pattern + + # Get and persist mappings, this allows us to correctly + # map returned types from Elasticsearch to pandas datatypes + if mappings is None: + self._mappings = ed.Mappings(self._client, self._index_pattern) + else: + self._mappings = mappings + + self._index = ed.Index(index_field) + + def _es_results_to_pandas(self, results): + """ + Parameters + ---------- + results: dict + Elasticsearch results from self.client.search + + Returns + ------- + df: pandas.DataFrame + _source values extracted from results and mapped to pandas DataFrame + dtypes are mapped via Mapping object + + Notes + ----- + Fields containing lists in Elasticsearch don't map easily to pandas.DataFrame + For example, an index with mapping: + ``` + "mappings" : { + "properties" : { + "group" : { + "type" : "keyword" + }, + "user" : { + "type" : "nested", + "properties" : { + "first" : { + "type" : "keyword" + }, + "last" : { + "type" : "keyword" + } + } + } + } + } + ``` + Adding a document: + ``` + "_source" : { + "group" : "amsterdam", + "user" : [ + { + "first" : "John", + "last" : "Smith" + }, + { + "first" : "Alice", + "last" : "White" + } + ] + } + ``` + (https://www.elastic.co/guide/en/elasticsearch/reference/current/nested.html) + this would be transformed internally (in Elasticsearch) into a document that looks more like this: + ``` + { + "group" : "amsterdam", + "user.first" : [ "alice", "john" ], + "user.last" : [ "smith", "white" ] + } + ``` + When mapping this a pandas data frame we mimic this transformation. + + Similarly, if a list is added to Elasticsearch: + ``` + PUT my_index/_doc/1 + { + "list" : [ + 0, 1, 2 + ] + } + ``` + The mapping is: + ``` + "mappings" : { + "properties" : { + "user" : { + "type" : "long" + } + } + } + ``` + TODO - explain how lists are handled (https://www.elastic.co/guide/en/elasticsearch/reference/current/array.html) + TODO - an option here is to use Elasticsearch's multi-field matching instead of pandas treatment of lists (which isn't great) + NOTE - using this lists is generally not a good way to use this API + """ + def flatten_dict(y): + out = {} + + def flatten(x, name=''): + # We flatten into source fields e.g. 
if type=geo_point + # location: {lat=52.38, lon=4.90} + if name == '': + is_source_field = False + pd_dtype = 'object' + else: + is_source_field, pd_dtype = self._mappings.source_field_pd_dtype(name[:-1]) + + if not is_source_field and type(x) is dict: + for a in x: + flatten(x[a], name + a + '.') + elif not is_source_field and type(x) is list: + for a in x: + flatten(a, name) + elif is_source_field == True: # only print source fields from mappings (TODO - not so efficient for large number of fields and filtered mapping) + field_name = name[:-1] + + # Coerce types - for now just datetime + if pd_dtype == 'datetime64[ns]': + x = pd.to_datetime(x) + + # Elasticsearch can have multiple values for a field. These are represented as lists, so + # create lists for this pivot (see notes above) + if field_name in out: + if type(out[field_name]) is not list: + l = [out[field_name]] + out[field_name] = l + out[field_name].append(x) + else: + out[field_name] = x + + flatten(y) + + return out + + rows = [] + index = [] + for hit in results['hits']['hits']: + row = hit['_source'] + + # get index value - can be _id or can be field value in source + if self._index.is_source_field: + index_field = row[self._index.index_field] + else: + index_field = hit[self._index.index_field] + index.append(index_field) + + # flatten row to map correctly to 2D DataFrame + rows.append(flatten_dict(row)) + + # Create pandas DataFrame + df = pd.DataFrame(data=rows, index=index) + + # _source may not contain all columns in the mapping + # therefore, fill in missing columns + # (note this returns self.columns NOT IN df.columns) + missing_columns = list(set(self._columns) - set(df.columns)) + + for missing in missing_columns: + is_source_field, pd_dtype = self._mappings.source_field_pd_dtype(missing) + df[missing] = None + df[missing].astype(pd_dtype) + + # Sort columns in mapping order + df = df[self._columns] + + return df + + def _head(self, n=5): + """ + Protected method that returns head as pandas.DataFrame. + + Returns + ------- + _head + pandas.DataFrame of top N values + """ + sort_params = self._index.sort_field + ":asc" + + results = self._client.search(index=self._index_pattern, size=n, sort=sort_params) + + return self._es_results_to_pandas(results) + + def _tail(self, n=5): + """ + Protected method that returns tail as pandas.DataFrame. 
+ + Returns + ------- + _tail + pandas.DataFrame of last N values + """ + sort_params = self._index.sort_field + ":desc" + + results = self._client.search(index=self._index_pattern, size=n, sort=sort_params) + + df = self._es_results_to_pandas(results) + + # reverse order (index ascending) + return df.sort_index() + + def _describe(self): + numeric_source_fields = self._mappings.numeric_source_fields() + + # for each field we compute: + # count, mean, std, min, 25%, 50%, 75%, max + search = Search(using=self._client, index=self._index_pattern).extra(size=0) + + for field in numeric_source_fields: + search.aggs.metric('extended_stats_' + field, 'extended_stats', field=field) + search.aggs.metric('percentiles_' + field, 'percentiles', field=field) + + response = search.execute() + + results = {} + + for field in numeric_source_fields: + values = list() + values.append(response.aggregations['extended_stats_' + field]['count']) + values.append(response.aggregations['extended_stats_' + field]['avg']) + values.append(response.aggregations['extended_stats_' + field]['std_deviation']) + values.append(response.aggregations['extended_stats_' + field]['min']) + values.append(response.aggregations['percentiles_' + field]['values']['25.0']) + values.append(response.aggregations['percentiles_' + field]['values']['50.0']) + values.append(response.aggregations['percentiles_' + field]['values']['75.0']) + values.append(response.aggregations['extended_stats_' + field]['max']) + + # if not None + if values.count(None) < len(values): + results[field] = values + + df = pd.DataFrame(data=results, index=['count', 'mean', 'std', 'min', '25%', '50%', '75%', 'max']) + + return df + + def _filter_mappings(self, columns): + mappings = ed.Mappings(mappings=self._mappings, columns=columns) + + return mappings + + @property + def index(self): + return self._index + + @property + def dtypes(self): + return self._mappings.dtypes() + + @property + def _columns(self): + return pd.Index(self._mappings.source_fields()) + + def get_dtype_counts(self): + return self._mappings.get_dtype_counts() + + + def _index_count(self): + """ + Returns + ------- + index_count: int + Count of docs where index_field exists + """ + exists_query = {"query": {"exists": {"field": self._index.index_field}}} + + index_count = self._client.count(index=self._index_pattern, body=exists_query) + + return index_count + + def __len__(self): + """ + Returns length of info axis, but here we use the index. + """ + return self._client.count(index=self._index_pattern) + + def _fake_head_tail_df(self, max_rows=1): + """ + Create a 'fake' pd.DataFrame of the entire ed.DataFrame + by concat head and tail. Used for display. + """ + head_rows = int(max_rows / 2) + max_rows % 2 + tail_rows = max_rows - head_rows + + head = self._head(head_rows) + tail = self._tail(tail_rows) + + return head.append(tail) diff --git a/eland/series.py b/eland/series.py new file mode 100644 index 0000000..735da96 --- /dev/null +++ b/eland/series.py @@ -0,0 +1,396 @@ +""" +Series +--------- +One-dimensional ndarray with axis labels (including time series). + +The underlying data resides in Elasticsearch and the API aligns as much as +possible with pandas.DataFrame API. + +This allows the eland.Series to access large datasets stored in Elasticsearch, +without storing the dataset in local memory. 
+
+Implementation Details
+----------------------
+Based on NDFrame, which underpins eland.DataFrame
+
+"""
+import sys
+
+import pandas as pd
+import pandas.compat as compat
+from pandas.compat import StringIO
+from pandas.core.dtypes.common import (
+    is_categorical_dtype)
+from pandas.io.formats import format as fmt
+from pandas.io.formats.printing import pprint_thing
+
+from eland import Index
+from eland import NDFrame
+
+
+class Series(NDFrame):
+    """
+    pandas.Series like API that proxies into Elasticsearch index(es).
+
+    Parameters
+    ----------
+    client : eland.Client
+        A reference to an Elasticsearch Python client
+
+    index_pattern : str
+        An Elasticsearch index pattern. This can contain wildcards (e.g. filebeat-*).
+
+    field_name : str
+        The field to base the series on
+
+    See Also
+    --------
+
+    Examples
+    --------
+
+    import eland as ed
+    client = ed.Client(Elasticsearch())
+    s = ed.Series(client, 'reviews', 'date')
+    s.head()
+    0    2006-04-07 17:08
+    1    2006-05-04 12:16
+    2    2006-04-21 12:26
+    3    2006-04-18 15:48
+    4    2006-04-18 15:49
+    Name: date, dtype: datetime64[ns]
+
+    Notice that the types are based on Elasticsearch mappings
+
+    Notes
+    -----
+    If the Elasticsearch index is deleted or index mappings are changed after this
+    object is created, the object is not rebuilt and so inconsistencies can occur.
+
+    """
+
+    def __init__(self,
+                 client,
+                 index_pattern,
+                 field_name,
+                 mappings=None,
+                 index_field=None):
+        # python 3 syntax
+        super().__init__(client, index_pattern, mappings=mappings, index_field=index_field)
+
+        # now select column (field_name)
+        self._mappings = self._filter_mappings([field_name])
+
+    def head(self, n=5):
+        return self._df_to_series(super()._head(n))
+
+    def tail(self, n=5):
+        return self._df_to_series(super()._tail(n))
+
+    def info(self, verbose=None, buf=None, max_cols=None, memory_usage=None,
+             null_counts=None):
+        """
+        Print a concise summary of a Series.
+
+        This method prints information about a Series including
+        the index dtype and column dtypes, non-null values and memory usage.
+
+        This copies a lot of code from pandas.DataFrame.info as it is difficult
+        to split out the appropriate code or creating a SparseDataFrame gives
+        incorrect results on types and counts.
+ """ + if buf is None: # pragma: no cover + buf = sys.stdout + + lines = [] + + lines.append(str(type(self))) + lines.append(self._index_summary()) + + if len(self.columns) == 0: + lines.append('Empty {name}'.format(name=type(self).__name__)) + fmt.buffer_put_lines(buf, lines) + return + + cols = self.columns + + # hack + if max_cols is None: + max_cols = pd.get_option('display.max_info_columns', + len(self.columns) + 1) + + max_rows = pd.get_option('display.max_info_rows', len(self) + 1) + + if null_counts is None: + show_counts = ((len(self.columns) <= max_cols) and + (len(self) < max_rows)) + else: + show_counts = null_counts + exceeds_info_cols = len(self.columns) > max_cols + + def _verbose_repr(): + lines.append('Data columns (total %d columns):' % + len(self.columns)) + space = max(len(pprint_thing(k)) for k in self.columns) + 4 + counts = None + + tmpl = "{count}{dtype}" + if show_counts: + counts = self.count() + if len(cols) != len(counts): # pragma: no cover + raise AssertionError( + 'Columns must equal counts ' + '({cols:d} != {counts:d})'.format( + cols=len(cols), counts=len(counts))) + tmpl = "{count} non-null {dtype}" + + dtypes = self.dtypes + for i, col in enumerate(self._columns): + dtype = dtypes.iloc[i] + col = pprint_thing(col) + + count = "" + if show_counts: + count = counts.iloc[i] + + lines.append(_put_str(col, space) + tmpl.format(count=count, + dtype=dtype)) + + def _non_verbose_repr(): + lines.append(self._columns._summary(name='Columns')) + + def _sizeof_fmt(num, size_qualifier): + # returns size in human readable format + for x in ['bytes', 'KB', 'MB', 'GB', 'TB']: + if num < 1024.0: + return ("{num:3.1f}{size_q} " + "{x}".format(num=num, size_q=size_qualifier, x=x)) + num /= 1024.0 + return "{num:3.1f}{size_q} {pb}".format(num=num, + size_q=size_qualifier, + pb='PB') + + if verbose: + _verbose_repr() + elif verbose is False: # specifically set to False, not nesc None + _non_verbose_repr() + else: + if exceeds_info_cols: + _non_verbose_repr() + else: + _verbose_repr() + + counts = self.get_dtype_counts() + dtypes = ['{k}({kk:d})'.format(k=k[0], kk=k[1]) for k + in sorted(counts.items())] + lines.append('dtypes: {types}'.format(types=', '.join(dtypes))) + + if memory_usage is None: + memory_usage = pd.get_option('display.memory_usage') + if memory_usage: + # append memory usage of df to display + size_qualifier = '' + + # TODO - this is different from pd.DataFrame as we shouldn't + # really hold much in memory. For now just approximate with getsizeof + ignore deep + mem_usage = sys.getsizeof(self) + lines.append("memory usage: {mem}\n".format( + mem=_sizeof_fmt(mem_usage, size_qualifier))) + + fmt.buffer_put_lines(buf, lines) + + @property + def shape(self): + """ + Return a tuple representing the dimensionality of the DataFrame. 
+
+        Returns
+        -------
+        shape: tuple
+            0 - number of rows
+            1 - number of columns
+        """
+        num_rows = len(self)
+        num_columns = len(self._columns)
+
+        return num_rows, num_columns
+
+    def set_index(self, index_field):
+        copy = self.copy()
+        copy._index = Index(index_field)
+        return copy
+
+    def _index_summary(self):
+        head = self.head(1).index[0]
+        tail = self.tail(1).index[0]
+        index_summary = ', %s to %s' % (pprint_thing(head),
+                                        pprint_thing(tail))
+
+        name = "Index"
+        return '%s: %s entries%s' % (name, len(self), index_summary)
+
+    def count(self):
+        """
+        Count non-NA cells for each column (TODO row)
+
+        Counts are based on exists queries against ES.
+
+        This is inefficient, as it creates N queries (N is the number of fields).
+
+        An alternative approach is to use value_count aggregations. However, they have issues in that:
+        1. They can only be used with aggregatable fields (e.g. keyword not text)
+        2. For list fields they return multiple counts. E.g. tags=['elastic', 'ml'] returns value_count=2
+           for a single document.
+        """
+        counts = {}
+        for field in self._mappings.source_fields():
+            exists_query = {"query": {"exists": {"field": field}}}
+            field_exists_count = self._client.count(index=self._index_pattern, body=exists_query)
+            counts[field] = field_exists_count
+
+        count = pd.Series(data=counts, index=self._mappings.source_fields())
+
+        return count
+
+    def describe(self):
+        return super()._describe()
+
+    def _df_to_series(self, df):
+        return df.iloc[:, 0]
+
+    # ----------------------------------------------------------------------
+    # Rendering Methods
+    def __repr__(self):
+        """
+        From pandas
+        """
+        buf = StringIO()
+
+        max_rows = pd.get_option("display.max_rows")
+
+        self.to_string(buf=buf, na_rep='NaN', float_format=None, header=True, index=True, length=False,
+                       dtype=False, name=False, max_rows=max_rows)
+
+        return buf.getvalue()
+
+    def to_string(self, buf=None, na_rep='NaN',
+                  float_format=None, header=True,
+                  index=True, length=True, dtype=True,
+                  name=True, max_rows=None):
+        """
+        From pandas
+
+        Render a string representation of the Series.
+
+        Parameters
+        ----------
+        buf : StringIO-like, optional
+            buffer to write to
+        na_rep : string, optional
+            string representation of NaN to use, default 'NaN'
+        float_format : one-parameter function, optional
+            formatter function to apply to columns' elements if they are floats
+            default None
+        header : boolean, default True
+            Add the Series header (index name)
+        index : bool, optional
+            Add index (row) labels, default True
+        length : boolean, default True
+            Add the Series length
+        dtype : boolean, default True
+            Add the Series dtype
+        name : boolean, default True
+            Add the Series name if not None
+        max_rows : int, optional
+            Maximum number of rows to show before truncating. If None, show
+            all.
+ + Returns + ------- + formatted : string (if not buffer passed) + """ + if max_rows == None: + max_rows = pd.get_option("display.max_rows") + + df = self._fake_head_tail_df(max_rows=max_rows + 1) + + s = self._df_to_series(df) + + formatter = Series.SeriesFormatter(s, len(self), name=name, length=length, + header=header, index=index, + dtype=dtype, na_rep=na_rep, + float_format=float_format, + max_rows=max_rows) + result = formatter.to_string() + + # catch contract violations + if not isinstance(result, compat.text_type): + raise AssertionError("result must be of type unicode, type" + " of result is {0!r}" + "".format(result.__class__.__name__)) + + if buf is None: + return result + else: + try: + buf.write(result) + except AttributeError: + with open(buf, 'w') as f: + f.write(result) + + class SeriesFormatter(fmt.SeriesFormatter): + """ + A hacked overridden version of pandas.io.formats.SeriesFormatter that writes correct length + """ + + def __init__(self, series, series_length, buf=None, length=True, header=True, index=True, + na_rep='NaN', name=False, float_format=None, dtype=True, + max_rows=None): + super().__init__(series, buf=buf, length=length, header=header, index=index, + na_rep=na_rep, name=name, float_format=float_format, dtype=dtype, + max_rows=max_rows) + self._series_length = series_length + + def _get_footer(self): + """ + Overridden with length change + (from pandas 0.24.2 io.formats.SeriesFormatter) + """ + name = self.series.name + footer = '' + + if getattr(self.series.index, 'freq', None) is not None: + footer += 'Freq: {freq}'.format(freq=self.series.index.freqstr) + + if self.name is not False and name is not None: + if footer: + footer += ', ' + + series_name = pprint_thing(name, + escape_chars=('\t', '\r', '\n')) + footer += ("Name: {sname}".format(sname=series_name) + if name is not None else "") + + if (self.length is True or + (self.length == 'truncate' and self.truncate_v)): + if footer: + footer += ', ' + footer += 'Length: {length}'.format(length=self._series_length) + + if self.dtype is not False and self.dtype is not None: + name = getattr(self.tr_series.dtype, 'name', None) + if name: + if footer: + footer += ', ' + footer += 'dtype: {typ}'.format(typ=pprint_thing(name)) + + # level infos are added to the end and in a new line, like it is done + # for Categoricals + if is_categorical_dtype(self.tr_series.dtype): + level_info = self.tr_series._values._repr_categories_info() + if footer: + footer += "\n" + footer += level_info + + return compat.text_type(footer) diff --git a/eland/tests/client/test_mappings_pytest.py b/eland/tests/client/test_mappings_pytest.py index c56994f..5e50f17 100644 --- a/eland/tests/client/test_mappings_pytest.py +++ b/eland/tests/client/test_mappings_pytest.py @@ -16,7 +16,7 @@ class TestMapping(TestData): assert TEST_MAPPING1_EXPECTED_DF.index.tolist() == mappings.all_fields() - assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mappings.mappings_capabilities['es_dtype'])) + assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mappings._mappings_capabilities['es_dtype'])) assert TEST_MAPPING1_EXPECTED_SOURCE_FIELD_COUNT == mappings.count_source_fields() @@ -24,7 +24,7 @@ class TestMapping(TestData): mappings = ed.Mappings(ed.Client(ELASTICSEARCH_HOST), TEST_MAPPING1_INDEX_NAME) assert TEST_MAPPING1_EXPECTED_DF.index.tolist() == mappings.all_fields() - assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mappings.mappings_capabilities['es_dtype'])) + assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, 
pd.DataFrame(mappings._mappings_capabilities['es_dtype'])) assert TEST_MAPPING1_EXPECTED_SOURCE_FIELD_COUNT == mappings.count_source_fields() # Pick 1 source field @@ -43,7 +43,7 @@ class TestMapping(TestData): # Check original is still ok assert TEST_MAPPING1_EXPECTED_DF.index.tolist() == mappings.all_fields() - assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mappings.mappings_capabilities['es_dtype'])) + assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mappings._mappings_capabilities['es_dtype'])) assert TEST_MAPPING1_EXPECTED_SOURCE_FIELD_COUNT == mappings.count_source_fields() def test_dtypes(self): diff --git a/eland/tests/dataframe/test_indexing_pytest.py b/eland/tests/dataframe/test_basics_pytest.py similarity index 99% rename from eland/tests/dataframe/test_indexing_pytest.py rename to eland/tests/dataframe/test_basics_pytest.py index d3565d7..73e5fb1 100644 --- a/eland/tests/dataframe/test_indexing_pytest.py +++ b/eland/tests/dataframe/test_basics_pytest.py @@ -7,7 +7,7 @@ import io from pandas.util.testing import ( assert_series_equal, assert_frame_equal) -class TestDataFrameIndexing(TestData): +class TestDataFrameBasics(TestData): def test_mapping(self): ed_flights_mappings = pd.DataFrame(self.ed_flights()._mappings._mappings_capabilities diff --git a/eland/tests/series/test_basics_pytest.py b/eland/tests/series/test_basics_pytest.py new file mode 100644 index 0000000..861b8bf --- /dev/null +++ b/eland/tests/series/test_basics_pytest.py @@ -0,0 +1,32 @@ +# File called _pytest for PyCharm compatability +from eland.tests.common import TestData + +import pandas as pd +import eland as ed +import io + +from eland.tests import ELASTICSEARCH_HOST +from eland.tests import FLIGHTS_INDEX_NAME + +from pandas.util.testing import ( + assert_series_equal, assert_frame_equal) + +class TestSeriesBasics(TestData): + + def test_head_tail(self): + pd_s = self.pd_flights()['Carrier'] + ed_s = ed.Series(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME, 'Carrier') + + pd_s_head = pd_s.head(10) + ed_s_head = ed_s.head(10) + + assert_series_equal(pd_s_head, ed_s_head) + + pd_s_tail = pd_s.tail(10) + ed_s_tail = ed_s.tail(10) + + assert_series_equal(pd_s_tail, ed_s_tail) + + def test_print(self): + ed_s = ed.Series(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME, 'timestamp') + print(ed_s.to_string()) From 5e10b2e818c9af5e1c39309c395aff76f2814821 Mon Sep 17 00:00:00 2001 From: Stephen Dodson Date: Wed, 3 Jul 2019 09:49:58 +0000 Subject: [PATCH 2/2] Checkpoint code before attempting major investigation into using modin --- eland/__init__.py | 4 +- eland/client.py | 9 +- eland/dataframe.py | 28 ++++-- eland/mappings.py | 91 ++++++++++++++++++++ eland/ndframe.py | 34 +++++++- eland/series.py | 20 +++-- eland/tests/client/test_mappings_pytest.py | 34 ++++++++ eland/tests/dataframe/test_basics_pytest.py | 4 + eland/tests/dataframe/test_getitem_pytest.py | 28 ++---- eland/utils.py | 73 +++++++++++++++- 10 files changed, 281 insertions(+), 44 deletions(-) diff --git a/eland/__init__.py b/eland/__init__.py index f82d933..eee58ae 100644 --- a/eland/__init__.py +++ b/eland/__init__.py @@ -1,7 +1,7 @@ -from .utils import * from .client import * from .ndframe import * from .index import * from .mappings import * -from .dataframe import * from .series import * +from .dataframe import * +from .utils import * diff --git a/eland/client.py b/eland/client.py index aa1a6a1..3b1231f 100644 --- a/eland/client.py +++ b/eland/client.py @@ -1,4 +1,5 @@ from elasticsearch import Elasticsearch +from elasticsearch import 
helpers class Client(): """ @@ -17,7 +18,13 @@ class Client(): def indices(self): return self.es.indices - + + def bulk(self, actions, refresh=False): + return helpers.bulk(self.es, actions, refresh=refresh) + + def scan(self, **kwargs): + return helpers.scan(self.es, **kwargs) + def search(self, **kwargs): return self.es.search(**kwargs) diff --git a/eland/dataframe.py b/eland/dataframe.py index 35bfd4d..b4ed1c4 100644 --- a/eland/dataframe.py +++ b/eland/dataframe.py @@ -32,9 +32,14 @@ from pandas.io.formats.printing import pprint_thing from pandas.compat import StringIO from pandas.io.common import _expand_user, _stringify_path from pandas.io.formats import console +from pandas.core import common as com from eland import NDFrame from eland import Index +from eland import Series + + + class DataFrame(NDFrame): @@ -217,10 +222,6 @@ class DataFrame(NDFrame): return num_rows, num_columns - @property - def columns(self): - return super()._columns - def set_index(self, index_field): copy = self.copy() copy._index = Index(index_field) @@ -265,7 +266,6 @@ class DataFrame(NDFrame): def __getitem__(self, key): # NOTE: there is a difference between pandas here. # e.g. df['a'] returns pd.Series, df[['a','b']] return pd.DataFrame - # we always return DataFrame - TODO maybe create eland.Series at some point... # Implementation mainly copied from pandas v0.24.2 # (https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html) @@ -291,10 +291,12 @@ class DataFrame(NDFrame): # We are left with two options: a single key, and a collection of keys, columns = [] + is_single_key = False if isinstance(key, str): if not self._mappings.is_source_field(key): raise TypeError('Column does not exist: [{0}]'.format(key)) columns.append(key) + is_single_key = True elif isinstance(key, list): columns.extend(key) else: @@ -303,7 +305,18 @@ class DataFrame(NDFrame): mappings = self._filter_mappings(columns) # Return new eland.DataFrame with modified mappings - return DataFrame(self._client, self._index_pattern, mappings=mappings) + if is_single_key: + return Series(self._client, self._index_pattern, mappings=mappings) + else: + return DataFrame(self._client, self._index_pattern, mappings=mappings) + + + def __getattr__(self, name): + # Note: obj.x will always call obj.__getattribute__('x') prior to + # calling obj.__getattr__('x'). 
+        mappings = self._filter_mappings([name])
+
+        return Series(self._client, self._index_pattern, mappings=mappings)
 
     def copy(self):
         # TODO - test and validate...may need deep copying
@@ -373,7 +386,8 @@ class DataFrame(NDFrame):
             result = _buf.getvalue()
 
             return result
-
+    def to_pandas(self):
+        return super()._to_pandas()
 
 # From pandas.DataFrame
 def _put_str(s, space):
diff --git a/eland/mappings.py b/eland/mappings.py
index b9e31e7..1dc0bee 100644
--- a/eland/mappings.py
+++ b/eland/mappings.py
@@ -2,6 +2,7 @@ import warnings
 
 import pandas as pd
 
+from pandas.core.dtypes.common import (is_float_dtype, is_bool_dtype, is_integer_dtype, is_datetime_or_timedelta_dtype, is_string_dtype)
 
 class Mappings():
     """
@@ -217,6 +218,7 @@ class Mappings():
 
         return capability_matrix_df.sort_index()
 
+    @staticmethod
     def _es_dtype_to_pd_dtype(es_dtype):
         """
         Mapping Elasticsearch types to pandas dtypes
@@ -259,6 +261,84 @@ class Mappings():
         # Return 'object' for all unsupported TODO - investigate how different types could be supported
         return 'object'
 
+    @staticmethod
+    def _pd_dtype_to_es_dtype(pd_dtype):
+        """
+        Mapping pandas dtypes to Elasticsearch dtype
+        --------------------------------------------
+
+        ```
+        Pandas dtype    Python type  NumPy type                                                       Usage
+        object          str          string_, unicode_                                                Text
+        int64           int          int_, int8, int16, int32, int64, uint8, uint16, uint32, uint64   Integer numbers
+        float64         float        float_, float16, float32, float64                                Floating point numbers
+        bool            bool         bool_                                                            True/False values
+        datetime64      NA           datetime64[ns]                                                   Date and time values
+        timedelta[ns]   NA           NA                                                               Differences between two datetimes
+        category        NA           NA                                                               Finite list of text values
+        ```
+        """
+        es_dtype = None
+
+        # Map all to 64-bit - TODO map to specifics: int32 -> int etc.
+        if is_float_dtype(pd_dtype):
+            es_dtype = 'double'
+        elif is_integer_dtype(pd_dtype):
+            es_dtype = 'long'
+        elif is_bool_dtype(pd_dtype):
+            es_dtype = 'boolean'
+        elif is_string_dtype(pd_dtype):
+            es_dtype = 'keyword'
+        elif is_datetime_or_timedelta_dtype(pd_dtype):
+            es_dtype = 'date'
+        else:
+            warnings.warn('No mapping for pd_dtype: [{0}], using default mapping'.format(pd_dtype))
+
+        return es_dtype
+
+    @staticmethod
+    def _generate_es_mappings(dataframe):
+        """Given a pandas dataframe, generate the associated Elasticsearch mapping
+
+        Parameters
+        ----------
+        dataframe : pandas.DataFrame
+            pandas.DataFrame to create schema from
+
+        Returns
+        -------
+        mapping : dict
+            Elasticsearch mapping, e.g.
+
+        ```
+        "mappings" : {
+          "properties" : {
+            "AvgTicketPrice" : {
+              "type" : "float"
+            },
+            "Cancelled" : {
+              "type" : "boolean"
+            },
+            "Carrier" : {
+              "type" : "keyword"
+            },
+            "Dest" : {
+              "type" : "keyword"
+            }
+          }
+        }
+        ```
+        """
+        mappings = {}
+        mappings['properties'] = {}
+        for column_name, dtype in dataframe.dtypes.iteritems():
+            es_dtype = Mappings._pd_dtype_to_es_dtype(dtype)
+
+            mappings['properties'][column_name] = {}
+            mappings['properties'][column_name]['type'] = es_dtype
+
+        return {"mappings": mappings}
+
     def all_fields(self):
         """
         Returns
@@ -379,3 +459,14 @@ class Mappings():
         """
         return pd.Series(self._mappings_capabilities[self._mappings_capabilities._source == True].groupby('pd_dtype')[
                              '_source'].count().to_dict())
+
+    def to_pandas(self):
+        """
+        TODO - implement
+
+        Returns
+        -------
+        df : pd.DataFrame
+            pandas DataFrame representing this index
+        """
+
+
diff --git a/eland/ndframe.py b/eland/ndframe.py
index 6f52e1c..9fbd312 100644
--- a/eland/ndframe.py
+++ b/eland/ndframe.py
@@ -23,10 +23,14 @@ only Elasticsearch aggregatable fields can be aggregated or grouped.
""" import pandas as pd +import functools from elasticsearch_dsl import Search import eland as ed +from pandas.core.generic import NDFrame as pd_NDFrame +from pandas._libs import Timestamp, iNaT, properties + class NDFrame(): """ @@ -44,7 +48,6 @@ class NDFrame(): -------- """ - def __init__(self, client, index_pattern, @@ -191,7 +194,12 @@ class NDFrame(): rows = [] index = [] - for hit in results['hits']['hits']: + if isinstance(results, dict): + iterator = results['hits']['hits'] + else: + iterator = results + + for hit in iterator: row = hit['_source'] # get index value - can be _id or can be field value in source @@ -255,6 +263,23 @@ class NDFrame(): # reverse order (index ascending) return df.sort_index() + def _to_pandas(self): + """ + Protected method that returns all data as pandas.DataFrame. + + Returns + ------- + df + pandas.DataFrame of all values + """ + sort_params = self._index.sort_field + ":asc" + + results = self._client.scan(index=self._index_pattern) + + # We sort here rather than in scan - once everything is in core this + # should be faster + return self._es_results_to_pandas(results) + def _describe(self): numeric_source_fields = self._mappings.numeric_source_fields() @@ -294,6 +319,10 @@ class NDFrame(): return mappings + @property + def columns(self): + return self._columns + @property def index(self): return self._index @@ -309,7 +338,6 @@ class NDFrame(): def get_dtype_counts(self): return self._mappings.get_dtype_counts() - def _index_count(self): """ Returns diff --git a/eland/series.py b/eland/series.py index 735da96..47473ed 100644 --- a/eland/series.py +++ b/eland/series.py @@ -72,14 +72,17 @@ class Series(NDFrame): def __init__(self, client, index_pattern, - field_name, + field_name=None, mappings=None, index_field=None): # python 3 syntax super().__init__(client, index_pattern, mappings=mappings, index_field=index_field) # now select column (field_name) - self._mappings = self._filter_mappings([field_name]) + if field_name is not None: + self._mappings = self._filter_mappings([field_name]) + elif len(self._mappings.source_fields()) != 1: + raise TypeError('Series must have 1 field: [{0}]'.format(len(self._mappings.source_fields()))) def head(self, n=5): return self._df_to_series(super()._head(n)) @@ -199,6 +202,10 @@ class Series(NDFrame): fmt.buffer_put_lines(buf, lines) + @property + def name(self): + return list(self._mappings.source_fields())[0] + @property def shape(self): """ @@ -257,7 +264,7 @@ class Series(NDFrame): return super()._describe() def _df_to_series(self, df): - return df.iloc[:, 0] + return df[self.name] # ---------------------------------------------------------------------- # Rendering Methods @@ -269,8 +276,8 @@ class Series(NDFrame): max_rows = pd.get_option("display.max_rows") - self.to_string(buf=buf, na_rep='NaN', float_format=None, header=True, index=True, length=False, - dtype=False, name=False, max_rows=max_rows) + self.to_string(buf=buf, na_rep='NaN', float_format=None, header=True, index=True, length=True, + dtype=True, name=True, max_rows=max_rows) return buf.getvalue() @@ -279,7 +286,7 @@ class Series(NDFrame): index=True, length=True, dtype=True, name=True, max_rows=None): """ - From pandas + From pandas 0.24.2 Render a string representation of the Series. 
@@ -343,7 +350,6 @@ class Series(NDFrame): """ A hacked overridden version of pandas.io.formats.SeriesFormatter that writes correct length """ - def __init__(self, series, series_length, buf=None, length=True, header=True, index=True, na_rep='NaN', name=False, float_format=None, dtype=True, max_rows=None): diff --git a/eland/tests/client/test_mappings_pytest.py b/eland/tests/client/test_mappings_pytest.py index 5e50f17..c19d43f 100644 --- a/eland/tests/client/test_mappings_pytest.py +++ b/eland/tests/client/test_mappings_pytest.py @@ -1,5 +1,6 @@ # File called _pytest for PyCharm compatability +import numpy as np from pandas.util.testing import ( assert_series_equal, assert_frame_equal) @@ -88,3 +89,36 @@ class TestMapping(TestData): assert 'object' == field_capabilities['pd_dtype'] assert True == field_capabilities['searchable'] assert True == field_capabilities['aggregatable'] + + def test_generate_es_mappings(self): + df = pd.DataFrame(data={'A': np.random.rand(3), + 'B': 1, + 'C': 'foo', + 'D': pd.Timestamp('20190102'), + 'E': [1.0, 2.0, 3.0], + 'F': False, + 'G': [1, 2, 3]}, + index=['0','1','2']) + + expected_mappings = {'mappings': { + 'properties': {'A': {'type': 'double'}, + 'B': {'type': 'long'}, + 'C': {'type': 'keyword'}, + 'D': {'type': 'date'}, + 'E': {'type': 'double'}, + 'F': {'type': 'boolean'}, + 'G': {'type': 'long'}}}} + + mappings = ed.Mappings._generate_es_mappings(df) + + assert expected_mappings == mappings + + # Now create index + index_name = 'eland_test_generate_es_mappings' + + ed.pandas_to_es(df, ELASTICSEARCH_HOST, index_name, if_exists="replace", refresh=True) + + ed_df = ed.DataFrame(ELASTICSEARCH_HOST, index_name) + ed_df_head = ed_df.head() + + assert_frame_equal(df, ed_df_head) diff --git a/eland/tests/dataframe/test_basics_pytest.py b/eland/tests/dataframe/test_basics_pytest.py index 73e5fb1..dce92e9 100644 --- a/eland/tests/dataframe/test_basics_pytest.py +++ b/eland/tests/dataframe/test_basics_pytest.py @@ -153,3 +153,7 @@ class TestDataFrameBasics(TestData): ed_flights_timestamp.info() ed_flights.info() + def test_to_pandas(self): + ed_ecommerce_pd_df = self.ed_ecommerce().to_pandas() + + assert_frame_equal(self.pd_ecommerce(), ed_ecommerce_pd_df) diff --git a/eland/tests/dataframe/test_getitem_pytest.py b/eland/tests/dataframe/test_getitem_pytest.py index 0b6a5bf..e9b58ba 100644 --- a/eland/tests/dataframe/test_getitem_pytest.py +++ b/eland/tests/dataframe/test_getitem_pytest.py @@ -15,7 +15,7 @@ class TestDataFrameGetItem(TestData): ed_carrier = self.ed_flights()['Carrier'] # pandas returns a Series here - assert_frame_equal(pd.DataFrame(pd_carrier.head(100)), ed_carrier.head(100)) + assert_series_equal(pd_carrier.head(100), ed_carrier.head(100)) pd_3_items = self.pd_flights()[['Dest','Carrier','FlightDelay']] ed_3_items = self.ed_flights()[['Dest','Carrier','FlightDelay']] @@ -36,28 +36,12 @@ class TestDataFrameGetItem(TestData): def test_getattr_basic(self): # Test 1 attribute pd_carrier = self.pd_flights().Carrier - #ed_carrier = self.ed_flights().Carrier + ed_carrier = self.ed_flights().Carrier - print(type(pd_carrier)) - print(pd_carrier) + assert_series_equal(pd_carrier.head(100), ed_carrier.head(100)) - def test_boolean(self): - # Test 1 attribute - pd_carrier = self.pd_flights()['Carrier == "Kibana Airlines"'] - #ed_carrier = self.ed_flights().Carrier + pd_avgticketprice = self.pd_flights().AvgTicketPrice + ed_avgticketprice = self.ed_flights().AvgTicketPrice - print(type(pd_carrier)) - print(pd_carrier) - - - def test_loc(self): - pd = 
self.pd_flights().loc[10:15, ['Dest', 'Carrier']]
-
-        print(type(pd))
-        print(pd)
-
-        pd = self.pd_flights().loc[10]
-
-        print(type(pd))
-        print(pd)
diff --git a/eland/utils.py b/eland/utils.py
index 99f597c..074785b 100644
--- a/eland/utils.py
+++ b/eland/utils.py
@@ -1,4 +1,73 @@
-import eland as ed
+from eland import Client
+from eland import DataFrame
+from eland import Mappings
 
 def read_es(es_params, index_pattern):
-    return ed.DataFrame(client=es_params, index_pattern=index_pattern)
+    return DataFrame(client=es_params, index_pattern=index_pattern)
+
+def pandas_to_es(df, es_params, destination_index, if_exists='fail', chunk_size=10000, refresh=False):
+    """
+    Append a pandas DataFrame to an Elasticsearch index.
+    Mainly used in testing.
+
+    Parameters
+    ----------
+    df : pandas.DataFrame
+        The DataFrame to index
+
+    es_params : Elasticsearch client argument
+        elasticsearch-py parameters or
+        elasticsearch-py instance or
+        eland.Client instance
+
+    destination_index : str
+        Name of Elasticsearch index to be written
+
+    if_exists : str, default 'fail'
+        Behavior when the destination index exists. Value can be one of:
+        ``'fail'``
+            If the index exists, raise a ValueError.
+        ``'replace'``
+            If the index exists, drop it, recreate it, and insert data.
+        ``'append'``
+            If the index exists, insert data. Create if it does not exist.
+
+    chunk_size : int, default 10000
+        Number of rows sent per bulk request
+
+    refresh : bool, default False
+        Whether to refresh the destination index after each bulk request
+    """
+    client = Client(es_params)
+
+    mapping = Mappings._generate_es_mappings(df)
+
+    # If the index exists, check the if_exists parameter
+    if client.indices().exists(destination_index):
+        if if_exists == "fail":
+            raise ValueError(
+                "Could not create the index [{0}] because it "
+                "already exists. "
+                "Change the if_exists parameter to "
+                "'append' or 'replace' data.".format(destination_index)
+            )
+        elif if_exists == "replace":
+            client.indices().delete(destination_index)
+            client.indices().create(destination_index, mapping)
+        # elif if_exists == "append":
+        #     TODO validate mapping is compatible
+    else:
+        client.indices().create(destination_index, mapping)
+
+    # Now add the data
+    actions = []
+    n = 0
+    for row in df.iterrows():
+        # Use the index as _id (stringified, so results are repeatable)
+        id = row[0]
+        values = row[1].to_dict()
+
+        action = {'_index': destination_index, '_source': values, '_id': str(id)}
+
+        actions.append(action)
+
+        n = n + 1
+
+        if n % chunk_size == 0:
+            client.bulk(actions, refresh=refresh)
+            actions = []
+
+    client.bulk(actions, refresh=refresh)
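
A note on the flattening behaviour that moved into NDFrame._es_results_to_pandas: it can be exercised without a cluster. Below is a minimal standalone sketch of the same idea, using the `amsterdam`/`user` document from the docstring; it deliberately skips the mapping-driven source-field checks and datetime coercion that the real method performs.

```python
import pandas as pd

def flatten(obj, name=''):
    # Recursively flatten a nested _source dict into {'dotted.field': value};
    # multi-valued fields are collected into lists, mirroring how
    # Elasticsearch internally represents nested documents.
    out = {}
    if isinstance(obj, dict):
        for key, value in obj.items():
            out.update(flatten(value, name + key + '.'))
    elif isinstance(obj, list):
        for item in obj:
            for key, value in flatten(item, name).items():
                if key in out:
                    if not isinstance(out[key], list):
                        out[key] = [out[key]]
                    out[key].append(value)
                else:
                    out[key] = value
    else:
        out[name[:-1]] = obj  # strip the trailing '.'
    return out

hit = {
    "group": "amsterdam",
    "user": [
        {"first": "John", "last": "Smith"},
        {"first": "Alice", "last": "White"},
    ],
}

row = flatten(hit)
# {'group': 'amsterdam', 'user.first': ['John', 'Alice'], 'user.last': ['Smith', 'White']}
print(pd.DataFrame([row]))
```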
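DataFrame.__repr__ never materialises the whole index: _fake_head_tail_df fetches only a sorted head and tail, pandas renders that small frame, and the true row count is patched in afterwards. A sketch of the display trick, with a plain pandas frame standing in for the two sorted Elasticsearch queries:

```python
import pandas as pd

def fake_head_tail(df, max_rows):
    # Stand-in for NDFrame._fake_head_tail_df: in eland the two halves
    # come from sorted searches (size=n, sort=field:asc / field:desc),
    # not from a local frame.
    head_rows = max_rows // 2 + max_rows % 2
    tail_rows = max_rows - head_rows
    return pd.concat([df.head(head_rows), df.tail(tail_rows)])

full = pd.DataFrame({'a': range(100000)})
small = fake_head_tail(full, max_rows=11)   # max_rows + 1, as in __repr__

# Render with pandas, then append the *real* dimensions, since the
# concatenated frame only holds max_rows + 1 rows.
print(small.to_string(max_rows=10, show_dimensions=False))
print("\n[{0} rows x {1} columns]".format(len(full), len(full.columns)))
```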
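_describe computes all eight summary rows in a single search request by attaching an extended_stats and a percentiles aggregation per numeric field. A sketch of one field's worth of that query; it assumes a reachable cluster at localhost:9200, and the `flights` index and `AvgTicketPrice` field are hypothetical:

```python
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search

client = Elasticsearch('localhost:9200')  # assumed local cluster
field = 'AvgTicketPrice'                  # hypothetical numeric field

search = Search(using=client, index='flights').extra(size=0)
search.aggs.metric('extended_stats_' + field, 'extended_stats', field=field)
search.aggs.metric('percentiles_' + field, 'percentiles', field=field)

response = search.execute()

stats = response.aggregations['extended_stats_' + field]
percentiles = response.aggregations['percentiles_' + field]['values']

# count, mean, std, min, 25%, 50%, 75%, max - the describe() rows
print(stats['count'], stats['avg'], stats['std_deviation'], stats['min'],
      percentiles['25.0'], percentiles['50.0'], percentiles['75.0'], stats['max'])
```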
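With the second commit, __getitem__ distinguishes single-key from list selection, and __getattr__ makes attribute access work, so selection semantics now follow pandas. Expected usage, with placeholder host and index names:

```python
import eland as ed

df = ed.DataFrame('localhost', 'flights')  # placeholders

carrier = df['Carrier']           # single key      -> eland.Series
pair = df[['Dest', 'Carrier']]    # list of keys    -> eland.DataFrame
also_carrier = df.Carrier         # attribute access -> eland.Series

print(type(carrier), type(pair), type(also_carrier))
print(carrier.head())             # executes a small, sorted ES query
```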
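count() deliberately issues one exists query per source field rather than a value_count aggregation, which would require aggregatable fields and over-counts list values. The equivalent raw query, assuming the same placeholder cluster and index:

```python
from elasticsearch import Elasticsearch

client = Elasticsearch('localhost:9200')  # assumed local cluster

# Non-null count for one field == number of docs where the field exists
body = {"query": {"exists": {"field": "Carrier"}}}
response = client.count(index='flights', body=body)
print(response['count'])
```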
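Mappings._pd_dtype_to_es_dtype drives _generate_es_mappings; the same decision table can be written against the public pandas.api.types helpers. A self-contained sketch (the function and sample columns are illustrative, not the library's API):

```python
import pandas as pd
from pandas.api.types import (is_bool_dtype, is_datetime64_any_dtype,
                              is_float_dtype, is_integer_dtype,
                              is_string_dtype)

def pd_dtype_to_es_dtype(dtype):
    # Map everything to 64-bit Elasticsearch types, as the patch does
    if is_float_dtype(dtype):
        return 'double'
    if is_integer_dtype(dtype):
        return 'long'
    if is_bool_dtype(dtype):
        return 'boolean'
    if is_string_dtype(dtype):
        return 'keyword'
    if is_datetime64_any_dtype(dtype):
        return 'date'
    return None  # unsupported dtype

df = pd.DataFrame({'A': [1.0, 2.5],
                   'B': [1, 2],
                   'C': ['foo', 'bar'],
                   'D': pd.to_datetime(['2019-01-02', '2019-01-03']),
                   'F': [True, False]})

mappings = {'mappings': {'properties': {
    column: {'type': pd_dtype_to_es_dtype(dtype)}
    for column, dtype in df.dtypes.items()}}}
print(mappings)
# {'mappings': {'properties': {'A': {'type': 'double'}, 'B': {'type': 'long'},
#   'C': {'type': 'keyword'}, 'D': {'type': 'date'}, 'F': {'type': 'boolean'}}}}
```

Note that is_string_dtype is checked before the datetime test only because object columns must not fall through to the default; datetime64 columns do not register as string dtypes.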
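Putting the second commit together, the new pandas_to_es / read_es pair gives a round trip that mirrors test_generate_es_mappings. A usage sketch; the host and index name are placeholders and a reachable cluster is assumed:

```python
import numpy as np
import pandas as pd
import eland as ed

df = pd.DataFrame({'A': np.random.rand(3),
                   'B': ['foo', 'bar', 'baz'],
                   'C': pd.Timestamp('20190102')},
                  index=['0', '1', '2'])  # string index: ES _ids round-trip as strings

# Infer the ES mapping from dtypes, (re)create the index, bulk-index the rows
ed.pandas_to_es(df, 'localhost', 'eland_demo', if_exists='replace', refresh=True)

# Read back lazily; head() issues a small sorted search rather than a full scan
ed_df = ed.read_es('localhost', 'eland_demo')
print(ed_df.head())
```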