mirror of https://github.com/elastic/eland.git
Major refactor

Addition of a new mapping module to manage mappings between pandas and Elasticsearch. More tests.

This commit is contained in:
parent 674ac129e6
commit 0fa3f81bbb
eland/__init__.py

@@ -1,3 +1,4 @@
from .utils import *
from .frame import *
from .client import *
from .mappings import *
eland/client.py

@@ -1,8 +1,9 @@
from elasticsearch import Elasticsearch


# eland client - implement as facade to control access to Elasticsearch methods
class Client(object):
class Client():
    """
    eland client - implemented as a facade to control access to Elasticsearch methods
    """
    def __init__(self, es=None):
        if isinstance(es, Elasticsearch):
            self.es = es

@@ -20,4 +21,3 @@ class Client(object):

    def field_caps(self, **kwargs):
        return self.es.field_caps(**kwargs)
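As an aside, a minimal sketch of the facade in use (not part of the commit); the localhost host and `flights` index are assumptions borrowed from the test fixtures later in this diff:

```python
from elasticsearch import Elasticsearch

import eland as ed

# All Elasticsearch access goes through the facade rather than the raw client
client = ed.Client(Elasticsearch('localhost'))

# field_caps passes **kwargs straight through to Elasticsearch
caps = client.field_caps(index='flights', fields=['timestamp', 'Carrier'])
print(list(caps['fields'].keys()))
```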

235 eland/frame.py

@@ -23,7 +23,7 @@ Similarly, only Elasticsearch searchable fields can be searched or filtered, and
only Elasticsearch aggregatable fields can be aggregated or grouped.

"""
import eland
import eland as ed

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
@@ -66,94 +66,179 @@ class DataFrame():
    If the Elasticsearch index is deleted or index mappings are changed after this
    object is created, the object is not rebuilt and so inconsistencies can occur.

    Mapping Elasticsearch types to pandas dtypes
    --------------------------------------------

    Elasticsearch field datatype              | Pandas dtype
    --
    text                                      | object
    keyword                                   | object
    long, integer, short, byte, binary        | int64
    double, float, half_float, scaled_float   | float64
    date, date_nanos                          | datetime64[ns]
    boolean                                   | bool

    TODO - add additional mapping types
    """
    def __init__(self, client, index_pattern):
        self.client = eland.Client(client)
        self.client = ed.Client(client)
        self.index_pattern = index_pattern

        # Get and persist mappings, this allows use to correctly
        # Get and persist mappings, this allows us to correctly
        # map returned types from Elasticsearch to pandas datatypes
        mapping = self.client.indices().get_mapping(index=self.index_pattern)
        #field_caps = self.client.field_caps(index=self.index_pattern, fields='*')

        #self.fields, self.aggregatable_fields, self.searchable_fields = \
        #    DataFrame._es_mappings_to_pandas(mapping, field_caps)
        self.fields = DataFrame._es_mappings_to_pandas(mapping)

    @staticmethod
    def _flatten_results(prefix, results, result):
        # TODO
        return prefix
        self.mappings = ed.Mappings(self.client, self.index_pattern)

    def _es_results_to_pandas(self, results):
        # TODO - resolve nested fields
        """
        Parameters
        ----------
        results: dict
            Elasticsearch results from self.client.search

        Returns
        -------
        df: pandas.DataFrame
            _source values extracted from results and mapped to pandas DataFrame
            dtypes are mapped via Mapping object

        Notes
        -----
        Fields containing lists in Elasticsearch don't map easily to pandas.DataFrame
        For example, an index with mapping:
        ```
        "mappings" : {
          "properties" : {
            "group" : {
              "type" : "keyword"
            },
            "user" : {
              "type" : "nested",
              "properties" : {
                "first" : {
                  "type" : "keyword"
                },
                "last" : {
                  "type" : "keyword"
                }
              }
            }
          }
        }
        ```
        Adding a document:
        ```
        "_source" : {
          "group" : "amsterdam",
          "user" : [
            {
              "first" : "John",
              "last" : "Smith"
            },
            {
              "first" : "Alice",
              "last" : "White"
            }
          ]
        }
        ```
        (https://www.elastic.co/guide/en/elasticsearch/reference/current/nested.html)
        this would be transformed internally (in Elasticsearch) into a document that looks more like this:
        ```
        {
          "group" : "amsterdam",
          "user.first" : [ "alice", "john" ],
          "user.last" : [ "smith", "white" ]
        }
        ```
        When mapping this to a pandas data frame we mimic this transformation.

        Similarly, if a list is added to Elasticsearch:
        ```
        PUT my_index/_doc/1
        {
          "list" : [
            0, 1, 2
          ]
        }
        ```
        The mapping is:
        ```
        "mappings" : {
          "properties" : {
            "list" : {
              "type" : "long"
            }
          }
        }
        ```
        TODO - explain how lists are handled (https://www.elastic.co/guide/en/elasticsearch/reference/current/array.html)
        TODO - an option here is to use Elasticsearch's multi-field matching instead of pandas treatment of lists (which isn't great)
        NOTE - using lists like this is generally not a good way to use this API
        """
        def flatten_dict(y):
            out = {}

            def flatten(x, name=''):
                # We flatten into source fields e.g. if type=geo_point
                # location: {lat=52.38, lon=4.90}
                if name == '':
                    is_source_field = False
                    pd_dtype = 'object'
                else:
                    is_source_field, pd_dtype = self.mappings.is_source_field(name[:-1])

                if not is_source_field and type(x) is dict:
                    for a in x:
                        flatten(x[a], name + a + '.')
                elif not is_source_field and type(x) is list:
                    for a in x:
                        flatten(a, name)
                else:
                    field_name = name[:-1]

                    # Coerce type
                    if pd_dtype == 'datetime64':
                        x = pd.to_datetime(x)
                    print(field_name, pd_dtype, x, type(x))

                    # Elasticsearch can have multiple values for a field. These are represented as lists, so
                    # create lists for this pivot (see notes above)
                    if field_name in out:
                        if type(out[field_name]) is not list:
                            l = [out[field_name]]
                            out[field_name] = l
                        out[field_name].append(x)
                    else:
                        out[field_name] = x

            flatten(y)

            return out

        rows = []
        i = 0
        for hit in results['hits']['hits']:
            row = hit['_source']
            rows.append(row)

            # flatten row to map correctly to 2D DataFrame
            rows.append(flatten_dict(row))

            i = i + 1
            if i % 100 == 0:
                print(i)

        # Create pandas DataFrame
        df = pd.DataFrame(data=rows)

        """
        # Coerce types
        pd_dtypes = self.mappings.source_fields_pd_dtypes()

        # This returns types such as:
        # {
        #   'bool': Index(['Cancelled', 'FlightDelay'], dtype='object'),
        #   'datetime64[ns]': Index(['timestamp'], dtype='object'),
        #   'float64': Index(['AvgTicketPrice', 'DistanceKilometers', 'DistanceMiles', ...
        # }

        for pd_dtype, value in pd_dtypes.items():
            # Types generally convert well e.g. 1,2,3 -> int64, 1.1,2.2,3.3 -> float64
            # so to minimise work we only convert special types.
            # TODO - add option to force all conversion
            if pd_dtype == 'datetime64':
                print(df.loc[:, value.tolist()])
                df.loc[:, value.tolist()] = df.loc[:, value.tolist()].astype('datetime64')
        """

        return df
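To make the flattening notes concrete, here is a hypothetical, self-contained sketch of the pivot described in the docstring above; unlike the committed `flatten_dict` it skips the mapping lookups and dtype coercion, and all names are illustrative only:

```python
# Simplified sketch of the flattening: dotted keys for nested objects,
# list-merging for multi-valued fields (no mapping lookups, no coercion).
def flatten_source(source, prefix=''):
    out = {}
    for key, value in source.items():
        name = prefix + key
        if isinstance(value, dict):
            out.update(flatten_source(value, name + '.'))
        elif isinstance(value, list):
            for item in value:
                if isinstance(item, dict):
                    # merge each nested object's fields into per-field lists
                    for k, v in flatten_source(item, name + '.').items():
                        out.setdefault(k, []).append(v)
                else:
                    out.setdefault(name, []).append(item)
        else:
            out[name] = value
    return out

doc = {"group": "amsterdam",
       "user": [{"first": "John", "last": "Smith"},
                {"first": "Alice", "last": "White"}]}
print(flatten_source(doc))
# {'group': 'amsterdam', 'user.first': ['John', 'Alice'], 'user.last': ['Smith', 'White']}
```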

    @staticmethod
    def _extract_types_from_mapping(y):
        """
        Extract data types from mapping for DataFrame columns.

        Elasticsearch _source data is transformed into pandas DataFrames. Notes:

        - This strategy is not compatible with all Elasticsearch configurations. If _source is disabled
          (https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-source-field.html#disable-source-field)
          no data values will be populated
        - Sub-fields (e.g. english.text in {"mappings":{"properties":{"text":{"type":"text","fields":{"english":{"type":"text","analyzer":"english"}}}}}})
          are not used
        """
        out = {}

        # Recurse until we get a 'type: xxx' - ignore sub-fields
        def flatten(x, name=''):
            if type(x) is dict:
                for a in x:
                    if a == 'type' and type(x[a]) is str:  # 'type' can be a name of a field
                        out[name[:-1]] = x[a]
                    if a == 'properties' or a == 'fields':
                        flatten(x[a], name)
                    else:
                        flatten(x[a], name + a + '.')

        flatten(y)

        return out

    @staticmethod
    def _es_mappings_to_pandas(mappings):
        fields = {}
        for index in mappings:
            if 'properties' in mappings[index]['mappings']:
                properties = mappings[index]['mappings']['properties']

                datatypes = DataFrame._extract_types_from_mapping(properties)

                # Note there could be conflicts here - e.g. the same field name with different semantics in
                # different indexes - currently the last one wins TODO: review this
                fields.update(datatypes)

        return pd.DataFrame.from_dict(data=fields, orient='index', columns=['datatype'])

    def head(self, n=5):
        results = self.client.search(index=self.index_pattern, size=n)

@@ -161,8 +246,6 @@ class DataFrame():

    def describe(self):
        # First get all types
        #mapping = self.client.indices().get_mapping(index=self.index_pattern)
        mapping = self.client.indices().get_mapping(index=self.index_pattern)

        fields = DataFrame._es_mappings_to_pandas(mapping)
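Putting frame.py together, a hedged usage sketch (not part of the commit); it assumes a local cluster with the `flights` fixture loaded by `setup_tests.py` below, and that `head()` returns the mapped frame as the tests' `print(test.head())` suggests:

```python
from elasticsearch import Elasticsearch

import eland as ed

# Builds the Mappings object up front, then searches per call
df = ed.DataFrame(Elasticsearch('localhost'), index_pattern='flights')

# head() issues a size-limited search and maps hits via _es_results_to_pandas
print(df.head(5))
```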

286 eland/mappings.py Normal file

@@ -0,0 +1,286 @@
import warnings

import pandas as pd


class Mappings():
    """
    General purpose class to manage Elasticsearch to/from pandas mappings

    Attributes
    ----------

    mappings_capabilities: pandas.DataFrame
        A data frame summarising the capabilities of the index mapping

        _source      - is top level field (i.e. not a multi-field sub-field)
        es_dtype     - Elasticsearch field datatype
        pd_dtype     - Pandas datatype
        searchable   - is the field searchable?
        aggregatable - is the field aggregatable?

                                     _source es_dtype pd_dtype searchable aggregatable
        maps-telemetry.min              True     long    int64       True         True
        maps-telemetry.avg              True    float  float64       True         True
        city                            True     text   object       True        False
        user_name                       True  keyword   object       True         True
        origin_location.lat.keyword    False  keyword   object       True         True
        type                            True  keyword   object       True         True
        origin_location.lat             True     text   object       True        False
    """
    def __init__(self, client, index_pattern):
        """
        Parameters
        ----------
        client: eland.Client
            Elasticsearch client

        index_pattern: str
            Elasticsearch index pattern
        """
        # persist index_pattern for debugging
        self.index_pattern = index_pattern

        mappings = client.indices().get_mapping(index=index_pattern)

        # Get all fields (including all nested) and then field_caps
        # for these names (fields=* doesn't appear to work effectively...)
        all_fields = Mappings._extract_fields_from_mapping(mappings)
        all_fields_caps = client.field_caps(index=index_pattern, fields=list(all_fields.keys()))

        # Get top level (not sub-field multifield) mappings
        source_fields = Mappings._extract_fields_from_mapping(mappings, source_only=True)

        # Populate capability matrix of fields
        # field_name, es_dtype, pd_dtype, is_searchable, is_aggregatable, is_source
        self.mappings_capabilities = Mappings._create_capability_matrix(all_fields, source_fields, all_fields_caps)

        # Cache source field types for efficient lookup
        # (this massively improves performance of DataFrame.flatten)
        self.source_field_pd_dtypes = {}

        for field_name in source_fields:
            pd_dtype = self.mappings_capabilities.loc[field_name]['pd_dtype']
            self.source_field_pd_dtypes[field_name] = pd_dtype

    def _extract_fields_from_mapping(mappings, source_only=False):
        """
        Extract all field names and types from a mapping.
        ```
        {
          "my_index": {
            "mappings": {
              "properties": {
                "city": {
                  "type": "text",
                  "fields": {
                    "keyword": {
                      "type": "keyword"
                    }
                  }
                }
              }
            }
          }
        }
        ```
        if source_only == False:
            return {'city': 'text', 'city.keyword': 'keyword'}
        else:
            return {'city': 'text'}

        Note: first field name type wins. E.g.

        ```
        PUT my_index1 {"mappings":{"properties":{"city":{"type":"text"}}}}
        PUT my_index2 {"mappings":{"properties":{"city":{"type":"long"}}}}

        Returns {'city': 'text'}
        ```

        Parameters
        ----------
        mappings: dict
            Return from get_mapping

        Returns
        -------
        fields: dict
            Dict of field names and types
        """
        fields = {}

        # Recurse until we get a 'type: xxx'
        def flatten(x, name=''):
            if type(x) is dict:
                for a in x:
                    if a == 'type' and type(x[a]) is str:  # 'type' can be a name of a field
                        field_name = name[:-1]
                        field_type = x[a]

                        # If there is a conflicting type, warn - first value added wins
                        if field_name in fields and fields[field_name] != field_type:
                            warnings.warn("Field {} has conflicting types {} != {}".
                                          format(field_name, fields[field_name], field_type),
                                          UserWarning)
                        else:
                            fields[field_name] = field_type
                    elif a == 'properties' or (not source_only and a == 'fields'):
                        flatten(x[a], name)
                    elif not (source_only and a == 'fields'):  # ignore multi-field fields for source_only
                        flatten(x[a], name + a + '.')

        for index in mappings:
            if 'properties' in mappings[index]['mappings']:
                properties = mappings[index]['mappings']['properties']

                flatten(properties)

        return fields
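As a quick sanity check (illustrative, not part of the commit), this recursion reproduces the docstring's own example; `ed.Mappings` is assumed importable as in the tests below:

```python
import eland as ed

# The docstring's example mapping (hypothetical index name 'my_index')
mappings = {
    "my_index": {
        "mappings": {
            "properties": {
                "city": {
                    "type": "text",
                    "fields": {"keyword": {"type": "keyword"}}
                }
            }
        }
    }
}

print(ed.Mappings._extract_fields_from_mapping(mappings))
# {'city': 'text', 'city.keyword': 'keyword'}
print(ed.Mappings._extract_fields_from_mapping(mappings, source_only=True))
# {'city': 'text'}
```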

    def _create_capability_matrix(all_fields, source_fields, all_fields_caps):
        """
        {
          "fields": {
            "rating": {
              "long": {
                "searchable": true,
                "aggregatable": false,
                "indices": ["index1", "index2"],
                "non_aggregatable_indices": ["index1"]
              },
              "keyword": {
                "searchable": false,
                "aggregatable": true,
                "indices": ["index3", "index4"],
                "non_searchable_indices": ["index4"]
              }
            },
            "title": {
              "text": {
                "searchable": true,
                "aggregatable": false
              }
            }
          }
        }
        """
        all_fields_caps_fields = all_fields_caps['fields']

        columns = ['_source', 'es_dtype', 'pd_dtype', 'searchable', 'aggregatable']
        capability_matrix = {}

        for field, field_caps in all_fields_caps_fields.items():
            if field in all_fields:
                # v = {'long': {'type': 'long', 'searchable': True, 'aggregatable': True}}
                for kk, vv in field_caps.items():
                    _source = (field in source_fields)
                    es_dtype = vv['type']
                    pd_dtype = Mappings._es_dtype_to_pd_dtype(vv['type'])
                    searchable = vv['searchable']
                    aggregatable = vv['aggregatable']

                    caps = [_source, es_dtype, pd_dtype, searchable, aggregatable]

                    capability_matrix[field] = caps

                    if 'non_aggregatable_indices' in vv:
                        warnings.warn("Field {} has conflicting aggregatable fields across indexes {}".
                                      format(field, vv['non_aggregatable_indices']),
                                      UserWarning)
                    if 'non_searchable_indices' in vv:
                        warnings.warn("Field {} has conflicting searchable fields across indexes {}".
                                      format(field, vv['non_searchable_indices']),
                                      UserWarning)

        capability_matrix_df = pd.DataFrame.from_dict(capability_matrix, orient='index', columns=columns)

        return capability_matrix_df.sort_index()

    def _es_dtype_to_pd_dtype(es_dtype):
        """
        Mapping Elasticsearch types to pandas dtypes
        --------------------------------------------

        Elasticsearch field datatype              | Pandas dtype
        --
        text                                      | object
        keyword                                   | object
        long, integer, short, byte, binary        | int64
        double, float, half_float, scaled_float   | float64
        date, date_nanos                          | datetime64
        boolean                                   | bool

        TODO - add additional mapping types
        """
        es_dtype_to_pd_dtype = {
            'text': 'object',
            'keyword': 'object',

            'long': 'int64',
            'integer': 'int64',
            'short': 'int64',
            'byte': 'int64',
            'binary': 'int64',

            'double': 'float64',
            'float': 'float64',
            'half_float': 'float64',
            'scaled_float': 'float64',

            'date': 'datetime64',
            'date_nanos': 'datetime64',

            'boolean': 'bool'
        }

        if es_dtype in es_dtype_to_pd_dtype:
            return es_dtype_to_pd_dtype[es_dtype]

        # Return 'object' for all unsupported TODO - investigate how different types could be supported
        return 'object'

    def all_fields(self):
        """
        Returns
        -------
        all_fields: list
            All typed fields in the index mapping
        """
        return self.mappings_capabilities.index.tolist()

    def source_fields_pd_dtypes(self):
        """
        Returns
        -------
        groups: dict
            Calls pandas.core.groupby.GroupBy.groups for _source fields
            E.g.
            {
              'bool': Index(['Cancelled', 'FlightDelay'], dtype='object'),
              'datetime64[ns]': Index(['timestamp'], dtype='object'),
              'float64': Index(['AvgTicketPrice', 'DistanceKilometers', 'DistanceMiles', ...
            }
        """
        return self.mappings_capabilities[self.mappings_capabilities._source == True].groupby('pd_dtype').groups

    def is_source_field(self, field_name):
        """
        Parameters
        ----------
        field_name: str

        Returns
        -------
        is_source_field: bool
            Is this field name a top-level source field?
        pd_dtype: str
            The pandas data type we map to
        """
        pd_dtype = 'object'
        is_source_field = False

        if field_name in self.source_field_pd_dtypes:
            is_source_field = True
            pd_dtype = self.source_field_pd_dtypes[field_name]

        return is_source_field, pd_dtype
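Finally, a hedged end-to-end sketch of the new module (not part of the commit; assumes a local cluster with the `flights` fixture from `setup_tests.py` below):

```python
import eland as ed

# ed.Client accepts a host string in the tests below, so we do the same here
mappings = ed.Mappings(ed.Client('localhost'), 'flights')

print(mappings.all_fields())                  # every typed field in the index
print(mappings.source_fields_pd_dtypes())     # pd_dtype -> Index of source fields
print(mappings.is_source_field('timestamp'))  # (True, 'datetime64')
```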

@@ -7,10 +7,294 @@ ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
ELASTICSEARCH_HOST = 'localhost'  # TODO externalise this

FLIGHTS_INDEX_NAME = 'flights'
FLIGHTS_MAPPING = { "mappings" : {
    "properties" : {
        "AvgTicketPrice" : { "type" : "float" },
        "Cancelled" : { "type" : "boolean" },
        "Carrier" : { "type" : "keyword" },
        "Dest" : { "type" : "keyword" },
        "DestAirportID" : { "type" : "keyword" },
        "DestCityName" : { "type" : "keyword" },
        "DestCountry" : { "type" : "keyword" },
        "DestLocation" : { "type" : "geo_point" },
        "DestRegion" : { "type" : "keyword" },
        "DestWeather" : { "type" : "keyword" },
        "DistanceKilometers" : { "type" : "float" },
        "DistanceMiles" : { "type" : "float" },
        "FlightDelay" : { "type" : "boolean" },
        "FlightDelayMin" : { "type" : "integer" },
        "FlightDelayType" : { "type" : "keyword" },
        "FlightNum" : { "type" : "keyword" },
        "FlightTimeHour" : { "type" : "keyword" },
        "FlightTimeMin" : { "type" : "float" },
        "Origin" : { "type" : "keyword" },
        "OriginAirportID" : { "type" : "keyword" },
        "OriginCityName" : { "type" : "keyword" },
        "OriginCountry" : { "type" : "keyword" },
        "OriginLocation" : { "type" : "geo_point" },
        "OriginRegion" : { "type" : "keyword" },
        "OriginWeather" : { "type" : "keyword" },
        "dayOfWeek" : { "type" : "integer" },
        "timestamp" : { "type" : "date" }
    }
} }
FLIGHTS_FILE_NAME = ROOT_DIR + '/flights.json.gz'

ECOMMERCE_INDEX_NAME = 'ecommerce'
ECOMMERCE_MAPPING = { "mappings" : {
    "properties" : {
        "category" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword" } } },
        "currency" : { "type" : "keyword" },
        "customer_birth_date" : { "type" : "date" },
        "customer_first_name" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } },
        "customer_full_name" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } },
        "customer_gender" : { "type" : "keyword" },
        "customer_id" : { "type" : "keyword" },
        "customer_last_name" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } },
        "customer_phone" : { "type" : "keyword" },
        "day_of_week" : { "type" : "keyword" },
        "day_of_week_i" : { "type" : "integer" },
        "email" : { "type" : "keyword" },
        "geoip" : {
            "properties" : {
                "city_name" : { "type" : "keyword" },
                "continent_name" : { "type" : "keyword" },
                "country_iso_code" : { "type" : "keyword" },
                "location" : { "type" : "geo_point" },
                "region_name" : { "type" : "keyword" }
            }
        },
        "manufacturer" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword" } } },
        "order_date" : { "type" : "date" },
        "order_id" : { "type" : "keyword" },
        "products" : {
            "properties" : {
                "_id" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } },
                "base_price" : { "type" : "half_float" },
                "base_unit_price" : { "type" : "half_float" },
                "category" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword" } } },
                "created_on" : { "type" : "date" },
                "discount_amount" : { "type" : "half_float" },
                "discount_percentage" : { "type" : "half_float" },
                "manufacturer" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword" } } },
                "min_price" : { "type" : "half_float" },
                "price" : { "type" : "half_float" },
                "product_id" : { "type" : "long" },
                "product_name" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword" } }, "analyzer" : "english" },
                "quantity" : { "type" : "integer" },
                "sku" : { "type" : "keyword" },
                "tax_amount" : { "type" : "half_float" },
                "taxful_price" : { "type" : "half_float" },
                "taxless_price" : { "type" : "half_float" },
                "unit_discount_amount" : { "type" : "half_float" }
            }
        },
        "sku" : { "type" : "keyword" },
        "taxful_total_price" : { "type" : "half_float" },
        "taxless_total_price" : { "type" : "half_float" },
        "total_quantity" : { "type" : "integer" },
        "total_unique_products" : { "type" : "integer" },
        "type" : { "type" : "keyword" },
        "user" : { "type" : "keyword" }
    }
} }
ECOMMERCE_FILE_NAME = ROOT_DIR + '/ecommerce.json.gz'
ECOMMERCE_DF_FILE_NAME = ROOT_DIR + '/ecommerce_df.json.gz'
ECOMMERCE_DATETIME_FIELD = 'order_date'

TEST_MAPPING1 = {
    'mappings': {

@@ -115,9 +399,6 @@ TEST_MAPPING1 = {
        'dest_location': {
            'type': 'geo_point'
        },
        'user': {
            'type': 'nested'
        },
        'my_join_field': {
            'type': 'join',
            'relations': {

@@ -153,8 +434,48 @@ TEST_MAPPING1_EXPECTED = {
    'text.english': 'text',
    'tweeted_at': 'date',
    'type': 'keyword',
    'user': 'nested',
    'user_name': 'keyword'
}

TEST_MAPPING1_EXPECTED_DF = pd.DataFrame.from_dict(data=TEST_MAPPING1_EXPECTED, orient='index', columns=['datatype'])
TEST_MAPPING1_EXPECTED_DF = pd.DataFrame.from_dict(data=TEST_MAPPING1_EXPECTED, orient='index', columns=['es_dtype'])

TEST_NESTED_USER_GROUP_INDEX_NAME = 'nested_user_group'
TEST_NESTED_USER_GROUP_MAPPING = {
    'mappings': {
        'properties': {
            'group': { 'type': 'keyword' },
            'user': {
                'properties': {
                    'first': { 'type': 'keyword' },
                    'last': { 'type': 'keyword' },
                    'address': { 'type': 'keyword' }
                }
            }
        }
    }
}

TEST_NESTED_USER_GROUP_DOCS = [
    {'_index': TEST_NESTED_USER_GROUP_INDEX_NAME,
     '_source':
         {'group': 'amsterdam', 'user': [
             {'first': 'Manke', 'last': 'Nelis', 'address': ['Elandsgracht', 'Amsterdam']},
             {'first': 'Johnny', 'last': 'Jordaan', 'address': ['Elandsstraat', 'Amsterdam']}]}},
    {'_index': TEST_NESTED_USER_GROUP_INDEX_NAME,
     '_source':
         {'group': 'london', 'user': [
             {'first': 'Alice', 'last': 'Monkton'},
             {'first': 'Jimmy', 'last': 'White', 'address': ['London']}]}},
    {'_index': TEST_NESTED_USER_GROUP_INDEX_NAME,
     '_source': {'group': 'new york', 'user': [
         {'first': 'Bill', 'last': 'Jones'}]}}
]

0 eland/tests/client/__init__.py Normal file
20 eland/tests/client/test_mappings_pytest.py Normal file

@@ -0,0 +1,20 @@
# File called _pytest for PyCharm compatibility
import pytest

from eland.tests import *

from pandas.util.testing import (
    assert_almost_equal, assert_frame_equal, assert_series_equal)

import eland as ed

class TestMapping():

    # Requires 'setup_tests.py' to be run prior to this
    def test_mapping(self):
        mapping = ed.Mappings(ed.Client(ELASTICSEARCH_HOST), TEST_MAPPING1_INDEX_NAME)

        assert mapping.all_fields() == TEST_MAPPING1_EXPECTED_DF.index.tolist()

        assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mapping.mappings_capabilities['es_dtype']))

BIN eland/tests/ecommerce_df.json.gz Normal file
Binary file not shown.

@@ -10,12 +10,15 @@ ROOT_DIR = os.path.dirname(os.path.abspath(__file__))

# Create pandas and eland data frames
from eland.tests import ELASTICSEARCH_HOST
from eland.tests import FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME, ECOMMERCE_FILE_NAME, ECOMMERCE_INDEX_NAME
from eland.tests import FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME, ECOMMERCE_DF_FILE_NAME, ECOMMERCE_INDEX_NAME, \
    ECOMMERCE_DATETIME_FIELD

_pd_flights = pd.read_json(FLIGHTS_FILE_NAME, lines=True)
_ed_flights = ed.read_es(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME)

_pd_ecommerce = pd.read_json(ECOMMERCE_FILE_NAME, lines=True)
_pd_ecommerce = pd.read_json(ECOMMERCE_DF_FILE_NAME).sort_index()
_pd_ecommerce[ECOMMERCE_DATETIME_FIELD] = \
    pd.to_datetime(_pd_ecommerce[ECOMMERCE_DATETIME_FIELD])
_ed_ecommerce = ed.read_es(ELASTICSEARCH_HOST, ECOMMERCE_INDEX_NAME)

class TestData:
@@ -1,6 +1,5 @@
# File called _pytest for PyCharm compatibility
from eland.tests.frame.common import TestData

from eland.tests import *

import eland as ed

@@ -11,18 +10,42 @@ from pandas.util.testing import (

class TestDataFrameIndexing(TestData):

    def test_results(self):
        test = ed.read_es(ELASTICSEARCH_HOST, TEST_NESTED_USER_GROUP_INDEX_NAME)

        print(test.mappings.mappings_capabilities)

        pd.set_option('display.max_rows', 500)
        pd.set_option('display.max_columns', 500)
        pd.set_option('display.width', 1000)

        print(test.head())

    def test_head(self):
        pd.set_option('display.max_rows', 500)
        pd.set_option('display.max_columns', 500)
        pd.set_option('display.width', 1000)

        """
        pd_flights_head = self.pd_flights().head()
        ed_flights_head = self.ed_flights().head()

        assert_frame_equal(pd_flights_head, ed_flights_head)
        """

        pd_ecommerce_head = self.pd_ecommerce().head()
        ed_ecommerce_head = self.ed_ecommerce().head()

        print(self.ed_ecommerce().mappings.source_fields_pd_dtypes())

        print(ed_ecommerce_head.dtypes)
        print(pd_ecommerce_head.dtypes)

        #print(ed_ecommerce_head)

        assert_frame_equal(pd_ecommerce_head, ed_ecommerce_head)

    def test_mappings(self):
        test_mapping1 = ed.read_es(ELASTICSEARCH_HOST, TEST_MAPPING1_INDEX_NAME)
        assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, test_mapping1.fields)
@@ -5,8 +5,8 @@ from elasticsearch import helpers
from eland.tests import *

DATA_LIST = [
    (FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME),
    (ECOMMERCE_FILE_NAME, ECOMMERCE_INDEX_NAME)
    (FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME, FLIGHTS_MAPPING),
    (ECOMMERCE_FILE_NAME, ECOMMERCE_INDEX_NAME, ECOMMERCE_MAPPING)
]

def _setup_data(es):

@@ -14,10 +14,13 @@ def _setup_data(es):
    for data in DATA_LIST:
        json_file_name = data[0]
        index_name = data[1]
        mapping = data[2]

        # Delete index
        print("Deleting index:", index_name)
        es.indices.delete(index=index_name, ignore=[400, 404])
        print("Creating index:", index_name)
        es.indices.create(index=index_name, body=mapping)

        df = pd.read_json(json_file_name, lines=True)

@@ -50,9 +53,16 @@ def _setup_test_mappings(es):
    es.indices.delete(index=TEST_MAPPING1_INDEX_NAME, ignore=[400, 404])
    es.indices.create(index=TEST_MAPPING1_INDEX_NAME, body=TEST_MAPPING1)

def _setup_test_nested(es):
    es.indices.delete(index=TEST_NESTED_USER_GROUP_INDEX_NAME, ignore=[400, 404])
    es.indices.create(index=TEST_NESTED_USER_GROUP_INDEX_NAME, body=TEST_NESTED_USER_GROUP_MAPPING)

    helpers.bulk(es, TEST_NESTED_USER_GROUP_DOCS)

if __name__ == '__main__':
    # Create connection to Elasticsearch - use defaults
    es = Elasticsearch(ELASTICSEARCH_HOST)

    _setup_data(es)
    _setup_test_mappings(es)
    _setup_test_nested(es)