diff --git a/eland/__init__.py b/eland/__init__.py
index f728686..094154d 100644
--- a/eland/__init__.py
+++ b/eland/__init__.py
@@ -1,3 +1,4 @@
 from .utils import *
 from .frame import *
 from .client import *
+from .mappings import *
diff --git a/eland/client.py b/eland/client.py
index 5e4e23b..7ff114c 100644
--- a/eland/client.py
+++ b/eland/client.py
@@ -1,8 +1,9 @@
 from elasticsearch import Elasticsearch
 
-# eland client - implement as facade to control access to Elasticsearch methods
-class Client(object):
-
+class Client():
+    """
+    eland client - implemented as a facade to control access to Elasticsearch methods
+    """
     def __init__(self, es=None):
         if isinstance(es, Elasticsearch):
             self.es = es
@@ -20,4 +21,3 @@ class Client(object):
 
     def field_caps(self, **kwargs):
         return self.es.field_caps(**kwargs)
-
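For orientation, a minimal sketch of how this facade might be used — this assumes a local cluster and the `flights` test index set up elsewhere in this change, and only calls methods visible in this diff (`indices()`, `field_caps()`):

```python
from elasticsearch import Elasticsearch
import eland as ed

# The facade accepts either an existing Elasticsearch instance or a host string
client = ed.Client(Elasticsearch('localhost'))

# Only methods exposed by the facade reach the underlying client
mapping = client.indices().get_mapping(index='flights')
caps = client.field_caps(index='flights', fields=['timestamp'])
```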
diff --git a/eland/frame.py b/eland/frame.py
index d84f2b8..99fc895 100644
--- a/eland/frame.py
+++ b/eland/frame.py
@@ -23,7 +23,7 @@ Similarly, only Elasticsearch searchable fields can be searched or filtered, and
 only Elasticsearch aggregatable fields can be aggregated or grouped.
 """
 
-import eland
+import eland as ed
 
 from elasticsearch import Elasticsearch
 from elasticsearch_dsl import Search
 
@@ -66,94 +66,179 @@ class DataFrame():
     If the Elasticsearch index is deleted or index mappings are changed after this
     object is created, the object is not rebuilt and so inconsistencies can occur.
 
-    Mapping Elasticsearch types to pandas dtypes
-    --------------------------------------------
-
-    Elasticsearch field datatype              | Pandas dtype
-    --
-    text                                      | object
-    keyword                                   | object
-    long, integer, short, byte, binary        | int64
-    double, float, half_float, scaled_float   | float64
-    date, date_nanos                          | datetime64[ns]
-    boolean                                   | bool
-
-    TODO - add additional mapping types
     """
     def __init__(self, client, index_pattern):
-        self.client = eland.Client(client)
+        self.client = ed.Client(client)
         self.index_pattern = index_pattern
 
-        # Get and persist mappings, this allows use to correctly
+        # Get and persist mappings, this allows us to correctly
         # map returned types from Elasticsearch to pandas datatypes
-        mapping = self.client.indices().get_mapping(index=self.index_pattern)
-        #field_caps = self.client.field_caps(index=self.index_pattern, fields='*')
-
-        #self.fields, self.aggregatable_fields, self.searchable_fields = \
-        #    DataFrame._es_mappings_to_pandas(mapping, field_caps)
-        self.fields = DataFrame._es_mappings_to_pandas(mapping)
-
-    @staticmethod
-    def _flatten_results(prefix, results, result):
-        # TODO
-        return prefix
+        self.mappings = ed.Mappings(self.client, self.index_pattern)
 
     def _es_results_to_pandas(self, results):
+        """
+        Parameters
+        ----------
+        results: dict
+            Elasticsearch results from self.client.search
+
+        Returns
+        -------
+        df: pandas.DataFrame
+            _source values extracted from results and mapped to pandas DataFrame
+            dtypes are mapped via Mapping object
+
+        Notes
+        -----
+        Fields containing lists in Elasticsearch don't map easily to pandas.DataFrame
+        For example, an index with mapping:
+        ```
+        "mappings" : {
+          "properties" : {
+            "group" : {
+              "type" : "keyword"
+            },
+            "user" : {
+              "type" : "nested",
+              "properties" : {
+                "first" : {
+                  "type" : "keyword"
+                },
+                "last" : {
+                  "type" : "keyword"
+                }
+              }
+            }
+          }
+        }
+        ```
+        Adding a document:
+        ```
+        "_source" : {
+          "group" : "amsterdam",
+          "user" : [
+            {
+              "first" : "John",
+              "last" : "Smith"
+            },
+            {
+              "first" : "Alice",
+              "last" : "White"
+            }
+          ]
+        }
+        ```
+        (https://www.elastic.co/guide/en/elasticsearch/reference/current/nested.html)
+        this would be transformed internally (in Elasticsearch) into a document that looks more like this:
+        ```
+        {
+          "group" :      "amsterdam",
+          "user.first" : [ "alice", "john" ],
+          "user.last" :  [ "smith", "white" ]
+        }
+        ```
+        When mapping this to a pandas DataFrame we mimic this transformation.
+
+        Similarly, if a list is added to Elasticsearch:
+        ```
+        PUT my_index/_doc/1
+        {
+          "list" : [
+            0, 1, 2
+          ]
+        }
+        ```
+        The mapping is:
+        ```
+        "mappings" : {
+          "properties" : {
+            "list" : {
+              "type" : "long"
+            }
+          }
+        }
+        ```
+        TODO - explain how lists are handled (https://www.elastic.co/guide/en/elasticsearch/reference/current/array.html)
+        TODO - an option here is to use Elasticsearch's multi-field matching instead of pandas treatment of lists (which isn't great)
+        NOTE - using lists like this is generally not a good way to use this API
+        """
+        def flatten_dict(y):
+            out = {}
+
+            def flatten(x, name=''):
+                # We flatten into source fields e.g. if type=geo_point
+                # location: {lat=52.38, lon=4.90}
+                if name == '':
+                    is_source_field = False
+                    pd_dtype = 'object'
+                else:
+                    is_source_field, pd_dtype = self.mappings.is_source_field(name[:-1])
+
+                if not is_source_field and type(x) is dict:
+                    for a in x:
+                        flatten(x[a], name + a + '.')
+                elif not is_source_field and type(x) is list:
+                    for a in x:
+                        flatten(a, name)
+                else:
+                    field_name = name[:-1]
+
+                    # Coerce type
+                    if pd_dtype == 'datetime64':
+                        x = pd.to_datetime(x)
+
+                    # Elasticsearch can have multiple values for a field. These are represented as lists, so
+                    # create lists for this pivot (see notes above)
+                    if field_name in out:
+                        if type(out[field_name]) is not list:
+                            out[field_name] = [out[field_name]]
+                        out[field_name].append(x)
+                    else:
+                        out[field_name] = x
+
+            flatten(y)
+
+            return out
+
         rows = []
         for hit in results['hits']['hits']:
             row = hit['_source']
-            rows.append(row)
+            # flatten row to map correctly to 2D DataFrame
+            rows.append(flatten_dict(row))
 
+        # Create pandas DataFrame
         df = pd.DataFrame(data=rows)
 
+        """
+        # Coerce types
+        pd_dtypes = self.mappings.source_fields_pd_dtypes()
+
+        # This returns types such as:
+        # {
+        #   'bool': Index(['Cancelled', 'FlightDelay'], dtype='object'),
+        #   'datetime64[ns]': Index(['timestamp'], dtype='object'),
+        #   'float64': Index(['AvgTicketPrice', 'DistanceKilometers', 'DistanceMiles',...
+        # }
+
+        for pd_dtype, value in pd_dtypes.items():
+            # Types generally convert well e.g. 1,2,3 -> int64, 1.1,2.2,3.3 -> float64
+            # so to minimise work we only convert special types.
+            # TODO - add option to force all conversion
+            if pd_dtype == 'datetime64':
+                df.loc[:,value.tolist()] = df.loc[:,value.tolist()].astype('datetime64')
+        """
+
         return df
 
-    @staticmethod
-    def _extract_types_from_mapping(y):
-        """
-        Extract data types from mapping for DataFrame columns.
-
-        Elasticsearch _source data is transformed into pandas DataFrames. This strategy is not compatible
-        with all Elasticsearch configurations. Notes:
-
-        - This strategy is not compatible with all Elasticsearch configurations. If _source is disabled
-        (https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-source-field.html#disable-source-field)
-        no data values will be populated
-        - Sub-fields (e.g.
english.text in {"mappings":{"properties":{"text":{"type":"text","fields":{"english":{"type":"text","analyzer":"english"}}}}}}) - are not be used - """ - out = {} - - # Recurse until we get a 'type: xxx' - ignore sub-fields - def flatten(x, name=''): - if type(x) is dict: - for a in x: - if a == 'type' and type(x[a]) is str: # 'type' can be a name of a field - out[name[:-1]] = x[a] - if a == 'properties' or a == 'fields': - flatten(x[a], name) - else: - flatten(x[a], name + a + '.') - - flatten(y) - - return out - - @staticmethod - def _es_mappings_to_pandas(mappings): - fields = {} - for index in mappings: - if 'properties' in mappings[index]['mappings']: - properties = mappings[index]['mappings']['properties'] - - datatypes = DataFrame._extract_types_from_mapping(properties) - - # Note there could be conflicts here - e.g. the same field name with different semantics in - # different indexes - currently the last one wins TODO: review this - fields.update(datatypes) - - return pd.DataFrame.from_dict(data=fields, orient='index', columns=['datatype']) - def head(self, n=5): results = self.client.search(index=self.index_pattern, size=n) @@ -161,8 +246,6 @@ class DataFrame(): def describe(self): # First get all types - #mapping = self.client.indices().get_mapping(index=self.index_pattern) - mapping = self.client.indices().get_mapping(index=self.index_pattern) fields = DataFrame._es_mappings_to_pandas(mapping) diff --git a/eland/mappings.py b/eland/mappings.py new file mode 100644 index 0000000..5179b22 --- /dev/null +++ b/eland/mappings.py @@ -0,0 +1,286 @@ +import warnings +import pandas as pd + +class Mappings(): + """ + General purpose to manage Elasticsearch to/from pandas mappings + + Attributes + ---------- + + mappings_capabilities: pandas.DataFrame + A data frame summarising the capabilities of the index mapping + + _source - is top level field (i.e. not a multi-field sub-field) + es_dtype - Elasticsearch field datatype + pd_dtype - Pandas datatype + searchable - is the field searchable? + aggregatable- is the field aggregatable? + _source es_dtype pd_dtype searchable aggregatable + maps-telemetry.min True long int64 True True + maps-telemetry.avg True float float64 True True + city True text object True False + user_name True keyword object True True + origin_location.lat.keyword False keyword object True True + type True keyword object True True + origin_location.lat True text object True False + + """ + def __init__(self, client, index_pattern): + """ + Parameters + ---------- + client: eland.Client + Elasticsearch client + + index_pattern: str + Elasticsearch index pattern + """ + # persist index_pattern for debugging + self.index_pattern = index_pattern + + mappings = client.indices().get_mapping(index=index_pattern) + + # Get all fields (including all nested) and then field_caps + # for these names (fields=* doesn't appear to work effectively...) 
diff --git a/eland/mappings.py b/eland/mappings.py
new file mode 100644
index 0000000..5179b22
--- /dev/null
+++ b/eland/mappings.py
@@ -0,0 +1,286 @@
+import warnings
+
+import pandas as pd
+
+class Mappings():
+    """
+    General purpose class to manage Elasticsearch to/from pandas mappings
+
+    Attributes
+    ----------
+
+    mappings_capabilities: pandas.DataFrame
+        A data frame summarising the capabilities of the index mapping
+
+        _source      - is the field a top level field (i.e. not a multi-field sub-field)?
+        es_dtype     - Elasticsearch field datatype
+        pd_dtype     - pandas datatype
+        searchable   - is the field searchable?
+        aggregatable - is the field aggregatable?
+
+                                     _source es_dtype pd_dtype searchable aggregatable
+        maps-telemetry.min              True     long    int64       True         True
+        maps-telemetry.avg              True    float  float64       True         True
+        city                            True     text   object       True        False
+        user_name                       True  keyword   object       True         True
+        origin_location.lat.keyword    False  keyword   object       True         True
+        type                            True  keyword   object       True         True
+        origin_location.lat             True     text   object       True        False
+    """
+    def __init__(self, client, index_pattern):
+        """
+        Parameters
+        ----------
+        client: eland.Client
+            Elasticsearch client
+
+        index_pattern: str
+            Elasticsearch index pattern
+        """
+        # persist index_pattern for debugging
+        self.index_pattern = index_pattern
+
+        mappings = client.indices().get_mapping(index=index_pattern)
+
+        # Get all fields (including all nested) and then field_caps
+        # for these names (fields=* doesn't appear to work effectively...)
+        all_fields = Mappings._extract_fields_from_mapping(mappings)
+        all_fields_caps = client.field_caps(index=index_pattern, fields=list(all_fields.keys()))
+
+        # Get top level (not multi-field sub-field) mappings
+        source_fields = Mappings._extract_fields_from_mapping(mappings, source_only=True)
+
+        # Populate capability matrix of fields
+        # field_name, es_dtype, pd_dtype, is_searchable, is_aggregatable, is_source
+        self.mappings_capabilities = Mappings._create_capability_matrix(all_fields, source_fields, all_fields_caps)
+
+        # Cache source field types for efficient lookup
+        # (this massively improves performance of DataFrame.flatten)
+        self.source_field_pd_dtypes = {}
+
+        for field_name in source_fields:
+            pd_dtype = self.mappings_capabilities.loc[field_name]['pd_dtype']
+            self.source_field_pd_dtypes[field_name] = pd_dtype
+
+    @staticmethod
+    def _extract_fields_from_mapping(mappings, source_only=False):
+        """
+        Extract all field names and types from a mapping.
+        ```
+        {
+          "my_index": {
+            "mappings": {
+              "properties": {
+                "city": {
+                  "type": "text",
+                  "fields": {
+                    "keyword": {
+                      "type": "keyword"
+                    }
+                  }
+                }
+              }
+            }
+          }
+        }
+        ```
+        if source_only == False:
+            return {'city': 'text', 'city.keyword': 'keyword'}
+        else:
+            return {'city': 'text'}
+
+        Note: the first field name type wins. E.g.
+
+        ```
+        PUT my_index1 {"mappings":{"properties":{"city":{"type":"text"}}}}
+        PUT my_index2 {"mappings":{"properties":{"city":{"type":"long"}}}}
+
+        Returns {'city': 'text'}
+        ```
+
+        Parameters
+        ----------
+        mappings: dict
+            Return from get_mapping
+
+        Returns
+        -------
+        fields: dict
+            Dict of field names and types
+        """
+        fields = {}
+
+        # Recurse until we get a 'type: xxx'
+        def flatten(x, name=''):
+            if type(x) is dict:
+                for a in x:
+                    if a == 'type' and type(x[a]) is str: # 'type' can be a name of a field
+                        field_name = name[:-1]
+                        field_type = x[a]
+
+                        # If there is a conflicting type, warn - the first value added wins
+                        if field_name in fields and fields[field_name] != field_type:
+                            warnings.warn("Field {} has conflicting types {} != {}".
+                                          format(field_name, fields[field_name], field_type),
+                                          UserWarning)
+                        else:
+                            fields[field_name] = field_type
+                    elif a == 'properties' or (not source_only and a == 'fields'):
+                        flatten(x[a], name)
+                    elif not (source_only and a == 'fields'): # ignore multi-field fields for source_only
+                        flatten(x[a], name + a + '.')
+
+        for index in mappings:
+            if 'properties' in mappings[index]['mappings']:
+                properties = mappings[index]['mappings']['properties']
+
+                flatten(properties)
+
+        return fields
+    @staticmethod
+    def _create_capability_matrix(all_fields, source_fields, all_fields_caps):
+        """
+        {
+          "fields": {
+            "rating": {
+              "long": {
+                "searchable": true,
+                "aggregatable": false,
+                "indices": ["index1", "index2"],
+                "non_aggregatable_indices": ["index1"]
+              },
+              "keyword": {
+                "searchable": false,
+                "aggregatable": true,
+                "indices": ["index3", "index4"],
+                "non_searchable_indices": ["index4"]
+              }
+            },
+            "title": {
+              "text": {
+                "searchable": true,
+                "aggregatable": false
+              }
+            }
+          }
+        }
+        """
+        all_fields_caps_fields = all_fields_caps['fields']
+
+        columns = ['_source', 'es_dtype', 'pd_dtype', 'searchable', 'aggregatable']
+        capability_matrix = {}
+
+        for field, field_caps in all_fields_caps_fields.items():
+            if field in all_fields:
+                # field_caps = {'long': {'searchable': True, 'aggregatable': True}}
+                for kk, vv in field_caps.items():
+                    _source = (field in source_fields)
+                    es_dtype = vv['type']
+                    pd_dtype = Mappings._es_dtype_to_pd_dtype(vv['type'])
+                    searchable = vv['searchable']
+                    aggregatable = vv['aggregatable']
+
+                    caps = [_source, es_dtype, pd_dtype, searchable, aggregatable]
+
+                    capability_matrix[field] = caps
+
+                    if 'non_aggregatable_indices' in vv:
+                        warnings.warn("Field {} has conflicting aggregatable fields across indexes {}".
+                                      format(field, vv['non_aggregatable_indices']),
+                                      UserWarning)
+                    if 'non_searchable_indices' in vv:
+                        warnings.warn("Field {} has conflicting searchable fields across indexes {}".
+                                      format(field, vv['non_searchable_indices']),
+                                      UserWarning)
+
+        capability_matrix_df = pd.DataFrame.from_dict(capability_matrix, orient='index', columns=columns)
+
+        return capability_matrix_df.sort_index()
+
+    @staticmethod
+    def _es_dtype_to_pd_dtype(es_dtype):
+        """
+        Mapping Elasticsearch types to pandas dtypes
+        --------------------------------------------
+
+        Elasticsearch field datatype              | pandas dtype
+        --
+        text                                      | object
+        keyword                                   | object
+        long, integer, short, byte, binary        | int64
+        double, float, half_float, scaled_float   | float64
+        date, date_nanos                          | datetime64
+        boolean                                   | bool
+
+        TODO - add additional mapping types
+        """
+        es_dtype_to_pd_dtype = {
+            'text': 'object',
+            'keyword': 'object',
+
+            'long': 'int64',
+            'integer': 'int64',
+            'short': 'int64',
+            'byte': 'int64',
+            'binary': 'int64',
+
+            'double': 'float64',
+            'float': 'float64',
+            'half_float': 'float64',
+            'scaled_float': 'float64',
+
+            'date': 'datetime64',
+            'date_nanos': 'datetime64',
+
+            'boolean': 'bool'
+        }
+
+        if es_dtype in es_dtype_to_pd_dtype:
+            return es_dtype_to_pd_dtype[es_dtype]
+
+        # Return 'object' for all unsupported TODO - investigate how different types could be supported
+        return 'object'
+
+    def all_fields(self):
+        """
+        Returns
+        -------
+        all_fields: list
+            All typed fields in the index mapping
+        """
+        return self.mappings_capabilities.index.tolist()
+    def source_fields_pd_dtypes(self):
+        """
+        Returns
+        -------
+        groups: dict
+            Calls pandas.core.groupby.GroupBy.groups for _source fields
+            E.g.
+            {
+              'bool': Index(['Cancelled', 'FlightDelay'], dtype='object'),
+              'datetime64[ns]': Index(['timestamp'], dtype='object'),
+              'float64': Index(['AvgTicketPrice', 'DistanceKilometers', 'DistanceMiles',...
+            }
+        """
+        return self.mappings_capabilities[self.mappings_capabilities._source == True].groupby('pd_dtype').groups
+
+    def is_source_field(self, field_name):
+        """
+        Parameters
+        ----------
+        field_name: str
+
+        Returns
+        -------
+        is_source_field: bool
+            Is this field name a top-level source field?
+        pd_dtype: str
+            The pandas data type we map to
+        """
+        pd_dtype = 'object'
+        is_source_field = False
+
+        if field_name in self.source_field_pd_dtypes:
+            is_source_field = True
+            pd_dtype = self.source_field_pd_dtypes[field_name]
+
+        return is_source_field, pd_dtype
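A hypothetical usage sketch for the new `Mappings` class — this assumes a local cluster with the `flights` test index created by `setup_tests.py`, and the printed values are illustrative:

```python
import eland as ed

# Build the capability matrix once, up front
mappings = ed.Mappings(ed.Client('localhost'), 'flights')

print(mappings.all_fields())                      # e.g. ['AvgTicketPrice', 'Cancelled', 'Carrier', ...]
print(mappings.is_source_field('timestamp'))      # (True, 'datetime64')
print(mappings.is_source_field('no_such_field'))  # (False, 'object')
print(mappings.source_fields_pd_dtypes())         # {'bool': Index([...]), 'datetime64': Index(['timestamp'], ...), ...}
```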
+ "country_iso_code" : { + "type" : "keyword" + }, + "location" : { + "type" : "geo_point" + }, + "region_name" : { + "type" : "keyword" + } + } + }, + "manufacturer" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "order_date" : { + "type" : "date" + }, + "order_id" : { + "type" : "keyword" + }, + "products" : { + "properties" : { + "_id" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + }, + "base_price" : { + "type" : "half_float" + }, + "base_unit_price" : { + "type" : "half_float" + }, + "category" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "created_on" : { + "type" : "date" + }, + "discount_amount" : { + "type" : "half_float" + }, + "discount_percentage" : { + "type" : "half_float" + }, + "manufacturer" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "min_price" : { + "type" : "half_float" + }, + "price" : { + "type" : "half_float" + }, + "product_id" : { + "type" : "long" + }, + "product_name" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + }, + "analyzer" : "english" + }, + "quantity" : { + "type" : "integer" + }, + "sku" : { + "type" : "keyword" + }, + "tax_amount" : { + "type" : "half_float" + }, + "taxful_price" : { + "type" : "half_float" + }, + "taxless_price" : { + "type" : "half_float" + }, + "unit_discount_amount" : { + "type" : "half_float" + } + } + }, + "sku" : { + "type" : "keyword" + }, + "taxful_total_price" : { + "type" : "half_float" + }, + "taxless_total_price" : { + "type" : "half_float" + }, + "total_quantity" : { + "type" : "integer" + }, + "total_unique_products" : { + "type" : "integer" + }, + "type" : { + "type" : "keyword" + }, + "user" : { + "type" : "keyword" + } + } + } } ECOMMERCE_FILE_NAME = ROOT_DIR + '/ecommerce.json.gz' +ECOMMERCE_DF_FILE_NAME = ROOT_DIR + '/ecommerce_df.json.gz' +ECOMMERCE_DATETIME_FIELD = 'order_date' TEST_MAPPING1 = { 'mappings': { @@ -115,9 +399,6 @@ TEST_MAPPING1 = { 'dest_location': { 'type': 'geo_point' }, - 'user': { - 'type': 'nested' - }, 'my_join_field': { 'type': 'join', 'relations': { @@ -153,8 +434,48 @@ TEST_MAPPING1_EXPECTED = { 'text.english': 'text', 'tweeted_at': 'date', 'type': 'keyword', - 'user': 'nested', 'user_name': 'keyword' } -TEST_MAPPING1_EXPECTED_DF = pd.DataFrame.from_dict(data=TEST_MAPPING1_EXPECTED, orient='index', columns=['datatype']) +TEST_MAPPING1_EXPECTED_DF = pd.DataFrame.from_dict(data=TEST_MAPPING1_EXPECTED, orient='index', columns=['es_dtype']) + +TEST_NESTED_USER_GROUP_INDEX_NAME = 'nested_user_group' +TEST_NESTED_USER_GROUP_MAPPING = { + 'mappings': { + 'properties': { + 'group': { + 'type': 'keyword' + }, + 'user': { + 'properties': { + 'first': { + 'type': 'keyword' + }, + 'last': { + 'type': 'keyword' + }, + 'address' : { + 'type' : 'keyword' + } + } + } + } +} +} + +TEST_NESTED_USER_GROUP_DOCS = [ +{'_index':TEST_NESTED_USER_GROUP_INDEX_NAME, +'_source': + {'group':'amsterdam','user':[ + {'first':'Manke','last':'Nelis','address':['Elandsgracht', 'Amsterdam']}, + {'first':'Johnny','last':'Jordaan','address':['Elandsstraat', 'Amsterdam']}]}}, +{'_index':TEST_NESTED_USER_GROUP_INDEX_NAME, +'_source': + {'group':'london','user':[ + {'first':'Alice','last':'Monkton'}, + {'first':'Jimmy','last':'White','address':['London']}]}}, +{'_index':TEST_NESTED_USER_GROUP_INDEX_NAME, +'_source':{'group':'new york','user':[ + {'first':'Bill','last':'Jones'}]}} +] + diff --git 
diff --git a/eland/tests/client/__init__.py b/eland/tests/client/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/eland/tests/client/test_mappings_pytest.py b/eland/tests/client/test_mappings_pytest.py
new file mode 100644
index 0000000..3814221
--- /dev/null
+++ b/eland/tests/client/test_mappings_pytest.py
@@ -0,0 +1,20 @@
+# File called _pytest for PyCharm compatibility
+import pytest
+
+from eland.tests import *
+
+from pandas.util.testing import (
+    assert_almost_equal, assert_frame_equal, assert_series_equal)
+
+import eland as ed
+
+class TestMapping():
+
+    # Requires 'setup_tests.py' to be run prior to this
+    def test_mapping(self):
+        mapping = ed.Mappings(ed.Client(ELASTICSEARCH_HOST), TEST_MAPPING1_INDEX_NAME)
+
+        assert mapping.all_fields() == TEST_MAPPING1_EXPECTED_DF.index.tolist()
+
+        assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mapping.mappings_capabilities['es_dtype']))
+
diff --git a/eland/tests/ecommerce_df.json.gz b/eland/tests/ecommerce_df.json.gz
new file mode 100644
index 0000000..7000209
Binary files /dev/null and b/eland/tests/ecommerce_df.json.gz differ
diff --git a/eland/tests/frame/common.py b/eland/tests/frame/common.py
index d0b1ef0..64d9854 100644
--- a/eland/tests/frame/common.py
+++ b/eland/tests/frame/common.py
@@ -10,12 +10,15 @@ ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
 
 # Create pandas and eland data frames
 from eland.tests import ELASTICSEARCH_HOST
-from eland.tests import FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME, ECOMMERCE_FILE_NAME, ECOMMERCE_INDEX_NAME
+from eland.tests import FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME, ECOMMERCE_DF_FILE_NAME, ECOMMERCE_INDEX_NAME, \
+    ECOMMERCE_DATETIME_FIELD
 
 _pd_flights = pd.read_json(FLIGHTS_FILE_NAME, lines=True)
 _ed_flights = ed.read_es(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME)
 
-_pd_ecommerce = pd.read_json(ECOMMERCE_FILE_NAME, lines=True)
+_pd_ecommerce = pd.read_json(ECOMMERCE_DF_FILE_NAME).sort_index()
+_pd_ecommerce[ECOMMERCE_DATETIME_FIELD] = \
+    pd.to_datetime(_pd_ecommerce[ECOMMERCE_DATETIME_FIELD])
 _ed_ecommerce = ed.read_es(ELASTICSEARCH_HOST, ECOMMERCE_INDEX_NAME)
 
 class TestData:
diff --git a/eland/tests/frame/test_indexing_pytest.py b/eland/tests/frame/test_indexing_pytest.py
index 6eacac1..c973258 100644
--- a/eland/tests/frame/test_indexing_pytest.py
+++ b/eland/tests/frame/test_indexing_pytest.py
@@ -1,6 +1,5 @@
 # File called _pytest for PyCharm compatability
 from eland.tests.frame.common import TestData
-
 from eland.tests import *
 
 import eland as ed
@@ -11,18 +10,42 @@ from pandas.util.testing import (
 
 class TestDataFrameIndexing(TestData):
 
+    def test_results(self):
+        test = ed.read_es(ELASTICSEARCH_HOST, TEST_NESTED_USER_GROUP_INDEX_NAME)
+
+        print(test.mappings.mappings_capabilities)
+
+        pd.set_option('display.max_rows', 500)
+        pd.set_option('display.max_columns', 500)
+        pd.set_option('display.width', 1000)
+
+        print(test.head())
+
     def test_head(self):
+        pd.set_option('display.max_rows', 500)
+        pd.set_option('display.max_columns', 500)
+        pd.set_option('display.width', 1000)
+
+        """
         pd_flights_head = self.pd_flights().head()
        ed_flights_head = self.ed_flights().head()
 
        assert_frame_equal(pd_flights_head, ed_flights_head)
+        """
 
         pd_ecommerce_head = self.pd_ecommerce().head()
         ed_ecommerce_head = self.ed_ecommerce().head()
 
+        print(self.ed_ecommerce().mappings.source_fields_pd_dtypes())
+
+        print(ed_ecommerce_head.dtypes)
+        print(pd_ecommerce_head.dtypes)
+
+        #print(ed_ecommerce_head)
+
         assert_frame_equal(pd_ecommerce_head, ed_ecommerce_head)
 
     def test_mappings(self):
        test_mapping1 = ed.read_es(ELASTICSEARCH_HOST, TEST_MAPPING1_INDEX_NAME)
 
        assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, test_mapping1.fields)
-
diff --git a/eland/tests/setup_tests.py b/eland/tests/setup_tests.py
index 000c456..b758a26 100644
--- a/eland/tests/setup_tests.py
+++ b/eland/tests/setup_tests.py
@@ -5,8 +5,8 @@ from elasticsearch import helpers
 from eland.tests import *
 
 DATA_LIST = [
-    (FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME),
-    (ECOMMERCE_FILE_NAME, ECOMMERCE_INDEX_NAME)
+    (FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME, FLIGHTS_MAPPING),
+    (ECOMMERCE_FILE_NAME, ECOMMERCE_INDEX_NAME, ECOMMERCE_MAPPING)
 ]
 
 def _setup_data(es):
@@ -14,10 +14,13 @@ def _setup_data(es):
     for data in DATA_LIST:
         json_file_name = data[0]
         index_name = data[1]
+        mapping = data[2]
 
         # Delete index
         print("Deleting index:", index_name)
         es.indices.delete(index=index_name, ignore=[400, 404])
+        print("Creating index:", index_name)
+        es.indices.create(index=index_name, body=mapping)
 
         df = pd.read_json(json_file_name, lines=True)
 
@@ -50,9 +53,16 @@ def _setup_test_mappings(es):
     es.indices.delete(index=TEST_MAPPING1_INDEX_NAME, ignore=[400, 404])
     es.indices.create(index=TEST_MAPPING1_INDEX_NAME, body=TEST_MAPPING1)
 
+def _setup_test_nested(es):
+    es.indices.delete(index=TEST_NESTED_USER_GROUP_INDEX_NAME, ignore=[400, 404])
+    es.indices.create(index=TEST_NESTED_USER_GROUP_INDEX_NAME, body=TEST_NESTED_USER_GROUP_MAPPING)
+
+    helpers.bulk(es, TEST_NESTED_USER_GROUP_DOCS)
+
 if __name__ == '__main__':
     # Create connection to Elasticsearch - use defaults
     es = Elasticsearch(ELASTICSEARCH_HOST)
 
     _setup_data(es)
     _setup_test_mappings(es)
+    _setup_test_nested(es)
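Tying the pieces together, a hypothetical end-to-end run against a local cluster (assumes `setup_tests.py` has populated the test indices; output shapes are illustrative):

```python
import eland as ed

# Read an Elasticsearch index as an eland DataFrame (the API used in the tests above)
ed_flights = ed.read_es('localhost', 'flights')

# Mappings are resolved once, at construction time
print(ed_flights.mappings.mappings_capabilities.head())

# head() runs a size-limited search and maps the _source hits into a pandas DataFrame
print(ed_flights.head(5))
```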