diff --git a/eland/DataFrame.py b/eland/DataFrame.py deleted file mode 100644 index e840c63..0000000 --- a/eland/DataFrame.py +++ /dev/null @@ -1,108 +0,0 @@ -import eland - -from elasticsearch import Elasticsearch -from elasticsearch_dsl import Search - -import json - -import pandas as pd - -class DataFrame(): - - def __init__(self, client, index_pattern): - self.client = eland.Client(client) - self.index_pattern = index_pattern - - self.client.indices().exists(index_pattern) - - @staticmethod - def _flatten_results(prefix, results, result): - # TODO - return prefix - - @staticmethod - def _es_results_to_pandas(results): - # TODO - resolve nested fields - rows = [] - for hit in results['hits']['hits']: - row = hit['_source'] - rows.append(row) - #return pd.DataFrame(data=rows) - # Converting the list of dicts to a dataframe doesn't convert datetimes - # effectively compared to read_json. TODO - https://github.com/elastic/eland/issues/2 - json_rows = json.dumps(rows) - return pd.read_json(json_rows) - - @staticmethod - def _flatten_mapping(prefix, properties, result): - for k, v in properties.items(): - if 'properties' in v: - if(prefix == ''): - prefix = k - else: - prefix = prefix + '.' + k - DataFrame._flatten_mapping(prefix, v['properties'], result) - else: - if(prefix == ''): - key = k - else: - key = prefix + '.' + k - type = v['type'] - result.append((key, type)) - - @staticmethod - def _es_mappings_to_pandas(mappings): - fields = [] - for index in mappings: - if 'properties' in mappings[index]['mappings']: - properties = mappings[index]['mappings']['properties'] - - DataFrame._flatten_mapping('', properties, fields) - - return pd.DataFrame(data=fields, columns=['field', 'datatype']) - - def head(self, n=5): - results = self.client.search(index=self.index_pattern, size=n) - - return DataFrame._es_results_to_pandas(results) - - def describe(self): - # First get all types - #mapping = self.client.indices().get_mapping(index=self.index_pattern) - mapping = self.client.indices().get_mapping(index=self.index_pattern) - - fields = DataFrame._es_mappings_to_pandas(mapping) - - # Get numeric types (https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#the-where-method-and-masking) - # https://www.elastic.co/guide/en/elasticsearch/reference/current/number.html - # TODO refactor this list out of method - numeric_fields = fields.query('datatype == ["long", "integer", "short", "byte", "double", "float", "half_float", "scaled_float"]') - - # for each field we copute: - # count, mean, std, min, 25%, 50%, 75%, max - search = Search(using=self.client, index=self.index_pattern).extra(size=0) - - for field in numeric_fields.field: - search.aggs.metric('extended_stats_'+field, 'extended_stats', field=field) - search.aggs.metric('percentiles_'+field, 'percentiles', field=field) - - response = search.execute() - - results = pd.DataFrame(index=['count', 'mean', 'std', 'min', '25%', '50%', '75%', 'max']) - - for field in numeric_fields.field: - values = [] - values.append(response.aggregations['extended_stats_'+field]['count']) - values.append(response.aggregations['extended_stats_'+field]['avg']) - values.append(response.aggregations['extended_stats_'+field]['std_deviation']) - values.append(response.aggregations['extended_stats_'+field]['min']) - values.append(response.aggregations['percentiles_'+field]['values']['25.0']) - values.append(response.aggregations['percentiles_'+field]['values']['50.0']) - values.append(response.aggregations['percentiles_'+field]['values']['75.0']) - 
values.append(response.aggregations['extended_stats_'+field]['max'])
-
-        # if not None
-        if (values.count(None) < len(values)):
-            results = results.assign(**{field: values})
-
-        return results
diff --git a/eland/__init__.py b/eland/__init__.py
index a4723d1..094154d 100644
--- a/eland/__init__.py
+++ b/eland/__init__.py
@@ -1,3 +1,4 @@
 from .utils import *
-from .DataFrame import *
-from .Client import *
+from .frame import *
+from .client import *
+from .mappings import *
diff --git a/eland/Client.py b/eland/client.py
similarity index 65%
rename from eland/Client.py
rename to eland/client.py
index e951d30..7ff114c 100644
--- a/eland/Client.py
+++ b/eland/client.py
@@ -1,14 +1,15 @@
 from elasticsearch import Elasticsearch

-# eland client - implement as facade to control access to Elasticsearch methods
-class Client(object):
-
+class Client():
+    """
+    eland client - implemented as a facade to control access to Elasticsearch methods
+    """
     def __init__(self, es=None):
         if isinstance(es, Elasticsearch):
             self.es = es
         else:
             self.es = Elasticsearch(es)
-
+
     def info(self):
         return self.es.info()
@@ -17,3 +18,6 @@ class Client(object):

     def search(self, **kwargs):
         return self.es.search(**kwargs)
+
+    def field_caps(self, **kwargs):
+        return self.es.field_caps(**kwargs)
diff --git a/eland/frame.py b/eland/frame.py
new file mode 100644
index 0000000..47b331a
--- /dev/null
+++ b/eland/frame.py
@@ -0,0 +1,253 @@
+"""
+DataFrame
+---------
+An efficient 2D container for potentially mixed-type time series or other
+labeled data series.
+
+The underlying data resides in Elasticsearch and the API aligns as closely as
+possible with the pandas.DataFrame API.
+
+This allows the eland.DataFrame to access large datasets stored in Elasticsearch,
+without storing the dataset in local memory.
+
+Implementation Details
+----------------------
+
+Elasticsearch indexes can be configured in many different ways, and these indexes
+utilise different data structures from those of pandas.DataFrame.
+
+eland.DataFrame operations that return individual rows (e.g. df.head()) return
+_source data. If _source is not enabled, this data is not accessible.
+
+Similarly, only Elasticsearch searchable fields can be searched or filtered, and
+only Elasticsearch aggregatable fields can be aggregated or grouped.
+
+"""
+import eland as ed
+
+from elasticsearch import Elasticsearch
+from elasticsearch_dsl import Search
+
+import pandas as pd
+
+class DataFrame():
+    """
+    pandas.DataFrame like API that proxies into Elasticsearch index(es).
+
+    Parameters
+    ----------
+    client : eland.Client
+        A reference to an Elasticsearch python client
+
+    index_pattern : str
+        An Elasticsearch index pattern. This can contain wildcards (e.g. filebeat-*).
+
+    See Also
+    --------
+
+    Examples
+    --------
+
+    import eland as ed
+    client = ed.Client(Elasticsearch())
+    df = ed.DataFrame(client, 'reviews')
+    df.head()
+       reviewerId  vendorId  rating              date
+    0           0         0       5  2006-04-07 17:08
+    1           1         1       5  2006-05-04 12:16
+    2           2         2       4  2006-04-21 12:26
+    3           3         3       5  2006-04-18 15:48
+    4           3         4       5  2006-04-18 15:49
+
+    Notice that the types are based on Elasticsearch mappings.
+
+    Notes
+    -----
+    If the Elasticsearch index is deleted or index mappings are changed after this
+    object is created, the object is not rebuilt and so inconsistencies can occur.
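+
+    Results of aggregating operations (e.g. df.describe()) are computed by
+    Elasticsearch aggregations rather than in local memory. Elasticsearch
+    percentile aggregations are approximate, so these results can differ
+    slightly from their pandas equivalents.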
+
+    """
+    def __init__(self, client, index_pattern):
+        self.client = ed.Client(client)
+        self.index_pattern = index_pattern

+        # Get and persist mappings, this allows us to correctly
+        # map returned types from Elasticsearch to pandas datatypes
+        self.mappings = ed.Mappings(self.client, self.index_pattern)
+
+    def _es_results_to_pandas(self, results):
+        """
+        Parameters
+        ----------
+        results: dict
+            Elasticsearch results from self.client.search
+
+        Returns
+        -------
+        df: pandas.DataFrame
+            _source values extracted from results and mapped to pandas DataFrame
+            dtypes are mapped via Mapping object
+
+        Notes
+        -----
+        Fields containing lists in Elasticsearch don't map easily to pandas.DataFrame
+        For example, an index with mapping:
+        ```
+        "mappings" : {
+          "properties" : {
+            "group" : {
+              "type" : "keyword"
+            },
+            "user" : {
+              "type" : "nested",
+              "properties" : {
+                "first" : {
+                  "type" : "keyword"
+                },
+                "last" : {
+                  "type" : "keyword"
+                }
+              }
+            }
+          }
+        }
+        ```
+        Adding a document:
+        ```
+        "_source" : {
+          "group" : "amsterdam",
+          "user" : [
+            {
+              "first" : "John",
+              "last" : "Smith"
+            },
+            {
+              "first" : "Alice",
+              "last" : "White"
+            }
+          ]
+        }
+        ```
+        (https://www.elastic.co/guide/en/elasticsearch/reference/current/nested.html)
+        this would be transformed internally (in Elasticsearch) into a document that looks more like this:
+        ```
+        {
+          "group" : "amsterdam",
+          "user.first" : [ "alice", "john" ],
+          "user.last" : [ "smith", "white" ]
+        }
+        ```
+        When mapping this to a pandas data frame we mimic this transformation.
+
+        Similarly, if a list is added to Elasticsearch:
+        ```
+        PUT my_index/_doc/1
+        {
+          "list" : [
+            0, 1, 2
+          ]
+        }
+        ```
+        The mapping is:
+        ```
+        "mappings" : {
+          "properties" : {
+            "user" : {
+              "type" : "long"
+            }
+          }
+        }
+        ```
+        TODO - explain how lists are handled (https://www.elastic.co/guide/en/elasticsearch/reference/current/array.html)
+        TODO - an option here is to use Elasticsearch's multi-field matching instead of pandas treatment of lists (which isn't great)
+        NOTE - using lists like this is generally not a good way to use this API
+        """
+        def flatten_dict(y):
+            out = {}
+
+            def flatten(x, name=''):
+                # We flatten into source fields e.g. if type=geo_point
+                # location: {lat=52.38, lon=4.90}
+                if name == '':
+                    is_source_field = False
+                    pd_dtype = 'object'
+                else:
+                    is_source_field, pd_dtype = self.mappings.is_source_field(name[:-1])
+
+                if not is_source_field and type(x) is dict:
+                    for a in x:
+                        flatten(x[a], name + a + '.')
+                elif not is_source_field and type(x) is list:
+                    for a in x:
+                        flatten(a, name)
+                else:
+                    field_name = name[:-1]
+
+                    # Coerce types - for now just datetime
+                    if pd_dtype == 'datetime64[ns]':
+                        x = pd.to_datetime(x)
+
+                    # Elasticsearch can have multiple values for a field. These are
+                    # represented as lists, so create lists for this pivot (see notes above)
+                    if field_name in out:
+                        if type(out[field_name]) is not list:
+                            l = [out[field_name]]
+                            out[field_name] = l
+                        out[field_name].append(x)
+                    else:
+                        out[field_name] = x
+
+            flatten(y)
+
+            return out
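+
+        # Illustration (mirrors the Notes above, assuming 'user' is mapped as an
+        # object field via 'properties'): a hit with _source
+        #   {"group": "amsterdam", "user": [{"first": "John"}, {"first": "Alice"}]}
+        # flattens to
+        #   {"group": "amsterdam", "user.first": ["John", "Alice"]}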
+
+        rows = []
+        for hit in results['hits']['hits']:
+            row = hit['_source']
+
+            # flatten row to map correctly to 2D DataFrame
+            rows.append(flatten_dict(row))
+
+        # Create pandas DataFrame
+        df = pd.DataFrame(data=rows)
+
+        return df
+
+    def head(self, n=5):
+        results = self.client.search(index=self.index_pattern, size=n)
+
+        return self._es_results_to_pandas(results)
+
+    def describe(self):
+        numeric_source_fields = self.mappings.numeric_source_fields()
+
+        # for each field we compute:
+        # count, mean, std, min, 25%, 50%, 75%, max
+        search = Search(using=self.client, index=self.index_pattern).extra(size=0)
+
+        for field in numeric_source_fields:
+            search.aggs.metric('extended_stats_'+field, 'extended_stats', field=field)
+            search.aggs.metric('percentiles_'+field, 'percentiles', field=field)
+
+        response = search.execute()
+
+        results = {}
+
+        for field in numeric_source_fields:
+            values = []
+            values.append(response.aggregations['extended_stats_'+field]['count'])
+            values.append(response.aggregations['extended_stats_'+field]['avg'])
+            values.append(response.aggregations['extended_stats_'+field]['std_deviation'])
+            values.append(response.aggregations['extended_stats_'+field]['min'])
+            values.append(response.aggregations['percentiles_'+field]['values']['25.0'])
+            values.append(response.aggregations['percentiles_'+field]['values']['50.0'])
+            values.append(response.aggregations['percentiles_'+field]['values']['75.0'])
+            values.append(response.aggregations['extended_stats_'+field]['max'])
+
+            # Only keep the field if at least one metric is not None
+            if (values.count(None) < len(values)):
+                results[field] = values
+
+        df = pd.DataFrame(data=results, index=['count', 'mean', 'std', 'min', '25%', '50%', '75%', 'max'])
+
+        return df
diff --git a/eland/mappings.py b/eland/mappings.py
new file mode 100644
index 0000000..b96d7c1
--- /dev/null
+++ b/eland/mappings.py
@@ -0,0 +1,300 @@
+import warnings
+import pandas as pd
+
+class Mappings():
+    """
+    General purpose class to manage Elasticsearch to/from pandas mappings
+
+    Attributes
+    ----------
+
+    mappings_capabilities: pandas.DataFrame
+        A data frame summarising the capabilities of the index mapping
+
+        _source      - is top level field (i.e. not a multi-field sub-field)
+        es_dtype     - Elasticsearch field datatype
+        pd_dtype     - Pandas datatype
+        searchable   - is the field searchable?
+        aggregatable - is the field aggregatable?
+
+                                     _source es_dtype pd_dtype searchable aggregatable
+        maps-telemetry.min              True     long    int64       True         True
+        maps-telemetry.avg              True    float  float64       True         True
+        city                            True     text   object       True        False
+        user_name                       True  keyword   object       True         True
+        origin_location.lat.keyword    False  keyword   object       True         True
+        type                            True  keyword   object       True         True
+        origin_location.lat             True     text   object       True        False
+
+    """
+    def __init__(self, client, index_pattern):
+        """
+        Parameters
+        ----------
+        client: eland.Client
+            Elasticsearch client
+
+        index_pattern: str
+            Elasticsearch index pattern
+        """
+        # persist index_pattern for debugging
+        self.index_pattern = index_pattern
+
+        mappings = client.indices().get_mapping(index=index_pattern)
+
+        # Get all fields (including all nested) and then field_caps
+        # for these names (fields=* doesn't appear to work effectively...)
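+        # Illustrative sketch of a field_caps response (hypothetical 'city' text field
+        # with a 'city.keyword' multi-field sub-field; cf. _create_capability_matrix):
+        #   {'fields': {'city': {'text': {'type': 'text', 'searchable': True, 'aggregatable': False}},
+        #               'city.keyword': {'keyword': {'type': 'keyword', 'searchable': True, 'aggregatable': True}}}}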
+        all_fields = Mappings._extract_fields_from_mapping(mappings)
+        all_fields_caps = client.field_caps(index=index_pattern, fields=list(all_fields.keys()))
+
+        # Get top level (not sub-field multifield) mappings
+        source_fields = Mappings._extract_fields_from_mapping(mappings, source_only=True)
+
+        # Populate capability matrix of fields
+        # field_name, es_dtype, pd_dtype, is_searchable, is_aggregatable, is_source
+        self.mappings_capabilities = Mappings._create_capability_matrix(all_fields, source_fields, all_fields_caps)
+
+        # Cache source field types for efficient lookup
+        # (this massively improves performance of DataFrame.flatten)
+        self.source_field_pd_dtypes = {}
+
+        for field_name in source_fields:
+            pd_dtype = self.mappings_capabilities.loc[field_name]['pd_dtype']
+            self.source_field_pd_dtypes[field_name] = pd_dtype
+
+    def _extract_fields_from_mapping(mappings, source_only=False):
+        """
+        Extract all field names and types from a mapping.
+        ```
+        {
+          "my_index": {
+            "mappings": {
+              "properties": {
+                "city": {
+                  "type": "text",
+                  "fields": {
+                    "keyword": {
+                      "type": "keyword"
+                    }
+                  }
+                }
+              }
+            }
+          }
+        }
+        ```
+        if source_only == False:
+            return {'city': 'text', 'city.keyword': 'keyword'}
+        else:
+            return {'city': 'text'}
+
+        Note: first field name type wins. E.g.
+
+        ```
+        PUT my_index1 {"mappings":{"properties":{"city":{"type":"text"}}}}
+        PUT my_index2 {"mappings":{"properties":{"city":{"type":"long"}}}}
+
+        Returns {'city': 'text'}
+        ```
+
+        Parameters
+        ----------
+        mappings: dict
+            Return from get_mapping
+
+        Returns
+        -------
+        fields: dict
+            Dict of field names and types
+
+        """
+        fields = {}
+
+        # Recurse until we get a 'type: xxx'
+        def flatten(x, name=''):
+            if type(x) is dict:
+                for a in x:
+                    if a == 'type' and type(x[a]) is str:  # 'type' can be a name of a field
+                        field_name = name[:-1]
+                        field_type = x[a]
+
+                        # If there is a conflicting type, warn - first value added wins
+                        if field_name in fields and fields[field_name] != field_type:
+                            warnings.warn("Field {} has conflicting types {} != {}".
+                                          format(field_name, fields[field_name], field_type),
+                                          UserWarning)
+                        else:
+                            fields[field_name] = field_type
+                    elif a == 'properties' or (not source_only and a == 'fields'):
+                        flatten(x[a], name)
+                    elif not (source_only and a == 'fields'):  # ignore multi-field fields for source_only
+                        flatten(x[a], name + a + '.')
+
+        for index in mappings:
+            if 'properties' in mappings[index]['mappings']:
+                properties = mappings[index]['mappings']['properties']
+
+                flatten(properties)
+
+        return fields
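+
+    # Illustration (hypothetical mapping, cf. the docstring above):
+    #   Mappings._extract_fields_from_mapping(mappings)
+    #       -> {'city': 'text', 'city.keyword': 'keyword'}
+    #   Mappings._extract_fields_from_mapping(mappings, source_only=True)
+    #       -> {'city': 'text'}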
+
+    def _create_capability_matrix(all_fields, source_fields, all_fields_caps):
+        """
+        {
+          "fields": {
+            "rating": {
+              "long": {
+                "searchable": true,
+                "aggregatable": false,
+                "indices": ["index1", "index2"],
+                "non_aggregatable_indices": ["index1"]
+              },
+              "keyword": {
+                "searchable": false,
+                "aggregatable": true,
+                "indices": ["index3", "index4"],
+                "non_searchable_indices": ["index4"]
+              }
+            },
+            "title": {
+              "text": {
+                "searchable": true,
+                "aggregatable": false
+              }
+            }
+          }
+        }
+        """
+        all_fields_caps_fields = all_fields_caps['fields']
+
+        columns = ['_source', 'es_dtype', 'pd_dtype', 'searchable', 'aggregatable']
+        capability_matrix = {}
+
+        for field, field_caps in all_fields_caps_fields.items():
+            if field in all_fields:
+                # v = {'long': {'type': 'long', 'searchable': True, 'aggregatable': True}}
+                for kk, vv in field_caps.items():
+                    _source = (field in source_fields)
+                    es_dtype = vv['type']
+                    pd_dtype = Mappings._es_dtype_to_pd_dtype(vv['type'])
+                    searchable = vv['searchable']
+                    aggregatable = vv['aggregatable']
+
+                    caps = [_source, es_dtype, pd_dtype, searchable, aggregatable]
+
+                    capability_matrix[field] = caps
+
+                    if 'non_aggregatable_indices' in vv:
+                        warnings.warn("Field {} has conflicting aggregatable fields across indexes {}".
+                                      format(field, vv['non_aggregatable_indices']),
+                                      UserWarning)
+                    if 'non_searchable_indices' in vv:
+                        warnings.warn("Field {} has conflicting searchable fields across indexes {}".
+                                      format(field, vv['non_searchable_indices']),
+                                      UserWarning)
+
+        capability_matrix_df = pd.DataFrame.from_dict(capability_matrix, orient='index', columns=columns)
+
+        return capability_matrix_df.sort_index()
+
+    def _es_dtype_to_pd_dtype(es_dtype):
+        """
+        Mapping Elasticsearch types to pandas dtypes
+        --------------------------------------------
+
+        Elasticsearch field datatype             | Pandas dtype
+        --
+        text                                     | object
+        keyword                                  | object
+        long, integer, short, byte, binary       | int64
+        double, float, half_float, scaled_float  | float64
+        date, date_nanos                         | datetime64
+        boolean                                  | bool
+        TODO - add additional mapping types
+        """
+        es_dtype_to_pd_dtype = {
+            'text': 'object',
+            'keyword': 'object',
+
+            'long': 'int64',
+            'integer': 'int64',
+            'short': 'int64',
+            'byte': 'int64',
+            'binary': 'int64',
+
+            'double': 'float64',
+            'float': 'float64',
+            'half_float': 'float64',
+            'scaled_float': 'float64',
+
+            'date': 'datetime64[ns]',
+            'date_nanos': 'datetime64[ns]',
+
+            'boolean': 'bool'
+        }
+
+        if es_dtype in es_dtype_to_pd_dtype:
+            return es_dtype_to_pd_dtype[es_dtype]
+
+        # Return 'object' for all unsupported TODO - investigate how different types could be supported
+        return 'object'
+
+    def all_fields(self):
+        """
+        Returns
+        -------
+        all_fields: list
+            All typed fields in the index mapping
+        """
+        return self.mappings_capabilities.index.tolist()
+
+    """
+    def pd_dtypes_groupby_source_fields(self):
+        Returns
+        -------
+        groups: dict
+            Calls pandas.core.groupby.GroupBy.groups for _source fields
+            E.g.
+ { + 'bool': Index(['Cancelled', 'FlightDelay'], dtype='object'), + 'datetime64[ns]': Index(['timestamp'], dtype='object'), + 'float64': Index(['AvgTicketPrice', 'DistanceKilometers', 'DistanceMiles',... + } + return self.mappings_capabilities[self.mappings_capabilities._source == True].groupby('pd_dtype').groups + + def pd_dtype + """ + + def is_source_field(self, field_name): + """ + Parameters + ---------- + field_name: str + + Returns + ------- + is_source_field: bool + Is this field name a top-level source field? + pd_dtype: str + The pandas data type we map to + """ + pd_dtype = 'object' + is_source_field = False + + if field_name in self.source_field_pd_dtypes: + is_source_field = True + pd_dtype = self.source_field_pd_dtypes[field_name] + + return is_source_field, pd_dtype + + def numeric_source_fields(self): + """ + Returns + ------- + numeric_source_fields: list of str + List of source fields where pd_dtype == (int64 or float64) + """ + return self.mappings_capabilities[(self.mappings_capabilities._source == True) & + ((self.mappings_capabilities.pd_dtype == 'int64') | + (self.mappings_capabilities.pd_dtype == 'float64'))].index.tolist() + diff --git a/eland/tests/__init__.py b/eland/tests/__init__.py index 0d8dfdf..14c8b54 100644 --- a/eland/tests/__init__.py +++ b/eland/tests/__init__.py @@ -1,4 +1,5 @@ import os +import pandas as pd ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) @@ -6,7 +7,475 @@ ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) ELASTICSEARCH_HOST = 'localhost' # TODO externalise this FLIGHTS_INDEX_NAME = 'flights' +FLIGHTS_MAPPING = { "mappings" : { + "properties" : { + "AvgTicketPrice" : { + "type" : "float" + }, + "Cancelled" : { + "type" : "boolean" + }, + "Carrier" : { + "type" : "keyword" + }, + "Dest" : { + "type" : "keyword" + }, + "DestAirportID" : { + "type" : "keyword" + }, + "DestCityName" : { + "type" : "keyword" + }, + "DestCountry" : { + "type" : "keyword" + }, + "DestLocation" : { + "type" : "geo_point" + }, + "DestRegion" : { + "type" : "keyword" + }, + "DestWeather" : { + "type" : "keyword" + }, + "DistanceKilometers" : { + "type" : "float" + }, + "DistanceMiles" : { + "type" : "float" + }, + "FlightDelay" : { + "type" : "boolean" + }, + "FlightDelayMin" : { + "type" : "integer" + }, + "FlightDelayType" : { + "type" : "keyword" + }, + "FlightNum" : { + "type" : "keyword" + }, + "FlightTimeHour" : { + "type" : "float" + }, + "FlightTimeMin" : { + "type" : "float" + }, + "Origin" : { + "type" : "keyword" + }, + "OriginAirportID" : { + "type" : "keyword" + }, + "OriginCityName" : { + "type" : "keyword" + }, + "OriginCountry" : { + "type" : "keyword" + }, + "OriginLocation" : { + "type" : "geo_point" + }, + "OriginRegion" : { + "type" : "keyword" + }, + "OriginWeather" : { + "type" : "keyword" + }, + "dayOfWeek" : { + "type" : "integer" + }, + "timestamp" : { + "type" : "date" + } + } + } } FLIGHTS_FILE_NAME = ROOT_DIR + '/flights.json.gz' +FLIGHTS_DF_FILE_NAME = ROOT_DIR + '/flights_df.json.gz' ECOMMERCE_INDEX_NAME = 'ecommerce' +ECOMMERCE_MAPPING = { "mappings" : { + "properties" : { + "category" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "currency" : { + "type" : "keyword" + }, + "customer_birth_date" : { + "type" : "date" + }, + "customer_first_name" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + }, + "customer_full_name" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } 
+ } + }, + "customer_gender" : { + "type" : "keyword" + }, + "customer_id" : { + "type" : "keyword" + }, + "customer_last_name" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + }, + "customer_phone" : { + "type" : "keyword" + }, + "day_of_week" : { + "type" : "keyword" + }, + "day_of_week_i" : { + "type" : "integer" + }, + "email" : { + "type" : "keyword" + }, + "geoip" : { + "properties" : { + "city_name" : { + "type" : "keyword" + }, + "continent_name" : { + "type" : "keyword" + }, + "country_iso_code" : { + "type" : "keyword" + }, + "location" : { + "type" : "geo_point" + }, + "region_name" : { + "type" : "keyword" + } + } + }, + "manufacturer" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "order_date" : { + "type" : "date" + }, + "order_id" : { + "type" : "keyword" + }, + "products" : { + "properties" : { + "_id" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + }, + "base_price" : { + "type" : "half_float" + }, + "base_unit_price" : { + "type" : "half_float" + }, + "category" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "created_on" : { + "type" : "date" + }, + "discount_amount" : { + "type" : "half_float" + }, + "discount_percentage" : { + "type" : "half_float" + }, + "manufacturer" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "min_price" : { + "type" : "half_float" + }, + "price" : { + "type" : "half_float" + }, + "product_id" : { + "type" : "long" + }, + "product_name" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + }, + "analyzer" : "english" + }, + "quantity" : { + "type" : "integer" + }, + "sku" : { + "type" : "keyword" + }, + "tax_amount" : { + "type" : "half_float" + }, + "taxful_price" : { + "type" : "half_float" + }, + "taxless_price" : { + "type" : "half_float" + }, + "unit_discount_amount" : { + "type" : "half_float" + } + } + }, + "sku" : { + "type" : "keyword" + }, + "taxful_total_price" : { + "type" : "half_float" + }, + "taxless_total_price" : { + "type" : "half_float" + }, + "total_quantity" : { + "type" : "integer" + }, + "total_unique_products" : { + "type" : "integer" + }, + "type" : { + "type" : "keyword" + }, + "user" : { + "type" : "keyword" + } + } + } } ECOMMERCE_FILE_NAME = ROOT_DIR + '/ecommerce.json.gz' +ECOMMERCE_DF_FILE_NAME = ROOT_DIR + '/ecommerce_df.json.gz' + +TEST_MAPPING1 = { + 'mappings': { + 'properties': { + 'city': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + }, + 'text': { + 'type': 'text', + 'fields': { + 'english': { + 'type': 'text', + 'analyzer': 'english' + } + } + }, + 'origin_location': { + 'properties': { + 'lat': { + 'type': 'text', + 'index_prefixes': {}, + 'fields': { + 'keyword': { + 'type': 'keyword', + 'ignore_above': 256 + } + } + }, + 'lon': { + 'type': 'text', + 'fields': { + 'keyword': { + 'type': 'keyword', + 'ignore_above': 256 + } + } + } + } + }, + 'maps-telemetry': { + 'properties': { + 'attributesPerMap': { + 'properties': { + 'dataSourcesCount': { + 'properties': { + 'avg': { + 'type': 'long' + }, + 'max': { + 'type': 'long' + }, + 'min': { + 'type': 'long' + } + } + }, + 'emsVectorLayersCount': { + 'dynamic': 'true', + 'properties': { + 'france_departments': { + 'properties': { + 'avg': { + 'type': 'float' + }, + 'max': { + 'type': 'long' + }, + 'min': { + 'type': 'long' + } + } + } + } + } + } + } + } + }, + 
'type': {
+            'type': 'keyword'
+        },
+        'name': {
+            'type': 'text'
+        },
+        'user_name': {
+            'type': 'keyword'
+        },
+        'email': {
+            'type': 'keyword'
+        },
+        'content': {
+            'type': 'text'
+        },
+        'tweeted_at': {
+            'type': 'date'
+        },
+        'dest_location': {
+            'type': 'geo_point'
+        },
+        'my_join_field': {
+            'type': 'join',
+            'relations': {
+                'question': ['answer', 'comment'],
+                'answer': 'vote'
+            }
+        }
+    }
+  }
+}
+
+TEST_MAPPING1_INDEX_NAME = 'mapping1'
+
+TEST_MAPPING1_EXPECTED = {
+    'city': 'text',
+    'city.raw': 'keyword',
+    'content': 'text',
+    'dest_location': 'geo_point',
+    'email': 'keyword',
+    'maps-telemetry.attributesPerMap.dataSourcesCount.avg': 'long',
+    'maps-telemetry.attributesPerMap.dataSourcesCount.max': 'long',
+    'maps-telemetry.attributesPerMap.dataSourcesCount.min': 'long',
+    'maps-telemetry.attributesPerMap.emsVectorLayersCount.france_departments.avg': 'float',
+    'maps-telemetry.attributesPerMap.emsVectorLayersCount.france_departments.max': 'long',
+    'maps-telemetry.attributesPerMap.emsVectorLayersCount.france_departments.min': 'long',
+    'my_join_field': 'join',
+    'name': 'text',
+    'origin_location.lat': 'text',
+    'origin_location.lat.keyword': 'keyword',
+    'origin_location.lon': 'text',
+    'origin_location.lon.keyword': 'keyword',
+    'text': 'text',
+    'text.english': 'text',
+    'tweeted_at': 'date',
+    'type': 'keyword',
+    'user_name': 'keyword'
+}
+
+TEST_MAPPING1_EXPECTED_DF = pd.DataFrame.from_dict(data=TEST_MAPPING1_EXPECTED, orient='index', columns=['es_dtype'])
+
+TEST_NESTED_USER_GROUP_INDEX_NAME = 'nested_user_group'
+TEST_NESTED_USER_GROUP_MAPPING = {
+    'mappings': {
+        'properties': {
+            'group': {
+                'type': 'keyword'
+            },
+            'user': {
+                'properties': {
+                    'first': {
+                        'type': 'keyword'
+                    },
+                    'last': {
+                        'type': 'keyword'
+                    },
+                    'address': {
+                        'type': 'keyword'
+                    }
+                }
+            }
+        }
+    }
+}
+
+TEST_NESTED_USER_GROUP_DOCS = [
+    {'_index': TEST_NESTED_USER_GROUP_INDEX_NAME,
+     '_source':
+         {'group': 'amsterdam', 'user': [
+             {'first': 'Manke', 'last': 'Nelis', 'address': ['Elandsgracht', 'Amsterdam']},
+             {'first': 'Johnny', 'last': 'Jordaan', 'address': ['Elandsstraat', 'Amsterdam']}]}},
+    {'_index': TEST_NESTED_USER_GROUP_INDEX_NAME,
+     '_source':
+         {'group': 'london', 'user': [
+             {'first': 'Alice', 'last': 'Monkton'},
+             {'first': 'Jimmy', 'last': 'White', 'address': ['London']}]}},
+    {'_index': TEST_NESTED_USER_GROUP_INDEX_NAME,
+     '_source': {'group': 'new york', 'user': [
+         {'first': 'Bill', 'last': 'Jones'}]}}
+]
+
diff --git a/eland/tests/dataframe/__init__.py b/eland/tests/client/__init__.py
similarity index 100%
rename from eland/tests/dataframe/__init__.py
rename to eland/tests/client/__init__.py
diff --git a/eland/tests/client/test_mappings_pytest.py b/eland/tests/client/test_mappings_pytest.py
new file mode 100644
index 0000000..be3a9cf
--- /dev/null
+++ b/eland/tests/client/test_mappings_pytest.py
@@ -0,0 +1,22 @@
+# File called _pytest for PyCharm compatibility
+import pytest
+
+from eland.tests import *
+
+from pandas.util.testing import (
+    assert_almost_equal, assert_frame_equal, assert_series_equal)
+
+import eland as ed
+
+class TestMapping():
+
+    # Requires 'setup_tests.py' to be run prior to this
+    def test_mapping(self):
+        mapping = ed.Mappings(ed.Client(ELASTICSEARCH_HOST), TEST_MAPPING1_INDEX_NAME)
+
+        assert mapping.all_fields() == TEST_MAPPING1_EXPECTED_DF.index.tolist()
+
+        assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mapping.mappings_capabilities['es_dtype']))
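+
+# Hypothetical interactive check (illustration only, not part of the test suite):
+#   ed.Mappings(ed.Client(ELASTICSEARCH_HOST), TEST_MAPPING1_INDEX_NAME).mappings_capabilities
+# shows the per-field capability matrix (_source, es_dtype, pd_dtype, searchable, aggregatable).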
+
+
diff --git a/eland/tests/dataframe/test_indexing_pytest.py b/eland/tests/dataframe/test_indexing_pytest.py
deleted file mode 100644
index 7fb94e7..0000000
--- a/eland/tests/dataframe/test_indexing_pytest.py
+++ /dev/null
@@ -1,24 +0,0 @@
-# File called _pytest for PyCharm compatability
-from eland.tests.dataframe.common import TestData
-
-from pandas.util.testing import (
-    assert_almost_equal, assert_frame_equal, assert_series_equal)
-
-class TestDataFrameIndexing(TestData):
-
-    def test_head(self):
-        pd_flights_head = self.pd_flights().head()
-        ed_flights_head = self.ed_flights().head()
-
-        assert_frame_equal(pd_flights_head, ed_flights_head)
-
-        pd_ecommerce_head = self.pd_ecommerce().head()
-        ed_ecommerce_head = self.ed_ecommerce().head()
-
-        #print(pd_ecommerce_head.dtypes)
-        #print(ed_ecommerce_head.dtypes)
-
-        assert_frame_equal(pd_ecommerce_head, ed_ecommerce_head)
-
-
-
diff --git a/eland/tests/ecommerce_df.json.gz b/eland/tests/ecommerce_df.json.gz
new file mode 100644
index 0000000..11f5a98
Binary files /dev/null and b/eland/tests/ecommerce_df.json.gz differ
diff --git a/eland/tests/flights.json.gz b/eland/tests/flights.json.gz
index 85f344d..df976e6 100644
Binary files a/eland/tests/flights.json.gz and b/eland/tests/flights.json.gz differ
diff --git a/eland/tests/flights_df.json.gz b/eland/tests/flights_df.json.gz
new file mode 100644
index 0000000..5aed61e
Binary files /dev/null and b/eland/tests/flights_df.json.gz differ
diff --git a/eland/tests/frame/__init__.py b/eland/tests/frame/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/eland/tests/dataframe/common.py b/eland/tests/frame/common.py
similarity index 52%
rename from eland/tests/dataframe/common.py
rename to eland/tests/frame/common.py
index d0b1ef0..6b9068c 100644
--- a/eland/tests/dataframe/common.py
+++ b/eland/tests/frame/common.py
@@ -10,12 +10,19 @@ ROOT_DIR = os.path.dirname(os.path.abspath(__file__))

 # Create pandas and eland data frames
 from eland.tests import ELASTICSEARCH_HOST
-from eland.tests import FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME, ECOMMERCE_FILE_NAME, ECOMMERCE_INDEX_NAME
+from eland.tests import FLIGHTS_DF_FILE_NAME, FLIGHTS_INDEX_NAME,\
+    ECOMMERCE_DF_FILE_NAME, ECOMMERCE_INDEX_NAME

-_pd_flights = pd.read_json(FLIGHTS_FILE_NAME, lines=True)
+_pd_flights = pd.read_json(FLIGHTS_DF_FILE_NAME).sort_index()
+_pd_flights['timestamp'] = \
+    pd.to_datetime(_pd_flights['timestamp'])
 _ed_flights = ed.read_es(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME)

-_pd_ecommerce = pd.read_json(ECOMMERCE_FILE_NAME, lines=True)
+_pd_ecommerce = pd.read_json(ECOMMERCE_DF_FILE_NAME).sort_index()
+_pd_ecommerce['order_date'] = \
+    pd.to_datetime(_pd_ecommerce['order_date'])
+_pd_ecommerce['products.created_on'] = \
+    _pd_ecommerce['products.created_on'].apply(lambda x: pd.to_datetime(x))
 _ed_ecommerce = ed.read_es(ELASTICSEARCH_HOST, ECOMMERCE_INDEX_NAME)

 class TestData:
diff --git a/eland/tests/frame/test_indexing_pytest.py b/eland/tests/frame/test_indexing_pytest.py
new file mode 100644
index 0000000..bd230f1
--- /dev/null
+++ b/eland/tests/frame/test_indexing_pytest.py
@@ -0,0 +1,54 @@
+# File called _pytest for PyCharm compatibility
+from eland.tests.frame.common import TestData
+from eland.tests import *
+
+import eland as ed
+import pandas as pd
+
+from pandas.util.testing import (
+    assert_almost_equal, assert_frame_equal, assert_series_equal)
+
+class TestDataFrameIndexing(TestData):
+
+    def test_mapping(self):
+        ed_flights_mappings = pd.DataFrame(self.ed_flights().mappings.mappings_capabilities
+                                           [self.ed_flights().mappings.mappings_capabilities._source==True]
+                                           ['pd_dtype'])
+        pd_flights_mappings =
pd.DataFrame(self.pd_flights().dtypes, columns = ['pd_dtype']) + + assert_frame_equal(pd_flights_mappings, ed_flights_mappings) + + # We don't compare ecommerce here as the default dtypes in pandas from read_json + # don't match the mapping types. This is mainly because the products field is + # nested and so can be treated as a multi-field in ES, but not in pandas + + def test_head(self): + pd_flights_head = self.pd_flights().head() + ed_flights_head = self.ed_flights().head() + + assert_frame_equal(pd_flights_head, ed_flights_head) + + pd_ecommerce_head = self.pd_ecommerce().head() + ed_ecommerce_head = self.ed_ecommerce().head() + + assert_frame_equal(pd_ecommerce_head, ed_ecommerce_head) + + def test_describe(self): + pd_flights_describe = self.pd_flights().describe() + ed_flights_describe = self.ed_flights().describe() + + # TODO - this fails now as ES aggregations are approximate + # if ES percentile agg uses + # "hdr": { + # "number_of_significant_value_digits": 3 + # } + # this works + #assert_almost_equal(pd_flights_describe, ed_flights_describe) + + pd_ecommerce_describe = self.pd_ecommerce().describe() + ed_ecommerce_describe = self.ed_ecommerce().describe() + + # We don't compare ecommerce here as the default dtypes in pandas from read_json + # don't match the mapping types. This is mainly because the products field is + # nested and so can be treated as a multi-field in ES, but not in pandas + diff --git a/eland/tests/index_flights.py b/eland/tests/setup_tests.py similarity index 53% rename from eland/tests/index_flights.py rename to eland/tests/setup_tests.py index 20d7380..b758a26 100644 --- a/eland/tests/index_flights.py +++ b/eland/tests/setup_tests.py @@ -2,27 +2,25 @@ import pandas as pd from elasticsearch import Elasticsearch from elasticsearch import helpers -from eland.tests import FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME, ECOMMERCE_FILE_NAME, ECOMMERCE_INDEX_NAME - +from eland.tests import * DATA_LIST = [ - (FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME), - (ECOMMERCE_FILE_NAME, ECOMMERCE_INDEX_NAME) + (FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME, FLIGHTS_MAPPING), + (ECOMMERCE_FILE_NAME, ECOMMERCE_INDEX_NAME, ECOMMERCE_MAPPING) ] -if __name__ == '__main__': - +def _setup_data(es): # Read json file and index records into Elasticsearch for data in DATA_LIST: json_file_name = data[0] index_name = data[1] - - # Create connection to Elasticsearch - use defaults1 - es = Elasticsearch() + mapping = data[2] # Delete index print("Deleting index:", index_name) es.indices.delete(index=index_name, ignore=[400, 404]) + print("Creating index:", index_name) + es.indices.create(index=index_name, body=mapping) df = pd.read_json(json_file_name, lines=True) @@ -49,3 +47,22 @@ if __name__ == '__main__': actions = [] print("Done", index_name) + +def _setup_test_mappings(es): + # Create a complex mapping containing many Elasticsearch features + es.indices.delete(index=TEST_MAPPING1_INDEX_NAME, ignore=[400, 404]) + es.indices.create(index=TEST_MAPPING1_INDEX_NAME, body=TEST_MAPPING1) + +def _setup_test_nested(es): + es.indices.delete(index=TEST_NESTED_USER_GROUP_INDEX_NAME, ignore=[400, 404]) + es.indices.create(index=TEST_NESTED_USER_GROUP_INDEX_NAME, body=TEST_NESTED_USER_GROUP_MAPPING) + + helpers.bulk(es, TEST_NESTED_USER_GROUP_DOCS) + +if __name__ == '__main__': + # Create connection to Elasticsearch - use defaults + es = Elasticsearch(ELASTICSEARCH_HOST) + + _setup_data(es) + _setup_test_mappings(es) + _setup_test_nested(es) diff --git a/eland/tests/test.ipynb b/eland/tests/test.ipynb index 
6cc6f63..85de534 100644 --- a/eland/tests/test.ipynb +++ b/eland/tests/test.ipynb @@ -10,7 +10,11 @@ { "cell_type": "code", "execution_count": 1, - "metadata": {}, + "metadata": { + "pycharm": { + "is_executing": false + } + }, "outputs": [], "source": [ "import pandas as pd" @@ -442,7 +446,7 @@ "metadata": {}, "outputs": [], "source": [ - "ed_df = ed.read_es('localhost', 'kibana_sample_data_flights')" + "ed_df = ed.read_es('localhost', 'flights')" ] }, { @@ -519,7 +523,7 @@ " DE-HE\n", " Sunny\n", " 0\n", - " 2019-05-27T00:00:00\n", + " 2018-01-01 00:00:00\n", " \n", " \n", " 1\n", @@ -543,7 +547,7 @@ " SE-BD\n", " Clear\n", " 0\n", - " 2019-05-27T18:27:00\n", + " 2018-01-01 18:27:00\n", " \n", " \n", " 2\n", @@ -567,7 +571,7 @@ " IT-34\n", " Rain\n", " 0\n", - " 2019-05-27T17:11:14\n", + " 2018-01-01 17:11:14\n", " \n", " \n", " 3\n", @@ -591,7 +595,7 @@ " IT-72\n", " Thunder & Lightning\n", " 0\n", - " 2019-05-27T10:33:28\n", + " 2018-01-01 10:33:28\n", " \n", " \n", " 4\n", @@ -615,7 +619,7 @@ " MX-DIF\n", " Damaging Wind\n", " 0\n", - " 2019-05-27T05:13:00\n", + " 2018-01-01 05:13:00\n", " \n", " \n", "\n", @@ -672,12 +676,12 @@ "3 {'lat': '40.886002', 'lon': '14.2908'} IT-72 \n", "4 {'lat': '19.4363', 'lon': '-99.072098'} MX-DIF \n", "\n", - " OriginWeather dayOfWeek timestamp \n", - "0 Sunny 0 2019-05-27T00:00:00 \n", - "1 Clear 0 2019-05-27T18:27:00 \n", - "2 Rain 0 2019-05-27T17:11:14 \n", - "3 Thunder & Lightning 0 2019-05-27T10:33:28 \n", - "4 Damaging Wind 0 2019-05-27T05:13:00 \n", + " OriginWeather dayOfWeek timestamp \n", + "0 Sunny 0 2018-01-01 00:00:00 \n", + "1 Clear 0 2018-01-01 18:27:00 \n", + "2 Rain 0 2018-01-01 17:11:14 \n", + "3 Thunder & Lightning 0 2018-01-01 10:33:28 \n", + "4 Damaging Wind 0 2018-01-01 05:13:00 \n", "\n", "[5 rows x 27 columns]" ] @@ -768,12 +772,12 @@ " 2470.545974\n", " 1535.126118\n", " 0.000000\n", - " 252.064162\n", + " 251.834931\n", " 1.000000\n", " \n", " \n", " 50%\n", - " 640.387285\n", + " 640.362667\n", " 7612.072403\n", " 4729.922470\n", " 0.000000\n", @@ -782,12 +786,12 @@ " \n", " \n", " 75%\n", - " 842.259390\n", - " 9735.660463\n", - " 6049.583389\n", - " 15.000000\n", + " 842.262193\n", + " 9735.210895\n", + " 6049.600045\n", + " 12.521186\n", " 720.505705\n", - " 4.068000\n", + " 4.109848\n", " \n", " \n", " max\n", @@ -809,8 +813,8 @@ "std 266.386661 4578.263193 2844.800855 96.743006 \n", "min 100.020531 0.000000 0.000000 0.000000 \n", "25% 410.008918 2470.545974 1535.126118 0.000000 \n", - "50% 640.387285 7612.072403 4729.922470 0.000000 \n", - "75% 842.259390 9735.660463 6049.583389 15.000000 \n", + "50% 640.362667 7612.072403 4729.922470 0.000000 \n", + "75% 842.262193 9735.210895 6049.600045 12.521186 \n", "max 1199.729004 19881.482422 12353.780273 360.000000 \n", "\n", " FlightTimeMin dayOfWeek \n", @@ -818,9 +822,9 @@ "mean 511.127842 2.835975 \n", "std 334.741135 1.939365 \n", "min 0.000000 0.000000 \n", - "25% 252.064162 1.000000 \n", + "25% 251.834931 1.000000 \n", "50% 503.148975 3.000000 \n", - "75% 720.505705 4.068000 \n", + "75% 720.505705 4.109848 \n", "max 1902.901978 6.000000 " ] }, @@ -832,6 +836,89 @@ "source": [ "ed_df.describe()" ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "d = {'col1': [1.2, 20], 'col2': [int(1), int(30)], 'col3': ['2019-02-01 03:04:05', '2018-02-01 01:03:04'], 'col4': ['2019-02-01 03:04:05', '2018-02-01 01:03:04']}\n", + "df = pd.DataFrame(data=d)" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": 
{},
+   "outputs": [
+    {
+     "data": {
+      "text/html": [
+       "<div>\n",
+       "<table border=\"1\" class=\"dataframe\">\n",
+       "  <thead>\n",
+       "    <tr style=\"text-align: right;\">\n",
+       "      <th></th>\n",
+       "      <th>col1</th>\n",
+       "      <th>col2</th>\n",
+       "      <th>col3</th>\n",
+       "      <th>col4</th>\n",
+       "    </tr>\n",
+       "  </thead>\n",
+       "  <tbody>\n",
+       "    <tr>\n",
+       "      <th>0</th>\n",
+       "      <td>1.2</td>\n",
+       "      <td>1</td>\n",
+       "      <td>2019-02-01 03:04:05</td>\n",
+       "      <td>2019-02-01 03:04:05</td>\n",
+       "    </tr>\n",
+       "    <tr>\n",
+       "      <th>1</th>\n",
+       "      <td>20.0</td>\n",
+       "      <td>30</td>\n",
+       "      <td>2018-02-01 01:03:04</td>\n",
+       "      <td>2018-02-01 01:03:04</td>\n",
+       "    </tr>\n",
+       "  </tbody>\n",
+       "</table>\n",
+       "</div>
" + ], + "text/plain": [ + " col1 col2 col3 col4\n", + "0 1.2 1 2019-02-01 03:04:05 2019-02-01 03:04:05\n", + "1 20.0 30 2018-02-01 01:03:04 2018-02-01 01:03:04" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { @@ -850,7 +937,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.6.8" + "version": "3.7.3" } }, "nbformat": 4,