From 674ac129e675729ea8b050e831652079f66e23e2 Mon Sep 17 00:00:00 2001 From: Stephen Dodson Date: Tue, 18 Jun 2019 11:48:56 +0000 Subject: [PATCH] Renaming modules and added mapping store --- eland/DataFrame.py | 108 ---------- eland/__init__.py | 4 +- eland/{Client.py => client.py} | 6 +- eland/frame.py | 201 ++++++++++++++++++ eland/tests/__init__.py | 148 +++++++++++++ eland/tests/{dataframe => frame}/__init__.py | 0 eland/tests/{dataframe => frame}/common.py | 0 .../test_indexing_pytest.py | 14 +- .../{index_flights.py => setup_tests.py} | 21 +- 9 files changed, 379 insertions(+), 123 deletions(-) delete mode 100644 eland/DataFrame.py rename eland/{Client.py => client.py} (85%) create mode 100644 eland/frame.py rename eland/tests/{dataframe => frame}/__init__.py (100%) rename eland/tests/{dataframe => frame}/common.py (100%) rename eland/tests/{dataframe => frame}/test_indexing_pytest.py (65%) rename eland/tests/{index_flights.py => setup_tests.py} (73%) diff --git a/eland/DataFrame.py b/eland/DataFrame.py deleted file mode 100644 index e840c63..0000000 --- a/eland/DataFrame.py +++ /dev/null @@ -1,108 +0,0 @@ -import eland - -from elasticsearch import Elasticsearch -from elasticsearch_dsl import Search - -import json - -import pandas as pd - -class DataFrame(): - - def __init__(self, client, index_pattern): - self.client = eland.Client(client) - self.index_pattern = index_pattern - - self.client.indices().exists(index_pattern) - - @staticmethod - def _flatten_results(prefix, results, result): - # TODO - return prefix - - @staticmethod - def _es_results_to_pandas(results): - # TODO - resolve nested fields - rows = [] - for hit in results['hits']['hits']: - row = hit['_source'] - rows.append(row) - #return pd.DataFrame(data=rows) - # Converting the list of dicts to a dataframe doesn't convert datetimes - # effectively compared to read_json. 
TODO - https://github.com/elastic/eland/issues/2 - json_rows = json.dumps(rows) - return pd.read_json(json_rows) - - @staticmethod - def _flatten_mapping(prefix, properties, result): - for k, v in properties.items(): - if 'properties' in v: - if(prefix == ''): - prefix = k - else: - prefix = prefix + '.' + k - DataFrame._flatten_mapping(prefix, v['properties'], result) - else: - if(prefix == ''): - key = k - else: - key = prefix + '.' + k - type = v['type'] - result.append((key, type)) - - @staticmethod - def _es_mappings_to_pandas(mappings): - fields = [] - for index in mappings: - if 'properties' in mappings[index]['mappings']: - properties = mappings[index]['mappings']['properties'] - - DataFrame._flatten_mapping('', properties, fields) - - return pd.DataFrame(data=fields, columns=['field', 'datatype']) - - def head(self, n=5): - results = self.client.search(index=self.index_pattern, size=n) - - return DataFrame._es_results_to_pandas(results) - - def describe(self): - # First get all types - #mapping = self.client.indices().get_mapping(index=self.index_pattern) - mapping = self.client.indices().get_mapping(index=self.index_pattern) - - fields = DataFrame._es_mappings_to_pandas(mapping) - - # Get numeric types (https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#the-where-method-and-masking) - # https://www.elastic.co/guide/en/elasticsearch/reference/current/number.html - # TODO refactor this list out of method - numeric_fields = fields.query('datatype == ["long", "integer", "short", "byte", "double", "float", "half_float", "scaled_float"]') - - # for each field we copute: - # count, mean, std, min, 25%, 50%, 75%, max - search = Search(using=self.client, index=self.index_pattern).extra(size=0) - - for field in numeric_fields.field: - search.aggs.metric('extended_stats_'+field, 'extended_stats', field=field) - search.aggs.metric('percentiles_'+field, 'percentiles', field=field) - - response = search.execute() - - results = 
pd.DataFrame(index=['count', 'mean', 'std', 'min', '25%', '50%', '75%', 'max']) - - for field in numeric_fields.field: - values = [] - values.append(response.aggregations['extended_stats_'+field]['count']) - values.append(response.aggregations['extended_stats_'+field]['avg']) - values.append(response.aggregations['extended_stats_'+field]['std_deviation']) - values.append(response.aggregations['extended_stats_'+field]['min']) - values.append(response.aggregations['percentiles_'+field]['values']['25.0']) - values.append(response.aggregations['percentiles_'+field]['values']['50.0']) - values.append(response.aggregations['percentiles_'+field]['values']['75.0']) - values.append(response.aggregations['extended_stats_'+field]['max']) - - # if not None - if (values.count(None) < len(values)): - results = results.assign(**{field: values}) - - return results diff --git a/eland/__init__.py b/eland/__init__.py index a4723d1..f728686 100644 --- a/eland/__init__.py +++ b/eland/__init__.py @@ -1,3 +1,3 @@ from .utils import * -from .DataFrame import * -from .Client import * +from .frame import * +from .client import * diff --git a/eland/Client.py b/eland/client.py similarity index 85% rename from eland/Client.py rename to eland/client.py index e951d30..5e4e23b 100644 --- a/eland/Client.py +++ b/eland/client.py @@ -8,7 +8,7 @@ class Client(object): self.es = es else: self.es = Elasticsearch(es) - + def info(self): return self.es.info() @@ -17,3 +17,7 @@ class Client(object): def search(self, **kwargs): return self.es.search(**kwargs) + + def field_caps(self, **kwargs): + return self.es.field_caps(**kwargs) + diff --git a/eland/frame.py b/eland/frame.py new file mode 100644 index 0000000..d84f2b8 --- /dev/null +++ b/eland/frame.py @@ -0,0 +1,201 @@ +""" +DataFrame +--------- +An efficient 2D container for potentially mixed-type time series or other +labeled data series. 
+ +The underlying data resides in Elasticsearch and the API aligns as much as +possible with pandas.DataFrame API. + +This allows the eland.DataFrame to access large datasets stored in Elasticsearch, +without storing the dataset in local memory. + +Implementation Details +---------------------- + +Elasticsearch indexes can be configured in many different ways, and these indexes +utilise different data structures to pandas.DataFrame. + +eland.DataFrame operations that return individual rows (e.g. df.head()) return +_source data. If _source is not enabled, this data is not accessible. + +Similarly, only Elasticsearch searchable fields can be searched or filtered, and +only Elasticsearch aggregatable fields can be aggregated or grouped. + +""" +import eland + +from elasticsearch import Elasticsearch +from elasticsearch_dsl import Search + +import pandas as pd + +class DataFrame(): + """ + pandas.DataFrame like API that proxies into Elasticsearch index(es). + + Parameters + ---------- + client : eland.Client + A reference to a Elasticsearch python client + + index_pattern : str + An Elasticsearch index pattern. This can contain wildcards (e.g. filebeat-*). + + See Also + -------- + + Examples + -------- + + >>> import eland as ed + >>> client = ed.Client(Elasticsearch()) + >>> df = ed.DataFrame(client, 'reviews') + >>> df.head() + reviewerId vendorId rating date + 0 0 0 5 2006-04-07 17:08 + 1 1 1 5 2006-05-04 12:16 + 2 2 2 4 2006-04-21 12:26 + 3 3 3 5 2006-04-18 15:48 + 4 3 4 5 2006-04-18 15:49 + + Notice that the types are based on Elasticsearch mappings + + Notes + ----- + If the Elasticsearch index is deleted or index mappings are changed after this + object is created, the object is not rebuilt and so inconsistencies can occur. 
+
+    Mapping Elasticsearch types to pandas dtypes
+    --------------------------------------------
+
+    Elasticsearch field datatype              | Pandas dtype
+    --
+    text                                      | object
+    keyword                                   | object
+    long, integer, short, byte, binary        | int64
+    double, float, half_float, scaled_float   | float64
+    date, date_nanos                          | datetime64[ns]
+    boolean                                   | bool
+    TODO - add additional mapping types
+    """
+    def __init__(self, client, index_pattern):
+        self.client = eland.Client(client)
+        self.index_pattern = index_pattern
+
+        # Get and persist mappings, this allows us to correctly
+        # map returned types from Elasticsearch to pandas datatypes
+        mapping = self.client.indices().get_mapping(index=self.index_pattern)
+        #field_caps = self.client.field_caps(index=self.index_pattern, fields='*')
+
+        #self.fields, self.aggregatable_fields, self.searchable_fields = \
+        #    DataFrame._es_mappings_to_pandas(mapping, field_caps)
+        self.fields = DataFrame._es_mappings_to_pandas(mapping)
+
+    @staticmethod
+    def _flatten_results(prefix, results, result):
+        # TODO
+        return prefix
+
+    def _es_results_to_pandas(self, results):
+        # TODO - resolve nested fields
+        rows = []
+        for hit in results['hits']['hits']:
+            row = hit['_source']
+            rows.append(row)
+
+        df = pd.DataFrame(data=rows)
+
+        return df
+
+    @staticmethod
+    def _extract_types_from_mapping(y):
+        """
+        Extract data types from mapping for DataFrame columns.
+
+        Elasticsearch _source data is transformed into pandas DataFrames. This strategy is not compatible
+        with all Elasticsearch configurations. Notes:
+
+        - This strategy is not compatible with all Elasticsearch configurations. If _source is disabled
+          (https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-source-field.html#disable-source-field)
+          no data values will be populated
+        - Sub-fields (e.g. 
english.text in {"mappings":{"properties":{"text":{"type":"text","fields":{"english":{"type":"text","analyzer":"english"}}}}}})
+          are not used
+        """
+        out = {}
+
+        # Recurse until we get a 'type: xxx' - ignore sub-fields
+        def flatten(x, name=''):
+            if type(x) is dict:
+                for a in x:
+                    if a == 'type' and type(x[a]) is str: # 'type' can be a name of a field
+                        out[name[:-1]] = x[a]
+                    if a == 'properties' or a == 'fields':
+                        flatten(x[a], name)
+                    else:
+                        flatten(x[a], name + a + '.')
+
+        flatten(y)
+
+        return out
+
+    @staticmethod
+    def _es_mappings_to_pandas(mappings):
+        fields = {}
+        for index in mappings:
+            if 'properties' in mappings[index]['mappings']:
+                properties = mappings[index]['mappings']['properties']
+
+                datatypes = DataFrame._extract_types_from_mapping(properties)
+
+                # Note there could be conflicts here - e.g. the same field name with different semantics in
+                # different indexes - currently the last one wins TODO: review this
+                fields.update(datatypes)
+
+        return pd.DataFrame.from_dict(data=fields, orient='index', columns=['datatype'])
+
+    def head(self, n=5):
+        results = self.client.search(index=self.index_pattern, size=n)
+
+        return self._es_results_to_pandas(results)
+
+    def describe(self):
+        # First get all types
+        #mapping = self.client.indices().get_mapping(index=self.index_pattern)
+        mapping = self.client.indices().get_mapping(index=self.index_pattern)
+
+        fields = DataFrame._es_mappings_to_pandas(mapping)
+
+        # Get numeric types (https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#the-where-method-and-masking)
+        # https://www.elastic.co/guide/en/elasticsearch/reference/current/number.html
+        # TODO refactor this list out of method
+        numeric_fields = fields.query('datatype == ["long", "integer", "short", "byte", "double", "float", "half_float", "scaled_float"]')
+
+        # for each field we compute:
+        # count, mean, std, min, 25%, 50%, 75%, max
+        search = Search(using=self.client, index=self.index_pattern).extra(size=0)
+
+        for field 
in numeric_fields.index:
+            search.aggs.metric('extended_stats_'+field, 'extended_stats', field=field)
+            search.aggs.metric('percentiles_'+field, 'percentiles', field=field)
+
+        response = search.execute()
+
+        results = pd.DataFrame(index=['count', 'mean', 'std', 'min', '25%', '50%', '75%', 'max'])
+
+        for field in numeric_fields.index:
+            values = []
+            values.append(response.aggregations['extended_stats_'+field]['count'])
+            values.append(response.aggregations['extended_stats_'+field]['avg'])
+            values.append(response.aggregations['extended_stats_'+field]['std_deviation'])
+            values.append(response.aggregations['extended_stats_'+field]['min'])
+            values.append(response.aggregations['percentiles_'+field]['values']['25.0'])
+            values.append(response.aggregations['percentiles_'+field]['values']['50.0'])
+            values.append(response.aggregations['percentiles_'+field]['values']['75.0'])
+            values.append(response.aggregations['extended_stats_'+field]['max'])
+
+            # if not None
+            if (values.count(None) < len(values)):
+                results = results.assign(**{field: values})
+
+        return results
diff --git a/eland/tests/__init__.py b/eland/tests/__init__.py
index 0d8dfdf..2308a2c 100644
--- a/eland/tests/__init__.py
+++ b/eland/tests/__init__.py
@@ -1,4 +1,5 @@
 import os
+import pandas as pd
 
 ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
 
@@ -10,3 +11,150 @@ FLIGHTS_FILE_NAME = ROOT_DIR + '/flights.json.gz'
 
 ECOMMERCE_INDEX_NAME = 'ecommerce'
 ECOMMERCE_FILE_NAME = ROOT_DIR + '/ecommerce.json.gz'
+
+TEST_MAPPING1 = {
+    'mappings': {
+        'properties': {
+            'city': {
+                'type': 'text',
+                'fields': {
+                    'raw': {
+                        'type': 'keyword'
+                    }
+                }
+            },
+            'text': {
+                'type': 'text',
+                'fields': {
+                    'english': {
+                        'type': 'text',
+                        'analyzer': 'english'
+                    }
+                }
+            },
+            'origin_location': {
+                'properties': {
+                    'lat': {
+                        'type': 'text',
+                        'index_prefixes': {},
+                        'fields': {
+                            'keyword': {
+                                'type': 'keyword',
+                                'ignore_above': 256
+                            }
+                        }
+                    },
+                    'lon': {
+                        'type': 'text',
+                        'fields': {
+                            'keyword': {
+                                'type': 
'keyword', + 'ignore_above': 256 + } + } + } + } + }, + 'maps-telemetry': { + 'properties': { + 'attributesPerMap': { + 'properties': { + 'dataSourcesCount': { + 'properties': { + 'avg': { + 'type': 'long' + }, + 'max': { + 'type': 'long' + }, + 'min': { + 'type': 'long' + } + } + }, + 'emsVectorLayersCount': { + 'dynamic': 'true', + 'properties': { + 'france_departments': { + 'properties': { + 'avg': { + 'type': 'float' + }, + 'max': { + 'type': 'long' + }, + 'min': { + 'type': 'long' + } + } + } + } + } + } + } + } + }, + 'type': { + 'type': 'keyword' + }, + 'name': { + 'type': 'text' + }, + 'user_name': { + 'type': 'keyword' + }, + 'email': { + 'type': 'keyword' + }, + 'content': { + 'type': 'text' + }, + 'tweeted_at': { + 'type': 'date' + }, + 'dest_location': { + 'type': 'geo_point' + }, + 'user': { + 'type': 'nested' + }, + 'my_join_field': { + 'type': 'join', + 'relations': { + 'question': ['answer', 'comment'], + 'answer': 'vote' + } + } + } + } + } + +TEST_MAPPING1_INDEX_NAME = 'mapping1' + +TEST_MAPPING1_EXPECTED = { + 'city': 'text', + 'city.raw': 'keyword', + 'content': 'text', + 'dest_location': 'geo_point', + 'email': 'keyword', + 'maps-telemetry.attributesPerMap.dataSourcesCount.avg': 'long', + 'maps-telemetry.attributesPerMap.dataSourcesCount.max': 'long', + 'maps-telemetry.attributesPerMap.dataSourcesCount.min': 'long', + 'maps-telemetry.attributesPerMap.emsVectorLayersCount.france_departments.avg': 'float', + 'maps-telemetry.attributesPerMap.emsVectorLayersCount.france_departments.max': 'long', + 'maps-telemetry.attributesPerMap.emsVectorLayersCount.france_departments.min': 'long', + 'my_join_field': 'join', + 'name': 'text', + 'origin_location.lat': 'text', + 'origin_location.lat.keyword': 'keyword', + 'origin_location.lon': 'text', + 'origin_location.lon.keyword': 'keyword', + 'text': 'text', + 'text.english': 'text', + 'tweeted_at': 'date', + 'type': 'keyword', + 'user': 'nested', + 'user_name': 'keyword' +} + +TEST_MAPPING1_EXPECTED_DF = 
pd.DataFrame.from_dict(data=TEST_MAPPING1_EXPECTED, orient='index', columns=['datatype']) diff --git a/eland/tests/dataframe/__init__.py b/eland/tests/frame/__init__.py similarity index 100% rename from eland/tests/dataframe/__init__.py rename to eland/tests/frame/__init__.py diff --git a/eland/tests/dataframe/common.py b/eland/tests/frame/common.py similarity index 100% rename from eland/tests/dataframe/common.py rename to eland/tests/frame/common.py diff --git a/eland/tests/dataframe/test_indexing_pytest.py b/eland/tests/frame/test_indexing_pytest.py similarity index 65% rename from eland/tests/dataframe/test_indexing_pytest.py rename to eland/tests/frame/test_indexing_pytest.py index 7fb94e7..6eacac1 100644 --- a/eland/tests/dataframe/test_indexing_pytest.py +++ b/eland/tests/frame/test_indexing_pytest.py @@ -1,5 +1,10 @@ # File called _pytest for PyCharm compatability -from eland.tests.dataframe.common import TestData +from eland.tests.frame.common import TestData + +from eland.tests import * + +import eland as ed +import pandas as pd from pandas.util.testing import ( assert_almost_equal, assert_frame_equal, assert_series_equal) @@ -15,10 +20,9 @@ class TestDataFrameIndexing(TestData): pd_ecommerce_head = self.pd_ecommerce().head() ed_ecommerce_head = self.ed_ecommerce().head() - #print(pd_ecommerce_head.dtypes) - #print(ed_ecommerce_head.dtypes) - assert_frame_equal(pd_ecommerce_head, ed_ecommerce_head) - + def test_mappings(self): + test_mapping1 = ed.read_es(ELASTICSEARCH_HOST, TEST_MAPPING1_INDEX_NAME) + assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, test_mapping1.fields) diff --git a/eland/tests/index_flights.py b/eland/tests/setup_tests.py similarity index 73% rename from eland/tests/index_flights.py rename to eland/tests/setup_tests.py index 20d7380..000c456 100644 --- a/eland/tests/index_flights.py +++ b/eland/tests/setup_tests.py @@ -2,24 +2,19 @@ import pandas as pd from elasticsearch import Elasticsearch from elasticsearch import helpers -from 
eland.tests import FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME, ECOMMERCE_FILE_NAME, ECOMMERCE_INDEX_NAME - +from eland.tests import * DATA_LIST = [ (FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME), (ECOMMERCE_FILE_NAME, ECOMMERCE_INDEX_NAME) ] -if __name__ == '__main__': - +def _setup_data(es): # Read json file and index records into Elasticsearch for data in DATA_LIST: json_file_name = data[0] index_name = data[1] - # Create connection to Elasticsearch - use defaults1 - es = Elasticsearch() - # Delete index print("Deleting index:", index_name) es.indices.delete(index=index_name, ignore=[400, 404]) @@ -49,3 +44,15 @@ if __name__ == '__main__': actions = [] print("Done", index_name) + +def _setup_test_mappings(es): + # Create a complex mapping containing many Elasticsearch features + es.indices.delete(index=TEST_MAPPING1_INDEX_NAME, ignore=[400, 404]) + es.indices.create(index=TEST_MAPPING1_INDEX_NAME, body=TEST_MAPPING1) + +if __name__ == '__main__': + # Create connection to Elasticsearch - use defaults + es = Elasticsearch(ELASTICSEARCH_HOST) + + _setup_data(es) + _setup_test_mappings(es)