Renamed modules and added mapping store

Stephen Dodson 2019-06-18 11:48:56 +00:00
parent a3ea4db772
commit 674ac129e6
9 changed files with 379 additions and 123 deletions

View File

@ -1,108 +0,0 @@
import eland
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
import json
import pandas as pd
class DataFrame():
def __init__(self, client, index_pattern):
self.client = eland.Client(client)
self.index_pattern = index_pattern
self.client.indices().exists(index_pattern)
@staticmethod
def _flatten_results(prefix, results, result):
# TODO
return prefix
@staticmethod
def _es_results_to_pandas(results):
# TODO - resolve nested fields
rows = []
for hit in results['hits']['hits']:
row = hit['_source']
rows.append(row)
#return pd.DataFrame(data=rows)
# Converting the list of dicts to a dataframe doesn't convert datetimes
# effectively compared to read_json. TODO - https://github.com/elastic/eland/issues/2
json_rows = json.dumps(rows)
return pd.read_json(json_rows)
@staticmethod
def _flatten_mapping(prefix, properties, result):
for k, v in properties.items():
if 'properties' in v:
if(prefix == ''):
prefix = k
else:
prefix = prefix + '.' + k
DataFrame._flatten_mapping(prefix, v['properties'], result)
else:
if(prefix == ''):
key = k
else:
key = prefix + '.' + k
type = v['type']
result.append((key, type))
@staticmethod
def _es_mappings_to_pandas(mappings):
fields = []
for index in mappings:
if 'properties' in mappings[index]['mappings']:
properties = mappings[index]['mappings']['properties']
DataFrame._flatten_mapping('', properties, fields)
return pd.DataFrame(data=fields, columns=['field', 'datatype'])
def head(self, n=5):
results = self.client.search(index=self.index_pattern, size=n)
return DataFrame._es_results_to_pandas(results)
def describe(self):
# First get all types
#mapping = self.client.indices().get_mapping(index=self.index_pattern)
mapping = self.client.indices().get_mapping(index=self.index_pattern)
fields = DataFrame._es_mappings_to_pandas(mapping)
# Get numeric types (https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#the-where-method-and-masking)
# https://www.elastic.co/guide/en/elasticsearch/reference/current/number.html
# TODO refactor this list out of method
numeric_fields = fields.query('datatype == ["long", "integer", "short", "byte", "double", "float", "half_float", "scaled_float"]')
# for each field we compute:
# count, mean, std, min, 25%, 50%, 75%, max
search = Search(using=self.client, index=self.index_pattern).extra(size=0)
for field in numeric_fields.field:
search.aggs.metric('extended_stats_'+field, 'extended_stats', field=field)
search.aggs.metric('percentiles_'+field, 'percentiles', field=field)
response = search.execute()
results = pd.DataFrame(index=['count', 'mean', 'std', 'min', '25%', '50%', '75%', 'max'])
for field in numeric_fields.field:
values = []
values.append(response.aggregations['extended_stats_'+field]['count'])
values.append(response.aggregations['extended_stats_'+field]['avg'])
values.append(response.aggregations['extended_stats_'+field]['std_deviation'])
values.append(response.aggregations['extended_stats_'+field]['min'])
values.append(response.aggregations['percentiles_'+field]['values']['25.0'])
values.append(response.aggregations['percentiles_'+field]['values']['50.0'])
values.append(response.aggregations['percentiles_'+field]['values']['75.0'])
values.append(response.aggregations['extended_stats_'+field]['max'])
# only add the column if at least one aggregation returned a value
if (values.count(None) < len(values)):
results = results.assign(**{field: values})
return results

View File

@ -1,3 +1,3 @@
from .utils import *
-from .DataFrame import *
+from .frame import *
-from .Client import *
+from .client import *

View File

@ -8,7 +8,7 @@ class Client(object):
self.es = es
else:
self.es = Elasticsearch(es)
def info(self):
return self.es.info()
@ -17,3 +17,7 @@ class Client(object):
def search(self, **kwargs):
return self.es.search(**kwargs)
def field_caps(self, **kwargs):
return self.es.field_caps(**kwargs)
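The new field_caps passthrough mirrors the existing search wrapper, and is what the (currently commented-out) aggregatable/searchable bookkeeping in frame.py would consume. A minimal usage sketch; the 'flights' index name is illustrative:

from elasticsearch import Elasticsearch
import eland

client = eland.Client(Elasticsearch())  # connects to localhost:9200 by default
caps = client.field_caps(index='flights', fields='*')
# For each field, the response reports per-datatype capabilities, including
# whether the field is searchable and aggregatable
for field, types in caps['fields'].items():
    print(field, types)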

201
eland/frame.py Normal file
View File

@ -0,0 +1,201 @@
"""
DataFrame
---------
An efficient 2D container for potentially mixed-type time series or other
labeled data series.
The underlying data resides in Elasticsearch and the API aligns as much as
possible with pandas.DataFrame API.
This allows the eland.DataFrame to access large datasets stored in Elasticsearch
without loading the dataset into local memory.
Implementation Details
----------------------
Elasticsearch indexes can be configured in many different ways, and these indexes
use data structures that differ from those underlying a pandas.DataFrame.
eland.DataFrame operations that return individual rows (e.g. df.head()) return
_source data. If _source is not enabled, this data is not accessible.
Similarly, only Elasticsearch searchable fields can be searched or filtered, and
only Elasticsearch aggregatable fields can be aggregated or grouped.
"""
import eland
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
import pandas as pd
class DataFrame():
"""
pandas.DataFrame-like API that proxies into Elasticsearch index(es).
Parameters
----------
client : eland.Client
A reference to an Elasticsearch Python client
index_pattern : str
An Elasticsearch index pattern. This can contain wildcards (e.g. filebeat-*).
See Also
--------
Examples
--------
>>> import eland as ed
>>> client = ed.Client(Elasticsearch())
>>> df = ed.DataFrame(client, 'reviews')
>>> df.head()
reviewerId vendorId rating date
0 0 0 5 2006-04-07 17:08
1 1 1 5 2006-05-04 12:16
2 2 2 4 2006-04-21 12:26
3 3 3 5 2006-04-18 15:48
4 3 4 5 2006-04-18 15:49
Notice that the types are based on Elasticsearch mappings
Notes
-----
If the Elasticsearch index is deleted or index mappings are changed after this
object is created, the object is not rebuilt and so inconsistencies can occur.
Mapping Elasticsearch types to pandas dtypes
--------------------------------------------
Elasticsearch field datatype             | Pandas dtype
-----------------------------------------|----------------
text                                     | object
keyword                                  | object
long, integer, short, byte, binary       | int64
double, float, half_float, scaled_float  | float64
date, date_nanos                         | datetime64[ns]
boolean                                  | bool
TODO - add additional mapping types
"""
def __init__(self, client, index_pattern):
self.client = eland.Client(client)
self.index_pattern = index_pattern
# Get and persist mappings; this allows us to correctly
# map returned types from Elasticsearch to pandas datatypes
mapping = self.client.indices().get_mapping(index=self.index_pattern)
#field_caps = self.client.field_caps(index=self.index_pattern, fields='*')
#self.fields, self.aggregatable_fields, self.searchable_fields = \
# DataFrame._es_mappings_to_pandas(mapping, field_caps)
self.fields = DataFrame._es_mappings_to_pandas(mapping)
@staticmethod
def _flatten_results(prefix, results, result):
# TODO
return prefix
def _es_results_to_pandas(self, results):
# TODO - resolve nested fields
rows = []
for hit in results['hits']['hits']:
row = hit['_source']
rows.append(row)
df = pd.DataFrame(data=rows)
return df
@staticmethod
def _extract_types_from_mapping(y):
"""
Extract data types from mapping for DataFrame columns.
Elasticsearch _source data is transformed into pandas DataFrames. Notes:
- This strategy is not compatible with all Elasticsearch configurations. If _source is disabled
(https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-source-field.html#disable-source-field)
no data values will be populated
- Sub-fields (e.g. english.text in {"mappings":{"properties":{"text":{"type":"text","fields":{"english":{"type":"text","analyzer":"english"}}}}}})
are not used
"""
out = {}
# Recurse until we reach a 'type: xxx' entry; 'properties' and 'fields' nodes
# are flattened into dot-separated names
def flatten(x, name=''):
if type(x) is dict:
for a in x:
if a == 'type' and type(x[a]) is str: # 'type' can be a name of a field
out[name[:-1]] = x[a]
if a == 'properties' or a == 'fields':
flatten(x[a], name)
else:
flatten(x[a], name + a + '.')
flatten(y)
return out
@staticmethod
def _es_mappings_to_pandas(mappings):
fields = {}
for index in mappings:
if 'properties' in mappings[index]['mappings']:
properties = mappings[index]['mappings']['properties']
datatypes = DataFrame._extract_types_from_mapping(properties)
# Note there could be conflicts here - e.g. the same field name with different semantics in
# different indexes - currently the last one wins TODO: review this
fields.update(datatypes)
return pd.DataFrame.from_dict(data=fields, orient='index', columns=['datatype'])
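To illustrate the two helpers above, a sketch with a hypothetical mapping fragment; both sub-fields and nested properties collapse into dot-separated index labels:

import eland

# Hypothetical get_mapping() response for a single index named 'my-index'
mappings = {
    'my-index': {
        'mappings': {
            'properties': {
                'content': {'type': 'text',
                            'fields': {'english': {'type': 'text', 'analyzer': 'english'}}},
                'location': {'properties': {'lat': {'type': 'float'},
                                            'lon': {'type': 'float'}}},
            }
        }
    }
}
fields = eland.DataFrame._es_mappings_to_pandas(mappings)
print(fields)
#                  datatype
# content              text
# content.english      text
# location.lat        float
# location.lon        float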
def head(self, n=5):
results = self.client.search(index=self.index_pattern, size=n)
return self._es_results_to_pandas(results)
def describe(self):
# First get all types
#mapping = self.client.indices().get_mapping(index=self.index_pattern)
mapping = self.client.indices().get_mapping(index=self.index_pattern)
fields = DataFrame._es_mappings_to_pandas(mapping)
# Get numeric types (https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#the-where-method-and-masking)
# https://www.elastic.co/guide/en/elasticsearch/reference/current/number.html
# TODO refactor this list out of method
numeric_fields = fields.query('datatype == ["long", "integer", "short", "byte", "double", "float", "half_float", "scaled_float"]')
# for each field we compute:
# count, mean, std, min, 25%, 50%, 75%, max
search = Search(using=self.client, index=self.index_pattern).extra(size=0)
for field in numeric_fields.field:
search.aggs.metric('extended_stats_'+field, 'extended_stats', field=field)
search.aggs.metric('percentiles_'+field, 'percentiles', field=field)
response = search.execute()
results = pd.DataFrame(index=['count', 'mean', 'std', 'min', '25%', '50%', '75%', 'max'])
for field in numeric_fields.field:
values = []
values.append(response.aggregations['extended_stats_'+field]['count'])
values.append(response.aggregations['extended_stats_'+field]['avg'])
values.append(response.aggregations['extended_stats_'+field]['std_deviation'])
values.append(response.aggregations['extended_stats_'+field]['min'])
values.append(response.aggregations['percentiles_'+field]['values']['25.0'])
values.append(response.aggregations['percentiles_'+field]['values']['50.0'])
values.append(response.aggregations['percentiles_'+field]['values']['75.0'])
values.append(response.aggregations['extended_stats_'+field]['max'])
# only add the column if at least one aggregation returned a value
if (values.count(None) < len(values)):
results = results.assign(**{field: values})
return results
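describe() pushes all computation into Elasticsearch, so only the summary statistics travel back to the client. For reference, a sketch of the aggregation body the Search DSL builds for one numeric field (the field name 'AvgTicketPrice' is illustrative):

# Equivalent request body for a single numeric field (sketch)
body = {
    'size': 0,  # no hits, aggregations only
    'aggs': {
        'extended_stats_AvgTicketPrice': {'extended_stats': {'field': 'AvgTicketPrice'}},
        'percentiles_AvgTicketPrice': {'percentiles': {'field': 'AvgTicketPrice'}},
    },
}
# count/mean/std/min/max are read from extended_stats; the 25%/50%/75% rows
# come from the percentiles aggregation (whose default percents include them)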

View File

@ -1,4 +1,5 @@
import os
import pandas as pd
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
@ -10,3 +11,150 @@ FLIGHTS_FILE_NAME = ROOT_DIR + '/flights.json.gz'
ECOMMERCE_INDEX_NAME = 'ecommerce'
ECOMMERCE_FILE_NAME = ROOT_DIR + '/ecommerce.json.gz'
TEST_MAPPING1 = {
'mappings': {
'properties': {
'city': {
'type': 'text',
'fields': {
'raw': {
'type': 'keyword'
}
}
},
'text': {
'type': 'text',
'fields': {
'english': {
'type': 'text',
'analyzer': 'english'
}
}
},
'origin_location': {
'properties': {
'lat': {
'type': 'text',
'index_prefixes': {},
'fields': {
'keyword': {
'type': 'keyword',
'ignore_above': 256
}
}
},
'lon': {
'type': 'text',
'fields': {
'keyword': {
'type': 'keyword',
'ignore_above': 256
}
}
}
}
},
'maps-telemetry': {
'properties': {
'attributesPerMap': {
'properties': {
'dataSourcesCount': {
'properties': {
'avg': {
'type': 'long'
},
'max': {
'type': 'long'
},
'min': {
'type': 'long'
}
}
},
'emsVectorLayersCount': {
'dynamic': 'true',
'properties': {
'france_departments': {
'properties': {
'avg': {
'type': 'float'
},
'max': {
'type': 'long'
},
'min': {
'type': 'long'
}
}
}
}
}
}
}
}
},
'type': {
'type': 'keyword'
},
'name': {
'type': 'text'
},
'user_name': {
'type': 'keyword'
},
'email': {
'type': 'keyword'
},
'content': {
'type': 'text'
},
'tweeted_at': {
'type': 'date'
},
'dest_location': {
'type': 'geo_point'
},
'user': {
'type': 'nested'
},
'my_join_field': {
'type': 'join',
'relations': {
'question': ['answer', 'comment'],
'answer': 'vote'
}
}
}
}
}
TEST_MAPPING1_INDEX_NAME = 'mapping1'
TEST_MAPPING1_EXPECTED = {
'city': 'text',
'city.raw': 'keyword',
'content': 'text',
'dest_location': 'geo_point',
'email': 'keyword',
'maps-telemetry.attributesPerMap.dataSourcesCount.avg': 'long',
'maps-telemetry.attributesPerMap.dataSourcesCount.max': 'long',
'maps-telemetry.attributesPerMap.dataSourcesCount.min': 'long',
'maps-telemetry.attributesPerMap.emsVectorLayersCount.france_departments.avg': 'float',
'maps-telemetry.attributesPerMap.emsVectorLayersCount.france_departments.max': 'long',
'maps-telemetry.attributesPerMap.emsVectorLayersCount.france_departments.min': 'long',
'my_join_field': 'join',
'name': 'text',
'origin_location.lat': 'text',
'origin_location.lat.keyword': 'keyword',
'origin_location.lon': 'text',
'origin_location.lon.keyword': 'keyword',
'text': 'text',
'text.english': 'text',
'tweeted_at': 'date',
'type': 'keyword',
'user': 'nested',
'user_name': 'keyword'
}
TEST_MAPPING1_EXPECTED_DF = pd.DataFrame.from_dict(data=TEST_MAPPING1_EXPECTED, orient='index', columns=['datatype'])

View File

@ -1,5 +1,10 @@
# File called _pytest for PyCharm compatibility
-from eland.tests.dataframe.common import TestData
+from eland.tests.frame.common import TestData
from eland.tests import *
import eland as ed
import pandas as pd
from pandas.util.testing import (
assert_almost_equal, assert_frame_equal, assert_series_equal)
@ -15,10 +20,9 @@ class TestDataFrameIndexing(TestData):
pd_ecommerce_head = self.pd_ecommerce().head()
ed_ecommerce_head = self.ed_ecommerce().head()
#print(pd_ecommerce_head.dtypes)
#print(ed_ecommerce_head.dtypes)
assert_frame_equal(pd_ecommerce_head, ed_ecommerce_head)
def test_mappings(self):
test_mapping1 = ed.read_es(ELASTICSEARCH_HOST, TEST_MAPPING1_INDEX_NAME)
assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, test_mapping1.fields)
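The new test exercises the mapping store end-to-end: read_es builds an eland.DataFrame from the fixture index, and its persisted fields frame is compared against the hand-written expectation. It would typically be run with pytest after loading the fixture data via the setup script below, e.g. python -m pytest eland/tests (invocation and path are assumptions).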

View File

@ -2,24 +2,19 @@ import pandas as pd
from elasticsearch import Elasticsearch
from elasticsearch import helpers
-from eland.tests import FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME, ECOMMERCE_FILE_NAME, ECOMMERCE_INDEX_NAME
+from eland.tests import *
DATA_LIST = [
(FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME),
(ECOMMERCE_FILE_NAME, ECOMMERCE_INDEX_NAME)
]
-if __name__ == '__main__':
+def _setup_data(es):
# Read json file and index records into Elasticsearch
for data in DATA_LIST:
json_file_name = data[0]
index_name = data[1]
-# Create connection to Elasticsearch - use defaults1
-es = Elasticsearch()
# Delete index
print("Deleting index:", index_name)
es.indices.delete(index=index_name, ignore=[400, 404])
@ -49,3 +44,15 @@ if __name__ == '__main__':
actions = []
print("Done", index_name)
def _setup_test_mappings(es):
# Create a complex mapping containing many Elasticsearch features
es.indices.delete(index=TEST_MAPPING1_INDEX_NAME, ignore=[400, 404])
es.indices.create(index=TEST_MAPPING1_INDEX_NAME, body=TEST_MAPPING1)
if __name__ == '__main__':
# Create connection to Elasticsearch - use defaults
es = Elasticsearch(ELASTICSEARCH_HOST)
_setup_data(es)
_setup_test_mappings(es)
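Since the loading logic is now factored into functions instead of living under __main__, other scripts can reuse it. A sketch; the module path and host are assumptions, not part of this commit:

from elasticsearch import Elasticsearch

# Hypothetical import - the real module path of this setup script may differ
from eland.tests.setup_tests import _setup_data, _setup_test_mappings

es = Elasticsearch('localhost:9200')  # host is an assumption
_setup_data(es)             # bulk-load the flights/ecommerce fixtures
_setup_test_mappings(es)    # recreate the TEST_MAPPING1 index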