Merge pull request #13 from stevedodson/master

Adding eland.Index feature
This commit is contained in:
stevedodson 2019-06-28 16:47:59 +02:00 committed by GitHub
commit 99279724f6
12 changed files with 5186 additions and 461 deletions

58
NOTES.md Normal file
View File

@ -0,0 +1,58 @@
# Implementation Notes
The goal of an `eland.DataFrame` is to enable users who are familiar with `pandas.DataFrame`
to access, explore and manipulate data that resides in Elasticsearch.
Ideally, all data should reside in Elasticsearch rather than in memory.
This restricts the API, but allows access to huge data sets that do not fit into memory, and allows
use of powerful Elasticsearch features such as aggregations.
## Implementation Details
### 3rd Party System Access
Generally, integrations with [3rd party storage systems](https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html)
(SQL, Google Big Query etc.) involve accessing these systems and reading all external data into an
in-core pandas data structure. This also applies to [Apache Arrow](https://arrow.apache.org/docs/python/pandas.html)
structures.
Whilst this provides access to data in these systems, for large datasets this can require significant
in-core memory, and for systems such as Elasticsearch, bulk export of data can be an inefficient way
of exploring the data.
An alternative option is to create an API that proxies `pandas.DataFrame`-like calls to Elasticsearch
queries and operations. This could allow the Elasticsearch cluster to perform operations such as
aggregations rather than exporting all the data and performing this operation in-core.
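As a concrete illustration, a `describe()`-style call can be answered entirely by the cluster with a
single aggregation request, rather than a bulk export followed by in-core computation. The sketch below
is a minimal example using `elasticsearch-dsl`; the index name `flights` and the field `AvgTicketPrice`
are placeholders from the Kibana sample data, not requirements of the approach.

```python
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search

es = Elasticsearch()  # assumes a locally running cluster

# size=0: return no documents, only the aggregation results
search = Search(using=es, index="flights").extra(size=0)
search.aggs.metric("stats_AvgTicketPrice", "extended_stats", field="AvgTicketPrice")

response = search.execute()
stats = response.aggregations["stats_AvgTicketPrice"]
print(stats["count"], stats["avg"], stats["std_deviation"])
```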
### Implementation Options
One option would be to replace the in-core memory structures backing `pandas.DataFrame` with Elasticsearch
accessors. This would allow full access to the `pandas.DataFrame` APIs. However, this has issues:
* If a `pandas.DataFrame` instance maps to an index, typical manipulation of a `pandas.DataFrame`
may involve creating many derived `pandas.DataFrame` instances. Constructing an index per
`pandas.DataFrame` may result in many Elasticsearch indices and a significant load on Elasticsearch.
For example, `df_a = df['a']` should not require Elasticsearch indices `df` and `df_a`
* Not all `pandas.DataFrame` APIs map to things we may want to do in Elasticsearch. In particular,
API calls that involve exporting all data from Elasticsearch into memory e.g. `df.to_dict()`.
* The backend `pandas.DataFrame` structures are not easily abstractable and are deeply embedded in
the implementation.
Another option is to create an `eland.DataFrame` API that mimics appropriate aspects of
the `pandas.DataFrame` API. This resolves some of the issues above:
* `df_a = df['a']` could be implemented as a change to the Elasticsearch query used, rather
than a new index
* Instead of supporting the entire `pandas.DataFrame` API we can support a subset appropriate for
Elasticsearch. If additional calls are required, we could create an `eland.DataFrame.to_pandas()`
method which would explicitly export all data to a `pandas.DataFrame` (see the sketch after this list)
* Creating a new `eland.DataFrame` API gives us full flexibility in terms of implementation. However,
it does create a large amount of work which may duplicate a lot of the `pandas` code - for example,
printing objects - and this creates a maintenance burden
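A hypothetical `eland.DataFrame.to_pandas()` could be built on the Elasticsearch scroll helper so that a
full export is always explicit and opt-in. This is only a sketch of the idea - the function name, the
match-all query and the index pattern are assumptions, not part of the current implementation - and it is
only sensible for result sets that actually fit into memory.

```python
import pandas as pd
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

def to_pandas(es: Elasticsearch, index_pattern: str) -> pd.DataFrame:
    """Explicitly export all documents of an index into an in-core pandas.DataFrame."""
    rows = [hit["_source"] for hit in
            scan(es, index=index_pattern, query={"query": {"match_all": {}}})]
    return pd.DataFrame(rows)

# pd_flights = to_pandas(Elasticsearch(), "flights")  # hypothetical usage
```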

View File

@ -1,4 +1,5 @@
from .utils import * from .utils import *
from .frame import * from .dataframe import *
from .client import * from .client import *
from .mappings import * from .mappings import *
from .index import *

View File

@ -23,21 +23,19 @@ Similarly, only Elasticsearch searchable fields can be searched or filtered, and
only Elasticsearch aggregatable fields can be aggregated or grouped. only Elasticsearch aggregatable fields can be aggregated or grouped.
""" """
import eland as ed import sys
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
import pandas as pd import pandas as pd
from elasticsearch_dsl import Search
from pandas.core.arrays.sparse import BlockIndex from pandas.compat import StringIO
from pandas.core import common as com
from pandas.io.common import _expand_user, _stringify_path
from pandas.io.formats import format as fmt from pandas.io.formats import format as fmt
from pandas.io.formats.printing import pprint_thing from pandas.io.formats.printing import pprint_thing
from pandas.io.formats import console
from io import StringIO import eland as ed
import sys
class DataFrame(): class DataFrame():
""" """
@ -79,26 +77,24 @@ class DataFrame():
object is created, the object is not rebuilt and so inconsistencies can occur. object is created, the object is not rebuilt and so inconsistencies can occur.
""" """
def __init__(self, def __init__(self,
client, client,
index_pattern, index_pattern,
mappings=None, mappings=None,
operations=None): index_field=None):
self.client = ed.Client(client)
self.index_pattern = index_pattern self._client = ed.Client(client)
self._index_pattern = index_pattern
# Get and persist mappings, this allows us to correctly # Get and persist mappings, this allows us to correctly
# map returned types from Elasticsearch to pandas datatypes # map returned types from Elasticsearch to pandas datatypes
if mappings is None: if mappings is None:
self.mappings = ed.Mappings(self.client, self.index_pattern) self._mappings = ed.Mappings(self._client, self._index_pattern)
else: else:
self.mappings = mappings self._mappings = mappings
# Initialise a list of 'operations' self._index = ed.Index(index_field)
# these are filters
self.operations = []
if operations is not None:
self.operations.extend(operations)
def _es_results_to_pandas(self, results): def _es_results_to_pandas(self, results):
""" """
@ -187,6 +183,7 @@ class DataFrame():
TODO - an option here is to use Elasticsearch's multi-field matching instead of pandas treatment of lists (which isn't great) TODO - an option here is to use Elasticsearch's multi-field matching instead of pandas treatment of lists (which isn't great)
NOTE - using this lists is generally not a good way to use this API NOTE - using this lists is generally not a good way to use this API
""" """
def flatten_dict(y): def flatten_dict(y):
out = {} out = {}
@ -197,7 +194,7 @@ class DataFrame():
is_source_field = False is_source_field = False
pd_dtype = 'object' pd_dtype = 'object'
else: else:
is_source_field, pd_dtype = self.mappings.source_field_pd_dtype(name[:-1]) is_source_field, pd_dtype = self._mappings.source_field_pd_dtype(name[:-1])
if not is_source_field and type(x) is dict: if not is_source_field and type(x) is dict:
for a in x: for a in x:
@ -205,7 +202,7 @@ class DataFrame():
elif not is_source_field and type(x) is list: elif not is_source_field and type(x) is list:
for a in x: for a in x:
flatten(a, name) flatten(a, name)
elif is_source_field == True: # only print source fields from mappings (TODO - not so efficient for large number of fields and filtered mapping) elif is_source_field == True: # only print source fields from mappings (TODO - not so efficient for large number of fields and filtered mapping)
field_name = name[:-1] field_name = name[:-1]
# Coerce types - for now just datetime # Coerce types - for now just datetime
@ -227,14 +224,22 @@ class DataFrame():
return out return out
rows = [] rows = []
index = []
for hit in results['hits']['hits']: for hit in results['hits']['hits']:
row = hit['_source'] row = hit['_source']
# get index value - can be _id or can be field value in source
if self._index.is_source_field:
index_field = row[self._index.index_field]
else:
index_field = hit[self._index.index_field]
index.append(index_field)
# flatten row to map correctly to 2D DataFrame # flatten row to map correctly to 2D DataFrame
rows.append(flatten_dict(row)) rows.append(flatten_dict(row))
# Create pandas DataFrame # Create pandas DataFrame
df = pd.DataFrame(data=rows) df = pd.DataFrame(data=rows, index=index)
# _source may not contain all columns in the mapping # _source may not contain all columns in the mapping
# therefore, fill in missing columns # therefore, fill in missing columns
@ -242,7 +247,7 @@ class DataFrame():
missing_columns = list(set(self.columns) - set(df.columns)) missing_columns = list(set(self.columns) - set(df.columns))
for missing in missing_columns: for missing in missing_columns:
is_source_field, pd_dtype = self.mappings.source_field_pd_dtype(missing) is_source_field, pd_dtype = self._mappings.source_field_pd_dtype(missing)
df[missing] = None df[missing] = None
df[missing].astype(pd_dtype) df[missing].astype(pd_dtype)
@ -252,20 +257,32 @@ class DataFrame():
return df return df
def head(self, n=5): def head(self, n=5):
results = self.client.search(index=self.index_pattern, size=n) sort_params = self._index.sort_field + ":asc"
results = self._client.search(index=self._index_pattern, size=n, sort=sort_params)
return self._es_results_to_pandas(results) return self._es_results_to_pandas(results)
def tail(self, n=5):
sort_params = self._index.sort_field + ":desc"
results = self._client.search(index=self._index_pattern, size=n, sort=sort_params)
df = self._es_results_to_pandas(results)
# reverse order (index ascending)
return df.sort_index()
def describe(self): def describe(self):
numeric_source_fields = self.mappings.numeric_source_fields() numeric_source_fields = self._mappings.numeric_source_fields()
# for each field we compute: # for each field we compute:
# count, mean, std, min, 25%, 50%, 75%, max # count, mean, std, min, 25%, 50%, 75%, max
search = Search(using=self.client, index=self.index_pattern).extra(size=0) search = Search(using=self._client, index=self._index_pattern).extra(size=0)
for field in numeric_source_fields: for field in numeric_source_fields:
search.aggs.metric('extended_stats_'+field, 'extended_stats', field=field) search.aggs.metric('extended_stats_' + field, 'extended_stats', field=field)
search.aggs.metric('percentiles_'+field, 'percentiles', field=field) search.aggs.metric('percentiles_' + field, 'percentiles', field=field)
response = search.execute() response = search.execute()
@ -273,21 +290,21 @@ class DataFrame():
for field in numeric_source_fields: for field in numeric_source_fields:
values = [] values = []
values.append(response.aggregations['extended_stats_'+field]['count']) values.append(response.aggregations['extended_stats_' + field]['count'])
values.append(response.aggregations['extended_stats_'+field]['avg']) values.append(response.aggregations['extended_stats_' + field]['avg'])
values.append(response.aggregations['extended_stats_'+field]['std_deviation']) values.append(response.aggregations['extended_stats_' + field]['std_deviation'])
values.append(response.aggregations['extended_stats_'+field]['min']) values.append(response.aggregations['extended_stats_' + field]['min'])
values.append(response.aggregations['percentiles_'+field]['values']['25.0']) values.append(response.aggregations['percentiles_' + field]['values']['25.0'])
values.append(response.aggregations['percentiles_'+field]['values']['50.0']) values.append(response.aggregations['percentiles_' + field]['values']['50.0'])
values.append(response.aggregations['percentiles_'+field]['values']['75.0']) values.append(response.aggregations['percentiles_' + field]['values']['75.0'])
values.append(response.aggregations['extended_stats_'+field]['max']) values.append(response.aggregations['extended_stats_' + field]['max'])
# if not None # if not None
if (values.count(None) < len(values)): if (values.count(None) < len(values)):
results[field] = values results[field] = values
df = pd.DataFrame(data=results, index=['count', 'mean', 'std', 'min', '25%', '50%', '75%', 'max']) df = pd.DataFrame(data=results, index=['count', 'mean', 'std', 'min', '25%', '50%', '75%', 'max'])
return df return df
def info(self, verbose=None, buf=None, max_cols=None, memory_usage=None, def info(self, verbose=None, buf=None, max_cols=None, memory_usage=None,
@ -305,12 +322,10 @@ class DataFrame():
if buf is None: # pragma: no cover if buf is None: # pragma: no cover
buf = sys.stdout buf = sys.stdout
fake_df = self.__fake_dataframe__()
lines = [] lines = []
lines.append(str(type(self))) lines.append(str(type(self)))
lines.append(fake_df.index._summary()) lines.append(self.index_summary())
if len(self.columns) == 0: if len(self.columns) == 0:
lines.append('Empty {name}'.format(name=type(self).__name__)) lines.append('Empty {name}'.format(name=type(self).__name__))
@ -322,7 +337,7 @@ class DataFrame():
# hack # hack
if max_cols is None: if max_cols is None:
max_cols = pd.get_option('display.max_info_columns', max_cols = pd.get_option('display.max_info_columns',
len(self.columns) + 1) len(self.columns) + 1)
max_rows = pd.get_option('display.max_info_rows', len(self) + 1) max_rows = pd.get_option('display.max_info_rows', len(self) + 1)
@ -404,7 +419,6 @@ class DataFrame():
fmt.buffer_put_lines(buf, lines) fmt.buffer_put_lines(buf, lines)
@property @property
def shape(self): def shape(self):
""" """
@ -423,14 +437,32 @@ class DataFrame():
@property @property
def columns(self): def columns(self):
return pd.Index(self.mappings.source_fields()) return pd.Index(self._mappings.source_fields())
@property
def index(self):
return self._index
def set_index(self, index_field):
copy = self.copy()
copy._index = ed.Index(index_field)
return copy
def index_summary(self):
head = self.head(1).index[0]
tail = self.tail(1).index[0]
index_summary = ', %s to %s' % (pprint_thing(head),
pprint_thing(tail))
name = "Index"
return '%s: %s entries%s' % (name, len(self), index_summary)
@property @property
def dtypes(self): def dtypes(self):
return self.mappings.dtypes() return self._mappings.dtypes()
def get_dtype_counts(self): def get_dtype_counts(self):
return self.mappings.get_dtype_counts() return self._mappings.get_dtype_counts()
def count(self): def count(self):
""" """
@ -446,63 +478,155 @@ class DataFrame():
for a single document. for a single document.
""" """
counts = {} counts = {}
for field in self.mappings.source_fields(): for field in self._mappings.source_fields():
exists_query = {"query":{"exists":{"field":field}}} exists_query = {"query": {"exists": {"field": field}}}
field_exists_count = self.client.count(index=self.index_pattern, body=exists_query) field_exists_count = self._client.count(index=self._index_pattern, body=exists_query)
counts[field] = field_exists_count counts[field] = field_exists_count
count = pd.Series(data=counts, index=self.mappings.source_fields()) count = pd.Series(data=counts, index=self._mappings.source_fields())
return count return count
def index_count(self):
"""
Returns
-------
index_count: int
Count of docs where index_field exists
"""
exists_query = {"query": {"exists": {"field": self._index.index_field}}}
def __getitem__(self, item): index_count = self._client.count(index=self._index_pattern, body=exists_query)
# df['a'] -> item == str
# df['a', 'b'] -> item == (str, str) tuple return index_count
def _filter_by_columns(self, columns):
# Return new eland.DataFrame with modified mappings
mappings = ed.Mappings(mappings=self._mappings, columns=columns)
return DataFrame(self._client, self._index_pattern, mappings=mappings)
def __getitem__(self, key):
# NOTE: there is a difference between pandas here.
# e.g. df['a'] returns pd.Series, df[['a','b']] return pd.DataFrame
# we always return DataFrame - TODO maybe create eland.Series at some point...
# Implementation mainly copied from pandas v0.24.2
# (https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html)
key = com.apply_if_callable(key, self)
# TODO - add slice capabilities - need to add index features first
# e.g. set index etc.
# Do we have a slicer (on rows)?
"""
indexer = convert_to_index_sliceable(self, key)
if indexer is not None:
return self._slice(indexer, axis=0)
# Do we have a (boolean) DataFrame?
if isinstance(key, DataFrame):
return self._getitem_frame(key)
"""
# Do we have a (boolean) 1d indexer?
"""
if com.is_bool_indexer(key):
return self._getitem_bool_array(key)
"""
# We are left with two options: a single key, and a collection of keys,
columns = [] columns = []
if isinstance(item, str): if isinstance(key, str):
if not self.mappings.is_source_field(item): if not self._mappings.is_source_field(key):
raise TypeError('Column does not exist: [{0}]'.format(item)) raise TypeError('Column does not exist: [{0}]'.format(key))
columns.append(item) columns.append(key)
elif isinstance(item, tuple): elif isinstance(key, list):
columns.extend(list(item)) columns.extend(key)
elif isinstance(item, list):
columns.extend(item)
if len(columns) > 0:
# Return new eland.DataFrame with modified mappings
mappings = ed.Mappings(mappings=self.mappings, columns=columns)
return DataFrame(self.client, self.index_pattern, mappings=mappings)
"""
elif isinstance(item, BooleanFilter):
self._filter = item.build()
return self
else: else:
raise TypeError('Unsupported expr: [{0}]'.format(item)) raise TypeError('__getitem__ arguments invalid: [{0}]'.format(key))
"""
return self._filter_by_columns(columns)
def __len__(self): def __len__(self):
""" """
Returns length of info axis, but here we use the index. Returns length of info axis, but here we use the index.
""" """
return self.client.count(index=self.index_pattern) return self._client.count(index=self._index_pattern)
def copy(self):
# TODO - test and validate...may need deep copying
return ed.DataFrame(self._client,
self._index_pattern,
self._mappings,
self._index)
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
# Rendering Methods # Rendering Methods
def __repr__(self): def __repr__(self):
""" """
Return a string representation for a particular DataFrame. From pandas
""" """
return self.to_string() buf = StringIO()
max_rows = pd.get_option("display.max_rows")
max_cols = pd.get_option("display.max_columns")
show_dimensions = pd.get_option("display.show_dimensions")
if pd.get_option("display.expand_frame_repr"):
width, _ = console.get_console_size()
else:
width = None
self.to_string(buf=buf, max_rows=max_rows, max_cols=max_cols,
line_width=width, show_dimensions=show_dimensions)
return buf.getvalue()
def to_string(self, buf=None, columns=None, col_space=None, header=True,
index=True, na_rep='NaN', formatters=None, float_format=None,
sparsify=None, index_names=True, justify=None,
max_rows=None, max_cols=None, show_dimensions=True,
decimal='.', line_width=None):
"""
From pandas
"""
if max_rows == None:
max_rows = pd.get_option('display.max_rows')
sdf = self.__fake_dataframe__(max_rows=max_rows+1)
_show_dimensions = show_dimensions
if buf is not None:
_buf = _expand_user(_stringify_path(buf))
else:
_buf = StringIO()
sdf.to_string(buf=_buf, columns=columns,
col_space=col_space, na_rep=na_rep,
formatters=formatters,
float_format=float_format,
sparsify=sparsify, justify=justify,
index_names=index_names,
header=header, index=index,
max_rows=max_rows,
max_cols=max_cols,
show_dimensions=False, # print this outside of this call
decimal=decimal,
line_width=line_width)
if _show_dimensions:
_buf.write("\n\n[{nrows} rows x {ncols} columns]"
.format(nrows=self.index_count(), ncols=len(self.columns)))
if buf is None:
result = _buf.getvalue()
return result
def __fake_dataframe__(self, max_rows=1): def __fake_dataframe__(self, max_rows=1):
head_rows = max_rows / 2 + 1 head_rows = int(max_rows / 2) + max_rows % 2
tail_rows = max_rows - head_rows tail_rows = max_rows - head_rows
head = self.head(max_rows) head = self.head(head_rows)
tail = self.tail(tail_rows)
num_rows = len(self) num_rows = len(self)
@ -514,8 +638,9 @@ class DataFrame():
# to use the pandas IO methods. # to use the pandas IO methods.
# TODO - if data is indexed by time series, return top/bottom of # TODO - if data is indexed by time series, return top/bottom of
# time series, rather than first max_rows items # time series, rather than first max_rows items
"""
if tail_rows > 0: if tail_rows > 0:
locations = [0, num_rows-tail_rows] locations = [0, num_rows - tail_rows]
lengths = [head_rows, tail_rows] lengths = [head_rows, tail_rows]
else: else:
locations = [0] locations = [0]
@ -526,21 +651,13 @@ class DataFrame():
BlockIndex( BlockIndex(
num_rows, locations, lengths)) num_rows, locations, lengths))
for item in self.columns}) for item in self.columns})
"""
return sdf return pd.concat([head, tail])
return head
def to_string(self): return pd.concat([head, tail])
# TODO - this doesn't return 'notebook' friendly results yet..
# TODO - don't hard code max_rows - use pandas default/ES default
max_rows = 60
df = self.__fake_dataframe__(max_rows=max_rows)
return df.to_string(max_rows=max_rows, show_dimensions=True)
# From pandas.DataFrame
def _put_str(s, space): def _put_str(s, space):
return '{s}'.format(s=s)[:space].ljust(space) return '{s}'.format(s=s)[:space].ljust(space)

46
eland/index.py Normal file
View File

@ -0,0 +1,46 @@
"""
class Index
The index for an eland.DataFrame.
Currently, the index is a field that exists in every document in an Elasticsearch index.
For slicing and sorting operations it must be a docvalues field. By default _id is used,
which can't be used for range queries and is inefficient for sorting:
https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-id-field.html
(The value of the _id field is also accessible in aggregations or for sorting,
but doing so is discouraged as it requires to load a lot of data in memory.
In case sorting or aggregating on the _id field is required, it is advised to duplicate
the content of the _id field in another field that has doc_values enabled.)
"""
class Index:
ID_INDEX_FIELD = '_id'
ID_SORT_FIELD = '_doc' # if index field is _id, sort by _doc
def __init__(self, index_field=None):
# Calls setter
self.index_field = index_field
@property
def sort_field(self):
if self._index_field == self.ID_INDEX_FIELD:
return self.ID_SORT_FIELD
return self._index_field
@property
def is_source_field(self):
return self._is_source_field
@property
def index_field(self):
return self._index_field
@index_field.setter
def index_field(self, index_field):
if index_field == None:
self._index_field = Index.ID_INDEX_FIELD
self._is_source_field = False
else:
self._index_field = index_field
self._is_source_field = True
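A short usage sketch of the new index support follows; a local cluster loaded with the Kibana `flights`
sample data is assumed, and any other doc_values-enabled field would work just as well as `timestamp`.

```python
import eland as ed

# By default the DataFrame is indexed by _id, and head()/tail() sort on _doc
ed_flights = ed.read_es('localhost', 'flights')
print(ed_flights.head())

# Re-index on a doc_values field so head()/tail() sort on that field instead
ed_flights_ts = ed_flights.set_index('timestamp')
print(ed_flights_ts.tail())
```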

View File

@ -64,18 +64,22 @@ class Mappings():
# Populate capability matrix of fields # Populate capability matrix of fields
# field_name, es_dtype, pd_dtype, is_searchable, is_aggregtable, is_source # field_name, es_dtype, pd_dtype, is_searchable, is_aggregtable, is_source
self.mappings_capabilities = Mappings._create_capability_matrix(all_fields, source_fields, all_fields_caps) self._mappings_capabilities = Mappings._create_capability_matrix(all_fields, source_fields, all_fields_caps)
else: else:
# Reference object and restrict mapping columns if columns is not None:
self.mappings_capabilities = mappings.mappings_capabilities.loc[columns] # Reference object and restrict mapping columns
self._mappings_capabilities = mappings._mappings_capabilities.loc[columns]
else:
# straight copy
self._mappings_capabilities = mappings._mappings_capabilities.copy()
# Cache source field types for efficient lookup # Cache source field types for efficient lookup
# (this massively improves performance of DataFrame.flatten) # (this massively improves performance of DataFrame.flatten)
self.source_field_pd_dtypes = {} self._source_field_pd_dtypes = {}
for field_name in self.mappings_capabilities[self.mappings_capabilities._source == True].index: for field_name in self._mappings_capabilities[self._mappings_capabilities._source == True].index:
pd_dtype = self.mappings_capabilities.loc[field_name]['pd_dtype'] pd_dtype = self._mappings_capabilities.loc[field_name]['pd_dtype']
self.source_field_pd_dtypes[field_name] = pd_dtype self._source_field_pd_dtypes[field_name] = pd_dtype
def _extract_fields_from_mapping(mappings, source_only=False): def _extract_fields_from_mapping(mappings, source_only=False):
""" """
@ -262,24 +266,29 @@ class Mappings():
all_fields: list all_fields: list
All typed fields in the index mapping All typed fields in the index mapping
""" """
return self.mappings_capabilities.index.tolist() return self._mappings_capabilities.index.tolist()
def field_capabilities(self, field_name):
"""
Parameters
----------
field_name: str
"""
def pd_dtypes_groupby_source_fields(self):
Returns Returns
------- -------
groups: dict mappings_capabilities: pd.Series with index values:
Calls pandas.core.groupby.GroupBy.groups for _source fields _source: bool
E.g. Is this field name a top-level source field?
{ ed_dtype: str
'bool': Index(['Cancelled', 'FlightDelay'], dtype='object'), The Elasticsearch data type
'datetime64[ns]': Index(['timestamp'], dtype='object'), pd_dtype: str
'float64': Index(['AvgTicketPrice', 'DistanceKilometers', 'DistanceMiles',... The pandas data type
} searchable: bool
return self.mappings_capabilities[self.mappings_capabilities._source == True].groupby('pd_dtype').groups Is the field searchable in Elasticsearch?
aggregatable: bool
def pd_dtype Is the field aggregatable in Elasticsearch?
""" """
return self._mappings_capabilities.loc[field_name]
def source_field_pd_dtype(self, field_name): def source_field_pd_dtype(self, field_name):
""" """
@ -297,9 +306,9 @@ class Mappings():
pd_dtype = 'object' pd_dtype = 'object'
is_source_field = False is_source_field = False
if field_name in self.source_field_pd_dtypes: if field_name in self._source_field_pd_dtypes:
is_source_field = True is_source_field = True
pd_dtype = self.source_field_pd_dtypes[field_name] pd_dtype = self._source_field_pd_dtypes[field_name]
return is_source_field, pd_dtype return is_source_field, pd_dtype
@ -316,7 +325,7 @@ class Mappings():
""" """
is_source_field = False is_source_field = False
if field_name in self.source_field_pd_dtypes: if field_name in self._source_field_pd_dtypes:
is_source_field = True is_source_field = True
return is_source_field return is_source_field
@ -328,9 +337,9 @@ class Mappings():
numeric_source_fields: list of str numeric_source_fields: list of str
List of source fields where pd_dtype == (int64 or float64) List of source fields where pd_dtype == (int64 or float64)
""" """
return self.mappings_capabilities[(self.mappings_capabilities._source == True) & return self._mappings_capabilities[(self._mappings_capabilities._source == True) &
((self.mappings_capabilities.pd_dtype == 'int64') | ((self._mappings_capabilities.pd_dtype == 'int64') |
(self.mappings_capabilities.pd_dtype == 'float64'))].index.tolist() (self._mappings_capabilities.pd_dtype == 'float64'))].index.tolist()
def source_fields(self): def source_fields(self):
""" """
@ -339,7 +348,7 @@ class Mappings():
source_fields: list of str source_fields: list of str
List of source fields List of source fields
""" """
return self.source_field_pd_dtypes.keys() return self._source_field_pd_dtypes.keys()
def count_source_fields(self): def count_source_fields(self):
""" """
@ -357,7 +366,7 @@ class Mappings():
dtypes: pd.Series dtypes: pd.Series
Source field name + pd_dtype Source field name + pd_dtype
""" """
return pd.Series(self.source_field_pd_dtypes) return pd.Series(self._source_field_pd_dtypes)
def get_dtype_counts(self): def get_dtype_counts(self):
""" """
@ -368,5 +377,5 @@ class Mappings():
get_dtype_counts : Series get_dtype_counts : Series
Series with the count of columns with each dtype. Series with the count of columns with each dtype.
""" """
return pd.Series(self.mappings_capabilities[self.mappings_capabilities._source == True].groupby('pd_dtype')[ return pd.Series(self._mappings_capabilities[self._mappings_capabilities._source == True].groupby('pd_dtype')[
'_source'].count().to_dict()) '_source'].count().to_dict())

View File

@ -69,3 +69,22 @@ class TestMapping(TestData):
expected_get_dtype_counts = pd.Series({'datetime64[ns]': 1, 'float64': 1, 'int64': 5, 'object': 11}) expected_get_dtype_counts = pd.Series({'datetime64[ns]': 1, 'float64': 1, 'int64': 5, 'object': 11})
assert_series_equal(expected_get_dtype_counts, mappings.get_dtype_counts()) assert_series_equal(expected_get_dtype_counts, mappings.get_dtype_counts())
def test_mapping_capabilities(self):
mappings = ed.Mappings(ed.Client(ELASTICSEARCH_HOST), TEST_MAPPING1_INDEX_NAME)
field_capabilities = mappings.field_capabilities('city')
assert True == field_capabilities['_source']
assert 'text' == field_capabilities['es_dtype']
assert 'object' == field_capabilities['pd_dtype']
assert True == field_capabilities['searchable']
assert False == field_capabilities['aggregatable']
field_capabilities = mappings.field_capabilities('city.raw')
assert False == field_capabilities['_source']
assert 'keyword' == field_capabilities['es_dtype']
assert 'object' == field_capabilities['pd_dtype']
assert True == field_capabilities['searchable']
assert True == field_capabilities['aggregatable']

View File

@ -16,6 +16,7 @@ from eland.tests import FLIGHTS_DF_FILE_NAME, FLIGHTS_INDEX_NAME,\
_pd_flights = pd.read_json(FLIGHTS_DF_FILE_NAME).sort_index() _pd_flights = pd.read_json(FLIGHTS_DF_FILE_NAME).sort_index()
_pd_flights['timestamp'] = \ _pd_flights['timestamp'] = \
pd.to_datetime(_pd_flights['timestamp']) pd.to_datetime(_pd_flights['timestamp'])
_pd_flights.index = _pd_flights.index.map(str) # make index 'object' not int
_ed_flights = ed.read_es(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME) _ed_flights = ed.read_es(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME)
_pd_ecommerce = pd.read_json(ECOMMERCE_DF_FILE_NAME).sort_index() _pd_ecommerce = pd.read_json(ECOMMERCE_DF_FILE_NAME).sort_index()
@ -24,6 +25,7 @@ _pd_ecommerce['order_date'] = \
_pd_ecommerce['products.created_on'] = \ _pd_ecommerce['products.created_on'] = \
_pd_ecommerce['products.created_on'].apply(lambda x: pd.to_datetime(x)) _pd_ecommerce['products.created_on'].apply(lambda x: pd.to_datetime(x))
_pd_ecommerce.insert(2, 'customer_birth_date', None) _pd_ecommerce.insert(2, 'customer_birth_date', None)
_pd_ecommerce.index = _pd_ecommerce.index.map(str) # make index 'object' not int
_pd_ecommerce['customer_birth_date'].astype('datetime64') _pd_ecommerce['customer_birth_date'].astype('datetime64')
_ed_ecommerce = ed.read_es(ELASTICSEARCH_HOST, ECOMMERCE_INDEX_NAME) _ed_ecommerce = ed.read_es(ELASTICSEARCH_HOST, ECOMMERCE_INDEX_NAME)

View File

@ -0,0 +1,63 @@
# File called _pytest for PyCharm compatibility
from eland.tests.common import TestData
import pandas as pd
import io
from pandas.util.testing import (
assert_series_equal, assert_frame_equal)
class TestDataFrameGetItem(TestData):
def test_getitem_basic(self):
# Test 1 attribute
pd_carrier = self.pd_flights()['Carrier']
ed_carrier = self.ed_flights()['Carrier']
# pandas returns a Series here
assert_frame_equal(pd.DataFrame(pd_carrier.head(100)), ed_carrier.head(100))
pd_3_items = self.pd_flights()[['Dest','Carrier','FlightDelay']]
ed_3_items = self.ed_flights()[['Dest','Carrier','FlightDelay']]
assert_frame_equal(pd_3_items.head(100), ed_3_items.head(100))
# Test numerics
numerics = ['DistanceMiles', 'AvgTicketPrice', 'FlightTimeMin']
ed_numerics = self.ed_flights()[numerics]
pd_numerics = self.pd_flights()[numerics]
assert_frame_equal(pd_numerics.head(100), ed_numerics.head(100))
# just test headers
ed_numerics_describe = ed_numerics.describe()
assert ed_numerics_describe.columns.tolist() == numerics
def test_getattr_basic(self):
# Test 1 attribute
pd_carrier = self.pd_flights().Carrier
#ed_carrier = self.ed_flights().Carrier
print(type(pd_carrier))
print(pd_carrier)
def test_boolean(self):
# Test 1 attribute
pd_carrier = self.pd_flights()['Carrier == "Kibana Airlines"']
#ed_carrier = self.ed_flights().Carrier
print(type(pd_carrier))
print(pd_carrier)
def test_loc(self):
pd = self.pd_flights().loc[10:15, ['Dest', 'Carrier']]
print(type(pd))
print(pd)
pd = self.pd_flights().loc[10]
print(type(pd))
print(pd)

View File

@ -10,8 +10,8 @@ from pandas.util.testing import (
class TestDataFrameIndexing(TestData): class TestDataFrameIndexing(TestData):
def test_mapping(self): def test_mapping(self):
ed_flights_mappings = pd.DataFrame(self.ed_flights().mappings.mappings_capabilities ed_flights_mappings = pd.DataFrame(self.ed_flights()._mappings._mappings_capabilities
[self.ed_flights().mappings.mappings_capabilities._source==True] [self.ed_flights()._mappings._mappings_capabilities._source==True]
['pd_dtype']) ['pd_dtype'])
pd_flights_mappings = pd.DataFrame(self.pd_flights().dtypes, columns = ['pd_dtype']) pd_flights_mappings = pd.DataFrame(self.pd_flights().dtypes, columns = ['pd_dtype'])
@ -25,6 +25,8 @@ class TestDataFrameIndexing(TestData):
pd_flights_head = self.pd_flights().head() pd_flights_head = self.pd_flights().head()
ed_flights_head = self.ed_flights().head() ed_flights_head = self.ed_flights().head()
print(ed_flights_head)
assert_frame_equal(pd_flights_head, ed_flights_head) assert_frame_equal(pd_flights_head, ed_flights_head)
pd_ecommerce_head = self.pd_ecommerce().head() pd_ecommerce_head = self.pd_ecommerce().head()
@ -32,10 +34,25 @@ class TestDataFrameIndexing(TestData):
assert_frame_equal(pd_ecommerce_head, ed_ecommerce_head) assert_frame_equal(pd_ecommerce_head, ed_ecommerce_head)
def test_tail(self):
pd_flights_tail = self.pd_flights().tail()
ed_flights_tail = self.ed_flights().tail()
print(ed_flights_tail)
assert_frame_equal(pd_flights_tail, ed_flights_tail)
pd_ecommerce_tail = self.pd_ecommerce().tail()
ed_ecommerce_tail = self.ed_ecommerce().tail()
assert_frame_equal(pd_ecommerce_tail, ed_ecommerce_tail)
def test_describe(self): def test_describe(self):
pd_flights_describe = self.pd_flights().describe() pd_flights_describe = self.pd_flights().describe()
ed_flights_describe = self.ed_flights().describe() ed_flights_describe = self.ed_flights().describe()
print(ed_flights_describe)
# TODO - this fails now as ES aggregations are approximate # TODO - this fails now as ES aggregations are approximate
# if ES percentile agg uses # if ES percentile agg uses
# "hdr": { # "hdr": {
@ -47,6 +64,8 @@ class TestDataFrameIndexing(TestData):
pd_ecommerce_describe = self.pd_ecommerce().describe() pd_ecommerce_describe = self.pd_ecommerce().describe()
ed_ecommerce_describe = self.ed_ecommerce().describe() ed_ecommerce_describe = self.ed_ecommerce().describe()
print(ed_ecommerce_describe)
# We don't compare ecommerce here as the default dtypes in pandas from read_json # We don't compare ecommerce here as the default dtypes in pandas from read_json
# don't match the mapping types. This is mainly because the products field is # don't match the mapping types. This is mainly because the products field is
# nested and so can be treated as a multi-field in ES, but not in pandas # nested and so can be treated as a multi-field in ES, but not in pandas
@ -57,52 +76,7 @@ class TestDataFrameIndexing(TestData):
def test_to_string(self): def test_to_string(self):
print(self.ed_flights()) print(self.ed_flights())
print(self.ed_flights().to_string())
def test_getitem(self):
# Test 1 attribute
ed_carrier = self.ed_flights()['Carrier']
carrier_head = ed_carrier.head(5)
carrier_head_expected = pd.DataFrame(
{'Carrier':[
'Kibana Airlines',
'Logstash Airways',
'Logstash Airways',
'Kibana Airlines',
'Kibana Airlines'
]})
assert_frame_equal(carrier_head_expected, carrier_head)
#carrier_to_string = ed_carrier.to_string()
#print(carrier_to_string)
# Test multiple attributes (out of order)
ed_3_items = self.ed_flights()['Dest','Carrier','FlightDelay']
ed_3_items_head = ed_3_items.head(5)
ed_3_items_expected = pd.DataFrame(dict(
Dest={0: 'Sydney Kingsford Smith International Airport', 1: 'Venice Marco Polo Airport',
2: 'Venice Marco Polo Airport', 3: "Treviso-Sant'Angelo Airport",
4: "Xi'an Xianyang International Airport"},
Carrier={0: 'Kibana Airlines', 1: 'Logstash Airways', 2: 'Logstash Airways', 3: 'Kibana Airlines',
4: 'Kibana Airlines'},
FlightDelay={0: False, 1: False, 2: False, 3: True, 4: False}))
assert_frame_equal(ed_3_items_expected, ed_3_items_head)
#ed_3_items_to_string = ed_3_items.to_string()
#print(ed_3_items_to_string)
# Test numerics
numerics = ['DistanceMiles', 'AvgTicketPrice', 'FlightTimeMin']
ed_numerics = self.ed_flights()[numerics]
# just test headers
ed_numerics_describe = ed_numerics.describe()
assert ed_numerics_describe.columns.tolist() == numerics
def test_info(self): def test_info(self):
ed_flights_info_buf = io.StringIO() ed_flights_info_buf = io.StringIO()
@ -111,6 +85,8 @@ class TestDataFrameIndexing(TestData):
self.ed_flights().info(buf=ed_flights_info_buf) self.ed_flights().info(buf=ed_flights_info_buf)
self.pd_flights().info(buf=pd_flights_info_buf) self.pd_flights().info(buf=pd_flights_info_buf)
print(ed_flights_info_buf.getvalue())
ed_flights_info = (ed_flights_info_buf.getvalue().splitlines()) ed_flights_info = (ed_flights_info_buf.getvalue().splitlines())
pd_flights_info = (pd_flights_info_buf.getvalue().splitlines()) pd_flights_info = (pd_flights_info_buf.getvalue().splitlines())
@ -148,7 +124,7 @@ class TestDataFrameIndexing(TestData):
assert_series_equal(pd_flights_get_dtype_counts, ed_flights_get_dtype_counts) assert_series_equal(pd_flights_get_dtype_counts, ed_flights_get_dtype_counts)
def test_properties(self): def test_get_properties(self):
pd_flights_shape = self.pd_flights().shape pd_flights_shape = self.pd_flights().shape
ed_flights_shape = self.ed_flights().shape ed_flights_shape = self.ed_flights().shape
@ -164,3 +140,16 @@ class TestDataFrameIndexing(TestData):
assert_series_equal(pd_flights_dtypes, ed_flights_dtypes) assert_series_equal(pd_flights_dtypes, ed_flights_dtypes)
def test_index(self):
pd_flights = self.pd_flights()
pd_flights_timestamp = pd_flights.set_index('timestamp')
pd_flights.info()
pd_flights_timestamp.info()
pd_flights.info()
ed_flights = self.ed_flights()
ed_flights_timestamp = ed_flights.set_index('timestamp')
ed_flights.info()
ed_flights_timestamp.info()
ed_flights.info()

View File

@ -33,7 +33,8 @@ def _setup_data(es):
# make timestamp datetime 2018-01-01T12:09:35 # make timestamp datetime 2018-01-01T12:09:35
#values['timestamp'] = datetime.strptime(values['timestamp'], '%Y-%m-%dT%H:%M:%S') #values['timestamp'] = datetime.strptime(values['timestamp'], '%Y-%m-%dT%H:%M:%S')
action = {'_index': index_name, '_source': values} # Use integer as id field for repeatable results
action = {'_index': index_name, '_source': values, '_id': str(n)}
actions.append(action) actions.append(action)

File diff suppressed because it is too large