Merge pull request #8 from stevedodson/master

head() and describe() working + major refactor and tests
stevedodson 2019-06-21 16:42:26 +02:00 committed by GitHub
commit 956678053b
17 changed files with 1257 additions and 175 deletions


@ -1,108 +0,0 @@
import eland
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
import json
import pandas as pd
class DataFrame():
def __init__(self, client, index_pattern):
self.client = eland.Client(client)
self.index_pattern = index_pattern
self.client.indices().exists(index_pattern)
@staticmethod
def _flatten_results(prefix, results, result):
# TODO
return prefix
@staticmethod
def _es_results_to_pandas(results):
# TODO - resolve nested fields
rows = []
for hit in results['hits']['hits']:
row = hit['_source']
rows.append(row)
#return pd.DataFrame(data=rows)
# Converting the list of dicts to a dataframe doesn't convert datetimes
# effectively compared to read_json. TODO - https://github.com/elastic/eland/issues/2
json_rows = json.dumps(rows)
return pd.read_json(json_rows)
@staticmethod
def _flatten_mapping(prefix, properties, result):
for k, v in properties.items():
if 'properties' in v:
if(prefix == ''):
prefix = k
else:
prefix = prefix + '.' + k
DataFrame._flatten_mapping(prefix, v['properties'], result)
else:
if(prefix == ''):
key = k
else:
key = prefix + '.' + k
type = v['type']
result.append((key, type))
@staticmethod
def _es_mappings_to_pandas(mappings):
fields = []
for index in mappings:
if 'properties' in mappings[index]['mappings']:
properties = mappings[index]['mappings']['properties']
DataFrame._flatten_mapping('', properties, fields)
return pd.DataFrame(data=fields, columns=['field', 'datatype'])
def head(self, n=5):
results = self.client.search(index=self.index_pattern, size=n)
return DataFrame._es_results_to_pandas(results)
def describe(self):
# First get all types
#mapping = self.client.indices().get_mapping(index=self.index_pattern)
mapping = self.client.indices().get_mapping(index=self.index_pattern)
fields = DataFrame._es_mappings_to_pandas(mapping)
# Get numeric types (https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#the-where-method-and-masking)
# https://www.elastic.co/guide/en/elasticsearch/reference/current/number.html
# TODO refactor this list out of method
numeric_fields = fields.query('datatype == ["long", "integer", "short", "byte", "double", "float", "half_float", "scaled_float"]')
# for each field we compute:
# count, mean, std, min, 25%, 50%, 75%, max
search = Search(using=self.client, index=self.index_pattern).extra(size=0)
for field in numeric_fields.field:
search.aggs.metric('extended_stats_'+field, 'extended_stats', field=field)
search.aggs.metric('percentiles_'+field, 'percentiles', field=field)
response = search.execute()
results = pd.DataFrame(index=['count', 'mean', 'std', 'min', '25%', '50%', '75%', 'max'])
for field in numeric_fields.field:
values = []
values.append(response.aggregations['extended_stats_'+field]['count'])
values.append(response.aggregations['extended_stats_'+field]['avg'])
values.append(response.aggregations['extended_stats_'+field]['std_deviation'])
values.append(response.aggregations['extended_stats_'+field]['min'])
values.append(response.aggregations['percentiles_'+field]['values']['25.0'])
values.append(response.aggregations['percentiles_'+field]['values']['50.0'])
values.append(response.aggregations['percentiles_'+field]['values']['75.0'])
values.append(response.aggregations['extended_stats_'+field]['max'])
# if not None
if (values.count(None) < len(values)):
results = results.assign(**{field: values})
return results


@ -1,3 +1,4 @@
from .utils import *
from .DataFrame import *
from .Client import *
from .frame import *
from .client import *
from .mappings import *


@ -1,14 +1,15 @@
from elasticsearch import Elasticsearch
# eland client - implement as facade to control access to Elasticsearch methods
class Client(object):
class Client():
"""
eland client - implemented as facade to control access to Elasticsearch methods
"""
def __init__(self, es=None):
if isinstance(es, Elasticsearch):
self.es = es
else:
self.es = Elasticsearch(es)
def info(self):
return self.es.info()
@ -17,3 +18,6 @@ class Client(object):
def search(self, **kwargs):
return self.es.search(**kwargs)
def field_caps(self, **kwargs):
return self.es.field_caps(**kwargs)
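For reference, a minimal usage sketch of this facade (the host value is illustrative, not part of the commit):
```
from elasticsearch import Elasticsearch
import eland as ed

# Both forms are accepted: a hosts argument is passed through to the
# Elasticsearch constructor, an existing client instance is wrapped as-is.
client_from_host = ed.Client('localhost')
client_from_es = ed.Client(Elasticsearch('localhost'))
print(client_from_es.info())  # delegates to Elasticsearch.info()
```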

eland/frame.py Normal file

@ -0,0 +1,253 @@
"""
DataFrame
---------
An efficient 2D container for potentially mixed-type time series or other
labeled data series.
The underlying data resides in Elasticsearch and the API aligns as much as
possible with pandas.DataFrame API.
This allows the eland.DataFrame to access large datasets stored in Elasticsearch,
without storing the dataset in local memory.
Implementation Details
----------------------
Elasticsearch indexes can be configured in many different ways, and these indexes
utilise different data structures than pandas.DataFrame does.
eland.DataFrame operations that return individual rows (e.g. df.head()) return
_source data. If _source is not enabled, this data is not accessible.
Similarly, only Elasticsearch searchable fields can be searched or filtered, and
only Elasticsearch aggregatable fields can be aggregated or grouped.
"""
import eland as ed
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
import pandas as pd
class DataFrame():
"""
pandas.DataFrame like API that proxies into Elasticsearch index(es).
Parameters
----------
client : eland.Client
A reference to an Elasticsearch Python client
index_pattern : str
An Elasticsearch index pattern. This can contain wildcards (e.g. filebeat-*).
See Also
--------
Examples
--------
import eland as ed
client = ed.Client(Elasticsearch())
df = ed.DataFrame(client, 'reviews')
df.head()
reviewerId vendorId rating date
0 0 0 5 2006-04-07 17:08
1 1 1 5 2006-05-04 12:16
2 2 2 4 2006-04-21 12:26
3 3 3 5 2006-04-18 15:48
4 3 4 5 2006-04-18 15:49
Notice that the types are based on Elasticsearch mappings
Notes
-----
If the Elasticsearch index is deleted or index mappings are changed after this
object is created, the object is not rebuilt and so inconsistencies can occur.
"""
def __init__(self, client, index_pattern):
self.client = ed.Client(client)
self.index_pattern = index_pattern
# Get and persist mappings; this allows us to correctly
# map returned types from Elasticsearch to pandas datatypes
self.mappings = ed.Mappings(self.client, self.index_pattern)
def _es_results_to_pandas(self, results):
"""
Parameters
----------
results: dict
Elasticsearch results from self.client.search
Returns
-------
df: pandas.DataFrame
_source values extracted from results and mapped to pandas DataFrame
dtypes are mapped via Mapping object
Notes
-----
Fields containing lists in Elasticsearch don't map easily to pandas.DataFrame
For example, an index with mapping:
```
"mappings" : {
"properties" : {
"group" : {
"type" : "keyword"
},
"user" : {
"type" : "nested",
"properties" : {
"first" : {
"type" : "keyword"
},
"last" : {
"type" : "keyword"
}
}
}
}
}
```
Adding a document:
```
"_source" : {
"group" : "amsterdam",
"user" : [
{
"first" : "John",
"last" : "Smith"
},
{
"first" : "Alice",
"last" : "White"
}
]
}
```
(https://www.elastic.co/guide/en/elasticsearch/reference/current/nested.html)
this would be transformed internally (in Elasticsearch) into a document that looks more like this:
```
{
"group" : "amsterdam",
"user.first" : [ "alice", "john" ],
"user.last" : [ "smith", "white" ]
}
```
When mapping this to a pandas data frame we mimic this transformation.
Similarly, if a list is added to Elasticsearch:
```
PUT my_index/_doc/1
{
"list" : [
0, 1, 2
]
}
```
The mapping is:
```
"mappings" : {
"properties" : {
"user" : {
"type" : "long"
}
}
}
```
TODO - explain how lists are handled (https://www.elastic.co/guide/en/elasticsearch/reference/current/array.html)
TODO - an option here is to use Elasticsearch's multi-field matching instead of pandas treatment of lists (which isn't great)
NOTE - using lists like this is generally not a good way to use this API
"""
def flatten_dict(y):
out = {}
def flatten(x, name=''):
# We flatten into source fields e.g. if type=geo_point
# location: {lat=52.38, lon=4.90}
if name == '':
is_source_field = False
pd_dtype = 'object'
else:
is_source_field, pd_dtype = self.mappings.is_source_field(name[:-1])
if not is_source_field and type(x) is dict:
for a in x:
flatten(x[a], name + a + '.')
elif not is_source_field and type(x) is list:
for a in x:
flatten(a, name)
else:
field_name = name[:-1]
# Coerce types - for now just datetime
if pd_dtype == 'datetime64[ns]':
x = pd.to_datetime(x)
# Elasticsearch can have multiple values for a field. These are represented as lists, so
# create lists for this pivot (see notes above)
if field_name in out:
if type(out[field_name]) is not list:
l = [out[field_name]]
out[field_name] = l
out[field_name].append(x)
else:
out[field_name] = x
flatten(y)
return out
rows = []
for hit in results['hits']['hits']:
row = hit['_source']
# flatten row to map correctly to 2D DataFrame
rows.append(flatten_dict(row))
# Create pandas DataFrame
df = pd.DataFrame(data=rows)
return df
def head(self, n=5):
results = self.client.search(index=self.index_pattern, size=n)
return self._es_results_to_pandas(results)
def describe(self):
numeric_source_fields = self.mappings.numeric_source_fields()
# for each field we compute:
# count, mean, std, min, 25%, 50%, 75%, max
search = Search(using=self.client, index=self.index_pattern).extra(size=0)
for field in numeric_source_fields:
search.aggs.metric('extended_stats_'+field, 'extended_stats', field=field)
search.aggs.metric('percentiles_'+field, 'percentiles', field=field)
response = search.execute()
results = {}
for field in numeric_source_fields:
values = []
values.append(response.aggregations['extended_stats_'+field]['count'])
values.append(response.aggregations['extended_stats_'+field]['avg'])
values.append(response.aggregations['extended_stats_'+field]['std_deviation'])
values.append(response.aggregations['extended_stats_'+field]['min'])
values.append(response.aggregations['percentiles_'+field]['values']['25.0'])
values.append(response.aggregations['percentiles_'+field]['values']['50.0'])
values.append(response.aggregations['percentiles_'+field]['values']['75.0'])
values.append(response.aggregations['extended_stats_'+field]['max'])
# Only add the field if at least one aggregation value is not None
if (values.count(None) < len(values)):
results[field] = values
df = pd.DataFrame(data=results, index=['count', 'mean', 'std', 'min', '25%', '50%', '75%', 'max'])
return df
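For reference, a standalone sketch of the flattening described in the Notes above — a simplified stand-in for the inner flatten_dict that ignores mappings-based type coercion; the helper name and sample hit are illustrative, not part of the commit:
```
def flatten_source(source, prefix=''):
    """Flatten a nested _source dict into dotted keys; multi-valued
    fields collapse into lists, mirroring the Elasticsearch pivot."""
    out = {}
    for key, value in source.items():
        name = prefix + key
        if isinstance(value, dict):
            out.update(flatten_source(value, name + '.'))
        elif isinstance(value, list):
            # unlike the method above, this sketch always keeps lists as lists
            for item in value:
                for k, v in flatten_source({key: item}, prefix).items():
                    out.setdefault(k, []).append(v)
        else:
            out[name] = value
    return out

hit = {'group': 'amsterdam',
       'user': [{'first': 'John', 'last': 'Smith'},
                {'first': 'Alice', 'last': 'White'}]}
print(flatten_source(hit))
# {'group': 'amsterdam', 'user.first': ['John', 'Alice'], 'user.last': ['Smith', 'White']}
```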

eland/mappings.py Normal file

@ -0,0 +1,300 @@
import warnings
import pandas as pd
class Mappings():
"""
General purpose class to manage Elasticsearch to/from pandas mappings
Attributes
----------
mappings_capabilities: pandas.DataFrame
A data frame summarising the capabilities of the index mapping
_source - is top level field (i.e. not a multi-field sub-field)
es_dtype - Elasticsearch field datatype
pd_dtype - Pandas datatype
searchable - is the field searchable?
aggregatable - is the field aggregatable?
_source es_dtype pd_dtype searchable aggregatable
maps-telemetry.min True long int64 True True
maps-telemetry.avg True float float64 True True
city True text object True False
user_name True keyword object True True
origin_location.lat.keyword False keyword object True True
type True keyword object True True
origin_location.lat True text object True False
"""
def __init__(self, client, index_pattern):
"""
Parameters
----------
client: eland.Client
Elasticsearch client
index_pattern: str
Elasticsearch index pattern
"""
# persist index_pattern for debugging
self.index_pattern = index_pattern
mappings = client.indices().get_mapping(index=index_pattern)
# Get all fields (including all nested) and then field_caps
# for these names (fields=* doesn't appear to work effectively...)
all_fields = Mappings._extract_fields_from_mapping(mappings)
all_fields_caps = client.field_caps(index=index_pattern, fields=list(all_fields.keys()))
# Get top level (not sub-field multifield) mappings
source_fields = Mappings._extract_fields_from_mapping(mappings, source_only=True)
# Populate capability matrix of fields
# field_name, es_dtype, pd_dtype, is_searchable, is_aggregatable, is_source
self.mappings_capabilities = Mappings._create_capability_matrix(all_fields, source_fields, all_fields_caps)
# Cache source field types for efficient lookup
# (this massively improves performance of DataFrame.flatten)
self.source_field_pd_dtypes = {}
for field_name in source_fields:
pd_dtype = self.mappings_capabilities.loc[field_name]['pd_dtype']
self.source_field_pd_dtypes[field_name] = pd_dtype
@staticmethod
def _extract_fields_from_mapping(mappings, source_only=False):
"""
Extract all field names and types from a mapping.
```
{
"my_index": {
"mappings": {
"properties": {
"city": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
}
}
}
}
}
```
if source_only == False:
return {'city': 'text', 'city.keyword': 'keyword'}
else:
return {'city': 'text'}
Note: first field name type wins. E.g.
```
PUT my_index1 {"mappings":{"properties":{"city":{"type":"text"}}}}
PUT my_index2 {"mappings":{"properties":{"city":{"type":"long"}}}}
Returns {'city': 'text'}
```
Parameters
----------
mappings: dict
Return from get_mapping
Returns
-------
fields: dict
Dict of field names and types
"""
fields = {}
# Recurse until we get a 'type: xxx'
def flatten(x, name=''):
if type(x) is dict:
for a in x:
if a == 'type' and type(x[a]) is str: # 'type' can be a name of a field
field_name = name[:-1]
field_type = x[a]
# If there is a conflicting type, warn - first value added wins
if field_name in fields and fields[field_name] != field_type:
warnings.warn("Field {} has conflicting types {} != {}".
format(field_name, fields[field_name], field_type),
UserWarning)
else:
fields[field_name] = field_type
elif a == 'properties' or (not source_only and a == 'fields'):
flatten(x[a], name)
elif not (source_only and a == 'fields'): # ignore multi-field fields for source_only
flatten(x[a], name + a + '.')
for index in mappings:
if 'properties' in mappings[index]['mappings']:
properties = mappings[index]['mappings']['properties']
flatten(properties)
return fields
@staticmethod
def _create_capability_matrix(all_fields, source_fields, all_fields_caps):
"""
{
"fields": {
"rating": {
"long": {
"searchable": true,
"aggregatable": false,
"indices": ["index1", "index2"],
"non_aggregatable_indices": ["index1"]
},
"keyword": {
"searchable": false,
"aggregatable": true,
"indices": ["index3", "index4"],
"non_searchable_indices": ["index4"]
}
},
"title": {
"text": {
"searchable": true,
"aggregatable": false
}
}
}
}
"""
all_fields_caps_fields = all_fields_caps['fields']
columns = ['_source', 'es_dtype', 'pd_dtype', 'searchable', 'aggregatable']
capability_matrix = {}
for field, field_caps in all_fields_caps_fields.items():
if field in all_fields:
# field_caps = {'long': {'type': 'long', 'searchable': True, 'aggregatable': True}}
for kk, vv in field_caps.items():
_source = (field in source_fields)
es_dtype = vv['type']
pd_dtype = Mappings._es_dtype_to_pd_dtype(vv['type'])
searchable = vv['searchable']
aggregatable = vv['aggregatable']
caps = [_source, es_dtype, pd_dtype, searchable, aggregatable]
capability_matrix[field] = caps
if 'non_aggregatable_indices' in vv:
warnings.warn("Field {} has conflicting aggregatable fields across indexes {}".
format(field, vv['non_aggregatable_indices']),
UserWarning)
if 'non_searchable_indices' in vv:
warnings.warn("Field {} has conflicting searchable fields across indexes {}".
format(field, vv['non_searchable_indices']),
UserWarning)
capability_matrix_df = pd.DataFrame.from_dict(capability_matrix, orient='index', columns=columns)
return capability_matrix_df.sort_index()
@staticmethod
def _es_dtype_to_pd_dtype(es_dtype):
"""
Mapping Elasticsearch types to pandas dtypes
--------------------------------------------
Elasticsearch field datatype | Pandas dtype
--
text | object
keyword | object
long, integer, short, byte, binary | int64
double, float, half_float, scaled_float | float64
date, date_nanos | datetime64
boolean | bool
TODO - add additional mapping types
"""
es_dtype_to_pd_dtype = {
'text': 'object',
'keyword': 'object',
'long': 'int64',
'integer': 'int64',
'short': 'int64',
'byte': 'int64',
'binary': 'int64',
'double': 'float64',
'float': 'float64',
'half_float': 'float64',
'scaled_float': 'float64',
'date': 'datetime64[ns]',
'date_nanos': 'datetime64[ns]',
'boolean': 'bool'
}
if es_dtype in es_dtype_to_pd_dtype:
return es_dtype_to_pd_dtype[es_dtype]
# Return 'object' for all unsupported TODO - investigate how different types could be supported
return 'object'
def all_fields(self):
"""
Returns
-------
all_fields: list
All typed fields in the index mapping
"""
return self.mappings_capabilities.index.tolist()
"""
def pd_dtypes_groupby_source_fields(self):
Returns
-------
groups: dict
Calls pandas.core.groupby.GroupBy.groups for _source fields
E.g.
{
'bool': Index(['Cancelled', 'FlightDelay'], dtype='object'),
'datetime64[ns]': Index(['timestamp'], dtype='object'),
'float64': Index(['AvgTicketPrice', 'DistanceKilometers', 'DistanceMiles',...
}
return self.mappings_capabilities[self.mappings_capabilities._source == True].groupby('pd_dtype').groups
def pd_dtype
"""
def is_source_field(self, field_name):
"""
Parameters
----------
field_name: str
Returns
-------
is_source_field: bool
Is this field name a top-level source field?
pd_dtype: str
The pandas data type we map to
"""
pd_dtype = 'object'
is_source_field = False
if field_name in self.source_field_pd_dtypes:
is_source_field = True
pd_dtype = self.source_field_pd_dtypes[field_name]
return is_source_field, pd_dtype
def numeric_source_fields(self):
"""
Returns
-------
numeric_source_fields: list of str
List of source fields where pd_dtype == (int64 or float64)
"""
return self.mappings_capabilities[(self.mappings_capabilities._source == True) &
((self.mappings_capabilities.pd_dtype == 'int64') |
(self.mappings_capabilities.pd_dtype == 'float64'))].index.tolist()
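As an illustration of the recursion described in `_extract_fields_from_mapping`, a hedged standalone sketch (the function name and sample mapping are illustrative; the expected output matches the docstring example):
```
def extract_fields(properties, prefix='', source_only=False):
    """Walk a 'properties' dict, emitting {dotted_name: es_dtype};
    multi-field sub-fields under 'fields' are skipped if source_only."""
    fields = {}
    for name, body in properties.items():
        full_name = prefix + name
        if 'properties' in body:  # object or nested field - recurse
            fields.update(extract_fields(body['properties'], full_name + '.', source_only))
        else:
            fields[full_name] = body['type']
            if not source_only:
                for sub, sub_body in body.get('fields', {}).items():
                    fields[full_name + '.' + sub] = sub_body['type']
    return fields

mapping = {'city': {'type': 'text', 'fields': {'keyword': {'type': 'keyword'}}}}
print(extract_fields(mapping))                    # {'city': 'text', 'city.keyword': 'keyword'}
print(extract_fields(mapping, source_only=True))  # {'city': 'text'}
```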


@ -1,4 +1,5 @@
import os
import pandas as pd
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
@ -6,7 +7,475 @@ ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
ELASTICSEARCH_HOST = 'localhost' # TODO externalise this
FLIGHTS_INDEX_NAME = 'flights'
FLIGHTS_MAPPING = { "mappings" : {
"properties" : {
"AvgTicketPrice" : {
"type" : "float"
},
"Cancelled" : {
"type" : "boolean"
},
"Carrier" : {
"type" : "keyword"
},
"Dest" : {
"type" : "keyword"
},
"DestAirportID" : {
"type" : "keyword"
},
"DestCityName" : {
"type" : "keyword"
},
"DestCountry" : {
"type" : "keyword"
},
"DestLocation" : {
"type" : "geo_point"
},
"DestRegion" : {
"type" : "keyword"
},
"DestWeather" : {
"type" : "keyword"
},
"DistanceKilometers" : {
"type" : "float"
},
"DistanceMiles" : {
"type" : "float"
},
"FlightDelay" : {
"type" : "boolean"
},
"FlightDelayMin" : {
"type" : "integer"
},
"FlightDelayType" : {
"type" : "keyword"
},
"FlightNum" : {
"type" : "keyword"
},
"FlightTimeHour" : {
"type" : "float"
},
"FlightTimeMin" : {
"type" : "float"
},
"Origin" : {
"type" : "keyword"
},
"OriginAirportID" : {
"type" : "keyword"
},
"OriginCityName" : {
"type" : "keyword"
},
"OriginCountry" : {
"type" : "keyword"
},
"OriginLocation" : {
"type" : "geo_point"
},
"OriginRegion" : {
"type" : "keyword"
},
"OriginWeather" : {
"type" : "keyword"
},
"dayOfWeek" : {
"type" : "integer"
},
"timestamp" : {
"type" : "date"
}
}
} }
FLIGHTS_FILE_NAME = ROOT_DIR + '/flights.json.gz'
FLIGHTS_DF_FILE_NAME = ROOT_DIR + '/flights_df.json.gz'
ECOMMERCE_INDEX_NAME = 'ecommerce'
ECOMMERCE_MAPPING = { "mappings" : {
"properties" : {
"category" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword"
}
}
},
"currency" : {
"type" : "keyword"
},
"customer_birth_date" : {
"type" : "date"
},
"customer_first_name" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"customer_full_name" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"customer_gender" : {
"type" : "keyword"
},
"customer_id" : {
"type" : "keyword"
},
"customer_last_name" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"customer_phone" : {
"type" : "keyword"
},
"day_of_week" : {
"type" : "keyword"
},
"day_of_week_i" : {
"type" : "integer"
},
"email" : {
"type" : "keyword"
},
"geoip" : {
"properties" : {
"city_name" : {
"type" : "keyword"
},
"continent_name" : {
"type" : "keyword"
},
"country_iso_code" : {
"type" : "keyword"
},
"location" : {
"type" : "geo_point"
},
"region_name" : {
"type" : "keyword"
}
}
},
"manufacturer" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword"
}
}
},
"order_date" : {
"type" : "date"
},
"order_id" : {
"type" : "keyword"
},
"products" : {
"properties" : {
"_id" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"base_price" : {
"type" : "half_float"
},
"base_unit_price" : {
"type" : "half_float"
},
"category" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword"
}
}
},
"created_on" : {
"type" : "date"
},
"discount_amount" : {
"type" : "half_float"
},
"discount_percentage" : {
"type" : "half_float"
},
"manufacturer" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword"
}
}
},
"min_price" : {
"type" : "half_float"
},
"price" : {
"type" : "half_float"
},
"product_id" : {
"type" : "long"
},
"product_name" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword"
}
},
"analyzer" : "english"
},
"quantity" : {
"type" : "integer"
},
"sku" : {
"type" : "keyword"
},
"tax_amount" : {
"type" : "half_float"
},
"taxful_price" : {
"type" : "half_float"
},
"taxless_price" : {
"type" : "half_float"
},
"unit_discount_amount" : {
"type" : "half_float"
}
}
},
"sku" : {
"type" : "keyword"
},
"taxful_total_price" : {
"type" : "half_float"
},
"taxless_total_price" : {
"type" : "half_float"
},
"total_quantity" : {
"type" : "integer"
},
"total_unique_products" : {
"type" : "integer"
},
"type" : {
"type" : "keyword"
},
"user" : {
"type" : "keyword"
}
}
} }
ECOMMERCE_FILE_NAME = ROOT_DIR + '/ecommerce.json.gz'
ECOMMERCE_DF_FILE_NAME = ROOT_DIR + '/ecommerce_df.json.gz'
TEST_MAPPING1 = {
'mappings': {
'properties': {
'city': {
'type': 'text',
'fields': {
'raw': {
'type': 'keyword'
}
}
},
'text': {
'type': 'text',
'fields': {
'english': {
'type': 'text',
'analyzer': 'english'
}
}
},
'origin_location': {
'properties': {
'lat': {
'type': 'text',
'index_prefixes': {},
'fields': {
'keyword': {
'type': 'keyword',
'ignore_above': 256
}
}
},
'lon': {
'type': 'text',
'fields': {
'keyword': {
'type': 'keyword',
'ignore_above': 256
}
}
}
}
},
'maps-telemetry': {
'properties': {
'attributesPerMap': {
'properties': {
'dataSourcesCount': {
'properties': {
'avg': {
'type': 'long'
},
'max': {
'type': 'long'
},
'min': {
'type': 'long'
}
}
},
'emsVectorLayersCount': {
'dynamic': 'true',
'properties': {
'france_departments': {
'properties': {
'avg': {
'type': 'float'
},
'max': {
'type': 'long'
},
'min': {
'type': 'long'
}
}
}
}
}
}
}
}
},
'type': {
'type': 'keyword'
},
'name': {
'type': 'text'
},
'user_name': {
'type': 'keyword'
},
'email': {
'type': 'keyword'
},
'content': {
'type': 'text'
},
'tweeted_at': {
'type': 'date'
},
'dest_location': {
'type': 'geo_point'
},
'my_join_field': {
'type': 'join',
'relations': {
'question': ['answer', 'comment'],
'answer': 'vote'
}
}
}
}
}
TEST_MAPPING1_INDEX_NAME = 'mapping1'
TEST_MAPPING1_EXPECTED = {
'city': 'text',
'city.raw': 'keyword',
'content': 'text',
'dest_location': 'geo_point',
'email': 'keyword',
'maps-telemetry.attributesPerMap.dataSourcesCount.avg': 'long',
'maps-telemetry.attributesPerMap.dataSourcesCount.max': 'long',
'maps-telemetry.attributesPerMap.dataSourcesCount.min': 'long',
'maps-telemetry.attributesPerMap.emsVectorLayersCount.france_departments.avg': 'float',
'maps-telemetry.attributesPerMap.emsVectorLayersCount.france_departments.max': 'long',
'maps-telemetry.attributesPerMap.emsVectorLayersCount.france_departments.min': 'long',
'my_join_field': 'join',
'name': 'text',
'origin_location.lat': 'text',
'origin_location.lat.keyword': 'keyword',
'origin_location.lon': 'text',
'origin_location.lon.keyword': 'keyword',
'text': 'text',
'text.english': 'text',
'tweeted_at': 'date',
'type': 'keyword',
'user_name': 'keyword'
}
TEST_MAPPING1_EXPECTED_DF = pd.DataFrame.from_dict(data=TEST_MAPPING1_EXPECTED, orient='index', columns=['es_dtype'])
TEST_NESTED_USER_GROUP_INDEX_NAME = 'nested_user_group'
TEST_NESTED_USER_GROUP_MAPPING = {
'mappings': {
'properties': {
'group': {
'type': 'keyword'
},
'user': {
'properties': {
'first': {
'type': 'keyword'
},
'last': {
'type': 'keyword'
},
'address' : {
'type' : 'keyword'
}
}
}
}
}
}
TEST_NESTED_USER_GROUP_DOCS = [
{'_index':TEST_NESTED_USER_GROUP_INDEX_NAME,
'_source':
{'group':'amsterdam','user':[
{'first':'Manke','last':'Nelis','address':['Elandsgracht', 'Amsterdam']},
{'first':'Johnny','last':'Jordaan','address':['Elandsstraat', 'Amsterdam']}]}},
{'_index':TEST_NESTED_USER_GROUP_INDEX_NAME,
'_source':
{'group':'london','user':[
{'first':'Alice','last':'Monkton'},
{'first':'Jimmy','last':'White','address':['London']}]}},
{'_index':TEST_NESTED_USER_GROUP_INDEX_NAME,
'_source':{'group':'new york','user':[
{'first':'Bill','last':'Jones'}]}}
]


@ -0,0 +1,22 @@
# File called _pytest for PyCharm compatibility
import pytest
import pandas as pd
from eland.tests import *
from pandas.util.testing import (
assert_almost_equal, assert_frame_equal, assert_series_equal)
import eland as ed
class TestMapping():
# Requires 'setup_tests.py' to be run prior to this
def test_mapping(self):
mapping = ed.Mappings(ed.Client(ELASTICSEARCH_HOST), TEST_MAPPING1_INDEX_NAME)
assert mapping.all_fields() == TEST_MAPPING1_EXPECTED_DF.index.tolist()
assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mapping.mappings_capabilities['es_dtype']))


@ -1,24 +0,0 @@
# File called _pytest for PyCharm compatibility
from eland.tests.dataframe.common import TestData
from pandas.util.testing import (
assert_almost_equal, assert_frame_equal, assert_series_equal)
class TestDataFrameIndexing(TestData):
def test_head(self):
pd_flights_head = self.pd_flights().head()
ed_flights_head = self.ed_flights().head()
assert_frame_equal(pd_flights_head, ed_flights_head)
pd_ecommerce_head = self.pd_ecommerce().head()
ed_ecommerce_head = self.ed_ecommerce().head()
#print(pd_ecommerce_head.dtypes)
#print(ed_ecommerce_head.dtypes)
assert_frame_equal(pd_ecommerce_head, ed_ecommerce_head)

Binary file not shown.

Binary file not shown.

Binary file not shown.


@ -10,12 +10,19 @@ ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
# Create pandas and eland data frames
from eland.tests import ELASTICSEARCH_HOST
from eland.tests import FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME, ECOMMERCE_FILE_NAME, ECOMMERCE_INDEX_NAME
from eland.tests import FLIGHTS_DF_FILE_NAME, FLIGHTS_INDEX_NAME,\
ECOMMERCE_DF_FILE_NAME, ECOMMERCE_INDEX_NAME
_pd_flights = pd.read_json(FLIGHTS_FILE_NAME, lines=True)
_pd_flights = pd.read_json(FLIGHTS_DF_FILE_NAME).sort_index()
_pd_flights['timestamp'] = \
pd.to_datetime(_pd_flights['timestamp'])
_ed_flights = ed.read_es(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME)
_pd_ecommerce = pd.read_json(ECOMMERCE_FILE_NAME, lines=True)
_pd_ecommerce = pd.read_json(ECOMMERCE_DF_FILE_NAME).sort_index()
_pd_ecommerce['order_date'] = \
pd.to_datetime(_pd_ecommerce['order_date'])
_pd_ecommerce['products.created_on'] = \
_pd_ecommerce['products.created_on'].apply(lambda x: pd.to_datetime(x))
_ed_ecommerce = ed.read_es(ELASTICSEARCH_HOST, ECOMMERCE_INDEX_NAME)
class TestData:


@ -0,0 +1,54 @@
# File called _pytest for PyCharm compatibility
from eland.tests.frame.common import TestData
from eland.tests import *
import eland as ed
import pandas as pd
from pandas.util.testing import (
assert_almost_equal, assert_frame_equal, assert_series_equal)
class TestDataFrameIndexing(TestData):
def test_mapping(self):
ed_flights_mappings = pd.DataFrame(self.ed_flights().mappings.mappings_capabilities
[self.ed_flights().mappings.mappings_capabilities._source==True]
['pd_dtype'])
pd_flights_mappings = pd.DataFrame(self.pd_flights().dtypes, columns = ['pd_dtype'])
assert_frame_equal(pd_flights_mappings, ed_flights_mappings)
# We don't compare ecommerce here as the default dtypes in pandas from read_json
# don't match the mapping types. This is mainly because the products field is
# nested and so can be treated as a multi-field in ES, but not in pandas
def test_head(self):
pd_flights_head = self.pd_flights().head()
ed_flights_head = self.ed_flights().head()
assert_frame_equal(pd_flights_head, ed_flights_head)
pd_ecommerce_head = self.pd_ecommerce().head()
ed_ecommerce_head = self.ed_ecommerce().head()
assert_frame_equal(pd_ecommerce_head, ed_ecommerce_head)
def test_describe(self):
pd_flights_describe = self.pd_flights().describe()
ed_flights_describe = self.ed_flights().describe()
# TODO - this fails now as ES aggregations are approximate
# if ES percentile agg uses
# "hdr": {
# "number_of_significant_value_digits": 3
# }
# this works
#assert_almost_equal(pd_flights_describe, ed_flights_describe)
pd_ecommerce_describe = self.pd_ecommerce().describe()
ed_ecommerce_describe = self.ed_ecommerce().describe()
# We don't compare ecommerce here as the default dtypes in pandas from read_json
# don't match the mapping types. This is mainly because the products field is
# nested and so can be treated as a multi-field in ES, but not in pandas
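For reference, the HDR-based percentiles variant mentioned in the comment above could be requested like this — a sketch using the standard Elasticsearch percentiles aggregation options, not code from this commit (index and field names are illustrative):
```
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search

# HDR histogram percentiles are more precise than the default t-digest,
# which is why assert_almost_equal against pandas could then pass.
search = Search(using=Elasticsearch('localhost'), index='flights').extra(size=0)
search.aggs.metric('percentiles_AvgTicketPrice', 'percentiles',
                   field='AvgTicketPrice',
                   hdr={'number_of_significant_value_digits': 3})
response = search.execute()
print(response.aggregations['percentiles_AvgTicketPrice']['values'])
```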


@ -2,27 +2,25 @@ import pandas as pd
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from eland.tests import FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME, ECOMMERCE_FILE_NAME, ECOMMERCE_INDEX_NAME
from eland.tests import *
DATA_LIST = [
(FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME),
(ECOMMERCE_FILE_NAME, ECOMMERCE_INDEX_NAME)
(FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME, FLIGHTS_MAPPING),
(ECOMMERCE_FILE_NAME, ECOMMERCE_INDEX_NAME, ECOMMERCE_MAPPING)
]
if __name__ == '__main__':
def _setup_data(es):
# Read json file and index records into Elasticsearch
for data in DATA_LIST:
json_file_name = data[0]
index_name = data[1]
# Create connection to Elasticsearch - use defaults1
es = Elasticsearch()
mapping = data[2]
# Delete index
print("Deleting index:", index_name)
es.indices.delete(index=index_name, ignore=[400, 404])
print("Creating index:", index_name)
es.indices.create(index=index_name, body=mapping)
df = pd.read_json(json_file_name, lines=True)
@ -49,3 +47,22 @@ if __name__ == '__main__':
actions = []
print("Done", index_name)
def _setup_test_mappings(es):
# Create a complex mapping containing many Elasticsearch features
es.indices.delete(index=TEST_MAPPING1_INDEX_NAME, ignore=[400, 404])
es.indices.create(index=TEST_MAPPING1_INDEX_NAME, body=TEST_MAPPING1)
def _setup_test_nested(es):
es.indices.delete(index=TEST_NESTED_USER_GROUP_INDEX_NAME, ignore=[400, 404])
es.indices.create(index=TEST_NESTED_USER_GROUP_INDEX_NAME, body=TEST_NESTED_USER_GROUP_MAPPING)
helpers.bulk(es, TEST_NESTED_USER_GROUP_DOCS)
if __name__ == '__main__':
# Create connection to Elasticsearch - use defaults
es = Elasticsearch(ELASTICSEARCH_HOST)
_setup_data(es)
_setup_test_mappings(es)
_setup_test_nested(es)


@ -10,7 +10,11 @@
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"metadata": {
"pycharm": {
"is_executing": false
}
},
"outputs": [],
"source": [
"import pandas as pd"
@ -442,7 +446,7 @@
"metadata": {},
"outputs": [],
"source": [
"ed_df = ed.read_es('localhost', 'kibana_sample_data_flights')"
"ed_df = ed.read_es('localhost', 'flights')"
]
},
{
@ -519,7 +523,7 @@
" <td>DE-HE</td>\n",
" <td>Sunny</td>\n",
" <td>0</td>\n",
" <td>2019-05-27T00:00:00</td>\n",
" <td>2018-01-01 00:00:00</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
@ -543,7 +547,7 @@
" <td>SE-BD</td>\n",
" <td>Clear</td>\n",
" <td>0</td>\n",
" <td>2019-05-27T18:27:00</td>\n",
" <td>2018-01-01 18:27:00</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
@ -567,7 +571,7 @@
" <td>IT-34</td>\n",
" <td>Rain</td>\n",
" <td>0</td>\n",
" <td>2019-05-27T17:11:14</td>\n",
" <td>2018-01-01 17:11:14</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
@ -591,7 +595,7 @@
" <td>IT-72</td>\n",
" <td>Thunder &amp; Lightning</td>\n",
" <td>0</td>\n",
" <td>2019-05-27T10:33:28</td>\n",
" <td>2018-01-01 10:33:28</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
@ -615,7 +619,7 @@
" <td>MX-DIF</td>\n",
" <td>Damaging Wind</td>\n",
" <td>0</td>\n",
" <td>2019-05-27T05:13:00</td>\n",
" <td>2018-01-01 05:13:00</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
@ -672,12 +676,12 @@
"3 {'lat': '40.886002', 'lon': '14.2908'} IT-72 \n",
"4 {'lat': '19.4363', 'lon': '-99.072098'} MX-DIF \n",
"\n",
" OriginWeather dayOfWeek timestamp \n",
"0 Sunny 0 2019-05-27T00:00:00 \n",
"1 Clear 0 2019-05-27T18:27:00 \n",
"2 Rain 0 2019-05-27T17:11:14 \n",
"3 Thunder & Lightning 0 2019-05-27T10:33:28 \n",
"4 Damaging Wind 0 2019-05-27T05:13:00 \n",
" OriginWeather dayOfWeek timestamp \n",
"0 Sunny 0 2018-01-01 00:00:00 \n",
"1 Clear 0 2018-01-01 18:27:00 \n",
"2 Rain 0 2018-01-01 17:11:14 \n",
"3 Thunder & Lightning 0 2018-01-01 10:33:28 \n",
"4 Damaging Wind 0 2018-01-01 05:13:00 \n",
"\n",
"[5 rows x 27 columns]"
]
@ -768,12 +772,12 @@
" <td>2470.545974</td>\n",
" <td>1535.126118</td>\n",
" <td>0.000000</td>\n",
" <td>252.064162</td>\n",
" <td>251.834931</td>\n",
" <td>1.000000</td>\n",
" </tr>\n",
" <tr>\n",
" <th>50%</th>\n",
" <td>640.387285</td>\n",
" <td>640.362667</td>\n",
" <td>7612.072403</td>\n",
" <td>4729.922470</td>\n",
" <td>0.000000</td>\n",
@ -782,12 +786,12 @@
" </tr>\n",
" <tr>\n",
" <th>75%</th>\n",
" <td>842.259390</td>\n",
" <td>9735.660463</td>\n",
" <td>6049.583389</td>\n",
" <td>15.000000</td>\n",
" <td>842.262193</td>\n",
" <td>9735.210895</td>\n",
" <td>6049.600045</td>\n",
" <td>12.521186</td>\n",
" <td>720.505705</td>\n",
" <td>4.068000</td>\n",
" <td>4.109848</td>\n",
" </tr>\n",
" <tr>\n",
" <th>max</th>\n",
@ -809,8 +813,8 @@
"std 266.386661 4578.263193 2844.800855 96.743006 \n",
"min 100.020531 0.000000 0.000000 0.000000 \n",
"25% 410.008918 2470.545974 1535.126118 0.000000 \n",
"50% 640.387285 7612.072403 4729.922470 0.000000 \n",
"75% 842.259390 9735.660463 6049.583389 15.000000 \n",
"50% 640.362667 7612.072403 4729.922470 0.000000 \n",
"75% 842.262193 9735.210895 6049.600045 12.521186 \n",
"max 1199.729004 19881.482422 12353.780273 360.000000 \n",
"\n",
" FlightTimeMin dayOfWeek \n",
@ -818,9 +822,9 @@
"mean 511.127842 2.835975 \n",
"std 334.741135 1.939365 \n",
"min 0.000000 0.000000 \n",
"25% 252.064162 1.000000 \n",
"25% 251.834931 1.000000 \n",
"50% 503.148975 3.000000 \n",
"75% 720.505705 4.068000 \n",
"75% 720.505705 4.109848 \n",
"max 1902.901978 6.000000 "
]
},
@ -832,6 +836,89 @@
"source": [
"ed_df.describe()"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"d = {'col1': [1.2, 20], 'col2': [int(1), int(30)], 'col3': ['2019-02-01 03:04:05', '2018-02-01 01:03:04'], 'col4': ['2019-02-01 03:04:05', '2018-02-01 01:03:04']}\n",
"df = pd.DataFrame(data=d)"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>col1</th>\n",
" <th>col2</th>\n",
" <th>col3</th>\n",
" <th>col4</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>1.2</td>\n",
" <td>1</td>\n",
" <td>2019-02-01 03:04:05</td>\n",
" <td>2019-02-01 03:04:05</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>20.0</td>\n",
" <td>30</td>\n",
" <td>2018-02-01 01:03:04</td>\n",
" <td>2018-02-01 01:03:04</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" col1 col2 col3 col4\n",
"0 1.2 1 2019-02-01 03:04:05 2019-02-01 03:04:05\n",
"1 20.0 30 2018-02-01 01:03:04 2018-02-01 01:03:04"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
@ -850,7 +937,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.8"
"version": "3.7.3"
}
},
"nbformat": 4,